# Source code for syne_tune.optimizer.schedulers.neuralbands.neuralband_supplement

# Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.
import logging
from typing import Dict, Optional
import numpy as np
import torch

from syne_tune.backend.time_keeper import RealTimeKeeper
from syne_tune.optimizer.scheduler import SchedulerDecision, TrialSuggestion
from syne_tune.backend.trial_status import Trial
from syne_tune.optimizer.schedulers.searchers.searcher import BaseSearcher
from syne_tune.optimizer.schedulers.searchers.searcher_factory import searcher_factory
from syne_tune.config_space import cast_config_values
from syne_tune.optimizer.schedulers.searchers.utils.hp_ranges_factory import (
    make_hyperparameter_ranges,
)
from syne_tune.optimizer.schedulers.neuralbands.networks import Exploitation
from syne_tune.optimizer.schedulers.hyperband import HyperbandScheduler


logger = logging.getLogger(__name__)


def is_continue_decision(trial_decision: str) -> bool:
    """Return ``True`` iff ``trial_decision`` signals that the trial continues."""
    return SchedulerDecision.CONTINUE == trial_decision
class NeuralbandSchedulerBase(HyperbandScheduler):
    """
    Shared base scheduler for NeuralBand: a :class:`HyperbandScheduler` that
    additionally maintains a neural surrogate (:class:`Exploitation`) scoring
    pairs of (encoded configuration, normalized budget).

    :param config_space: Configuration space
    :param step_size: Train the network once every ``step_size`` observations
    :param max_while_loop: Maximal number of times a configuration may be
        drawn from the configuration space per suggestion
    :param kwargs: Passed on to :class:`HyperbandScheduler`
    """

    def __init__(
        self, config_space: Dict, step_size: int, max_while_loop: int, **kwargs
    ):
        super().__init__(config_space, **kwargs)
        # Kept so that ``_initialize_searcher_new`` can rebuild the searcher
        self.kwargs = kwargs
        # Encoder mapping configurations to feature vectors
        self.hp_ranges = make_hyperparameter_ranges(config_space=self.config_space)
        self.input_dim = self.hp_ranges.ndarray_size
        # Surrogate network over (config encoding, budget) pairs
        self.net = Exploitation(dim=self.input_dim)
        # NOTE(review): attribute name keeps the original "currnet" typo —
        # ``on_trial_result`` and subclasses read it under this exact name
        self.currnet_best_score = 1.0
        self.train_step_size = step_size
        self.max_while_loop = max_while_loop

    def _initialize_searcher_new(self):
        """Build a fresh searcher from the stored constructor kwargs."""
        searcher = self.kwargs["searcher"]
        search_options = self.kwargs.get("search_options")
        search_options = dict() if search_options is None else search_options.copy()
        search_options.update(
            {
                "config_space": self.config_space.copy(),
                "metric": self.metric,
                "points_to_evaluate": self.kwargs.get("points_to_evaluate"),
                "mode": self.kwargs["mode"],
                "random_seed_generator": self.random_seed_generator,
            }
        )
        if self.max_t is not None:
            search_options["max_epochs"] = self.max_t
        # Subclasses may extend ``search_options``
        search_options = self._extend_search_options(search_options)
        # Adjoin scheduler info to search_options, if not already done by
        # subclass (via ``_extend_search_options``)
        if "scheduler" not in search_options:
            search_options["scheduler"] = "fifo"
        self.searcher: BaseSearcher = searcher_factory(searcher, **search_options)
        self._searcher_initialized = True
[docs] def on_trial_result(self, trial: Trial, result: Dict) -> str: self._check_result(result) trial_id = str(trial.trial_id) debug_log = self.searcher.debug_log trial_decision = SchedulerDecision.CONTINUE if len(result) == 0: # An empty dict should just be skipped if debug_log is not None: logger.info( f"trial_id {trial_id}: Skipping empty dict received " "from reporter" ) else: # Time since start of experiment time_since_start = self._elapsed_time() do_update = False config = self._preprocess_config(trial.config) cost_and_promotion = ( self._cost_attr is not None and self._cost_attr in result and self.does_pause_resume() ) if cost_and_promotion: # Trial may have paused/resumed before, so need to add cost # offset from these cost_offset = self._cost_offset.get(trial_id, 0) result[self._total_cost_attr()] = result[self._cost_attr] + cost_offset if trial_id not in self._active_trials: # Trial not in self._active_trials anymore, so must have been # stopped trial_decision = SchedulerDecision.STOP logger.warning( f"trial_id {trial_id}: Was STOPPED, but receives another " f"report {result}\nThis report is ignored" ) elif not self._active_trials[trial_id]["running"]: # Trial must have been paused before trial_decision = SchedulerDecision.PAUSE logger.warning( f"trial_id {trial_id}: Was PAUSED, but receives another " f"report {result}\nThis report is ignored" ) else: # update neural network config = trial.config config_encoding = self.hp_ranges.to_ndarray(config) if "epoch" in result: hp_budget = float(result["epoch"] / self.max_t) else: hp_budget = float(result["hp_epoch"] / self.max_t) test_loss = result[self.metric] # update current best score if self.mode == "min": if test_loss < self.currnet_best_score: self.currnet_best_score = test_loss else: if test_loss > self.currnet_best_score: self.currnet_best_score = test_loss self.net.add_data((config_encoding, hp_budget), test_loss) # train network if self.net.data_size % self.train_step_size == 0: self.net.train() 
task_info = self.terminator.on_task_report(trial_id, result) task_continues = task_info["task_continues"] milestone_reached = task_info["milestone_reached"] if cost_and_promotion: if milestone_reached: # Trial reached milestone and will pause there: Update # cost offset if self._cost_attr is not None: self._cost_offset[trial_id] = result[ self._total_cost_attr() ] elif task_info.get("ignore_data", False): if self._cost_offset[trial_id] > 0: logger.info( f"trial_id {trial_id}: Resumed trial seems to have been " + "started from scratch (no checkpointing?), so we erase " + "the cost offset." ) self._cost_offset[trial_id] = 0 do_update = self._update_searcher(trial_id, config, result, task_info) resource = int(result[self._resource_attr]) self._active_trials[trial_id].update( { "time_stamp": time_since_start, "reported_result": { self.metric: result[self.metric], self._resource_attr: resource, }, "keep_case": milestone_reached, } ) if do_update: largest_update_resource = self._active_trials[trial_id][ "largest_update_resource" ] if largest_update_resource is None: largest_update_resource = resource - 1 assert largest_update_resource <= resource, ( f"Internal error (trial_id {trial_id}): " + f"on_trial_result called with resource = {resource}, " + f"but largest_update_resource = {largest_update_resource}" ) if resource == largest_update_resource: do_update = False # Do not update again else: self._active_trials[trial_id][ "largest_update_resource" ] = resource if not task_continues: if (not self.does_pause_resume()) or resource >= self.max_t: trial_decision = SchedulerDecision.STOP act_str = "Terminating" else: trial_decision = SchedulerDecision.PAUSE act_str = "Pausing" self._cleanup_trial(trial_id) if debug_log is not None: if not task_continues: logger.info( f"trial_id {trial_id}: {act_str} evaluation " f"at {resource}" ) elif milestone_reached: msg = f"trial_id {trial_id}: Reaches {resource}, continues" next_milestone = task_info.get("next_milestone") if 
next_milestone is not None: msg += f" to {next_milestone}" logger.info(msg) self.searcher.on_trial_result( trial_id, config, result=result, update=do_update ) # Extra info in debug mode log_msg = f"trial_id {trial_id} (metric = {result[self.metric]:.3f}" for k, is_float in ((self._resource_attr, False), ("elapsed_time", True)): if k in result: if is_float: log_msg += f", {k} = {result[k]:.2f}" else: log_msg += f", {k} = {result[k]}" log_msg += f"): decision = {trial_decision}" logger.debug(log_msg) return trial_decision
class NeuralbandEGreedyScheduler(NeuralbandSchedulerBase):
    """
    Combines the epsilon-greedy strategy with NeuralBand: with probability
    ``epsilon`` the searcher's suggestion is taken as-is; otherwise
    configurations are drawn repeatedly and the one with the best surrogate
    score is selected.

    :param config_space: Configuration space
    :param epsilon: Exploration probability
    :param step_size: See :class:`NeuralbandSchedulerBase`
    :param max_while_loop: See :class:`NeuralbandSchedulerBase`
    :param kwargs: Passed on to the base scheduler
    """

    def __init__(
        self,
        config_space: Dict,
        epsilon: float = 0.1,
        step_size: int = 30,
        max_while_loop: int = 100,
        **kwargs,
    ):
        super().__init__(config_space, step_size, max_while_loop, **kwargs)
        self.epsilon = epsilon

    def _suggest(self, trial_id: int) -> Optional[TrialSuggestion]:
        self._initialize_searcher()
        # If no time keeper was provided at construction, we use a local
        # one which is started here
        if self.time_keeper is None:
            self.time_keeper = RealTimeKeeper()
            self.time_keeper.start_of_time()
        # For pause/resume schedulers: Can a paused trial be promoted?
        promote_trial_id, extra_kwargs = self._promote_trial()
        if promote_trial_id is not None:
            return TrialSuggestion.resume_suggestion(
                trial_id=int(promote_trial_id), config=extra_kwargs
            )
        # Ask searcher for config of new trial to start
        extra_kwargs["elapsed_time"] = self._elapsed_time()
        trial_id = str(trial_id)
        # Epsilon-greedy selection criterion
        initial_budget = self.net.average_b
        num_draws = 0
        scored_candidates = []
        if np.random.binomial(1, self.epsilon):
            # Explore: take the searcher's suggestion directly
            config = self.searcher.get_config(**extra_kwargs, trial_id=trial_id)
        else:
            # Exploit: draw candidates, keep the one with the best net score
            while True:
                config = self.searcher.get_config(**extra_kwargs, trial_id=trial_id)
                if config is None:
                    # Searcher exhausted: rebuild it and take one suggestion
                    self._searcher_initialized = False
                    self._initialize_searcher_new()
                    config = self.searcher.get_config(
                        **extra_kwargs, trial_id=trial_id
                    )
                    break
                encoding = self.hp_ranges.to_ndarray(config)
                score = self.net.predict((encoding, initial_budget)).item()
                scored_candidates.append((config, score))
                num_draws += 1
                if num_draws > self.max_while_loop:
                    # mode "min": smallest predicted score wins; else largest
                    descending = self.mode != "min"
                    scored_candidates.sort(key=lambda pair: pair[1], reverse=descending)
                    config = scored_candidates[0][0]
                    break
        if config is not None:
            config = cast_config_values(config, self.config_space)
            config = self._on_config_suggest(config, trial_id, **extra_kwargs)
            config = TrialSuggestion.start_suggestion(config)
        return config
class NeuralbandTSScheduler(NeuralbandSchedulerBase):
    """
    Combines a Thompson Sampling strategy with NeuralBand: each candidate is
    scored by a sample from a normal distribution whose mean is the network
    prediction and whose std is a gradient-based confidence width, following:

        | ZHANG, Weitong, et al.
        | "Neural Thompson Sampling."
        | International Conference on Learning Representations. 2020.

    :param config_space: Configuration space
    :param lamdba: Regularization term of gradient vector (parameter keeps
        the original "lamdba" spelling for API compatibility)
    :param nu: Controls aggressiveness of exploration
    :param step_size: See :class:`NeuralbandSchedulerBase`
    :param max_while_loop: See :class:`NeuralbandSchedulerBase`
    :param kwargs: Passed on to the base scheduler
    """

    def __init__(
        self,
        config_space: Dict,
        lamdba: float = 0.1,
        nu: float = 0.01,
        step_size: int = 30,
        max_while_loop: int = 100,
        **kwargs,
    ):
        super().__init__(config_space, step_size, max_while_loop, **kwargs)
        self.lamdba = lamdba
        self.nu = nu
        # Accumulator over squared gradient entries, one per network weight
        self.U = self.lamdba * torch.ones((self.net.total_param,))

    def _suggest(self, trial_id: int) -> Optional[TrialSuggestion]:
        self._initialize_searcher()
        # If no time keeper was provided at construction, we use a local
        # one which is started here
        if self.time_keeper is None:
            self.time_keeper = RealTimeKeeper()
            self.time_keeper.start_of_time()
        # For pause/resume schedulers: Can a paused trial be promoted?
        promote_trial_id, extra_kwargs = self._promote_trial()
        if promote_trial_id is not None:
            return TrialSuggestion.resume_suggestion(
                trial_id=int(promote_trial_id), config=extra_kwargs
            )
        # Ask searcher for config of new trial to start
        extra_kwargs["elapsed_time"] = self._elapsed_time()
        trial_id = str(trial_id)
        # Thompson-sampling selection criterion
        initial_budget = self.net.average_b
        num_draws = 0
        scored_candidates = []
        while True:
            config = self.searcher.get_config(**extra_kwargs, trial_id=trial_id)
            if config is None:
                # Searcher exhausted: rebuild it and take one suggestion
                self._searcher_initialized = False
                self._initialize_searcher_new()
                config = self.searcher.get_config(**extra_kwargs, trial_id=trial_id)
                break
            encoding = self.hp_ranges.to_ndarray(config)
            predict_value = self.net.predict((encoding, initial_budget))
            self.net.func.zero_grad()
            predict_value.backward(retain_graph=True)
            # Gradient of the prediction w.r.t. all network weights
            grad_vec = torch.cat(
                [p.grad.flatten().detach() for p in self.net.func.parameters()]
            )
            cb = (
                torch.sqrt(
                    torch.sum(self.lamdba * grad_vec * grad_vec / self.U)
                ).item()
                * self.nu
            )
            mean_value = torch.tensor(predict_value.item())
            sampled_score = torch.normal(mean=mean_value, std=cb).item()
            scored_candidates.append((config, sampled_score))
            if self.mode == "min":
                if num_draws > self.max_while_loop:
                    scored_candidates.sort(key=lambda pair: pair[1])
                    config = scored_candidates[0][0]
                    break
            else:
                # NOTE(review): threshold is max_while_loop / 20 here, unlike
                # the "min" branch above — preserved as found; confirm intent
                if num_draws > (self.max_while_loop / 20):
                    scored_candidates.sort(key=lambda pair: pair[1], reverse=True)
                    config = scored_candidates[0][0]
                    break
            num_draws += 1
        if config is not None:
            config = cast_config_values(config, self.config_space)
            config = self._on_config_suggest(config, trial_id, **extra_kwargs)
            config = TrialSuggestion.start_suggestion(config)
        return config
class NeuralbandUCBScheduler(NeuralbandSchedulerBase):
    """
    Combines an Upper Confidence Bound criterion with NeuralBand: each
    candidate is scored by the network prediction plus a gradient-based
    confidence width, following:

        | Zhou, Dongruo, Lihong Li, and Quanquan Gu.
        | "Neural contextual bandits with ucb-based exploration."
        | International Conference on Machine Learning. PMLR, 2020.

    :param config_space: Configuration space
    :param lamdba: Regularization term of gradient vector (parameter keeps
        the original "lamdba" spelling for API compatibility)
    :param nu: Controls aggressiveness of exploration
    :param step_size: See :class:`NeuralbandSchedulerBase`
    :param max_while_loop: See :class:`NeuralbandSchedulerBase`
    :param kwargs: Passed on to the base scheduler
    """

    def __init__(
        self,
        config_space: Dict,
        lamdba: float = 0.01,
        nu: float = 0.01,
        step_size: int = 30,
        max_while_loop: int = 100,
        **kwargs,
    ):
        super().__init__(config_space, step_size, max_while_loop, **kwargs)
        self.lamdba = lamdba
        self.nu = nu
        # Accumulator over squared gradient entries, one per network weight
        self.U = self.lamdba * torch.ones((self.net.total_param,))

    def _suggest(self, trial_id: int) -> Optional[TrialSuggestion]:
        self._initialize_searcher()
        # If no time keeper was provided at construction, we use a local
        # one which is started here
        if self.time_keeper is None:
            self.time_keeper = RealTimeKeeper()
            self.time_keeper.start_of_time()
        # For pause/resume schedulers: Can a paused trial be promoted?
        promote_trial_id, extra_kwargs = self._promote_trial()
        if promote_trial_id is not None:
            return TrialSuggestion.resume_suggestion(
                trial_id=int(promote_trial_id), config=extra_kwargs
            )
        # Ask searcher for config of new trial to start
        extra_kwargs["elapsed_time"] = self._elapsed_time()
        trial_id = str(trial_id)
        # UCB selection criterion
        initial_budget = self.net.average_b
        num_draws = 0
        scored_candidates = []
        while True:
            config = self.searcher.get_config(**extra_kwargs, trial_id=trial_id)
            if config is None:
                # Searcher exhausted: rebuild it and take one suggestion
                self._searcher_initialized = False
                self._initialize_searcher_new()
                config = self.searcher.get_config(**extra_kwargs, trial_id=trial_id)
                break
            encoding = self.hp_ranges.to_ndarray(config)
            predict_value = self.net.predict((encoding, initial_budget))
            self.net.func.zero_grad()
            predict_value.backward(retain_graph=True)
            # Gradient of the prediction w.r.t. all network weights
            grad_vec = torch.cat(
                [p.grad.flatten().detach() for p in self.net.func.parameters()]
            )
            cb = (
                torch.sqrt(
                    torch.sum(self.lamdba * grad_vec * grad_vec / self.U)
                ).item()
                * self.nu
            )
            ucb_score = predict_value.item() + cb
            scored_candidates.append((config, ucb_score))
            num_draws += 1
            if num_draws > self.max_while_loop:
                # mode "min": smallest UCB score wins; else largest
                descending = self.mode != "min"
                scored_candidates.sort(key=lambda pair: pair[1], reverse=descending)
                config = scored_candidates[0][0]
                break
        if config is not None:
            config = cast_config_values(config, self.config_space)
            config = self._on_config_suggest(config, trial_id, **extra_kwargs)
            config = TrialSuggestion.start_suggestion(config)
        return config