diff --git a/MC/bin/o2dpg_sim_workflow.py b/MC/bin/o2dpg_sim_workflow.py index c04b28fac..cb840b86a 100755 --- a/MC/bin/o2dpg_sim_workflow.py +++ b/MC/bin/o2dpg_sim_workflow.py @@ -45,6 +45,9 @@ from o2dpg_sim_config import create_sim_config, create_geant_config, constructConfigKeyArg, option_if_available, overwrite_config from o2dpg_dpl_config_tools import parse_command_string, modify_dpl_command, dpl_option_from_config, TaskFinalizer +# for some JAliEn interaction +from alienpy.alien import JAlien + parser = argparse.ArgumentParser(description='Create an ALICE (Run3) MC simulation workflow') # the run-number of data taking or default if unanchored @@ -1581,6 +1584,18 @@ def getDigiTaskName(det): aod_df_id = '{0:03}'.format(tf) + import os + aod_creator = os.getenv("JALIEN_USER") + if aod_creator == None: + # we use JAliEn to determine the user and capture it's output into a variable via redirect_stdout + import io + from contextlib import redirect_stdout + f = io.StringIO() + with redirect_stdout(f): + if JAlien(['whoami']) == 0: + aod_creator = f.getvalue().strip() + print (f"Determined GRID username {aod_creator}") + AODtask = createTask(name='aod_'+str(tf), needs=aodneeds, tf=tf, cwd=timeframeworkdir, lab=["AOD"], mem='4000', cpu='1') AODtask['cmd'] = ('','ln -nfs ../bkg_Kine.root . 
def publish_MCProdInfo(mc_prod_info, ccdb_url = "https://alice-ccdb.cern.ch", username = "aliprod", include_meta_into_aod=False):
    """
    Publish an MCProdInfo record to CCDB, unless an entry for the same
    run number / production tag is already present there.

    Parameters:
    - mc_prod_info: MCProdInfo dataclass instance to publish
    - ccdb_url (str): base URL of the CCDB server
    - username (str): GRID user name; determines the upload path on CCDB
    - include_meta_into_aod (bool): placeholder flag, currently unused
    """
    print("Publishing MCProdInfo")

    # see if this already has meta-data uploaded, otherwise do nothing
    # (avoid duplicate objects for the same run / production tag)
    existing = query_mcprodinfo(ccdb_url, username, mc_prod_info.RunNumber, mc_prod_info.LPMProductionTag)
    if existing is None:
        # could make this depend on hash values in future
        upload_mcprodinfo_meta(ccdb_url, username,
                               mc_prod_info.RunNumber,
                               mc_prod_info.LPMProductionTag,
                               dataclasses.asdict(mc_prod_info))
"""
Helpers to define, query and upload MCProdInfo meta data objects
(describing official MC productions) on a CCDB server.
"""

import json
import os
import re
import hashlib
import tempfile

import dataclasses  # to define the MCProdInfo data layout and convert it to dict
from dataclasses import dataclass, field, asdict, fields
from typing import Optional


@dataclass(frozen=True)
class MCProdInfo:
    """
    Struct for MonteCarlo production info.
    """
    LPMProductionTag: str
    Col: int
    IntRate: float  # only indicative of some interaction rate (could vary within the run)
    RunNumber: int
    OrbitsPerTF: int
    # max_events_per_tf: Optional[int] = -1
    Comment: Optional[str] = None
    Hash: Optional[str] = field(default=None)

    def __post_init__(self):
        # Compute a content hash once, unless one was supplied explicitly
        # (e.g. when reconstructing an object from CCDB meta data).
        if self.Hash is None:
            # Hash only the meaningful fields. NOTE: the field is named
            # 'Hash' (capital H); the earlier version excluded lowercase
            # 'hash' and assigned to a non-field 'hash' attribute, which
            # left self.Hash permanently None.
            data_to_hash = {
                k: v for k, v in asdict(self).items()
                if k != 'Hash'
            }
            hash_str = hashlib.sha256(
                json.dumps(data_to_hash, sort_keys=True).encode()
            ).hexdigest()
            # frozen dataclass: must bypass the normal __setattr__
            object.__setattr__(self, 'Hash', hash_str)


def _resolve_token_paths(cert_dir):
    """
    Return (cert_path, key_path) for JAliEn token authentication.

    Uses the JALIEN_TOKEN_CERT / JALIEN_TOKEN_KEY environment variables when
    set; any missing path falls back to the conventional per-uid token file
    (tokencert_<uid>.pem / tokenkey_<uid>.pem) inside cert_dir.
    """
    key_path = os.environ.get("JALIEN_TOKEN_KEY")
    cert_path = os.environ.get("JALIEN_TOKEN_CERT")
    # fall back per-variable (previously both env vars had to be unset,
    # so setting only one of them produced a (path, None) cert tuple)
    if cert_path is None or key_path is None:
        uid = os.getuid()
        if cert_path is None:
            cert_path = os.path.join(cert_dir, f"tokencert_{uid}.pem")
        if key_path is None:
            key_path = os.path.join(cert_dir, f"tokenkey_{uid}.pem")
    return cert_path, key_path


def extract_metadata_blocks_from_CCDB(text: str):
    """
    Parse the plain-text CCDB 'browse' output and return a list of metadata
    dicts, one per 'Metadata:' block. Values are converted to None/int/float
    where possible, otherwise kept as stripped strings.
    """
    blocks = []
    # Split on 'Metadata:\n' and iterate over each block
    sections = text.split('Metadata:\n')
    for section in sections[1:]:  # skip the first chunk (before any Metadata:)
        metadata = {}
        for line in section.splitlines():
            if not line.strip():  # stop at first blank line
                break
            match = re.match(r'\s*(\w+)\s*=\s*(.+)', line)
            if match:
                key, val = match.groups()
                # Type conversion
                if val == "None":
                    val = None
                elif val.isdigit() or (val.startswith('-') and val[1:].isdigit()):
                    val = int(val)
                else:
                    try:
                        val = float(val)
                    except ValueError:
                        val = val.strip()
                metadata[key] = val
        if metadata:
            blocks.append(metadata)
    return blocks


def query_mcprodinfo(base_url, user, run_number, lpm_prod_tag, cert_dir="/tmp"):
    """
    Queries MCProdInfo from CCDB. Returns an MCProdInfo object or None.
    """
    import requests  # deferred so the parsing helpers work without requests installed

    cert_path, key_path = _resolve_token_paths(cert_dir)

    # Build full URL; objects live under the uploading user's personal path
    user_path = 'Users/' + user[0] + '/' + user
    start = run_number
    stop = run_number + 1
    url = f"{base_url}/browse/{user_path}/MCProdInfo/{lpm_prod_tag}/{start}/{stop}"

    # NOTE(review): verify=False presumably because the CCDB server cert is
    # not in the default CA bundle — confirm; the payload is meta data only.
    response = requests.get(url, cert=(cert_path, key_path), verify=False)
    # only parse a successful response (previously any non-404 status,
    # including server errors, was fed into the metadata parser)
    if response.status_code == 200:
        meta = extract_metadata_blocks_from_CCDB(response.content.decode('utf-8'))
        if len(meta) > 0:
            def filter_known_fields(cls, data: dict) -> dict:
                # drop CCDB bookkeeping keys that are not MCProdInfo fields
                valid_keys = {f.name for f in fields(cls)}
                return {k: v for k, v in data.items() if k in valid_keys}

            clean_meta = filter_known_fields(MCProdInfo, meta[0])
            return MCProdInfo(**clean_meta)

    return None


def upload_mcprodinfo_meta(base_url, user, run_number, lpm_prod_tag, keys, cert_dir="/tmp"):
    """
    Uploads a small placeholder .dat file to CCDB, carrying the given keys
    as meta data, using client (token) certificates.

    Parameters:
    - base_url (str): The base HTTPS URL, e.g., "https://URL"
    - user (str): The uploader --> Determines location "Users/f/foo_bar/MCProdInfo/..."
    - run_number (int): run number; validity interval is [run_number, run_number + 1)
    - lpm_prod_tag (str): LPM production tag, part of the CCDB path
    - keys (dict): Dictionary with meta information to upload, e.g., {"key1": "var1", "key2": "var2"}
    - cert_dir (str): Directory where the .pem files are located (default: /tmp)

    Returns:
    - Response object from the POST request
    """
    import requests  # deferred so the parsing helpers work without requests installed

    cert_path, key_path = _resolve_token_paths(cert_dir)

    # Build full URL: meta data is encoded as key=value path segments
    query = "/".join(f"{k}={v}" for k, v in keys.items())
    user_path = 'Users/' + user[0] + '/' + user
    start = run_number
    stop = run_number + 1
    url = f"{base_url}/{user_path}/MCProdInfo/{lpm_prod_tag}/{start}/{stop}/{query}"

    print(f"Full {url}")

    # CCDB requires an object payload; use a throwaway temp file instead of
    # a fixed 'empty.dat' in the current directory (avoids clobbering/races)
    with tempfile.NamedTemporaryFile(mode="w", suffix=".dat", delete=False) as tmp:
        tmp.write("0")
        tmp_path = tmp.name
    try:
        with open(tmp_path, 'rb') as f:
            files = {'blob': f}
            response = requests.post(url, files=files, cert=(cert_path, key_path), verify=False)
    finally:
        # always remove the temporary file, even if the POST raises
        os.remove(tmp_path)

    return response
+# publish MCPRODINFO for first few jobs of a production +# if external script exported PUBLISH_MCPRODINFO, it will be published anyways +if [ -z "$PUBLISH_MCPRODINFO" ] && [ "$SPLITID" -lt 20 ]; then + PUBLISH_MCPRODINFO_OPTION="--publish-mcprodinfo" + echo "Will publish MCProdInfo" +else + echo "Will not publish MCProdInfo" +fi + # these arguments will be digested by o2dpg_sim_workflow_anchored.py baseargs="-tf ${NTIMEFRAMES} --split-id ${SPLITID} --prod-split ${PRODSPLIT} --cycle ${CYCLE} --run-number ${ALIEN_JDL_LPMRUNNUMBER} \ ${ALIEN_JDL_RUN_TIME_SPAN_FILE:+--run-time-span-file ${ALIEN_JDL_RUN_TIME_SPAN_FILE} ${ALIEN_JDL_INVERT_IRFRAME_SELECTION:+--invert-irframe-selection}} \ - ${ALIEN_JDL_MC_ORBITS_PER_TF:+--orbitsPerTF ${ALIEN_JDL_MC_ORBITS_PER_TF}}" + ${ALIEN_JDL_MC_ORBITS_PER_TF:+--orbitsPerTF ${ALIEN_JDL_MC_ORBITS_PER_TF}} ${PUBLISH_MCPRODINFO_OPTION}" # these arguments will be passed as well but only eventually be digested by o2dpg_sim_workflow.py which is called from o2dpg_sim_workflow_anchored.py remainingargs="-seed ${SEED} -ns ${NSIGEVENTS} --include-local-qc --pregenCollContext"