# Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.
import numpy as np
import autograd.numpy as anp
from autograd import grad
from typing import Tuple, Dict, List, Optional, Any
from numpy.random import RandomState
import logging
from syne_tune.optimizer.schedulers.searchers.bayesopt.gpautograd.kernel import (
KernelFunction,
)
from syne_tune.optimizer.schedulers.searchers.bayesopt.gpautograd.mean import (
MeanFunction,
)
from syne_tune.optimizer.schedulers.searchers.bayesopt.gpautograd.posterior_state import (
PosteriorState,
)
from syne_tune.optimizer.schedulers.searchers.bayesopt.gpautograd.learncurve.issm import (
issm_likelihood_slow_computations,
posterior_computations,
sample_posterior_marginals,
_inner_product,
issm_likelihood_computations,
issm_likelihood_precomputations,
_rowvec,
update_posterior_state,
update_posterior_pvec,
_flatvec,
_colvec,
)
from syne_tune.optimizer.schedulers.searchers.utils.hp_ranges_impl import (
decode_extended_features,
)
from syne_tune.optimizer.schedulers.searchers.bayesopt.gpautograd.learncurve.issm import (
sample_posterior_joint as sample_posterior_joint_issm,
predict_posterior_marginals_extended as predict_posterior_marginals_issm,
)
from syne_tune.optimizer.schedulers.searchers.bayesopt.gpautograd.learncurve.freeze_thaw import (
sample_posterior_joint as sample_posterior_joint_expdecay,
predict_posterior_marginals_extended as predict_posterior_marginals_expdecay,
)
from syne_tune.optimizer.schedulers.searchers.bayesopt.gpautograd.learncurve.model_params import (
ISSModelParameters,
)
from syne_tune.optimizer.schedulers.searchers.bayesopt.gpautograd.learncurve.freeze_thaw import (
resource_kernel_likelihood_slow_computations,
ExponentialDecayBaseKernelFunction,
logdet_cholfact_cov_resource,
resource_kernel_likelihood_computations,
resource_kernel_likelihood_precomputations,
)
from syne_tune.optimizer.schedulers.searchers.utils.common import Configuration
logger = logging.getLogger(__name__)
__all__ = [
"GaussProcAdditivePosteriorState",
"IncrementalUpdateGPAdditivePosteriorState",
"GaussProcISSMPosteriorState",
"GaussProcExpDecayPosteriorState",
]
[docs]
class GaussProcAdditivePosteriorState(PosteriorState):
"""
Represent posterior state for joint Gaussian model of learning curves over
a number of configurations. The (additive) model is the sum of a Gaussian
process model for function values at r_max and independent Gaussian models
over r only.
Importantly, inference scales cubically only in the number of
configurations, not in the number of observations.
"""
def __init__(
self,
data: Optional[dict],
mean: MeanFunction,
kernel: KernelFunction,
noise_variance,
**kwargs
):
"""
``data`` contains input points and targets, as obtained from
``issm.prepare_data``. ``iss_model`` maintains the ISSM parameters.
:param data: Input points and targets
:param mean: Mean function m(X)
:param kernel: Kernel function k(X, X')
:param noise_variance: Noise variance
"""
self.mean = mean
self.kernel = kernel
self.noise_variance = noise_variance
self.poster_state = None
if data is not None:
self.r_min = data["r_min"]
self.r_max = data["r_max"]
# Compute posterior state
self._compute_posterior_state(data, noise_variance, **kwargs)
else:
# Copy constructor, used by ``IncrementalUpdateGPISSMPosteriorState``
# subclass
self.poster_state = kwargs["poster_state"]
self.r_min = kwargs["r_min"]
self.r_max = kwargs["r_max"]
@property
def num_data(self):
return self.poster_state["features"].shape[0]
@property
def num_features(self):
return self.poster_state["features"].shape[1]
@property
def num_fantasies(self):
return self.poster_state["pmat"].shape[1]
def _compute_posterior_state(self, data: Dict[str, Any], noise_variance, **kwargs):
raise NotImplementedError()
[docs]
def neg_log_likelihood(self) -> anp.ndarray:
assert (
"criterion" in self.poster_state
), "neg_log_likelihood not defined for fantasizing posterior state"
return self.poster_state["criterion"]
[docs]
def predict(self, test_features: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""
We compute marginals over f(x, r), where ``test_features`` are extended
features.
Note: The test configs must not overlap with any in the training set.
Otherwise, at least if ``r != r_max``, the predictive distributions
computed here may be wrong.
:param test_features: Extended features for test configs
:return: posterior_means, posterior_variances
"""
raise NotImplementedError()
[docs]
def sample_marginals(
self,
test_features: np.ndarray,
num_samples: int = 1,
random_state: Optional[RandomState] = None,
) -> np.ndarray:
"""
See comments of ``predict``.
:param test_features: Input points for test configs
:param num_samples: Number of samples
:param random_state: PRNG
:return: Marginal samples, (num_test, num_samples)
"""
if random_state is None:
random_state = np.random
return sample_posterior_marginals(
self.poster_state,
self.mean,
self.kernel,
test_features,
random_state=random_state,
num_samples=num_samples,
)
[docs]
def backward_gradient(
self,
input: np.ndarray,
head_gradients: Dict[str, np.ndarray],
mean_data: float,
std_data: float,
) -> np.ndarray:
"""
Implements Predictor.backward_gradient, see comments there.
This is for a single posterior state. If the Predictor uses
MCMC, have to call this for every sample.
:param input: Single input point x, shape (d,)
:param head_gradients: See Predictor.backward_gradient
:param mean_data: Mean used to normalize targets
:param std_data: Stddev used to normalize targets
:return:
"""
test_feature = np.reshape(input, (1, -1))
def diff_test_feature(test_feature_array):
norm_mean, norm_variance = self.predict(test_feature_array)
# De-normalize, and variance -> stddev
pred_mean = norm_mean * std_data + mean_data
pred_std = anp.sqrt(norm_variance) * std_data
head_gradients_mean = anp.reshape(head_gradients["mean"], pred_mean.shape)
head_gradients_std = anp.reshape(head_gradients["std"], pred_std.shape)
# Added to mimic mxnet.autograd.backward
pred_mean_sum = _inner_product(pred_mean, head_gradients_mean)
pred_std_sum = _inner_product(pred_std, head_gradients_std)
return pred_mean_sum + pred_std_sum
test_feature_gradient = grad(diff_test_feature)
return np.reshape(test_feature_gradient(test_feature), input.shape)
def _sample_curves_internal(
self,
data: Dict[str, Any],
poster_state: Dict[str, Any],
num_samples: int = 1,
random_state: Optional[RandomState] = None,
) -> List[dict]:
raise NotImplementedError()
[docs]
def sample_curves(
self,
data: Dict[str, Any],
num_samples: int = 1,
random_state: Optional[RandomState] = None,
) -> List[dict]:
"""
Given data from one or more configurations (as returned by
``issm.prepare_data``), for each config, sample a curve from the
joint posterior (predictive) distribution over latent targets.
The curve for each config in ``data`` may be partly observed, but
must not be fully observed. Samples for the different configs are
independent. None of the configs in ``data`` must appear in the dataset
used to compute the posterior state.
The result is a list of dict, one for each config. If for a config,
targets in ``data`` are given for resource values range(r_min, r_obs),
the dict entry ``y`` is a joint sample [y_r], r in range(r_obs, r_max+1).
For some subclasses (e.g., ISSM), there is also an entry ``f`` with a
joint sample [f_r], r in range(r_obs-1, r_max+1), the latent function
values before noise. These entries are matrices with ``num_samples``
columns, which are independent (the joint dependence is along the rows).
:param data: Data for configs to predict at
:param num_samples: Number of samples to draw from each curve
:param random_state: PRNG state to be used for sampling
:return: See above
"""
return self._sample_curves_internal(
data=data,
poster_state=self.poster_state,
num_samples=num_samples,
random_state=random_state,
)
[docs]
@staticmethod
def has_precomputations(data: Dict[str, Any]) -> bool:
raise NotImplementedError()
[docs]
class IncrementalUpdateGPAdditivePosteriorState(GaussProcAdditivePosteriorState):
"""
Extension of :class:`GaussProcAdditivePosteriorState` which allows for
incremental updating (single config added to the dataset).
This is required for simulation-based scoring, and for support of
fantasizing.
"""
def __init__(
self,
data: Optional[dict],
mean: MeanFunction,
kernel: KernelFunction,
noise_variance,
**kwargs
):
super().__init__(data, mean, kernel, noise_variance, **kwargs)
def _prepare_update(
self, feature: np.ndarray, targets: np.ndarray
) -> Tuple[float, float, float]:
"""
Helper method for ``update``. Returns new entries for d, s, r2 vectors.
:param feature: See ``update``
:param targets: See ``update``
:return: (d_new, s_new, r2_new)
"""
raise NotImplementedError()
def _update_internal(
self, feature: np.ndarray, targets: np.ndarray
) -> Dict[str, Any]:
"""
Update posterior state, given a single new datapoint. ``feature``,
``targets`` are like one entry of ``data``. The method returns a new
object with the updated state.
:param feature: See above
:param targets: See above
:return: Arguments to create new posterior state
"""
# Update posterior state
feature = _rowvec(feature, _np=np)
d_new, s_new, r2_new = self._prepare_update(feature, targets)
new_poster_state = update_posterior_state(
self.poster_state, self.kernel, feature, d_new, s_new, r2_new
)
# Return args to create new object by way of "copy constructor"
return dict(
data=None,
mean=self.mean,
kernel=self.kernel,
noise_variance=self.noise_variance,
poster_state=new_poster_state,
r_min=self.r_min,
r_max=self.r_max,
)
[docs]
def update(
self, feature: np.ndarray, targets: np.ndarray
) -> "IncrementalUpdateGPAdditivePosteriorState":
raise NotImplementedError()
[docs]
def update_pvec(self, feature: np.ndarray, targets: np.ndarray) -> np.ndarray:
"""
Part of ``update``: Only update prediction vector p. This cannot be used
to update p for several new datapoints.
:param feature:
:param targets:
:return: New p vector
"""
feature = _rowvec(feature, _np=np)
d_new, s_new, r2_new = self._prepare_update(feature, targets)
return update_posterior_pvec(
self.poster_state, self.kernel, feature, d_new, s_new, r2_new
)
def _sample_posterior_joint_for_config(
self,
poster_state: Dict[str, Any],
config: Configuration,
feature: np.ndarray,
targets: np.ndarray,
random_state: RandomState,
) -> np.ndarray:
data = {"features": [feature], "targets": [targets], "configs": [config]}
results = self._sample_curves_internal(
data, poster_state, num_samples=1, random_state=random_state
)
return results[0]["y"]
[docs]
def sample_and_update_for_pending(
self,
data_pending: Dict[str, Any],
sample_all_nonobserved: bool = False,
random_state: Optional[RandomState] = None,
) -> (List[np.ndarray], "IncrementalUpdateGPAdditivePosteriorState"):
"""
This function is needed for sampling fantasy targets, and also to
support simulation-based scoring.
``issm.prepare_data_with_pending`` creates two data dicts ``data_nopending``,
``data_pending``, the first for configs with observed data, but no
pending evals, the second for configs with pending evals.
You create the state with ``data_nopending``, then call this method with
``data_pending``.
This method is iterating over configs (or trials) in ``data_pending``.
For each config, it draws a joint sample from some non-observed
targets, then updates the state conditioned on observed and sampled
targets (by calling ``update``). If ``sample_all_nonobserved`` is False,
the number of targets sampled is the entry in
``data_pending['num_pending']``. Otherwise, targets are sampled for all
non-observed positions.
The method returns the list of sampled target vectors, and the state
at the end (like ``update`` does as well).
:param data_pending: See above
:param sample_all_nonobserved: See above
:param random_state: PRNG
:return: pending_targets, final_state
"""
if random_state is None:
random_state = np.random
curr_poster_state = self.poster_state
targets_lst = []
final_state = self
for config, feature, targets, num_pending in zip(
data_pending["configs"],
data_pending["features"],
data_pending["targets"],
data_pending["num_pending"],
):
# Draw joint sample
fantasies = _flatvec(
self._sample_posterior_joint_for_config(
curr_poster_state, config, feature, targets, random_state
),
_np=np,
)
if not sample_all_nonobserved:
fantasies = fantasies[:num_pending]
fantasies = _colvec(fantasies, _np=np)
targets_lst.append(fantasies)
# Update state
full_targets = np.vstack((targets, fantasies))
final_state = self.update(feature, full_targets)
curr_poster_state = final_state.poster_state
return targets_lst, final_state
[docs]
class GaussProcISSMPosteriorState(IncrementalUpdateGPAdditivePosteriorState):
"""
Represent posterior state for joint Gaussian model of learning curves over
a number of configurations. The model is the sum of a Gaussian process
model for function values at r_max and independent Gaussian linear
innovation state space models (ISSMs) of a particular power law decay
form.
"""
def __init__(
self,
data: Optional[dict],
mean: MeanFunction,
kernel: KernelFunction,
iss_model: ISSModelParameters,
noise_variance,
**kwargs
):
"""
``data`` contains input points and targets, as obtained from
``issm.prepare_data``. ``iss_model`` maintains the ISSM parameters.
:param data: Input points and targets
:param mean: Mean function m(X)
:param kernel: Kernel function k(X, X')
:param iss_model: ISS model
:param noise_variance: Innovation and noise variance
"""
self.iss_model = iss_model
super().__init__(data, mean, kernel, noise_variance=noise_variance, **kwargs)
[docs]
@staticmethod
def has_precomputations(data: Dict[str, Any]) -> bool:
return all(k in data for k in ("ydims", "num_configs", "deltay", "logr"))
def _compute_posterior_state(self, data: Dict[str, Any], noise_variance, **kwargs):
# Compute posterior state
issm_params = self.iss_model.get_issm_params(data["features"])
if self.has_precomputations(data):
issm_likelihood = issm_likelihood_computations(
precomputed=data,
issm_params=issm_params,
r_min=self.r_min,
r_max=self.r_max,
)
else:
issm_likelihood = issm_likelihood_slow_computations(
targets=data["targets"],
issm_params=issm_params,
r_min=self.r_min,
r_max=self.r_max,
)
self.poster_state = posterior_computations(
features=data["features"],
mean=self.mean,
kernel=self.kernel,
issm_likelihood=issm_likelihood,
noise_variance=noise_variance,
)
[docs]
def predict(self, test_features: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
resource_attr_range = (self.r_min, self.r_max)
features, resources = decode_extended_features(
test_features, resource_attr_range=resource_attr_range
)
issm_params = self.iss_model.get_issm_params(features)
return predict_posterior_marginals_issm(
poster_state=self.poster_state,
mean=self.mean,
kernel=self.kernel,
test_features=features,
resources=list(resources),
issm_params=issm_params,
r_min=self.r_min,
r_max=self.r_max,
)
[docs]
@staticmethod
def data_precomputations(data: Dict[str, Any]):
logger.info("Enhancing data dictionary by precomputed variables")
data.update(
issm_likelihood_precomputations(
targets=data["targets"], r_min=data["r_min"]
)
)
def _sample_curves_internal(
self,
data: Dict[str, Any],
poster_state: Dict[str, Any],
num_samples: int = 1,
random_state: Optional[RandomState] = None,
) -> List[dict]:
if random_state is None:
random_state = np.random
results = []
for feature, targets, config in zip(
data["features"], data["targets"], data["configs"]
):
issm_params = self.iss_model.get_issm_params(feature.reshape((1, -1)))
results.append(
sample_posterior_joint_issm(
poster_state=poster_state,
mean=self.mean,
kernel=self.kernel,
feature=feature,
targets=targets,
issm_params=issm_params,
r_min=self.r_min,
r_max=self.r_max,
random_state=random_state,
num_samples=num_samples,
)
)
return results
def _prepare_update(
self, feature: np.ndarray, targets: np.ndarray
) -> Tuple[float, float, float]:
issm_params = self.iss_model.get_issm_params(feature.reshape((1, -1)))
issm_likelihood = issm_likelihood_slow_computations(
targets=[_colvec(targets, _np=np)],
issm_params=issm_params,
r_min=self.r_min,
r_max=self.r_max,
)
d_new = issm_likelihood["d"].item()
vtv = issm_likelihood["vtv"].item()
wtv = issm_likelihood["wtv"].item()
s_sq = vtv / self.noise_variance
s_new = np.sqrt(s_sq)
muhat = _flatvec(self.mean(feature)).item() - issm_likelihood["c"].item()
r2_new = wtv / self.noise_variance - s_sq * muhat
return d_new, s_new, r2_new
[docs]
def update(
self, feature: np.ndarray, targets: np.ndarray
) -> "IncrementalUpdateGPAdditivePosteriorState":
create_kwargs = self._update_internal(feature, targets)
return GaussProcISSMPosteriorState(**create_kwargs, iss_model=self.iss_model)
[docs]
class GaussProcExpDecayPosteriorState(IncrementalUpdateGPAdditivePosteriorState):
"""
Represent posterior state for joint Gaussian model of learning curves over
a number of configurations. The model is the sum of a Gaussian process
model for function values at r_max and independent Gaussian processes over
r, using an exponential decay covariance function. The latter is shared
between all configs.
This is essentially the model from the Freeze Thaw paper (see also
:class:`ExponentialDecayResourcesKernelFunction`).
"""
def __init__(
self,
data: Optional[dict],
mean: MeanFunction,
kernel: KernelFunction,
res_kernel: ExponentialDecayBaseKernelFunction,
noise_variance,
**kwargs
):
"""
``data`` contains input points and targets, as obtained from
``issm.prepare_data``.
:param data: Input points and targets
:param mean: Mean function m(X)
:param kernel: Kernel function k(X, X')
:param res_kernel: Kernel function k_r(r, r'), of exponential decay
type
:param noise_variance: Innovation and noise variance
"""
self.res_kernel = res_kernel
super().__init__(data, mean, kernel, noise_variance=noise_variance, **kwargs)
assert self.r_min == res_kernel.r_min and self.r_max == res_kernel.r_max, (
(self.r_min, self.r_max),
(res_kernel.r_min, res_kernel.r_max),
)
[docs]
@staticmethod
def has_precomputations(data: Dict[str, Any]) -> bool:
return all(k in data for k in ("ydims", "num_configs", "yflat"))
def _compute_posterior_state(self, data: Dict[str, Any], noise_variance, **kwargs):
# Compute posterior state
if self.has_precomputations(data):
issm_likelihood = resource_kernel_likelihood_computations(
precomputed=data,
res_kernel=self.res_kernel,
noise_variance=noise_variance,
)
else:
issm_likelihood = resource_kernel_likelihood_slow_computations(
targets=data["targets"],
res_kernel=self.res_kernel,
noise_variance=noise_variance,
)
self.poster_state = posterior_computations(
features=data["features"],
mean=self.mean,
kernel=self.kernel,
issm_likelihood=issm_likelihood,
noise_variance=noise_variance,
)
# Add missing term to criterion value
if "criterion" in self.poster_state:
part3 = logdet_cholfact_cov_resource(issm_likelihood)
self.poster_state["criterion"] += part3
# Extra terms required in ``sample_curves``
self.poster_state["lfact_all"] = issm_likelihood["lfact_all"]
self.poster_state["means_all"] = issm_likelihood["means_all"]
self.poster_state["noise_variance"] = noise_variance
[docs]
def predict(self, test_features: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
resource_attr_range = (self.r_min, self.r_max)
features, resources = decode_extended_features(
test_features, resource_attr_range=resource_attr_range
)
return predict_posterior_marginals_expdecay(
poster_state=self.poster_state,
mean=self.mean,
kernel=self.kernel,
test_features=features,
resources=list(resources),
res_kernel=self.res_kernel,
)
[docs]
@staticmethod
def data_precomputations(data: Dict[str, Any]):
data.update(resource_kernel_likelihood_precomputations(targets=data["targets"]))
def _sample_curves_internal(
self,
data: Dict[str, Any],
poster_state: Dict[str, Any],
num_samples: int = 1,
random_state: Optional[RandomState] = None,
) -> List[dict]:
assert "lfact_all" in poster_state
if random_state is None:
random_state = np.random
lfact_all = poster_state["lfact_all"]
means_all = poster_state["means_all"]
noise_variance = poster_state["noise_variance"]
results = []
for feature, targets in zip(data["features"], data["targets"]):
results.append(
sample_posterior_joint_expdecay(
poster_state=poster_state,
mean=self.mean,
kernel=self.kernel,
feature=feature,
targets=targets,
res_kernel=self.res_kernel,
noise_variance=noise_variance,
lfact_all=lfact_all,
means_all=means_all,
random_state=random_state,
num_samples=num_samples,
)
)
return results
def _prepare_update(
self, feature: np.ndarray, targets: np.ndarray
) -> Tuple[float, float, float]:
issm_likelihood = resource_kernel_likelihood_slow_computations(
targets=[_colvec(targets, _np=np)],
res_kernel=self.res_kernel,
noise_variance=self.noise_variance,
)
vtv = issm_likelihood["vtv"].item()
wtv = issm_likelihood["wtv"].item()
s_sq = vtv / self.noise_variance
s_new = np.sqrt(s_sq)
muhat = _flatvec(self.mean(feature)).item()
r2_new = wtv / self.noise_variance - s_sq * muhat
return 0.0, s_new, r2_new
[docs]
def update(
self, feature: np.ndarray, targets: np.ndarray
) -> "IncrementalUpdateGPAdditivePosteriorState":
create_kwargs = self._update_internal(feature, targets)
# Extra terms required in ``sample_curves``
new_poster_state = create_kwargs["poster_state"]
for k in ("lfact_all", "means_all", "noise_variance"):
new_poster_state[k] = self.poster_state[k]
return GaussProcExpDecayPosteriorState(
**create_kwargs, res_kernel=self.res_kernel
)