diff --git a/Framework/Core/include/Framework/DataProcessorInfo.h b/Framework/Core/include/Framework/DataProcessorInfo.h index 23c9d3f722bd4..38c78b2b08898 100644 --- a/Framework/Core/include/Framework/DataProcessorInfo.h +++ b/Framework/Core/include/Framework/DataProcessorInfo.h @@ -25,7 +25,9 @@ struct DataProcessorInfo { /// Name of the associated DataProcessorSpec std::string name = "Unknown"; /// The executable name of the program which holds the DataProcessorSpec - std::string executable = "/bin/false"; + std::string executable = ""; + /// The plugin spec of the plugin which holds the DataProcessorSpec + std::string plugin = ""; /// The argument passed on the command line for this DataProcessorSpec std::vector cmdLineArgs = {}; /// The workflow options which are available for the associated DataProcessorSpec @@ -34,6 +36,6 @@ struct DataProcessorInfo { std::vector channels = {}; }; -} // namespace o2 +} // namespace o2::framework #endif // O2_FRAMEWORK_CORE_DATAPROCESSORINFO_H_ diff --git a/Framework/Core/include/Framework/DeviceExecution.h b/Framework/Core/include/Framework/DeviceExecution.h index d417a6980cf58..1c7b1ac8c0b3e 100644 --- a/Framework/Core/include/Framework/DeviceExecution.h +++ b/Framework/Core/include/Framework/DeviceExecution.h @@ -8,26 +8,25 @@ // In applying this license CERN does not waive the privileges and immunities // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. -#ifndef FRAMEWORK_DEVICEEXECUTION_H -#define FRAMEWORK_DEVICEEXECUTION_H +#ifndef O2_FRAMEWORK_DEVICEEXECUTION_H_ +#define O2_FRAMEWORK_DEVICEEXECUTION_H_ #include -namespace o2 -{ -namespace framework +namespace o2::framework { /// This represent one single execution of a Device. It's meant to hold /// information which can change between one execution of a Device and the /// other, e.g. the executable name or the arguments it is started with. struct DeviceExecution { + std::string plugin; /// The options passed to a given device std::vector args; /// The environment to be passed to a given device std::vector environ; }; -} // namespace framework -} // namespace o2 -#endif +} // namespace o2::framework + +#endif // O2_FRAMEWORK_DEVICEEXECUTION_H_ diff --git a/Framework/Core/include/Framework/Plugins.h b/Framework/Core/include/Framework/Plugins.h index 925943c6bffc3..3d320c6f2abb5 100644 --- a/Framework/Core/include/Framework/Plugins.h +++ b/Framework/Core/include/Framework/Plugins.h @@ -44,6 +44,9 @@ enum struct DplPluginKind : int { // using the arrow dataset API RootObjectReadingImplementation, + // A plugin which defines a whole workflow. This will be used to separate + // workflows in shared libraries and run them via a separate loader. + Workflow, // A plugin which was not initialised properly. Unknown }; diff --git a/Framework/Core/include/Framework/WorkflowDefinitionContext.h b/Framework/Core/include/Framework/WorkflowDefinitionContext.h new file mode 100644 index 0000000000000..b7c29aa6077b3 --- /dev/null +++ b/Framework/Core/include/Framework/WorkflowDefinitionContext.h @@ -0,0 +1,56 @@ +// Copyright 2019-2025 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +#ifndef O2_FRAMEWORK_WORKFLOWDEFINITIONCONTEXT_H_ +#define O2_FRAMEWORK_WORKFLOWDEFINITIONCONTEXT_H_ + +#include "Framework/ConfigParamSpec.h" +#include "Framework/CompletionPolicy.h" +#include "Framework/DispatchPolicy.h" +#include "Framework/ResourcePolicy.h" +#include "Framework/CallbacksPolicy.h" +#include "Framework/SendingPolicy.h" +#include "Framework/WorkflowSpec.h" +#include "Framework/ChannelConfigurationPolicy.h" +#include + +namespace o2::framework +{ + +struct WorkflowDefinitionContext { + std::vector workflowOptions; + std::vector completionPolicies; + std::vector dispatchPolicies; + std::vector resourcePolicies; + std::vector callbacksPolicies; + std::vector sendingPolicies; + std::vector extraOptions; + std::vector channelPolicies; + std::unique_ptr configContext; + + // For the moment, let's put them here. We should + // probably move them to a different place, since these are not really part + // of the workflow definition but will be there also at runtine. + std::unique_ptr configRegistry{nullptr}; + std::unique_ptr workflowOptionsRegistry{nullptr}; + + o2::framework::WorkflowSpec specs; +}; + +struct WorkflowDefinition { + std::function defineWorkflow; +}; + +struct WorkflowPlugin { + virtual o2::framework::WorkflowDefinition* create() = 0; +}; + +} // namespace o2::framework +#endif // O2_FRAMEWORK_WORKFLOWDEFINITIONCONTEXT_H_ diff --git a/Framework/Core/include/Framework/runDataProcessing.h b/Framework/Core/include/Framework/runDataProcessing.h index 07083314af12e..4a1d1954e7075 100644 --- a/Framework/Core/include/Framework/runDataProcessing.h +++ b/Framework/Core/include/Framework/runDataProcessing.h @@ -26,7 +26,9 @@ #include "Framework/CustomWorkflowTerminationHook.h" #include "Framework/CommonServices.h" #include "Framework/WorkflowCustomizationHelpers.h" +#include "Framework/WorkflowDefinitionContext.h" #include "Framework/Logger.h" +#include "Framework/Plugins.h" #include "Framework/CheckTypes.h" #include "Framework/StructToTuple.h" #include "ResourcePolicy.h" @@ -125,16 +127,7 @@ void overrideCloning(o2::framework::ConfigContext& ctx, std::vector& workflow); // This comes from the framework itself. This way we avoid code duplication. -int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& specs, - std::vector const& channelPolicies, - std::vector const& completionPolicies, - std::vector const& dispatchPolicies, - std::vector const& resourcePolicies, - std::vector const& callbacksPolicies, - std::vector const& sendingPolicies, - std::vector const& workflowOptions, - std::vector const& detectedOptions, - o2::framework::ConfigContext& configContext); +int doMain(int argc, char** argv, o2::framework::WorkflowDefinitionContext& context, o2::framework::ConfigContext& configContext); void doDefaultWorkflowTerminationHook(); @@ -167,55 +160,44 @@ void callWorkflowTermination(T&, char const* idstring) void overrideAll(o2::framework::ConfigContext& ctx, std::vector& workflow); -o2::framework::ConfigContext createConfigContext(std::unique_ptr& workflowOptionsRegistry, - o2::framework::ServiceRegistry& configRegistry, - std::vector& workflowOptions, - std::vector& extraOptions, int argc, char** argv); +std::unique_ptr createConfigContext(std::unique_ptr& workflowOptionsRegistry, + o2::framework::ServiceRegistry& configRegistry, + std::vector& workflowOptions, + std::vector& extraOptions, int argc, char** argv); std::unique_ptr createRegistry(); -int mainNoCatch(int argc, char** argv) -{ - using namespace o2::framework; +char* getIdString(int argc, char** argv); - std::vector workflowOptions; - UserCustomizationsHelper::userDefinedCustomization(workflowOptions); - auto requiredWorkflowOptions = WorkflowCustomizationHelpers::requiredWorkflowOptions(); - workflowOptions.insert(std::end(workflowOptions), std::begin(requiredWorkflowOptions), std::end(requiredWorkflowOptions)); - - std::vector completionPolicies = injectCustomizations(); - std::vector dispatchPolicies = injectCustomizations(); - std::vector resourcePolicies = injectCustomizations(); - std::vector callbacksPolicies = injectCustomizations(); - std::vector sendingPolicies = injectCustomizations(); - - std::unique_ptr configRegistry = createRegistry(); - std::vector extraOptions; - std::unique_ptr workflowOptionsRegistry{nullptr}; - auto configContext = createConfigContext(workflowOptionsRegistry, *configRegistry, workflowOptions, extraOptions, argc, argv); - - o2::framework::WorkflowSpec specs = defineDataProcessing(configContext); - overrideAll(configContext, specs); - for (auto& spec : specs) { - UserCustomizationsHelper::userDefinedCustomization(spec.requiredServices); - } - std::vector channelPolicies; - UserCustomizationsHelper::userDefinedCustomization(channelPolicies); - auto defaultChannelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(configContext); - channelPolicies.insert(std::end(channelPolicies), std::begin(defaultChannelPolicies), std::end(defaultChannelPolicies)); - return doMain(argc, argv, specs, - channelPolicies, completionPolicies, dispatchPolicies, - resourcePolicies, callbacksPolicies, sendingPolicies, workflowOptions, extraOptions, configContext); +#define STRINGIZE_NX(A) #A +#define STRINGIZE(A) STRINGIZE_NX(A) + +// This is to allow the old "executable" based behavior +// Each executable will contain a plugin called InternalWorkflow +// In case one wants to use the new DSO based approach, the +// name of the plugin an the library name where it is located +// will have to be specified at build time. +#ifndef DPL_WORKFLOW_PLUGIN_NAME +#define DPL_WORKFLOW_PLUGIN_NAME InternalCustomWorkflow +#ifdef DPL_WORKFLOW_PLUGIN_LIBRARY +#error Missing DPL_WORKFLOW_PLUGIN_NAME +#endif +#define DPL_WORKFLOW_PLUGIN_LIBRARY +#endif + +consteval char const* pluginName() +{ + return STRINGIZE(DPL_WORKFLOW_PLUGIN_LIBRARY) ":" STRINGIZE(DPL_WORKFLOW_PLUGIN_NAME); } -int callMain(int argc, char** argv, int (*)(int, char**)); -char* getIdString(int argc, char** argv); +// Executables behave this way +int callMain(int argc, char** argv, char const* pluginName); int main(int argc, char** argv) { using namespace o2::framework; - int result = callMain(argc, argv, mainNoCatch); + int result = callMain(argc, argv, pluginName()); char* idstring = getIdString(argc, argv); o2::framework::OnWorkflowTerminationHook onWorkflowTerminationHook; @@ -223,4 +205,52 @@ int main(int argc, char** argv) return result; } + +struct WorkflowDefinition { + std::function defineWorkflow; +}; + +struct DPL_WORKFLOW_PLUGIN_NAME : o2::framework::WorkflowPlugin { + o2::framework::WorkflowDefinition* create() override + { + return new o2::framework::WorkflowDefinition{ + .defineWorkflow = [](int argc, char** argv) -> o2::framework::WorkflowDefinitionContext { + using namespace o2::framework; + WorkflowDefinitionContext workflowContext; + + UserCustomizationsHelper::userDefinedCustomization(workflowContext.workflowOptions); + auto requiredWorkflowOptions = WorkflowCustomizationHelpers::requiredWorkflowOptions(); + workflowContext.workflowOptions.insert(std::end(workflowContext.workflowOptions), std::begin(requiredWorkflowOptions), std::end(requiredWorkflowOptions)); + + workflowContext.completionPolicies = injectCustomizations(); + workflowContext.dispatchPolicies = injectCustomizations(); + workflowContext.resourcePolicies = injectCustomizations(); + workflowContext.callbacksPolicies = injectCustomizations(); + workflowContext.sendingPolicies = injectCustomizations(); + + workflowContext.configRegistry = createRegistry(); + workflowContext.configContext = createConfigContext(workflowContext.workflowOptionsRegistry, *workflowContext.configRegistry, workflowContext.workflowOptions, workflowContext.extraOptions, argc, argv); + + workflowContext.specs = defineDataProcessing(*workflowContext.configContext); + overrideAll(*workflowContext.configContext, workflowContext.specs); + for (auto& spec : workflowContext.specs) { + UserCustomizationsHelper::userDefinedCustomization(spec.requiredServices); + } + UserCustomizationsHelper::userDefinedCustomization(workflowContext.channelPolicies); + auto defaultChannelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(*workflowContext.configContext); + workflowContext.channelPolicies.insert(std::end(workflowContext.channelPolicies), std::begin(defaultChannelPolicies), std::end(defaultChannelPolicies)); + return workflowContext; + }}; + } +}; + +// This is like the plugin macros, we simply do it explicitly to avoid macro inside macro expansion +extern "C" { +DPLPluginHandle* dpl_plugin_callback(DPLPluginHandle* previous) +{ + previous = new DPLPluginHandle{new DPL_WORKFLOW_PLUGIN_NAME{}, strdup(STRINGIZE(DPL_WORKFLOW_PLUGIN_NAME)), o2::framework::DplPluginKind::Workflow, previous}; + return previous; +} +} + #endif diff --git a/Framework/Core/src/DeviceSpecHelpers.cxx b/Framework/Core/src/DeviceSpecHelpers.cxx index ec0a40e44ac31..cf144e5fb7722 100644 --- a/Framework/Core/src/DeviceSpecHelpers.cxx +++ b/Framework/Core/src/DeviceSpecHelpers.cxx @@ -1566,6 +1566,7 @@ void DeviceSpecHelpers::prepareArguments(bool defaultQuiet, bool defaultStopped, realOdesc.add_options()("early-forward-policy", bpo::value()); realOdesc.add_options()("session", bpo::value()); realOdesc.add_options()("signposts", bpo::value()); + realOdesc.add_options()("workflow-plugin", bpo::value()); filterArgsFct(expansions.we_wordc, expansions.we_wordv, realOdesc); wordfree(&expansions); return; @@ -1711,6 +1712,8 @@ void DeviceSpecHelpers::prepareArguments(bool defaultQuiet, bool defaultStopped, } O2_SIGNPOST_END(device_spec_helpers, poid, "prepareArguments", "The following options are being forwarded to %{public}s: %{public}s", spec.id.c_str(), str.str().c_str()); + // Copy the plugin over from the DataProcessingInfo + execution.plugin = pi->plugin; } } @@ -1755,6 +1758,7 @@ boost::program_options::options_description DeviceSpecHelpers::getForwardedDevic ("network-interface", bpo::value(), "network interface to which to bind tpc fmq ports without specified address") // ("early-forward-policy", bpo::value()->default_value(EarlyForwardPolicy::NEVER), "when to forward early the messages: never, noraw, always") // ("configuration,cfg", bpo::value(), "configuration connection string") // + ("workflow-plugin", bpo::value(), "workflow configuration plugin") // ("driver-client-backend", bpo::value(), "driver connection string") // ("monitoring-backend", bpo::value(), "monitoring connection string") // ("dpl-stats-min-online-publishing-interval", bpo::value(), "minimum flushing interval for online metrics (in s)") // diff --git a/Framework/Core/src/WorkflowSerializationHelpers.cxx b/Framework/Core/src/WorkflowSerializationHelpers.cxx index e20e23f98c90b..eeeab4512710c 100644 --- a/Framework/Core/src/WorkflowSerializationHelpers.cxx +++ b/Framework/Core/src/WorkflowSerializationHelpers.cxx @@ -91,6 +91,7 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler, IN_DATAPROCESSOR_INFO, IN_DATAPROCESSOR_INFO_NAME, IN_DATAPROCESSOR_INFO_EXECUTABLE, + IN_DATAPROCESSOR_INFO_PLUGIN, IN_DATAPROCESSOR_INFO_ARGS, IN_DATAPROCESSOR_INFO_ARG, IN_DATAPROCESSOR_INFO_CHANNELS, @@ -263,6 +264,9 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler, case State::IN_DATAPROCESSOR_INFO_EXECUTABLE: s << "IN_DATAPROCESSOR_INFO_EXECUTABLE"; break; + case State::IN_DATAPROCESSOR_INFO_PLUGIN: + s << "IN_DATAPROCESSOR_INFO_PLUGIN"; + break; case State::IN_DATAPROCESSOR_INFO_ARGS: s << "IN_DATAPROCESSOR_INFO_ARGS"; break; @@ -706,6 +710,8 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler, push(State::IN_DATAPROCESSOR_INFO_NAME); } else if (in(State::IN_DATAPROCESSOR_INFO) && strncmp(str, "executable", length) == 0) { push(State::IN_DATAPROCESSOR_INFO_EXECUTABLE); + } else if (in(State::IN_DATAPROCESSOR_INFO) && strncmp(str, "plugin", length) == 0) { + push(State::IN_DATAPROCESSOR_INFO_PLUGIN); } else if (in(State::IN_DATAPROCESSOR_INFO) && strncmp(str, "cmdLineArgs", length) == 0) { push(State::IN_DATAPROCESSOR_INFO_ARGS); } else if (in(State::IN_DATAPROCESSOR_INFO) && strncmp(str, "workflowOptions", length) == 0) { @@ -732,6 +738,9 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler, } else if (in(State::IN_DATAPROCESSOR_INFO_EXECUTABLE)) { assert(metadata.size()); metadata.back().executable = s; + } else if (in(State::IN_DATAPROCESSOR_INFO_PLUGIN)) { + assert(metadata.size()); + metadata.back().plugin = s; } else if (in(State::IN_INPUT_BINDING)) { binding = s; } else if (in(State::IN_INPUT_ORIGIN)) { @@ -888,7 +897,7 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler, if (!states.empty()) { debug << " now in " << states.back(); } - O2_SIGNPOST_END(workflow_importer, _o2_signpost_id_t{(int64_t)states.size()+1}, "import", "POP: %s", debug.str().c_str()); + O2_SIGNPOST_END(workflow_importer, _o2_signpost_id_t{(int64_t)states.size() + 1}, "import", "POP: %s", debug.str().c_str()); return result; } bool in(State o) @@ -1254,8 +1263,14 @@ void WorkflowSerializationHelpers::dump(std::ostream& out, w.StartObject(); w.Key("name"); w.String(info.name.c_str()); - w.Key("executable"); - w.String(info.executable.c_str()); + if (!info.executable.empty()) { + w.Key("executable"); + w.String(info.executable.c_str()); + } + if (!info.plugin.empty()) { + w.Key("plugin"); + w.String(info.plugin.c_str()); + } w.Key("cmdLineArgs"); w.StartArray(); for (auto& arg : info.cmdLineArgs) { diff --git a/Framework/Core/src/runDataProcessing.cxx b/Framework/Core/src/runDataProcessing.cxx index a7e80134a2cc0..91fd9092a9497 100644 --- a/Framework/Core/src/runDataProcessing.cxx +++ b/Framework/Core/src/runDataProcessing.cxx @@ -13,9 +13,7 @@ #include #include "Framework/BoostOptionsRetriever.h" #include "Framework/BacktraceHelpers.h" -#include "Framework/CallbacksPolicy.h" #include "Framework/ChannelConfigurationPolicy.h" -#include "Framework/ChannelMatching.h" #include "Framework/ConfigParamsHelper.h" #include "Framework/ConfigParamSpec.h" #include "Framework/ConfigContext.h" @@ -38,7 +36,6 @@ #include "Framework/ServiceRegistryHelpers.h" #include "Framework/DevicesManager.h" #include "Framework/DebugGUI.h" -#include "Framework/LocalRootFileService.h" #include "Framework/LogParsingHelpers.h" #include "Framework/Logger.h" #include "Framework/ParallelContext.h" @@ -62,6 +59,7 @@ #include "Framework/DeviceContext.h" #include "Framework/ServiceMetricsInfo.h" #include "Framework/DataTakingContext.h" +#include "Framework/WorkflowDefinitionContext.h" #include "Framework/CommonServices.h" #include "Framework/DefaultsHelpers.h" #include "ProcessingPoliciesHelpers.h" @@ -192,13 +190,29 @@ char* getIdString(int argc, char** argv) return nullptr; } -int callMain(int argc, char** argv, int (*mainNoCatch)(int, char**)) +int doMain(int argc, char** argv, std::string pluginName, o2::framework::WorkflowDefinitionContext& workflowContext); + +int callMain(int argc, char** argv, char const* pluginSpec) { + std::vector plugins; + auto morePlugins = PluginManager::parsePluginSpecString(pluginSpec); + for (auto& extra : morePlugins) { + plugins.push_back(extra); + } + // Only one for now + assert(plugins.size() == 1); + + std::vector availableWorkflows; + PluginManager::loadFromPlugin(plugins, availableWorkflows); + + assert(availableWorkflows.size() == 1); static bool noCatch = getenv("O2_NO_CATCHALL_EXCEPTIONS") && strcmp(getenv("O2_NO_CATCHALL_EXCEPTIONS"), "0"); int result = 1; + o2::framework::WorkflowDefinitionContext workflowContext = availableWorkflows.back().defineWorkflow(argc, argv); + if (noCatch) { try { - result = mainNoCatch(argc, argv); + result = doMain(argc, argv, pluginSpec, workflowContext); } catch (o2::framework::RuntimeErrorRef& ref) { doDPLException(ref, argv[0]); throw; @@ -209,7 +223,7 @@ int callMain(int argc, char** argv, int (*mainNoCatch)(int, char**)) // SFINAE expression above fit better the version which invokes user code over // the default one. // The default policy is a catch all pub/sub setup to be consistent with the past. - result = mainNoCatch(argc, argv); + result = doMain(argc, argv, pluginSpec, workflowContext); } catch (boost::exception& e) { doBoostException(e, argv[0]); throw; @@ -238,7 +252,6 @@ void getChildData(int infd, DeviceInfo& outinfo) int bytes_read; // NOTE: do not quite understand read ends up blocking if I read more than // once. Oh well... Good enough for now. - int64_t total_bytes_read = 0; int64_t count = 0; bool once = false; while (true) { @@ -670,20 +683,22 @@ void handle_crash(int sig) } /// This will start a new device by forking and executing a -/// new child -void spawnDevice(uv_loop_t* loop, - DeviceRef ref, - std::vector const& specs, - DriverInfo& driverInfo, - std::vector&, - std::vector& executions, - std::vector& deviceInfos, - std::vector& allStates, - ServiceRegistryRef serviceRegistry, - boost::program_options::variables_map& varmap, - std::vector& childFds, - unsigned parentCPU, - unsigned parentNode) +/// new child. +/// @return the PID of the new process (or 0 if we are in the driver) +std::string spawnDevice(uv_loop_t* loop, + DeviceRef ref, + std::vector const& specs, + DriverInfo& driverInfo, + DriverControl& driverControl, + std::vector&, + std::vector& executions, + std::vector& deviceInfos, + std::vector& allStates, + ServiceRegistryRef serviceRegistry, + boost::program_options::variables_map& varmap, + std::vector& childFds, + unsigned parentCPU, + unsigned parentNode) { // FIXME: this might not work when more than one DPL driver on the same // machine. Hopefully we do not care. @@ -701,16 +716,29 @@ void spawnDevice(uv_loop_t* loop, // the framework-id as one of the options. pid_t id = 0; id = fork(); + bool sameExecutable = strcmp(execution.args[0], driverInfo.argv[0]) == 0; // We are the child: prepare options and reexec. if (id == 0) { + // If we are using plugins we can stop immediately when + // runnign with -s + if (driverControl.defaultStopped && (execution.plugin.empty() == false) && sameExecutable == true) { + kill(getpid(), SIGSTOP); + } + // From this moment on, the child will print out through the parent + dup2(childFds[ref.index].childstdin[0], STDIN_FILENO); + dup2(childFds[ref.index].childstdout[1], STDOUT_FILENO); + dup2(childFds[ref.index].childstdout[1], STDERR_FILENO); + // Needed in case we want to reuse the same loop of the parent. + uv_loop_fork(loop); + uv_loop_fork(uv_default_loop()); // We allow being debugged and do not terminate on SIGTRAP signal(SIGTRAP, SIG_IGN); // We immediately ignore SIGUSR1 and SIGUSR2 so that we do not // get killed by the parent trying to force stepping children. // We will re-enable them later on, when it is actually safe to // do so. - signal(SIGUSR1, SIG_IGN); - signal(SIGUSR2, SIG_IGN); + // We do not do so if the plugin is there, because that confuses libuv + // FIXME: maybe use libuv to ignore? // This is the child. // For stdout / stderr, we close the read part of the pipe, the @@ -725,19 +753,26 @@ void spawnDevice(uv_loop_t* loop, // idea in the first place, because rlim_cur could be huge // FIXME: I should understand which one is really to be closed and use // CLOEXEC on it. - int rlim_cur = std::min((int)rlim.rlim_cur, 10000); - for (int i = 0; i < rlim_cur; ++i) { - if (childFds[ref.index].childstdin[0] == i) { - continue; - } - if (childFds[ref.index].childstdout[1] == i) { - continue; + int uvSkipped = 0; + // We close all the FD we know we will not use. + // These include: + // - The pipes which we use to redirect the output messages to the parent + for (int i = 0; i < childFds.size(); ++i) { + if (i != ref.index) { + close(childFds[ref.index].childstdin[0]); + close(childFds[ref.index].childstdout[1]); } - close(i); - } - dup2(childFds[ref.index].childstdin[0], STDIN_FILENO); - dup2(childFds[ref.index].childstdout[1], STDOUT_FILENO); - dup2(childFds[ref.index].childstdout[1], STDERR_FILENO); + close(childFds[i].childstdin[1]); + close(childFds[i].childstdout[0]); + } + auto* dummy_handle = (uv_signal_t*)malloc(sizeof(uv_signal_t)); + dummy_handle->data = nullptr; + auto* dummy_handle2 = (uv_signal_t*)malloc(sizeof(uv_signal_t)); + dummy_handle2->data = nullptr; + uv_signal_init(loop, dummy_handle); + uv_signal_init(loop, dummy_handle2); + uv_signal_start(dummy_handle, [](uv_signal_t* handle, int signum) {}, SIGUSR1); + uv_signal_start(dummy_handle2, [](uv_signal_t* handle, int signum) {}, SIGUSR2); for (auto& service : spec.services) { if (service.postForkChild != nullptr) { @@ -747,7 +782,28 @@ void spawnDevice(uv_loop_t* loop, for (auto& env : execution.environ) { putenv(strdup(DeviceSpecHelpers::reworkTimeslicePlaceholder(env, spec).data())); } - execvp(execution.args[0], execution.args.data()); + + if (execution.plugin.empty() || !sameExecutable) { + if (execution.plugin.empty() == false) { + execution.args.pop_back(); + execution.args.emplace_back(strdup("--workflow-plugin")); + execution.args.push_back(strdup(execution.plugin.c_str())); + execution.args.push_back(nullptr); + } + LOGP(detail, "Child runs {} in a separate executable {} from the current one ({}). Using execvp, resources will not be shared.", + execution.plugin, execution.args[0], driverInfo.argv[0]); + execvp(execution.args[0], execution.args.data()); + } + LOG(info) << "Child device uses plugins. Loading " << execution.plugin << "."; + // In the general case we never end up here, because execvp is used. + // Once we move to plugins however, the run the plugin without loading + // a new environment so the new pid can be used to identify. + // + // Let's stop immediately so that we can attach debuggers from here. + driverControl.forcedTransitions = { + DriverState::DO_CHILD, + DriverState::BIND_GUI_PORT}; + return spec.id; } else { O2_SIGNPOST_ID_GENERATE(sid, driver); O2_SIGNPOST_EVENT_EMIT(driver, sid, "spawnDevice", "New child at %{pid}d", id); @@ -831,6 +887,7 @@ void spawnDevice(uv_loop_t* loop, // Let's add also metrics information for the given device gDeviceMetricsInfos.emplace_back(DeviceMetricsInfo{}); + return ""; } void processChildrenOutput(uv_loop_t* loop, @@ -1052,6 +1109,7 @@ int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry, ("data-processing-timeout", bpo::value()->default_value(defaultDataProcessingTimeout), "how many second to wait before stopping data processing and allowing data calibration") // ("timeframes-rate-limit", bpo::value()->default_value("0"), "how many timeframe can be in fly at the same moment (0 disables)") // ("configuration,cfg", bpo::value()->default_value("command-line"), "configuration backend") // + ("workflow-plugin", bpo::value()->default_value(""), "workflow plugin to use") // ("infologger-mode", bpo::value()->default_value(defaultInfologgerMode), "O2_INFOLOGGER_MODE override"); r.fConfig.AddToCmdLineOptions(optsDesc, true); }); @@ -1074,14 +1132,13 @@ int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry, &deviceProxy, &processingPolicies, &deviceContext, - &driverConfig, - &loop](fair::mq::DeviceRunner& r) { + &driverConfig](fair::mq::DeviceRunner& r) { ServiceRegistryRef serviceRef = {serviceRegistry}; simpleRawDeviceService = std::make_unique(nullptr, spec); serviceRef.registerService(ServiceRegistryHelpers::handleForService(simpleRawDeviceService.get())); deviceState = std::make_unique(); - deviceState->loop = loop; + deviceState->loop = uv_loop_new(); deviceState->tracingFlags = DeviceStateHelpers::parseTracingFlags(r.fConfig.GetPropertyAsString("dpl-tracing-flags")); serviceRef.registerService(ServiceRegistryHelpers::handleForService(deviceState.get())); @@ -1121,6 +1178,7 @@ int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry, struct WorkflowInfo { std::string executable; + std::string plugin; std::vector args; std::vector options; }; @@ -1619,12 +1677,12 @@ int runStateMachine(DataProcessorSpecs const& workflow, channels.push_back(channel.name); } dataProcessorInfos.push_back( - DataProcessorInfo{ - device.id, - workflowInfo.executable, - workflowInfo.args, - workflowInfo.options, - channels}); + {.name = device.id, + .executable = workflowInfo.executable, + .plugin = workflowInfo.plugin, + .cmdLineArgs = workflowInfo.args, + .workflowOptions = workflowInfo.options, + .channels = channels}); } break; case DriverState::MATERIALISE_WORKFLOW: @@ -1937,10 +1995,16 @@ int runStateMachine(DataProcessorSpecs const& workflow, if (driverControl.defaultStopped) { kill(getpid(), SIGSTOP); } + // We are in the child here, the frameworkId must be set. + assert(!frameworkId.empty()); for (size_t di = 0; di < runningWorkflow.devices.size(); di++) { RunningDeviceRef ref{di}; if (runningWorkflow.devices[di].id == frameworkId) { - return doChild(driverInfo.argc, driverInfo.argv, + auto& execution = deviceExecutions[ref.index]; + // Last pointer is nullptr + assert(execution.args.data()[execution.args.size() - 1] == nullptr); + LOG(info) << "Process " << getpid() << " is about to spawn " << execution.args[0]; + return doChild(execution.args.size() - 1, execution.args.data(), serviceRegistry, runningWorkflow, ref, driverConfig, @@ -2086,24 +2150,38 @@ int runStateMachine(DataProcessorSpecs const& workflow, callback(serviceRegistry, {varmap}); } childFds.resize(runningWorkflow.devices.size()); - for (int di = 0; di < (int)runningWorkflow.devices.size(); ++di) { + // Let's create all the pipes upfront. Notice that they will + // be close + for (int di = 0; di < childFds.size(); di++) { auto& context = childFds[di]; createPipes(context.childstdin); createPipes(context.childstdout); + } + for (int di = 0; di < (int)runningWorkflow.devices.size(); ++di) { if (driverInfo.mode == DriverMode::EMBEDDED || runningWorkflow.devices[di].resource.hostname != driverInfo.deployHostname) { spawnRemoteDevice(loop, forwardedStdin.str(), runningWorkflow.devices[di], controls[di], deviceExecutions[di], infos, allStates); } else { DeviceRef ref{di}; - spawnDevice(loop, - ref, - runningWorkflow.devices, driverInfo, - controls, deviceExecutions, infos, - allStates, - serviceRegistry, varmap, - childFds, parentCPU, parentNode); + frameworkId = spawnDevice(loop, + ref, + runningWorkflow.devices, + driverInfo, + driverControl, + controls, deviceExecutions, infos, + allStates, + serviceRegistry, varmap, + childFds, parentCPU, parentNode); + // We are in the child in this case. Do not continue spawning. + if (!frameworkId.empty()) { + break; + } } } + // Do not bother about the rest of the scheduling if we are in the child. + if (!frameworkId.empty()) { + break; + } handleSignals(); handleChildrenStdio(&serverContext, forwardedStdin.str(), childFds, pollHandles); for (auto& callback : postScheduleCallbacks) { @@ -2808,10 +2886,10 @@ void overrideAll(o2::framework::ConfigContext& ctx, std::vector& workflowOptionsRegistry, - o2::framework::ServiceRegistry& configRegistry, - std::vector& workflowOptions, - std::vector& extraOptions, int argc, char** argv) +std::unique_ptr createConfigContext(std::unique_ptr& workflowOptionsRegistry, + o2::framework::ServiceRegistry& configRegistry, + std::vector& workflowOptions, + std::vector& extraOptions, int argc, char** argv) { std::vector> retrievers; std::unique_ptr retriever{new o2::framework::BoostOptionsRetriever(true, argc, argv)}; @@ -2825,7 +2903,7 @@ o2::framework::ConfigContext createConfigContext(std::unique_ptr(*workflowOptionsRegistry, o2::framework::ServiceRegistryRef{configRegistry}, argc, argv); } std::unique_ptr createRegistry() @@ -2842,16 +2920,7 @@ std::unique_ptr createRegistry() // killing them all on ctrl-c). // - Child, pick the data-processor ID and start a O2DataProcessorDevice for // each DataProcessorSpec -int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, - std::vector const& channelPolicies, - std::vector const& completionPolicies, - std::vector const& dispatchPolicies, - std::vector const& resourcePolicies, - std::vector const& callbacksPolicies, - std::vector const& sendingPolicies, - std::vector const& currentWorkflowOptions, - std::vector const& detectedParams, - o2::framework::ConfigContext& configContext) +int doMain(int argc, char** argv, std::string pluginName, o2::framework::WorkflowDefinitionContext& workflowContext) { // Peek very early in the driver options and look for // signposts, so the we can enable it without going through the whole dance @@ -2868,9 +2937,10 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, } WorkflowInfo currentWorkflow{ - argv[0], - currentArgs, - currentWorkflowOptions}; + .executable = argv[0], + .plugin = pluginName, + .args = currentArgs, + .options = workflowContext.workflowOptions}; ProcessingPolicies processingPolicies; enum LogParsingHelpers::LogLevel minFailureLevel; @@ -2920,7 +2990,7 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, bpo::options_description visibleOptions; visibleOptions.add(executorOptions); - auto physicalWorkflow = workflow; + auto physicalWorkflow = workflowContext.specs; std::map rankIndex; // We remove the duplicates because for the moment child get themself twice: // once from the actual definition in the child, a second time from the @@ -2931,11 +3001,11 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, size_t workflowHashA = 0; std::hash hash_fn; - for (auto& dp : workflow) { + for (auto& dp : workflowContext.specs) { workflowHashA += hash_fn(dp.name); } - for (auto& dp : workflow) { + for (auto& dp : workflowContext.specs) { rankIndex.insert(std::make_pair(dp.name, workflowHashA)); } @@ -2987,7 +3057,7 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, OverrideServiceSpecs driverServicesOverride = ServiceSpecHelpers::parseOverrides(getenv("DPL_DRIVER_OVERRIDE_SERVICES")); ServiceSpecs driverServices = ServiceSpecHelpers::filterDisabled(CommonDriverServices::defaultServices(), driverServicesOverride); // We insert the hash for the internal devices. - WorkflowHelpers::injectServiceDevices(physicalWorkflow, configContext); + WorkflowHelpers::injectServiceDevices(physicalWorkflow, *workflowContext.configContext); auto reader = std::find_if(physicalWorkflow.begin(), physicalWorkflow.end(), [](DataProcessorSpec& spec) { return spec.name == "internal-dpl-aod-reader"; }); if (reader != physicalWorkflow.end()) { driverServices.push_back(ArrowSupport::arrowBackendSpec()); @@ -2997,7 +3067,7 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, continue; } WorkflowSpecNode node{physicalWorkflow}; - service.injectTopology(node, configContext); + service.injectTopology(node, *workflowContext.configContext); } for (auto& dp : physicalWorkflow) { if (dp.name.rfind("internal-", 0) == 0) { @@ -3103,7 +3173,7 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, // Use the hidden options as veto, all config specs matching a definition // in the hidden options are skipped in order to avoid duplicate definitions // in the main parser. Note: all config specs are forwarded to devices - visibleOptions.add(ConfigParamsHelper::prepareOptionDescriptions(physicalWorkflow, currentWorkflowOptions, gHiddenDeviceOptions)); + visibleOptions.add(ConfigParamsHelper::prepareOptionDescriptions(physicalWorkflow, workflowContext.workflowOptions, gHiddenDeviceOptions)); bpo::options_description od; od.add(visibleOptions); @@ -3139,7 +3209,7 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, conflicting_options(varmap, "no-batch", "batch"); if (varmap.count("help")) { - printHelp(varmap, executorOptions, physicalWorkflow, currentWorkflowOptions); + printHelp(varmap, executorOptions, physicalWorkflow, workflowContext.workflowOptions); exit(0); } /// Set the fair::Logger severity to the one specified in the command line @@ -3189,16 +3259,16 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, return true; }; DriverInfo driverInfo{ - .sendingPolicies = sendingPolicies, + .sendingPolicies = workflowContext.sendingPolicies, .forwardingPolicies = forwardingPolicies, - .callbacksPolicies = callbacksPolicies}; + .callbacksPolicies = workflowContext.callbacksPolicies}; driverInfo.states.reserve(10); driverInfo.sigintRequested = false; driverInfo.sigchldRequested = false; - driverInfo.channelPolicies = channelPolicies; - driverInfo.completionPolicies = completionPolicies; - driverInfo.dispatchPolicies = dispatchPolicies; - driverInfo.resourcePolicies = resourcePolicies; + driverInfo.channelPolicies = workflowContext.channelPolicies; + driverInfo.completionPolicies = workflowContext.completionPolicies; + driverInfo.dispatchPolicies = workflowContext.dispatchPolicies; + driverInfo.resourcePolicies = workflowContext.resourcePolicies; driverInfo.argc = argc; driverInfo.argv = argv; driverInfo.noSHMCleanup = varmap["no-cleanup"].as(); @@ -3230,7 +3300,7 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, // FIXME: should use the whole dataProcessorInfos, actually... driverInfo.processorInfo = dataProcessorInfos; - driverInfo.configContext = &configContext; + driverInfo.configContext = workflowContext.configContext.get(); DriverControl driverControl; initialiseDriverControl(varmap, driverInfo, driverControl); @@ -3259,7 +3329,7 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, driverInfo, driverConfig, gDeviceMetricsInfos, - detectedParams, + workflowContext.extraOptions, varmap, driverServices, frameworkId); diff --git a/Framework/Core/src/runDataProcessingPlugin.cxx b/Framework/Core/src/runDataProcessingPlugin.cxx new file mode 100644 index 0000000000000..3d09248d980c8 --- /dev/null +++ b/Framework/Core/src/runDataProcessingPlugin.cxx @@ -0,0 +1,54 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +#include +#include +#include + +// Executables behave this way +int callMain(int argc, char** argv, char const* pluginName); + +#define STRINGIZE_NX(A) #A +#define STRINGIZE(A) STRINGIZE_NX(A) + +consteval char const* pluginName() +{ + return STRINGIZE(DPL_WORKFLOW_PLUGIN_LIBRARY) ":" STRINGIZE(DPL_WORKFLOW_PLUGIN_NAME); +} + +int main(int argc, char** argv) +{ + // Allow this code to lunch different plugins compared to the one + // associated with it. + auto pluginOption = "--workflow-plugin"; + int optionSize = strlen(pluginOption); + std::string pluginSpec = ""; + for (int i = 0; i < argc; i++) { + int argSize = strlen(argv[i]); + if (argSize < optionSize) { + continue; + } + if (argSize > optionSize && argv[i][optionSize] == '=') { + pluginSpec = std::string_view(argv[i] + optionSize + 1); + break; + } + if (argSize == optionSize && (strncmp(argv[i], pluginOption, optionSize) == 0) && i < argc) { + pluginSpec = argv[i + 1]; + break; + } + } + + if (pluginSpec.empty()) { + pluginSpec = pluginName(); + } + + std::cerr << "Loading plugin " << pluginSpec << std::endl; + return callMain(argc, argv, pluginSpec.c_str()); +} diff --git a/cmake/O2AddWorkflow.cmake b/cmake/O2AddWorkflow.cmake index b952890921f45..17d6b95497ad7 100644 --- a/cmake/O2AddWorkflow.cmake +++ b/cmake/O2AddWorkflow.cmake @@ -35,11 +35,26 @@ function(o2_add_dpl_workflow baseTargetName) message(FATAL_ERROR "Got trailing arguments ${A_UNPARSED_ARGUMENTS}") endif() - o2_add_executable(${baseTargetName} - COMPONENT_NAME ${A_COMPONENT_NAME} TARGETVARNAME targetExeName + string(REGEX REPLACE "(^|-)([a-z])" "\\2" pluginName "${baseTargetName}") + + o2_add_library(${pluginName}Plugin SOURCES ${A_SOURCES} PUBLIC_LINK_LIBRARIES O2::Framework ${A_PUBLIC_LINK_LIBRARIES}) + o2_add_executable(${baseTargetName} + COMPONENT_NAME ${A_COMPONENT_NAME} TARGETVARNAME targetExeName + SOURCES ${CMAKE_SOURCE_DIR}/Framework/Core/src/runDataProcessingPlugin.cxx + PUBLIC_LINK_LIBRARIES O2::Framework ) + + o2_name_target(${pluginName}Plugin NAME pluginTargetName) + + add_dependencies(${targetExeName} ${pluginTargetName}) + + target_compile_definitions(${targetExeName} PRIVATE DPL_WORKFLOW_PLUGIN_NAME=${pluginName} + DPL_WORKFLOW_PLUGIN_LIBRARY=O2${pluginName}Plugin) + target_compile_definitions(${pluginTargetName} PRIVATE DPL_WORKFLOW_PLUGIN_NAME=${pluginName} + DPL_WORKFLOW_PLUGIN_LIBRARY=O2${pluginName}Plugin) + if(A_TARGETVARNAME) set(${A_TARGETVARNAME} ${targetExeName}