From 0562fadbc1ca847e1381bec7d9a299e445d080a0 Mon Sep 17 00:00:00 2001 From: swenzel Date: Mon, 24 Feb 2025 14:21:55 +0100 Subject: [PATCH] Support for 2tag anchoredMC With this commit, we achieve support for 2tag operation in anchoredMC. --- MC/bin/o2_dpg_workflow_runner.py | 61 +++++++++++++++++++++++---- MC/bin/o2dpg_sim_config.py | 7 ++- MC/bin/o2dpg_sim_workflow.py | 1 + MC/bin/o2dpg_sim_workflow_anchored.py | 11 ++++- MC/bin/o2dpg_workflow_utils.py | 3 +- MC/run/ANCHOR/anchorMC.sh | 4 +- 6 files changed, 72 insertions(+), 15 deletions(-) diff --git a/MC/bin/o2_dpg_workflow_runner.py b/MC/bin/o2_dpg_workflow_runner.py index 03d69a5bc..14cc866ff 100755 --- a/MC/bin/o2_dpg_workflow_runner.py +++ b/MC/bin/o2_dpg_workflow_runner.py @@ -470,8 +470,44 @@ def update_resource_estimates(workflow, resource_json): # a python dictionary def get_alienv_software_environment(packagestring): """ - packagestring is something like O2::v202298081-1,O2Physics::xxx + packagestring is something like O2::v202298081-1,O2Physics::xxx representing packages + published on CVMFS ... or ... a file containing directly the software environment to apply """ + + # the trivial cases do nothing + if packagestring == None or packagestring == "" or packagestring == "None": + return {} + + def load_env_file(env_file): + """Transform an environment file generated with 'export > env.txt' into a python dictionary.""" + env_vars = {} + with open(env_file, "r") as f: + for line in f: + line = line.strip() + + # Ignore empty lines or comments + if not line or line.startswith("#"): + continue + + # Remove 'declare -x ' if present + if line.startswith("declare -x "): + line = line.replace("declare -x ", "", 1) + + # Handle case: "FOO" without "=" (assign empty string) + if "=" not in line: + key, value = line.strip(), "" + else: + key, value = line.split("=", 1) + value = value.strip('"') # Remove surrounding quotes if present + + env_vars[key.strip()] = value + return env_vars + + # see if this is a file + if os.path.exists(packagestring) and os.path.isfile(packagestring): + actionlogger.info("Taking software environment from file " + packagestring) + return load_env_file(packagestring) + # alienv printenv packagestring --> dictionary # for the moment this works with CVMFS only cmd="/cvmfs/alice.cern.ch/bin/alienv printenv " + packagestring @@ -1089,19 +1125,28 @@ def submit(self, tid, nice): return subprocess.Popen(['/bin/bash','-c',drycommand], cwd=workdir) taskenv = os.environ.copy() - # add task specific environment - if self.workflowspec['stages'][tid].get('env')!=None: - taskenv.update(self.workflowspec['stages'][tid]['env']) - # apply specific (non-default) software version, if any # (this was setup earlier) alternative_env = self.alternative_envs.get(tid, None) - if alternative_env != None: + if alternative_env != None and len(alternative_env) > 0: actionlogger.info('Applying alternative software environment to task ' + self.idtotask[tid]) - for entry in alternative_env: + if alternative_env.get('TERM') != None: + # the environment is a complete environment + taskenv = {} + taskenv = alternative_env + else: + for entry in alternative_env: # overwrite what is present in default taskenv[entry] = alternative_env[entry] + # add task specific environment + if self.workflowspec['stages'][tid].get('env')!=None: + taskenv.update(self.workflowspec['stages'][tid]['env']) + + # envfilename = "taskenv_" + str(tid) + ".json" + # with open(envfilename, "w") as file: + # json.dump(taskenv, file, indent=2) + p = psutil.Popen(['/bin/bash','-c',c], cwd=workdir, env=taskenv) try: p.nice(nice) @@ -1406,7 +1451,7 @@ def get_tar_command(dir='./', flags='cf', findtype='f', filename='checkpoint.tar def init_alternative_software_environments(self): """ - Initiatialises alternative software environments for specific tasks, if there + Initialises alternative software environments for specific tasks, if there is an annotation in the workflow specificiation. """ diff --git a/MC/bin/o2dpg_sim_config.py b/MC/bin/o2dpg_sim_config.py index 945403804..59e161c46 100755 --- a/MC/bin/o2dpg_sim_config.py +++ b/MC/bin/o2dpg_sim_config.py @@ -180,7 +180,7 @@ def parse_dpl_help_output(executable, envfile): # the DEVNULL is important for o2-dpl workflows not to hang on non-interactive missing tty environments # it is cleaner that the echo | trick - output = subprocess.check_output([executable, "--help", "full"], env=env, text=True, stdin=subprocess.DEVNULL, timeout = 10) + output = subprocess.check_output([executable, "--help", "full"], env=env, text=True, stdin=subprocess.DEVNULL, timeout = 100) except subprocess.CalledProcessError: return {}, {} @@ -211,6 +211,5 @@ def get_dpl_options_for_executable(executable, envfile): def option_if_available(executable, option, envfile = None): """Checks if an option is available for a given executable and returns it as a string. Otherwise empty string""" - # _, inverse_lookup = get_dpl_options_for_executable(executable, envfile) - # return ' ' + option if option in inverse_lookup else '' - return option + _, inverse_lookup = get_dpl_options_for_executable(executable, envfile) + return ' ' + option if option in inverse_lookup else '' \ No newline at end of file diff --git a/MC/bin/o2dpg_sim_workflow.py b/MC/bin/o2dpg_sim_workflow.py index 0731784db..e35fd98e0 100755 --- a/MC/bin/o2dpg_sim_workflow.py +++ b/MC/bin/o2dpg_sim_workflow.py @@ -1603,6 +1603,7 @@ def addQCPerTF(taskName, needs, readerCommand, configFilePath, objectsFile=''): AOD_merge_task['cmd'] += ' o2-aod-merger --input aodmerge_input.txt --output AO2D.root' # produce MonaLisa event stat file AOD_merge_task['cmd'] += ' ; ${O2DPG_ROOT}/MC/bin/o2dpg_determine_eventstat.py' + AOD_merge_task['alternative_alienv_package'] = "None" # we want latest software for this step workflow['stages'].append(AOD_merge_task) job_merging = False diff --git a/MC/bin/o2dpg_sim_workflow_anchored.py b/MC/bin/o2dpg_sim_workflow_anchored.py index 85356a918..cd26047ad 100755 --- a/MC/bin/o2dpg_sim_workflow_anchored.py +++ b/MC/bin/o2dpg_sim_workflow_anchored.py @@ -14,6 +14,8 @@ import json import math import pandas as pd +import subprocess +import shlex # Creates a time anchored MC workflow; positioned within a given run-number (as function of production size etc) @@ -542,7 +544,14 @@ def main(): print ("TIMESTAMP IS EXCLUDED IN RUN") else: print ("Creating time-anchored workflow...") - os.system(cmd) + print ("Executing: " + cmd) + # os.system(cmd) + try: + cmd_list = shlex.split(os.path.expandvars(cmd)) + output = subprocess.check_output(cmd_list, text=True, stdin=subprocess.DEVNULL, timeout = 120) + except subprocess.CalledProcessError: + return {}, {} + if __name__ == "__main__": sys.exit(main()) diff --git a/MC/bin/o2dpg_workflow_utils.py b/MC/bin/o2dpg_workflow_utils.py index a196a9c86..d4350e199 100755 --- a/MC/bin/o2dpg_workflow_utils.py +++ b/MC/bin/o2dpg_workflow_utils.py @@ -307,7 +307,8 @@ def matches_or_inherits_label(taskid, label, cache): for taskid in range(len(workflowspec['stages'])): if (matches_or_inherits_label(taskid, "RECO", matches_label)): # now we do the final adjust (as annotation) in the workflow itself - workflowspec['stages'][taskid]["alternative_alienv_package"] = package + if workflowspec['stages'][taskid].get("alternative_alienv_package") == None: + workflowspec['stages'][taskid]["alternative_alienv_package"] = package def merge_dicts(dict1, dict2): """ diff --git a/MC/run/ANCHOR/anchorMC.sh b/MC/run/ANCHOR/anchorMC.sh index b114ac8c4..26ab19b98 100755 --- a/MC/run/ANCHOR/anchorMC.sh +++ b/MC/run/ANCHOR/anchorMC.sh @@ -282,7 +282,9 @@ remainingargs="${remainingargs} -productionTag ${ALIEN_JDL_LPMPRODUCTIONTAG:-ali # since the last passed argument wins, e.g. -productionTag cannot be overwritten by the user remainingargs="${ALIEN_JDL_ANCHOR_SIM_OPTIONS} ${remainingargs} --anchor-config config-json.json" # apply software tagging choice -remainingargs="${remainingargs} ${ALIEN_JDL_O2DPG_ASYNC_RECO_TAG:+--alternative-reco-software ${ALIEN_JDL_O2DPG_ASYNC_RECO_TAG}}" +# remainingargs="${remainingargs} ${ALIEN_JDL_O2DPG_ASYNC_RECO_TAG:+--alternative-reco-software ${ALIEN_JDL_O2DPG_ASYNC_RECO_TAG}}" +remainingargs="${remainingargs} ${ALIEN_JDL_O2DPG_ASYNC_RECO_TAG:+--alternative-reco-software ${PWD}/env_async.env}" + echo_info "baseargs passed to o2dpg_sim_workflow_anchored.py: ${baseargs}" echo_info "remainingargs forwarded to o2dpg_sim_workflow.py: ${remainingargs}"