blackboxopt.optimization_loops.dask_distributed
run_optimization_loop(optimizer, evaluation_function, dask_client, timeout_s=inf, max_evaluations=None, pre_evaluation_callback=None, post_evaluation_callback=None, logger=None)
Convenience wrapper for an optimization loop that uses Dask to parallelize optimization until a given timeout or maximum number of evaluations is reached.
The loop also handles the optimizer signalling that no evaluation specification is available yet, retrying after a short pause instead of failing.
Parameters:

Name | Type | Description | Default
---|---|---|---
`optimizer` | `Union[blackboxopt.base.SingleObjectiveOptimizer, blackboxopt.base.MultiObjectiveOptimizer]` | The blackboxopt optimizer to run. | required
`dask_client` | `Client` | A Dask Distributed client that is configured with workers. | required
`evaluation_function` | `Callable[[blackboxopt.evaluation.EvaluationSpecification], blackboxopt.evaluation.Evaluation]` | The function that is called with an evaluation specification, which provides the configuration, settings and optimizer info dictionaries. This is the function that encapsulates the actual execution of a parametrized experiment (e.g. ML model training) and should return a `blackboxopt.Evaluation` as a result. | required
`timeout_s` | `float` | If given, the optimization loop will terminate after the first optimization step that exceeds the timeout (in seconds). Defaults to inf. | `inf`
`max_evaluations` | `int` | If given, the optimization loop will terminate after the given number of evaluations. Defaults to None. | `None`
`pre_evaluation_callback` | `Optional[Callable[[blackboxopt.evaluation.EvaluationSpecification], Any]]` | Reference to a callable that is invoked before each evaluation and takes a `blackboxopt.EvaluationSpecification` as an argument. | `None`
`post_evaluation_callback` | `Optional[Callable[[blackboxopt.evaluation.Evaluation], Any]]` | Reference to a callable that is invoked after each evaluation and takes a `blackboxopt.Evaluation` as an argument. | `None`
`logger` | `Logger` | The logger to use for logging progress. Defaults to None. | `None`
Returns:

Type | Description
---|---
`List[blackboxopt.evaluation.Evaluation]` | List of evaluation specifications and results for all evaluations.
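For orientation, a minimal usage sketch follows. It is illustrative only: `my_optimizer` stands in for any configured blackboxopt optimizer whose objective is named "loss", the search space is assumed to contain a parameter `x`, and the result is constructed with `EvaluationSpecification.create_evaluation`.

```python
from dask.distributed import Client

from blackboxopt import Evaluation, EvaluationSpecification
from blackboxopt.optimization_loops.dask_distributed import run_optimization_loop


def evaluation_function(eval_spec: EvaluationSpecification) -> Evaluation:
    # Illustrative objective: assumes the configuration contains a parameter "x"
    # and that the optimizer optimizes an objective named "loss".
    x = eval_spec.configuration["x"]
    return eval_spec.create_evaluation(objectives={"loss": x**2})


client = Client()  # Local Dask cluster with the default number of workers
evaluations = run_optimization_loop(
    optimizer=my_optimizer,  # Hypothetical: any configured blackboxopt optimizer
    evaluation_function=evaluation_function,
    dask_client=client,
    max_evaluations=20,
)
```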
Source code in blackboxopt/optimization_loops/dask_distributed.py
def run_optimization_loop(
optimizer: Union[SingleObjectiveOptimizer, MultiObjectiveOptimizer],
evaluation_function: Callable[[EvaluationSpecification], Evaluation],
dask_client: dd.Client,
timeout_s: float = float("inf"),
    max_evaluations: Optional[int] = None,
pre_evaluation_callback: Optional[Callable[[EvaluationSpecification], Any]] = None,
post_evaluation_callback: Optional[Callable[[Evaluation], Any]] = None,
    logger: Optional[logging.Logger] = None,
) -> List[Evaluation]:
"""Convenience wrapper for an optimization loop that uses Dask to parallelize
optimization until a given timeout or maximum number of evaluations is reached.
This already handles signals from the optimizer in case there is no evaluation
specification available yet.
Args:
optimizer: The blackboxopt optimizer to run.
dask_client: A Dask Distributed client that is configured with workers.
        evaluation_function: The function that is called with an evaluation
            specification, which provides the configuration, settings and optimizer
            info dictionaries.
            This is the function that encapsulates the actual execution of
            a parametrized experiment (e.g. ML model training) and should return a
            `blackboxopt.Evaluation` as a result.
        timeout_s: If given, the optimization loop will terminate after the first
            optimization step that exceeds the timeout (in seconds). Defaults to inf.
        max_evaluations: If given, the optimization loop will terminate after the
            given number of evaluations. Defaults to None.
pre_evaluation_callback: Reference to a callable that is invoked before each
evaluation and takes a `blackboxopt.EvaluationSpecification` as an argument.
post_evaluation_callback: Reference to a callable that is invoked after each
evaluation and takes a `blackboxopt.Evaluation` as an argument.
logger: The logger to use for logging progress. Defaults to None.

    Returns:
        List of evaluation specifications and results for all evaluations.
"""
logger = logging.getLogger("blackboxopt") if logger is None else logger
objectives = (
optimizer.objectives
if isinstance(optimizer, MultiObjectiveOptimizer)
else [optimizer.objective]
)
evaluations: List[Evaluation] = []
dask_scheduler = MinimalDaskScheduler(
dask_client=dask_client, objectives=objectives, logger=logger
)
_max_evaluations = init_max_evaluations_with_limit_logging(
max_evaluations=max_evaluations, timeout_s=timeout_s, logger=logger
)
n_eval_specs = 0
start = time.time()
while time.time() - start < timeout_s and n_eval_specs < _max_evaluations:
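        # While workers are free, generate and submit new evaluation specifications.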
if dask_scheduler.has_capacity():
try:
eval_spec = optimizer.generate_evaluation_specification()
if pre_evaluation_callback:
pre_evaluation_callback(eval_spec)
dask_scheduler.submit(evaluation_function, eval_spec)
n_eval_specs += 1
continue
except OptimizerNotReady:
logger.info("Optimizer is not ready yet; will retry after short pause.")
except OptimizationComplete:
logger.info("Optimization is complete")
break
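        # Collect evaluations that finish within the next 20 seconds and report
        # them back to the optimizer.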
new_evaluations = dask_scheduler.check_for_results(timeout_s=20)
if post_evaluation_callback:
list(map(post_evaluation_callback, new_evaluations))
optimizer.report(new_evaluations)
evaluations.extend(new_evaluations)
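    # Limits reached or optimization complete: drain evaluations that are still
    # running before returning.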
while dask_scheduler.has_running_jobs():
new_evaluations = dask_scheduler.check_for_results(timeout_s=20)
if post_evaluation_callback:
list(map(post_evaluation_callback, new_evaluations))
optimizer.report(new_evaluations)
evaluations.extend(new_evaluations)
return evaluations
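Continuing the sketch from above, `post_evaluation_callback` lends itself to lightweight progress reporting without touching the loop itself; the hypothetical callback below just prints each finished evaluation's objective values:

```python
def log_progress(evaluation: Evaluation) -> None:
    # Invoked by the loop once per finished evaluation, before the result
    # is reported to the optimizer.
    print("objectives:", evaluation.objectives)


evaluations = run_optimization_loop(
    optimizer=my_optimizer,  # Hypothetical, as in the sketch above
    evaluation_function=evaluation_function,
    dask_client=client,
    max_evaluations=20,
    post_evaluation_callback=log_progress,
)
```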