From 60de896107b7463cf142ecbfc265c88cd7efc86b Mon Sep 17 00:00:00 2001 From: swenzel Date: Fri, 4 Jul 2025 08:59:26 +0200 Subject: [PATCH 1/2] Publish MCProdInfo This commit provides code that publishes MonteCarlo MetaData describing a production. The information is stored into CCDB, into folders specific to the executing user as well as the LPM production tag. We now also write the username into AO2D.root, so that clients can retrieve the username next to the LPM production tag. By default upload of MCProdInfo is attempted for the first few JobIDs within a production. This is decided based on the split-id. --- MC/bin/o2dpg_sim_workflow.py | 16 +++ MC/bin/o2dpg_sim_workflow_anchored.py | 39 ++++++- MC/prodinfo/README.md | 9 ++ MC/prodinfo/mcprodinfo_ccdb_upload.py | 144 ++++++++++++++++++++++++++ MC/run/ANCHOR/anchorMC.sh | 15 ++- 5 files changed, 220 insertions(+), 3 deletions(-) create mode 100644 MC/prodinfo/README.md create mode 100644 MC/prodinfo/mcprodinfo_ccdb_upload.py diff --git a/MC/bin/o2dpg_sim_workflow.py b/MC/bin/o2dpg_sim_workflow.py index c04b28fac..cb840b86a 100755 --- a/MC/bin/o2dpg_sim_workflow.py +++ b/MC/bin/o2dpg_sim_workflow.py @@ -45,6 +45,9 @@ from o2dpg_sim_config import create_sim_config, create_geant_config, constructConfigKeyArg, option_if_available, overwrite_config from o2dpg_dpl_config_tools import parse_command_string, modify_dpl_command, dpl_option_from_config, TaskFinalizer +# for some JAliEn interaction +from alienpy.alien import JAlien + parser = argparse.ArgumentParser(description='Create an ALICE (Run3) MC simulation workflow') # the run-number of data taking or default if unanchored @@ -1581,6 +1584,18 @@ def getDigiTaskName(det): aod_df_id = '{0:03}'.format(tf) + import os + aod_creator = os.getenv("JALIEN_USER") + if aod_creator == None: + # we use JAliEn to determine the user and capture it's output into a variable via redirect_stdout + import io + from contextlib import redirect_stdout + f = io.StringIO() + with 
redirect_stdout(f): + if JAlien(['whoami']) == 0: + aod_creator = f.getvalue().strip() + print (f"Determined GRID username {aod_creator}") + AODtask = createTask(name='aod_'+str(tf), needs=aodneeds, tf=tf, cwd=timeframeworkdir, lab=["AOD"], mem='4000', cpu='1') AODtask['cmd'] = ('','ln -nfs ../bkg_Kine.root . ;')[doembedding] AODtask['cmd'] += '[ -f AO2D.root ] && rm AO2D.root; ' @@ -1596,6 +1611,7 @@ def getDigiTaskName(det): "--lpmp-prod-tag ${ALIEN_JDL_LPMPRODUCTIONTAG:-unknown}", "--anchor-pass ${ALIEN_JDL_LPMANCHORPASSNAME:-unknown}", "--anchor-prod ${ALIEN_JDL_LPMANCHORPRODUCTION:-unknown}", + f"--created-by {aod_creator}", "--combine-source-devices" if not args.no_combine_dpl_devices else "", "--disable-mc" if args.no_mc_labels else "", "--enable-truncation 0" if environ.get("O2DPG_AOD_NOTRUNCATE") or environ.get("ALIEN_JDL_O2DPG_AOD_NOTRUNCATE") else "", diff --git a/MC/bin/o2dpg_sim_workflow_anchored.py b/MC/bin/o2dpg_sim_workflow_anchored.py index 2fa3f73cf..5a5410f6c 100755 --- a/MC/bin/o2dpg_sim_workflow_anchored.py +++ b/MC/bin/o2dpg_sim_workflow_anchored.py @@ -17,6 +17,15 @@ import subprocess import shlex +# hack to find the script for meta upload +o2dpg_root = os.environ.get("O2DPG_ROOT") +if o2dpg_root is None: + raise EnvironmentError("O2DPG_ROOT is not set in the environment.") +mc_prodinfo_path = os.path.abspath(os.path.join(o2dpg_root, "MC", "prodinfo")) +sys.path.append(mc_prodinfo_path) +from mcprodinfo_ccdb_upload import MCProdInfo, upload_mcprodinfo_meta, query_mcprodinfo +import dataclasses + # Creates a time anchored MC workflow; positioned within a given run-number (as function of production size etc) # Example: @@ -417,6 +426,16 @@ def parse_file(filename): print(f"This run as globally {total_excluded_fraction} of it's data marked to be exluded") return excluded +def publish_MCProdInfo(mc_prod_info, ccdb_url = "https://alice-ccdb.cern.ch", username = "aliprod", include_meta_into_aod=False): + print("Publishing MCProdInfo") + + # see if 
this already has meta-data uploaded, otherwise do nothing + mc_prod_info_q = query_mcprodinfo(ccdb_url, username, mc_prod_info.RunNumber, mc_prod_info.LPMProductionTag) + if mc_prod_info_q == None: + # could make this depend on hash values in future + upload_mcprodinfo_meta(ccdb_url, username, mc_prod_info.RunNumber, mc_prod_info.LPMProductionTag, dataclasses.asdict(mc_prod_info)) + + def main(): parser = argparse.ArgumentParser(description='Creates an O2DPG simulation workflow, anchored to a given LHC run. The workflows are time anchored at regular positions within a run as a function of production size, split-id and cycle.') @@ -431,6 +450,7 @@ def main(): parser.add_argument("--run-time-span-file", type=str, dest="run_span_file", help="Run-time-span-file for exclusions of timestamps (bad data periods etc.)", default="") parser.add_argument("--invert-irframe-selection", action='store_true', help="Inverts the logic of --run-time-span-file") parser.add_argument("--orbitsPerTF", type=str, help="Force a certain orbits-per-timeframe number; Automatically taken from CCDB if not given.", default="") + parser.add_argument('--publish-mcprodinfo', action='store_true', default=False, help="Publish MCProdInfo metadata to CCDB") parser.add_argument('forward', nargs=argparse.REMAINDER) # forward args passed to actual workflow creation args = parser.parse_args() print (args) @@ -547,11 +567,28 @@ def main(): else: print ("Creating time-anchored workflow...") print ("Executing: " + cmd) - # os.system(cmd) try: cmd_list = shlex.split(os.path.expandvars(cmd)) output = subprocess.check_output(cmd_list, text=True, stdin=subprocess.DEVNULL, timeout = 120) print (output) + + # when we get here, we can publish info about the production (optionally) + if args.publish_mcprodinfo == True or os.getenv("PUBLISH_MCPRODINFO") != None: + prod_tag = os.getenv("ALIEN_JDL_LPMPRODUCTIONTAG") + grid_user_name = os.getenv("JALIEN_USER") + mcprod_ccdb_server = 
os.getenv("PUBLISH_MCPRODINFO_CCDBSERVER") + if mcprod_ccdb_server == None: + mcprod_ccdb_server = "https://alice-ccdb.cern.ch" + if prod_tag != None and grid_user_name != None: + info = MCProdInfo(LPMProductionTag = prod_tag, + Col = ColSystem, + IntRate =rate, + RunNumber = args.run_number, + OrbitsPerTF = GLOparams["OrbitsPerTF"]) + publish_MCProdInfo(info, username = grid_user_name, ccdb_url = mcprod_ccdb_server) + else: + print("No production tag or GRID user name known. Not publishing MCProdInfo") + except subprocess.CalledProcessError as e: print(f"Command failed with return code {e.returncode}") print("Output:") diff --git a/MC/prodinfo/README.md b/MC/prodinfo/README.md new file mode 100644 index 000000000..af99241b5 --- /dev/null +++ b/MC/prodinfo/README.md @@ -0,0 +1,9 @@ +This directory contains scripts and function to collect, define and upload +CCDB meta data objects for (official) MC productions. + +This meta data can be queried in other stages, such as analysis, for the purpose of further data processing. 
+ +TODO: + +- include cycle number in data +- include software versions (2tag or not) \ No newline at end of file diff --git a/MC/prodinfo/mcprodinfo_ccdb_upload.py b/MC/prodinfo/mcprodinfo_ccdb_upload.py new file mode 100644 index 000000000..5b798cfa9 --- /dev/null +++ b/MC/prodinfo/mcprodinfo_ccdb_upload.py @@ -0,0 +1,144 @@ +import json +import os +import requests +import subprocess + +import dataclasses # to define the MCProdInfo data layout and convert it to dict +from dataclasses import dataclass, field, asdict, fields +from typing import Optional +import hashlib + +@dataclass(frozen=True) +class MCProdInfo: + """ + struct for MonteCarlo production info; Hash is filled with a SHA-256 over all other fields unless given explicitly + """ + LPMProductionTag: str + Col: int + IntRate: float # only indicative of some interaction rate (could vary within the run) + RunNumber: int + OrbitsPerTF: int + # max_events_per_tf: Optional[int] = -1 + Comment: Optional[str] = None + Hash: Optional[str] = field(default=None) + + def __post_init__(self): + if self.Hash is None: + # Hash every field except 'Hash' itself (field names are case-sensitive) + data_to_hash = { + k: v for k, v in asdict(self).items() + if k != 'Hash' + } + hash_str = hashlib.sha256( + json.dumps(data_to_hash, sort_keys=True).encode() + ).hexdigest() + object.__setattr__(self, 'Hash', hash_str) # instance is frozen, so bypass the dataclass __setattr__ + + +import re + +def extract_metadata_blocks_from_CCDB(text: str): + blocks = [] + # Split on 'Metadata:\n' and iterate over each block + sections = text.split('Metadata:\n') + for section in sections[1:]: # skip the first chunk (before any Metadata:) + metadata = {} + for line in section.splitlines(): + if not line.strip(): # stop at first blank line + break + match = re.match(r'\s*(\w+)\s*=\s*(.+)', line) + if match: + key, val = match.groups() + # Type conversion + if val == "None": + val = None + elif val.isdigit() or (val.startswith('-') and val[1:].isdigit()): + val = int(val) + else: + try: + val = float(val) + except ValueError: + val = val.strip() + metadata[key] = val + if metadata: + blocks.append(metadata) + return
blocks + + + +def query_mcprodinfo(base_url, user, run_number, lpm_prod_tag, cert_dir="/tmp"): + """ + Queries MCProdInfo from CCDB. Returns an MCProdInfo built from the first metadata block found, or None + """ + # resolve token cert/key paths: environment variables first, else per-uid files in cert_dir + key_path = os.environ.get("JALIEN_TOKEN_KEY") + cert_path = os.environ.get("JALIEN_TOKEN_CERT") + if key_path is None and cert_path is None: + uid = os.getuid() + cert_path = os.path.join(cert_dir, f"tokencert_{uid}.pem") + key_path = os.path.join(cert_dir, f"tokenkey_{uid}.pem") + + # Build full URL + user_path = 'Users/' + user[0] + '/' + user + start = run_number + stop = run_number + 1 + url = f"{base_url}/browse/{user_path}/MCProdInfo/{lpm_prod_tag}/{start}/{stop}" + + response = requests.get(url, cert=(cert_path, key_path), verify=False) + if response.status_code != 404: + meta = extract_metadata_blocks_from_CCDB(response.content.decode('utf-8')) + if (len(meta) > 0): + def filter_known_fields(cls, data: dict) -> dict: + valid_keys = {f.name for f in fields(cls)} + return {k: v for k, v in data.items() if k in valid_keys} + + clean_meta = filter_known_fields(MCProdInfo, meta[0]) + return MCProdInfo(**clean_meta) + + return None + + +def upload_mcprodinfo_meta(base_url, user, run_number, lpm_prod_tag, keys, cert_dir="/tmp"): + """ + Uploads a placeholder .dat file (content "0") using client certificates; keys are appended to the URL as key=value path segments. + + Parameters: + - base_url (str): The base HTTPS URL, e.g., "https://URL" + - user (str): The uploader --> Determines location "Users/f/foo_bar/MCProdInfo/..."
+ - keys (dict): Dictionary with meta information to upload, e.g., {"key1": "var1", "key2": "var2"} + - cert_dir (str): Directory where the .pem files are located (default: /tmp) + + Returns: + - Response object from the POST request + """ + # Create the placeholder payload file (content "0") in the current working directory + empty_file = "empty.dat" + with open(empty_file, "w") as f: + f.write("0") + + # Construct user ID-specific cert and key paths + key_path = os.environ.get("JALIEN_TOKEN_KEY") + cert_path = os.environ.get("JALIEN_TOKEN_CERT") + if key_path is None and cert_path is None: + uid = os.getuid() + cert_path = os.path.join(cert_dir, f"tokencert_{uid}.pem") + key_path = os.path.join(cert_dir, f"tokenkey_{uid}.pem") + + # Build full URL + query = "/".join(f"{k}={v}" for k, v in keys.items()) + user_path = 'Users/' + user[0] + '/' + user + start = run_number + stop = run_number + 1 + url = f"{base_url}/{user_path}/MCProdInfo/{lpm_prod_tag}/{start}/{stop}/{query}" + + print (f"Full {url}") + + # Prepare request + with open(empty_file, 'rb') as f: + files = {'blob': f} + response = requests.post(url, files=files, cert=(cert_path, key_path), verify=False) + + # Optional: remove the temporary file + os.remove(empty_file) + + return response diff --git a/MC/run/ANCHOR/anchorMC.sh b/MC/run/ANCHOR/anchorMC.sh index ebdd1277a..d58d52db5 100755 --- a/MC/run/ANCHOR/anchorMC.sh +++ b/MC/run/ANCHOR/anchorMC.sh @@ -162,7 +162,7 @@ SEED=${ALIEN_PROC_ID:-${SEED:-1}} ONCVMFS=0 if [ "${ALIEN_JDL_O2DPG_OVERWRITE}" ]; then - echo "Setting O2DPG_ROOT to overwritten path" + echo "Setting O2DPG_ROOT to overwritten path ${ALIEN_JDL_O2DPG_OVERWRITE}" export O2DPG_ROOT=${ALIEN_JDL_O2DPG_OVERWRITE} fi @@ -287,10 +287,21 @@ MODULES="--skipModules ZDC" # Since this is used, set it explicitly ALICEO2_CCDB_LOCALCACHE=${ALICEO2_CCDB_LOCALCACHE:-$(pwd)/ccdb} +# publish MCPRODINFO for first few jobs of a production +# if external script exported PUBLISH_MCPRODINFO, it will be published anyways +if [ -z "$PUBLISH_MCPRODINFO" ] && [ "$SPLITID" -lt
20 ]; then + PUBLISH_MCPRODINFO_OPTION="--publish-mcprodinfo" + echo "Will publish MCProdInfo" + export AOD_ADDITIONAL_METADATA_FILE="mc-prod-meta-file.json" + +else + echo "Will not publish MCProdInfo" +fi + # these arguments will be digested by o2dpg_sim_workflow_anchored.py baseargs="-tf ${NTIMEFRAMES} --split-id ${SPLITID} --prod-split ${PRODSPLIT} --cycle ${CYCLE} --run-number ${ALIEN_JDL_LPMRUNNUMBER} \ ${ALIEN_JDL_RUN_TIME_SPAN_FILE:+--run-time-span-file ${ALIEN_JDL_RUN_TIME_SPAN_FILE} ${ALIEN_JDL_INVERT_IRFRAME_SELECTION:+--invert-irframe-selection}} \ - ${ALIEN_JDL_MC_ORBITS_PER_TF:+--orbitsPerTF ${ALIEN_JDL_MC_ORBITS_PER_TF}}" + ${ALIEN_JDL_MC_ORBITS_PER_TF:+--orbitsPerTF ${ALIEN_JDL_MC_ORBITS_PER_TF}} ${PUBLISH_MCPRODINFO_OPTION}" # these arguments will be passed as well but only eventually be digested by o2dpg_sim_workflow.py which is called from o2dpg_sim_workflow_anchored.py remainingargs="-seed ${SEED} -ns ${NSIGEVENTS} --include-local-qc --pregenCollContext" From 173c8dfec7eada4f8b7cb38dcc7a858d09d78203 Mon Sep 17 00:00:00 2001 From: Sandro Wenzel Date: Wed, 9 Jul 2025 15:17:51 +0200 Subject: [PATCH 2/2] Update anchorMC.sh --- MC/run/ANCHOR/anchorMC.sh | 2 -- 1 file changed, 2 deletions(-) diff --git a/MC/run/ANCHOR/anchorMC.sh b/MC/run/ANCHOR/anchorMC.sh index d58d52db5..fd0013799 100755 --- a/MC/run/ANCHOR/anchorMC.sh +++ b/MC/run/ANCHOR/anchorMC.sh @@ -292,8 +292,6 @@ ALICEO2_CCDB_LOCALCACHE=${ALICEO2_CCDB_LOCALCACHE:-$(pwd)/ccdb} if [ -z "$PUBLISH_MCPRODINFO" ] && [ "$SPLITID" -lt 20 ]; then PUBLISH_MCPRODINFO_OPTION="--publish-mcprodinfo" echo "Will publish MCProdInfo" - export AOD_ADDITIONAL_METADATA_FILE="mc-prod-meta-file.json" - else echo "Will not publish MCProdInfo" fi