Dask Distributed Optimization Loop

If you are working with Dask, this optimization loop helps you run blackboxopt-based optimization on your Dask cluster. See also the corresponding example for more details.
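
For orientation, a minimal sketch of such a loop is shown below. The quadratic evaluation function, the parameter name `x`, and the objective name `loss` are made up for illustration, and the optimizer construction is only indicated, since any blackboxopt single- or multi-objective optimizer can be plugged in; the client is a plain local `dask.distributed.Client`.

```python
from dask.distributed import Client

from blackboxopt import Evaluation, EvaluationSpecification
from blackboxopt.optimization_loops.dask_distributed import run_optimization_loop


def evaluate(eval_spec: EvaluationSpecification) -> Evaluation:
    # Stand-in for the actual experiment (e.g. an ML model training run);
    # the parameter "x" and the objective "loss" are example names.
    x = eval_spec.configuration["x"]
    # create_evaluation attaches the result to the originating specification.
    return eval_spec.create_evaluation(objectives={"loss": x**2})


# Plug in any blackboxopt optimizer that is configured with a matching
# search space and a "loss" objective.
optimizer = ...

client = Client()  # local cluster; connect to your scheduler in production

evaluations = run_optimization_loop(
    optimizer=optimizer,
    evaluation_function=evaluate,
    dask_client=client,
    max_evaluations=20,
)
```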

blackboxopt.optimization_loops.dask_distributed

run_optimization_loop(optimizer, evaluation_function, dask_client, timeout_s=inf, max_evaluations=None, pre_evaluation_callback=None, post_evaluation_callback=None, logger=None)

Convenience wrapper for an optimization loop that uses Dask to parallelize optimization until a given timeout or maximum number of evaluations is reached.

The loop also handles the optimizer signaling that no evaluation specification is available yet by retrying after a short pause.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `optimizer` | `Union[blackboxopt.base.SingleObjectiveOptimizer, blackboxopt.base.MultiObjectiveOptimizer]` | The blackboxopt optimizer to run. | *required* |
| `dask_client` | `Client` | A Dask Distributed client that is configured with workers. | *required* |
| `evaluation_function` | `Callable[[blackboxopt.evaluation.EvaluationSpecification], blackboxopt.evaluation.Evaluation]` | The function that is called with the configuration, settings, and optimizer info dictionaries as provided by an evaluation specification. It encapsulates the actual execution of a parametrized experiment (e.g. ML model training) and should return a `blackboxopt.Evaluation` as a result. | *required* |
| `timeout_s` | `float` | If given, the optimization loop terminates after the first optimization step that exceeds the timeout (in seconds). | `inf` |
| `max_evaluations` | `int` | If given, the optimization loop terminates after the given number of evaluations. | `None` |
| `pre_evaluation_callback` | `Optional[Callable[[blackboxopt.evaluation.EvaluationSpecification], Any]]` | A callable that is invoked before each evaluation with the `blackboxopt.EvaluationSpecification` as its argument (see the callback sketch below). | `None` |
| `post_evaluation_callback` | `Optional[Callable[[blackboxopt.evaluation.Evaluation], Any]]` | A callable that is invoked after each evaluation with the `blackboxopt.Evaluation` as its argument (see the callback sketch below). | `None` |
| `logger` | `Logger` | The logger to use for logging progress. | `None` |
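
Continuing the sketch from above, here is a small example of the two callbacks for lightweight progress reporting; the function names are made up:

```python
from blackboxopt import Evaluation, EvaluationSpecification


def log_specification(eval_spec: EvaluationSpecification) -> None:
    # Invoked right before the specification is submitted to a Dask worker.
    print("Submitting configuration:", eval_spec.configuration)


def report_progress(evaluation: Evaluation) -> None:
    # Invoked for every finished evaluation, before it is reported back to
    # the optimizer.
    print("Finished evaluation with objectives:", evaluation.objectives)


evaluations = run_optimization_loop(
    optimizer=optimizer,
    evaluation_function=evaluate,
    dask_client=client,
    pre_evaluation_callback=log_specification,
    post_evaluation_callback=report_progress,
)
```

Since the post-evaluation callback fires before the results are reported to the optimizer, it is also a natural hook for checkpointing intermediate results.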

Returns:

| Type | Description |
| --- | --- |
| `List[blackboxopt.evaluation.Evaluation]` | List of evaluation specifications and results for all evaluations. |
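
Each returned `Evaluation` carries both its originating specification and its objective values, so post-processing is straightforward. A sketch that picks the best result, assuming a single objective named `loss` that is minimized; evaluations that failed may carry `None` objective values and are skipped:

```python
finished = [e for e in evaluations if e.objectives.get("loss") is not None]
best = min(finished, key=lambda e: e.objectives["loss"])
print("Best configuration:", best.configuration)
print("Best loss:", best.objectives["loss"])
```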

Source code in blackboxopt/optimization_loops/dask_distributed.py
```python
def run_optimization_loop(
    optimizer: Union[SingleObjectiveOptimizer, MultiObjectiveOptimizer],
    evaluation_function: Callable[[EvaluationSpecification], Evaluation],
    dask_client: dd.Client,
    timeout_s: float = float("inf"),
    max_evaluations: Optional[int] = None,
    pre_evaluation_callback: Optional[Callable[[EvaluationSpecification], Any]] = None,
    post_evaluation_callback: Optional[Callable[[Evaluation], Any]] = None,
    logger: Optional[logging.Logger] = None,
) -> List[Evaluation]:
    """Convenience wrapper for an optimization loop that uses Dask to parallelize
    optimization until a given timeout or maximum number of evaluations is reached.

    This already handles the optimizer signaling that no evaluation specification
    is available yet.

    Args:
        optimizer: The blackboxopt optimizer to run.
        dask_client: A Dask Distributed client that is configured with workers.
        evaluation_function: The function that is called with the configuration,
            settings and optimizer info dictionaries as provided by an evaluation
            specification.
            This is the function that encapsulates the actual execution of
            a parametrized experiment (e.g. ML model training) and should return a
            `blackboxopt.Evaluation` as a result.
        timeout_s: If given, the optimization loop will terminate after the first
            optimization step that exceeds the timeout (in seconds). Defaults to inf.
        max_evaluations: If given, the optimization loop will terminate after the given
            number of steps. Defaults to None.
        pre_evaluation_callback: Reference to a callable that is invoked before each
            evaluation and takes a `blackboxopt.EvaluationSpecification` as an argument.
        post_evaluation_callback: Reference to a callable that is invoked after each
            evaluation and takes a `blackboxopt.Evaluation` as an argument.
        logger: The logger to use for logging progress. Defaults to None.

    Returns:
        List of evaluation specifications and results for all evaluations.
    """
    logger = logging.getLogger("blackboxopt") if logger is None else logger

    objectives = (
        optimizer.objectives
        if isinstance(optimizer, MultiObjectiveOptimizer)
        else [optimizer.objective]
    )
    evaluations: List[Evaluation] = []

    dask_scheduler = MinimalDaskScheduler(
        dask_client=dask_client, objectives=objectives, logger=logger
    )

    _max_evaluations = init_max_evaluations_with_limit_logging(
        max_evaluations=max_evaluations, timeout_s=timeout_s, logger=logger
    )

    n_eval_specs = 0
    start = time.time()
    # Main loop: submit new evaluation specifications while the time and
    # evaluation budgets allow; otherwise collect and report finished results.
    while time.time() - start < timeout_s and n_eval_specs < _max_evaluations:
        if dask_scheduler.has_capacity():
            try:
                eval_spec = optimizer.generate_evaluation_specification()

                if pre_evaluation_callback:
                    pre_evaluation_callback(eval_spec)

                dask_scheduler.submit(evaluation_function, eval_spec)
                n_eval_specs += 1
                continue

            except OptimizerNotReady:
                logger.info("Optimizer is not ready yet; will retry after short pause.")

            except OptimizationComplete:
                logger.info("Optimization is complete")
                break

        new_evaluations = dask_scheduler.check_for_results(timeout_s=20)

        if post_evaluation_callback:
            list(map(post_evaluation_callback, new_evaluations))

        optimizer.report(new_evaluations)
        evaluations.extend(new_evaluations)

    # Once the budget is exhausted, drain results from jobs that are still
    # running on the cluster so that no submitted evaluation is lost.
    while dask_scheduler.has_running_jobs():
        new_evaluations = dask_scheduler.check_for_results(timeout_s=20)
        if post_evaluation_callback:
            list(map(post_evaluation_callback, new_evaluations))
        optimizer.report(new_evaluations)
        evaluations.extend(new_evaluations)

    return evaluations
```
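
Note that new evaluation specifications are only submitted while the scheduler reports free capacity, so the number of Dask workers effectively bounds how many evaluations run in parallel. A sketch of sizing a local cluster accordingly (the worker count is an example value):

```python
from dask.distributed import Client, LocalCluster

# Four single-threaded workers allow up to four evaluations in parallel.
cluster = LocalCluster(n_workers=4, threads_per_worker=1)
client = Client(cluster)
```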