diff --git a/modelseedpy/core/msbuilder.py b/modelseedpy/core/msbuilder.py index 54b854b1..36f3f50a 100755 --- a/modelseedpy/core/msbuilder.py +++ b/modelseedpy/core/msbuilder.py @@ -1,11 +1,12 @@ import logging -import itertools +import itertools # !!! this import is not used + import cobra -from modelseedpy.core.rast_client import RastClient +from modelseedpy.core.rast_client import RastClient # !!! this import is not used from modelseedpy.core.msgenome import normalize_role -from modelseedpy.core.msmodel import get_gpr_string, get_reaction_constraints_from_direction -from cobra.core import Gene, Metabolite, Model, Reaction -from modelseedpy.core import FBAHelper +from modelseedpy.core.msmodel import get_gpr_string, get_reaction_constraints_from_direction # !!! none of these imports are used +from cobra.core import Gene, Metabolite, Model, Reaction # !!! Gene is not used +from modelseedpy.core.fbahelper import FBAHelper from modelseedpy.fbapkg.mspackagemanager import MSPackageManager SBO_ANNOTATION = "sbo" @@ -249,20 +250,18 @@ def build_biomass(rxn_id, cobra_model, template, biomass_compounds, index='0'): bio_rxn.annotation[SBO_ANNOTATION] = "SBO:0000629" return bio_rxn - +# A search function def _aaaa(genome, ontology_term): - search_name_to_genes = {} - search_name_to_orginal = {} - for f in genome.features: - if ontology_term in f.ontology_terms: - functions = f.ontology_terms[ontology_term] - for function in functions: + search_name_to_genes, search_name_to_orginal = {}, {} + for feature in genome.features: + if ontology_term in feature.ontology_terms: + for function in feature.ontology_terms[ontology_term]: f_norm = normalize_role(function) if f_norm not in search_name_to_genes: search_name_to_genes[f_norm] = set() search_name_to_orginal[f_norm] = set() search_name_to_orginal[f_norm].add(function) - search_name_to_genes[f_norm].add(f.id) + search_name_to_genes[f_norm].add(feature.id) return search_name_to_genes, search_name_to_orginal @@ -281,10 +280,9 @@ def aux_template(template): def build_gpr2(cpx_sets): list_of_ors = [] - for cpx in cpx_sets: + for cpx, role_ids in cpx_sets.items(): list_of_ands = [] - for role_id in cpx_sets[cpx]: - gene_ors = cpx_sets[cpx][role_id] + for role_id, gene_ors in role_ids.items(): if len(gene_ors) > 1: list_of_ands.append('(' + ' or '.join(gene_ors) + ')') else: @@ -294,7 +292,18 @@ def build_gpr2(cpx_sets): return ' or '.join(list_of_ors) return list_of_ors[0] -def build_gpr(cpx_gene_role): +def _reaction_sinks(self,model): + reactions_sinks = [] + for cpd_id in ['cpd02701_c0', 'cpd11416_c0', 'cpd15302_c0']: + if cpd_id in model.metabolites: + met = model.metabolites.get_by_id(cpd_id) + rxn_exchange = Reaction('SK_'+met.id, 'Sink for '+met.name, 'exchanges', 0, 1000) + rxn_exchange.add_metabolites({met: -1}) + rxn_exchange.annotation[SBO_ANNOTATION] = "SBO:0000627" + reactions_sinks.append(rxn_exchange) + return reactions_sinks + +def build_gpr(cpx_gene_role): #!!! Unused and redundant function """ example input: {'sdh': [{'b0721': 'sdhC', 'b0722': 'sdhD', 'b0723': 'sdhA', 'b0724': 'sdhB'}]} @@ -331,7 +340,6 @@ def __init__(self, genome, template=None): def _get_template_reaction_complexes(self, template_reaction): """ - :param template_reaction: :return: """ @@ -339,12 +347,10 @@ def _get_template_reaction_complexes(self, template_reaction): for cpx in template_reaction.get_complexes(): template_reaction_complexes[cpx.id] = {} for role, (triggering, optional) in cpx.roles.items(): - sn = normalize_role(role.name) + rn_norm = normalize_role(role.name) template_reaction_complexes[cpx.id][role.id] = [ - sn, - triggering, - optional, - set() if sn not in self.search_name_to_genes else set(self.search_name_to_genes[sn]) + rn_norm, triggering, optional, + set() if rn_norm not in self.search_name_to_genes else set(self.search_name_to_genes[rn_norm]) ] return template_reaction_complexes @@ -356,11 +362,11 @@ def _build_reaction_complex_gpr_sets2(match_complex, allow_incomplete_complexes= roles = set() role_genes = {} for role_id in match_complex[cpx_id]: - t = match_complex[cpx_id][role_id] - complete &= len(t[3]) > 0 or not t[1] or t[2] - if len(t[3]) > 0: + complx = match_complex[cpx_id][role_id] + complete &= len(complx[3]) > 0 or not complx[1] or complx[2] + if len(complx[3]) > 0: roles.add(role_id) - role_genes[role_id] = t[3] + role_genes[role_id] = complx[3] #print(cpx_id, complete, roles) if len(roles) > 0 and (allow_incomplete_complexes or complete): complexes[cpx_id] = {} @@ -370,19 +376,19 @@ def _build_reaction_complex_gpr_sets2(match_complex, allow_incomplete_complexes= return complexes @staticmethod - def _build_reaction_complex_gpr_sets(match_complex, allow_incomplete_complexes=True): + def _build_reaction_complex_gpr_sets(match_complex, allow_incomplete_complexes=True): #!!! Unused and redundant function complexes = {} for cpx_id in match_complex: complete = True roles = set() role_genes = {} for role_id in match_complex[cpx_id]: - t = match_complex[cpx_id][role_id] - complete &= len(t[3]) > 0 or not t[1] or t[2] # true if has genes or is not triggering or is optional + complx = match_complex[cpx_id][role_id] + complete &= len(complx[3]) > 0 or not complx[1] or complx[2] # true if has genes or is not triggering or is optional # print(t[3]) - if len(t[3]) > 0: + if len(complx[3]) > 0: roles.add(role_id) - role_genes[role_id] = t[3] + role_genes[role_id] = complx[3] # print(t) # it is never complete if has no genes, only needed if assuming a complex can have all # roles be either non triggering or optional @@ -404,38 +410,30 @@ def get_gpr_from_template_reaction(self, template_reaction, allow_incomplete_com template_reaction_complexes = self._get_template_reaction_complexes(template_reaction) if len(template_reaction_complexes) == 0: return None - # self.map_gene(template_reaction_complexes) - # print(template_reaction_complexes) gpr_set = self._build_reaction_complex_gpr_sets2(template_reaction_complexes, allow_incomplete_complexes) return gpr_set @staticmethod def _build_reaction(reaction_id, gpr_set, template, index='0', sbo=None): template_reaction = template.reactions.get_by_id(reaction_id) - reaction_compartment = template_reaction.compartment metabolites = {} - for cpd, value in template_reaction.metabolites.items(): + for cpd, stoich in template_reaction.metabolites.items(): compartment = f"{cpd.compartment}{index}" - name = f"{cpd.name}_{compartment}" - cpd = Metabolite(cpd.id + str(index), cpd.formula, name, cpd.charge, compartment) - metabolites[cpd] = value + met = Metabolite(cpd.id+index, cpd.formula, f"{cpd.name}_{compartment}", cpd.charge, compartment) + metabolites[met] = stoich - reaction = Reaction( - "{}{}".format(template_reaction.id, index), - "{}_{}{}".format(template_reaction.name, reaction_compartment, index), - '', - template_reaction.lower_bound, template_reaction.upper_bound - ) + reaction = Reaction(template_reaction.id+index, + f'{template_reaction.name}_{reaction_compartment}{index}', '', + template_reaction.lower_bound, template_reaction.upper_bound) gpr_str = build_gpr2(gpr_set) if gpr_set else '' reaction.add_metabolites(metabolites) + reaction.annotation["seed.reaction"] = template_reaction.reference_id if gpr_str and len(gpr_str) > 0: reaction.gene_reaction_rule = gpr_str # get_gpr_string(gpr_ll) - - reaction.annotation["seed.reaction"] = template_reaction.reference_id if sbo: reaction.annotation[SBO_ANNOTATION] = sbo return reaction @@ -449,29 +447,28 @@ def build_exchanges(model, extra_cell='e0'): :return: """ reactions_exchanges = [] - for m in model.metabolites: - if m.compartment == extra_cell: - rxn_exchange_id = 'EX_' + m.id + for met in model.metabolites: + if met.compartment == extra_cell: + rxn_exchange_id = 'EX_' + met.id if rxn_exchange_id not in model.reactions: - rxn_exchange = Reaction(rxn_exchange_id, 'Exchange for ' + m.name, 'exchanges', -1000, 1000) - rxn_exchange.add_metabolites({m: -1}) + rxn_exchange = Reaction(rxn_exchange_id, 'Exchange for '+met.name, 'exchanges', -1000, 1000) + rxn_exchange.add_metabolites({met: -1}) rxn_exchange.annotation[SBO_ANNOTATION] = "SBO:0000627" reactions_exchanges.append(rxn_exchange) model.add_reactions(reactions_exchanges) - return reactions_exchanges @staticmethod def build_biomasses(model, template, index): - res = [] + biomass_reactions = [] if template.name.startswith('CoreModel'): - res.append(build_biomass('bio1', model, template, core_biomass, index)) - res.append(build_biomass('bio2', model, template, core_atp, index)) + biomass_reactions.append(build_biomass('bio1', model, template, core_biomass, index)) + biomass_reactions.append(build_biomass('bio2', model, template, core_atp, index)) if template.name.startswith('GramNeg'): - res.append(build_biomass('bio1', model, template, gramneg, index)) + biomass_reactions.append(build_biomass('bio1', model, template, gramneg, index)) if template.name.startswith('GramPos'): - res.append(build_biomass('bio1', model, template, grampos, index)) - return res + biomass_reactions.append(build_biomass('bio1', model, template, grampos, index)) + return biomass_reactions def auto_select_template(self): """ @@ -480,9 +477,7 @@ def auto_select_template(self): """ from modelseedpy.helpers import get_template, get_classifier from modelseedpy.core.mstemplate import MSTemplateBuilder - genome_classifier = get_classifier('knn_ACNP_RAST_filter') - genome_class = genome_classifier.classify(self.genome) - + genome_class = get_classifier('knn_ACNP_RAST_filter').classify(self.genome) template_genome_scale_map = { 'A': 'template_gram_neg', 'C': 'template_gram_neg', @@ -499,8 +494,7 @@ def auto_select_template(self): if genome_class in template_genome_scale_map and genome_class in template_core_map: self.template = MSTemplateBuilder.from_dict(get_template(template_genome_scale_map[genome_class])).build() elif self.template is None: - raise Exception(f'unable to select template for {genome_class}') - + raise Exception(f'A template is not defined for the genome class {genome_class}.') return genome_class def build_metabolic_reactions(self, index='0', allow_incomplete_complexes=True): @@ -511,58 +505,42 @@ def build_metabolic_reactions(self, index='0', allow_incomplete_complexes=True): metabolic_reactions[template_reaction.id] = gpr_set logger.debug("[%s] gpr set: %s", template_reaction.id, gpr_set) - reactions = list(map( - lambda x: self._build_reaction(x[0], x[1], self.template, index, "SBO:0000176"), - metabolic_reactions.items())) - - return reactions + return [self._build_reaction(key, val, self.template, index, "SBO:0000176") + for key, val in metabolic_reactions.items()] - def build_non_metabolite_reactions(self, cobra_model, index='0', allow_all_non_grp_reactions=False): - reactions_no_gpr = [] - reactions_in_model = set(map(lambda x: x.id, cobra_model.reactions)) - metabolites_in_model = set(map(lambda x: x.id, cobra_model.metabolites)) + def build_non_metabolite_reactions(self, cobra_model, index='0', allow_all_non_grp_reactions=False): #!!! allow_all_non_grp_reactions in not used + reactions_sans_gpr = [] + model_rxns = set(x.id for x in cobra_model.reactions) + model_mets = set(x.id for x in cobra_model.metabolites) for rxn in self.template.reactions: - if rxn.type == 'universal' or rxn.type == 'spontaneous': + if rxn.type in ['universal', 'spontaneous']: reaction = self._build_reaction(rxn.id, {}, self.template, index, "SBO:0000176") - reaction_metabolite_ids = set(map(lambda x: x.id, set(reaction.metabolites))) - if (len(metabolites_in_model & reaction_metabolite_ids) > 0 or allow_all_non_grp_reactions) and \ - reaction.id not in reactions_in_model: - reaction.annotation["seed.reaction"] = rxn.id - reactions_no_gpr.append(reaction) + if len(model_mets.intersection(x.id for x in set(reaction.metabolites))) > 0: + if reaction.id not in model_rxns: + reaction.annotation["seed.reaction"] = rxn.id + reactions_sans_gpr.append(reaction) - return reactions_no_gpr + return reactions_sans_gpr def build(self, model_id, index='0', allow_all_non_grp_reactions=False, annotate_with_rast=True): - if annotate_with_rast: rast = RastClient() - res = rast.annotate_genome(self.genome) + res = rast.annotate_genome(self.genome) # !!! res is never used self.search_name_to_genes, self.search_name_to_original = _aaaa(self.genome, 'RAST') - # rxn_roles = aux_template(self.template) # needs to be fixed to actually reflect template GPR rules if self.template is None: self.auto_select_template() + # construct the model cobra_model = Model(model_id) cobra_model.add_reactions(self.build_metabolic_reactions(index=index)) cobra_model.add_reactions(self.build_non_metabolite_reactions(cobra_model, index, allow_all_non_grp_reactions)) self.build_exchanges(cobra_model) - if self.template.name.startswith('CoreModel') or \ - self.template.name.startswith('GramNeg') or self.template.name.startswith('GramPos'): + if any([self.template.name.startswith(x) for x in ('GramPos', 'CoreModel', 'GramNeg')]): cobra_model.add_reactions(self.build_biomasses(cobra_model, self.template, index)) cobra_model.objective = 'bio1' - - reactions_sinks = [] - for cpd_id in ['cpd02701_c0', 'cpd11416_c0', 'cpd15302_c0']: - if cpd_id in cobra_model.metabolites: - m = cobra_model.metabolites.get_by_id(cpd_id) - rxn_exchange = Reaction('SK_' + m.id, 'Sink for ' + m.name, 'exchanges', 0, 1000) - rxn_exchange.add_metabolites({m: -1}) - rxn_exchange.annotation[SBO_ANNOTATION] = "SBO:0000627" - reactions_sinks.append(rxn_exchange) - cobra_model.add_reactions(reactions_sinks) - + cobra_model.add_reactions(_reaction_sinks(cobra_model)) return cobra_model @staticmethod @@ -574,7 +552,7 @@ def build_full_template_model(template, model_id=None, index='0'): :param index: index for the metabolites :return: """ - model = Model(model_id if model_id else template.id) + model = Model(model_id or template.id) all_reactions = [] for rxn in template.reactions: reaction = MSBuilder._build_reaction(rxn.id, {}, template, index, "SBO:0000176") @@ -588,24 +566,15 @@ def build_full_template_model(template, model_id=None, index='0'): bio_rxn2 = build_biomass('bio2', model, template, core_atp, index) model.add_reactions([bio_rxn1, bio_rxn2]) model.objective = 'bio1' - if template.name.startswith('GramNeg'): + elif template.name.startswith('GramNeg'): bio_rxn1 = build_biomass('bio1', model, template, gramneg, index) model.add_reactions([bio_rxn1]) model.objective = 'bio1' - if template.name.startswith('GramPos'): + elif template.name.startswith('GramPos'): bio_rxn1 = build_biomass('bio1', model, template, grampos, index) model.add_reactions([bio_rxn1]) model.objective = 'bio1' - - reactions_sinks = [] - for cpd_id in ['cpd02701_c0', 'cpd11416_c0', 'cpd15302_c0']: - if cpd_id in model.metabolites: - m = model.metabolites.get_by_id(cpd_id) - rxn_exchange = Reaction('SK_' + m.id, 'Sink for ' + m.name, 'exchanges', 0, 1000) - rxn_exchange.add_metabolites({m: -1}) - rxn_exchange.annotation[SBO_ANNOTATION] = "SBO:0000627" - reactions_sinks.append(rxn_exchange) - model.add_reactions(reactions_sinks) + model.add_reactions(_reaction_sinks(model)) return model @staticmethod @@ -613,7 +582,6 @@ def build_metabolic_model(model_id, genome, gapfill_media=None, template=None, i allow_all_non_grp_reactions=False, annotate_with_rast=True, gapfill_model=True): builder = MSBuilder(genome, template) model = builder.build(model_id, index, allow_all_non_grp_reactions, annotate_with_rast) - # Gapfilling model if gapfill_model: model = MSBuilder.gapfill_model(model, 'bio1', builder.template, gapfill_media) return model @@ -621,7 +589,7 @@ def build_metabolic_model(model_id, genome, gapfill_media=None, template=None, i @staticmethod def gapfill_model(original_mdl, target_reaction, template, media): FBAHelper.set_objective_from_target_reaction(original_mdl, target_reaction) - model = cobra.io.json.from_json(cobra.io.json.to_json(original_mdl)) + model = cobra.io.json.from_json(cobra.io.json.to_json(original_mdl)) #!!! what is the benefit of this I/O processing? pkgmgr = MSPackageManager.get_pkg_mgr(model) pkgmgr.getpkg("GapfillingPkg").build_package({ "default_gapfill_templates": [template], @@ -632,7 +600,7 @@ def gapfill_model(original_mdl, target_reaction, template, media): pkgmgr.getpkg("KBaseMediaPkg").build_package(media) #with open('Gapfilling.lp', 'w') as out: # out.write(str(model.solver)) - sol = model.optimize() + sol = model.optimize() # !!! sol is never used gfresults = pkgmgr.getpkg("GapfillingPkg").compute_gapfilled_solution() for rxnid in gfresults["reversed"]: rxn = original_mdl.reactions.get_by_id(rxnid) @@ -650,4 +618,4 @@ def gapfill_model(original_mdl, target_reaction, template, media): else: rxn.upper_bound = 0 rxn.lower_bound = -100 - return original_mdl \ No newline at end of file + return original_mdl