Source code for syne_tune.optimizer.schedulers.searchers.bayesopt.gpautograd.learncurve.issm

# Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.
from typing import List, Dict, Tuple

import numpy as np
import scipy.linalg as spl
import autograd.numpy as anp
from autograd.scipy.special import logsumexp
from autograd.scipy.linalg import solve_triangular
from autograd.tracer import getval
from numpy.random import RandomState
from operator import itemgetter
from collections import Counter


from syne_tune.optimizer.schedulers.searchers.bayesopt.gpautograd.constants import (
    NUMERICAL_JITTER,
    MIN_POSTERIOR_VARIANCE,
)
from syne_tune.optimizer.schedulers.searchers.bayesopt.gpautograd.custom_op import (
    cholesky_factorization,
)
from syne_tune.optimizer.schedulers.searchers.bayesopt.datatypes.tuning_job_state import (
    TuningJobState,
)
from syne_tune.optimizer.schedulers.searchers.bayesopt.datatypes.common import (
    FantasizedPendingEvaluation,
    TrialEvaluations,
)
from syne_tune.optimizer.schedulers.searchers.utils.common import Configuration
from syne_tune.optimizer.schedulers.searchers.bayesopt.datatypes.config_ext import (
    ExtendedConfiguration,
)


def _prepare_data_internal(
    state: TuningJobState,
    data_lst: List[Tuple[Configuration, List, str]],
    config_space_ext: ExtendedConfiguration,
    active_metric: str,
    do_fantasizing: bool,
    mean: float,
    std: float,
) -> Tuple[List[Configuration], List[np.ndarray], List[str]]:
    r_min, r_max = config_space_ext.resource_attr_range
    configs = [x[0] for x in data_lst]
    trial_ids = [x[2] for x in data_lst]
    targets = []

    fantasized = dict()
    num_fantasy_samples = None
    if do_fantasizing:
        for ev in state.pending_evaluations:
            assert isinstance(ev, FantasizedPendingEvaluation)
            trial_id = ev.trial_id
            entry = (ev.resource, ev.fantasies[active_metric])
            sz = entry[1].size
            if num_fantasy_samples is None:
                num_fantasy_samples = sz
            else:
                assert sz == num_fantasy_samples, (
                    "Number of fantasy samples must be the same for all "
                    + f"pending evaluations ({sz}, {num_fantasy_samples})"
                )
            if trial_id in fantasized:
                fantasized[trial_id].append(entry)
            else:
                fantasized[trial_id] = [entry]

    trial_ids_done = set()
    for config, observed, trial_id in data_lst:
        # Observations must be from r_min without any missing
        obs_res = [x[0] for x in observed]
        num_obs = len(observed)
        if num_obs > 0:
            test = list(range(r_min, r_min + num_obs))
            assert obs_res == test, (
                f"trial_id {trial_id} has observations at {obs_res}, but "
                + f"we need them at {test}"
            )
        # Note: Only observed targets are normalized, not fantasized ones
        this_targets = (
            np.array([x[1] for x in observed]).reshape((-1, 1)) - mean
        ) / std
        if do_fantasizing:
            if num_fantasy_samples > 1:
                this_targets = this_targets * np.ones((1, num_fantasy_samples))
            if trial_id in fantasized:
                this_fantasized = sorted(fantasized[trial_id], key=itemgetter(0))
                fanta_res = [x[0] for x in this_fantasized]
                start = r_min + num_obs
                test = list(range(start, start + len(this_fantasized)))
                assert fanta_res == test, (
                    f"trial_id {trial_id} has pending evaluations at {fanta_res}"
                    + f", but we need them at {test}"
                )
                this_targets = np.vstack(
                    [this_targets] + [x[1].reshape((1, -1)) for x in this_fantasized]
                )
                trial_ids_done.add(trial_id)
        targets.append(this_targets)

    if do_fantasizing:
        # There may be trials with pending evals, but no observed ones
        for trial_id, this_fantasized in fantasized.items():
            if trial_id not in trial_ids_done:
                configs.append(state.config_for_trial[trial_id])
                trial_ids.append(trial_id)
                this_fantasized = sorted(this_fantasized, key=itemgetter(0))
                fanta_res = [x[0] for x in this_fantasized]
                test = list(range(r_min, r_min + len(this_fantasized)))
                assert fanta_res == test, (
                    f"trial_id {trial_id} has pending evaluations at {fanta_res}"
                    + f", but we need them at {test}"
                )
                this_targets = np.vstack(
                    [x[1].reshape((1, -1)) for x in this_fantasized]
                )
                targets.append(this_targets)

    return configs, targets, trial_ids


def _create_tuple(ev: TrialEvaluations, active_metric: str, config_for_trial: Dict):
    metric_vals = ev.metrics[active_metric]
    assert isinstance(metric_vals, dict)
    observed = list(
        sorted(((int(k), v) for k, v in metric_vals.items()), key=itemgetter(0))
    )
    trial_id = ev.trial_id
    config = config_for_trial[trial_id]
    return config, observed, trial_id


def prepare_data(
    state: TuningJobState,
    config_space_ext: ExtendedConfiguration,
    active_metric: str,
    normalize_targets: bool = False,
    do_fantasizing: bool = False,
) -> Dict:
    """
    Prepares data in ``state`` for further processing. The entries
    ``configs``, ``targets`` of the result dict are lists with one entry per
    trial, sorted in decreasing order of number of target values.
    ``features`` is the feature matrix corresponding to ``configs``.

    If ``normalize_targets`` is True, the target values are normalized to
    mean 0, variance 1 (over all values), and ``mean_targets``,
    ``std_targets`` is returned.

    If ``do_fantasizing`` is True, ``state.pending_evaluations`` is also
    taken into account. Entries there have to be of type
    ``FantasizedPendingEvaluation``. Also, in terms of their resource levels,
    they need to be adjacent to observed entries, so there are no gaps. In
    this case, the entries of the ``targets`` list are matrices, each column
    corresponding to a fantasy sample.

    Note: If ``normalize_targets``, mean and stddev are computed over observed
    values only. Also, fantasy values in ``state.pending_evaluations`` are not
    normalized, because they are assumed to be sampled from the posterior with
    normalized targets as well.

    :param state: ``TuningJobState`` with data
    :param config_space_ext: Extended config space
    :param active_metric:
    :param normalize_targets: See above
    :param do_fantasizing: See above
    :return: See above
    """
    r_min, r_max = config_space_ext.resource_attr_range
    hp_ranges = config_space_ext.hp_ranges
    data_lst = []
    targets = []
    for ev in state.trials_evaluations:
        tpl = _create_tuple(ev, active_metric, state.config_for_trial)
        data_lst.append(tpl)
        observed = tpl[1]
        targets += [x[1] for x in observed]
    mean = 0.0
    std = 1.0
    if normalize_targets:
        std = max(np.std(targets), 1e-9)
        mean = np.mean(targets)
    configs, targets, trial_ids = _prepare_data_internal(
        state=state,
        data_lst=data_lst,
        config_space_ext=config_space_ext,
        active_metric=active_metric,
        do_fantasizing=do_fantasizing,
        mean=mean,
        std=std,
    )
    # Sort in decreasing order w.r.t. number of targets
    configs, targets, trial_ids = zip(
        *sorted(zip(configs, targets, trial_ids), key=lambda x: -x[1].shape[0])
    )
    features = hp_ranges.to_ndarray_matrix(configs)
    result = {
        "configs": list(configs),
        "features": features,
        "targets": list(targets),
        "trial_ids": list(trial_ids),
        "r_min": r_min,
        "r_max": r_max,
        "do_fantasizing": do_fantasizing,
    }
    if normalize_targets:
        result["mean_targets"] = mean
        result["std_targets"] = std
    return result


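# Illustrative sketch (editor's addition, not part of the original module):
# the shape of the dict returned by ``prepare_data``, built here by hand from
# toy values rather than from a real ``TuningJobState``. All concrete numbers
# and the config contents are made up. The key invariant used downstream is
# that ``targets`` is sorted by nonincreasing number of rows.
def _example_prepare_data_layout() -> Dict:
    # Three hypothetical trials, observed at resource levels starting at r_min=1;
    # targets are column vectors (one column, since there is no fantasizing here)
    toy_result = {
        "configs": [{"lr": 0.1}, {"lr": 0.01}, {"lr": 0.001}],  # hypothetical configs
        "features": np.array([[0.1], [0.01], [0.001]]),  # stand-in for hp_ranges encoding
        "targets": [
            np.array([[0.9], [0.7], [0.6]]),  # 3 observations
            np.array([[0.8], [0.75]]),        # 2 observations
            np.array([[0.85]]),               # 1 observation
        ],
        "trial_ids": ["0", "1", "2"],
        "r_min": 1,
        "r_max": 5,
        "do_fantasizing": False,
    }
    ydims = [y.shape[0] for y in toy_result["targets"]]
    assert ydims == sorted(ydims, reverse=True)  # sorted by decreasing #targets
    return toy_result

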
def prepare_data_with_pending(
    state: TuningJobState,
    config_space_ext: ExtendedConfiguration,
    active_metric: str,
    normalize_targets: bool = False,
) -> Tuple[Dict, Dict]:
    """
    Similar to ``prepare_data`` with ``do_fantasizing=False``, but two dicts
    are returned, the first for trials without pending evaluations, the
    second for trials with pending evaluations. The latter dict also contains
    trials which have pending, but no observed evaluations. The second dict
    has the additional entry ``num_pending``, which lists the number of
    pending evals for each trial. These evals must be contiguous and adjacent
    with observed evals, so that the union of observed and pending evals is
    contiguous (when it comes to resource levels).

    :param state: See ``prepare_data``
    :param config_space_ext: See ``prepare_data``
    :param active_metric: See ``prepare_data``
    :param normalize_targets: See ``prepare_data``
    :return: See above
    """
    r_min, r_max = config_space_ext.resource_attr_range
    hp_ranges = config_space_ext.hp_ranges
    data1_lst = []  # trials without pending evals
    data2_lst = []  # trials with pending evals
    num_pending = []
    num_pending_for_trial = Counter(ev.trial_id for ev in state.pending_evaluations)
    targets = []
    done_trial_ids = set()
    for ev in state.trials_evaluations:
        tpl = _create_tuple(ev, active_metric, state.config_for_trial)
        _, observed, trial_id = tpl
        if trial_id not in num_pending_for_trial:
            data1_lst.append(tpl)
        else:
            data2_lst.append(tpl)
            num_pending.append(num_pending_for_trial[trial_id])
        done_trial_ids.add(trial_id)
        targets += [x[1] for x in observed]
    mean = 0.0
    std = 1.0
    if normalize_targets:
        std = max(np.std(targets), 1e-9)
        mean = np.mean(targets)
    # There may be trials with pending evaluations, but no observed ones
    for ev in state.pending_evaluations:
        trial_id = ev.trial_id
        if trial_id not in done_trial_ids:
            config = state.config_for_trial[trial_id]
            data2_lst.append((config, [], trial_id))
            num_pending.append(num_pending_for_trial[trial_id])
    results = ()
    with_pending = False
    for data_lst in (data1_lst, data2_lst):
        configs, targets, trial_ids = _prepare_data_internal(
            state=state,
            data_lst=data_lst,
            config_space_ext=config_space_ext,
            active_metric=active_metric,
            do_fantasizing=False,
            mean=mean,
            std=std,
        )
        if configs:
            # Sort in decreasing order w.r.t. number of targets
            if not with_pending:
                configs, targets, trial_ids = zip(
                    *sorted(
                        zip(configs, targets, trial_ids), key=lambda x: -x[1].shape[0]
                    )
                )
            else:
                configs, targets, num_pending, trial_ids = zip(
                    *sorted(
                        zip(configs, targets, num_pending, trial_ids),
                        key=lambda x: -x[1].shape[0],
                    )
                )
            features = hp_ranges.to_ndarray_matrix(configs)
        else:
            # It is possible that ``data1_lst`` is empty
            features = None
        result = {
            "configs": list(configs),
            "features": features,
            "targets": list(targets),
            "trial_ids": list(trial_ids),
            "r_min": r_min,
            "r_max": r_max,
            "do_fantasizing": False,
        }
        if with_pending:
            result["num_pending"] = num_pending
        if normalize_targets:
            result["mean_targets"] = mean
            result["std_targets"] = std
        results = results + (result,)
        with_pending = True
    return results


def issm_likelihood_precomputations(targets: List[np.ndarray], r_min: int) -> Dict:
    """
    Precomputations required by ``issm_likelihood_computations``.

    Importantly, ``prepare_data`` orders datapoints by nonincreasing number of
    targets ``ydims[i]``. For ``0 <= j < ydim_max``,
    ``ydim_max = ydims[0] = max(ydims)``, ``num_configs[j]`` is the number of
    datapoints i for which ``ydims[i] > j``. ``deltay`` is a flat matrix (rows
    corresponding to fantasy samples; column vector if no fantasizing)
    consisting of ``ydim_max`` parts, where part j is of size
    ``num_configs[j]`` and contains ``y[j] - y[j-1]`` for targets of those i
    counted in ``num_configs[j]``, the term needed in the recurrence to
    compute ``w[j]``. ``logr`` is a flat vector consisting of
    ``ydim_max - 1`` parts, where part j (starting from 1) is of size
    ``num_configs[j]`` and contains the logarithmic term for computing
    ``a[j-1]`` and ``e[j]``.

    :param targets: Targets from data representation returned by
        ``prepare_data``
    :param r_min: Value of r_min, as returned by ``prepare_data``
    :return: See above
    """
    ydims = [y.shape[0] for y in targets]
    ydim_max = ydims[0]
    num_configs = list(
        np.sum(
            np.array(ydims).reshape((-1, 1)) > np.arange(ydim_max).reshape((1, -1)),
            axis=0,
        ).reshape((-1,))
    )
    assert num_configs[0] == len(targets), (num_configs, len(targets))
    assert num_configs[-1] > 0, num_configs
    total_size = sum(num_configs)
    assert total_size == sum(ydims)
    yprev = np.vstack([y[-1].reshape((1, -1)) for y in targets])
    deltay_parts = [yprev]
    log_r = []
    for pos, num in enumerate(num_configs[1:], start=1):
        ycurr = np.vstack([y[-(pos + 1)].reshape((1, -1)) for y in targets[:num]])
        deltay_parts.append(ycurr - yprev[:num, :])
        yprev = ycurr
        logr_curr = [np.log(ydim + r_min - pos) for ydim in ydims[:num]]
        log_r.extend(logr_curr)
    deltay = np.vstack(deltay_parts)
    assert deltay.shape[0] == total_size
    assert len(log_r) == total_size - num_configs[0]
    return {
        "ydims": ydims,
        "num_configs": num_configs,
        "deltay": deltay,
        "logr": np.array(log_r),
    }


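# Illustrative sketch (editor's addition): what the precomputation layout looks
# like for a toy set of targets. The call and the shapes follow the docstring
# above; the concrete numbers are made up.
def _example_issm_precomputations() -> Dict:
    r_min = 1
    targets = [
        np.array([[0.9], [0.7], [0.6]]),  # ydim = 3
        np.array([[0.8], [0.75]]),        # ydim = 2
        np.array([[0.85]]),               # ydim = 1
    ]
    precomputed = issm_likelihood_precomputations(targets, r_min)
    # num_configs[j] counts configs with more than j targets
    assert precomputed["ydims"] == [3, 2, 1]
    assert list(precomputed["num_configs"]) == [3, 2, 1]
    # deltay stacks one part per j: first the last observations of all configs,
    # then differences y[j] - y[j-1] for the configs still having data at step j
    assert precomputed["deltay"].shape == (6, 1)
    return precomputed

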
def _squared_norm(a, _np=anp):
    return _np.sum(_np.square(a))


def _inner_product(a, b, _np=anp):
    return _np.sum(_np.multiply(a, b))


def _colvec(a, _np=anp):
    return _np.reshape(a, (-1, 1))


def _rowvec(a, _np=anp):
    return _np.reshape(a, (1, -1))


def _flatvec(a, _np=anp):
    return _np.reshape(a, (-1,))


def issm_likelihood_computations(
    precomputed: Dict,
    issm_params: Dict,
    r_min: int,
    r_max: int,
    skip_c_d: bool = False,
) -> Dict:
    """
    Given ``precomputed`` from ``issm_likelihood_precomputations`` and ISSM
    parameters ``issm_params``, compute quantities required for inference and
    marginal likelihood computation, pertaining to the ISSM likelihood.

    The index for r is range(r_min, r_max + 1). Observations must be
    contiguous from r_min. The ISSM parameters are:

    - alpha: n-vector, negative
    - beta: n-vector
    - gamma: scalar, positive

    Results returned are:

    - c: n vector [c_i], negative
    - d: n vector [d_i], positive
    - vtv: n vector [|v_i|^2]
    - wtv: (n, F) matrix [(W_i)^T v_i], F number of fantasy samples
    - wtw: n-vector [|w_i|^2] (only if no fantasizing)

    :param precomputed: Output of ``issm_likelihood_precomputations``
    :param issm_params: Parameters of ISSM likelihood
    :param r_min: Smallest resource value
    :param r_max: Largest resource value
    :param skip_c_d: If True, c and d are not computed
    :return: Quantities required for inference and learning criterion
    """
    num_all_configs = precomputed["num_configs"][0]
    num_res = r_max + 1 - r_min
    assert num_all_configs > 0, "targets must not be empty"
    assert num_res > 0, f"r_min = {r_min} must be <= r_max = {r_max}"
    num_fantasy_samples = precomputed["deltay"].shape[1]
    compute_wtw = num_fantasy_samples == 1
    alphas = _flatvec(issm_params["alpha"])
    betas = _flatvec(issm_params["beta"])
    gamma = issm_params["gamma"]
    n = getval(alphas.size)
    assert n == num_all_configs, f"alpha.size = {n} != {num_all_configs}"
    n = getval(betas.size)
    assert n == num_all_configs, f"beta.size = {n} != {num_all_configs}"
    if not skip_c_d:
        # We could probably refactor this to fit into the loop below, but it
        # seems subdominant
        c_lst = []
        d_lst = []
        for i, ydim in enumerate(precomputed["ydims"]):
            alpha = alphas[i]
            alpha_m1 = alpha - 1.0
            beta = betas[i]
            r_obs = r_min + ydim  # Observed in range(r_min, r_obs)
            assert 0 < ydim <= num_res, f"len(y[{i}]) = {ydim}, num_res = {num_res}"
            # c_i, d_i
            if ydim < num_res:
                lrvec = (
                    anp.array([np.log(r) for r in range(r_obs, r_max + 1)]) * alpha_m1
                    + beta
                )
                c_scal = alpha * anp.exp(logsumexp(lrvec))
                d_scal = anp.square(gamma * alpha) * anp.exp(logsumexp(lrvec * 2.0))
                c_lst.append(c_scal)
                d_lst.append(d_scal)
            else:
                c_lst.append(0.0)
                d_lst.append(0.0)
    # Loop over ydim
    deltay = precomputed["deltay"]
    logr = precomputed["logr"]
    off_dely = num_all_configs
    vvec = anp.ones(off_dely)
    wmat = deltay[:off_dely, :]  # [y_0]
    vtv = anp.ones(off_dely)
    wtv = wmat.copy()
    if compute_wtw:
        wtw = _flatvec(anp.square(wmat))
    # Note: We need the detour via ``vtv_lst``, etc, because ``autograd`` does
    # not support overwriting the content of an ``ndarray``. Their role is to
    # collect parts of the final vectors, in reverse ordering
    vtv_lst = []
    wtv_lst = []
    wtw_lst = []
    alpham1s = alphas - 1
    num_prev = off_dely
    for num in precomputed["num_configs"][1:]:
        if num < num_prev:
            # Size of working vectors is shrinking
            assert vtv.size == num_prev
            # These parts are done: Collect them in the lists
            # All vectors are resized to ``num``, dropping the tails
            vtv_lst.append(vtv[num:])
            wtv_lst.append(wtv[num:, :])
            vtv = vtv[:num]
            wtv = wtv[:num, :]
            if compute_wtw:
                wtw_lst.append(wtw[num:])
                wtw = wtw[:num]
            alphas = alphas[:num]
            alpham1s = alpham1s[:num]
            betas = betas[:num]
            vvec = vvec[:num]
            wmat = wmat[:num, :]
            num_prev = num
        # [a_{j-1}]
        off_logr = off_dely - num_all_configs
        logr_curr = logr[off_logr : (off_logr + num)]
        avec = alphas * anp.exp(logr_curr * alpham1s + betas)
        evec = avec * gamma + 1  # [e_j]
        vvec = vvec * evec  # [v_j]
        deltay_curr = deltay[off_dely : (off_dely + num), :]
        off_dely += num
        wmat = _colvec(evec) * wmat + deltay_curr + _colvec(avec)  # [w_j]
        vtv = vtv + anp.square(vvec)
        if compute_wtw:
            wtw = wtw + _flatvec(anp.square(wmat))
        wtv = wtv + _colvec(vvec) * wmat
    vtv_lst.append(vtv)
    wtv_lst.append(wtv)
    vtv_all = anp.concatenate(tuple(reversed(vtv_lst)), axis=0)
    wtv_all = anp.concatenate(tuple(reversed(wtv_lst)), axis=0)
    if compute_wtw:
        wtw_lst.append(wtw)
        wtw_all = anp.concatenate(tuple(reversed(wtw_lst)), axis=0)
    # Compile results
    result = {"num_data": sum(precomputed["ydims"]), "vtv": vtv_all, "wtv": wtv_all}
    if compute_wtw:
        result["wtw"] = wtw_all
    if not skip_c_d:
        result["c"] = anp.array(c_lst)
        result["d"] = anp.array(d_lst)
    return result


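# Illustrative sketch (editor's addition): end-to-end use of the two likelihood
# functions above on toy data. The ISSM parameter values are made up; they only
# need to respect the sign constraints stated in the docstring (alpha negative,
# gamma positive).
def _example_issm_likelihood() -> Dict:
    r_min, r_max = 1, 4
    targets = [
        np.array([[0.9], [0.7], [0.6]]),  # observed at r = 1, 2, 3
        np.array([[0.8], [0.75]]),        # observed at r = 1, 2
    ]
    n = len(targets)
    issm_params = {
        "alpha": np.full(n, -1.0),  # negative n-vector
        "beta": np.zeros(n),        # n-vector
        "gamma": 0.5,               # positive scalar
    }
    precomputed = issm_likelihood_precomputations(targets, r_min)
    likelihood = issm_likelihood_computations(precomputed, issm_params, r_min, r_max)
    # n-vectors |v_i|^2, c_i, d_i and the (n, 1) matrix (W_i)^T v_i
    assert likelihood["vtv"].shape == (n,)
    assert likelihood["wtv"].shape == (n, 1)
    return likelihood

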
def posterior_computations(
    features, mean, kernel, issm_likelihood: Dict, noise_variance
) -> Dict:
    """
    Computes posterior state (required for predictions) and negative log
    marginal likelihood (returned in ``criterion``). The latter is computed
    only when there is no fantasizing (i.e., if ``issm_likelihood`` contains
    ``wtw``).

    :param features: Input matrix X
    :param mean: Mean function
    :param kernel: Kernel function
    :param issm_likelihood: Outcome of ``issm_likelihood_computations``
    :param noise_variance: Variance of ISSM innovations
    :return: Internal posterior state
    """
    num_data = issm_likelihood["num_data"]
    kernel_mat = kernel(features, features)  # K
    dvec = issm_likelihood["d"]
    s2vec = issm_likelihood["vtv"] / noise_variance  # S^2
    svec = _colvec(anp.sqrt(s2vec + NUMERICAL_JITTER))
    # A = I + S K_hat S = (I + S^2 D) + S K S
    dgvec = s2vec * dvec + 1.0
    amat = anp.multiply(svec, anp.multiply(kernel_mat, _rowvec(svec))) + anp.diag(dgvec)
    # TODO: Do we need AddJitterOp here?
    lfact = cholesky_factorization(amat)  # L (Cholesky factor)
    # r vectors
    muhat = _flatvec(mean(features)) - issm_likelihood["c"]
    s2muhat = s2vec * muhat  # S^2 mu_hat
    r2mat = issm_likelihood["wtv"] / noise_variance - _colvec(s2muhat)
    r3mat = anp.matmul(kernel_mat, r2mat) + _colvec(dvec) * r2mat
    r4mat = solve_triangular(lfact, r3mat * svec, lower=True)
    # Prediction matrix P
    pmat = r2mat - svec * solve_triangular(lfact, r4mat, lower=True, trans="T")
    result = {
        "features": features,
        "chol_fact": lfact,
        "svec": _flatvec(svec),
        "pmat": pmat,
        "likelihood": issm_likelihood,
    }
    if "wtw" in issm_likelihood:
        # Negative log marginal likelihood
        # Part sigma^{-2} |r_1|^2 - (r_2)^T r_3 + |r_4|^2
        r2vec = _flatvec(r2mat)
        r3vec = _flatvec(r3mat)
        r4vec = _flatvec(r4mat)
        # We use: r_1 = w - V mu_hat, sigma^{-2} V^T V = S^2, and
        # r_2 = sigma^{-2} V^T w - S^2 mu_hat, so that:
        # sigma^{-2} |r_1|^2
        # = sigma^{-2} |w|^2 + |S mu_hat|^2 - 2 sigma^{-2} w^T V mu_hat
        # = sigma^{-2} |w|^2 - |S mu_hat|^2 - 2 * (r_2)^T mu_hat
        # = sigma^{-2} |w|^2 - mu_hat^T (S^2 mu_hat + 2 r_2)
        part2 = 0.5 * (
            anp.sum(issm_likelihood["wtw"]) / noise_variance
            - _inner_product(muhat, s2muhat + 2.0 * r2vec)
            - _inner_product(r2vec, r3vec)
            + _squared_norm(r4vec)
        )
        part1 = anp.sum(anp.log(anp.abs(anp.diag(lfact)))) + 0.5 * num_data * anp.log(
            2 * anp.pi * noise_variance
        )
        result["criterion"] = part1 + part2
        result["r2vec"] = r2vec
        result["r4vec"] = r4vec
    return result


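# Illustrative sketch (editor's addition): the linear algebra pattern used in
# ``posterior_computations`` above, written with plain numpy/scipy for a toy
# problem. The kernel matrix and likelihood vectors are made up; the point is
# the reduction A = I + S (K + diag(d)) S, its Cholesky factor, and the two
# triangular solves that replace inverting A when forming the prediction vector.
def _example_posterior_pattern() -> np.ndarray:
    rng = np.random.RandomState(0)
    n = 4
    x = rng.uniform(size=(n, 1))
    kernel_mat = np.exp(-np.square(x - x.T))  # toy RBF kernel matrix K
    dvec = rng.uniform(0.1, 0.5, size=n)      # stand-in for d
    s2vec = rng.uniform(0.5, 2.0, size=n)     # stand-in for S^2 = vtv / sigma^2
    svec = np.sqrt(s2vec).reshape((-1, 1))
    amat = svec * (kernel_mat * svec.T) + np.diag(s2vec * dvec + 1.0)
    lfact = np.linalg.cholesky(amat)          # lower-triangular L with L L^T = A
    r2 = rng.normal(size=(n, 1))              # stand-in for r_2
    r3 = kernel_mat @ r2 + dvec.reshape((-1, 1)) * r2
    r4 = spl.solve_triangular(lfact, r3 * svec, lower=True)
    pvec = r2 - svec * spl.solve_triangular(lfact, r4, lower=True, trans="T")
    return pvec

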
def predict_posterior_marginals(poster_state: Dict, mean, kernel, test_features):
    """
    These are posterior marginals on the h variable, whereas the full model
    is for f_r = h + g_r (additive). ``posterior_means`` is a (n, F) matrix,
    where F is the number of fantasy samples, or F == 1 without fantasizing.

    :param poster_state: Posterior state
    :param mean: Mean function
    :param kernel: Kernel function
    :param test_features: Feature matrix for test points (not extended)
    :return: posterior_means, posterior_variances
    """
    k_tr_te = kernel(poster_state["features"], test_features)
    posterior_means = anp.matmul(k_tr_te.T, poster_state["pmat"]) + _colvec(
        mean(test_features)
    )
    qmat = solve_triangular(
        poster_state["chol_fact"],
        anp.multiply(_colvec(poster_state["svec"]), k_tr_te),
        lower=True,
    )
    posterior_variances = kernel.diagonal(test_features) - anp.sum(
        anp.square(qmat), axis=0
    )
    return posterior_means, _flatvec(
        anp.maximum(posterior_variances, MIN_POSTERIOR_VARIANCE)
    )


def sample_posterior_marginals(
    poster_state: Dict,
    mean,
    kernel,
    test_features,
    random_state: RandomState,
    num_samples: int = 1,
):
    """
    We sample from posterior marginals on the h variable, see also
    ``predict_posterior_marginals``.
    """
    post_means, post_vars = predict_posterior_marginals(
        poster_state, mean, kernel, test_features
    )
    assert getval(post_means.shape[1]) == 1, (
        "sample_posterior_marginals cannot be used for posterior state "
        + "based on fantasizing"
    )
    n01_mat = random_state.normal(size=(getval(post_means.shape[0]), num_samples))
    post_stds = _colvec(anp.sqrt(post_vars))
    return anp.multiply(post_stds, n01_mat) + _colvec(post_means)


def predict_posterior_marginals_extended(
    poster_state: Dict,
    mean,
    kernel,
    test_features,
    resources: List[int],
    issm_params: Dict,
    r_min: int,
    r_max: int,
):
    """
    These are posterior marginals on f_r = h + g_r variables, where (x, r)
    are zipped from ``test_features``, ``resources``. ``issm_params`` are
    likelihood parameters for the test configs. ``posterior_means`` is a
    (n, F) matrix, where F is the number of fantasy samples, or F == 1
    without fantasizing.

    :param poster_state: Posterior state
    :param mean: Mean function
    :param kernel: Kernel function
    :param test_features: Feature matrix for test points (not extended)
    :param resources: Resource values corresponding to rows of
        ``test_features``
    :param issm_params: See above
    :param r_min:
    :param r_max:
    :return: posterior_means, posterior_variances
    """
    num_test = test_features.shape[0]
    assert len(resources) == num_test, (
        f"test_features.shape[0] = {num_test} != {len(resources)} "
        + "= len(resources)"
    )
    alphas = anp.reshape(issm_params["alpha"], (-1,))
    betas = anp.reshape(issm_params["beta"], (-1,))
    gamma = issm_params["gamma"]
    n = getval(alphas.size)
    assert n == num_test, (
        f"Entries in issm_params must have size {num_test}, but "
        + f"have size {n}"
    )
    # Predictive marginals over h
    h_means, h_variances = predict_posterior_marginals(
        poster_state, mean, kernel, test_features
    )
    if all(r == r_max for r in resources):
        # Frequent special case
        posterior_means = h_means
        posterior_variances = h_variances
    else:
        # Convert into predictive marginals over f_r
        posterior_means = []
        posterior_variances = []
        for h_mean, h_variance, resource, alpha, beta in zip(
            h_means, h_variances, resources, alphas, betas
        ):
            sz = r_max - resource
            h_mean = _rowvec(h_mean)
            if sz == 0:
                posterior_means.append(h_mean)
                posterior_variances.append(h_variance)
            else:
                lrvec = (
                    anp.array([np.log(r_max - t) for t in range(sz)]) * (alpha - 1.0)
                    + beta
                )
                avec = alpha * anp.exp(lrvec)
                a2vec = anp.square(alpha * gamma) * anp.exp(lrvec * 2.0)
                c = anp.sum(avec)
                d = anp.sum(a2vec)
                posterior_means.append(h_mean - c)
                posterior_variances.append(h_variance + d)
        posterior_means = anp.vstack(posterior_means)
        posterior_variances = anp.array(posterior_variances)
    return posterior_means, posterior_variances


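# Illustrative sketch (editor's addition): the mean/variance correction applied
# above when a test point is predicted at a resource r < r_max. For given (made
# up) ISSM parameters, the h-marginal is shifted by c = sum_t a_t and widened
# by d = sum_t gamma^2 a_t^2, which is exactly the scalar case of the loop above.
def _example_extended_marginal(h_mean=0.0, h_variance=1.0):
    alpha, beta, gamma = -1.0, 0.0, 0.5  # hypothetical ISSM parameters
    r_max, resource = 5, 3
    sz = r_max - resource
    lrvec = np.array([np.log(r_max - t) for t in range(sz)]) * (alpha - 1.0) + beta
    avec = alpha * np.exp(lrvec)
    a2vec = np.square(alpha * gamma) * np.exp(lrvec * 2.0)
    f_mean = h_mean - np.sum(avec)
    f_variance = h_variance + np.sum(a2vec)
    return f_mean, f_variance

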
def sample_posterior_joint(
    poster_state: Dict,
    mean,
    kernel,
    feature,
    targets: np.ndarray,
    issm_params: Dict,
    r_min: int,
    r_max: int,
    random_state: RandomState,
    num_samples: int = 1,
) -> Dict:
    """
    Given ``poster_state`` for some data plus one additional configuration
    with data (``feature``, ``targets``, ``issm_params``), draw joint samples
    of the latent variables not fixed by the data, and of the latent target
    values. ``targets`` may be empty, but must not reach all the way to
    ``r_max``. The additional configuration must not be in the dataset used
    to compute ``poster_state``.

    If ``targets`` correspond to resource values range(r_min, r_obs), we
    sample latent target values y_r corresponding to range(r_obs, r_max+1)
    and latent function values f_r corresponding to range(r_obs-1, r_max+1),
    unless r_obs = r_min (i.e. ``targets`` empty), in which case both [y_r]
    and [f_r] range over range(r_min, r_max+1). We return a dict with [f_r]
    under ``f``, [y_r] under ``y``. These are matrices with ``num_samples``
    columns.

    :param poster_state: Posterior state for data
    :param mean: Mean function
    :param kernel: Kernel function
    :param feature: Features for additional config
    :param targets: Target values for additional config
    :param issm_params: Likelihood parameters for additional config
    :param r_min: Smallest resource value
    :param r_max: Largest resource value
    :param random_state: numpy.random.RandomState
    :param num_samples: Number of joint samples to draw (default: 1)
    :return: See above
    """
    num_res = r_max + 1 - r_min
    targets = _colvec(targets, _np=np)
    ydim = targets.size
    t_obs = num_res - ydim
    assert t_obs > 0, f"targets.size = {ydim} must be < {num_res}"
    assert getval(poster_state["pmat"].shape[1]) == 1, (
        "sample_posterior_joint cannot be used for posterior state "
        + "based on fantasizing"
    )
    # ISSM parameters
    alpha = issm_params["alpha"][0]
    alpha_m1 = alpha - 1.0
    beta = issm_params["beta"][0]
    gamma = issm_params["gamma"]
    # Posterior mean and variance of h for additional config
    post_mean, post_variance = predict_posterior_marginals(
        poster_state, mean, kernel, _rowvec(feature, _np=np)
    )
    post_mean = post_mean[0].item()
    post_variance = post_variance[0].item()
    # Compute [a_t], [gamma^2 a_t^2]
    lrvec = np.array([np.log(r_max - t) for t in range(num_res - 1)]) * alpha_m1 + beta
    avec = alpha * np.exp(lrvec)
    a2vec = np.square(alpha * gamma) * np.exp(lrvec * 2.0)
    # Draw the [eps_t] for all samples
    epsmat = random_state.normal(size=(num_res, num_samples))
    # Compute samples [f_t], [y_t], not conditioned on targets
    hvec = (
        random_state.normal(size=(1, num_samples)) * np.sqrt(post_variance) + post_mean
    )
    f_rows = []
    y_rows = []
    fcurr = hvec
    for t in range(num_res - 1):
        eps_row = _rowvec(epsmat[t], _np=np)
        f_rows.append(fcurr)
        y_rows.append(fcurr + eps_row)
        fcurr = fcurr - avec[t] * (eps_row * gamma + 1.0)
    eps_row = _rowvec(epsmat[-1], _np=np)
    f_rows.append(fcurr)
    y_rows.append(fcurr + eps_row)
    if ydim > 0:
        # Condition on targets
        # Prior samples (reverse order t -> r)
        fsamples = np.concatenate(tuple(reversed(f_rows[: (t_obs + 1)])), axis=0)
        # Compute c1 and d1 vectors (same for all samples)
        zeroscal = np.zeros((1,))
        c1vec = np.flip(np.concatenate((zeroscal, np.cumsum(avec[:t_obs])), axis=None))
        d1vec = np.flip(np.concatenate((zeroscal, np.cumsum(a2vec[:t_obs])), axis=None))
        # Assemble targets for conditional means
        ymat = np.concatenate(tuple(reversed(y_rows[t_obs:])), axis=0)
        ycols = np.split(ymat, num_samples, axis=1)
        assert ycols[0].size == ydim  # Sanity check
        # v^T v, w^T v for sampled targets
        onevec = np.ones((num_samples,))
        _issm_params = {"alpha": alpha * onevec, "beta": beta * onevec, "gamma": gamma}
        issm_likelihood = issm_likelihood_slow_computations(
            targets=[_colvec(v, _np=np) for v in ycols],
            issm_params=_issm_params,
            r_min=r_min,
            r_max=r_max,
            skip_c_d=True,
        )
        vtv = issm_likelihood["vtv"]
        wtv = issm_likelihood["wtv"]
        # v^T v, w^T v for observed (last entry)
        issm_likelihood = issm_likelihood_slow_computations(
            targets=[targets], issm_params=issm_params, r_min=r_min, r_max=r_max
        )
        vtv = _rowvec(np.concatenate((vtv, issm_likelihood["vtv"]), axis=None), _np=np)
        wtv = _rowvec(np.concatenate((wtv, issm_likelihood["wtv"]), axis=None), _np=np)
        cscal = issm_likelihood["c"][0]
        dscal = issm_likelihood["d"][0]
        c1vec = _colvec(c1vec, _np=np)
        d1vec = _colvec(d1vec, _np=np)
        c2vec = cscal - c1vec
        d2vec = dscal - d1vec
        # Compute num_samples + 1 conditional mean vectors in one go
        denom = vtv * (post_variance + dscal) + 1.0
        cond_means = (
            (post_mean - c1vec) * (d2vec * vtv + 1.0)
            + (d1vec + post_variance) * (c2vec * vtv + wtv)
        ) / denom
        fsamples = (
            fsamples
            - cond_means[:, :num_samples]
            + _colvec(cond_means[:, num_samples], _np=np)
        )
        # Samples [y_r] from [f_r]
        frmat = fsamples[1:]
        frm1mat = fsamples[:-1]
        arvec = _colvec(
            np.minimum(np.flip(avec[:t_obs]), -MIN_POSTERIOR_VARIANCE), _np=np
        )
        ysamples = ((frmat - frm1mat) / arvec - 1.0) * (1.0 / gamma) + frmat
    else:
        # Nothing to condition on
        fsamples = np.concatenate(tuple(reversed(f_rows)), axis=0)
        ysamples = np.concatenate(tuple(reversed(y_rows)), axis=0)
    return {"f": fsamples, "y": ysamples}


def issm_likelihood_slow_computations(
    targets: List[np.ndarray],
    issm_params: Dict,
    r_min: int,
    r_max: int,
    skip_c_d: bool = False,
) -> Dict:
    """
    Naive implementation of ``issm_likelihood_computations``, which does not
    require precomputations, but is much slower. Here, results are computed
    one datapoint at a time, instead of in bulk.

    This code is used in unit testing, and called from
    ``sample_posterior_joint``.
    """
    num_configs = len(targets)
    num_res = r_max + 1 - r_min
    assert num_configs > 0, "targets must not be empty"
    assert num_res > 0, f"r_min = {r_min} must be <= r_max = {r_max}"
    compute_wtw = targets[0].shape[1] == 1
    alphas = _flatvec(issm_params["alpha"])
    betas = _flatvec(issm_params["beta"])
    gamma = issm_params["gamma"]
    n = getval(alphas.shape[0])
    assert n == num_configs, f"alpha.size = {n} != {num_configs}"
    n = getval(betas.shape[0])
    assert n == num_configs, f"beta.size = {n} != {num_configs}"
    # Outer loop over configurations
    c_lst = []
    d_lst = []
    vtv_lst = []
    wtv_lst = []
    wtw_lst = []
    num_data = 0
    for i, ymat in enumerate(targets):
        alpha = alphas[i]
        alpha_m1 = alpha - 1.0
        beta = betas[i]
        ydim = ymat.shape[0]
        num_data += ydim
        r_obs = r_min + ydim  # Observed in range(r_min, r_obs)
        assert 0 < ydim <= num_res, f"len(y[{i}]) = {ydim}, num_res = {num_res}"
        if not skip_c_d:
            # c_i, d_i
            if ydim < num_res:
                lrvec = (
                    anp.array([np.log(r) for r in range(r_obs, r_max + 1)]) * alpha_m1
                    + beta
                )
                c_scal = alpha * anp.exp(logsumexp(lrvec))
                d_scal = anp.square(gamma * alpha) * anp.exp(logsumexp(lrvec * 2.0))
                c_lst.append(c_scal)
                d_lst.append(d_scal)
            else:
                c_lst.append(0.0)
                d_lst.append(0.0)
        # Inner loop for v_i, w_i
        yprev = ymat[-1].reshape((1, -1))  # y_{j-1} (vector)
        vprev = 1.0  # v_{j-1} (scalar)
        wprev = yprev  # w_{j-1} (row vector)
        vtv = vprev * vprev  # scalar
        wtv = wprev * vprev  # row vector
        if compute_wtw:
            wtw = wprev * wprev  # shape (1, 1)
        for j in range(1, ydim):
            ycurr = ymat[ydim - j - 1].reshape((1, -1))  # y_j (row vector)
            # a_{j-1}
            ascal = alpha * anp.exp(np.log(r_obs - j) * alpha_m1 + beta)
            escal = gamma * ascal + 1.0
            vcurr = escal * vprev  # v_j
            wcurr = escal * wprev + ycurr - yprev + ascal  # w_j
            vtv = vtv + vcurr * vcurr
            wtv = wtv + wcurr * vcurr
            if compute_wtw:
                wtw = wtw + wcurr * wcurr
            yprev = ycurr
            vprev = vcurr
            wprev = wcurr
        vtv_lst.append(vtv)
        wtv_lst.append(wtv)
        if compute_wtw:
            assert wtw.shape == (1, 1)
            wtw_lst.append(wtw.item())
    # Compile results
    result = {
        "num_data": num_data,
        "vtv": anp.array(vtv_lst),
        "wtv": anp.vstack(wtv_lst),
    }
    if compute_wtw:
        result["wtw"] = anp.array(wtw_lst)
    if not skip_c_d:
        result["c"] = anp.array(c_lst)
        result["d"] = anp.array(d_lst)
    return result


def _update_posterior_internal(
    poster_state: Dict, kernel, feature, d_new, s_new, r2_new
) -> Dict:
    assert "r2vec" in poster_state and "r4vec" in poster_state
    features = poster_state["features"]
    r2vec = poster_state["r2vec"]
    r4vec = poster_state["r4vec"]
    svec = poster_state["svec"]
    chol_fact = poster_state["chol_fact"]
    # New row of L: [evec * s_new, l_new]
    feature = _rowvec(feature, _np=np)
    kvec = _flatvec(kernel(features, feature), _np=np)
    evec = _flatvec(
        spl.solve_triangular(chol_fact, _colvec(kvec * svec), lower=True), _np=np
    )
    kscal = _flatvec(kernel.diagonal(feature), _np=np)[0]
    khat_min_esq = kscal + d_new - _squared_norm(evec, _np=np)
    l_new = np.sqrt(khat_min_esq * np.square(s_new) + 1.0)
    # New entry of r_4
    pref = s_new / l_new
    r4_new = pref * (
        _inner_product(kvec, r2vec, _np=np)
        + khat_min_esq * r2_new
        - _inner_product(evec, r4vec, _np=np)
    )
    # Update of p
    p_new = r2_new - pref * r4_new
    # L^{-T} e
    ltinv_evec = _flatvec(
        spl.solve_triangular(chol_fact, _colvec(evec, _np=np), lower=True, trans="T"),
        _np=np,
    )
    return {
        "evec": evec,
        "ltinv_evec": ltinv_evec,
        "l_new": l_new,
        "r4_new": r4_new,
        "p_new": p_new,
    }


def update_posterior_state(
    poster_state: Dict, kernel, feature, d_new, s_new, r2_new
) -> Dict:
    """
    Incremental update of posterior state, given data for one additional
    configuration. The new datapoint gives rise to a new row/column of the
    Cholesky factor. r2vec and svec are extended by ``r2_new``, ``s_new``
    respectively. r4vec and pvec are extended and all entries change. The new
    datapoint is represented by ``feature``, ``d_new``, ``s_new``, ``r2_new``.

    Note: The field ``criterion`` is not updated, but set to np.nan.

    :param poster_state: Posterior state for data
    :param kernel: Kernel function
    :param feature: Features for additional config
    :param d_new: See above
    :param s_new: See above
    :param r2_new: See above
    :return: Updated posterior state
    """
    features = poster_state["features"]
    assert "r2vec" in poster_state and "r4vec" in poster_state
    r2vec = poster_state["r2vec"]
    r4vec = poster_state["r4vec"]
    svec = poster_state["svec"]
    pvec = poster_state["pmat"]
    assert pvec.shape[1] == 1, "Cannot update fantasizing posterior_state"
    pvec = _flatvec(pvec, _np=np)
    chol_fact = poster_state["chol_fact"]
    feature = _rowvec(feature, _np=np)
    # Update computations:
    result = _update_posterior_internal(
        poster_state, kernel, feature, d_new, s_new, r2_new
    )
    # Put together new state variables
    # Note: Criterion not updated, but invalidated
    new_poster_state = {
        "criterion": np.nan,
        "features": np.concatenate((features, feature), axis=0),
        "svec": np.concatenate((svec, np.array([s_new]))),
        "r2vec": np.concatenate((r2vec, np.array([r2_new]))),
    }
    evec = result["evec"]
    ltinv_evec = result["ltinv_evec"]
    l_new = result["l_new"]
    r4_new = result["r4_new"]
    p_new = result["p_new"]
    new_poster_state["r4vec"] = np.concatenate(
        [r4vec + evec * r2_new, np.array([r4_new])]
    )
    new_poster_state["pmat"] = _colvec(
        np.concatenate([pvec - (ltinv_evec * svec) * p_new, np.array([p_new])]), _np=np
    )
    lvec = _rowvec(evec, _np=np) * s_new
    zerovec = _colvec(np.zeros_like(lvec), _np=np)
    lscal = np.array([l_new]).reshape((1, 1))
    new_poster_state["chol_fact"] = np.concatenate(
        (
            np.concatenate((chol_fact, lvec), axis=0),
            np.concatenate((zerovec, lscal), axis=0),
        ),
        axis=1,
    )
    return new_poster_state


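# Illustrative sketch (editor's addition): the one-row Cholesky extension that
# ``update_posterior_state`` relies on, in plain numpy on a toy SPD matrix.
# Given L with L L^T = A for the leading block, a new column k and diagonal
# entry a_nn, the extended factor gains the row [l^T, sqrt(a_nn - |l|^2)] with
# l = L^{-1} k; all previous entries of L stay unchanged.
def _example_extend_cholesky() -> np.ndarray:
    rng = np.random.RandomState(0)
    bmat = rng.normal(size=(4, 4))
    amat = bmat @ bmat.T + 4.0 * np.eye(4)    # toy SPD matrix A
    lfact = np.linalg.cholesky(amat[:3, :3])  # factor of the leading block
    kvec = amat[:3, 3]                        # new column
    lvec = spl.solve_triangular(lfact, kvec, lower=True)
    l_new = np.sqrt(amat[3, 3] - np.sum(np.square(lvec)))
    lfact_ext = np.zeros((4, 4))
    lfact_ext[:3, :3] = lfact
    lfact_ext[3, :3] = lvec
    lfact_ext[3, 3] = l_new
    assert np.allclose(lfact_ext @ lfact_ext.T, amat)  # L_ext factors all of A
    return lfact_ext

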
def update_posterior_pvec(
    poster_state: Dict, kernel, feature, d_new, s_new, r2_new
) -> np.ndarray:
    """
    Part of ``update_posterior_state``, just returns the new p vector.

    :param poster_state: See ``update_posterior_state``
    :param kernel: See ``update_posterior_state``
    :param feature: See ``update_posterior_state``
    :param d_new: See ``update_posterior_state``
    :param s_new: See ``update_posterior_state``
    :param r2_new: See ``update_posterior_state``
    :return: New p vector, as flat vector
    """
    # Update computations:
    result = _update_posterior_internal(
        poster_state, kernel, feature, d_new, s_new, r2_new
    )
    svec = poster_state["svec"]
    pvec = poster_state["pmat"]
    assert pvec.shape[1] == 1, "Cannot update fantasizing posterior_state"
    pvec = _flatvec(pvec, _np=np)
    ltinv_evec = result["ltinv_evec"]
    p_new = result["p_new"]
    return np.concatenate((pvec - (ltinv_evec * svec) * p_new, np.array([p_new])))