Skip to content

Commit

Permalink
Fixes based on PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
gustavogaldinoo committed Jun 20, 2024
1 parent 13df5bb commit d7670a2
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 63 deletions.
4 changes: 2 additions & 2 deletions experiment/measurer/datatypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@
SnapshotMeasureRequest = collections.namedtuple(
'SnapshotMeasureRequest', ['fuzzer', 'benchmark', 'trial_id', 'cycle'])

ReescheduleRequest = collections.namedtuple(
'ReescheduleRequest', ['fuzzer', 'benchmarck', 'trial_id', 'cycle'])
RetryRequest = collections.namedtuple(
'RetryRequest', ['fuzzer', 'benchmarck', 'trial_id', 'cycle'])
66 changes: 30 additions & 36 deletions experiment/measurer/measure_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
from database import models
from experiment.build import build_utils
from experiment.measurer import coverage_utils
from experiment.measurer import datatypes
from experiment.measurer.datatypes import (RetryRequest, SnapshotMeasureRequest)
from experiment.measurer import measure_worker
from experiment.measurer import run_coverage
from experiment.measurer import run_crashes
Expand Down Expand Up @@ -78,13 +78,8 @@ def measure_main(experiment_config):
measurers_cpus = experiment_config['measurers_cpus']
runners_cpus = experiment_config['runners_cpus']
region_coverage = experiment_config['region_coverage']
local_experiment = experiment_utils.is_local_experiment()
if local_experiment:
measure_manager_loop(experiment, max_total_time, measurers_cpus,
runners_cpus, region_coverage)
else:
measure_loop(experiment, max_total_time, measurers_cpus, runners_cpus,
region_coverage)
measure_manager_loop(experiment, max_total_time, measurers_cpus,
runners_cpus, region_coverage)

# Clean up resources.
gc.collect()
Expand Down Expand Up @@ -251,13 +246,12 @@ def _query_unmeasured_trials(experiment: str):


def _get_unmeasured_first_snapshots(
experiment: str) -> List[datatypes.SnapshotMeasureRequest]:
experiment: str) -> List[SnapshotMeasureRequest]:
"""Returns a list of unmeasured SnapshotMeasureRequests that are the first
snapshot for their trial. The trials are trials in |experiment|."""
trials_without_snapshots = _query_unmeasured_trials(experiment)
return [
datatypes.SnapshotMeasureRequest(trial.fuzzer, trial.benchmark,
trial.id, 0)
SnapshotMeasureRequest(trial.fuzzer, trial.benchmark, trial.id, 0)
for trial in trials_without_snapshots
]

Expand Down Expand Up @@ -285,8 +279,7 @@ def _query_measured_latest_snapshots(experiment: str):


def _get_unmeasured_next_snapshots(
experiment: str,
max_cycle: int) -> List[datatypes.SnapshotMeasureRequest]:
experiment: str, max_cycle: int) -> List[SnapshotMeasureRequest]:
"""Returns a list of the latest unmeasured SnapshotMeasureRequests of
trials in |experiment| that have been measured at least once in
|experiment|. |max_total_time| is used to determine if a trial has another
Expand All @@ -302,15 +295,16 @@ def _get_unmeasured_next_snapshots(
if next_cycle > max_cycle:
continue

snapshot_with_cycle = datatypes.SnapshotMeasureRequest(
snapshot.fuzzer, snapshot.benchmark, snapshot.trial_id, next_cycle)
snapshot_with_cycle = SnapshotMeasureRequest(snapshot.fuzzer,
snapshot.benchmark,
snapshot.trial_id,
next_cycle)
next_snapshots.append(snapshot_with_cycle)
return next_snapshots


def get_unmeasured_snapshots(
experiment: str,
max_cycle: int) -> List[datatypes.SnapshotMeasureRequest]:
def get_unmeasured_snapshots(experiment: str,
max_cycle: int) -> List[SnapshotMeasureRequest]:
"""Returns a list of SnapshotMeasureRequests that need to be measured
(assuming they have been saved already)."""
# Measure the first snapshot of every started trial without any measured
Expand Down Expand Up @@ -681,14 +675,15 @@ def initialize_logs():

def consume_snapshots_from_response_queue(
response_queue, queued_snapshots) -> List[models.Snapshot]:
"""Consume response_queue, allows reeschedule objects to reescheduled, and
"""Consume response_queue, allows retry objects to retried, and
return all measured snapshots in a list."""
measured_snapshots = []
while True:
try:
response_object = response_queue.get_nowait()
if isinstance(response_object, datatypes.ReescheduleRequest):
# Need to reeschedule measurement task, remove from the set
if isinstance(response_object, RetryRequest):
# Need to retry measurement task, will remove identifier from
# the set so task can be retried in next loop iteration.
snapshot_identifier = (response_object.trial_id,
response_object.cycle)
queued_snapshots.remove(snapshot_identifier)
Expand All @@ -715,31 +710,31 @@ def measure_manager_inner_loop(experiment: str, max_cycle: int, request_queue,
unmeasured_snapshots = get_unmeasured_snapshots(experiment, max_cycle)
logger.info('Retrieved %d unmeasured snapshots from measure manager',
len(unmeasured_snapshots))
# When there are no more snapshots left to be measured, should break loop
# When there are no more snapshots left to be measured, should break loop.
if not unmeasured_snapshots:
return False

# Write measurements requests to request queue
for unmeasured_snapshot in unmeasured_snapshots:
# No need to insert fuzzer and benchmark info here as it's redundant
# (Can be retrieved through trial_id)
# (Can be retrieved through trial_id).
unmeasured_snapshot_identifier = (unmeasured_snapshot.trial_id,
unmeasured_snapshot.cycle)
# Checking if snapshot already was queued so workers will not repeat
# measurement for same snapshot
if unmeasured_snapshot_identifier not in queued_snapshots:
# If corpus does not exist, don't put in measurer workers request
# queue
# queue.
request_queue.put(unmeasured_snapshot)
queued_snapshots.add(unmeasured_snapshot_identifier)

# Read results from response queue
# Read results from response queue.
measured_snapshots = consume_snapshots_from_response_queue(
response_queue, queued_snapshots)
logger.info('Retrieved %d measured snapshots from response queue',
len(measured_snapshots))

# Save measured snapshots to database
# Save measured snapshots to database.
if measured_snapshots:
db_utils.add_all(measured_snapshots)

Expand All @@ -751,24 +746,23 @@ def get_pool_args(measurers_cpus, runners_cpus):
pool_args = ()
if measurers_cpus is not None and runners_cpus is not None:
local_experiment = experiment_utils.is_local_experiment()
if local_experiment:
cores_queue = multiprocessing.Queue()
logger.info('Scheduling measurers from core %d to %d.',
runners_cpus, runners_cpus + measurers_cpus - 1)
for cpu in range(runners_cpus, runners_cpus + measurers_cpus):
cores_queue.put(cpu)
pool_args = (measurers_cpus, _process_init, (cores_queue,))
else:
pool_args = (measurers_cpus,)
if not local_experiment:
return (measurers_cpus,)
cores_queue = multiprocessing.Queue()
logger.info('Scheduling measurers from core %d to %d.', runners_cpus,
runners_cpus + measurers_cpus - 1)
for cpu in range(runners_cpus, runners_cpus + measurers_cpus):
cores_queue.put(cpu)
pool_args = (measurers_cpus, _process_init, (cores_queue,))
return pool_args


# pylint: disable=too-many-locals
def measure_manager_loop(experiment: str,
max_total_time: int,
measurers_cpus=None,
runners_cpus=None,
region_coverage=False):
# pylint: disable=too-many-locals
"""Measure manager loop. Creates request and response queues, request
measurements tasks from workers, retrieve measurement results from response
queue and writes measured snapshots in database."""
Expand Down
13 changes: 6 additions & 7 deletions experiment/measurer/measure_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from typing import Dict, Optional
from common import logs
from database.models import Snapshot
from experiment.measurer import datatypes
from experiment.measurer.datatypes import (RetryRequest, SnapshotMeasureRequest)
from experiment.measurer import measure_manager

MEASUREMENT_TIMEOUT = 1
Expand Down Expand Up @@ -68,20 +68,19 @@ class LocalMeasureWorker(BaseMeasureWorker):
"""Class that holds implementations of core methods for running a measure
worker locally."""

def get_task_from_request_queue(self) -> datatypes.SnapshotMeasureRequest:
def get_task_from_request_queue(self) -> SnapshotMeasureRequest:
"""Get item from request multiprocessing queue, block if necessary until
an item is available"""
request = self.request_queue.get(block=True)
return request

def put_result_in_response_queue(self,
measured_snapshot: Optional[Snapshot],
request: datatypes.SnapshotMeasureRequest):
request: SnapshotMeasureRequest):
if measured_snapshot:
logger.info('Put measured snapshot in response_queue')
self.response_queue.put(measured_snapshot)
else:
reeschedule_request = datatypes.ReescheduleRequest(
request.fuzzer, request.benchmark, request.trial_id,
request.cycle)
self.response_queue.put(reeschedule_request)
retry_request = RetryRequest(request.fuzzer, request.benchmark,
request.trial_id, request.cycle)
self.response_queue.put(retry_request)
23 changes: 11 additions & 12 deletions experiment/measurer/test_measure_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from database import models
from database import utils as db_utils
from experiment.build import build_utils
from experiment.measurer import datatypes
from experiment.measurer.datatypes import (RetryRequest, SnapshotMeasureRequest)
from experiment.measurer import measure_manager
from test_libs import utils as test_utils

Expand Down Expand Up @@ -174,8 +174,8 @@ def test_measure_trial_coverage(mocked_measure_snapshot_coverage, mocked_queue,
"""Tests that measure_trial_coverage works as expected."""
min_cycle = 1
max_cycle = 10
measure_request = datatypes.SnapshotMeasureRequest(FUZZER, BENCHMARK,
TRIAL_NUM, min_cycle)
measure_request = SnapshotMeasureRequest(FUZZER, BENCHMARK, TRIAL_NUM,
min_cycle)
measure_manager.measure_trial_coverage(measure_request, max_cycle,
mocked_queue(), False)
expected_calls = [
Expand Down Expand Up @@ -423,17 +423,16 @@ def test_consume_unmapped_type_from_response_queue():
assert not snapshots


def test_consume_reeschedule_type_from_response_queue():
"""Tests the secnario where a reeschedule object is retrieved from the
def test_consume_retry_type_from_response_queue():
"""Tests the scenario where a retry object is retrieved from the
response queue. In this scenario, we want to remove the snapshot identifier
from the queued_snapshots set, as this allows the measurement task to be
reescheduled in the future"""
retried in the future"""
# Use normal queue here as multiprocessing queue gives flaky tests
response_queue = queue.Queue()
reeschedule_request_object = datatypes.ReescheduleRequest(
'fuzzer', 'benchmark', TRIAL_NUM, CYCLE)
retry_request_object = RetryRequest('fuzzer', 'benchmark', TRIAL_NUM, CYCLE)
snapshot_identifier = (TRIAL_NUM, CYCLE)
response_queue.put(reeschedule_request_object)
response_queue.put(retry_request_object)
queued_snapshots_set = set([snapshot_identifier])
snapshots = measure_manager.consume_snapshots_from_response_queue(
response_queue, queued_snapshots_set)
Expand Down Expand Up @@ -480,7 +479,7 @@ def test_measure_manager_inner_loop_writes_to_request_queue(
"""Tests that the measure manager inner loop is writing measurement tasks to
request queue"""
mocked_get_unmeasured_snapshots.return_value = [
datatypes.SnapshotMeasureRequest('fuzzer', 'benchmark', 0, 0)
SnapshotMeasureRequest('fuzzer', 'benchmark', 0, 0)
]
mocked_consume_snapshots_from_response_queue.return_value = []
request_queue = queue.Queue()
Expand All @@ -500,7 +499,7 @@ def test_measure_manager_inner_loop_dont_write_to_db(
"""Tests that the measure manager inner loop does not call add_all to write
to the database, when there are no measured snapshots to be written"""
mocked_get_unmeasured_snapshots.return_value = [
datatypes.SnapshotMeasureRequest('fuzzer', 'benchmark', 0, 0)
SnapshotMeasureRequest('fuzzer', 'benchmark', 0, 0)
]
request_queue = queue.Queue()
response_queue = queue.Queue()
Expand All @@ -520,7 +519,7 @@ def test_measure_manager_inner_loop_writes_to_db(
"""Tests that the measure manager inner loop calls add_all to write
to the database, when there are measured snapshots to be written"""
mocked_get_unmeasured_snapshots.return_value = [
datatypes.SnapshotMeasureRequest('fuzzer', 'benchmark', 0, 0)
SnapshotMeasureRequest('fuzzer', 'benchmark', 0, 0)
]
request_queue = queue.Queue()
response_queue = queue.Queue()
Expand Down
12 changes: 6 additions & 6 deletions experiment/measurer/test_measure_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import pytest

from database.models import Snapshot
from experiment.measurer import datatypes
from experiment.measurer.datatypes import (RetryRequest, SnapshotMeasureRequest)
from experiment.measurer import measure_worker


Expand All @@ -37,20 +37,20 @@ def local_measure_worker():
def test_put_snapshot_in_response_queue(local_measure_worker): # pylint: disable=redefined-outer-name
"""Tests the scenario where measure_snapshot is not None, so snapshot is put
in response_queue"""
request = datatypes.SnapshotMeasureRequest('fuzzer', 'benchmark', 1, 0)
request = SnapshotMeasureRequest('fuzzer', 'benchmark', 1, 0)
snapshot = Snapshot(trial_id=1)
local_measure_worker.put_result_in_response_queue(snapshot, request)
response_queue = local_measure_worker.response_queue
assert response_queue.qsize() == 1
assert isinstance(response_queue.get(), Snapshot)


def test_put_reeschedule_in_response_queue(local_measure_worker): # pylint: disable=redefined-outer-name
def test_put_retry_in_response_queue(local_measure_worker): # pylint: disable=redefined-outer-name
"""Tests the scenario where measure_snapshot is None, so task needs to be
reescheduled"""
request = datatypes.SnapshotMeasureRequest('fuzzer', 'benchmark', 1, 0)
retried"""
request = SnapshotMeasureRequest('fuzzer', 'benchmark', 1, 0)
snapshot = None
local_measure_worker.put_result_in_response_queue(snapshot, request)
response_queue = local_measure_worker.response_queue
assert response_queue.qsize() == 1
assert isinstance(response_queue.get(), datatypes.ReescheduleRequest)
assert isinstance(response_queue.get(), RetryRequest)

0 comments on commit d7670a2

Please sign in to comment.