diff --git a/phlex/core/framework_graph.hpp b/phlex/core/framework_graph.hpp index 18aa00cc..d1c04450 100644 --- a/phlex/core/framework_graph.hpp +++ b/phlex/core/framework_graph.hpp @@ -71,7 +71,7 @@ namespace phlex::experimental { template auto fold(std::string name, is_fold_like auto f, - concurrency c = concurrency::serial, + concurrency c, std::string partition = "job", InitArgs&&... init_args) { @@ -92,17 +92,21 @@ namespace phlex::experimental { std::move(pred), std::move(unf), c, std::move(destination_data_layer)); } - auto observe(std::string name, is_observer_like auto f, concurrency c = concurrency::serial) + auto observe(std::string name, is_observer_like auto f, concurrency c = concurrency::unlimited) { return make_glue().observe(std::move(name), std::move(f), c); } - auto predicate(std::string name, is_predicate_like auto f, concurrency c = concurrency::serial) + auto predicate(std::string name, + is_predicate_like auto f, + concurrency c = concurrency::unlimited) { return make_glue().predicate(std::move(name), std::move(f), c); } - auto transform(std::string name, is_transform_like auto f, concurrency c = concurrency::serial) + auto transform(std::string name, + is_transform_like auto f, + concurrency c = concurrency::unlimited) { return make_glue().transform(std::move(name), std::move(f), c); } diff --git a/phlex/core/glue.hpp b/phlex/core/glue.hpp index 7f0cfdbb..bd5afac3 100644 --- a/phlex/core/glue.hpp +++ b/phlex/core/glue.hpp @@ -57,7 +57,7 @@ namespace phlex::experimental { } template - auto observe(std::string name, FT f, concurrency c) + auto observe(std::string name, FT f, concurrency c = concurrency::unlimited) { detail::verify_name(name, config_); return make_registration(config_, @@ -70,7 +70,7 @@ namespace phlex::experimental { } template - auto transform(std::string name, FT f, concurrency c) + auto transform(std::string name, FT f, concurrency c = concurrency::unlimited) { detail::verify_name(name, config_); return make_registration(config_, @@ -83,7 +83,7 @@ namespace phlex::experimental { } template - auto predicate(std::string name, FT f, concurrency c) + auto predicate(std::string name, FT f, concurrency c = concurrency::unlimited) { detail::verify_name(name, config_); return make_registration(config_, @@ -124,7 +124,7 @@ namespace phlex::experimental { std::move(destination_data_layer)); } - auto output(std::string name, is_output_like auto f, concurrency c = concurrency::serial) + auto output(std::string name, is_output_like auto f, concurrency c = concurrency::unlimited) { return output_api{nodes_.registrar_for(errors_), config_, diff --git a/phlex/core/graph_proxy.hpp b/phlex/core/graph_proxy.hpp index 8e5239d8..dc5f39ff 100644 --- a/phlex/core/graph_proxy.hpp +++ b/phlex/core/graph_proxy.hpp @@ -46,7 +46,7 @@ namespace phlex::experimental { template auto fold(std::string name, is_fold_like auto f, - concurrency c = concurrency::serial, + concurrency c, std::string partition = "job", InitArgs&&... init_args) { @@ -57,37 +57,38 @@ namespace phlex::experimental { std::forward(init_args)...); } - auto observe(std::string name, is_observer_like auto f, concurrency c = concurrency::serial) + auto observe(std::string name, is_observer_like auto f, concurrency c = concurrency::unlimited) { return create_glue().observe(std::move(name), std::move(f), c); } - auto predicate(std::string name, is_predicate_like auto f, concurrency c = concurrency::serial) + auto predicate(std::string name, + is_predicate_like auto f, + concurrency c = concurrency::unlimited) { return create_glue().predicate(std::move(name), std::move(f), c); } - auto transform(std::string name, is_transform_like auto f, concurrency c = concurrency::serial) + auto transform(std::string name, + is_transform_like auto f, + concurrency c = concurrency::unlimited) { return create_glue().transform(std::move(name), std::move(f), c); } template - auto unfold(std::string name, - is_predicate_like auto pred, - auto unf, - concurrency c = concurrency::serial) + auto unfold(std::string name, is_predicate_like auto pred, auto unf, concurrency c) { return create_glue(false).unfold(std::move(name), std::move(pred), std::move(unf), c); } template - auto unfold(is_predicate_like auto pred, auto unf, concurrency c = concurrency::serial) + auto unfold(is_predicate_like auto pred, auto unf, concurrency c) { return create_glue(false).unfold(std::move(pred), std::move(unf), c); } - auto output(std::string name, is_output_like auto f, concurrency c = concurrency::serial) + auto output(std::string name, is_output_like auto f, concurrency c = concurrency::unlimited) { return create_glue().output(std::move(name), std::move(f), c); } diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 7ef6f56b..3f5947f1 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -49,6 +49,7 @@ add_catch_test(product_matcher LIBRARIES phlex::model) add_catch_test(product_store LIBRARIES phlex::core) add_catch_test(fold LIBRARIES phlex::core) add_catch_test(replicated LIBRARIES TBB::tbb phlex::utilities spdlog::spdlog) +add_catch_test(repeater LIBRARIES TBB::tbb spdlog::spdlog) add_catch_test(serializer LIBRARIES phlex::core TBB::tbb) add_catch_test(specified_label LIBRARIES phlex::core) add_catch_test(unfold LIBRARIES Boost::json phlex::core TBB::tbb) diff --git a/test/benchmarks/accept_even_ids.cpp b/test/benchmarks/accept_even_ids.cpp index e490c46b..f15df997 100644 --- a/test/benchmarks/accept_even_ids.cpp +++ b/test/benchmarks/accept_even_ids.cpp @@ -5,9 +5,7 @@ PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m, config) { - m.predicate( - "accept_even_ids", - [](phlex::experimental::level_id const& id) { return id.number() % 2 == 0; }, - phlex::experimental::concurrency::unlimited) + m.predicate("accept_even_ids", + [](phlex::experimental::level_id const& id) { return id.number() % 2 == 0; }) .input_family(config.get("product_name")); } diff --git a/test/benchmarks/accept_even_numbers.cpp b/test/benchmarks/accept_even_numbers.cpp index 33982374..3aa7c22d 100644 --- a/test/benchmarks/accept_even_numbers.cpp +++ b/test/benchmarks/accept_even_numbers.cpp @@ -4,9 +4,6 @@ PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m, config) { - m.predicate( - "accept_even_numbers", - [](int i) { return i % 2 == 0; }, - phlex::experimental::concurrency::unlimited) + m.predicate("accept_even_numbers", [](int i) { return i % 2 == 0; }) .input_family(config.get("consumes")); } diff --git a/test/benchmarks/accept_fibonacci_numbers.cpp b/test/benchmarks/accept_fibonacci_numbers.cpp index e1438f20..1feba486 100644 --- a/test/benchmarks/accept_fibonacci_numbers.cpp +++ b/test/benchmarks/accept_fibonacci_numbers.cpp @@ -6,7 +6,6 @@ PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m, config) { m.make(config.get("max_number")) - .predicate( - "accept", &test::fibonacci_numbers::accept, phlex::experimental::concurrency::unlimited) + .predicate("accept", &test::fibonacci_numbers::accept) .input_family(config.get("consumes")); } diff --git a/test/benchmarks/last_index.cpp b/test/benchmarks/last_index.cpp index bf113dfc..b4a9b9d3 100644 --- a/test/benchmarks/last_index.cpp +++ b/test/benchmarks/last_index.cpp @@ -9,7 +9,7 @@ namespace { PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m, config) { - m.transform("last_index", last_index, concurrency::unlimited) + m.transform("last_index", last_index) .input_family("id") .output_products(config.get("produces", "a")); } diff --git a/test/benchmarks/plus_101.cpp b/test/benchmarks/plus_101.cpp index 0d7b0ed5..39d20c52 100644 --- a/test/benchmarks/plus_101.cpp +++ b/test/benchmarks/plus_101.cpp @@ -8,5 +8,5 @@ namespace { PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m) { - m.transform("plus_101", plus_101, concurrency::unlimited).input_family("a").output_products("c"); + m.transform("plus_101", plus_101).input_family("a").output_products("c"); } diff --git a/test/benchmarks/plus_one.cpp b/test/benchmarks/plus_one.cpp index fef141cc..c996fa0f 100644 --- a/test/benchmarks/plus_one.cpp +++ b/test/benchmarks/plus_one.cpp @@ -8,5 +8,5 @@ namespace { PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m) { - m.transform("plus_one", plus_one, concurrency::unlimited).input_family("a").output_products("b"); + m.transform("plus_one", plus_one).input_family("a").output_products("b"); } diff --git a/test/benchmarks/read_id.cpp b/test/benchmarks/read_id.cpp index cc8c1667..b72fe016 100644 --- a/test/benchmarks/read_id.cpp +++ b/test/benchmarks/read_id.cpp @@ -5,7 +5,4 @@ namespace { void read_id(phlex::experimental::level_id const&) {} } -PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m) -{ - m.observe("read_id", read_id, phlex::experimental::concurrency::unlimited).input_family("id"); -} +PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m) { m.observe("read_id", read_id).input_family("id"); } diff --git a/test/benchmarks/read_index.cpp b/test/benchmarks/read_index.cpp index f2de1b18..2430b0db 100644 --- a/test/benchmarks/read_index.cpp +++ b/test/benchmarks/read_index.cpp @@ -7,6 +7,5 @@ namespace { PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m, config) { - m.observe("read_index", read_index, phlex::experimental::concurrency::unlimited) - .input_family(config.get("consumes")); + m.observe("read_index", read_index).input_family(config.get("consumes")); } diff --git a/test/benchmarks/verify_difference.cpp b/test/benchmarks/verify_difference.cpp index 9294650a..33ee4f65 100644 --- a/test/benchmarks/verify_difference.cpp +++ b/test/benchmarks/verify_difference.cpp @@ -8,7 +8,6 @@ PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m, config) { m.observe( "verify_difference", - [expected = config.get("expected", 100)](int i, int j) { assert(j - i == expected); }, - concurrency::unlimited) + [expected = config.get("expected", 100)](int i, int j) { assert(j - i == expected); }) .input_family(config.get("i", "b"), config.get("j", "c")); } diff --git a/test/benchmarks/verify_even_fibonacci_numbers.cpp b/test/benchmarks/verify_even_fibonacci_numbers.cpp index 3da779f2..68d9a9da 100644 --- a/test/benchmarks/verify_even_fibonacci_numbers.cpp +++ b/test/benchmarks/verify_even_fibonacci_numbers.cpp @@ -18,7 +18,6 @@ PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m, config) { using namespace test; m.make(config.get("max_number")) - .observe( - "only_even", &even_fibonacci_numbers::only_even, phlex::experimental::concurrency::unlimited) + .observe("only_even", &even_fibonacci_numbers::only_even) .input_family(config.get("consumes")); } diff --git a/test/cached_execution.cpp b/test/cached_execution.cpp index 87b12d9f..903abf9f 100644 --- a/test/cached_execution.cpp +++ b/test/cached_execution.cpp @@ -46,36 +46,29 @@ TEST_CASE("Cached function calls", "[data model]") { framework_graph g{detail::create_next()}; - g.transform("A1", call_one, concurrency::unlimited) - .input_family("number"_in("run")) - .output_products("one"); - g.transform("A2", call_one, concurrency::unlimited) - .input_family("one"_in("run")) - .output_products("used_one"); - g.transform("A3", call_one, concurrency::unlimited) - .input_family("used_one"_in("run")) - .output_products("done_one"); + g.transform("A1", call_one).input_family("number"_in("run")).output_products("one"); + g.transform("A2", call_one).input_family("one"_in("run")).output_products("used_one"); + g.transform("A3", call_one).input_family("used_one"_in("run")).output_products("done_one"); - g.transform("B1", call_two, concurrency::unlimited) + g.transform("B1", call_two) .input_family("one"_in("run"), "another"_in("subrun")) .output_products("two"); - g.transform("B2", call_two, concurrency::unlimited) + g.transform("B2", call_two) .input_family("used_one"_in("run"), "two"_in("subrun")) .output_products("used_two"); - g.transform("C", call_two, concurrency::unlimited) + g.transform("C", call_two) .input_family("used_two"_in("subrun"), "still"_in("event")) .output_products("three"); g.execute(); - // FIXME: Need to improve the synchronization to supply strict equality - CHECK(g.execution_counts("A1") >= n_runs); - CHECK(g.execution_counts("A2") >= n_runs); - CHECK(g.execution_counts("A3") >= n_runs); + CHECK(g.execution_counts("A1") == n_runs); + CHECK(g.execution_counts("A2") == n_runs); + CHECK(g.execution_counts("A3") == n_runs); - CHECK(g.execution_counts("B1") >= n_runs * n_subruns); - CHECK(g.execution_counts("B2") >= n_runs * n_subruns); + CHECK(g.execution_counts("B1") == n_runs * n_subruns); + CHECK(g.execution_counts("B2") == n_runs * n_subruns); CHECK(g.execution_counts("C") == n_runs * n_subruns * n_events); } diff --git a/test/class_registration.cpp b/test/class_registration.cpp index bb537b28..9758cb1b 100644 --- a/test/class_registration.cpp +++ b/test/class_registration.cpp @@ -62,37 +62,37 @@ TEST_CASE("Call non-framework functions", "[programming model]") auto glueball = g.make(); SECTION("No framework") { - glueball.transform("no_framework", &A::no_framework, concurrency::unlimited) + glueball.transform("no_framework", &A::no_framework) .input_family(product_names) .output_products(oproduct_names); } SECTION("No framework, all references") { - glueball.transform("no_framework_all_refs", &A::no_framework_all_refs, concurrency::unlimited) + glueball.transform("no_framework_all_refs", &A::no_framework_all_refs) .input_family(product_names) .output_products(oproduct_names); } SECTION("No framework, all pointers") { - glueball.transform("no_framework_all_ptrs", &A::no_framework_all_ptrs, concurrency::unlimited) + glueball.transform("no_framework_all_ptrs", &A::no_framework_all_ptrs) .input_family(product_names) .output_products(oproduct_names); } SECTION("One framework argument") { - glueball.transform("one_framework_arg", &A::one_framework_arg, concurrency::unlimited) + glueball.transform("one_framework_arg", &A::one_framework_arg) .input_family(product_names) .output_products(oproduct_names); } SECTION("All framework arguments") { - glueball.transform("all_framework_args", &A::all_framework_args, concurrency::unlimited) + glueball.transform("all_framework_args", &A::all_framework_args) .input_family(product_names) .output_products(oproduct_names); } // The following is invoked for *each* section above - g.observe("verify_results", verify_results, concurrency::unlimited).input_family(product_names); + g.observe("verify_results", verify_results).input_family(product_names); g.execute(); } diff --git a/test/demo-giantdata/unfold_transform_fold.cpp b/test/demo-giantdata/unfold_transform_fold.cpp index 8c48565b..35242fed 100644 --- a/test/demo-giantdata/unfold_transform_fold.cpp +++ b/test/demo-giantdata/unfold_transform_fold.cpp @@ -130,7 +130,7 @@ int main(int argc, char* argv[]) return demo::clampWaveforms(*hwf, run_id, subrun_id, spill_id, apa_id); }; - g.transform("clamp_node", wrapped_user_function, concurrency::unlimited) + g.transform("clamp_node", wrapped_user_function) .input_family("waves_in_apa"_in("APA")) .output_products("clamped_waves"); diff --git a/test/different_hierarchies.cpp b/test/different_hierarchies.cpp index 9f82fdec..c3c062f8 100644 --- a/test/different_hierarchies.cpp +++ b/test/different_hierarchies.cpp @@ -84,12 +84,15 @@ TEST_CASE("Different hierarchies used with fold", "[graph]") .output_products("run_sum"); g.fold("job_add", add, concurrency::unlimited).input_family("number").output_products("job_sum"); - g.observe("verify_run_sum", [](unsigned int actual) { CHECK(actual == 10u); }) + g.observe( + "verify_run_sum", [](unsigned int actual) { CHECK(actual == 10u); }, concurrency::serial) .input_family("run_sum"); - g.observe("verify_job_sum", - [](unsigned int actual) { - CHECK(actual == 20u + 45u); // 20u from events, 45u from trigger primitives - }) + g.observe( + "verify_job_sum", + [](unsigned int actual) { + CHECK(actual == 20u + 45u); // 20u from events, 45u from trigger primitives + }, + concurrency::serial) .input_family("job_sum"); g.execute(); diff --git a/test/filter.cpp b/test/filter.cpp index 5e12f236..ba5593b3 100644 --- a/test/filter.cpp +++ b/test/filter.cpp @@ -85,14 +85,14 @@ namespace { TEST_CASE("Two predicates", "[filtering]") { framework_graph g{source{10u}}; - g.predicate("evens_only", evens_only, concurrency::unlimited).input_family("num"_in("event")); - g.predicate("odds_only", odds_only, concurrency::unlimited).input_family("num"_in("event")); + g.predicate("evens_only", evens_only).input_family("num"_in("event")); + g.predicate("odds_only", odds_only).input_family("num"_in("event")); g.make(20u) - .observe("add_evens", &sum_numbers::add, concurrency::unlimited) + .observe("add_evens", &sum_numbers::add, concurrency::serial) .input_family("num"_in("event")) .when("evens_only"); g.make(25u) - .observe("add_odds", &sum_numbers::add, concurrency::unlimited) + .observe("add_odds", &sum_numbers::add, concurrency::serial) .input_family("num"_in("event")) .when("odds_only"); @@ -105,12 +105,10 @@ TEST_CASE("Two predicates", "[filtering]") TEST_CASE("Two predicates in series", "[filtering]") { framework_graph g{source{10u}}; - g.predicate("evens_only", evens_only, concurrency::unlimited).input_family("num"); - g.predicate("odds_only", odds_only, concurrency::unlimited) - .input_family("num") - .when("evens_only"); + g.predicate("evens_only", evens_only).input_family("num"); + g.predicate("odds_only", odds_only).input_family("num").when("evens_only"); g.make(0u) - .observe("add", &sum_numbers::add, concurrency::unlimited) + .observe("add", &sum_numbers::add, concurrency::serial) .input_family("num") .when("odds_only"); @@ -122,10 +120,10 @@ TEST_CASE("Two predicates in series", "[filtering]") TEST_CASE("Two predicates in parallel", "[filtering]") { framework_graph g{source{10u}}; - g.predicate("evens_only", evens_only, concurrency::unlimited).input_family("num"); - g.predicate("odds_only", odds_only, concurrency::unlimited).input_family("num"); + g.predicate("evens_only", evens_only).input_family("num"); + g.predicate("odds_only", odds_only).input_family("num"); g.make(0u) - .observe("add", &sum_numbers::add, concurrency::unlimited) + .observe("add", &sum_numbers::add, concurrency::serial) .input_family("num") .when("odds_only", "evens_only"); @@ -147,16 +145,14 @@ TEST_CASE("Three predicates in parallel", "[filtering]") framework_graph g{source{10u}}; for (auto const& [name, b, e] : configs) { - g.make(b, e) - .predicate(name, ¬_in_range::eval, concurrency::unlimited) - .input_family("num"); + g.make(b, e).predicate(name, ¬_in_range::eval).input_family("num"); } std::vector const predicate_names{ "exclude_0_to_4", "exclude_6_to_7", "exclude_gt_8"}; auto const expected_numbers = {4u, 5u, 7u}; g.make(expected_numbers) - .observe("collect", &collect_numbers::collect, concurrency::unlimited) + .observe("collect", &collect_numbers::collect, concurrency::serial) .input_family("num") .when(predicate_names); @@ -168,15 +164,15 @@ TEST_CASE("Three predicates in parallel", "[filtering]") TEST_CASE("Two predicates in parallel (each with multiple arguments)", "[filtering]") { framework_graph g{source{10u}}; - g.predicate("evens_only", evens_only, concurrency::unlimited).input_family("num"); - g.predicate("odds_only", odds_only, concurrency::unlimited).input_family("num"); + g.predicate("evens_only", evens_only).input_family("num"); + g.predicate("odds_only", odds_only).input_family("num"); g.make(5 * 100) - .observe("check_evens", &check_multiple_numbers::add_difference, concurrency::unlimited) + .observe("check_evens", &check_multiple_numbers::add_difference, concurrency::serial) .input_family("num", "other_num") // <= Note input order .when("evens_only"); g.make(-5 * 100) - .observe("check_odds", &check_multiple_numbers::add_difference, concurrency::unlimited) + .observe("check_odds", &check_multiple_numbers::add_difference, concurrency::serial) .input_family("other_num", "num") // <= Note input order .when("odds_only"); diff --git a/test/fold.cpp b/test/fold.cpp index 6654b9f9..b9c7b42d 100644 --- a/test/fold.cpp +++ b/test/fold.cpp @@ -61,7 +61,6 @@ TEST_CASE("Different levels of fold", "[graph]") } }; - // framework_graph g{levels_to_process}; framework_graph g{levels_to_process}; g.fold("run_add", add, concurrency::unlimited, "run") @@ -74,15 +73,15 @@ TEST_CASE("Different levels of fold", "[graph]") .output_products("two_layer_job_sum"); g.observe( - "verify_run_sum", [](unsigned int actual) { CHECK(actual == 10u); }, concurrency::unlimited) + "verify_run_sum", [](unsigned int actual) { CHECK(actual == 10u); }, concurrency::serial) .input_family("run_sum"); g.observe( "verify_two_layer_job_sum", [](unsigned int actual) { CHECK(actual == 20u); }, - concurrency::unlimited) + concurrency::serial) .input_family("two_layer_job_sum"); g.observe( - "verify_job_sum", [](unsigned int actual) { CHECK(actual == 20u); }, concurrency::unlimited) + "verify_job_sum", [](unsigned int actual) { CHECK(actual == 20u); }, concurrency::serial) .input_family("job_sum"); g.execute(); diff --git a/test/hierarchical_nodes.cpp b/test/hierarchical_nodes.cpp index c72d81e1..9e6688e3 100644 --- a/test/hierarchical_nodes.cpp +++ b/test/hierarchical_nodes.cpp @@ -102,31 +102,26 @@ TEST_CASE("Hierarchical nodes", "[graph]") { framework_graph g{levels_to_process}; - g.transform("get_the_time", strtime, concurrency::unlimited) - .input_family("time") - .when() - .output_products("strtime"); - g.transform("square", square, concurrency::unlimited) - .input_family("number") - .output_products("squared_number"); + g.transform("get_the_time", strtime).input_family("time").when().output_products("strtime"); + g.transform("square", square).input_family("number").output_products("squared_number"); g.fold("add", add, concurrency::unlimited, "run", 15u) .input_family("squared_number") .when() .output_products("added_data"); - g.transform("scale", scale, concurrency::unlimited) - .input_family("added_data") - .output_products("result"); - g.observe("print_result", print_result, concurrency::unlimited).input_family("result", "strtime"); + g.transform("scale", scale).input_family("added_data").output_products("result"); + g.observe("print_result", print_result).input_family("result", "strtime"); - g.make().output("save", &test::products_for_output::save).when(); + g.make() + .output("save", &test::products_for_output::save, concurrency::serial) + .when(); g.execute(); CHECK(g.execution_counts("square") == index_limit * number_limit); CHECK(g.execution_counts("add") == index_limit * number_limit); - CHECK(g.execution_counts("get_the_time") >= index_limit); + CHECK(g.execution_counts("get_the_time") == index_limit); CHECK(g.execution_counts("scale") == index_limit); CHECK(g.execution_counts("print_result") == index_limit); } diff --git a/test/memory-checks/many_events.cpp b/test/memory-checks/many_events.cpp index f6175d50..4d54f7e9 100644 --- a/test/memory-checks/many_events.cpp +++ b/test/memory-checks/many_events.cpp @@ -25,8 +25,6 @@ int main() }; framework_graph g{levels_to_process}; - g.transform("pass_on", pass_on, concurrency::unlimited) - .input_family("number") - .output_products("different"); + g.transform("pass_on", pass_on).input_family("number").output_products("different"); g.execute(); } diff --git a/test/mock-workflow/G4Stage1.libsonnet b/test/mock-workflow/G4Stage1.libsonnet index 34559327..6dddee17 100644 --- a/test/mock-workflow/G4Stage1.libsonnet +++ b/test/mock-workflow/G4Stage1.libsonnet @@ -3,7 +3,7 @@ local generators = import 'SinglesGen.libsonnet'; { largeant: { plugin: 'largeant', - duration_usec: 15662051, + duration_usec: 156, # Typical: 15662051 inputs: [f + "/MCTruths" for f in std.objectFields(generators)], outputs: ["ParticleAncestryMap", "Assns", "SimEnergyDeposits", "AuxDetHits", "MCParticles"], } diff --git a/test/mock-workflow/G4Stage2.libsonnet b/test/mock-workflow/G4Stage2.libsonnet index 89482c45..40d7720f 100644 --- a/test/mock-workflow/G4Stage2.libsonnet +++ b/test/mock-workflow/G4Stage2.libsonnet @@ -3,13 +3,13 @@ local g4stage1 = import 'G4Stage1.libsonnet'; { IonAndScint: { plugin: 'ion_and_scint', - duration_usec: 5457973, + duration_usec: 546, # Typical: 5457973 inputs: [f + "/SimEnergyDeposits" for f in std.objectFields(g4stage1)], outputs: ["SimEnergyDeposits", "SimEnergyDeposits_priorSCE"], }, PDFastSim: { plugin: 'pd_fast_sim', - duration_usec: 69, #69681950, + duration_usec: 69, # Typical: 69681950 inputs: ['SimEnergyDeposits_priorSCE'], outputs: ['SimPhotonLites', 'OpDetBacktrackerRecords'], } diff --git a/test/mock-workflow/SinglesGen.libsonnet b/test/mock-workflow/SinglesGen.libsonnet index e8d10977..b84df7b5 100644 --- a/test/mock-workflow/SinglesGen.libsonnet +++ b/test/mock-workflow/SinglesGen.libsonnet @@ -13,7 +13,7 @@ }, cosmicgenerator: { plugin: "MC_truth_algorithm", - duration_usec: 4926215, + duration_usec: 492, # Typical: 4926215 inputs: ["id"], outputs: ["MCTruths"] }, diff --git a/test/multiple_function_registration.cpp b/test/multiple_function_registration.cpp index 71f56403..1ba12a30 100644 --- a/test/multiple_function_registration.cpp +++ b/test/multiple_function_registration.cpp @@ -48,29 +48,29 @@ TEST_CASE("Call multiple functions", "[programming model]") SECTION("All free functions") { - g.transform("square_numbers", square_numbers, concurrency::unlimited) + g.transform("square_numbers", square_numbers) .input_family("numbers") .output_products("squared_numbers"); - g.transform("sum_numbers", sum_numbers, concurrency::unlimited) + g.transform("sum_numbers", sum_numbers) .input_family("squared_numbers") .output_products("summed_numbers"); - g.transform("sqrt_sum_numbers", sqrt_sum_numbers, concurrency::unlimited) + g.transform("sqrt_sum_numbers", sqrt_sum_numbers) .input_family("summed_numbers", "offset") .output_products("result"); } SECTION("Transforms, one from a class") { - g.transform("square_numbers", square_numbers, concurrency::unlimited) + g.transform("square_numbers", square_numbers) .input_family("numbers") .output_products("squared_numbers"); - g.transform("sum_numbers", sum_numbers, concurrency::unlimited) + g.transform("sum_numbers", sum_numbers) .input_family("squared_numbers") .output_products("summed_numbers"); g.make() - .transform("sqrt_sum", &A::sqrt_sum, concurrency::unlimited) + .transform("sqrt_sum", &A::sqrt_sum) .input_family("summed_numbers", "offset") .output_products("result"); } diff --git a/test/plugins/module.cpp b/test/plugins/module.cpp index de874118..beaaaf0d 100644 --- a/test/plugins/module.cpp +++ b/test/plugins/module.cpp @@ -5,14 +5,8 @@ using namespace phlex::experimental; -// TODO: Option to select which algorithm to run via configuration? - PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m) { - m.transform("add", test::add, concurrency::unlimited) - .input_family("i", "j") - .output_products("sum"); - m.observe( - "verify", [](int actual) { assert(actual == 0); }, concurrency::unlimited) - .input_family("sum"); + m.transform("add", test::add).input_family("i", "j").output_products("sum"); + m.observe("verify", [](int actual) { assert(actual == 0); }).input_family("sum"); } diff --git a/test/plugins/output.cpp b/test/plugins/output.cpp index b0d24fd7..d7e376b2 100644 --- a/test/plugins/output.cpp +++ b/test/plugins/output.cpp @@ -5,6 +5,5 @@ using namespace phlex::experimental::test; PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m) { - m.make().output( - "save", &products_for_output::save, phlex::experimental::concurrency::unlimited); + m.make().output("save", &products_for_output::save); } diff --git a/test/repeater.cpp b/test/repeater.cpp new file mode 100644 index 00000000..0d453daf --- /dev/null +++ b/test/repeater.cpp @@ -0,0 +1,271 @@ +// =========================================================== // +// This test is used to determine whether "repeaters" can work. // +// // +// * source // +// *-. router // +// |\ \ // +// * | | I(run) // +// | * | E(run) // +// | | * I(spill) // +// | | |\ // +// | | |/ // +// | |/| // +// * | | exponent // +// |/ / // +// | * number // +// * | repeater // +// |/ // +// * consume // +// // +// ============================================================ // + +#include "catch2/catch_test_macros.hpp" +#include "fmt/ranges.h" +#include "fmt/std.h" +#include "oneapi/tbb/concurrent_hash_map.h" +#include "oneapi/tbb/concurrent_queue.h" +#include "oneapi/tbb/flow_graph.h" +#include "spdlog/spdlog.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace oneapi::tbb; +using namespace spdlog; +using namespace std::chrono; + +namespace { + + constexpr int num_runs = 5; + constexpr int messages_per_run = 1000; + constexpr int num_messages = num_runs * messages_per_run; + + constexpr int spills_per_run = messages_per_run - 1; + constexpr int num_spills = num_runs * spills_per_run; + + using id_t = std::vector; + using product_t = double; + using product_ptr_t = std::shared_ptr; + + struct end_token { + int count; + id_t cell_id; + }; + + struct data_cell_id { + int msg_id; + id_t cell_id; + }; + + struct message { + data_cell_id id; + product_ptr_t product; + }; + + using repeater_node_input = std::tuple; + class repeater_node : public flow::composite_node> { + using base_t = flow::composite_node>; + using tagged_repeater_msg = flow::tagged_msg; + + struct cached_product { + product_ptr_t product{}; + concurrent_queue msg_ids{}; + std::atomic counter; + std::atomic_flag flush_received{false}; + }; + + using cached_products_t = concurrent_hash_map; + using accessor = cached_products_t::accessor; + using const_accessor = cached_products_t::const_accessor; + + public: + repeater_node(flow::graph& g) : + base_t{g}, + indexer_{g}, + repeater_{g, + flow::unlimited, + [this](tagged_repeater_msg const& tagged, auto& outputs) -> flow::continue_msg { + cached_product* entry{nullptr}; + int key = -1; + auto& output = std::get<0>(outputs); + + auto drain = [&output](int const key, cached_product* entry) -> int { + assert(entry->product); + int counter{}; + int new_msg_id{}; + while (entry->msg_ids.try_pop(new_msg_id)) { + output.try_put({.id = data_cell_id{.msg_id = new_msg_id, .cell_id = {key}}, + .product = entry->product}); + ++counter; + } + return counter; + }; + + if (tagged.is_a()) { + auto const& msg = tagged.cast_to(); + key = msg.id.cell_id[0]; + entry = entry_for(key); + entry->product = msg.product; + entry->counter += drain(key, entry); + + } else if (tagged.is_a()) { + auto const [count, cell_id] = tagged.cast_to(); + key = cell_id[0]; + entry = entry_for(key); + entry->counter -= count; + std::ignore = entry->flush_received.test_and_set(); + } else { + auto const [msg_id, cell_id] = tagged.cast_to(); + key = cell_id[0]; + entry = entry_for(key); + if (entry->product) { + output.try_put({.id = data_cell_id{.msg_id = msg_id, .cell_id = {key}}, + .product = entry->product}); + ++entry->counter; + } else { + entry->msg_ids.push(msg_id); + } + } + + // Cleanup + if (entry->flush_received.test() and entry->counter == 0) { + cached_products_.erase(key); + } + return {}; + }} + { + base_t::set_external_ports(base_t::input_ports_type{input_port<0>(indexer_), + input_port<1>(indexer_), + input_port<2>(indexer_)}, + base_t::output_ports_type{output_port<0>(repeater_)}); + make_edge(indexer_, repeater_); + } + + private: + cached_product* entry_for(int key) + { + accessor a; + std::ignore = cached_products_.insert(a, key); + return &a->second; + } + + flow::indexer_node indexer_; + flow::multifunction_node> repeater_; + concurrent_hash_map cached_products_; // FIXME: int should be the ID itself + }; + +} + +TEST_CASE("Serialize functions based on resource", "[multithreading]") +{ + spdlog::flush_on(spdlog::level::trace); + + flow::graph g; + + // 1. input + int i{}; + bool flush_run{false}; + flow::input_node src{g, + [&i, &flush_run](flow_control& fc) -> std::variant { + if (i == num_messages and not flush_run) { + fc.stop(); + return {}; + } + + auto const [quotient, remainder] = std::div(i, messages_per_run); + if (remainder == 0 and flush_run) { + flush_run = false; + return end_token{.count = spills_per_run, .cell_id = {quotient}}; + } + + ++i; + if (remainder == 0) { + return data_cell_id{.msg_id = i, .cell_id = {quotient}}; + } + flush_run = true; + return data_cell_id{.msg_id = i, .cell_id = {quotient, remainder}}; + }}; + + flow::broadcast_node run_index_set{g}; // 3. I(run) + flow::broadcast_node spill_index_set{g}; // 4. I(spill) + flow::broadcast_node end_run{g}; + + // 2. router + flow::function_node> router{ + g, + flow::unlimited, + [&run_index_set, &end_run, &spill_index_set]( + std::variant const& src_token) -> flow::continue_msg { + auto const* id = std::get_if(&src_token); + if (!id) { + auto const& end_run_token = std::get(src_token); + end_run.try_put(end_run_token); + return {}; + } + + auto const& [_, cell_id] = *id; + if (cell_id.size() == 1ull) { + run_index_set.try_put(*id); + } else { + spill_index_set.try_put(*id); + } + return {}; + }}; + make_edge(src, router); + + // 5. exponent provider + std::atomic run_counter; + flow::function_node exponent_provider{ + g, flow::unlimited, [&run_counter](data_cell_id const& id) -> message { + ++run_counter; + return {.id = id, .product = std::make_shared(id.cell_id[0])}; + }}; + make_edge(run_index_set, exponent_provider); + + // 6. number provider + flow::function_node number_provider{ + g, flow::unlimited, [](data_cell_id const& id) -> message { + return {.id = id, .product = std::make_shared(id.cell_id[1])}; + }}; + make_edge(spill_index_set, number_provider); + + // 7. repeater + repeater_node repeater{g}; + make_edge(exponent_provider, input_port<0>(repeater)); + make_edge(end_run, input_port<1>(repeater)); + make_edge(spill_index_set, input_port<2>(repeater)); + + auto use_message_id = [](message const& msg) { return msg.id.msg_id; }; + flow::join_node, flow::tag_matching> join_layers{ + g, use_message_id, use_message_id}; + make_edge(repeater, input_port<0>(join_layers)); + make_edge(number_provider, input_port<1>(join_layers)); + + // 8. consume + std::atomic spill_counter; + flow::function_node> consume{ + g, + flow::unlimited, + [&spill_counter](std::tuple const& joined_data) -> flow::continue_msg { + auto const& [a, b] = joined_data; + assert(a.id.cell_id[0] == b.id.cell_id[0]); + ++spill_counter; + return {}; + }}; + make_edge(join_layers, consume); + + auto start_time = steady_clock::now(); + src.activate(); + g.wait_for_all(); + spdlog::info("Total execution time: {} microseconds", + duration_cast(steady_clock::now() - start_time).count()); + + CHECK(run_counter == num_runs); + CHECK(spill_counter == num_spills); +} diff --git a/test/unfold.cpp b/test/unfold.cpp index 2690b1bc..ab5d3550 100644 --- a/test/unfold.cpp +++ b/test/unfold.cpp @@ -103,7 +103,7 @@ TEST_CASE("Splitting the processing", "[graph]") g.fold("add", add, concurrency::unlimited, "event") .input_family("new_number") .output_products("sum1"); - g.observe("check_sum", check_sum, concurrency::unlimited).input_family("sum1"); + g.observe("check_sum", check_sum, concurrency::serial).input_family("sum1"); g.unfold( &iterate_through::predicate, &iterate_through::unfold, concurrency::unlimited, "lower2") @@ -112,7 +112,7 @@ TEST_CASE("Splitting the processing", "[graph]") g.fold("add_numbers", add_numbers, concurrency::unlimited, "event") .input_family("each_number") .output_products("sum2"); - g.observe("check_sum_same", check_sum_same, concurrency::unlimited).input_family("sum2"); + g.observe("check_sum_same", check_sum_same, concurrency::serial).input_family("sum2"); g.make().output( "save", &test::products_for_output::save, concurrency::serial);