diff --git a/src/decisionengine_modules/glideinwms/glide_frontend_element.py b/src/decisionengine_modules/glideinwms/glide_frontend_element.py index 82fef1a9..250530bd 100644 --- a/src/decisionengine_modules/glideinwms/glide_frontend_element.py +++ b/src/decisionengine_modules/glideinwms/glide_frontend_element.py @@ -4,15 +4,21 @@ import copy import math import os -import socket +import pickle import sys -import tempfile -import time import pandas from glideinwms.frontend import glideinFrontendConfig, glideinFrontendInterface, glideinFrontendPlugins -from glideinwms.lib import pubCrypto, token_util +from glideinwms.lib.credentials import ( + AuthenticationMethod, + create_credential, + CredentialPurpose, + CredentialType, + RSAPublicKey, + SecurityBundle, +) +from glideinwms.lib.defaults import TOKEN_DIR from decisionengine_modules.glideinwms import classads from decisionengine_modules.glideinwms.security import Credential, CredentialCache @@ -38,6 +44,39 @@ def get_gfe_obj(fe_group, acct_group, fe_cfg, logger, gfe_type="glideinwms_fom") return gfe_obj +def sudo_glidein_dict(entry_info): + """ + Compatibility function to interface with the GlideinWMS libraries. + + This function creates a dictionary with the 'attrs' key, which contains + the attributes of the first row of the entry_info DataFrame as a dictionary, + and two empty dictionaries for 'params' and 'monitor' keys. + """ + + return {"params": {}, "monitor": {}, "attrs": entry_info.to_dict(orient="records")[0]} + + +def sudo_element_merged_descript(group_name, group_config): + """ + Compatibility function to interface with the GlideinWMS libraries. + + This function creates an object with the 'group_name', 'element_data', + 'frontend_data', and 'merged_data' attributes. The 'element_data' and + 'frontend_data' attributes are initialized as empty dictionaries. + The 'merged_data' attribute is set to the provided group_config. + The 'group_name' attribute is set to the provided group name. + """ + + class ElementMergedDescript: + def __init__(self, group_name, group_config): + self.group_name = group_name + self.element_data = {} + self.frontend_data = {} + self.merged_data = group_config + + return ElementMergedDescript(group_name, group_config) + + class GlideFrontendElement: """ Class that implements the functionality of a GlideinWMS Frontend Element @@ -139,7 +178,7 @@ def generate_glidein_requests(self, jobs_df, slots_df, entries, factory_globals, # STEP 6: Create glideclientglobal classads ######################################################################## - key_builder = glideinFrontendInterface.Key4AdvertizeBuilder() + key_builder = glideinFrontendInterface.Key4AdvertiseBuilder() # List of classad objects self.gc_classads = [] self.gcg_classads = [] @@ -326,11 +365,11 @@ def generate_glidein_requests(self, jobs_df, slots_df, entries, factory_globals, glidein_monitors_per_cred = {} creds_with_running = 0 for cred in self.credential_plugin.cred_list: - glidein_monitors_per_cred[cred.get_id()] = { - f"Glideins{t}": count_slots_per_cred[cred.get_id()][t] for t in count_slots + glidein_monitors_per_cred[cred.id] = { + f"Glideins{t}": count_slots_per_cred[cred.id][t] for t in count_slots } - glidein_monitors_per_cred[cred.get_id()]["ScaledRunning"] = 0 - if glidein_monitors_per_cred[cred.get_id()]["GlideinsRunning"]: + glidein_monitors_per_cred[cred.id]["ScaledRunning"] = 0 + if glidein_monitors_per_cred[cred.id]["GlideinsRunning"]: creds_with_running += 1 if creds_with_running: @@ -338,7 +377,7 @@ def generate_glidein_requests(self, jobs_df, slots_df, entries, factory_globals, scaled = 0 tr = glidein_monitors["Running"] for cred in self.credential_plugin.cred_list: - cred_monitor = glidein_monitors_per_cred[cred.get_id()] + cred_monitor = glidein_monitors_per_cred[cred.id] if cred_monitor["GlideinsRunning"]: # This cred has running, scale them down if (creds_with_running - scaled) == 1: @@ -403,7 +442,7 @@ def generate_glidein_requests(self, jobs_df, slots_df, entries, factory_globals, pids=[] # Advertise glideclient and glideclient global classads ad_file_id_cache=glideinFrontendInterface.CredentialCache() - advertizer.renew_and_load_credentials() + advertiser.renew_and_load_credentials() """ gcg_df = pandas.DataFrame([ad.adParams for ad in self.gcg_classads]) @@ -438,7 +477,7 @@ def create_glideclient_classads( security_name=None, remove_excess_str=None, trust_domain="Any", - auth_method="Any", + auth_method=AuthenticationMethod("Any"), entry_name="condor", glidein_site="Unknown", ): @@ -452,7 +491,7 @@ def create_glideclient_classads( # params_obj is only required for fair_assign in the plugins # Else it is not really useful as the info is available in other vars - params_obj = glideinFrontendInterface.AdvertizeParams( + params_obj = glideinFrontendInterface.AdvertiseParams( request_name, glidein_name, min_nr_glideins, @@ -465,13 +504,7 @@ def create_glideclient_classads( remove_excess_str=remove_excess_str, ) - # TODO: Filter by credential type once token/proxy hybrid configurations are no longer supported - # credential_type=auth_method - credentials_with_request = self.credential_plugin.get_credentials( - params_obj=params_obj, trust_domain=trust_domain - ) - - if not credentials_with_request: + if not self.request_credentials: raise NoCredentialException # There will one glideclient classad per glidefactory classad for @@ -479,24 +512,59 @@ def create_glideclient_classads( # List of glideclient classad objects to return gc_classads = [] - for cred in credentials_with_request: - if not cred.advertise: + if glidein_params_to_encrypt is None: + glidein_params_to_encrypt = {} + + # Calculate authentication set for this group + auth_set = auth_method.match(self.credential_plugin.security_bundle) + if not auth_set: + self.logger.warning( + f"Credentials do not match auth method {auth_method} (for {params_obj.request_name}), aborting." + ) + return + glidein_params_to_encrypt["AuthSet"] = pickle.dumps(auth_set) + + # Pack payload credentials to send with requests for this group + payload_creds = [ + cred + for cred in self.credential_plugin.get_credentials( + trust_domain=trust_domain, credential_purpose=CredentialPurpose.PAYLOAD, snapshot=entry_name + ) + ] + payload_creds.extend( + [ + cred + for cred in self.credential_plugin.get_credentials( + trust_domain=trust_domain, credential_purpose=CredentialPurpose.CALLBACK, snapshot=entry_name + ) + if cred.subject.split("@")[0] == params_obj.request_name.split("@")[0] + ] + ) + glidein_params_to_encrypt["PayloadCredentials"] = pickle.dumps(payload_creds) + + # Pack parameters to send to the request + security_params = [param.copy() for param in self.credential_plugin.params_dict.values()] + glidein_params_to_encrypt["SecurityParameters"] = pickle.dumps(security_params) + + # Assign work to the credentials per the plugin policy + self.credential_plugin.assign_work(self.request_credentials, params_obj, auth_set) + + for request_cred in self.request_credentials: + cred = self.credential_plugin.get_credential(request_cred.credential.id, snapshot=entry_name) + if not request_cred.advertise: self.logger.info( - f"Ignoring credential with id: {cred.get_id()}, " - f"pilot_proxy: {cred.pilot_fname}, type: {cred.type}, trust_domain: {cred.trust_domain}, security_name: {cred.security_class}, advertise: {cred.advertise}" + f"Ignoring credential with id: {cred.id}, " + f"credential: {cred.path}, type: {cred.cred_type}, trust_domain: {cred.trust_domain}, " + f"security_name: {cred.security_class}, advertise: {cred.advertise}" ) # We have already determined that this cred cannot be used continue - if not cred.supports_auth_method(auth_method): - # TODO: Remove this condition once token/proxy hybrid configurations are no longer supported - if auth_method == "grid_proxy" and "scitoken" in cred.type: - self.logger.debug("Sending scitoken along with grid-proxy") - else: - self.logger.warning( - f"Credential {cred.type} does not match auth method {auth_method} (for {params_obj.request_name}), skipping..." - ) - continue + if not auth_set.supports(cred.cred_type): + self.logger.warning( + f"Credential {cred.cred_type} does not match auth method {auth_method} (for {params_obj.request_name}), skipping..." + ) + continue if cred.trust_domain != trust_domain: self.logger.warning( @@ -505,24 +573,19 @@ def create_glideclient_classads( continue glidein_monitors_this_cred = {} - if glidein_params_to_encrypt is None: - glidein_params_to_encrypt = {} - else: - glidein_params_to_encrypt = copy.deepcopy(glidein_params_to_encrypt) - req_idle, req_max_run = cred.get_usage_details() + glidein_params_to_encrypt = copy.deepcopy(glidein_params_to_encrypt) + req_idle, req_max_run = request_cred.get_usage_details() # TODO: Need to print this somewhere but currently logging this is interleaved with stats # self.logger.info('Advertizing credential %s with (%d idle, %d max run) for request %s' % # (cred.filename, req_idle, req_max_run, params_obj.request_name)) - glidein_monitors_this_cred = params_obj.glidein_monitors_per_cred.get(cred.get_id(), {}) + glidein_monitors_this_cred = params_obj.glidein_monitors_per_cred.get(cred.id, {}) # Create GlideClientClassad object my_name = f"{self.frontend_name}.{self.fe_group}" gc_classad = classads.GlideClientClassad(glidein_name, my_name) # Make the classad name unique by adding credential id to it - gc_classad.adParams[ - "Name" - ] = f"{self.file_id_cache.file_id(cred, cred.filename)}_{gc_classad.adParams['Name']}" + gc_classad.adParams["Name"] = f"{cred.id}_{gc_classad.adParams['Name']}" gc_classad.adParams["CollectorHost"] = factory_pool gc_classad.adParams["FrontendName"] = self.frontend_name gc_classad.adParams["GroupName"] = self.fe_group @@ -560,47 +623,13 @@ def create_glideclient_classads( gc_classad.adParams.update(config_attr) gc_classad.adParams.update(monitor_attr) + # Add request credential + glidein_params_to_encrypt["RequestCredentials"] = pickle.dumps([cred]) + # Add security class and security name to encrypted params glidein_params_to_encrypt["SecurityClass"] = str(cred.security_class) glidein_params_to_encrypt["SecurityName"] = str(self.security_name) - # Add id for all the credential files - if "username_password" in cred.type: - glidein_params_to_encrypt["Username"] = self.file_id_cache.file_id(cred, cred.filename) - glidein_params_to_encrypt["Password"] = self.file_id_cache.file_id(cred, cred.key_fname) - if "grid_proxy" in cred.type: - glidein_params_to_encrypt["SubmitProxy"] = self.file_id_cache.file_id(cred, cred.filename) - if "scitoken" in cred.type: - glidein_params_to_encrypt["frontend_scitoken"] = cred.loaded_data[0][1] - if "cert_pair" in cred.type: - glidein_params_to_encrypt["PublicCert"] = self.file_id_cache.file_id(cred, cred.filename) - glidein_params_to_encrypt["PrivateCert"] = self.file_id_cache.file_id(cred, cred.key_fname) - if "key_pair" in cred.type: - glidein_params_to_encrypt["PublicKey"] = self.file_id_cache.file_id(cred, cred.filename) - glidein_params_to_encrypt["PrivateKey"] = self.file_id_cache.file_id(cred, cred.key_fname) - if "vm_id" in cred.type: - glidein_params_to_encrypt["VMId"] = str(cred.vm_id) - if "vm_type" in cred.type: - glidein_params_to_encrypt["VMType"] = str(cred.vm_type) - if "remote_username" in cred.type: - glidein_params_to_encrypt["RemoteUsername"] = self.file_id_cache.file_id(cred, cred.remote_username) - if "auth_file" in cred.type: - glidein_params_to_encrypt["AuthFile"] = self.file_id_cache.file_id(cred, cred.filename) - if cred.project_id: - glidein_params_to_encrypt["ProjectId"] = str(cred.project_id) - - # Add id for the pilot proxy - if cred.pilot_fname: - glidein_params_to_encrypt["GlideinProxy"] = self.file_id_cache.file_id(cred, cred.pilot_fname) - - # Add condor token - ctkn = self.refresh_entry_token(glidein_site) - if ctkn: - # mark token for encrypted advertisement - entry_token_name = "%s.idtoken" % entry_name - self.logger.debug("found condor token: %s" % entry_token_name) - glidein_params_to_encrypt[entry_token_name] = ctkn - # Add classad attributes that need to be encrypted for attr in glidein_params_to_encrypt: value = str(key_obj.encrypt_hex(glidein_params_to_encrypt[attr])) @@ -633,26 +662,8 @@ def create_glideclientglobal_classads(self, glidefactory_classad, key_builder): self.global_key[glidefactory_classad["CollectorHost"]] = key_obj # Figure out the credentials to advertise and load them - credentials = self.credential_plugin.get_credentials() - nr_credentials = len(credentials) + nr_credentials = len(self.request_credentials) self.logger.info(f"Number of credentials found: {nr_credentials}") - for cred in credentials: - cred.advertise = True - cred.renew() - cred.create_if_not_exist() - cred.loaded_data = [] - for cred_file in (cred.filename, cred.key_fname, cred.pilot_fname): - self.logger.debug(f"Loading credential file {str(cred_file)}") - if cred_file: - cred_data = cred.get_string(cred_file) - if cred_data: - cred.loaded_data.append((cred_file, cred_data)) - else: - # We encountered error with this credential - # Move onto next credential - self.logger.info(f"ERROR loading credential file {cred_file}, setting advertise to False") - cred.advertise = False - break # Create GlideClientGlobalClassad object my_name = f"{self.frontend_name}.{self.fe_group}" @@ -668,14 +679,6 @@ def create_glideclientglobal_classads(self, glidefactory_classad, key_builder): "NumberOfCredentials": f"{nr_credentials}", "SecurityName": f"{self.security_name}", } - for cred in credentials: - if cred.advertise: - for (fname, data) in cred.loaded_data: - classad_attrs_to_encrypt[cred.file_id(fname)] = data - if hasattr(cred, "security_class"): - # Convert security_class to string for factory - # to interpret it correctly - classad_attrs_to_encrypt[f"SecurityClass{cred.file_id(fname)}"] = str(cred.security_class) # Add classad attributes that need to be encrypted for attr in classad_attrs_to_encrypt: @@ -694,7 +697,7 @@ def read_fe_config(self): # TODO: This check can go away once we completely decouple ourselves # from the glideinwms frontend configuration try: - group_config = self.fe_cfg["group"][self.fe_group] + self.group_config = self.fe_cfg["group"][self.fe_group] except KeyError: self.logger.exception(f"Frontend Group {self.fe_group} not configured in frontend.xml") raise @@ -706,69 +709,72 @@ def read_fe_config(self): self.web_url = self.fe_cfg["frontend"]["web_url"] # Group information - if "CONTINUE_IF_NO_PROXY" in group_config.get("attrs_descript"): - self.continue_if_no_proxy = group_config["attrs_descript"]["CONTINUE_IF_NO_PROXY"] + if "CONTINUE_IF_NO_PROXY" in self.group_config.get("attrs_descript"): + self.continue_if_no_proxy = self.group_config["attrs_descript"]["CONTINUE_IF_NO_PROXY"] else: self.continue_if_no_proxy = self.default_glidein_params["CONTINUE_IF_NO_PROXY"] self.logger.info(f"CONTINUE_IF_NO_PROXY set to {self.continue_if_no_proxy}") - self.workdir = group_config["workdir"] + self.workdir = self.group_config["workdir"] # This group's curbs and limits - self.total_max_slots = int(group_config["total_max_glideins"]) - self.total_curb_slots = int(group_config["total_curb_glideins"]) - self.total_max_slots_idle = int(group_config["total_max_vms_idle"]) - self.total_curb_slots_idle = int(group_config["total_curb_vms_idle"]) + self.total_max_slots = int(self.group_config["total_max_glideins"]) + self.total_curb_slots = int(self.group_config["total_curb_glideins"]) + self.total_max_slots_idle = int(self.group_config["total_max_vms_idle"]) + self.total_curb_slots_idle = int(self.group_config["total_curb_vms_idle"]) # This frontend's curbs and limits - self.fe_total_max_slots = int(group_config["fe_total_max_glideins"]) - self.fe_total_curb_slots = int(group_config["fe_total_curb_glideins"]) - self.fe_total_max_slots_idle = int(group_config["fe_total_max_vms_idle"]) - self.fe_total_curb_slots_idle = int(group_config["fe_total_curb_vms_idle"]) + self.fe_total_max_slots = int(self.group_config["fe_total_max_glideins"]) + self.fe_total_curb_slots = int(self.group_config["fe_total_curb_glideins"]) + self.fe_total_max_slots_idle = int(self.group_config["fe_total_max_vms_idle"]) + self.fe_total_curb_slots_idle = int(self.group_config["fe_total_curb_vms_idle"]) # This frontend's curbs and limits - self.global_total_max_slots = int(group_config["global_total_max_glideins"]) - self.global_total_curb_slots = int(group_config["global_total_curb_glideins"]) - self.global_total_max_slots_idle = int(group_config["global_total_max_vms_idle"]) - self.global_total_curb_slots_idle = int(group_config["global_total_curb_vms_idle"]) + self.global_total_max_slots = int(self.group_config["global_total_max_glideins"]) + self.global_total_curb_slots = int(self.group_config["global_total_curb_glideins"]) + self.global_total_max_slots_idle = int(self.group_config["global_total_max_vms_idle"]) + self.global_total_curb_slots_idle = int(self.group_config["global_total_curb_vms_idle"]) # Consider if we need to always keep some running glideins at entry # entry_min_running - self.entry_min_glideins_running = int(group_config["min_running"]) + self.entry_min_glideins_running = int(self.group_config["min_running"]) # entry_max_running - self.entry_max_glideins = int(group_config["max_running"]) + self.entry_max_glideins = int(self.group_config["max_running"]) # max_idle - self.entry_max_glideins_idle = int(group_config["max_idle"]) + self.entry_max_glideins_idle = int(self.group_config["max_idle"]) # max_vms_idle - self.entry_max_slots_idle = int(group_config["max_vms_idle"]) + self.entry_max_slots_idle = int(self.group_config["max_vms_idle"]) # curb_vms_idle - self.entry_curb_slots_idle = int(group_config["curb_vms_idle"]) + self.entry_curb_slots_idle = int(self.group_config["curb_vms_idle"]) # entry_fraction_running - self.entry_fraction_glidein_running = float(group_config["fraction_running"]) + self.entry_fraction_glidein_running = float(self.group_config["fraction_running"]) - self.idle_lifetime = int(group_config.get("idle_lifetime", 0)) - self.reserve_idle = int(group_config["reserve_idle"]) - self.security_name = group_config["security_name"] + self.idle_lifetime = int(self.group_config.get("idle_lifetime", 0)) + self.reserve_idle = int(self.group_config["reserve_idle"]) + self.security_name = self.group_config["security_name"] # TODO: Support different credential selection plugins in future # credential selection plugin to use # cred_plugin => self.x509_proxy_plugin in gwms frontend - cred_plugin_name = "ProxyAll" + cred_plugin_name = "CredentialsBasic" cred_plugin_class = glideinFrontendPlugins.proxy_plugins[cred_plugin_name] - cred_list = create_credential_list(group_config["proxies"], group_config, self.logger) - self.logger.info(f"Number of credentials found from the configuration {len(cred_list)}") - self.credential_plugin = cred_plugin_class(self.workdir, cred_list) + security_bundle = SecurityBundle() + security_bundle.load_from_element(self.group_config) + self.logger.info(f"Number of credentials found from the configuration {len(security_bundle.credentials)}") + self.logger.info(f"Number of parameters found from the configuration {len(security_bundle.parameters)}") + self.credential_plugin = cred_plugin_class(self.workdir, security_bundle) + self.request_credentials = self.credential_plugin.get_request_credentials() self.glidein_config_limits = {} # Lookup my identity: key = (factory_collector, factory_auth_id) self.my_identity_at_factory = {} - for col in group_config["factory_collectors"]: + for col in self.group_config["factory_collectors"]: self.my_identity_at_factory[(col[0], col[1])] = col[2] - self.signature_type = group_config["sign_descript"]["signature_type"] + self.signature_type = self.group_config["sign_descript"]["signature_type"] - self.descript_fname = group_config["sign_descript"]["frontend_descript_fname"] - self.descript_signature = group_config["sign_descript"]["frontend_descript_signature"] + self.descript_fname = self.group_config["sign_descript"]["frontend_descript_fname"] + self.descript_signature = self.group_config["sign_descript"]["frontend_descript_signature"] - self.group_web_url = group_config["web_url"] - self.group_descript_fname = group_config["sign_descript"]["group_descript_fname"] - self.group_descript_signature = group_config["sign_descript"]["group_descript_signature"] + self.group_web_url = self.group_config["web_url"] + self.group_descript_fname = self.group_config["sign_descript"]["group_descript_fname"] + self.group_descript_signature = self.group_config["sign_descript"]["group_descript_signature"] def match(self, job_types, slot_types, entries): """ @@ -859,7 +865,7 @@ def count_glidein_slots(self, slot_types): count_entry_slots[request_name] = {} count_entry_slots_cred[request_name] = {} for cred in self.credential_plugin.cred_list: - count_entry_slots_cred[request_name][cred.get_id()] = {} + count_entry_slots_cred[request_name][cred.id] = {} req_entry, req_name, req_fact = request_name.split("@") @@ -902,25 +908,23 @@ def count_glidein_slots(self, slot_types): # Further get counts per credentials for cred in self.credential_plugin.cred_list: # Initialize all counts to 0 for potential empty frames - count_entry_slots_cred[request_name][cred.get_id()][st] = 0 + count_entry_slots_cred[request_name][cred.id][st] = 0 entry_slots_cred = pandas.DataFrame() if not entry_slot_types[st].empty: - entry_slots_cred = entry_slot_types[st].query( - f'GLIDEIN_CredentialIdentifier == "{cred.get_id()}"' - ) + entry_slots_cred = entry_slot_types[st].query(f'GLIDEIN_CredentialIdentifier == "{cred.id}"') if st == "TotalCores": - count_entry_slots_cred[request_name][cred.get_id()][st] = count_total_cores(entry_slots_cred) + count_entry_slots_cred[request_name][cred.id][st] = count_total_cores(entry_slots_cred) elif st == "IdleCores": - count_entry_slots_cred[request_name][cred.get_id()][st] = count_idle_cores(entry_slots_cred) + count_entry_slots_cred[request_name][cred.id][st] = count_idle_cores(entry_slots_cred) elif st == "RunningCores": - count_entry_slots_cred[request_name][cred.get_id()][st] = count_running_cores(entry_slots_cred) + count_entry_slots_cred[request_name][cred.id][st] = count_running_cores(entry_slots_cred) elif st == "Running": - count_entry_slots_cred[request_name][cred.get_id()][st] = len(entry_slots_cred) - len( + count_entry_slots_cred[request_name][cred.id][st] = len(entry_slots_cred) - len( get_running_pslots(entry_slots_cred) ) else: - count_entry_slots_cred[request_name][cred.get_id()][st] = len(entry_slots_cred) + count_entry_slots_cred[request_name][cred.id][st] = len(entry_slots_cred) return (count_entry_slots, count_entry_slots_cred) @@ -1258,7 +1262,7 @@ def create_factory_pubkeyobj(self, factory_globals): # NOTE: Newline is escaped before storing into the dataframe # Un-escape it to make the pub key string usable pub_key = row.get("PubKeyValue").replace("\\n", "\n") - key_obj = pubCrypto.PubRSAKey(key_str=pub_key) + key_obj = RSAPublicKey(pub_key) except Exception: # if no valid key # if key needed, will handle the error later on @@ -1266,68 +1270,6 @@ def create_factory_pubkeyobj(self, factory_globals): key_objs.append(key_obj) return key_objs - def refresh_entry_token(self, glidein_site, work_dir="/var/lib/gwms-frontend"): - """ - create or update a condor token for an entry point - params: glidein_el: a glidein element data structure - returns: jwt encoded condor token on success - None on failure - """ - tkn_file = "" - tkn_str = "" - - try: - tkn_dir = os.path.join(work_dir, "tokens.d") - pwd_dir = os.path.join(work_dir, "passwords.d") - req_dir = os.path.join(work_dir, "passwords.d/requests") - tkn_file = tkn_dir + "/" + glidein_site + ".idtoken" - pwd_file = pwd_dir + "/" + glidein_site - pwd_default = pwd_dir + "/" + "FRONTEND" - one_hr = 3600 - tkn_age = sys.maxsize - - if not os.path.exists(tkn_dir): - os.mkdir(tkn_dir, 0o700) - if not os.path.exists(pwd_dir): - os.mkdir(pwd_dir, 0o700) - if not os.path.exists(req_dir): - os.mkdir(req_dir, 0o700) - - if not os.path.exists(pwd_file): - if os.path.exists(pwd_default): - pwd_file = pwd_default - - if os.path.exists(tkn_file): - tkn_age = time.time() - os.stat(tkn_file).st_mtime - if tkn_age > one_hr and os.path.exists(pwd_file): - # TODO: scope, duration, identity should be configurable - scope = "condor:/READ condor:/ADVERTISE_STARTD condor:/ADVERTISE_MASTER" - duration = 24 * one_hr - identity = f"{glidein_site}@{socket.gethostname()}" - self.logger.debug("creating token %s" % tkn_file) - self.logger.debug("pwd_flie= %s" % pwd_file) - self.logger.debug("scope= %s" % scope) - self.logger.debug("duration= %s" % duration) - self.logger.debug("identity= %s" % identity) - tkn_str = token_util.create_and_sign_token(pwd_file, scope=scope, duration=duration, identity=identity) - self.logger.debug("tkn_str= %s" % tkn_str) - with tempfile.NamedTemporaryFile(mode="w", delete=False, dir=tkn_dir) as fd: - os.chmod(fd.name, 0o600) - fd.write(tkn_str) - fd.flush() - os.replace(fd.name, tkn_file) - self.logger.debug("created token %s" % tkn_file) - elif os.path.exists(tkn_file): - with open(tkn_file) as fbuf: - for line in fbuf: - tkn_str += line - except Exception as err: - self.logger.warning(f"failed to create {tkn_file}: {err}") - for i in sys.exc_info(): - self.logger.warning("%s" % i) - - return tkn_str - ############################################################################### # GlideFrontendElement class using Figure of Merit @@ -1489,7 +1431,7 @@ def generate_glidein_requests_one(self, jobs_df, slots_df, entries, factory_glob # STEP 6: Create glideclientglobal classads ######################################################################## - key_builder = glideinFrontendInterface.Key4AdvertizeBuilder() + key_builder = glideinFrontendInterface.Key4AdvertiseBuilder() # List of classad objects self.gc_classads = [] self.gcg_classads = [] @@ -1691,11 +1633,11 @@ def generate_glidein_requests_one(self, jobs_df, slots_df, entries, factory_glob glidein_monitors_per_cred = {} creds_with_running = 0 for cred in self.credential_plugin.cred_list: - glidein_monitors_per_cred[cred.get_id()] = { - f"Glideins{t}": count_slots_per_cred[cred.get_id()][t] for t in count_slots + glidein_monitors_per_cred[cred.id] = { + f"Glideins{t}": count_slots_per_cred[cred.id][t] for t in count_slots } - glidein_monitors_per_cred[cred.get_id()]["ScaledRunning"] = 0 - if glidein_monitors_per_cred[cred.get_id()]["GlideinsRunning"]: + glidein_monitors_per_cred[cred.id]["ScaledRunning"] = 0 + if glidein_monitors_per_cred[cred.id]["GlideinsRunning"]: creds_with_running += 1 if creds_with_running: @@ -1703,7 +1645,7 @@ def generate_glidein_requests_one(self, jobs_df, slots_df, entries, factory_glob scaled = 0 tr = glidein_monitors["Running"] for cred in self.credential_plugin.cred_list: - cred_monitor = glidein_monitors_per_cred[cred.get_id()] + cred_monitor = glidein_monitors_per_cred[cred.id] if cred_monitor["GlideinsRunning"]: # This cred has running, scale them down if (creds_with_running - scaled) == 1: @@ -1724,9 +1666,47 @@ def generate_glidein_requests_one(self, jobs_df, slots_df, entries, factory_glob # TODO: These two come from glidefactory classad rows how to get it? trust_domain = entry_info.get("GLIDEIN_TrustDomain", ["Grid"]).tolist()[0] - auth_method = entry_info.get("GLIDEIN_SupportedAuthenticationMethod", ["grid_proxy"]).tolist()[0] glidein_site = entry_info.get("GLIDEIN_Site", ["Unknown"]).tolist()[0] entry_name = entry_info.get("EntryName", ["Unknown"]).tolist()[0] + auth_method = AuthenticationMethod( + entry_info.get("GLIDEIN_SupportedAuthenticationMethod", ["grid_proxy"]).tolist()[0] + ) + + # Add callback credential generator if needed + if not self.credential_plugin.get_credentials(credential_purpose=CredentialPurpose.CALLBACK): + self.logger.debug("Custom callback credential not provided. Using default.") + tkn_dir = TOKEN_DIR + if not os.path.exists(tkn_dir): + os.mkdir(tkn_dir, 0o700) + callback_generator = create_credential( + "IdTokenGenerator", + cred_type=CredentialType.DYNAMIC, + purpose=CredentialPurpose.CALLBACK, + trust_domain=trust_domain, + context={"cache_dir": tkn_dir}, + ) + self.credential_plugin.security_bundle.add_credential(callback_generator) + else: + self.logger.debug("Custom callback credential provided. Using it.") + + # Renew credentials + self.credential_plugin.renew_credentials() + + # Generate credentials and parameters + self.credential_plugin.generate_credentials( + elementDescript=sudo_element_merged_descript(self.fe_group, self.group_config), + glidein_el=sudo_glidein_dict(entry_info), + group_name=self.fe_group, + trust_domain=trust_domain, + snapshot=entry_info["EntryName"][0], + ) + self.credential_plugin.generate_parameters( + elementDescript=sudo_element_merged_descript(self.fe_group, self.group_config), + glidein_el=sudo_glidein_dict(entry_info), + group_name=self.fe_group, + trust_domain=trust_domain, + snapshot=entry_info["EntryName"][0], + ) # Only advertise if there is a valid key for encryption if key_obj is not None: @@ -1769,7 +1749,7 @@ def generate_glidein_requests_one(self, jobs_df, slots_df, entries, factory_glob pids=[] # Advertise glideclient and glideclient global classads ad_file_id_cache=glideinFrontendInterface.CredentialCache() - advertizer.renew_and_load_credentials() + advertiser.renew_and_load_credentials() """ gcg_df = pandas.DataFrame([ad.adParams for ad in self.gcg_classads]) diff --git a/src/decisionengine_modules/glideinwms/glideinwms_config_lib.py b/src/decisionengine_modules/glideinwms/glideinwms_config_lib.py index 97883206..3404fa1d 100644 --- a/src/decisionengine_modules/glideinwms/glideinwms_config_lib.py +++ b/src/decisionengine_modules/glideinwms/glideinwms_config_lib.py @@ -62,7 +62,7 @@ def __init__(self, frontend_workdir="/var/lib/gwms-frontend/vofrontend"): "global_total_max_vms_idle": int(self.group_descript[g].frontend_data["MaxIdleVMsTotalGlobal"]), "global_total_curb_vms_idle": int(self.group_descript[g].frontend_data["CurbIdleVMsTotalGlobal"]), "max_matchmakers": int(self.group_descript[g].element_data["MaxMatchmakers"]), - "proxies": self.group_descript[g].merged_data["Proxies"], + "Proxies": self.group_descript[g].merged_data["Proxies"], "proxy_selection_plugin": self.group_descript[g].merged_data["ProxySelectionPlugin"], "condor_config": self.group_descript[g].frontend_data["CondorConfig"], "condor_mapfile": self.group_descript[g].element_data["MapFile"], @@ -95,6 +95,11 @@ def get_proxy_descript_data(self, group): "ProxyUpdateFrequency", "ProxyRemoteUsernames", "ProxyProjectIds", + "CredentialPurposes", + "CredentialContexts", + "CredentialCreationScripts", + "CredentialMinimumLifetime", + "Parameters", ] for attr in proxy_descript_attrs: diff --git a/src/decisionengine_modules/glideinwms/transforms/glidein_requests.py b/src/decisionengine_modules/glideinwms/transforms/glidein_requests.py index 82be73f4..5a759a61 100644 --- a/src/decisionengine_modules/glideinwms/transforms/glidein_requests.py +++ b/src/decisionengine_modules/glideinwms/transforms/glidein_requests.py @@ -203,8 +203,8 @@ def transform(self, datablock): METRICS["REQ_MAX_GLIDEINS"].labels(ce).set(count) manifests = self.merge_requests(manifests, group_manifests) - except Exception: - self.logger.exception("Error generating glidein requests") + except Exception as e: + self.logger.exception(f"Error generating glidein requests: {e}") raise return manifests diff --git a/src/decisionengine_modules/tests/glideinwms/test_glide_frontend_element.py b/src/decisionengine_modules/tests/glideinwms/test_glide_frontend_element.py index 45d4cc9a..a4d05317 100644 --- a/src/decisionengine_modules/tests/glideinwms/test_glide_frontend_element.py +++ b/src/decisionengine_modules/tests/glideinwms/test_glide_frontend_element.py @@ -127,22 +127,3 @@ def test_compute_glidein_max_running(): assert gfe.compute_glidein_max_running({"Idle": 100}, 0, 0) == 115 assert gfe.compute_glidein_max_running({"Idle": 0}, 0, 0) == 0 assert gfe.compute_glidein_max_running({"Idle": 0}, 100, 100) == 100 - - -def test_refresh_entry_token(): - with tempfile.TemporaryDirectory() as work_dir: - os.mkdir(os.path.join(work_dir, "passwords.d")) - with open(os.path.join(work_dir, "passwords.d", "FRONTEND"), "wb") as fd: - fd.write(token_util.derive_master_key(b"TEST")) - - create_and_sign_token = token_util.create_and_sign_token - with mock.patch.object(token_util, "create_and_sign_token") as create_token: - create_token.side_effect = lambda *args, **kwargs: create_and_sign_token( - *args, **kwargs, issuer="fermicloud000.fnal.gov:0000" - ) - - gfe = glide_frontend_element.get_gfe_obj("CMS", "CMS", FRONTEND_CFG, structlog.getLogger("test")) - token = gfe.refresh_entry_token("test_entry", work_dir) - - assert token - assert not token_util.token_str_expired(token)