Source code for syne_tune.optimizer.schedulers.searchers.bayesopt.gpautograd.learncurve.freeze_thaw

# Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.
from typing import List, Dict, Any

import numpy as np
import autograd.numpy as anp
from autograd.scipy.linalg import solve_triangular
from numpy.random import RandomState

from syne_tune.optimizer.schedulers.searchers.bayesopt.gpautograd.kernel.exponential_decay import (
    ExponentialDecayResourcesKernelFunction,
)
from syne_tune.optimizer.schedulers.searchers.bayesopt.gpautograd.kernel.base import (
    KernelFunction,
)
from syne_tune.optimizer.schedulers.searchers.bayesopt.gpautograd.mean import (
    MeanFunction,
)
from syne_tune.optimizer.schedulers.searchers.bayesopt.gpautograd.custom_op import (
    cholesky_factorization,
)
from syne_tune.optimizer.schedulers.searchers.bayesopt.gpautograd.learncurve.issm import (
    _flatvec,
    _colvec,
    _rowvec,
    _squared_norm,
    _inner_product,
    predict_posterior_marginals,
)
from syne_tune.optimizer.schedulers.searchers.utils.hp_ranges_impl import (
    EPS,
)


class ZeroKernel(KernelFunction):
    """
    Constant zero kernel. This works only in the context used here: we do not
    return matrices or vectors, but zero scalars.
    """

    def __init__(self, dimension: int, **kwargs):
        super().__init__(dimension, **kwargs)

    def forward(self, X1, X2, **kwargs):
        return 0.0

    def diagonal(self, X):
        return 0.0

    def diagonal_depends_on_X(self):
        return False

    def param_encoding_pairs(self):
        return []

    def get_params(self) -> Dict[str, Any]:
        return dict()

    def set_params(self, param_dict: Dict[str, Any]):
        pass


class ZeroMean(MeanFunction):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def forward(self, X):
        return 0.0

    def param_encoding_pairs(self):
        return []

    def get_params(self) -> Dict[str, Any]:
        return dict()

    def set_params(self, param_dict: Dict[str, Any]):
        pass


class ExponentialDecayBaseKernelFunction(KernelFunction):
    """
    Implements exponential decay kernel k_r(r, r') from the Freeze-Thaw
    paper, corresponding to :class:`ExponentialDecayResourcesKernelFunction`
    with delta=0 and no x attributes.

    Note: Inputs r lie in [r_min, r_max]. Optionally, they are normalized to
    [0, 1].
    """

    def __init__(
        self, r_max: int, r_min: int = 1, normalize_inputs: bool = False, **kwargs
    ):
        super().__init__(dimension=1, **kwargs)
        self.kernel = ExponentialDecayResourcesKernelFunction(
            kernel_x=ZeroKernel(0), mean_x=ZeroMean(), delta_fixed_value=0.0
        )
        assert r_max > r_min
        self.r_min = r_min
        self.r_max = r_max
        self.lower = r_min - 0.5 + EPS
        self.width = r_max - r_min + 1 - 2 * EPS
        self.normalize_inputs = normalize_inputs

    def _normalize(self, X):
        return (X - self.lower) / self.width

    def forward(self, X1, X2):
        same_12 = X2 is X1
        if self.normalize_inputs:
            X1 = self._normalize(X1)
            if same_12:
                X2 = X1
            else:
                X2 = self._normalize(X2)
        return self.kernel(X1, X2)

    def diagonal(self, X):
        if self.normalize_inputs:
            X = self._normalize(X)
        return self.kernel.diagonal(X)

    def diagonal_depends_on_X(self):
        return self.kernel.diagonal_depends_on_X()

    def param_encoding_pairs(self):
        return self.kernel.param_encoding_pairs()

    def get_params(self) -> Dict[str, Any]:
        return self.kernel.get_params()

    def set_params(self, param_dict: Dict[str, Any]):
        self.kernel.set_params(param_dict)

    def mean_function(self, X):
        if self.normalize_inputs:
            X = self._normalize(X)
        return self.kernel.mean_function(X)
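

# Illustrative sketch (not part of the original module; the helper name and the
# values r_min=1, r_max=5 are hypothetical): with ``normalize_inputs=True``, the
# resource grid {r_min, ..., r_max} is mapped strictly inside (0, 1), each value
# sitting at the center of a cell of width 1 / (r_max - r_min + 1).
def _example_resource_normalization():
    r_min, r_max = 1, 5
    lower = r_min - 0.5 + EPS
    width = r_max - r_min + 1 - 2 * EPS
    rvals = np.arange(r_min, r_max + 1)
    # Same affine map as ``ExponentialDecayBaseKernelFunction._normalize``;
    # yields approximately [0.1, 0.3, 0.5, 0.7, 0.9]
    return (rvals - lower) / width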


def logdet_cholfact_cov_resource(likelihood: Dict) -> float:
    """
    Computes the additional log(det(Lbar)) term. This is
    sum_i log(det(Lbar_i)), where Lbar_i is the upper-left submatrix of
    ``likelihood['lfact_all']`` of size ``likelihood['ydims'][i]``.

    :param likelihood: Result of ``resource_kernel_likelihood_computations``
    :return: log(det(Lbar))
    """
    lfact_all = likelihood["lfact_all"]
    ydims = likelihood["ydims"]
    dim = max(ydims)
    log_diag = anp.log(anp.diag(lfact_all))
    # Weights:
    # w_j = sum_i I[ydims[i] > j], j = 0, 1, ...
    weights = anp.sum(
        _colvec(anp.array(ydims)) > _rowvec(anp.arange(dim)), axis=0
    )
    return _inner_product(log_diag[:dim], weights)
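

# Illustrative sketch (not part of the original module; the helper name and the
# ydims values are hypothetical): the weight w_j counts how many of the
# upper-left submatrices Lbar_i contain the diagonal entry L[j, j].
def _example_logdet_weights():
    ydims = [3, 2, 1]
    dim = max(ydims)
    weights = np.sum(
        np.array(ydims).reshape((-1, 1)) > np.arange(dim).reshape((1, -1)), axis=0
    )
    # weights == [3, 2, 1]: L[0, 0] contributes to all three log-determinants,
    # L[1, 1] to two of them, L[2, 2] to one, so that
    # log(det(Lbar)) = 3 * log(L[0, 0]) + 2 * log(L[1, 1]) + log(L[2, 2])
    return weights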


def resource_kernel_likelihood_precomputations(targets: List[np.ndarray]) -> Dict:
    """
    Precomputations required by ``resource_kernel_likelihood_computations``.

    Importantly, ``prepare_data`` orders datapoints by nonincreasing number of
    targets ``ydims[i]``. For ``0 <= j < ydim_max``, where
    ``ydim_max = ydims[0] = max(ydims)``, ``num_configs[j]`` is the number of
    datapoints i for which ``ydims[i] > j``.

    ``yflat`` is a flat matrix (columns corresponding to fantasy samples; a
    column vector if there is no fantasizing) consisting of ``ydim_max``
    parts, where part j is of size ``num_configs[j]`` and contains ``y[j]``
    for those datapoints i counted in ``num_configs[j]``.

    :param targets: Targets from data representation returned by
        ``prepare_data``
    :return: See above
    """
    ydims = [y.shape[0] for y in targets]
    ydim_max = ydims[0]
    num_configs = list(
        np.sum(
            np.array(ydims).reshape((-1, 1)) > np.arange(ydim_max).reshape((1, -1)),
            axis=0,
        ).reshape((-1,))
    )
    assert num_configs[0] == len(targets), (num_configs, len(targets))
    assert num_configs[-1] > 0, num_configs
    total_size = sum(num_configs)
    assert total_size == sum(ydims)
    # Attention: When comparing this to ``issm.issm_likelihood_precomputations``,
    # ``targets`` maps to ``yflat`` in the same ordering (the index is still r),
    # whereas there the index j of ``deltay`` runs in the opposite direction of
    # the index r of ``targets``
    yflat_rows = []
    for pos, num in enumerate(num_configs):
        yflat_rows.extend([y[pos].reshape((1, -1)) for y in targets[:num]])
    yflat = np.vstack(yflat_rows)
    assert yflat.shape[0] == total_size
    return {"ydims": ydims, "num_configs": num_configs, "yflat": yflat}
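

# Illustrative sketch (not part of the original module; the helper name and the
# target values are hypothetical): shows the layout produced above for three
# configs with nonincreasing target counts ydims = [3, 2, 1] and no fantasizing.
def _example_precomputations_layout():
    targets = [
        np.array([[1.0], [2.0], [3.0]]),
        np.array([[4.0], [5.0]]),
        np.array([[6.0]]),
    ]
    pre = resource_kernel_likelihood_precomputations(targets)
    assert list(pre["num_configs"]) == [3, 2, 1]
    # Part j collects y[j] of every config with ydims[i] > j:
    # j=0 -> [1, 4, 6], j=1 -> [2, 5], j=2 -> [3]
    assert list(pre["yflat"].ravel()) == [1.0, 4.0, 6.0, 2.0, 5.0, 3.0]
    return pre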


# TODO: This code is complex. If it does not run faster than
# ``resource_kernel_likelihood_slow_computations``, remove it.
def resource_kernel_likelihood_computations(
    precomputed: Dict,
    res_kernel: ExponentialDecayBaseKernelFunction,
    noise_variance,
    skip_c_d: bool = False,
) -> Dict:
    """
    Given ``precomputed`` from ``resource_kernel_likelihood_precomputations``
    and resource kernel function ``res_kernel``, compute quantities required
    for inference and marginal likelihood computation, pertaining to the
    likelihood of an additive model, as in the Freeze-Thaw paper.

    Note that ``res_kernel`` takes raw (unnormalized) r as inputs. The code
    here works for any resource kernel and mean function, not just for
    :class:`ExponentialDecayBaseKernelFunction`.

    Results returned are:

    - c: n vector [c_i]
    - d: n vector [d_i], positive
    - vtv: n vector [|v_i|^2]
    - wtv: (n, F) matrix [(W_i)^T v_i], F the number of fantasy samples
    - wtw: n vector [|w_i|^2] (only if no fantasizing)
    - lfact_all: Cholesky factor for kernel matrix
    - ydims: Target vector sizes (copy from ``precomputed``)

    :param precomputed: Output of ``resource_kernel_likelihood_precomputations``
    :param res_kernel: Kernel k(r, r') over resources
    :param noise_variance: Noise variance sigma^2
    :param skip_c_d: If True, c and d are not computed
    :return: Quantities required for inference and learning criterion
    """
    num_configs = precomputed["num_configs"]
    num_all_configs = num_configs[0]
    r_min, r_max = res_kernel.r_min, res_kernel.r_max
    num_res = r_max + 1 - r_min
    assert num_all_configs > 0, "targets must not be empty"
    assert num_res > 0, f"r_min = {r_min} must be <= r_max = {r_max}"
    num_fantasy_samples = precomputed["yflat"].shape[1]
    compute_wtw = num_fantasy_samples == 1
    # Compute Cholesky factor for largest target vector size, or for full size
    ydims = precomputed["ydims"]
    rvals = _colvec(anp.arange(r_min, r_min + num_res))
    means_all = _flatvec(res_kernel.mean_function(rvals))
    amat = res_kernel(rvals, rvals) / noise_variance + anp.diag(anp.ones(num_res))
    # TODO: Do we need AddJitterOp here?
    lfact_all = cholesky_factorization(amat)  # L (Cholesky factor)
    # Loop over ydim
    yflat = precomputed["yflat"]
    off = num_all_configs
    ilscal = 1.0 / lfact_all[0, 0]
    vvec = anp.array([ilscal]).reshape((1, 1))
    # ``yflat`` is a (*, F) matrix, where F == ``num_fantasy_samples``. These
    # matrices are flattened out as rows of ``wmat``, and reshaped back before
    # writing into ``wtv_lst``
    wmat = _rowvec(yflat[:off, :] - means_all[0]) * ilscal
    # Note: We need the detour via ``wtv_lst``, etc., because ``autograd`` does
    # not support overwriting the content of an ``ndarray``. Their role is to
    # collect parts of the final vectors, in reverse ordering
    wtv_lst = []
    wtw_lst = []
    num_prev = off
    for ydim, num in enumerate(num_configs[1:], start=1):
        if num < num_prev:
            # These parts are done:
            pos = num * num_fantasy_samples
            wdone = wmat[:, pos:]
            wtv_part = anp.reshape(
                anp.matmul(vvec, wdone), (-1, num_fantasy_samples)
            )
            wtv_lst.append(wtv_part)
            if compute_wtw:
                wtw_lst.append(_flatvec(anp.sum(anp.square(wdone), axis=0)))
            wmat = wmat[:, :pos]
            num_prev = num
        # Update W matrix
        rhs = _rowvec(yflat[off : (off + num), :] - means_all[ydim])
        off += num
        lvec = _rowvec(lfact_all[ydim, :ydim])
        ilscal = 1.0 / lfact_all[ydim, ydim]
        w_new = (rhs - anp.matmul(lvec, wmat)) * ilscal
        wmat = anp.concatenate((wmat, w_new), axis=0)
        # Update v vector (row vector)
        v_new = anp.array(
            [(1.0 - _inner_product(lvec, vvec)) * ilscal]
        ).reshape((1, 1))
        vvec = anp.concatenate((vvec, v_new), axis=1)
    wtv_part = anp.reshape(anp.matmul(vvec, wmat), (-1, num_fantasy_samples))
    wtv_lst.append(wtv_part)
    wtv_all = anp.concatenate(tuple(reversed(wtv_lst)), axis=0)
    if compute_wtw:
        wtw_lst.append(_flatvec(anp.sum(anp.square(wmat), axis=0)))
        wtw_all = anp.concatenate(tuple(reversed(wtw_lst)), axis=0)
    vtv_for_ydim = anp.cumsum(anp.square(vvec))
    vtv_all = anp.array([vtv_for_ydim[ydim - 1] for ydim in ydims])
    # Compile results
    result = {
        "num_data": sum(ydims),
        "vtv": vtv_all,
        "wtv": wtv_all,
        "lfact_all": lfact_all,
        "means_all": means_all,
        "ydims": ydims,
    }
    if compute_wtw:
        result["wtw"] = wtw_all
    if not skip_c_d:
        result["c"] = anp.zeros(num_all_configs)
        result["d"] = anp.zeros(num_all_configs)
    return result


# TODO: It is not clear whether this code is slower, and it is certainly
# simpler.
def resource_kernel_likelihood_slow_computations(
    targets: List[np.ndarray],
    res_kernel: ExponentialDecayBaseKernelFunction,
    noise_variance,
    skip_c_d: bool = False,
) -> Dict:
    """
    Naive implementation of ``resource_kernel_likelihood_computations``, which
    does not require precomputations, but is somewhat slower. Here, results
    are computed one datapoint at a time, instead of in bulk.

    This code is used in unit testing only.
    """
    num_configs = len(targets)
    r_min, r_max = res_kernel.r_min, res_kernel.r_max
    num_res = r_max + 1 - r_min
    assert num_configs > 0, "targets must not be empty"
    compute_wtw = targets[0].shape[1] == 1
    # Compute Cholesky factor for largest target vector size
    ydims = [y.shape[0] for y in targets]
    rvals = _colvec(anp.arange(r_min, r_min + num_res))
    means_all = _flatvec(res_kernel.mean_function(rvals))
    amat = res_kernel(rvals, rvals) / noise_variance + anp.diag(anp.ones(num_res))
    # TODO: Do we need AddJitterOp here?
    lfact_all = cholesky_factorization(amat)  # L (Cholesky factor)
    # Outer loop over configurations
    vtv_lst = []
    wtv_lst = []
    wtw_lst = []
    num_data = 0
    for i, (ymat, ydim) in enumerate(zip(targets, ydims)):
        assert 0 < ydim <= num_res, f"len(y[{i}]) = {ydim}, num_res = {num_res}"
        num_data += ydim
        lfact = lfact_all[:ydim, :ydim]
        rhs = anp.ones((ydim, 1))
        vvec = _flatvec(solve_triangular(lfact, rhs, lower=True))
        means = means_all[:ydim]
        rhs = ymat - _colvec(means)
        wmat = solve_triangular(lfact, rhs, lower=True)
        vtv_lst.append(_squared_norm(vvec))
        wtv_lst.append(anp.matmul(_rowvec(vvec), wmat))
        if compute_wtw:
            wtw_lst.append(_squared_norm(wmat))
    # Compile results
    result = {
        "num_data": num_data,
        "vtv": anp.array(vtv_lst),
        "wtv": anp.vstack(wtv_lst),
        "lfact_all": lfact_all,
        "means_all": means_all,
        "ydims": ydims,
    }
    if compute_wtw:
        result["wtw"] = anp.array(wtw_lst)
    if not skip_c_d:
        result["c"] = anp.zeros(num_configs)
        result["d"] = anp.zeros(num_configs)
    return result
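

# Illustrative sketch (not part of the original module; the helper name, seed,
# and inputs are hypothetical): the fast and the naive implementation above
# should agree on all likelihood quantities, which is what the unit tests
# mentioned in the docstring check. ``res_kernel`` is assumed to cover at least
# three resource levels (e.g. ``ExponentialDecayBaseKernelFunction(r_max=5)``);
# depending on how it was created, its parameters may have to be initialized
# before it can be evaluated.
def _example_compare_fast_and_slow(
    res_kernel: ExponentialDecayBaseKernelFunction, noise_variance: float = 1e-2
):
    random_state = RandomState(31415927)
    # Three configs with nonincreasing numbers of observed targets and a
    # single column each (no fantasizing), so ``wtw`` is computed as well
    targets = [
        random_state.randn(3, 1),
        random_state.randn(2, 1),
        random_state.randn(1, 1),
    ]
    precomputed = resource_kernel_likelihood_precomputations(targets)
    fast = resource_kernel_likelihood_computations(
        precomputed, res_kernel, noise_variance
    )
    slow = resource_kernel_likelihood_slow_computations(
        targets, res_kernel, noise_variance
    )
    for name in ("vtv", "wtv", "wtw"):
        np.testing.assert_allclose(fast[name], slow[name])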


def predict_posterior_marginals_extended(
    poster_state: Dict,
    mean,
    kernel,
    test_features,
    resources: List[int],
    res_kernel: ExponentialDecayBaseKernelFunction,
):
    """
    These are posterior marginals on f_r = h + g_r variables, where (x, r)
    are zipped from ``test_features``, ``resources``. ``posterior_means`` is
    a (n, F) matrix, where F is the number of fantasy samples, or F == 1
    without fantasizing.

    :param poster_state: Posterior state
    :param mean: Mean function
    :param kernel: Kernel function
    :param test_features: Feature matrix for test points (not extended)
    :param resources: Resource values corresponding to rows of
        ``test_features``
    :param res_kernel: Kernel k(r, r') over resources
    :return: posterior_means, posterior_variances
    """
    num_test = test_features.shape[0]
    assert len(resources) == num_test, (
        f"test_features.shape[0] = {num_test} != {len(resources)} "
        + "= len(resources)"
    )
    # Predictive marginals over h
    h_means, h_variances = predict_posterior_marginals(
        poster_state, mean, kernel, test_features
    )
    # Convert into predictive marginals over f_r
    rvals = _colvec(anp.array(resources))
    g_means = _colvec(res_kernel.mean_function(rvals))
    g_variances = _flatvec(res_kernel.diagonal(rvals))
    posterior_means = h_means + g_means
    posterior_variances = h_variances + g_variances
    return posterior_means, posterior_variances
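

# Illustrative note (not part of the original module): for test configurations
# not in the training set, the latent curve g_r is a priori independent of the
# observed data, so the marginals of f_r = h + g_r decompose as
#     E[f_r | data]   = E[h | data] + mu_r(r)
#     Var[f_r | data] = Var[h | data] + k_r(r, r)
# which is exactly the elementwise addition performed in
# ``predict_posterior_marginals_extended``.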


def sample_posterior_joint(
    poster_state: Dict,
    mean,
    kernel,
    feature,
    targets: np.ndarray,
    res_kernel: ExponentialDecayBaseKernelFunction,
    noise_variance,
    lfact_all,
    means_all,
    random_state: RandomState,
    num_samples: int = 1,
) -> Dict:
    """
    Given ``poster_state`` for some data plus one additional configuration
    with data (``feature``, ``targets``), draw joint samples of the unobserved
    targets for this configuration. ``targets`` may be empty, but must not be
    complete (there must be some unobserved targets). The additional
    configuration must not be in the dataset used to compute ``poster_state``.

    If ``targets`` corresponds to resource values range(r_min, r_obs), we
    sample latent target values y_r corresponding to range(r_obs, r_max + 1),
    returning a dict with [y_r] under ``y`` (matrix with ``num_samples``
    columns).

    :param poster_state: Posterior state for data
    :param mean: Mean function
    :param kernel: Kernel function
    :param feature: Features for additional config
    :param targets: Target values for additional config
    :param res_kernel: Kernel k(r, r') over resources
    :param noise_variance: Noise variance sigma^2
    :param lfact_all: Cholesky factor of complete resource kernel matrix
    :param means_all: See ``lfact_all``
    :param random_state: ``numpy.random.RandomState``
    :param num_samples: Number of joint samples to draw (default: 1)
    :return: See above
    """
    r_min, r_max = res_kernel.r_min, res_kernel.r_max
    num_res = r_max + 1 - r_min
    targets = _colvec(targets, _np=np)
    ydim = targets.size
    assert ydim < num_res, f"len(targets) = {ydim} must be < {num_res}"
    assert lfact_all.shape == (
        num_res,
        num_res,
    ), f"lfact_all.shape = {lfact_all.shape}, must be {(num_res, num_res)}"
    assert (
        means_all.size == num_res
    ), f"means_all.size = {means_all.size}, must be {num_res}"
    # Posterior mean and variance of h for the additional config
    post_mean, post_variance = predict_posterior_marginals(
        poster_state, mean, kernel, _rowvec(feature)
    )
    post_mean = post_mean[0]
    post_variance = post_variance[0]
    # Draw samples from the joint distribution
    epsmat = random_state.normal(size=(num_res, num_samples))
    joint_samples = anp.matmul(lfact_all, epsmat) * anp.sqrt(
        noise_variance
    ) + anp.reshape(means_all, (-1, 1))
    hvec = (
        random_state.normal(size=(1, num_samples)) * anp.sqrt(post_variance)
        + post_mean
    )
    joint_samples = joint_samples + hvec
    if ydim > 0:
        # There are observed targets, so we have to transform the joint sample
        # into a conditional one
        targets_samp = joint_samples[:ydim, :]
        lfact = lfact_all[:ydim, :ydim]  # L_Q
        rhs = anp.ones((ydim, 1))
        vvec = solve_triangular(lfact, rhs, lower=True)  # v
        vtv = _squared_norm(vvec)  # alpha
        # w_hat - w
        w_delta = solve_triangular(lfact, targets - targets_samp, lower=True)
        kappa = post_variance / noise_variance
        fact = kappa / (vtv * kappa + 1.0)
        # rho_hat - rho
        rho_delta = anp.matmul(_rowvec(vvec), w_delta) * fact
        tmpmat = w_delta - vvec * rho_delta
        lfact_pq = lfact_all[ydim:, :ydim]  # L_{P, Q}
        ysamples = (
            joint_samples[ydim:, :] + anp.matmul(lfact_pq, tmpmat) + rho_delta
        )
    else:
        # Nothing to condition on
        ysamples = joint_samples
    return {"y": ysamples}
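

# Illustrative sketch (not part of the original module; the helper name and all
# concrete values are hypothetical, and ``poster_state``, ``mean``, ``kernel``,
# ``feature`` are placeholders assumed to come from a fitted model): shows how
# ``lfact_all`` and ``means_all`` can be built from ``res_kernel``, mirroring
# the construction in ``resource_kernel_likelihood_computations``, before
# drawing joint samples of the unobserved targets. Depending on how
# ``res_kernel`` was created, its parameters may have to be initialized first.
def _example_sample_unobserved_targets(
    poster_state, mean, kernel, feature, res_kernel, noise_variance, random_state
):
    r_min, r_max = res_kernel.r_min, res_kernel.r_max
    num_res = r_max + 1 - r_min
    rvals = _colvec(anp.arange(r_min, r_min + num_res))
    means_all = _flatvec(res_kernel.mean_function(rvals))
    amat = res_kernel(rvals, rvals) / noise_variance + anp.diag(anp.ones(num_res))
    lfact_all = cholesky_factorization(amat)
    # Two targets observed at r_min, r_min + 1 (requires num_res > 2); samples
    # are drawn for the remaining resource levels r_min + 2, ..., r_max
    targets = np.array([1.0, 0.9])
    result = sample_posterior_joint(
        poster_state,
        mean,
        kernel,
        feature,
        targets,
        res_kernel,
        noise_variance,
        lfact_all,
        means_all,
        random_state,
        num_samples=5,
    )
    return result["y"]  # shape (num_res - 2, 5)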