Source code for syne_tune.optimizer.schedulers.searchers.kde.kde_searcher

from typing import Optional, List, Dict, Any, Tuple
import logging
import numpy as np
import statsmodels.api as sm
import scipy.stats as sps

from syne_tune.optimizer.schedulers.searchers.single_objective_searcher import (
    SingleObjectiveBaseSearcher,
)
import syne_tune.config_space as sp

logger = logging.getLogger(__name__)


class KernelDensityEstimator(SingleObjectiveBaseSearcher):
    """
    Fits two kernel density estimators (KDE) to model the density of the top N
    configurations as well as the density of the configurations that are not
    among the top N, respectively. New configurations are sampled by optimizing
    the ratio of these two densities. KDE as a model for Bayesian optimization
    was originally proposed by Bergstra et al. Compared to their original
    implementation TPE, we use multi-variate instead of univariate KDE, as
    proposed by Falkner et al.
    Code is based on the implementation by Falkner et al.:
    https://github.com/automl/HpBandSter/tree/master/hpbandster

        | Algorithms for Hyper-Parameter Optimization
        | J. Bergstra and R. Bardenet and Y. Bengio and B. Kégl
        | Proceedings of the 24th International Conference on Advances in
          Neural Information Processing Systems
        | https://papers.nips.cc/paper/2011/hash/86e8f7ab32cfd12577bc2619bc635690-Abstract.html

    and

        | BOHB: Robust and Efficient Hyperparameter Optimization at Scale
        | S. Falkner and A. Klein and F. Hutter
        | Proceedings of the 35th International Conference on Machine Learning
        | https://arxiv.org/abs/1807.01774

    :param config_space: Configuration space for the evaluation function.
    :param points_to_evaluate: A set of initial configurations to be evaluated
        before starting the optimization.
    :param num_min_data_points: Minimum number of data points that we use to
        fit the KDEs. As long as fewer observations have been received in
        :meth:`on_trial_complete`, randomly drawn configurations are returned
        by :meth:`suggest`. If set to ``None``, we set this to the number of
        hyperparameters. Defaults to ``None``.
    :param top_n_percent: Determines how many datapoints we use to fit the
        first KDE model for modeling the well performing configurations.
        Defaults to 15.
    :param min_bandwidth: The minimum bandwidth for the KDE models.
        Defaults to 1e-3.
    :param num_candidates: Number of candidates that are sampled to optimize
        the acquisition function. Defaults to 64.
    :param bandwidth_factor: We sample continuous hyperparameters from a
        truncated Normal. This factor is multiplied with the bandwidth to
        define the standard deviation of this truncated Normal. Defaults to 3.
    :param random_fraction: Defines the fraction of configurations that are
        drawn uniformly at random instead of sampling from the model.
        Defaults to 0.33.
    :param random_seed: Seed for initializing random number generators.
""" def __init__( self, config_space: Dict[str, Any], points_to_evaluate: Optional[List[dict]] = None, num_min_data_points: Optional[int] = None, top_n_percent: int = 15, min_bandwidth: float = 1e-3, num_candidates: int = 64, bandwidth_factor: int = 3, random_fraction: float = 0.33, random_seed: Optional[int] = None, ): super().__init__( config_space=config_space, points_to_evaluate=points_to_evaluate, random_seed=random_seed, ) self.num_evaluations = 0 self.min_bandwidth = min_bandwidth self.random_fraction = random_fraction self.num_candidates = num_candidates self.bandwidth_factor = bandwidth_factor self.top_n_percent = top_n_percent self.X = [] self.y = [] self.categorical_maps = { k: {cat: i for i, cat in enumerate(v.categories)} for k, v in config_space.items() if isinstance(v, sp.Categorical) } self.inv_categorical_maps = { hp: dict(zip(map.values(), map.keys())) for hp, map in self.categorical_maps.items() } self.random_state = np.random.RandomState(self.random_seed) self.good_kde = None self.bad_kde = None self.vartypes = list() for name, hp in self.config_space.items(): if isinstance(hp, sp.Categorical): self.vartypes.append(("u", len(hp.categories))) elif isinstance(hp, sp.Integer): self.vartypes.append(("o", (hp.lower, hp.upper))) elif isinstance(hp, sp.Float): self.vartypes.append(("c", 0)) elif isinstance(hp, sp.FiniteRange): if hp.cast_int: self.vartypes.append(("o", (hp.lower, hp.upper))) else: self.vartypes.append(("c", 0)) self.num_min_data_points = ( len(self.vartypes) if num_min_data_points is None else num_min_data_points ) assert self.num_min_data_points >= len( self.vartypes ), f"num_min_data_points = {num_min_data_points}, must be >= {len(self.vartypes)}" def _to_feature(self, config): def numerize(value, domain, categorical_map): if isinstance(domain, sp.Categorical): res = categorical_map[value] / len(domain) return res elif isinstance(domain, sp.Float): return [(value - domain.lower) / (domain.upper - domain.lower)] elif isinstance(domain, sp.FiniteRange): if domain.cast_int: a = 1 / (2 * (domain.upper - domain.lower + 1)) b = domain.upper return [(value - a) / (b - a)] else: return [(value - domain.lower) / (domain.upper - domain.lower)] elif isinstance(domain, sp.Integer): a = 1 / (2 * (domain.upper - domain.lower + 1)) b = domain.upper return [(value - a) / (b - a)] return np.hstack( [ numerize( value=config[k], domain=v, categorical_map=self.categorical_maps.get(k, {}), ) for k, v in self.config_space.items() if isinstance(v, sp.Domain) ] ) def _from_feature(self, feature_vector): def inv_numerize(values, domain, categorical_map): if not isinstance(domain, sp.Domain): # constant value return domain else: if isinstance(domain, sp.Categorical): index = int(values * len(domain)) return categorical_map[index] elif isinstance(domain, sp.Float): return values * (domain.upper - domain.lower) + domain.lower elif isinstance(domain, sp.FiniteRange): if domain.cast_int: a = 1 / (2 * (domain.upper - domain.lower + 1)) b = domain.upper return np.ceil(values * (b - a) + a) else: return values * (domain.upper - domain.lower) + domain.lower elif isinstance(domain, sp.Integer): a = 1 / (2 * (domain.upper - domain.lower + 1)) b = domain.upper return np.ceil(values * (b - a) + a) res = dict() curr_pos = 0 for k, domain in self.config_space.items(): if isinstance(domain, sp.Domain): res[k] = domain.cast( inv_numerize( values=feature_vector[curr_pos], domain=domain, categorical_map=self.inv_categorical_maps.get(k, {}), ) ) curr_pos += 1 else: res[k] = domain return res
    def on_trial_complete(
        self,
        trial_id: int,
        config: Dict[str, Any],
        metric: float,
    ):
        self.X.append(self._to_feature(config=config))
        self.y.append(metric)
    def _get_random_config(self):
        return {
            k: v.sample() if hasattr(v, "sample") else v
            for k, v in self.config_space.items()
        }
    def suggest(self, **kwargs) -> Optional[Dict[str, Any]]:
        suggestion = self._next_points_to_evaluate()
        if suggestion is None:
            if self.y:
                models = self._train_kde(np.array(self.X), np.array(self.y))
            else:
                models = None

            if models is None or self.random_state.rand() < self.random_fraction:
                # return random candidate because a) we don't have enough data points or
                # b) we sample some fraction of all samples randomly
                suggestion = self._get_random_config()
            else:
                self.bad_kde = models[0]
                self.good_kde = models[1]
                l = self.good_kde.pdf
                g = self.bad_kde.pdf

                def acquisition_function(x):
                    return max(1e-32, g(x)) / max(l(x), 1e-32)

                val_current_best = None
                for i in range(self.num_candidates):
                    idx = self.random_state.randint(0, len(self.good_kde.data))
                    mean = self.good_kde.data[idx]
                    candidate = []

                    for m, bw, t in zip(mean, self.good_kde.bw, self.vartypes):
                        bw = max(bw, self.min_bandwidth)
                        vartype = t[0]
                        domain = t[1]
                        if vartype == "c":
                            # continuous parameter
                            bw = self.bandwidth_factor * bw
                            candidate.append(
                                sps.truncnorm.rvs(
                                    -m / bw,
                                    (1 - m) / bw,
                                    loc=m,
                                    scale=bw,
                                    random_state=self.random_state,
                                )
                            )
                        else:
                            # categorical or integer parameter
                            if self.random_state.rand() < (1 - bw):
                                candidate.append(m)
                            else:
                                if vartype == "o":
                                    # integer
                                    sample = self.random_state.randint(
                                        domain[0], domain[1]
                                    )
                                    sample = (sample - domain[0]) / (
                                        domain[1] - domain[0]
                                    )
                                    candidate.append(sample)
                                elif vartype == "u":
                                    # categorical
                                    candidate.append(
                                        self.random_state.randint(domain) / domain
                                    )

                    val = acquisition_function(candidate)
                    if not np.isfinite(val):
                        logging.warning(
                            "candidate has non finite acquisition function value"
                        )

                    config = self._from_feature(candidate)
                    if val_current_best is None or val_current_best > val:
                        suggestion = config
                        val_current_best = val

            if suggestion is None:
                # This can happen if the configuration space is almost exhausted
                logger.warning(
                    "Could not find configuration by optimizing the acquisition function. Drawing at random instead."
                )
                suggestion = self._get_random_config()

        return suggestion
    def _check_data_shape_and_good_size(
        self, data_shape: Tuple[int, int]
    ) -> Optional[int]:
        """
        Determine size of data for "good" model (the rest of the data is for
        the "bad" model). Both sizes must be larger than the number of
        features, otherwise ``None`` is returned.

        :param data_shape: Shape of ``train_data``
        :return: Size of data for "good" model, or ``None`` (models cannot be
            fit, too little data)
        """
        num_data, num_features = data_shape
        n_good = max(self.num_min_data_points, (self.top_n_percent * num_data) // 100)
        # The number of data points has to be larger than the number of features
        # to meet the input constraints of ``statsmodels.KDEMultivariate``
        if min(n_good, num_data - n_good) <= num_features:
            return None
        else:
            return n_good

    def _train_kde(
        self, train_data: np.ndarray, train_targets: np.ndarray
    ) -> Optional[Tuple[Any, Any]]:
        train_data = train_data.reshape((train_targets.size, -1))
        n_good = self._check_data_shape_and_good_size(train_data.shape)
        if n_good is None:
            return None

        idx = np.argsort(train_targets)
        train_data_good = train_data[idx[:n_good]]
        train_data_bad = train_data[idx[n_good:]]

        types = [t[0] for t in self.vartypes]
        bad_kde = sm.nonparametric.KDEMultivariate(
            data=train_data_bad, var_type=types, bw="normal_reference"
        )
        good_kde = sm.nonparametric.KDEMultivariate(
            data=train_data_good, var_type=types, bw="normal_reference"
        )

        bad_kde.bw = np.clip(bad_kde.bw, self.min_bandwidth, None)
        good_kde.bw = np.clip(good_kde.bw, self.min_bandwidth, None)

        return bad_kde, good_kde
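
The snippet below is a minimal usage sketch, added here for illustration only and not part of the module. It drives the searcher directly on a small, made-up configuration space with a toy objective (``toy_objective`` is an assumption for this example) to show the ``suggest`` / ``on_trial_complete`` loop and the fact that the metric is treated as something to minimize (the smallest ``top_n_percent`` of observed metrics populate the "good" KDE). In practice the searcher is constructed and queried by a Syne Tune scheduler rather than called by hand.

# Illustrative usage sketch (assumption: run as a script with syne_tune installed)
from syne_tune.config_space import choice, randint, uniform


def toy_objective(config: Dict[str, Any]) -> float:
    # Hypothetical metric to minimize; stands in for a real training run.
    penalty = 0.0 if config["activation"] == "relu" else 0.1
    return (config["lr"] - 0.05) ** 2 + abs(config["batch_size"] - 64) / 256 + penalty


example_config_space = {
    "lr": uniform(1e-3, 1e-1),
    "batch_size": randint(8, 256),
    "activation": choice(["relu", "tanh"]),
}
searcher = KernelDensityEstimator(config_space=example_config_space, random_seed=0)

best_metric, best_config = float("inf"), None
for trial_id in range(50):
    config = searcher.suggest()
    if config is None:
        break
    metric = toy_objective(config)
    if metric < best_metric:
        best_metric, best_config = metric, config
    # Report the observation; once enough points have been collected, the two
    # KDEs are refitted on the next call to suggest().
    searcher.on_trial_complete(trial_id=trial_id, config=config, metric=metric)

print(f"best metric {best_metric:.4f} with config {best_config}")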