Source code for syne_tune.optimizer.schedulers.searchers.bayesopt.gpautograd.kernel.base

# Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.
import autograd.numpy as anp
from autograd.tracer import getval
from typing import Dict, Any

from syne_tune.optimizer.schedulers.searchers.bayesopt.gpautograd.constants import (
    INITIAL_COVARIANCE_SCALE,
    INITIAL_INVERSE_BANDWIDTHS,
    DEFAULT_ENCODING,
    INVERSE_BANDWIDTHS_LOWER_BOUND,
    INVERSE_BANDWIDTHS_UPPER_BOUND,
    COVARIANCE_SCALE_LOWER_BOUND,
    COVARIANCE_SCALE_UPPER_BOUND,
    NUMERICAL_JITTER,
)
from syne_tune.optimizer.schedulers.searchers.bayesopt.gpautograd.distribution import (
    Uniform,
    LogNormal,
)
from syne_tune.optimizer.schedulers.searchers.bayesopt.gpautograd.gluon import Block
from syne_tune.optimizer.schedulers.searchers.bayesopt.gpautograd.gluon_blocks_helpers import (
    encode_unwrap_parameter,
    register_parameter,
    create_encoding,
)
from syne_tune.optimizer.schedulers.searchers.bayesopt.gpautograd.mean import (
    MeanFunction,
)


class KernelFunction(MeanFunction):
    """
    Base class of kernel (or covariance) function :math:`k(x, x')`

    :param dimension: Dimensionality of input points after encoding into
        ``ndarray``
    """

    def __init__(self, dimension: int, **kwargs):
        super().__init__(**kwargs)
        self._dimension = dimension

    @property
    def dimension(self):
        """
        :return: Dimension d of input points
        """
        return self._dimension

    def diagonal(self, X):
        """
        :param X: Input data, shape ``(n, d)``
        :return: Diagonal of :math:`k(X, X)`, shape ``(n,)``
        """
        raise NotImplementedError

    def diagonal_depends_on_X(self):
        """
        For stationary kernels, the diagonal does not depend on ``X``.

        :return: Does :meth:`diagonal` depend on ``X``?
        """
        raise NotImplementedError

    def _check_input_shape(self, X):
        return anp.reshape(X, (getval(X.shape[0]), self._dimension))
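

# --- Illustration only (not part of the original module) ---
# A minimal sketch of what a KernelFunction subclass needs to provide: a plain
# dot-product kernel without learnable parameters. The class name is
# hypothetical; the method names follow the patterns used by Matern52 below.
class LinearKernelSketch(KernelFunction):
    """Dot-product kernel :math:`k(x, x') = x^T x'` (illustration only)."""

    def forward(self, X1, X2):
        X1 = self._check_input_shape(X1)
        if X2 is not X1:
            X2 = self._check_input_shape(X2)
        return anp.matmul(X1, anp.transpose(X2))

    def diagonal(self, X):
        X = self._check_input_shape(X)
        return anp.sum(anp.square(X), axis=1)

    def diagonal_depends_on_X(self):
        return True  # the diagonal entries x_i^T x_i vary with X

    def param_encoding_pairs(self):
        return []  # no parameters to optimize

    def get_params(self) -> Dict[str, Any]:
        return {}

    def set_params(self, param_dict: Dict[str, Any]):
        pass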


class SquaredDistance(Block):
    r"""
    Block that is responsible for the computation of matrices of squared
    distances. The distances can possibly be weighted (e.g., ARD
    parametrization). For instance:

    .. math::

       m_{i j} = \sum_{k=1}^d ib_k^2 (x_{1: i k} - x_{2: j k})^2

       \mathbf{X}_1 = [x_{1: i j}], \quad \mathbf{X}_2 = [x_{2: i j}]

    Here, :math:`[ib_k]` is the vector :attr:`inverse_bandwidth`. If
    ``ARD == False``, ``inverse_bandwidths`` is a scalar broadcast to the
    ``d`` components (with ``d = dimension``, i.e., the number of features
    in ``X``).

    :param dimension: Dimensionality :math:`d` of input vectors
    :param ARD: Automatic relevance determination (``inverse_bandwidth`` vector
        of size ``d``)? Defaults to ``False``
    :param encoding_type: Encoding for ``inverse_bandwidth``. Defaults to
        :const:`~syne_tune.optimizer.schedulers.searchers.bayesopt.gpautograd.constants.DEFAULT_ENCODING`
    """

    def __init__(
        self,
        dimension: int,
        ARD: bool = False,
        encoding_type: str = DEFAULT_ENCODING,
        **kwargs
    ):
        super().__init__(**kwargs)
        self.ARD = ARD
        inverse_bandwidths_dimension = 1 if not ARD else dimension
        self.encoding = create_encoding(
            encoding_type,
            INITIAL_INVERSE_BANDWIDTHS,
            INVERSE_BANDWIDTHS_LOWER_BOUND,
            INVERSE_BANDWIDTHS_UPPER_BOUND,
            inverse_bandwidths_dimension,
            Uniform(INVERSE_BANDWIDTHS_LOWER_BOUND, INVERSE_BANDWIDTHS_UPPER_BOUND),
        )
        with self.name_scope():
            self.inverse_bandwidths_internal = register_parameter(
                self.params,
                "inverse_bandwidths",
                self.encoding,
                shape=(inverse_bandwidths_dimension,),
            )

    def _inverse_bandwidths(self):
        return encode_unwrap_parameter(
            self.inverse_bandwidths_internal, self.encoding
        )

    def forward(self, X1, X2):
        """Computes matrix of squared distances

        :param X1: input matrix, shape ``(n1, d)``
        :param X2: input matrix, shape ``(n2, d)``
        """
        # In case inverse_bandwidths is of size (1, dimension), dimension > 1,
        # ARD is handled by broadcasting
        inverse_bandwidths = anp.reshape(self._inverse_bandwidths(), (1, -1))

        X1_scaled = anp.multiply(X1, inverse_bandwidths)
        X1_squared_norm = anp.sum(anp.square(X1_scaled), axis=1)
        if X2 is X1:
            D = -2.0 * anp.dot(X1_scaled, anp.transpose(X1_scaled))
            X2_squared_norm = X1_squared_norm
        else:
            X2_scaled = anp.multiply(X2, inverse_bandwidths)
            D = -2.0 * anp.matmul(X1_scaled, anp.transpose(X2_scaled))
            X2_squared_norm = anp.sum(anp.square(X2_scaled), axis=1)
        D = D + anp.reshape(X1_squared_norm, (-1, 1))
        D = D + anp.reshape(X2_squared_norm, (1, -1))
        return anp.abs(D)

    def get_params(self) -> Dict[str, Any]:
        """
        Parameter keys are "inv_bw<k>" if ``dimension > 1``, and "inv_bw" if
        ``dimension == 1``.
        """
        inverse_bandwidths = anp.reshape(self._inverse_bandwidths(), (-1,))
        if inverse_bandwidths.size == 1:
            return {"inv_bw": inverse_bandwidths[0]}
        else:
            return {
                "inv_bw{}".format(k): inverse_bandwidths[k]
                for k in range(inverse_bandwidths.size)
            }

    def set_params(self, param_dict: Dict[str, Any]):
        dimension = self.encoding.dimension
        if dimension == 1:
            inverse_bandwidths = [param_dict["inv_bw"]]
        else:
            keys = ["inv_bw{}".format(k) for k in range(dimension)]
            for k in keys:
                assert k in param_dict, "'{}' not in param_dict = {}".format(
                    k, param_dict
                )
            inverse_bandwidths = [param_dict[k] for k in keys]
        self.encoding.set(self.inverse_bandwidths_internal, inverse_bandwidths)
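

# --- Illustration only (not part of the original module) ---
# A sketch of the weighted squared distance computed by SquaredDistance.forward,
# written out directly with autograd.numpy. The function and array names below
# are ad hoc, introduced only for this example.
def _squared_distance_example():
    x1 = anp.array([[0.0, 1.0], [2.0, 3.0]])  # shape (n1, d) = (2, 2)
    x2 = anp.array([[1.0, 1.0]])              # shape (n2, d) = (1, 2)
    inv_bw = anp.array([0.5, 2.0])            # one inverse bandwidth per feature (ARD)
    # m_{ij} = sum_k inv_bw_k^2 * (x1_{ik} - x2_{jk})^2
    diff = x1[:, None, :] - x2[None, :, :]    # shape (n1, n2, d)
    m = anp.sum((inv_bw ** 2) * diff ** 2, axis=-1)
    # `m` corresponds to the output of a SquaredDistance block whose
    # inverse_bandwidths parameter equals inv_bw (e.g., set via set_params).
    return m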


class Matern52(KernelFunction):
    """
    Block that is responsible for the computation of the Matern 5/2 kernel.

    If ``ARD == False``, ``inverse_bandwidths`` is a scalar broadcast to the
    ``d`` components (with ``d = dimension``, i.e., the number of features
    in ``X``).

    Additional arguments on top of those of :class:`SquaredDistance`:

    :param has_covariance_scale: Kernel has covariance scale parameter?
        Defaults to ``True``
    """

    def __init__(
        self,
        dimension: int,
        ARD: bool = False,
        encoding_type: str = DEFAULT_ENCODING,
        has_covariance_scale: bool = True,
        **kwargs
    ):
        super(Matern52, self).__init__(dimension, **kwargs)
        self.has_covariance_scale = has_covariance_scale
        self.squared_distance = SquaredDistance(
            dimension=dimension, ARD=ARD, encoding_type=encoding_type
        )
        if has_covariance_scale:
            self.encoding = create_encoding(
                encoding_name=encoding_type,
                init_val=INITIAL_COVARIANCE_SCALE,
                constr_lower=COVARIANCE_SCALE_LOWER_BOUND,
                constr_upper=COVARIANCE_SCALE_UPPER_BOUND,
                dimension=1,
                prior=LogNormal(0.0, 1.0),
            )
            with self.name_scope():
                self.covariance_scale_internal = register_parameter(
                    self.params, "covariance_scale", self.encoding
                )

    @property
    def ARD(self) -> bool:
        return self.squared_distance.ARD

    def _covariance_scale(self):
        if self.has_covariance_scale:
            return encode_unwrap_parameter(
                self.covariance_scale_internal, self.encoding
            )
        else:
            return 1.0

    def forward(self, X1, X2):
        """Computes Matern 5/2 kernel matrix

        :param X1: input matrix, shape ``(n1, d)``
        :param X2: input matrix, shape ``(n2, d)``
        """
        covariance_scale = self._covariance_scale()
        X1 = self._check_input_shape(X1)
        if X2 is not X1:
            X2 = self._check_input_shape(X2)
        D = 5.0 * self.squared_distance(X1, X2)
        # Using the plain sqrt is numerically unstable for D ~ 0
        # (non-differentiability), which is why we add NUMERICAL_JITTER
        B = anp.sqrt(D + NUMERICAL_JITTER)
        return anp.multiply((1.0 + B + D / 3.0) * anp.exp(-B), covariance_scale)

    def diagonal(self, X):
        X = self._check_input_shape(X)
        covariance_scale = self._covariance_scale()
        covariance_scale_times_ones = anp.multiply(
            anp.ones((getval(X.shape[0]), 1)), covariance_scale
        )
        return anp.reshape(covariance_scale_times_ones, (-1,))

    def diagonal_depends_on_X(self):
        return False

    def param_encoding_pairs(self):
        result = [
            (
                self.squared_distance.inverse_bandwidths_internal,
                self.squared_distance.encoding,
            )
        ]
        if self.has_covariance_scale:
            result.insert(0, (self.covariance_scale_internal, self.encoding))
        return result

    def get_covariance_scale(self):
        if self.has_covariance_scale:
            return self._covariance_scale()[0]
        else:
            return 1.0

    def set_covariance_scale(self, covariance_scale):
        assert self.has_covariance_scale, "covariance_scale is fixed to 1"
        self.encoding.set(self.covariance_scale_internal, covariance_scale)

    def get_params(self) -> Dict[str, Any]:
        result = self.squared_distance.get_params()
        if self.has_covariance_scale:
            result["covariance_scale"] = self.get_covariance_scale()
        return result

    def set_params(self, param_dict: Dict[str, Any]):
        self.squared_distance.set_params(param_dict)
        if self.has_covariance_scale:
            self.set_covariance_scale(param_dict["covariance_scale"])
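

# --- Illustration only (not part of the original module) ---
# A sketch of the Matern 5/2 covariance for a single pair of points, mirroring
# the computation in Matern52.forward. The function and variable names below
# are ad hoc, introduced only for this example.
def _matern52_example():
    x = anp.array([0.0, 1.0])
    x_prime = anp.array([0.5, -0.5])
    inv_bw = anp.array([1.0, 2.0])  # inverse bandwidths (ARD)
    cov_scale = 0.8                 # covariance scale
    r2 = anp.sum((inv_bw ** 2) * (x - x_prime) ** 2)  # weighted squared distance
    d = 5.0 * r2
    b = anp.sqrt(d + NUMERICAL_JITTER)  # jitter added as in Matern52.forward
    # k(x, x') = cov_scale * (1 + b + d / 3) * exp(-b); this corresponds to one
    # entry of the kernel matrix returned by Matern52.forward.
    return cov_scale * (1.0 + b + d / 3.0) * anp.exp(-b)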