# Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.
from typing import Callable, Tuple, List, Optional, Dict, Any
from dataclasses import dataclass
import numpy as np
from syne_tune.optimizer.schedulers.searchers.bayesopt.gpautograd.constants import (
OptimizationConfig,
)
from syne_tune.optimizer.schedulers.searchers.bayesopt.gpautograd.hypertune.likelihood import (
HyperTuneIndependentGPMarginalLikelihood,
HyperTuneJointGPMarginalLikelihood,
)
from syne_tune.optimizer.schedulers.searchers.bayesopt.gpautograd.hypertune.utils import (
number_supported_levels_and_data_highest_level,
hypertune_ranking_losses,
GaussProcPosteriorStateAndRungLevels,
PerResourcePosteriorState,
)
from syne_tune.optimizer.schedulers.searchers.bayesopt.gpautograd.independent.gpind_model import (
IndependentGPPerResourceModel,
)
from syne_tune.optimizer.schedulers.searchers.bayesopt.gpautograd.independent.posterior_state import (
IndependentGPPerResourcePosteriorState,
)
from syne_tune.optimizer.schedulers.searchers.bayesopt.gpautograd.gp_regression import (
GaussianProcessRegression,
)
from syne_tune.optimizer.schedulers.searchers.bayesopt.gpautograd.kernel import (
KernelFunction,
)
from syne_tune.optimizer.schedulers.searchers.bayesopt.gpautograd.mean import (
MeanFunction,
ScalarMeanFunction,
)
from syne_tune.optimizer.schedulers.searchers.bayesopt.gpautograd.target_transform import (
ScalarTargetTransform,
)
@dataclass
class HyperTuneDistributionArguments:
    """
    Arguments controlling estimation of the Hyper-Tune distributions.

    Both values are validated on construction: ``num_samples`` must be
    positive, and ``num_brackets`` (if given at all) must be positive too.
    """

    # Number of samples used when estimating ranking losses
    num_samples: int
    # Maximum number of brackets; ``None`` means no explicit maximum
    num_brackets: Optional[int] = None

    def __post_init__(self):
        if self.num_brackets is not None:
            assert self.num_brackets >= 1
        assert self.num_samples >= 1
class HyperTuneModelMixin:
    """
    Mixin providing the distribution estimates of the Hyper-Tune algorithm
    (Yang Li et al., "Hyper-Tune: Towards Efficient Hyper-parameter Tuning
    at Scale", VLDB 2022): a distribution over brackets and a distribution
    over rung levels ("ensemble distribution"), both derived from ranking
    losses computed on per-resource posterior states.

    :param hypertune_distribution_args: Parameters for Hyper-Tune
    """

    def __init__(self, hypertune_distribution_args: HyperTuneDistributionArguments):
        self.hypertune_distribution_args = hypertune_distribution_args
        self._bracket_distribution = None
        # Tuple (num_supp_levels, data_size) for current distribution. If
        # this signature is different in ``fit``, the distribution is recomputed
        self._hypertune_distribution_signature = None

    def hypertune_bracket_distribution(self) -> Optional[np.ndarray]:
        """
        Distribution [w_k] of support size ``num_supp_brackets``, where
        ``num_supp_brackets <= args.num_brackets`` (the latter is maximum if
        not given) is maximum such that the first ``num_supp_brackets``
        brackets have >= 6 labeled datapoints each.

        If ``num_supp_brackets < args.num_brackets``, the distribution must be
        extended to full size before being used to sample the next bracket.

        :return: Bracket distribution, or ``None`` if it has not been
            computed yet
        """
        return self._bracket_distribution

    def hypertune_ensemble_distribution(self) -> Optional[Dict[int, float]]:
        """
        Distribution [theta_r] which is used to create an ensemble predictive
        distribution fed into the acquisition function.

        The ensemble distribution runs over all sufficiently supported rung
        levels, independent of the number of brackets.

        :return: Ensemble distribution, or ``None`` if it has not been
            computed yet
        """
        raise NotImplementedError

    def fit_distributions(
        self,
        poster_state: PerResourcePosteriorState,
        data: Dict[str, Any],
        resource_attr_range: Tuple[int, int],
        random_state: np.random.RandomState,
    ) -> Optional[Dict[int, float]]:
        """
        Recomputes the bracket and ensemble distributions from ranking
        losses. This is skipped (returning ``None``) if fewer than two rung
        levels are sufficiently supported, or if the signature
        ``(num_supp_levels, data_size)`` is unchanged since the previous
        computation.

        :param poster_state: Posterior state (per resource)
        :param data: Training data
        :param resource_attr_range: ``(r_min, r_max)``
        :param random_state: Used for sampling ranking losses
        :return: New (sparsified) ensemble distribution, or ``None`` if it
            has not been recomputed
        """
        ensemble_distribution = None
        args = self.hypertune_distribution_args
        (
            num_supp_levels,
            data_resource,
        ) = number_supported_levels_and_data_highest_level(
            rung_levels=poster_state.rung_levels,
            data=data,
            resource_attr_range=resource_attr_range,
        )
        if num_supp_levels > 1:
            num_data = data_resource["features"].shape[0]
            curr_sig = self._hypertune_distribution_signature
            new_sig = (num_supp_levels, num_data)
            if curr_sig is None or new_sig != curr_sig:
                # Data at highest level has changed
                self._hypertune_distribution_signature = new_sig
                ranking_losses = hypertune_ranking_losses(
                    poster_state=poster_state,
                    data=data,
                    num_samples=args.num_samples,
                    resource_attr_range=resource_attr_range,
                    random_state=random_state,
                )
                # theta_r is proportional to the number of loss samples for
                # which rung level r attains the minimum loss
                min_losses = np.min(ranking_losses, axis=0, keepdims=True)
                theta = np.sum(ranking_losses == min_losses, axis=1).reshape((-1,))
                theta = theta / np.sum(theta)
                # We sparsify the ensemble distribution
                rung_levels = poster_state.rung_levels[: theta.size]
                ensemble_distribution = {
                    resource: theta_val
                    for resource, theta_val in zip(rung_levels, theta)
                    if theta_val > 0.01
                }
                # Bracket distribution: weight theta_r by 1 / r, then (if
                # ``args.num_brackets`` was given) truncate and renormalize
                self._bracket_distribution = theta * np.array(
                    [1 / r for r in rung_levels]
                )
                # Fix: ``args.num_brackets`` is optional; without the
                # ``is not None`` guard, comparing ``None < theta.size``
                # raises ``TypeError``. If not given, nothing is truncated
                # (maximum number of brackets, as documented above).
                if args.num_brackets is not None and args.num_brackets < theta.size:
                    self._bracket_distribution = self._bracket_distribution[
                        : args.num_brackets
                    ]
                norm_const = np.sum(self._bracket_distribution)
                if norm_const > 1e-14:
                    self._bracket_distribution /= norm_const
                else:
                    # Numerically zero mass left: fall back to always
                    # selecting the first bracket
                    self._bracket_distribution[:] = 0.0
                    self._bracket_distribution[0] = 1.0
        return ensemble_distribution
class HyperTuneIndependentGPModel(IndependentGPPerResourceModel, HyperTuneModelMixin):
    """
    Extension of :class:`IndependentGPPerResourceModel` with the additional
    machinery of the Hyper-Tune algorithm:

        | Yang Li et al
        | Hyper-Tune: Towards Efficient Hyper-parameter Tuning at Scale
        | VLDB 2022

    Our implementation deviates from the Hyper-Tune paper in several ways.
    Most importantly, their method needs a sufficient number of observations
    at the starting rung of the highest bracket, whereas we estimate ranking
    losses as soon as the starting rung of the second bracket is
    sufficiently occupied. This way, only the head of the distribution (over
    all brackets with sufficiently occupied starting rungs) is estimated,
    while the tail keeps the default distribution. Eventually this matches
    Hyper-Tune, but we depart from the default distribution earlier on.

    :param hypertune_distribution_args: Parameters for Hyper-Tune
    """

    def __init__(
        self,
        kernel: KernelFunction,
        mean_factory: Callable[[int], MeanFunction],
        resource_attr_range: Tuple[int, int],
        hypertune_distribution_args: HyperTuneDistributionArguments,
        target_transform: Optional[ScalarTargetTransform] = None,
        separate_noise_variances: bool = False,
        initial_noise_variance: Optional[float] = None,
        initial_covariance_scale: Optional[float] = None,
        optimization_config: Optional[OptimizationConfig] = None,
        random_seed=None,
        fit_reset_params: bool = True,
    ):
        # Both base classes are initialized explicitly (no cooperative
        # ``super().__init__`` chain here)
        IndependentGPPerResourceModel.__init__(
            self,
            kernel=kernel,
            mean_factory=mean_factory,
            resource_attr_range=resource_attr_range,
            target_transform=target_transform,
            separate_noise_variances=separate_noise_variances,
            initial_noise_variance=initial_noise_variance,
            initial_covariance_scale=initial_covariance_scale,
            optimization_config=optimization_config,
            random_seed=random_seed,
            fit_reset_params=fit_reset_params,
        )
        HyperTuneModelMixin.__init__(
            self, hypertune_distribution_args=hypertune_distribution_args
        )

    def create_likelihood(self, rung_levels: List[int]):
        """
        Deferred construction of the likelihood object, which requires the
        rung levels of the Hyperband scheduler.

        Note: The final entry of ``rung_levels`` must be ``max_t``, even if
        this is not a rung level in Hyperband.

        :param rung_levels: Rung levels
        """
        mean_per_resource = {
            resource: self._mean_factory(resource) for resource in rung_levels
        }
        # Safe initial choice: all ensemble mass on the lowest rung level
        lowest_rung = min(rung_levels)
        self._likelihood = HyperTuneIndependentGPMarginalLikelihood(
            mean=mean_per_resource,
            ensemble_distribution={lowest_rung: 1.0},
            **self._likelihood_kwargs,
        )
        self.reset_params()

    def hypertune_ensemble_distribution(self) -> Optional[Dict[int, float]]:
        if self._likelihood is None:
            return None
        return self._likelihood.ensemble_distribution

    def fit(self, data: Dict[str, Any]):
        super().fit(data)
        poster_state: IndependentGPPerResourcePosteriorState = self.states[0]
        new_distribution = self.fit_distributions(
            poster_state=poster_state,
            data=data,
            resource_attr_range=self._resource_attr_range,
            random_state=self.random_state,
        )
        if new_distribution is None:
            return
        # The likelihood has changed, so the posterior state must be
        # recomputed as well
        self._likelihood.set_ensemble_distribution(new_distribution)
        self._recompute_states(data)
class HyperTuneJointGPModel(GaussianProcessRegression, HyperTuneModelMixin):
    """
    Extension of :class:`GaussianProcessRegression` with the additional
    machinery of the Hyper-Tune algorithm:

        Yang Li et al
        Hyper-Tune: Towards Efficient Hyper-parameter Tuning at Scale
        VLDB 2022

    See also :class:`HyperTuneIndependentGPModel`

    :param hypertune_distribution_args: Parameters for Hyper-Tune
    """

    def __init__(
        self,
        kernel: KernelFunction,
        resource_attr_range: Tuple[int, int],
        hypertune_distribution_args: HyperTuneDistributionArguments,
        mean: Optional[MeanFunction] = None,
        target_transform: Optional[ScalarTargetTransform] = None,
        initial_noise_variance: Optional[float] = None,
        optimization_config: Optional[OptimizationConfig] = None,
        random_seed=None,
        fit_reset_params: bool = True,
    ):
        mean = ScalarMeanFunction() if mean is None else mean
        # Both base classes are initialized explicitly (no cooperative
        # ``super().__init__`` chain here)
        GaussianProcessRegression.__init__(
            self,
            kernel=kernel,
            mean=mean,
            target_transform=target_transform,
            initial_noise_variance=initial_noise_variance,
            optimization_config=optimization_config,
            random_seed=random_seed,
            fit_reset_params=fit_reset_params,
        )
        HyperTuneModelMixin.__init__(
            self, hypertune_distribution_args=hypertune_distribution_args
        )
        # Likelihood construction is deferred to :meth:`create_likelihood`,
        # since the rung levels are not known yet
        self._likelihood_kwargs = {
            "kernel": kernel,
            "mean": mean,
            "target_transform": target_transform,
            "resource_attr_range": resource_attr_range,
            "initial_noise_variance": initial_noise_variance,
        }
        self._likelihood = None
        self._rung_levels = None

    def create_likelihood(self, rung_levels: List[int]):
        """
        Deferred construction of the likelihood object, which requires the
        rung levels of the Hyperband scheduler.

        Note: The final entry of ``rung_levels`` must be ``max_t``, even if
        this is not a rung level in Hyperband.

        :param rung_levels: Rung levels
        """
        self._rung_levels = list(rung_levels)
        # Safe initial choice: all ensemble mass on the lowest rung level
        self._likelihood = HyperTuneJointGPMarginalLikelihood(
            ensemble_distribution={min(rung_levels): 1.0},
            **self._likelihood_kwargs,
        )
        self.reset_params()

    def hypertune_ensemble_distribution(self) -> Optional[Dict[int, float]]:
        if self._likelihood is None:
            return None
        return self._likelihood.ensemble_distribution

    def fit(self, data: Dict[str, Any]):
        super().fit(data)
        wrapped_state = GaussProcPosteriorStateAndRungLevels(
            poster_state=self.states[0],
            rung_levels=self._rung_levels,
        )
        new_distribution = self.fit_distributions(
            poster_state=wrapped_state,
            data=data,
            resource_attr_range=self._likelihood_kwargs["resource_attr_range"],
            random_state=self.random_state,
        )
        if new_distribution is None:
            return
        # The likelihood has changed, so the posterior state must be
        # recomputed as well
        self._likelihood.set_ensemble_distribution(new_distribution)
        self._recompute_states(data)