Source code for optimpv.RateEqfits.RateEqAgent
"""Provides general functionality for Agent objects for Rate Equation simulations"""
######### Package Imports #########################################################################
import os, uuid, sys, copy, warnings
import numpy as np
import pandas as pd
from scipy import interpolate, constants
# try:
# import pvlib
# from pvlib.pvsystem import i_from_v
# got_pvlib = True
# except:
# got_pvlib = False
# warnings.warn('pvlib not installed, using scipy for diode equation')
from optimpv import *
from optimpv.general.general import calc_metric, loss_function, transform_data
from optimpv.general.BaseAgent import BaseAgent
from optimpv.RateEqfits.RateEqModel import *
from optimpv.RateEqfits.Pumps import *
## Physics constants (looked up by their CODATA name in scipy.constants)
# NOTE(review): none of these three names is referenced in the part of the file visible here;
# they may be used by code outside this view or be leftovers - confirm before removing.
q = constants.value(u'elementary charge')  # elementary charge
# NOTE(review): 'electric constant' appears to be a legacy CODATA key (newer data sets call it
# 'vacuum electric permittivity'); scipy may emit a ConstantWarning for it - confirm.
eps_0 = constants.value(u'electric constant')  # vacuum permittivity
kb = constants.value(u'Boltzmann constant in eV/K')  # Boltzmann constant expressed in eV/K
######### Agent Definition #######################################################################
[docs]
class RateEqAgent(BaseAgent):
    """Agent object for fitting rate equation models.

    Available models are:

    - Bimolecular-Trapping (BT)
    - Bimolecular-Trapping-detrapping (BTD)

    see optimpv.RateEqfits.RateEqModel.py for more details about the available models

    Parameters
    ----------
    params : list of Fitparam() objects
        List of Fitparam() objects.
    X : list of array-like
        One entry per element of exp_format. Each entry is either a 1-D array containing the
        time axis or a 2-D array containing the time (1st column) and the Gfrac (2nd column) values.
        All entries must describe the same axis, the simulation is run on the first one.
    y : list of array-like
        One 1-D array of measured signal values per element of exp_format.
    model : function, optional
        Rate equation model that is called to compute the carrier densities, by default BT_model.
    pump_model : function, optional
        Function to get the generated carrier density profile, by default initial_carrier_density.
    pump_args : dict, optional
        Arguments for the pump_model function, by default {}. The key 'fpu' is required by
        run_RateEq, the keys 'N0' and 'G_frac' are used when present.
    exp_format : str or list of str, optional
        Format of the experimental data, can be ['trPL','TAS','trMC'], by default 'trPL'.
    fixed_model_args : dict, optional
        Values for model/experiment arguments that are not fitted (e.g. 'QE', 'k_direct',
        'I_factor_PL', ...). They are only used when the same key is not present in the fitted
        parameters, by default {}.
    metric : str or list of str, optional
        Metric to evaluate the model, see optimpv.general.calc_metric for options, by default 'mse'.
    loss : str or list of str, optional
        Loss function to use, see optimpv.general.loss_function for options, by default 'linear'.
    threshold : int or list of int, optional
        Threshold value for the loss function used when doing multi-objective optimization, by default 100.
    minimize : bool or list of bool, optional
        If True then minimize the loss function, if False then maximize the loss function (note that if running a fit minimize should be True), by default True.
    yerr : array-like or list of array-like, optional
        Errors in the measured signal values, by default None.
    weight : array-like or list of array-like, optional
        Weights used for fitting if weight is None and yerr is not None, then weight = 1/yerr**2, by default None.
    tracking_metric : str or list of str, optional
        Additional metrics to track and report in run_Ax output, by default None.
    tracking_loss : str or list of str, optional
        Loss functions to apply to tracking metrics, by default None.
    tracking_exp_format : str or list of str, optional
        Experimental formats for tracking metrics, by default None.
    tracking_X : array-like or list of array-like, optional
        X values for tracking metrics, by default None.
    tracking_y : array-like or list of array-like, optional
        y values for tracking metrics, by default None.
    tracking_weight : array-like or list of array-like, optional
        Weights for tracking metrics, by default None.
    name : str, optional
        Name of the agent, by default 'RateEq'.
    equilibrate : bool, optional
        Flag forwarded to the model as its ``equilibrate`` keyword, by default True.
    detection_limit : float, optional
        If not None, this value is added to every simulated signal returned by reformat_data, by default None.
    **kwargs : dict
        Additional keyword arguments. 'compare_type' (one of 'linear', 'log', 'normalized',
        'normalized_log', 'sqrt', by default 'linear') is consumed by the agent, the remaining
        entries are forwarded to the model.
    """
def __init__(self, params, X, y, model = BT_model, pump_model = initial_carrier_density, pump_args = None,
             exp_format = 'trPL', fixed_model_args = None, metric = 'mse', loss = 'linear', threshold = 100,
             minimize = True, yerr = None, weight = None, tracking_metric = None, tracking_loss = None,
             tracking_exp_format = None, tracking_X = None, tracking_y = None, tracking_weight = None,
             name = 'RateEq', equilibrate = True, detection_limit=None, **kwargs):
    """Store the fit configuration and normalise every per-experiment setting to a list.

    Scalar values given for ``exp_format``, ``metric``, ``loss``, ``threshold`` and ``minimize``
    are wrapped into one-element lists so that they can be indexed per experiment format.
    ``metric=None`` and ``loss=None`` fall back to ``'mse'`` and ``'linear'``.
    ``pump_args`` and ``fixed_model_args`` default to a fresh empty dict per instance.
    The keyword ``compare_type`` is removed from ``kwargs`` and stored on the agent, the other
    keyword arguments are kept in ``self.kwargs``.

    Raises
    ------
    ValueError
        If an experiment format is unknown, if ``weight``/``yerr`` do not have the same length
        as ``y``, if the per-experiment settings (or the tracking settings) have inconsistent
        lengths, if tracking data is missing for a format that is not part of ``exp_format``,
        or if ``compare_type`` is not supported.
    """
    valid_formats = ('trPL', 'TAS', 'trMC')

    self.params = params
    self.X = X  # time axis (1-D) or time and Gfrac (2-D), one entry per exp_format
    self.y = y
    self.pump_model = pump_model
    # a new dict per instance, so agents never share state through a default argument
    self.pump_args = {} if pump_args is None else pump_args
    self.model = model
    self.fixed_model_args = {} if fixed_model_args is None else fixed_model_args
    self.metric = metric
    self.loss = loss
    self.threshold = threshold
    self.minimize = minimize
    self.equilibrate = equilibrate
    self.detection_limit = detection_limit
    self.tracking_metric = tracking_metric
    self.tracking_loss = tracking_loss
    self.tracking_exp_format = tracking_exp_format
    self.tracking_X = tracking_X
    self.tracking_y = tracking_y
    self.tracking_weight = tracking_weight
    self.yerr = yerr
    self.weight = weight
    self.name = name
    self.kwargs = kwargs

    # Set and validate experiment format
    self.exp_format = [exp_format] if isinstance(exp_format, str) else exp_format
    for form in self.exp_format:
        if form not in valid_formats:
            raise ValueError(f'{form} is an invalid exp_format, must be either "trPL", "TAS" or "trMC"')

    # Process main metrics and loss functions
    if self.loss is None:
        self.loss = 'linear'
    if self.metric is None:
        self.metric = 'mse'
    # test the stored value (not the raw argument) so that the None fallbacks are wrapped too
    if isinstance(self.metric, str):
        self.metric = [self.metric]
    if isinstance(self.loss, str):
        self.loss = [self.loss]
    if isinstance(threshold, (int, float)):
        self.threshold = [threshold]
    if isinstance(minimize, bool):
        self.minimize = [minimize]

    # Process weights
    if weight is not None:
        if not len(weight) == len(y):
            raise ValueError('weight must have the same length as y')
        self.weight = []
        for w in weight:
            if isinstance(w, (list, tuple)):
                self.weight.append(np.asarray(w))
            else:
                self.weight.append(w)
    elif yerr is not None:
        if not len(yerr) == len(y):
            raise ValueError('yerr must have the same length as y')
        # inverse-variance weights
        self.weight = [1/np.asarray(yer)**2 for yer in yerr]
    else:
        self.weight = [None]*len(y)

    # Check that primary data dimensions match
    if not len(self.exp_format) == len(self.metric) == len(self.loss) == len(self.threshold) == len(self.minimize) == len(self.X) == len(self.y) == len(self.weight):
        raise ValueError('exp_format, metric, loss, threshold, minimize, X, y and weight must have the same length')

    self.all_agent_metrics = self.get_all_agent_metric_names()

    # Process tracking metrics
    if self.tracking_metric is not None:
        if isinstance(self.tracking_metric, str):
            self.tracking_metric = [self.tracking_metric]

        if self.tracking_loss is None:
            self.tracking_loss = ['linear'] * len(self.tracking_metric)
        elif isinstance(self.tracking_loss, str):
            self.tracking_loss = [self.tracking_loss] * len(self.tracking_metric)

        # Ensure tracking_metric and tracking_loss have the same length
        if len(self.tracking_metric) != len(self.tracking_loss):
            raise ValueError('tracking_metric and tracking_loss must have the same length')

        # Process tracking_exp_format
        if self.tracking_exp_format is None:
            # Default to the main experiment formats if not specified
            self.tracking_exp_format = self.exp_format
        elif isinstance(self.tracking_exp_format, str):
            self.tracking_exp_format = [self.tracking_exp_format]

        # Check that all elements in tracking_exp_format are valid
        for form in self.tracking_exp_format:
            if form not in valid_formats:
                raise ValueError(f'{form} is an invalid tracking_exp_format, must be either "trPL", "TAS" or "trMC"')

        # Process tracking_X and tracking_y
        all_formats_in_main = all(fmt in self.exp_format for fmt in self.tracking_exp_format)
        if self.tracking_X is None or self.tracking_y is None:
            if not all_formats_in_main:
                raise ValueError('tracking_X and tracking_y must be provided when tracking_exp_format contains formats not in exp_format')
            # Construct tracking_X and tracking_y from main X and y based on matching formats
            self.tracking_X = []
            self.tracking_y = []
            for fmt in self.tracking_exp_format:
                fmt_indices = [i for i, main_fmt in enumerate(self.exp_format) if main_fmt == fmt]
                if fmt_indices:
                    # Use the first matching format's data
                    idx = fmt_indices[0]
                    self.tracking_X.append(self.X[idx])
                    self.tracking_y.append(self.y[idx])

        # Ensure tracking_X and tracking_y are lists
        if not isinstance(self.tracking_X, list):
            self.tracking_X = [self.tracking_X]
        if not isinstance(self.tracking_y, list):
            self.tracking_y = [self.tracking_y]

        # Check that tracking_X and tracking_y have the right lengths
        if len(self.tracking_X) != len(self.tracking_exp_format) or len(self.tracking_y) != len(self.tracking_exp_format):
            raise ValueError('tracking_X and tracking_y must have the same length as tracking_exp_format')

        # Process tracking_weight
        if self.tracking_weight is None and all_formats_in_main:
            # Use the main weights if available
            self.tracking_weight = []
            for fmt in self.tracking_exp_format:
                fmt_indices = [i for i, main_fmt in enumerate(self.exp_format) if main_fmt == fmt]
                if fmt_indices:
                    idx = fmt_indices[0]
                    self.tracking_weight.append(self.weight[idx])
                else:
                    self.tracking_weight.append(None)
        elif self.tracking_weight is None:
            self.tracking_weight = [None] * len(self.tracking_exp_format)
        elif not isinstance(self.tracking_weight, list):
            self.tracking_weight = [self.tracking_weight]

        # Ensure tracking_weight has the right length
        if len(self.tracking_weight) != len(self.tracking_exp_format):
            raise ValueError('tracking_weight must have the same length as tracking_exp_format')

        # Check that tracking dimensions match
        if not len(self.tracking_exp_format) == len(self.tracking_metric) == len(self.tracking_loss):
            raise ValueError('tracking_exp_format, tracking_metric and tracking_loss must have the same length')

        self.all_agent_tracking_metrics = self.get_all_agent_tracking_metric_names()

    # compare_type is an agent setting: take it out of kwargs so it is not forwarded to the model
    self.compare_type = self.kwargs.pop('compare_type', 'linear')
    if self.compare_type not in ['linear', 'log', 'normalized', 'normalized_log', 'sqrt']:
        raise ValueError('compare_type must be either linear, log, normalized, normalized_log, or sqrt')
[docs]
def run_RateEq(self, parameters):
    """Run the rate equation model on the time axis stored in ``self.X``.

    The simulation uses the first element of ``self.X``; the other elements are only checked
    for consistency with it. When the X arrays are 2-D, the model is run once per generation
    fraction found in the second column (in order of first appearance) and the results are
    concatenated.

    Parameters
    ----------
    parameters : dict
        Dictionary of parameter names and values. 'QE' is read from here, then from
        ``self.fixed_model_args``, and defaults to 1.

    Returns
    -------
    pandas.DataFrame or float
        DataFrame with the columns 'n', 'p', 't' and 'G_frac'. ``np.nan`` is returned when the
        model output cannot be assembled into a DataFrame.

    Raises
    ------
    ValueError
        If the elements of ``self.X`` are not 1-D or 2-D arrays, mix both layouts, differ in
        length or do not describe the same axis.
    """
    # Quantum efficiency: fitted value first, then fixed value, else unity
    if 'QE' in parameters:
        QE = parameters['QE']
    else:
        QE = self.fixed_model_args.get('QE', 1)

    # ---- consistency checks on X -------------------------------------------------------------
    axes = [np.asarray(xx) for xx in self.X]
    ndims = {xx.ndim for xx in axes}
    if len(ndims) > 1:
        raise ValueError('all X elements should have the same shape and Gfrac should be provided for all elements if specified for one')
    if not ndims <= {1, 2}:
        raise ValueError('all X elements should be 1-D (time) or 2-D (time, Gfrac) arrays')

    x_default = axes[0]
    len_X = len(x_default)
    if ndims == {1}:
        Gfracs = None
        for xx in axes:
            if len(xx) != len_X:
                raise ValueError('all X elements should have the same shape')
            if not np.allclose(xx, x_default):
                raise ValueError('all X elements should be the same, if they are different then create a separate agent for each X element')
    else:
        ordered = []
        for xx in axes:
            # unique generation fractions in order of first appearance
            values, first_seen = np.unique(xx[:, 1], return_index=True)
            for g in values[np.argsort(first_seen)]:
                if g not in ordered:
                    ordered.append(g)
            if len(xx) != len_X:
                raise ValueError('all X elements should have the same shape')
            # the time axis of every generation fraction must match the one of the first X
            for g in ordered:
                if not np.allclose(xx[xx[:, 1] == g, 0], x_default[x_default[:, 1] == g, 0]):
                    raise ValueError('all X elements should be the same, if they are different then create a separate agent for each X element')
        Gfracs = np.asarray(ordered)

    # ---- simulation --------------------------------------------------------------------------
    tmax = 0.99999/self.pump_args['fpu']  # just below one pump period
    N0 = self.pump_args.get('N0', 0)

    def extend_to_period(t):
        # the simulation axis has to cover (almost) the full period, pad it when the data stops earlier
        if t[-1] < tmax:
            return np.hstack((t, np.linspace(t[-1], tmax, 100)[1:]))
        return t

    if Gfracs is None:
        t = x_default
        t_span = extend_to_period(t)
        Generation = self.pump_model(t_span, **self.pump_args) * QE  # multiply by the quantum efficiency
        G_frac = self.pump_args.get('G_frac', 1)
        ns, ps = self.model(parameters, t, Generation, t_span, N0 = N0, equilibrate = self.equilibrate, G_frac = G_frac, **self.kwargs)
        t_list = t
        Gfrac_list = np.ones(len(t))
    else:
        # the generation fraction comes from X here; drop a 'G_frac' entry of pump_args so the
        # keyword is not passed twice to the pump model
        pump_kwargs = {key: val for key, val in self.pump_args.items() if key != 'G_frac'}
        ns, ps, t_list, Gfrac_list = None, None, None, None
        for Gfrac in Gfracs:
            t = x_default[x_default[:, 1] == Gfrac, 0]
            t_span = extend_to_period(t)
            Generation = self.pump_model(t_span, G_frac = Gfrac, **pump_kwargs) * QE
            ns_, ps_ = self.model(parameters, t, Generation, t_span, N0 = N0, equilibrate = self.equilibrate, G_frac = Gfrac, **self.kwargs)
            if ns is None:
                ns, ps = ns_, ps_
                t_list = t
                Gfrac_list = np.ones(len(t))*Gfrac
            else:
                ns = np.hstack((ns, ns_))
                ps = np.hstack((ps, ps_))
                t_list = np.hstack((t_list, t))
                Gfrac_list = np.hstack((Gfrac_list, np.ones(len(t))*Gfrac))

    # ---- assemble the result -----------------------------------------------------------------
    columns = {'n': ns, 'p': ps, 't': t_list, 'G_frac': Gfrac_list}
    try:
        return pd.DataFrame(columns)
    except Exception:
        # callers treat np.nan as "simulation failed", report what went wrong and carry on
        sizes = {key: len(val) for key, val in columns.items()}
        warnings.warn(f'Could not assemble the rate equation results for parameters {parameters}, column lengths: {sizes}')
        return np.nan
[docs]
def reformat_data(self, df, X, parameters, exp_format='trPL'):
    """Convert simulated carrier densities into the signal of one experiment type.

    Parameters
    ----------
    df : DataFrame
        Simulation results with the columns 'n', 'p', 't' and 'G_frac'. The entries of 'n'
        and 'p' are either scalars or arrays/lists (one array per time step), in the latter
        case the second axis is reduced (trapezoidal sum for trPL, mean for trMC and TAS).
    X : array-like
        1-D array containing the axis on which the signal is requested, or 2-D array with that
        axis in the 1st column and the Gfrac values in the 2nd column.
    parameters : dict
        Dictionary of parameter names and values. A value missing here is looked up in
        ``self.fixed_model_args``.
    exp_format : str, optional
        Format of the experimental data, can be ['trPL','TAS','trMC'], by default 'trPL'.

        - 'trPL' needs 'k_direct' and 'I_factor_PL', 'N_A' is optional (default 0)
        - 'trMC' needs 'I_factor_MC' and 'ratio_mu'
        - 'TAS' needs 'cross_section' and 'L', 'I_factor_TAS' is optional (default 1)

    Returns
    -------
    Xfit : numpy.ndarray
        The rows of X the signal was evaluated on. For a 2-D X the rows are grouped by Gfrac
        in the order in which the Gfrac values first appear in df, matching yfit.
    yfit : numpy.ndarray
        Simulated signal, with ``self.detection_limit`` added when it is not None.

    Raises
    ------
    ValueError
        If exp_format is unknown, if a required argument is neither in parameters nor in
        fixed_model_args, or if df contains several G_frac values while X is 1-D.
    """
    if exp_format not in ('trPL', 'trMC', 'TAS'):
        raise ValueError(f'{exp_format} is an invalid exp_format, must be either "trPL", "TAS" or "trMC"')

    required = object()  # sentinel: the argument has no default

    def lookup(key, default=required):
        # fitted parameters take precedence over the fixed model arguments
        if key in parameters:
            return parameters[key]
        if key in self.fixed_model_args:
            return self.fixed_model_args[key]
        if default is required:
            raise ValueError(f'{key} should be provided in the parameters or fixed_model_args')
        return default

    def densities(frame, column):
        # returns (values, resolved); resolved is True when every row holds an array
        if isinstance(frame[column].iloc[0], (np.ndarray, list)):
            return np.asarray(frame[column].values.tolist()), True
        return np.asarray(frame[column], dtype=float), False

    def no_post_processing(y):
        return y

    finalize = no_post_processing
    if exp_format == 'trPL':  # transient photoluminescence
        k_direct = lookup('k_direct')
        I_factor = lookup('I_factor_PL')
        N_A = lookup('N_A', 0)

        def raw_signal(frame):
            n, resolved = densities(frame, 'n')
            p, _ = densities(frame, 'p')
            if resolved:
                rate = n * (p + N_A) * k_direct
                # trapezoidal sum over the second axis
                return np.sum((rate[:, 1:] + rate[:, :-1]) / 2, axis=1) * I_factor
            return I_factor * k_direct * n * (p + N_A)

    elif exp_format == 'trMC':  # transient microwave conductivity
        I_factor = lookup('I_factor_MC')
        ratio_mu = lookup('ratio_mu')

        def raw_signal(frame):
            n, resolved = densities(frame, 'n')
            p, _ = densities(frame, 'p')
            weighted = ratio_mu * n + p
            if resolved:
                return np.mean(weighted, axis=1) * I_factor
            return I_factor * weighted

    else:  # 'TAS', transient absorption spectroscopy
        I_factor = lookup('I_factor_TAS', 1)  # usually no factor for TAS, corrected with cross-section and thickness
        cross_section = lookup('cross_section')
        L = lookup('L')

        def raw_signal(frame):
            n, resolved = densities(frame, 'n')
            # one value per time step, hence the mean over the second axis only
            return np.mean(n, axis=1) if resolved else n

        def finalize(y):
            # not in place: y may share memory with df, which is reused for other exp_format
            y = y - np.mean(y)  # simulate AC coupling
            return I_factor * (np.exp(-cross_section * L * y) - 1)

    def on_axis(t, signal, x_new):
        t = np.asarray(t, dtype=float)
        x_new = np.asarray(x_new, dtype=float)
        # purely relative tolerance: the default atol of np.allclose (1e-8) is larger than
        # typical time values in seconds and would declare different axes identical
        if t.shape == x_new.shape and np.allclose(t, x_new, rtol=1e-9, atol=0.0):
            return np.array(signal, dtype=float)
        try:
            return interpolate.splev(x_new, interpolate.splrep(t, signal))
        except Exception:
            # spline fit not possible (e.g. too few points), fall back to linear interpolation
            linear = interpolate.interp1d(t, signal, kind='linear', fill_value='extrapolate')
            return linear(x_new)

    def evaluate(frame, x_new):
        return finalize(on_axis(frame['t'], raw_signal(frame), x_new))

    X = np.asarray(X)
    g_values, first_seen = np.unique(df['G_frac'], return_index=True)
    g_values = g_values[np.argsort(first_seen)]  # keep the order of df

    if X.ndim == 1:
        if len(g_values) > 1:
            raise ValueError('X must contain the Gfrac values in a 2nd column when df contains several G_frac values')
        Xfit = X
        yfit = np.asarray(evaluate(df, X))
    else:
        x_parts, y_parts = [], []
        for g in g_values:
            rows = X[:, 1] == g
            y_parts.append(evaluate(df[df['G_frac'] == g], X[rows, 0]))
            x_parts.append(X[rows])
        Xfit = np.vstack(x_parts) if x_parts else X[:0]
        yfit = np.hstack(y_parts) if y_parts else np.empty(0)

    if self.detection_limit is not None:
        yfit = yfit + self.detection_limit
    return Xfit, yfit
[docs]
def run(self, parameters, X=None, exp_format='trPL'):
    """Simulate the signal of one experiment type for a set of parameters.

    Parameters
    ----------
    parameters : dict
        Dictionary of parameter names and values, passed through ``self.params_rescale``
        before the simulation.
    X : array-like, optional
        Axis on which the signal is returned, by default None which means the first element
        of ``self.X``.
    exp_format : str, optional
        Experimental format handed to ``reformat_data``, by default 'trPL'.

    Returns
    -------
    array-like or float
        Simulated signal values, or ``np.nan`` when ``run_RateEq`` returned ``np.nan``.
    """
    rescaled = self.params_rescale(parameters, self.params)
    simulation = self.run_RateEq(rescaled)
    # run_RateEq signals a failed simulation with the np.nan object itself
    if simulation is np.nan:
        return np.nan
    axis = self.X[0] if X is None else X
    _, signal = self.reformat_data(simulation, axis, rescaled, exp_format)
    return signal
[docs]
def run_Ax(self, parameters):
    """Run the simulation and evaluate every objective (and tracking metric) of the agent.

    Parameters
    ----------
    parameters : dict
        Dictionary of parameter names and values, passed through ``self.params_rescale``
        before the simulation.

    Returns
    -------
    dict
        Maps each name in ``self.all_agent_metrics`` (and, when tracking metrics are
        configured, each name in ``self.all_agent_tracking_metrics``) to its loss value.
        All values are ``np.nan`` when ``run_RateEq`` returned ``np.nan``.
    """
    rescaled = self.params_rescale(parameters, self.params)
    simulation = self.run_RateEq(rescaled)

    if simulation is np.nan:
        # failed simulation: report NaN for every objective, then for every tracking metric
        failed = {self.all_agent_metrics[k]: np.nan for k in range(len(self.exp_format))}
        if self.tracking_metric is not None:
            for k in range(len(self.tracking_metric)):
                failed[self.all_agent_tracking_metrics[k]] = np.nan
        return failed

    results = {}

    # Objectives: one per experiment format
    for k, fmt in enumerate(self.exp_format):
        x_pred, y_pred = self.reformat_data(simulation, self.X[k], rescaled, fmt)
        y_true = self.y[k]
        if self.compare_type != 'linear':
            # measured and simulated values are transformed together before the comparison
            y_true, y_pred = transform_data(
                y_true,
                y_pred,
                X=self.X[k],
                X_pred=x_pred,
                transform_type=self.compare_type
            )
        score = calc_metric(
            y_true,
            y_pred,
            sample_weight=self.weight[k],
            metric_name=self.metric[k]
        )
        results[self.all_agent_metrics[k]] = loss_function(score, loss=self.loss[k])

    # Tracking metrics: evaluated on tracking_X / tracking_y, always through transform_data
    if self.tracking_metric is not None:
        for k, tracked_name in enumerate(self.tracking_metric):
            tracked_fmt = self.tracking_exp_format[k]
            tracked_loss = self.tracking_loss[k]
            x_pred, y_pred = self.reformat_data(simulation, self.tracking_X[k], rescaled, tracked_fmt)
            y_true, y_pred = transform_data(
                self.tracking_y[k],
                y_pred,
                X=self.tracking_X[k],
                X_pred=x_pred,
                transform_type=self.compare_type
            )
            score = calc_metric(
                y_true,
                y_pred,
                sample_weight=self.tracking_weight[k],
                metric_name=tracked_name
            )
            results[self.all_agent_tracking_metrics[k]] = loss_function(score, loss=tracked_loss)

    return results