From e2ba42477a176a8beb365114ad208e7c5dc5218b Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Mon, 14 Apr 2025 14:01:22 +0200 Subject: [PATCH 1/3] DPL: first step to make workflow definition plugins This is the first step towards having the workflow definition inside plugins rather than in executables. This will allow accumulating plugins which are needed to instantiate a topology and do the option parsing / topology building only once, simplifying the current case. The end goal is to allow the driver to preload certain common services (e.g. ROOT) and share it among the different tasks (which at the moment it's not allowed because different tasks are in different executables). Moreover this will allow us to coalesce strictly coupled dataprocessors and reduce the number of running processes. For now the plugins are embedded in the executables and behave exactly like before. --- .../include/Framework/DataProcessorInfo.h | 6 +- .../Core/include/Framework/DeviceExecution.h | 15 +- Framework/Core/include/Framework/Plugins.h | 3 + .../Framework/WorkflowDefinitionContext.h | 56 ++++ .../include/Framework/runDataProcessing.h | 126 +++++---- Framework/Core/src/DeviceSpecHelpers.cxx | 2 + .../Core/src/WorkflowSerializationHelpers.cxx | 21 +- Framework/Core/src/runDataProcessing.cxx | 242 +++++++++++------- .../Core/src/runDataProcessingPlugin.cxx | 54 ++++ cmake/O2AddWorkflow.cmake | 19 +- 10 files changed, 394 insertions(+), 150 deletions(-) create mode 100644 Framework/Core/include/Framework/WorkflowDefinitionContext.h create mode 100644 Framework/Core/src/runDataProcessingPlugin.cxx diff --git a/Framework/Core/include/Framework/DataProcessorInfo.h b/Framework/Core/include/Framework/DataProcessorInfo.h index 23c9d3f722bd4..38c78b2b08898 100644 --- a/Framework/Core/include/Framework/DataProcessorInfo.h +++ b/Framework/Core/include/Framework/DataProcessorInfo.h @@ -25,7 +25,9 @@ struct DataProcessorInfo { /// Name of the associated DataProcessorSpec std::string name = "Unknown"; /// The executable name of the program which holds the DataProcessorSpec - std::string executable = "/bin/false"; + std::string executable = ""; + /// The plugin spec of the plugin which holds the DataProcessorSpec + std::string plugin = ""; /// The argument passed on the command line for this DataProcessorSpec std::vector cmdLineArgs = {}; /// The workflow options which are available for the associated DataProcessorSpec @@ -34,6 +36,6 @@ struct DataProcessorInfo { std::vector channels = {}; }; -} // namespace o2 +} // namespace o2::framework #endif // O2_FRAMEWORK_CORE_DATAPROCESSORINFO_H_ diff --git a/Framework/Core/include/Framework/DeviceExecution.h b/Framework/Core/include/Framework/DeviceExecution.h index d417a6980cf58..1c7b1ac8c0b3e 100644 --- a/Framework/Core/include/Framework/DeviceExecution.h +++ b/Framework/Core/include/Framework/DeviceExecution.h @@ -8,26 +8,25 @@ // In applying this license CERN does not waive the privileges and immunities // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. -#ifndef FRAMEWORK_DEVICEEXECUTION_H -#define FRAMEWORK_DEVICEEXECUTION_H +#ifndef O2_FRAMEWORK_DEVICEEXECUTION_H_ +#define O2_FRAMEWORK_DEVICEEXECUTION_H_ #include -namespace o2 -{ -namespace framework +namespace o2::framework { /// This represent one single execution of a Device. It's meant to hold /// information which can change between one execution of a Device and the /// other, e.g. the executable name or the arguments it is started with. struct DeviceExecution { + std::string plugin; /// The options passed to a given device std::vector args; /// The environment to be passed to a given device std::vector environ; }; -} // namespace framework -} // namespace o2 -#endif +} // namespace o2::framework + +#endif // O2_FRAMEWORK_DEVICEEXECUTION_H_ diff --git a/Framework/Core/include/Framework/Plugins.h b/Framework/Core/include/Framework/Plugins.h index 925943c6bffc3..3d320c6f2abb5 100644 --- a/Framework/Core/include/Framework/Plugins.h +++ b/Framework/Core/include/Framework/Plugins.h @@ -44,6 +44,9 @@ enum struct DplPluginKind : int { // using the arrow dataset API RootObjectReadingImplementation, + // A plugin which defines a whole workflow. This will be used to separate + // workflows in shared libraries and run them via a separate loader. + Workflow, // A plugin which was not initialised properly. Unknown }; diff --git a/Framework/Core/include/Framework/WorkflowDefinitionContext.h b/Framework/Core/include/Framework/WorkflowDefinitionContext.h new file mode 100644 index 0000000000000..b7c29aa6077b3 --- /dev/null +++ b/Framework/Core/include/Framework/WorkflowDefinitionContext.h @@ -0,0 +1,56 @@ +// Copyright 2019-2025 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +#ifndef O2_FRAMEWORK_WORKFLOWDEFINITIONCONTEXT_H_ +#define O2_FRAMEWORK_WORKFLOWDEFINITIONCONTEXT_H_ + +#include "Framework/ConfigParamSpec.h" +#include "Framework/CompletionPolicy.h" +#include "Framework/DispatchPolicy.h" +#include "Framework/ResourcePolicy.h" +#include "Framework/CallbacksPolicy.h" +#include "Framework/SendingPolicy.h" +#include "Framework/WorkflowSpec.h" +#include "Framework/ChannelConfigurationPolicy.h" +#include + +namespace o2::framework +{ + +struct WorkflowDefinitionContext { + std::vector workflowOptions; + std::vector completionPolicies; + std::vector dispatchPolicies; + std::vector resourcePolicies; + std::vector callbacksPolicies; + std::vector sendingPolicies; + std::vector extraOptions; + std::vector channelPolicies; + std::unique_ptr configContext; + + // For the moment, let's put them here. We should + // probably move them to a different place, since these are not really part + // of the workflow definition but will be there also at runtine. + std::unique_ptr configRegistry{nullptr}; + std::unique_ptr workflowOptionsRegistry{nullptr}; + + o2::framework::WorkflowSpec specs; +}; + +struct WorkflowDefinition { + std::function defineWorkflow; +}; + +struct WorkflowPlugin { + virtual o2::framework::WorkflowDefinition* create() = 0; +}; + +} // namespace o2::framework +#endif // O2_FRAMEWORK_WORKFLOWDEFINITIONCONTEXT_H_ diff --git a/Framework/Core/include/Framework/runDataProcessing.h b/Framework/Core/include/Framework/runDataProcessing.h index 07083314af12e..4a1d1954e7075 100644 --- a/Framework/Core/include/Framework/runDataProcessing.h +++ b/Framework/Core/include/Framework/runDataProcessing.h @@ -26,7 +26,9 @@ #include "Framework/CustomWorkflowTerminationHook.h" #include "Framework/CommonServices.h" #include "Framework/WorkflowCustomizationHelpers.h" +#include "Framework/WorkflowDefinitionContext.h" #include "Framework/Logger.h" +#include "Framework/Plugins.h" #include "Framework/CheckTypes.h" #include "Framework/StructToTuple.h" #include "ResourcePolicy.h" @@ -125,16 +127,7 @@ void overrideCloning(o2::framework::ConfigContext& ctx, std::vector& workflow); // This comes from the framework itself. This way we avoid code duplication. -int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& specs, - std::vector const& channelPolicies, - std::vector const& completionPolicies, - std::vector const& dispatchPolicies, - std::vector const& resourcePolicies, - std::vector const& callbacksPolicies, - std::vector const& sendingPolicies, - std::vector const& workflowOptions, - std::vector const& detectedOptions, - o2::framework::ConfigContext& configContext); +int doMain(int argc, char** argv, o2::framework::WorkflowDefinitionContext& context, o2::framework::ConfigContext& configContext); void doDefaultWorkflowTerminationHook(); @@ -167,55 +160,44 @@ void callWorkflowTermination(T&, char const* idstring) void overrideAll(o2::framework::ConfigContext& ctx, std::vector& workflow); -o2::framework::ConfigContext createConfigContext(std::unique_ptr& workflowOptionsRegistry, - o2::framework::ServiceRegistry& configRegistry, - std::vector& workflowOptions, - std::vector& extraOptions, int argc, char** argv); +std::unique_ptr createConfigContext(std::unique_ptr& workflowOptionsRegistry, + o2::framework::ServiceRegistry& configRegistry, + std::vector& workflowOptions, + std::vector& extraOptions, int argc, char** argv); std::unique_ptr createRegistry(); -int mainNoCatch(int argc, char** argv) -{ - using namespace o2::framework; +char* getIdString(int argc, char** argv); - std::vector workflowOptions; - UserCustomizationsHelper::userDefinedCustomization(workflowOptions); - auto requiredWorkflowOptions = WorkflowCustomizationHelpers::requiredWorkflowOptions(); - workflowOptions.insert(std::end(workflowOptions), std::begin(requiredWorkflowOptions), std::end(requiredWorkflowOptions)); - - std::vector completionPolicies = injectCustomizations(); - std::vector dispatchPolicies = injectCustomizations(); - std::vector resourcePolicies = injectCustomizations(); - std::vector callbacksPolicies = injectCustomizations(); - std::vector sendingPolicies = injectCustomizations(); - - std::unique_ptr configRegistry = createRegistry(); - std::vector extraOptions; - std::unique_ptr workflowOptionsRegistry{nullptr}; - auto configContext = createConfigContext(workflowOptionsRegistry, *configRegistry, workflowOptions, extraOptions, argc, argv); - - o2::framework::WorkflowSpec specs = defineDataProcessing(configContext); - overrideAll(configContext, specs); - for (auto& spec : specs) { - UserCustomizationsHelper::userDefinedCustomization(spec.requiredServices); - } - std::vector channelPolicies; - UserCustomizationsHelper::userDefinedCustomization(channelPolicies); - auto defaultChannelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(configContext); - channelPolicies.insert(std::end(channelPolicies), std::begin(defaultChannelPolicies), std::end(defaultChannelPolicies)); - return doMain(argc, argv, specs, - channelPolicies, completionPolicies, dispatchPolicies, - resourcePolicies, callbacksPolicies, sendingPolicies, workflowOptions, extraOptions, configContext); +#define STRINGIZE_NX(A) #A +#define STRINGIZE(A) STRINGIZE_NX(A) + +// This is to allow the old "executable" based behavior +// Each executable will contain a plugin called InternalWorkflow +// In case one wants to use the new DSO based approach, the +// name of the plugin an the library name where it is located +// will have to be specified at build time. +#ifndef DPL_WORKFLOW_PLUGIN_NAME +#define DPL_WORKFLOW_PLUGIN_NAME InternalCustomWorkflow +#ifdef DPL_WORKFLOW_PLUGIN_LIBRARY +#error Missing DPL_WORKFLOW_PLUGIN_NAME +#endif +#define DPL_WORKFLOW_PLUGIN_LIBRARY +#endif + +consteval char const* pluginName() +{ + return STRINGIZE(DPL_WORKFLOW_PLUGIN_LIBRARY) ":" STRINGIZE(DPL_WORKFLOW_PLUGIN_NAME); } -int callMain(int argc, char** argv, int (*)(int, char**)); -char* getIdString(int argc, char** argv); +// Executables behave this way +int callMain(int argc, char** argv, char const* pluginName); int main(int argc, char** argv) { using namespace o2::framework; - int result = callMain(argc, argv, mainNoCatch); + int result = callMain(argc, argv, pluginName()); char* idstring = getIdString(argc, argv); o2::framework::OnWorkflowTerminationHook onWorkflowTerminationHook; @@ -223,4 +205,52 @@ int main(int argc, char** argv) return result; } + +struct WorkflowDefinition { + std::function defineWorkflow; +}; + +struct DPL_WORKFLOW_PLUGIN_NAME : o2::framework::WorkflowPlugin { + o2::framework::WorkflowDefinition* create() override + { + return new o2::framework::WorkflowDefinition{ + .defineWorkflow = [](int argc, char** argv) -> o2::framework::WorkflowDefinitionContext { + using namespace o2::framework; + WorkflowDefinitionContext workflowContext; + + UserCustomizationsHelper::userDefinedCustomization(workflowContext.workflowOptions); + auto requiredWorkflowOptions = WorkflowCustomizationHelpers::requiredWorkflowOptions(); + workflowContext.workflowOptions.insert(std::end(workflowContext.workflowOptions), std::begin(requiredWorkflowOptions), std::end(requiredWorkflowOptions)); + + workflowContext.completionPolicies = injectCustomizations(); + workflowContext.dispatchPolicies = injectCustomizations(); + workflowContext.resourcePolicies = injectCustomizations(); + workflowContext.callbacksPolicies = injectCustomizations(); + workflowContext.sendingPolicies = injectCustomizations(); + + workflowContext.configRegistry = createRegistry(); + workflowContext.configContext = createConfigContext(workflowContext.workflowOptionsRegistry, *workflowContext.configRegistry, workflowContext.workflowOptions, workflowContext.extraOptions, argc, argv); + + workflowContext.specs = defineDataProcessing(*workflowContext.configContext); + overrideAll(*workflowContext.configContext, workflowContext.specs); + for (auto& spec : workflowContext.specs) { + UserCustomizationsHelper::userDefinedCustomization(spec.requiredServices); + } + UserCustomizationsHelper::userDefinedCustomization(workflowContext.channelPolicies); + auto defaultChannelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(*workflowContext.configContext); + workflowContext.channelPolicies.insert(std::end(workflowContext.channelPolicies), std::begin(defaultChannelPolicies), std::end(defaultChannelPolicies)); + return workflowContext; + }}; + } +}; + +// This is like the plugin macros, we simply do it explicitly to avoid macro inside macro expansion +extern "C" { +DPLPluginHandle* dpl_plugin_callback(DPLPluginHandle* previous) +{ + previous = new DPLPluginHandle{new DPL_WORKFLOW_PLUGIN_NAME{}, strdup(STRINGIZE(DPL_WORKFLOW_PLUGIN_NAME)), o2::framework::DplPluginKind::Workflow, previous}; + return previous; +} +} + #endif diff --git a/Framework/Core/src/DeviceSpecHelpers.cxx b/Framework/Core/src/DeviceSpecHelpers.cxx index ec0a40e44ac31..e4696d3bd92fb 100644 --- a/Framework/Core/src/DeviceSpecHelpers.cxx +++ b/Framework/Core/src/DeviceSpecHelpers.cxx @@ -1711,6 +1711,8 @@ void DeviceSpecHelpers::prepareArguments(bool defaultQuiet, bool defaultStopped, } O2_SIGNPOST_END(device_spec_helpers, poid, "prepareArguments", "The following options are being forwarded to %{public}s: %{public}s", spec.id.c_str(), str.str().c_str()); + // Copy the plugin over from the DataProcessingInfo + execution.plugin = pi->plugin; } } diff --git a/Framework/Core/src/WorkflowSerializationHelpers.cxx b/Framework/Core/src/WorkflowSerializationHelpers.cxx index e20e23f98c90b..eeeab4512710c 100644 --- a/Framework/Core/src/WorkflowSerializationHelpers.cxx +++ b/Framework/Core/src/WorkflowSerializationHelpers.cxx @@ -91,6 +91,7 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler, IN_DATAPROCESSOR_INFO, IN_DATAPROCESSOR_INFO_NAME, IN_DATAPROCESSOR_INFO_EXECUTABLE, + IN_DATAPROCESSOR_INFO_PLUGIN, IN_DATAPROCESSOR_INFO_ARGS, IN_DATAPROCESSOR_INFO_ARG, IN_DATAPROCESSOR_INFO_CHANNELS, @@ -263,6 +264,9 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler, case State::IN_DATAPROCESSOR_INFO_EXECUTABLE: s << "IN_DATAPROCESSOR_INFO_EXECUTABLE"; break; + case State::IN_DATAPROCESSOR_INFO_PLUGIN: + s << "IN_DATAPROCESSOR_INFO_PLUGIN"; + break; case State::IN_DATAPROCESSOR_INFO_ARGS: s << "IN_DATAPROCESSOR_INFO_ARGS"; break; @@ -706,6 +710,8 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler, push(State::IN_DATAPROCESSOR_INFO_NAME); } else if (in(State::IN_DATAPROCESSOR_INFO) && strncmp(str, "executable", length) == 0) { push(State::IN_DATAPROCESSOR_INFO_EXECUTABLE); + } else if (in(State::IN_DATAPROCESSOR_INFO) && strncmp(str, "plugin", length) == 0) { + push(State::IN_DATAPROCESSOR_INFO_PLUGIN); } else if (in(State::IN_DATAPROCESSOR_INFO) && strncmp(str, "cmdLineArgs", length) == 0) { push(State::IN_DATAPROCESSOR_INFO_ARGS); } else if (in(State::IN_DATAPROCESSOR_INFO) && strncmp(str, "workflowOptions", length) == 0) { @@ -732,6 +738,9 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler, } else if (in(State::IN_DATAPROCESSOR_INFO_EXECUTABLE)) { assert(metadata.size()); metadata.back().executable = s; + } else if (in(State::IN_DATAPROCESSOR_INFO_PLUGIN)) { + assert(metadata.size()); + metadata.back().plugin = s; } else if (in(State::IN_INPUT_BINDING)) { binding = s; } else if (in(State::IN_INPUT_ORIGIN)) { @@ -888,7 +897,7 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler, if (!states.empty()) { debug << " now in " << states.back(); } - O2_SIGNPOST_END(workflow_importer, _o2_signpost_id_t{(int64_t)states.size()+1}, "import", "POP: %s", debug.str().c_str()); + O2_SIGNPOST_END(workflow_importer, _o2_signpost_id_t{(int64_t)states.size() + 1}, "import", "POP: %s", debug.str().c_str()); return result; } bool in(State o) @@ -1254,8 +1263,14 @@ void WorkflowSerializationHelpers::dump(std::ostream& out, w.StartObject(); w.Key("name"); w.String(info.name.c_str()); - w.Key("executable"); - w.String(info.executable.c_str()); + if (!info.executable.empty()) { + w.Key("executable"); + w.String(info.executable.c_str()); + } + if (!info.plugin.empty()) { + w.Key("plugin"); + w.String(info.plugin.c_str()); + } w.Key("cmdLineArgs"); w.StartArray(); for (auto& arg : info.cmdLineArgs) { diff --git a/Framework/Core/src/runDataProcessing.cxx b/Framework/Core/src/runDataProcessing.cxx index a7e80134a2cc0..3c224466e824d 100644 --- a/Framework/Core/src/runDataProcessing.cxx +++ b/Framework/Core/src/runDataProcessing.cxx @@ -13,9 +13,7 @@ #include #include "Framework/BoostOptionsRetriever.h" #include "Framework/BacktraceHelpers.h" -#include "Framework/CallbacksPolicy.h" #include "Framework/ChannelConfigurationPolicy.h" -#include "Framework/ChannelMatching.h" #include "Framework/ConfigParamsHelper.h" #include "Framework/ConfigParamSpec.h" #include "Framework/ConfigContext.h" @@ -38,7 +36,6 @@ #include "Framework/ServiceRegistryHelpers.h" #include "Framework/DevicesManager.h" #include "Framework/DebugGUI.h" -#include "Framework/LocalRootFileService.h" #include "Framework/LogParsingHelpers.h" #include "Framework/Logger.h" #include "Framework/ParallelContext.h" @@ -62,6 +59,7 @@ #include "Framework/DeviceContext.h" #include "Framework/ServiceMetricsInfo.h" #include "Framework/DataTakingContext.h" +#include "Framework/WorkflowDefinitionContext.h" #include "Framework/CommonServices.h" #include "Framework/DefaultsHelpers.h" #include "ProcessingPoliciesHelpers.h" @@ -192,13 +190,29 @@ char* getIdString(int argc, char** argv) return nullptr; } -int callMain(int argc, char** argv, int (*mainNoCatch)(int, char**)) +int doMain(int argc, char** argv, std::string pluginName, o2::framework::WorkflowDefinitionContext& workflowContext); + +int callMain(int argc, char** argv, char const* pluginSpec) { + std::vector plugins; + auto morePlugins = PluginManager::parsePluginSpecString(pluginSpec); + for (auto& extra : morePlugins) { + plugins.push_back(extra); + } + // Only one for now + assert(plugins.size() == 1); + + std::vector availableWorkflows; + PluginManager::loadFromPlugin(plugins, availableWorkflows); + + assert(availableWorkflows.size() == 1); static bool noCatch = getenv("O2_NO_CATCHALL_EXCEPTIONS") && strcmp(getenv("O2_NO_CATCHALL_EXCEPTIONS"), "0"); int result = 1; + o2::framework::WorkflowDefinitionContext workflowContext = availableWorkflows.back().defineWorkflow(argc, argv); + if (noCatch) { try { - result = mainNoCatch(argc, argv); + result = doMain(argc, argv, pluginSpec, workflowContext); } catch (o2::framework::RuntimeErrorRef& ref) { doDPLException(ref, argv[0]); throw; @@ -209,7 +223,7 @@ int callMain(int argc, char** argv, int (*mainNoCatch)(int, char**)) // SFINAE expression above fit better the version which invokes user code over // the default one. // The default policy is a catch all pub/sub setup to be consistent with the past. - result = mainNoCatch(argc, argv); + result = doMain(argc, argv, pluginSpec, workflowContext); } catch (boost::exception& e) { doBoostException(e, argv[0]); throw; @@ -238,7 +252,6 @@ void getChildData(int infd, DeviceInfo& outinfo) int bytes_read; // NOTE: do not quite understand read ends up blocking if I read more than // once. Oh well... Good enough for now. - int64_t total_bytes_read = 0; int64_t count = 0; bool once = false; while (true) { @@ -670,20 +683,22 @@ void handle_crash(int sig) } /// This will start a new device by forking and executing a -/// new child -void spawnDevice(uv_loop_t* loop, - DeviceRef ref, - std::vector const& specs, - DriverInfo& driverInfo, - std::vector&, - std::vector& executions, - std::vector& deviceInfos, - std::vector& allStates, - ServiceRegistryRef serviceRegistry, - boost::program_options::variables_map& varmap, - std::vector& childFds, - unsigned parentCPU, - unsigned parentNode) +/// new child. +/// @return the PID of the new process (or 0 if we are in the driver) +std::string spawnDevice(uv_loop_t* loop, + DeviceRef ref, + std::vector const& specs, + DriverInfo& driverInfo, + DriverControl& driverControl, + std::vector&, + std::vector& executions, + std::vector& deviceInfos, + std::vector& allStates, + ServiceRegistryRef serviceRegistry, + boost::program_options::variables_map& varmap, + std::vector& childFds, + unsigned parentCPU, + unsigned parentNode) { // FIXME: this might not work when more than one DPL driver on the same // machine. Hopefully we do not care. @@ -703,14 +718,26 @@ void spawnDevice(uv_loop_t* loop, id = fork(); // We are the child: prepare options and reexec. if (id == 0) { + // If we are using plugins we can stop immediately when + // runnign with -s + if (driverControl.defaultStopped && (execution.plugin.empty() == false) && sameExecutable == true) { + kill(getpid(), SIGSTOP); + } + // From this moment on, the child will print out through the parent + dup2(childFds[ref.index].childstdin[0], STDIN_FILENO); + dup2(childFds[ref.index].childstdout[1], STDOUT_FILENO); + dup2(childFds[ref.index].childstdout[1], STDERR_FILENO); + // Needed in case we want to reuse the same loop of the parent. + uv_loop_fork(loop); + uv_loop_fork(uv_default_loop()); // We allow being debugged and do not terminate on SIGTRAP signal(SIGTRAP, SIG_IGN); // We immediately ignore SIGUSR1 and SIGUSR2 so that we do not // get killed by the parent trying to force stepping children. // We will re-enable them later on, when it is actually safe to // do so. - signal(SIGUSR1, SIG_IGN); - signal(SIGUSR2, SIG_IGN); + // We do not do so if the plugin is there, because that confuses libuv + // FIXME: maybe use libuv to ignore? // This is the child. // For stdout / stderr, we close the read part of the pipe, the @@ -725,19 +752,26 @@ void spawnDevice(uv_loop_t* loop, // idea in the first place, because rlim_cur could be huge // FIXME: I should understand which one is really to be closed and use // CLOEXEC on it. - int rlim_cur = std::min((int)rlim.rlim_cur, 10000); - for (int i = 0; i < rlim_cur; ++i) { - if (childFds[ref.index].childstdin[0] == i) { - continue; - } - if (childFds[ref.index].childstdout[1] == i) { - continue; + int uvSkipped = 0; + // We close all the FD we know we will not use. + // These include: + // - The pipes which we use to redirect the output messages to the parent + for (int i = 0; i < childFds.size(); ++i) { + if (i != ref.index) { + close(childFds[ref.index].childstdin[0]); + close(childFds[ref.index].childstdout[1]); } - close(i); - } - dup2(childFds[ref.index].childstdin[0], STDIN_FILENO); - dup2(childFds[ref.index].childstdout[1], STDOUT_FILENO); - dup2(childFds[ref.index].childstdout[1], STDERR_FILENO); + close(childFds[i].childstdin[1]); + close(childFds[i].childstdout[0]); + } + auto* dummy_handle = (uv_signal_t*)malloc(sizeof(uv_signal_t)); + dummy_handle->data = nullptr; + auto* dummy_handle2 = (uv_signal_t*)malloc(sizeof(uv_signal_t)); + dummy_handle2->data = nullptr; + uv_signal_init(loop, dummy_handle); + uv_signal_init(loop, dummy_handle2); + uv_signal_start(dummy_handle, [](uv_signal_t* handle, int signum) {}, SIGUSR1); + uv_signal_start(dummy_handle2, [](uv_signal_t* handle, int signum) {}, SIGUSR2); for (auto& service : spec.services) { if (service.postForkChild != nullptr) { @@ -747,7 +781,28 @@ void spawnDevice(uv_loop_t* loop, for (auto& env : execution.environ) { putenv(strdup(DeviceSpecHelpers::reworkTimeslicePlaceholder(env, spec).data())); } - execvp(execution.args[0], execution.args.data()); + + if (execution.plugin.empty() || !sameExecutable) { + if (execution.plugin.empty() == false) { + execution.args.pop_back(); + execution.args.emplace_back(strdup("--workflow-plugin")); + execution.args.push_back(strdup(execution.plugin.c_str())); + execution.args.push_back(nullptr); + } + LOGP(detail, "Child runs {} in a separate executable {} from the current one ({}). Using execvp, resources will not be shared.", + execution.plugin, execution.args[0], driverInfo.argv[0]); + execvp(execution.args[0], execution.args.data()); + } + LOG(info) << "Child device uses plugins. Loading " << execution.plugin << "."; + // In the general case we never end up here, because execvp is used. + // Once we move to plugins however, the run the plugin without loading + // a new environment so the new pid can be used to identify. + // + // Let's stop immediately so that we can attach debuggers from here. + driverControl.forcedTransitions = { + DriverState::DO_CHILD, + DriverState::BIND_GUI_PORT}; + return spec.id; } else { O2_SIGNPOST_ID_GENERATE(sid, driver); O2_SIGNPOST_EVENT_EMIT(driver, sid, "spawnDevice", "New child at %{pid}d", id); @@ -831,6 +886,7 @@ void spawnDevice(uv_loop_t* loop, // Let's add also metrics information for the given device gDeviceMetricsInfos.emplace_back(DeviceMetricsInfo{}); + return ""; } void processChildrenOutput(uv_loop_t* loop, @@ -1074,14 +1130,13 @@ int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry, &deviceProxy, &processingPolicies, &deviceContext, - &driverConfig, - &loop](fair::mq::DeviceRunner& r) { + &driverConfig](fair::mq::DeviceRunner& r) { ServiceRegistryRef serviceRef = {serviceRegistry}; simpleRawDeviceService = std::make_unique(nullptr, spec); serviceRef.registerService(ServiceRegistryHelpers::handleForService(simpleRawDeviceService.get())); deviceState = std::make_unique(); - deviceState->loop = loop; + deviceState->loop = uv_loop_new(); deviceState->tracingFlags = DeviceStateHelpers::parseTracingFlags(r.fConfig.GetPropertyAsString("dpl-tracing-flags")); serviceRef.registerService(ServiceRegistryHelpers::handleForService(deviceState.get())); @@ -1121,6 +1176,7 @@ int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry, struct WorkflowInfo { std::string executable; + std::string plugin; std::vector args; std::vector options; }; @@ -1619,12 +1675,12 @@ int runStateMachine(DataProcessorSpecs const& workflow, channels.push_back(channel.name); } dataProcessorInfos.push_back( - DataProcessorInfo{ - device.id, - workflowInfo.executable, - workflowInfo.args, - workflowInfo.options, - channels}); + {.name = device.id, + .executable = workflowInfo.executable, + .plugin = workflowInfo.plugin, + .cmdLineArgs = workflowInfo.args, + .workflowOptions = workflowInfo.options, + .channels = channels}); } break; case DriverState::MATERIALISE_WORKFLOW: @@ -1937,10 +1993,16 @@ int runStateMachine(DataProcessorSpecs const& workflow, if (driverControl.defaultStopped) { kill(getpid(), SIGSTOP); } + // We are in the child here, the frameworkId must be set. + assert(!frameworkId.empty()); for (size_t di = 0; di < runningWorkflow.devices.size(); di++) { RunningDeviceRef ref{di}; if (runningWorkflow.devices[di].id == frameworkId) { - return doChild(driverInfo.argc, driverInfo.argv, + auto& execution = deviceExecutions[ref.index]; + // Last pointer is nullptr + assert(execution.args.data()[execution.args.size() - 1] == nullptr); + LOG(info) << "Process " << getpid() << " is about to spawn " << execution.args[0]; + return doChild(execution.args.size() - 1, execution.args.data(), serviceRegistry, runningWorkflow, ref, driverConfig, @@ -2086,24 +2148,38 @@ int runStateMachine(DataProcessorSpecs const& workflow, callback(serviceRegistry, {varmap}); } childFds.resize(runningWorkflow.devices.size()); - for (int di = 0; di < (int)runningWorkflow.devices.size(); ++di) { + // Let's create all the pipes upfront. Notice that they will + // be close + for (int di = 0; di < childFds.size(); di++) { auto& context = childFds[di]; createPipes(context.childstdin); createPipes(context.childstdout); + } + for (int di = 0; di < (int)runningWorkflow.devices.size(); ++di) { if (driverInfo.mode == DriverMode::EMBEDDED || runningWorkflow.devices[di].resource.hostname != driverInfo.deployHostname) { spawnRemoteDevice(loop, forwardedStdin.str(), runningWorkflow.devices[di], controls[di], deviceExecutions[di], infos, allStates); } else { DeviceRef ref{di}; - spawnDevice(loop, - ref, - runningWorkflow.devices, driverInfo, - controls, deviceExecutions, infos, - allStates, - serviceRegistry, varmap, - childFds, parentCPU, parentNode); + frameworkId = spawnDevice(loop, + ref, + runningWorkflow.devices, + driverInfo, + driverControl, + controls, deviceExecutions, infos, + allStates, + serviceRegistry, varmap, + childFds, parentCPU, parentNode); + // We are in the child in this case. Do not continue spawning. + if (!frameworkId.empty()) { + break; + } } } + // Do not bother about the rest of the scheduling if we are in the child. + if (!frameworkId.empty()) { + break; + } handleSignals(); handleChildrenStdio(&serverContext, forwardedStdin.str(), childFds, pollHandles); for (auto& callback : postScheduleCallbacks) { @@ -2808,10 +2884,10 @@ void overrideAll(o2::framework::ConfigContext& ctx, std::vector& workflowOptionsRegistry, - o2::framework::ServiceRegistry& configRegistry, - std::vector& workflowOptions, - std::vector& extraOptions, int argc, char** argv) +std::unique_ptr createConfigContext(std::unique_ptr& workflowOptionsRegistry, + o2::framework::ServiceRegistry& configRegistry, + std::vector& workflowOptions, + std::vector& extraOptions, int argc, char** argv) { std::vector> retrievers; std::unique_ptr retriever{new o2::framework::BoostOptionsRetriever(true, argc, argv)}; @@ -2825,7 +2901,7 @@ o2::framework::ConfigContext createConfigContext(std::unique_ptr(*workflowOptionsRegistry, o2::framework::ServiceRegistryRef{configRegistry}, argc, argv); } std::unique_ptr createRegistry() @@ -2842,16 +2918,7 @@ std::unique_ptr createRegistry() // killing them all on ctrl-c). // - Child, pick the data-processor ID and start a O2DataProcessorDevice for // each DataProcessorSpec -int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, - std::vector const& channelPolicies, - std::vector const& completionPolicies, - std::vector const& dispatchPolicies, - std::vector const& resourcePolicies, - std::vector const& callbacksPolicies, - std::vector const& sendingPolicies, - std::vector const& currentWorkflowOptions, - std::vector const& detectedParams, - o2::framework::ConfigContext& configContext) +int doMain(int argc, char** argv, std::string pluginName, o2::framework::WorkflowDefinitionContext& workflowContext) { // Peek very early in the driver options and look for // signposts, so the we can enable it without going through the whole dance @@ -2868,9 +2935,10 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, } WorkflowInfo currentWorkflow{ - argv[0], - currentArgs, - currentWorkflowOptions}; + .executable = argv[0], + .plugin = pluginName, + .args = currentArgs, + .options = workflowContext.workflowOptions}; ProcessingPolicies processingPolicies; enum LogParsingHelpers::LogLevel minFailureLevel; @@ -2920,7 +2988,7 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, bpo::options_description visibleOptions; visibleOptions.add(executorOptions); - auto physicalWorkflow = workflow; + auto physicalWorkflow = workflowContext.specs; std::map rankIndex; // We remove the duplicates because for the moment child get themself twice: // once from the actual definition in the child, a second time from the @@ -2931,11 +2999,11 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, size_t workflowHashA = 0; std::hash hash_fn; - for (auto& dp : workflow) { + for (auto& dp : workflowContext.specs) { workflowHashA += hash_fn(dp.name); } - for (auto& dp : workflow) { + for (auto& dp : workflowContext.specs) { rankIndex.insert(std::make_pair(dp.name, workflowHashA)); } @@ -2987,7 +3055,7 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, OverrideServiceSpecs driverServicesOverride = ServiceSpecHelpers::parseOverrides(getenv("DPL_DRIVER_OVERRIDE_SERVICES")); ServiceSpecs driverServices = ServiceSpecHelpers::filterDisabled(CommonDriverServices::defaultServices(), driverServicesOverride); // We insert the hash for the internal devices. - WorkflowHelpers::injectServiceDevices(physicalWorkflow, configContext); + WorkflowHelpers::injectServiceDevices(physicalWorkflow, *workflowContext.configContext); auto reader = std::find_if(physicalWorkflow.begin(), physicalWorkflow.end(), [](DataProcessorSpec& spec) { return spec.name == "internal-dpl-aod-reader"; }); if (reader != physicalWorkflow.end()) { driverServices.push_back(ArrowSupport::arrowBackendSpec()); @@ -2997,7 +3065,7 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, continue; } WorkflowSpecNode node{physicalWorkflow}; - service.injectTopology(node, configContext); + service.injectTopology(node, *workflowContext.configContext); } for (auto& dp : physicalWorkflow) { if (dp.name.rfind("internal-", 0) == 0) { @@ -3103,7 +3171,7 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, // Use the hidden options as veto, all config specs matching a definition // in the hidden options are skipped in order to avoid duplicate definitions // in the main parser. Note: all config specs are forwarded to devices - visibleOptions.add(ConfigParamsHelper::prepareOptionDescriptions(physicalWorkflow, currentWorkflowOptions, gHiddenDeviceOptions)); + visibleOptions.add(ConfigParamsHelper::prepareOptionDescriptions(physicalWorkflow, workflowContext.workflowOptions, gHiddenDeviceOptions)); bpo::options_description od; od.add(visibleOptions); @@ -3139,7 +3207,7 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, conflicting_options(varmap, "no-batch", "batch"); if (varmap.count("help")) { - printHelp(varmap, executorOptions, physicalWorkflow, currentWorkflowOptions); + printHelp(varmap, executorOptions, physicalWorkflow, workflowContext.workflowOptions); exit(0); } /// Set the fair::Logger severity to the one specified in the command line @@ -3189,16 +3257,16 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, return true; }; DriverInfo driverInfo{ - .sendingPolicies = sendingPolicies, + .sendingPolicies = workflowContext.sendingPolicies, .forwardingPolicies = forwardingPolicies, - .callbacksPolicies = callbacksPolicies}; + .callbacksPolicies = workflowContext.callbacksPolicies}; driverInfo.states.reserve(10); driverInfo.sigintRequested = false; driverInfo.sigchldRequested = false; - driverInfo.channelPolicies = channelPolicies; - driverInfo.completionPolicies = completionPolicies; - driverInfo.dispatchPolicies = dispatchPolicies; - driverInfo.resourcePolicies = resourcePolicies; + driverInfo.channelPolicies = workflowContext.channelPolicies; + driverInfo.completionPolicies = workflowContext.completionPolicies; + driverInfo.dispatchPolicies = workflowContext.dispatchPolicies; + driverInfo.resourcePolicies = workflowContext.resourcePolicies; driverInfo.argc = argc; driverInfo.argv = argv; driverInfo.noSHMCleanup = varmap["no-cleanup"].as(); @@ -3230,7 +3298,7 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, // FIXME: should use the whole dataProcessorInfos, actually... driverInfo.processorInfo = dataProcessorInfos; - driverInfo.configContext = &configContext; + driverInfo.configContext = workflowContext.configContext.get(); DriverControl driverControl; initialiseDriverControl(varmap, driverInfo, driverControl); @@ -3259,7 +3327,7 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, driverInfo, driverConfig, gDeviceMetricsInfos, - detectedParams, + workflowContext.extraOptions, varmap, driverServices, frameworkId); diff --git a/Framework/Core/src/runDataProcessingPlugin.cxx b/Framework/Core/src/runDataProcessingPlugin.cxx new file mode 100644 index 0000000000000..3d09248d980c8 --- /dev/null +++ b/Framework/Core/src/runDataProcessingPlugin.cxx @@ -0,0 +1,54 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +#include +#include +#include + +// Executables behave this way +int callMain(int argc, char** argv, char const* pluginName); + +#define STRINGIZE_NX(A) #A +#define STRINGIZE(A) STRINGIZE_NX(A) + +consteval char const* pluginName() +{ + return STRINGIZE(DPL_WORKFLOW_PLUGIN_LIBRARY) ":" STRINGIZE(DPL_WORKFLOW_PLUGIN_NAME); +} + +int main(int argc, char** argv) +{ + // Allow this code to lunch different plugins compared to the one + // associated with it. + auto pluginOption = "--workflow-plugin"; + int optionSize = strlen(pluginOption); + std::string pluginSpec = ""; + for (int i = 0; i < argc; i++) { + int argSize = strlen(argv[i]); + if (argSize < optionSize) { + continue; + } + if (argSize > optionSize && argv[i][optionSize] == '=') { + pluginSpec = std::string_view(argv[i] + optionSize + 1); + break; + } + if (argSize == optionSize && (strncmp(argv[i], pluginOption, optionSize) == 0) && i < argc) { + pluginSpec = argv[i + 1]; + break; + } + } + + if (pluginSpec.empty()) { + pluginSpec = pluginName(); + } + + std::cerr << "Loading plugin " << pluginSpec << std::endl; + return callMain(argc, argv, pluginSpec.c_str()); +} diff --git a/cmake/O2AddWorkflow.cmake b/cmake/O2AddWorkflow.cmake index b952890921f45..17d6b95497ad7 100644 --- a/cmake/O2AddWorkflow.cmake +++ b/cmake/O2AddWorkflow.cmake @@ -35,11 +35,26 @@ function(o2_add_dpl_workflow baseTargetName) message(FATAL_ERROR "Got trailing arguments ${A_UNPARSED_ARGUMENTS}") endif() - o2_add_executable(${baseTargetName} - COMPONENT_NAME ${A_COMPONENT_NAME} TARGETVARNAME targetExeName + string(REGEX REPLACE "(^|-)([a-z])" "\\2" pluginName "${baseTargetName}") + + o2_add_library(${pluginName}Plugin SOURCES ${A_SOURCES} PUBLIC_LINK_LIBRARIES O2::Framework ${A_PUBLIC_LINK_LIBRARIES}) + o2_add_executable(${baseTargetName} + COMPONENT_NAME ${A_COMPONENT_NAME} TARGETVARNAME targetExeName + SOURCES ${CMAKE_SOURCE_DIR}/Framework/Core/src/runDataProcessingPlugin.cxx + PUBLIC_LINK_LIBRARIES O2::Framework ) + + o2_name_target(${pluginName}Plugin NAME pluginTargetName) + + add_dependencies(${targetExeName} ${pluginTargetName}) + + target_compile_definitions(${targetExeName} PRIVATE DPL_WORKFLOW_PLUGIN_NAME=${pluginName} + DPL_WORKFLOW_PLUGIN_LIBRARY=O2${pluginName}Plugin) + target_compile_definitions(${pluginTargetName} PRIVATE DPL_WORKFLOW_PLUGIN_NAME=${pluginName} + DPL_WORKFLOW_PLUGIN_LIBRARY=O2${pluginName}Plugin) + if(A_TARGETVARNAME) set(${A_TARGETVARNAME} ${targetExeName} From 7f354f4994cabddceb68752b98fa128b3fe0e6c8 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Wed, 16 Apr 2025 07:01:23 +0200 Subject: [PATCH 2/3] More fixes for the plugins --- Framework/Core/src/DeviceSpecHelpers.cxx | 2 ++ Framework/Core/src/runDataProcessing.cxx | 2 ++ 2 files changed, 4 insertions(+) diff --git a/Framework/Core/src/DeviceSpecHelpers.cxx b/Framework/Core/src/DeviceSpecHelpers.cxx index e4696d3bd92fb..f956bcc13f300 100644 --- a/Framework/Core/src/DeviceSpecHelpers.cxx +++ b/Framework/Core/src/DeviceSpecHelpers.cxx @@ -1566,6 +1566,7 @@ void DeviceSpecHelpers::prepareArguments(bool defaultQuiet, bool defaultStopped, realOdesc.add_options()("early-forward-policy", bpo::value()); realOdesc.add_options()("session", bpo::value()); realOdesc.add_options()("signposts", bpo::value()); + realOdesc.add_options()("workflow-plugin", bpo::value()); filterArgsFct(expansions.we_wordc, expansions.we_wordv, realOdesc); wordfree(&expansions); return; @@ -1757,6 +1758,7 @@ boost::program_options::options_description DeviceSpecHelpers::getForwardedDevic ("network-interface", bpo::value(), "network interface to which to bind tpc fmq ports without specified address") // ("early-forward-policy", bpo::value()->default_value(EarlyForwardPolicy::NEVER), "when to forward early the messages: never, noraw, always") // ("configuration,cfg", bpo::value(), "configuration connection string") // + ("workflow-plugin", bpo::value(), "workflow configuration plugin") // ("driver-client-backend", bpo::value(), "driver connection string") // ("monitoring-backend", bpo::value(), "monitoring connection string") // ("dpl-stats-min-online-publishing-interval", bpo::value(), "minimum flushing interval for online metrics (in s)") // diff --git a/Framework/Core/src/runDataProcessing.cxx b/Framework/Core/src/runDataProcessing.cxx index 3c224466e824d..35806cc72fab1 100644 --- a/Framework/Core/src/runDataProcessing.cxx +++ b/Framework/Core/src/runDataProcessing.cxx @@ -716,6 +716,7 @@ std::string spawnDevice(uv_loop_t* loop, // the framework-id as one of the options. pid_t id = 0; id = fork(); + bool sameExecutable = strcmp(execution.args[0], driverInfo.argv[0]) == 0; // We are the child: prepare options and reexec. if (id == 0) { // If we are using plugins we can stop immediately when @@ -1108,6 +1109,7 @@ int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry, ("data-processing-timeout", bpo::value()->default_value(defaultDataProcessingTimeout), "how many second to wait before stopping data processing and allowing data calibration") // ("timeframes-rate-limit", bpo::value()->default_value("0"), "how many timeframe can be in fly at the same moment (0 disables)") // ("configuration,cfg", bpo::value()->default_value("command-line"), "configuration backend") // + ("workflow-plugin", bpo::value()->default_value(""), "workflow plugin to use") // ("infologger-mode", bpo::value()->default_value(defaultInfologgerMode), "O2_INFOLOGGER_MODE override"); r.fConfig.AddToCmdLineOptions(optsDesc, true); }); From e2a968bb0a6e998b4c10bfa9c2d98bf688f4c372 Mon Sep 17 00:00:00 2001 From: ALICE Action Bot Date: Wed, 16 Apr 2025 09:34:51 +0000 Subject: [PATCH 3/3] Please consider the following formatting changes --- Framework/Core/src/DeviceSpecHelpers.cxx | 2 +- Framework/Core/src/runDataProcessing.cxx | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Framework/Core/src/DeviceSpecHelpers.cxx b/Framework/Core/src/DeviceSpecHelpers.cxx index f956bcc13f300..cf144e5fb7722 100644 --- a/Framework/Core/src/DeviceSpecHelpers.cxx +++ b/Framework/Core/src/DeviceSpecHelpers.cxx @@ -1758,7 +1758,7 @@ boost::program_options::options_description DeviceSpecHelpers::getForwardedDevic ("network-interface", bpo::value(), "network interface to which to bind tpc fmq ports without specified address") // ("early-forward-policy", bpo::value()->default_value(EarlyForwardPolicy::NEVER), "when to forward early the messages: never, noraw, always") // ("configuration,cfg", bpo::value(), "configuration connection string") // - ("workflow-plugin", bpo::value(), "workflow configuration plugin") // + ("workflow-plugin", bpo::value(), "workflow configuration plugin") // ("driver-client-backend", bpo::value(), "driver connection string") // ("monitoring-backend", bpo::value(), "monitoring connection string") // ("dpl-stats-min-online-publishing-interval", bpo::value(), "minimum flushing interval for online metrics (in s)") // diff --git a/Framework/Core/src/runDataProcessing.cxx b/Framework/Core/src/runDataProcessing.cxx index 35806cc72fab1..91fd9092a9497 100644 --- a/Framework/Core/src/runDataProcessing.cxx +++ b/Framework/Core/src/runDataProcessing.cxx @@ -791,7 +791,7 @@ std::string spawnDevice(uv_loop_t* loop, execution.args.push_back(nullptr); } LOGP(detail, "Child runs {} in a separate executable {} from the current one ({}). Using execvp, resources will not be shared.", - execution.plugin, execution.args[0], driverInfo.argv[0]); + execution.plugin, execution.args[0], driverInfo.argv[0]); execvp(execution.args[0], execution.args.data()); } LOG(info) << "Child device uses plugins. Loading " << execution.plugin << "."; @@ -1109,7 +1109,7 @@ int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry, ("data-processing-timeout", bpo::value()->default_value(defaultDataProcessingTimeout), "how many second to wait before stopping data processing and allowing data calibration") // ("timeframes-rate-limit", bpo::value()->default_value("0"), "how many timeframe can be in fly at the same moment (0 disables)") // ("configuration,cfg", bpo::value()->default_value("command-line"), "configuration backend") // - ("workflow-plugin", bpo::value()->default_value(""), "workflow plugin to use") // + ("workflow-plugin", bpo::value()->default_value(""), "workflow plugin to use") // ("infologger-mode", bpo::value()->default_value(defaultInfologgerMode), "O2_INFOLOGGER_MODE override"); r.fConfig.AddToCmdLineOptions(optsDesc, true); });