Skip to content

File based distributed

evaluate_specifications(target_directory, evaluation_function, objectives, timeout_s=inf, max_evaluations=None, catch_exceptions_from_evaluation_function=False, pre_evaluation_callback=None, post_evaluation_callback=None, logger=None)

Evaluate specifications from the target directory until the timeout_s or max_evaluations is reached.

Parameters:

Name Type Description Default
target_directory PathLike

The directory where the evaluation specifications and results are stored.

required
evaluation_function Callable[[blackboxopt.evaluation.EvaluationSpecification], blackboxopt.evaluation.Evaluation]

The function that evaluates the evaluation specification.

required
objectives List[blackboxopt.base.Objective]

The objectives reported by the evaluation function. This is used to specify None values for objectives in case of an error.

required
timeout_s float

The maximum time in seconds to run the optimization loop.

inf
max_evaluations Optional[int]

The maximum number of evaluations to perform on this agent.

None
catch_exceptions_from_evaluation_function bool

Whether to exit on an unhandled exception raised by the evaluation function or instead store its stack trace in the evaluation's stacktrace attribute. Set to True if there are spurious errors due to e.g. numerical instability that should not halt the optimization loop. For more details, see the wrapper that is used internally: blackboxopt.optimization_loops.utils.evaluation_function_wrapper.

False
pre_evaluation_callback Optional[Callable[[blackboxopt.evaluation.EvaluationSpecification], Any]]

Reference to a callable that is invoked before each evaluation and takes a blackboxopt.EvaluationSpecification as an argument.

None
post_evaluation_callback Optional[Callable[[blackboxopt.evaluation.Evaluation], Any]]

Reference to a callable that is invoked after each evaluation and takes a blackboxopt.Evaluation as an argument.

None
logger Optional[logging.Logger]

The logger to use for logging. If None, the default logger is used.

None
Source code in blackboxopt/optimization_loops/file_based_distributed.py
def evaluate_specifications(
    target_directory: PathLike,
    evaluation_function: Callable[[EvaluationSpecification], Evaluation],
    objectives: List[Objective],
    timeout_s: float = float("inf"),
    max_evaluations: Optional[int] = None,
    catch_exceptions_from_evaluation_function: bool = False,
    pre_evaluation_callback: Optional[Callable[[EvaluationSpecification], Any]] = None,
    post_evaluation_callback: Optional[Callable[[Evaluation], Any]] = None,
    logger: Optional[logging.Logger] = None,
):
    """Evaluate specifications from the target directory until the `timeout_s` or
    `max_evaluations` is reached.

    Each iteration picks one random specification file from `target_directory`,
    claims it by deleting it, evaluates it and writes the result as a JSON file
    into the same directory. If no specification file is present, the loop sleeps
    for one second before looking again.

    Args:
        target_directory: The directory where the evaluation specifications and results
            are stored.
        evaluation_function: The function that evaluates the evaluation specification.
        objectives: The objectives reported by the evaluation function. This is used to
            specify None values for objectives in case of an error.
        timeout_s: The maximum time in seconds to run the optimization loop.
        max_evaluations: The maximum number of evaluations to perform on this agent.
        catch_exceptions_from_evaluation_function: Whether to exit on an unhandled
            exception raised by the evaluation function or instead store its stack
            trace in the evaluation's `stacktrace` attribute. Set to True if there are
            spurious errors due to e.g. numerical instability that should not halt the
            optimization loop. For more details, see the wrapper that is used internally
            `blackboxopt.optimization_loops.utils.evaluation_function_wrapper`
        pre_evaluation_callback: Reference to a callable that is invoked before each
            evaluation and takes a `blackboxopt.EvaluationSpecification` as an argument.
        post_evaluation_callback: Reference to a callable that is invoked after each
            evaluation and takes a `blackboxopt.Evaluation` as an argument.
        logger: The logger to use for logging. If `None`, the default logger is used.
    """
    if logger is None:
        logger = default_logger

    target_directory = Path(target_directory)

    _max_evaluations = init_max_evaluations_with_limit_logging(
        max_evaluations=max_evaluations, timeout_s=timeout_s, logger=logger
    )

    # The directory does not change, so the glob pattern is built only once.
    proposal_pattern = str(
        target_directory / f"{EVALUATION_SPECIFICATION_FILE_NAME_PREFIX}*.json"
    )

    start = time.time()
    num_evaluations = 0
    while time.time() - start < timeout_s and num_evaluations < _max_evaluations:
        current_proposals = glob.glob(proposal_pattern)
        if not current_proposals:
            logger.info("No proposals found, retrying in one second.")
            time.sleep(1.0)
            continue

        # Just pick one random proposal instead of iterating over all available ones, to
        # reduce the risk of a race condition when two or more agents are running in
        # parallel and trying to evaluate the same proposals.
        eval_spec_path = random.choice(current_proposals)
        try:
            with open(eval_spec_path, "r", encoding="utf-8") as fh:
                eval_spec = EvaluationSpecification(**json.load(fh))
            # Deleting the file is what claims the proposal: if several agents read the
            # same file, only one of them can remove it successfully. The others get a
            # FileNotFoundError here and skip the proposal instead of evaluating it a
            # second time.
            Path(eval_spec_path).unlink()
        except FileNotFoundError:
            # Allow missing, in case the proposal was already taken by another agent
            logger.warning(
                "Could not read evaluation specification from %s, "
                "it was likely already evaluated elsewhere.",
                eval_spec_path,
            )
            continue

        logger.info(
            "The optimizer proposed this evaluation specification:\n%s",
            pprint.pformat(eval_spec.to_dict(), compact=True),
        )
        if pre_evaluation_callback:
            pre_evaluation_callback(eval_spec)

        evaluation = evaluation_function_wrapper(
            evaluation_function=evaluation_function,
            evaluation_specification=eval_spec,
            logger=logger,
            objectives=objectives,
            catch_exceptions_from_evaluation_function=catch_exceptions_from_evaluation_function,
        )

        logger.info(
            "Reporting this evaluation result to the optimizer:\n%s",
            pprint.pformat(evaluation.to_dict(), compact=True),
        )
        if post_evaluation_callback:
            post_evaluation_callback(evaluation)

        result_path = target_directory / (
            EVALUATION_RESULT_FILE_NAME_PREFIX
            + f"{eval_spec.optimizer_info['eval_spec_id']}.json"
        )
        # Write under a temporary name that ends in ".tmp" (so it is not matched by a
        # "*.json" glob) and rename afterwards, so that a reader polling the directory
        # never opens a partially written result file.
        tmp_result_path = result_path.with_name(result_path.name + ".tmp")
        with open(tmp_result_path, "w", encoding="utf-8") as fh:
            json.dump(evaluation.to_dict(), fh)
        tmp_result_path.replace(result_path)
        num_evaluations += 1

run_optimization_loop(optimizer, target_directory, timeout_s=inf, max_evaluations=None, proposal_queue_size=1, pre_evaluation_callback=None, post_evaluation_callback=None, logger=None)

Parameters:

Name Type Description Default
optimizer Union[blackboxopt.base.SingleObjectiveOptimizer, blackboxopt.base.MultiObjectiveOptimizer]

The optimizer instance to use for specification generation and evaluation ingestion.

required
target_directory PathLike

The directory where the evaluation specifications and results are stored.

required
timeout_s float

The maximum time in seconds to run the optimization loop.

inf
max_evaluations Optional[int]

The maximum number of evaluations to perform.

None
proposal_queue_size int

The number of proposals to keep in the queue, i.e. the size of the evaluation batch for parallel/batch evaluation.

1
logger Optional[logging.Logger]

The logger to use for logging. If None, the default logger is used.

None
Source code in blackboxopt/optimization_loops/file_based_distributed.py
def run_optimization_loop(
    optimizer: Union[SingleObjectiveOptimizer, MultiObjectiveOptimizer],
    target_directory: PathLike,
    timeout_s: float = float("inf"),
    max_evaluations: Optional[int] = None,
    proposal_queue_size: int = 1,
    pre_evaluation_callback: Optional[Callable[[EvaluationSpecification], Any]] = None,
    post_evaluation_callback: Optional[Callable[[Evaluation], Any]] = None,
    logger: Optional[logging.Logger] = None,
) -> List[Evaluation]:
    """Run the optimizer side of a file based distributed optimization loop.

    The loop repeatedly reads evaluation result files from `target_directory`,
    reports them to the optimizer, and writes new evaluation specification files
    into the same directory until at least `proposal_queue_size` of them are
    present. It ends when `timeout_s` or `max_evaluations` is reached or when the
    optimizer raises `OptimizationComplete`.

    Args:
        optimizer: The optimizer instance to use for specification generation and
            evaluation ingestion.
        target_directory: The directory where the evaluation specifications and results
            are stored.
        timeout_s: The maximum time in seconds to run the optimization loop.
        max_evaluations: The maximum number of evaluations to perform.
        proposal_queue_size: The number of proposals to keep in the queue, i.e. the size
            of the evaluation batch for parallel/batch evaluation.
        pre_evaluation_callback: Reference to a callable that is invoked with each
            generated `blackboxopt.EvaluationSpecification` before it is written to the
            target directory.
        post_evaluation_callback: Reference to a callable that is invoked with each
            `blackboxopt.Evaluation` read from the target directory before it is
            reported to the optimizer.
        logger: The logger to use for logging. If `None`, the default logger is used.

    Returns:
        The evaluations that were reported to the optimizer, in reporting order.
    """
    if logger is None:
        logger = default_logger

    target_directory = Path(target_directory)
    objectives = (
        optimizer.objectives
        if isinstance(optimizer, MultiObjectiveOptimizer)
        else [optimizer.objective]
    )
    evaluations: List[Evaluation] = []

    _max_evaluations = init_max_evaluations_with_limit_logging(
        max_evaluations=max_evaluations, timeout_s=timeout_s, logger=logger
    )

    # The directory does not change, so both glob patterns are built only once.
    result_pattern = str(
        target_directory / f"{EVALUATION_RESULT_FILE_NAME_PREFIX}*.json"
    )
    proposal_pattern = str(
        target_directory / f"{EVALUATION_SPECIFICATION_FILE_NAME_PREFIX}*.json"
    )

    start = time.time()
    while True:
        for eval_result_file_path in glob.glob(result_pattern):
            with open(eval_result_file_path, "r", encoding="utf-8") as fh:
                evaluation = Evaluation(**json.load(fh))
            Path(eval_result_file_path).unlink()
            # A failed evaluation (stack trace but no objectives) is reported with
            # explicit None values for every objective.
            if not evaluation.objectives and evaluation.stacktrace:
                evaluation.objectives = {o.name: None for o in objectives}

            logger.info(
                "Reporting this evaluation result to the optimizer:\n%s",
                pprint.pformat(evaluation.to_dict(), compact=True),
            )
            if post_evaluation_callback:
                post_evaluation_callback(evaluation)

            optimizer.report(evaluation)
            evaluations.append(evaluation)

        if time.time() - start >= timeout_s or len(evaluations) >= _max_evaluations:
            return evaluations

        current_proposals = glob.glob(proposal_pattern)
        while len(current_proposals) < proposal_queue_size:
            eval_spec_id = str(uuid4())
            try:
                eval_spec = optimizer.generate_evaluation_specification()
            except OptimizerNotReady:
                logger.info("Optimizer is not ready yet, retrying in two seconds")
                time.sleep(2.0)
                # Go back to the outer loop instead of retrying right here, so that
                # finished evaluations are reported to the optimizer (which may be
                # what it is waiting for) and the timeout keeps being checked.
                break
            except OptimizationComplete:
                logger.info("Optimization is complete")
                return evaluations

            eval_spec.optimizer_info["eval_spec_id"] = eval_spec_id

            logger.info(
                "The optimizer proposed this evaluation specification:\n%s",
                pprint.pformat(eval_spec.to_dict(), compact=True),
            )
            if pre_evaluation_callback:
                pre_evaluation_callback(eval_spec)

            # Use the shared prefix constant so the written file always matches the
            # glob pattern used to look for proposals.
            eval_spec_path = target_directory / (
                f"{EVALUATION_SPECIFICATION_FILE_NAME_PREFIX}{eval_spec_id}.json"
            )
            # Write under a temporary name that ends in ".tmp" (so it is not matched
            # by the "*.json" glob) and rename afterwards, so that a reader polling
            # the directory never opens a partially written specification file.
            tmp_eval_spec_path = eval_spec_path.with_name(eval_spec_path.name + ".tmp")
            with open(tmp_eval_spec_path, "w", encoding="utf-8") as fh:
                json.dump(eval_spec.to_dict(), fh)
            tmp_eval_spec_path.replace(eval_spec_path)

            current_proposals = glob.glob(proposal_pattern)

        time.sleep(0.5)