blackboxopt.optimizers.staged.bohb
impute_conditional_data(array, vartypes)
Impute NaNs in numerical representation with observed values or prior samples.

This method is needed to use the statsmodels KDE, which doesn't handle missing values out of the box.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
array | ndarray | Numerical representation of the configurations, which can include NaN values for inactive variables. | required |
vartypes | Union[list, numpy.ndarray] | Encoding of the types of the variables: 0 means continuous, >0 means categorical with as many different values. | required |
Returns:

Type | Description |
---|---|
ndarray | Numerical representation where all NaNs have been replaced with observed values or prior samples. |
Source code in blackboxopt/optimizers/staged/bohb.py

```python
def impute_conditional_data(
    array: np.ndarray, vartypes: Union[list, np.ndarray]
) -> np.ndarray:
    """Impute NaNs in numerical representation with observed values or prior samples.

    This method is needed to use the `statsmodels` KDE, which doesn't handle missing
    values out of the box.

    Args:
        array: Numerical representation of the configurations which can include NaN
            values for inactive variables.
        vartypes: Encoding of the types of the variables: 0 means continuous, >0 means
            categorical with as many different values.

    Returns:
        Numerical representation where all NaNs have been replaced with observed values
        or prior samples.
    """
    return_array = np.empty_like(array)
    for i in range(array.shape[0]):
        datum = np.copy(array[i])
        nan_indices = np.argwhere(np.isnan(datum)).flatten()
        while np.any(nan_indices):
            nan_idx = nan_indices[0]
            valid_indices = np.argwhere(np.isfinite(array[:, nan_idx])).flatten()
            if len(valid_indices) > 0:
                # pick one of them at random and overwrite all NaN values
                row_idx = np.random.choice(valid_indices)
                datum[nan_indices] = array[row_idx, nan_indices]
            else:
                # no point in the data has this value activated, so fill it with a
                # valid but random value
                t = vartypes[nan_idx]
                if t == 0:
                    datum[nan_idx] = np.random.rand()
                else:
                    datum[nan_idx] = (np.random.randint(t) + 0.5) / (t + 1)
            nan_indices = np.argwhere(np.isnan(datum)).flatten()
        return_array[i, :] = datum
    return return_array
```
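A minimal usage sketch (the array and the type encoding are made up for illustration):

```python
import numpy as np

# Two variables: one continuous (type 0) and one categorical with 3 values (type 3).
vartypes = [0, 3]
configs = np.array(
    [
        [0.2, np.nan],  # categorical variable inactive in this configuration
        [0.7, 0.5],
        [np.nan, 0.1],
    ]
)

imputed = impute_conditional_data(configs, vartypes)
assert not np.isnan(imputed).any()
```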
sample_around_values(datum, bandwidths, vartypes, min_bandwidth, bw_factor)
Sample numerical representation close to a given datum.
This is specific to the KDE in statsmodels and its kernels for the different variable types.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
datum | ndarray | Numerical representation of a configuration that is used as the 'center' for sampling. | required |
bandwidths | ndarray | Bandwidths of the corresponding kernels in each dimension. | required |
vartypes | Union[list, numpy.ndarray] | Encoding of the types of the variables: 0 means continuous, >0 means categorical with as many different values. | required |
min_bandwidth | float | Smallest allowed bandwidth. Ensures diversity even if all samples agree on a value in a dimension. | required |
bw_factor | float | To increase diversity, the bandwidth is multiplied by this factor before sampling. | required |
Returns:

Type | Description |
---|---|
Optional[numpy.ndarray] | Numerical representation of a configuration close to the provided datum. |
Source code in blackboxopt/optimizers/staged/bohb.py

```python
def sample_around_values(
    datum: np.ndarray,
    bandwidths: np.ndarray,
    vartypes: Union[list, np.ndarray],
    min_bandwidth: float,
    bw_factor: float,
) -> Optional[np.ndarray]:
    """Sample numerical representation close to a given datum.

    This is specific to the KDE in statsmodels and its kernels for the different
    variable types.

    Args:
        datum: Numerical representation of a configuration that is used as the
            'center' for sampling.
        bandwidths: Bandwidths of the corresponding kernels in each dimension.
        vartypes: Encoding of the types of the variables: 0 means continuous, >0 means
            categorical with as many different values.
        min_bandwidth: Smallest allowed bandwidth. Ensures diversity even if all
            samples agree on a value in a dimension.
        bw_factor: To increase diversity, the bandwidth is multiplied by this factor
            before sampling.

    Returns:
        Numerical representation of a configuration close to the provided datum.
    """
    vector = []
    for m, bw, t in zip(datum, bandwidths, vartypes):
        bw = max(bw, min_bandwidth)
        if t == 0:
            # continuous variable: truncated normal centered at m, bounded to [0, 1]
            bw = bw_factor * bw
            try:
                v = sps.truncnorm.rvs(-m / bw, (1 - m) / bw, loc=m, scale=bw)
            except Exception:
                return None
        else:
            # categorical variable: keep the current value with probability (1 - bw),
            # otherwise pick one of the t values uniformly at random
            v = (
                m
                if np.random.rand() < (1 - bw)
                else (np.random.randint(t) + 0.5) / (t + 1)
            )
        vector.append(v)
    return np.array(vector)
```
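For the continuous case, the truncnorm bounds `-m / bw` and `(1 - m) / bw` keep all draws inside the unit interval of the numerical representation. A minimal usage sketch (all argument values are made up):

```python
import numpy as np

# One continuous dimension (type 0) and one categorical dimension with 3 values.
vector = sample_around_values(
    datum=np.array([0.4, 0.5]),
    bandwidths=np.array([0.1, 0.2]),
    vartypes=[0, 3],
    min_bandwidth=1e-3,
    bw_factor=3.0,
)
# vector is an array close to [0.4, 0.5], or None if the truncnorm draw failed.
```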
Sampler
digest_evaluation(self, evaluation)
Digest a finished evaluation and, once enough finite losses are available, refit the good/bad KDE models for its fidelity.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
evaluation | Evaluation | Finished evaluation with the configuration, its objective value, and the fidelity it was evaluated at. | required |
Source code in blackboxopt/optimizers/staged/bohb.py

```python
def digest_evaluation(self, evaluation: Evaluation):
    """Digest a finished evaluation and refit the KDE models for its fidelity.

    Args:
        evaluation: Finished evaluation with the configuration, its objective
            value, and the fidelity it was evaluated at.
    """
    loss = evaluation.objectives[self.objective.name]
    if loss is None:
        loss = np.inf
    config_dict = evaluation.configuration
    config_vector = self.search_space.to_numerical(config_dict)
    fidelity = evaluation.settings["fidelity"]
    if fidelity not in self.configs.keys():
        self.configs[fidelity] = []
        self.losses[fidelity] = []
    self.configs[fidelity].append(config_vector)
    self.losses[fidelity].append(loss)

    # only refit when this is the highest fidelity with a model so far
    if bool(self.kde_models.keys()) and max(self.kde_models.keys()) > fidelity:
        return

    if np.isfinite(self.losses[fidelity]).sum() <= self.min_samples_in_model - 1:
        n_runs_finite_loss = np.isfinite(self.losses[fidelity]).sum()
        self.logger.debug(
            f"Only {n_runs_finite_loss} run(s) with a finite loss for fidelity "
            + f"{fidelity} available, need at least {self.min_samples_in_model} "
            + "-> can't build model!"
        )
        return

    train_configs = np.array(self.configs[fidelity])
    train_losses = np.array(self.losses[fidelity])

    n_good = max(
        self.min_samples_in_model,
        (self.top_n_percent * train_configs.shape[0]) // 100,
    )
    n_bad = max(
        self.min_samples_in_model,
        ((100 - self.top_n_percent) * train_configs.shape[0]) // 100,
    )

    # Refit KDE for the current fidelity
    idx = np.argsort(train_losses)

    train_data_good = impute_conditional_data(
        train_configs[idx[:n_good]], self.vartypes
    )
    train_data_bad = impute_conditional_data(
        train_configs[idx[n_good : n_good + n_bad]],
        self.vartypes,
    )

    if train_data_good.shape[0] <= train_data_good.shape[1]:
        return
    if train_data_bad.shape[0] <= train_data_bad.shape[1]:
        return

    # more expensive crossvalidation method
    # bw_estimation = 'cv_ls'
    # quick rule of thumb
    bw_estimation = "normal_reference"

    bad_kde = sm.nonparametric.KDEMultivariate(
        data=train_data_bad, var_type=self.kde_vartypes, bw=bw_estimation
    )
    good_kde = sm.nonparametric.KDEMultivariate(
        data=train_data_good, var_type=self.kde_vartypes, bw=bw_estimation
    )

    bad_kde.bw = np.clip(bad_kde.bw, self.min_bandwidth, None)
    good_kde.bw = np.clip(good_kde.bw, self.min_bandwidth, None)

    self.kde_models[fidelity] = {"good": good_kde, "bad": bad_kde}

    # update probs for the categorical parameters for later sampling
    self.logger.debug(
        f"done building a new model for fidelity {fidelity} based on "
        + f"{n_good}/{n_bad} split\nBest loss for this fidelity: "
        + f"{np.min(train_losses)}\n"
        + ("=" * 40)
    )
```
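To make the refit concrete, here is a small sketch of the good/bad split and the statsmodels fit in isolation; the parameter values and the random training data are assumptions for illustration, not library defaults:

```python
import numpy as np
import statsmodels.api as sm

top_n_percent = 15        # assumed value
min_samples_in_model = 5  # assumed value
train_configs = np.random.rand(40, 2)  # made-up numerical configurations
train_losses = np.random.rand(40)      # made-up losses

# the best top_n_percent of the observations are "good", the rest are "bad"
n_good = max(min_samples_in_model, (top_n_percent * len(train_configs)) // 100)
n_bad = max(min_samples_in_model, ((100 - top_n_percent) * len(train_configs)) // 100)
idx = np.argsort(train_losses)

# var_type "cc" marks both dimensions as continuous for the statsmodels KDE
good_kde = sm.nonparametric.KDEMultivariate(
    data=train_configs[idx[:n_good]], var_type="cc", bw="normal_reference"
)
bad_kde = sm.nonparametric.KDEMultivariate(
    data=train_configs[idx[n_good : n_good + n_bad]],
    var_type="cc",
    bw="normal_reference",
)
print(n_good, n_bad)  # 6 34
```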
sample_configuration(self)
Sample a new configuration, either model based from the good/bad KDEs or at random.
Returns:

Type | Description |
---|---|
Tuple[dict, dict] | The sampled configuration and metadata with a model_based_pick flag. |
Source code in blackboxopt/optimizers/staged/bohb.py

```python
def sample_configuration(self) -> Tuple[dict, dict]:
    """Sample a new configuration, either model based or at random.

    Returns:
        The sampled configuration and metadata with a `model_based_pick` flag.
    """
    self.logger.debug("start sampling a new configuration.")

    # Sample from prior, if no model is available or with given probability
    if len(self.kde_models) == 0 or np.random.rand() < self.random_fraction:
        return self.search_space.sample(), {"model_based_pick": False}

    best = np.inf
    best_vector = None

    try:
        # sample from largest fidelity
        fidelity = max(self.kde_models.keys())

        good = self.kde_models[fidelity]["good"].pdf
        bad = self.kde_models[fidelity]["bad"].pdf
        minimize_me = lambda x: max(1e-32, bad(x)) / max(good(x), 1e-32)

        kde_good = self.kde_models[fidelity]["good"]
        kde_bad = self.kde_models[fidelity]["bad"]

        for _ in range(self.num_samples):
            idx = np.random.randint(0, len(kde_good.data))
            datum = kde_good.data[idx]
            vector = sample_around_values(
                datum,
                kde_good.bw,
                self.vartypes,
                self.min_bandwidth,
                self.bw_factor,
            )
            if vector is None:
                continue

            val = minimize_me(vector)

            if not np.isfinite(val):
                self.logger.warning(
                    "sampled vector: %s has EI value %s" % (vector, val)
                )
                self.logger.warning(
                    "data in the KDEs:\n%s\n%s" % (kde_good.data, kde_bad.data)
                )
                self.logger.warning(
                    "bandwidth of the KDEs:\n%s\n%s" % (kde_good.bw, kde_bad.bw)
                )
                # right now, this happens because a KDE does not contain all values
                # for a categorical parameter this cannot be fixed with the
                # statsmodels KDE, so for now, we are just going to evaluate this
                # one if the good_kde has a finite value, i.e. there is no config
                # with that value in the bad kde, so it shouldn't be terrible.
                if np.isfinite(good(vector)) and best_vector is not None:
                    best_vector = vector
                continue

            if val < best:
                best = val
                best_vector = vector

        if best_vector is None:
            self.logger.debug(
                f"Sampling based optimization with {self.num_samples} samples did "
                + "not find any finite/numerical acquisition function value "
                + "-> using random configuration"
            )
            return self.search_space.sample(), {"model_based_pick": False}
        else:
            self.logger.debug(
                "best_vector: {}, {}, {}, {}".format(
                    best_vector, best, good(best_vector), bad(best_vector)
                )
            )
            return (
                self.search_space.from_numerical(best_vector),
                {"model_based_pick": True},
            )
    except Exception:
        self.logger.debug(
            "Sampling based optimization failed. Falling back to a random sample."
        )
        return self.search_space.sample(), {"model_based_pick": False}
```
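The ratio that is minimized is the inverse of the TPE acquisition: a vector that is likely under the good KDE and unlikely under the bad one gets a small `bad/good` value. A toy illustration with stand-in densities (every name and value here is made up, nothing is part of the library):

```python
import numpy as np
import statsmodels.api as sm

rng = np.random.default_rng(0)
# Good observations cluster around 0.3, bad ones around 0.7 (made-up data).
good_kde = sm.nonparametric.KDEMultivariate(
    data=rng.normal(0.3, 0.05, size=(30, 1)), var_type="c", bw="normal_reference"
)
bad_kde = sm.nonparametric.KDEMultivariate(
    data=rng.normal(0.7, 0.05, size=(30, 1)), var_type="c", bw="normal_reference"
)

def minimize_me(x):
    bad_p = np.asarray(bad_kde.pdf(x)).item()
    good_p = np.asarray(good_kde.pdf(x)).item()
    return max(1e-32, bad_p) / max(good_p, 1e-32)

print(minimize_me(np.array([0.3])))  # small: favored by the acquisition
print(minimize_me(np.array([0.7])))  # large: penalized
```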