import numpy as np
import autograd.numpy as anp
from autograd.tracer import getval
from typing import Optional, Tuple, List, Union, Dict, Callable, Any
from numpy.random import RandomState
from syne_tune.optimizer.schedulers.searchers.utils.hp_ranges_impl import (
decode_extended_features,
HyperparameterRangeInteger,
)
from syne_tune.optimizer.schedulers.searchers.utils.scaling import (
LinearScaling,
)
from syne_tune.optimizer.schedulers.searchers.bayesopt.gpautograd.independent.posterior_state import (
IndependentGPPerResourcePosteriorState,
)
from syne_tune.optimizer.schedulers.searchers.bayesopt.gpautograd.posterior_state import (
GaussProcPosteriorState,
PosteriorStateWithSampleJoint,
backward_gradient_given_predict,
)
from syne_tune.optimizer.schedulers.searchers.bayesopt.gpautograd.mean import (
MeanFunction,
)
from syne_tune.optimizer.schedulers.searchers.bayesopt.gpautograd.kernel.base import (
KernelFunction,
)
[docs]
class ExtendFeaturesByResourceMixin:
def __init__(self, resource: int, resource_attr_range: Tuple[int, int]):
hp_range = HyperparameterRangeInteger(
name="resource",
lower_bound=resource_attr_range[0],
upper_bound=resource_attr_range[1],
scaling=LinearScaling(),
)
self._resource_encoded = hp_range.to_ndarray(resource).item()
[docs]
def extend_features_by_resource(self, test_features: np.ndarray) -> np.ndarray:
shape = (getval(test_features.shape[0]), 1)
extra_col = anp.full(shape, self._resource_encoded)
return anp.concatenate((test_features, extra_col), axis=1)
[docs]
class PosteriorStateClampedResource(
PosteriorStateWithSampleJoint, ExtendFeaturesByResourceMixin
):
"""
Converts posterior state of :class:`PosteriorStateWithSampleJoint`
over extended inputs into posterior state over non-extended inputs, where
the resource attribute is clamped to a fixed value.
:param poster_state_extended: Posterior state over extended inputs
:param resource: Value to which resource attribute is clamped
:param resource_attr_range: :math:`(r_{min}, r_{max})`
"""
def __init__(
self,
poster_state_extended: PosteriorStateWithSampleJoint,
resource: int,
resource_attr_range: Tuple[int, int],
):
ExtendFeaturesByResourceMixin.__init__(self, resource, resource_attr_range)
self._poster_state_extended = poster_state_extended
@property
def num_data(self):
return self._poster_state_extended.num_data
@property
def num_features(self):
return self._poster_state_extended.num_features - 1
@property
def num_fantasies(self):
return self._poster_state_extended.num_fantasies
[docs]
def neg_log_likelihood(self) -> anp.ndarray:
return self._poster_state_extended.neg_log_likelihood()
[docs]
def predict(self, test_features: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
return self._poster_state_extended.predict(
self.extend_features_by_resource(test_features)
)
[docs]
def sample_marginals(
self,
test_features: np.ndarray,
num_samples: int = 1,
random_state: Optional[RandomState] = None,
) -> np.ndarray:
return self._poster_state_extended.sample_marginals(
test_features=self.extend_features_by_resource(test_features),
num_samples=num_samples,
random_state=random_state,
)
[docs]
def sample_joint(
self,
test_features: np.ndarray,
num_samples: int = 1,
random_state: Optional[RandomState] = None,
) -> np.ndarray:
return self._poster_state_extended.sample_joint(
test_features=self.extend_features_by_resource(test_features),
num_samples=num_samples,
random_state=random_state,
)
[docs]
def backward_gradient(
self,
input: np.ndarray,
head_gradients: Dict[str, np.ndarray],
mean_data: float,
std_data: float,
) -> np.ndarray:
def predict_func(test_feature_array):
return self.predict(test_feature_array)
return backward_gradient_given_predict(
predict_func=predict_func,
input=input,
head_gradients=head_gradients,
mean_data=mean_data,
std_data=std_data,
)
[docs]
class MeanFunctionClampedResource(MeanFunction, ExtendFeaturesByResourceMixin):
def __init__(
self,
mean_extended: MeanFunction,
resource: int,
resource_attr_range: Tuple[int, int],
**kwargs,
):
MeanFunction.__init__(self, **kwargs)
ExtendFeaturesByResourceMixin.__init__(self, resource, resource_attr_range)
self._mean_extended = mean_extended
[docs]
def param_encoding_pairs(self):
return self._mean_extended.param_encoding_pairs()
[docs]
def get_params(self) -> Dict[str, Any]:
return self._mean_extended.get_params()
[docs]
def set_params(self, param_dict: Dict[str, Any]):
self._mean_extended.set_params(param_dict)
[docs]
def forward(self, X):
return self._mean_extended.forward(self.extend_features_by_resource(X))
[docs]
class KernelFunctionClampedResource(KernelFunction, ExtendFeaturesByResourceMixin):
def __init__(
self,
kernel_extended: KernelFunction,
resource: int,
resource_attr_range: Tuple[int, int],
**kwargs,
):
KernelFunction.__init__(self, dimension=kernel_extended.dimension - 1, **kwargs)
ExtendFeaturesByResourceMixin.__init__(self, resource, resource_attr_range)
self._kernel_extended = kernel_extended
[docs]
def param_encoding_pairs(self):
return self._kernel_extended.param_encoding_pairs()
[docs]
def get_params(self) -> Dict[str, Any]:
return self._kernel_extended.get_params()
[docs]
def set_params(self, param_dict: Dict[str, Any]):
self._kernel_extended.set_params(param_dict)
[docs]
def diagonal(self, X):
return self._kernel_extended.diagonal(self.extend_features_by_resource(X))
[docs]
def diagonal_depends_on_X(self):
return self._kernel_extended.diagonal_depends_on_X()
[docs]
def forward(self, X1, X2):
X1_ext = self.extend_features_by_resource(X1)
if X2 is X1:
X2_ext = X1_ext
else:
X2_ext = self.extend_features_by_resource(X2)
return self._kernel_extended.forward(X1_ext, X2_ext)
[docs]
class GaussProcPosteriorStateAndRungLevels(PosteriorStateWithSampleJoint):
def __init__(
self,
poster_state: GaussProcPosteriorState,
rung_levels: List[int],
):
self._poster_state = poster_state
self._rung_levels = rung_levels
@property
def poster_state(self) -> GaussProcPosteriorState:
return self._poster_state
@property
def num_data(self):
return self._poster_state.num_data
@property
def num_features(self):
return self._poster_state.num_features
@property
def num_fantasies(self):
return self._poster_state.num_fantasies
[docs]
def neg_log_likelihood(self) -> anp.ndarray:
return self._poster_state.neg_log_likelihood()
[docs]
def predict(self, test_features: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
return self._poster_state.predict(test_features)
[docs]
def sample_marginals(
self,
test_features: np.ndarray,
num_samples: int = 1,
random_state: Optional[RandomState] = None,
) -> np.ndarray:
return self._poster_state.sample_marginals(
test_features=test_features,
num_samples=num_samples,
random_state=random_state,
)
[docs]
def sample_joint(
self,
test_features: np.ndarray,
num_samples: int = 1,
random_state: Optional[RandomState] = None,
) -> np.ndarray:
return self._poster_state.sample_joint(
test_features=test_features,
num_samples=num_samples,
random_state=random_state,
)
[docs]
def backward_gradient(
self,
input: np.ndarray,
head_gradients: Dict[str, np.ndarray],
mean_data: float,
std_data: float,
) -> np.ndarray:
return self._poster_state.backward_gradient(
input=input,
head_gradients=head_gradients,
mean_data=mean_data,
std_data=std_data,
)
@property
def rung_levels(self) -> List[int]:
return self._rung_levels
PerResourcePosteriorState = Union[
IndependentGPPerResourcePosteriorState,
GaussProcPosteriorStateAndRungLevels,
]
def _posterior_state_for_rung_level(
poster_state: PerResourcePosteriorState,
resource: int,
resource_attr_range: Tuple[int, int],
) -> Union[GaussProcPosteriorState, PosteriorStateClampedResource]:
if isinstance(poster_state, IndependentGPPerResourcePosteriorState):
return poster_state.state(resource)
else:
return PosteriorStateClampedResource(
poster_state_extended=poster_state,
resource=resource,
resource_attr_range=resource_attr_range,
)
[docs]
def hypertune_ranking_losses(
poster_state: PerResourcePosteriorState,
data: Dict[str, Any],
num_samples: int,
resource_attr_range: Tuple[int, int],
random_state: Optional[RandomState] = None,
) -> np.ndarray:
"""
Samples ranking loss values as defined in the Hyper-Tune paper. We return a
matrix of size ``(num_supp_levels, num_samples)``, where
``num_supp_levels <= poster_state.rung_levels`` is the number of rung levels
supported by at least 6 labeled datapoints.
The loss values depend on the cases in ``data`` at the level
:code:`poster_state.rung_levels[num_supp_levels - 1]`. We must have
``num_supp_levels >= 2``.
Loss values at this highest supported level are estimated by
cross-validation (so the data at this level is split into training and
test, where the training part is used to obtain the posterior state). The
number of CV folds is ``<= 5``, and such that each fold has at least two
points.
:param poster_state: Posterior state over rung levels
:param data: Training data
:param num_samples: Number of independent loss samples
:param resource_attr_range: ``(r_min, r_max)``
:param random_state: PRNG state
:return: See above
"""
independent_models = isinstance(
poster_state, IndependentGPPerResourcePosteriorState
)
if not independent_models:
assert isinstance(poster_state, GaussProcPosteriorStateAndRungLevels), (
"poster_state needs to be IndependentGPPerResourcePosteriorState "
"or GaussProcPosteriorStateAndRungLevels"
)
rung_levels = poster_state.rung_levels
(
num_supp_levels,
data_max_resource,
) = number_supported_levels_and_data_highest_level(
rung_levels=rung_levels,
data=data,
resource_attr_range=resource_attr_range,
)
assert (
num_supp_levels > 1
), "Need to have at least 6 labeled datapoints at 2nd lowest rung level"
max_resource = rung_levels[num_supp_levels - 1]
loss_values = np.zeros((num_supp_levels, num_samples))
# All loss values except for maximum rung (which is special)
common_kwargs = dict(
data_max_resource=data_max_resource,
num_samples=num_samples,
random_state=random_state,
)
for pos, resource in enumerate(rung_levels[: (num_supp_levels - 1)]):
loss_values[pos] = _losses_for_rung(
poster_state=_posterior_state_for_rung_level(
poster_state, resource, resource_attr_range
),
**common_kwargs,
)
# Loss values for maximum rung: Five-fold cross-validation
if independent_models:
poster_state_max_resource = poster_state.state(max_resource)
mean_max_resource = poster_state_max_resource.mean
kernel_max_resource = poster_state_max_resource.kernel
noise_variance = poster_state_max_resource.noise_variance
else:
poster_state_int = poster_state.poster_state
mean_max_resource = MeanFunctionClampedResource(
mean_extended=poster_state_int.mean,
resource=max_resource,
resource_attr_range=resource_attr_range,
)
kernel_max_resource = KernelFunctionClampedResource(
kernel_extended=poster_state_int.kernel,
resource=max_resource,
resource_attr_range=resource_attr_range,
)
noise_variance = poster_state_int.noise_variance
def poster_state_for_fold(
features: np.ndarray, targets: np.ndarray
) -> PosteriorStateWithSampleJoint:
return GaussProcPosteriorState(
features=features,
targets=targets,
mean=mean_max_resource,
kernel=kernel_max_resource,
noise_variance=noise_variance,
)
loss_values[-1] = _losses_for_maximum_rung_by_cross_validation(
poster_state_for_fold=poster_state_for_fold, **common_kwargs
)
return loss_values
[docs]
def number_supported_levels_and_data_highest_level(
rung_levels: List[int],
data: Dict[str, Any],
resource_attr_range: Tuple[int, int],
) -> Tuple[int, dict]:
"""
Finds ``num_supp_levels`` as maximum such that
rung levels up to there have ``>= 6`` labeled datapoints. The set
of labeled datapoints of level ``num_supp_levels - 1`` is
returned as well.
If ``num_supp_levels == 1``, no level except for the lowest
has ``>= 6`` datapoints. In this case, ``data_max_resource`` returned
is invalid.
:param rung_levels: Rung levels
:param data: Training data (only data at highest level is used)
:param resource_attr_range: ``(r_min, r_max)``
:return: ``(num_supp_levels, data_max_resource)``
"""
num_rungs = len(rung_levels)
assert num_rungs >= 2, "There must be at least 2 rung levels"
num_supp_levels = num_rungs
data_max_resource = None
while num_supp_levels > 1:
max_resource = rung_levels[num_supp_levels - 1]
data_max_resource = _extract_data_at_resource(
data=data, resource=max_resource, resource_attr_range=resource_attr_range
)
if data_max_resource["features"].shape[0] >= 6:
break
num_supp_levels -= 1
if num_supp_levels == 1:
data_max_resource = None
return num_supp_levels, data_max_resource
def _extract_data_at_resource(
data: Dict[str, Any], resource: int, resource_attr_range: Tuple[int, int]
) -> Dict[str, Any]:
features_ext = data["features"]
targets = data["targets"]
num_fantasies = targets.shape[1] if targets.ndim == 2 else 1
features, resources = decode_extended_features(
features_ext=features_ext, resource_attr_range=resource_attr_range
)
ind = resources == resource
features_max = features[ind]
targets_max = targets[ind].reshape((-1, num_fantasies))
if num_fantasies > 1:
# Remove pending evaluations at highest level (they are ignored). We
# detect observed cases by all target values being the same.
ind = np.array([x == np.full(num_fantasies, x[0]) for x in targets_max])
features_max = features_max[ind]
targets_max = targets_max[ind, 0]
return {"features": features_max, "targets": targets_max.reshape((-1,))}
def _losses_for_maximum_rung_by_cross_validation(
poster_state_for_fold: Callable[
[np.ndarray, np.ndarray], PosteriorStateWithSampleJoint
],
data_max_resource: Dict[str, Any],
num_samples: int,
random_state: Optional[RandomState],
) -> np.ndarray:
"""
Estimates loss samples at highest rung by K-fold cross-validation, where
``K <= 5`` is chosen such that each fold has at least 2 points (since
:code:`len(data_max_resource) >= 6`, we have ``K >= 3``).
``poster_state_for_fold`` maps training data ``(features, targets)`` to
posterior state.
For simplicity, we ignore pending evaluations here. They would affect the
result only if they are at the highest level.
Note that for a joint (multi-task) GP model, the per-fold models use
restrictions of mean and covariance function learned on all data, but
the posteriors are conditioned on the max resource data only.
"""
features = data_max_resource["features"]
targets = data_max_resource["targets"]
num_data = features.shape[0]
# K <= 5, and each fold has at least two datapoints
num_folds = min(num_data // 2, 5)
low_val = num_data // num_folds
fold_sizes = np.full(num_folds, low_val)
incr_ind = num_folds - num_data + low_val * num_folds
fold_sizes[incr_ind:] += 1
# Loop over folds
result = np.zeros(num_samples)
start = 0
for fold_size in fold_sizes:
end = start + fold_size
train_data = {
"features": np.vstack((features[:start], features[end:])),
"targets": np.concatenate((targets[:start], targets[end:])),
}
test_data = {
"features": features[start:end],
"targets": targets[start:end],
}
start = end
# Note: If there are pending evaluations at the highest level, they
# are not taken into account here (no fantasizing).
poster_state_fold = poster_state_for_fold(
train_data["features"], train_data["targets"]
)
result += _losses_for_rung(
poster_state=poster_state_fold,
data_max_resource=test_data,
num_samples=num_samples,
random_state=random_state,
)
result *= 1 / num_folds
return result
def _losses_for_rung(
poster_state: PosteriorStateWithSampleJoint,
data_max_resource: Dict[str, Any],
num_samples: int,
random_state: Optional[RandomState],
) -> np.ndarray:
joint_sample = poster_state.sample_joint(
test_features=data_max_resource["features"],
num_samples=num_samples,
random_state=random_state,
)
targets = data_max_resource["targets"]
num_data = joint_sample.shape[0]
result = np.zeros(joint_sample.shape[1:])
for j, k in ((j, k) for j in range(num_data - 1) for k in range(j + 1, num_data)):
yj_lt_yk = targets[j] < targets[k]
fj_lt_fk = joint_sample[j] < joint_sample[k]
result += np.logical_xor(fj_lt_fk, yj_lt_yk)
result *= 2 / (num_data * (num_data - 1))
if poster_state.num_fantasies > 1:
assert result.ndim == 2 and result.shape == (
poster_state.num_fantasies,
num_samples,
), result.shape
result = np.mean(result, axis=0)
return result.reshape((-1,))