Skip to content

blackboxopt.optimization_loops.dask_distributed

run_optimization_loop(optimizer, evaluation_function, dask_client, timeout_s=inf, max_evaluations=None, logger=None)

Convenience wrapper for an optimization loop that uses Dask to parallelize optimization until a given timeout or maximum number of evaluations is reached.

This already handles signals from the optimizer in case there is no evaluation specification available yet.

Parameters:

Name Type Description Default
optimizer Union[blackboxopt.base.SingleObjectiveOptimizer, blackboxopt.base.MultiObjectiveOptimizer]

The blackboxopt optimizer to run.

required
dask_client Client

A Dask Distributed client that is configured with workers.

required
evaluation_function Callable[[blackboxopt.evaluation.EvaluationSpecification], blackboxopt.evaluation.Evaluation]

The function that is called with configuration, settings and optimizer info dictionaries as arguments like provided by an evaluation specification. This is the function that encapsulates the actual execution of a parametrized experiment (e.g. ML model training) and should return a blackboxopt.Evaluation as a result.

required
timeout_s float

If given, the optimization loop will terminate after the first optimization step that exceeded the timeout (in seconds). Defaults to inf.

inf
max_evaluations int

If given, the optimization loop will terminate after the given number of steps. Defaults to None.

None
logger Logger

The logger to use for logging progress. Defaults to None.

None

Returns:

Type Description
List[blackboxopt.evaluation.Evaluation]

List of evaluation specifications and results for all evaluations.

Source code in blackboxopt/optimization_loops/dask_distributed.py
def run_optimization_loop(
    optimizer: Union[SingleObjectiveOptimizer, MultiObjectiveOptimizer],
    evaluation_function: Callable[[EvaluationSpecification], Evaluation],
    dask_client: dd.Client,
    timeout_s: float = float("inf"),
    max_evaluations: Union[int, None] = None,
    logger: Union[logging.Logger, None] = None,
) -> List[Evaluation]:
    """Convenience wrapper for an optimization loop that uses Dask to parallelize
    optimization until a given timeout or maximum number of evaluations is reached.

    This already handles signals from the optimizer in case there is no evaluation
    specification available yet.

    Args:
        optimizer: The blackboxopt optimizer to run.
        dask_client: A Dask Distributed client that is configured with workers.
        evaluation_function: The function that is called with configuration, settings
            and optimizer info dictionaries as arguments like provided by an evaluation
            specification.
            This is the function that encapsulates the actual execution of
            a parametrized experiment (e.g. ML model training) and should return a
            `blackboxopt.Evaluation` as a result.
        timeout_s: If given, the optimization loop will terminate after the first
            optimization step that exceeded the timeout (in seconds). Defaults to inf.
        max_evaluations: If given, the optimization loop will terminate after the given
            number of steps. Defaults to None.
        logger: The logger to use for logging progress. Defaults to None.

    Returns:
        List of evaluation specifications and results for all evaluations.
    """
    logger = logging.getLogger("blackboxopt") if logger is None else logger

    # Single-objective optimizers expose one objective; normalize to a list so the
    # scheduler can treat both cases uniformly.
    objectives = (
        optimizer.objectives
        if isinstance(optimizer, MultiObjectiveOptimizer)
        else [optimizer.objective]
    )
    evaluations: List[Evaluation] = []

    dask_scheduler = MinimalDaskScheduler(
        dask_client=dask_client, objectives=objectives, logger=logger
    )

    _max_evaluations = init_max_evaluations_with_limit_logging(
        max_evaluations=max_evaluations, timeout_s=timeout_s, logger=logger
    )

    # How long a single poll for finished evaluations blocks before the loop
    # re-checks the overall timeout / evaluation budget.
    result_poll_timeout_s = 20

    n_eval_specs = 0
    start = time.time()
    while time.time() - start < timeout_s and n_eval_specs < _max_evaluations:
        if dask_scheduler.has_capacity():
            try:
                eval_spec = optimizer.get_evaluation_specification()
                dask_scheduler.submit(evaluation_function, eval_spec)
                n_eval_specs += 1
                # Keep submitting while there is worker capacity before polling
                # for results.
                continue

            except OptimizerNotReady:
                logger.info("Optimizer is not ready yet; will retry after short pause.")

            except OptimizationComplete:
                logger.info("Optimization is complete")
                break

        # Report any finished evaluations back to the optimizer.
        for evaluation in dask_scheduler.check_for_results(
            timeout_s=result_poll_timeout_s
        ):
            optimizer.report_evaluation(evaluation)
            evaluations.append(evaluation)

    # Drain: wait for all in-flight evaluations to finish and report them, so no
    # submitted work is silently dropped when the loop exits.
    while dask_scheduler.has_running_jobs():
        for evaluation in dask_scheduler.check_for_results(
            timeout_s=result_poll_timeout_s
        ):
            optimizer.report_evaluation(evaluation)
            evaluations.append(evaluation)

    return evaluations