# Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.
import itertools
from pathlib import Path
from typing import Optional, Callable, Dict, Any, List
from tqdm import tqdm
from syne_tune.experiments.baselines import MethodDefinitions
from syne_tune.experiments.launchers.hpo_main_common import (
extra_metadata,
ExtraArgsType,
ConfigDict,
config_from_argparse,
)
from syne_tune.experiments.launchers.hpo_main_simulator import (
SurrogateBenchmarkDefinitions,
is_dict_of_dict,
SIMULATED_BACKEND_EXTRA_PARAMETERS,
BENCHMARK_KEY_EXTRA_PARAMETER,
)
from syne_tune.experiments.launchers.launch_remote_common import (
sagemaker_estimator_args,
fit_sagemaker_estimator,
)
from syne_tune.experiments.launchers.utils import (
filter_none,
message_sync_from_s3,
find_or_create_requirements_txt,
get_master_random_seed,
)
from syne_tune.remote.estimators import (
basic_cpu_instance_sagemaker_estimator,
)
from syne_tune.util import random_string
def get_hyperparameters(
    seed: Optional[int],
    method: str,
    experiment_tag: str,
    random_seed: int,
    configuration: ConfigDict,
) -> Dict[str, Any]:
    """Compose hyperparameters for SageMaker training job

    :param seed: Seed of repetition, or ``None`` to run seeds
        ``configuration.start_seed, ..., configuration.num_seeds - 1``
        sequentially in the remote job
    :param method: Method name
    :param experiment_tag: Tag of experiment
    :param random_seed: Master random seed
    :param configuration: Configuration for the job
    :return: Dictionary of hyperparameters
    """
    hyperparameters = {
        "experiment_tag": experiment_tag,
        "method": method,
        "save_tuner": int(configuration.save_tuner),
        "random_seed": random_seed,
        "scale_max_wallclock_time": int(configuration.scale_max_wallclock_time),
        "launched_remotely": 1,
    }
    if seed is not None:
        # Remote job runs this single repetition only
        hyperparameters["num_seeds"] = seed + 1
        hyperparameters["start_seed"] = seed
    else:
        # Remote job iterates over all seeds of the configuration
        hyperparameters["num_seeds"] = configuration.num_seeds
        hyperparameters["start_seed"] = configuration.start_seed
    if configuration.benchmark is not None:
        hyperparameters["benchmark"] = configuration.benchmark
    # Optional parameters are only passed on when they are set
    for name in (
        "n_workers",
        "max_wallclock_time",
        "max_size_data_for_model",
        "fcnet_ordinal",
    ):
        value = getattr(configuration, name)
        if value is not None:
            hyperparameters[name] = value
    # Append extra arguments of the configuration, dropping unset ones
    hyperparameters.update(
        filter_none(extra_metadata(configuration, configuration.extra_parameters()))
    )
    return hyperparameters
def launch_remote(
    entry_point: Path,
    methods: MethodDefinitions,
    benchmark_definitions: SurrogateBenchmarkDefinitions,
    source_dependencies: Optional[List[str]] = None,
    extra_args: Optional[ExtraArgsType] = None,
    is_expensive_method: Optional[Callable[[str], bool]] = None,
):
    """
    Launches sequence of SageMaker training jobs, each running an experiment
    with the simulator backend.

    The loop runs over methods selected from ``methods``. Different repetitions
    (seeds) are run sequentially in the remote job. However, if
    ``is_expensive_method(method_name)`` is true, we launch different remote
    jobs for every seed for this particular method. This is to cater for
    methods which are themselves expensive to run (e.g., involving Gaussian
    process based Bayesian optimization).

    If ``benchmark_definitions`` is a single-level dictionary and no benchmark
    is selected on the command line, then all benchmarks are run sequentially
    in the remote job. However, if ``benchmark_definitions`` is two-level nested,
    we loop over the outer level and start separate remote jobs, each of which
    iterates over its inner level of benchmarks. This is useful if the number
    of benchmarks to iterate over is large.

    :param entry_point: Script for running the experiment
    :param methods: Dictionary with method constructors; one is selected from
        command line arguments
    :param benchmark_definitions: Definitions of benchmarks, can be nested
        (see above)
    :param source_dependencies: If given, these are source dependencies for the
        SageMaker estimator, on top of Syne Tune itself
    :param extra_args: Extra arguments for command line parser, optional
    :param is_expensive_method: See above. The default is a predicate always
        returning False (no method is expensive)
    """
    simulated_backend_extra_parameters = SIMULATED_BACKEND_EXTRA_PARAMETERS.copy()
    if is_dict_of_dict(benchmark_definitions):
        # Nested benchmark definitions require the extra ``benchmark_key``
        # command line argument
        simulated_backend_extra_parameters.append(BENCHMARK_KEY_EXTRA_PARAMETER)
    configuration = config_from_argparse(extra_args, simulated_backend_extra_parameters)
    launch_remote_experiments_simulator(
        configuration=configuration,
        entry_point=entry_point,
        methods=methods,
        benchmark_definitions=benchmark_definitions,
        source_dependencies=source_dependencies,
        is_expensive_method=is_expensive_method,
    )
def launch_remote_experiments_simulator(
    configuration: ConfigDict,
    entry_point: Path,
    methods: MethodDefinitions,
    benchmark_definitions: SurrogateBenchmarkDefinitions,
    source_dependencies: Optional[List[str]],
    is_expensive_method: Optional[Callable[[str], bool]] = None,
):
    """
    Launches sequence of SageMaker training jobs, each running an experiment
    with the simulator backend.

    The loop runs over methods selected from ``methods``. Different repetitions
    (seeds) are run sequentially in the remote job. However, if
    ``is_expensive_method(method_name)`` is true, we launch different remote
    jobs for every seed for this particular method. This is to cater for
    methods which are themselves expensive to run (e.g., involving Gaussian
    process based Bayesian optimization).

    If ``benchmark_definitions`` is a single-level dictionary and no benchmark
    is selected on the command line, then all benchmarks are run sequentially
    in the remote job. However, if ``benchmark_definitions`` is two-level nested,
    we loop over the outer level and start separate remote jobs, each of which
    iterates over its inner level of benchmarks. This is useful if the number
    of benchmarks to iterate over is large.

    :param configuration: ConfigDict with parameters of the benchmark.
        Must contain all parameters from
        hpo_main_simulator.LOCAL_LOCAL_SIMULATED_BENCHMARK_REQUIRED_PARAMETERS
    :param entry_point: Script for running the experiment
    :param methods: Dictionary with method constructors; one is selected from
        command line arguments
    :param benchmark_definitions: Definitions of benchmarks; one is selected from
        command line arguments
    :param source_dependencies: If given, these are source dependencies for the
        SageMaker estimator, on top of Syne Tune itself
    :param is_expensive_method: See above. The default is a predicate always
        returning False (no method is expensive)
    """
    if is_expensive_method is None:
        # Nothing is expensive
        def is_expensive_method(method):
            return False
    simulated_backend_extra_parameters = SIMULATED_BACKEND_EXTRA_PARAMETERS.copy()
    if is_dict_of_dict(benchmark_definitions):
        # Nested benchmark definitions: the extra ``benchmark_key`` argument
        # selects which inner dictionary a remote job iterates over
        simulated_backend_extra_parameters.append(BENCHMARK_KEY_EXTRA_PARAMETER)
    configuration.check_if_all_paremeters_present(simulated_backend_extra_parameters)
    configuration.expand_base_arguments(simulated_backend_extra_parameters)
    # Run a single method if one was selected on the command line, else all
    method_names = (
        [configuration.method]
        if configuration.method is not None
        else list(methods.keys())
    )
    nested_dict = is_dict_of_dict(benchmark_definitions)
    experiment_tag = configuration.experiment_tag
    master_random_seed = get_master_random_seed(configuration.random_seed)
    # All jobs of this launch share a random suffix in their job names
    suffix = random_string(4)
    find_or_create_requirements_txt(entry_point)
    # Compose (method, seed) pairs: expensive methods get one remote job per
    # seed; for all other methods, seeds run sequentially in one job
    # (``seed is None`` signals the latter)
    combinations = []
    for method in method_names:
        seed_range = configuration.seeds if is_expensive_method(method) else [None]
        combinations.extend([(method, seed) for seed in seed_range])
    if nested_dict:
        # One remote job per (method, seed) x benchmark_key combination
        benchmark_keys = list(benchmark_definitions.keys())
        combinations = list(itertools.product(combinations, benchmark_keys))
    else:
        combinations = [(x, None) for x in combinations]
    for (method, seed), benchmark_key in tqdm(combinations):
        # Unique tuner name; determines job name and S3 checkpoint path
        tuner_name = method
        if seed is not None:
            tuner_name += f"-{seed}"
        if benchmark_key is not None:
            tuner_name += f"-{benchmark_key}"
        sm_args = sagemaker_estimator_args(
            entry_point=entry_point,
            experiment_tag=configuration.experiment_tag,
            tuner_name=tuner_name,
            source_dependencies=source_dependencies,
        )
        hyperparameters = get_hyperparameters(
            seed=seed,
            method=method,
            experiment_tag=experiment_tag,
            random_seed=master_random_seed,
            configuration=configuration,
        )
        # Extra parameters specific to the simulator backend
        hyperparameters["verbose"] = int(configuration.verbose)
        hyperparameters["support_checkpointing"] = int(
            configuration.support_checkpointing
        )
        hyperparameters["restrict_configurations"] = int(
            configuration.restrict_configurations
        )
        if benchmark_key is not None:
            hyperparameters["benchmark_key"] = benchmark_key
        sm_args["hyperparameters"] = hyperparameters
        print(
            f"{experiment_tag}-{tuner_name}\n"
            f"hyperparameters = {hyperparameters}\n"
            f"Results written to {sm_args['checkpoint_s3_uri']}"
        )
        est = basic_cpu_instance_sagemaker_estimator(**sm_args)
        # Launch asynchronously (``wait=False``); helper retries with backoff
        fit_sagemaker_estimator(
            backoff_wait_time=configuration.estimator_fit_backoff_wait_time,
            estimator=est,
            job_name=f"{experiment_tag}-{tuner_name}-{suffix}",
            wait=False,
        )
    print("\n" + message_sync_from_s3(experiment_tag))