"""PymooOptimizer module. This module contains the PymooOptimizer class, which is used to optimize a given function using pymoo's genetic algorithms."""
######### Package Imports #########################################################################
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from functools import partial
from optimpv import *
from optimpv.general.BaseAgent import BaseAgent
from torch.multiprocessing import Pool
# PyMOO imports
from pymoo.core.problem import ElementwiseProblem
from pymoo.algorithms.moo.nsga2 import NSGA2
from pymoo.algorithms.moo.nsga3 import NSGA3
from pymoo.algorithms.moo.moead import MOEAD
from pymoo.algorithms.soo.nonconvex.ga import GA
from pymoo.algorithms.soo.nonconvex.de import DE
from pymoo.algorithms.soo.nonconvex.pso import PSO
from pymoo.optimize import minimize
from pymoo.core.callback import Callback
from multiprocessing.pool import ThreadPool
from pymoo.core.problem import StarmapParallelization
from pymoo.core.population import Population
from pymoo.core.individual import Individual
from logging import Logger
# from ax.utils.common.logger import get_logger, _round_floats_for_logging
from optimpv.general.logger import get_logger, _round_floats_for_logging
logger: Logger = get_logger('pymooOptimizer')
ROUND_FLOATS_IN_LOGS_TO_DECIMAL_PLACES: int = 6
round_floats_for_logging = partial(
_round_floats_for_logging,
decimal_places=ROUND_FLOATS_IN_LOGS_TO_DECIMAL_PLACES,
)
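# Illustrative example (assuming the helper mirrors Ax's behaviour of recursively
# rounding floats in nested containers):
#   round_floats_for_logging({'loss': 0.123456789}) -> {'loss': 0.123457}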
######### Problem Definition #######################################################################
class PymooProblem(ElementwiseProblem):
def __init__(self, agents, **kwargs):
self.agents = agents
self.params = agents.params if hasattr(agents, 'params') else agents[0].params
self.n_var = len([p for p in self.params if p.type != 'fixed'])
self.all_metrics = None
self.all_metrics = self.get_all_metrics()
self.n_obj = len(self.all_metrics)
self.n_threads = kwargs.get('n_threads', 1)
# Handle parameter constraints
self.parameter_constraints = kwargs.get('parameter_constraints', None)
n_constr = 0
self.constraint_data = []
if self.parameter_constraints:
self.constraint_data = self.parse_constraints(self.parameter_constraints)
n_constr = len(self.constraint_data)
# Check parameters and set bounds
self.xl, self.xu = self.create_search_space(self.params)
# Configuration for parallel evaluation
self.parallel_agents = kwargs.get('parallel_agents', True)
self.max_parallelism = kwargs.get('max_parallelism', -1)
if self.max_parallelism == -1:
self.max_parallelism = os.cpu_count() - 1
if len(self.agents) == 1: # If there is only one agent, disable parallelism
self.parallel_agents = False
# initialize the thread pool and create the runner
pool = ThreadPool(self.n_threads)
self.elementwise_runner = StarmapParallelization(pool.starmap)
if self.n_threads > 1:
super().__init__(n_var=self.n_var, n_obj=self.n_obj, n_constr=n_constr,
xl=self.xl, xu=self.xu, elementwise_runner=self.elementwise_runner, **kwargs)
else:
super().__init__(n_var=self.n_var, n_obj=self.n_obj, n_constr=n_constr,
xl=self.xl, xu=self.xu, **kwargs)
    def get_all_metrics(self):
        """Collect the objective metric names from all agents (only on the first
        call, so repeated calls do not duplicate entries)."""
        if self.all_metrics is None:
            self.all_metrics = []
            for agent in self.agents:
                for metric in agent.all_agent_metrics:
                    self.all_metrics.append(metric)
        return self.all_metrics
    def create_search_space(self, params):
        """
        Create the pymoo search-space bounds from the parameters.
        Parameters
        ----------
        params : list of Fitparam() objects
            List of parameter objects
        Returns
        -------
        tuple
            A tuple (xl, xu) of numpy arrays containing the lower and upper bounds
            of the free (non-fixed) parameters in optimizer space (log10 space for
            force_log parameters, rescaled by fscale otherwise)
        """
        xl, xu = [], []
        self.param_mapping = [] # Store mapping of parameter indices to names for reconstruction
        for param in params:
            if param.type == 'fixed':
                # Fixed parameters are not included in the optimization
                continue
            self.param_mapping.append(param.name)
            if param.value_type == 'float':
                if param.force_log:
                    # For log-scale parameters, we optimize in log space
                    xl.append(np.log10(param.bounds[0]))
                    xu.append(np.log10(param.bounds[1]))
                else:
                    # For regular float parameters, rescale by fscale
                    scale_factor = param.fscale if hasattr(param, 'fscale') else 1.0
                    xl.append(param.bounds[0] / scale_factor)
                    xu.append(param.bounds[1] / scale_factor)
            # elif param.value_type == 'int':
            #     # For integer parameters, we would optimize on a step grid
            #     # and round to integers when evaluating (not supported yet)
            #     step_size = param.stepsize if hasattr(param, 'stepsize') else 1
            #     xl.append(param.bounds[0] / step_size)
            #     xu.append(param.bounds[1] / step_size)
            else:
                raise ValueError(f"Unsupported parameter type: {param.value_type} for parameter {param.name}")
        return np.asarray(xl), np.asarray(xu)
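    # Illustrative bound mapping (hypothetical parameter values): a Fitparam with
    # bounds=[1e-18, 1e-14] and force_log=True is optimized in log10 space, giving
    # xl=-18.0 and xu=-14.0; a linear parameter with bounds=[0, 300] and fscale=100
    # maps to xl=0.0 and xu=3.0.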
def reconstruct_params(self, x):
"""
Reconstruct a parameter dictionary from an optimization vector.
Parameters
----------
x : array-like
Parameter vector from the optimizer
Returns
-------
dict
Dictionary mapping parameter names to values
"""
param_dict = {}
x_idx = 0
for i, param in enumerate(self.params):
if param.type == 'fixed':
# Include fixed parameters with their fixed values
# param_dict[param.name] = param.value
continue
# Get the optimized value for this parameter
if param.value_type == 'float':
if param.force_log:
# Convert back from log space
param_dict[param.name] = 10 ** x[x_idx]
else:
# Apply scaling factor
scale_factor = param.fscale if hasattr(param, 'fscale') else 1.0
param_dict[param.name] = x[x_idx] * scale_factor
elif param.value_type == 'int':
# Convert back to integer with proper step size
step_size = param.stepsize if hasattr(param, 'stepsize') else 1
param_dict[param.name] = int(round(x[x_idx] * step_size))
x_idx += 1
return param_dict
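    # Illustrative round trip (hypothetical names): for a force_log parameter
    # 'mu_n', the optimizer value x = [-16.0] reconstructs to {'mu_n': 1e-16};
    # a linear parameter with fscale=100 and x = [2.5] reconstructs to 250.0.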
def evaluate_single(self, args):
"""Evaluate a single agent on a parameter point"""
agent_idx, agent, param_dict = args
res = agent.run_Ax(param_dict)
return agent_idx, res
    def _evaluate(self, x, out, *args, **kwargs):
        """Evaluate the objective functions for a given parameter vector."""
        # Build the parameter dictionary from the optimization vector. Values are
        # passed in optimizer space; use reconstruct_params(x) instead if the
        # original parameter space is needed.
        param_dict = {}
        x_idx = 0
        for param in self.params:
            if param.type == 'fixed':
                # Fixed parameters are excluded from the optimization vector
                continue
            param_dict[param.name] = x[x_idx]
            x_idx += 1
if not self.parallel_agents:
# Sequential evaluation: run each agent sequentially
results = []
for agent in self.agents:
agent_result = agent.run_Ax(param_dict)
results.append(agent_result)
# Merge results from all agents
merged_results = {}
for result in results:
merged_results.update(result)
else:
# Parallel evaluation: run all agents in parallel
agent_param_list = []
for idx, agent in enumerate(self.agents):
agent_param_list.append((idx, agent, param_dict))
# Run all agents in parallel using multiprocessing
with Pool(processes=min(len(agent_param_list), self.max_parallelism)) as pool:
parallel_results = pool.map(self.evaluate_single, agent_param_list)
# Collect and merge results
results_dict = {}
for agent_idx, res in parallel_results:
results_dict[agent_idx] = res
# Merge results from all agents
merged_results = {}
for agent_idx in sorted(results_dict.keys()):
merged_results.update(results_dict[agent_idx])
# Extract objective values in the correct order
objectives = []
for agent in self.agents:
for i in range(len(agent.all_agent_metrics)):
metric_name = agent.all_agent_metrics[i]
if metric_name in merged_results:
value = merged_results[metric_name]
# Apply sign for minimization/maximization
sign = 1 if agent.minimize[i] else -1
objectives.append(sign * value)
else:
# Handle missing metrics with large penalty
objectives.append(1e6)
# Check for NaN values and handle failed evaluations
if any(np.isnan(obj) for obj in objectives):
# Return a large penalty value for failed evaluations
objectives = [1e6] * len(self.all_metrics)
out["F"] = np.array(objectives)
# Evaluate constraints if they exist
if self.n_constr > 0:
constraints = []
for constraint in self.constraint_data:
indices = constraint['indices']
coefficients = constraint['coefficients']
rhs = constraint['rhs']
operator = constraint['operator']
# Calculate constraint value: sum_i (coef[i] * x[idx[i]])
constraint_value = np.sum(coefficients * x[indices])
# Convert BoTorch-style to pymoo-style constraints
if operator == '<=':
# BoTorch: sum_i coef[i]*x[idx[i]] <= rhs
# Pymoo: constraint_value - rhs <= 0
g_val = constraint_value - rhs
elif operator == '>=':
# BoTorch: sum_i coef[i]*x[idx[i]] >= rhs
# Pymoo: rhs - constraint_value <= 0
g_val = rhs - constraint_value
                elif operator == '==':
                    # Equality constraint: enforce |constraint_value - rhs| <= tolerance
                    # so it can be expressed through pymoo's inequality array G
                    g_val = abs(constraint_value - rhs) - 1e-6
constraints.append(g_val)
out["G"] = np.array(constraints)
def parse_constraints(self, constraints):
"""
Parse parameter constraints from string format to numerical format.
Parameters
----------
constraints : list of str
List of constraint strings in format like "x1 + 2*x2 <= 5"
Returns
-------
list of dict
List of constraint dictionaries with 'indices', 'coefficients', 'rhs', 'operator'
"""
constraint_data = []
param_names = [p.name for p in self.params if p.type != 'fixed']
for constraint_str in constraints:
# Parse the constraint string
# Split by comparison operators
if '<=' in constraint_str:
left, right = constraint_str.split('<=')
operator = '<='
elif '>=' in constraint_str:
left, right = constraint_str.split('>=')
operator = '>='
elif '==' in constraint_str:
left, right = constraint_str.split('==')
operator = '=='
else:
raise ValueError(f"Unsupported constraint operator in: {constraint_str}")
# Parse RHS
rhs = float(right.strip())
# Parse LHS to extract coefficients and parameter indices
indices = []
coefficients = []
# Simple parsing for linear constraints
terms = left.replace(' ', '').replace('-', '+-').split('+')
terms = [t for t in terms if t] # Remove empty strings
            for term in terms:
                if '*' in term:
                    coef_str, param_name = term.split('*')
                    if coef_str in ['', '+']:
                        coef = 1.0
                    elif coef_str == '-':
                        coef = -1.0
                    else:
                        coef = float(coef_str)
                else:
                    # Term is just a parameter name (coefficient is 1)
                    param_name = term
                    coef = 1.0
                    if param_name.startswith('-'):
                        param_name = param_name[1:]
                        coef = -1.0
if param_name in param_names:
indices.append(param_names.index(param_name))
coefficients.append(coef)
else:
raise ValueError(f"Parameter {param_name} not found in optimization parameters")
constraint_data.append({
'indices': np.array(indices),
'coefficients': np.array(coefficients),
'rhs': rhs,
'operator': operator
})
return constraint_data
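    # Illustrative parse (hypothetical parameter names): "x1 + 2*x2 <= 5" with free
    # parameters ['x1', 'x2'] yields
    #   {'indices': array([0, 1]), 'coefficients': array([1., 2.]), 'rhs': 5.0, 'operator': '<='}
    # which _evaluate() turns into the pymoo constraint g = x1 + 2*x2 - 5 <= 0.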
######### Optimizer Definition #######################################################################
class PymooOptimizer(BaseAgent):
"""
    PymooOptimizer class. This class is used to optimize a given function using pymoo's evolutionary algorithms.
Parameters
----------
params : list of Fitparam() objects, optional
List of Fitparam() objects, by default None
agents : list of Agent() objects, optional
List of Agent() objects see optimpv/general/BaseAgent.py for a base class definition, by default None
algorithm : str, optional
Optimization algorithm to use, by default 'NSGA2'
Options: 'NSGA2', 'NSGA3', 'MOEAD', 'GA', 'DE', 'PSO'
pop_size : int, optional
Population size, by default 100
n_gen : int, optional
Number of generations, by default 100
algorithm_kwargs : dict, optional
Additional keyword arguments for the algorithm, by default None
existing_data : DataFrame, optional
existing data to use for the optimization process, by default None
suggest_only : bool, optional
if True, the optimization process will only suggest new points without running the agents, by default False
name : str, optional
Name of the optimization process, by default 'pymoo'
**kwargs : dict
Additional keyword arguments including:
- parallel_agents: bool, whether to run agents in parallel
- max_parallelism: int, maximum number of parallel processes
- verbose_logging: bool, whether to log verbose information
- seed: int, random seed for reproducibility
"""
def __init__(self, params=None, agents=None, algorithm='NSGA2', pop_size=100, n_gen=100,
algorithm_kwargs=None, existing_data = None, suggest_only = False,name='pymoo', **kwargs):
self.params = params
if not isinstance(agents, list):
agents = [agents]
self.agents = agents
# Store algorithm as is (can be string or object)
self.algorithm = algorithm
self.algorithm_name = algorithm if isinstance(algorithm, str) else algorithm.__class__.__name__
self.pop_size = pop_size
self.n_gen = n_gen
self.algorithm_kwargs = algorithm_kwargs if algorithm_kwargs is not None else {}
self.name = name
self.kwargs = kwargs
self.results = None
self.all_metrics = None
self.all_minimize = None
self.all_metrics,self.all_minimize = self.create_metrics_list()
self.all_evaluations = []
self.problem = None
self.existing_data = existing_data
self.suggest_only = suggest_only
# Set defaults from kwargs
self.parallel_agents = kwargs.get('parallel_agents', True)
self.parallel = kwargs.get('parallel', True)
        self.max_parallelism = kwargs.get('max_parallelism', os.cpu_count()-1)
        if self.max_parallelism == -1:
            self.max_parallelism = os.cpu_count() - 1
        self.n_threads = kwargs.get('n_threads', max(1, self.max_parallelism // len(self.agents)))
self.verbose_logging = kwargs.get('verbose_logging', True)
self.seed = kwargs.get('seed', None)
self.parameter_constraints = self.kwargs.get('parameter_constraints',None)
self.max_attempts = self.kwargs.get('max_attempts', 10) # Maximum attempts for constraint satisfaction
if len(self.agents) == 1:
self.parallel_agents = False
def create_problem(self):
"""Create a pymoo problem from agents and parameters."""
self.problem = PymooProblem(
self.agents,
parallel_agents=self.parallel_agents,
max_parallelism=self.max_parallelism,
n_threads=self.n_threads,
parameter_constraints=self.parameter_constraints
)
return self.problem
def create_algorithm(self):
"""Create a pymoo algorithm based on the specified algorithm name or return the provided algorithm object."""
# If algorithm is already an object, return it directly
if not isinstance(self.algorithm, str):
return self.algorithm
algorithm_map = {
'NSGA2': NSGA2,
'NSGA3': NSGA3,
'MOEAD': MOEAD,
'GA': GA,
'DE': DE,
'PSO': PSO
}
        # If the algorithm string is not in the map, warn and return None
        if self.algorithm not in algorithm_map:
            if self.verbose_logging:
                logger.warning(f"Algorithm '{self.algorithm}' not found in predefined algorithms. "
                               f"Available algorithms: {list(algorithm_map.keys())}")
            return None
        algorithm_class = algorithm_map[self.algorithm]
        # Set default parameters (all supported algorithms are population based)
        default_kwargs = {'pop_size': self.pop_size}
        # Update with user-provided kwargs
        default_kwargs.update(self.algorithm_kwargs)
if self.seed is not None:
default_kwargs['seed'] = self.seed
return algorithm_class(**default_kwargs)
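    # Illustrative configuration (hypothetical values): operator settings can be
    # forwarded through algorithm_kwargs, e.g.
    #   PymooOptimizer(..., algorithm='DE', algorithm_kwargs={'CR': 0.9})
    # builds DE(pop_size=100, CR=0.9) here.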
def create_callback(self):
"""Create a callback to track optimization progress."""
class OptimizationCallback(Callback):
def __init__(self, optimizer):
super().__init__()
self.optimizer = optimizer
def notify(self, algorithm):
# Store current generation results
if hasattr(algorithm, 'pop') and algorithm.pop is not None:
for ind in algorithm.pop:
param_dict = self.optimizer.problem.reconstruct_params(ind.X)
                        # Store the objective values in pymoo's minimization
                        # convention (matching existing_population()); the sign of
                        # maximized metrics is flipped back in get_df_from_pymoo()
                        objectives = {}
                        obj_idx = 0
                        for agent in self.optimizer.agents:
                            for i in range(len(agent.all_agent_metrics)):
                                objectives[agent.all_agent_metrics[i]] = ind.F[obj_idx]
                                obj_idx += 1
self.optimizer.all_evaluations.append({
'params': param_dict.copy(),
'results': objectives.copy()
})
if self.optimizer.verbose_logging:
logging_level = 20
logger.setLevel(logging_level)
logger.info(
f"Generation {algorithm.n_gen}: Best objective = {algorithm.pop.get('F').min():.6f}"
)
return OptimizationCallback(self)
def existing_population(self):
"""Create an existing population from the provided data."""
        if self.existing_data is not None:
            # The existing data must already be a pandas DataFrame
            if not isinstance(self.existing_data, pd.DataFrame):
                raise ValueError("existing_data must be a pandas DataFrame")
            pnames = [p.name for p in self.params if p.type != 'fixed']
            # Extract the free-parameter columns as a numpy array
            existing_params = self.existing_data[pnames].values
objectives = self.existing_data[self.all_metrics].values
for i in range(len(self.all_minimize)):
if not self.all_minimize[i]:
# If the metric is to be maximized, we need to invert the values
objectives[:, i] = -objectives[:, i]
# Create a pymoo Population object from the existing data
individuals = [Individual(X=existing_params[i], F=objectives[i]) for i in range(len(existing_params))]
pop = Population.create(*individuals)
return pop
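    # Illustrative layout of existing_data (hypothetical names): one column per free
    # parameter plus one per metric, e.g.
    #   pd.DataFrame({'x1': [0.1, 0.2], 'x2': [1.0, 2.0], 'mse': [0.5, 0.3]})
    # Maximized metrics are negated above to match pymoo's minimization convention.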
def optimize(self):
"""Run the optimization process using pymoo."""
if self.verbose_logging:
logging_level = 20
logger.setLevel(logging_level)
logger.info(
f"Starting optimization using {self.algorithm_name} algorithm"
)
logger.info(
f"Population size: {self.pop_size}, Generations: {self.n_gen}"
)
# Create the metrics list
if self.all_metrics is None:
self.all_metrics,self.all_minimize = self.create_metrics_list()
# Create problem and algorithm
problem = self.create_problem()
algorithm = self.create_algorithm()
callback = self.create_callback()
if self.existing_data is not None:
# If existing data is provided, create an initial population from it
initial_pop = self.existing_population()
if self.verbose_logging:
logging_level = 20
logger.setLevel(logging_level)
logger.info(
f"Using existing population of size {len(initial_pop)}"
)
algorithm.sampling = initial_pop
# add the initial population to self.all_evaluations
for ind in initial_pop:
param_dict = problem.reconstruct_params(ind.X)
objectives = {metric: ind.F[i] for i, metric in enumerate(self.all_metrics)}
self.all_evaluations.append({
'params': param_dict,
'results': objectives
})
# Run the optimization
if self.suggest_only:
# only suggest new points without running the agents
if self.verbose_logging:
logging_level = 20
logger.setLevel(logging_level)
logger.info(
"Suggesting new points without running agents"
)
# prepare the algorithm to solve the specific problem (same arguments as for the minimize function)
algorithm.setup(problem, termination=('n_gen', 1), seed=self.seed, verbose=False)
valid_candidates = []
max_attempts = self.max_attempts # Use max_attempts from kwargs
attempt = 0
while len(valid_candidates) < self.pop_size and attempt < max_attempts:
# ask the algorithm for the next solution to be evaluated
pop = algorithm.ask()
# Check constraints for each candidate if constraints exist
if problem.n_constr > 0:
for ind in pop:
x = ind.X
# Evaluate constraints
constraint_violations = []
for constraint in problem.constraint_data:
indices = constraint['indices']
coefficients = constraint['coefficients']
rhs = constraint['rhs']
operator = constraint['operator']
# Calculate constraint value
constraint_value = np.sum(coefficients * x[indices])
# Check if constraint is satisfied
if operator == '<=':
violation = constraint_value - rhs
elif operator == '>=':
violation = rhs - constraint_value
elif operator == '==':
violation = abs(constraint_value - rhs)
constraint_violations.append(violation)
# Only add candidate if all constraints are satisfied (violations <= tolerance)
tolerance = 1e-6
if all(violation <= tolerance for violation in constraint_violations):
valid_candidates.append(x)
# Stop if we have enough valid candidates
if len(valid_candidates) >= self.pop_size:
break
else:
# No constraints, all candidates are valid
for ind in pop:
valid_candidates.append(ind.X)
if len(valid_candidates) >= self.pop_size:
break
attempt += 1
if self.verbose_logging and problem.n_constr > 0 and attempt > 1:
logger.info(f"Attempt {attempt}: Found {len(valid_candidates)} valid candidates out of {self.pop_size} requested")
if len(valid_candidates) == 0:
raise ValueError("Could not generate any valid candidates that satisfy constraints")
if len(valid_candidates) < self.pop_size and self.verbose_logging:
logger.warning(f"Only found {len(valid_candidates)} valid candidates out of {self.pop_size} requested")
# Take only the requested number of candidates
valid_candidates = valid_candidates[:self.pop_size]
# convert the valid candidates to a numpy array of parameters
params_array = np.asarray(valid_candidates)
# store the parameters to try next in a DataFrame
params_df = pd.DataFrame(params_array, columns=[p.name for p in self.params if p.type != 'fixed'])
# add the fixed parameters to the DataFrame
for p in self.params:
if p.type == 'fixed':
params_df[p.name] = p.value
return params_df
else:
result = minimize(
problem,
algorithm,
('n_gen', self.n_gen),
callback=callback,
verbose=self.verbose_logging,
seed=self.seed
)
self.results = result
if self.verbose_logging:
logging_level = 20
logger.setLevel(logging_level)
logger.info(
f"Optimization completed after {result.algorithm.n_gen} generations"
)
logger.info(
f"Number of function evaluations: {result.algorithm.evaluator.n_eval}"
)
if len(self.all_metrics) == 1:
logger.info(
f"Best objective value: {result.F.min():.6f}"
)
                else:
                    logger.info(
                        f"Pareto front size: {len(result.F)}"
                    )
return result
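    # Illustrative suggest-only usage (hypothetical agent): with suggest_only=True,
    # optimize() returns a DataFrame of candidate points instead of running the agents:
    #   opt = PymooOptimizer(params=params, agents=[my_agent], suggest_only=True)
    #   candidates_df = opt.optimize()  # one row per candidate, incl. fixed params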
def get_best_solution(self, balance_objectives=True):
"""
Get the best solution from the optimization results.
Parameters
----------
balance_objectives : bool, optional
If True and multiple objectives exist, return the solution with best balance.
If False, return the first solution from the Pareto front.
Returns
-------
dict
Dictionary containing the best parameters
"""
if self.results is None:
raise ValueError("No optimization results available. Run optimize() first.")
if len(self.all_metrics) == 1 or not balance_objectives:
# Single objective or just take first Pareto solution
best_params = self.problem.reconstruct_params(self.results.X)
else:
# Multi-objective: find best balance using ranking
if not self.all_evaluations:
raise ValueError("No evaluations recorded. Cannot find best balance.")
# Use the same logic as other optimizers
_, best_params = self.update_params_with_best_balance(return_best_balance=True)
return best_params
def update_params_with_best_balance(self, return_best_balance=False):
"""Update the parameters with the best balance of all metrics.
If multiple objectives exist, find the best balance based on ranks.
Parameters
----------
return_best_balance : bool, optional
If True, return the index and parameters of the best balance, by default False
Returns
-------
tuple
If return_best_balance is True, return the index and parameters of the best balance.
Otherwise, return None.
Raises
------
ValueError
If no evaluations have been performed or no metrics are available.
"""
if not self.all_evaluations:
raise ValueError("No evaluations have been performed.")
if len(self.all_metrics) == 0:
raise ValueError("We need at least one metric to update the parameters")
# If we have one objective, just use the best result
if len(self.all_metrics) == 1:
            # Values are stored in minimization convention, so argmin also
            # selects the best point for maximized metrics
            best_idx = np.argmin([eval_result['results'][self.all_metrics[0]]
                                  for eval_result in self.all_evaluations])
best_params = self.all_evaluations[best_idx]['params']
# Update parameters
for p in self.params:
if p.name in best_params:
p.value = best_params[p.name]
if return_best_balance:
return best_idx, best_params
else:
# Multiple objectives: find the best balance
metrics_values = []
for metric in self.all_metrics:
metric_vals = [eval_result['results'][metric]
for eval_result in self.all_evaluations]
metrics_values.append(metric_vals)
            # Calculate ranks for each metric (values are stored in minimization
            # convention, so ascending ranks apply to all metrics)
            ranks = []
            for i in range(len(self.all_metrics)):
                vals = np.array(metrics_values[i])
                rank = np.argsort(np.argsort(vals))
                ranks.append(rank)
# Find the best balance (lowest sum of ranks)
sum_ranks = np.sum(ranks, axis=0)
best_idx = np.argmin(sum_ranks)
best_params = self.all_evaluations[best_idx]['params']
# Update parameters
for p in self.params:
if p.name in best_params:
p.value = best_params[p.name]
if return_best_balance:
return best_idx, best_params
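    # Illustrative rank-sum balance (hypothetical values): with two metrics over three
    # evaluations, m1 = [0.1, 0.3, 0.2] ranks as [0, 2, 1] and m2 = [0.9, 0.3, 0.2]
    # ranks as [2, 1, 0]; the rank sums [2, 3, 1] make the third evaluation the
    # best-balanced solution.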
def get_pareto_front(self):
"""
Get the Pareto front solutions for multi-objective optimization.
Returns
-------
tuple
Tuple containing (parameters_list, objectives_array)
"""
if self.results is None:
raise ValueError("No optimization results available. Run optimize() first.")
        if len(self.all_metrics) == 1:
            # Single objective: pymoo returns a single best solution, so
            # result.X is already the parameter vector itself
            best_params = self.problem.reconstruct_params(self.results.X)
            return [best_params], np.atleast_2d(self.results.F)
# Multi-objective: return all Pareto front solutions
pareto_params = []
for x in self.results.X:
params = self.problem.reconstruct_params(x)
pareto_params.append(params)
return pareto_params, self.results.F
def plot_convergence(self, save_path=None, **kwargs):
"""
Plot the convergence history of the optimization.
Parameters
----------
save_path : str, optional
Path to save the plot, by default None
**kwargs : dict
Additional keyword arguments for the plot.
- xscale : str, optional
Scale for the x-axis, by default 'linear'
- yscale : str, optional
Scale for the y-axis, by default 'linear'
"""
if not self.all_evaluations:
raise ValueError("No evaluations recorded. Cannot plot convergence.")
xscale = kwargs.get('xscale', 'linear')
yscale = kwargs.get('yscale', 'linear')
# Group evaluations by generation (assuming they are stored in order)
n_evals_per_gen = len(self.all_evaluations) // self.n_gen
generations = []
best_values = []
for gen in range(self.n_gen):
start_idx = gen * n_evals_per_gen
end_idx = (gen + 1) * n_evals_per_gen
gen_evals = self.all_evaluations[start_idx:end_idx]
if len(self.all_metrics) == 1:
# Single objective: track best value
metric = self.all_metrics[0]
gen_values = [eval_result['results'][metric] for eval_result in gen_evals]
best_values.append(min(gen_values))
else:
# Multi-objective: track hypervolume or average
metric_values = []
for metric in self.all_metrics:
metric_vals = [eval_result['results'][metric] for eval_result in gen_evals]
metric_values.append(np.mean(metric_vals))
best_values.append(np.mean(metric_values))
generations.append(gen)
plt.figure(figsize=(10, 6))
plt.plot(generations, best_values, 'b-', linewidth=2)
plt.xlabel('Generation')
plt.ylabel('Best Objective Value' if len(self.all_metrics) == 1 else 'Average Objective Value')
plt.title(f'{self.algorithm_name} Convergence History')
plt.grid(True, alpha=0.3)
plt.xscale(xscale)
plt.yscale(yscale)
if save_path:
plt.savefig(save_path, dpi=300, bbox_inches='tight')
plt.show()
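    # Illustrative call (hypothetical path):
    #   optimizer.plot_convergence(save_path='convergence.png', yscale='log')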
def get_df_from_pymoo(self):
"""Convert the optimization results to a pandas DataFrame.
Returns
-------
pd.DataFrame
DataFrame containing the optimization results
"""
resall = self.all_evaluations
dum_dic = {}
for key in resall[0]['params'].keys():
dum_dic[key] = []
for key in resall[0]['results'].keys():
dum_dic[key] = []
for i in range(len(resall)):
for key in resall[i]['params'].keys():
dum_dic[key].append(resall[i]['params'][key])
for key in resall[i]['results'].keys():
dum_dic[key].append(resall[i]['results'][key])
# remove the minus from the metrics that are maximized
for i in range(len(self.all_metrics)):
if not self.all_minimize[i]:
# If the metric is to be maximized, we need to invert the values
dum_dic[self.all_metrics[i]] = [-x for x in dum_dic[self.all_metrics[i]]]
df = pd.DataFrame(dum_dic)
return df
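######### Example Usage ############################################################################
# Minimal end-to-end sketch (illustrative only): `my_agent` and the Fitparam constructor
# arguments are hypothetical placeholders based on the attributes used above
# (name, value, bounds, force_log), not a guaranteed API.
#
#   params = [Fitparam(name='mu_n', value=1e-16, bounds=[1e-18, 1e-14], force_log=True)]
#   optimizer = PymooOptimizer(params=params, agents=[my_agent], algorithm='NSGA2',
#                              pop_size=50, n_gen=20)
#   result = optimizer.optimize()
#   optimizer.update_params_with_best_balance()   # writes best values back into params
#   df = optimizer.get_df_from_pymoo()            # all evaluations as a DataFrame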