# Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.
from typing import Optional, List, Dict, Any
import time
import xgboost
import logging
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.calibration import CalibratedClassifierCV
from syne_tune.optimizer.schedulers.searchers import (
StochasticAndFilterDuplicatesSearcher,
)
from syne_tune.optimizer.schedulers.searchers.bore.de import (
DifferentialevolutionOptimizer,
)
logger = logging.getLogger(__name__)
class Bore(StochasticAndFilterDuplicatesSearcher):
"""
Implements "Bayesian optimization by Density Ratio Estimation" as described
in the following paper:
| BORE: Bayesian Optimization by Density-Ratio Estimation,
| Tiao, Louis C and Klein, Aaron and Seeger, Matthias W and Bonilla, Edwin V. and Archambeau, Cedric and Ramos, Fabio
| Proceedings of the 38th International Conference on Machine Learning
| https://arxiv.org/abs/2102.09009
Additional arguments on top of parent class
:class:`~syne_tune.optimizer.schedulers.searchers.StochasticAndFilterDuplicatesSearcher`:
:param mode: Can be "min" (default) or "max".
    :param gamma: Defines the percentile, i.e., the fraction of configurations
        that are used to model :math:`l(x)`. Defaults to 0.25
    :param calibrate: If set to ``True``, the predictions of the classifier are
        calibrated via cross-validation. Defaults to ``False``
:param classifier: The binary classifier to model the acquisition
function. Choices: :code:`{"mlp", "gp", "xgboost", "rf", "logreg"}`.
Defaults to "xgboost"
:param acq_optimizer: The optimization method to maximize the acquisition
function. Choices: :code:`{"de", "rs", "rs_with_replacement"}`. Defaults
to "rs"
:param feval_acq: Maximum allowed function evaluations of the acquisition
function. Defaults to 500
    :param random_prob: Probability of returning a random configuration
        (epsilon-greedy). Defaults to 0
:param init_random: :meth:`get_config` returns randomly drawn configurations
until at least ``init_random`` observations have been recorded in
:meth:`update`. After that, the BORE algorithm is used. Defaults to 6
:param classifier_kwargs: Parameters for classifier. Optional
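
    A minimal usage sketch (shown only for illustration; the config space and
    metric name are placeholders, and the scheduler wiring assumes the usual
    Syne Tune ``FIFOScheduler`` API):

    .. code-block:: python

        from syne_tune.config_space import uniform
        from syne_tune.optimizer.schedulers import FIFOScheduler
        from syne_tune.optimizer.schedulers.searchers.bore import Bore

        config_space = {"x": uniform(0.0, 1.0)}
        searcher = Bore(config_space, metric="loss", mode="min")
        scheduler = FIFOScheduler(
            config_space, searcher=searcher, metric="loss", mode="min"
        )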
"""
def __init__(
self,
config_space: Dict[str, Any],
metric: str,
points_to_evaluate: Optional[List[dict]] = None,
allow_duplicates: Optional[bool] = None,
restrict_configurations: Optional[List[Dict[str, Any]]] = None,
mode: Optional[str] = None,
gamma: Optional[float] = None,
calibrate: Optional[bool] = None,
classifier: Optional[str] = None,
acq_optimizer: Optional[str] = None,
feval_acq: Optional[int] = None,
random_prob: Optional[float] = None,
init_random: Optional[int] = None,
classifier_kwargs: Optional[dict] = None,
**kwargs,
):
super().__init__(
config_space=config_space,
metric=metric,
points_to_evaluate=points_to_evaluate,
allow_duplicates=allow_duplicates,
restrict_configurations=restrict_configurations,
**kwargs,
)
if mode is None:
mode = "min"
if gamma is None:
gamma = 0.25
if calibrate is None:
calibrate = False
if classifier is None:
classifier = "xgboost"
if acq_optimizer is None:
acq_optimizer = "rs"
if feval_acq is None:
feval_acq = 500
if random_prob is None:
random_prob = 0.0
if init_random is None:
init_random = 6
self.calibrate = calibrate
self.gamma = gamma
self.classifier = classifier
assert acq_optimizer in {"rs", "de", "rs_with_replacement"}
assert (
acq_optimizer != "de" or restrict_configurations is None
), "acq_optimizer == 'de' does not support restrict_configurations"
self.acq_optimizer = acq_optimizer
self.feval_acq = feval_acq
self.init_random = init_random
self.random_prob = random_prob
self.mode = mode
if classifier_kwargs is None:
classifier_kwargs = dict()
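        # Note: ``classifier_kwargs`` is only forwarded to the "gp" and "mlp"
        # models below; the scikit-learn and XGBoost classifiers use defaults.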
if self.classifier == "xgboost":
self.model = xgboost.XGBClassifier(use_label_encoder=False)
elif self.classifier == "logreg":
self.model = LogisticRegression()
elif self.classifier == "rf":
self.model = RandomForestClassifier()
elif self.classifier == "gp":
from syne_tune.optimizer.schedulers.searchers.bore.gp_classififer import (
GPModel,
)
self.model = GPModel(**classifier_kwargs)
elif self.classifier == "mlp":
from syne_tune.optimizer.schedulers.searchers.bore.mlp_classififer import (
MLP,
)
self.model = MLP(n_inputs=self._hp_ranges.ndarray_size, **classifier_kwargs)
self.inputs = []
self.targets = []
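
    # BORE casts Bayesian optimization as binary classification: the classifier
    # is trained to separate the best ``gamma`` fraction of observed
    # configurations (label 1) from the rest (label 0), and its predicted
    # class-1 probability serves as the acquisition function (a monotone
    # function of the density ratio l(x) / g(x)).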
    def _loss(self, x):
        # Negate the predicted probability so that maximizing the acquisition
        # function corresponds to minimizing this loss.
        if len(x.shape) < 2:
            y = -self.model.predict_proba(x[None, :])
        else:
            y = -self.model.predict_proba(x)
        if self.classifier in ["gp", "mlp"]:
            return y[:, 0]
        else:
            # negative probability of class 1, i.e. of ``x`` belonging to the
            # best ``gamma`` fraction of configurations
            return y[:, 1]
def _get_config(self, **kwargs):
start_time = time.time()
config = self._next_initial_config()
if config is None:
if (
len(self.inputs) < self.init_random
or self.random_state.rand() < self.random_prob
):
config = self._get_random_config()
else:
# train model
if not self._train_model(self.inputs, self.targets):
config = self._get_random_config()
elif self.acq_optimizer == "de":
def wrapper(x):
l = self._loss(x)
return l[:, None]
bounds = np.array(self._hp_ranges.get_ndarray_bounds())
lower = bounds[:, 0]
upper = bounds[:, 1]
de = DifferentialevolutionOptimizer(
wrapper, lower, upper, self.feval_acq
)
best, traj = de.run()
config = self._hp_ranges.from_ndarray(best)
else:
# sample random configurations with or without replacement
values = []
X = []
if self.acq_optimizer == "rs":
new_excl_list = self._excl_list.copy()
else:
assert self.acq_optimizer == "rs_with_replacement"
new_excl_list = None
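                    # Draw up to ``feval_acq`` random candidates, score each one
                    # with ``_loss`` (negated classifier probability), and pick
                    # the candidate with the smallest loss.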
for _ in range(self.feval_acq):
xi = self._get_random_config(new_excl_list)
if xi is None:
break
X.append(xi)
values.append(self._loss(self._hp_ranges.to_ndarray(xi))[0])
if new_excl_list is not None:
new_excl_list.add(xi)
if len(values) > 0:
if len(values) < self.feval_acq:
                            logger.warning(
f"Only {len(values)} instead of {self.feval_acq} configurations "
f"sampled to optimize the acquisition function"
)
ind = np.array(values).argmin()
config = X[ind]
else:
# This can happen if the configuration space is almost exhausted
logger.warning(
"Could not find configuration by optimizing the acquisition function. Drawing at random instead."
)
config = self._get_random_config()
if config is not None:
opt_time = time.time() - start_time
            logger.debug(
f"[Select new candidate: "
f"config={config}] "
f"optimization time : {opt_time}"
)
return config
def _train_model(self, train_data: np.ndarray, train_targets: np.ndarray) -> bool:
"""
:param train_data: Training input feature matrix X
:param train_targets: Training targets y
:return: Was training successful?
"""
start_time = time.time()
X = np.array(train_data)
if self.mode == "min":
y = np.array(train_targets)
else:
y = -np.array(train_targets)
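        # Binary labels for density-ratio estimation: ``tau`` is the
        # ``gamma``-quantile of the observed targets, so roughly the best
        # ``gamma`` fraction of configurations (e.g. the best 25% for
        # gamma=0.25) receive label 1.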
tau = np.quantile(y, q=self.gamma)
z = np.less(y, tau)
        if self.calibrate:
            # Platt scaling ("sigmoid", scikit-learn's default) is used here,
            # since isotonic calibration needs more data than is typically
            # available early in the search.
            self.model = CalibratedClassifierCV(self.model, cv=2, method="sigmoid")
        self.model.fit(X, np.array(z, dtype=np.int64))
z_hat = self.model.predict(X)
accuracy = np.mean(z_hat == z)
train_time = time.time() - start_time
        logger.debug(
f"[Model fit: "
f"accuracy={accuracy:.3f}] "
f"dataset size: {X.shape[0]}, "
f"train time : {train_time}"
)
return True
def _update(self, trial_id: str, config: Dict[str, Any], result: Dict[str, Any]):
self.inputs.append(self._hp_ranges.to_ndarray(config))
self.targets.append(result[self._metric])
def clone_from_state(self, state: Dict[str, Any]):
raise NotImplementedError