# Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.
from pathlib import Path
from typing import Dict, List, Optional, Union, Any

import pandas as pd

from syne_tune.blackbox_repository.blackbox import (
    Blackbox,
    ObjectiveFunctionResult,
)
from syne_tune.blackbox_repository.serialize import (
    serialize_configspace,
    deserialize_configspace,
    serialize_metadata,
    deserialize_metadata,
)


class BlackboxOffline(Blackbox):
    """
    A blackbox built from offline evaluations. Each row of the dataframe
    should contain one evaluation for a fixed configuration, fidelity and
    seed. The columns must correspond to the provided configuration and
    fidelity space. By default, all columns prefixed by ``"metric_"`` are
    assumed to be metrics, but this can be overridden by passing
    ``objectives_names`` explicitly. A small construction sketch is given
    after the class definition.

    Additional arguments on top of parent class
    :class:`~syne_tune.blackbox_repository.Blackbox`:

    :param df_evaluations: Data frame with evaluation data
    :param seed_col: optional, can be used when multiple seeds are recorded
    """

    def __init__(
        self,
        df_evaluations: pd.DataFrame,
        configuration_space: Dict[str, Any],
        fidelity_space: Optional[dict] = None,
        objectives_names: Optional[List[str]] = None,
        seed_col: Optional[str] = None,
    ):
        if objectives_names is not None:
            for col in objectives_names:
                assert (
                    col in df_evaluations.columns
                ), f"column {col} from metric columns not found in dataframe"
        else:
            objectives_names = [
                col for col in df_evaluations.columns if col.startswith("metric_")
            ]
        self.objectives_names = objectives_names
        super(BlackboxOffline, self).__init__(
            configuration_space=configuration_space,
            fidelity_space=fidelity_space,
            objectives_names=objectives_names,
        )
        hp_names = list(configuration_space.keys())
        self.index_cols = hp_names
        if fidelity_space is not None:
            fidelity_names = list(fidelity_space.keys())
            assert len(set(fidelity_names).intersection(hp_names)) == 0
            self.index_cols += fidelity_names
        self.seed_col = seed_col
        if seed_col is not None:
            assert seed_col not in self.index_cols, f"column {seed_col} duplicated"
            self.index_cols.append(seed_col)
        for col in self.index_cols:
            assert (
                col in df_evaluations.columns
            ), f"column {col} from configuration or fidelity space not found in dataframe"
        self.df = df_evaluations.set_index(self.index_cols)

    def hyperparameter_objectives_values(self, predict_curves: bool = False):
        assert not predict_curves, "predict_curves=True not supported"
        # copy so that ``self.index_cols`` is not mutated when dropping the seed column
        columns = list(self.index_cols)
        if self.seed_col is not None:
            columns.remove(self.seed_col)
        X = self.df.reset_index().loc[:, columns]
        y = self.df.loc[:, self.objectives_names]
        return X, y

    def _objective_function(
        self,
        configuration: Dict[str, Any],
        fidelity: Optional[dict] = None,
        seed: Optional[int] = None,
    ) -> ObjectiveFunctionResult:
        """
        Returns the dictionary of objectives for a configuration/fidelity/seed.

        :param configuration: configuration to look up in the available evaluations
        :param fidelity: optional fidelity; if not given and a fidelity space is set,
            objective values for all recorded fidelities are returned
        :param seed: optional seed, only used when ``seed_col`` is set
        :return: objective values for the requested evaluation
        """
        # todo: we should check range configuration with configspaces
        # query the configuration in the list of available ones
        key_dict = dict(configuration)  # copy so the caller's configuration is not modified
        if self.seed_col is not None:
            key_dict[self.seed_col] = seed
        if self.fidelity_space is not None and fidelity is not None:
            key_dict.update(fidelity)
        if self.fidelity_space is not None and fidelity is None:
            keys = tuple(set(self.index_cols) - set(self.fidelity_space.keys()))
        else:
            keys = self.index_cols
        output = self.df.xs(tuple(key_dict[col] for col in keys), level=keys).loc[
            :, self.objectives_names
        ]
        if len(output) == 0:
            raise ValueError(
                f"the hyperparameter {configuration} is not present in available evaluations. Use ``add_surrogate(blackbox)``"
                f" if you want to add interpolation or a surrogate model that supports querying any configuration."
            )
        if fidelity is not None or self.fidelity_space is None:
            return output.iloc[0].to_dict()
        else:
            # TODO select only the fidelity values in self.fidelity_space, since the dataframe
            # may contain more values, in which case the output tensor has more elements than
            # the expected num_fidelities.
            return output.to_numpy()

    def __str__(self):
        stats = {
            "total evaluations": len(self.df),
            "objectives": self.objectives_names,
            "hyperparameters": list(self.configuration_space.keys()),
        }
        stats_str = ", ".join([f"{k}: {v}" for k, v in stats.items()])
        return f"offline blackbox: {stats_str}"


def serialize(
    bb_dict: Dict[str, BlackboxOffline], path: str, categorical_cols: List[str] = []
):
    """
    :param bb_dict: dictionary from task name to blackbox to serialize
    :param path: folder where the serialized files are written
    :param categorical_cols: optional, columns to store with a categorical dtype;
        this drastically lowers the memory footprint when only few distinct values
        are present
    """
    if isinstance(bb_dict, BlackboxOffline):
        # todo hack that allows to call ``serialize(bb)`` instead of ``serialize({"dummy-task-name": bb})``
        # not sure if we should keep it
        bb_dict = {Path(path).stem: bb_dict}
    # check all blackboxes share the same search space and have evaluated the same hyperparameters
    bb_first = next(iter(bb_dict.values()))
    for bb in bb_dict.values():
        assert bb.configuration_space == bb_first.configuration_space
        assert bb.fidelity_space == bb_first.fidelity_space
        assert bb.objectives_names == bb_first.objectives_names
    path = Path(path)
    path.mkdir(exist_ok=True)
    serialize_configspace(
        path=path,
        configuration_space=bb_first.configuration_space,
        fidelity_space=bb_first.fidelity_space,
    )
    for name, bb in bb_dict.items():
        # copy so that the "task" column is not added to the blackbox's own dataframe
        df = bb.df.copy()
        df["task"] = name
        # we use gzip as snappy is not supported for fastparquet engine compression
        # gzip is slower than the default snappy but more compact
        df.reset_index().to_parquet(
            path / f"data-{name}.parquet",
            index=False,
            compression="gzip",
            engine="fastparquet",
        )
    serialize_metadata(
        path=path,
        metadata={
            "objectives_names": bb_first.objectives_names,
            "task_names": list(bb_dict.keys()),
            "seed_col": bb_first.seed_col,
            "categorical_cols": categorical_cols,
        },
    )


def deserialize(path: str) -> Union[Dict[str, BlackboxOffline], BlackboxOffline]:
    """
    :param path: where to find the serialized blackbox information (at least the
        ``data-{task}.parquet`` files, ``configspace.json`` and the metadata written
        by :func:`serialize`)
    :return: dictionary from task name to blackbox
    """
    configuration_space, fidelity_space = deserialize_configspace(path)
    assert (
        configuration_space is not None
    ), f"configspace.json could not be found in {path}"
    metadata = deserialize_metadata(path)
    objectives_names = metadata["objectives_names"]
    seed_col = metadata["seed_col"]
    cat_cols = metadata.get("categorical_cols")  # optional
    task_names = metadata.get("task_names")
    # columns need to be read back with a categorical encoding (rather than int or float),
    # as this has a massive effect on memory usage; we use the fastparquet engine since
    # pyarrow does not handle categorization of int/float columns
    df_tasks = {
        task: pd.read_parquet(
            Path(path) / f"data-{task}.parquet",
            categories=cat_cols,
            engine="fastparquet",
        )
        for task in task_names
    }
    return {
        task: BlackboxOffline(
            df_evaluations=df,
            configuration_space=configuration_space,
            fidelity_space=fidelity_space,
            objectives_names=objectives_names,
            seed_col=seed_col,
        )
        for task, df in df_tasks.items()
    }
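

if __name__ == "__main__":
    # Minimal round-trip sketch (illustrative only): build the hypothetical blackbox
    # from ``_example_blackbox`` above, query one configuration, then serialize it to
    # a temporary folder and deserialize it back. Task, hyperparameter and fidelity
    # names are made up for the example.
    import tempfile

    bb = _example_blackbox()
    # objectives for one configuration at the highest fidelity
    print(bb.objective_function({"lr": 0.1}, fidelity={"epoch": 2}))

    with tempfile.TemporaryDirectory() as tmpdir:
        serialize({"my-task": bb}, path=tmpdir)
        bb_restored = deserialize(tmpdir)["my-task"]
        # same query against the deserialized blackbox should give the same result
        print(bb_restored.objective_function({"lr": 0.1}, fidelity={"epoch": 2}))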