diff --git a/petab/v1/calculate.py b/petab/v1/calculate.py index 4c129b88..131d0d60 100644 --- a/petab/v1/calculate.py +++ b/petab/v1/calculate.py @@ -1,6 +1,7 @@ """Functions performing various calculations.""" import numbers +import operator from functools import reduce import numpy as np @@ -139,19 +140,17 @@ def calculate_residuals_for_table( # apply scaling observable = observable_df.loc[row[OBSERVABLE_ID]] trafo = observable.get(OBSERVABLE_TRANSFORMATION, LIN) - simulation = petab.scale(simulation, trafo) - measurement = petab.scale(measurement, trafo) + scaled_simulation = petab.scale(simulation, trafo) + scaled_measurement = petab.scale(measurement, trafo) # non-normalized residual is just the difference - residual = simulation - measurement + residual = scaled_measurement - scaled_simulation - noise_value = 1 if normalize: - # look up noise standard deviation - noise_value = evaluate_noise_formula( + # divide by standard deviation + residual /= evaluate_noise_formula( row, noise_formulas, parameter_df, simulation ) - residual /= noise_value # fill in value residual_df.loc[irow, RESIDUAL] = residual @@ -169,13 +168,10 @@ def get_symbolic_noise_formulas(observable_df) -> dict[str, sp.Expr]: """ noise_formulas = {} # iterate over observables - for row in observable_df.itertuples(): - observable_id = row.Index - if NOISE_FORMULA not in observable_df.columns: - noise_formula = None - else: - noise_formula = sympify_petab(row.noiseFormula) - noise_formulas[observable_id] = noise_formula + for observable_id, row in observable_df.iterrows(): + noise_formulas[observable_id] = ( + sympify_petab(row.noiseFormula) if NOISE_FORMULA in row else None + ) return noise_formulas @@ -364,7 +360,7 @@ def calculate_llh_for_table( (simulation_df[col] == row[col]) | petab.is_empty(row[col]) for col in compared_cols ] - mask = reduce(lambda x, y: x & y, masks) + mask = reduce(operator.and_, masks) simulation = simulation_df.loc[mask][SIMULATION].iloc[0] @@ -375,7 +371,7 @@ def calculate_llh_for_table( # get noise standard deviation noise_value = evaluate_noise_formula( - row, noise_formulas, parameter_df, petab.scale(simulation, scale) + row, noise_formulas, parameter_df, simulation ) # get noise distribution diff --git a/petab/v2/calculate.py b/petab/v2/calculate.py new file mode 100644 index 00000000..830c2d89 --- /dev/null +++ b/petab/v2/calculate.py @@ -0,0 +1,487 @@ +"""Functions performing various calculations.""" + +import numbers +import operator +from functools import reduce + +import numpy as np +import pandas as pd +import sympy as sp + +from petab.v1 import is_empty, split_parameter_replacement_list + +from .C import * +from .math import sympify_petab + +__all__ = [ + "calculate_residuals", + "calculate_residuals_for_table", + "get_symbolic_noise_formulas", + "evaluate_noise_formula", + "calculate_chi2", + "calculate_chi2_for_table_from_residuals", + "calculate_llh", + "calculate_llh_for_table", + "calculate_single_llh", +] + + +def calculate_residuals( + measurement_dfs: list[pd.DataFrame] | pd.DataFrame, + simulation_dfs: list[pd.DataFrame] | pd.DataFrame, + observable_dfs: list[pd.DataFrame] | pd.DataFrame, + parameter_dfs: list[pd.DataFrame] | pd.DataFrame, + normalize: bool = True, + scale: bool = True, +) -> list[pd.DataFrame]: + """Calculate residuals. + + Arguments: + measurement_dfs: + The problem measurement tables. + simulation_dfs: + Simulation tables corresponding to the measurement tables. + observable_dfs: + The problem observable tables. 
+        parameter_dfs:
+            The problem parameter tables.
+        normalize:
+            Whether to normalize residuals by the noise standard deviation
+            terms.
+        scale:
+            Whether to calculate residuals of scaled values.
+
+    Returns:
+        List of DataFrames in the same structure as `measurement_dfs`
+        with a field `residual` instead of measurement.
+    """
+    # convenience
+    if isinstance(measurement_dfs, pd.DataFrame):
+        measurement_dfs = [measurement_dfs]
+    if isinstance(simulation_dfs, pd.DataFrame):
+        simulation_dfs = [simulation_dfs]
+    if isinstance(observable_dfs, pd.DataFrame):
+        observable_dfs = [observable_dfs]
+    if isinstance(parameter_dfs, pd.DataFrame):
+        parameter_dfs = [parameter_dfs]
+
+    # iterate over data frames
+    residual_dfs = []
+    for measurement_df, simulation_df, observable_df, parameter_df in zip(
+        measurement_dfs,
+        simulation_dfs,
+        observable_dfs,
+        parameter_dfs,
+        strict=True,
+    ):
+        residual_df = calculate_residuals_for_table(
+            measurement_df,
+            simulation_df,
+            observable_df,
+            parameter_df,
+            normalize,
+            scale,
+        )
+        residual_dfs.append(residual_df)
+    return residual_dfs
+
+
+def calculate_residuals_for_table(
+    measurement_df: pd.DataFrame,
+    simulation_df: pd.DataFrame,
+    observable_df: pd.DataFrame,
+    parameter_df: pd.DataFrame,
+    normalize: bool = True,
+    scale: bool = True,
+) -> pd.DataFrame:
+    """
+    Calculate residuals for a single measurement table.
+    For the arguments, see `calculate_residuals`.
+    """
+    # import under an alias to avoid shadowing the `scale` argument
+    from petab.v1 import scale as scale_
+
+    # below, we rely on a unique index
+    measurement_df = measurement_df.reset_index(drop=True)
+
+    # create residual df as copy of measurement df, change column
+    residual_df = measurement_df.copy(deep=True).rename(
+        columns={MEASUREMENT: RESIDUAL}
+    )
+    residual_df[RESIDUAL] = residual_df[RESIDUAL].astype("float64")
+    # matching columns
+    compared_cols = set(measurement_df.columns) & set(simulation_df.columns)
+
+    # compute noise formulas for observables
+    noise_formulas = get_symbolic_noise_formulas(observable_df)
+
+    # iterate over measurements, find corresponding simulations
+    for irow, row in measurement_df.iterrows():
+        measurement = row[MEASUREMENT]
+        # look up in simulation df
+        masks = [
+            (simulation_df[col] == row[col]) | is_empty(row[col])
+            for col in compared_cols
+        ]
+        mask = reduce(operator.and_, masks)
+        if mask.sum() == 0:
+            raise ValueError(
+                f"Could not find simulation for measurement {row}."
+            )
+        # if we have multiple matches, check that the rows are all identical
+        elif (
+            mask.sum() > 1
+            and simulation_df.loc[mask].drop_duplicates().shape[0] > 1
+        ):
+            raise ValueError(
+                f"Multiple different simulations found for measurement "
+                f"{row}:\n{simulation_df.loc[mask]}"
+            )
+
+        simulation = simulation_df.loc[mask][SIMULATION].iloc[0]
+        observable = observable_df.loc[row[OBSERVABLE_ID]]
+
+        # default to unscaled values; overwritten below if `scale` is set
+        scaled_simulation, scaled_measurement = simulation, measurement
+        if scale:
+            # apply scaling
+            # for v2, the transformation is part of the noise distribution
+            noise_distr = observable.get(NOISE_DISTRIBUTION, NORMAL)
+            if noise_distr.startswith("log-"):
+                trafo = LOG
+            elif noise_distr.startswith("log10-"):
+                trafo = LOG10
+            else:
+                trafo = LIN
+
+            # scale simulation and measurement
+            scaled_simulation = scale_(simulation, trafo)
+            scaled_measurement = scale_(measurement, trafo)
+
+        # non-normalized residual is just the difference
+        residual = scaled_measurement - scaled_simulation
+
+        if normalize:
+            # divide by standard deviation
+            residual /= evaluate_noise_formula(
+                row, noise_formulas, parameter_df, simulation, observable
+            )
+
+        # fill in value
+        residual_df.loc[irow, RESIDUAL] = residual
+    return residual_df
+
+
+def get_symbolic_noise_formulas(observable_df) -> dict[str, sp.Expr]:
+    """Sympify noise formulas.
+
+    Arguments:
+        observable_df: The observable table.
+
+    Returns:
+        Dictionary of {observable_id}: {noise_formula}.
+    """
+    noise_formulas = {}
+    # iterate over observables
+    for observable_id, row in observable_df.iterrows():
+        noise_formulas[observable_id] = (
+            sympify_petab(row.noiseFormula) if NOISE_FORMULA in row else None
+        )
+    return noise_formulas
+
+
+def evaluate_noise_formula(
+    measurement: pd.Series,
+    noise_formulas: dict[str, sp.Expr],
+    parameter_df: pd.DataFrame,
+    simulation: numbers.Number,
+    observable: pd.Series,
+) -> float:
+    """Fill in parameters for `measurement` and evaluate noise_formula.
+
+    Arguments:
+        measurement: A measurement table row.
+        noise_formulas: The noise formulas as computed by
+            `get_symbolic_noise_formulas`.
+        parameter_df: The parameter table.
+        simulation: The simulation corresponding to the measurement,
+            on the unscaled (linear) scale.
+        observable: The observable table row corresponding to the measurement.
+
+    Returns:
+        The noise value.
+ """ + # the observable id + observable_id = measurement[OBSERVABLE_ID] + + # extract measurement specific overrides + observable_parameter_overrides = split_parameter_replacement_list( + measurement.get(OBSERVABLE_PARAMETERS, None) + ) + noise_parameter_overrides = split_parameter_replacement_list( + measurement.get(NOISE_PARAMETERS, None) + ) + observable_parameter_placeholders = observable.get( + OBSERVABLE_PLACEHOLDERS, "" + ).split(PARAMETER_SEPARATOR) + noise_parameter_placeholders = observable.get( + NOISE_PLACEHOLDERS, "" + ).split(PARAMETER_SEPARATOR) + + # fill in measurement specific parameters + overrides = { + sp.Symbol(placeholder, real=True): override + for placeholder, override in zip( + [ + p.strip() + for p in observable_parameter_placeholders + + noise_parameter_placeholders + if p.strip() + ], + observable_parameter_overrides + noise_parameter_overrides, + strict=False, + ) + } + + # fill in observables + overrides[sp.Symbol(observable_id, real=True)] = simulation + + # fill in general parameters + for row in parameter_df.itertuples(): + overrides[sp.Symbol(row.Index, real=True)] = row.nominalValue + + # replace parametric measurement specific parameters + for key, value in overrides.items(): + if not isinstance(value, numbers.Number): + # is parameter + overrides[key] = parameter_df.loc[value, NOMINAL_VALUE] + + # replace parameters by values in formula + noise_formula = noise_formulas[observable_id] + noise_value = noise_formula.subs(overrides) + + # conversion is possible if all parameters are replaced + try: + noise_value = float(noise_value) + except TypeError as e: + raise ValueError( + f"Cannot replace all parameters in noise formula {noise_value} " + f"for observable {observable_id}. " + f"Missing {noise_formula.free_symbols}. Note that model states " + "are currently not supported." + ) from e + return noise_value + + +def calculate_chi2( + measurement_dfs: list[pd.DataFrame] | pd.DataFrame, + simulation_dfs: list[pd.DataFrame] | pd.DataFrame, + observable_dfs: list[pd.DataFrame] | pd.DataFrame, + parameter_dfs: list[pd.DataFrame] | pd.DataFrame, + normalize: bool = True, + scale: bool = True, +) -> float: + """Calculate the chi2 value. + + Arguments: + measurement_dfs: + The problem measurement tables. + simulation_dfs: + Simulation tables corresponding to the measurement tables. + observable_dfs: + The problem observable tables. + parameter_dfs: + The problem parameter tables. + normalize: + Whether to normalize residuals by the noise standard deviation + terms. + scale: + Whether to calculate residuals of scaled values. + + Returns: + The aggregated chi2 value. + """ + residual_dfs = calculate_residuals( + measurement_dfs, + simulation_dfs, + observable_dfs, + parameter_dfs, + normalize, + scale, + ) + chi2s = [ + calculate_chi2_for_table_from_residuals(df) for df in residual_dfs + ] + return sum(chi2s) + + +def calculate_chi2_for_table_from_residuals( + residual_df: pd.DataFrame, +) -> float: + """Compute chi2 value for a single residual table.""" + return (np.array(residual_df[RESIDUAL]) ** 2).sum() + + +def calculate_llh( + measurement_dfs: list[pd.DataFrame] | pd.DataFrame, + simulation_dfs: list[pd.DataFrame] | pd.DataFrame, + observable_dfs: list[pd.DataFrame] | pd.DataFrame, + parameter_dfs: list[pd.DataFrame] | pd.DataFrame, +) -> float: + """Calculate total log likelihood. + + Arguments: + measurement_dfs: + The problem measurement tables. + simulation_dfs: + Simulation tables corresponding to the measurement tables. 
+ observable_dfs: + The problem observable tables. + parameter_dfs: + The problem parameter tables. + + Returns: + The log-likelihood. + """ + # convenience + if isinstance(measurement_dfs, pd.DataFrame): + measurement_dfs = [measurement_dfs] + if isinstance(simulation_dfs, pd.DataFrame): + simulation_dfs = [simulation_dfs] + if isinstance(observable_dfs, pd.DataFrame): + observable_dfs = [observable_dfs] + if isinstance(parameter_dfs, pd.DataFrame): + parameter_dfs = [parameter_dfs] + + # iterate over data frames + llhs = [] + for measurement_df, simulation_df, observable_df, parameter_df in zip( + measurement_dfs, + simulation_dfs, + observable_dfs, + parameter_dfs, + strict=True, + ): + _llh = calculate_llh_for_table( + measurement_df, simulation_df, observable_df, parameter_df + ) + llhs.append(_llh) + return sum(llhs) + + +def calculate_llh_for_table( + measurement_df: pd.DataFrame, + simulation_df: pd.DataFrame, + observable_df: pd.DataFrame, + parameter_df: pd.DataFrame, +) -> float: + """Calculate log-likelihood for one set of tables. For the arguments, see + `calculate_llh`. + """ + + llhs = [] + + # matching columns + compared_cols = set(measurement_df.columns) & set(simulation_df.columns) + + # compute noise formulas for observables + noise_formulas = get_symbolic_noise_formulas(observable_df) + + # iterate over measurements, find corresponding simulations + for _, row in measurement_df.iterrows(): + measurement = row[MEASUREMENT] + + # look up in simulation df + masks = [ + (simulation_df[col] == row[col]) | is_empty(row[col]) + for col in compared_cols + ] + mask = reduce(lambda x, y: x & y, masks) + + simulation = simulation_df.loc[mask][SIMULATION].iloc[0] + + observable = observable_df.loc[row[OBSERVABLE_ID]] + + # get noise distribution + noise_distr = observable.get(NOISE_DISTRIBUTION, NORMAL) + + if noise_distr.startswith("log-"): + obs_scale = LOG + noise_distr = noise_distr.removeprefix("log-") + elif noise_distr.startswith("log10-"): + obs_scale = LOG10 + noise_distr = noise_distr.removeprefix("log10-") + else: + obs_scale = LIN + + # get noise standard deviation + noise_value = evaluate_noise_formula( + row, + noise_formulas, + parameter_df, + simulation, + observable, + ) + + llh = calculate_single_llh( + measurement, simulation, obs_scale, noise_distr, noise_value + ) + llhs.append(llh) + return sum(llhs) + + +def calculate_single_llh( + measurement: float, + simulation: float, + scale: str, + noise_distribution: str, + noise_value: float, +) -> float: + """Calculate a single log likelihood. + + Arguments: + measurement: The measurement value. + simulation: The simulated value. + scale: The scale on which the noise model is to be applied. + noise_distribution: The noise distribution. + noise_value: The considered noise models possess a single noise + parameter, e.g. the normal standard deviation. + + Returns: + The computed likelihood for the given values. 
+ """ + # PEtab v2: + if noise_distribution == LOG10_NORMAL and scale == LIN: + noise_distribution = NORMAL + scale = LOG10 + elif noise_distribution == LOG_NORMAL and scale == LIN: + noise_distribution = NORMAL + scale = LOG + + # short-hand + m, s, sigma = measurement, simulation, noise_value + pi, log, log10 = np.pi, np.log, np.log10 + + # go over the possible cases + if noise_distribution == NORMAL and scale == LIN: + nllh = 0.5 * log(2 * pi * sigma**2) + 0.5 * ((s - m) / sigma) ** 2 + elif noise_distribution == NORMAL and scale == LOG: + nllh = ( + 0.5 * log(2 * pi * sigma**2 * m**2) + + 0.5 * ((log(s) - log(m)) / sigma) ** 2 + ) + elif noise_distribution == NORMAL and scale == LOG10: + nllh = ( + 0.5 * log(2 * pi * sigma**2 * m**2 * log(10) ** 2) + + 0.5 * ((log10(s) - log10(m)) / sigma) ** 2 + ) + elif noise_distribution == LAPLACE and scale == LIN: + nllh = log(2 * sigma) + abs((s - m) / sigma) + elif noise_distribution == LAPLACE and scale == LOG: + nllh = log(2 * sigma * m) + abs((log(s) - log(m)) / sigma) + elif noise_distribution == LAPLACE and scale == LOG10: + nllh = log(2 * sigma * m * log(10)) + abs( + (log10(s) - log10(m)) / sigma + ) + else: + raise NotImplementedError( + "Unsupported combination of noise_distribution and scale " + f"specified: {noise_distribution}, {scale}." + ) + return -nllh diff --git a/petab/v2/math/__init__.py b/petab/v2/math/__init__.py new file mode 100644 index 00000000..8a5a5559 --- /dev/null +++ b/petab/v2/math/__init__.py @@ -0,0 +1,3 @@ +"""Functions for parsing and evaluating mathematical expressions.""" + +from petab.v1.math import * # noqa: F401 diff --git a/petab/v2/problem.py b/petab/v2/problem.py index 52baf724..f6ec9c6a 100644 --- a/petab/v2/problem.py +++ b/petab/v2/problem.py @@ -909,6 +909,8 @@ def add_observable( formula: str, noise_formula: str | float | int = None, noise_distribution: str = None, + observable_placeholders: list[str] = None, + noise_placeholders: list[str] = None, name: str = None, **kwargs, ): @@ -919,6 +921,8 @@ def add_observable( formula: The observable formula noise_formula: The noise formula noise_distribution: The noise distribution + observable_placeholders: Placeholders for the observable formula + noise_placeholders: Placeholders for the noise formula name: The observable name kwargs: additional columns/values to add to the observable table @@ -933,7 +937,10 @@ def add_observable( record[NOISE_FORMULA] = noise_formula if noise_distribution is not None: record[NOISE_DISTRIBUTION] = noise_distribution - + if observable_placeholders is not None: + record[OBSERVABLE_PLACEHOLDERS] = observable_placeholders + if noise_placeholders is not None: + record[NOISE_PLACEHOLDERS] = noise_placeholders record.update(kwargs) self.observable_table += core.Observable(**record) diff --git a/tests/v1/test_calculate.py b/tests/v1/test_calculate.py index 526ea7c9..c13105a8 100644 --- a/tests/v1/test_calculate.py +++ b/tests/v1/test_calculate.py @@ -4,14 +4,14 @@ import pandas as pd import pytest -import petab -from petab import ( +from petab.v1 import get_observable_df, get_parameter_df +from petab.v1.C import * +from petab.v1.calculate import ( calculate_chi2, calculate_llh, calculate_residuals, calculate_single_llh, ) -from petab.C import * def model_simple(): @@ -43,12 +43,12 @@ def model_simple(): simulation_df[SIMULATION] = [2, 2, 19, 20] expected_residuals = { - (2 - 0) / 2, - (2 - 1) / 2, - (19 - 20) / 3, - (20 - 22) / 3, + (0 - 2) / 2, + (1 - 2) / 2, + (20 - 19) / 3, + (22 - 20) / 3, } - expected_residuals_nonorm = {2 
- 0, 2 - 1, 19 - 20, 20 - 22} + expected_residuals_nonorm = {0 - 2, 1 - 2, 20 - 19, 22 - 20} expected_llh = ( -0.5 * (np.array(list(expected_residuals)) ** 2).sum() - 0.5 * np.log(2 * np.pi * np.array([2, 2, 3, 3]) ** 2).sum() @@ -56,8 +56,8 @@ def model_simple(): return ( measurement_df, - petab.get_observable_df(observable_df), - petab.get_parameter_df(parameter_df), + get_observable_df(observable_df), + get_parameter_df(parameter_df), simulation_df, expected_residuals, expected_residuals_nonorm, @@ -93,8 +93,8 @@ def model_replicates(): ) simulation_df[SIMULATION] = [2, 2] - expected_residuals = {(2 - 0) / 2, (2 - 1) / 2} - expected_residuals_nonorm = {2 - 0, 2 - 1} + expected_residuals = {(0 - 2) / 2, (1 - 2) / 2} + expected_residuals_nonorm = {0 - 2, 1 - 2} expected_llh = ( -0.5 * (np.array(list(expected_residuals)) ** 2).sum() - 0.5 * np.log(2 * np.pi * np.array([2, 2]) ** 2).sum() @@ -141,12 +141,12 @@ def model_scalings(): simulation_df[SIMULATION] = [2, 3] expected_residuals = { - (np.log(2) - np.log(0.5)) / 2, - (np.log(3) - np.log(1)) / 2, + (np.log(0.5) - np.log(2)) / 2, + (np.log(1) - np.log(3)) / 2, } expected_residuals_nonorm = { - np.log(2) - np.log(0.5), - np.log(3) - np.log(1), + np.log(0.5) - np.log(2), + np.log(1) - np.log(3), } expected_llh = ( -0.5 * (np.array(list(expected_residuals)) ** 2).sum() @@ -201,12 +201,12 @@ def model_non_numeric_overrides(): simulation_df[SIMULATION] = [2, 3] expected_residuals = { - (np.log(2) - np.log(0.5)) / (2 * 7 + 8 + 4 + np.log(2)), - (np.log(3) - np.log(1)) / (2 * 2 + 3 + 4 + np.log(3)), + (np.log(0.5) - np.log(2)) / (2 * 7 + 8 + 4 + 2), + (np.log(1) - np.log(3)) / (2 * 2 + 3 + 4 + 3), } expected_residuals_nonorm = { - np.log(2) - np.log(0.5), - np.log(3) - np.log(1), + np.log(0.5) - np.log(2), + np.log(1) - np.log(3), } expected_llh = ( -0.5 * (np.array(list(expected_residuals)) ** 2).sum() @@ -214,8 +214,7 @@ def model_non_numeric_overrides(): * np.log( 2 * np.pi - * np.array([2 * 7 + 8 + 4 + np.log(2), 2 * 2 + 3 + 4 + np.log(3)]) - ** 2 + * np.array([2 * 7 + 8 + 4 + 2, 2 * 2 + 3 + 4 + 3]) ** 2 * np.array([0.5, 1]) ** 2 ).sum() ) @@ -261,8 +260,8 @@ def model_custom_likelihood(): ) simulation_df[SIMULATION] = [2, 3] - expected_residuals = {(np.log(2) - np.log(0.5)) / 2, (3 - 2) / 1.5} - expected_residuals_nonorm = {np.log(2) - np.log(0.5), 3 - 2} + expected_residuals = {(np.log(0.5) - np.log(2)) / 2, (2 - 3) / 1.5} + expected_residuals_nonorm = {np.log(0.5) - np.log(2), 2 - 3} expected_llh = ( -np.abs(list(expected_residuals)).sum() - np.log(2 * np.array([2, 1.5]) * np.array([0.5, 1])).sum() diff --git a/tests/v2/test_calculate.py b/tests/v2/test_calculate.py new file mode 100644 index 00000000..cba929ae --- /dev/null +++ b/tests/v2/test_calculate.py @@ -0,0 +1,451 @@ +"""Tests related to petab.calculate.""" + +import numpy as np +import pandas as pd +import pytest + +from petab.v2 import get_observable_df, get_parameter_df +from petab.v2.C import * +from petab.v2.calculate import ( + calculate_chi2, + calculate_llh, + calculate_residuals, + calculate_single_llh, +) + + +def model_simple(): + "Simple model." 
+ measurement_df = pd.DataFrame( + data={ + OBSERVABLE_ID: ["obs_a", "obs_a", "obs_b", "obs_b"], + EXPERIMENT_ID: ["c0", "c1", "c0", "c1"], + TIME: [0, 10, 0, 10], + MEASUREMENT: [0, 1, 20, 22], + } + ) + + observable_df = pd.DataFrame( + data={ + OBSERVABLE_ID: ["obs_a", "obs_b"], + OBSERVABLE_FORMULA: ["A", "B"], + NOISE_FORMULA: [2, 3], + } + ).set_index([OBSERVABLE_ID]) + + parameter_df = pd.DataFrame( + data={PARAMETER_ID: ["par1", "par2"], NOMINAL_VALUE: [3, 4]} + ) + + simulation_df = measurement_df.copy(deep=True).rename( + columns={MEASUREMENT: SIMULATION} + ) + simulation_df[SIMULATION] = [2, 2, 19, 20] + + expected_residuals = { + (0 - 2) / 2, + (1 - 2) / 2, + (20 - 19) / 3, + (22 - 20) / 3, + } + expected_residuals_nonorm = {0 - 2, 1 - 2, 20 - 19, 22 - 20} + expected_llh = ( + -0.5 * (np.array(list(expected_residuals)) ** 2).sum() + - 0.5 * np.log(2 * np.pi * np.array([2, 2, 3, 3]) ** 2).sum() + ) + + return ( + measurement_df, + get_observable_df(observable_df), + get_parameter_df(parameter_df), + simulation_df, + expected_residuals, + expected_residuals_nonorm, + expected_llh, + ) + + +def model_replicates(): + """Model with replicates.""" + measurement_df = pd.DataFrame( + data={ + OBSERVABLE_ID: ["obs_a", "obs_a"], + EXPERIMENT_ID: ["c0", "c0"], + TIME: [10, 10], + MEASUREMENT: [0, 1], + } + ) + + observable_df = pd.DataFrame( + data={ + OBSERVABLE_ID: ["obs_a"], + OBSERVABLE_FORMULA: ["A"], + NOISE_FORMULA: [2], + } + ).set_index([OBSERVABLE_ID]) + + parameter_df = pd.DataFrame( + data={PARAMETER_ID: ["par1", "par2"], NOMINAL_VALUE: [3, 4]} + ).set_index([PARAMETER_ID]) + + simulation_df = measurement_df.copy(deep=True).rename( + columns={MEASUREMENT: SIMULATION} + ) + simulation_df[SIMULATION] = [2, 2] + + expected_residuals = {(0 - 2) / 2, (1 - 2) / 2} + expected_residuals_nonorm = {0 - 2, 1 - 2} + expected_llh = ( + -0.5 * (np.array(list(expected_residuals)) ** 2).sum() + - 0.5 * np.log(2 * np.pi * np.array([2, 2]) ** 2).sum() + ) + + return ( + measurement_df, + observable_df, + parameter_df, + simulation_df, + expected_residuals, + expected_residuals_nonorm, + expected_llh, + ) + + +def model_scalings(): + """Model with scalings.""" + measurement_df = pd.DataFrame( + data={ + OBSERVABLE_ID: ["obs_a", "obs_a"], + EXPERIMENT_ID: ["c0", "c0"], + TIME: [5, 10], + MEASUREMENT: [0.5, 1], + } + ) + + observable_df = pd.DataFrame( + data={ + OBSERVABLE_ID: ["obs_a"], + OBSERVABLE_FORMULA: ["A"], + NOISE_DISTRIBUTION: [LOG_NORMAL], + NOISE_FORMULA: [2], + } + ).set_index([OBSERVABLE_ID]) + + parameter_df = pd.DataFrame( + data={PARAMETER_ID: ["par1", "par2"], NOMINAL_VALUE: [3, 4]} + ).set_index([PARAMETER_ID]) + + simulation_df = measurement_df.copy(deep=True).rename( + columns={MEASUREMENT: SIMULATION} + ) + simulation_df[SIMULATION] = [2, 3] + + expected_residuals = { + (np.log(0.5) - np.log(2)) / 2, + (np.log(1) - np.log(3)) / 2, + } + expected_residuals_nonorm = { + np.log(0.5) - np.log(2), + np.log(1) - np.log(3), + } + expected_llh = ( + -0.5 * (np.array(list(expected_residuals)) ** 2).sum() + - 0.5 + * np.log( + 2 * np.pi * np.array([2, 2]) ** 2 * np.array([0.5, 1]) ** 2 + ).sum() + ) + + return ( + measurement_df, + observable_df, + parameter_df, + simulation_df, + expected_residuals, + expected_residuals_nonorm, + expected_llh, + ) + + +def model_non_numeric_overrides(): + """Model with non-numeric overrides.""" + measurement_df = pd.DataFrame( + data={ + OBSERVABLE_ID: ["obs_a", "obs_a"], + EXPERIMENT_ID: ["c0", "c0"], + TIME: [5, 10], + MEASUREMENT: [0.5, 1], + 
NOISE_PARAMETERS: ["7;8", "2;par1"], + } + ) + + observable_df = pd.DataFrame( + data={ + OBSERVABLE_ID: ["obs_a"], + OBSERVABLE_FORMULA: ["A"], + NOISE_DISTRIBUTION: [LOG_NORMAL], + NOISE_FORMULA: [ + "2*noiseParameter1_obs_a + " + "noiseParameter2_obs_a + par2 + obs_a" + ], + NOISE_PLACEHOLDERS: [ + "noiseParameter1_obs_a;noiseParameter2_obs_a" + ], + } + ).set_index([OBSERVABLE_ID]) + + parameter_df = pd.DataFrame( + data={PARAMETER_ID: ["par1", "par2"], NOMINAL_VALUE: [3, 4]} + ).set_index([PARAMETER_ID]) + + simulation_df = measurement_df.copy(deep=True).rename( + columns={MEASUREMENT: SIMULATION} + ) + simulation_df[SIMULATION] = [2, 3] + + expected_residuals = { + (np.log(0.5) - np.log(2)) / (2 * 7 + 8 + 4 + 2), + (np.log(1) - np.log(3)) / (2 * 2 + 3 + 4 + 3), + } + expected_residuals_nonorm = { + np.log(0.5) - np.log(2), + np.log(1) - np.log(3), + } + expected_llh = ( + -0.5 * (np.array(list(expected_residuals)) ** 2).sum() + - 0.5 + * np.log( + 2 + * np.pi + * np.array([2 * 7 + 8 + 4 + 2, 2 * 2 + 3 + 4 + 3]) ** 2 + * np.array([0.5, 1]) ** 2 + ).sum() + ) + + return ( + measurement_df, + observable_df, + parameter_df, + simulation_df, + expected_residuals, + expected_residuals_nonorm, + expected_llh, + ) + + +def model_custom_likelihood(): + """Model with customized likelihoods.""" + measurement_df = pd.DataFrame( + data={ + OBSERVABLE_ID: ["obs_a", "obs_b"], + EXPERIMENT_ID: ["c0", "c0"], + TIME: [5, 10], + MEASUREMENT: [0.5, 2], + } + ) + + observable_df = pd.DataFrame( + data={ + OBSERVABLE_ID: ["obs_a", "obs_b"], + OBSERVABLE_FORMULA: ["A", "B"], + NOISE_FORMULA: [2, 1.5], + NOISE_DISTRIBUTION: [LOG_LAPLACE, LAPLACE], + } + ).set_index([OBSERVABLE_ID]) + + parameter_df = pd.DataFrame( + data={PARAMETER_ID: ["par1", "par2"], NOMINAL_VALUE: [3, 4]} + ).set_index([PARAMETER_ID]) + + simulation_df = measurement_df.copy(deep=True).rename( + columns={MEASUREMENT: SIMULATION} + ) + simulation_df[SIMULATION] = [2, 3] + + expected_residuals = {(np.log(0.5) - np.log(2)) / 2, (2 - 3) / 1.5} + expected_residuals_nonorm = {np.log(0.5) - np.log(2), 2 - 3} + expected_llh = ( + -np.abs(list(expected_residuals)).sum() + - np.log(2 * np.array([2, 1.5]) * np.array([0.5, 1])).sum() + ) + + return ( + measurement_df, + observable_df, + parameter_df, + simulation_df, + expected_residuals, + expected_residuals_nonorm, + expected_llh, + ) + + +@pytest.fixture +def models(): + """Test model collection covering different features.""" + return [ + model_simple(), + model_replicates(), + model_scalings(), + model_non_numeric_overrides(), + model_custom_likelihood(), + ] + + +def test_calculate_residuals(models): # pylint: disable=W0621 + """Test calculate.calculate_residuals.""" + for i_model, model in enumerate(models): + print(f"Model {i_model}") + ( + measurement_df, + observable_df, + parameter_df, + simulation_df, + expected_residuals, + _, + _, + ) = model + residual_dfs = calculate_residuals( + measurement_df, simulation_df, observable_df, parameter_df + ) + assert sorted(residual_dfs[0][RESIDUAL]) == pytest.approx( + sorted(expected_residuals) + ) + + +def test_calculate_non_normalized_residuals(models): # pylint: disable=W0621 + """Test calculate.calculate_residuals without normalization.""" + for i_model, model in enumerate(models): + print(f"Model {i_model}") + ( + measurement_df, + observable_df, + parameter_df, + simulation_df, + _, + expected_residuals_nonorm, + _, + ) = model + residual_dfs = calculate_residuals( + measurement_df, + simulation_df, + observable_df, + parameter_df, + 
normalize=False, + ) + assert sorted(residual_dfs[0][RESIDUAL]) == pytest.approx( + sorted(expected_residuals_nonorm) + ) + + +def test_calculate_chi2(models): # pylint: disable=W0621 + """Test calculate.calculate_chi2.""" + for i_model, model in enumerate(models): + print(f"Model {i_model}") + ( + measurement_df, + observable_df, + parameter_df, + simulation_df, + expected_residuals, + _, + _, + ) = model + chi2 = calculate_chi2( + measurement_df, simulation_df, observable_df, parameter_df + ) + + expected = sum(np.array(list(expected_residuals)) ** 2) + assert chi2 == pytest.approx(expected) + + +def test_calculate_llh(models): # pylint: disable=W0621 + """Test calculate.calculate_llh.""" + for i_model, model in enumerate(models): + print(f"Model {i_model}") + ( + measurement_df, + observable_df, + parameter_df, + simulation_df, + _, + _, + expected_llh, + ) = model + llh = calculate_llh( + measurement_df, simulation_df, observable_df, parameter_df + ) + assert llh == pytest.approx(expected_llh) or expected_llh is None + + +def test_calculate_single_llh(): + """Test calculate.calculate_single_llh.""" + m, s, sigma = 5.3, 4.5, 1.6 + pi, log, log10 = np.pi, np.log, np.log10 + + llh = calculate_single_llh( + measurement=m, + simulation=s, + noise_value=sigma, + noise_distribution=NORMAL, + scale=LIN, + ) + expected_llh = -0.5 * (((s - m) / sigma) ** 2 + log(2 * pi * sigma**2)) + assert llh == pytest.approx(expected_llh) + + llh = calculate_single_llh( + measurement=m, + simulation=s, + noise_value=sigma, + noise_distribution=NORMAL, + scale=LOG, + ) + expected_llh = -0.5 * ( + ((log(s) - log(m)) / sigma) ** 2 + log(2 * pi * sigma**2 * m**2) + ) + assert llh == pytest.approx(expected_llh) + + llh = calculate_single_llh( + measurement=m, + simulation=s, + noise_value=sigma, + noise_distribution=NORMAL, + scale=LOG10, + ) + expected_llh = -0.5 * ( + ((log10(s) - log10(m)) / sigma) ** 2 + + log(2 * pi * sigma**2 * m**2 * log(10) ** 2) + ) + assert llh == pytest.approx(expected_llh) + + llh = calculate_single_llh( + measurement=m, + simulation=s, + noise_value=sigma, + noise_distribution=LAPLACE, + scale=LIN, + ) + expected_llh = -abs((s - m) / sigma) - log(2 * sigma) + assert llh == pytest.approx(expected_llh) + + llh = calculate_single_llh( + measurement=m, + simulation=s, + noise_value=sigma, + noise_distribution=LAPLACE, + scale=LOG, + ) + expected_llh = -abs((log(s) - log(m)) / sigma) - log(2 * sigma * m) + assert llh == pytest.approx(expected_llh) + + llh = calculate_single_llh( + measurement=m, + simulation=s, + noise_value=sigma, + noise_distribution=LAPLACE, + scale=LOG10, + ) + expected_llh = -abs((log10(s) - log10(m)) / sigma) - log( + 2 * sigma * m * log(10) + ) + assert llh == pytest.approx(expected_llh)
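
A minimal usage sketch of the new `petab.v2.calculate` API, mirroring the tables used in the tests added above; the concrete tables and values here are illustrative only and not part of the patch:

import pandas as pd

from petab.v2 import get_observable_df, get_parameter_df
from petab.v2.C import *
from petab.v2.calculate import calculate_llh, calculate_residuals

# measurement and simulation tables are matched on their shared columns
# (here: observableId, experimentId, time)
measurement_df = pd.DataFrame(
    {
        OBSERVABLE_ID: ["obs_a", "obs_a"],
        EXPERIMENT_ID: ["c0", "c0"],
        TIME: [0, 10],
        MEASUREMENT: [0.7, 1.1],
    }
)
simulation_df = measurement_df.copy(deep=True).rename(
    columns={MEASUREMENT: SIMULATION}
)
simulation_df[SIMULATION] = [0.9, 1.0]

# in v2, the observable transformation is encoded in the noise distribution
observable_df = get_observable_df(
    pd.DataFrame(
        {
            OBSERVABLE_ID: ["obs_a"],
            OBSERVABLE_FORMULA: ["A"],
            NOISE_DISTRIBUTION: [LOG_NORMAL],
            NOISE_FORMULA: [2],
        }
    ).set_index(OBSERVABLE_ID)
)
parameter_df = get_parameter_df(
    pd.DataFrame({PARAMETER_ID: ["par1"], NOMINAL_VALUE: [3]})
)

# residuals on the scale implied by the noise distribution,
# normalized by the noise standard deviation
residual_dfs = calculate_residuals(
    measurement_df, simulation_df, observable_df, parameter_df
)
print(residual_dfs[0][RESIDUAL])

# total log-likelihood over all tables
llh = calculate_llh(measurement_df, simulation_df, observable_df, parameter_df)
print(llh)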