Source code for syne_tune.optimizer.schedulers.searchers.bayesopt.gpautograd.independent.posterior_state

# Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.
from typing import Dict, Tuple, Optional, Callable, Union
import numpy as np
import autograd.numpy as anp
from numpy.random import RandomState

from syne_tune.optimizer.schedulers.searchers.bayesopt.gpautograd.posterior_state import (
    PosteriorStateWithSampleJoint,
    GaussProcPosteriorState,
)
from syne_tune.optimizer.schedulers.searchers.utils.hp_ranges_impl import (
    decode_extended_features,
)
from syne_tune.optimizer.schedulers.searchers.bayesopt.gpautograd.kernel import (
    KernelFunction,
)
from syne_tune.optimizer.schedulers.searchers.bayesopt.gpautograd.mean import (
    MeanFunction,
)


NoiseVariance = Union[np.ndarray, Dict[int, np.ndarray]]


class IndependentGPPerResourcePosteriorState(PosteriorStateWithSampleJoint):
    """
    Posterior state for a model over f(x, r), where for a fixed set of resource
    levels r, each f(x, r) is represented by an independent Gaussian process.
    These processes share a common covariance function k(x, x'), but can have
    their own mean functions mu_r and covariance scales c_r. They can also have
    their own noise variances, or the noise variance is shared.

    Attention: Predictions can only be done at (x, r) where r has at least one
    training datapoint. This is because a posterior state cannot represent the
    prior.
    """

    def __init__(
        self,
        features: np.ndarray,
        targets: np.ndarray,
        kernel: KernelFunction,
        mean: Dict[int, MeanFunction],
        covariance_scale: Dict[int, np.ndarray],
        noise_variance: NoiseVariance,
        resource_attr_range: Tuple[int, int],
        debug_log: bool = False,
    ):
        """
        ``mean`` and ``covariance_scale`` map supported resource levels r to
        mean function mu_r and covariance scale c_r.

        :param features: Input points X, extended features, shape (n, d)
        :param targets: Targets Y, shape (n, m)
        :param kernel: Kernel function k(X, X')
        :param mean: See above
        :param covariance_scale: See above
        :param noise_variance: See above
        :param resource_attr_range: (r_min, r_max)
        """
        assert isinstance(kernel, KernelFunction), "kernel must be KernelFunction"
        self.rung_levels = sorted(mean.keys())
        assert self.rung_levels == sorted(
            covariance_scale.keys()
        ), "mean, covariance_scale must have the same keys"
        if isinstance(noise_variance, dict):
            assert self.rung_levels == sorted(
                noise_variance.keys()
            ), "mean, noise_variance must have the same keys"
        else:
            _noise_variance = noise_variance
            noise_variance = {
                resource: _noise_variance for resource in self.rung_levels
            }
        self._compute_states(
            features,
            targets,
            kernel,
            mean,
            covariance_scale,
            noise_variance,
            resource_attr_range,
            debug_log,
        )
        self._mean = mean  # See ``sample_joint``
        self._num_data = features.shape[0]
        self._num_features = features.shape[1]
        self._num_fantasies = targets.shape[1]
        self._resource_attr_range = resource_attr_range

    def _compute_states(
        self,
        features: np.ndarray,
        targets: np.ndarray,
        kernel: KernelFunction,
        mean: Dict[int, MeanFunction],
        covariance_scale: Dict[int, np.ndarray],
        noise_variance: Dict[int, np.ndarray],
        resource_attr_range: Tuple[int, int],
        debug_log: bool = False,
    ):
        features, resources = decode_extended_features(features, resource_attr_range)
        self._states = dict()
        for resource, mean_function in mean.items():
            cov_scale = covariance_scale[resource]
            rows = np.flatnonzero(resources == resource)
            if rows.size > 0:
                r_features = features[rows]
                r_targets = targets[rows]
                self._states[resource] = GaussProcPosteriorState(
                    features=r_features,
                    targets=r_targets,
                    mean=mean_function,
                    kernel=(kernel, cov_scale),
                    noise_variance=noise_variance[resource],
                    debug_log=debug_log,
                )
    def state(self, resource: int) -> GaussProcPosteriorState:
        return self._states[resource]
    @property
    def num_data(self):
        return self._num_data

    @property
    def num_features(self):
        return self._num_features

    @property
    def num_fantasies(self):
        return self._num_fantasies
    def neg_log_likelihood(self) -> anp.ndarray:
        return anp.sum(
            [state.neg_log_likelihood() for state in self._states.values()]
        )
    # In contrast to ``sample_marginals`` and ``sample_joint``, this method
    # supports ``autograd`` differentiation
    def predict(self, test_features: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        test_features, resources = decode_extended_features(
            test_features, self._resource_attr_range
        )
        if len(set(resources)) == 1:
            return self._states[resources[0]].predict(test_features)
        else:
            num_rows = resources.size
            # Group resources together by sorting them
            ind = np.argsort(resources)
            test_features = test_features[ind]
            resources = resources[ind]
            # Find positions where resource value changes
            change_pos = (
                [0]
                + list(np.flatnonzero(resources[:-1] != resources[1:]) + 1)
                + [num_rows]
            )
            p_means, p_vars = zip(
                *[
                    self._states[resources[start]].predict(test_features[start:end])
                    for start, end in zip(change_pos[:-1], change_pos[1:])
                ]
            )
            reverse_ind = np.empty_like(ind)
            reverse_ind[ind] = np.arange(num_rows)
            posterior_means = anp.concatenate(p_means, axis=0)[reverse_ind]
            posterior_variances = anp.concatenate(p_vars, axis=0)[reverse_ind]
            return posterior_means, posterior_variances
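    # Worked example of the grouping in ``predict`` (a sketch with hypothetical
    # values): for resources = [3, 1, 3, 9], ``np.argsort`` gives ind = [1, 0, 2, 3],
    # the sorted resources are [1, 3, 3, 9], and change_pos = [0, 1, 3, 4].
    # Predictions are then computed per block [0:1], [1:3], [3:4] (one call per
    # resource level) and mapped back to the original row order via
    # ``reverse_ind`` = [1, 0, 2, 3].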
    def _split_features(self, features: np.ndarray):
        features, resources = decode_extended_features(
            features, self._resource_attr_range
        )
        result = dict()
        for resource in set(resources):
            rows = np.flatnonzero(resources == resource)
            result[resource] = (features[rows], rows)
        return result

    def _sample_internal(
        self,
        test_features: np.ndarray,
        sample_func: Callable[[int, np.ndarray], np.ndarray],
        num_samples: int,
    ) -> np.ndarray:
        features_per_resource = self._split_features(test_features)
        num_test = test_features.shape[0]
        nf = self.num_fantasies
        shp = (num_test, num_samples) if nf == 1 else (num_test, nf, num_samples)
        samples = np.zeros(shp)
        bc_shp = (1,) * (len(shp) - 1)
        for resource, (features, rows) in features_per_resource.items():
            if resource in self._states:
                sample_part = sample_func(resource, features)
            else:
                assert resource in self._mean, (
                    f"resource = {resource} not supported (keys = "
                    + str(list(self._mean.keys()))
                    + ")"
                )
                vec = self._mean[resource](features)
                sample_part = np.reshape(vec, (vec.size,) + bc_shp)
            samples[rows] = sample_part
        return samples
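    # Shape convention in ``_sample_internal`` (a sketch with hypothetical sizes):
    # with num_test = 5, num_fantasies = 3, num_samples = 2, ``samples`` has shape
    # (5, 3, 2). For a resource level without training data, the prior mean vector
    # over its rows is reshaped to (rows, 1, 1) and broadcast over the fantasy and
    # sample dimensions when assigned into ``samples``.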
    def sample_marginals(
        self,
        test_features: np.ndarray,
        num_samples: int = 1,
        random_state: Optional[RandomState] = None,
    ) -> np.ndarray:
        """
        In contrast to ``predict``, entries in ``test_features`` may have
        resources not covered by data in the posterior state. For such entries,
        we return the prior mean; we do not sample from the prior. If
        ``sample_marginals`` is used to draw fantasy values, this corresponds
        to the Kriging believer heuristic.
        """

        def sample_func(resource: int, features: np.ndarray):
            return self._states[resource].sample_marginals(
                features, num_samples, random_state
            )

        return self._sample_internal(
            test_features=test_features,
            sample_func=sample_func,
            num_samples=num_samples,
        )
    def sample_joint(
        self,
        test_features: np.ndarray,
        num_samples: int = 1,
        random_state: Optional[RandomState] = None,
    ) -> np.ndarray:
        """
        In contrast to ``predict``, entries in ``test_features`` may have
        resources not covered by data in the posterior state. For such entries,
        we return the prior mean; we do not sample from the prior. If
        ``sample_joint`` is used to draw fantasy values, this corresponds to
        the Kriging believer heuristic.
        """

        def sample_func(resource: int, features: np.ndarray):
            return self._states[resource].sample_joint(
                features, num_samples, random_state
            )

        return self._sample_internal(
            test_features=test_features,
            sample_func=sample_func,
            num_samples=num_samples,
        )
    def backward_gradient(
        self,
        input: np.ndarray,
        head_gradients: Dict[str, np.ndarray],
        mean_data: float,
        std_data: float,
    ) -> np.ndarray:
        inner_input, resource = decode_extended_features(
            input.reshape((1, -1)), self._resource_attr_range
        )
        assert resource.size == 1
        resource = resource.item()
        inner_grad = (
            self._states[resource]
            .backward_gradient(inner_input, head_gradients, mean_data, std_data)
            .reshape((-1,))
        )
        return np.reshape(np.concatenate((inner_grad, np.zeros((1,)))), input.shape)
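

# A minimal usage sketch (not part of the module): it assumes ``Matern52`` and
# ``ScalarMeanFunction`` from the sibling ``kernel`` and ``mean`` modules, and
# placeholder arrays ``features_ext`` (extended features, with the resource
# level encoded as understood by ``decode_extended_features``) and ``targets``
# of shape (n, 1). All constructor arguments below carry hypothetical values.
#
#     from syne_tune.optimizer.schedulers.searchers.bayesopt.gpautograd.kernel import (
#         Matern52,
#     )
#     from syne_tune.optimizer.schedulers.searchers.bayesopt.gpautograd.mean import (
#         ScalarMeanFunction,
#     )
#
#     rung_levels = [1, 3, 9]
#     kernel = Matern52(dimension=2)
#     mean = {r: ScalarMeanFunction() for r in rung_levels}
#     covariance_scale = {r: np.array([1.0]) for r in rung_levels}
#     noise_variance = np.array([1e-3])  # shared across all resource levels
#
#     state = IndependentGPPerResourcePosteriorState(
#         features=features_ext,
#         targets=targets,
#         kernel=kernel,
#         mean=mean,
#         covariance_scale=covariance_scale,
#         noise_variance=noise_variance,
#         resource_attr_range=(1, 9),
#     )
#     # Predictions are only defined at resource levels with training data:
#     posterior_means, posterior_variances = state.predict(features_ext)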