Source code for optimpv.posterior.lazy_posterior

"""Module containing classes and functions for posterior analysis of parameters using the ML models from the BO optimization.
This module provides functionality to visualize the posterior distributions of parameters
using various plots, including 1D and 2D posteriors, devil's plots, and density plots."""

######### Package Imports #########################################################################

import copy, itertools, scipy
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from optimpv.general.BaseAgent import BaseAgent
from scipy.stats import gaussian_kde

######### Function Definitions ####################################################################

[docs] class LazyPosterior(BaseAgent): def __init__(self, params, df, outcome_name, best_params = None, is_nat_scale = False, maximize=True, **kwargs): """ LazyPosterior class object to visualize the approximate posterior. Parameters ---------- params : list of Fitparam() objects list of Fitparam() objects defining the parameters to optimize df : pd.DataFrame DataFrame containing the data used for training the surrogate model outcome_name : str Name of the outcome variable in the DataFrame model : BotorchModel, optional Surrogate model used for the approximate posterior, by default None best_params : dict, optional Dictionary of best parameter values, by default None is_nat_scale : bool, optional Indicates if the parameters are in natural scale, by default False maximize : bool, optional Indicates if the outcome is to be maximized, by default True """ self.params = params params_names = [param.name for param in params if not param.type == 'fixed'] params_names_to_index = {name: idx for idx, name in enumerate(params_names)} self.params_names_to_index = params_names_to_index self.outcome_name = outcome_name self.kwargs = kwargs self.name_free_params = [p.name for p in self.params if not p.type == 'fixed'] self.num_free_params = len(self.name_free_params) self.param_by_name = {p.name: p for p in self.params} if best_params is None: best_idx = df[outcome_name].idxmax() best_params = df.loc[best_idx][params_names].to_dict() # get the dataframe in the rescaled space and keep the true dataframe, df and best_params are always rescaled and will be used for training the surrogate model if is_nat_scale: self.true_df = df self.df = self.descale_dataframe(copy.deepcopy(df), params) self.true_best_params = best_params self.best_params = self.params_descale(best_params, params) else: self.true_df = self.rescale_dataframe(copy.deepcopy(df), params) self.df = df self.true_best_params = self.params_rescale(best_params, params) self.best_params = best_params # add the fixed parameters to the true_best_params and true_df for p in self.params: if p.type == 'fixed': self.true_best_params[p.name] = p.value self.true_df[p.name] = p.value*len(self.true_df) self.maximize = maximize self.ascending = not maximize
[docs] def plot_lazyposterior_2D_kde(self, x_param_name, y_param_name, ax = None, **kwargs): """ Plot the 2D posterior KDE for two parameters. Parameters ---------- x_param_name : str Name of the first parameter y_param_name : str Name of the second parameter ax : matplotlib.axes.Axes, optional Matplotlib Axes object to plot on. If None, a new figure and axes are created. **kwargs : dict Additional keyword arguments passed to seaborn.kdeplot: - levels: int, optional, default=10 Number of contour levels to draw. - cmap: str or Colormap, optional, default='viridis' Colormap to use for the plot. - alpha: float, optional, default=0.7 Transparency level for the filled contours. - title: str, optional Title of the plot. - xlabel: str, optional Label for the x-axis. """ if ax is None: fig, ax = plt.subplots(figsize=(8,6)) for p in self.params: if p.name == x_param_name: x_param = p.full_name xlim = p.bounds xlog = p.log_scale or p.force_log if p.name == y_param_name: y_param = p.full_name ylim = p.bounds ylog = p.log_scale or p.force_log x = self.true_df[x_param_name].values y = self.true_df[y_param_name].values sns.kdeplot( x=x, y=y, fill=True, ax=ax, levels=kwargs.get('levels', 10), cmap=kwargs.get('cmap', 'viridis'), alpha=kwargs.get('alpha', 0.7), log_scale=(xlog, ylog) ) ax.set_xlabel(kwargs.get('xlabel', x_param)) ax.set_ylabel(kwargs.get('ylabel', y_param)) if xlog: ax.set_xscale(kwargs.get('xscale', 'log')) if ylog: ax.set_yscale(kwargs.get('yscale', 'log')) if xlim is not None: ax.set_xlim(xlim) if ylim is not None: ax.set_ylim(ylim) ax.set_title(kwargs.get('title', f'2D KDE Posterior of {x_param_name} and {y_param_name}')) plt.tight_layout() return ax
[docs] def plot_lazyposterior_1D_kde(self, param_name, ax = None, **kwargs): """ Plot the 1D posterior KDE for a parameter. Parameters ---------- param_name : str Name of the parameter ax : matplotlib.axes.Axes, optional Matplotlib Axes object to plot on. If None, a new figure and axes are created. **kwargs : dict Additional keyword arguments passed to seaborn.kdeplot: - color: str, optional, default='blue' Color of the KDE line. - fill: bool, optional, default=True Whether to fill the area under the KDE curve. - alpha: float, optional, default=0.5 Transparency level for the filled area. - title: str, optional Title of the plot. - xlabel: str, optional Label for the x-axis. """ if ax is None: fig, ax = plt.subplots(figsize=(8,6)) for p in self.params: if p.name == param_name: param_full_name = p.full_name param_bounds = p.bounds param_log = p.log_scale or p.force_log # darkest viridis color x = self.true_df[param_name].values sns.kdeplot( x=x, ax=ax, color=kwargs.get('color', 'C0'), fill=kwargs.get('fill', False), alpha=kwargs.get('alpha', 0.5), log_scale=param_log ) ax.set_xlabel(kwargs.get('xlabel', param_full_name)) ax.set_ylabel(kwargs.get('ylabel', 'Density')) if param_log: ax.set_xscale(kwargs.get('xscale', 'log')) if param_bounds is not None: ax.set_xlim(param_bounds) ax.set_title(kwargs.get('title', f'1D KDE Posterior of {param_name}')) # add all metrics lines if requested if kwargs.get('show_metrics', True): mean, median, hdi_lower, hdi_upper, max_value = self.compute_metrics(param_name, 'all', credibility_level=0.95) ax.axvline(mean, color='C4', linestyle=':', label='Mean') ax.axvline(median, color='C2', linestyle='-.', label='Median') ax.hlines(y=0.05, xmin=hdi_lower, xmax=hdi_upper, color='k', linestyle='-', linewidth=2, label='95% HDI') ax.axvline(max_value, color='C3', linestyle='--', label='Max Value') if kwargs.get('show_top_n', None) is not None: top_n = kwargs.get('show_top_n') top_n_params_list = self.get_top_n_best_params(num=top_n) # get key positions of param_name in self.params_by_name key list pos = self.name_free_params.index(param_name) # get maximum density value for scaling vertical lines kde = self._compute_1D_kde(param_name, log_scale=param_log) x_min, x_max = self.true_df[param_name].min(), self.true_df[param_name].max() if param_log: x_points = np.logspace(np.log10(x_min), np.log10(x_max), 1000) else: x_points = np.linspace(x_min, x_max, 1000) kde_values = kde(x_points) max_density = np.max(kde_values) for i, params in enumerate(top_n_params_list): p_val = params[pos].value # heith of the line need to go from the bottom axis to 0.05*max_density ax.vlines(p_val, ymin=0, ymax=0.05*max_density, color='gray', linestyle='-', alpha=0.7,linewidth=1, label=f'Top {top_n}' if i==0 else None) if kwargs.get('show_legend', True): ax.legend(loc='best', ncol=2) plt.tight_layout() return ax
[docs] def plot_all_lazyposterior(self, fig=None, axes=None, **kwargs): """ Plot the 1D and 2D posterior KDEs for all free parameters. Parameters ---------- fig : matplotlib.figure.Figure, optional Matplotlib Figure object to plot on. If None, a new figure is created. axes : numpy.ndarray of matplotlib.axes.Axes, optional Array of Matplotlib Axes objects to plot on. If None, new axes are created. **kwargs : dict Additional keyword arguments passed to seaborn.kdeplot. Returns ------- fig : matplotlib.figure.Figure The figure object containing the plots. axes : numpy.ndarray of matplotlib.axes.Axes The axes object containing the plots. """ num_params = self.num_free_params fig, axes = plt.subplots(num_params, num_params, figsize=kwargs.get('figsize', (4*num_params, 4*num_params))) param_names = [p.name for p in self.params if not p.type == 'fixed'] # build quick lookup from name -> param object for bounds/display param_by_name = {p.name: p for p in self.params} for row_idx, col_idx in itertools.product(range(num_params), range(num_params)): x_name = param_names[col_idx] # column -> x-axis param y_name = param_names[row_idx] # row -> y-axis param ax = axes[row_idx, col_idx] # Diagonal: 1D KDE if row_idx == col_idx: ax = sns.kdeplot( x=self.true_df[x_name].values, ax=ax, color=kwargs.get('color', 'C0'), fill=kwargs.get('fill', False), alpha=kwargs.get('alpha', 0.5), log_scale=param_by_name[x_name].log_scale or param_by_name[x_name].force_log ) ax.set_xlim(param_by_name[x_name].bounds[0], param_by_name[x_name].bounds[1]) p = param_by_name[x_name] # y-labels: left column shows ylabel, other columns show right ticks/empty if col_idx == 0: ax.set_ylabel(kwargs.get('ylabel_diag','Density')) else: ax.yaxis.tick_right() ax.yaxis.set_label_position("right") ax.set_ylabel('') # x-label only on bottom row if row_idx == num_params - 1: ax.set_xlabel(f"{p.full_name}") else: ax.set_xticklabels([]) # Off-diagonal: 2D KDE elif row_idx > col_idx: xlog = param_by_name[x_name].log_scale or param_by_name[x_name].force_log ylog = param_by_name[y_name].log_scale or param_by_name[y_name].force_log sns.kdeplot( x=self.true_df[x_name].values, y=self.true_df[y_name].values, fill=True, ax=ax, levels=kwargs.get('levels', 10), cmap=kwargs.get('cmap', 'viridis'), alpha=kwargs.get('alpha', 0.7), log_scale=(xlog, ylog) ) # set axis bounds from parameter definitions px = param_by_name[x_name] py = param_by_name[y_name] ax.set_xlim(px.bounds[0], px.bounds[1]) ax.set_ylim(py.bounds[0], py.bounds[1]) # log scales if requested if xlog: ax.set_xscale(kwargs.get('xscale', 'log')) if ylog: ax.set_yscale(kwargs.get('yscale', 'log')) # x-label only for bottom row (same x label for that column) if row_idx == num_params - 1: ax.set_xlabel(f"{px.display_name} [{px.unit}]") else: ax.set_xticklabels([]) # y-label only for first column (same y label for that row) if col_idx == 0: ax.set_ylabel(f"{py.display_name} [{py.unit}]") else: ax.set_yticklabels([]) else: ax.axis('off') plt.tight_layout() return fig, axes
def _compute_1D_kde(self, param_name, log_scale=False, **kwargs): """ Compute the 1D KDE for a parameter. Parameters ---------- param_name : str Name of the parameter log_scale : bool, optional Whether to compute the KDE on a log scale, by default False Returns ------- kde : scipy.stats.gaussian_kde The computed KDE object. Raises ------ ValueError if any values are <= 0 when log_scale is True """ x = self.true_df[param_name].values if log_scale: if np.any(x <= 0): raise ValueError("All values must be > 0 for log-scale KDE.") y = np.log10(x) kde_log = gaussian_kde(y, **kwargs) # Wrapper to transform back to original scale def kde(x_orig): x_log = np.log10(x_orig) return kde_log(x_log) # Adjust for change of variables return kde else: kde = gaussian_kde(x, **kwargs) return kde
[docs] def compute_high_density_interval(self, param_name, credibility_level=0.95, num_points=1000): """ Compute the highest density interval (HDI) for a parameter. Parameters ---------- param_name : str Name of the parameter credibility_level : float, optional Credibility level for the HDI, by default 0.95 num_points : int, optional Number of points to evaluate the KDE, by default 1000 Returns ------- hdi_lower : float Lower bound of the HDI hdi_upper : float Upper bound of the HDI """ # Get parameter object to check for log scale param = next(p for p in self.params if p.name == param_name) log_scale = param.log_scale or param.force_log kde = self._compute_1D_kde(param_name, log_scale=log_scale) # Generate points over the parameter range x_min, x_max = self.true_df[param_name].min(), self.true_df[param_name].max() x_points = np.linspace(x_min, x_max, num_points) if log_scale: x_points = np.logspace(np.log10(x_min), np.log10(x_max), num_points) # Evaluate KDE at these points kde_values = kde(x_points) # Sort points by density sorted_indices = np.argsort(kde_values)[::-1] sorted_x = x_points[sorted_indices] sorted_kde = kde_values[sorted_indices] # Compute cumulative density cumulative_density = np.cumsum(sorted_kde) cumulative_density /= cumulative_density[-1] # Normalize to 1 # Find HDI bounds indices_in_hdi = sorted_indices[cumulative_density <= credibility_level] hdi_x_values = x_points[indices_in_hdi] hdi_lower = np.min(hdi_x_values) hdi_upper = np.max(hdi_x_values) return hdi_lower, hdi_upper
[docs] def compute_metrics(self, param_name, metric_name, credibility_level=0.95, num_points=100): """ Compute the mean, median, and HDI for a parameter. Parameters ---------- param_name : str Name of the parameter metric_name : str Metric to compute: 'mean', 'median', 'hdi', 'max', or 'all' credibility_level : float, optional Credibility level for the HDI, by default 0.95 num_points : int, optional Number of points to evaluate the KDE, by default 100 Returns ------- mean : float Mean of the parameter distribution median : float Median of the parameter distribution hdi_lower : float Lower bound of the HDI hdi_upper : float Upper bound of the HDI max_value : float value at the maximum of the parameter distribution """ # Get parameter object to check for log scale param = next(p for p in self.params if p.name == param_name) log_scale = param.log_scale or param.force_log kde = self._compute_1D_kde(param_name, log_scale=log_scale) # Generate points over the parameter range bounds = self.params[self.params_names_to_index[param_name]].bounds x_points = np.linspace(bounds[0], bounds[1], num_points) if log_scale: x_points = np.logspace(np.log10(bounds[0]), np.log10(bounds[1]), num_points) # Evaluate KDE at these points kde_values = kde(x_points) if metric_name == 'mean': if log_scale: log_x_points = np.log10(x_points) mean_log = np.sum(log_x_points * kde_values) / np.sum(kde_values) mean = 10**mean_log else: mean = np.sum(x_points * kde_values) / np.sum(kde_values) return mean elif metric_name == 'median': cdf = np.cumsum(kde_values) cdf /= cdf[-1] # Normalize CDF to 1 median = np.interp(0.5, cdf, x_points) return median elif metric_name == 'hdi': if credibility_level <= 0 or credibility_level >= 1: raise ValueError("credibility_level must be between 0 and 1.") hdi_lower, hdi_upper = self.compute_high_density_interval(param_name, credibility_level, num_points) return hdi_lower, hdi_upper elif metric_name == 'max': max_idx = np.argmax(kde_values) max_value = x_points[max_idx] return max_value elif metric_name == 'all': mean = np.sum(x_points * kde_values) / np.sum(kde_values) cdf = np.cumsum(kde_values) cdf /= cdf[-1] # Normalize CDF to 1 median = median = np.interp(0.5, cdf, x_points) hdi_lower, hdi_upper = self.compute_high_density_interval(param_name, credibility_level, num_points) max_idx = np.argmax(kde_values) max_value = x_points[max_idx] return mean, median, hdi_lower, hdi_upper, max_value else: raise ValueError("metric_name must be 'mean', 'median', 'hdi', 'max', or 'all'.")
[docs] def get_median_params(self): """ Get the median parameter values for all free parameters.""" median_params = copy.deepcopy(self.params) for p in median_params: median_value = self.compute_metrics(p.name, 'median') p.value = median_value return median_params
[docs] def get_max_params(self): """ Get the parameter values at the maximum of the posterior for all free parameters.""" max_params = copy.deepcopy(self.params) for p in max_params: max_value = self.compute_metrics(p.name, 'max') p.value = max_value return max_params
[docs] def get_mean_params(self): """ Get the mean parameter values for all free parameters.""" mean_params = copy.deepcopy(self.params) for p in mean_params: mean_value = self.compute_metrics(p.name, 'mean') p.value = mean_value return mean_params
[docs] def get_top_n_best_params(self, num = 10): """ Make x params list with the top x best parameters from the true dataframe.""" x_best_params_list = [] for i in range(num): x_best_params = copy.deepcopy(self.params) sorted_df = self.true_df.sort_values(by=self.outcome_name, ascending=self.ascending).reset_index() dum_params = copy.deepcopy(self.params) for p in dum_params: p.value = sorted_df.loc[i][p.name] x_best_params_list.append(dum_params) return x_best_params_list
[docs] def get_top_n_metrics_params(self, metric_name, num = 10): """ Get the parameter values computed from the top n best parameters from the true dataframe. Parameters ---------- metric_name : str Metric to compute from the top n best parameters: 'mean', 'median', 'max', or 'min' num : int, optional Number of top best parameters to consider, by default 10 Returns ------- list List of parameter objects with values computed from the specified metric of the top n best parameters. Raises ------ ValueError if metric_name is not 'mean', 'median', 'max', or 'min' """ x_best_params_list = self.get_top_n_best_params(num = num) dum_params = copy.deepcopy(self.params) params_list_dic = {} for p in self.params: dum_params_values = [] for params in x_best_params_list: for param in params: if param.name == p.name: dum_params_values.append(param.value) params_list_dic[p.name] = dum_params_values for p in dum_params: if metric_name == 'mean': metric_value = np.mean(params_list_dic[p.name]) elif metric_name == 'median': metric_value = np.median(params_list_dic[p.name]) elif metric_name == 'max': metric_value = np.max(params_list_dic[p.name]) elif metric_name == 'min': metric_value = np.min(params_list_dic[p.name]) else: raise ValueError("metric_name must be 'mean', 'median', 'max', or 'min'.") p.value = metric_value return dum_params
[docs] def get_best_params(self): """ Get the best parameter values from the true dataframe.""" best_params = copy.deepcopy(self.params) for p in best_params: p.value = self.true_best_params[p.name] return best_params