192 changes: 192 additions & 0 deletions FAIR_universe_Higgs_tautau/Data_Loader.py
@@ -0,0 +1,192 @@
import os
import argparse
import logging
import warnings
import numpy as np
import pandas as pd
import mplhep as hep
import uproot
from HiggsML.datasets import download_dataset
from HiggsML.systematics import systematics

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# # Suppress warnings
# warnings.simplefilter(action='ignore', category=FutureWarning)

hep.style.use(hep.style.ATLAS)

def parse_args():
parser = argparse.ArgumentParser(description="Download and process HiggsML data for analysis.")

parser.add_argument(
"--url",
type=str,
default="https://zenodo.org/records/15131565/files/FAIR_Universe_HiggsML_data.zip",
help="URL to the dataset zip file."
)
parser.add_argument(
"--output-dir",
type=str,
default="./saved_datasets/",
help="Directory to save the processed ROOT files."
)
parser.add_argument(
"--train-size",
type=float,
default=0.35,
help="Fraction of dataset to use for training."
)
parser.add_argument(
"--seed",
type=int,
default=42,
help="Random seed for sampling."
)

return parser.parse_args()

def download_and_load(url, train_size):
"""Downloads the dataset and loads the training set."""
logger.info(f"Downloading dataset from {url}...")
data = download_dataset(url)

logger.info(f"Loading training set with size fraction: {train_size}")
data.load_train_set(train_size=train_size)

df_training_full = data.get_train_set()
del data # Clean up memory
return df_training_full

def process_data(df, list_of_processes, seed):
"""Filters specific processes and balances the dataset."""

all_labels = df["detailed_labels"].unique()
process_to_exclude = list(set(all_labels) - set(list_of_processes))
logger.info(f"Excluding processes: {process_to_exclude}")

# Filter dataframe
mask_process_exclusion = ~np.isin(df["detailed_labels"], process_to_exclude)
df_filtered = df[mask_process_exclusion].copy()

counts = df_filtered["detailed_labels"].value_counts()
logger.info(f"Counts before balancing:\n{counts}")

# Trim the dataset, so all processes have equal entries

    # The original notebook used the number of ttbar events (the lowest-count process); here the minimum count over all processes is taken instead
Contributor Author:
This is also a slightly different implementation from the notebook.

min_process = counts.idxmin()
n_min = counts.min()
logger.info(f"Balancing to minimum process count ({min_process}): {n_min}")

df_list = []
for _, df_process in df_filtered.groupby('detailed_labels'):
weight_sum_orig = df_process.weights.sum()

df_sampled = df_process.sample(n=n_min, random_state=seed)

df_sampled['weights'] *= weight_sum_orig / df_sampled['weights'].sum()

df_list.append(df_sampled)
del df_sampled

df_balanced = pd.concat(df_list).reset_index(drop=True)
return df_balanced
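To make the balancing step above concrete: downsampling each process to n_min rows would change its expected yield, so the surviving weights are rescaled so that the per-process weight sum is unchanged. A minimal, self-contained sketch with made-up numbers (column names mirror the script; everything else is illustrative, not part of the PR):

import numpy as np
import pandas as pd

rng = np.random.default_rng(42)
toy = pd.DataFrame({
    "detailed_labels": ["htautau"] * 100 + ["ttbar"] * 20,
    "weights": rng.uniform(0.5, 1.5, size=120),
})

counts = toy["detailed_labels"].value_counts()
n_min = counts.min()  # 20 in this toy example, set by ttbar

balanced = []
for _, group in toy.groupby("detailed_labels"):
    original_sum = group["weights"].sum()
    sampled = group.sample(n=n_min, random_state=42).copy()
    # Rescale so the per-process weight sum (the expected yield) is unchanged
    sampled["weights"] *= original_sum / sampled["weights"].sum()
    balanced.append(sampled)

balanced = pd.concat(balanced).reset_index(drop=True)
print(balanced["detailed_labels"].value_counts())            # equal row counts
print(balanced.groupby("detailed_labels")["weights"].sum())  # original yields preserved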

def apply_systematics(df, syst_settings):
"""Generates variations of the dataset based on systematics."""
dataset_dict = {}

logger.info("Generating nominal dataset...")
dataset_dict['nominal'] = systematics(
data_set=df,
dopostprocess=False
)

for sample_name, syst_args in syst_settings.items():
logger.info(f"Generating systematic variation: {sample_name}")
dataset_dict[sample_name] = systematics(
data_set=df,
dopostprocess=False,
**syst_args
)

return dataset_dict

def save_root_files(dataset_dict, output_dir, processes, selections):
"""Saves the datasets to ROOT files applying selections."""

if not os.path.exists(output_dir):
os.makedirs(output_dir)
logger.info(f"Created output directory: {output_dir}")

for sample, df in dataset_dict.items():
output_path = os.path.join(output_dir, f"dataset_{sample}.root")
logger.info(f"Writing {output_path}...")

with uproot.recreate(output_path) as ntuple:
for process in processes:

df_process = df[df["detailed_labels"] == process].copy()


df_process = df_process.query(selections).copy()


columns_to_keep = list(set(df_process.columns.tolist()) - {"detailed_labels"})


arrays = {col: df_process[col].to_numpy() for col in columns_to_keep}

if arrays:
ntuple[f"tree_{process}"] = arrays
else:
logger.warning(f"No events found for {process} in {sample} after selection.")

def main():
args = parse_args()

list_of_processes_to_model = ["htautau", "ztautau", "ttbar"]

syst_settings = {
'TES_up': {'tes': 1.02, 'seed': args.seed},
'TES_dn': {'tes': 0.98, 'seed': args.seed},
'JES_up': {'jes': 1.02, 'seed': args.seed},
'JES_dn': {'jes': 0.98, 'seed': args.seed}
}

# Some common analysis selections to remove low-stats regions
selections = (
"DER_mass_transverse_met_lep <= 250.0 and "
"DER_mass_vis <= 500.0 and "
"DER_sum_pt <= 1000 and "
"DER_pt_tot <= 250 and "
"DER_deltar_had_lep <= 4.5 and "
"DER_pt_h <= 400 and "
"DER_pt_ratio_lep_had <= 9.0"
)

# Execution Flow
try:

df_training_full = download_and_load(args.url, args.train_size)

df_training = process_data(df_training_full, list_of_processes_to_model, args.seed)
del df_training_full


dataset_dict = apply_systematics(df_training, syst_settings)


save_root_files(dataset_dict, args.output_dir, list_of_processes_to_model, selections)

logger.info("Data loading workflow completed successfully.")

except Exception as e:
logger.error(f"An error occurred: {e}", exc_info=True)
raise

if __name__ == "__main__":
main()
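For completeness, one illustrative way to drive the script end to end from Python; the flag names come from parse_args above and the values shown are just its defaults, not requirements:

import subprocess

subprocess.run(
    [
        "python", "Data_Loader.py",
        "--train-size", "0.35",
        "--seed", "42",
        "--output-dir", "./saved_datasets/",
    ],
    check=True,
)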
153 changes: 153 additions & 0 deletions FAIR_universe_Higgs_tautau/Data_Preprocessing.py
@@ -0,0 +1,153 @@
import os, sys, importlib
import argparse
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import mplhep as hep
import yaml
import uproot

from utils import plot_kinematic_features

sys.path.append('../src')
import nsbi_common_utils
from nsbi_common_utils import configuration
from nsbi_common_utils import datasets

import logging
import warnings
# Suppress warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

hep.style.use(hep.style.ATLAS)

def parse_args():
parser = argparse.ArgumentParser(description="Download and process HiggsML data for analysis.")

parser.add_argument(
"--config",
type=str,
default='./config.yml',
help="config file path"
)


return parser.parse_args()

def ds_helper(cfg, branches):
'''
Uses nsbi_common_utils.datasets to load data.
'''
datasets_helper = nsbi_common_utils.datasets.datasets(config_path = cfg,
branches_to_load = branches)
return datasets_helper

def process_data(df, input_features_by_jet, branches):
"""Filters specific processes and balances the dataset."""

median_feature = {}

for sample, sample_dataset in df["Nominal"].items():

median_feature[sample] = {}

for nJets, feat_list in input_features_by_jet.items():
for feature in feat_list:

median_feature[sample][feature] = np.median(sample_dataset.loc[sample_dataset['PRI_n_jets'] >= nJets, feature])

logger.info(f"extracting additional branches from the engineered features")
branches_to_add = []

for region, sample_datasets in df.items():

for sample, sample_dataset in sample_datasets.items():

sample_dataset['njet_0'] = (sample_dataset['PRI_n_jets'] == 0).astype(int)
sample_dataset['njet_1'] = (sample_dataset['PRI_n_jets'] == 1).astype(int)
sample_dataset['njet_2'] = (sample_dataset['PRI_n_jets'] >= 2).astype(int)

branches_to_add += ['njet_0', 'njet_1', 'njet_2']

for i, feat_list in input_features_by_jet.items():
mask_i = (sample_dataset['PRI_n_jets'] >= i).astype(float)
sample_dataset[f'jet{i}_mask'] = mask_i

branches_to_add += [f'jet{i}_mask']

for feat in feat_list:
sample_dataset[feat] = sample_dataset[feat].where(sample_dataset['PRI_n_jets'] >= i, median_feature[sample][feat])

for feat in branches.copy():

kin = sample_dataset[feat].to_numpy()

if (np.amin(kin) > 0.0) and (np.amax(kin)>100):
log_feat = 'log_'+feat
sample_dataset[log_feat] = np.log(kin+10.0)

if log_feat not in branches_to_add:
branches_to_add += [log_feat]

df[region][sample] = sample_dataset

    # De-duplicate while preserving order: the branch names above are appended once per region/sample
    return df, list(dict.fromkeys(branches_to_add))
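To illustrate the imputation pattern above: jet-level features are only meaningful for events with PRI_n_jets >= i, so events below the threshold receive the per-sample median and a mask column records which entries were genuine. A small self-contained sketch with toy values (the -999.0 placeholder is an assumption for illustration, not something the dataset is known to use):

import numpy as np
import pandas as pd

toy = pd.DataFrame({
    "PRI_n_jets": [0, 1, 2, 3],
    "PRI_jet_leading_pt": [-999.0, 55.0, 80.0, 120.0],  # toy placeholder for the 0-jet event
})

threshold = 1
median_pt = np.median(toy.loc[toy["PRI_n_jets"] >= threshold, "PRI_jet_leading_pt"])

toy["jet1_mask"] = (toy["PRI_n_jets"] >= threshold).astype(float)
toy["PRI_jet_leading_pt"] = toy["PRI_jet_leading_pt"].where(
    toy["PRI_n_jets"] >= threshold, median_pt
)
print(toy)  # the 0-jet row now carries the median (80.0) and jet1_mask == 0.0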


def main():
args = parse_args()

# Specify branches to load from the ROOT ntuples
input_features_noJets = ['PRI_lep_pt', 'PRI_lep_eta', 'PRI_lep_phi', 'PRI_had_pt', 'PRI_had_eta',
'PRI_had_phi', 'PRI_met', 'PRI_met_phi', 'DER_mass_transverse_met_lep',
'DER_mass_vis', 'DER_pt_h', 'DER_deltar_had_lep', 'DER_pt_tot', 'DER_sum_pt',
'DER_pt_ratio_lep_had', 'DER_met_phi_centrality']

input_features_1Jets = ['PRI_jet_leading_pt', 'PRI_jet_leading_eta',
'PRI_jet_leading_phi',
'PRI_jet_all_pt']

input_features_2Jets = ['PRI_jet_subleading_pt',
'PRI_jet_subleading_eta', 'PRI_jet_subleading_phi', 'DER_deltaeta_jet_jet', 'DER_mass_jet_jet',
'DER_prodeta_jet_jet',
'DER_lep_eta_centrality']

input_features_nJets = ['PRI_n_jets']

branches_to_load = input_features_noJets \
+ input_features_1Jets \
+ input_features_2Jets \
+ input_features_nJets
input_features_by_jet = {
1 : input_features_1Jets,
2 : input_features_2Jets
}

# Execution Flow
try:
logger.info(f"Loading and converting the dataset to Pandas DataFrame for processing...")
datasets_helper = ds_helper(args.config, branches_to_load)
datasets_all = datasets_helper.load_datasets_from_config(load_systematics = True)

datasets_all, add_branches = process_data(datasets_all, input_features_by_jet,
branches=branches_to_load)

logger.info(f"adding additional branches to the DataFrame")
datasets_helper.add_appended_branches(add_branches)

datasets_helper.save_datasets(datasets_all,
save_systematics = True)


logger.info("Data Preprocessing workflow completed successfully.")

except Exception as e:
logger.error(f"An error occurred: {e}", exc_info=True)
raise

if __name__ == "__main__":
main()