From b0255d0886c16da3d48fea32ddd5929cd501f3be Mon Sep 17 00:00:00 2001 From: swenzel Date: Fri, 11 Apr 2025 00:51:18 +0200 Subject: [PATCH 1/2] Refactor of anchor configuration logic This commit refactors the transfer of anchoring configuration from async-reco scripts into the MC pipeline, addressing O2-5011 by replacing the previous whitelisting mechanism with a blacklisting approach. In addition, it introduces broader configuration/customization support: users can now inject arbitrary command-line options into any task via a `customize.json` file, reducing the need to edit workflow scripts. For example: { "ConfigParams": { "EMCSimParam": { "mBusyTime": "11.11" } }, "Executables": { "o2-ft0-reco-workflow": { "filtered": { "--my-custom-option": "foo" } }, "ft0fv0emcctp_digi": { "filtered": { "--another-custom-option": "baz" } } } } This allows for flexible injection of both configKeyValues and CLI options via `o2dpg_sim_workflow.py --overwrite-config customize.json`. --- MC/bin/o2dpg_dpl_config_tools.py | 362 +++++++++++ MC/bin/o2dpg_sim_workflow.py | 597 ++++++++++++------ MC/bin/o2dpg_sim_workflow_anchored.py | 7 +- .../ANCHOR/anchor-dpl-options-blacklist.json | 46 ++ MC/run/ANCHOR/anchorMC.sh | 17 +- .../tests/test_anchor_2023_apass2_pp.sh | 2 +- 6 files changed, 816 insertions(+), 215 deletions(-) create mode 100755 MC/bin/o2dpg_dpl_config_tools.py create mode 100644 MC/run/ANCHOR/anchor-dpl-options-blacklist.json diff --git a/MC/bin/o2dpg_dpl_config_tools.py b/MC/bin/o2dpg_dpl_config_tools.py new file mode 100755 index 000000000..26dbfbf86 --- /dev/null +++ b/MC/bin/o2dpg_dpl_config_tools.py @@ -0,0 +1,362 @@ +#!/usr/bin/env python3 + +import json +import re +import sys +from collections import defaultdict +from copy import deepcopy +import os +from o2dpg_workflow_utils import merge_dicts +import shlex + +BUILTIN_BLACKLIST = { + "--session", "--severity", "--shm-segment-id", "--shm-segment-size", + "--resources-monitoring", "--resources-monitoring-dump-interval", + "--delay", "--loop", "--early-forward-policy", "--fairmq-rate-logging", + "--pipeline", "--disable-mc", "--disable-root-input", "--timeframes-rate-limit", + "--timeframes-rate-limit-ipcid", + "--lumi-type", # TPC corrections are treated separately in o2dpg_sim_workflow + "--corrmap-lumi-mode", # TPC corrections are treated separately in o2dpg_sim_workflow + "--enable-M-shape-correction" # TPC correction option not needed in MC +} + +def parse_command_string(cmd_str): + """ + Parse a DPL command string into structured config_base: + { + "executable": str, + "options": {key: val, ...}, + "configKeyValues": {"Group": {subkey: val}} + } + """ + try: + tokens = shlex.split(cmd_str, posix=False) + except ValueError as e: + print(f"[ERROR] Failed to parse command string: {cmd_str}") + raise e + + if not tokens: + return {} + + exe = tokens[0] + opts = {} + config_keyvals_raw = None + + i = 1 + while i < len(tokens): + token = tokens[i] + if token.startswith('--') or (token.startswith('-') and len(token) == 2): + key = token # preserve the dash prefix: "-b" or "--run-number" + if i + 1 < len(tokens) and not tokens[i + 1].startswith('-'): + value = tokens[i + 1].strip('"').strip("'") + i += 1 + else: + value = True + if key == "--configKeyValues": + config_keyvals_raw = value + else: + opts[key] = value + i += 1 + + config_kv_parsed, config_groups = {}, set() + if config_keyvals_raw: + config_kv_parsed, config_groups = parse_configKeyValues_block(config_keyvals_raw) + + return { + "executable": exe, + "options": opts, + "configKeyValues": config_kv_parsed, + "configKeyGroups": sorted(config_groups) + } + +def parse_command_string_symmetric(cmd_str, configname = None): + """ + Parses a DPL command string into the same structure as parse_workflow_config(...): + { + "ConfigParams": { group: {key: value, ...} }, + "Executables": { + "o2-executable": { + "full": {...}, + "filtered": {...}, + "blacklisted": [], + "configKeyValues": [group, ...] + } + } + } + """ + try: + tokens = shlex.split(cmd_str, posix=False) + except ValueError as e: + print(f"[ERROR] Failed to parse command string: {cmd_str}") + raise e + + if not tokens: + return {} + + exe = os.path.basename(tokens[0]) if configname == None else configname + opts = {} + config_kv_raw = None + + i = 1 + while i < len(tokens): + token = tokens[i] + if token.startswith('--') or (token.startswith('-') and len(token) == 2): + key = token # preserve the dash prefix: "-b" or "--run-number" + if i + 1 < len(tokens) and not tokens[i + 1].startswith('-'): + value = tokens[i + 1].strip('"').strip("'") + i += 1 + else: + value = True + if key == "--configKeyValues": + config_kv_raw = value + else: + opts[key] = value + i += 1 + + config_params = {} + config_key_groups = [] + + if config_kv_raw: + parsed_kv, groups = parse_configKeyValues_block(config_kv_raw) + config_params = parsed_kv + config_key_groups = sorted(groups) + + return { + "ConfigParams": config_params, + "Executables": { + exe: { + "full": opts, + "filtered": dict(opts), + "blacklisted": [], + "configKeyValues": config_key_groups + } + } + } + + +def parse_configKeyValues_block(raw_value): + result = defaultdict(dict) + seen_groups = set() + raw_value = raw_value.replace('\\"', '"').replace("\\'", "'") + parts = raw_value.split(";") + for part in parts: + part = part.strip() + if not part or "=" not in part: + continue + key, val = part.split("=", 1) + key = key.strip() + val = val.strip() + if "." in key: + group, subkey = key.split(".", 1) + result[group][subkey] = val + seen_groups.add(group) + return dict(result), seen_groups + +def log_line(logger, message): + if logger is None or logger == sys.stdout: + print(message) + elif isinstance(logger, str): + with open(logger, "a") as f: + f.write(message + "\n") + else: + logger.write(message + "\n") + +def modify_dpl_command(cmd_str, config_anchor, allow_overwrite=False, logger=None, configname=None): + # check if cmd_str is given as list, in which case we transfrom to string + if isinstance(cmd_str, list) == True: + cmd_str = " ".join(filter(None, cmd_str)) + + base = parse_command_string(cmd_str) + exe = base["executable"] + existing_opts = base["options"] + existing_kv = base["configKeyValues"] + + # Start building new command + new_cmd = [exe] + added = [] + overwritten = [] + + exe_basename = os.path.basename(exe) if configname == None else configname + anchor_exec = None + if "Executables" in config_anchor: + anchor_exec = config_anchor["Executables"].get(exe_basename, None) + if anchor_exec == None: + # try without the Executable section + anchor_exec = config_anchor.get(exe_basename, None) + + if anchor_exec == None: + print(f"[WARN] No anchor config found for {exe}") + return cmd_str + + anchor_opts = anchor_exec.get("filtered", {}) + anchor_kv_groups = anchor_exec.get("configKeyValues", []) + + # --- Step 1: Reconstruct executable and its CLI options + new_cmd = [exe] + added = [] + overwritten = [] + + def quote_if_needed(val): + s = str(val) + if " " in s and not (s.startswith('"') and s.endswith('"')): + return f'"{s}"' + return s + + # Step 1: Existing options (preserved or overwritten) + for key, val in existing_opts.items(): + if allow_overwrite and key in anchor_opts: + val = anchor_opts[key] + overwritten.append(key) + new_cmd.append(f"{key} {quote_if_needed(val)}" if val is not True else f"{key}") + + # Step 2: Add missing options from anchor + for key, val in anchor_opts.items(): + if key not in existing_opts: + new_cmd.append(f"{key} {quote_if_needed(val)}" if val is not True else f"{key}") + added.append(key) + + # what about config-key values (should already be done) Merge configKeyValues + merged_kv = deepcopy(existing_kv) + # for group in anchor_kv_groups: + # group_kvs = config_anchor.get("ConfigParams", {}).get(group, {}) + # if group not in merged_kv: + # merged_kv[group] = group_kvs + # elif allow_overwrite: + # merged_kv[group].update(group_kvs) + + if merged_kv: + kv_flat = [f"{g}.{k}={v}" for g, kv in merged_kv.items() for k, v in kv.items()] + new_cmd.append('--configKeyValues "' + ";".join(kv_flat) + '"') + + # log changes + log_line(logger, f"\n[Executable: {exe}]") + if added: + log_line(logger, f" Added options: {added}") + if overwritten: + log_line(logger, f" Overwritten options: {overwritten}") + if not added and not overwritten: + log_line(logger, f" No changes made to command.") + + return " ".join(new_cmd) + +# CLI: Parse log + blacklist into output.json +def parse_configKeyValues(raw_value): + return parse_configKeyValues_block(raw_value) + +def parse_workflow_config(log_path): + config_params = defaultdict(dict) + executables = {} + + with open(log_path) as f: + for line in f: + line = line.strip() + if not line or line.startswith("#"): + continue + + parsed = parse_command_string(line) + exe = parsed["executable"] + config_groups_used = parsed["configKeyGroups"] + full_opts = parsed["options"] + + for group, kv in parsed["configKeyValues"].items(): + config_params[group].update(kv) + + executables[exe] = { + "configKeyValues": sorted(config_groups_used), + "full": full_opts + } + + return config_params, executables + + +def apply_blacklist(executables, blacklist_cfg): + for exe, data in executables.items(): + full_opts = data["full"] + exe_blacklist = set(blacklist_cfg.get(exe, [])) + total_blacklist = BUILTIN_BLACKLIST.union(exe_blacklist) + + blacklisted = [] + filtered = {} + + for key, val in full_opts.items(): + if key in total_blacklist: + blacklisted.append(key) + else: + filtered[key] = val + + data["blacklisted"] = sorted(blacklisted) + data["filtered"] = filtered + data["full"] = deepcopy(full_opts) # keep original + return executables + +def dpl_option_from_config(config, dpl_workflow, key, section = "filtered", default_value = None): + """ + Utility to extract a DPL option for workflow dpl_workflow from + the configuration dict "config". The configuration is: + - either a flattish JSON produced by older tool parse-async-WorkflowConfig.py + - more structured version produced by o2dpg_dpl_config_tools (this tool) + """ + if "Executables" in config: + # new standard + return config["Executables"].get(dpl_workflow,{}).get(section,{}).get(key, default_value) + else: + # backward compatible versions + dpl_workflow_key = dpl_workflow + '-options' + if dpl_workflow_key in config: + return config.get(dpl_workflow_key, {}).get(key, default_value) + dpl_workflow_key = dpl_workflow_key + if dpl_workflow_key in config: + return config.get(dpl_workflow_key, {}).get(key, default_value) + return default_value + +def main(): + if len(sys.argv) == 4: + log_path = sys.argv[1] + blacklist_path = sys.argv[2] + output_path = sys.argv[3] + + with open(blacklist_path) as f: + blacklist_data = json.load(f) + + config_params, executables = parse_workflow_config(log_path) + executables = apply_blacklist(executables, blacklist_data) + + result = { + "ConfigParams": dict(config_params), + "Executables": executables + } + + with open(output_path, "w") as out: + json.dump(result, out, indent=2) + + print(f"[INFO] Wrote structured config to: {output_path}") + else: + print("Usage:") + print(" CLI parsing: python3 dpl_config_tools.py workflowconfig.log blacklist.json output.json") + print(" Python usage: import and call parse_command_string() or modify_dpl_command()") + + +class TaskFinalizer: + def __init__(self, anchor_config, allow_overwrite=False, logger=None): + self.anchor_config = anchor_config + self.allow_overwrite = allow_overwrite + self.logger = logger + self.final_config = { + "ConfigParams": {}, + "Executables": {} + } + + def __call__(self, cmd_str_or_list, configname = None): + final_cmd = modify_dpl_command(cmd_str_or_list, self.anchor_config.get("Executables",{}), logger=self.logger, configname=configname) + this_config_final = parse_command_string_symmetric(final_cmd, configname) + print (this_config_final) + merge_dicts (self.final_config, this_config_final) + return final_cmd + + def dump_collected_config(self, path): + with open(path, "w") as f: + json.dump(self.final_config, f, indent=2) + + +if __name__ == "__main__": + main() diff --git a/MC/bin/o2dpg_sim_workflow.py b/MC/bin/o2dpg_sim_workflow.py index b559405f6..0cdd324bc 100755 --- a/MC/bin/o2dpg_sim_workflow.py +++ b/MC/bin/o2dpg_sim_workflow.py @@ -17,6 +17,9 @@ # -col pp -eA 2.510 -proc "ccbar" --embedding # +# TODO: +# - pickup the final list of configKey values from the anchorConfig + import sys import importlib.util import argparse @@ -40,6 +43,7 @@ from o2dpg_workflow_utils import createTask, createGlobalInitTask, dump_workflow, adjust_RECO_environment, isActive, activate_detector, deactivate_detector, compute_n_workers, merge_dicts from o2dpg_qc_finalization_workflow import include_all_QC_finalization from o2dpg_sim_config import create_sim_config, create_geant_config, constructConfigKeyArg, option_if_available +from o2dpg_dpl_config_tools import parse_command_string, modify_dpl_command, dpl_option_from_config, TaskFinalizer parser = argparse.ArgumentParser(description='Create an ALICE (Run3) MC simulation workflow') @@ -201,30 +205,36 @@ def load_external_config(configfile): else: # we load a generic config print ("** Using generic config **") - anchorConfig = create_sim_config(args) + anchorConfig = { "ConfigParams": create_sim_config(args) } # we apply additional external user choices for the configuration # this will overwrite config from earlier stages if args.overwrite_config != '': # apply final JSON overwrite config_overwrite = load_external_config(args.overwrite_config) + # let's make sure the configs follow the same format + if ("ConfigParams" in anchorConfig) != ("ConfigParams" in config_overwrite): + print ("Error: overwrite config not following same format as base config; Cannot merge") + exit (1) + # merge the dictionaries into anchorConfig, the latter takes precedence merge_dicts(anchorConfig, config_overwrite) +# with the config, we'll create a task_finalizer functor +# this object takes care of customizing/finishing task command with externally given (anchor) config +task_finalizer = TaskFinalizer(anchorConfig, logger="o2dpg_config_replacements.log") + # write this config config_key_param_path = args.dump_config with open(config_key_param_path, "w") as f: print(f"INFO: Written additional config key parameters to JSON {config_key_param_path}") json.dump(anchorConfig, f, indent=2) -def get_anchor_env_var(key, default): - return anchorConfig.get('env_vars',{}).get(key, default) - # with this we can tailor the workflow to the presence of # certain detectors # these are all detectors that should be assumed active readout_detectors = args.readoutDets # here are all detectors that have been set in an anchored script -activeDetectors = anchorConfig.get('o2-ctf-reader-workflow-options',{}).get('onlyDet','all') +activeDetectors = dpl_option_from_config(anchorConfig, 'o2-ctf-reader-workflow', key='onlyDet', default_value='all') if activeDetectors == 'all': # if "all" here, there was in fact nothing in the anchored script, set to what is passed to this script (which it either also "all" or a subset) activeDetectors = readout_detectors @@ -514,7 +524,7 @@ def getDPL_global_options(bigshm=False, ccdbbackend=True): + ' --seed ' + str(RNDSEED) \ + ' --noEmptyTF --first-orbit ' + str(args.first_orbit) \ + ' --extract-per-timeframe tf:sgn' \ - + ' --with-vertices ' + vtxmode_precoll \ + + ' --with-vertices ' + vtxmode_precoll \ + ' --maxCollsPerTF ' + str(args.ns) \ + ' --orbitsEarly ' + str(args.orbits_early) @@ -906,26 +916,7 @@ def getDPL_global_options(bigshm=False, ccdbbackend=True): if (args.sor != -1): globalTFConfigValues["HBFUtils.startTime"] = args.sor - def putConfigValues(localCF = {}): - """ - Creates the final --configValues string to be passed to the workflows. - Uses the globalTFConfigValues and merges/overrides them with the local settings. - localCF is supposed to be a dictionary mapping key to param - """ - returnstring = ' --configKeyValues "' - cf = globalTFConfigValues.copy() - isfirst=True - for e in localCF: - cf[e] = localCF[e] - - for e in cf: - returnstring += (';','')[isfirst] + str(e) + "=" + str(cf[e]) - isfirst=False - - returnstring = returnstring + '"' - return returnstring - - def putConfigValuesNew(listOfMainKeys=[], localCF = {}): + def putConfigValues(listOfMainKeys=[], localCF = {}): """ Creates the final --configValues string to be passed to the workflows. Uses the globalTFConfigValues and applies other parameters on top @@ -939,8 +930,11 @@ def putConfigValuesNew(listOfMainKeys=[], localCF = {}): # now bring in the relevant keys # from the external config for key in listOfMainKeys: - # it this key exists + # try to find key flat in dict (backward compatible) keydict = anchorConfig.get(key) + if keydict == None: + # try to find under the ConfigurableKey entry (standard) + keydict = anchorConfig.get("ConfigParams",{}).get(key) if keydict != None: for k in keydict: cf[key+"."+k] = keydict[k] @@ -969,7 +963,7 @@ def putConfigValuesNew(listOfMainKeys=[], localCF = {}): ContextTask['cmd'] = '${O2_ROOT}/bin/o2-sim-digitizer-workflow --only-context --interactionRate ' + str(INTRATE) \ + ' ' + getDPL_global_options(ccdbbackend=False) + ' -n ' + str(args.ns) + simsoption \ + ' --seed ' + str(TFSEED) \ - + ' ' + putConfigValuesNew({"DigiParams.maxOrbitsToDigitize" : str(orbitsPerTF)},{"DigiParams.passName" : str(PASSNAME)}) \ + + ' ' + putConfigValues({"DigiParams.maxOrbitsToDigitize" : str(orbitsPerTF)},{"DigiParams.passName" : str(PASSNAME)}) \ + ' --incontext ' + CONTEXTFILE + QEDdigiargs ContextTask['cmd'] += ' --bcPatternFile ccdb' workflow['stages'].append(ContextTask) @@ -990,13 +984,12 @@ def putConfigValuesNew(listOfMainKeys=[], localCF = {}): if tpcDistortionType == 2 and CTPSCALER <= 0: print('Warning: lumi scaling requested, but no ctp scaler value set. Full map will be applied at face value.') tpcDistortionType=1 - lumiInstFactor=1 if COLTYPE == 'PbPb': lumiInstFactor=2.414 - if tpcDistortionType == 2: tpcLocalCF['TPCCorrMap.lumiInst'] = str(CTPSCALER * lumiInstFactor) + tpcdigimem = 12000 if havePbPb else 9000 TPCDigitask=createTask(name='tpcdigi_'+str(tf), needs=tpcdigineeds, tf=tf, cwd=timeframeworkdir, lab=["DIGI"], cpu=NWORKERS_TF, mem=str(tpcdigimem)) @@ -1007,7 +1000,7 @@ def putConfigValuesNew(listOfMainKeys=[], localCF = {}): + ' --onlyDet TPC --TPCuseCCDB --interactionRate ' + str(INTRATE) + ' --tpc-lanes ' + str(NWORKERS_TF) \ + ' --incontext ' + str(CONTEXTFILE) + ' --disable-write-ini --early-forward-policy always --forceSelectedDets ' \ + ' --tpc-distortion-type ' + str(tpcDistortionType) \ - + putConfigValuesNew(["TPCGasParam","TPCGEMParam","TPCEleParam","TPCITCorr","TPCDetParam"], + + putConfigValues(["TPCGasParam","TPCGEMParam","TPCEleParam","TPCITCorr","TPCDetParam"], localCF=tpcLocalCF) TPCDigitask['cmd'] += (' --tpc-chunked-writer','')[args.no_tpc_digitchunking] TPCDigitask['cmd'] += ('',' --disable-mc')[args.no_mc_labels] @@ -1025,7 +1018,7 @@ def putConfigValuesNew(listOfMainKeys=[], localCF = {}): TRDDigitask['cmd'] = ('','ln -nfs ../bkg_HitsTRD.root . ;')[doembedding] TRDDigitask['cmd'] += '${O2_ROOT}/bin/o2-sim-digitizer-workflow ' + getDPL_global_options() + ' -n ' + str(args.ns) + simsoption \ + ' --onlyDet TRD --interactionRate ' + str(INTRATE) + ' --incontext ' + str(CONTEXTFILE) + ' --disable-write-ini' \ - + putConfigValuesNew(localCF={"TRDSimParams.digithreads" : NWORKERS_TF, "DigiParams.seed" : str(TFSEED)}) + " --forceSelectedDets" + + putConfigValues(localCF={"TRDSimParams.digithreads" : NWORKERS_TF, "DigiParams.seed" : str(TFSEED)}) + " --forceSelectedDets" TRDDigitask['cmd'] += ('',' --disable-mc')[args.no_mc_labels] workflow['stages'].append(TRDDigitask) @@ -1036,7 +1029,7 @@ def createRestDigiTask(name, det='ALLSMALLER'): tneeds += [QED_task['name']] commondigicmd = '${O2_ROOT}/bin/o2-sim-digitizer-workflow ' + getDPL_global_options() + ' -n ' + str(args.ns) + simsoption \ + ' --interactionRate ' + str(INTRATE) + ' --incontext ' + str(CONTEXTFILE) + ' --disable-write-ini' \ - + putConfigValuesNew(["MFTAlpideParam", "ITSAlpideParam", "ITSDigitizerParam"], + + putConfigValues(["MFTAlpideParam", "ITSAlpideParam", "ITSDigitizerParam"], localCF={"DigiParams.seed" : str(TFSEED), "MCHDigitizer.seed" : str(TFSEED)}) + QEDdigiargs if det=='ALLSMALLER': # here we combine all smaller digits in one DPL workflow @@ -1083,13 +1076,23 @@ def createRestDigiTask(name, det='ALLSMALLER'): if includeQED: tneeds += [QED_task['name']] FT0FV0EMCCTPDIGItask = createTask(name="ft0fv0emcctp_digi_" + str(tf), needs=tneeds, - tf=tf, cwd=timeframeworkdir, lab=["DIGI","SMALLDIGI"], cpu='1') + tf=tf, cwd=timeframeworkdir, lab=["DIGI","SMALLDIGI"], cpu='1') FT0FV0EMCCTPDIGItask['cmd'] = ('','ln -nfs ../bkg_HitsFT0.root . ; ln -nfs ../bkg_HitsFV0.root . ; ln -nfs ../bkg_HitsEMC.root; ln -nfs ../bkg_Kine.root; ')[doembedding] - FT0FV0EMCCTPDIGItask['cmd'] += '${O2_ROOT}/bin/o2-sim-digitizer-workflow ' + getDPL_global_options() + ' -n ' + str(args.ns) + simsoption \ - + ' --onlyDet FT0,FV0,EMC,CTP --interactionRate ' + str(INTRATE) + ' --incontext ' + str(CONTEXTFILE) \ - + ' --disable-write-ini' + putConfigValuesNew(listOfMainKeys=['EMCSimParam'], localCF={"DigiParams.seed" : str(TFSEED)}) \ - + (' --combine-devices','')[args.no_combine_dpl_devices] + ('',' --disable-mc')[args.no_mc_labels] + QEDdigiargs \ - + ' --forceSelectedDets' + FT0FV0EMCCTPDIGItask['cmd'] += task_finalizer([ + '${O2_ROOT}/bin/o2-sim-digitizer-workflow', + getDPL_global_options(), + f'-n {args.ns}', + simsoption, + '--onlyDet FT0,FV0,EMC,CTP', + f'--interactionRate {INTRATE}', + f'--incontext {CONTEXTFILE}', + '--disable-write-ini', + putConfigValues(listOfMainKeys=['EMCSimParam'], localCF={"DigiParams.seed" : str(TFSEED)}), + ('--combine-devices','')[args.no_combine_dpl_devices], + ('',' --disable-mc')[args.no_mc_labels], + QEDdigiargs, + '--forceSelectedDets'], configname = 'ft0fv0emcctp_digi') + workflow['stages'].append(FT0FV0EMCCTPDIGItask) det_to_digitask["FT0"]=FT0FV0EMCCTPDIGItask det_to_digitask["FV0"]=FT0FV0EMCCTPDIGItask @@ -1116,7 +1119,7 @@ def getDigiTaskName(det): tpcclustertasks.append(taskname) tpcclussect = createTask(name=taskname, needs=[TPCDigitask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu='2', mem='8000') digitmergerstr = '${O2_ROOT}/bin/o2-tpc-chunkeddigit-merger --tpc-sectors ' + str(s)+'-'+str(s+sectorpertask-1) + ' --tpc-lanes ' + str(NWORKERS_TF) + ' | ' - tpcclussect['cmd'] = (digitmergerstr,'')[args.no_tpc_digitchunking] + ' ${O2_ROOT}/bin/o2-tpc-reco-workflow ' + getDPL_global_options(bigshm=True) + ' --input-type ' + ('digitizer','digits')[args.no_tpc_digitchunking] + ' --output-type clusters,send-clusters-per-sector --tpc-native-cluster-writer \" --outfile tpc-native-clusters-part'+ str((int)(s/sectorpertask)) + '.root\" --tpc-sectors ' + str(s)+'-'+str(s+sectorpertask-1) + ' ' + putConfigValuesNew(["GPU_global"], {"GPU_proc.ompThreads" : 4}) + ('',' --disable-mc')[args.no_mc_labels] + tpcclussect['cmd'] = (digitmergerstr,'')[args.no_tpc_digitchunking] + ' ${O2_ROOT}/bin/o2-tpc-reco-workflow ' + getDPL_global_options(bigshm=True) + ' --input-type ' + ('digitizer','digits')[args.no_tpc_digitchunking] + ' --output-type clusters,send-clusters-per-sector --tpc-native-cluster-writer \" --outfile tpc-native-clusters-part'+ str((int)(s/sectorpertask)) + '.root\" --tpc-sectors ' + str(s)+'-'+str(s+sectorpertask-1) + ' ' + putConfigValues(["GPU_global"], {"GPU_proc.ompThreads" : 4}) + ('',' --disable-mc')[args.no_mc_labels] tpcclussect['env'] = { "OMP_NUM_THREADS" : "4" } tpcclussect['semaphore'] = "tpctriggers.root" tpcclussect['retry_count'] = 2 # the task has a race condition --> makes sense to retry @@ -1129,7 +1132,7 @@ def getDigiTaskName(det): else: tpcclus = createTask(name='tpccluster_' + str(tf), needs=[TPCDigitask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu=NWORKERS_TF, mem='2000') tpcclus['cmd'] = '${O2_ROOT}/bin/o2-tpc-chunkeddigit-merger --tpc-lanes ' + str(NWORKERS_TF) - tpcclus['cmd'] += ' | ${O2_ROOT}/bin/o2-tpc-reco-workflow ' + getDPL_global_options() + ' --input-type digitizer --output-type clusters,send-clusters-per-sector ' + putConfigValuesNew(["GPU_global","TPCGasParam","TPCCorrMap"],{"GPU_proc.ompThreads" : 1}) + ('',' --disable-mc')[args.no_mc_labels] + tpcclus['cmd'] += ' | ${O2_ROOT}/bin/o2-tpc-reco-workflow ' + getDPL_global_options() + ' --input-type digitizer --output-type clusters,send-clusters-per-sector ' + putConfigValues(["GPU_global","TPCGasParam","TPCCorrMap"],{"GPU_proc.ompThreads" : 1}) + ('',' --disable-mc')[args.no_mc_labels] workflow['stages'].append(tpcclus) tpcreconeeds.append(tpcclus['name']) @@ -1154,75 +1157,155 @@ def getDigiTaskName(det): tpc_corr_options_mc=' --corrmap-lumi-mode 2 ' tpcLocalCFreco['TPCCorrMap.lumiInst'] = str(CTPSCALER) - # TODO: Is this still used? - tpc_corr_scaling_options = anchorConfig.get('tpc-corr-scaling','') + # Setup the TPC correction scaling options for reco; They come from the anchoring setup + # Some useful comments from Ruben: + # - lumi-type == 0 means no-scaling of corrections with any measure of the lumi rather than no corrections at all. + # - The "no corrections" mode is imposed by the TPCCorrMap.lumiMean configurable being negative, in this case all other options in the corrections treatment are ignored. + # - But if the MC simulation was done with distortions, then the reco needs --lumy-type 1 (i.e. scale with the CTP lumi) even if the corresponding anchor run reco was using --lumy-type 2 + # (i.e. scaling according to the TPC IDC, which don't exist in the MC). + + anchor_lumi_type = dpl_option_from_config(anchorConfig, 'o2-tpcits-match-workflow', '--lumi-type', section = 'full', default_value = '') + if anchor_lumi_type != '': + anchor_lumi_type = '--lumi-type ' + anchor_lumi_type + anchor_corrmaplumi_mode = dpl_option_from_config(anchorConfig, 'o2-tpcits-match-workflow', '--corrmap-lumi-mode', section = 'full', default_value = '') + if anchor_corrmaplumi_mode != '': + anchor_corrmaplumi_mode = '--corrmap-lumi-mode ' + anchor_corrmaplumi_mode + + tpc_corr_scaling_options = anchor_lumi_type + ' ' + anchor_corrmaplumi_mode + + # why not simply? + # tpc_corr_scaling_options = ('--lumi-type 1', '')[tpcDistortionType != 0] + + #<--------- TPC reco task tpc_envfile = 'env_async.env' if environ.get('ALIEN_JDL_O2DPG_ASYNC_RECO_TAG') is not None else None TPCRECOtask=createTask(name='tpcreco_'+str(tf), needs=tpcreconeeds, tf=tf, cwd=timeframeworkdir, lab=["RECO"], relative_cpu=3/8, mem='16000') - TPCRECOtask['cmd'] = '${O2_ROOT}/bin/o2-tpc-reco-workflow ' + getDPL_global_options(bigshm=True) + ' --input-type clusters --output-type tracks,send-clusters-per-sector ' \ - + putConfigValuesNew(["GPU_global","TPCGasParam", "TPCCorrMap", "GPU_rec_tpc", "trackTuneParams"], {"GPU_proc.ompThreads":NWORKERS_TF} | tpcLocalCFreco) + ('',' --disable-mc')[args.no_mc_labels] \ - + tpc_corr_scaling_options + tpc_corr_options_mc \ - + option_if_available('o2-tpc-reco-workflow', '--tpc-mc-time-gain', envfile=tpc_envfile) - + TPCRECOtask['cmd'] = task_finalizer([ + '${O2_ROOT}/bin/o2-tpc-reco-workflow', + getDPL_global_options(bigshm=True), + '--input-type clusters', + '--output-type tracks,send-clusters-per-sector', + putConfigValues(["GPU_global", + "TPCGasParam", + "TPCCorrMap", + "GPU_rec_tpc", + "trackTuneParams"], + {"GPU_proc.ompThreads":NWORKERS_TF} | tpcLocalCFreco), + ('',' --disable-mc')[args.no_mc_labels], + tpc_corr_scaling_options, + tpc_corr_options_mc, + option_if_available('o2-tpc-reco-workflow', '--tpc-mc-time-gain', envfile=tpc_envfile)]) workflow['stages'].append(TPCRECOtask) - # END TPC reco - + #<--------- ITS reco task ITSMemEstimate = 12000 if havePbPb else 2000 # PbPb has much large mem requirement for now (in worst case) ITSRECOtask=createTask(name='itsreco_'+str(tf), needs=[getDigiTaskName("ITS")], - tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu='1', mem=str(ITSMemEstimate)) - ITSRECOtask['cmd'] = '${O2_ROOT}/bin/o2-its-reco-workflow --trackerCA --tracking-mode async ' + getDPL_global_options(bigshm=havePbPb) \ - + putConfigValuesNew(["ITSVertexerParam", "ITSAlpideParam", - "ITSClustererParam", "ITSCATrackerParam"], {"NameConf.mDirMatLUT" : ".."}) - ITSRECOtask['cmd'] += ('',' --disable-mc')[args.no_mc_labels] + tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu='1', mem=str(ITSMemEstimate)) + ITSRECOtask['cmd'] = task_finalizer([ + "${O2_ROOT}/bin/o2-its-reco-workflow", + getDPL_global_options(bigshm=havePbPb), + '--trackerCA', + '--tracking-mode async', + putConfigValues(["ITSVertexerParam", + "ITSAlpideParam", + "ITSClustererParam", + "ITSCATrackerParam"], + {"NameConf.mDirMatLUT" : ".."}), + ('',' --disable-mc')[args.no_mc_labels] + ]) workflow['stages'].append(ITSRECOtask) - FT0RECOtask=createTask(name='ft0reco_'+str(tf), needs=[getDigiTaskName("FT0")], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1000') - # note: when calibrations (or CCDB objects) are reenabled, we need to say ccdbbackend=True - FT0RECOtask['cmd'] = '${O2_ROOT}/bin/o2-ft0-reco-workflow --disable-time-offset-calib --disable-slewing-calib ' + getDPL_global_options(ccdbbackend=False) + putConfigValues() + #<--------- FT0 reco task + FT0RECOtask = createTask(name='ft0reco_'+str(tf), needs=[getDigiTaskName("FT0")], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1000') + FT0RECOtask["cmd"] = task_finalizer([ + '${O2_ROOT}/bin/o2-ft0-reco-workflow', + getDPL_global_options(ccdbbackend=False), # note: when calibrations (or CCDB objects) are reenabled, we need to say ccdbbackend=True + '--disable-time-offset-calib', # because effect not simulated in MC + '--disable-slewing-calib', # because effect not simulated in MC + putConfigValues() + ]) workflow['stages'].append(FT0RECOtask) + #<--------- ITS-TPC track matching task ITSTPCMATCHtask=createTask(name='itstpcMatch_'+str(tf), needs=[TPCRECOtask['name'], ITSRECOtask['name'], FT0RECOtask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='8000', relative_cpu=3/8) - ITSTPCMATCHtask['cmd'] = '${O2_ROOT}/bin/o2-tpcits-match-workflow ' + getDPL_global_options(bigshm=True) + ' --tpc-track-reader \"tpctracks.root\" --tpc-native-cluster-reader \"--infile tpc-native-clusters.root\" --use-ft0' \ - + putConfigValuesNew(['MFTClustererParam', 'ITSCATrackerParam', 'tpcitsMatch', 'TPCGasParam', 'TPCCorrMap', 'ITSClustererParam', 'GPU_rec_tpc', 'trackTuneParams', 'ft0tag'], {"NameConf.mDirMatLUT" : ".."} | tpcLocalCFreco) \ - + tpc_corr_scaling_options + tpc_corr_options_mc + ITSTPCMATCHtask["cmd"] = task_finalizer([ + '${O2_ROOT}/bin/o2-tpcits-match-workflow', + getDPL_global_options(bigshm=True), + ' --tpc-track-reader tpctracks.root', + '--tpc-native-cluster-reader \"--infile tpc-native-clusters.root\"', + '--use-ft0', + putConfigValues(['MFTClustererParam', + 'ITSCATrackerParam', + 'tpcitsMatch', + 'TPCGasParam', + 'TPCCorrMap', + 'ITSClustererParam', + 'GPU_rec_tpc', + 'trackTuneParams', + 'ft0tag'], + {"NameConf.mDirMatLUT" : ".."} | tpcLocalCFreco), + tpc_corr_scaling_options, + tpc_corr_options_mc + ]) workflow['stages'].append(ITSTPCMATCHtask) + #<--------- ITS-TPC track matching task TRDTRACKINGtask = createTask(name='trdreco_'+str(tf), needs=[TRDDigitask['name'], ITSTPCMATCHtask['name'], TPCRECOtask['name'], ITSRECOtask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu='1', mem='2000') - TRDTRACKINGtask['cmd'] = '${O2_ROOT}/bin/o2-trd-tracklet-transformer ' + getDPL_global_options() + putConfigValues() + ('',' --disable-mc')[args.no_mc_labels] + TRDTRACKINGtask['cmd'] = task_finalizer(['${O2_ROOT}/bin/o2-trd-tracklet-transformer', + getDPL_global_options(), + putConfigValues(), + ('',' --disable-mc')[args.no_mc_labels]]) workflow['stages'].append(TRDTRACKINGtask) + #<--------- TRD global tracking # FIXME This is so far a workaround to avoud a race condition for trdcalibratedtracklets.root TRDTRACKINGtask2 = createTask(name='trdreco2_'+str(tf), needs=[TRDTRACKINGtask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu='1', mem='2000') - trd_track_sources = anchorConfig.get('o2-trd-global-tracking-options', {}).get('track-sources', 'TPC,ITS-TPC') - TRDTRACKINGtask2['cmd'] = '${O2_ROOT}/bin/o2-trd-global-tracking ' + getDPL_global_options(bigshm=True) + ('',' --disable-mc')[args.no_mc_labels] \ - + putConfigValuesNew(['ITSClustererParam', - 'ITSCATrackerParam', - 'trackTuneParams', - 'GPU_rec_tpc', - 'TPCGasParam', - 'TPCCorrMap'], {"NameConf.mDirMatLUT" : ".."} | tpcLocalCFreco) \ - + " --track-sources " + trd_track_sources \ - + tpc_corr_scaling_options + tpc_corr_options_mc + trd_track_sources = dpl_option_from_config(anchorConfig, 'o2-trd-global-tracking', 'track-sources', default_value='TPC,ITS-TPC') + TRDTRACKINGtask2['cmd'] = task_finalizer([ + '${O2_ROOT}/bin/o2-trd-global-tracking', + getDPL_global_options(bigshm=True), + ('',' --disable-mc')[args.no_mc_labels], + putConfigValues(['ITSClustererParam', + 'ITSCATrackerParam', + 'trackTuneParams', + 'GPU_rec_tpc', + 'TPCGasParam', + 'TPCCorrMap'], {"NameConf.mDirMatLUT" : ".."} | tpcLocalCFreco), + '--track-sources ' + trd_track_sources, + tpc_corr_scaling_options, + tpc_corr_options_mc]) workflow['stages'].append(TRDTRACKINGtask2) + #<--------- TOF reco task TOFRECOtask = createTask(name='tofmatch_'+str(tf), needs=[ITSTPCMATCHtask['name'], getDigiTaskName("TOF")], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') - TOFRECOtask['cmd'] = '${O2_ROOT}/bin/o2-tof-reco-workflow --use-ccdb ' + getDPL_global_options() + putConfigValuesNew() + ('',' --disable-mc')[args.no_mc_labels] + TOFRECOtask["cmd"] = task_finalizer([ + '${O2_ROOT}/bin/o2-tof-reco-workflow', + getDPL_global_options(), + '--use-ccdb', + putConfigValues(), + ('',' --disable-mc')[args.no_mc_labels] + ]) workflow['stages'].append(TOFRECOtask) - + #<--------- TOF-TPC(-ITS) global track matcher workflow toftpcmatchneeds = [TOFRECOtask['name'], TPCRECOtask['name'], ITSTPCMATCHtask['name'], TRDTRACKINGtask2['name']] - toftracksrcdefault = anchorConfig.get('o2-tof-matcher-workflow-options', {}).get('track-sources', 'TPC,ITS-TPC,TPC-TRD,ITS-TPC-TRD') + toftracksrcdefault = dpl_option_from_config(anchorConfig, 'o2-tof-matcher-workflow', 'track-sources', default_value='TPC,ITS-TPC,TPC-TRD,ITS-TPC-TRD') TOFTPCMATCHERtask = createTask(name='toftpcmatch_'+str(tf), needs=toftpcmatchneeds, tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1000') - TOFTPCMATCHERtask['cmd'] = '${O2_ROOT}/bin/o2-tof-matcher-workflow ' + getDPL_global_options() \ - + putConfigValuesNew(["ITSClustererParam", - 'TPCGasParam', - 'TPCCorrMap', - 'ITSCATrackerParam', - 'MFTClustererParam', - 'GPU_rec_tpc', - 'trackTuneParams'], tpcLocalCFreco) \ - + " --track-sources " + toftracksrcdefault + (' --combine-devices','')[args.no_combine_dpl_devices] \ - + tpc_corr_scaling_options + tpc_corr_options_mc + tofmatcher_cmd_parts = [ + '${O2_ROOT}/bin/o2-tof-matcher-workflow', + getDPL_global_options(), + putConfigValues(['ITSClustererParam', + 'TPCGasParam', + 'TPCCorrMap', + 'ITSCATrackerParam', + 'MFTClustererParam', + 'GPU_rec_tpc', + 'trackTuneParams'], tpcLocalCFreco), + ' --track-sources ' + toftracksrcdefault, + (' --combine-devices','')[args.no_combine_dpl_devices], + tpc_corr_scaling_options, + tpc_corr_options_mc + ] + TOFTPCMATCHERtask['cmd'] = task_finalizer(tofmatcher_cmd_parts) workflow['stages'].append(TOFTPCMATCHERtask) # MFT reco: needing access to kinematics (when assessment enabled) @@ -1230,12 +1313,18 @@ def getDigiTaskName(det): if usebkgcache: mftreconeeds += [ BKG_KINEDOWNLOADER_TASK['name'] ] + #<--------- MFT reco workflow MFTRECOtask = createTask(name='mftreco_'+str(tf), needs=mftreconeeds, tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') MFTRECOtask['cmd'] = ('','ln -nfs ../bkg_Kine.root . ;')[doembedding] - MFTRECOtask['cmd'] += '${O2_ROOT}/bin/o2-mft-reco-workflow ' + getDPL_global_options() + putConfigValuesNew(['MFTTracking', 'MFTAlpideParam', 'ITSClustererParam','MFTClustererParam']) - MFTRECOtask['cmd'] += ('',' --disable-mc')[args.no_mc_labels] - if args.mft_assessment_full == True: - MFTRECOtask['cmd']+= ' --run-assessment ' + MFTRECOtask['cmd'] += task_finalizer([ + '${O2_ROOT}/bin/o2-mft-reco-workflow', + getDPL_global_options(), + putConfigValues(['MFTTracking', + 'MFTAlpideParam', + 'ITSClustererParam', + 'MFTClustererParam']), + ('','--disable-mc')[args.no_mc_labels], + ('','--run-assessment')[args.mft_assessment_full]]) workflow['stages'].append(MFTRECOtask) # MCH reco: needing access to kinematics ... so some extra logic needed here @@ -1243,95 +1332,263 @@ def getDigiTaskName(det): if usebkgcache: mchreconeeds += [ BKG_KINEDOWNLOADER_TASK['name'] ] + #<--------- MCH reco workflow MCHRECOtask = createTask(name='mchreco_'+str(tf), needs=mchreconeeds, tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') MCHRECOtask['cmd'] = ('','ln -nfs ../bkg_Kine.root . ;')[doembedding] - MCHRECOtask['cmd'] += '${O2_ROOT}/bin/o2-mch-reco-workflow ' + getDPL_global_options() + putConfigValues() - MCHRECOtask['cmd'] += ('',' --disable-mc')[args.no_mc_labels] - MCHRECOtask['cmd'] += ' --enable-clusters-root-output' + MCHRECOtask['cmd'] += task_finalizer( + ['${O2_ROOT}/bin/o2-mch-reco-workflow', + getDPL_global_options(), + putConfigValues(), + ('',' --disable-mc')[args.no_mc_labels], + '--enable-clusters-root-output']) workflow['stages'].append(MCHRECOtask) + #<--------- MID reco workflow MIDRECOtask = createTask(name='midreco_'+str(tf), needs=[getDigiTaskName("MID")], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') - MIDRECOtask['cmd'] = '${O2_ROOT}/bin/o2-mid-digits-reader-workflow ' + ('',' --disable-mc')[args.no_mc_labels] + ' | ${O2_ROOT}/bin/o2-mid-reco-workflow ' + getDPL_global_options() + putConfigValues() - MIDRECOtask['cmd'] += ('',' --disable-mc')[args.no_mc_labels] + MIDRECOtask['cmd'] = task_finalizer( + ['${O2_ROOT}/bin/o2-mid-digits-reader-workflow', + ('',' --disable-mc')[args.no_mc_labels]]) + MIDRECOtask['cmd'] += ' | ' + MIDRECOtask['cmd'] += task_finalizer(['${O2_ROOT}/bin/o2-mid-reco-workflow', + getDPL_global_options(), + putConfigValues(),('',' --disable-mc')[args.no_mc_labels]]) workflow['stages'].append(MIDRECOtask) + #<--------- FDD reco workflow FDDRECOtask = createTask(name='fddreco_'+str(tf), needs=[getDigiTaskName("FDD")], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') - FDDRECOtask['cmd'] = '${O2_ROOT}/bin/o2-fdd-reco-workflow ' + getDPL_global_options(ccdbbackend=False) + putConfigValues() - FDDRECOtask['cmd'] += ('',' --disable-mc')[args.no_mc_labels] + FDDRECOtask['cmd'] = task_finalizer(['${O2_ROOT}/bin/o2-fdd-reco-workflow', + getDPL_global_options(ccdbbackend=False), + putConfigValues(), + ('',' --disable-mc')[args.no_mc_labels]]) workflow['stages'].append(FDDRECOtask) + #<--------- FV0 reco workflow FV0RECOtask = createTask(name='fv0reco_'+str(tf), needs=[getDigiTaskName("FV0")], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') - FV0RECOtask['cmd'] = '${O2_ROOT}/bin/o2-fv0-reco-workflow ' + getDPL_global_options() + putConfigValues() - FV0RECOtask['cmd'] += ('',' --disable-mc')[args.no_mc_labels] + FV0RECOtask['cmd'] = task_finalizer(['${O2_ROOT}/bin/o2-fv0-reco-workflow', + getDPL_global_options(), + putConfigValues(), + ('',' --disable-mc')[args.no_mc_labels]]) workflow['stages'].append(FV0RECOtask) # calorimeters + #<--------- EMC reco workflow EMCRECOtask = createTask(name='emcalreco_'+str(tf), needs=[getDigiTaskName("EMC")], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') - EMCRECOtask['cmd'] = '${O2_ROOT}/bin/o2-emcal-reco-workflow --input-type digits --output-type cells --infile emcaldigits.root --disable-root-output --subspecificationOut 1 ' + putConfigValues() - EMCRECOtask['cmd'] += ('',' --disable-mc')[args.no_mc_labels] - EMCRECOtask['cmd'] += ' | ${O2_ROOT}/bin/o2-emcal-cell-recalibrator-workflow --input-subspec 1 --output-subspec 0 --no-timecalib --no-gaincalib ' + putConfigValues() - EMCRECOtask['cmd'] += (' --isMC','')[args.no_mc_labels] - EMCRECOtask['cmd'] += ' | ${O2_ROOT}/bin/o2-emcal-cell-writer-workflow --subspec 0 ' + getDPL_global_options() - EMCRECOtask['cmd'] += ('',' --disable-mc')[args.no_mc_labels] + # first part + EMCRECOtask['cmd'] = task_finalizer([ + '${O2_ROOT}/bin/o2-emcal-reco-workflow', + putConfigValues(), + '--input-type digits', + '--output-type cells', + '--infile emcaldigits.root', + '--disable-root-output', + '--subspecificationOut 1', + ('',' --disable-mc')[args.no_mc_labels]]) + # second part + EMCRECOtask['cmd'] += ' | ' + EMCRECOtask['cmd'] += task_finalizer([ + '${O2_ROOT}/bin/o2-emcal-cell-recalibrator-workflow', + putConfigValues(), + '--input-subspec 1', + '--output-subspec 0', + '--no-timecalib', + '--no-gaincalib', + (' --isMC','')[args.no_mc_labels]]) + # third part + EMCRECOtask['cmd'] += ' | ' + EMCRECOtask['cmd'] += task_finalizer([ + '${O2_ROOT}/bin/o2-emcal-cell-writer-workflow', + getDPL_global_options(), + '--subspec 0', + ('',' --disable-mc')[args.no_mc_labels]]) workflow['stages'].append(EMCRECOtask) + #<--------- PHS reco workflow PHSRECOtask = createTask(name='phsreco_'+str(tf), needs=[getDigiTaskName("PHS")], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') - PHSRECOtask['cmd'] = '${O2_ROOT}/bin/o2-phos-reco-workflow ' + getDPL_global_options() + putConfigValues() - PHSRECOtask['cmd'] += ('',' --disable-mc')[args.no_mc_labels] + PHSRECOtask['cmd'] = task_finalizer([ + '${O2_ROOT}/bin/o2-phos-reco-workflow', + getDPL_global_options(), + putConfigValues(), + ('',' --disable-mc')[args.no_mc_labels]]) workflow['stages'].append(PHSRECOtask) + #<--------- CPV reco workflow CPVRECOtask = createTask(name='cpvreco_'+str(tf), needs=[getDigiTaskName("CPV")], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') - CPVRECOtask['cmd'] = '${O2_ROOT}/bin/o2-cpv-reco-workflow ' + getDPL_global_options() + putConfigValues() - CPVRECOtask['cmd'] += ('',' --disable-mc')[args.no_mc_labels] + CPVRECOtask['cmd'] = task_finalizer( + ['${O2_ROOT}/bin/o2-cpv-reco-workflow', + getDPL_global_options(), + putConfigValues(), + ('',' --disable-mc')[args.no_mc_labels]]) workflow['stages'].append(CPVRECOtask) + #<--------- ZDC reco workflow ZDCRECOtask = createTask(name='zdcreco_'+str(tf), needs=[getDigiTaskName("ZDC")], tf=tf, cwd=timeframeworkdir, lab=["RECO", "ZDC"]) - ZDCRECOtask['cmd'] = '${O2_ROOT}/bin/o2-zdc-digits-reco ' + getDPL_global_options() + putConfigValues() - ZDCRECOtask['cmd'] += ('',' --disable-mc')[args.no_mc_labels] + ZDCRECOtask['cmd'] = task_finalizer( + ['${O2_ROOT}/bin/o2-zdc-digits-reco', + getDPL_global_options(), + putConfigValues(), + ('',' --disable-mc')[args.no_mc_labels]]) workflow['stages'].append(ZDCRECOtask) ## forward matching + #<--------- MCH-MID forward matching MCHMIDMATCHtask = createTask(name='mchmidMatch_'+str(tf), needs=[MCHRECOtask['name'], MIDRECOtask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') - MCHMIDMATCHtask['cmd'] = '${O2_ROOT}/bin/o2-muon-tracks-matcher-workflow ' + getDPL_global_options(ccdbbackend=False) - MCHMIDMATCHtask['cmd'] += ('',' --disable-mc')[args.no_mc_labels] + MCHMIDMATCHtask['cmd'] = task_finalizer( + ['${O2_ROOT}/bin/o2-muon-tracks-matcher-workflow', + getDPL_global_options(ccdbbackend=False), + putConfigValues(), + ('',' --disable-mc')[args.no_mc_labels]]) workflow['stages'].append(MCHMIDMATCHtask) + #<--------- MFT-MCH forward matching MFTMCHMATCHtask = createTask(name='mftmchMatch_'+str(tf), needs=[MCHMIDMATCHtask['name'], MFTRECOtask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') - MFTMCHMATCHtask['cmd'] = '${O2_ROOT}/bin/o2-globalfwd-matcher-workflow ' + putConfigValuesNew(['ITSAlpideConfig','MFTAlpideConfig','FwdMatching'],{"FwdMatching.useMIDMatch":"true"}) + MFTMCHMATCHtask['cmd'] = task_finalizer( + ['${O2_ROOT}/bin/o2-globalfwd-matcher-workflow', + putConfigValues(['ITSAlpideConfig', + 'MFTAlpideConfig', + 'FwdMatching'],{"FwdMatching.useMIDMatch":"true"}), + ('',' --disable-mc')[args.no_mc_labels]]) + if args.fwdmatching_assessment_full == True: - MFTMCHMATCHtask['cmd']+= ' | o2-globalfwd-assessment-workflow ' - MFTMCHMATCHtask['cmd']+= getDPL_global_options() + ('',' --disable-mc')[args.no_mc_labels] + MFTMCHMATCHtask['cmd'] += ' | ' + MFTMCHMATCHtask['cmd'] += task_finalizer( + ['${O2_ROOT}/bin/o2-globalfwd-assessment-workflow', + getDPL_global_options(), + ('',' --disable-mc')[args.no_mc_labels]]) workflow['stages'].append(MFTMCHMATCHtask) if args.fwdmatching_save_trainingdata == True: MFTMCHMATCHTraintask = createTask(name='mftmchMatchTrain_'+str(tf), needs=[MCHMIDMATCHtask['name'], MFTRECOtask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') - MFTMCHMATCHTraintask['cmd'] = '${O2_ROOT}/bin/o2-globalfwd-matcher-workflow ' + putConfigValuesNew(['ITSAlpideConfig','MFTAlpideConfig'],{"FwdMatching.useMIDMatch":"true"}) + MFTMCHMATCHTraintask['cmd'] = '${O2_ROOT}/bin/o2-globalfwd-matcher-workflow ' + putConfigValues(['ITSAlpideConfig','MFTAlpideConfig'],{"FwdMatching.useMIDMatch":"true"}) MFTMCHMATCHTraintask['cmd']+= getDPL_global_options() workflow['stages'].append(MFTMCHMATCHTraintask) # HMP tasks + #<--------- HMP forward matching HMPRECOtask = createTask(name='hmpreco_'+str(tf), needs=[getDigiTaskName('HMP')], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1000') - HMPRECOtask['cmd'] = '${O2_ROOT}/bin/o2-hmpid-digits-to-clusters-workflow ' + getDPL_global_options(ccdbbackend=False) + putConfigValuesNew() + HMPRECOtask['cmd'] = task_finalizer( + ['${O2_ROOT}/bin/o2-hmpid-digits-to-clusters-workflow', + getDPL_global_options(ccdbbackend=False), + putConfigValues()]) workflow['stages'].append(HMPRECOtask) + #<--------- HMP forward matching hmpmatchneeds = [HMPRECOtask['name'], ITSTPCMATCHtask['name'], TOFTPCMATCHERtask['name'], TRDTRACKINGtask2['name']] - hmp_match_sources = anchorConfig.get('o2-hmpid-matcher-workflow', {}).get('track-sources', 'ITS-TPC,ITS-TPC-TRD,TPC-TRD') + hmp_match_sources = dpl_option_from_config(anchorConfig, 'o2-hmpid-matcher-workflow', 'track-sources', default_value='ITS-TPC,ITS-TPC-TRD,TPC-TRD') HMPMATCHtask = createTask(name='hmpmatch_'+str(tf), needs=hmpmatchneeds, tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1000') - HMPMATCHtask['cmd'] = '${O2_ROOT}/bin/o2-hmpid-matcher-workflow --track-sources ' + hmp_match_sources + getDPL_global_options() + putConfigValuesNew() + HMPMATCHtask['cmd'] = task_finalizer( + ['${O2_ROOT}/bin/o2-hmpid-matcher-workflow', + '--track-sources ' + hmp_match_sources, + getDPL_global_options(), + putConfigValues() + ]) workflow['stages'].append(HMPMATCHtask) - # Take None as default, we only add more if nothing from anchorConfig - pvfinder_sources = anchorConfig.get('o2-primary-vertexing-workflow-options', {}).get('vertexing-sources', 'ITS-TPC,TPC-TRD,ITS-TPC-TRD,TPC-TOF,ITS-TPC-TOF,TPC-TRD-TOF,ITS-TPC-TRD-TOF,MFT-MCH,MCH-MID,ITS,MFT,TPC,TOF,FT0,MID,EMC,PHS,CPV,FDD,HMP,FV0,TRD,MCH,CTP') - pvfinder_matching_sources = anchorConfig.get('', {}).get('vertex-track-matching-sources', 'ITS-TPC,TPC-TRD,ITS-TPC-TRD,TPC-TOF,ITS-TPC-TOF,TPC-TRD-TOF,ITS-TPC-TRD-TOF,MFT-MCH,MCH-MID,ITS,MFT,TPC,TOF,FT0,MID,EMC,PHS,CPV,ZDC,FDD,HMP,FV0,TRD,MCH,CTP') - pvfinderneeds = [TRDTRACKINGtask2['name'], FT0RECOtask['name'], FV0RECOtask['name'], EMCRECOtask['name'], PHSRECOtask['name'], CPVRECOtask['name'], FDDRECOtask['name'], ZDCRECOtask['name'], HMPMATCHtask['name'], HMPMATCHtask['name'], ITSTPCMATCHtask['name'], TOFTPCMATCHERtask['name'], MFTMCHMATCHtask['name'], MCHMIDMATCHtask['name']] + #<---------- primary vertex finding + pvfinder_sources = dpl_option_from_config(anchorConfig, + 'o2-primary-vertexing-workflow', + 'vertexing-sources', + default_value='ITS-TPC,TPC-TRD,ITS-TPC-TRD,TPC-TOF,ITS-TPC-TOF,TPC-TRD-TOF,ITS-TPC-TRD-TOF,MFT-MCH,MCH-MID,ITS,MFT,TPC,TOF,FT0,MID,EMC,PHS,CPV,FDD,HMP,FV0,TRD,MCH,CTP') + pvfinder_matching_sources = dpl_option_from_config(anchorConfig, + 'o2-primary-vertexing-workflow', + 'vertex-track-matching-sources', + default_value='ITS-TPC,TPC-TRD,ITS-TPC-TRD,TPC-TOF,ITS-TPC-TOF,TPC-TRD-TOF,ITS-TPC-TRD-TOF,MFT-MCH,MCH-MID,ITS,MFT,TPC,TOF,FT0,MID,EMC,PHS,CPV,FDD,HMP,FV0,TRD,MCH,CTP') + pvfinderneeds = [TRDTRACKINGtask2['name'], + FT0RECOtask['name'], + FV0RECOtask['name'], + EMCRECOtask['name'], + PHSRECOtask['name'], + CPVRECOtask['name'], + FDDRECOtask['name'], + ZDCRECOtask['name'], + HMPMATCHtask['name'], + HMPMATCHtask['name'], + ITSTPCMATCHtask['name'], + TOFTPCMATCHERtask['name'], + MFTMCHMATCHtask['name'], + MCHMIDMATCHtask['name']] PVFINDERtask = createTask(name='pvfinder_'+str(tf), needs=pvfinderneeds, tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu=NWORKERS_TF, mem='4000') - PVFINDERtask['cmd'] = '${O2_ROOT}/bin/o2-primary-vertexing-workflow ' \ - + getDPL_global_options() + putConfigValuesNew(['ITSAlpideParam','MFTAlpideParam', 'pvertexer', 'TPCGasParam', 'TPCCorrMap', 'ft0tag'], {"NameConf.mDirMatLUT" : ".."}) - PVFINDERtask['cmd'] += ' --vertexing-sources ' + pvfinder_sources + ' --vertex-track-matching-sources ' + pvfinder_matching_sources + (' --combine-source-devices','')[args.no_combine_dpl_devices] - PVFINDERtask['cmd'] += ('',' --disable-mc')[args.no_mc_labels] + PVFINDERtask['cmd'] = task_finalizer( + [ '${O2_ROOT}/bin/o2-primary-vertexing-workflow', + getDPL_global_options(), + putConfigValues(['ITSAlpideParam', + 'MFTAlpideParam', + 'pvertexer', + 'TPCGasParam', + 'TPCCorrMap', + 'ft0tag'], + {"NameConf.mDirMatLUT" : ".."}), + '--vertexing-sources ' + pvfinder_sources, + '--vertex-track-matching-sources ' + pvfinder_matching_sources, + (' --combine-source-devices','')[args.no_combine_dpl_devices], + ('',' --disable-mc')[args.no_mc_labels] + ]) workflow['stages'].append(PVFINDERtask) + #<------------- secondary vertexer + svfinder_threads = ' --threads 1 ' + svfinder_cpu = 1 + if COLTYPE == "PbPb" or (doembedding and COLTYPEBKG == "PbPb"): + svfinder_threads = ' --threads 8 ' + svfinder_cpu = 8 + + svfinder_sources = dpl_option_from_config(anchorConfig, + 'o2-primary-vertexing-workflow', + 'vertex-track-matching-sources', + default_value='ITS-TPC,TPC-TRD,ITS-TPC-TRD,TPC-TOF,ITS-TPC-TOF,TPC-TRD-TOF,ITS-TPC-TRD-TOF,MFT-MCH,MCH-MID,ITS,MFT,TPC,TOF,FT0,MID,EMC,PHS,CPV,ZDC,FDD,HMP,FV0,TRD,MCH,CTP') + SVFINDERtask = createTask(name='svfinder_'+str(tf), needs=[PVFINDERtask['name'], FT0FV0EMCCTPDIGItask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu=svfinder_cpu, mem='5000') + SVFINDERtask['cmd'] = task_finalizer( + [ '${O2_ROOT}/bin/o2-secondary-vertexing-workflow', + getDPL_global_options(bigshm=True), + svfinder_threads, + putConfigValues(['svertexer', 'TPCCorrMap'], {"NameConf.mDirMatLUT" : ".."} | tpcLocalCFreco), + tpc_corr_scaling_options, + tpc_corr_options_mc, + '--vertexing-sources ' + svfinder_sources, + ('--combine-source-devices','')[args.no_combine_dpl_devices], + ('',' --disable-strangeness-tracker')[args.no_strangeness_tracking], + ('',' --disable-mc')[args.no_mc_labels and not args.no_strangeness_tracking] # strangeness tracking may require MC labels + ]) + workflow['stages'].append(SVFINDERtask) + + #<------------- AOD producer + # TODO This needs further refinement, sources and dependencies should be constructed dynamically + aod_info_souces_default = 'ITS-TPC,TPC-TRD,ITS-TPC-TRD,TPC-TOF,ITS-TPC-TOF,TPC-TRD-TOF,ITS-TPC-TRD-TOF,MFT-MCH,MCH-MID,ITS,MFT,TPC,TOF,FT0,MID,EMC,PHS,CPV,ZDC,FDD,HMP,FV0,TRD,MCH,CTP' + aodinfosources = dpl_option_from_config(anchorConfig, 'o2-aod-producer-workflow', 'info-sources', default_value=aod_info_souces_default) + aodneeds = [PVFINDERtask['name'], SVFINDERtask['name']] + + if usebkgcache: + aodneeds += [ BKG_KINEDOWNLOADER_TASK['name'] ] + + aod_df_id = '{0:03}'.format(tf) + + AODtask = createTask(name='aod_'+str(tf), needs=aodneeds, tf=tf, cwd=timeframeworkdir, lab=["AOD"], mem='4000', cpu='1') + AODtask['cmd'] = ('','ln -nfs ../bkg_Kine.root . ;')[doembedding] + AODtask['cmd'] += '[ -f AO2D.root ] && rm AO2D.root; ' + AODtask["cmd"] += task_finalizer([ + "${O2_ROOT}/bin/o2-aod-producer-workflow", + "--reco-mctracks-only 1", + "--aod-writer-keep dangling", + "--aod-writer-resfile AO2D", + '--aod-writer-resmode "UPDATE"', + f"--run-number {args.run}", + getDPL_global_options(bigshm=True), + f"--info-sources {aodinfosources}", + "--lpmp-prod-tag ${ALIEN_JDL_LPMPRODUCTIONTAG:-unknown}", + "--anchor-pass ${ALIEN_JDL_LPMANCHORPASSNAME:-unknown}", + "--anchor-prod ${ALIEN_JDL_LPMANCHORPASSNAME:-unknown}", + "--combine-source-devices" if not args.no_combine_dpl_devices else "", + "--disable-mc" if args.no_mc_labels else "", + "--enable-truncation 0" if environ.get("O2DPG_AOD_NOTRUNCATE") or environ.get("ALIEN_JDL_O2DPG_AOD_NOTRUNCATE") else "", + "--disable-strangeness-tracker" if args.no_strangeness_tracking else "", + f"--aod-timeframe-id ${ALIEN_PROC_ID}{aod_df_id}" if not args.run_anchored else "", + ]) + workflow['stages'].append(AODtask) + + # + # QC tasks follow + # + if includeFullQC or includeLocalQC: def addQCPerTF(taskName, needs, readerCommand, configFilePath, objectsFile=''): @@ -1554,65 +1811,9 @@ def remove_json_prefix(path): readerCommand='o2-global-track-cluster-reader --track-types "MFT,MCH,MID,MCH-MID,MFT-MCH,MFT-MCH-MID" --cluster-types "MCH,MID,MFT"', configFilePath='json://${O2DPG_ROOT}/MC/config/QC/json/mftmchmid-tracks-task.json') - #secondary vertexer - svfinder_threads = ' --threads 1 ' - svfinder_cpu = 1 - if COLTYPE == "PbPb" or (doembedding and COLTYPEBKG == "PbPb"): - svfinder_threads = ' --threads 8 ' - svfinder_cpu = 8 - SVFINDERtask = createTask(name='svfinder_'+str(tf), needs=[PVFINDERtask['name'], FT0FV0EMCCTPDIGItask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu=svfinder_cpu, mem='5000') - SVFINDERtask = createTask(name='svfinder_'+str(tf), needs=[PVFINDERtask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu=svfinder_cpu, mem='5000') - SVFINDERtask['cmd'] = '${O2_ROOT}/bin/o2-secondary-vertexing-workflow ' - SVFINDERtask['cmd'] += getDPL_global_options(bigshm=True) + svfinder_threads + putConfigValuesNew(['svertexer', 'TPCCorrMap'], {"NameConf.mDirMatLUT" : ".."} | tpcLocalCFreco) \ - + tpc_corr_scaling_options + tpc_corr_options_mc - # Take None as default, we only add more if nothing from anchorConfig - svfinder_sources = anchorConfig.get('o2-secondary-vertexing-workflow-options', {}). get('vertexing-sources', 'ITS-TPC,TPC-TRD,ITS-TPC-TRD,TPC-TOF,ITS-TPC-TOF,TPC-TRD-TOF,ITS-TPC-TRD-TOF,MFT-MCH,MCH-MID,ITS,MFT,TPC,TOF,FT0,MID,EMC,PHS,CPV,ZDC,FDD,HMP,FV0,TRD,MCH,CTP') - SVFINDERtask['cmd'] += ' --vertexing-sources ' + svfinder_sources + (' --combine-source-devices','')[args.no_combine_dpl_devices] - # strangeness tracking is now called from the secondary vertexer - if args.no_strangeness_tracking: - SVFINDERtask['cmd'] += ' --disable-strangeness-tracker' - # if enabled, it may require MC labels - else: - SVFINDERtask['cmd'] += ('',' --disable-mc')[args.no_mc_labels] - workflow['stages'].append(SVFINDERtask) - - # ----------- - # produce AOD - # ----------- - # TODO This needs further refinement, sources and dependencies should be constructed dynamically - aodinfosources = anchorConfig.get('o2-aod-producer-workflow-options', {}).get('info-sources', 'ITS-TPC,TPC-TRD,ITS-TPC-TRD,TPC-TOF,ITS-TPC-TOF,TPC-TRD-TOF,ITS-TPC-TRD-TOF,MFT-MCH,MCH-MID,ITS,MFT,TPC,TOF,FT0,MID,EMC,PHS,CPV,ZDC,FDD,HMP,FV0,TRD,MCH,CTP') - aodneeds = [PVFINDERtask['name'], SVFINDERtask['name']] - - if usebkgcache: - aodneeds += [ BKG_KINEDOWNLOADER_TASK['name'] ] - - aod_df_id = '{0:03}'.format(tf) - AODtask = createTask(name='aod_'+str(tf), needs=aodneeds, tf=tf, cwd=timeframeworkdir, lab=["AOD"], mem='4000', cpu='1') - AODtask['cmd'] = ('','ln -nfs ../bkg_Kine.root . ;')[doembedding] - AODtask['cmd'] += '[ -f AO2D.root ] && rm AO2D.root; ${O2_ROOT}/bin/o2-aod-producer-workflow --reco-mctracks-only 1 --aod-writer-keep dangling --aod-writer-resfile AO2D' - # next line needed for meta data writing (otherwise lost) - AODtask['cmd'] += ' --aod-writer-resmode "UPDATE"' - AODtask['cmd'] += ' --run-number ' + str(args.run) - # only in non-anchored runs - if args.run_anchored == False: - AODtask['cmd'] += ' --aod-timeframe-id ${ALIEN_PROC_ID}' + aod_df_id - AODtask['cmd'] += ' ' + getDPL_global_options(bigshm=True) - AODtask['cmd'] += ' --info-sources ' + aodinfosources - AODtask['cmd'] += ' --lpmp-prod-tag ${ALIEN_JDL_LPMPRODUCTIONTAG:-unknown}' - AODtask['cmd'] += ' --anchor-pass ${ALIEN_JDL_LPMANCHORPASSNAME:-unknown}' - AODtask['cmd'] += ' --anchor-prod ${ALIEN_JDL_LPMANCHORPASSNAME:-unknown}' - AODtask['cmd'] += (' --combine-source-devices ','')[args.no_combine_dpl_devices] - AODtask['cmd'] += ('',' --disable-mc')[args.no_mc_labels] - if environ.get('O2DPG_AOD_NOTRUNCATE') != None or environ.get('ALIEN_JDL_O2DPG_AOD_NOTRUNCATE') != None: - AODtask['cmd'] += ' --enable-truncation 0' # developer option to suppress precision truncation - - if args.no_strangeness_tracking: - AODtask['cmd'] += ' --disable-strangeness-tracker' - - workflow['stages'].append(AODtask) - - # TPC - time-series objects + + #<------------------ TPC - time-series objects # initial implementation taken from comments in https://its.cern.ch/jira/browse/O2-4612 # TODO: this needs to be made configurable (as a function of which detectors are actually present) tpctsneeds = [ TPCRECOtask['name'], @@ -1624,30 +1825,9 @@ def remove_json_prefix(path): TPCTStask['cmd'] = 'o2-global-track-cluster-reader --disable-mc --cluster-types "FT0,TOF,TPC" --track-types "ITS,TPC,ITS-TPC,ITS-TPC-TOF,ITS-TPC-TRD-TOF"' TPCTStask['cmd'] += ' --primary-vertices ' TPCTStask['cmd'] += ' | o2-tpc-time-series-workflow --enable-unbinned-root-output --sample-unbinned-tsallis --sampling-factor 0.01 ' - TPCTStask['cmd'] += putConfigValuesNew() + ' ' + getDPL_global_options(bigshm=True) + TPCTStask['cmd'] += putConfigValues() + ' ' + getDPL_global_options(bigshm=True) workflow['stages'].append(TPCTStask) - # AOD merging / combination step (as individual stages) --> for the moment deactivated in favor or more stable global merging - """ - aodmergerneeds = [ AODtask['name'] ] - if tf > 1: - # we can only merge this if the previous timeframe was already merged in order - # to keep time ordering of BCs intact - aodmergerneeds += [ 'aodmerge_' + str(tf-1) ] - - AOD_merge_task = createTask(name='aodmerge_'+str(tf), needs = aodmergerneeds, tf=tf, cwd=timeframeworkdir, lab=["AOD"], mem='2000', cpu='1') - AOD_merge_task['cmd'] = ' root -q -b -l ${O2DPG_ROOT}/UTILS/repairAOD.C\\(\\"AO2D.root\\",\\"AO2D_repaired.root\\"\\); ' - # AOD_merge_task['cmd'] += ' mv AO2D.root AO2D_old.root && mv AO2D_repaired.root AO2D.root ; ' - AOD_merge_task['cmd'] += '[ -f ../AO2D.root ] && mv ../AO2D.root ../AO2D_old.root;' - AOD_merge_task['cmd'] += ' [ -f input.txt ] && rm input.txt; ' - AOD_merge_task['cmd'] += ' [ -f ../AO2D_old.root ] && echo "../AO2D_old.root" > input.txt;' - AOD_merge_task['cmd'] += ' echo "./AO2D_repaired.root" >> input.txt;' - AOD_merge_task['cmd'] += ' o2-aod-merger --output ../AO2D.root;' - AOD_merge_task['cmd'] += ' rm ../AO2D_old.root || true' - AOD_merge_task['semaphore'] = 'aodmerge' #<---- this is making sure that only one merge is running at any time - workflow['stages'].append(AOD_merge_task) - """ - # cleanup # -------- # On the GRID it may be important to cleanup as soon as possible because disc space @@ -1695,4 +1875,7 @@ def remove_json_prefix(path): dump_workflow(workflow['stages'], args.o, meta=vars(args)) +# dump a config that can be used to reproduce this workflow +task_finalizer.dump_collected_config("final_config.json") + exit (0) diff --git a/MC/bin/o2dpg_sim_workflow_anchored.py b/MC/bin/o2dpg_sim_workflow_anchored.py index cd26047ad..13c6959bb 100755 --- a/MC/bin/o2dpg_sim_workflow_anchored.py +++ b/MC/bin/o2dpg_sim_workflow_anchored.py @@ -549,9 +549,12 @@ def main(): try: cmd_list = shlex.split(os.path.expandvars(cmd)) output = subprocess.check_output(cmd_list, text=True, stdin=subprocess.DEVNULL, timeout = 120) - except subprocess.CalledProcessError: + print (output) + except subprocess.CalledProcessError as e: + print(f"Command failed with return code {e.returncode}") + print("Output:") + print(e.output) return {}, {} - if __name__ == "__main__": sys.exit(main()) diff --git a/MC/run/ANCHOR/anchor-dpl-options-blacklist.json b/MC/run/ANCHOR/anchor-dpl-options-blacklist.json new file mode 100644 index 000000000..f224be858 --- /dev/null +++ b/MC/run/ANCHOR/anchor-dpl-options-blacklist.json @@ -0,0 +1,46 @@ +{ + "__comment__" : "Options (per dpl workflow) *not* to be transfered from async-reco to MC workflows.", + "global": [ + "--pipeline", + "--disable-mc", + "--loop" + ], + "o2-aod-producer-workflow": [ + "--reco-pass", + "--aod-writer-maxfilesize" + ], + "o2-its-reco-workflow": [ + "--digits-from-upstream" + ], + "o2-tof-reco-workflow": [ + "--input-type", + "--output-type", + "--local-cmp" + ], + "o2-tof-matcher-workflow": [ + "--use-fit", + "--tof-lanes", + "--output-type", + "--enable-dia" + ], + "o2-mft-reco-workflow": [ + "--digits-from-upstream", + "--nThreads" + ], + "o2-emcal-cell-recalibrator-workflow": [ + "--redirect-led" + ], + "o2-phos-reco-workflow": [ + "--input-type", + "--output-type" + ], + "o2-trd-global-tracking" : [ + "--enable-ph" + ], + "o2-trd-tracklet-transformer" : [ + "--disable-irframe-reader" + ], + "o2-tpcits-match-workflow" : [ + "--nthreads" + ] +} \ No newline at end of file diff --git a/MC/run/ANCHOR/anchorMC.sh b/MC/run/ANCHOR/anchorMC.sh index e4a6bf1d9..7bea0cf0e 100755 --- a/MC/run/ANCHOR/anchorMC.sh +++ b/MC/run/ANCHOR/anchorMC.sh @@ -242,23 +242,31 @@ RECO_RC=$? echo_info "async_pass.sh finished with ${RECO_RC}" if [[ "${RECO_RC}" != "0" ]] ; then - exit ${RECO_RC} + exit ${RECO_RC} fi +# check that workflowconfig.log was created correctly +if [[ ! -f workflowconfig.log ]]; then + echo "Workflowconfig.log file not found" + exit 1 +fi ALIEN_JDL_LPMPRODUCTIONTAG=$ALIEN_JDL_LPMPRODUCTIONTAG_KEEP echo_info "Setting back ALIEN_JDL_LPMPRODUCTIONTAG to $ALIEN_JDL_LPMPRODUCTIONTAG" -# now create the local MC config file --> config-config.json -${O2DPG_ROOT}/UTILS/parse-async-WorkflowConfig.py +# now create the local MC config file --> config-json.json +# we create the new config output with blacklist functionality +ASYNC_CONFIG_BLACKLIST=${ASYNC_CONFIG_BLACKLIST:-${O2DPG_ROOT}/MC/run/ANCHOR/anchor-dpl-options-blacklist.json} +${O2DPG_ROOT}/MC/bin/o2dpg_dpl_config_tools.py workflowconfig.log ${ASYNC_CONFIG_BLACKLIST} config-json.json ASYNC_WF_RC=${?} # check if config reasonably created -if [[ "${ASYNC_WF_RC}" != "0" || `grep "o2-ctf-reader-workflow-options" config-json.json 2> /dev/null | wc -l` == "0" ]]; then +if [[ "${ASYNC_WF_RC}" != "0" || `grep "ConfigParams" config-json.json 2> /dev/null | wc -l` == "0" ]]; then echo_error "Problem in anchor config creation. Exiting." exit 1 fi + # get rid of the temporary software environment if [ "${ALIEN_JDL_O2DPG_ASYNC_RECO_TAG}" ]; then module purge --no-pager @@ -312,7 +320,6 @@ fi TIMESTAMP=`grep "Determined timestamp to be" ${anchoringLogFile} | awk '//{print $6}'` echo_info "TIMESTAMP IS ${TIMESTAMP}" - # check if this job is exluded because it falls inside a bad data-taking period ISEXCLUDED=$(grep "TIMESTAMP IS EXCLUDED IN RUN" ${anchoringLogFile}) if [ "${ISEXCLUDED}" ]; then diff --git a/MC/run/ANCHOR/tests/test_anchor_2023_apass2_pp.sh b/MC/run/ANCHOR/tests/test_anchor_2023_apass2_pp.sh index 32304f5dc..6db8d3c48 100755 --- a/MC/run/ANCHOR/tests/test_anchor_2023_apass2_pp.sh +++ b/MC/run/ANCHOR/tests/test_anchor_2023_apass2_pp.sh @@ -30,7 +30,7 @@ export SEED=5 # for pp and 50 events per TF, we launch only 4 workers. export NWORKERS=2 -export ALIEN_JDL_ANCHOR_SIM_OPTIONS="-gen pythia8 -confKey \"GeometryManagerParam.useParallelWorld=1;GeometryManagerParam.usePwGeoBVH=1;GeometryManagerParam.usePwCaching=1\"" +export ALIEN_JDL_ANCHOR_SIM_OPTIONS="-gen pythia8 -confKey \"GeometryManagerParam.useParallelWorld=1;GeometryManagerParam.usePwGeoBVH=1;GeometryManagerParam.usePwCaching=1\" ${LOCAL_CONFIG:+--overwrite-config ${LOCAL_CONFIG}}" # run the central anchor steering script; this includes # * derive timestamp From 637c421f845c9f955685c3ef4bade5d0a0960270 Mon Sep 17 00:00:00 2001 From: swenzel Date: Sat, 12 Apr 2025 05:42:35 +0200 Subject: [PATCH 2/2] Disable outdated tpc_spacecharge_downloader task This task seems no longer needed. --- MC/bin/o2dpg_sim_workflow.py | 12 ++---------- MC/run/ANCHOR/anchorMC.sh | 1 - 2 files changed, 2 insertions(+), 11 deletions(-) diff --git a/MC/bin/o2dpg_sim_workflow.py b/MC/bin/o2dpg_sim_workflow.py index 0cdd324bc..b4d5718f0 100755 --- a/MC/bin/o2dpg_sim_workflow.py +++ b/MC/bin/o2dpg_sim_workflow.py @@ -677,14 +677,6 @@ def getDPL_global_options(bigshm=False, ccdbbackend=True): # We download some binary files, necessary for processing # Eventually, these files/objects should be queried directly from within these tasks? -# We download trivial TPC space charge corrections to be applied during -# reco. This is necessary to have consistency (decalibration and calibration) between digitization and reconstruction ... until digitization can -# also apply this effect via CCDB. -TPC_SPACECHARGE_DOWNLOADER_TASK = createTask(name='tpc_spacecharge_downloader', cpu='0') -TPC_SPACECHARGE_DOWNLOADER_TASK['cmd'] = '[ "${O2DPG_ENABLE_TPC_DISTORTIONS}" ] || { ${O2_ROOT}/bin/o2-ccdb-downloadccdbfile --host http://alice-ccdb.cern.ch -p TPC/Calib/CorrectionMapRef --timestamp 1 --created-not-after ' + str(args.condition_not_after) + ' -d ${ALICEO2_CCDB_LOCALCACHE} ; ' \ - '${O2_ROOT}/bin/o2-ccdb-downloadccdbfile --host http://alice-ccdb.cern.ch -p TPC/Calib/CorrectionMap --timestamp 1 --created-not-after ' + str(args.condition_not_after) + ' -d ${ALICEO2_CCDB_LOCALCACHE} ; }' -workflow['stages'].append(TPC_SPACECHARGE_DOWNLOADER_TASK) - # Fix (residual) geometry alignment for simulation stage # Detectors that prefer to apply special alignments (for example residual effects) should be listed here and download these files. # These object will take precedence over ordinary align objects **and** will only be applied in transport simulation @@ -972,7 +964,7 @@ def putConfigValues(listOfMainKeys=[], localCF = {}): CTPSCALER = args.ctp_scaler tpcDistortionType=args.tpc_distortion_type print(f"TPC distortion simulation: type = {tpcDistortionType}, CTP scaler value {CTPSCALER}"); - tpcdigineeds=[ContextTask['name'], LinkGRPFileTask['name'], TPC_SPACECHARGE_DOWNLOADER_TASK['name']] + tpcdigineeds=[ContextTask['name'], LinkGRPFileTask['name']] if usebkgcache: tpcdigineeds += [ BKG_HITDOWNLOADER_TASKS['TPC']['name'] ] @@ -1581,7 +1573,7 @@ def getDigiTaskName(det): "--disable-mc" if args.no_mc_labels else "", "--enable-truncation 0" if environ.get("O2DPG_AOD_NOTRUNCATE") or environ.get("ALIEN_JDL_O2DPG_AOD_NOTRUNCATE") else "", "--disable-strangeness-tracker" if args.no_strangeness_tracking else "", - f"--aod-timeframe-id ${ALIEN_PROC_ID}{aod_df_id}" if not args.run_anchored else "", + f"--aod-timeframe-id ${{ALIEN_PROC_ID}}{aod_df_id}" if not args.run_anchored else "", ]) workflow['stages'].append(AODtask) diff --git a/MC/run/ANCHOR/anchorMC.sh b/MC/run/ANCHOR/anchorMC.sh index 7bea0cf0e..71ed0afe3 100755 --- a/MC/run/ANCHOR/anchorMC.sh +++ b/MC/run/ANCHOR/anchorMC.sh @@ -126,7 +126,6 @@ echo_info "Substituting ALIEN_JDL_LPMPRODUCTIONTAG=$ALIEN_JDL_LPMPRODUCTIONTAG w ALIEN_JDL_LPMPRODUCTIONTAG=$ALIEN_JDL_LPMANCHORPRODUCTION if [[ $ALIEN_JDL_ANCHOR_SIM_OPTIONS == *"--tpc-distortion-type 2"* ]]; then - export O2DPG_ENABLE_TPC_DISTORTIONS=ON # set the SCALING SOURCE to CTP for MC unless explicitely given from outside export ALIEN_JDL_TPCSCALINGSOURCE=${ALIEN_JDL_TPCSCALINGSOURCE:-"CTP"} fi