From 57ca8a245d24a087dbdfb90c9cbdfb083b060bc3 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Fri, 21 Mar 2025 11:37:12 +0100 Subject: [PATCH 01/11] DPL: cleanup creation of DataProcessorInfo - Use aggregate initialisation where possible - Drop unused bits Will simplify the plugins PR. --- Framework/Core/test/test_DataAllocator.cxx | 1 - .../Core/test/test_DeviceSpecHelpers.cxx | 31 +++++++++---------- .../Core/test/test_FrameworkDataFlowToDDS.cxx | 16 +++++----- .../test_FrameworkDataFlowToO2Control.cxx | 8 ++--- .../Core/test/test_WorkflowSerialization.cxx | 10 +++--- 5 files changed, 32 insertions(+), 34 deletions(-) diff --git a/Framework/Core/test/test_DataAllocator.cxx b/Framework/Core/test/test_DataAllocator.cxx index acdae51cab8e9..fefb6438b98d5 100644 --- a/Framework/Core/test/test_DataAllocator.cxx +++ b/Framework/Core/test/test_DataAllocator.cxx @@ -73,7 +73,6 @@ DataProcessorSpec getSourceSpec() { static_assert(enable_root_serialization::value, "enable_root_serialization must be true"); auto processingFct = [](ProcessingContext& pc) { - static int counter = 0; o2::test::TriviallyCopyable a(42, 23, 0xdead); o2::test::Polymorphic b(0xbeef); std::vector c{{0xaffe}, {0xd00f}}; diff --git a/Framework/Core/test/test_DeviceSpecHelpers.cxx b/Framework/Core/test/test_DeviceSpecHelpers.cxx index 6240e784d09d3..4184be848c5ec 100644 --- a/Framework/Core/test/test_DeviceSpecHelpers.cxx +++ b/Framework/Core/test/test_DeviceSpecHelpers.cxx @@ -16,7 +16,6 @@ #include "Framework/DriverConfig.h" #include "../src/DeviceSpecHelpers.h" #include -#include #include #include #include @@ -67,10 +66,10 @@ void check(const std::vector& arguments, std::vector dataProcessorInfos; for (auto& [name, _] : matrix) { dataProcessorInfos.push_back(DataProcessorInfo{ - name, - "executable-name", - arguments, - workflowOptions, + .name = name, + .executable = "executable-name", + .cmdLineArgs = arguments, + .workflowOptions = workflowOptions, }); } DriverConfig driverConfig{}; @@ -184,7 +183,7 @@ TEST_CASE("CheckOptionReworking") { { std::vector infos = { - {{}, {}, {"--driver-client-backend", "foo"}}, + {.cmdLineArgs = {"--driver-client-backend", "foo"}}, {}}; DeviceSpecHelpers::reworkHomogeneousOption(infos, "--driver-client-backend", "stdout://"); REQUIRE(infos[0].cmdLineArgs[1] == "foo"); @@ -192,30 +191,30 @@ TEST_CASE("CheckOptionReworking") } { std::vector infos = { - {{}, {}, {"--driver-client-backend", "foo"}}, - {{}, {}, {"--driver-client-backend", "bar"}}}; + {.cmdLineArgs = {"--driver-client-backend", "foo"}}, + {.cmdLineArgs = {"--driver-client-backend", "bar"}}}; REQUIRE_THROWS_AS(DeviceSpecHelpers::reworkHomogeneousOption(infos, "--driver-client-backend", "stdout://"), o2::framework::RuntimeErrorRef); } { std::vector infos = { - {{}, {}, {"--driver-client-backend", "foo"}}, - {{}, {}, {"--driver-client-backend", "foo"}}}; + {.cmdLineArgs = {"--driver-client-backend", "foo"}}, + {.cmdLineArgs = {"--driver-client-backend", "foo"}}}; DeviceSpecHelpers::reworkHomogeneousOption(infos, "--driver-client-backend", "stdout://"); REQUIRE(infos[0].cmdLineArgs[1] == "foo"); REQUIRE(infos[1].cmdLineArgs[1] == "foo"); } { std::vector infos = { - {{}, {}, {"foo", "bar"}}, - {{}, {}, {"fnjcnak", "foo"}}}; + {.cmdLineArgs = {"foo", "bar"}}, + {.cmdLineArgs = {"fnjcnak", "foo"}}}; DeviceSpecHelpers::reworkHomogeneousOption(infos, "--driver-client-backend", "stdout://"); REQUIRE(infos[0].cmdLineArgs[3] == "stdout://"); REQUIRE(infos[1].cmdLineArgs[3] == "stdout://"); } { std::vector infos = { - {{}, {}, {"foo", "bar", "--driver-client-backend", "bar"}}, - {{}, {}, {"fnjcnak", "foo"}}}; + {.cmdLineArgs = {"foo", "bar", "--driver-client-backend", "bar"}}, + {.cmdLineArgs = {"fnjcnak", "foo"}}}; DeviceSpecHelpers::reworkHomogeneousOption(infos, "--driver-client-backend", "stdout://"); REQUIRE(infos[0].cmdLineArgs[3] == "bar"); REQUIRE(infos[1].cmdLineArgs[3] == "bar"); @@ -277,8 +276,8 @@ TEST_CASE("CheckIntegerReworking") } { std::vector infos = { - {{}, {}, {"foo", "bar", "--readers", "3"}}, - {{}, {}, {"--readers", "2"}}}; + {.cmdLineArgs = {"foo", "bar", "--readers", "3"}}, + {.cmdLineArgs = {"--readers", "2"}}}; DeviceSpecHelpers::reworkIntegerOption( infos, "--readers", []() { return 1; }, 1, [](long long x, long long y) { return x > y ? x : y; }); REQUIRE(infos[0].cmdLineArgs.size() == 4); diff --git a/Framework/Core/test/test_FrameworkDataFlowToDDS.cxx b/Framework/Core/test/test_FrameworkDataFlowToDDS.cxx index 593728696e77a..dd3b2eb80d253 100644 --- a/Framework/Core/test/test_FrameworkDataFlowToDDS.cxx +++ b/Framework/Core/test/test_FrameworkDataFlowToDDS.cxx @@ -142,10 +142,10 @@ TEST_CASE("TestDDS") std::vector dataProcessorInfos = { { - {"A", "bcsadc/foo", {}, workflowOptions}, - {"B", "foo", {}, workflowOptions}, - {"C", "foo", {}, workflowOptions}, - {"D", "foo", {}, workflowOptions}, + {.name = "A", .executable = "bcsadc/foo", .workflowOptions = workflowOptions}, + {.name = "B", .executable = "foo", .workflowOptions = workflowOptions}, + {.name = "C", .executable = "foo", .workflowOptions = workflowOptions}, + {.name = "D", .executable = "foo", .workflowOptions = workflowOptions}, }}; DriverConfig driverConfig = { .batch = true, @@ -406,10 +406,10 @@ TEST_CASE("TestDDSExpendable") std::vector dataProcessorInfos = { { - {"A", "bcsadc/foo", {}, workflowOptions}, - {"B", "foo", {}, workflowOptions}, - {"C", "foo", {}, workflowOptions}, - {"D", "foo", {}, workflowOptions}, + {.name = "A", .executable = "bcsadc/foo", .workflowOptions = workflowOptions}, + {.name = "B", .executable = "foo", .workflowOptions = workflowOptions}, + {.name = "C", .executable = "foo", .workflowOptions = workflowOptions}, + {.name = "D", .executable = "foo", .workflowOptions = workflowOptions}, }}; DriverConfig driverConfig = { .batch = true, diff --git a/Framework/Core/test/test_FrameworkDataFlowToO2Control.cxx b/Framework/Core/test/test_FrameworkDataFlowToO2Control.cxx index d5f402aa16caa..9cdbc357f9674 100644 --- a/Framework/Core/test/test_FrameworkDataFlowToO2Control.cxx +++ b/Framework/Core/test/test_FrameworkDataFlowToO2Control.cxx @@ -561,10 +561,10 @@ TEST_CASE("TestO2ControlDump") std::vector dataProcessorInfos = { { - {"A", "bcsadc/foo", {}, workflowOptions}, - {"B", "foo", {}, workflowOptions}, - {"C", "foo", {}, workflowOptions}, - {"D", "foo", {}, workflowOptions}, + {.name = "A", .executable = "bcsadc/foo", .workflowOptions = workflowOptions}, + {.name = "B", .executable = "foo", .workflowOptions = workflowOptions}, + {.name = "C", .executable = "foo", .workflowOptions = workflowOptions}, + {.name = "D", .executable = "foo", .workflowOptions = workflowOptions}, }}; DriverConfig driverConfig{ diff --git a/Framework/Core/test/test_WorkflowSerialization.cxx b/Framework/Core/test/test_WorkflowSerialization.cxx index 6e541f7d22f07..298956970713d 100644 --- a/Framework/Core/test/test_WorkflowSerialization.cxx +++ b/Framework/Core/test/test_WorkflowSerialization.cxx @@ -52,10 +52,10 @@ TEST_CASE("TestVerifyWorkflowSerialization") {{"key1", "v\"al'1"}, {"", "val2"}, {"key3", ""}, {"", ""}}}}; std::vector dataProcessorInfoOut{ - {"A", "test_Framework_test_SerializationWorkflow", {"foo"}, {ConfigParamSpec{"aBool", VariantType::Bool, true, {"A Bool"}}}}, - {"B", "test_Framework_test_SerializationWorkflow", {"b-bar", "bfoof", "fbdbfaso"}}, - {"C", "test_Framework_test_SerializationWorkflow", {}}, - {"D", "test_Framework_test_SerializationWorkflow", {}}, + {.name = "A", .executable = "test_Framework_test_SerializationWorkflow", .cmdLineArgs = {"foo"}, .workflowOptions = {ConfigParamSpec{"aBool", VariantType::Bool, true, {"A Bool"}}}}, + {.name = "B", .executable = "test_Framework_test_SerializationWorkflow", .cmdLineArgs = {"b-bar", "bfoof", "fbdbfaso"}}, + {.name = "C", .executable = "test_Framework_test_SerializationWorkflow"}, + {.name = "D", .executable = "test_Framework_test_SerializationWorkflow"}, }; CommandInfo commandInfoOut{"o2-dpl-workflow -b --option 1 --option 2"}; @@ -94,7 +94,7 @@ TEST_CASE("TestVerifyWildcard") }}; std::vector dataProcessorInfoOut{ - {"A", "test_Framework_test_SerializationWorkflow", {}}, + {.name = "A", .executable = "test_Framework_test_SerializationWorkflow"}, }; CommandInfo commandInfoOut{"o2-dpl-workflow -b --option 1 --option 2"}; From bf6a05e0f38dece3e0f17853542ca13d16656d64 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Thu, 20 Mar 2025 12:24:28 +0100 Subject: [PATCH 02/11] DPL: allow searching for plugins in executables as well --- Framework/Core/include/Framework/PluginManager.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Framework/Core/include/Framework/PluginManager.h b/Framework/Core/include/Framework/PluginManager.h index d6b16f01ad713..96281942e667d 100644 --- a/Framework/Core/include/Framework/PluginManager.h +++ b/Framework/Core/include/Framework/PluginManager.h @@ -87,7 +87,7 @@ struct PluginManager { #else auto libraryName = fmt::format("lib{}.so", loadablePlugin.library); #endif - auto ret = uv_dlopen(libraryName.c_str(), &handle); + auto ret = uv_dlopen(loadablePlugin.library.empty() ? nullptr : libraryName.c_str(), &handle); if (ret != 0) { LOGP(error, "Could not load library {}", loadablePlugin.library); LOG(error) << uv_dlerror(&handle); From 56088c23f2d350f12013cd0ca3e4b9c3d8b58c22 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Fri, 21 Mar 2025 12:40:42 +0100 Subject: [PATCH 03/11] DPL: first step to make workflow definition plugins This is the first step towards having the workflow definition inside plugins rather than in executables. This will allow accumulating plugins which are needed to instantiate a topology and do the option parsing / topology building only once, simplifying the current case. The end goal is to allow the driver to preload certain common services (e.g. ROOT) and share it among the different tasks (which at the moment it's not allowed because different tasks are in different executables). Moreover this will allow us to coalesce strictly coupled dataprocessors and reduce the number of running processes. For now the plugins are embedded in the executables and behave exactly like before. --- Framework/Core/include/Framework/Plugins.h | 3 + .../Framework/WorkflowDefinitionContext.h | 56 ++++++++ .../include/Framework/runDataProcessing.h | 126 +++++++++++------- Framework/Core/src/runDataProcessing.cxx | 78 ++++++----- .../Core/src/runDataProcessingPlugin.cxx | 54 ++++++++ cmake/O2AddWorkflow.cmake | 19 ++- 6 files changed, 249 insertions(+), 87 deletions(-) create mode 100644 Framework/Core/include/Framework/WorkflowDefinitionContext.h create mode 100644 Framework/Core/src/runDataProcessingPlugin.cxx diff --git a/Framework/Core/include/Framework/Plugins.h b/Framework/Core/include/Framework/Plugins.h index 925943c6bffc3..3d320c6f2abb5 100644 --- a/Framework/Core/include/Framework/Plugins.h +++ b/Framework/Core/include/Framework/Plugins.h @@ -44,6 +44,9 @@ enum struct DplPluginKind : int { // using the arrow dataset API RootObjectReadingImplementation, + // A plugin which defines a whole workflow. This will be used to separate + // workflows in shared libraries and run them via a separate loader. + Workflow, // A plugin which was not initialised properly. Unknown }; diff --git a/Framework/Core/include/Framework/WorkflowDefinitionContext.h b/Framework/Core/include/Framework/WorkflowDefinitionContext.h new file mode 100644 index 0000000000000..b7c29aa6077b3 --- /dev/null +++ b/Framework/Core/include/Framework/WorkflowDefinitionContext.h @@ -0,0 +1,56 @@ +// Copyright 2019-2025 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +#ifndef O2_FRAMEWORK_WORKFLOWDEFINITIONCONTEXT_H_ +#define O2_FRAMEWORK_WORKFLOWDEFINITIONCONTEXT_H_ + +#include "Framework/ConfigParamSpec.h" +#include "Framework/CompletionPolicy.h" +#include "Framework/DispatchPolicy.h" +#include "Framework/ResourcePolicy.h" +#include "Framework/CallbacksPolicy.h" +#include "Framework/SendingPolicy.h" +#include "Framework/WorkflowSpec.h" +#include "Framework/ChannelConfigurationPolicy.h" +#include + +namespace o2::framework +{ + +struct WorkflowDefinitionContext { + std::vector workflowOptions; + std::vector completionPolicies; + std::vector dispatchPolicies; + std::vector resourcePolicies; + std::vector callbacksPolicies; + std::vector sendingPolicies; + std::vector extraOptions; + std::vector channelPolicies; + std::unique_ptr configContext; + + // For the moment, let's put them here. We should + // probably move them to a different place, since these are not really part + // of the workflow definition but will be there also at runtine. + std::unique_ptr configRegistry{nullptr}; + std::unique_ptr workflowOptionsRegistry{nullptr}; + + o2::framework::WorkflowSpec specs; +}; + +struct WorkflowDefinition { + std::function defineWorkflow; +}; + +struct WorkflowPlugin { + virtual o2::framework::WorkflowDefinition* create() = 0; +}; + +} // namespace o2::framework +#endif // O2_FRAMEWORK_WORKFLOWDEFINITIONCONTEXT_H_ diff --git a/Framework/Core/include/Framework/runDataProcessing.h b/Framework/Core/include/Framework/runDataProcessing.h index 07083314af12e..4a1d1954e7075 100644 --- a/Framework/Core/include/Framework/runDataProcessing.h +++ b/Framework/Core/include/Framework/runDataProcessing.h @@ -26,7 +26,9 @@ #include "Framework/CustomWorkflowTerminationHook.h" #include "Framework/CommonServices.h" #include "Framework/WorkflowCustomizationHelpers.h" +#include "Framework/WorkflowDefinitionContext.h" #include "Framework/Logger.h" +#include "Framework/Plugins.h" #include "Framework/CheckTypes.h" #include "Framework/StructToTuple.h" #include "ResourcePolicy.h" @@ -125,16 +127,7 @@ void overrideCloning(o2::framework::ConfigContext& ctx, std::vector& workflow); // This comes from the framework itself. This way we avoid code duplication. -int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& specs, - std::vector const& channelPolicies, - std::vector const& completionPolicies, - std::vector const& dispatchPolicies, - std::vector const& resourcePolicies, - std::vector const& callbacksPolicies, - std::vector const& sendingPolicies, - std::vector const& workflowOptions, - std::vector const& detectedOptions, - o2::framework::ConfigContext& configContext); +int doMain(int argc, char** argv, o2::framework::WorkflowDefinitionContext& context, o2::framework::ConfigContext& configContext); void doDefaultWorkflowTerminationHook(); @@ -167,55 +160,44 @@ void callWorkflowTermination(T&, char const* idstring) void overrideAll(o2::framework::ConfigContext& ctx, std::vector& workflow); -o2::framework::ConfigContext createConfigContext(std::unique_ptr& workflowOptionsRegistry, - o2::framework::ServiceRegistry& configRegistry, - std::vector& workflowOptions, - std::vector& extraOptions, int argc, char** argv); +std::unique_ptr createConfigContext(std::unique_ptr& workflowOptionsRegistry, + o2::framework::ServiceRegistry& configRegistry, + std::vector& workflowOptions, + std::vector& extraOptions, int argc, char** argv); std::unique_ptr createRegistry(); -int mainNoCatch(int argc, char** argv) -{ - using namespace o2::framework; +char* getIdString(int argc, char** argv); - std::vector workflowOptions; - UserCustomizationsHelper::userDefinedCustomization(workflowOptions); - auto requiredWorkflowOptions = WorkflowCustomizationHelpers::requiredWorkflowOptions(); - workflowOptions.insert(std::end(workflowOptions), std::begin(requiredWorkflowOptions), std::end(requiredWorkflowOptions)); - - std::vector completionPolicies = injectCustomizations(); - std::vector dispatchPolicies = injectCustomizations(); - std::vector resourcePolicies = injectCustomizations(); - std::vector callbacksPolicies = injectCustomizations(); - std::vector sendingPolicies = injectCustomizations(); - - std::unique_ptr configRegistry = createRegistry(); - std::vector extraOptions; - std::unique_ptr workflowOptionsRegistry{nullptr}; - auto configContext = createConfigContext(workflowOptionsRegistry, *configRegistry, workflowOptions, extraOptions, argc, argv); - - o2::framework::WorkflowSpec specs = defineDataProcessing(configContext); - overrideAll(configContext, specs); - for (auto& spec : specs) { - UserCustomizationsHelper::userDefinedCustomization(spec.requiredServices); - } - std::vector channelPolicies; - UserCustomizationsHelper::userDefinedCustomization(channelPolicies); - auto defaultChannelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(configContext); - channelPolicies.insert(std::end(channelPolicies), std::begin(defaultChannelPolicies), std::end(defaultChannelPolicies)); - return doMain(argc, argv, specs, - channelPolicies, completionPolicies, dispatchPolicies, - resourcePolicies, callbacksPolicies, sendingPolicies, workflowOptions, extraOptions, configContext); +#define STRINGIZE_NX(A) #A +#define STRINGIZE(A) STRINGIZE_NX(A) + +// This is to allow the old "executable" based behavior +// Each executable will contain a plugin called InternalWorkflow +// In case one wants to use the new DSO based approach, the +// name of the plugin an the library name where it is located +// will have to be specified at build time. +#ifndef DPL_WORKFLOW_PLUGIN_NAME +#define DPL_WORKFLOW_PLUGIN_NAME InternalCustomWorkflow +#ifdef DPL_WORKFLOW_PLUGIN_LIBRARY +#error Missing DPL_WORKFLOW_PLUGIN_NAME +#endif +#define DPL_WORKFLOW_PLUGIN_LIBRARY +#endif + +consteval char const* pluginName() +{ + return STRINGIZE(DPL_WORKFLOW_PLUGIN_LIBRARY) ":" STRINGIZE(DPL_WORKFLOW_PLUGIN_NAME); } -int callMain(int argc, char** argv, int (*)(int, char**)); -char* getIdString(int argc, char** argv); +// Executables behave this way +int callMain(int argc, char** argv, char const* pluginName); int main(int argc, char** argv) { using namespace o2::framework; - int result = callMain(argc, argv, mainNoCatch); + int result = callMain(argc, argv, pluginName()); char* idstring = getIdString(argc, argv); o2::framework::OnWorkflowTerminationHook onWorkflowTerminationHook; @@ -223,4 +205,52 @@ int main(int argc, char** argv) return result; } + +struct WorkflowDefinition { + std::function defineWorkflow; +}; + +struct DPL_WORKFLOW_PLUGIN_NAME : o2::framework::WorkflowPlugin { + o2::framework::WorkflowDefinition* create() override + { + return new o2::framework::WorkflowDefinition{ + .defineWorkflow = [](int argc, char** argv) -> o2::framework::WorkflowDefinitionContext { + using namespace o2::framework; + WorkflowDefinitionContext workflowContext; + + UserCustomizationsHelper::userDefinedCustomization(workflowContext.workflowOptions); + auto requiredWorkflowOptions = WorkflowCustomizationHelpers::requiredWorkflowOptions(); + workflowContext.workflowOptions.insert(std::end(workflowContext.workflowOptions), std::begin(requiredWorkflowOptions), std::end(requiredWorkflowOptions)); + + workflowContext.completionPolicies = injectCustomizations(); + workflowContext.dispatchPolicies = injectCustomizations(); + workflowContext.resourcePolicies = injectCustomizations(); + workflowContext.callbacksPolicies = injectCustomizations(); + workflowContext.sendingPolicies = injectCustomizations(); + + workflowContext.configRegistry = createRegistry(); + workflowContext.configContext = createConfigContext(workflowContext.workflowOptionsRegistry, *workflowContext.configRegistry, workflowContext.workflowOptions, workflowContext.extraOptions, argc, argv); + + workflowContext.specs = defineDataProcessing(*workflowContext.configContext); + overrideAll(*workflowContext.configContext, workflowContext.specs); + for (auto& spec : workflowContext.specs) { + UserCustomizationsHelper::userDefinedCustomization(spec.requiredServices); + } + UserCustomizationsHelper::userDefinedCustomization(workflowContext.channelPolicies); + auto defaultChannelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(*workflowContext.configContext); + workflowContext.channelPolicies.insert(std::end(workflowContext.channelPolicies), std::begin(defaultChannelPolicies), std::end(defaultChannelPolicies)); + return workflowContext; + }}; + } +}; + +// This is like the plugin macros, we simply do it explicitly to avoid macro inside macro expansion +extern "C" { +DPLPluginHandle* dpl_plugin_callback(DPLPluginHandle* previous) +{ + previous = new DPLPluginHandle{new DPL_WORKFLOW_PLUGIN_NAME{}, strdup(STRINGIZE(DPL_WORKFLOW_PLUGIN_NAME)), o2::framework::DplPluginKind::Workflow, previous}; + return previous; +} +} + #endif diff --git a/Framework/Core/src/runDataProcessing.cxx b/Framework/Core/src/runDataProcessing.cxx index 6c38945039d84..b1642fe4187bb 100644 --- a/Framework/Core/src/runDataProcessing.cxx +++ b/Framework/Core/src/runDataProcessing.cxx @@ -13,9 +13,7 @@ #include #include "Framework/BoostOptionsRetriever.h" #include "Framework/BacktraceHelpers.h" -#include "Framework/CallbacksPolicy.h" #include "Framework/ChannelConfigurationPolicy.h" -#include "Framework/ChannelMatching.h" #include "Framework/ConfigParamsHelper.h" #include "Framework/ConfigParamSpec.h" #include "Framework/ConfigContext.h" @@ -38,7 +36,6 @@ #include "Framework/ServiceRegistryHelpers.h" #include "Framework/DevicesManager.h" #include "Framework/DebugGUI.h" -#include "Framework/LocalRootFileService.h" #include "Framework/LogParsingHelpers.h" #include "Framework/Logger.h" #include "Framework/ParallelContext.h" @@ -62,6 +59,7 @@ #include "Framework/DeviceContext.h" #include "Framework/ServiceMetricsInfo.h" #include "Framework/DataTakingContext.h" +#include "Framework/WorkflowDefinitionContext.h" #include "Framework/CommonServices.h" #include "Framework/DefaultsHelpers.h" #include "ProcessingPoliciesHelpers.h" @@ -192,13 +190,28 @@ char* getIdString(int argc, char** argv) return nullptr; } -int callMain(int argc, char** argv, int (*mainNoCatch)(int, char**)) +int doMain(int argc, char** argv, o2::framework::WorkflowDefinitionContext& workflowContext); + +int callMain(int argc, char** argv, char const* pluginSpec) { + std::vector plugins; + auto morePlugins = PluginManager::parsePluginSpecString(pluginSpec); + for (auto& extra : morePlugins) { + plugins.push_back(extra); + } + // Only one for now + assert(plugins.size() == 1); + + std::vector availableWorkflows; + PluginManager::loadFromPlugin(plugins, availableWorkflows); + + assert(availableWorkflows.size() == 1); static bool noCatch = getenv("O2_NO_CATCHALL_EXCEPTIONS") && strcmp(getenv("O2_NO_CATCHALL_EXCEPTIONS"), "0"); int result = 1; + o2::framework::WorkflowDefinitionContext workflowContext = availableWorkflows.back().defineWorkflow(argc, argv); if (noCatch) { try { - result = mainNoCatch(argc, argv); + result = doMain(argc, argv, workflowContext); } catch (o2::framework::RuntimeErrorRef& ref) { doDPLException(ref, argv[0]); throw; @@ -209,7 +222,7 @@ int callMain(int argc, char** argv, int (*mainNoCatch)(int, char**)) // SFINAE expression above fit better the version which invokes user code over // the default one. // The default policy is a catch all pub/sub setup to be consistent with the past. - result = mainNoCatch(argc, argv); + result = doMain(argc, argv, workflowContext); } catch (boost::exception& e) { doBoostException(e, argv[0]); throw; @@ -2808,10 +2821,10 @@ void overrideAll(o2::framework::ConfigContext& ctx, std::vector& workflowOptionsRegistry, - o2::framework::ServiceRegistry& configRegistry, - std::vector& workflowOptions, - std::vector& extraOptions, int argc, char** argv) +std::unique_ptr createConfigContext(std::unique_ptr& workflowOptionsRegistry, + o2::framework::ServiceRegistry& configRegistry, + std::vector& workflowOptions, + std::vector& extraOptions, int argc, char** argv) { std::vector> retrievers; std::unique_ptr retriever{new o2::framework::BoostOptionsRetriever(true, argc, argv)}; @@ -2825,7 +2838,7 @@ o2::framework::ConfigContext createConfigContext(std::unique_ptr(*workflowOptionsRegistry, o2::framework::ServiceRegistryRef{configRegistry}, argc, argv); } std::unique_ptr createRegistry() @@ -2842,16 +2855,7 @@ std::unique_ptr createRegistry() // killing them all on ctrl-c). // - Child, pick the data-processor ID and start a O2DataProcessorDevice for // each DataProcessorSpec -int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, - std::vector const& channelPolicies, - std::vector const& completionPolicies, - std::vector const& dispatchPolicies, - std::vector const& resourcePolicies, - std::vector const& callbacksPolicies, - std::vector const& sendingPolicies, - std::vector const& currentWorkflowOptions, - std::vector const& detectedParams, - o2::framework::ConfigContext& configContext) +int doMain(int argc, char** argv, o2::framework::WorkflowDefinitionContext& workflowContext) { // Peek very early in the driver options and look for // signposts, so the we can enable it without going through the whole dance @@ -2870,7 +2874,7 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, WorkflowInfo currentWorkflow{ argv[0], currentArgs, - currentWorkflowOptions}; + workflowContext.workflowOptions}; ProcessingPolicies processingPolicies; enum LogParsingHelpers::LogLevel minFailureLevel; @@ -2920,7 +2924,7 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, bpo::options_description visibleOptions; visibleOptions.add(executorOptions); - auto physicalWorkflow = workflow; + auto physicalWorkflow = workflowContext.specs; std::map rankIndex; // We remove the duplicates because for the moment child get themself twice: // once from the actual definition in the child, a second time from the @@ -2931,11 +2935,11 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, size_t workflowHashA = 0; std::hash hash_fn; - for (auto& dp : workflow) { + for (auto& dp : workflowContext.specs) { workflowHashA += hash_fn(dp.name); } - for (auto& dp : workflow) { + for (auto& dp : workflowContext.specs) { rankIndex.insert(std::make_pair(dp.name, workflowHashA)); } @@ -2987,7 +2991,7 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, OverrideServiceSpecs driverServicesOverride = ServiceSpecHelpers::parseOverrides(getenv("DPL_DRIVER_OVERRIDE_SERVICES")); ServiceSpecs driverServices = ServiceSpecHelpers::filterDisabled(CommonDriverServices::defaultServices(), driverServicesOverride); // We insert the hash for the internal devices. - WorkflowHelpers::injectServiceDevices(physicalWorkflow, configContext); + WorkflowHelpers::injectServiceDevices(physicalWorkflow, *workflowContext.configContext); auto reader = std::find_if(physicalWorkflow.begin(), physicalWorkflow.end(), [](DataProcessorSpec& spec) { return spec.name == "internal-dpl-aod-reader"; }); if (reader != physicalWorkflow.end()) { driverServices.push_back(ArrowSupport::arrowBackendSpec()); @@ -2997,7 +3001,7 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, continue; } WorkflowSpecNode node{physicalWorkflow}; - service.injectTopology(node, configContext); + service.injectTopology(node, *workflowContext.configContext); } for (auto& dp : physicalWorkflow) { if (dp.name.rfind("internal-", 0) == 0) { @@ -3103,7 +3107,7 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, // Use the hidden options as veto, all config specs matching a definition // in the hidden options are skipped in order to avoid duplicate definitions // in the main parser. Note: all config specs are forwarded to devices - visibleOptions.add(ConfigParamsHelper::prepareOptionDescriptions(physicalWorkflow, currentWorkflowOptions, gHiddenDeviceOptions)); + visibleOptions.add(ConfigParamsHelper::prepareOptionDescriptions(physicalWorkflow, workflowContext.workflowOptions, gHiddenDeviceOptions)); bpo::options_description od; od.add(visibleOptions); @@ -3139,7 +3143,7 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, conflicting_options(varmap, "no-batch", "batch"); if (varmap.count("help")) { - printHelp(varmap, executorOptions, physicalWorkflow, currentWorkflowOptions); + printHelp(varmap, executorOptions, physicalWorkflow, workflowContext.workflowOptions); exit(0); } /// Set the fair::Logger severity to the one specified in the command line @@ -3189,16 +3193,16 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, return true; }; DriverInfo driverInfo{ - .sendingPolicies = sendingPolicies, + .sendingPolicies = workflowContext.sendingPolicies, .forwardingPolicies = forwardingPolicies, - .callbacksPolicies = callbacksPolicies}; + .callbacksPolicies = workflowContext.callbacksPolicies}; driverInfo.states.reserve(10); driverInfo.sigintRequested = false; driverInfo.sigchldRequested = false; - driverInfo.channelPolicies = channelPolicies; - driverInfo.completionPolicies = completionPolicies; - driverInfo.dispatchPolicies = dispatchPolicies; - driverInfo.resourcePolicies = resourcePolicies; + driverInfo.channelPolicies = workflowContext.channelPolicies; + driverInfo.completionPolicies = workflowContext.completionPolicies; + driverInfo.dispatchPolicies = workflowContext.dispatchPolicies; + driverInfo.resourcePolicies = workflowContext.resourcePolicies; driverInfo.argc = argc; driverInfo.argv = argv; driverInfo.noSHMCleanup = varmap["no-cleanup"].as(); @@ -3230,7 +3234,7 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, // FIXME: should use the whole dataProcessorInfos, actually... driverInfo.processorInfo = dataProcessorInfos; - driverInfo.configContext = &configContext; + driverInfo.configContext = workflowContext.configContext.get(); DriverControl driverControl; initialiseDriverControl(varmap, driverInfo, driverControl); @@ -3259,7 +3263,7 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, driverInfo, driverConfig, gDeviceMetricsInfos, - detectedParams, + workflowContext.extraOptions, varmap, driverServices, frameworkId); diff --git a/Framework/Core/src/runDataProcessingPlugin.cxx b/Framework/Core/src/runDataProcessingPlugin.cxx new file mode 100644 index 0000000000000..c10665da157a0 --- /dev/null +++ b/Framework/Core/src/runDataProcessingPlugin.cxx @@ -0,0 +1,54 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +#include +#include +#include + +// Executables behave this way +int callMain(int argc, char** argv, char const* pluginName); + +#define STRINGIZE_NX(A) #A +#define STRINGIZE(A) STRINGIZE_NX(A) + +consteval char const* pluginName() +{ + return STRINGIZE(DPL_WORKFLOW_PLUGIN_LIBRARY) ":" STRINGIZE(DPL_WORKFLOW_PLUGIN_NAME); +} + +int main(int argc, char** argv) +{ + // Allow this code to lunch different plugins compared to the one + // associated with it. + auto pluginOption = "--workflow-plugin"; + int optionSize = strlen(pluginOption); + std::string pluginSpec = ""; + for (int i = 0; i < argc; i++) { + int argSize = strlen(argv[i]); + if (argSize < optionSize) { + continue; + } + if (argSize > optionSize && argv[i][optionSize] == '=') { + pluginSpec = std::string_view(argv[i] + optionSize + 1); + break; + } + if (argSize == optionSize && (strncmp(argv[i], pluginOption, optionSize) == 0) && i < argc) { + pluginSpec = argv[i+i]; + break; + } + } + + if (pluginSpec.empty()) { + pluginSpec = pluginName(); + } + + std::cerr << "Loading plugin " << pluginSpec << std::endl; + return callMain(argc, argv, pluginSpec.c_str()); +} diff --git a/cmake/O2AddWorkflow.cmake b/cmake/O2AddWorkflow.cmake index b952890921f45..17d6b95497ad7 100644 --- a/cmake/O2AddWorkflow.cmake +++ b/cmake/O2AddWorkflow.cmake @@ -35,11 +35,26 @@ function(o2_add_dpl_workflow baseTargetName) message(FATAL_ERROR "Got trailing arguments ${A_UNPARSED_ARGUMENTS}") endif() - o2_add_executable(${baseTargetName} - COMPONENT_NAME ${A_COMPONENT_NAME} TARGETVARNAME targetExeName + string(REGEX REPLACE "(^|-)([a-z])" "\\2" pluginName "${baseTargetName}") + + o2_add_library(${pluginName}Plugin SOURCES ${A_SOURCES} PUBLIC_LINK_LIBRARIES O2::Framework ${A_PUBLIC_LINK_LIBRARIES}) + o2_add_executable(${baseTargetName} + COMPONENT_NAME ${A_COMPONENT_NAME} TARGETVARNAME targetExeName + SOURCES ${CMAKE_SOURCE_DIR}/Framework/Core/src/runDataProcessingPlugin.cxx + PUBLIC_LINK_LIBRARIES O2::Framework ) + + o2_name_target(${pluginName}Plugin NAME pluginTargetName) + + add_dependencies(${targetExeName} ${pluginTargetName}) + + target_compile_definitions(${targetExeName} PRIVATE DPL_WORKFLOW_PLUGIN_NAME=${pluginName} + DPL_WORKFLOW_PLUGIN_LIBRARY=O2${pluginName}Plugin) + target_compile_definitions(${pluginTargetName} PRIVATE DPL_WORKFLOW_PLUGIN_NAME=${pluginName} + DPL_WORKFLOW_PLUGIN_LIBRARY=O2${pluginName}Plugin) + if(A_TARGETVARNAME) set(${A_TARGETVARNAME} ${targetExeName} From cd676d13226235423535d4f482909fad4e2c1273 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Fri, 21 Mar 2025 12:40:42 +0100 Subject: [PATCH 04/11] Do not fork --- Framework/Core/src/runDataProcessing.cxx | 64 ++++++++++++++++-------- 1 file changed, 43 insertions(+), 21 deletions(-) diff --git a/Framework/Core/src/runDataProcessing.cxx b/Framework/Core/src/runDataProcessing.cxx index b1642fe4187bb..075026df15d0a 100644 --- a/Framework/Core/src/runDataProcessing.cxx +++ b/Framework/Core/src/runDataProcessing.cxx @@ -683,20 +683,22 @@ void handle_crash(int sig) } /// This will start a new device by forking and executing a -/// new child -void spawnDevice(uv_loop_t* loop, - DeviceRef ref, - std::vector const& specs, - DriverInfo& driverInfo, - std::vector&, - std::vector& executions, - std::vector& deviceInfos, - std::vector& allStates, - ServiceRegistryRef serviceRegistry, - boost::program_options::variables_map& varmap, - std::vector& childFds, - unsigned parentCPU, - unsigned parentNode) +/// new child. +/// @return the PID of the new process (or 0 if we are in the driver) +std::string spawnDevice(uv_loop_t* loop, + DeviceRef ref, + std::vector const& specs, + DriverInfo& driverInfo, + DriverControl& driverControl, + std::vector&, + std::vector& executions, + std::vector& deviceInfos, + std::vector& allStates, + ServiceRegistryRef serviceRegistry, + boost::program_options::variables_map& varmap, + std::vector& childFds, + unsigned parentCPU, + unsigned parentNode) { // FIXME: this might not work when more than one DPL driver on the same // machine. Hopefully we do not care. @@ -761,6 +763,15 @@ void spawnDevice(uv_loop_t* loop, putenv(strdup(DeviceSpecHelpers::reworkTimeslicePlaceholder(env, spec).data())); } execvp(execution.args[0], execution.args.data()); + // In the general case we never end up here, because execvp is used. + // Once we move to plugins however, the run the plugin without loading + // a new environment so the new pid can be used to identify. + // + // Let's stop immediately so that we can attach debuggers from here. + driverControl.forcedTransitions = { + DriverState::DO_CHILD, + DriverState::BIND_GUI_PORT}; + return spec.id; } else { O2_SIGNPOST_ID_GENERATE(sid, driver); O2_SIGNPOST_EVENT_EMIT(driver, sid, "spawnDevice", "New child at %{pid}d", id); @@ -844,6 +855,7 @@ void spawnDevice(uv_loop_t* loop, // Let's add also metrics information for the given device gDeviceMetricsInfos.emplace_back(DeviceMetricsInfo{}); + return ""; } void processChildrenOutput(uv_loop_t* loop, @@ -2108,15 +2120,25 @@ int runStateMachine(DataProcessorSpecs const& workflow, runningWorkflow.devices[di], controls[di], deviceExecutions[di], infos, allStates); } else { DeviceRef ref{di}; - spawnDevice(loop, - ref, - runningWorkflow.devices, driverInfo, - controls, deviceExecutions, infos, - allStates, - serviceRegistry, varmap, - childFds, parentCPU, parentNode); + frameworkId = spawnDevice(loop, + ref, + runningWorkflow.devices, + driverInfo, + driverControl, + controls, deviceExecutions, infos, + allStates, + serviceRegistry, varmap, + childFds, parentCPU, parentNode); + // We are in the child in this case. Do not continue spawning. + if (!frameworkId.empty()) { + break; + } } } + // Do not bother about the rest of the scheduling if we are in the child. + if (!frameworkId.empty()) { + break; + } handleSignals(); handleChildrenStdio(&serverContext, forwardedStdin.str(), childFds, pollHandles); for (auto& callback : postScheduleCallbacks) { From f51a69672b32229a9e0bd5ec130a72f772baebd3 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Fri, 21 Mar 2025 12:40:42 +0100 Subject: [PATCH 05/11] Add support for plugins rather than executables --- .../include/Framework/DataProcessorInfo.h | 4 +++- .../Core/src/WorkflowSerializationHelpers.cxx | 19 +++++++++++++++++-- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/Framework/Core/include/Framework/DataProcessorInfo.h b/Framework/Core/include/Framework/DataProcessorInfo.h index 23c9d3f722bd4..ebf8ebf11bfdc 100644 --- a/Framework/Core/include/Framework/DataProcessorInfo.h +++ b/Framework/Core/include/Framework/DataProcessorInfo.h @@ -25,7 +25,9 @@ struct DataProcessorInfo { /// Name of the associated DataProcessorSpec std::string name = "Unknown"; /// The executable name of the program which holds the DataProcessorSpec - std::string executable = "/bin/false"; + std::string executable = ""; + /// The plugin spec of the plugin which holds the DataProcessorSpec + std::string plugin = ""; /// The argument passed on the command line for this DataProcessorSpec std::vector cmdLineArgs = {}; /// The workflow options which are available for the associated DataProcessorSpec diff --git a/Framework/Core/src/WorkflowSerializationHelpers.cxx b/Framework/Core/src/WorkflowSerializationHelpers.cxx index e20e23f98c90b..2cf742bfdaa48 100644 --- a/Framework/Core/src/WorkflowSerializationHelpers.cxx +++ b/Framework/Core/src/WorkflowSerializationHelpers.cxx @@ -91,6 +91,7 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler, IN_DATAPROCESSOR_INFO, IN_DATAPROCESSOR_INFO_NAME, IN_DATAPROCESSOR_INFO_EXECUTABLE, + IN_DATAPROCESSOR_INFO_PLUGIN, IN_DATAPROCESSOR_INFO_ARGS, IN_DATAPROCESSOR_INFO_ARG, IN_DATAPROCESSOR_INFO_CHANNELS, @@ -263,6 +264,9 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler, case State::IN_DATAPROCESSOR_INFO_EXECUTABLE: s << "IN_DATAPROCESSOR_INFO_EXECUTABLE"; break; + case State::IN_DATAPROCESSOR_INFO_PLUGIN: + s << "IN_DATAPROCESSOR_INFO_PLUGIN"; + break; case State::IN_DATAPROCESSOR_INFO_ARGS: s << "IN_DATAPROCESSOR_INFO_ARGS"; break; @@ -706,6 +710,8 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler, push(State::IN_DATAPROCESSOR_INFO_NAME); } else if (in(State::IN_DATAPROCESSOR_INFO) && strncmp(str, "executable", length) == 0) { push(State::IN_DATAPROCESSOR_INFO_EXECUTABLE); + } else if (in(State::IN_DATAPROCESSOR_INFO) && strncmp(str, "plugin", length) == 0) { + push(State::IN_DATAPROCESSOR_INFO_PLUGIN); } else if (in(State::IN_DATAPROCESSOR_INFO) && strncmp(str, "cmdLineArgs", length) == 0) { push(State::IN_DATAPROCESSOR_INFO_ARGS); } else if (in(State::IN_DATAPROCESSOR_INFO) && strncmp(str, "workflowOptions", length) == 0) { @@ -732,6 +738,9 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler, } else if (in(State::IN_DATAPROCESSOR_INFO_EXECUTABLE)) { assert(metadata.size()); metadata.back().executable = s; + } else if (in(State::IN_DATAPROCESSOR_INFO_PLUGIN)) { + assert(metadata.size()); + metadata.back().plugin = s; } else if (in(State::IN_INPUT_BINDING)) { binding = s; } else if (in(State::IN_INPUT_ORIGIN)) { @@ -1254,8 +1263,14 @@ void WorkflowSerializationHelpers::dump(std::ostream& out, w.StartObject(); w.Key("name"); w.String(info.name.c_str()); - w.Key("executable"); - w.String(info.executable.c_str()); + if (!info.executable.empty()) { + w.Key("executable"); + w.String(info.executable.c_str()); + } + if (!info.plugin.empty()) { + w.Key("plugin"); + w.String(info.plugin.c_str()); + } w.Key("cmdLineArgs"); w.StartArray(); for (auto& arg : info.cmdLineArgs) { From f7868dab0463ac74292abcc4b1699d5fc768dd5d Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Fri, 21 Mar 2025 12:40:42 +0100 Subject: [PATCH 06/11] DPL: cleanup creation of DataProcessorInfo - Use aggregate initialisation where possible - Drop unused bits Will simplify the plugins PR. --- Framework/Core/src/runDataProcessing.cxx | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/Framework/Core/src/runDataProcessing.cxx b/Framework/Core/src/runDataProcessing.cxx index 075026df15d0a..ea3711fec63c2 100644 --- a/Framework/Core/src/runDataProcessing.cxx +++ b/Framework/Core/src/runDataProcessing.cxx @@ -1644,12 +1644,11 @@ int runStateMachine(DataProcessorSpecs const& workflow, channels.push_back(channel.name); } dataProcessorInfos.push_back( - DataProcessorInfo{ - device.id, - workflowInfo.executable, - workflowInfo.args, - workflowInfo.options, - channels}); + {.name = device.id, + .executable = workflowInfo.executable, + .cmdLineArgs = workflowInfo.args, + .workflowOptions = workflowInfo.options, + .channels = channels}); } break; case DriverState::MATERIALISE_WORKFLOW: @@ -2123,7 +2122,7 @@ int runStateMachine(DataProcessorSpecs const& workflow, frameworkId = spawnDevice(loop, ref, runningWorkflow.devices, - driverInfo, + driverInfo, driverControl, controls, deviceExecutions, infos, allStates, @@ -2894,9 +2893,9 @@ int doMain(int argc, char** argv, o2::framework::WorkflowDefinitionContext& work } WorkflowInfo currentWorkflow{ - argv[0], - currentArgs, - workflowContext.workflowOptions}; + .executable = argv[0], + .args = currentArgs, + .options = workflowContext.workflowOptions}; ProcessingPolicies processingPolicies; enum LogParsingHelpers::LogLevel minFailureLevel; From 7e3cee8716e656a3956890e35669557bf5ea0a50 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Fri, 21 Mar 2025 12:40:42 +0100 Subject: [PATCH 07/11] Plugin stuff --- Framework/Core/src/runDataProcessing.cxx | 1 + 1 file changed, 1 insertion(+) diff --git a/Framework/Core/src/runDataProcessing.cxx b/Framework/Core/src/runDataProcessing.cxx index ea3711fec63c2..a6fca75968df5 100644 --- a/Framework/Core/src/runDataProcessing.cxx +++ b/Framework/Core/src/runDataProcessing.cxx @@ -1646,6 +1646,7 @@ int runStateMachine(DataProcessorSpecs const& workflow, dataProcessorInfos.push_back( {.name = device.id, .executable = workflowInfo.executable, + .plugin = workflowInfo.plugin, .cmdLineArgs = workflowInfo.args, .workflowOptions = workflowInfo.options, .channels = channels}); From 9b4b39d00027a6f24d8b772382d7e722d2a812e5 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Fri, 21 Mar 2025 12:40:42 +0100 Subject: [PATCH 08/11] DPL: pass plugin to WorfklowInfo --- Framework/Core/src/runDataProcessing.cxx | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/Framework/Core/src/runDataProcessing.cxx b/Framework/Core/src/runDataProcessing.cxx index a6fca75968df5..f7bbdc3e5b9aa 100644 --- a/Framework/Core/src/runDataProcessing.cxx +++ b/Framework/Core/src/runDataProcessing.cxx @@ -190,7 +190,7 @@ char* getIdString(int argc, char** argv) return nullptr; } -int doMain(int argc, char** argv, o2::framework::WorkflowDefinitionContext& workflowContext); +int doMain(int argc, char** argv, std::string pluginName, o2::framework::WorkflowDefinitionContext& workflowContext); int callMain(int argc, char** argv, char const* pluginSpec) { @@ -211,7 +211,7 @@ int callMain(int argc, char** argv, char const* pluginSpec) o2::framework::WorkflowDefinitionContext workflowContext = availableWorkflows.back().defineWorkflow(argc, argv); if (noCatch) { try { - result = doMain(argc, argv, workflowContext); + result = doMain(argc, argv, pluginSpec, workflowContext); } catch (o2::framework::RuntimeErrorRef& ref) { doDPLException(ref, argv[0]); throw; @@ -222,7 +222,7 @@ int callMain(int argc, char** argv, char const* pluginSpec) // SFINAE expression above fit better the version which invokes user code over // the default one. // The default policy is a catch all pub/sub setup to be consistent with the past. - result = doMain(argc, argv, workflowContext); + result = doMain(argc, argv, pluginSpec, workflowContext); } catch (boost::exception& e) { doBoostException(e, argv[0]); throw; @@ -251,7 +251,6 @@ void getChildData(int infd, DeviceInfo& outinfo) int bytes_read; // NOTE: do not quite understand read ends up blocking if I read more than // once. Oh well... Good enough for now. - int64_t total_bytes_read = 0; int64_t count = 0; bool once = false; while (true) { @@ -1146,6 +1145,7 @@ int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry, struct WorkflowInfo { std::string executable; + std::string plugin; std::vector args; std::vector options; }; @@ -2877,7 +2877,7 @@ std::unique_ptr createRegistry() // killing them all on ctrl-c). // - Child, pick the data-processor ID and start a O2DataProcessorDevice for // each DataProcessorSpec -int doMain(int argc, char** argv, o2::framework::WorkflowDefinitionContext& workflowContext) +int doMain(int argc, char** argv, std::string pluginName, o2::framework::WorkflowDefinitionContext& workflowContext) { // Peek very early in the driver options and look for // signposts, so the we can enable it without going through the whole dance @@ -2895,6 +2895,7 @@ int doMain(int argc, char** argv, o2::framework::WorkflowDefinitionContext& work WorkflowInfo currentWorkflow{ .executable = argv[0], + .plugin = pluginName, .args = currentArgs, .options = workflowContext.workflowOptions}; From 8cbfd8ba9ae756d53961d630decdb860935e8eba Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Fri, 21 Mar 2025 12:40:42 +0100 Subject: [PATCH 09/11] Everything but the signals --- .../Core/include/Framework/DeviceExecution.h | 15 ++++++------ Framework/Core/src/DeviceSpecHelpers.cxx | 2 ++ Framework/Core/src/runDataProcessing.cxx | 24 +++++++++++++++---- 3 files changed, 28 insertions(+), 13 deletions(-) diff --git a/Framework/Core/include/Framework/DeviceExecution.h b/Framework/Core/include/Framework/DeviceExecution.h index d417a6980cf58..1c7b1ac8c0b3e 100644 --- a/Framework/Core/include/Framework/DeviceExecution.h +++ b/Framework/Core/include/Framework/DeviceExecution.h @@ -8,26 +8,25 @@ // In applying this license CERN does not waive the privileges and immunities // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. -#ifndef FRAMEWORK_DEVICEEXECUTION_H -#define FRAMEWORK_DEVICEEXECUTION_H +#ifndef O2_FRAMEWORK_DEVICEEXECUTION_H_ +#define O2_FRAMEWORK_DEVICEEXECUTION_H_ #include -namespace o2 -{ -namespace framework +namespace o2::framework { /// This represent one single execution of a Device. It's meant to hold /// information which can change between one execution of a Device and the /// other, e.g. the executable name or the arguments it is started with. struct DeviceExecution { + std::string plugin; /// The options passed to a given device std::vector args; /// The environment to be passed to a given device std::vector environ; }; -} // namespace framework -} // namespace o2 -#endif +} // namespace o2::framework + +#endif // O2_FRAMEWORK_DEVICEEXECUTION_H_ diff --git a/Framework/Core/src/DeviceSpecHelpers.cxx b/Framework/Core/src/DeviceSpecHelpers.cxx index ec0a40e44ac31..e4696d3bd92fb 100644 --- a/Framework/Core/src/DeviceSpecHelpers.cxx +++ b/Framework/Core/src/DeviceSpecHelpers.cxx @@ -1711,6 +1711,8 @@ void DeviceSpecHelpers::prepareArguments(bool defaultQuiet, bool defaultStopped, } O2_SIGNPOST_END(device_spec_helpers, poid, "prepareArguments", "The following options are being forwarded to %{public}s: %{public}s", spec.id.c_str(), str.str().c_str()); + // Copy the plugin over from the DataProcessingInfo + execution.plugin = pi->plugin; } } diff --git a/Framework/Core/src/runDataProcessing.cxx b/Framework/Core/src/runDataProcessing.cxx index f7bbdc3e5b9aa..45b8e46a87a75 100644 --- a/Framework/Core/src/runDataProcessing.cxx +++ b/Framework/Core/src/runDataProcessing.cxx @@ -717,6 +717,12 @@ std::string spawnDevice(uv_loop_t* loop, id = fork(); // We are the child: prepare options and reexec. if (id == 0) { + if (driverControl.defaultStopped && (execution.plugin.empty() == false)) { + kill(getpid(), SIGSTOP); + } + // Needed in case we want to reuse the same loop of the parent. + uv_loop_fork(loop); + uv_loop_fork(uv_default_loop()); // We allow being debugged and do not terminate on SIGTRAP signal(SIGTRAP, SIG_IGN); // We immediately ignore SIGUSR1 and SIGUSR2 so that we do not @@ -761,7 +767,11 @@ std::string spawnDevice(uv_loop_t* loop, for (auto& env : execution.environ) { putenv(strdup(DeviceSpecHelpers::reworkTimeslicePlaceholder(env, spec).data())); } - execvp(execution.args[0], execution.args.data()); + if (execution.plugin.empty()) { + LOG(info) << "Child device runs in a separate executable. Launching " << execution.args[0] << " ..."; + execvp(execution.args[0], execution.args.data()); + } + LOG(info) << "Child device uses plugins. Loading " << execution.plugin << "."; // In the general case we never end up here, because execvp is used. // Once we move to plugins however, the run the plugin without loading // a new environment so the new pid can be used to identify. @@ -1098,14 +1108,13 @@ int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry, &deviceProxy, &processingPolicies, &deviceContext, - &driverConfig, - &loop](fair::mq::DeviceRunner& r) { + &driverConfig](fair::mq::DeviceRunner& r) { ServiceRegistryRef serviceRef = {serviceRegistry}; simpleRawDeviceService = std::make_unique(nullptr, spec); serviceRef.registerService(ServiceRegistryHelpers::handleForService(simpleRawDeviceService.get())); deviceState = std::make_unique(); - deviceState->loop = loop; + deviceState->loop = uv_loop_new(); deviceState->tracingFlags = DeviceStateHelpers::parseTracingFlags(r.fConfig.GetPropertyAsString("dpl-tracing-flags")); serviceRef.registerService(ServiceRegistryHelpers::handleForService(deviceState.get())); @@ -1962,10 +1971,15 @@ int runStateMachine(DataProcessorSpecs const& workflow, if (driverControl.defaultStopped) { kill(getpid(), SIGSTOP); } + // We are in the child here, the frameworkId must be set. + assert(!frameworkId.empty()); for (size_t di = 0; di < runningWorkflow.devices.size(); di++) { RunningDeviceRef ref{di}; if (runningWorkflow.devices[di].id == frameworkId) { - return doChild(driverInfo.argc, driverInfo.argv, + auto &execution = deviceExecutions[ref.index]; + // Last pointer is nullptr + assert(execution.args.data()[execution.args.size() -1] == nullptr); + return doChild(execution.args.size()-1, execution.args.data(), serviceRegistry, runningWorkflow, ref, driverConfig, From 50c20c64e95c4614a8b31c23db5b8f3784cefd80 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Fri, 21 Mar 2025 12:40:42 +0100 Subject: [PATCH 10/11] Ignore signals for now --- Framework/Core/src/DataProcessingDevice.cxx | 4 ++-- Framework/Core/src/runDataProcessing.cxx | 8 ++++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index ae25d8d3a915c..160ea377b2b65 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -1085,8 +1085,8 @@ void DataProcessingDevice::InitTask() if (deviceContext.sigusr1Handle == nullptr) { deviceContext.sigusr1Handle = (uv_signal_t*)malloc(sizeof(uv_signal_t)); deviceContext.sigusr1Handle->data = &mServiceRegistry; - uv_signal_init(state.loop, deviceContext.sigusr1Handle); - uv_signal_start(deviceContext.sigusr1Handle, on_signal_callback, SIGUSR1); + // uv_signal_init(state.loop, deviceContext.sigusr1Handle); + // uv_signal_start(deviceContext.sigusr1Handle, on_signal_callback, SIGUSR1); } // If there is any signal, we want to make sure they are active for (auto& handle : state.activeSignals) { diff --git a/Framework/Core/src/runDataProcessing.cxx b/Framework/Core/src/runDataProcessing.cxx index 45b8e46a87a75..5a578522ac07e 100644 --- a/Framework/Core/src/runDataProcessing.cxx +++ b/Framework/Core/src/runDataProcessing.cxx @@ -729,8 +729,12 @@ std::string spawnDevice(uv_loop_t* loop, // get killed by the parent trying to force stepping children. // We will re-enable them later on, when it is actually safe to // do so. - signal(SIGUSR1, SIG_IGN); - signal(SIGUSR2, SIG_IGN); + // We do not do so if the plugin is there, because that confuses libuv + // FIXME: maybe use libuv to ignore? + if (execution.plugin.empty()) { + signal(SIGUSR1, SIG_IGN); + signal(SIGUSR2, SIG_IGN); + } // This is the child. // For stdout / stderr, we close the read part of the pipe, the From e72a17c1912f2e2680dbc2a3d69293151261e2bc Mon Sep 17 00:00:00 2001 From: ALICE Action Bot Date: Fri, 21 Mar 2025 11:41:46 +0000 Subject: [PATCH 11/11] Please consider the following formatting changes --- Framework/Core/src/runDataProcessing.cxx | 6 +++--- Framework/Core/src/runDataProcessingPlugin.cxx | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Framework/Core/src/runDataProcessing.cxx b/Framework/Core/src/runDataProcessing.cxx index 5a578522ac07e..b8cb7090d0719 100644 --- a/Framework/Core/src/runDataProcessing.cxx +++ b/Framework/Core/src/runDataProcessing.cxx @@ -1980,10 +1980,10 @@ int runStateMachine(DataProcessorSpecs const& workflow, for (size_t di = 0; di < runningWorkflow.devices.size(); di++) { RunningDeviceRef ref{di}; if (runningWorkflow.devices[di].id == frameworkId) { - auto &execution = deviceExecutions[ref.index]; + auto& execution = deviceExecutions[ref.index]; // Last pointer is nullptr - assert(execution.args.data()[execution.args.size() -1] == nullptr); - return doChild(execution.args.size()-1, execution.args.data(), + assert(execution.args.data()[execution.args.size() - 1] == nullptr); + return doChild(execution.args.size() - 1, execution.args.data(), serviceRegistry, runningWorkflow, ref, driverConfig, diff --git a/Framework/Core/src/runDataProcessingPlugin.cxx b/Framework/Core/src/runDataProcessingPlugin.cxx index c10665da157a0..d80db42909148 100644 --- a/Framework/Core/src/runDataProcessingPlugin.cxx +++ b/Framework/Core/src/runDataProcessingPlugin.cxx @@ -40,7 +40,7 @@ int main(int argc, char** argv) break; } if (argSize == optionSize && (strncmp(argv[i], pluginOption, optionSize) == 0) && i < argc) { - pluginSpec = argv[i+i]; + pluginSpec = argv[i + i]; break; } }