From d80528804def2bddffc6727d33a03541cc4efa1e Mon Sep 17 00:00:00 2001 From: Jochen Klein Date: Thu, 21 Aug 2025 11:04:25 +0200 Subject: [PATCH 1/8] Update DB for D0 jets --- .../database_ml_parameters_D0Jet_pp.yml | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/machine_learning_hep/data/data_run3/database_ml_parameters_D0Jet_pp.yml b/machine_learning_hep/data/data_run3/database_ml_parameters_D0Jet_pp.yml index 61bd6b388d..ce7a030b01 100644 --- a/machine_learning_hep/data/data_run3/database_ml_parameters_D0Jet_pp.yml +++ b/machine_learning_hep/data/data_run3/database_ml_parameters_D0Jet_pp.yml @@ -80,10 +80,10 @@ D0Jet_pp: extra: fY: log((sqrt(1.864**2 + (fPt * cosh(fEta))**2) + fPt * sinh(fEta)) / sqrt(1.864**2 + fPt**2)) #TODO : change mass or make sure Lc mass is updated tags: - ismcsignal: {var: fFlagMcMatchGen, req: [[0], []], abs: true} - ismcbkg: {var: fFlagMcMatchGen, req: [[], [0]], abs: true} - ismcprompt: {var: fOriginMcGen, req: [[0], [1]]} - ismcfd: {var: fOriginMcGen, req: [[1], [0]]} + ismcsignal: {var: fFlagMcMatchGen, req: 1, abs: true, level: mc} + ismcbkg: {var: ismcsignal, req: 0, level: mc} + ismcprompt: {var: fOriginMcGen, req: 1, level: mc} + ismcfd: {var: fOriginMcGen, req: 2, level: mc} filter: "fPt >= 1. and abs(fY) <= 0.8 and abs(fJetEta) < (.9 - (fJetR / 100.))" # TODO: check jet eta cut colldet: @@ -110,12 +110,12 @@ D0Jet_pp: isd0: fFlagMcMatchRec == 1 isd0bar: fFlagMcMatchRec == -1 tags: - ismcsignal: {var: fFlagMcMatchRec, req: [[0], []], abs: True} - ismcbkg: {var: fFlagMcMatchRec, req: [[], [0]], abs: True} - seld0: {var: fCandidateSelFlag, req: [[0], []]} - seld0bar: {var: fCandidateSelFlag, req: [[1], []]} - ismcprompt: {var: fOriginMcRec, req: [[0], [1]]} - ismcfd: {var: fOriginMcRec, req: [[1], [0]]} + ismcsignal: {var: fFlagMcMatchRec, req: 1, abs: true, level: mc} + ismcbkg: {var: ismcsignal, req: 0, level: mc} + ismcprompt: {var: fOriginMcRec, req: 1, level: mc} + ismcfd: {var: fOriginMcRec, req: 2, level: mc} + seld0: {var: fCandidateSelFlag, req: 0, level: mc} + seld0bar: {var: fCandidateSelFlag, req: 1, level: mc} extract_component: - {var: fMlScores, newvar: mlBkgScore, component: 0} filter: "fPt >= 1. and abs(fY) <= 0.8 and abs(fJetEta) < (.9 - (fJetR / 100.))" # TODO: check jet eta cut From 4a5d58ab7770f070bf8f1253ccb7232696175c83 Mon Sep 17 00:00:00 2001 From: Jochen Klein Date: Mon, 1 Sep 2025 20:16:40 +0200 Subject: [PATCH 2/8] Update DB DB updates (mostly temporary) Add fd sections to DB DB changes --- .../database_ml_parameters_D0Jet_pp.yml | 233 +++++++++++++----- 1 file changed, 165 insertions(+), 68 deletions(-) diff --git a/machine_learning_hep/data/data_run3/database_ml_parameters_D0Jet_pp.yml b/machine_learning_hep/data/data_run3/database_ml_parameters_D0Jet_pp.yml index ce7a030b01..f36a86a7f1 100644 --- a/machine_learning_hep/data/data_run3/database_ml_parameters_D0Jet_pp.yml +++ b/machine_learning_hep/data/data_run3/database_ml_parameters_D0Jet_pp.yml @@ -51,34 +51,85 @@ D0Jet_pp: #region dfs dfs: read: - evtorig: + evtorig_mc: + level: mc index: fIndexHfD0McCollBases trees: O2hfd0collbase: [fNumContrib, fPosZ] filter: "abs(fPosZ) < 10." + evtorig_data: + index: fIndexHfD0CollBases + trees: + O2hfd0collbase: [fNumContrib, fPosZ] + filter: "abs(fPosZ) < 10." + evtorig_fd: + level: fd + index: fIndexHFD0MCCOLLBASES + trees: + O2hfd0mccollbase: [fPosZ] + filter: "abs(fPosZ) < 10."
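# Note on the tag specs above: each maps a tree variable onto a boolean flag.
# "req" is the required value, "abs" requests comparison of the absolute value,
# and "level" restricts the tag to one data level; ismcbkg chains on the
# previously derived ismcsignal column. A minimal sketch of how such a spec
# could be evaluated on a pandas DataFrame; apply_tag is a hypothetical helper
# for illustration, not the repository's actual implementation:
#
#   import pandas as pd
#
#   def apply_tag(df: pd.DataFrame, name: str, spec: dict) -> pd.DataFrame:
#       # Derive a boolean tag column from a {var, req, abs} spec.
#       col = df[spec["var"]]
#       if spec.get("abs", False):
#           col = col.abs()  # compare |value| when abs is set
#       df[name] = col == spec["req"]
#       return df
#
#   df = pd.DataFrame({"fFlagMcMatchGen": [1, -1, 0]})
#   df = apply_tag(df, "ismcsignal", {"var": "fFlagMcMatchGen", "req": 1, "abs": True})
#   # ismcsignal -> [True, True, False]: both matched candidates pass, the unmatched one fails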
+ + coll_powheg_fd: + level: fd + trees: + O2d0cmcpjetmcco: [fEventWeight] + collcnt_mc: + level: mc + trees: + O2collcount: [fReadCounts, fReadCountsWithTVX, fReadCountsWithTVXAndZVertexAndSel8, fReadCountsWithTVXAndZVertexAndSelMC] + bccnt_mc: + level: mc + trees: + O2bccount: [fReadCountsWithTVX, fReadCountsWithTVXAndNoTFB, fReadCountsWithTVXAndNoTFBAndNoITSROFB] collcnt: + level: data trees: O2collcount: [fReadCounts, fReadCountsWithTVX, fReadCountsWithTVXAndZVertexAndSel8, fReadCountsWithTVXAndZVertexAndSelMC] bccnt: + level: data trees: O2bccount: [fReadCountsWithTVX, fReadCountsWithTVXAndNoTFB, fReadCountsWithTVXAndNoTFBAndNoITSROFB] collgen: # TODO: check if we can use the HF collision table instead - level: gen + level: mc index: fIndexD0CMCPJETCOS trees: O2d0cmcpjetco: [fPosZ, fCentrality, fEventSel] filter: "abs(fPosZ) < 10." jetgen: - level: gen + level: mc index: fIndexD0CMCPJETOS trees: O2hfd0pbase: [fIndexHfD0McCollBases, fPt, fEta, fPhi, fFlagMcMatchGen, fOriginMcGen] O2d0cmcpjeto: [fIndexD0CMCPJETCOS, fJetPt, fJetPhi, fJetEta, fJetNConstituents, fJetR] O2d0cmcpjetmo: [fIndexArrayD0CMCDJETOS_hf, fIndexArrayD0CMCDJETOS_geo, fIndexArrayD0CMCDJETOS_pt] - O2d0cmcpjetsso: [fEnergyMother, fPtLeading, fPtSubLeading, fTheta, fNSub2DR, fNSub1, fNSub2, fAngularity, fPairPt, fPairEnergy, fPairTheta] + O2d0cmcpjetsso: [fEnergyMother, fPtLeading, fPtSubLeading, fTheta, fNSub2DR, fNSub1, fNSub2, fAngularity] extra: fY: log((sqrt(1.864**2 + (fPt * cosh(fEta))**2) + fPt * sinh(fEta)) / sqrt(1.864**2 + fPt**2)) #TODO : change mass or make sure Lc mass is updated + fM: 1.864 + tags: + ismcsignal: {var: fFlagMcMatchGen, req: 1, abs: true, level: mc} + ismcbkg: {var: ismcsignal, req: 0, level: mc} + ismcprompt: {var: fOriginMcGen, req: 1, level: mc} + ismcfd: {var: fOriginMcGen, req: 2, level: mc} + filter: "fPt >= 1. and abs(fY) <= 0.8 and abs(fJetEta) < (.9 - (fJetR / 100.))" # TODO: check jet eta cut + + collfd: # TODO: check if we can use the HF collision table instead + level: fd + index: fIndexD0CMCPJETCOS + trees: + O2d0cmcpjetco: [fPosZ, fCentrality, fEventSel] + filter: "abs(fPosZ) < 10." + jetfd: + level: fd + index: fIndexD0CMCPJETOS + trees: + O2hfd0pbase: [fIndexHFD0MCCOLLBASES, fPt, fEta, fPhi, fFlagMcMatchGen, fOriginMcGen] + O2d0cmcpjeto: [fIndexD0CMCPJETCOS, fJetPt, fJetPhi, fJetEta, fJetNConstituents, fJetR] + O2d0cmcpjetsso: [fEnergyMother, fPtLeading, fPtSubLeading, fTheta, fNSub2DR, fNSub1, fNSub2, fAngularity] + extra: + fY: log((sqrt(1.864**2 + (fPt * cosh(fEta))**2) + fPt * sinh(fEta)) / sqrt(1.864**2 + fPt**2)) #TODO : change mass or make sure Lc mass is updated + fM: 1.864 tags: ismcsignal: {var: fFlagMcMatchGen, req: 1, abs: true, level: mc} ismcbkg: {var: ismcsignal, req: 0, level: mc} @@ -87,13 +138,13 @@ D0Jet_pp: filter: "fPt >= 1. and abs(fY) <= 0.8 and abs(fJetEta) < (.9 - (fJetR / 100.))" # TODO: check jet eta cut colldet: - level: det + level: mc index: fIndexD0CMCDJETCOS trees: O2d0cmcdjetco: [fPosZ, fCentrality, fEventSel] filter: "abs(fPosZ) < 10." 
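# Note on the fY expression in the extra blocks above: it is the longitudinal
# rapidity written in terms of pT and eta, y = ln((E + pz)/mT) with
# E = sqrt(m^2 + (pT cosh eta)^2), pz = pT sinh eta and mT = sqrt(m^2 + pT^2),
# which equals the textbook y = 0.5 ln((E + pz)/(E - pz)) because
# (E + pz)(E - pz) = mT^2. A quick numerical check of that identity, assuming
# the D0 mass of 1.864 GeV/c^2 hard-coded above:
#
#   import numpy as np
#
#   m, pt, eta = 1.864, 5.0, 0.4  # D0 mass in GeV/c^2; example kinematics
#   e = np.sqrt(m**2 + (pt * np.cosh(eta))**2)  # total energy, |p| = pT cosh(eta)
#   pz = pt * np.sinh(eta)
#   mt = np.sqrt(m**2 + pt**2)  # transverse mass
#   y_extra = np.log((e + pz) / mt)  # the fY expression from the database
#   y_def = 0.5 * np.log((e + pz) / (e - pz))  # definition of rapidity
#   assert np.isclose(y_extra, y_def)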
jetdet: - level: det + level: mc index: fIndexD0CMCDJETOS trees: # add EEC columns O2hfd0base: [fIndexHfD0CollBases, fPt, fEta, fPhi, fM] @@ -104,7 +155,7 @@ D0Jet_pp: O2hfd0ml: [fMlScores] O2d0cmcdjeto: [fIndexD0CMCDJETCOS, fJetPt, fJetPhi, fJetEta, fJetNConstituents, fJetR] O2d0cmcdjetmo: [fIndexArrayD0CMCPJETOS_hf, fIndexArrayD0CMCPJETOS_geo, fIndexArrayD0CMCPJETOS_pt] - O2d0cmcdjetsso: [fEnergyMother, fPtLeading, fPtSubLeading, fTheta, fNSub2DR, fNSub1, fNSub2, fAngularity, fPairPt, fPairEnergy, fPairTheta] + O2d0cmcdjetsso: [fEnergyMother, fPtLeading, fPtSubLeading, fTheta, fNSub2DR, fNSub1, fNSub2, fAngularity] extra: fY: log((sqrt(1.864**2 + (fPt * cosh(fEta))**2) + fPt * sinh(fEta)) / sqrt(1.864**2 + fPt**2)) #TODO : change mass or make sure Lc mass is updated isd0: fFlagMcMatchRec == 1 @@ -138,7 +189,7 @@ D0Jet_pp: O2hfd0ml: [fMlScores] O2hfd0sel: [fCandidateSelFlag] O2d0cjeto: [fIndexD0CJETCOS, fJetPt, fJetPhi, fJetEta, fJetNConstituents, fJetR] - O2d0cjetsso: [fEnergyMother, fPtLeading, fPtSubLeading, fTheta, fNSub2DR, fNSub1, fNSub2, fAngularity, fPairPt, fPairEnergy, fPairTheta] + O2d0cjetsso: [fEnergyMother, fPtLeading, fPtSubLeading, fTheta, fNSub2DR, fNSub1, fNSub2, fAngularity] extra: fY: log((sqrt(1.864**2 + (fPt * cosh(fEta))**2) + fPt * sinh(fEta)) / sqrt(1.864**2 + fPt**2)) #TODO : change mass or make sure Lc mass is updated extract_component: @@ -146,33 +197,66 @@ D0Jet_pp: filter: "fPt >= 1. and abs(fY) <= 0.8 and abs(fJetEta) < (.9 - (fJetR / 100.))" # TODO: check jet eta cut merge: - - {base: jetgen, ref: evtorig} + - {base: jetgen, ref: evtorig_mc} + - {base: jetfd, ref: collfd} - {base: jetdet, ref: colldet} - {base: jetdata, ref: colldata} # workaround for yamlfmt issue #110 write: jetgen: - level: gen + level: mc + file: AnalysisResultsGen.parquet + jetfd: + level: fd file: AnalysisResultsGen.parquet jetdet: - level: det + level: mc file: AnalysisResultsReco.parquet jetdata: level: data file: AnalysisResultsReco.parquet - evtorig: - level: all + + evtorig_mc: + level: mc + source: collgen file: AnalysisResultsEvtOrig.parquet - evt: - level: all + evt_mc: + level: mc source: evtorig file: AnalysisResultsEvt.parquet + + evtorig_data: + level: data + source: colldata + file: AnalysisResultsEvtOrig.parquet + evt_data: + level: data + source: colldata + file: AnalysisResultsEvt.parquet + + coll_powheg_fd: + level: fd + file: AnalysisResultsPowheg.parquet + evtorig_fd: + level: fd + source: collfd + file: AnalysisResultsEvtOrig.parquet + evt_fd: + level: fd + source: collfd + file: AnalysisResultsEvt.parquet + collcnt_mc: + level: mc + file: AnalysisResultsCollCnt.parquet + bccnt_mc: + level: mc + file: AnalysisResultsBcCnt.parquet collcnt: - level: all + level: data file: AnalysisResultsCollCnt.parquet bccnt: - level: all + level: data file: AnalysisResultsBcCnt.parquet variables: @@ -231,6 +315,7 @@ D0Jet_pp: namefile_reco_applieddata: AnalysisResultsRecoAppliedData.parquet namefile_reco_appliedmc: AnalysisResultsRecoAppliedMC.parquet namefile_mcweights: mcweights.root + namefile_wgt: AnalysisResultsPowheg.parquet treeoutput: "D0tree" histofilename: "masshisto.root" efffilename: "effhisto.root" @@ -242,20 +327,14 @@ D0Jet_pp: multi: data: nprocessesparallel: 80 - maxfiles: [-1] #list of periods + maxfiles: [1] #list of periods chunksizeunp: [100] #list of periods chunksizeskim: [100] #list of periods fracmerge: [.1] #list of periods seedmerge: [12] #list of periods period: [LHC22o] #list of periods select_period: [1] - prefix_dir: /data2/MLhep/real/train_318624/ # 
full stats new - # prefix_dir: /data2/MLhep/real/train_257023/ # full stats - # prefix_dir: /data2/MLhep/real/train_240309/ # Andrea's model - # prefix_dir: /data2/MLhep/real/train_245508/ # pass7 - # prefix_dir: /data2/MLhep/real/train_245509/ # no TOF - # prefix_dir: /data2/MLhep/real/train_245510/ # no PID - # prefix_dir: /data2/MLhep/real/train_245511/ # full PID + prefix_dir: /data2/MLhep/real/train_318624/ unmerged_tree_dir: [/alice/] #list of periods pkl: ["${USER}/d0jet/pkl"] #list of periods pkl_skimmed: ["${USER}/d0jet/pklsk"] #list of periods @@ -265,20 +344,14 @@ D0Jet_pp: mcreweights: [../Analyses] #list of periods mc: nprocessesparallel: 80 - maxfiles: [-1] #list of periods + maxfiles: [1] #list of periods chunksizeunp: [100] #list of periods chunksizeskim: [1000] #list of periods fracmerge: [1.] #list of periods seedmerge: [12] #list of periods period: [LHC24d3b] #list of periods select_period: [1] - prefix_dir: /data2/MLhep/sim/train_318983/ # full stats new - # prefix_dir: /data2/MLhep/sim/train_257228/ # full stats - # prefix_dir: /data2/MLhep/sim/train_259454/ # 97 % tracking eff - # prefix_dir: /data2/MLhep/sim/train_239774/ # Andrea's model - # prefix_dir: /data2/MLhep/sim/train_244893/ # no TOF - # prefix_dir: /data2/MLhep/sim/train_244894/ # no PID - # prefix_dir: /data2/MLhep/sim/train_244895/ # full PID + prefix_dir: /data2/MLhep/sim/train_318983/ unmerged_tree_dir: [alice/] pkl: ["${USER}/d0jet/pkl"] #list of periods pkl_skimmed: ["${USER}/d0jet/pklsk"] #list of periods @@ -286,6 +359,23 @@ D0Jet_pp: pkl_skimmed_merge_for_ml_all: "${USER}/d0jet/pp_mc_prod_mltot" pkl_evtcounter_all: "${USER}/d0jet/pp_mc_prod_evttot" mcreweights: [../Analyses] #list of periods + fd: + nprocessesparallel: 80 + maxfiles: [-1] #list of periods + chunksizeunp: [100] #list of periods + chunksizeskim: [1000] #list of periods + fracmerge: [1.] #list of periods + seedmerge: [12] #list of periods + period: [LHC24d3b] #list of periods + select_period: [1] + prefix_dir: /home/jklein/powheg/parallel/ + unmerged_tree_dir: [default/] + pkl: ["${USER}/d0jet/pkl"] #list of periods + pkl_skimmed: ["${USER}/d0jet/pklsk"] #list of periods + pkl_skimmed_merge_for_ml: ["${USER}/d0jet/pklskml"] #list of periods + pkl_skimmed_merge_for_ml_all: "${USER}/d0jet/pp_mc_prod_mltot" + pkl_evtcounter_all: "${USER}/d0jet/pp_mc_prod_evttot" + mcreweights: [../Analyses] #list of periods ml: evtsel: null # TODO: fIsEventReject == 0 @@ -371,7 +461,7 @@ D0Jet_pp: sel_an_binmax: [2, 3, 4, 5, 6, 7, 8, 10, 12, 16, 24, 48] # hadron pt bins (sel_an_binmin bins) # FIXME: move the last edge in sel_an_binmin bins_ptjet: [5, 7, 10, 15, 20, 30, 40, 60, 70] # systematics, TODO: split rec and gen binning bins_ptjet_eff: [2, 5, 7, 10, 15, 20, 30, 40, 60, 70, 90] # systematics, TODO: split rec and gen binning - cand_collidx: fIndexHfD0CollBases + # cand_collidx: fIndexHfD0CollBases counter_read_data: fReadCountsWithTVXAndZVertexAndSel8 counter_read_mc: fReadCountsWithTVXAndZVertexAndSelMC counter_tvx: fReadCountsWithTVX @@ -406,40 +496,40 @@ D0Jet_pp: bins_gen_fix: [10, 0., 1.] bins_det_fix: [10, 0., 1.] label: "#Delta#it{r}" - lntheta: - bins_gen_fix: [8, 1., 5.] - bins_det_fix: [8, 1., 5.] - label: "#minusln(#it{#theta})" - arraycols: [3] - lnkt: - bins_gen_fix: [8, -4., 4.] - bins_det_fix: [8, -4., 4.] - label: "ln(#it{k}_{T}/(GeV/#it{c}))" - arraycols: [3] - lntheta-lnkt: - arraycols: [3, 4] - # new variables - fEnergyMother: - bins_gen_fix: [1, 0., 100.] - bins_det_fix: [1, 0., 100.] 
- arraycols: [3] - # lntheta-lnkt-fEnergyMother: - # arraycols: [3, 4, 5] - fJetNConstituents: - bins_gen_fix: [5, 0., 20.] - bins_det_fix: [5, 0., 20.] - zpar-fJetNConstituents: {} - nsub21: - # TODO: check for 1-track jets - bins_gen_fix: [11, -1., 1.] - bins_det_fix: [11, -1., 1.] - eecweight: - # TODO: adjust binning - bins_gen_fix: [10, 0., 1.] - bins_det_fix: [10, 0., 1.] - arraycols: [3] - fPairTheta-eecweight: - arraycols: [3, 4] + # lntheta: + # bins_gen_fix: [8, 1., 5.] + # bins_det_fix: [8, 1., 5.] + # label: "#minusln(#it{#theta})" + # arraycols: [3] + # lnkt: + # bins_gen_fix: [8, -4., 4.] + # bins_det_fix: [8, -4., 4.] + # label: "ln(#it{k}_{T}/(GeV/#it{c}))" + # arraycols: [3] + # lntheta-lnkt: + # arraycols: [3, 4] + # # new variables + # fEnergyMother: + # bins_gen_fix: [1, 0., 100.] + # bins_det_fix: [1, 0., 100.] + # arraycols: [3] + # # lntheta-lnkt-fEnergyMother: + # # arraycols: [3, 4, 5] + # fJetNConstituents: + # bins_gen_fix: [5, 0., 20.] + # bins_det_fix: [5, 0., 20.] + # zpar-fJetNConstituents: {} + # nsub21: + # # TODO: check for 1-track jets + # bins_gen_fix: [11, -1., 1.] + # bins_det_fix: [11, -1., 1.] + # eecweight: + # # TODO: adjust binning + # bins_gen_fix: [10, 0., 1.] + # bins_det_fix: [10, 0., 1.] + # arraycols: [3] + # fPairTheta-eecweight: + # arraycols: [3, 4] data_selections: mcsig: @@ -722,6 +812,7 @@ D0Jet_pp: use_matched: true frac_mcana: .2 # fraction of MC sample for the closure + fd_input: "sim" fd_root: "/data2/vkucera/powheg/trees_powheg_fd_central.root" # systematics fd_parquet: "/data2/jklein/powheg/trees_powheg_fd_central.parquet" # systematics @@ -769,10 +860,16 @@ D0Jet_pp: runselection: [null] #FIXME # used but useless results: ["/home/${USER}/mlhep/d0jet/jet_obs/default/default/mc/results"] #list of periods resultsallp: "/home/${USER}/mlhep/d0jet/jet_obs/default/default/mc/results_all" + fd: &fd_out_default + runselection: [null] #FIXME # used but useless + results: ["/home/${USER}/mlhep/d0jet/jet_obs/default/default/fd/results"] #list of periods + resultsallp: "/home/${USER}/mlhep/d0jet/jet_obs/default/default/fd/results_all" data_proc: # alternative processor output used as the analyzer input <<: *data_out_default mc_proc: # alternative processor output used as the analyzer input <<: *mc_out_default + fd_proc: # alternative processor output used as the analyzer input + <<: *fd_out_default # simple fitter START # used in cplusutilities/mass_fitter.C # sgnfunc: [0,0,0,0,0,0,0,0,0,0,0,0] # kGaus=0, k2Gaus=1, k2GausSigmaRatioPar=2 (sel_an_binmin bins) From dcb56774f2c011fc16bc02cf3dda0016d3d91e3f Mon Sep 17 00:00:00 2001 From: Jochen Klein Date: Thu, 6 Mar 2025 11:27:55 +0100 Subject: [PATCH 3/8] Generalize data type to include fd Establish conversion for gen-only --- machine_learning_hep/common.py | 6 + machine_learning_hep/multiprocesser.py | 64 +++--- machine_learning_hep/processer.py | 110 +++++----- machine_learning_hep/processer_jet.py | 10 +- machine_learning_hep/processerdhadrons.py | 4 +- .../processerdhadrons_mult.py | 4 +- machine_learning_hep/steer_analysis.py | 193 +++++++----------- 7 files changed, 181 insertions(+), 210 deletions(-) create mode 100644 machine_learning_hep/common.py diff --git a/machine_learning_hep/common.py b/machine_learning_hep/common.py new file mode 100644 index 0000000000..15ac2719b0 --- /dev/null +++ b/machine_learning_hep/common.py @@ -0,0 +1,6 @@ +from enum import Enum + +class DataType(Enum): + MC = "mc" + DATA = "data" + FD = "fd" diff --git a/machine_learning_hep/multiprocesser.py 
b/machine_learning_hep/multiprocesser.py index 9fd4048378..ccb4fa3c06 100755 --- a/machine_learning_hep/multiprocesser.py +++ b/machine_learning_hep/multiprocesser.py @@ -16,33 +16,44 @@ main script for doing data processing, machine learning and analysis """ +from functools import reduce import os import tempfile +from typing import TypeVar from machine_learning_hep.io_ml_utils import dump_yaml_from_dict, parse_yaml from machine_learning_hep.logger import get_logger from machine_learning_hep.utilities import merge_method, mergerootfiles - +from .common import DataType class MultiProcesser: # pylint: disable=too-many-instance-attributes, too-many-statements, consider-using-f-string, too-many-branches species = "multiprocesser" logger = get_logger() - def __init__(self, case, proc_class, datap, typean, run_param, mcordata): + T = TypeVar("T") + + def cfg(self, param: str, default: T = None) -> T: + return reduce( + lambda d, key: d.get(key, default) if isinstance(d, dict) else default, + param.split("."), + self.datap, + ) + + def __init__(self, case, proc_class, datap, typean, run_param, datatype): self.case = case self.datap = datap self.typean = typean self.run_param = run_param - self.mcordata = mcordata - self.prodnumber = len(datap["multi"][self.mcordata]["unmerged_tree_dir"]) - self.p_period = datap["multi"][self.mcordata]["period"] - self.select_period = datap["multi"][self.mcordata]["select_period"] - self.p_seedmerge = datap["multi"][self.mcordata]["seedmerge"] - self.p_fracmerge = datap["multi"][self.mcordata]["fracmerge"] - self.p_maxfiles = datap["multi"][self.mcordata]["maxfiles"] - self.p_chunksizeunp = datap["multi"][self.mcordata]["chunksizeunp"] - self.p_chunksizeskim = datap["multi"][self.mcordata]["chunksizeskim"] - self.p_nparall = datap["multi"][self.mcordata]["nprocessesparallel"] + self.datatype = DataType(datatype) + self.prodnumber = len(datap["multi"][self.datatype.value]["unmerged_tree_dir"]) + self.p_period = datap["multi"][self.datatype.value]["period"] + self.select_period = datap["multi"][self.datatype.value]["select_period"] + self.p_seedmerge = datap["multi"][self.datatype.value]["seedmerge"] + self.p_fracmerge = datap["multi"][self.datatype.value]["fracmerge"] + self.p_maxfiles = datap["multi"][self.datatype.value]["maxfiles"] + self.p_chunksizeunp = datap["multi"][self.datatype.value]["chunksizeunp"] + self.p_chunksizeskim = datap["multi"][self.datatype.value]["chunksizeskim"] + self.p_nparall = datap["multi"][self.datatype.value]["nprocessesparallel"] self.lpt_anbinmin = datap["sel_skim_binmin"] self.lpt_anbinmax = datap["sel_skim_binmax"] self.p_nptbins = len(datap["sel_skim_binmax"]) @@ -53,18 +64,19 @@ def __init__(self, case, proc_class, datap, typean, run_param, mcordata): self.dlper_pkl = [] self.dlper_pklsk = [] self.dlper_pklml = [] - self.d_prefix = datap["multi"][self.mcordata].get("prefix_dir", "") - self.d_prefix_app = datap["mlapplication"][self.mcordata].get("prefix_dir_app", "") - self.d_prefix_res = datap["analysis"][self.typean][self.mcordata].get("prefix_dir_res", "") + self.d_prefix = datap["multi"][self.datatype.value].get("prefix_dir", "") + self.d_prefix_app = self.cfg(f"mlapplication.{self.datatype.value}.prefix_dir_app", "") + self.d_prefix_res = self.cfg(f"analysis.{self.typean}.{self.datatype.value}.prefix_dir_res", "") - dp = datap["multi"][self.mcordata] + dp = datap["multi"][self.datatype.value] self.dlper_root = [self.d_prefix + os.path.expandvars(p) for p in dp["unmerged_tree_dir"]] + print('****', self.dlper_root, 
flush=True) self.dlper_pkl = [self.d_prefix + os.path.expandvars(p) for p in dp["pkl"]] self.dlper_pklsk = [self.d_prefix + os.path.expandvars(p) for p in dp["pkl_skimmed"]] self.dlper_pklml = [self.d_prefix + os.path.expandvars(p) for p in dp["pkl_skimmed_merge_for_ml"]] self.d_pklml_mergedallp = self.d_prefix + os.path.expandvars(dp["pkl_skimmed_merge_for_ml_all"]) self.d_pklevt_mergedallp = self.d_prefix + os.path.expandvars(dp["pkl_evtcounter_all"]) - self.dlper_mcreweights = datap["multi"][self.mcordata]["mcreweights"] + self.dlper_mcreweights = datap["multi"][self.datatype.value]["mcreweights"] # namefiles pkl self.v_var_binning = datap["var_binning"] @@ -99,21 +111,21 @@ def __init__(self, case, proc_class, datap, typean, run_param, mcordata): self.lper_evt = [os.path.join(direc, self.n_evt) for direc in self.dlper_pkl] self.lper_evtorig = [os.path.join(direc, self.n_evtorig) for direc in self.dlper_pkl] - dp = datap["mlapplication"][self.mcordata] - self.dlper_reco_modapp = [self.d_prefix_app + p for p in dp["pkl_skimmed_dec"]] - self.dlper_reco_modappmerged = [self.d_prefix_app + p for p in dp["pkl_skimmed_decmerged"]] + dp = self.cfg(f"mlapplication.{self.datatype.value}", {}) + self.dlper_reco_modapp = [self.d_prefix_app + p for p in dp["pkl_skimmed_dec"]] if dp else [None] * len(self.p_period) + self.dlper_reco_modappmerged = [self.d_prefix_app + p for p in dp["pkl_skimmed_decmerged"]] if dp else [None] * len(self.p_period) - dp = datap["analysis"][self.typean][self.mcordata] + dp = self.cfg(f"analysis.{self.typean}.{self.datatype.value}", {}) self.d_results = [self.d_prefix_res + os.path.expandvars(p) for p in dp["results"]] self.d_resultsallp = self.d_prefix_res + os.path.expandvars(dp["resultsallp"]) self.f_evt_mergedallp = os.path.join(self.d_pklevt_mergedallp, self.n_evt) self.f_evtorig_mergedallp = os.path.join(self.d_pklevt_mergedallp, self.n_evtorig) - self.lper_runlistrigger = datap["analysis"][self.typean][self.mcordata]["runselection"] + self.lper_runlistrigger = self.cfg(f"analysis.{self.typean}.{self.datatype.value}.runselection", [None] * len(self.p_period)) self.lper_mcreweights = None - if self.mcordata == "mc": + if self.datatype == DataType.MC: self.lper_mcreweights = [os.path.join(direc, self.n_mcreweights) for direc in self.dlper_mcreweights] self.process_listsample = [] @@ -123,7 +135,7 @@ def __init__(self, case, proc_class, datap, typean, run_param, mcordata): self.case, self.datap, self.run_param, - self.mcordata, + self.datatype.value, self.p_maxfiles[indexp], self.dlper_root[indexp], self.dlper_pkl[indexp], @@ -186,10 +198,10 @@ def multi_mergeml_allinone(self): for indexp in range(self.prodnumber): if self.select_period[indexp] == 0: self.lptper_recoml[ipt].remove(self.lptper_recoml[ipt][indexp]) - if self.mcordata == "mc": + if self.datatype == DataType.MC: self.lptper_genml[ipt].remove(self.lptper_genml[ipt][indexp]) merge_method(self.lptper_recoml[ipt], self.lpt_recoml_mergedallp[ipt]) - if self.mcordata == "mc": + if self.datatype == DataType.MC: merge_method(self.lptper_genml[ipt], self.lpt_genml_mergedallp[ipt]) count_evt = 0 diff --git a/machine_learning_hep/processer.py b/machine_learning_hep/processer.py index b2766d3cbd..6815ef7b8f 100644 --- a/machine_learning_hep/processer.py +++ b/machine_learning_hep/processer.py @@ -88,13 +88,14 @@ def __init__( # directories self.d_prefix_ml = datap["ml"].get("prefix_dir_ml", "") self.d_root = d_root + print('%%%%', self.d_root, flush=True) self.d_pkl = d_pkl self.d_pklsk = d_pklsk self.d_pkl_ml = 
d_pkl_ml self.d_results = d_results self.d_mcreweights = d_mcreweights # used in hadrons self.datap = datap - self.mcordata = mcordata + self.datatype = mcordata self.lpt_anbinmin = datap["sel_skim_binmin"] self.lpt_anbinmax = datap["sel_skim_binmax"] @@ -177,6 +178,7 @@ def __init__( # list of files names if os.path.isdir(self.d_root): + print(f"{self.d_root=}", flush=True) self.l_path = list_folders(self.d_root, self.n_root, self.p_maxfiles, self.select_jobs) elif glob.glob(f"{self.d_pkl}/**/{self.n_reco}", recursive=True): self.l_path = list_folders(self.d_pkl, self.n_reco, self.p_maxfiles, self.select_jobs) @@ -185,6 +187,7 @@ def __init__( ".p", "_%s%d_%d.p" % (self.v_var_binning, self.lpt_anbinmin[0], self.lpt_anbinmax[0]) ) self.l_path = list_folders(self.d_pklsk, self.n_sk, self.p_maxfiles, self.select_jobs) + print(f"{self.l_path=}", flush=True) self.l_root = createlist(self.d_root, self.l_path, self.n_root) self.l_reco = createlist(self.d_pkl, self.l_path, self.n_reco) @@ -196,7 +199,7 @@ def __init__( self.l_histoeff = createlist(self.d_results, self.l_path, self.n_fileeff) # self.l_historesp = createlist(self.d_results, self.l_path, self.n_fileresp) - if self.mcordata == "mc": + if self.datatype == "mc": self.l_gen = createlist(self.d_pkl, self.l_path, self.n_gen) if self.n_gen_sl: self.l_gen_sl = createlist(self.d_pkl, self.l_path, self.n_gen_sl) @@ -223,41 +226,36 @@ def __init__( for bin in self.bins_analysis ] - self.lpt_probcutpre = datap["mlapplication"]["probcutpresel"][self.mcordata] - lpt_probcutfin_tmp = datap["mlapplication"]["probcutoptimal"] + self.lpt_probcutpre = self.cfg_global(f"mlapplication.probcutpresel.{self.datatype}", [None] * self.p_nptbins) + lpt_probcutfin_tmp = self.cfg_global(f"mlapplication.probcutoptimal", [None] * self.p_nptfinbins) self.lpt_probcutfin = [lpt_probcutfin_tmp[bin_matching[ibin]] for ibin in range(self.p_nptfinbins)] - for ibin, probcutfin in enumerate(self.lpt_probcutfin): - probcutpre = self.lpt_probcutpre[bin_matching[ibin]] - if self.mltype == "MultiClassification": - if probcutfin[0] > probcutpre[0] or probcutfin[1] < probcutpre[1] or probcutfin[2] < probcutpre[2]: + if self.datatype in ('mc', 'data'): + for ibin, probcutfin in enumerate(self.lpt_probcutfin): + probcutpre = self.lpt_probcutpre[bin_matching[ibin]] + if self.mltype == "MultiClassification": + if probcutfin[0] > probcutpre[0] or probcutfin[1] < probcutpre[1] or probcutfin[2] < probcutpre[2]: + self.logger.fatal( + "Probability cut final: %s must be tighter than presel %s!\n" + "Verify that bkg prob presel > final, and other cuts presel < final", + self.lpt_probcutfin, + self.lpt_probcutpre, + ) + elif probcutfin < probcutpre: self.logger.fatal( - "Probability cut final: %s must be tighter than presel %s!\n" - "Verify that bkg prob presel > final, and other cuts presel < final", + "Probability cut final: %s must be tighter (smaller values) than presel %s!", self.lpt_probcutfin, self.lpt_probcutpre, ) - elif probcutfin < probcutpre: - self.logger.fatal( - "Probability cut final: %s must be tighter (smaller values) than presel %s!", - self.lpt_probcutfin, - self.lpt_probcutpre, - ) - if self.mltype == "MultiClassification": - self.l_selml = [] - comps = ["<=", ">=", ">="] - for ipt in range(self.p_nptfinbins): - mlsel_multi = [ - f"y_test_prob{self.p_modelname}{label.replace('-', '_')} {comp} {probcut}" - for label, comp, probcut in zip(self.class_labels, comps, self.lpt_probcutfin[ipt], strict=False) + if self.mltype == "MultiClassification": + self.l_selml = [] + 
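# Illustration of the selection string assembled below, with hypothetical
# values (the real class_labels, p_modelname and lpt_probcutfin come from the
# analysis database): class_labels = ["bkg", "prompt", "nonprompt"],
# p_modelname = "Model" and lpt_probcutfin[ipt] = [0.02, 0.10, 0.10] would
# yield
#   "y_test_probModelbkg <= 0.02 and y_test_probModelprompt >= 0.1
#    and y_test_probModelnonprompt >= 0.1"
# i.e. an upper cut on the background score and lower cuts on the signal-class
# scores, matching the presel/final consistency check above.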
comps = ["<=", ">=", ">="] + for ipt in range(self.p_nptfinbins): + mlsel_multi = [ + f"y_test_prob{self.p_modelname}{label.replace('-', '_')} {comp} {probcut}" + for label, comp, probcut in zip(self.class_labels, comps, self.lpt_probcutfin[ipt], strict=False) ] - self.l_selml.append(" and ".join(mlsel_multi)) - - else: - self.l_selml = [ - f"y_test_prob{self.p_modelname} > {self.lpt_probcutfin[ipt]}" for ipt in range(self.p_nptfinbins) - ] self.d_pkl_dec = d_pkl_dec self.mptfiles_recosk = [] @@ -265,6 +263,7 @@ def __init__( self.mptfiles_gensk_sl = [] self.d_pkl_decmerged = d_pkl_decmerged + print(self.d_results, self.n_filemass, flush=True) self.n_filemass = os.path.join(self.d_results, self.n_filemass) self.n_fileeff = os.path.join(self.d_results, self.n_fileeff) self.n_fileresp = os.path.join(self.d_results, self.n_fileresp) @@ -293,7 +292,7 @@ def __init__( ) self.lpt_recodec = None - if self.doml is True: + if self.doml and self.datatype in ('mc', 'data'): if self.mltype == "MultiClassification": self.lpt_recodec = [ self.n_reco.replace( @@ -327,11 +326,11 @@ def __init__( ] self.mptfiles_recoskmldec = [ createlist(self.d_pkl_dec, self.l_path, self.lpt_recodec[ipt]) for ipt in range(self.p_nptbins) - ] + ] if self.datatype in ('mc', 'data') else [] self.lpt_recodecmerged = [ os.path.join(self.d_pkl_decmerged, self.lpt_recodec[ipt]) for ipt in range(self.p_nptbins) - ] - if self.mcordata == "mc": + ] if self.datatype in ('mc', 'data') else [] + if self.datatype == "mc": self.mptfiles_gensk = [ createlist(self.d_pklsk, self.l_path, self.lpt_gensk[ipt]) for ipt in range(self.p_nptbins) ] @@ -365,6 +364,13 @@ def cfg(self, param: str, default: T = None) -> T: self.datap["analysis"][self.typean], ) + def cfg_global(self, param: str, default: T = None) -> T: + return reduce( + lambda d, key: d.get(key, default) if isinstance(d, dict) else default, + param.split("."), + self.datap, + ) + def unpack(self, file_index, max_no_keys=None): # pylint: disable=too-many-branches, too-many-locals def dfread(rdir, trees, cols, idx_name=None): """Read DF from multiple (joinable) O2 tables""" @@ -411,8 +417,9 @@ def dfuse(df_spec): level = df_spec.get("level", "all") return ( (level == "all") - or (level in ("mc", "gen", "det") and self.mcordata == "mc") - or (level in ("data") and self.mcordata == "data") + or (level in ("mc", "gen", "det") and self.datatype == "mc") + or (level in ("mc", "gen") and self.datatype == "fd") + or (level in ("data") and self.datatype == "data") ) self.logger.info("unpacking: %s", self.l_root[file_index]) @@ -532,14 +539,15 @@ def dfuse(df_spec): write_df(dfo, path) def skim(self, file_index): - dfreco = read_df(self.l_reco[file_index]) - dfgen = read_df(self.l_gen[file_index]) if self.mcordata == "mc" else None - dfgen_sl = read_df(self.l_gen_sl[file_index]) if self.n_gen_sl and self.mcordata == "mc" else None + dfreco = read_df(self.l_reco[file_index]) if self.datatype != "fd" else None + dfgen = read_df(self.l_gen[file_index]) if self.datatype == "mc" else None + dfgen_sl = read_df(self.l_gen_sl[file_index]) if self.n_gen_sl and self.datatype == "mc" else None for ipt in range(self.p_nptbins): - dfrecosk = seldf_singlevar(dfreco, self.v_var_binning, self.lpt_anbinmin[ipt], self.lpt_anbinmax[ipt]) - dfrecosk = dfquery(dfrecosk, self.s_reco_skim[ipt]) - write_df(dfrecosk, self.mptfiles_recosk[ipt][file_index]) + if dfreco is not None: + dfrecosk = seldf_singlevar(dfreco, self.v_var_binning, self.lpt_anbinmin[ipt], self.lpt_anbinmax[ipt]) + dfrecosk = dfquery(dfrecosk, 
self.s_reco_skim[ipt]) + write_df(dfrecosk, self.mptfiles_recosk[ipt][file_index]) if dfgen is not None: dfgensk = seldf_singlevar(dfgen, self.v_var_binning, self.lpt_anbinmin[ipt], self.lpt_anbinmax[ipt]) @@ -606,14 +614,14 @@ def parallelizer(self, function, argument_list, maxperchunk): # pass def process_unpack_par(self): - self.logger.info("Unpacking %s period %s", self.mcordata, self.period) + self.logger.info("Unpacking %s period %s", self.datatype, self.period) create_folder_struc(self.d_pkl, self.l_path) arguments = [(i,) for i in range(len(self.l_root))] - self.logger.debug("d_pkl: %s, l_path: %s, arguments: %s", self.d_pkl, str(self.l_path), str(arguments)) + self.logger.info("d_pkl: %s, l_path: %s, arguments: %s", self.d_pkl, str(self.l_path), str(arguments)) self.parallelizer(self.unpack, arguments, self.p_chunksizeunp) def process_skim_par(self): - self.logger.info("Skimming %s period %s", self.mcordata, self.period) + self.logger.info("Skimming %s period %s", self.datatype, self.period) create_folder_struc(self.d_pklsk, self.l_path) arguments = [(i,) for i in range(len(self.l_reco))] self.parallelizer(self.skim, arguments, self.p_chunksizeskim) @@ -622,13 +630,13 @@ def process_skim_par(self): merge_method(self.l_evtorig, self.f_totevtorig) def process_applymodel_par(self): - self.logger.info("Applying model to %s period %s", self.mcordata, self.period) + self.logger.info("Applying model to %s period %s", self.datatype, self.period) create_folder_struc(self.d_pkl_dec, self.l_path) arguments = [(i,) for i in range(len(self.mptfiles_recosk[0]))] self.parallelizer(self.applymodel, arguments, self.p_chunksizeskim) def process_mergeforml(self): - self.logger.info("doing merging for ml %s %s", self.mcordata, self.period) + self.logger.info("doing merging for ml %s %s", self.datatype, self.period) indices_for_evt = [] for ipt in range(self.p_nptbins): nfiles = len(self.mptfiles_recosk[ipt]) @@ -642,7 +650,7 @@ def process_mergeforml(self): indices_for_evt = list(set(indices_for_evt) | set(filesel)) list_sel_recosk = [self.mptfiles_recosk[ipt][j] for j in filesel] merge_method(list_sel_recosk, self.lpt_reco_ml[ipt]) - if self.mcordata == "mc": + if self.datatype == "mc": list_sel_gensk = [self.mptfiles_gensk[ipt][j] for j in filesel] merge_method(list_sel_gensk, self.lpt_gen_ml[ipt]) @@ -655,7 +663,7 @@ def process_mergeforml(self): def process_mergedec(self): for ipt in range(self.p_nptbins): merge_method(self.mptfiles_recoskmldec[ipt], self.lpt_recodecmerged[ipt]) - if self.mcordata == "mc": + if self.datatype == "mc": merge_method(self.mptfiles_gensk[ipt], self.lpt_gendecmerged[ipt]) def load_cuts(self): @@ -709,14 +717,14 @@ def apply_cut_for_ipt(df_full, ipt: int): if any(self.analysis_cuts): df_ipt = df_ipt.query(self.analysis_cuts[ipt]) if in_range else df_ipt - if any(self.analysis_mult_cuts) and self.mcordata == "data": + if any(self.analysis_mult_cuts) and self.datatype == "data": df_ipt = df_ipt.query(self.analysis_mult_cuts[ipt]) if in_range else df_ipt return df_ipt return pd.concat(apply_cut_for_ipt(df_, ipt) for ipt in range(-1, self.p_nptfinbins + 1)) def process_histomass(self): - self.logger.debug("Doing masshisto %s %s", self.mcordata, self.period) + self.logger.debug("Doing masshisto %s %s", self.datatype, self.period) self.logger.debug("Using run selection for mass histo %s %s %s", self.runlistrigger, "for period", self.period) if self.doml is True: self.logger.debug("Doing ml analysis") @@ -735,7 +743,7 @@ def process_histomass(self): 
mergerootfiles(self.l_histomass, self.n_filemass, tmp_merged_dir) def process_efficiency(self): - print("Doing efficiencies", self.mcordata, self.period) + print("Doing efficiencies", self.datatype, self.period) print("Using run selection for eff histo", self.runlistrigger, "for period", self.period) if self.doml is True: print("Doing ml analysis") diff --git a/machine_learning_hep/processer_jet.py b/machine_learning_hep/processer_jet.py index 5efabe2361..221fe962d9 100644 --- a/machine_learning_hep/processer_jet.py +++ b/machine_learning_hep/processer_jet.py @@ -237,7 +237,7 @@ def process_histomass_single(self, index): histonorm.SetBinContent(1, len(dfquery(dfevtorig, self.s_evtsel))) if self.l_collcnt: dfcollcnt = read_df(self.l_collcnt[index]) - ser_collcnt = dfcollcnt[self.cfg(f"counter_read_{self.mcordata}")] + ser_collcnt = dfcollcnt[self.cfg(f"counter_read_{self.datatype}")] collcnt_read = functools.reduce(lambda x, y: float(x) + float(y), (ar[0] for ar in ser_collcnt)) self.logger.info("sampled %g collisions", collcnt_read) histonorm.SetBinContent(2, collcnt_read) @@ -277,7 +277,7 @@ def process_histomass_single(self, index): fill_hist(h, df[["fM", "fJetPt", "fPt"]], write=True) for sel_name, sel_spec in self.cfg("data_selections", {}).items(): - if sel_spec["level"] == self.mcordata: + if sel_spec["level"] == self.datatype: df_sel = dfquery(df, sel_spec["query"]) h = create_hist( f"h_mass-ptjet-pthf_{sel_name}", @@ -288,7 +288,7 @@ def process_histomass_single(self, index): ) fill_hist(h, df_sel[["fM", "fJetPt", "fPt"]], write=True) - if self.mcordata == "mc": + if self.datatype == "mc": df, _ = self.split_df(df, self.cfg("frac_mcana", 0.2)) if len(df) == 0: return @@ -433,8 +433,8 @@ def process_efficiency_single(self, index): "fNSub2", "fJetNConstituents", "fEnergyMother", - "fPairTheta", - "fPairPt", + # "fPairTheta", + # "fPairPt", ] ) cols = None diff --git a/machine_learning_hep/processerdhadrons.py b/machine_learning_hep/processerdhadrons.py index a46e90ee37..1180ee0c2e 100755 --- a/machine_learning_hep/processerdhadrons.py +++ b/machine_learning_hep/processerdhadrons.py @@ -170,7 +170,7 @@ def process_histomass_single(self, index): myfile.cd() h_invmass.Write() - if self.mcordata == "mc": + if self.datatype == "mc": df_sig = df[df[self.v_ismcsignal] == 1] df_bkg = df[df[self.v_ismcbkg] == 1] h_invmass_sig = TH1F( @@ -188,7 +188,7 @@ def process_histomass_single(self, index): h_invmass_bkg.Write() for sel_name, sel_spec in self.cfg("data_selections", {}).items(): - if sel_spec["level"] == self.mcordata: + if sel_spec["level"] == self.datatype: df_sel = dfquery(df_ptmerged, sel_spec["query"]) h = create_hist( f"h_mass-pthf_{sel_name}", diff --git a/machine_learning_hep/processerdhadrons_mult.py b/machine_learning_hep/processerdhadrons_mult.py index c0d060a9c0..670863dc72 100755 --- a/machine_learning_hep/processerdhadrons_mult.py +++ b/machine_learning_hep/processerdhadrons_mult.py @@ -245,7 +245,7 @@ def process_histomass_single(self, index): myfile.cd() h_invmass.Write() - if self.mcordata == "mc": + if self.datatype == "mc": df_bin_sig = df_bin[df_bin[self.v_ismcsignal] == 1] h_invmass_sig = TH1F( "hmass_sig" + suffix, "", self.p_num_bins, self.p_mass_fit_lim[0], self.p_mass_fit_lim[1] @@ -461,7 +461,7 @@ def set_content(df_to_use, histogram, i_b=ibin2, b_c=bincounter): h_list = [] def process_efficiency(self): - print("Doing efficiencies", self.mcordata, self.period) + print("Doing efficiencies", self.datatype, self.period) if self.doml is True: print("Doing ml 
analysis") else: diff --git a/machine_learning_hep/steer_analysis.py b/machine_learning_hep/steer_analysis.py index 6958a2461e..fe8d0e3e3e 100644 --- a/machine_learning_hep/steer_analysis.py +++ b/machine_learning_hep/steer_analysis.py @@ -28,6 +28,7 @@ from .analysis.analyzer_manager import AnalyzerManager from .config import update_config +from .common import DataType from .logger import configure_logger, get_logger from .utilities_files import checkdirs, checkmakedir, checkmakedirlist, delete_dirlist @@ -92,21 +93,27 @@ def do_entire_analysis( # pylint: disable=too-many-locals, too-many-statements, doanaperperiod = data_config["analysis"]["doperperiod"] typean = data_config["analysis"]["type"] - dp = data_param[case]["multi"]["mc"] - dirprefixmc = dp.get("prefix_dir", "") - dirpklmc = [dirprefixmc + os.path.expandvars(p) for p in dp["pkl"]] - dirpklskmc = [dirprefixmc + os.path.expandvars(p) for p in dp["pkl_skimmed"]] - dirpklmlmc = [dirprefixmc + os.path.expandvars(p) for p in dp["pkl_skimmed_merge_for_ml"]] - dirpklevtcounter_allmc = dirprefixmc + os.path.expandvars(dp["pkl_evtcounter_all"]) - dirpklmltotmc = dirprefixmc + os.path.expandvars(dp["pkl_skimmed_merge_for_ml_all"]) - - dp = data_param[case]["multi"]["data"] - dirprefixdata = dp.get("prefix_dir", "") - dirpkldata = [dirprefixdata + os.path.expandvars(p) for p in dp["pkl"]] - dirpklskdata = [dirprefixdata + os.path.expandvars(p) for p in dp["pkl_skimmed"]] - dirpklmldata = [dirprefixdata + os.path.expandvars(p) for p in dp["pkl_skimmed_merge_for_ml"]] - dirpklevtcounter_alldata = dirprefixdata + os.path.expandvars(dp["pkl_evtcounter_all"]) - dirpklmltotdata = dirprefixdata + os.path.expandvars(dp["pkl_skimmed_merge_for_ml_all"]) + dirpkl = {} + dirpklsk = {} + dirpklml = {} + dirpklevtcounter_all = {} + dirpklmltot = {} + dirpklmltotmc = {} + dirresults = {} + dirresultstot = {} + for dt in DataType: + dp = data_param[case]["multi"][dt.value] + dirprefix = dp.get("prefix_dir", "") + dirpkl[dt] = [dirprefix + os.path.expandvars(p) for p in dp["pkl"]] + dirpklsk[dt] = [dirprefix + os.path.expandvars(p) for p in dp["pkl_skimmed"]] + dirpklml[dt] = [dirprefix + os.path.expandvars(p) for p in dp["pkl_skimmed_merge_for_ml"]] + dirpklevtcounter_all[dt] = dirprefix + os.path.expandvars(dp["pkl_evtcounter_all"]) + dirpklmltot[dt] = dirprefix + os.path.expandvars(dp["pkl_skimmed_merge_for_ml_all"]) + + dp = data_param[case]["analysis"][typean][dt.value] + dirprefixres = dp.get("prefix_dir_res", "") + dirresults[dt] = [dirprefixres + os.path.expandvars(p) for p in dp["results"]] + dirresultstot[dt] = dirprefixres + os.path.expandvars(dp["resultsallp"]) dp = data_param[case]["mlapplication"]["mc"] dirprefixmcapp = dp.get("prefix_dir_app", "") @@ -118,16 +125,6 @@ def do_entire_analysis( # pylint: disable=too-many-locals, too-many-statements, dirpklskdecdata = [dirprefixdataapp + p for p in dp["pkl_skimmed_dec"]] dirpklskdec_mergeddata = [dirprefixdataapp + p for p in dp["pkl_skimmed_decmerged"]] - dp = data_param[case]["analysis"][typean]["data"] - dirprefixdatares = dp.get("prefix_dir_res", "") - dirresultsdata = [dirprefixdatares + os.path.expandvars(p) for p in dp["results"]] - dirresultsdatatot = dirprefixdatares + os.path.expandvars(dp["resultsallp"]) - - dp = data_param[case]["analysis"][typean]["mc"] - dirprefixmcres = dp.get("prefix_dir_res", "") - dirresultsmc = [dirprefixmcres + os.path.expandvars(p) for p in dp["results"]] - dirresultsmctot = dirprefixmcres + os.path.expandvars(dp["resultsallp"]) - binminarray = 
data_param[case]["ml"]["binmin"] binmaxarray = data_param[case]["ml"]["binmax"] multbkg = data_param[case]["ml"]["mult_bkg"] @@ -142,31 +139,19 @@ def do_entire_analysis( # pylint: disable=too-many-locals, too-many-statements, proc_type = data_param[case]["analysis"][typean]["proc_type"] exdirs = [] - if doconversionmc: - exdirs.extend(checkdirs(dirpklmc)) - - if doconversiondata: - exdirs.extend(checkdirs(dirpkldata)) - - if doskimmingmc: - exdirs.extend(checkdirs(dirpklskmc)) - exdirs.extend(checkdirs(dirpklevtcounter_allmc)) - - if doskimmingdata: - exdirs.extend(checkdirs(dirpklskdata)) - exdirs.extend(checkdirs(dirpklevtcounter_alldata)) - - if domergingmc: - exdirs.extend(checkdirs(dirpklmlmc)) - - if domergingdata: - exdirs.extend(checkdirs(dirpklmldata)) - - if domergingperiodsmc: - exdirs.extend(checkdirs(dirpklmltotmc)) - - if domergingperiodsdata: - exdirs.extend(checkdirs(dirpklmltotdata)) + for dt in DataType: + if data_config["conversion"][dt.value]["activate"]: + exdirs.extend(checkdirs(dirpkl[dt])) + if data_config["skimming"][dt.value]["activate"]: + exdirs.extend(checkdirs(dirpklsk[dt])) + exdirs.extend(checkdirs(dirpklevtcounter_all[dt])) + if data_config["merging"][dt.value]["activate"]: + exdirs.extend(checkdirs(dirpklml[dt])) + if data_config["mergingperiods"][dt.value]["activate"]: + exdirs.extend(checkdirs(dirpklmltot[dt])) + if data_config["analysis"][dt.value]["histomass"]: + exdirs.extend(checkdirs(dirresults[dt])) + exdirs.extend(checkdirs(dirresultstot[dt])) if not docontinueapplymc: if doapplymc: @@ -182,14 +167,6 @@ def do_entire_analysis( # pylint: disable=too-many-locals, too-many-statements, if domergeapplydata: exdirs.extend(checkdirs(dirpklskdec_mergeddata)) - if dohistomassmc: - exdirs.extend(checkdirs(dirresultsmc)) - exdirs.extend(checkdirs(dirresultsmctot)) - - if dohistomassdata: - exdirs.extend(checkdirs(dirresultsdata)) - exdirs.extend(checkdirs(dirresultsdatatot)) - if len(exdirs) > 0: logger.info("existing directories must be deleted") for d in exdirs: @@ -208,31 +185,21 @@ def do_entire_analysis( # pylint: disable=too-many-locals, too-many-statements, sys.exit() # check and create directories - if doconversionmc: - checkmakedirlist(dirpklmc) - - if doconversiondata: - checkmakedirlist(dirpkldata) - - if doskimmingmc: - checkmakedirlist(dirpklskmc) - checkmakedir(dirpklevtcounter_allmc) - - if doskimmingdata: - checkmakedirlist(dirpklskdata) - checkmakedir(dirpklevtcounter_alldata) - - if domergingmc: - checkmakedirlist(dirpklmlmc) - - if domergingdata: - checkmakedirlist(dirpklmldata) - - if domergingperiodsmc: - checkmakedir(dirpklmltotmc) - - if domergingperiodsdata: - checkmakedir(dirpklmltotdata) + for dt in DataType: + if data_config["conversion"][dt.value]["activate"]: + checkmakedirlist(dirpkl[dt]) + if data_config["skimming"][dt.value]["activate"]: + checkmakedirlist(dirpklsk[dt]) + checkmakedir(dirpklevtcounter_all[dt]) + exdirs.extend(checkdirs(dirpklsk[dt])) + exdirs.extend(checkdirs(dirpklevtcounter_all[dt])) + if data_config["merging"][dt.value]["activate"]: + checkmakedirlist(dirpklml[dt]) + if data_config["mergingperiods"][dt.value]["activate"]: + checkmakedir(dirpklmltot[dt]) + if data_config["analysis"][dt.value]["histomass"]: + checkmakedirlist(dirresults[dt]) + checkmakedir(dirresultstot[dt]) if doml: checkmakedir(mlout) @@ -252,12 +219,6 @@ def do_entire_analysis( # pylint: disable=too-many-locals, too-many-statements, if domergeapplydata: checkmakedirlist(dirpklskdec_mergeddata) - # Always create result directories. 
(produces "double free or corruption (!prev)") - checkmakedirlist(dirresultsmc) - checkmakedir(dirresultsmctot) - checkmakedirlist(dirresultsdata) - checkmakedir(dirresultsdatatot) - def mlhepmod(name): return importlib.import_module(f"..{name}", __name__) @@ -286,8 +247,12 @@ def mlhepmod(name): proc_class = mlhepmod("processer").Processer ana_class = mlhepmod("analysis.analyzer").Analyzer - mymultiprocessmc = MultiProcesser(case, proc_class, data_param[case], typean, run_param, "mc") - mymultiprocessdata = MultiProcesser(case, proc_class, data_param[case], typean, run_param, "data") + multiprocessor = {} + for dt in DataType: + multiprocessor[dt] = MultiProcesser(case, proc_class, data_param[case], typean, run_param, dt) + mymultiprocessmc = multiprocessor[DataType.MC] + mymultiprocessdata = multiprocessor[DataType.DATA] + mymultiprocessfd = multiprocessor[DataType.FD] ana_mgr = AnalyzerManager(ana_class, data_param[case], case, typean, doanaperperiod) @@ -308,29 +273,15 @@ def mlhepmod(name): if dodownloadalice: subprocess.call("../cplusutilities/Download.sh") - if doconversionmc: - mymultiprocessmc.multi_unpack_allperiods() - - if doconversiondata: - mymultiprocessdata.multi_unpack_allperiods() - - if doskimmingmc: - mymultiprocessmc.multi_skim_allperiods() - - if doskimmingdata: - mymultiprocessdata.multi_skim_allperiods() - - if domergingmc: - mymultiprocessmc.multi_mergeml_allperiods() - - if domergingdata: - mymultiprocessdata.multi_mergeml_allperiods() - - if domergingperiodsmc: - mymultiprocessmc.multi_mergeml_allinone() - - if domergingperiodsdata: - mymultiprocessdata.multi_mergeml_allinone() + for dt in DataType: + if data_config["conversion"][dt.value]["activate"]: + multiprocessor[dt].multi_unpack_allperiods() + if data_config["skimming"][dt.value]["activate"]: + multiprocessor[dt].multi_skim_allperiods() + if data_config["merging"][dt.value]["activate"]: + multiprocessor[dt].multi_mergeml_allperiods() + if data_config["mergingperiods"][dt.value]["activate"]: + multiprocessor[dt].multi_mergeml_allinone() if doml: from machine_learning_hep.optimiser import Optimiser # pylint: disable=import-outside-toplevel @@ -392,16 +343,10 @@ def mlhepmod(name): if domergeapplymc: mymultiprocessmc.multi_mergeapply_allperiods() - if dohistomassmc: - mymultiprocessmc.multi_histomass() - if dohistomassdata: - # After-burner in case of a mult analysis to obtain "correctionsweight.root" - # for merged-period data - # pylint: disable=fixme - # FIXME Can only be run here because result directories are constructed when histomass - # is run. If this step was independent, histomass would always complain that the - # result directory already exists. 
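# The per-type histomass calls removed here give way to the DataType loop
# added just below. A minimal self-contained sketch of that pattern, reusing
# the DataType enum introduced in machine_learning_hep/common.py; the config
# dict is a hypothetical stand-in for data_config (see submission/fd.yml in a
# later patch for the real layout):
#
#   from enum import Enum
#
#   class DataType(Enum):  # mirrors machine_learning_hep/common.py
#       MC = "mc"
#       DATA = "data"
#       FD = "fd"
#
#   data_config = {"analysis": {"mc": {"histomass": True},
#                               "data": {"histomass": True},
#                               "fd": {"histomass": False}}}
#
#   for dt in DataType:
#       if data_config["analysis"][dt.value]["histomass"]:
#           print(f"running histomass for {dt.value}")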
- mymultiprocessdata.multi_histomass() + for dt in DataType: + if data_config["analysis"][dt.value]["histomass"]: + multiprocessor[dt].multi_histomass() + if doefficiency: mymultiprocessmc.multi_efficiency() analyze_steps = [] From 242cdda8188e13ac1b108f60194e979aced56315 Mon Sep 17 00:00:00 2001 From: Jochen Klein Date: Mon, 1 Sep 2025 20:26:58 +0200 Subject: [PATCH 4/8] Update submission Add submission for feeddown simulation --- machine_learning_hep/submission/fd.yml | 85 +++++++++++++++++++ machine_learning_hep/submission/processor.yml | 2 + 2 files changed, 87 insertions(+) create mode 100644 machine_learning_hep/submission/fd.yml diff --git a/machine_learning_hep/submission/fd.yml b/machine_learning_hep/submission/fd.yml new file mode 100644 index 0000000000..3349a16fd6 --- /dev/null +++ b/machine_learning_hep/submission/fd.yml @@ -0,0 +1,85 @@ +--- +case: XXXX # used to find the database file unless specified explicitly as do_entire_analysis -d database_analysis +download: + alice: + activate: false +conversion: # pkl + mc: + activate: false + fd: + activate: true + data: + activate: false +skimming: # pkl_skimmed (pklsk), pkl_evtcounter_all + mc: + activate: false + fd: + activate: true + data: + activate: false +merging: # pkl_skimmed_merge_for_ml (pklskml) + mc: + activate: false + fd: + activate: false + data: + activate: false +mergingperiods: # pkl_skimmed_merge_for_ml_all + mc: + activate: false + fd: + activate: false + data: + activate: false + +ml_study: # mlout, mlplot + activate: false + dotraining: false + dotesting: false + doplotdistr: false + doroc: false + doroctraintest: false + doimportance: false + doimportanceshap: false + docorrelation: false + dolearningcurve: false + doapplytodatamc: false + doscancuts: false + doefficiency: false + dosignifopt: false + doboundary: false + docrossvalidation: false + dogridsearch: false + dobayesianopt: false + +mlapplication: + data: + doapply: false # pkl_skimmed_dec (pklskdec) + domergeapply: false # pkl_skimmed_decmerged (pklskdecmerged) + docontinueafterstop: false # set to true to resume interrupted processing (existing corrupted output will be overwritten) + mc: + doapply: false # pkl_skimmed_dec (pklskdec) + domergeapply: false # pkl_skimmed_decmerged (pklskdecmerged) + docontinueafterstop: false # set to true to resume interrupted processing (existing corrupted output will be overwritten) + +analysis: + type: "YYYY" # used unless specified explicitly as do_entire_analysis -a type_ana + # Do each period separately including merged (true) + # Do only merged (false) + doperperiod: false + data: + histomass: false # processer: process_histomass + fd: + histomass: false + mc: + histomass: false # processer: process_histomass + efficiency: false # processer: process_efficiency + steps: + +systematics: + cutvar: + activate: false + do_only_analysis: false # This can be done anytime when mass and efficiency histograms have been produced already for a number of trials + resume: false # already done mass and efficiency histograms will not be done again, continue with left trials + mcptshape: + activate: false diff --git a/machine_learning_hep/submission/processor.yml b/machine_learning_hep/submission/processor.yml index df4e00a242..de70f15aa2 100644 --- a/machine_learning_hep/submission/processor.yml +++ b/machine_learning_hep/submission/processor.yml @@ -64,6 +64,8 @@ analysis: mc: histomass: true # processer: process_histomass efficiency: true # processer: process_efficiency + fd: + histomass: true steps: systematics: From 
e34d1af282782a7cc322272b9e4424e8626f0206 Mon Sep 17 00:00:00 2001 From: Jochen Klein Date: Thu, 6 Mar 2025 14:43:04 +0100 Subject: [PATCH 5/8] Establish masshisto for gen-only production Run all conversions Fix Run processor --- machine_learning_hep/multiprocesser.py | 2 +- machine_learning_hep/processer.py | 25 +++++++------ machine_learning_hep/processer_jet.py | 10 ++++-- machine_learning_hep/steer_analysis.py | 50 +++++++++++++------------- 4 files changed, 48 insertions(+), 39 deletions(-) diff --git a/machine_learning_hep/multiprocesser.py b/machine_learning_hep/multiprocesser.py index ccb4fa3c06..034431b43a 100755 --- a/machine_learning_hep/multiprocesser.py +++ b/machine_learning_hep/multiprocesser.py @@ -80,7 +80,7 @@ def __init__(self, case, proc_class, datap, typean, run_param, datatype): # namefiles pkl self.v_var_binning = datap["var_binning"] - self.n_reco = datap["files_names"]["namefile_reco"] + self.n_reco = datap["files_names"].get("namefile_reco", "") self.n_evt = datap["files_names"]["namefile_evt"] self.n_evtorig = datap["files_names"]["namefile_evtorig"] self.n_evt_count_ml = datap["files_names"].get("namefile_evt_count", "evtcount.yaml") diff --git a/machine_learning_hep/processer.py b/machine_learning_hep/processer.py index 6815ef7b8f..d743798cdd 100644 --- a/machine_learning_hep/processer.py +++ b/machine_learning_hep/processer.py @@ -138,9 +138,9 @@ def __init__( # def nget(d : dict, k : list, dd = None): # return nget(d.get(k.pop(0), {}), k, dd) if len(k) > 1 else d.get(k.pop(0), dd) # nget(datap, ['dfs', 'write', 'jetsubdet', 'file']) - self.n_reco = datap["files_names"]["namefile_reco"] + self.n_reco = datap["files_names"].get("namefile_reco", "") self.n_evt = datap["files_names"]["namefile_evt"] - self.n_collcnt = datap["files_names"]["namefile_collcnt"] + self.n_collcnt = datap["files_names"].get("namefile_collcnt") self.n_bccnt = datap["files_names"].get("namefile_bccnt") self.n_evtorig = datap["files_names"].get("namefile_evtorig") self.n_evt_count_ml = datap["files_names"].get("namefile_evt_count", "evtcount.yaml") @@ -191,10 +191,11 @@ def __init__( self.l_root = createlist(self.d_root, self.l_path, self.n_root) self.l_reco = createlist(self.d_pkl, self.l_path, self.n_reco) + self.l_gen = createlist(self.d_pkl, self.l_path, self.n_gen) self.l_evt = createlist(self.d_pkl, self.l_path, self.n_evt) self.l_evtorig = createlist(self.d_pkl, self.l_path, self.n_evtorig) - self.l_collcnt = createlist(self.d_pkl, self.l_path, self.n_collcnt) - self.l_bccnt = createlist(self.d_pkl, self.l_path, self.n_bccnt) + self.l_collcnt = createlist(self.d_pkl, self.l_path, self.n_collcnt) if self.datatype != "fd" else None + self.l_bccnt = createlist(self.d_pkl, self.l_path, self.n_bccnt) if self.datatype != "fd" else None self.l_histomass = createlist(self.d_results, self.l_path, self.n_filemass) self.l_histoeff = createlist(self.d_results, self.l_path, self.n_fileeff) # self.l_historesp = createlist(self.d_results, self.l_path, self.n_fileresp) @@ -323,20 +324,20 @@ def __init__( self.mptfiles_recosk = [ createlist(self.d_pklsk, self.l_path, self.lpt_recosk[ipt]) for ipt in range(self.p_nptbins) - ] + ] if self.datatype in ('mc', 'data') else [] self.mptfiles_recoskmldec = [ createlist(self.d_pkl_dec, self.l_path, self.lpt_recodec[ipt]) for ipt in range(self.p_nptbins) ] if self.datatype in ('mc', 'data') else [] self.lpt_recodecmerged = [ os.path.join(self.d_pkl_decmerged, self.lpt_recodec[ipt]) for ipt in range(self.p_nptbins) ] if self.datatype in ('mc', 'data') else [] 
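# Net effect of the two dfuse() level hunks in this series (the version from
# patch 3 plus the fix below): a df spec is read when its level matches the
# datatype, i.e.
#   datatype "mc"   reads specs with level in {all, mc, gen, det}
#   datatype "fd"   reads specs with level in {all, fd, gen}
#   datatype "data" reads specs with level in {all, data}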
-        if self.datatype == "mc":
+        if self.datatype in ('mc', 'fd'):
             self.mptfiles_gensk = [
                 createlist(self.d_pklsk, self.l_path, self.lpt_gensk[ipt]) for ipt in range(self.p_nptbins)
             ]
             self.lpt_gendecmerged = [
                 os.path.join(self.d_pkl_decmerged, self.lpt_gensk[ipt]) for ipt in range(self.p_nptbins)
-            ]
+            ] if self.d_pkl_decmerged else []
             self.mptfiles_gensk_sl = (
                 [createlist(self.d_pklsk, self.l_path, self.lpt_gensk_sl[ipt]) for ipt in range(self.p_nptbins)]
                 if self.lpt_gensk_sl
@@ -418,7 +419,7 @@ def dfuse(df_spec):
             return (
                 (level == "all")
                 or (level in ("mc", "gen", "det") and self.datatype == "mc")
-                or (level in ("mc", "gen") and self.datatype == "fd")
+                or (level in ("fd", "gen") and self.datatype == "fd")
                 or (level in ("data") and self.datatype == "data")
             )
@@ -540,8 +541,9 @@ def dfuse(df_spec):
 
     def skim(self, file_index):
         dfreco = read_df(self.l_reco[file_index]) if self.datatype != "fd" else None
-        dfgen = read_df(self.l_gen[file_index]) if self.datatype == "mc" else None
-        dfgen_sl = read_df(self.l_gen_sl[file_index]) if self.n_gen_sl and self.datatype == "mc" else None
+        dfgen = read_df(self.l_gen[file_index]) if self.datatype in ('mc', 'fd') else None
+        dfgen_sl = read_df(self.l_gen_sl[file_index]) if self.n_gen_sl and self.datatype in ('mc', 'fd') else None
+        print(dfreco, dfgen, dfgen_sl, flush=True)
 
         for ipt in range(self.p_nptbins):
             if dfreco is not None:
@@ -552,6 +554,7 @@ def skim(self, file_index):
             if dfgen is not None:
                 dfgensk = seldf_singlevar(dfgen, self.v_var_binning, self.lpt_anbinmin[ipt], self.lpt_anbinmax[ipt])
                 dfgensk = dfquery(dfgensk, self.s_gen_skim[ipt])
+                print(self.mptfiles_gensk, flush=True)
                 write_df(dfgensk, self.mptfiles_gensk[ipt][file_index])
 
             if dfgen_sl is not None:
@@ -623,7 +626,7 @@ def process_unpack_par(self):
     def process_skim_par(self):
         self.logger.info("Skimming %s period %s", self.datatype, self.period)
         create_folder_struc(self.d_pklsk, self.l_path)
-        arguments = [(i,) for i in range(len(self.l_reco))]
+        arguments = [(i,) for i in range(len(self.l_gen))]
         self.parallelizer(self.skim, arguments, self.p_chunksizeskim)
         if self.p_dofullevtmerge is True:
             merge_method(self.l_evt, self.f_totevt)
diff --git a/machine_learning_hep/processer_jet.py b/machine_learning_hep/processer_jet.py
index 221fe962d9..153e6d875e 100644
--- a/machine_learning_hep/processer_jet.py
+++ b/machine_learning_hep/processer_jet.py
@@ -255,13 +255,19 @@ def process_histomass_single(self, index):
             get_axis(histonorm, 0).SetBinLabel(4, "N_{BC}^{TVX}")
             histonorm.Write()
 
-            df = pd.concat(read_df(self.mptfiles_recosk[bin][index]) for bin in self.active_bins_skim)
+            print(f"{self.mptfiles_recosk=}", flush=True)
+            print(f"{self.mptfiles_gensk=}", flush=True)
+            if self.datatype != 'fd':
+                df = pd.concat(read_df(self.mptfiles_recosk[bin][index]) for bin in self.active_bins_skim)
+            else:
+                df = pd.concat(read_df(self.mptfiles_gensk[bin][index]) for bin in self.active_bins_skim)
 
             # remove entries outside of kinematic range (should be taken care of by projections in analyzer)
             df = df.loc[(df.fJetPt >= min(self.binarray_ptjet)) & (df.fJetPt < max(self.binarray_ptjet))]
             df = df.loc[(df.fPt >= min(self.bins_analysis[:, 0])) & (df.fPt < max(self.bins_analysis[:, 1]))]
 
             # Custom skimming cuts
-            df = self.apply_cuts_all_ptbins(df)
+            if self.datatype != "fd":
+                df = self.apply_cuts_all_ptbins(df)
 
             if col_evtidx := self.cfg("cand_collidx"):
                 h = create_hist("h_ncand", ";N_{cand}", 20, 0.0, 20.0)
diff --git a/machine_learning_hep/steer_analysis.py b/machine_learning_hep/steer_analysis.py
index fe8d0e3e3e..05f2ce276b 100644
--- a/machine_learning_hep/steer_analysis.py
+++ b/machine_learning_hep/steer_analysis.py
@@ -15,11 +15,13 @@
 """
 
 import argparse
+from functools import reduce
 import importlib
 import os
 import shutil
 import subprocess
 import sys
+from typing import TypeVar
 
 # unclear why shap needs to be imported from here,
 # segfaults when imported from within other modules
@@ -33,6 +35,14 @@
 from .utilities_files import checkdirs, checkmakedir, checkmakedirlist, delete_dirlist
 
 
+T = TypeVar("T")
+def cfg(config: dict, param: str, default: T = None) -> T:
+    return reduce(
+        lambda d, key: d.get(key, default) if isinstance(d, dict) else default,
+        param.split("."),
+        config,
+    )
+
 def do_entire_analysis(  # pylint: disable=too-many-locals, too-many-statements, too-many-branches
     data_config: dict,
     data_param: dict,
@@ -51,14 +61,6 @@ def do_entire_analysis(  # pylint: disable=too-many-locals, too-many-statements,
         update_config(data_param, data_config, data_param_overwrite)
 
     dodownloadalice = data_config["download"]["alice"]["activate"]
-    doconversionmc = data_config["conversion"]["mc"]["activate"]
-    doconversiondata = data_config["conversion"]["data"]["activate"]
-    domergingmc = data_config["merging"]["mc"]["activate"]
-    domergingdata = data_config["merging"]["data"]["activate"]
-    doskimmingmc = data_config["skimming"]["mc"]["activate"]
-    doskimmingdata = data_config["skimming"]["data"]["activate"]
-    domergingperiodsmc = data_config["mergingperiods"]["mc"]["activate"]
-    domergingperiodsdata = data_config["mergingperiods"]["data"]["activate"]
    doml = data_config["ml_study"]["activate"]
     docorrelation = data_config["ml_study"]["docorrelation"]
     dotraining = data_config["ml_study"]["dotraining"]
@@ -83,8 +85,6 @@ def do_entire_analysis(  # pylint: disable=too-many-locals, too-many-statements,
     domergeapplymc = data_config["mlapplication"]["mc"]["domergeapply"]
     docontinueapplydata = data_config["mlapplication"]["data"]["docontinueafterstop"]
     docontinueapplymc = data_config["mlapplication"]["mc"]["docontinueafterstop"]
-    dohistomassmc = data_config["analysis"]["mc"]["histomass"]
-    dohistomassdata = data_config["analysis"]["data"]["histomass"]
     doefficiency = data_config["analysis"]["mc"]["efficiency"]
     efficiency_resp = data_config["analysis"]["mc"].get("efficiency_resp", False)
     do_syst_ml = data_config["systematics"]["cutvar"]["activate"]
@@ -140,16 +140,16 @@ def do_entire_analysis(  # pylint: disable=too-many-locals, too-many-statements,
 
     exdirs = []
     for dt in DataType:
-        if data_config["conversion"][dt.value]["activate"]:
+        if cfg(data_config, f"conversion.{dt.value}.activate"):
             exdirs.extend(checkdirs(dirpkl[dt]))
-        if data_config["skimming"][dt.value]["activate"]:
+        if cfg(data_config, f"skimming.{dt.value}.activate"):
             exdirs.extend(checkdirs(dirpklsk[dt]))
             exdirs.extend(checkdirs(dirpklevtcounter_all[dt]))
-        if data_config["merging"][dt.value]["activate"]:
+        if cfg(data_config, f"merging.{dt.value}.activate"):
             exdirs.extend(checkdirs(dirpklml[dt]))
-        if data_config["mergingperiods"][dt.value]["activate"]:
+        if cfg(data_config, f"mergingperiods.{dt.value}.activate"):
             exdirs.extend(checkdirs(dirpklmltot[dt]))
-        if data_config["analysis"][dt.value]["histomass"]:
+        if cfg(data_config, f"analysis.{dt.value}.histomass"):
             exdirs.extend(checkdirs(dirresults[dt]))
             exdirs.extend(checkdirs(dirresultstot[dt]))
@@ -186,18 +186,18 @@ def do_entire_analysis(  # pylint: disable=too-many-locals, too-many-statements,
 
     # check and create directories
     for dt in DataType:
-        if data_config["conversion"][dt.value]["activate"]:
+        if cfg(data_config, f"conversion.{dt.value}.activate"):
             checkmakedirlist(dirpkl[dt])
-        if data_config["skimming"][dt.value]["activate"]:
+        if cfg(data_config, f"skimming.{dt.value}.activate"):
             checkmakedirlist(dirpklsk[dt])
             checkmakedir(dirpklevtcounter_all[dt])
             exdirs.extend(checkdirs(dirpklsk[dt]))
             exdirs.extend(checkdirs(dirpklevtcounter_all[dt]))
-        if data_config["merging"][dt.value]["activate"]:
+        if cfg(data_config, f"merging.{dt.value}.activate"):
             checkmakedirlist(dirpklml[dt])
-        if data_config["mergingperiods"][dt.value]["activate"]:
+        if cfg(data_config, f"mergingperiods.{dt.value}.activate"):
             checkmakedir(dirpklmltot[dt])
-        if data_config["analysis"][dt.value]["histomass"]:
+        if cfg(data_config, f"analysis.{dt.value}.histomass"):
             checkmakedirlist(dirresults[dt])
             checkmakedir(dirresultstot[dt])
@@ -274,13 +274,13 @@ def mlhepmod(name):
             subprocess.call("../cplusutilities/Download.sh")
 
     for dt in DataType:
-        if data_config["conversion"][dt.value]["activate"]:
+        if cfg(data_config, f"conversion.{dt.value}.activate"):
             multiprocessor[dt].multi_unpack_allperiods()
-        if data_config["skimming"][dt.value]["activate"]:
+        if cfg(data_config, f"skimming.{dt.value}.activate"):
             multiprocessor[dt].multi_skim_allperiods()
-        if data_config["merging"][dt.value]["activate"]:
+        if cfg(data_config, f"merging.{dt.value}.activate"):
             multiprocessor[dt].multi_mergeml_allperiods()
-        if data_config["mergingperiods"][dt.value]["activate"]:
+        if cfg(data_config, f"mergingperiods.{dt.value}.activate"):
             multiprocessor[dt].multi_mergeml_allinone()
 
     if doml:
@@ -344,7 +344,7 @@ def mlhepmod(name):
             mymultiprocessmc.multi_mergeapply_allperiods()
 
     for dt in DataType:
-        if data_config["analysis"][dt.value]["histomass"]:
+        if cfg(data_config, f"analysis.{dt.value}.histomass"):
             multiprocessor[dt].multi_histomass()
 
     if doefficiency:

From 767d1e4e8d8e1f5d6a764adde03d3c16a694697c Mon Sep 17 00:00:00 2001
From: Jochen Klein
Date: Tue, 11 Mar 2025 13:25:57 +0100
Subject: [PATCH 6/8] Extend jet analyzer to read feeddown info from processor

---
 .../analysis/analyzer_jets.py | 111 ++++++++++--------
 1 file changed, 65 insertions(+), 46 deletions(-)

diff --git a/machine_learning_hep/analysis/analyzer_jets.py b/machine_learning_hep/analysis/analyzer_jets.py
index ba4b1da86a..941957e327 100644
--- a/machine_learning_hep/analysis/analyzer_jets.py
+++ b/machine_learning_hep/analysis/analyzer_jets.py
@@ -65,15 +65,18 @@ def __init__(self, datap, case, typean, period):
         suffix = f"results.{period}" if period is not None else "resultsallp"
         self.d_resultsallpmc = self.cfg(f"mc.{suffix}")
         self.d_resultsallpdata = self.cfg(f"data.{suffix}")
+        self.d_resultsallpfd = self.cfg(f"fd.{suffix}")
 
         # input directories (processor output)
         self.d_resultsallpdata_proc = self.cfg(f"data_proc.{suffix}")
         self.d_resultsallpmc_proc = self.cfg(f"mc_proc.{suffix}")
+        self.d_resultsallpfd_proc = self.cfg(f"fd_proc.{suffix}")
 
         # input files
         n_filemass_name = datap["files_names"]["histofilename"]
         self.n_filemass = os.path.join(self.d_resultsallpdata_proc, n_filemass_name)
         self.n_filemass_mc = os.path.join(self.d_resultsallpmc_proc, n_filemass_name)
+        self.n_filemass_fd = os.path.join(self.d_resultsallpfd_proc, n_filemass_name)
         self.n_fileeff = datap["files_names"]["efffilename"]
         self.n_fileeff = os.path.join(self.d_resultsallpmc_proc, self.n_fileeff)
         self.n_fileresp = datap["files_names"]["respfilename"]
@@ -1063,57 +1066,73 @@ def _extract_signal(self, hist, var, mcordata, ipt):
         # hres.Sumw2()  # TODO: check if we should do this here
         return hres
 
-    # region feeddown
-    # pylint: disable=too-many-statements
-    def estimate_feeddown(self):
-        self.logger.info("Estimating feeddown")
-        with TFile(self.cfg("fd_root")) as rfile:
-            powheg_xsection = rfile.Get("fHistXsection")
-            powheg_xsection_scale_factor = powheg_xsection.GetBinContent(1) / powheg_xsection.GetEntries()
-            self.logger.info("POWHEG luminosity (mb^{-1}): %g", 1.0 / powheg_xsection_scale_factor)
+    def estimate_feeddown(self):
+        """Estimate feeddown from legacy Run 2 trees or gen-only simulation"""
+        match self.cfg("fd_input", "tree"):
+            case "tree":
+                with TFile(self.cfg("fd_root")) as rfile:
+                    powheg_xsection = rfile.Get("fHistXsection")
+                    powheg_xsection_scale_factor = powheg_xsection.GetBinContent(1) / powheg_xsection.GetEntries()
+                    self.logger.info("POWHEG luminosity (mb^{-1}): %g", 1.0 / powheg_xsection_scale_factor)
+
+                df = pd.read_parquet(self.cfg("fd_parquet"))
+                col_mapping = {"dr": "delta_r_jet", "zpar": "z"}  # TODO: check mapping
+
+                # TODO: generalize to higher dimensions
+                h3_fd_gen_orig = {}
+                for var in self.observables["all"]:
+                    bins_ptjet = np.asarray(self.cfg("bins_ptjet"), "d")
+                    # TODO: generalize or derive from histogram?
+                    bins_obs = {}
+                    if binning := self.cfg(f"observables.{var}.bins_gen_var"):
+                        bins_tmp = np.asarray(binning, "d")
+                    elif binning := self.cfg(f"observables.{var}.bins_gen_fix"):
+                        bins_tmp = bin_array(*binning)
+                    elif binning := self.cfg(f"observables.{var}.bins_var"):
+                        bins_tmp = np.asarray(binning, "d")
+                    elif binning := self.cfg(f"observables.{var}.bins_fix"):
+                        bins_tmp = bin_array(*binning)
+                    else:
+                        self.logger.error("no binning specified for %s, using defaults", var)
+                        bins_tmp = bin_array(10, 0.0, 1.0)
+                    bins_obs[var] = bins_tmp
+
+                    colname = col_mapping.get(var, f"{var}_jet")
+                    if f"{colname}" not in df:
+                        if var is not None:
+                            self.logger.error("No feeddown information for %s (%s), cannot estimate feeddown", var, colname)
+                        # print(df.info(), flush=True)
+                        continue
 
-        df = pd.read_parquet(self.cfg("fd_parquet"))
-        col_mapping = {"dr": "delta_r_jet", "zpar": "z"}  # TODO: check mapping
+                    # TODO: derive histogram
+                    h3_fd_gen_orig[var] = create_hist(
+                        "h3_feeddown_gen",
+                        f";p_{{T}}^{{jet}} (GeV/#it{{c}});p_{{T}}^{{HF}} (GeV/#it{{c}});{var}",
+                        bins_ptjet,
+                        self.bins_candpt,
+                        bins_obs[var],
+                    )
+                    fill_hist_fast(h3_fd_gen_orig, df[["pt_jet", "pt_cand", f"{colname}"]])
+                    self._save_hist(project_hist(h3_fd_gen_orig, [0, 2], {}), f"fd/h_ptjet-{var}_feeddown_gen_noeffscaling.png")
+
+            case "sim":
+                # TODO: recover cross section
+                h3_fd_gen_orig = {}
+                with TFile(self.n_filemass_fd) as rfile:
+                    for var in self.observables["all"]:
+                        self.logger.info("Running feeddown analysis for obs. %s", var)
+                        label = f"-{var}" if var else ""
+                        if fh := rfile.Get(f"h_mass-ptjet-pthf{label}"):
+                            h3_fd_gen_orig[var] = project_hist(fh, list(range(1, get_dim(fh))), {})
+                            ensure_sumw2(h3_fd_gen_orig[var])
+
+            case fd_input:
+                self.logger.critical("Invalid feeddown input %s", fd_input)
 
-        # TODO: generalize to higher dimensions
         for var in self.observables["all"]:
-            bins_ptjet = np.asarray(self.cfg("bins_ptjet"), "d")
-            # TODO: generalize or derive from histogram?
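# Aside (illustration only, not part of the patch): the input-mode switch and
# the binning lookups above all go through the dotted-path cfg helper (the
# reduce-based lookup in workflow_base.py / steer_analysis.py). A small usage
# sketch with an invented config dict:
from functools import reduce

def cfg(config, param, default=None):
    return reduce(
        lambda d, key: d.get(key, default) if isinstance(d, dict) else default,
        param.split("."),
        config,
    )

conf = {"fd_input": "sim", "observables": {"zg": {"bins_gen_fix": [10, 0.0, 0.5]}}}
assert cfg(conf, "fd_input", "tree") == "sim"             # configured value wins
assert cfg(conf, "observables.zg.bins_gen_fix") == [10, 0.0, 0.5]
assert cfg(conf, "observables.zg.bins_gen_var") is None   # missing key -> default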
-            bins_obs = {}
-            if binning := self.cfg(f"observables.{var}.bins_gen_var"):
-                bins_tmp = np.asarray(binning, "d")
-            elif binning := self.cfg(f"observables.{var}.bins_gen_fix"):
-                bins_tmp = bin_array(*binning)
-            elif binning := self.cfg(f"observables.{var}.bins_var"):
-                bins_tmp = np.asarray(binning, "d")
-            elif binning := self.cfg(f"observables.{var}.bins_fix"):
-                bins_tmp = bin_array(*binning)
-            else:
-                self.logger.error("no binning specified for %s, using defaults", var)
-                bins_tmp = bin_array(10, 0.0, 1.0)
-            bins_obs[var] = bins_tmp
-
-            colname = col_mapping.get(var, f"{var}_jet")
-            if f"{colname}" not in df:
-                if var is not None:
-                    self.logger.error("No feeddown information for %s (%s), cannot estimate feeddown", var, colname)
-                # print(df.info(), flush=True)
-                continue
-
-            # TODO: derive histogram
-            h3_fd_gen_orig = create_hist(
-                "h3_feeddown_gen",
-                f";p_{{T}}^{{jet}} (GeV/#it{{c}});p_{{T}}^{{HF}} (GeV/#it{{c}});{var}",
-                bins_ptjet,
-                self.bins_candpt,
-                bins_obs[var],
-            )
-            fill_hist_fast(h3_fd_gen_orig, df[["pt_jet", "pt_cand", f"{colname}"]])
-            self._save_hist(project_hist(h3_fd_gen_orig, [0, 2], {}), f"fd/h_ptjet-{var}_feeddown_gen_noeffscaling.png")
 
             # new method
-            h3_fd_gen = h3_fd_gen_orig.Clone()
+            h3_fd_gen = h3_fd_gen_orig[var].Clone()
             ensure_sumw2(h3_fd_gen)
             self._save_hist(project_hist(h3_fd_gen, [0, 2], {}), f"fd/h_ptjet-{var}_fdnew_gen.png")
 
             # apply np efficiency

From 8d20eac8f5967d5e4e7af2b8e841339c7aa53de8 Mon Sep 17 00:00:00 2001
From: Jochen Klein
Date: Tue, 25 Mar 2025 09:35:09 +0100
Subject: [PATCH 7/8] Establish runnable version with POWHEG

Debug output

Fix minor issues

Fix technical issues

Fix technicalities
---
 .../analysis/analyzer_jets.py          | 37 ++++++++++---------
 machine_learning_hep/multiprocesser.py |  1 -
 machine_learning_hep/processer.py      | 19 +++++-----
 machine_learning_hep/processer_jet.py  | 24 ++++++++----
 .../workflow/workflow_base.py          |  4 +-
 5 files changed, 48 insertions(+), 37 deletions(-)

diff --git a/machine_learning_hep/analysis/analyzer_jets.py b/machine_learning_hep/analysis/analyzer_jets.py
index 941957e327..b111665286 100644
--- a/machine_learning_hep/analysis/analyzer_jets.py
+++ b/machine_learning_hep/analysis/analyzer_jets.py
@@ -86,9 +86,8 @@ def __init__(self, datap, case, typean, period):
         self.p_pdfnames = datap["analysis"][self.typean].get("pdf_names")
         self.p_param_names = datap["analysis"][self.typean].get("param_names")
 
-        # TODO: should come entirely from DB
         self.observables = {
-            "qa": ["zg", "rg", "nsd", "zpar", "dr", "lntheta", "lnkt", "lntheta-lnkt"],
+            "qa": [*self.cfg("observables", {})],
             "all": [*self.cfg("observables", {})],
         }
@@ -117,7 +116,7 @@ def __init__(self, datap, case, typean, period):
                 for param, symbol in zip(
                     ("mean", "sigma", "significance", "chi2"),
                     ("#it{#mu}", "#it{#sigma}", "significance", "#it{#chi}^{2}"),
-                    strict=False,
+                    strict=True,
                 )
             }
             for level in self.fit_levels
@@ -212,7 +211,7 @@ def qa(self):  # pylint: disable=invalid-name
 
     # region efficiency
     # pylint: disable=too-many-statements
     def calculate_efficiencies(self):
-        self.logger.info("Calculating efficiencies")
+        self.logger.info("Calculating efficiencies from %s", self.n_fileeff)
         cats = {"pr", "np"}
         with TFile(self.n_fileeff) as rfile:
             h_gen = {cat: rfile.Get(f"h_ptjet-pthf_{cat}_gen") for cat in cats}
@@ -589,9 +588,6 @@ def fit(self):
                         if iptjet is None:
                             if not fitcfg.get("per_ptjet"):
                                 for jptjet in range(get_nbins(h, 1)):
-                                    self.logger.info(
-                                        "Overwriting roows_ptjet for %s iptjet %s ipt %d", level, jptjet, ipt
-                                    )
                                     self.roows[(jptjet, ipt)] = roo_ws.Clone()
                                     self.roo_ws[(level, jptjet, ipt)] = roo_ws.Clone()
                         if level in ("data", "mc"):
@@ -688,7 +684,6 @@ def _subtract_sideband(self, hist, var, mcordata, ipt):
             # project out the mass regions (first axis)
             axes = list(range(get_dim(hist)))[1:]
             fh[region] = project_hist(hist, axes, {0: bins[region]})
-            self.logger.info("Projecting %s to %s in %s: %g entries", hist, axes, bins[region], fh[region].GetEntries())
             self._save_hist(
                 fh[region], f"sideband/h_ptjet{label}_{region}_{string_range_pthf(range_pthf)}_{mcordata}.png"
             )
@@ -705,11 +700,8 @@ def _subtract_sideband(self, hist, var, mcordata, ipt):
         self.logger.info("Scaling sidebands in ptjet-%s bins: %s using %s", label, bins_ptjet, fh_sideband)
         hx = project_hist(fh_sideband, (0,), {}) if get_dim(fh_sideband) > 1 else fh_sideband
         for iptjet in bins_ptjet:
-            if iptjet:
-                n = hx.GetBinContent(iptjet)
-                self.logger.info("Need to scale in ptjet %i: %g", iptjet, n)
-                if n <= 0:
-                    continue
+            if iptjet and hx.GetBinContent(iptjet) <= 0:
+                continue
             rws = self.roo_ws.get((mcordata, iptjet, ipt))
             if not rws:
                 self.logger.error("Falling back to incl. roows for %s-iptjet%i-ipt%i", mcordata, iptjet, ipt)
@@ -1066,11 +1058,11 @@ def _extract_signal(self, hist, var, mcordata, ipt):
         # hres.Sumw2()  # TODO: check if we should do this here
         return hres
 
-
     def estimate_feeddown(self):
         """Estimate feeddown from legacy Run 2 trees or gen-only simulation"""
         match self.cfg("fd_input", "tree"):
             case "tree":
+                self.logger.info("Reading feeddown information from trees")
                 with TFile(self.cfg("fd_root")) as rfile:
                     powheg_xsection = rfile.Get("fHistXsection")
                     powheg_xsection_scale_factor = powheg_xsection.GetBinContent(1) / powheg_xsection.GetEntries()
@@ -1101,7 +1093,9 @@ def estimate_feeddown(self):
                     colname = col_mapping.get(var, f"{var}_jet")
                     if f"{colname}" not in df:
                         if var is not None:
-                            self.logger.error("No feeddown information for %s (%s), cannot estimate feeddown", var, colname)
+                            self.logger.error(
+                                "No feeddown information for %s (%s), cannot estimate feeddown", var, colname
+                            )
                         # print(df.info(), flush=True)
                         continue
@@ -1113,8 +1107,10 @@ def estimate_feeddown(self):
                         self.bins_candpt,
                         bins_obs[var],
                     )
-                    fill_hist_fast(h3_fd_gen_orig, df[["pt_jet", "pt_cand", f"{colname}"]])
-                    self._save_hist(project_hist(h3_fd_gen_orig, [0, 2], {}), f"fd/h_ptjet-{var}_feeddown_gen_noeffscaling.png")
+                    fill_hist_fast(h3_fd_gen_orig[var], df[["pt_jet", "pt_cand", f"{colname}"]])
+                    self._save_hist(
+                        project_hist(h3_fd_gen_orig[var], [0, 2], {}), f"fd/h_ptjet-{var}_feeddown_gen_noeffscaling.png"
+                    )
@@ -1126,6 +1122,11 @@ def estimate_feeddown(self):
                         label = f"-{var}" if var else ""
                         if fh := rfile.Get(f"h_mass-ptjet-pthf{label}"):
                             h3_fd_gen_orig[var] = project_hist(fh, list(range(1, get_dim(fh))), {})
                             ensure_sumw2(h3_fd_gen_orig[var])
+                            self._save_hist(
+                                project_hist(h3_fd_gen_orig[var], [0, 2], {}),
+                                f"fd/h_ptjet-{var}_feeddown_genonly_noeffscaling.png",
+                            )
+                powheg_xsection_scale_factor = 0.  # FIXME: retrieve cross section
 
             case fd_input:
                 self.logger.critical("Invalid feeddown input %s", fd_input)
@@ -1177,7 +1178,7 @@ def estimate_feeddown(self):
             h_fd_det = project_hist(h3_fd_det, [0, 2], {})
 
             # old method
-            h3_fd_gen = h3_fd_gen_orig.Clone()
+            h3_fd_gen = h3_fd_gen_orig[var].Clone()
             ensure_sumw2(h3_fd_gen)
             for ipt in range(get_nbins(h3_fd_gen, 1)):
                 eff_pr = self.hcandeff["pr"].GetBinContent(ipt + 1)
diff --git a/machine_learning_hep/multiprocesser.py b/machine_learning_hep/multiprocesser.py
index 034431b43a..063397ce08 100755
--- a/machine_learning_hep/multiprocesser.py
+++ b/machine_learning_hep/multiprocesser.py
@@ -70,7 +70,6 @@ def __init__(self, case, proc_class, datap, typean, run_param, datatype):
 
         dp = datap["multi"][self.datatype.value]
         self.dlper_root = [self.d_prefix + os.path.expandvars(p) for p in dp["unmerged_tree_dir"]]
-        print('****', self.dlper_root, flush=True)
         self.dlper_pkl = [self.d_prefix + os.path.expandvars(p) for p in dp["pkl"]]
         self.dlper_pklsk = [self.d_prefix + os.path.expandvars(p) for p in dp["pkl_skimmed"]]
         self.dlper_pklml = [self.d_prefix + os.path.expandvars(p) for p in dp["pkl_skimmed_merge_for_ml"]]
diff --git a/machine_learning_hep/processer.py b/machine_learning_hep/processer.py
index d743798cdd..86ad85a246 100644
--- a/machine_learning_hep/processer.py
+++ b/machine_learning_hep/processer.py
@@ -88,7 +88,6 @@ def __init__(
         # directories
         self.d_prefix_ml = datap["ml"].get("prefix_dir_ml", "")
         self.d_root = d_root
-        print('%%%%', self.d_root, flush=True)
         self.d_pkl = d_pkl
         self.d_pklsk = d_pklsk
         self.d_pkl_ml = d_pkl_ml
@@ -178,7 +177,6 @@ def __init__(
 
         # list of files names
         if os.path.isdir(self.d_root):
-            print(f"{self.d_root=}", flush=True)
             self.l_path = list_folders(self.d_root, self.n_root, self.p_maxfiles, self.select_jobs)
         elif glob.glob(f"{self.d_pkl}/**/{self.n_reco}", recursive=True):
             self.l_path = list_folders(self.d_pkl, self.n_reco, self.p_maxfiles, self.select_jobs)
@@ -187,7 +185,6 @@ def __init__(
                 ".p", "_%s%d_%d.p" % (self.v_var_binning, self.lpt_anbinmin[0], self.lpt_anbinmax[0])
             )
             self.l_path = list_folders(self.d_pklsk, self.n_sk, self.p_maxfiles, self.select_jobs)
-        print(f"{self.l_path=}", flush=True)
 
         self.l_root = createlist(self.d_root, self.l_path, self.n_root)
         self.l_reco = createlist(self.d_pkl, self.l_path, self.n_reco)
@@ -264,7 +261,6 @@ def __init__(
             self.mptfiles_gensk_sl = []
 
         self.d_pkl_decmerged = d_pkl_decmerged
-        print(self.d_results, self.n_filemass, flush=True)
         self.n_filemass = os.path.join(self.d_results, self.n_filemass)
         self.n_fileeff = os.path.join(self.d_results, self.n_fileeff)
         self.n_fileresp = os.path.join(self.d_results, self.n_fileresp)
@@ -352,7 +348,7 @@ def __init__(
 
         # Analysis cuts (loaded in self.process_histomass)
         self.analysis_cuts = None
-        self.analysis_mult_cuts = None
+        self.analysis_multcuts = None
         # Flag if they should be used
         self.do_custom_analysis_cuts = datap["analysis"][self.typean].get("use_cuts", False)
@@ -429,7 +425,11 @@ def dfuse(df_spec):
         with uproot.open(self.l_root[file_index]) as rfile:
             df_processed = set()
             keys = rfile.keys(recursive=False, filter_name="DF_*")
-            self.logger.info("found %d dataframes, reading %s", len(keys), max_no_keys or "all")
+            if len(keys) == 0:
+                self.logger.error("no dataframes found in %s", self.l_root[file_index])
+                return
+            else:
+                self.logger.info("found %d dataframes, reading %s", len(keys), max_no_keys or "all")
             for idx, key in enumerate(keys[:max_no_keys]):
                 if not (df_key := re.match("^DF_(\\d+);", key)):
                     continue
@@ -533,17 +533,16 @@ def dfuse(df_spec):
         if self.df_write:
             for df_name, df_spec in self.df_write.items():
                 if dfuse(df_spec):
-                    self.logger.info("writing %s to %s", df_name, df_spec["file"])
                     src = df_spec.get("source", df_name)
                     dfo = dfquery(dfs[src], df_spec.get("filter", None))
                     path = os.path.join(self.d_pkl, self.l_path[file_index], df_spec["file"])
+                    self.logger.info("writing %s to %s with info %s", df_name, path, dfo.info())
                     write_df(dfo, path)
 
     def skim(self, file_index):
         dfreco = read_df(self.l_reco[file_index]) if self.datatype != "fd" else None
         dfgen = read_df(self.l_gen[file_index]) if self.datatype in ('mc', 'fd') else None
         dfgen_sl = read_df(self.l_gen_sl[file_index]) if self.n_gen_sl and self.datatype in ('mc', 'fd') else None
-        print(dfreco, dfgen, dfgen_sl, flush=True)
 
         for ipt in range(self.p_nptbins):
             if dfreco is not None:
@@ -554,7 +553,6 @@ def skim(self, file_index):
             if dfgen is not None:
                 dfgensk = seldf_singlevar(dfgen, self.v_var_binning, self.lpt_anbinmin[ipt], self.lpt_anbinmax[ipt])
                 dfgensk = dfquery(dfgensk, self.s_gen_skim[ipt])
-                print(self.mptfiles_gensk, flush=True)
                 write_df(dfgensk, self.mptfiles_gensk[ipt][file_index])
 
             if dfgen_sl is not None:
@@ -620,7 +618,7 @@ def process_unpack_par(self):
         self.logger.info("Unpacking %s period %s", self.datatype, self.period)
         create_folder_struc(self.d_pkl, self.l_path)
         arguments = [(i,) for i in range(len(self.l_root))]
-        self.logger.info("d_pkl: %s, l_path: %s, arguments: %s", self.d_pkl, str(self.l_path), str(arguments))
+        # self.logger.info("d_pkl: %s, l_path: %s, arguments: %s", self.d_pkl, str(self.l_path), str(arguments))
         self.parallelizer(self.unpack, arguments, self.p_chunksizeunp)
 
     def process_skim_par(self):
@@ -747,6 +745,7 @@ def process_histomass(self):
 
     def process_efficiency(self):
         print("Doing efficiencies", self.datatype, self.period)
+        self.load_cuts()
         print("Using run selection for eff histo", self.runlistrigger, "for period", self.period)
         if self.doml is True:
             print("Doing ml analysis")
diff --git a/machine_learning_hep/processer_jet.py b/machine_learning_hep/processer_jet.py
index 153e6d875e..8a0c345c54 100644
--- a/machine_learning_hep/processer_jet.py
+++ b/machine_learning_hep/processer_jet.py
@@ -83,8 +83,10 @@ def __init__(
         self.s_evtsel = datap["analysis"][self.typean]["evtsel"]
 
         # bins: 2d array [[low, high], ...]
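# Aside (illustration only, not part of the patch): the zip(..., strict=True)
# changes below rely on Python >= 3.10, where mismatched iterables fail loudly
# instead of being silently truncated. Toy example with invented bin edges:
lows, highs = [1.0, 2.0, 4.0], [2.0, 4.0, 6.0]
assert list(zip(lows, highs, strict=True)) == [(1.0, 2.0), (2.0, 4.0), (4.0, 6.0)]
try:
    list(zip(lows, highs[:-1], strict=True))  # one edge missing
except ValueError:
    pass  # raised instead of dropping the last bin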
-        self.bins_skimming = np.array(list(zip(self.lpt_anbinmin, self.lpt_anbinmax)), "d")  # TODO: replace with cfg
-        self.bins_analysis = np.array(list(zip(self.lpt_finbinmin, self.lpt_finbinmax)), "d")
+        self.bins_skimming = np.array(
+            list(zip(self.lpt_anbinmin, self.lpt_anbinmax, strict=True)), "d"
+        )  # TODO: replace with cfg
+        self.bins_analysis = np.array(list(zip(self.lpt_finbinmin, self.lpt_finbinmax, strict=True)), "d")
 
         # skimming bins in overlap with the analysis range
         self.active_bins_skim = [
@@ -143,7 +145,7 @@ def _verify_variables(self, dfi):
         for idx, row in df.iterrows():
             isSoftDropped = False
             nsd = 0
-            for zg, theta in zip(row["zg_array"], row["fTheta"]):
+            for zg, theta in zip(row["zg_array"], row["fTheta"], strict=True):
                 if zg >= self.cfg("zcut", 0.1):
                     if not isSoftDropped:
                         df.loc[idx, "zg"] = zg
@@ -178,7 +180,12 @@ def _calculate_variables(self, df, verify=False):  # pylint: disable=invalid-nam
             df["zg"] = df["zg_array"].apply(lambda ar: next((zg for zg in ar if zg >= zcut), -0.1))
         if "rg" in observables:
             df["rg"] = df[["zg_array", "fTheta"]].apply(
-                (lambda ar: next((rg for (zg, rg) in zip(ar.zg_array, ar.fTheta) if zg >= zcut), -0.1)), axis=1
+                (
+                    lambda ar: next(
+                        (rg for (zg, rg) in zip(ar.zg_array, ar.fTheta, strict=True) if zg >= zcut), -0.1
+                    )
+                ),
+                axis=1,
             )
         if "nsd" in observables:
             df["nsd"] = df["zg_array"].apply(lambda ar: len([zg for zg in ar if zg >= zcut]))
@@ -231,6 +238,7 @@ def split_df(self, dfi, frac):
 
     def process_histomass_single(self, index):
         self.logger.info("Processing (histomass) %s", self.l_evtorig[index])
+        print(f"Opening file {self.l_histomass[index]}", flush=True)
         with TFile.Open(self.l_histomass[index], "recreate") as _:
             dfevtorig = read_df(self.l_evtorig[index])
             histonorm = TH1F("histonorm", "histonorm", 4, 0, 4)
@@ -255,9 +263,7 @@ def process_histomass_single(self, index):
             get_axis(histonorm, 0).SetBinLabel(4, "N_{BC}^{TVX}")
             histonorm.Write()
 
-            print(f"{self.mptfiles_recosk=}", flush=True)
-            print(f"{self.mptfiles_gensk=}", flush=True)
-            if self.datatype != 'fd':
+            if self.datatype != "fd":
                 df = pd.concat(read_df(self.mptfiles_recosk[bin][index]) for bin in self.active_bins_skim)
             else:
                 df = pd.concat(read_df(self.mptfiles_gensk[bin][index]) for bin in self.active_bins_skim)
@@ -496,6 +504,10 @@ def process_efficiency_single(self, index):
         }
 
         for cat in cats:
+            print(
+                f"Filling histograms for {cat}: {dfgen[cat].info()}, {dfdet[cat].info()}, {dfmatch[cat].info()}",
+                flush=True,
+            )
             fill_hist(h_eff[(cat, "gen")], dfgen[cat][["fJetPt_gen", "fPt_gen"]])
             fill_hist(h_eff[(cat, "det")], dfdet[cat][["fJetPt", "fPt"]])
             if cat in dfmatch and dfmatch[cat] is not None:
diff --git a/machine_learning_hep/workflow/workflow_base.py b/machine_learning_hep/workflow/workflow_base.py
index 57929513c7..a0870550f1 100644
--- a/machine_learning_hep/workflow/workflow_base.py
+++ b/machine_learning_hep/workflow/workflow_base.py
@@ -14,6 +14,7 @@
 
 from functools import reduce
 from os.path import join
+from typing import TypeVar
 
 # pylint: disable=import-error, no-name-in-module
 from ROOT import gStyle
@@ -37,7 +38,8 @@ def __init__(self, datap, case, typean, period=None):
         self.typean = typean
         self.period = period
 
-    def cfg(self, param, default=None):
+    T = TypeVar("T")
+    def cfg(self, param: str, default: T = None) -> T:
         return reduce(
             lambda d, key: d.get(key, default) if isinstance(d, dict) else default,
             param.split("."),

From 5f1abd45fab3e5a5d3022058b2bf0077c9eec77e Mon Sep 17 00:00:00 2001
From: Jochen Klein
Date: Mon, 28 Apr 2025 16:24:07 +0200
Subject: [PATCH 8/8] Propagate POWHEG weights to luminosity scaling

Clean up code

Fix formatting

Update POWHEG scaling
---
 .../analysis/analyzer_jets.py          | 15 ++--
 machine_learning_hep/multiprocesser.py | 16 +++--
 machine_learning_hep/processer.py      | 68 +++++++++++--------
 machine_learning_hep/processer_jet.py  | 16 +++--
 4 files changed, 68 insertions(+), 47 deletions(-)

diff --git a/machine_learning_hep/analysis/analyzer_jets.py b/machine_learning_hep/analysis/analyzer_jets.py
index b111665286..7ace5632a8 100644
--- a/machine_learning_hep/analysis/analyzer_jets.py
+++ b/machine_learning_hep/analysis/analyzer_jets.py
@@ -1108,12 +1108,8 @@ def estimate_feeddown(self):
                         bins_obs[var],
                     )
                     fill_hist_fast(h3_fd_gen_orig[var], df[["pt_jet", "pt_cand", f"{colname}"]])
-                    self._save_hist(
-                        project_hist(h3_fd_gen_orig[var], [0, 2], {}), f"fd/h_ptjet-{var}_feeddown_gen_noeffscaling.png"
-                    )
 
             case "sim":
-                # TODO: recover cross section
                 h3_fd_gen_orig = {}
                 with TFile(self.n_filemass_fd) as rfile:
                     for var in self.observables["all"]:
@@ -1121,12 +1117,11 @@ def estimate_feeddown(self):
                         label = f"-{var}" if var else ""
                         if fh := rfile.Get(f"h_mass-ptjet-pthf{label}"):
                             h3_fd_gen_orig[var] = project_hist(fh, list(range(1, get_dim(fh))), {})
-                            ensure_sumw2(h3_fd_gen_orig[var])
-                            self._save_hist(
-                                project_hist(h3_fd_gen_orig[var], [0, 2], {}),
-                                f"fd/h_ptjet-{var}_feeddown_genonly_noeffscaling.png",
-                            )
-                powheg_xsection_scale_factor = 0.  # FIXME: retrieve cross section
+                    h_norm = rfile.Get("histonorm")
+                    powheg_xsection_avg = h_norm.GetBinContent(6) / h_norm.GetBinContent(5)
+                    powheg_xsection_scale_factor = powheg_xsection_avg / h_norm.GetBinContent(5)
+                    self.logger.info("powheg_xsection_scale_factor = %f", powheg_xsection_scale_factor)
+                    self.logger.info("POWHEG luminosity (mb^{-1}): %g", 1. / powheg_xsection_scale_factor)
 
             case fd_input:
                 self.logger.critical("Invalid feeddown input %s", fd_input)
diff --git a/machine_learning_hep/multiprocesser.py b/machine_learning_hep/multiprocesser.py
index 063397ce08..76d8ce7bf3 100755
--- a/machine_learning_hep/multiprocesser.py
+++ b/machine_learning_hep/multiprocesser.py
@@ -16,16 +16,18 @@
 main script for doing data processing, machine learning and analysis
 """
 
-from functools import reduce
 import os
 import tempfile
+from functools import reduce
 from typing import TypeVar
 
 from machine_learning_hep.io_ml_utils import dump_yaml_from_dict, parse_yaml
 from machine_learning_hep.logger import get_logger
 from machine_learning_hep.utilities import merge_method, mergerootfiles
+
 from .common import DataType
 
+
 class MultiProcesser:  # pylint: disable=too-many-instance-attributes, too-many-statements, consider-using-f-string, too-many-branches
     species = "multiprocesser"
     logger = get_logger()
@@ -111,8 +113,12 @@ def __init__(self, case, proc_class, datap, typean, run_param, datatype):
         self.lper_evtorig = [os.path.join(direc, self.n_evtorig) for direc in self.dlper_pkl]
 
         dp = self.cfg(f"mlapplication.{self.datatype.value}", {})
-        self.dlper_reco_modapp = [self.d_prefix_app + p for p in dp["pkl_skimmed_dec"]] if dp else [None] * len(self.p_period)
-        self.dlper_reco_modappmerged = [self.d_prefix_app + p for p in dp["pkl_skimmed_decmerged"]] if dp else [None] * len(self.p_period)
+        self.dlper_reco_modapp = (
+            [self.d_prefix_app + p for p in dp["pkl_skimmed_dec"]] if dp else [None] * len(self.p_period)
+        )
+        self.dlper_reco_modappmerged = (
+            [self.d_prefix_app + p for p in dp["pkl_skimmed_decmerged"]] if dp else [None] * len(self.p_period)
+        )
 
         dp = self.cfg(f"analysis.{self.typean}.{self.datatype.value}", {})
         self.d_results = [self.d_prefix_res + os.path.expandvars(p) for p in dp["results"]]
@@ -121,7 +127,9 @@ def __init__(self, case, proc_class, datap, typean, run_param, datatype):
         self.f_evt_mergedallp = os.path.join(self.d_pklevt_mergedallp, self.n_evt)
         self.f_evtorig_mergedallp = os.path.join(self.d_pklevt_mergedallp, self.n_evtorig)
 
-        self.lper_runlistrigger = self.cfg(f"analysis.{self.typean}.{self.datatype.value}.runselection", [None] * len(self.p_period))
+        self.lper_runlistrigger = self.cfg(
+            f"analysis.{self.typean}.{self.datatype.value}.runselection", [None] * len(self.p_period)
+        )
 
         self.lper_mcreweights = None
         if self.datatype == DataType.MC:
diff --git a/machine_learning_hep/processer.py b/machine_learning_hep/processer.py
index 86ad85a246..55e25c4355 100644
--- a/machine_learning_hep/processer.py
+++ b/machine_learning_hep/processer.py
@@ -141,6 +141,7 @@ def __init__(
         self.n_evt = datap["files_names"]["namefile_evt"]
         self.n_collcnt = datap["files_names"].get("namefile_collcnt")
         self.n_bccnt = datap["files_names"].get("namefile_bccnt")
+        self.n_wgt = datap["files_names"].get("namefile_wgt", None)
         self.n_evtorig = datap["files_names"].get("namefile_evtorig")
         self.n_evt_count_ml = datap["files_names"].get("namefile_evt_count", "evtcount.yaml")
         self.n_gen = datap["files_names"]["namefile_gen"]
@@ -193,6 +194,7 @@ def __init__(
         self.l_evtorig = createlist(self.d_pkl, self.l_path, self.n_evtorig)
         self.l_collcnt = createlist(self.d_pkl, self.l_path, self.n_collcnt) if self.datatype != "fd" else None
         self.l_bccnt = createlist(self.d_pkl, self.l_path, self.n_bccnt) if self.datatype != "fd" else None
+        self.l_wgt = createlist(self.d_pkl, self.l_path, self.n_wgt) if self.datatype == "fd" and self.n_wgt else None
         self.l_histomass = createlist(self.d_results, self.l_path, self.n_filemass)
         self.l_histoeff = createlist(self.d_results, self.l_path, self.n_fileeff)
         # self.l_historesp = createlist(self.d_results, self.l_path, self.n_fileresp)
@@ -217,18 +219,18 @@ def __init__(
         # Potentially mask certain values (e.g. nsigma TOF of -999)
         self.p_mask_values = datap["ml"].get("mask_values", None)
 
-        self.bins_skimming = np.array(list(zip(self.lpt_anbinmin, self.lpt_anbinmax, strict=False)), "d")
-        self.bins_analysis = np.array(list(zip(self.lpt_finbinmin, self.lpt_finbinmax, strict=False)), "d")
+        self.bins_skimming = np.array(list(zip(self.lpt_anbinmin, self.lpt_anbinmax, strict=True)), "d")
+        self.bins_analysis = np.array(list(zip(self.lpt_finbinmin, self.lpt_finbinmax, strict=True)), "d")
 
         bin_matching = [
             [ptrange[0] <= bin[0] and ptrange[1] >= bin[1] for ptrange in self.bins_skimming].index(True)
             for bin in self.bins_analysis
         ]
 
         self.lpt_probcutpre = self.cfg_global(f"mlapplication.probcutpresel.{self.datatype}", [None] * self.p_nptbins)
-        lpt_probcutfin_tmp = self.cfg_global(f"mlapplication.probcutoptimal", [None] * self.p_nptfinbins)
+        lpt_probcutfin_tmp = self.cfg_global("mlapplication.probcutoptimal", [None] * self.p_nptfinbins)
         self.lpt_probcutfin = [lpt_probcutfin_tmp[bin_matching[ibin]] for ibin in range(self.p_nptfinbins)]
 
-        if self.datatype in ('mc', 'data'):
+        if self.datatype in ("mc", "data"):
             for ibin, probcutfin in enumerate(self.lpt_probcutfin):
                 probcutpre = self.lpt_probcutpre[bin_matching[ibin]]
                 if self.mltype == "MultiClassification":
@@ -252,7 +254,7 @@ def __init__(
             for ipt in range(self.p_nptfinbins):
                 mlsel_multi = [
                     f"y_test_prob{self.p_modelname}{label.replace('-', '_')} {comp} {probcut}"
-                    for label, comp, probcut in zip(self.class_labels, comps, self.lpt_probcutfin[ipt], strict=False)
+                    for label, comp, probcut in zip(self.class_labels, comps, self.lpt_probcutfin[ipt], strict=True)
                 ]
 
         self.d_pkl_dec = d_pkl_dec
@@ -289,7 +291,7 @@ def __init__(
             )
 
         self.lpt_recodec = None
-        if self.doml and self.datatype in ('mc', 'data'):
+        if self.doml and self.datatype in ("mc", "data"):
             if self.mltype == "MultiClassification":
                 self.lpt_recodec = [
                     self.n_reco.replace(
@@ -318,22 +320,30 @@ def __init__(
                 for i in range(self.p_nptbins)
             ]
 
-        self.mptfiles_recosk = [
-            createlist(self.d_pklsk, self.l_path, self.lpt_recosk[ipt]) for ipt in range(self.p_nptbins)
-        ] if self.datatype in ('mc', 'data') else []
-        self.mptfiles_recoskmldec = [
-            createlist(self.d_pkl_dec, self.l_path, self.lpt_recodec[ipt]) for ipt in range(self.p_nptbins)
-        ] if self.datatype in ('mc', 'data') else []
-        self.lpt_recodecmerged = [
-            os.path.join(self.d_pkl_decmerged, self.lpt_recodec[ipt]) for ipt in range(self.p_nptbins)
-        ] if self.datatype in ('mc', 'data') else []
-        if self.datatype in ('mc', 'fd'):
+        self.mptfiles_recosk = (
+            [createlist(self.d_pklsk, self.l_path, self.lpt_recosk[ipt]) for ipt in range(self.p_nptbins)]
+            if self.datatype in ("mc", "data")
+            else []
+        )
+        self.mptfiles_recoskmldec = (
+            [createlist(self.d_pkl_dec, self.l_path, self.lpt_recodec[ipt]) for ipt in range(self.p_nptbins)]
+            if self.datatype in ("mc", "data")
+            else []
+        )
+        self.lpt_recodecmerged = (
+            [os.path.join(self.d_pkl_decmerged, self.lpt_recodec[ipt]) for ipt in range(self.p_nptbins)]
+            if self.datatype in ("mc", "data")
+            else []
+        )
+        if self.datatype in ("mc", "fd"):
             self.mptfiles_gensk = [
                 createlist(self.d_pklsk, self.l_path, self.lpt_gensk[ipt]) for ipt in range(self.p_nptbins)
             ]
-            self.lpt_gendecmerged = [
-                os.path.join(self.d_pkl_decmerged, self.lpt_gensk[ipt]) for ipt in range(self.p_nptbins)
-            ] if self.d_pkl_decmerged else []
+            self.lpt_gendecmerged = (
+                [os.path.join(self.d_pkl_decmerged, self.lpt_gensk[ipt]) for ipt in range(self.p_nptbins)]
+                if self.d_pkl_decmerged
+                else []
+            )
             self.mptfiles_gensk_sl = (
                 [createlist(self.d_pklsk, self.l_path, self.lpt_gensk_sl[ipt]) for ipt in range(self.p_nptbins)]
                 if self.lpt_gensk_sl
@@ -376,8 +386,10 @@ def dfread(rdir, trees, cols, idx_name=None):
                 trees = [trees]
                 cols = [cols]
             # if all(type(var) is str for var in vars): vars = [vars]
+            if not all(name in rdir for name in trees):
+                self.logger.critical("Missing trees: %s", trees)
             df = None
-            for tree, col in zip([rdir[name] for name in trees], cols, strict=False):
+            for tree, col in zip([rdir[name] for name in trees], cols, strict=True):
                 try:
                     data = tree.arrays(expressions=col, library="np")
                     dfnew = pd.DataFrame(columns=col, data=data)
@@ -444,7 +456,7 @@ def dfuse(df_spec):
                     if dfuse(df_spec):
                         trees = []
                         cols = []
-                        for tree, spec in zip(df_spec["trees"].keys(), df_spec["trees"].values(), strict=False):
+                        for tree, spec in zip(df_spec["trees"].keys(), df_spec["trees"].values(), strict=True):
                             if isinstance(spec, list):
                                 trees.append(tree)
                                 cols.append(spec)
@@ -534,15 +546,17 @@ def dfuse(df_spec):
             for df_name, df_spec in self.df_write.items():
                 if dfuse(df_spec):
                     src = df_spec.get("source", df_name)
-                    dfo = dfquery(dfs[src], df_spec.get("filter", None))
-                    path = os.path.join(self.d_pkl, self.l_path[file_index], df_spec["file"])
-                    self.logger.info("writing %s to %s with info %s", df_name, path, dfo.info())
-                    write_df(dfo, path)
+                    if src in dfs:
+                        dfo = dfquery(dfs[src], df_spec.get("filter", None))
+                        path = os.path.join(self.d_pkl, self.l_path[file_index], df_spec["file"])
+                        write_df(dfo, path)
+                    else:
+                        self.logger.error("could not write tree, missing source %s", src)
 
     def skim(self, file_index):
         dfreco = read_df(self.l_reco[file_index]) if self.datatype != "fd" else None
-        dfgen = read_df(self.l_gen[file_index]) if self.datatype in ('mc', 'fd') else None
-        dfgen_sl = read_df(self.l_gen_sl[file_index]) if self.n_gen_sl and self.datatype in ('mc', 'fd') else None
+        dfgen = read_df(self.l_gen[file_index]) if self.datatype in ("mc", "fd") else None
+        dfgen_sl = read_df(self.l_gen_sl[file_index]) if self.n_gen_sl and self.datatype in ("mc", "fd") else None
 
         for ipt in range(self.p_nptbins):
             if dfreco is not None:
diff --git a/machine_learning_hep/processer_jet.py b/machine_learning_hep/processer_jet.py
index 8a0c345c54..b8fee7c4ae 100644
--- a/machine_learning_hep/processer_jet.py
+++ b/machine_learning_hep/processer_jet.py
@@ -238,10 +238,9 @@ def split_df(self, dfi, frac):
 
     def process_histomass_single(self, index):
         self.logger.info("Processing (histomass) %s", self.l_evtorig[index])
-        print(f"Opening file {self.l_histomass[index]}", flush=True)
         with TFile.Open(self.l_histomass[index], "recreate") as _:
             dfevtorig = read_df(self.l_evtorig[index])
-            histonorm = TH1F("histonorm", "histonorm", 4, 0, 4)
+            histonorm = TH1F("histonorm", "histonorm", 6, 0, 6)
             histonorm.SetBinContent(1, len(dfquery(dfevtorig, self.s_evtsel)))
             if self.l_collcnt:
                 dfcollcnt = read_df(self.l_collcnt[index])
@@ -257,10 +256,19 @@ def process_histomass_single(self, index):
                 ser_bccnt = dfbccnt[self.cfg("counter_tvx")]
                 bccnt_tvx = functools.reduce(lambda x, y: float(x) + float(y), (ar[0] for ar in ser_bccnt))
                 histonorm.SetBinContent(4, bccnt_tvx)
+            if self.l_wgt:
+                self.logger.info("Filling event weights")
+                dfwgt = read_df(self.l_wgt[index])
+                histonorm.SetBinContent(5, len(dfwgt["fEventWeight"]))
+                histonorm.SetBinContent(6, dfwgt["fEventWeight"].sum())
+            elif self.datatype == "fd":
+                self.logger.warning("No event weights found, empty list: %s", self.l_wgt)
             get_axis(histonorm, 0).SetBinLabel(1, "N_{evt}")
             get_axis(histonorm, 0).SetBinLabel(2, "N_{coll}")
             get_axis(histonorm, 0).SetBinLabel(3, "N_{coll}^{TVX}")
             get_axis(histonorm, 0).SetBinLabel(4, "N_{BC}^{TVX}")
+            get_axis(histonorm, 0).SetBinLabel(5, "N_{POWHEG}")
+            get_axis(histonorm, 0).SetBinLabel(6, "sum_xs_{POWHEG}")
             histonorm.Write()
 
             if self.datatype != "fd":
@@ -496,10 +504,6 @@ def process_efficiency_single(self, index):
         }
 
         for cat in cats:
-            print(
-                f"Filling histograms for {cat}: {dfgen[cat].info()}, {dfdet[cat].info()}, {dfmatch[cat].info()}",
-                flush=True,
-            )
             fill_hist(h_eff[(cat, "gen")], dfgen[cat][["fJetPt_gen", "fPt_gen"]])
             fill_hist(h_eff[(cat, "det")], dfdet[cat][["fJetPt", "fPt"]])
             if cat in dfmatch and dfmatch[cat] is not None:
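# Aside (illustration only, not part of the patch): a worked example, with
# invented numbers, of the POWHEG normalisation wired up in this patch.
# histonorm bin 5 counts generated events (N_{POWHEG}); bin 6 sums the
# per-event weights, i.e. cross sections in mb (sum_xs_{POWHEG}); the analyzer
# then derives the scale factor and equivalent luminosity as in
# estimate_feeddown above.
n_powheg = 1.0e6                         # histonorm bin 5: N_{POWHEG}
sum_xs = 70.0e6                          # histonorm bin 6: sum of weights (mb)
xsection_avg = sum_xs / n_powheg         # 70 mb: average cross section
scale_factor = xsection_avg / n_powheg   # 7e-5 mb: weight per generated event
lumi = 1.0 / scale_factor                # ~1.4e4 mb^-1 equivalent luminosity
print(f"POWHEG luminosity (mb^-1): {lumi:g}")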