Source code for syne_tune.experiments.launchers.hpo_main_local

# Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.
import itertools
from typing import Optional, Callable, Dict, Any
import numpy as np
from tqdm import tqdm
import logging

from syne_tune.experiments.baselines import MethodArguments, MethodDefinitions
from syne_tune.experiments.benchmark_definitions.common import RealBenchmarkDefinition
from syne_tune.experiments.launchers.hpo_main_common import (
    set_logging_level,
    get_metadata,
    ExtraArgsType,
    MapMethodArgsType,
    ConfigDict,
    DictStrKey,
    extra_metadata,
    str2bool,
    config_from_argparse,
)
from syne_tune.experiments.launchers.utils import (
    get_master_random_seed,
    effective_random_seed,
)
from syne_tune.backend import LocalBackend
from syne_tune.backend.sagemaker_backend.sagemaker_utils import (
    backend_path_not_synced_to_s3,
)
from syne_tune.remote.remote_metrics_callback import RemoteTuningMetricsCallback
from syne_tune.results_callback import StoreResultsCallback, ExtraResultsComposer
from syne_tune.stopping_criterion import StoppingCriterion
from syne_tune.tuner import Tuner
from syne_tune.util import sanitize_sagemaker_name

logger = logging.getLogger(__name__)


RealBenchmarkDefinitions = Callable[..., Dict[str, RealBenchmarkDefinition]]


LOCAL_AND_SAGEMAKER_BACKEND_EXTRA_PARAMETERS = [
    dict(
        name="benchmark",
        type=str,
        help="Benchmark to run from benchmark_definitions",
        default=None,
    ),
    dict(
        name="instance_type",
        type=str,
        default=None,
        help="AWS SageMaker instance type",
    ),
    dict(
        name="delete_checkpoints",
        type=str2bool,
        default=False,
        help="Remove checkpoints of trials once no longer needed?",
    ),
    dict(
        name="remote_tuning_metrics",
        type=str2bool,
        default=True,
        help="Remote tuning publishes metrics to Sagemaker console?",
    ),
]


LOCAL_BACKEND_ONLY_EXTRA_PARAMETERS = [
    dict(
        name="verbose",
        type=str2bool,
        default=False,
        help="Verbose log output?",
    ),
    dict(
        name="num_gpus_per_trial",
        type=int,
        default=1,
        help="Number of GPUs to allocate to each trial",
    ),
]


LOCAL_BACKEND_EXTRA_PARAMETERS = (
    LOCAL_AND_SAGEMAKER_BACKEND_EXTRA_PARAMETERS + LOCAL_BACKEND_ONLY_EXTRA_PARAMETERS
)
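
# A hedged example (hypothetical invocation): ``config_from_argparse`` turns the
# parameter dicts above into ``argparse`` command line options, so a launcher
# built on this module could be called roughly like
#
#     python hpo_main.py --experiment_tag my-tag --benchmark <name> \
#         --num_gpus_per_trial 2 --verbose 1
#
# Exact flag names and defaults depend on the base arguments defined in
# ``hpo_main_common``; ``str2bool`` options accept values such as ``1`` / ``0``.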


def get_benchmark(
    configuration: ConfigDict,
    benchmark_definitions: RealBenchmarkDefinitions,
    **benchmark_kwargs,
) -> RealBenchmarkDefinition:
    """
    If ``configuration.benchmark`` is ``None`` and ``benchmark_definitions``
    maps to a single benchmark, ``configuration.benchmark`` is set to its key.
    """
    if configuration.n_workers is not None:
        benchmark_kwargs["n_workers"] = configuration.n_workers
    if configuration.max_wallclock_time is not None:
        benchmark_kwargs["max_wallclock_time"] = configuration.max_wallclock_time
    if configuration.instance_type is not None:
        benchmark_kwargs["instance_type"] = configuration.instance_type
    benchmarks_dict = benchmark_definitions(**benchmark_kwargs)
    if configuration.benchmark is None:
        assert (
            len(benchmarks_dict) == 1
        ), "--benchmark must be given if benchmark_definitions has more than one entry"
        configuration.benchmark = next(iter(benchmarks_dict.keys()))
    benchmark = benchmarks_dict[configuration.benchmark]
    do_scale = (
        configuration.scale_max_wallclock_time
        and configuration.n_workers is not None
        and configuration.max_wallclock_time is None
    )
    if do_scale:
        benchmark_default = benchmark_definitions(**benchmark_kwargs)[
            configuration.benchmark
        ]
        default_n_workers = benchmark_default.n_workers
    else:
        default_n_workers = None
    if do_scale and configuration.n_workers < default_n_workers:
        # Scale ``max_wallclock_time``
        factor = default_n_workers / configuration.n_workers
        bm_mwt = benchmark.max_wallclock_time
        benchmark.max_wallclock_time = int(bm_mwt * factor)
        print(
            f"Scaling max_wallclock_time: {benchmark.max_wallclock_time} (from {bm_mwt})"
        )
    return benchmark
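

# Hypothetical sketch (not part of this module): ``benchmark_definitions`` is any
# callable returning a dict of ``RealBenchmarkDefinition`` objects, keyed by
# benchmark name; with a single entry, ``--benchmark`` can be omitted, as handled
# in :func:`get_benchmark` above. For example, assuming a user-defined helper
# ``my_benchmark`` that returns a ``RealBenchmarkDefinition``:
#
#     def benchmark_definitions(**kwargs) -> Dict[str, RealBenchmarkDefinition]:
#         return {"my_benchmark": my_benchmark(**kwargs)}

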
def create_objects_for_tuner(
    configuration: ConfigDict,
    methods: MethodDefinitions,
    method: str,
    benchmark: RealBenchmarkDefinition,
    master_random_seed: int,
    seed: int,
    verbose: bool,
    extra_tuning_job_metadata: Optional[DictStrKey] = None,
    map_method_args: Optional[MapMethodArgsType] = None,
    extra_results: Optional[ExtraResultsComposer] = None,
    num_gpus_per_trial: int = 1,
) -> Dict[str, Any]:
    method_kwargs = dict(
        config_space=benchmark.config_space,
        metric=benchmark.metric,
        mode=benchmark.mode,
        random_seed=effective_random_seed(master_random_seed, seed),
        resource_attr=benchmark.resource_attr,
        max_resource_attr=benchmark.max_resource_attr,
        num_gpus_per_trial=num_gpus_per_trial,
        scheduler_kwargs=dict(
            points_to_evaluate=benchmark.points_to_evaluate,
            search_options=dict(debug_log=verbose),
        ),
    )
    if configuration.max_size_data_for_model is not None:
        method_kwargs["scheduler_kwargs"]["search_options"][
            "max_size_data_for_model"
        ] = configuration.max_size_data_for_model
    if map_method_args is not None:
        method_kwargs = map_method_args(configuration, method, method_kwargs)
    scheduler = methods[method](MethodArguments(**method_kwargs))
    stop_criterion = StoppingCriterion(
        max_wallclock_time=benchmark.max_wallclock_time,
        max_num_evaluations=benchmark.max_num_evaluations,
    )
    metadata = get_metadata(
        seed=seed,
        method=method,
        experiment_tag=configuration.experiment_tag,
        benchmark_name=configuration.benchmark,
        random_seed=master_random_seed,
        max_size_data_for_model=configuration.max_size_data_for_model,
        benchmark=benchmark,
        extra_metadata=extra_tuning_job_metadata,
    )
    metadata["instance_type"] = benchmark.instance_type
    metadata["num_gpus_per_trial"] = num_gpus_per_trial
    tuner_name = configuration.experiment_tag
    if configuration.use_long_tuner_name_prefix:
        tuner_name += f"-{sanitize_sagemaker_name(configuration.benchmark)}-{seed}"
    tuner_kwargs = dict(
        scheduler=scheduler,
        stop_criterion=stop_criterion,
        n_workers=benchmark.n_workers,
        tuner_name=tuner_name,
        metadata=metadata,
        save_tuner=configuration.save_tuner,
    )
    callbacks = [StoreResultsCallback(extra_results_composer=extra_results)]
    if configuration.remote_tuning_metrics:
        # Use callback to report tuning metrics
        callbacks.append(
            RemoteTuningMetricsCallback(
                metric=benchmark.metric,
                mode=benchmark.mode,
                config_space=benchmark.config_space,
                resource_attr=benchmark.resource_attr,
            )
        )
    tuner_kwargs["callbacks"] = callbacks
    return tuner_kwargs
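

# Hypothetical sketch of a ``map_method_args`` callback (illustration only): it
# receives ``configuration``, the method name, and the ``method_kwargs`` built
# above, and must return the (possibly modified) ``method_kwargs``:
#
#     def map_method_args(configuration, method, method_kwargs):
#         if method == "MY-METHOD":  # assumed key in ``methods``
#             search_options = method_kwargs["scheduler_kwargs"]["search_options"]
#             search_options["num_init_random"] = 10  # assumed searcher option
#         return method_kwargs

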
def start_experiment_local_backend(
    configuration: ConfigDict,
    methods: MethodDefinitions,
    benchmark_definitions: RealBenchmarkDefinitions,
    extra_results: Optional[ExtraResultsComposer] = None,
    map_method_args: Optional[MapMethodArgsType] = None,
    extra_tuning_job_metadata: Optional[DictStrKey] = None,
):
    """
    Runs a sequence of experiments with the local backend sequentially. The
    loop runs over methods selected from ``methods`` and repetitions.

    ``map_method_args`` can be used to modify ``method_kwargs`` for constructing
    :class:`~syne_tune.experiments.baselines.MethodArguments`, depending on
    ``configuration`` and the method. This allows for extra flexibility to
    specify specific arguments for chosen methods. Its signature is
    :code:`method_kwargs = map_method_args(configuration, method, method_kwargs)`,
    where ``method`` is the name of the baseline.

    .. note::
       When this is launched remotely as entry point of a SageMaker training
       job (command line ``--launched_remotely 1``), the backend is configured
       to write logs and checkpoints to a directory which is not synced to S3.
       This is different to the tuner path, which is "/opt/ml/checkpoints", so
       that tuning results are synced to S3. Syncing checkpoints to S3 is not
       recommended (it is slow and can lead to failures, since several worker
       processes write to the same synced directory).

    :param configuration: ConfigDict with parameters of the experiment.
        Must contain all parameters from ``LOCAL_BACKEND_EXTRA_PARAMETERS``
    :param methods: Dictionary with method constructors
    :param benchmark_definitions: Definitions of benchmarks; one is selected
        from command line arguments
    :param extra_results: If given, this is used to append extra information
        to the results dataframe
    :param map_method_args: See above, optional
    :param extra_tuning_job_metadata: Metadata added to the tuner, can be used
        to manage results
    """
    configuration.check_if_all_paremeters_present(LOCAL_BACKEND_EXTRA_PARAMETERS)
    configuration.expand_base_arguments(LOCAL_BACKEND_EXTRA_PARAMETERS)

    experiment_tag = configuration.experiment_tag
    benchmark = get_benchmark(configuration, benchmark_definitions)
    benchmark_name = configuration.benchmark
    master_random_seed = get_master_random_seed(configuration.random_seed)
    set_logging_level(configuration)

    combinations = list(itertools.product(list(methods.keys()), configuration.seeds))
    print(combinations)
    for method, seed in tqdm(combinations):
        random_seed = effective_random_seed(master_random_seed, seed)
        np.random.seed(random_seed)
        print(
            f"Starting experiment ({method}/{benchmark_name}/{seed}) of {experiment_tag}\n"
            f" max_wallclock_time = {benchmark.max_wallclock_time}, "
            f" n_workers = {benchmark.n_workers}"
        )
        trial_backend = LocalBackend(
            entry_point=str(benchmark.script),
            delete_checkpoints=configuration.delete_checkpoints,
            num_gpus_per_trial=configuration.num_gpus_per_trial,
        )
        tuner_kwargs = create_objects_for_tuner(
            configuration,
            methods=methods,
            method=method,
            benchmark=benchmark,
            master_random_seed=master_random_seed,
            seed=seed,
            verbose=configuration.verbose,
            extra_tuning_job_metadata=extra_tuning_job_metadata,
            map_method_args=map_method_args,
            extra_results=extra_results,
            num_gpus_per_trial=configuration.num_gpus_per_trial,
        )
        # If this experiment runs remotely as a SageMaker training job, logs and
        # checkpoints are written to a different directory than tuning results,
        # so the former are not synced to S3.
        if configuration.launched_remotely:
            tuner_kwargs["trial_backend_path"] = backend_path_not_synced_to_s3()
        tuner = Tuner(
            trial_backend=trial_backend,
            **tuner_kwargs,
        )
        tuner.run()  # Run the experiment


def main(
    methods: MethodDefinitions,
    benchmark_definitions: RealBenchmarkDefinitions,
    extra_args: Optional[ExtraArgsType] = None,
    map_method_args: Optional[MapMethodArgsType] = None,
    extra_results: Optional[ExtraResultsComposer] = None,
):
    """
    Runs a sequence of experiments with the local backend sequentially. The
    loop runs over methods selected from ``methods`` and repetitions, both
    controlled by command line arguments.

    ``map_method_args`` can be used to modify ``method_kwargs`` for constructing
    :class:`~syne_tune.experiments.baselines.MethodArguments`, depending on
    ``configuration`` returned by :func:`parse_args` and the method. Its
    signature is
    :code:`method_kwargs = map_method_args(configuration, method, method_kwargs)`,
    where ``method`` is the name of the baseline. It is called just before the
    method is created.

    :param methods: Dictionary with method constructors
    :param benchmark_definitions: Definitions of benchmarks; one is selected
        from command line arguments
    :param extra_args: Extra arguments for command line parser. Optional
    :param map_method_args: See above, optional
    :param extra_results: If given, this is used to append extra information
        to the results dataframe
    """
    configuration = config_from_argparse(extra_args, LOCAL_BACKEND_EXTRA_PARAMETERS)
    method_names = (
        [configuration.method]
        if configuration.method is not None
        else list(methods.keys())
    )
    methods = {mname: methods[mname] for mname in method_names}
    if extra_args is not None:
        assert (
            map_method_args is not None
        ), "map_method_args must be specified if extra_args is used"
    start_experiment_local_backend(
        configuration,
        methods=methods,
        benchmark_definitions=benchmark_definitions,
        map_method_args=map_method_args,
        extra_results=extra_results,
        extra_tuning_job_metadata=None
        if extra_args is None
        else extra_metadata(configuration, extra_args),
    )
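

# Minimal sketch of a launcher script built on this module (assumes ``methods``
# and ``benchmark_definitions`` are defined by the user, e.g. imported from
# companion modules next to the script):
#
#     from syne_tune.experiments.launchers.hpo_main_local import main
#
#     if __name__ == "__main__":
#         main(methods, benchmark_definitions)
#
# The experiment is then started from the command line, e.g.
# ``python hpo_main.py --experiment_tag my-tag``.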