# Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.
import json
import logging
from dataclasses import dataclass
from datetime import datetime
from json.decoder import JSONDecodeError
from pathlib import Path
from typing import List, Dict, Callable, Optional, Union, Any, Tuple
import numpy as np
import pandas as pd
from syne_tune import Tuner
from syne_tune.constants import (
ST_METADATA_FILENAME,
ST_RESULTS_DATAFRAME_FILENAME,
ST_TUNER_DILL_FILENAME,
ST_TUNER_CREATION_TIMESTAMP,
ST_TUNER_TIME,
)
from syne_tune.try_import import try_import_aws_message, try_import_visual_message
from syne_tune.util import experiment_path, s3_experiment_path, metric_name_mode
try:
import boto3
from botocore.exceptions import ClientError
except ImportError:
print(try_import_aws_message())
try:
import matplotlib.pyplot as plt
except ImportError:
print(try_import_visual_message())
logger = logging.getLogger(__name__)
@dataclass
class ExperimentResult:
    """
    Wraps results dataframe and provides retrieval services.

    :param name: Name of experiment
    :param results: Dataframe containing results of experiment
    :param metadata: Metadata stored along with results
    :param tuner: :class:`~syne_tune.Tuner` object stored along with results
    :param path: Local path where the experiment is stored
    """

    name: str
    results: pd.DataFrame
    metadata: Dict[str, Any]
    tuner: Tuner
    path: Path

    def __str__(self):
        res = f"Experiment {self.name}"
        if self.results is not None:
            res += f" contains {len(self.results)} evaluations from {len(self.results.trial_id.unique())} trials"
        return res

    def creation_date(self):
        """
        :return: Timestamp when :class:`~syne_tune.Tuner` was created
        """
        return datetime.fromtimestamp(self.metadata[ST_TUNER_CREATION_TIMESTAMP])

    def plot_hypervolume(
        self,
        metrics_to_plot: Optional[Union[List[int], List[str]]] = None,
        reference_point: Optional[np.ndarray] = None,
        figure_path: Optional[str] = None,
        **plt_kwargs,
    ):
        """Plot best hypervolume value as function of wallclock time

        :param metrics_to_plot: Metrics to include in the hypervolume
            computation, given as indices or names. Defaults to all metrics
            defined for the experiment
        :param reference_point: Reference point for hypervolume calculations.
            If None, the maximum values of each metric is used.
        :param figure_path: If specified, defines the path where the figure will
            be saved. If None, the figure is shown
        :param plt_kwargs: Arguments to :func:`matplotlib.pyplot.plot`
        """
        if metrics_to_plot is None:
            metrics_to_plot = self.metric_names()
        assert (
            len(metrics_to_plot) > 1
        ), "Only one metric defined, cannot compute hypervolume"
        metric_names, metric_modes = zip(
            *[self._metric_name_mode(metric) for metric in metrics_to_plot]
        )
        assert np.all(
            [metric_mode == "max" for metric_mode in metric_modes]
        ), f"All metrics must be maximized but the following modes were selected: {metric_modes}"
        if self.results is not None and len(self.results) > 0:
            # Local import to avoid a circular dependency at module load time
            from syne_tune.optimizer.schedulers.multiobjective.utils import (
                hypervolume_cumulative,
            )

            results_df = self.results.sort_values(ST_TUNER_TIME)
            # Fix: take the time axis from the *sorted* dataframe. Previously it
            # was read from the unsorted ``self.results``, which could misalign
            # the x values against the hypervolume curve computed below.
            x = results_df.loc[:, ST_TUNER_TIME]
            results_array = results_df[list(metric_names)].values
            hypervolume_indicator = hypervolume_cumulative(
                results_array, reference_point
            )
            fig, ax = plt.subplots()
            ax.plot(x, hypervolume_indicator, **plt_kwargs)
            ax.set_xlabel("wallclock time (secs)")
            ax.set_ylabel("Hypervolume indicator")
            ax.set_title(f"Hypervolume over time {self.name}")
            if figure_path is not None:
                fig.savefig(figure_path)
            else:
                fig.show()

    def plot(
        self, metric_to_plot: Union[str, int] = 0, figure_path: str = None, **plt_kwargs
    ):
        """Plot best metric value as function of wallclock time

        :param metric_to_plot: Indicates which metric to plot, can be the index
            or a name of the metric. default to 0 - first metric defined
        :param figure_path: If specified, defines the path where the figure will
            be saved. If None, the figure is shown
        :param plt_kwargs: Arguments to :func:`matplotlib.pyplot.plot`
        """
        metric_name, metric_mode = self._metric_name_mode(metric_to_plot)
        df = self.results
        if df is not None and len(df) > 0:
            df = df.sort_values(ST_TUNER_TIME)
            x = df.loc[:, ST_TUNER_TIME]
            # Running best: cumulative max when maximizing, else cumulative min
            y = (
                df.loc[:, metric_name].cummax()
                if metric_mode == "max"
                else df.loc[:, metric_name].cummin()
            )
            fig, ax = plt.subplots()
            ax.plot(x, y, **plt_kwargs)
            ax.set_xlabel("wallclock time (secs)")
            ax.set_ylabel(metric_name)
            ax.set_title(f"Best result over time {self.name}")
            if figure_path is not None:
                fig.savefig(figure_path)
            else:
                fig.show()

    def plot_trials_over_time(
        self, metric_to_plot: Union[str, int] = 0, figure_path: str = None, figsize=None
    ):
        """Plot trials results over as function of wallclock time

        :param metric_to_plot: Indicates which metric to plot, can be the index
            or a name of the metric. default to 0 - first metric defined
        :param figure_path: If specified, defines the path where the figure will
            be saved. If None, the figure is shown
        :param figsize: width and height of figure
        """
        metric_name, metric_mode = self._metric_name_mode(metric_to_plot)
        df = self.results
        fig, ax = plt.subplots(1, 1, figsize=figsize if figsize else (12, 4))
        # One line per trial, all on the same axes
        for trial_id in sorted(df.trial_id.unique()):
            df_trial = df[df.trial_id == trial_id]
            df_trial.plot(
                x=ST_TUNER_TIME,
                y=metric_name,
                marker=".",
                ax=ax,
                legend=None,
                alpha=0.5,
            )
        # Mark the points where the scheduler decided to stop a trial
        df_stop = df[df["st_decision"] == "STOP"]
        plt.scatter(
            df_stop[ST_TUNER_TIME], df_stop[metric_name], marker="x", color="red"
        )
        plt.xlabel("Wallclock time (s)")
        plt.ylabel(metric_name)
        plt.title("Trials value over time")
        if figure_path is not None:
            fig.savefig(figure_path)
        else:
            fig.show()

    def metric_mode(self) -> Union[str, List[str]]:
        """
        :return: Optimization mode(s) ("min" or "max") stored in the metadata
        """
        return self.metadata["metric_mode"]

    def metric_names(self) -> List[str]:
        """
        :return: Names of the metrics reported by the experiment
        """
        return self.metadata["metric_names"]

    def entrypoint_name(self) -> str:
        """
        :return: Name of the entry point that was tuned
        """
        return self.metadata["entrypoint"]

    def best_config(self, metric: Union[str, int] = 0) -> Dict[str, Any]:
        """
        Return the best config found for the specified metric

        :param metric: Indicates which metric to use, can be the index or a name
            of the metric. default to 0 - first metric defined in the Scheduler
        :return: Configuration corresponding to best metric value
        """
        metric_name, metric_mode = self._metric_name_mode(metric)
        # locate best result; ``argmin``/``argmax`` return *positional* indices,
        # so the row must be fetched with ``iloc`` (``loc`` was only correct for
        # the default RangeIndex)
        if metric_mode == "min":
            best_index = self.results.loc[:, metric_name].argmin()
        else:
            best_index = self.results.loc[:, metric_name].argmax()
        res = dict(self.results.iloc[best_index])
        # Don't include internal fields
        return {k: v for k, v in res.items() if not k.startswith("st_")}

    def _metric_name_mode(self, metric: Union[str, int]) -> Tuple[str, str]:
        """
        Determine the name and its mode given ambiguous input.

        :param metric: Index or name of the selected metric
        """
        return metric_name_mode(
            metric_names=self.metric_names(),
            metric_mode=self.metric_mode(),
            metric=metric,
        )
def download_single_experiment(
    tuner_name: str,
    s3_bucket: Optional[str] = None,
    experiment_name: Optional[str] = None,
):
    """Downloads results from S3 of a tuning experiment

    :param tuner_name: Name of tuner to be retrieved.
    :param s3_bucket: If not given, the default bucket for the SageMaker session
        is used
    :param experiment_name: If given, this is used as first directory.
    """
    s3_path = s3_experiment_path(
        s3_bucket=s3_bucket, tuner_name=tuner_name, experiment_name=experiment_name
    ).rstrip("/")
    tgt_dir = experiment_path(tuner_name=tuner_name)
    tgt_dir.mkdir(exist_ok=True, parents=True)
    s3 = boto3.client("s3")
    # Split "s3://bucket/prefix/..." into bucket name and key prefix
    parts_path = s3_path.replace("s3://", "").split("/")
    s3_bucket = parts_path[0]
    s3_key = "/".join(parts_path[1:])
    result_files = [
        ST_METADATA_FILENAME,
        ST_RESULTS_DATAFRAME_FILENAME,
        ST_TUNER_DILL_FILENAME,
    ]
    for file in result_files:
        try:
            logger.info(f"downloading {file} on {s3_path}")
            s3.download_file(s3_bucket, f"{s3_key}/{file}", str(tgt_dir / file))
        except ClientError:
            # Best effort: some files (e.g. the tuner dill) may be absent
            logger.info(f"could not find {file} on {s3_path}")
def load_experiment(
    tuner_name: str,
    download_if_not_found: bool = True,
    load_tuner: bool = False,
    local_path: Optional[str] = None,
    experiment_name: Optional[str] = None,
) -> ExperimentResult:
    """Load results from an experiment

    :param tuner_name: Name of a tuning experiment previously run
    :param download_if_not_found: If True, fetch results from S3 if not found locally
    :param load_tuner: Whether to load the tuner in addition to metadata and results
    :param local_path: Path containing the experiment to load. If not specified,
        ``~/{SYNE_TUNE_FOLDER}/`` is used.
    :param experiment_name: If given, this is used as first directory.
    :return: Result object; ``results``, ``metadata``, ``tuner`` may each be
        None when the corresponding artifact could not be loaded (best effort)
    """
    path = experiment_path(tuner_name, local_path)
    metadata_path = path / ST_METADATA_FILENAME
    if not metadata_path.exists() and download_if_not_found:
        logger.info(
            f"experiment {tuner_name} not found locally, trying to get it from s3."
        )
        download_single_experiment(
            tuner_name=tuner_name, experiment_name=experiment_name
        )
    try:
        with open(metadata_path, "r") as f:
            metadata = json.load(f)
    except FileNotFoundError:
        metadata = None
    try:
        results_fname = ST_RESULTS_DATAFRAME_FILENAME
        if (path / results_fname).exists():
            results = pd.read_csv(path / results_fname)
        else:
            # Fall back to the file name without its (".zip") suffix
            results = pd.read_csv(path / results_fname[:-4])
    except Exception:
        # Best effort: callers must deal with ``results is None``
        results = None
    tuner = None
    if load_tuner:
        try:
            tuner = Tuner.load(str(path))
        except Exception:
            # Covers both a missing tuner file and a failure to deserialize it
            # (previously two redundant except clauses); proceed without tuner
            tuner = None
    return ExperimentResult(
        name=tuner.name if tuner is not None else path.stem,
        results=results,
        tuner=tuner,
        metadata=metadata,
        path=path,
    )
# Filter on experiment paths (receives the metadata file path as a string)
PathFilter = Callable[[str], bool]
# Filter on fully loaded ExperimentResult objects
ExperimentFilter = Callable[[ExperimentResult], bool]
# Either kind of filter; which one applies depends on the accepting function
PathOrExperimentFilter = Union[PathFilter, ExperimentFilter]
def _impute_filter(filt: Optional[PathOrExperimentFilter]) -> PathOrExperimentFilter:
if filt is None:
def filt(path) -> bool:
return True
return filt
def list_experiments(
    path_filter: Optional[PathFilter] = None,
    experiment_filter: Optional[ExperimentFilter] = None,
    root: Path = experiment_path(),
    load_tuner: bool = False,
) -> List[ExperimentResult]:
    """List experiments for which results are found

    :param path_filter: If passed then only experiments whose path matching
        the filter are kept. This allows rapid filtering in the presence of many
        experiments.
    :param experiment_filter: Filter on :class:`ExperimentResult`, optional
    :param root: Root path for experiment results. Default is result of
        :func:`experiment_path`
    :param load_tuner: Whether to load the tuner in addition to metadata and results
    :return: List of result objects, newest first (by creation timestamp)
    """
    path_filter = _impute_filter(path_filter)
    experiment_filter = _impute_filter(experiment_filter)
    res = []
    for metadata_path in root.glob(f"**/{ST_METADATA_FILENAME}"):
        path = metadata_path.parent
        tuner_name = path.name
        if path_filter(str(metadata_path)):
            # Fix: ``load_tuner`` must be passed by keyword. Passed positionally
            # it was bound to ``download_if_not_found``, silently toggling S3
            # downloads instead of controlling tuner loading.
            result = load_experiment(
                tuner_name, load_tuner=load_tuner, local_path=str(path.parent)
            )
            # Keep only experiments that loaded successfully and pass the filter
            if (
                experiment_filter(result)
                and result.results is not None
                and result.metadata is not None
            ):
                res.append(result)
    return sorted(
        res,
        key=lambda result: result.metadata.get(ST_TUNER_CREATION_TIMESTAMP, 0),
        reverse=True,
    )
def load_experiments_df(
    path_filter: Optional[PathFilter] = None,
    experiment_filter: Optional[ExperimentFilter] = None,
    root: Path = experiment_path(),
    load_tuner: bool = False,
) -> pd.DataFrame:
    """
    :param path_filter: If passed then only experiments whose path matching
        the filter are kept. This allows rapid filtering in the presence of many
        experiments.
    :param experiment_filter: Filter on :class:`ExperimentResult`
    :param root: Root path for experiment results. Default is
        :func:`experiment_path`
    :param load_tuner: Whether to load the tuner in addition to metadata and results
    :return: Dataframe that contains all evaluations reported by tuners according
        to the filter given. The columns contain trial-id, hyperparameter
        evaluated, metrics reported via :class:`~syne_tune.Reporter`. These metrics
        are collected automatically:

        * ``st_worker_time`` (indicating time spent in the worker when report was
          seen)
        * ``time`` (indicating wallclock time measured by the tuner)
        * ``decision`` decision taken by the scheduler when observing the result
        * ``status`` status of the trial that was shown to the tuner
        * ``config_{xx}`` configuration value for the hyperparameter ``{xx}``
        * ``tuner_name`` named passed when instantiating the Tuner
        * ``entry_point_name``, ``entry_point_path`` name and path of the entry
          point that was tuned
    """
    dfs = []
    for experiment in list_experiments(
        path_filter=path_filter,
        experiment_filter=experiment_filter,
        root=root,
        load_tuner=load_tuner,
    ):
        assert experiment.results is not None
        assert experiment.metadata is not None
        # Fix: copy, so the metadata columns added below are not written back
        # into ``experiment.results`` (the original mutated it in place)
        df = experiment.results.copy()
        df["tuner_name"] = experiment.name
        # Flatten metadata into columns; multi-valued entries get one column
        # per element ("{k}-{i}")
        for k, v in experiment.metadata.items():
            if isinstance(v, list):
                if len(v) == 1:
                    df[k] = v[0]
                else:
                    # Empty lists add no column (previously raised IndexError)
                    for i, x in enumerate(v):
                        df[f"{k}-{i}"] = x
            else:
                df[k] = v
        dfs.append(df)
    # No matching experiments: return an empty frame instead of letting
    # ``pd.concat([])`` raise ValueError
    if not dfs:
        return pd.DataFrame()
    return pd.concat(dfs, ignore_index=True)
if __name__ == "__main__":
    # Ad-hoc CLI: print a one-line summary for every experiment with results
    for experiment_result in list_experiments():
        if experiment_result.results is not None:
            print(experiment_result)