Skip to content

blackboxopt.optimization_loops.dask_distributed

run_optimization_loop(optimizer, evaluation_function, dask_client, timeout_s=inf, max_evaluations=None, pre_evaluation_callback=None, post_evaluation_callback=None, logger=None)

Convenience wrapper for an optimization loop that uses Dask to parallelize optimization until a given timeout or maximum number of evaluations is reached.

This already handles signals from the optimizer in case there is no evaluation specification available yet.

Parameters:

Name Type Description Default
optimizer Union[blackboxopt.base.SingleObjectiveOptimizer, blackboxopt.base.MultiObjectiveOptimizer]

The blackboxopt optimizer to run.

required
dask_client Client

A Dask Distributed client that is configured with workers.

required
evaluation_function Callable[[blackboxopt.evaluation.EvaluationSpecification], blackboxopt.evaluation.Evaluation]

The function that is called with configuration, settings and optimizer info dictionaries as arguments like provided by an evaluation specification. This is the function that encapsulates the actual execution of a parametrized experiment (e.g. ML model training) and should return a blackboxopt.Evaluation as a result.

required
timeout_s float

If given, the optimization loop will terminate after the first optimization step that exceeded the timeout (in seconds). Defaults to inf.

inf
max_evaluations int

If given, the optimization loop will terminate after the given number of steps. Defaults to None.

None
pre_evaluation_callback Optional[Callable[[blackboxopt.evaluation.EvaluationSpecification], Any]]

Reference to a callable that is invoked before each evaluation and takes a blackboxopt.EvaluationSpecification as an argument.

None
post_evaluation_callback Optional[Callable[[blackboxopt.evaluation.Evaluation], Any]]

Reference to a callable that is invoked after each evaluation and takes a blackboxopt.Evaluation as an argument.

None
logger Logger

The logger to use for logging progress. Defaults to None.

None

Returns:

Type Description
List[blackboxopt.evaluation.Evaluation]

List of evluation specification and result for all evaluations.

Source code in blackboxopt/optimization_loops/dask_distributed.py
def run_optimization_loop(
    optimizer: Union[SingleObjectiveOptimizer, MultiObjectiveOptimizer],
    evaluation_function: Callable[[EvaluationSpecification], Evaluation],
    dask_client: dd.Client,
    timeout_s: float = float("inf"),
    max_evaluations: int = None,
    pre_evaluation_callback: Optional[Callable[[EvaluationSpecification], Any]] = None,
    post_evaluation_callback: Optional[Callable[[Evaluation], Any]] = None,
    logger: logging.Logger = None,
) -> List[Evaluation]:
    """Convenience wrapper for an optimization loop that uses Dask to parallelize
    optimization until a given timeout or maximum number of evaluations is reached.

    This already handles signals from the optimizer in case there is no evaluation
    specification available yet.

    Args:
        optimizer: The blackboxopt optimizer to run.
        dask_client: A Dask Distributed client that is configured with workers.
        evaluation_function: The function that is called with configuration, settings
            and optimizer info dictionaries as arguments like provided by an evaluation
            specification.
            This is the function that encapsulates the actual execution of
            a parametrized experiment (e.g. ML model training) and should return a
            `blackboxopt.Evaluation` as a result.
        timeout_s: If given, the optimization loop will terminate after the first
            optimization step that exceeded the timeout (in seconds). Defaults to inf.
        max_evaluations: If given, the optimization loop will terminate after the given
            number of steps. Defaults to None.
        pre_evaluation_callback: Reference to a callable that is invoked before each
            evaluation and takes a `blackboxopt.EvaluationSpecification` as an argument.
        post_evaluation_callback: Reference to a callable that is invoked after each
            evaluation and takes a `blackboxopt.Evaluation` as an argument.
        logger: The logger to use for logging progress. Defaults to None.

    Returns:
        List of evluation specification and result for all evaluations.
    """
    logger = logging.getLogger("blackboxopt") if logger is None else logger

    objectives = (
        optimizer.objectives
        if isinstance(optimizer, MultiObjectiveOptimizer)
        else [optimizer.objective]
    )
    evaluations: List[Evaluation] = []

    dask_scheduler = MinimalDaskScheduler(
        dask_client=dask_client, objectives=objectives, logger=logger
    )

    _max_evaluations = init_max_evaluations_with_limit_logging(
        max_evaluations=max_evaluations, timeout_s=timeout_s, logger=logger
    )

    n_eval_specs = 0
    start = time.time()
    while time.time() - start < timeout_s and n_eval_specs < _max_evaluations:
        if dask_scheduler.has_capacity():
            try:
                eval_spec = optimizer.generate_evaluation_specification()

                if pre_evaluation_callback:
                    pre_evaluation_callback(eval_spec)

                dask_scheduler.submit(evaluation_function, eval_spec)
                n_eval_specs += 1
                continue

            except OptimizerNotReady:
                logger.info("Optimizer is not ready yet; will retry after short pause.")

            except OptimizationComplete:
                logger.info("Optimization is complete")
                break

        new_evaluations = dask_scheduler.check_for_results(timeout_s=20)

        if post_evaluation_callback:
            list(map(post_evaluation_callback, new_evaluations))

        optimizer.report(new_evaluations)
        evaluations.extend(new_evaluations)

    while dask_scheduler.has_running_jobs():
        new_evaluations = dask_scheduler.check_for_results(timeout_s=20)
        if post_evaluation_callback:
            list(map(post_evaluation_callback, new_evaluations))
        optimizer.report(new_evaluations)
        evaluations.extend(new_evaluations)

    return evaluations