Source code for syne_tune.optimizer.schedulers.searchers.bayesopt.models.meanstd_acqfunc_impl

from typing import Dict, Optional, Set, List, Tuple
import logging
import itertools

import numpy as np
from scipy.stats import norm
from scipy.special import erfc

from syne_tune.optimizer.schedulers.searchers.bayesopt.models.meanstd_acqfunc import (
    MeanStdAcquisitionFunction,
    HeadWithGradient,
    SamplePredictionsPerOutput,
    CurrentBestProvider,
)
from syne_tune.optimizer.schedulers.searchers.bayesopt.models.model_base import (
    BasePredictor,
)
from syne_tune.optimizer.schedulers.searchers.bayesopt.tuning_algorithms.base_classes import (
    OutputPredictor,
    Predictor,
)

logger = logging.getLogger(__name__)


MIN_COST = 1e-12  # For numerical stability when dividing EI / cost

MIN_STD_CONSTRAINT = (
    1e-12  # For numerical stability when computing the constraint probability in CEI
)


def _extract_active_and_secondary_metric(predictor_output_names, active_metric):
    """
    Returns the active metric and the secondary metric (such as the cost or constraint metric) from predictor_output_names.
    """
    assert len(predictor_output_names) == 2, (
        f"The predictor should consist of exactly 2 outputs, "
        f"while the current outputs are {predictor_output_names}"
    )
    assert active_metric in predictor_output_names, (
        f"{active_metric} is not a valid metric. "
        f"The metric name must match one of the following metrics "
        f"in the predictor output: {predictor_output_names}"
    )
    if predictor_output_names[0] == active_metric:
        secondary_metric = predictor_output_names[1]
    else:
        secondary_metric = predictor_output_names[0]
    logger.debug(
        f"There are two metrics in the output: {predictor_output_names}. "
        f"The metric to optimize was set to '{active_metric}'. "
        f"The secondary metric is assumed to be '{secondary_metric}'"
    )
    return active_metric, secondary_metric


def _postprocess_gradient(grad: np.ndarray, nf: int) -> np.ndarray:
    if nf > 1:
        assert grad.size == nf  # Sanity check
        return grad / nf
    else:
        return np.mean(grad, keepdims=True)


[docs] class EIAcquisitionFunction(MeanStdAcquisitionFunction): """ Minus expected improvement acquisition function (minus because the convention is to always minimize acquisition functions) """ def __init__( self, predictor: Predictor, active_metric: str = None, jitter: float = 0.01, debug_collect_stats: bool = False, ): assert isinstance(predictor, Predictor) super().__init__(predictor, active_metric) self.jitter = jitter if debug_collect_stats: self._debug_data = {"absdiff": [], "std": [], "u": []} else: self._debug_data = None def _head_needs_current_best(self) -> bool: return True def _compute_head( self, output_to_predictions: SamplePredictionsPerOutput, current_best: Optional[np.ndarray], ) -> np.ndarray: assert current_best is not None means, stds = self._extract_mean_and_std(output_to_predictions) # phi, Phi is PDF and CDF of Gaussian phi, Phi, u = get_quantiles(self.jitter, current_best, means, stds) self._debug_append_data(means, stds, current_best, u) return np.mean((-stds) * (u * Phi + phi), axis=1) def _compute_head_and_gradient( self, output_to_predictions: SamplePredictionsPerOutput, current_best: Optional[np.ndarray], ) -> HeadWithGradient: assert current_best is not None mean, std = self._extract_mean_and_std(output_to_predictions) nf_mean = mean.size assert current_best.size == nf_mean # phi, Phi is PDF and CDF of Gaussian phi, Phi, u = get_quantiles(self.jitter, current_best, mean, std) self._debug_append_data(mean, std, current_best, u) f_acqu = std * (u * Phi + phi) dh_dmean = _postprocess_gradient(Phi, nf=nf_mean) dh_dstd = _postprocess_gradient(-phi, nf=1) return HeadWithGradient( hval=-np.mean(f_acqu), gradient={self.active_metric: dict(mean=dh_dmean, std=dh_dstd)}, ) def _debug_append_data( self, means: np.ndarray, stds: np.ndarray, current_best: np.ndarray, u: np.ndarray, ): if self._debug_data is not None: new_data = [ ("absdiff", np.abs(current_best - means)), ("std", stds), ("u", u), ] for k, v in new_data: self._debug_data[k].extend(list(v.flatten()))
[docs] def debug_stats_message(self) -> str: if self._debug_data is None or len(self._debug_data["absdiff"]) == 0: return "" msg_parts = [ f"{k:7s}: min={np.min(v):.2e}, median={np.median(v):.2e}, " f"max={np.max(v):.2e}, num={len(v)}" for k, v in self._debug_data.items() ] return "\n".join(msg_parts)
[docs] class LCBAcquisitionFunction(MeanStdAcquisitionFunction): r""" Lower confidence bound (LCB) acquisition function: .. math:: h(\mu, \sigma) = \mu - \kappa * \sigma """ def __init__(self, predictor: Predictor, kappa: float, active_metric: str = None): super().__init__(predictor, active_metric) assert isinstance(predictor, Predictor) assert kappa > 0, "kappa must be positive" self.kappa = kappa def _head_needs_current_best(self) -> bool: return False def _compute_head( self, output_to_predictions: SamplePredictionsPerOutput, current_best: Optional[np.ndarray], ) -> np.ndarray: means, stds = self._extract_mean_and_std(output_to_predictions) return np.mean(means - stds * self.kappa, axis=1) def _compute_head_and_gradient( self, output_to_predictions: SamplePredictionsPerOutput, current_best: Optional[np.ndarray], ) -> HeadWithGradient: mean, std = self._extract_mean_and_std(output_to_predictions) nf_mean = mean.size dh_dmean = np.ones_like(mean) / nf_mean dh_dstd = (-self.kappa) * np.ones_like(std) return HeadWithGradient( hval=np.mean(mean - std * self.kappa), gradient={self.active_metric: dict(mean=dh_dmean, std=dh_dstd)}, )
[docs] class EIpuAcquisitionFunction(MeanStdAcquisitionFunction): r""" Minus cost-aware expected improvement acquisition function. This is defined as .. math:: \mathrm{EIpu}(x) = \frac{\mathrm{EI(x)}}{\mathrm{power}(\mathrm{cost}(x), \mathrm{exponent_cost})}, where :math:`\mathrm{EI}(x)` is expected improvement, :math:`\mathrm{cost}(x)` is the predictive mean of a cost model, and ``exponent_cost`` is an exponent in :math:`(0, 1]`. ``exponent_cost`` scales the influence of the cost term on the acquisition function. See also: | Lee etal. | Cost-aware Bayesian Optimization | https://arxiv.org/abs/2003.10870 Note: two metrics are expected in the model output: the main objective and the cost. The main objective needs to be indicated as ``active_metric`` when initializing :class:`EIpuAcquisitionFunction`. The cost is automatically assumed to be the other metric. :param predictor: Predictors for main objective and cost :param active_metric: Name of main objective :param exponent_cost: Exponent for cost in denominator. Defaults to 1 :param jitter: Jitter factor, must be positive. Defaults to 0.01 """ def __init__( self, predictor: OutputPredictor, active_metric: Optional[str] = None, exponent_cost: float = 1.0, jitter: float = 0.01, ): super().__init__(predictor, active_metric) assert ( 0 < exponent_cost <= 1 ), f"exponent_cost = {exponent_cost} must lie in (0, 1]" self.jitter = jitter self.exponent_cost = exponent_cost self.active_metric, self.cost_metric = _extract_active_and_secondary_metric( self.predictor_output_names, active_metric ) def _head_needs_current_best(self) -> bool: return True def _output_to_keys_predict(self) -> Dict[str, Set[str]]: """ The cost model may be deterministic, as the acquisition function only needs the mean. """ return { self.predictor_output_names[0]: {"mean", "std"}, self.predictor_output_names[1]: {"mean"}, } def _compute_head( self, output_to_predictions: SamplePredictionsPerOutput, current_best: Optional[np.ndarray], ) -> np.ndarray: """ Returns minus the cost-aware expected improvement. """ assert current_best is not None means, stds = self._extract_mean_and_std(output_to_predictions) pred_costs = self._extract_positive_cost(output_to_predictions) # phi, Phi is PDF and CDF of Gaussian phi, Phi, u = get_quantiles(self.jitter, current_best, means, stds) f_acqu = stds * (u * Phi + phi) * np.power(pred_costs, -self.exponent_cost) return -np.mean(f_acqu, axis=1) def _compute_head_and_gradient( self, output_to_predictions: SamplePredictionsPerOutput, current_best: Optional[np.ndarray], ) -> HeadWithGradient: """ Returns minus cost-aware expected improvement and, for each output predictor, the gradients with respect to the mean and standard deviation of that predictor. """ assert current_best is not None mean, std = self._extract_mean_and_std(output_to_predictions) pred_cost = self._extract_positive_cost(output_to_predictions) nf_active = mean.size nf_cost = pred_cost.size # phi, Phi is PDF and CDF of Gaussian phi, Phi, u = get_quantiles(self.jitter, current_best, mean, std) inv_cost_power = np.power(pred_cost, -self.exponent_cost) f_acqu = std * (u * Phi + phi) * inv_cost_power dh_dmean_active = _postprocess_gradient(Phi * inv_cost_power, nf=nf_active) dh_dstd_active = _postprocess_gradient(-phi * inv_cost_power, nf=1) # Flip the sign twice: once because of the derivative of 1 / x, and # once because the head is actually - f_ei dh_dmean_cost = _postprocess_gradient( self.exponent_cost * f_acqu / pred_cost, nf=nf_cost ) gradient = { self.active_metric: dict(mean=dh_dmean_active, std=dh_dstd_active), self.cost_metric: dict(mean=dh_dmean_cost), } return HeadWithGradient(hval=-np.mean(f_acqu), gradient=gradient) def _extract_positive_cost(self, output_to_predictions): pred_cost = output_to_predictions[self.cost_metric]["mean"] if np.any(pred_cost) < 0.0: logger.warning( f"The predictor for {self.cost_metric} predicted some negative cost. " f"Capping the minimum cost at {MIN_COST}." ) pred_cost = np.maximum( pred_cost, MIN_COST ) # ensure that the predicted cost/run-time is positive return pred_cost
[docs] class ConstraintCurrentBestProvider(CurrentBestProvider): """ Here, ``current_best`` depends on two predictors, for active and constraint metric. """ def __init__(self, current_best_list: List[np.ndarray], num_samples_active: int): list_size = len(current_best_list) assert list_size > 0 and list_size % num_samples_active == 0 self._active_and_constraint_current_best = [ v.reshape((1, -1)) for v in current_best_list ] self._num_samples_active = num_samples_active def __call__(self, positions: Tuple[int, ...]) -> Optional[np.ndarray]: flat_pos = positions[1] * self._num_samples_active + positions[0] return self._active_and_constraint_current_best[flat_pos]
[docs] class CEIAcquisitionFunction(MeanStdAcquisitionFunction): """ Minus legacy_constrained expected improvement acquisition function. (minus because the convention is to always minimize the acquisition function) This is defined as ``CEI(x) = EI(x) * P(c(x) <= 0)``, where ``EI`` is the standard expected improvement with respect to the current *feasible best*, and ``P(c(x) <= 0)`` is the probability that the hyperparameter configuration ``x`` satisfies the constraint modeled by ``c(x)``. If there are no feasible hyperparameters yet, the current feasible best is undefined. Thus, ``CEI`` is reduced to the ``P(c(x) <= 0)`` term until a feasible configuration is found. Two metrics are expected in the model output: the main objective and the constraint metric. The main objective needs to be indicated as ``active_metric`` when initializing :class:`CEIAcquisitionFunction`. The constraint is automatically assumed to be the other metric. References on ``CEI``: | Gardner et al. | Bayesian Optimization with Inequality Constraints | ICML 2014 and | Gelbart et al. | Bayesian Optimization with Unknown Constraints | UAI 2014. :param predictor: Predictors for main objective and cost :param active_metric: Name of main objective :param jitter: Jitter factor, must be positive. Defaults to 0.01 """ def __init__( self, predictor: OutputPredictor, active_metric: Optional[str] = None, jitter: float = 0.01, ): super().__init__(predictor, active_metric) self.jitter = jitter self._feasible_best_list = None ( self.active_metric, self.constraint_metric, ) = _extract_active_and_secondary_metric( self.predictor_output_names, active_metric ) def _head_needs_current_best(self) -> bool: return True def _compute_head( self, output_to_predictions: SamplePredictionsPerOutput, current_best: Optional[np.ndarray], ) -> np.ndarray: """ Returns minus the legacy_constrained expected improvement (- CEI). """ assert current_best is not None means, stds = self._extract_mean_and_std(output_to_predictions) means_constr, stds_constr = self._extract_mean_and_std( output_to_predictions, metric=self.constraint_metric ) # Compute the probability of satisfying the constraint P(c(x) <= 0) constr_probs = norm.cdf(-means_constr / (stds_constr + MIN_STD_CONSTRAINT)) # If for some fantasies there are not feasible candidates, there is also no current_best (i.e., a nan). # The acquisition function is replaced by only the P(c(x) <= 0) term when no feasible best exist. feas_idx = ~np.isnan(current_best).reshape((1, -1)) # phi, Phi is PDF and CDF of Gaussian phi, Phi, u = get_quantiles(self.jitter, current_best, means, stds) f_ei = stds * (u * Phi + phi) # CEI(x) = EI(x) * P(c(x) <= 0) if feasible best exists, CEI(x) = P(c(x) <= 0) otherwise f_acqu = np.where(feas_idx, f_ei * constr_probs, constr_probs) return -np.mean(f_acqu, axis=1) def _compute_head_and_gradient( self, output_to_predictions: SamplePredictionsPerOutput, current_best: Optional[np.ndarray], ) -> HeadWithGradient: """ Returns minus cost-aware expected improvement (- CEI) and, for each output predictor, the gradients with respect to the mean and standard deviation of that predictor. """ assert current_best is not None mean, std = self._extract_mean_and_std(output_to_predictions) mean_constr, std_constr = self._extract_mean_and_std( output_to_predictions, metric=self.constraint_metric ) nf_mean = mean.size nf_constr = mean_constr.size # Compute the probability of satisfying the constraint P(c(x) <= 0) std_constr = std_constr + MIN_STD_CONSTRAINT z = -mean_constr / std_constr constr_prob = norm.cdf(z) # Useful variables for computing the head gradients mean_over_squared_std_constr = mean_constr / std_constr**2 inverse_std_constr = 1.0 / std_constr phi_constr = norm.pdf(z) # If for some fantasies there are not feasible candidates, there is also no current_best (i.e., a nan). # The acquisition function is replaced by only the P(c(x) <= 0) term when no feasible best exist. feas_idx = ~np.isnan(current_best) phi, Phi, u = get_quantiles( self.jitter, current_best, mean, std ) # phi, Phi is PDF and CDF of Gaussian f_ei = std * (u * Phi + phi) f_acqu = np.where( feas_idx, f_ei * constr_prob, constr_prob ) # CEI(x) = EI(x) * P(c(x) <= 0) if feasible best # exists, CEI(x) = P(c(x) <= 0) otherwise dh_dmean_constraint_feas = f_ei * inverse_std_constr * phi_constr dh_dstd_constraint_feas = -f_ei * mean_over_squared_std_constr * phi_constr dh_dmean_active_feas = Phi * constr_prob dh_dstd_active_feas = -phi * constr_prob dh_dmean_constraint_infeas = inverse_std_constr * phi_constr dh_dstd_constraint_infeas = -mean_over_squared_std_constr * phi_constr dh_dmean_active_infeas = np.zeros_like(phi_constr) dh_dstd_active_infeas = np.zeros_like(phi_constr) dh_dmean_active = _postprocess_gradient( np.where(feas_idx, dh_dmean_active_feas, dh_dmean_active_infeas), nf=nf_mean ) dh_dstd_active = _postprocess_gradient( np.where(feas_idx, dh_dstd_active_feas, dh_dstd_active_infeas), nf=1 ) dh_dmean_constraint = _postprocess_gradient( np.where(feas_idx, dh_dmean_constraint_feas, dh_dmean_constraint_infeas), nf=nf_constr, ) dh_dstd_constraint = _postprocess_gradient( np.where(feas_idx, dh_dstd_constraint_feas, dh_dstd_constraint_infeas), nf=1 ) gradient = { self.active_metric: dict(mean=dh_dmean_active, std=dh_dstd_active), self.constraint_metric: dict( mean=dh_dmean_constraint, std=dh_dstd_constraint ), } return HeadWithGradient(hval=-np.mean(f_acqu), gradient=gradient) def _get_current_bests_internal( self, predictor: OutputPredictor ) -> CurrentBestProvider: active_predictor = predictor[self.active_metric] assert isinstance(active_predictor, BasePredictor) all_means_active = active_predictor.predict_mean_current_candidates() num_samples_active = len(all_means_active) constraint_predictor = predictor[self.constraint_metric] assert isinstance(constraint_predictor, BasePredictor) all_means_constraint = constraint_predictor.predict_mean_current_candidates() common_shape = all_means_active[0].shape assert all( x.shape == common_shape for x in all_means_constraint ), "Shape mismatch between predictors for predict_mean_current_candidates" current_best_list = [] for means_constraint, means_active in itertools.product( all_means_constraint, all_means_active ): # Remove all infeasible candidates (i.e., where means_constraint # is >= 0) means_active[means_constraint >= 0] = np.nan # Compute the current *feasible* best (separately for every fantasy) min_across_observations = np.nanmin(means_active, axis=0) current_best_list.append(min_across_observations) return ConstraintCurrentBestProvider(current_best_list, num_samples_active)
[docs] def get_quantiles(acquisition_par, fmin, m, s): """ Quantiles of the Gaussian distribution, useful to determine the acquisition function values. :param acquisition_par: parameter of the acquisition function :param fmin: current minimum. :param m: vector of means. :param s: vector of standard deviations. """ if isinstance(s, np.ndarray): s[s < 1e-10] = 1e-10 elif s < 1e-10: s = 1e-10 u = (fmin - m - acquisition_par) / s phi = np.exp(-0.5 * u**2) / np.sqrt(2 * np.pi) # vectorized version of erfc to not depend on scipy Phi = 0.5 * erfc(-u / np.sqrt(2)) return phi, Phi, u