Skip to content

Commit

Permalink
add callbacks, logging, comments, handle race condition
Browse files Browse the repository at this point in the history
Signed-off-by: Grossberger Lukas (CR/AIR2.2) <Lukas.Grossberger@de.bosch.com>
  • Loading branch information
LGro committed Feb 8, 2024
1 parent 4542471 commit dc56043
Showing 1 changed file with 100 additions and 26 deletions.
126 changes: 100 additions & 26 deletions blackboxopt/optimization_loops/file_based_distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,19 @@
import glob
import json
import logging
import pprint
import random
import time
from os import PathLike
from pathlib import Path
from typing import Callable, List, Optional, Union
from typing import Any, Callable, List, Optional, Union
from uuid import uuid4

from blackboxopt import (
Evaluation,
EvaluationSpecification,
Objective,
OptimizationComplete,
OptimizerNotReady,
)
from blackboxopt import logger as default_logger
Expand All @@ -25,13 +28,18 @@
init_max_evaluations_with_limit_logging,
)

EVALUATION_RESULT_FILE_NAME_PREFIX = "eval_result_"
EVALUATION_SPECIFICATION_FILE_NAME_PREFIX = "eval_spec_"


def run_optimization_loop(
optimizer: Union[SingleObjectiveOptimizer, MultiObjectiveOptimizer],
target_directory: PathLike,
timeout_s: float = float("inf"),
max_evaluations: Optional[int] = None,
proposal_queue_size: int = 1,
pre_evaluation_callback: Optional[Callable[[EvaluationSpecification], Any]] = None,
post_evaluation_callback: Optional[Callable[[Evaluation], Any]] = None,
logger: Optional[logging.Logger] = None,
) -> List[Evaluation]:
"""
Expand Down Expand Up @@ -63,25 +71,45 @@ def run_optimization_loop(

start = time.time()
while True:
evaluations_to_report = glob.glob(str(target_directory / "eval_result_*.json"))
for eval_result in evaluations_to_report:
with open(eval_result, "r", encoding="utf-8") as fh:
evaluations_to_report = glob.glob(
str(target_directory / f"{EVALUATION_RESULT_FILE_NAME_PREFIX}*.json")
)
for eval_result_file_path in evaluations_to_report:
with open(eval_result_file_path, "r", encoding="utf-8") as fh:
evaluation = Evaluation(**json.load(fh))
Path(eval_result).unlink(missing_ok=True)
Path(eval_result_file_path).unlink()
if not evaluation.objectives and evaluation.stacktrace:
evaluation.objectives = {o.name: None for o in objectives}
evaluations.append(evaluation)

logger.info(
"Reporting this evaluation result to the optimizer:\n%s",
pprint.pformat(evaluation.to_dict(), compact=True),
)
if post_evaluation_callback:
post_evaluation_callback(evaluation)

optimizer.report(evaluation)
evaluations.append(evaluation)

if time.time() - start >= timeout_s or len(evaluations) >= _max_evaluations:
return evaluations

current_proposals = glob.glob(str(target_directory / "eval_spec_*.json"))
current_proposals = glob.glob(
str(target_directory / f"{EVALUATION_SPECIFICATION_FILE_NAME_PREFIX}*.json")
)
while len(current_proposals) < proposal_queue_size:
eval_spec_id = str(uuid4())
try:
eval_spec = optimizer.generate_evaluation_specification()
eval_spec.optimizer_info["eval_spec_id"] = eval_spec_id

logger.info(
"The optimizer proposed this evaluation specification:\n%s",
pprint.pformat(eval_spec.to_dict(), compact=True),
)
if pre_evaluation_callback:
pre_evaluation_callback(eval_spec)

with open(
target_directory / f"eval_spec_{eval_spec_id}.json",
"w",
Expand All @@ -92,7 +120,11 @@ def run_optimization_loop(
str(target_directory / "eval_spec_*.json")
)
except OptimizerNotReady:
continue
logger.info("Optimizer is not ready yet, retrying in two seconds")
time.sleep(2.0)
except OptimizationComplete:
logger.info("Optimization is complete")
return evaluations

time.sleep(0.5)

Expand All @@ -104,6 +136,8 @@ def evaluate_specifications(
timeout_s: float = float("inf"),
max_evaluations: Optional[int] = None,
catch_exceptions_from_evaluation_function: bool = False,
pre_evaluation_callback: Optional[Callable[[EvaluationSpecification], Any]] = None,
post_evaluation_callback: Optional[Callable[[Evaluation], Any]] = None,
logger: Optional[logging.Logger] = None,
):
"""Evaluate specifications from the target directory until the `timeout_s` or
Expand All @@ -123,6 +157,10 @@ def evaluate_specifications(
spurious errors due to e.g. numerical instability that should not halt the
optimization loop. For more details, see the wrapper that is used internally
`blackboxopt.optimization_loops.utils.evaluation_function_wrapper`
pre_evaluation_callback: Reference to a callable that is invoked before each
evaluation and takes a `blackboxopt.EvaluationSpecification` as an argument.
post_evaluation_callback: Reference to a callable that is invoked after each
evaluation and takes a `blackboxopt.Evaluation` as an argument.
logger: The logger to use for logging. If `None`, the default logger is used.
"""
if logger is None:
Expand All @@ -137,24 +175,60 @@ def evaluate_specifications(
start = time.time()
num_evaluations = 0
while time.time() - start < timeout_s and num_evaluations < _max_evaluations:
current_proposals = glob.glob(str(target_directory / "eval_spec_*.json"))
for eval_spec_path in current_proposals:
current_proposals = glob.glob(
str(target_directory / f"{EVALUATION_SPECIFICATION_FILE_NAME_PREFIX}*.json")
)
if not current_proposals:
logger.info("No proposals found, retrying in one second.")
time.sleep(1.0)
continue

# Just pick one random proposal instead of iterating over all available ones, to
# reduce the risk of a race condition when two or more agents are running in
# parallel and trying to evaluate the same proposals.
eval_spec_path = random.choice(current_proposals)
try:
with open(eval_spec_path, "r", encoding="utf-8") as fh:
eval_spec = EvaluationSpecification(**json.load(fh))
Path(eval_spec_path).unlink(missing_ok=True)
evaluation = evaluation_function_wrapper(
evaluation_function=evaluation_function,
evaluation_specification=eval_spec,
logger=logger,
objectives=objectives,
catch_exceptions_from_evaluation_function=catch_exceptions_from_evaluation_function,
# Allow missing, in case the proposal was already evaluated by another agent
except FileNotFoundError:
logging.warning(
f"Could not read evaluation specification from {eval_spec_path}, "
+ "it was likely already evaluated elsewhere."
)
with open(
target_directory
/ f"eval_result_{eval_spec.optimizer_info['eval_spec_id']}.json",
"w",
encoding="utf-8",
) as fh:
json.dump(evaluation.to_dict(), fh)
num_evaluations += 1
time.sleep(0.5)
continue
Path(eval_spec_path).unlink(missing_ok=True)

logger.info(
"The optimizer proposed this evaluation specification:\n%s",
pprint.pformat(eval_spec.to_dict(), compact=True),
)
if pre_evaluation_callback:
pre_evaluation_callback(eval_spec)

evaluation = evaluation_function_wrapper(
evaluation_function=evaluation_function,
evaluation_specification=eval_spec,
logger=logger,
objectives=objectives,
catch_exceptions_from_evaluation_function=catch_exceptions_from_evaluation_function,
)

logger.info(
"Reporting this evaluation result to the optimizer:\n%s",
pprint.pformat(evaluation.to_dict(), compact=True),
)
if post_evaluation_callback:
post_evaluation_callback(evaluation)

with open(
target_directory
/ (
EVALUATION_RESULT_FILE_NAME_PREFIX
+ f"{eval_spec.optimizer_info['eval_spec_id']}.json"
),
"w",
encoding="utf-8",
) as fh:
json.dump(evaluation.to_dict(), fh)
num_evaluations += 1

0 comments on commit dc56043

Please sign in to comment.