Launch HPO Experiment Locally
import logging
from pathlib import Path
from syne_tune import Tuner, StoppingCriterion
from syne_tune.backend import LocalBackend
from syne_tune.config_space import randint
from syne_tune.optimizer.baselines import (
RandomSearch,
ASHA,
)
from examples.training_scripts.height_example.train_height import (
RESOURCE_ATTR,
METRIC_ATTR,
METRIC_MODE,
MAX_RESOURCE_ATTR,
)
from syne_tune.try_import import try_import_gpsearchers_message
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
random_seed = 31415927
max_epochs = 100
n_workers = 4
config_space = {
MAX_RESOURCE_ATTR: max_epochs,
"width": randint(0, 20),
"height": randint(-100, 100),
}
entry_point = (
Path(__file__).parent
/ "training_scripts"
/ "height_example"
/ "train_height.py"
)
scheduler_kwargs = {
"config_space": config_space,
"metric": METRIC_ATTR,
"mode": METRIC_MODE,
"max_resource_attr": MAX_RESOURCE_ATTR,
}
schedulers = [
RandomSearch(**scheduler_kwargs),
ASHA(**scheduler_kwargs, resource_attr=RESOURCE_ATTR),
]
try:
from syne_tune.optimizer.baselines import BayesianOptimization
        # Example of setting additional keyword arguments
schedulers.append(
BayesianOptimization(
**scheduler_kwargs,
search_options={"num_init_random": n_workers + 2},
)
)
from syne_tune.optimizer.baselines import MOBSTER
        schedulers.append(MOBSTER(**scheduler_kwargs, resource_attr=RESOURCE_ATTR))
except Exception:
logging.info(try_import_gpsearchers_message())
for scheduler in schedulers:
logging.info(f"\n*** running scheduler {scheduler} ***\n")
trial_backend = LocalBackend(entry_point=str(entry_point))
stop_criterion = StoppingCriterion(
max_wallclock_time=20, min_metric_value={METRIC_ATTR: -6.0}
)
tuner = Tuner(
trial_backend=trial_backend,
scheduler=scheduler,
stop_criterion=stop_criterion,
n_workers=n_workers,
)
tuner.run()
Along with several of the examples below, this launcher script uses the following train_height.py training script:
"""
Example similar to Raytune, https://github.com/ray-project/ray/blob/master/python/ray/tune/examples/skopt_example.py
"""
import logging
import time
from typing import Optional, Dict, Any
from syne_tune import Reporter
from argparse import ArgumentParser
from syne_tune.config_space import randint
report = Reporter()
RESOURCE_ATTR = "epoch"
METRIC_ATTR = "mean_loss"
METRIC_MODE = "min"
MAX_RESOURCE_ATTR = "steps"
def train_height(step: int, width: float, height: float) -> float:
return 100 / (10 + width * step) + 0.1 * height
def height_config_space(
max_steps: int, sleep_time: Optional[float] = None
) -> Dict[str, Any]:
kwargs = {"sleep_time": sleep_time} if sleep_time is not None else dict()
return {
MAX_RESOURCE_ATTR: max_steps,
"width": randint(0, 20),
"height": randint(-100, 100),
**kwargs,
}
if __name__ == "__main__":
root = logging.getLogger()
root.setLevel(logging.INFO)
parser = ArgumentParser()
parser.add_argument("--" + MAX_RESOURCE_ATTR, type=int)
parser.add_argument("--width", type=float)
parser.add_argument("--height", type=float)
parser.add_argument("--sleep_time", type=float, default=0.1)
args, _ = parser.parse_known_args()
width = args.width
height = args.height
num_steps = getattr(args, MAX_RESOURCE_ATTR)
for step in range(num_steps):
# Sleep first, since results are returned at end of "epoch"
time.sleep(args.sleep_time)
# Feed the score back to Syne Tune.
dummy_score = train_height(step, width, height)
report(
**{
"step": step,
METRIC_ATTR: dummy_score,
RESOURCE_ATTR: step + 1,
}
)
Fine-Tuning Hugging Face Model for Sentiment Classification
"""
Example for how to fine-tune a DistilBERT model on the IMDB sentiment classification task using the Hugging Face SageMaker Framework.
"""
import logging
from pathlib import Path
from sagemaker.huggingface import HuggingFace
import syne_tune
from benchmarking.benchmark_definitions import distilbert_imdb_benchmark
from syne_tune import Tuner, StoppingCriterion
from syne_tune.backend import SageMakerBackend
from syne_tune.backend.sagemaker_backend.sagemaker_utils import (
get_execution_role,
default_sagemaker_session,
)
from syne_tune.optimizer.baselines import RandomSearch
from syne_tune.remote.constants import (
HUGGINGFACE_LATEST_FRAMEWORK_VERSION,
HUGGINGFACE_LATEST_PYTORCH_VERSION,
HUGGINGFACE_LATEST_TRANSFORMERS_VERSION,
HUGGINGFACE_LATEST_PY_VERSION,
)
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
# We pick the DistilBERT on IMDB benchmark
# The 'benchmark' dict contains arguments needed by scheduler and
# searcher (e.g., 'mode', 'metric'), along with suggested default values
# for other arguments (which you are free to override)
random_seed = 31415927
n_workers = 4
benchmark = distilbert_imdb_benchmark()
mode = benchmark.mode
metric = benchmark.metric
config_space = benchmark.config_space
# Define Hugging Face SageMaker estimator
root = Path(syne_tune.__path__[0]).parent
estimator = HuggingFace(
framework_version=HUGGINGFACE_LATEST_FRAMEWORK_VERSION,
transformers_version=HUGGINGFACE_LATEST_TRANSFORMERS_VERSION,
pytorch_version=HUGGINGFACE_LATEST_PYTORCH_VERSION,
py_version=HUGGINGFACE_LATEST_PY_VERSION,
entry_point=str(benchmark.script),
base_job_name="hpo-transformer",
instance_type=benchmark.instance_type,
instance_count=1,
role=get_execution_role(),
dependencies=[root / "benchmarking"],
sagemaker_session=default_sagemaker_session(),
)
# SageMaker backend
trial_backend = SageMakerBackend(
sm_estimator=estimator,
metrics_names=[metric],
)
# Random search without stopping
scheduler = RandomSearch(
config_space, mode=mode, metric=metric, random_seed=random_seed
)
stop_criterion = StoppingCriterion(
max_wallclock_time=3000
) # wall clock time can be increased to 1 hour for more performance
tuner = Tuner(
trial_backend=trial_backend,
scheduler=scheduler,
stop_criterion=stop_criterion,
n_workers=n_workers,
)
tuner.run()
Requirements:
- Running this script requires Syne Tune to be installed from source.
- Runs on four ml.g4dn.xlarge instances.
In this example, we use the SageMaker backend together with the SageMaker Hugging Face framework in order to fine-tune a DistilBERT model on the IMDB sentiment classification task. This task is one of our built-in benchmarks. For other ways to run this benchmark on different backends or remotely, consult this tutorial.
A more advanced example for fine-tuning Hugging Face transformers is given here.
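As the code comments note, the benchmark provides suggested default values which you are free to override. A minimal sketch of such an override, assuming the config space contains a learning_rate entry (a hypothetical key):

from syne_tune.config_space import loguniform

# Hypothetical override: widen one hyperparameter range of the
# benchmark's default config space before handing it to the scheduler
config_space = dict(
    benchmark.config_space,
    learning_rate=loguniform(1e-6, 1e-3),
)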
Launch HPO Experiment with Python Backend
"""
An example showing how to launch the tuning of a Python function ``train_height``.
"""
from syne_tune import Tuner, StoppingCriterion
from syne_tune.backend import PythonBackend
from syne_tune.config_space import randint
from syne_tune.optimizer.baselines import ASHA
def train_height(steps: int, width: float, height: float):
"""
    The function to be tuned. Note that all imports must be done inside the
    function (no global variables are allowed); more details on the
    requirements of tuned functions can be found in
    :class:`~syne_tune.backend.PythonBackend`.
"""
import logging
from syne_tune import Reporter
import time
root = logging.getLogger()
root.setLevel(logging.INFO)
reporter = Reporter()
for step in range(steps):
dummy_score = (0.1 + width * step / 100) ** (-1) + height * 0.1
# Feed the score back to Syne Tune.
reporter(step=step, mean_loss=dummy_score, epoch=step + 1)
time.sleep(0.1)
if __name__ == "__main__":
import logging
root = logging.getLogger()
root.setLevel(logging.INFO)
max_steps = 100
n_workers = 4
metric = "mean_loss"
mode = "min"
max_resource_attr = "steps"
config_space = {
max_resource_attr: max_steps,
"width": randint(0, 20),
"height": randint(-100, 100),
}
scheduler = ASHA(
config_space,
metric=metric,
max_resource_attr=max_resource_attr,
resource_attr="epoch",
mode=mode,
)
trial_backend = PythonBackend(tune_function=train_height, config_space=config_space)
stop_criterion = StoppingCriterion(
max_wallclock_time=10, min_metric_value={metric: -6.0}
)
tuner = Tuner(
trial_backend=trial_backend,
scheduler=scheduler,
stop_criterion=stop_criterion,
n_workers=n_workers,
)
tuner.run()
The Python backend does not need a separate training script.
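Since the tuned function is serialized together with its config space, no training script needs to be shipped, and results can be inspected afterwards just as with any other backend. A minimal sketch using load_experiment, the same helper used in the reinforcement learning example below:

from syne_tune.experiments import load_experiment

# After tuner.run() has finished, results can be loaded by tuner name
tuning_experiment = load_experiment(tuner.name)
print(f"best configuration found: {tuning_experiment.best_config()}")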
Population-Based Training (PBT)
import logging
from pathlib import Path
from syne_tune.backend import LocalBackend
from syne_tune.optimizer.schedulers import PopulationBasedTraining
from syne_tune import Tuner
from syne_tune.config_space import loguniform
from syne_tune import StoppingCriterion
if __name__ == "__main__":
logging.getLogger().setLevel(logging.DEBUG)
max_trials = 100
config_space = {
"lr": loguniform(0.0001, 0.02),
}
entry_point = (
Path(__file__).parent / "training_scripts" / "pbt_example" / "pbt_example.py"
)
trial_backend = LocalBackend(entry_point=str(entry_point))
mode = "max"
metric = "mean_accuracy"
time_attr = "training_iteration"
population_size = 2
pbt = PopulationBasedTraining(
config_space=config_space,
metric=metric,
resource_attr=time_attr,
population_size=population_size,
mode=mode,
max_t=200,
perturbation_interval=1,
)
local_tuner = Tuner(
trial_backend=trial_backend,
scheduler=pbt,
stop_criterion=StoppingCriterion(max_wallclock_time=20),
n_workers=population_size,
results_update_interval=1,
)
local_tuner.run()
This launcher script uses the following pbt_example.py training script:
import numpy as np
import argparse
import logging
import json
import os
import random
import time
from syne_tune import Reporter
from syne_tune.constants import ST_CHECKPOINT_DIR
report = Reporter()
def pbt_function(config):
"""Toy PBT problem for benchmarking adaptive learning rate.
The goal is to optimize this trainable's accuracy. The accuracy increases
fastest at the optimal lr, which is a function of the current accuracy.
The optimal lr schedule for this problem is the triangle wave as follows.
Note that many lr schedules for real models also follow this shape:
best lr
^
| /\
| / \
| / \
| / \
------------> accuracy
In this problem, using PBT with a population of 2-4 is sufficient to
roughly approximate this lr schedule. Higher population sizes will yield
faster convergence. Training will not converge without PBT.
"""
lr = config["lr"]
checkpoint_dir = config.get(ST_CHECKPOINT_DIR)
accuracy = 0.0 # end = 1000
start = 1
if checkpoint_dir and os.path.isdir(checkpoint_dir):
with open(os.path.join(checkpoint_dir, "checkpoint.json"), "r") as f:
state = json.loads(f.read())
accuracy = state["acc"]
start = state["step"]
midpoint = 100 # lr starts decreasing after acc > midpoint
q_tolerance = 3 # penalize exceeding lr by more than this multiple
noise_level = 2 # add gaussian noise to the acc increase
# triangle wave:
# - start at 0.001 @ t=0,
# - peak at 0.01 @ t=midpoint,
# - end at 0.001 @ t=midpoint * 2,
for step in range(start, 200):
if accuracy < midpoint:
optimal_lr = 0.01 * accuracy / midpoint
else:
optimal_lr = 0.01 - 0.01 * (accuracy - midpoint) / midpoint
optimal_lr = min(0.01, max(0.001, optimal_lr))
# Compute accuracy increase
q_err = max(lr, optimal_lr) / min(lr, optimal_lr)
if q_err < q_tolerance:
accuracy += (1.0 / q_err) * random.random()
elif lr > optimal_lr:
accuracy -= (q_err - q_tolerance) * random.random()
accuracy += noise_level * np.random.normal()
accuracy = max(0, accuracy)
# Save checkpoint
if checkpoint_dir is not None:
os.makedirs(os.path.join(checkpoint_dir), exist_ok=True)
path = os.path.join(checkpoint_dir, "checkpoint.json")
with open(path, "w") as f:
f.write(json.dumps({"acc": accuracy, "step": step}))
report(
mean_accuracy=accuracy,
cur_lr=lr,
training_iteration=step,
optimal_lr=optimal_lr, # for debugging
q_err=q_err, # for debugging
# done=accuracy > midpoint * 2 # this stops the training process
)
time.sleep(2)
if __name__ == "__main__":
root = logging.getLogger()
root.setLevel(logging.INFO)
parser = argparse.ArgumentParser()
parser.add_argument("--lr", type=float)
parser.add_argument(f"--{ST_CHECKPOINT_DIR}", type=str)
args, _ = parser.parse_known_args()
params = vars(args)
pbt_function(params)
For this toy example, PBT is run with a population size of 2, so only two parallel workers are needed. In order to use PBT competitively, choose the SageMaker backend. Note that PBT requires your training script to support checkpointing.
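The checkpointing pattern is the one used in pbt_function above. Stripped to its core, a script only needs to restore its state from ST_CHECKPOINT_DIR at start-up and write it back after every step; a minimal sketch, assuming a JSON-serializable state dict:

import json
import os

from syne_tune.constants import ST_CHECKPOINT_DIR

def load_state(config, default):
    # Resume from the checkpoint directory Syne Tune passes to the script
    checkpoint_dir = config.get(ST_CHECKPOINT_DIR)
    if checkpoint_dir:
        path = os.path.join(checkpoint_dir, "checkpoint.json")
        if os.path.exists(path):
            with open(path, "r") as f:
                return json.load(f)
    return default

def save_state(config, state):
    # Overwrite the checkpoint after each step, so PBT can copy or resume it
    checkpoint_dir = config.get(ST_CHECKPOINT_DIR)
    if checkpoint_dir:
        os.makedirs(checkpoint_dir, exist_ok=True)
        with open(os.path.join(checkpoint_dir, "checkpoint.json"), "w") as f:
            json.dump(state, f)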
Visualize Tuning Progress with Tensorboard
"""
Example showing how to visualize the HPO process of Syne Tune with Tensorboard.
Results will be stored in ~/syne-tune/{tuner_name}/tensorboard_output. To start
tensorboard, execute in a separate shell:
.. code:: bash
    tensorboard --logdir ~/syne-tune/{tuner_name}/tensorboard_output
Open the displayed URL in the browser.
To use this functionality you need to install tensorboardX:
.. code:: bash
pip install tensorboardX
"""
import logging
from pathlib import Path
from syne_tune.backend import LocalBackend
from syne_tune.optimizer.baselines import RandomSearch
from syne_tune import Tuner, StoppingCriterion
from syne_tune.config_space import randint
from syne_tune.callbacks.tensorboard_callback import TensorboardCallback
from syne_tune.results_callback import StoreResultsCallback
from examples.training_scripts.height_example.train_height import (
METRIC_ATTR,
METRIC_MODE,
MAX_RESOURCE_ATTR,
)
if __name__ == "__main__":
logging.getLogger().setLevel(logging.DEBUG)
random_seed = 31415927
max_steps = 100
n_workers = 4
config_space = {
MAX_RESOURCE_ATTR: max_steps,
"width": randint(0, 20),
"height": randint(-100, 100),
}
entry_point = str(
Path(__file__).parent
/ "training_scripts"
/ "height_example"
/ "train_height.py"
)
trial_backend = LocalBackend(entry_point=entry_point)
# Random search without stopping
scheduler = RandomSearch(
config_space, mode=METRIC_MODE, metric=METRIC_ATTR, random_seed=random_seed
)
stop_criterion = StoppingCriterion(max_wallclock_time=20)
tuner = Tuner(
trial_backend=trial_backend,
scheduler=scheduler,
n_workers=n_workers,
stop_criterion=stop_criterion,
results_update_interval=5,
        # Adding the TensorboardCallback overrides the default callback list,
        # which consists of StoreResultsCallback. To also store results to
        # disk, we include StoreResultsCallback here as well.
callbacks=[
TensorboardCallback(target_metric=METRIC_ATTR, mode=METRIC_MODE),
StoreResultsCallback(),
],
tuner_name="tensorboardx-demo",
metadata={"description": "just an example"},
)
tuner.run()
Requirements:
- Needs tensorboardX to be installed: pip install tensorboardX
- Makes use of train_height.py.
Tensorboard visualization works by using a callback, for example TensorboardCallback, which is passed to the Tuner. In order to visualize other metrics, you may have to modify this callback.
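As a minimal sketch of such a modification, one might subclass TensorboardCallback to log a derived quantity next to the reported metrics. This assumes the on_trial_result signature of Syne Tune tuner callbacks; the derived metric here is hypothetical:

from syne_tune.callbacks.tensorboard_callback import TensorboardCallback

class MyTensorboardCallback(TensorboardCallback):
    def on_trial_result(self, trial, status, result, decision):
        # Hypothetical derived metric, added before the parent class
        # processes the reported result
        result = dict(result, abs_loss=abs(result[METRIC_ATTR]))
        super().on_trial_result(trial, status, result, decision)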
Bayesian Optimization with Scikit-learn Based Surrogate Model
import copy
from pathlib import Path
from typing import Tuple
import logging
import numpy as np
from sklearn.linear_model import BayesianRidge
from examples.training_scripts.height_example.train_height import (
METRIC_ATTR,
METRIC_MODE,
MAX_RESOURCE_ATTR,
)
from syne_tune import Tuner, StoppingCriterion
from syne_tune.backend import LocalBackend
from syne_tune.config_space import randint
from syne_tune.optimizer.schedulers import FIFOScheduler
from syne_tune.optimizer.schedulers.searchers.bayesopt.models.meanstd_acqfunc_impl import (
EIAcquisitionFunction,
)
from syne_tune.optimizer.schedulers.searchers.sklearn import (
SKLearnSurrogateSearcher,
)
from syne_tune.optimizer.schedulers.searchers.bayesopt.sklearn import (
SKLearnEstimator,
SKLearnPredictor,
)
class BayesianRidgePredictor(SKLearnPredictor):
"""
Predictor for surrogate model given by ``sklearn.linear_model.BayesianRidge``.
"""
def __init__(self, ridge: BayesianRidge):
self.ridge = ridge
def predict(self, X: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
return self.ridge.predict(X, return_std=True)
class BayesianRidgeEstimator(SKLearnEstimator):
"""
Estimator for surrogate model given by ``sklearn.linear_model.BayesianRidge``.
None of the parameters of ``BayesianRidge`` are exposed here, so they are all
fixed up front.
"""
def __init__(self, *args, **kwargs):
self.ridge = BayesianRidge(*args, **kwargs)
def fit(
self, X: np.ndarray, y: np.ndarray, update_params: bool
) -> SKLearnPredictor:
self.ridge.fit(X, y.ravel())
return BayesianRidgePredictor(ridge=copy.deepcopy(self.ridge))
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
random_seed = 31415927
max_epochs = 100
n_workers = 4
config_space = {
"width": randint(1, 20),
"height": randint(1, 20),
MAX_RESOURCE_ATTR: 100,
}
entry_point = str(
Path(__file__).parent
/ "training_scripts"
/ "height_example"
/ "train_height.py"
)
# We use ``FIFOScheduler`` with a specific searcher based on our surrogate
# model
searcher = SKLearnSurrogateSearcher(
config_space=config_space,
metric=METRIC_ATTR,
estimator=BayesianRidgeEstimator(),
scoring_class=EIAcquisitionFunction,
)
scheduler = FIFOScheduler(
config_space,
metric=METRIC_ATTR,
mode=METRIC_MODE,
max_resource_attr=MAX_RESOURCE_ATTR,
searcher=searcher,
)
tuner = Tuner(
trial_backend=LocalBackend(entry_point=entry_point),
scheduler=scheduler,
stop_criterion=StoppingCriterion(max_wallclock_time=60),
n_workers=n_workers,
)
tuner.run()
Requirements:
- Needs scikit-learn to be installed. If you installed Syne Tune with sklearn or basic, this dependency is included.
In this example, a simple new surrogate model is implemented based on sklearn.linear_model.BayesianRidge, and Bayesian optimization is run with this surrogate model rather than a Gaussian process model.
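The same pattern carries over to any scikit-learn regressor that can return predictive standard deviations. As a sketch (not part of the example), here is the analogous pair for sklearn.gaussian_process.GaussianProcessRegressor:

import copy
from typing import Tuple

import numpy as np
from sklearn.gaussian_process import GaussianProcessRegressor

from syne_tune.optimizer.schedulers.searchers.bayesopt.sklearn import (
    SKLearnEstimator,
    SKLearnPredictor,
)

class GPPredictor(SKLearnPredictor):
    def __init__(self, gp: GaussianProcessRegressor):
        self.gp = gp

    def predict(self, X: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        # return_std=True yields (means, stddevs), as required by the searcher
        return self.gp.predict(X, return_std=True)

class GPEstimator(SKLearnEstimator):
    def __init__(self, *args, **kwargs):
        self.gp = GaussianProcessRegressor(*args, **kwargs)

    def fit(
        self, X: np.ndarray, y: np.ndarray, update_params: bool
    ) -> SKLearnPredictor:
        self.gp.fit(X, y.ravel())
        return GPPredictor(gp=copy.deepcopy(self.gp))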
Launch HPO Experiment with Simulator Backend
"""
Example for running the simulator backend on a tabulated benchmark
"""
import logging
from syne_tune.experiments.benchmark_definitions.nas201 import nas201_benchmark
from syne_tune.blackbox_repository import BlackboxRepositoryBackend
from syne_tune.backend.simulator_backend.simulator_callback import SimulatorCallback
from syne_tune.optimizer.baselines import ASHA
from syne_tune import Tuner, StoppingCriterion
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
random_seed = 31415927
n_workers = 4
dataset_name = "cifar100"
benchmark = nas201_benchmark(dataset_name)
# Simulator backend specialized to tabulated blackboxes
max_resource_attr = benchmark.max_resource_attr
trial_backend = BlackboxRepositoryBackend(
elapsed_time_attr=benchmark.elapsed_time_attr,
max_resource_attr=max_resource_attr,
blackbox_name=benchmark.blackbox_name,
dataset=dataset_name,
)
# Asynchronous successive halving (ASHA)
blackbox = trial_backend.blackbox
scheduler = ASHA(
config_space=blackbox.configuration_space_with_max_resource_attr(
max_resource_attr
),
max_resource_attr=max_resource_attr,
resource_attr=blackbox.fidelity_name(),
mode=benchmark.mode,
metric=benchmark.metric,
search_options={"debug_log": False},
random_seed=random_seed,
)
max_wallclock_time = 3600
stop_criterion = StoppingCriterion(max_wallclock_time=max_wallclock_time)
# Printing the status during tuning takes a lot of time, and so does
# storing results.
print_update_interval = 700
results_update_interval = 300
# It is important to set ``sleep_time`` to 0 here (mandatory for simulator
# backend)
tuner = Tuner(
trial_backend=trial_backend,
scheduler=scheduler,
stop_criterion=stop_criterion,
n_workers=n_workers,
sleep_time=0,
results_update_interval=results_update_interval,
print_update_interval=print_update_interval,
# This callback is required in order to make things work with the
# simulator callback. It makes sure that results are stored with
# simulated time (rather than real time), and that the time_keeper
# is advanced properly whenever the tuner loop sleeps
callbacks=[SimulatorCallback()],
)
tuner.run()
Requirements:
- Syne Tune dependencies blackbox-repository need to be installed.
- Needs nasbench201 blackbox to be downloaded and preprocessed. This can take quite a while when done for the first time.
- If AWS SageMaker is used or an S3 bucket is accessible, the blackbox files are uploaded to your S3 bucket.
In this example, we use the simulator backend with the NASBench-201 blackbox. Since time is simulated, we can use max_wallclock_time=3600 (one hour), but the experiment finishes in mere seconds. More details about the simulator backend are found in this tutorial.
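Because simulated experiments are cheap, it can be convenient to combine several stopping conditions, with whichever fires first ending the tuning. A minimal sketch using only fields that appear in other examples on this page:

from syne_tune import StoppingCriterion

stop_criterion = StoppingCriterion(
    max_wallclock_time=3600,  # simulated seconds, not real time
    max_num_trials_started=256,  # also stop after this many trials
)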
Joint Tuning of Instance Type and Hyperparameters using MOASHA
"""
Example showing how to tune instance types and hyperparameters with a Sagemaker Framework.
"""
import logging
from pathlib import Path
from sagemaker.huggingface import HuggingFace
from syne_tune import StoppingCriterion, Tuner
from syne_tune.backend import SageMakerBackend
from syne_tune.backend.sagemaker_backend.instance_info import select_instance_type
from syne_tune.backend.sagemaker_backend.sagemaker_utils import (
get_execution_role,
default_sagemaker_session,
)
from syne_tune.config_space import loguniform, choice
from syne_tune.constants import (
ST_WORKER_TIME,
ST_WORKER_COST,
ST_INSTANCE_TYPE,
)
from syne_tune.optimizer.schedulers.multiobjective import MOASHA
from syne_tune.remote.constants import (
DEFAULT_CPU_INSTANCE_SMALL,
HUGGINGFACE_LATEST_FRAMEWORK_VERSION,
HUGGINGFACE_LATEST_PYTORCH_VERSION,
HUGGINGFACE_LATEST_TRANSFORMERS_VERSION,
HUGGINGFACE_LATEST_PY_VERSION,
)
from syne_tune.remote.remote_launcher import RemoteLauncher
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
n_workers = 2
epochs = 4
# Select the instance types that are searched.
# Alternatively, you can define the instance list explicitly:
# :code:`instance_types = ["ml.c5.xlarge", "ml.m5.2xlarge"]`
instance_types = select_instance_type(min_gpu=1, max_cost_per_hour=5.0)
print(f"tuning over hyperparameters and instance types: {instance_types}")
# define a search space that contains hyperparameters (learning-rate, weight-decay) and instance-type.
config_space = {
ST_INSTANCE_TYPE: choice(instance_types),
"learning_rate": loguniform(1e-6, 1e-4),
"weight_decay": loguniform(1e-5, 1e-2),
"epochs": epochs,
"dataset_path": "./",
}
entry_point = (
Path(__file__).parent.parent
/ "benchmarking"
/ "training_scripts"
/ "distilbert_on_imdb"
/ "distilbert_on_imdb.py"
)
metric = "accuracy"
# Define a MOASHA scheduler that searches over the config space to maximise accuracy and minimize cost and time.
scheduler = MOASHA(
max_t=epochs,
time_attr="step",
metrics=[metric, ST_WORKER_COST, ST_WORKER_TIME],
mode=["max", "min", "min"],
config_space=config_space,
)
    # Define the training function to be tuned, and use the Sagemaker backend to
    # execute trials as separate training jobs (since they are quite expensive).
trial_backend = SageMakerBackend(
sm_estimator=HuggingFace(
framework_version=HUGGINGFACE_LATEST_FRAMEWORK_VERSION,
transformers_version=HUGGINGFACE_LATEST_TRANSFORMERS_VERSION,
pytorch_version=HUGGINGFACE_LATEST_PYTORCH_VERSION,
py_version=HUGGINGFACE_LATEST_PY_VERSION,
entry_point=str(entry_point),
base_job_name="hpo-transformer",
            # The instance type given here is overridden by Syne Tune with values sampled from ST_INSTANCE_TYPE.
instance_type=DEFAULT_CPU_INSTANCE_SMALL,
instance_count=1,
max_run=3600,
role=get_execution_role(),
dependencies=[str(Path(__file__).parent.parent / "benchmarking")],
sagemaker_session=default_sagemaker_session(),
disable_profiler=True,
debugger_hook_config=False,
),
)
remote_launcher = RemoteLauncher(
tuner=Tuner(
trial_backend=trial_backend,
scheduler=scheduler,
stop_criterion=StoppingCriterion(max_wallclock_time=3600, max_cost=10.0),
n_workers=n_workers,
sleep_time=5.0,
),
dependencies=[str(Path(__file__).parent.parent / "benchmarking")],
)
remote_launcher.run(wait=False)
Requirements:
- Needs code from benchmarking.training_scripts.distilbert_on_imdb, which requires Syne Tune to be installed from source.
- Runs training jobs on instances of type ml.g4dn.xlarge, ml.g5.xlarge, ml.g4dn.2xlarge, ml.p2.xlarge, ml.g5.2xlarge, ml.g5.4xlarge, ml.g4dn.4xlarge, ml.g5.8xlarge, ml.g4dn.8xlarge, ml.p3.2xlarge, ml.g5.16xlarge. This list of instance types to be searched over can be modified by the user.
In this example, we use the SageMaker backend together with the SageMaker Hugging Face framework in order to fine-tune a DistilBERT model on the IMDB sentiment classification task:
- Instead of optimizing a single objective, we use MOASHA in order to sample the Pareto frontier w.r.t. three objectives.
- We not only tune hyperparameters such as learning rate and weight decay, but also the AWS instance type to be used for training. Here, one of the objectives to minimize is the training cost (in dollars).
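As noted in the code comment, select_instance_type can be replaced by an explicit list of candidate instance types. A minimal sketch, reusing the example values from that comment:

from syne_tune.config_space import choice, loguniform
from syne_tune.constants import ST_INSTANCE_TYPE

instance_types = ["ml.c5.xlarge", "ml.m5.2xlarge"]
config_space = {
    ST_INSTANCE_TYPE: choice(instance_types),
    "learning_rate": loguniform(1e-6, 1e-4),
    "weight_decay": loguniform(1e-5, 1e-2),
    "epochs": 4,
    "dataset_path": "./",
}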
Multi-objective Asynchronous Successive Halving (MOASHA)
"""
Example showing how to tune multiple objectives at once of an artificial function.
"""
import logging
from pathlib import Path
import numpy as np
from syne_tune.backend import LocalBackend
from syne_tune.optimizer.schedulers.multiobjective import MOASHA
from syne_tune import Tuner, StoppingCriterion
from syne_tune.config_space import uniform
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
np.random.seed(0)
max_steps = 27
n_workers = 4
config_space = {
"steps": max_steps,
"theta": uniform(0, np.pi / 2),
"sleep_time": 0.01,
}
entry_point = (
Path(__file__).parent
/ "training_scripts"
/ "mo_artificial"
/ "mo_artificial.py"
)
mode = "min"
np.random.seed(0)
scheduler = MOASHA(
max_t=max_steps,
time_attr="step",
mode=mode,
metrics=["y1", "y2"],
config_space=config_space,
)
trial_backend = LocalBackend(entry_point=str(entry_point))
stop_criterion = StoppingCriterion(max_wallclock_time=20)
tuner = Tuner(
trial_backend=trial_backend,
scheduler=scheduler,
stop_criterion=stop_criterion,
n_workers=n_workers,
sleep_time=0.5,
)
tuner.run()
This launcher script uses the following mo_artificial.py training script:
import time
from argparse import ArgumentParser
import numpy as np
from syne_tune import Reporter
def f(t, theta):
# Function drawing upper-right circles with radius set to ``t`` and with center set at
# (-t, -t). ``t`` is interpreted as a fidelity and larger ``t`` corresponds to larger radius and better candidates.
# The optimal multiobjective solution should select theta uniformly from [0, pi/2].
return {
"y1": -t + t * np.cos(theta),
"y2": -t + t * np.sin(theta),
}
def plot_function():
import matplotlib.pyplot as plt
ts = np.linspace(0, 27, num=5)
thetas = np.linspace(0, 1) * np.pi / 2
y1s = []
y2s = []
for t in ts:
for theta in thetas:
res = f(t, theta)
y1s.append(res["y1"])
y2s.append(res["y2"])
plt.scatter(y1s, y2s)
plt.show()
if __name__ == "__main__":
# plot_function()
parser = ArgumentParser()
parser.add_argument("--steps", type=int, required=True)
parser.add_argument("--theta", type=float, required=True)
parser.add_argument("--sleep_time", type=float, required=False, default=0.1)
args, _ = parser.parse_known_args()
assert 0 <= args.theta < np.pi / 2
reporter = Reporter()
for step in range(args.steps):
y = f(t=step, theta=args.theta)
reporter(step=step, **y)
time.sleep(args.sleep_time)
PASHA: Efficient HPO and NAS with Progressive Resource Allocation
"""
Example for running PASHA on NASBench201
"""
import logging
from syne_tune.experiments.benchmark_definitions.nas201 import nas201_benchmark
from syne_tune.blackbox_repository import BlackboxRepositoryBackend
from syne_tune.backend.simulator_backend.simulator_callback import SimulatorCallback
from syne_tune.optimizer.baselines import PASHA
from syne_tune import Tuner, StoppingCriterion
if __name__ == "__main__":
logging.getLogger().setLevel(logging.WARNING)
random_seed = 1
nb201_random_seed = 0
n_workers = 4
dataset_name = "cifar100"
benchmark = nas201_benchmark(dataset_name)
# simulator backend specialized to tabulated blackboxes
max_resource_attr = benchmark.max_resource_attr
trial_backend = BlackboxRepositoryBackend(
blackbox_name=benchmark.blackbox_name,
elapsed_time_attr=benchmark.elapsed_time_attr,
max_resource_attr=max_resource_attr,
dataset=dataset_name,
seed=nb201_random_seed,
)
blackbox = trial_backend.blackbox
scheduler = PASHA(
config_space=blackbox.configuration_space_with_max_resource_attr(
max_resource_attr
),
max_resource_attr=max_resource_attr,
resource_attr=blackbox.fidelity_name(),
mode=benchmark.mode,
metric=benchmark.metric,
random_seed=random_seed,
)
max_num_trials_started = 256
stop_criterion = StoppingCriterion(max_num_trials_started=max_num_trials_started)
# printing the status during tuning takes a lot of time, and so does
# storing results
print_update_interval = 700
results_update_interval = 300
# it is important to set ``sleep_time`` to 0 here (mandatory for simulator
# backend)
tuner = Tuner(
trial_backend=trial_backend,
scheduler=scheduler,
stop_criterion=stop_criterion,
n_workers=n_workers,
sleep_time=0,
results_update_interval=results_update_interval,
print_update_interval=print_update_interval,
# this callback is required in order to make things work with the
# simulator callback. It makes sure that results are stored with
# simulated time (rather than real time), and that the time_keeper
# is advanced properly whenever the tuner loop sleeps
callbacks=[SimulatorCallback()],
)
tuner.run()
Requirements:
- Syne Tune dependencies blackbox-repository need to be installed.
- Needs nasbench201 blackbox to be downloaded and preprocessed. This can take quite a while when done for the first time.
PASHA typically uses max_num_trials_completed as the stopping criterion. After finding a strong configuration using PASHA, the next step is to fully train a model with the configuration.
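A minimal sketch of both steps, using the max_num_trials_completed field of StoppingCriterion together with the load_experiment helper used elsewhere on this page:

from syne_tune import StoppingCriterion
from syne_tune.experiments import load_experiment

# Stop once a given number of trials have run to completion
stop_criterion = StoppingCriterion(max_num_trials_completed=256)

# After tuner.run(), look up the best configuration found by PASHA,
# which can then be trained to completion in a separate run
tuning_experiment = load_experiment(tuner.name)
print(f"configuration to train in full: {tuning_experiment.best_config()}")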
Constrained Bayesian Optimization
"""
Example for running constrained Bayesian optimization on a toy example
"""
import logging
from pathlib import Path
from syne_tune.backend import LocalBackend
from syne_tune.optimizer.schedulers import FIFOScheduler
from syne_tune.config_space import uniform
from syne_tune import StoppingCriterion, Tuner
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
random_seed = 31415927
n_workers = 2
config_space = {
"x1": uniform(-5, 10),
"x2": uniform(0, 15),
"constraint_offset": 1.0, # the lower, the stricter
}
entry_point = str(
Path(__file__).parent
/ "training_scripts"
/ "constrained_hpo"
/ "train_constrained_example.py"
)
mode = "max"
metric = "objective"
constraint_attr = "my_constraint_metric"
# Local backend
trial_backend = LocalBackend(entry_point=entry_point)
# Bayesian constrained optimization:
    # :math:`\max_x f(x), \; \mathrm{s.t.} \; c(x) \leq 0`
# Here, ``metric`` represents :math:`f(x)`, ``constraint_attr`` represents
# :math:`c(x)`.
search_options = {
"num_init_random": n_workers,
"constraint_attr": constraint_attr,
}
scheduler = FIFOScheduler(
config_space,
searcher="bayesopt_constrained",
search_options=search_options,
mode=mode,
metric=metric,
random_seed=random_seed,
)
stop_criterion = StoppingCriterion(max_wallclock_time=20)
tuner = Tuner(
trial_backend=trial_backend,
scheduler=scheduler,
stop_criterion=stop_criterion,
n_workers=n_workers,
)
tuner.run()
This launcher script uses the following train_constrained_example.py training script:
import logging
import numpy as np
from syne_tune import Reporter
from argparse import ArgumentParser
report = Reporter()
if __name__ == "__main__":
root = logging.getLogger()
root.setLevel(logging.DEBUG)
parser = ArgumentParser()
parser.add_argument("--x1", type=float)
parser.add_argument("--x2", type=float)
parser.add_argument("--constraint_offset", type=float)
args, _ = parser.parse_known_args()
x1 = args.x1
x2 = args.x2
constraint_offset = args.constraint_offset
r = 6
objective_value = (
(x2 - (5.1 / (4 * np.pi**2)) * x1**2 + (5 / np.pi) * x1 - r) ** 2
+ 10 * (1 - 1 / (8 * np.pi)) * np.cos(x1)
+ 10
)
constraint_value = (
x1 * 2.0 - constraint_offset
) # feasible iff x1 <= 0.5 * constraint_offset
report(objective=-objective_value, my_constraint_metric=constraint_value)
Restrict Scheduler to Tabulated Configurations with Simulator Backend
"""
Example for running the simulator backend on the "lcbench" tabulated
benchmark. The scheduler is restricted to work with the configurations
which have been evaluated under the benchmark.
"""
import logging
from syne_tune.experiments.benchmark_definitions.lcbench import lcbench_benchmark
from syne_tune.blackbox_repository import BlackboxRepositoryBackend
from syne_tune.backend.simulator_backend.simulator_callback import SimulatorCallback
from syne_tune.optimizer.baselines import BayesianOptimization
from syne_tune import Tuner, StoppingCriterion
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
random_seed = 31415927
n_workers = 4
dataset_name = "airlines"
benchmark = lcbench_benchmark(dataset_name)
# Simulator backend specialized to tabulated blackboxes
# Note: Even though ``lcbench_benchmark`` defines a surrogate, we
# do not use this here
max_resource_attr = benchmark.max_resource_attr
trial_backend = BlackboxRepositoryBackend(
elapsed_time_attr=benchmark.elapsed_time_attr,
max_resource_attr=max_resource_attr,
blackbox_name=benchmark.blackbox_name,
dataset=dataset_name,
)
# GP-based Bayesian optimization
# Using ``restrict_configurations``, we restrict the scheduler to only
# suggest configurations which have observations in the tabulated
# blackbox
blackbox = trial_backend.blackbox
restrict_configurations = blackbox.all_configurations()
scheduler = BayesianOptimization(
config_space=blackbox.configuration_space_with_max_resource_attr(
max_resource_attr
),
max_resource_attr=max_resource_attr,
mode=benchmark.mode,
metric=benchmark.metric,
random_seed=random_seed,
search_options=dict(restrict_configurations=restrict_configurations),
)
max_wallclock_time = 3600
stop_criterion = StoppingCriterion(max_wallclock_time=max_wallclock_time)
# Printing the status during tuning takes a lot of time, and so does
# storing results.
print_update_interval = 700
results_update_interval = 300
# It is important to set ``sleep_time`` to 0 here (mandatory for simulator
# backend)
tuner = Tuner(
trial_backend=trial_backend,
scheduler=scheduler,
stop_criterion=stop_criterion,
n_workers=n_workers,
sleep_time=0,
results_update_interval=results_update_interval,
print_update_interval=print_update_interval,
# This callback is required in order to make things work with the
# simulator callback. It makes sure that results are stored with
# simulated time (rather than real time), and that the time_keeper
# is advanced properly whenever the tuner loop sleeps
callbacks=[SimulatorCallback()],
)
tuner.run()
Requirements:
- Syne Tune dependencies blackbox-repository need to be installed.
- Needs lcbench blackbox to be downloaded and preprocessed. This can take quite a while when done for the first time.
- If AWS SageMaker is used or an S3 bucket is accessible, the blackbox files are uploaded to your S3 bucket.
This example is similar to the one above, but here we use the tabulated LCBench benchmark, whose configuration space is infinite, and whose objective values have not been evaluated on a grid. With such a benchmark, we can either use a surrogate to interpolate objective values, or we can restrict the scheduler to only suggest configurations which have been observed in the benchmark. This example demonstrates the latter.
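For the surrogate alternative, a minimal sketch, assuming the surrogate specified in the lcbench_benchmark definition (see the code comment above) is simply passed through to the backend:

trial_backend = BlackboxRepositoryBackend(
    elapsed_time_attr=benchmark.elapsed_time_attr,
    max_resource_attr=max_resource_attr,
    blackbox_name=benchmark.blackbox_name,
    dataset=dataset_name,
    # Interpolate objective values with the surrogate defined by the
    # benchmark, instead of restricting the scheduler to tabulated
    # configurations
    surrogate=benchmark.surrogate,
    surrogate_kwargs=benchmark.surrogate_kwargs,
)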
Since time is simulated, we can use max_wallclock_time=3600 (one hour), but the experiment finishes in mere seconds. More details about the simulator backend are found in this tutorial.
Tuning Reinforcement Learning
"""
This launches a local HPO experiment tuning the discount factor of PPO on CartPole.
To run this example, you should have installed dependencies in ``requirements.txt``.
"""
import logging
from pathlib import Path
import numpy as np
from syne_tune.backend import LocalBackend
from syne_tune.experiments import load_experiment
from syne_tune.optimizer.baselines import ASHA
import syne_tune.config_space as sp
from syne_tune import Tuner, StoppingCriterion
if __name__ == "__main__":
logging.getLogger().setLevel(logging.DEBUG)
np.random.seed(0)
max_steps = 100
metric = "episode_reward_mean"
mode = "max"
max_resource_attr = "max_iterations"
trial_backend = LocalBackend(
entry_point=Path(__file__).parent
/ "training_scripts"
/ "rl_cartpole"
/ "train_cartpole.py"
)
scheduler = ASHA(
config_space={
max_resource_attr: max_steps,
"gamma": sp.uniform(0.5, 0.99),
"lr": sp.loguniform(1e-6, 1e-3),
},
metric=metric,
mode=mode,
max_resource_attr=max_resource_attr,
resource_attr="training_iter",
search_options={"debug_log": False},
)
stop_criterion = StoppingCriterion(max_wallclock_time=60)
tuner = Tuner(
trial_backend=trial_backend,
scheduler=scheduler,
stop_criterion=stop_criterion,
n_workers=2,
)
tuner.run()
tuning_experiment = load_experiment(tuner.name)
print(f"best result found: {tuning_experiment.best_config()}")
tuning_experiment.plot()
This launcher script uses the following train_cartpole.py training script:
"""
Adapts the introductory example of RLlib that trains a CartPole agent with PPO.
https://docs.ray.io/en/master/rllib/index.html
The input arguments learning rate and discount factor gamma can be tuned to maximize the episode mean reward.
"""
from argparse import ArgumentParser
from syne_tune import Reporter
from ray.rllib.agents.ppo import PPOTrainer
if __name__ == "__main__":
parser = ArgumentParser()
parser.add_argument("--max_training_steps", type=int, default=100)
parser.add_argument("--lr", type=float, default=5e-5)
parser.add_argument("--gamma", type=float, default=0.99)
args, _ = parser.parse_known_args()
# Configure the algorithm.
config = {
# Environment (RLlib understands openAI gym registered strings).
"env": "CartPole-v0",
"num_workers": 2,
# Use "tf" for TensorFlow, "torch" for PyTorch, "tf2" for
# tf2.x eager execution
"framework": "torch",
"gamma": args.gamma,
"lr": args.lr,
}
trainer = PPOTrainer(config=config)
reporter = Reporter()
    # Run it for max_training_steps training iterations. A training iteration includes
# parallel sample collection by the environment workers as well as
# loss calculation on the collected batch and a model update.
# Episode reward mean is reported each time.
for i in range(args.max_training_steps):
results = trainer.train()
reporter(
training_iter=i + 1,
episode_reward_mean=results["episode_reward_mean"],
)
This training script requires the following dependencies to be installed:
tensorboardX==2.5.1
opencv-python
ray[rllib]==2.2.0
dm-tree==0.1.8
gym==0.23.1
tensorflow==2.11.1
pygame==2.1.2
Launch HPO Experiment with SageMaker Backend
"""
Example showing how to run on Sagemaker with a Sagemaker Framework.
"""
import logging
import os
from pathlib import Path
from sagemaker.pytorch import PyTorch
from syne_tune import Tuner, StoppingCriterion
from syne_tune.backend import SageMakerBackend
from syne_tune.backend.sagemaker_backend.sagemaker_utils import (
get_execution_role,
default_sagemaker_session,
)
from syne_tune.config_space import randint
from examples.training_scripts.height_example.train_height import (
METRIC_ATTR,
METRIC_MODE,
MAX_RESOURCE_ATTR,
)
from syne_tune.optimizer.baselines import RandomSearch
from syne_tune.remote.constants import (
DEFAULT_CPU_INSTANCE_SMALL,
PYTORCH_LATEST_FRAMEWORK,
PYTORCH_LATEST_PY_VERSION,
)
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
random_seed = 31415927
max_steps = 100
n_workers = 4
max_wallclock_time = 5 * 60
config_space = {
MAX_RESOURCE_ATTR: max_steps,
"width": randint(0, 20),
"height": randint(-100, 100),
}
entry_point = (
Path(__file__).parent
/ "training_scripts"
/ "height_example"
/ "train_height.py"
)
# Random search without stopping
scheduler = RandomSearch(
config_space, mode=METRIC_MODE, metric=METRIC_ATTR, random_seed=random_seed
)
if "AWS_DEFAULT_REGION" not in os.environ:
os.environ["AWS_DEFAULT_REGION"] = "us-west-2"
trial_backend = SageMakerBackend(
# we tune a PyTorch Framework from Sagemaker
sm_estimator=PyTorch(
instance_type=DEFAULT_CPU_INSTANCE_SMALL,
instance_count=1,
framework_version=PYTORCH_LATEST_FRAMEWORK,
py_version=PYTORCH_LATEST_PY_VERSION,
entry_point=str(entry_point),
role=get_execution_role(),
max_run=10 * 60,
sagemaker_session=default_sagemaker_session(),
disable_profiler=True,
debugger_hook_config=False,
),
        # Names of metrics to track. Each metric will be detected by SageMaker if it is written in the
        # following form: "[RMSE]: 1.2"; see train_main_example for how metrics are logged
metrics_names=[METRIC_ATTR],
)
stop_criterion = StoppingCriterion(max_wallclock_time=max_wallclock_time)
tuner = Tuner(
trial_backend=trial_backend,
scheduler=scheduler,
stop_criterion=stop_criterion,
n_workers=n_workers,
sleep_time=5.0,
tuner_name="hpo-hyperband",
)
tuner.run()
Requirements:
- Access to AWS SageMaker. More details are provided in this tutorial.
- This example can be sped up by using SageMaker managed warm pools, as in this example.
- Makes use of train_height.py.
SageMaker Backend and Checkpointing
import logging
from pathlib import Path
from sagemaker.pytorch import PyTorch
from syne_tune import Tuner, StoppingCriterion
from syne_tune.backend import SageMakerBackend
from syne_tune.backend.sagemaker_backend.sagemaker_utils import (
get_execution_role,
default_sagemaker_session,
)
from syne_tune.config_space import randint
from examples.training_scripts.height_example.train_height import (
METRIC_ATTR,
METRIC_MODE,
MAX_RESOURCE_ATTR,
RESOURCE_ATTR,
)
from syne_tune.optimizer.baselines import ASHA
from syne_tune.remote.constants import (
DEFAULT_CPU_INSTANCE_SMALL,
PYTORCH_LATEST_FRAMEWORK,
PYTORCH_LATEST_PY_VERSION,
)
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
random_seed = 31415927
max_steps = 100
n_workers = 4
delete_checkpoints = True
max_wallclock_time = 5 * 60
config_space = {
MAX_RESOURCE_ATTR: max_steps,
"width": randint(0, 20),
"height": randint(-100, 100),
}
entry_point = (
Path(__file__).parent
/ "training_scripts"
/ "checkpoint_example"
/ "train_height_checkpoint.py"
)
# ASHA promotion
scheduler = ASHA(
config_space,
metric=METRIC_ATTR,
mode=METRIC_MODE,
max_resource_attr=MAX_RESOURCE_ATTR,
resource_attr=RESOURCE_ATTR,
type="promotion",
search_options={"debug_log": True},
)
# SageMaker backend: We use the warm pool feature here
trial_backend = SageMakerBackend(
sm_estimator=PyTorch(
instance_type=DEFAULT_CPU_INSTANCE_SMALL,
instance_count=1,
framework_version=PYTORCH_LATEST_FRAMEWORK,
py_version=PYTORCH_LATEST_PY_VERSION,
entry_point=str(entry_point),
role=get_execution_role(),
max_run=10 * 60,
sagemaker_session=default_sagemaker_session(),
disable_profiler=True,
debugger_hook_config=False,
keep_alive_period_in_seconds=60, # warm pool feature
),
metrics_names=[METRIC_ATTR],
delete_checkpoints=delete_checkpoints,
)
stop_criterion = StoppingCriterion(max_wallclock_time=max_wallclock_time)
tuner = Tuner(
trial_backend=trial_backend,
scheduler=scheduler,
stop_criterion=stop_criterion,
n_workers=n_workers,
sleep_time=5.0,
tuner_name="height-sagemaker-checkpoints",
start_jobs_without_delay=False,
)
tuner.run()
Requirements:
- Access to AWS SageMaker.
This launcher script uses the following train_height_checkpoint.py training script:
import logging
import time
from typing import Optional, Dict, Any
import json
from pathlib import Path
import os
import numpy as np
from syne_tune import Reporter
from argparse import ArgumentParser
from syne_tune.config_space import randint
from syne_tune.constants import ST_CHECKPOINT_DIR
report = Reporter()
RESOURCE_ATTR = "epoch"
METRIC_ATTR = "mean_loss"
METRIC_MODE = "min"
MAX_RESOURCE_ATTR = "steps"
def load_checkpoint(checkpoint_path: Path) -> Dict[str, Any]:
with open(checkpoint_path, "r") as f:
return json.load(f)
def save_checkpoint(checkpoint_path: Path, epoch: int, value: float):
os.makedirs(checkpoint_path.parent, exist_ok=True)
with open(checkpoint_path, "w") as f:
json.dump({"epoch": epoch, "value": value}, f)
def train_height_delta(step: int, width: float, height: float, value: float) -> float:
"""
For the original example, we have that
.. math::
f(t + 1) - f(t) = f(t) \cdot \frac{w}{10 + w \cdot t},
f(0) = 10 + h / 10
We implement an incremental version with a stochastic term.
:param step: Step t, nonnegative int
:param width: Width w, nonnegative
:param height: Height h
:param value: Value :math:`f(t - 1)` if :math:`t > 0`
:return: New value :math:`f(t)`
"""
u = 1.0 - 0.1 * np.random.rand() # uniform(0.9, 1) multiplier
if step == 0:
return u * 10 + 0.1 * height
else:
return value * (1.0 + u * width / (width * (step - 1) + 10))
def height_config_space(
max_steps: int, sleep_time: Optional[float] = None
) -> Dict[str, Any]:
kwargs = {"sleep_time": sleep_time} if sleep_time is not None else dict()
return {
MAX_RESOURCE_ATTR: max_steps,
"width": randint(0, 20),
"height": randint(-100, 100),
**kwargs,
}
if __name__ == "__main__":
root = logging.getLogger()
root.setLevel(logging.INFO)
parser = ArgumentParser()
parser.add_argument("--" + MAX_RESOURCE_ATTR, type=int)
parser.add_argument("--width", type=float)
parser.add_argument("--height", type=float)
parser.add_argument("--sleep_time", type=float, default=0.1)
parser.add_argument(f"--{ST_CHECKPOINT_DIR}", type=str)
args, _ = parser.parse_known_args()
width = args.width
height = args.height
checkpoint_dir = getattr(args, ST_CHECKPOINT_DIR)
num_steps = getattr(args, MAX_RESOURCE_ATTR)
start_step = 0
value = 0.0
if checkpoint_dir is not None:
checkpoint_path = Path(checkpoint_dir) / "checkpoint.json"
if checkpoint_path.exists():
state = load_checkpoint(checkpoint_path)
start_step = state["epoch"]
value = state["value"]
else:
checkpoint_path = None
for step in range(start_step, num_steps):
# Sleep first, since results are returned at end of "epoch"
time.sleep(args.sleep_time)
# Feed the score back to Syne Tune.
value = train_height_delta(step, width, height, value)
epoch = step + 1
if checkpoint_path is not None:
save_checkpoint(checkpoint_path, epoch, value)
report(
**{
"step": step,
METRIC_ATTR: value,
RESOURCE_ATTR: epoch,
}
)
Note that SageMakerBackend is configured to use SageMaker managed warm pools:
- keep_alive_period_in_seconds=60 in the definition of the SageMaker estimator
- start_jobs_without_delay=False when creating Tuner
Managed warm pools substantially reduce both start-up and stop delays, and they are strongly recommended for multi-fidelity HPO with the SageMaker backend. More details are found in this tutorial.
Retrieving the Best Checkpoint
"""
An example showing how to retrieve the best checkpoint of an XGBoost model.
The script being tuned ``xgboost_checkpoint.py`` stores the checkpoint obtained after each trial evaluation.
After the tuning is done, this example loads the best checkpoint and evaluates the model.
"""
import logging
from pathlib import Path
from examples.training_scripts.xgboost.xgboost_checkpoint import evaluate_accuracy
from syne_tune.backend import LocalBackend
from syne_tune.experiments import load_experiment
from syne_tune.optimizer.baselines import BayesianOptimization
from syne_tune import Tuner, StoppingCriterion
import syne_tune.config_space as cs
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
n_workers = 4
config_space = {
"max_depth": cs.randint(2, 5),
"gamma": cs.uniform(1, 9),
"reg_lambda": cs.loguniform(1e-6, 1),
"n_estimators": cs.randint(1, 10),
}
entry_point = (
Path(__file__).parent / "training_scripts" / "xgboost" / "xgboost_checkpoint.py"
)
trial_backend = LocalBackend(entry_point=str(entry_point))
tuner = Tuner(
trial_backend=trial_backend,
scheduler=BayesianOptimization(config_space, metric="merror", mode="min"),
stop_criterion=StoppingCriterion(max_wallclock_time=10),
n_workers=n_workers,
)
tuner.run()
exp = load_experiment(tuner.name)
best_config = exp.best_config()
checkpoint = trial_backend.checkpoint_trial_path(best_config["trial_id"])
assert checkpoint.exists()
print(f"Best config found {best_config} checkpointed at {checkpoint}")
print(
f"Retrieve best checkpoint and evaluate accuracy of best model: "
f"found {evaluate_accuracy(checkpoint_dir=checkpoint)}"
)
This launcher script uses the following xgboost_checkpoint.py training script:
import os
from argparse import ArgumentParser
from pathlib import Path
import numpy as np
import xgboost
from sklearn.datasets import load_digits
from syne_tune import Reporter
from syne_tune.constants import ST_CHECKPOINT_DIR
class SyneTuneCallback(xgboost.callback.TrainingCallback):
def __init__(self, error_metric: str) -> None:
self.reporter = Reporter()
self.error_metric = error_metric
def after_iteration(self, model, epoch, evals_log):
metrics = list(evals_log.values())[-1][self.error_metric]
self.reporter(**{self.error_metric: metrics[-1]})
def train(
checkpoint_dir: str,
n_estimators: int,
max_depth: int,
gamma: float,
reg_lambda: float,
early_stopping_rounds: int = 5,
) -> None:
eval_metric = "merror"
early_stop = xgboost.callback.EarlyStopping(
rounds=early_stopping_rounds, save_best=True
)
X, y = load_digits(return_X_y=True)
clf = xgboost.XGBClassifier(
n_estimators=n_estimators,
reg_lambda=reg_lambda,
gamma=gamma,
max_depth=max_depth,
eval_metric=eval_metric,
callbacks=[early_stop, SyneTuneCallback(error_metric=eval_metric)],
)
clf.fit(
X,
y,
eval_set=[(X, y)],
)
print("Total boosted rounds:", clf.get_booster().num_boosted_rounds())
save_model(clf, checkpoint_dir=checkpoint_dir)
def save_model(clf, checkpoint_dir):
checkpoint_dir.mkdir(parents=True, exist_ok=True)
path = os.path.join(checkpoint_dir, "model.json")
clf.save_model(path)
def load_model(checkpoint_dir):
path = os.path.join(checkpoint_dir, "model.json")
loaded = xgboost.XGBClassifier()
loaded.load_model(path)
return loaded
def evaluate_accuracy(checkpoint_dir):
X, y = load_digits(return_X_y=True)
clf = load_model(checkpoint_dir=checkpoint_dir)
y_pred = clf.predict(X)
return (np.equal(y, y_pred) * 1.0).mean()
if __name__ == "__main__":
parser = ArgumentParser()
parser.add_argument("--max_depth", type=int, required=False, default=1)
parser.add_argument("--gamma", type=float, required=False, default=2)
parser.add_argument("--reg_lambda", type=float, required=False, default=0.001)
parser.add_argument("--n_estimators", type=int, required=False, default=10)
parser.add_argument(f"--{ST_CHECKPOINT_DIR}", type=str, default="./")
args, _ = parser.parse_known_args()
checkpoint_dir = Path(vars(args)[ST_CHECKPOINT_DIR])
train(
checkpoint_dir=checkpoint_dir,
max_depth=args.max_depth,
gamma=args.gamma,
reg_lambda=args.reg_lambda,
n_estimators=args.n_estimators,
)
Launch with SageMaker Backend and Custom Docker Image
"""
Example showing how to run on Sagemaker with a custom docker image.
"""
import logging
from pathlib import Path
from syne_tune import Tuner, StoppingCriterion
from syne_tune.backend import SageMakerBackend
from syne_tune.backend.sagemaker_backend.custom_framework import CustomFramework
from syne_tune.backend.sagemaker_backend.sagemaker_utils import (
get_execution_role,
default_sagemaker_session,
)
from syne_tune.config_space import randint
from examples.training_scripts.height_example.train_height import (
METRIC_ATTR,
METRIC_MODE,
MAX_RESOURCE_ATTR,
)
from syne_tune.optimizer.baselines import RandomSearch
from syne_tune.remote.constants import DEFAULT_CPU_INSTANCE_SMALL
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
random_seed = 31415927
max_steps = 100
n_workers = 4
config_space = {
MAX_RESOURCE_ATTR: max_steps,
"width": randint(0, 20),
"height": randint(-100, 100),
}
entry_point = str(
Path(__file__).parent
/ "training_scripts"
/ "height_example"
/ "train_height.py"
)
# Random search without stopping
scheduler = RandomSearch(
config_space, mode=METRIC_MODE, metric=METRIC_ATTR, random_seed=random_seed
)
    # Indicate here an image_uri that is available in ECR, e.g. "XXXXXXXXXXXX.dkr.ecr.us-west-2.amazonaws.com/my_image:latest"
image_uri = ...
trial_backend = SageMakerBackend(
sm_estimator=CustomFramework(
entry_point=entry_point,
instance_type=DEFAULT_CPU_INSTANCE_SMALL,
instance_count=1,
role=get_execution_role(),
image_uri=image_uri,
max_run=10 * 60,
job_name_prefix="hpo-hyperband",
sagemaker_session=default_sagemaker_session(),
disable_profiler=True,
debugger_hook_config=False,
),
        # Names of metrics to track. Each metric will be detected by SageMaker if it is written in the
        # following form: "[RMSE]: 1.2"; see train_main_example for how metrics are logged
metrics_names=[METRIC_ATTR],
)
stop_criterion = StoppingCriterion(max_wallclock_time=600)
tuner = Tuner(
trial_backend=trial_backend,
scheduler=scheduler,
stop_criterion=stop_criterion,
n_workers=n_workers,
sleep_time=5.0,
)
tuner.run()
Requirements:
- This example is incomplete. If your training script has dependencies which you would like to provide as a Docker image, you need to upload it to ECR, after which you can refer to it with image_uri.
- Makes use of train_height.py.
Launch Experiments Remotely on SageMaker
"""
This example shows how to launch a tuning job that will be executed on Sagemaker rather than on your local machine.
"""
import logging
from pathlib import Path
from argparse import ArgumentParser
from sagemaker.pytorch import PyTorch
from syne_tune import StoppingCriterion, Tuner
from syne_tune.backend import LocalBackend
from syne_tune.backend import SageMakerBackend
from syne_tune.backend.sagemaker_backend.sagemaker_utils import (
get_execution_role,
default_sagemaker_session,
)
from syne_tune.config_space import randint
from examples.training_scripts.height_example.train_height import (
METRIC_ATTR,
METRIC_MODE,
MAX_RESOURCE_ATTR,
)
from syne_tune.optimizer.baselines import RandomSearch
from syne_tune.remote.constants import (
DEFAULT_CPU_INSTANCE_SMALL,
PYTORCH_LATEST_FRAMEWORK,
PYTORCH_LATEST_PY_VERSION,
)
from syne_tune.remote.remote_launcher import RemoteLauncher
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
parser = ArgumentParser()
parser.add_argument("--use_sagemaker_backend", type=int, default=0)
args = parser.parse_args()
use_sagemaker_backend = bool(args.use_sagemaker_backend)
max_steps = 100
n_workers = 4
config_space = {
MAX_RESOURCE_ATTR: max_steps,
"width": randint(0, 20),
"height": randint(-100, 100),
}
entry_point = str(
Path(__file__).parent
/ "training_scripts"
/ "height_example"
/ "train_height.py"
)
# We can use the local or sagemaker backend when tuning remotely.
# Using the local backend means that the remote instance will evaluate the trials locally.
# Using the sagemaker backend means the remote instance will launch one sagemaker job per trial.
if use_sagemaker_backend:
trial_backend = SageMakerBackend(
sm_estimator=PyTorch(
instance_type=DEFAULT_CPU_INSTANCE_SMALL,
instance_count=1,
framework_version=PYTORCH_LATEST_FRAMEWORK,
py_version=PYTORCH_LATEST_PY_VERSION,
entry_point=entry_point,
role=get_execution_role(),
max_run=10 * 60,
base_job_name="hpo-height",
sagemaker_session=default_sagemaker_session(),
disable_profiler=True,
debugger_hook_config=False,
),
)
else:
trial_backend = LocalBackend(entry_point=entry_point)
num_seeds = 1 if use_sagemaker_backend else 2
for seed in range(num_seeds):
# Random search without stopping
scheduler = RandomSearch(
config_space, mode=METRIC_MODE, metric=METRIC_ATTR, random_seed=seed
)
tuner = RemoteLauncher(
tuner=Tuner(
trial_backend=trial_backend,
scheduler=scheduler,
n_workers=n_workers,
tuner_name="height-tuning",
stop_criterion=StoppingCriterion(max_wallclock_time=600),
),
            # Extra arguments describing the resources of the remote tuning instance and whether we want to wait
            # for the tuning to finish. The instance type on which the tuning job runs can be different from the
            # instance type used for evaluating the training jobs.
instance_type=DEFAULT_CPU_INSTANCE_SMALL,
            # We can specify a custom container to use with this launcher with <image_uri=TK>,
            # otherwise a SageMaker pre-built container will be used
)
tuner.run(wait=False)
Requirements:
- Makes use of train_height.py.
This launcher script starts the HPO experiment as a SageMaker training job, which allows you to select any instance type you like, without blocking your local machine. This tutorial explains how to run many such remote experiments in parallel, so as to speed up comparisons between alternatives.
Launch HPO Experiment with Home-Made Scheduler
"""
Example showing how to implement a new Scheduler.
"""
import logging
from pathlib import Path
from typing import Optional, List, Dict, Any
import numpy as np
from syne_tune.backend import LocalBackend
from syne_tune.backend.trial_status import Trial
from syne_tune.optimizer.scheduler import (
TrialScheduler,
SchedulerDecision,
TrialSuggestion,
)
from syne_tune.tuner import Tuner
from syne_tune.stopping_criterion import StoppingCriterion
from syne_tune.config_space import randint
from examples.training_scripts.height_example.train_height import (
METRIC_ATTR,
METRIC_MODE,
MAX_RESOURCE_ATTR,
)
class SimpleScheduler(TrialScheduler):
def __init__(
self, config_space: Dict[str, Any], metric: str, mode: Optional[str] = None
):
super(SimpleScheduler, self).__init__(config_space=config_space)
self.metric = metric
self.mode = mode if mode is not None else "min"
self.sorted_results = []
def _suggest(self, trial_id: int) -> Optional[TrialSuggestion]:
# Called when a slot exists to run a trial, here we simply draw a
# random candidate.
config = {
k: v.sample() if hasattr(v, "sample") else v
for k, v in self.config_space.items()
}
return TrialSuggestion.start_suggestion(config)
def on_trial_result(self, trial: Trial, result: Dict[str, Any]) -> str:
# Given a new result, we decide whether the trial should stop or continue.
# Here we implement a naive strategy which stops a trial if its result is worse than 80% of the previous results.
# This is naive because it does not account for the fact that trials improve with more steps.
new_metric = result[self.metric]
# insert new metric in sorted results
index = np.searchsorted(self.sorted_results, new_metric)
self.sorted_results = np.insert(self.sorted_results, index, new_metric)
normalized_rank = index / float(len(self.sorted_results))
if self.mode == "max":
normalized_rank = 1 - normalized_rank
if normalized_rank < 0.8:
return SchedulerDecision.CONTINUE
else:
logging.info(
f"see new results {new_metric} which rank {normalized_rank * 100}%, "
f"stopping it as it does not rank on the top 80%"
)
return SchedulerDecision.STOP
def metric_names(self) -> List[str]:
return [self.metric]
if __name__ == "__main__":
logging.getLogger().setLevel(logging.DEBUG)
random_seed = 31415927
max_steps = 100
n_workers = 4
config_space = {
MAX_RESOURCE_ATTR: max_steps,
"width": randint(0, 20),
"height": randint(-100, 100),
}
entry_point = str(
Path(__file__).parent
/ "training_scripts"
/ "height_example"
/ "train_height.py"
)
# Local backend
trial_backend = LocalBackend(entry_point=entry_point)
np.random.seed(random_seed)
scheduler = SimpleScheduler(
config_space=config_space, metric=METRIC_ATTR, mode=METRIC_MODE
)
stop_criterion = StoppingCriterion(max_wallclock_time=20)
tuner = Tuner(
trial_backend=trial_backend,
scheduler=scheduler,
stop_criterion=stop_criterion,
n_workers=n_workers,
)
tuner.run()
Makes use of train_height.py.
For a more thorough introduction to developing new schedulers and searchers in Syne Tune, consider this tutorial.
Launch HPO Experiment on mlp_fashionmnist Benchmark
"""
Example for how to tune one of the benchmarks.
"""
import logging
from syne_tune.backend import LocalBackend
from syne_tune.optimizer.schedulers import HyperbandScheduler
from syne_tune import Tuner, StoppingCriterion
from benchmarking.benchmark_definitions.mlp_on_fashionmnist import (
mlp_fashionmnist_benchmark,
)
if __name__ == "__main__":
logging.getLogger().setLevel(logging.DEBUG)
# We pick the MLP on FashionMNIST benchmark
# The 'benchmark' dict contains arguments needed by scheduler and
# searcher (e.g., 'mode', 'metric'), along with suggested default values
# for other arguments (which you are free to override)
random_seed = 31415927
n_workers = 4
benchmark = mlp_fashionmnist_benchmark()
# If you don't like the default config_space, change it here. But let
# us use the default
config_space = benchmark.config_space
# Local backend
trial_backend = LocalBackend(entry_point=str(benchmark.script))
# GP-based Bayesian optimization searcher. Many options can be specified
# via ``search_options``, but let's use the defaults
searcher = "bayesopt"
search_options = {"num_init_random": n_workers + 2}
# Hyperband (or successive halving) scheduler of the stopping type.
# Together with 'bayesopt', this selects the MOBSTER algorithm.
# If you don't like the defaults suggested, just change them:
scheduler = HyperbandScheduler(
config_space,
searcher=searcher,
search_options=search_options,
max_resource_attr=benchmark.max_resource_attr,
resource_attr=benchmark.resource_attr,
mode=benchmark.mode,
metric=benchmark.metric,
random_seed=random_seed,
)
stop_criterion = StoppingCriterion(max_wallclock_time=120)
tuner = Tuner(
trial_backend=trial_backend,
scheduler=scheduler,
stop_criterion=stop_criterion,
n_workers=n_workers,
)
tuner.run()
Requirements:
Needs “mlp_fashionmnist” benchmark, which requires Syne Tune to have been installed from source.
In this example, we tune one of the built-in benchmark problems, which is useful in order to compare different HPO methods. More details on benchmarking are provided in this tutorial.
Transfer Tuning on NASBench-201
from typing import Dict
from syne_tune.blackbox_repository import load_blackbox, BlackboxRepositoryBackend
from syne_tune.backend.simulator_backend.simulator_callback import SimulatorCallback
from syne_tune.experiments import load_experiment
from syne_tune.optimizer.schedulers import FIFOScheduler
from syne_tune.optimizer.schedulers.transfer_learning import (
TransferLearningTaskEvaluations,
BoundingBox,
)
from syne_tune import StoppingCriterion, Tuner
def load_transfer_learning_evaluations(
blackbox_name: str, test_task: str, metric: str
) -> Dict[str, TransferLearningTaskEvaluations]:
bb_dict = load_blackbox(blackbox_name)
metric_index = [
i
for i, name in enumerate(bb_dict[test_task].objectives_names)
if name == metric
][0]
transfer_learning_evaluations = {
task: TransferLearningTaskEvaluations(
hyperparameters=bb.hyperparameters,
configuration_space=bb.configuration_space,
objectives_evaluations=bb.objectives_evaluations[
..., metric_index : metric_index + 1
],
objectives_names=[metric],
)
for task, bb in bb_dict.items()
if task != test_task
}
return transfer_learning_evaluations
if __name__ == "__main__":
blackbox_name = "nasbench201"
test_task = "cifar100"
elapsed_time_attr = "metric_elapsed_time"
metric = "metric_valid_error"
bb_dict = load_blackbox(blackbox_name)
transfer_learning_evaluations = load_transfer_learning_evaluations(
blackbox_name, test_task, metric
)
scheduler = BoundingBox(
scheduler_fun=lambda new_config_space, mode, metric: FIFOScheduler(
new_config_space,
points_to_evaluate=[],
searcher="random",
metric=metric,
mode=mode,
),
mode="min",
config_space=bb_dict[test_task].configuration_space,
metric=metric,
num_hyperparameters_per_task=10,
transfer_learning_evaluations=transfer_learning_evaluations,
)
stop_criterion = StoppingCriterion(max_wallclock_time=7200)
trial_backend = BlackboxRepositoryBackend(
blackbox_name=blackbox_name,
elapsed_time_attr=elapsed_time_attr,
dataset=test_task,
)
# It is important to set ``sleep_time`` to 0 here (mandatory for simulator backend)
tuner = Tuner(
trial_backend=trial_backend,
scheduler=scheduler,
stop_criterion=stop_criterion,
n_workers=4,
sleep_time=0,
# This callback is required in order to make things work with the
# simulator callback. It makes sure that results are stored with
# simulated time (rather than real time), and that the time_keeper
# is advanced properly whenever the tuner loop sleeps
callbacks=[SimulatorCallback()],
)
tuner.run()
tuning_experiment = load_experiment(tuner.name)
print(tuning_experiment)
print(f"best result found: {tuning_experiment.best_config()}")
tuning_experiment.plot()
Requirements:
Syne Tune blackbox-repository dependencies need to be installed.
Needs the nasbench201 blackbox to be downloaded and preprocessed. This can take quite a while when done for the first time.
If AWS SageMaker is used or an S3 bucket is accessible, the blackbox files are uploaded to your S3 bucket.
In this example, we use the simulator backend with the NASBench-201 blackbox. It serves as a simple demonstration of how evaluations from related tasks can be used to speed up HPO.
Transfer Learning Example
"""
Example collecting evaluations and using them for transfer learning on a
related task.
"""
from examples.training_scripts.height_example.train_height import (
height_config_space,
METRIC_ATTR,
METRIC_MODE,
)
from syne_tune import Tuner, StoppingCriterion
from syne_tune.backend import LocalBackend
from syne_tune.optimizer.baselines import BayesianOptimization, ZeroShotTransfer
from syne_tune.optimizer.schedulers import FIFOScheduler
from syne_tune.optimizer.schedulers.transfer_learning import (
TransferLearningTaskEvaluations,
BoundingBox,
)
from syne_tune.optimizer.schedulers.transfer_learning.quantile_based.quantile_based_searcher import (
QuantileBasedSurrogateSearcher,
)
import argparse
import copy
import numpy as np
from pathlib import Path
def add_labels(ax, conf_space, title):
ax.legend()
ax.set_xlabel("width")
ax.set_ylabel("height")
ax.set_xlim([conf_space["width"].lower - 1, conf_space["width"].upper + 1])
ax.set_ylim([conf_space["height"].lower - 10, conf_space["height"].upper + 10])
ax.set_title(title)
def scatter_space_exploration(ax, task_hyps, max_trials, label, color=None):
ax.scatter(
task_hyps["width"][:max_trials],
task_hyps["height"][:max_trials],
alpha=0.4,
label=label,
color=color,
)
colours = {
"BayesianOptimization": "C0",
"BoundingBox": "C1",
"ZeroShotTransfer": "C2",
"Quantiles": "C3",
}
def plot_last_task(max_trials, df, label, metric, color):
max_tr = min(max_trials, len(df))
plt.scatter(range(max_tr), df[metric][:max_tr], label=label, color=color)
plt.plot([np.min(df[metric][:ii]) for ii in range(1, max_trials + 1)], color=color)
def filter_completed(df):
# Filter out runs that didn't finish
return df[df["status"] == "Completed"].reset_index()
def extract_transferable_evaluations(df, metric, config_space):
"""
Take a dataframe from a tuner run, filter it and generate
TransferLearningTaskEvaluations from it
"""
filter_df = filter_completed(df)
return TransferLearningTaskEvaluations(
configuration_space=config_space,
hyperparameters=filter_df[config_space.keys()],
objectives_names=[metric],
# objectives_evaluations need to be of shape
# (num_evals, num_seeds, num_fidelities, num_objectives)
# We only have one seed, fidelity and objective
objectives_evaluations=np.array(filter_df[metric], ndmin=4).T,
)
def run_scheduler_on_task(entry_point, scheduler, max_trials):
"""
Take a scheduler and run it for max_trials on the backend specified by entry_point
Return a dataframe of the optimisation results
"""
tuner = Tuner(
trial_backend=LocalBackend(entry_point=str(entry_point)),
scheduler=scheduler,
stop_criterion=StoppingCriterion(max_num_trials_finished=max_trials),
n_workers=4,
sleep_time=0.001,
)
tuner.run()
return tuner.tuning_status.get_dataframe()
def init_scheduler(
scheduler_str, max_steps, seed, mode, metric, transfer_learning_evaluations
):
"""
Initialise the scheduler
"""
kwargs = {
"metric": metric,
"config_space": height_config_space(max_steps=max_steps),
"mode": mode,
"random_seed": seed,
}
kwargs_w_trans = copy.deepcopy(kwargs)
kwargs_w_trans["transfer_learning_evaluations"] = transfer_learning_evaluations
if scheduler_str == "BayesianOptimization":
return BayesianOptimization(**kwargs)
if scheduler_str == "ZeroShotTransfer":
return ZeroShotTransfer(use_surrogates=True, **kwargs_w_trans)
if scheduler_str == "Quantiles":
return FIFOScheduler(
searcher=QuantileBasedSurrogateSearcher(**kwargs_w_trans),
**kwargs,
)
if scheduler_str == "BoundingBox":
kwargs_sched_fun = {key: kwargs[key] for key in kwargs if key != "config_space"}
kwargs_w_trans[
"scheduler_fun"
] = lambda new_config_space, mode, metric: BayesianOptimization(
new_config_space,
**kwargs_sched_fun,
)
del kwargs_w_trans["random_seed"]
return BoundingBox(**kwargs_w_trans)
raise ValueError("scheduler_str not recognised")
if __name__ == "__main__":
max_trials = 10
np.random.seed(1)
# Use train_height backend for our tests
entry_point = str(
Path(__file__).parent
/ "training_scripts"
/ "height_example"
/ "train_height.py"
)
# Collect evaluations on preliminary tasks
transfer_learning_evaluations = {}
for max_steps in range(1, 6):
scheduler = init_scheduler(
"BayesianOptimization",
max_steps=max_steps,
seed=np.random.randint(100),
mode=METRIC_MODE,
metric=METRIC_ATTR,
transfer_learning_evaluations=None,
)
print("Optimising preliminary task %s" % max_steps)
prev_task = run_scheduler_on_task(entry_point, scheduler, max_trials)
# Generate TransferLearningTaskEvaluations from previous task
transfer_learning_evaluations[max_steps] = extract_transferable_evaluations(
prev_task, METRIC_ATTR, scheduler.config_space
)
# Collect evaluations on transfer task
max_steps = 6
transfer_task_results = {}
labels = ["BayesianOptimization", "BoundingBox", "ZeroShotTransfer", "Quantiles"]
for scheduler_str in labels:
scheduler = init_scheduler(
scheduler_str,
max_steps=max_steps,
seed=max_steps,
mode=METRIC_MODE,
metric=METRIC_ATTR,
transfer_learning_evaluations=transfer_learning_evaluations,
)
print("Optimising transfer task using %s" % scheduler_str)
transfer_task_results[scheduler_str] = run_scheduler_on_task(
entry_point, scheduler, max_trials
)
# Optionally generate plots. Defaults to False
parser = argparse.ArgumentParser()
parser.add_argument(
"--generate_plots", action="store_true", help="generate optimisation plots."
)
args = parser.parse_args()
if args.generate_plots:
from syne_tune.try_import import try_import_visual_message
try:
import matplotlib.pyplot as plt
except ImportError:
print(try_import_visual_message())
print("Generating optimisation plots.")
""" Plot the results on the transfer task """
for label in labels:
plot_last_task(
max_trials,
transfer_task_results[label],
label=label,
metric=METRIC_ATTR,
color=colours[label],
)
plt.legend()
plt.ylabel(METRIC_ATTR)
plt.xlabel("Iteration")
plt.title("Transfer task (max_steps=6)")
plt.savefig("Transfer_task.png", bbox_inches="tight")
""" Plot the configs tried for the preliminary tasks """
fig, ax = plt.subplots()
for key in transfer_learning_evaluations:
scatter_space_exploration(
ax,
transfer_learning_evaluations[key].hyperparameters,
max_trials,
"Task %s" % key,
)
add_labels(
ax,
scheduler.config_space,
"Explored locations of BO for preliminary tasks",
)
plt.savefig("Configs_explored_preliminary.png", bbox_inches="tight")
""" Plot the configs tried for the transfer task """
fig, ax = plt.subplots()
# Plot the configs tried by the different schedulers on the transfer task
for label in labels:
finished_trials = filter_completed(transfer_task_results[label])
scatter_space_exploration(
ax, finished_trials, max_trials, label, color=colours[label]
)
# Plot the first config tested as a big square
ax.scatter(
finished_trials["width"][0],
finished_trials["height"][0],
marker="s",
color=colours[label],
s=100,
)
# Plot the optima from the preliminary tasks as black crosses
past_label = "Preliminary optima"
for key in transfer_learning_evaluations:
argmin = np.argmin(
transfer_learning_evaluations[key].objective_values(METRIC_ATTR)[
:max_trials, 0, 0
]
)
ax.scatter(
transfer_learning_evaluations[key].hyperparameters["width"][argmin],
transfer_learning_evaluations[key].hyperparameters["height"][argmin],
color="k",
marker="x",
label=past_label,
)
past_label = None
add_labels(ax, scheduler.config_space, "Explored locations for transfer task")
plt.savefig("Configs_explored_transfer.png", bbox_inches="tight")
Requirements:
Needs matplotlib to be installed if the plotting flag is given: pip install matplotlib. If you installed Syne Tune with visual or extra, this dependency is included.
An example of how to use evaluations collected in Syne Tune to run a transfer learning scheduler. Makes use of train_height.py. Used in the transfer learning tutorial.
To plot the figures, run as python launch_transfer_learning_example.py --generate_plots.
Plot Results of Tuning Experiment
import logging
from pathlib import Path
from syne_tune.backend import LocalBackend
from syne_tune.experiments import load_experiment
from syne_tune.optimizer.baselines import RandomSearch
from syne_tune import Tuner, StoppingCriterion
from syne_tune.config_space import randint
from examples.training_scripts.height_example.train_height import (
METRIC_ATTR,
METRIC_MODE,
MAX_RESOURCE_ATTR,
)
if __name__ == "__main__":
logging.getLogger().setLevel(logging.DEBUG)
random_seed = 31415927
max_steps = 100
n_workers = 4
config_space = {
MAX_RESOURCE_ATTR: max_steps,
"width": randint(0, 20),
"height": randint(-100, 100),
}
entry_point = str(
Path(__file__).parent
/ "training_scripts"
/ "height_example"
/ "train_height.py"
)
trial_backend = LocalBackend(entry_point=entry_point)
# Random search without stopping
scheduler = RandomSearch(
config_space, mode=METRIC_MODE, metric=METRIC_ATTR, random_seed=random_seed
)
stop_criterion = StoppingCriterion(max_wallclock_time=20)
tuner = Tuner(
trial_backend=trial_backend,
scheduler=scheduler,
n_workers=n_workers,
stop_criterion=stop_criterion,
results_update_interval=5,
tuner_name="plot-results-demo",
metadata={"description": "just an example"},
)
tuner.run()
tuning_experiment = load_experiment(tuner.name)
print(tuning_experiment)
print(f"best result found: {tuning_experiment.best_config()}")
tuning_experiment.plot()
Requirements:
Needs matplotlib to be installed: pip install matplotlib. If you installed Syne Tune with visual or extra, this dependency is included.
Makes use of train_height.py.
Customize Results Written during an Experiment
from typing import Dict, Any, Optional, List
from pathlib import Path
import logging
from syne_tune.backend import LocalBackend
from syne_tune.config_space import randint
from syne_tune.constants import ST_TUNER_TIME
from syne_tune.experiments import load_experiment
from syne_tune.optimizer.baselines import DyHPO
from syne_tune.optimizer.schedulers.searchers.dyhpo.hyperband_dyhpo import (
DyHPORungSystem,
)
from syne_tune.results_callback import ExtraResultsComposer, StoreResultsCallback
from syne_tune import Tuner, StoppingCriterion
# We would like to extract some extra information from the scheduler during the
# experiment. To this end, we implement a class for extracting this information
class DyHPOExtraResults(ExtraResultsComposer):
def __call__(self, tuner: Tuner) -> Optional[Dict[str, Any]]:
scheduler = tuner.scheduler
assert isinstance(scheduler, DyHPO) # sanity check
# :class:`~syne_tune.optimizer.schedulers.searchers.dyhpo.hyperband_dyhpo.DyHPORungSystem`
# collects statistics about how often several types of decisions were made in
# ``on_task_schedule``
return scheduler.terminator._rung_systems[0].summary_schedule_records()
def keys(self) -> List[str]:
return DyHPORungSystem.summary_schedule_keys()
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
random_seed = 31415927
max_epochs = 100
n_workers = 4
# Hyperparameter configuration space
config_space = {
"width": randint(1, 20),
"height": randint(1, 20),
"epochs": 100,
}
# We use the DyHPO scheduler, since it records some interesting extra
# information
scheduler = DyHPO(
config_space,
metric="mean_loss",
resource_attr="epoch",
max_resource_attr="epochs",
search_options={"debug_log": False},
grace_period=2,
)
entry_point = str(
Path(__file__).parent
/ "training_scripts"
/ "height_example"
/ "train_height_simple.py"
)
# Extra results are stored by the
# :class:`~syne_tune.results_callback.StoreResultsCallback`. In fact, they
# are appended to the default time-stamped results whenever a report is
# received.
extra_results_composer = DyHPOExtraResults()
callbacks = [StoreResultsCallback(extra_results_composer=extra_results_composer)]
tuner = Tuner(
trial_backend=LocalBackend(entry_point=entry_point),
scheduler=scheduler,
stop_criterion=StoppingCriterion(max_wallclock_time=30),
n_workers=4, # how many trials are evaluated in parallel
callbacks=callbacks,
)
tuner.run()
# Let us have a look at what was written. Here, we just look at the
# information at the end of the experiment
results_df = load_experiment(tuner.name).results
final_pos = results_df.loc[:, ST_TUNER_TIME].argmax()
final_row = dict(results_df.loc[final_pos])
extra_results_at_end = {
name: final_row[name] for name in extra_results_composer.keys()
}
print(f"\nExtra results at end of experiment:\n{extra_results_at_end}")
Makes use of train_height.py.
An example of how to append extra results to those written by default to results.csv.zip. This is done by customizing the StoreResultsCallback.
Pass Configuration as JSON File to Training Script
import os
import logging
from pathlib import Path
from argparse import ArgumentParser
from syne_tune.backend import LocalBackend, SageMakerBackend
from syne_tune.backend.sagemaker_backend.sagemaker_utils import (
get_execution_role,
default_sagemaker_session,
)
from syne_tune.optimizer.baselines import (
ASHA,
)
from syne_tune import Tuner, StoppingCriterion
from syne_tune.remote.constants import (
DEFAULT_CPU_INSTANCE_SMALL,
PYTORCH_LATEST_FRAMEWORK,
PYTORCH_LATEST_PY_VERSION,
)
from examples.training_scripts.height_example.train_height_config_json import (
height_config_space,
RESOURCE_ATTR,
METRIC_ATTR,
METRIC_MODE,
MAX_RESOURCE_ATTR,
)
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
parser = ArgumentParser()
parser.add_argument("--use_sagemaker_backend", type=int, default=0)
args = parser.parse_args()
use_sagemaker_backend = bool(args.use_sagemaker_backend)
random_seed = 31415927
max_epochs = 100
n_workers = 4
max_wallclock_time = 5 * 60 if use_sagemaker_backend else 10
config_space = height_config_space(max_epochs)
entry_point = (
Path(__file__).parent
/ "training_scripts"
/ "height_example"
/ "train_height_config_json.py"
)
scheduler = ASHA(
config_space,
metric=METRIC_ATTR,
mode=METRIC_MODE,
max_resource_attr=MAX_RESOURCE_ATTR,
resource_attr=RESOURCE_ATTR,
)
if not use_sagemaker_backend:
trial_backend = LocalBackend(
entry_point=str(entry_point),
pass_args_as_json=True,
)
else:
from sagemaker.pytorch import PyTorch
import syne_tune
if "AWS_DEFAULT_REGION" not in os.environ:
os.environ["AWS_DEFAULT_REGION"] = "us-west-2"
trial_backend = SageMakerBackend(
sm_estimator=PyTorch(
entry_point=str(entry_point),
instance_type=DEFAULT_CPU_INSTANCE_SMALL,
instance_count=1,
framework_version=PYTORCH_LATEST_FRAMEWORK,
py_version=PYTORCH_LATEST_PY_VERSION,
role=get_execution_role(),
dependencies=syne_tune.__path__,
max_run=10 * 60,
sagemaker_session=default_sagemaker_session(),
disable_profiler=True,
debugger_hook_config=False,
keep_alive_period_in_seconds=60, # warm pool feature
),
metrics_names=[METRIC_ATTR],
pass_args_as_json=True,
)
stop_criterion = StoppingCriterion(max_wallclock_time=max_wallclock_time)
tuner = Tuner(
trial_backend=trial_backend,
scheduler=scheduler,
stop_criterion=stop_criterion,
n_workers=n_workers,
start_jobs_without_delay=False,
)
tuner.run()
Requirements:
If use_sagemaker_backend = True, needs access to AWS SageMaker.
Makes use of the following train_height_config_json.py training script:
import logging
import time
from typing import Optional, Dict, Any
from argparse import ArgumentParser
from syne_tune import Reporter
from syne_tune.config_space import randint
from syne_tune.utils import add_config_json_to_argparse, load_config_json
report = Reporter()
RESOURCE_ATTR = "epoch"
METRIC_ATTR = "mean_loss"
METRIC_MODE = "min"
MAX_RESOURCE_ATTR = "steps"
def train_height(step: int, width: float, height: float) -> float:
return 100 / (10 + width * step) + 0.1 * height
def height_config_space(
max_steps: int, sleep_time: Optional[float] = None
) -> Dict[str, Any]:
if sleep_time is None:
sleep_time = 0.1
return {
MAX_RESOURCE_ATTR: max_steps,
"width": randint(0, 20),
"height": randint(-100, 100),
"sleep_time": sleep_time,
"list_arg": ["this", "is", "a", "list", 1, 2, 3],
"dict_arg": {
"this": 27,
"is": [1, 2, 3],
"a": "dictionary",
"even": {
"a": 0,
"nested": 1,
"one": 2,
},
},
}
def _check_extra_args(config: Dict[str, Any]):
config_space = height_config_space(5)
for k in ("list_arg", "dict_arg"):
a, b = config[k], config_space[k]
assert a == b, (k, a, b)
if __name__ == "__main__":
root = logging.getLogger()
root.setLevel(logging.INFO)
parser = ArgumentParser()
# Append required argument(s):
add_config_json_to_argparse(parser)
args, _ = parser.parse_known_args()
# Loads config JSON and merges with ``args``
config = load_config_json(vars(args))
# Check that args with complex types have been received correctly
_check_extra_args(config)
width = config["width"]
height = config["height"]
sleep_time = config["sleep_time"]
num_steps = config[MAX_RESOURCE_ATTR]
for step in range(num_steps):
# Sleep first, since results are returned at end of "epoch"
time.sleep(sleep_time)
# Feed the score back to Syne Tune.
dummy_score = train_height(step, width, height)
report(
**{
"step": step,
METRIC_ATTR: dummy_score,
RESOURCE_ATTR: step + 1,
}
)
Speculative Early Checkpoint Removal
"""
Example for speculative checkpoint removal with asynchronous multi-fidelity
"""
from typing import Optional, Dict, Any, List
import logging
from syne_tune.backend import LocalBackend
from syne_tune.callbacks.hyperband_remove_checkpoints_callback import (
HyperbandRemoveCheckpointsCommon,
)
from syne_tune.constants import ST_TUNER_TIME
from syne_tune.experiments import load_experiment
from syne_tune.optimizer.baselines import MOBSTER
from syne_tune.results_callback import ExtraResultsComposer, StoreResultsCallback
from syne_tune.util import find_first_of_type
from syne_tune import Tuner, StoppingCriterion
from benchmarking.benchmark_definitions.mlp_on_fashionmnist import (
mlp_fashionmnist_benchmark,
)
# This is used to monitor what the checkpoint removal mechanism is doing, and
# to write out results. This is optional; the mechanism works without it.
class CPRemovalExtraResults(ExtraResultsComposer):
def __call__(self, tuner: Tuner) -> Optional[Dict[str, Any]]:
callback = find_first_of_type(tuner.callbacks, HyperbandRemoveCheckpointsCommon)
return None if callback is None else callback.extra_results()
def keys(self) -> List[str]:
return HyperbandRemoveCheckpointsCommon.extra_results_keys()
if __name__ == "__main__":
logging.getLogger().setLevel(logging.DEBUG)
random_seed = 31415927
n_workers = 4
max_num_checkpoints = 10
# This time may be too short to see positive effects:
max_wallclock_time = 1800
# Monitor how checkpoint removal is doing over time, appending this
# information to results.csv.zip?
monitor_cp_removal_in_results = True
# We pick the MLP on FashionMNIST benchmark
benchmark = mlp_fashionmnist_benchmark()
# Local backend
# By setting ``delete_checkpoints=True``, we ask for checkpoints to be removed
# once a trial cannot be resumed anymore
trial_backend = LocalBackend(
entry_point=str(benchmark.script),
delete_checkpoints=True,
)
# MOBSTER (model-based ASHA) with promotion scheduling (pause and resume).
# Checkpoints are written for each paused trial, and these are not removed,
# because in principle, every paused trial may be resumed in the future.
# If checkpoints are large, this may fill up your disk.
# Here, we use speculative checkpoint removal to keep the number of checkpoints
# to at most ``max_num_checkpoints``. To this end, paused trials are ranked by
# expected cost of removing their checkpoint.
scheduler = MOBSTER(
benchmark.config_space,
type="promotion",
max_resource_attr=benchmark.max_resource_attr,
resource_attr=benchmark.resource_attr,
mode=benchmark.mode,
metric=benchmark.metric,
random_seed=random_seed,
early_checkpoint_removal_kwargs=dict(
max_num_checkpoints=max_num_checkpoints,
),
)
stop_criterion = StoppingCriterion(max_wallclock_time=max_wallclock_time)
# The tuner activates early checkpoint removal iff
# ``trial_backend.delete_checkpoints``. In this case, it requests details
# from the scheduler (which is ``early_checkpoint_removal_kwargs`` in our
# case). Early checkpoint removal is done by appending a callback to those
# normally used with the tuner.
if monitor_cp_removal_in_results:
# We can monitor how well checkpoint removal is working by storing
# extra results (this is optional)
extra_results_composer = CPRemovalExtraResults()
callbacks = [
StoreResultsCallback(extra_results_composer=extra_results_composer)
]
else:
extra_results_composer = None
callbacks = None
tuner = Tuner(
trial_backend=trial_backend,
scheduler=scheduler,
stop_criterion=stop_criterion,
n_workers=n_workers,
callbacks=callbacks,
)
tuner.run()
if monitor_cp_removal_in_results:
# We have monitored how checkpoint removal has been doing over time. Here,
# we just look at the information at the end of the experiment
results_df = load_experiment(tuner.name).results
final_pos = results_df.loc[:, ST_TUNER_TIME].argmax()
final_row = dict(results_df.loc[final_pos])
extra_results_at_end = {
name: final_row[name] for name in extra_results_composer.keys()
}
logging.info(f"Extra results at end of experiment:\n{extra_results_at_end}")
# We can obtain additional details from the callback, which is the last one
# in ``tuner``
callback = find_first_of_type(tuner.callbacks, HyperbandRemoveCheckpointsCommon)
trials_resumed = callback.trials_resumed_without_checkpoint()
if trials_resumed:
logging.info(
f"The following {len(trials_resumed)} trials were resumed without a checkpoint:\n{trials_resumed}"
)
else:
logging.info("No trials were resumed without a checkpoint")
Requirements:
Needs “mlp_fashionmnist” benchmark, which requires Syne Tune to have been installed from source.
This example uses the mlp_fashionmnist benchmark. It runs for about 30 minutes. It demonstrates speculative early checkpoint removal for MOBSTER with promotion scheduling (pause and resume).
Launch HPO Experiment with Ray Tune Scheduler
import logging
from pathlib import Path
from ray.tune.schedulers import AsyncHyperBandScheduler
from ray.tune.search.skopt import SkOptSearch
import numpy as np
from syne_tune.backend import LocalBackend
from syne_tune.optimizer.schedulers import RayTuneScheduler
from syne_tune import Tuner, StoppingCriterion
from syne_tune.config_space import randint
from examples.training_scripts.height_example.train_height import (
RESOURCE_ATTR,
METRIC_ATTR,
METRIC_MODE,
MAX_RESOURCE_ATTR,
)
if __name__ == "__main__":
logging.getLogger().setLevel(logging.DEBUG)
random_seed = 31415927
max_steps = 100
n_workers = 4
config_space = {
MAX_RESOURCE_ATTR: max_steps,
"width": randint(0, 20),
"height": randint(-100, 100),
}
entry_point = str(
Path(__file__).parent
/ "training_scripts"
/ "height_example"
/ "train_height.py"
)
# Local backend
trial_backend = LocalBackend(entry_point=entry_point)
# Hyperband scheduler with SkOpt searcher
np.random.seed(random_seed)
ray_searcher = SkOptSearch()
ray_searcher.set_search_properties(
mode=METRIC_MODE,
metric=METRIC_ATTR,
config=RayTuneScheduler.convert_config_space(config_space),
)
ray_scheduler = AsyncHyperBandScheduler(
max_t=max_steps,
time_attr=RESOURCE_ATTR,
mode=METRIC_MODE,
metric=METRIC_ATTR,
)
scheduler = RayTuneScheduler(
config_space=config_space,
ray_scheduler=ray_scheduler,
ray_searcher=ray_searcher,
)
stop_criterion = StoppingCriterion(max_wallclock_time=20)
tuner = Tuner(
trial_backend=trial_backend,
scheduler=scheduler,
stop_criterion=stop_criterion,
n_workers=n_workers,
)
tuner.run()
Makes use of train_height.py.
Stand-Alone Bayesian Optimization
import logging
from syne_tune.config_space import uniform, randint, choice
from syne_tune.optimizer.schedulers.searchers.bayesopt.datatypes.common import (
dictionarize_objective,
)
from syne_tune.optimizer.schedulers.searchers.utils.hp_ranges_factory import (
make_hyperparameter_ranges,
)
from syne_tune.optimizer.schedulers.searchers.bayesopt.utils.test_objects import (
create_tuning_job_state,
)
from syne_tune.optimizer.schedulers.searchers.gp_fifo_searcher import GPFIFOSearcher
from syne_tune.optimizer.schedulers.searchers.gp_searcher_utils import encode_state
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
random_seed = 31415927
# toy example of 3 hp's
config_space = {
"hp_1": uniform(-5.0, 5.0),
"hp_2": randint(-5, 5),
"hp_3": choice(["a", "b", "c"]),
}
hp_ranges = make_hyperparameter_ranges(config_space)
batch_size = 16
num_init_candidates_for_batch = 10
state = create_tuning_job_state(
hp_ranges=hp_ranges,
cand_tuples=[
(-3.0, -4, "a"),
(2.2, -3, "b"),
(-4.9, -1, "b"),
(-1.9, -1, "c"),
(-3.5, 3, "a"),
],
metrics=[dictionarize_objective(x) for x in (15.0, 27.0, 13.0, 39.0, 35.0)],
)
gp_searcher = GPFIFOSearcher(
state.hp_ranges.config_space,
points_to_evaluate=None,
random_seed=random_seed,
metric="objective",
debug_log=False,
)
gp_searcher_state = gp_searcher.get_state()
gp_searcher_state["state"] = encode_state(state)
gp_searcher = gp_searcher.clone_from_state(gp_searcher_state)
next_candidate_list = gp_searcher.get_batch_configs(
batch_size=batch_size,
num_init_candidates_for_batch=num_init_candidates_for_batch,
)
assert len(next_candidate_list) == batch_size
Syne Tune combines a scheduler (HPO algorithm) with a backend to provide a complete HPO solution. If you already have a system in place for job scheduling and managing the state of the tuning problem, you may want to call the scheduler on its own. This example demonstrates how to do this for Gaussian process based Bayesian optimization.
Ask Tell Interface
"""
This is an example of how to use Syne Tune in ask-tell mode.
In this setup, the tuning loop and experiments are disentangled. The AskTellScheduler suggests new configurations,
and the users themselves perform experiments to test the performance of each configuration.
Once done, the user feeds the result back into the scheduler, which uses the data to suggest better configurations.
In some cases, the experiments needed for function evaluations can be very complex and require extra orchestration
(examples vary from setting up jobs on non-AWS clusters to running physical lab experiments), in which case this
interface provides all the necessary flexibility.
"""
from typing import Dict
import datetime
import logging
import dill
import numpy as np
from syne_tune.backend.trial_status import Trial, Status, TrialResult
from syne_tune.config_space import uniform
from syne_tune.optimizer.baselines import RandomSearch, BayesianOptimization
from syne_tune.optimizer.scheduler import TrialScheduler
class AskTellScheduler:
bscheduler: TrialScheduler
trial_counter: int
completed_experiments: Dict[int, TrialResult]
def __init__(self, base_scheduler: TrialScheduler):
self.bscheduler = base_scheduler
self.trial_counter = 0
self.completed_experiments = {}
def ask(self) -> Trial:
"""
Ask the scheduler for a new trial to run
:return: Trial to run
"""
trial_suggestion = self.bscheduler.suggest(self.trial_counter)
trial = Trial(
trial_id=self.trial_counter,
config=trial_suggestion.config,
creation_time=datetime.datetime.now(),
)
self.trial_counter += 1
return trial
def tell(self, trial: Trial, experiment_result: Dict[str, float]):
"""
Feed experiment results back to the Scheduler
:param trial: Trial that was run
:param experiment_result: {metric: value} dictionary with experiment results
"""
trial_result = trial.add_results(
metrics=experiment_result,
status=Status.completed,
training_end_time=datetime.datetime.now(),
)
self.bscheduler.on_trial_complete(trial=trial, result=experiment_result)
self.completed_experiments[trial_result.trial_id] = trial_result
def best_trial(self, metric: str) -> TrialResult:
"""
Return the best trial according to the provided metric
"""
if self.bscheduler.mode == "max":
sign = 1.0
else:
sign = -1.0
return max(
[value for key, value in self.completed_experiments.items()],
key=lambda trial: sign * trial.metrics[metric],
)
def target_function(x, noise: bool = True):
fx = x * x + np.sin(x)
if noise:
sigma = np.cos(x) ** 2 + 0.01
noise = 0.1 * np.random.normal(loc=x, scale=sigma)
fx = fx + noise
return fx
def get_objective():
metric = "mean_loss"
mode = "min"
max_iterations = 100
config_space = {
"x": uniform(-1, 1),
}
return metric, mode, config_space, max_iterations
def plot_objective():
"""
In this function, we will inspect the objective by plotting the target function
:return:
"""
from syne_tune.try_import import try_import_visual_message
try:
import matplotlib.pyplot as plt
except ImportError:
print(try_import_visual_message())
metric, mode, config_space, max_iterations = get_objective()
plt.set_cmap("viridis")
x = np.linspace(config_space["x"].lower, config_space["x"].upper, 400)
fx = target_function(x, noise=False)
noise = 0.1 * np.cos(x) ** 2 + 0.01
plt.plot(x, fx, "r--", label="True value")
plt.fill_between(x, fx + noise, fx - noise, alpha=0.2, fc="r")
plt.legend()
plt.grid()
plt.show()
def tune_with_random_search() -> TrialResult:
metric, mode, config_space, max_iterations = get_objective()
scheduler = AskTellScheduler(
base_scheduler=RandomSearch(config_space, metric=metric, mode=mode)
)
for iter in range(max_iterations):
trial_suggestion = scheduler.ask()
test_result = target_function(**trial_suggestion.config)
scheduler.tell(trial_suggestion, {metric: test_result})
return scheduler.best_trial(metric)
def save_restart_with_gp() -> TrialResult:
metric, mode, config_space, max_iterations = get_objective()
scheduler = AskTellScheduler(
base_scheduler=BayesianOptimization(config_space, metric=metric, mode=mode)
)
for iter in range(int(max_iterations / 2)):
trial_suggestion = scheduler.ask()
test_result = target_function(**trial_suggestion.config)
scheduler.tell(trial_suggestion, {metric: test_result})
# --- The scheduler can be written to disk to pause the experiment
output_path = "scheduler-checkpoint.dill"
with open(output_path, "wb") as f:
dill.dump(scheduler, f)
# --- The scheduler can be read from disk at a later time to resume the experiment
with open(output_path, "rb") as f:
scheduler = dill.load(f)
for iter in range(int(max_iterations / 2)):
trial_suggestion = scheduler.ask()
test_result = target_function(**trial_suggestion.config)
scheduler.tell(trial_suggestion, {metric: test_result})
return scheduler.best_trial(metric)
def tune_with_gp() -> TrialResult:
metric, mode, config_space, max_iterations = get_objective()
scheduler = AskTellScheduler(
base_scheduler=BayesianOptimization(config_space, metric=metric, mode=mode)
)
for iter in range(max_iterations):
trial_suggestion = scheduler.ask()
test_result = target_function(**trial_suggestion.config)
scheduler.tell(trial_suggestion, {metric: test_result})
return scheduler.best_trial(metric)
if __name__ == "__main__":
logging.getLogger().setLevel(logging.WARN)
# plot_objective() # Please uncomment this to plot the objective
print("Random:", tune_with_random_search())
print("GP with restart:", save_restart_with_gp())
print("GP:", tune_with_gp())
This is an example of how to use Syne Tune in ask-tell mode. In this setup, the tuning loop and experiments are disentangled. The AskTellScheduler suggests new configurations, and the users themselves perform experiments to test the performance of each configuration. Once done, the user feeds the result back into the scheduler, which uses the data to suggest better configurations.
In some cases, the experiments needed for function evaluations can be very complex and require extra orchestration (examples vary from setting up jobs on non-AWS clusters to running physical lab experiments), in which case this interface provides all the necessary flexibility.
Ask Tell interface for Hyperband
"""
This is an example of how to use Syne Tune in ask-tell mode.
In this setup, the tuning loop and experiments are disentangled. The AskTellScheduler suggests new configurations,
and the users themselves perform experiments to test the performance of each configuration.
Once done, the user feeds the result back into the scheduler, which uses the data to suggest better configurations.
In some cases, the experiments needed for function evaluations can be very complex and require extra orchestration
(examples vary from setting up jobs on non-AWS clusters to running physical lab experiments), in which case this
interface provides all the necessary flexibility.
This is an extension of launch_ask_tell_scheduler.py to run multi-fidelity methods such as Hyperband.
"""
import logging
from typing import Tuple
import numpy as np
from examples.launch_ask_tell_scheduler import AskTellScheduler
from syne_tune.backend.trial_status import Trial, TrialResult
from syne_tune.config_space import uniform
from syne_tune.optimizer.baselines import ASHA
from syne_tune.optimizer.scheduler import SchedulerDecision
def target_function(x, step: int = None, noise: bool = True):
fx = x * x + np.sin(x)
if noise:
sigma = np.cos(x) ** 2 + 0.01
noise = 0.1 * np.random.normal(loc=x, scale=sigma)
fx = fx + noise
if step is not None:
fx += step * 0.01
return fx
def get_objective():
metric = "mean_loss"
mode = "min"
max_iterations = 100
config_space = {
"x": uniform(-1, 1),
}
return metric, mode, config_space, max_iterations
def run_hyperband_step(
scheduler: AskTellScheduler, trial_suggestion: Trial, max_steps: int, metric: str
) -> Tuple[float, float]:
for step in range(1, max_steps):
test_result = target_function(**trial_suggestion.config, step=step)
decision = scheduler.bscheduler.on_trial_result(
trial_suggestion, {metric: test_result, "epoch": step}
)
if decision == SchedulerDecision.STOP:
break
return step, test_result
def tune_with_hyperband() -> TrialResult:
metric, mode, config_space, max_iterations = get_objective()
max_steps = 100
scheduler = AskTellScheduler(
base_scheduler=ASHA(
config_space,
metric=metric,
resource_attr="epoch",
max_t=max_steps,
mode=mode,
)
)
for iter in range(max_iterations):
trial_suggestion = scheduler.ask()
final_step, test_result = run_hyperband_step(
scheduler, trial_suggestion, max_steps, metric
)
scheduler.tell(trial_suggestion, {metric: test_result, "epoch": final_step})
return scheduler.best_trial(metric)
if __name__ == "__main__":
logging.getLogger().setLevel(logging.WARN)
print("Hyperband:", tune_with_hyperband())
This is an extension of launch_ask_tell_scheduler.py to run multi-fidelity methods such as Hyperband.
Multi Objective Multi Surrogate (MSMOS) Searcher
from pathlib import Path
import numpy as np
from syne_tune import Tuner, StoppingCriterion
from syne_tune.backend import LocalBackend
from syne_tune.config_space import randint, uniform
from syne_tune.optimizer.baselines import MORandomScalarizationBayesOpt
def main():
random_seed = 6287623
# Hyperparameter configuration space
config_space = {
"steps": randint(0, 100),
"theta": uniform(0, np.pi / 2),
"sleep_time": 0.01,
}
metrics = ["y1", "y2"]
modes = ["min", "min"]
# Creates a FIFO scheduler with a ``MultiObjectiveMultiSurrogateSearcher``. The
# latter is configured by one default GP surrogate per objective, and with the
# ``MultiObjectiveLCBRandomLinearScalarization`` acquisition function.
scheduler = MORandomScalarizationBayesOpt(
config_space=config_space,
metric=metrics,
mode=modes,
random_seed=random_seed,
)
entry_point = str(
Path(__file__).parent
/ "training_scripts"
/ "mo_artificial"
/ "mo_artificial.py"
)
tuner = Tuner(
trial_backend=LocalBackend(entry_point=entry_point),
scheduler=scheduler,
stop_criterion=StoppingCriterion(max_wallclock_time=30),
n_workers=1, # how many trials are evaluated in parallel
)
tuner.run()
if __name__ == "__main__":
main()
This example shows how to use the multi-objective multi-surrogate (MSMOS) searcher to tune a multi-objective problem. Here, we use two Gaussian process regressors as the surrogate models and rely on a lower-confidence-bound random scalarization as the acquisition function. That said, any Syne Tune Estimator can be used as the surrogate.