"""axBOtorchOptimizer module. This module contains the axBOtorchOptimizer class. The class is used to run the bayesian optimization process using the Ax library."""
######### Package Imports #########################################################################
from dataclasses import dataclass
import torch
from botorch.acquisition import qLogExpectedImprovement
from botorch.fit import fit_gpytorch_mll
from botorch.generation import MaxPosteriorSampling
from botorch.models import SingleTaskGP
from botorch.optim import optimize_acqf
from botorch.utils.transforms import unnormalize, normalize
from torch.quasirandom import SobolEngine
import gpytorch
from gpytorch.constraints import Interval
from gpytorch.kernels import MaternKernel, ScaleKernel
from gpytorch.likelihoods import GaussianLikelihood
from gpytorch.mlls import ExactMarginalLogLikelihood
from botorch.acquisition.objective import ScalarizedPosteriorTransform
import math, copy
import numpy as np
import pandas as pd
from joblib import Parallel, delayed
from functools import partial
from optimpv import *
from optimpv.axBOtorch.axUtils import *
# from optimpv.axBOtorch.axSchedulerUtils import * # removed for now
import ax, os, shutil
from ax import *
from ax.generation_strategy.generation_strategy import GenerationStep, GenerationStrategy
from ax.generation_strategy.center_generation_node import CenterGenerationNode
from ax.generation_strategy.transition_criterion import MinTrials
from ax.generation_strategy.generation_node import GenerationNode
from ax.generation_strategy.generator_spec import GeneratorSpec
from ax.adapter.factory import Generators
from ax.service.ax_client import AxClient, ObjectiveProperties
from ax.service.orchestrator import Orchestrator
from botorch.acquisition.logei import qLogNoisyExpectedImprovement
from ax.adapter.transforms.standardize_y import StandardizeY
from ax.adapter.transforms.unit_x import UnitX
from ax.adapter.transforms.remove_fixed import RemoveFixed
from ax.adapter.transforms.log import Log
from ax.generators.torch.botorch_modular.utils import ModelConfig
from ax.generators.torch.botorch_modular.surrogate import SurrogateSpec
from ax.service.utils.orchestrator_options import OrchestratorOptions, TrialType
from ax.api.protocols.metric import IMetric
from collections import defaultdict
from torch.multiprocessing import Pool, set_start_method
# from multiprocessing import Pool, set_start_method
# force the 'spawn' start method; needed for multiprocessing when using pytorch
set_start_method('spawn', force=True)
import logging
from logging import Logger
from ax.utils.common.logger import set_ax_logger_levels
# WARN is the next highest log level after INFO
set_ax_logger_levels(logging.WARN)
from optimpv.general.logger import get_logger, _round_floats_for_logging
logger: Logger = get_logger('axBOtorchOptimizer')
ROUND_FLOATS_IN_LOGS_TO_DECIMAL_PLACES: int = 6
round_floats_for_logging = partial(
_round_floats_for_logging,
decimal_places=ROUND_FLOATS_IN_LOGS_TO_DECIMAL_PLACES,
)
from ax.api.client import Client
from ax.api.configs import RangeParameterConfig, ChoiceParameterConfig
from optimpv.general.BaseAgent import BaseAgent
from optimpv.posterior.posterior import get_df_from_ax
######### Optimizer Definition #######################################################################
class axBOtorchOptimizer(BaseAgent):
"""Initialize the axBOtorchOptimizer class. The class is used to run the optimization process using the Ax library.
Parameters
----------
params : list of Fitparam() objects, optional
List of Fitparam() objects, by default None
agents : list of Agent() objects, optional
List of Agent() objects see optimpv/general/BaseAgent.py for a base class definition, by default None
models : list, optional
list of models to use for the optimization process, by default ['SOBOL','BOTORCH_MODULAR']
n_batches : list, optional
list of the number of batches for each model, by default [1,10]
batch_size : list, optional
list of the batch sizes for each model, by default [10,2]
ax_client : Client, optional
Ax Client object, by default None
max_parallelism : int, optional
maximum number of parallel processes to run; if -1, os.cpu_count()-1 is used, by default -1
model_kwargs_list : dict, optional
dictionary of model kwargs for each model, by default None
model_gen_kwargs_list : dict, optional
dictionary of model generation kwargs for each model, by default None
existing_data : DataFrame, optional
existing data to use for the optimization process, by default None
suggest_only : bool, optional
if True, the optimization process will only suggest new points without running the agents, by default False
name : str, optional
name of the optimization process, by default 'ax_opti'
Raises
------
ValueError
raised if the number of batches and the number of models are not the same
ValueError
raised if the model is not a string or a Generators enum
ValueError
raised if the model_kwargs_list and models do not have the same length
ValueError
raised if the model_gen_kwargs_list and models do not have the same length
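Examples
--------
A minimal usage sketch; the parameter list and agent below are illustrative and must come from your own Fitparam definitions and Agent subclass (see optimpv/general/BaseAgent.py):
>>> opti = axBOtorchOptimizer(params=params, agents=[agent],
...                           models=['SOBOL','BOTORCH_MODULAR'],
...                           n_batches=[1,10], batch_size=[10,2])
>>> opti.optimize()
>>> opti.update_params_with_best_balance()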
"""
def __init__(self, params = None, agents = None, models = ['SOBOL','BOTORCH_MODULAR'],n_batches = [1,10], batch_size = [10,2], ax_client = None, max_parallelism = -1,model_kwargs_list = None, model_gen_kwargs_list = None, existing_data = None, suggest_only = False, name = 'ax_opti', **kwargs):
self.params = params
if not isinstance(agents, list):
agents = [agents]
self.agents = agents
for agent in self.agents: # make sure that the agents have the same params as the optimizer
agent.params = self.params
self.models = models
self.n_batches = n_batches
self.batch_size = batch_size
self.all_metrics = None
self.all_metrics,self.all_minimize = self.create_metrics_list()
self.all_tracking_metrics = None
self.ax_client = ax_client
self.max_parallelism = max_parallelism
if max_parallelism == -1:
self.max_parallelism = os.cpu_count()-1
if model_kwargs_list is None:
model_kwargs_list = [{}]*len(models)
elif isinstance(model_kwargs_list,dict):
model_kwargs_list = [model_kwargs_list]*len(models)
elif len(model_kwargs_list) != len(models):
raise ValueError('model_kwargs_list must have the same length as models')
self.model_kwargs_list = model_kwargs_list
if model_gen_kwargs_list is None:
model_gen_kwargs_list = [{}]*len(models)
elif isinstance(model_gen_kwargs_list,dict):
model_gen_kwargs_list = [model_gen_kwargs_list]*len(models)
elif len(model_gen_kwargs_list) != len(models):
raise ValueError('model_gen_kwargs_list must have the same length as models')
self.model_gen_kwargs_list = model_gen_kwargs_list
self.name = name
self.kwargs = kwargs
self.existing_data = existing_data
self.suggest_only = suggest_only
if len(n_batches) != len(models):
raise ValueError('n_batches and models must have the same length')
if isinstance(batch_size, int):
    self.batch_size = [batch_size]*len(models)
if len(self.batch_size) != len(models):
    raise ValueError('batch_size and models must have the same length')
def create_generation_strategy(self):
""" Create a generation strategy for the optimization process using the models and the number of batches and batch sizes. See ax documentation for more details: https://ax.dev/tutorials/generation_strategy.html
Returns
-------
GenerationStrategy
The generation strategy for the optimization process
Raises
------
ValueError
If the model is not a string or a Generators enum
"""
nodes_list = []
# get all names
names = []
Gen_strat_name = ""
generators = []
for i, model in enumerate(self.models):
if isinstance(model, str) and model.lower() == 'center':
node_name = 'Center'
Gen_strat_name += 'Center+'
generators.append(node_name)
names.append(node_name)
continue
if isinstance(model, str):
    node_name = model
    model = Generators[model]
    generators.append(model)
elif isinstance(model, Generators):
    node_name = model.name
    generators.append(model)
else:
    raise ValueError('Model must be a string or a Generators enum')
Gen_strat_name += node_name + '+'
names.append(node_name)
# remove the last +
Gen_strat_name = Gen_strat_name[:-1]
# Create the generator spec
for i, model in enumerate(generators):
# Get the next node name
if names[i].lower() == 'center':
# Center node is a customized node that uses a simplified logic and has a
# built-in transition criteria that transitions after generating once.
nodes_list.append(CenterGenerationNode(next_node_name=names[i+1]))
continue
# Create the generator spec
generator_spec = GeneratorSpec(
generator_enum=model,
model_kwargs=self.model_kwargs_list[i],
# We can specify various options for the optimizer here.
model_gen_kwargs = self.model_gen_kwargs_list[i],
)
# Create the generation node
if i < len(self.models)-1:
node = GenerationNode(
node_name=names[i],
generator_specs=[generator_spec],
transition_criteria=[
MinTrials(threshold=self.n_batches[i]*self.batch_size[i],
transition_to=names[i+1],
)
],
)
else:
node = GenerationNode(
node_name=names[i],
generator_specs=[generator_spec],
)
nodes_list.append(node)
# Create the generation strategy
return GenerationStrategy(
name=Gen_strat_name,
nodes=nodes_list
)
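# For the defaults (models=['SOBOL','BOTORCH_MODULAR'], n_batches=[1,10],
# batch_size=[10,2]) this builds a two-node strategy named 'SOBOL+BOTORCH_MODULAR':
# a Sobol node that transitions via MinTrials after 1*10 = 10 trials, followed by
# a terminal BoTorch node that generates the remaining 10*2 = 20 trials.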
def get_tracking_metrics(self, agents):
""" Extract tracking metrics from agents
Parameters
----------
agents : list
List of Agent objects
Returns
-------
list
List of tracking metric names formatted with agent name prefix
"""
tracking_metrics = []
for agent in agents:
if getattr(agent, 'all_agent_tracking_metrics', None) is not None:
for metric in agent.all_agent_tracking_metrics:
tracking_metrics.append(metric)
return tracking_metrics
def create_objectives(self):
""" Create the objectives for the optimization process. The objectives are the metrics of the agents. The objectives are created using the metric, minimize and threshold attributes of the agents. If the agent has an exp_format attribute, it is used to create the objectives.
Returns
-------
dict
A dictionary of the objectives for the optimization process
"""
append_metrics = False
if self.all_metrics is None:
self.all_metrics = []
append_metrics = True
objectives = ""
# objectives = {}
for agent in self.agents:
for i in range(len(agent.all_agent_metrics)):
if agent.minimize[i]:
objectives += "-"+agent.all_agent_metrics[i] + ","
if append_metrics:
self.all_metrics.append(agent.all_agent_metrics[i])
else:
objectives += agent.all_agent_metrics[i] + ","
if append_metrics:
self.all_metrics.append(agent.all_agent_metrics[i])
# remove the last comma
objectives = objectives[:-1]
return objectives
def attach_existing_data(self):
""" Attach existing data to the Ax client
"""
if self.existing_data is not None:
# Check that the existing data is a pandas DataFrame
if not isinstance(self.existing_data, pd.DataFrame):
raise ValueError("existing_data must be a pandas DataFrame")
# rescale the existing data to match what is expected by the Ax client
copy_data = self.descale_dataframe(copy.deepcopy(self.existing_data), self.params)
for index, row in copy_data.iterrows():
# Create a parameterization dictionary
parameterization = {}
for p in self.params:
if p.type != 'fixed':
if p.value_type == 'float':
    parameterization[p.name] = copy_data[p.name].iloc[index]
elif p.value_type == 'int':
    parameterization[p.name] = int(copy_data[p.name].iloc[index])
elif p.value_type in ('str', 'cat', 'sub'):
    parameterization[p.name] = str(copy_data[p.name].iloc[index])
# If the parameterization is empty, skip this row
if not parameterization:
continue
# Create a raw data dictionary and keep the correct types
raw_data = {metric: copy_data[metric].iloc[index] for metric in self.all_metrics + self.all_tracking_metrics if metric in row}
# If the raw data is empty, skip this row
if not raw_data:
continue
trial_index = self.ax_client.attach_trial(
parameters=parameterization,
)
# Then complete the trial with the existing data
self.ax_client.complete_trial(
trial_index=trial_index, raw_data=raw_data,
)
if self.kwargs.get('verbose_logging', False):
logging_level = 20
logger.setLevel(logging_level)
logger.info(f"Attached trial {trial_index} with parameters {parameterization} and raw data {raw_data}")
def get_initial_data_from_existing_data_turbo(self):
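""" Build the initial data arrays for the TuRBO loop from self.existing_data.
Returns
-------
tuple
X_turbo (values of the non-fixed parameters), Y_turbo (objective metric values) and Y_tracking (tracking metric values, or None if there are no tracking metrics)
"""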
if self.existing_data is not None:
if not isinstance(self.existing_data, pd.DataFrame):
raise ValueError("existing_data must be a pandas DataFrame")
# rescale the existing data to match what is expected
copy_data = self.descale_dataframe(copy.deepcopy(self.existing_data), self.params)
X_turbo = copy_data[[p.name for p in self.params if p.type != 'fixed']].values
Y_turbo = copy_data[self.all_metrics].values
Y_tracking = copy_data[self.all_tracking_metrics].values if len(self.all_tracking_metrics) > 0 else None
return X_turbo, Y_turbo, Y_tracking
def evaluate(self,args):
""" Evaluate the agent on a parameter point
Parameters
----------
args : tuple
Tuple containing the index of the agent, the agent, the index of the parameter point and the parameter point
Returns
-------
tuple
Tuple containing the index of the parameter point and the results of the agent on the parameter point
"""
idx, agent, p_idx, p = args
res = agent.run_Ax(p)
return p_idx, res
def optimize(self):
""" Run the optimization process using the agents and the parameters. The optimization process uses the Ax library. The optimization process runs the agents in parallel if the parallel attribute is True. The optimization process runs using the parameters, agents, models, n_batches, batch_size, max_parallelism, model_kwargs_list, model_gen_kwargs_list, name and kwargs attributes of the class.
Raises
------
ValueError
If the number of batches and the number of models are not the same
"""
self.optimize_sequential()
#Note: I might reimplement the runner option in a future version, but for now it is not used.
def optimize_sequential(self):
""" Run the optimization process using the agents and the parameters. The optimization process uses the Ax library. The optimization process runs the agents in parallel if the parallel attribute is True. The optimization process runs using the parameters, agents, models, n_batches, batch_size, max_parallelism, model_kwargs_list, model_gen_kwargs_list, name and kwargs attributes of the class.
Raises
------
ValueError
If the number of batches and the number of models are not the same
"""
# from kwargs
enforce_sequential_optimization = self.kwargs.get('enforce_sequential_optimization',False)
global_max_parallelism = self.kwargs.get('global_max_parallelism',-1)
verbose_logging = self.kwargs.get('verbose_logging',True)
global_stopping_strategy = self.kwargs.get('global_stopping_strategy',None)
outcome_constraints = self.kwargs.get('outcome_constraints',None)
parameter_constraints = self.kwargs.get('parameter_constraints',None)
parallel = self.kwargs.get('parallel',True)
torch_dtype = self.kwargs.get('torch_dtype',torch.float64)
torch.set_default_dtype(torch_dtype)
# create parameters space from params
parameters_space, fixed_parameters = ConvertParamsAx(self.params)
# Get tracking metrics directly
self.all_tracking_metrics = self.get_tracking_metrics(self.agents)
# create generation strategy
gs = self.create_generation_strategy()
# create ax client
if self.ax_client is None:
self.ax_client = Client()
# Configure the experiment
self.ax_client.configure_experiment(
name=self.name,
parameters=parameters_space,
parameter_constraints=parameter_constraints,
)
objective = self.create_objectives()
self.ax_client.configure_optimization(objective=objective)
if len(self.all_tracking_metrics) != 0:
self.ax_client.configure_metrics([IMetric(name=m) for m in self.all_tracking_metrics])
if self.existing_data is not None:
self.attach_existing_data()
self.ax_client.set_generation_strategy(generation_strategy=gs)
# run optimization
num = 0
total_trials = sum(np.asarray(self.n_batches)*np.asarray(self.batch_size))
n_step_points = np.cumsum(np.asarray(self.n_batches)*np.asarray(self.batch_size))
size_pool = None
if self.suggest_only and self.existing_data is not None:
# If suggest_only is True, we only suggest the trials without running the agents
trials = self.ax_client.get_next_trials(total_trials)
if verbose_logging:
logging_level = 20
logger.setLevel(logging_level)
logger.info(f"Suggesting {total_trials} trials without running the agents.")
return
while num < total_trials:
# check the current batch size
curr_batch_size = self.batch_size[np.argmax(n_step_points>num)]
num += curr_batch_size
if num > total_trials:
curr_batch_size = curr_batch_size - (num-total_trials)
trials = self.ax_client.get_next_trials(curr_batch_size)
if verbose_logging:
logging_level = 20
logger.setLevel(logging_level)
for i in trials.keys():
logger.info(f"Trial {i} with parameters: {trials[i]}")
trials_index = list(trials.keys())
parameters = [trials[i] for i in trials_index]
if parallel:
all_results = Parallel(
n_jobs=min(len(parameters) * len(self.agents), self.max_parallelism)
)(
delayed(lambda ag, p, pi: (pi, ag.run_Ax(p)))(
agent, p, pi
)
for agent in self.agents
for pi, p in enumerate(parameters)
)
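# Each (agent, parameter point) pair runs as one joblib job; each job returns
# (param_idx, result_dict) so that results from different agents evaluated on
# the same point can be merged into a single raw-data dict per trial below.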
# merge results
main_results = [{} for _ in parameters]
for param_idx, res in all_results:
main_results[param_idx].update(res)
else:
main_results = [{} for _ in parameters]
for agent in self.agents:
for pi, p in enumerate(parameters):
res = agent.run_Ax(p)
main_results[pi].update(res)
idx = 0
for trial_index_, raw_data in zip(trials_index, main_results):
got_nan = False
for key in raw_data.keys():
if np.isnan(raw_data[key]):
got_nan = True
break
if not got_nan:
if verbose_logging:
logging_level = 20
logger.setLevel(logging_level)
logger.info(f"Trial {trial_index_} completed with results: {raw_data} and parameters: {parameters[idx]}")
self.ax_client.complete_trial(trial_index_, raw_data=raw_data)
else:
if verbose_logging:
logging_level = 20
logger.setLevel(logging_level)
logger.info(f"Trial {trial_index_} failed with results: {raw_data} and parameters: {parameters[idx]}")
self.ax_client.mark_trial_failed(trial_index_)
idx += 1
def update_params_with_best_balance(self,return_best_balance=False):
""" Update the parameters with the best balance of all metrics.
The best balance is defined by ranking the results for each metric and taking the parameters that has the lowest sum of ranks.
Raises
------
ValueError
We need at least one metric to update the parameters
"""
# if we have one objective
if len(self.all_metrics) == 1:
scaled_best_parameters = self.ax_client.get_best_parameterization(use_model_predictions=False)[0]
self.params_w(scaled_best_parameters,self.params)
# if we have multiple objectives
elif len(self.all_metrics) > 1:
# We do this because the ax_client.get_pareto_optimal_parameters does not necessarily return the best parameters for a balanced results on all objectives
df = get_df_ax_client_metrics(self.params, self.ax_client, self.all_metrics)
metrics = self.all_metrics
minimizes_ = []
for agent in self.agents:
for i in range(len(agent.minimize)):
minimizes_.append(agent.minimize[i])
# Filter out rows with NaN values in any metric
df_filtered = df.dropna(subset=metrics)
if len(df_filtered) == 0:
raise ValueError('All rows contain NaN values in at least one metric')
ranked_df = copy.deepcopy(df_filtered)
ranks = []
for i in range(len(metrics)):
ranked_df[metrics[i]+'_rank'] = ranked_df[metrics[i]].rank(ascending=minimizes_[i])
ranks.append(ranked_df[metrics[i]+'_rank'])
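# Illustrative example: with two minimized metrics, a trial ranked 2nd on the
# first metric and 1st on the second has a rank sum of 3; the trial with the
# smallest rank sum over all metrics gives the best balance.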
# get the index of the best balance
best_balance_index = np.argmin(np.sum(np.array(ranks), axis=0))
# get the best parameters
scaled_best_parameters = ranked_df.iloc[best_balance_index].to_dict()
dum_dic = {}
for p in self.params:
if p.type != 'fixed':
dum_dic[p.name] = scaled_best_parameters[p.name]
else:
dum_dic[p.name] = p.value
scaled_best_parameters = dum_dic
for p in self.params:
if p.name in scaled_best_parameters.keys():
p.value = scaled_best_parameters[p.name]
if return_best_balance:
return best_balance_index, scaled_best_parameters
else:
raise ValueError('We need at least one metric to update the parameters')
def optimize_turbo(self,acq_turbo='ts',force_continue = False, kwargs_turbo_state={},kwargs_turbo={}):
"""Run the optimzation using Turbo. This is based on the Botorch implementation of Turbo. See https://botorch.org/docs/tutorials/turbo_1/ for more details.
Parameters
----------
acq_turbo : str, optional
The acquisition function to use can be 'ts' or 'ei', by default 'ts'
force_continue : bool, optional
If True, the optimization will continue even if a restart is triggered, by default False
kwargs_turbo_state : dict, optional
The kwargs to use for the TurboState, by default {}
can be:
- length: float, by default 0.8
- length_min: float, by default 0.5**7
- length_max: float, by default 1.6
- success_tolerance: int, by default 3
- failure_tolerance: int, by default computed from the dimension and batch size
kwargs_turbo : dict, optional
The kwargs to use for the Turbo loop, by default {}
can be:
- NUM_RESTARTS: int, by default 10
- RAW_SAMPLES: int, by default 512
- N_CANDIDATES: int, by default min(5000, max(2000, 200 * dim))
- best_value: float, required in suggest_only mode (best value from the previous run)
Raises
------
ValueError
Turbo does not support parameter constraints
ValueError
Turbo does not support outcome constraints
ValueError
Turbo only supports single objective optimization
ValueError
Turbo only supports 2 models (or 1 in suggest_only mode)
ValueError
Turbo only supports Sobol as the first model
ValueError
Turbo only supports BoTorch as the second model
"""
parameters_space, fixed_parameters = ConvertParamsAx(self.params)
objective = self.create_objectives()
# Get tracking metrics directly
self.all_tracking_metrics = self.get_tracking_metrics(self.agents)
# make sure that we do not take fixed params into account
free_pnames = [p.name for p in parameters_space]
dim = len(free_pnames)
parallel = self.kwargs.get('parallel',True)
verbose_logging = self.kwargs.get('verbose_logging',True)
enforce_sequential_optimization = self.kwargs.get('enforce_sequential_optimization',False)
global_stopping_strategy = self.kwargs.get('global_stopping_strategy',None)
outcome_constraints = self.kwargs.get('outcome_constraints',None)
parameter_constraints = self.kwargs.get('parameter_constraints',None)
NUM_RESTARTS = kwargs_turbo.get('NUM_RESTARTS', 10)
RAW_SAMPLES = kwargs_turbo.get('RAW_SAMPLES', 512)
N_CANDIDATES = kwargs_turbo.get('N_CANDIDATES', min(5000, max(2000, 200 * dim)))
if parameter_constraints is not None:
raise ValueError('Turbo does not support parameter constraints')
if outcome_constraints is not None:
raise ValueError('Turbo does not support outcome constraints')
# check if we have a single objective
if "," in objective: # Multiple objectives in string format
raise ValueError('Turbo only supports single objective optimization')
# check if we minimize (objective string starts with -)
minimize = objective.startswith('-')
if minimize:
fac = -1
else:
fac = 1
if self.suggest_only:
if len(self.models)>1:
raise ValueError('Turbo only supports 1 model in suggest_only mode')
if self.models[0] != 'BOTORCH_MODULAR':
raise ValueError('Turbo only supports BoTorch as the model in suggest_only mode')
else:
if len(self.models)>2:
raise ValueError('Turbo only supports 2 models')
if self.models[0] != 'SOBOL':
raise ValueError('Turbo only supports Sobol as the first model')
if self.models[1] != 'BOTORCH_MODULAR':
raise ValueError('Turbo only supports BoTorch as the second model')
# Set the device and dtype
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
dtype = torch.double
max_cholesky_size = float("inf") # Always use Cholesky
total_trials = sum(np.asarray(self.n_batches)*np.asarray(self.batch_size))
if verbose_logging:
logger.info('Starting optimization with %d batches and a total of %d trials',sum(np.asarray(self.n_batches)),total_trials)
# Create the bounds for the parameters
bounds = torch.tensor([p.bounds for p in parameters_space], device=device, dtype=dtype)
bounds = bounds.transpose(0,1) # transpose bounds
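# bounds is now a 2 x d tensor (row 0 = lower, row 1 = upper), the layout
# expected by botorch.utils.transforms.normalize/unnormalize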
count_failure = 0
count_batch = 1 # also used for logging in the BO loop when starting from existing data
if self.existing_data is not None:
# If we have existing data, we need to use it to initialize the optimization
if verbose_logging:
logging_level = 20
logger.setLevel(logging_level)
logger.info('Using existing data for initialization')
X_turbo, Y_turbo, Y_tracking = self.get_initial_data_from_existing_data_turbo()
# convert to torch tensors
X_turbo_un = torch.tensor(X_turbo, device=device, dtype=dtype)
# normalize X_turbo to the unit cube
X_turbo = normalize(X_turbo_un, bounds=bounds)
Y_turbo = torch.tensor(Y_turbo, device=device, dtype=dtype)
# apply the multiplication factor so that the loop below always maximizes
Y_turbo = fac*Y_turbo
if Y_tracking is not None:
    Y_tracking = torch.tensor(Y_tracking, device=device, dtype=dtype)
else:
# Start with a Sobol sequence
n_total_sobol = self.n_batches[0]*self.batch_size[0]
num_sobol = 0
# Create and run initial points per batch
count_batch = 1
while num_sobol < n_total_sobol:
if verbose_logging:
logging_level = 20
logger.setLevel(logging_level)
logger.info('Starting Sobol batch %d with %d trials', count_batch, self.batch_size[0])
# Get initial points for this batch
X_init = get_initial_points(
    dim=dim,
    n_pts=self.batch_size[0],
    device=device,
    dtype=dtype,
)
# unnormalize
X_init_un = unnormalize(X_init, bounds=bounds)
# build a list of dicts keyed by the non-fixed parameter names
dics = []
for p in X_init_un:
p = p.cpu().numpy()
# write p into a dict with the param names as keys if the param is not fixed
idx = 0
dum_dict = {}
for i in range(len(self.params)):
if self.params[i].type != 'fixed':
dum_dict[self.params[i].name] = p[idx]
idx += 1
dics.append(dum_dict)
if parallel:
all_results = Parallel(
n_jobs=min(len(dics) * len(self.agents), self.max_parallelism)
)(
delayed(lambda ag, p, pi: (pi, ag.run_Ax(p)))(
agent, p, pi
)
for agent in self.agents
for pi, p in enumerate(dics)
)
# merge results
main_results = [{} for _ in dics]
for param_idx, res in all_results:
main_results[param_idx].update(res)
else:
main_results = [{} for _ in dics]
for agent in self.agents:
for pi, p in enumerate(dics):
res = agent.run_Ax(p)
main_results[pi].update(res)
# Only keep values from the result dictionaries that are in all_metrics
Y_init = torch.tensor([[res[metric] for metric in self.all_metrics] for res in main_results], device=device, dtype=dtype)
# multiplication factor (+1 for maximization, -1 for minimization)
Y_init = fac*Y_init
# find the rows of Y_init that contain NaN
nan_idx = torch.isnan(Y_init).any(dim=1)
count_failure += nan_idx.sum().item()
# remove NaN rows from Y_init and X_init
Y_init = Y_init[~nan_idx]
X_init = X_init[~nan_idx]
# Also collect tracking metrics if they exist
Y_init_tracking = None
if self.all_tracking_metrics and len(self.all_tracking_metrics) > 0:
    tracking_data = []
    for res in main_results:
        tracking_data.append([res.get(metric, float('nan')) for metric in self.all_tracking_metrics])
    if tracking_data:
        Y_init_tracking = torch.tensor(tracking_data, device=device, dtype=dtype)
        Y_init_tracking = Y_init_tracking[~nan_idx]
# accumulate the batches so earlier Sobol points are kept rather than overwritten
if num_sobol == 0:
    X_turbo, Y_turbo, Y_tracking = X_init, Y_init, Y_init_tracking
else:
    X_turbo = torch.cat((X_turbo, X_init), dim=0)
    Y_turbo = torch.cat((Y_turbo, Y_init), dim=0)
    if Y_tracking is not None and Y_init_tracking is not None:
        Y_tracking = torch.cat((Y_tracking, Y_init_tracking), dim=0)
num_sobol += self.batch_size[0]
count_batch += 1
if verbose_logging:
logging_level = 20
logger.setLevel(logging_level)
logger.info('Finished Sobol')
if not self.suggest_only:
# Create a new state for each batch
best_value = max(Y_turbo).item()
state = TurboState(dim=dim, batch_size=self.batch_size[1], best_value=best_value,**kwargs_turbo_state)
max_num_trials = self.n_batches[1]*self.batch_size[1]
num_turbo = 0
state = update_state(state=state, Y_next=Y_turbo)
while num_turbo <= max_num_trials and not (state.restart_triggered and not force_continue):
if verbose_logging:
    logging_level = 20
    logger.setLevel(logging_level)
    if state.restart_triggered and force_continue:
        logger.info('Restart triggered, but we force the optimization to continue.')
try:
# Fit a GP model
train_Y = (Y_turbo - Y_turbo.mean()) / Y_turbo.std()
try:
likelihood = GaussianLikelihood(noise_constraint=Interval(1e-8, 1e-3))
covar_module = ScaleKernel( # Use the same lengthscale prior as in the TuRBO paper
MaternKernel(
nu=2.5, ard_num_dims=dim, lengthscale_constraint=Interval(0.005, 4.0)
)
)
model = SingleTaskGP(
X_turbo, train_Y, covar_module=covar_module, likelihood=likelihood
)
mll = ExactMarginalLogLikelihood(model.likelihood, model)
# Do the fitting and acquisition function optimization inside the Cholesky context
with gpytorch.settings.max_cholesky_size(max_cholesky_size):
# Fit the model
fit_gpytorch_mll(mll)
# Create a batch
X_next = generate_batch(
state=state,
model=model,
X=X_turbo,
Y=train_Y,
batch_size=state.batch_size,
n_candidates=N_CANDIDATES,
num_restarts=NUM_RESTARTS,
raw_samples=RAW_SAMPLES,
acqf=acq_turbo,
device=device,
dtype=dtype,
minimize=minimize,
)
except Exception as e:
logging_level = 20
logger.setLevel(logging_level)
logger.error(f"Error in Turbo batch {count_batch}: {e}")
logger.error(f"We are stopping the optimization process")
break
# Evaluate the batch
X_next_un = unnormalize(X_next, bounds=bounds)
# build a list of dicts keyed by the non-fixed parameter names
dics = []
for p in X_next_un:
p = p.cpu().numpy()
idx = 0
dum_dict = {}
for i in range(len(self.params)):
if self.params[i].type != 'fixed':
dum_dict[self.params[i].name] = p[idx]
idx += 1
dics.append(dum_dict)
# run agents
if parallel:
all_results = Parallel(
n_jobs=min(len(dics) * len(self.agents), self.max_parallelism)
)(
delayed(lambda ag, p, pi: (pi, ag.run_Ax(p)))(
agent, p, pi
)
for agent in self.agents
for pi, p in enumerate(dics)
)
# merge results
main_results = [{} for _ in dics]
for param_idx, res in all_results:
main_results[param_idx].update(res)
else:
main_results = [{} for _ in dics]
for agent in self.agents:
for pi, p in enumerate(dics):
res = agent.run_Ax(p)
main_results[pi].update(res)
# Only keep values from result dictionary that are in all_metrics
Y_next = torch.tensor([[res[metric] for metric in self.all_metrics] for res in main_results], device=device, dtype=dtype)
# multiplication factor
Y_next = fac*Y_next
# find the rows of Y_next that contain NaN
nan_idx = torch.isnan(Y_next).any(dim=1)
count_failure += nan_idx.sum().item()
# remove nan from Y_next and X_next
Y_next = Y_next[~nan_idx]
X_next = X_next[~nan_idx]
if nan_idx.sum() >= state.batch_size:
    raise ValueError("All points in the batch returned NaN values")
# Also collect tracking metrics if they exist
Y_next_tracking = None
if self.all_tracking_metrics and len(self.all_tracking_metrics) > 0:
tracking_data = []
for res in main_results:
metrics_vals = []
for metric in self.all_tracking_metrics:
if metric in res:
metrics_vals.append(res[metric])
else:
metrics_vals.append(float('nan'))
tracking_data.append(metrics_vals)
if tracking_data:
Y_next_tracking = torch.tensor(tracking_data, device=device, dtype=dtype)
Y_next_tracking = Y_next_tracking[~nan_idx]
# Update state
state = update_state(state=state, Y_next=Y_next)
# Append data
X_turbo = torch.cat((X_turbo, X_next), dim=0)
Y_turbo = torch.cat((Y_turbo, Y_next), dim=0)
if Y_tracking is not None and Y_next_tracking is not None:
Y_tracking = torch.cat((Y_tracking, Y_next_tracking), dim=0)
elif Y_next_tracking is not None:
Y_tracking = Y_next_tracking
num_turbo += state.batch_size
# Print current status
if verbose_logging:
logging_level = 20
logger.setLevel(logging_level)
logger.info(f"Finished Turbo batch {count_batch} with {state.batch_size} trials with current best value: {fac*state.best_value:.2e}, TR length: {state.length:.2e}")
count_batch += 1
except Exception as e:
logging_level = 20
logger.setLevel(logging_level)
logger.error(f"Error in Turbo batch {count_batch}: {e}")
logger.error(f"We are stopping the optimization process")
break
else:
# If suggest_only is True, we only suggest the trials without running the agents
if verbose_logging:
logging_level = 20
logger.setLevel(logging_level)
logger.info(f"Suggesting {total_trials} trials without running the agents.")
# Create a new state for each batch
# get previous best value from kwargs_turbo
if 'best_value' in kwargs_turbo:
best_value = kwargs_turbo['best_value']
else:
raise ValueError('best_value from the previous state must be provided in kwargs_turbo for suggest_only mode')
# best_value = max(Y_turbo).item()
state = TurboState(dim=dim, batch_size=self.batch_size[0], best_value=best_value,**kwargs_turbo_state)
state = update_state(state=state, Y_next=Y_turbo)
max_num_trials = self.n_batches[0]*self.batch_size[0] # when suggest_only is True, we only use the first batch size
num_turbo = 0
# Fit a GP model
train_Y = (Y_turbo - Y_turbo.mean()) / Y_turbo.std()
try:
likelihood = GaussianLikelihood(noise_constraint=Interval(1e-8, 1e-3))
covar_module = ScaleKernel( # Use the same lengthscale prior as in the TuRBO paper
MaternKernel(
nu=2.5, ard_num_dims=dim, lengthscale_constraint=Interval(0.005, 4.0)
)
)
model = SingleTaskGP(
X_turbo, train_Y, covar_module=covar_module, likelihood=likelihood
)
mll = ExactMarginalLogLikelihood(model.likelihood, model)
# Do the fitting and acquisition function optimization inside the Cholesky context
with gpytorch.settings.max_cholesky_size(max_cholesky_size):
# Fit the model
fit_gpytorch_mll(mll)
# Create a batch
X_next = generate_batch(
state=state,
model=model,
X=X_turbo,
Y=train_Y,
batch_size=state.batch_size,
n_candidates=N_CANDIDATES,
num_restarts=NUM_RESTARTS,
raw_samples=RAW_SAMPLES,
acqf=acq_turbo,
device=device,
dtype=dtype,
minimize=minimize,
)
except Exception as e:
logging_level = 20
logger.setLevel(logging_level)
logger.error(f"Error in Turbo batch {count_batch}: {e}")
logger.error(f"There was an error in the Turbo optimization process, we are stopping the optimization process")
return
# Evaluate the batch
X_next_un = unnormalize(X_next, bounds=bounds)
# load all the evaluated data into ax
# create the generation strategy (a single BoTorch node in suggest_only mode)
gs = self.create_generation_strategy()
# create ax client
if self.ax_client is None:
self.ax_client = Client()
# Configure the experiment
self.ax_client.configure_experiment(
name=self.name,
parameters=parameters_space,
parameter_constraints=parameter_constraints,
)
self.ax_client.configure_optimization(objective=objective)
if len(self.all_tracking_metrics) != 0:
self.ax_client.configure_metrics([IMetric(name=m) for m in self.all_tracking_metrics])
self.ax_client.set_generation_strategy(generation_strategy=gs)
# add all data to ax
X_turbo_un = unnormalize(X_turbo, bounds=bounds)
for i in range(len(X_turbo_un)):
dic = {}
for j in range(len(X_turbo_un[i])):
# check the parameter_type of the parameter_space
if parameters_space[j].parameter_type == 'int':
dic[free_pnames[j]] = int(X_turbo_un[i][j].item())
else:
dic[free_pnames[j]] = X_turbo_un[i][j].item()
trial_index = self.ax_client.attach_trial(parameters=dic)
# add all_metrics and tracking_metrics to ax
raw_data = {}
for j in range(len(self.all_metrics)):
raw_data[self.all_metrics[j]] = fac*Y_turbo[i][j].item()
if Y_tracking is not None:
for j in range(len(self.all_tracking_metrics)):
raw_data[self.all_tracking_metrics[j]] = Y_tracking[i][j].item()
self.ax_client.complete_trial(trial_index, raw_data=raw_data)
if self.suggest_only:
for i in range(len(X_next_un)):
dic = {}
for j in range(len(X_next_un[i])):
if parameters_space[j].parameter_type == 'int':
dic[free_pnames[j]] = int(X_next_un[i][j].item())
else:
dic[free_pnames[j]] = X_next_un[i][j].item()
# add fixed params to dic
for p in self.params:
if p.type == 'fixed':
dic[p.name] = p.value
trial_index = self.ax_client.attach_trial(parameters=dic)
if verbose_logging:
logging_level = 20
logger.setLevel(logging_level)
logger.info('Suggesting %d trials without running the agents', len(X_next_un))
# put the new turbo state into a dict for the next suggest-only step
kwargs_turbo_next_step = dict(state.__dict__)
return kwargs_turbo_next_step
else:
if verbose_logging:
logging_level = 20
logger.setLevel(logging_level)
if state.restart_triggered:
logger.info('Turbo converged after %d batches with %d trials', count_batch-1, (count_batch-1)*state.batch_size)
else:
logger.info('Turbo is terminated as the max number (%d) of trials is reached', total_trials)
######### Turbo specific functions ##############################################################
@dataclass
class TurboState:
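"""State of the TuRBO trust region: its side length, the success/failure counters
and tolerances that expand or shrink it, and the best observed value.
restart_triggered is set once length falls below length_min."""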
dim: int
batch_size: int
length: float = 0.8
length_min: float = 0.5**7
length_max: float = 1.6
failure_counter: int = 0
failure_tolerance: int = float("nan") # Note: Post-initialized
success_counter: int = 0
success_tolerance: int = 3 # Note: The original paper uses 3
best_value: float = -float("inf")
restart_triggered: bool = False
def __init__(self, dim, batch_size, best_value, **kwargs):
self.dim = dim
self.batch_size = batch_size
self.best_value = best_value
for key, value in kwargs.items():
setattr(self, key, value)
# if failure_tolerance was not provided, set it based on the batch size and dimension
if self.failure_tolerance is None or (isinstance(self.failure_tolerance, float) and math.isnan(self.failure_tolerance)):
    # The original paper uses 4.0 / batch_size, but we use a more robust value
    # based on the dimension and batch size
    self.failure_tolerance = math.ceil(
        max([4.0 / self.batch_size, float(self.dim) / self.batch_size])
    )
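# e.g. dim=10, batch_size=2 gives failure_tolerance = ceil(max(2.0, 5.0)) = 5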
def get_initial_points(dim, n_pts, seed=None, device=None, dtype=None):
""" Generate initial points using Sobol sequence.
Parameters
----------
dim : int
Number of dimensions
n_pts : int
Number of points to generate
seed : int, optional
Random seed, by default None
device : torch.device, optional
Device to use for the generated points, by default None
dtype : torch.dtype, optional
Data type of the generated points, by default None
Returns
-------
torch.Tensor
Generated points in the range [0, 1]^d
"""
sobol = SobolEngine(dimension=dim, scramble=True, seed=seed)
X_init = sobol.draw(n_pts).to(dtype=dtype, device=device)
return X_init
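# e.g. get_initial_points(dim=3, n_pts=8) returns an (8, 3) tensor of scrambled
# Sobol points in the unit cube [0, 1]^3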
def update_state(state, Y_next):
""" Update the state of the optimization process based on the new observations.
For TURBO optimization only.
The state is updated based on the success or failure of the new observations.
Parameters
----------
state : TurboState
Current state of the optimization process
Y_next : torch.Tensor
New observations
Returns
-------
TurboState
Updated state of the optimization process
"""
# For maximization, we want the maximum value
current_best = max(Y_next).item()
is_success = current_best > state.best_value + 1e-3 * math.fabs(state.best_value)
state.best_value = max(state.best_value, current_best)
if is_success:
state.success_counter += 1
state.failure_counter = 0
else:
state.success_counter = 0
state.failure_counter += 1
if state.success_counter == state.success_tolerance: # Expand trust region
state.length = min(2.0 * state.length, state.length_max)
state.success_counter = 0
elif state.failure_counter == state.failure_tolerance: # Shrink trust region
state.length /= 2.0
state.failure_counter = 0
if state.length < state.length_min:
state.restart_triggered = True
return state
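# Example of the update rule: with success_tolerance=3, three consecutive improving
# batches double state.length (capped at length_max); failure_tolerance consecutive
# non-improving batches halve it; once state.length < length_min (0.5**7 by default)
# restart_triggered is set and the TuRBO loop stops (or restarts).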
def generate_batch(state, model, X, # Evaluated points on the domain [0, 1]^d
Y, # Function values
batch_size,
n_candidates=None, # Number of candidates for Thompson sampling
num_restarts=10,
raw_samples=512,
acqf="ts", # "ei" or "ts"
device=None,
dtype=None,
minimize=False,
):
""" Generate a batch of points using the TURBO algorithm.
The batch is generated using either Thompson sampling or Expected Improvement.
Parameters
----------
state : TurboState
Current state of the optimization process
model : GPyTorchModel
GPyTorch model for the function
X : torch.Tensor
Evaluated points on the domain [0, 1]^d
Y : torch.Tensor
Function values
batch_size : int
Number of points to generate
n_candidates : int, optional
Number of candidates for Thompson sampling, by default None
num_restarts : int, optional
Number of restarts for the optimization, by default 10
raw_samples : int, optional
Number of raw samples for the optimization, by default 512
acqf : str, optional
Acquisition function to use can be "ts" or "ei", by default "ts"
device : torch.device, optional
Device to use for the generated points, by default None
dtype : torch.dtype, optional
Data type of the generated points, by default None
minimize : bool, optional
Whether to minimize or maximize the function, by default False
Returns
-------
torch.Tensor
Generated points in the range [0, 1]^d
Raises
------
AssertionError
If the acquisition function is not "ts" or "ei"
ValueError
If the acquisition function is not "ts" or "ei"
"""
assert acqf in ("ts", "ei")
assert X.min() >= 0.0 and X.max() <= 1.0 and torch.all(torch.isfinite(Y))
if n_candidates is None:
n_candidates = min(5000, max(2000, 200 * X.shape[-1]))
# Scale the TR to be proportional to the lengthscales
x_center = X[Y.argmax(), :].clone()
weights = model.covar_module.base_kernel.lengthscale.squeeze().detach()
weights = weights / weights.mean()
weights = weights / torch.prod(weights.pow(1.0 / len(weights)))
tr_lb = torch.clamp(x_center - weights * state.length / 2.0, 0.0, 1.0)
tr_ub = torch.clamp(x_center + weights * state.length / 2.0, 0.0, 1.0)
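# The ARD lengthscales reweight the trust region per dimension: directions with a
# longer learned lengthscale (flatter directions) get proportionally wider sides,
# and the geometric-mean normalization keeps the region volume ~ state.length**dim.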
if acqf == "ts":
# Thompson Sampling
dim = X.shape[-1]
sobol = SobolEngine(dim, scramble=True) # optionally pass a seed for reproducibility
pert = sobol.draw(n_candidates).to(dtype=dtype, device=device)
pert = tr_lb + (tr_ub - tr_lb) * pert
# Create a perturbation mask
prob_perturb = min(20.0 / dim, 1.0)
mask = torch.rand(n_candidates, dim, dtype=dtype, device=device) <= prob_perturb
ind = torch.where(mask.sum(dim=1) == 0)[0]
mask[ind, torch.randint(0, dim - 1, size=(len(ind),), device=device)] = 1
# Create candidate points from the perturbations and the mask
X_cand = x_center.expand(n_candidates, dim).clone()
X_cand[mask] = pert[mask]
# Sample from the posterior
thompson_sampling = MaxPosteriorSampling(model=model, replacement=False)
with torch.no_grad():
X_next = thompson_sampling(X_cand, num_samples=batch_size)
elif acqf == "ei":
# Expected Improvement
if minimize:
best_f = Y.min().item()
else:
best_f = Y.max().item()
# Use qLogExpectedImprovement for better numerical stability
acq_func = qLogExpectedImprovement(
    model=model,
    best_f=best_f,
)
X_next, acq_value = optimize_acqf(
acq_function=acq_func,
bounds=torch.stack([tr_lb, tr_ub]),
q=batch_size,
num_restarts=num_restarts,
raw_samples=raw_samples,
options={"batch_limit": 5, "maxiter": 200},
)
else:
raise ValueError(f"Unknown acquisition function type: {acqf}")
return X_next