Dask Distributed Optimization Loop
If you are working with Dask, this optimization loop lets you run blackboxopt-based optimization on your Dask cluster. See also the corresponding example for more details.
blackboxopt.optimization_loops.dask_distributed
run_optimization_loop(optimizer, evaluation_function, dask_client, timeout_s=inf, max_evaluations=None, pre_evaluation_callback=None, post_evaluation_callback=None, logger=None)
Convenience wrapper for an optimization loop that uses Dask to parallelize optimization until a given timeout or maximum number of evaluations is reached.
It also handles the signals an optimizer raises when no evaluation specification is available yet (`OptimizerNotReady`) or when optimization is complete (`OptimizationComplete`).
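The signal handling described above can be illustrated without Dask. The following is a minimal sketch, not blackboxopt code. `OptimizerNotReady` and `OptimizationComplete` are local stand-in exceptions named after the ones the loop catches. `BatchOptimizer` and `drive` are hypothetical names. The toy optimizer refuses to propose a new batch until the previous batch has been reported, which is one situation in which an optimizer may have no specification available yet.

```python
from typing import Callable, Dict, List


class OptimizerNotReady(Exception):
    """Local stand-in: no evaluation specification is available yet."""


class OptimizationComplete(Exception):
    """Local stand-in: the optimizer has nothing left to propose."""


class BatchOptimizer:
    """Hypothetical toy optimizer: proposes `n_batches` batches of `batch_size`
    configurations and only starts a new batch once every configuration issued
    so far has been reported back."""

    def __init__(self, batch_size: int, n_batches: int) -> None:
        self.batch_size = batch_size
        self.total = batch_size * n_batches
        self.n_issued = 0
        self.n_reported = 0

    def generate_evaluation_specification(self) -> Dict[str, int]:
        if self.n_issued == self.total:
            raise OptimizationComplete()
        at_batch_boundary = self.n_issued % self.batch_size == 0
        if at_batch_boundary and self.n_issued > self.n_reported:
            raise OptimizerNotReady()
        self.n_issued += 1
        return {"x": self.n_issued - 1}

    def report(self, evaluations: List[Dict[str, int]]) -> None:
        self.n_reported += len(evaluations)


def drive(
    optimizer: BatchOptimizer,
    evaluate: Callable[[Dict[str, int]], Dict[str, int]],
) -> List[Dict[str, int]]:
    """Sequential version of the loop's control flow (no parallelism)."""
    results: List[Dict[str, int]] = []
    pending: List[Dict[str, int]] = []
    while True:
        try:
            pending.append(optimizer.generate_evaluation_specification())
            continue  # ask for more work, as the real loop does after a submit
        except OptimizerNotReady:
            pass  # fall through and collect results instead of asking again
        except OptimizationComplete:
            break
        new = [evaluate(spec) for spec in pending]
        pending.clear()
        optimizer.report(new)
        results.extend(new)
    # Drain what is still pending, like the final loop over running jobs
    new = [evaluate(spec) for spec in pending]
    optimizer.report(new)
    results.extend(new)
    return results
```

In this sketch `OptimizerNotReady` means "wait for results, then ask again". `OptimizationComplete` means "stop asking, but still collect outstanding work". The loop documented here reacts to the two signals in the same way.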
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`optimizer` | `Union[blackboxopt.base.SingleObjectiveOptimizer, blackboxopt.base.MultiObjectiveOptimizer]` | The blackboxopt optimizer to run. | required |
`dask_client` | `Client` | A Dask Distributed client that is configured with workers. | required |
`evaluation_function` | `Callable[[blackboxopt.evaluation.EvaluationSpecification], blackboxopt.evaluation.Evaluation]` | The function that is called with an evaluation specification, which provides the configuration, settings and optimizer info. It encapsulates the actual execution of a parametrized experiment (e.g. ML model training) and should return a `blackboxopt.Evaluation` as a result. | required |
`timeout_s` | `float` | If given, the optimization loop stops submitting new evaluations after the first optimization step that exceeds the timeout (in seconds). Evaluations that are still running are awaited before the function returns. Defaults to inf. | inf |
`max_evaluations` | `Optional[int]` | If given, the optimization loop stops once the given number of evaluations has been submitted. Defaults to None. | None |
`pre_evaluation_callback` | `Optional[Callable[[blackboxopt.evaluation.EvaluationSpecification], Any]]` | Reference to a callable that is invoked before each evaluation and takes a `blackboxopt.EvaluationSpecification` as an argument. | None |
`post_evaluation_callback` | `Optional[Callable[[blackboxopt.evaluation.Evaluation], Any]]` | Reference to a callable that is invoked after each evaluation and takes a `blackboxopt.Evaluation` as an argument. | None |
`logger` | `Optional[Logger]` | The logger to use for logging progress. Defaults to None, in which case the `blackboxopt` logger is used. | None |
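The two stopping criteria are checked together in the header of the loop's `while` statement, before each step. The sketch below restates that condition as a standalone function so that its semantics are easy to check. The timeout is only compared between steps, so a step that starts before the deadline still completes. `max_evaluations` counts submitted evaluations, not finished ones. `may_submit_more` is a hypothetical name. Mapping `max_evaluations=None` to "no limit" is an assumption about what `init_max_evaluations_with_limit_logging` returns.

```python
import math
from typing import Optional


def may_submit_more(
    start: float,
    now: float,
    timeout_s: float,
    n_submitted: int,
    max_evaluations: Optional[int],
) -> bool:
    """Restates `time.time() - start < timeout_s and n_eval_specs < _max_evaluations`."""
    # Assumption: None is treated as "no limit on the number of evaluations"
    limit = math.inf if max_evaluations is None else max_evaluations
    return now - start < timeout_s and n_submitted < limit
```

For example, with `timeout_s=10.0` and `max_evaluations=3`, no fourth evaluation is submitted, even if only one second has passed.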
Returns:
Type | Description |
---|---|
`List[blackboxopt.evaluation.Evaluation]` | List of evaluations, each holding the evaluation specification and its result, for all evaluations. |
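The loop appends results as they arrive, so the returned list is in collection order, which need not match submission order. A common next step is to pick the best result. The sketch below uses a hypothetical `Evaluation` stand-in that exposes only a `configuration` dict and an `objectives` dict. It also assumes that `None` marks a missing objective value. Check the actual `blackboxopt.Evaluation` fields before adapting it.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class Evaluation:
    """Hypothetical stand-in exposing only the fields this sketch needs."""

    configuration: Dict[str, float]
    objectives: Dict[str, Optional[float]] = field(default_factory=dict)


def best_evaluation(
    evaluations: List[Evaluation], objective: str, greater_is_better: bool = False
) -> Optional[Evaluation]:
    # Skip evaluations without a value for the objective (e.g. failed runs)
    valid = [e for e in evaluations if e.objectives.get(objective) is not None]
    if not valid:
        return None

    def value(e: Evaluation) -> float:
        return e.objectives[objective]

    return max(valid, key=value) if greater_is_better else min(valid, key=value)
```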
Source code in blackboxopt/optimization_loops/dask_distributed.py
```python
def run_optimization_loop(
    optimizer: Union[SingleObjectiveOptimizer, MultiObjectiveOptimizer],
    evaluation_function: Callable[[EvaluationSpecification], Evaluation],
    dask_client: dd.Client,
    timeout_s: float = float("inf"),
    max_evaluations: Optional[int] = None,
    pre_evaluation_callback: Optional[Callable[[EvaluationSpecification], Any]] = None,
    post_evaluation_callback: Optional[Callable[[Evaluation], Any]] = None,
    logger: Optional[logging.Logger] = None,
) -> List[Evaluation]:
    """Convenience wrapper for an optimization loop that uses Dask to parallelize
    optimization until a given timeout or maximum number of evaluations is reached.

    This already handles signals from the optimizer in case there is no evaluation
    specification available yet.

    Args:
        optimizer: The blackboxopt optimizer to run.
        dask_client: A Dask Distributed client that is configured with workers.
        evaluation_function: The function that is called with configuration, settings
            and optimizer info dictionaries as arguments like provided by an evaluation
            specification.
            This is the function that encapsulates the actual execution of
            a parametrized experiment (e.g. ML model training) and should return a
            `blackboxopt.Evaluation` as a result.
        timeout_s: If given, the optimization loop will terminate after the first
            optimization step that exceeded the timeout (in seconds). Defaults to inf.
        max_evaluations: If given, the optimization loop will terminate after the given
            number of steps. Defaults to None.
        pre_evaluation_callback: Reference to a callable that is invoked before each
            evaluation and takes a `blackboxopt.EvaluationSpecification` as an argument.
        post_evaluation_callback: Reference to a callable that is invoked after each
            evaluation and takes a `blackboxopt.Evaluation` as an argument.
        logger: The logger to use for logging progress. Defaults to None.

    Returns:
        List of evaluation specification and result for all evaluations.
    """
    logger = logging.getLogger("blackboxopt") if logger is None else logger

    objectives = (
        optimizer.objectives
        if isinstance(optimizer, MultiObjectiveOptimizer)
        else [optimizer.objective]
    )
    evaluations: List[Evaluation] = []

    dask_scheduler = MinimalDaskScheduler(
        dask_client=dask_client, objectives=objectives, logger=logger
    )

    _max_evaluations = init_max_evaluations_with_limit_logging(
        max_evaluations=max_evaluations, timeout_s=timeout_s, logger=logger
    )

    n_eval_specs = 0
    start = time.time()
    while time.time() - start < timeout_s and n_eval_specs < _max_evaluations:
        if dask_scheduler.has_capacity():
            try:
                eval_spec = optimizer.generate_evaluation_specification()

                if pre_evaluation_callback:
                    pre_evaluation_callback(eval_spec)

                dask_scheduler.submit(evaluation_function, eval_spec)
                n_eval_specs += 1
                continue

            except OptimizerNotReady:
                logger.info("Optimizer is not ready yet; will retry after short pause.")

            except OptimizationComplete:
                logger.info("Optimization is complete")
                break

        # No free capacity (or optimizer not ready): wait for finished jobs
        new_evaluations = dask_scheduler.check_for_results(timeout_s=20)

        if post_evaluation_callback:
            for evaluation in new_evaluations:
                post_evaluation_callback(evaluation)

        optimizer.report(new_evaluations)
        evaluations.extend(new_evaluations)

    # Collect results of evaluations that were still running when the loop ended
    while dask_scheduler.has_running_jobs():
        new_evaluations = dask_scheduler.check_for_results(timeout_s=20)

        if post_evaluation_callback:
            for evaluation in new_evaluations:
                post_evaluation_callback(evaluation)

        optimizer.report(new_evaluations)
        evaluations.extend(new_evaluations)

    return evaluations
```
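The sketch below shows the loop's scheduling pattern in isolation. It replaces `MinimalDaskScheduler` with a hypothetical `ThreadPoolScheduler` built on `concurrent.futures` from the standard library. Its method names mirror the calls the loop makes (`has_capacity`, `submit`, `check_for_results`, `has_running_jobs`). It is not the blackboxopt implementation, and treating capacity as the number of worker threads is an assumption. A plain iterable of specifications stands in for the optimizer, and its exhaustion plays the role of `OptimizationComplete`.

```python
import concurrent.futures as cf
from typing import Any, Callable, Iterable, List, Set


class ThreadPoolScheduler:
    """Hypothetical stand-in for MinimalDaskScheduler using local threads."""

    def __init__(self, max_workers: int) -> None:
        self.max_workers = max_workers
        self._executor = cf.ThreadPoolExecutor(max_workers=max_workers)
        self._running: Set[cf.Future] = set()

    def has_capacity(self) -> bool:
        # Assumption: one running job per worker thread means "full"
        return len(self._running) < self.max_workers

    def has_running_jobs(self) -> bool:
        return bool(self._running)

    def submit(self, fn: Callable[[Any], Any], spec: Any) -> None:
        self._running.add(self._executor.submit(fn, spec))

    def check_for_results(self, timeout_s: float) -> List[Any]:
        # Wait until at least one job finishes (or the timeout passes),
        # then hand back every job that has finished so far
        if not self._running:
            return []
        done, _ = cf.wait(
            self._running, timeout=timeout_s, return_when=cf.FIRST_COMPLETED
        )
        self._running -= done
        return [future.result() for future in done]

    def shutdown(self) -> None:
        self._executor.shutdown(wait=True)


def run_loop(specs: Iterable[Any], fn: Callable[[Any], Any], max_workers: int) -> List[Any]:
    """Same submit / collect / drain structure as run_optimization_loop."""
    scheduler = ThreadPoolScheduler(max_workers)
    spec_iter = iter(specs)
    results: List[Any] = []
    try:
        while True:
            if scheduler.has_capacity():
                try:
                    scheduler.submit(fn, next(spec_iter))
                    continue
                except StopIteration:
                    break  # no more specs: plays the role of OptimizationComplete
            results.extend(scheduler.check_for_results(timeout_s=20))
        # Drain jobs that are still running, like the final while-loop
        while scheduler.has_running_jobs():
            results.extend(scheduler.check_for_results(timeout_s=20))
    finally:
        scheduler.shutdown()
    return results
```

Results arrive in completion order. For example, `run_loop(range(5), lambda x: x * x, max_workers=2)` returns the five squares in an order that may vary from run to run.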