From 0afcea20780338dff5f466e527b267d48566f9a5 Mon Sep 17 00:00:00 2001 From: gustavogaldinoo Date: Wed, 19 Jun 2024 20:21:53 +0000 Subject: [PATCH 1/8] Adding local implementation for queue based measuring --- experiment/measurer/datatypes.py | 21 +++ experiment/measurer/measure_manager.py | 169 +++++++++++++++++--- experiment/measurer/measure_worker.py | 87 ++++++++++ experiment/measurer/test_measure_manager.py | 135 +++++++++++++++- experiment/measurer/test_measure_worker.py | 56 +++++++ 5 files changed, 441 insertions(+), 27 deletions(-) create mode 100644 experiment/measurer/datatypes.py create mode 100644 experiment/measurer/measure_worker.py create mode 100644 experiment/measurer/test_measure_worker.py diff --git a/experiment/measurer/datatypes.py b/experiment/measurer/datatypes.py new file mode 100644 index 000000000..bd564f1e5 --- /dev/null +++ b/experiment/measurer/datatypes.py @@ -0,0 +1,21 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Module for common data types shared under the measurer module.""" +import collections + +SnapshotMeasureRequest = collections.namedtuple( + 'SnapshotMeasureRequest', ['fuzzer', 'benchmark', 'trial_id', 'cycle']) + +ReescheduleRequest = collections.namedtuple( + 'ReescheduleRequest', ['fuzzer', 'benchmarck', 'trial_id', 'cycle']) diff --git a/experiment/measurer/measure_manager.py b/experiment/measurer/measure_manager.py index f10e556c3..81ef8058a 100644 --- a/experiment/measurer/measure_manager.py +++ b/experiment/measurer/measure_manager.py @@ -44,20 +44,21 @@ from database import models from experiment.build import build_utils from experiment.measurer import coverage_utils +from experiment.measurer import datatypes +from experiment.measurer import measure_worker from experiment.measurer import run_coverage from experiment.measurer import run_crashes from experiment import scheduler logger = logs.Logger() -SnapshotMeasureRequest = collections.namedtuple( - 'SnapshotMeasureRequest', ['fuzzer', 'benchmark', 'trial_id', 'cycle']) - NUM_RETRIES = 3 RETRY_DELAY = 3 FAIL_WAIT_SECONDS = 30 SNAPSHOT_QUEUE_GET_TIMEOUT = 1 SNAPSHOTS_BATCH_SAVE_SIZE = 100 +NUM_WORKERS = 4 +MEASURE_MANAGER_LOOP_TIMEOUT = 10 def exists_in_experiment_filestore(path: pathlib.Path) -> bool: @@ -77,8 +78,13 @@ def measure_main(experiment_config): measurers_cpus = experiment_config['measurers_cpus'] runners_cpus = experiment_config['runners_cpus'] region_coverage = experiment_config['region_coverage'] - measure_loop(experiment, max_total_time, measurers_cpus, runners_cpus, - region_coverage) + local_experiment = experiment_utils.is_local_experiment() + if local_experiment: + measure_manager_loop(experiment, max_total_time, measurers_cpus, + runners_cpus, region_coverage) + else: + measure_loop(experiment, max_total_time, measurers_cpus, + runners_cpus, region_coverage) # Clean up resources. 
gc.collect() @@ -104,18 +110,7 @@ def measure_loop(experiment: str, """Continuously measure trials for |experiment|.""" logger.info('Start measure_loop.') - pool_args = () - if measurers_cpus is not None and runners_cpus is not None: - local_experiment = experiment_utils.is_local_experiment() - if local_experiment: - cores_queue = multiprocessing.Queue() - logger.info('Scheduling measurers from core %d to %d.', - runners_cpus, runners_cpus + measurers_cpus - 1) - for cpu in range(runners_cpus, runners_cpus + measurers_cpus): - cores_queue.put(cpu) - pool_args = (measurers_cpus, _process_init, (cores_queue,)) - else: - pool_args = (measurers_cpus,) + pool_args = get_pool_args(measurers_cpus, runners_cpus) with multiprocessing.Pool( *pool_args) as pool, multiprocessing.Manager() as manager: @@ -256,12 +251,13 @@ def _query_unmeasured_trials(experiment: str): def _get_unmeasured_first_snapshots( - experiment: str) -> List[SnapshotMeasureRequest]: + experiment: str) -> List[datatypes.SnapshotMeasureRequest]: """Returns a list of unmeasured SnapshotMeasureRequests that are the first snapshot for their trial. The trials are trials in |experiment|.""" trials_without_snapshots = _query_unmeasured_trials(experiment) return [ - SnapshotMeasureRequest(trial.fuzzer, trial.benchmark, trial.id, 0) + datatypes.SnapshotMeasureRequest(trial.fuzzer, trial.benchmark, + trial.id, 0) for trial in trials_without_snapshots ] @@ -288,8 +284,8 @@ def _query_measured_latest_snapshots(experiment: str): return (SnapshotWithTime(*snapshot) for snapshot in snapshots_query) -def _get_unmeasured_next_snapshots( - experiment: str, max_cycle: int) -> List[SnapshotMeasureRequest]: +def _get_unmeasured_next_snapshots(experiment: str, max_cycle: int + ) -> List[datatypes.SnapshotMeasureRequest]: """Returns a list of the latest unmeasured SnapshotMeasureRequests of trials in |experiment| that have been measured at least once in |experiment|. 
|max_total_time| is used to determine if a trial has another @@ -305,7 +301,7 @@ def _get_unmeasured_next_snapshots( if next_cycle > max_cycle: continue - snapshot_with_cycle = SnapshotMeasureRequest(snapshot.fuzzer, + snapshot_with_cycle = datatypes.SnapshotMeasureRequest(snapshot.fuzzer, snapshot.benchmark, snapshot.trial_id, next_cycle) @@ -313,8 +309,8 @@ def _get_unmeasured_next_snapshots( return next_snapshots -def get_unmeasured_snapshots(experiment: str, - max_cycle: int) -> List[SnapshotMeasureRequest]: +def get_unmeasured_snapshots(experiment: str, max_cycle: int + ) -> List[datatypes.SnapshotMeasureRequest]: """Returns a list of SnapshotMeasureRequests that need to be measured (assuming they have been saved already).""" # Measure the first snapshot of every started trial without any measured @@ -682,6 +678,131 @@ def initialize_logs(): 'subcomponent': 'measurer', }) +def consume_snapshots_from_response_queue(response_queue, queued_snapshots + ) -> List[models.Snapshot]: + """Consume response_queue, allows reeschedule objects to reescheduled, and + return all measured snapshots in a list.""" + measured_snapshots = [] + while True: + try: + response_object = response_queue.get_nowait() + match type(response_object): + case datatypes.ReescheduleRequest: + # Need to reeschedule measurement task, remove from the set + snapshot_identifier = (response_object.trial_id, + response_object.cycle) + queued_snapshots.remove(snapshot_identifier) + logger.info( + 'Reescheduling task for trial %s and cycle %s', + response_object.trial_id, response_object.cycle) + case models.Snapshot: + measured_snapshots.append( response_object ) + case _: + logger.error('Type of response object not mapped! %s', + type(response_object)) + except queue.Empty: + break + return measured_snapshots + +def measure_manager_inner_loop(experiment: str, max_cycle: int, request_queue, + response_queue, queued_snapshots): + """Reads from database to determine which snapshots needs measuring. Write + measurements tasks to request queue, get results from response queue, and + write measured snapshots to database. Returns False if there's no more + snapshots left to be measured""" + initialize_logs() + # Read database to determine which snapshots needs measuring. 
+ unmeasured_snapshots = get_unmeasured_snapshots(experiment, max_cycle) + logger.info('Retrieved %d unmeasured snapshots from measure manager', + {len(unmeasured_snapshots)}) + # When there are no more snapshots left to be measured, should break loop + if not unmeasured_snapshots: + return False + + # Write measurements requests to request queue + for unmeasured_snapshot in unmeasured_snapshots: + # No need to insert fuzzer and benchmark info here as it's redundant + # (Can be retrieved through trial_id) + unmeasured_snapshot_identifier = ( unmeasured_snapshot.trial_id, + unmeasured_snapshot.cycle ) + # Checking if snapshot already was queued so workers will not repeat + # measurement for same snapshot + if unmeasured_snapshot_identifier not in queued_snapshots: + # If corpus does not exist, don't put in measurer workers request + # queue + request_queue.put(unmeasured_snapshot) + queued_snapshots.add(unmeasured_snapshot_identifier) + + # Read results from response queue + measured_snapshots = consume_snapshots_from_response_queue(response_queue, + queued_snapshots) + logger.info('Retrieved %d measured snapshots from response queue', + {len(measured_snapshots)}) + + # Save measured snapshots to database + if measured_snapshots: + db_utils.add_all(measured_snapshots) + + return True + +def get_pool_args(measurers_cpus, runners_cpus): + """Return pool args based on measurer cpus and runner cpus arguments.""" + pool_args = () + if measurers_cpus is not None and runners_cpus is not None: + local_experiment = experiment_utils.is_local_experiment() + if local_experiment: + cores_queue = multiprocessing.Queue() + logger.info('Scheduling measurers from core %d to %d.', + runners_cpus, runners_cpus + measurers_cpus - 1) + for cpu in range(runners_cpus, runners_cpus + measurers_cpus): + cores_queue.put(cpu) + pool_args = (measurers_cpus, _process_init, (cores_queue,)) + else: + pool_args = (measurers_cpus,) + return pool_args + +# pylint: disable=too-many-locals +def measure_manager_loop( + experiment: str, max_total_time: int,measurers_cpus=None, + runners_cpus=None, region_coverage=False): + """Measure manager loop. Creates request and response queues, request + measurements tasks from workers, retrieve measurement results from response + queue and writes measured snapshots in database.""" + logger.info('Starting measure manager loop.') + pool_args = get_pool_args(measurers_cpus, runners_cpus) + with multiprocessing.Pool( + *pool_args) as pool, multiprocessing.Manager() as manager: + logger.info('Setting up coverage binaries') + set_up_coverage_binaries(pool, experiment) + request_queue = manager.Queue() + response_queue = manager.Queue() + + # Since each worker is gonna be in forever loop, we dont need result + # return. Workers life scope will end automatically when there are no + # more snapshots left to measure. 
+ logger.info('Starting measure worker loop for %d workers', + NUM_WORKERS) + config = { + 'request_queue': request_queue, + 'response_queue': response_queue, + 'region_coverage': region_coverage, + } + local_measure_worker = measure_worker.LocalMeasureWorker(config) + measure_trial_coverage_args = [()] * NUM_WORKERS + _result = pool.starmap_async(local_measure_worker.measure_worker_loop, + measure_trial_coverage_args) + + max_cycle = _time_to_cycle(max_total_time) + queued_snapshots = set() + while not scheduler.all_trials_ended(experiment): + continue_inner_loop = measure_manager_inner_loop( + experiment,max_cycle, request_queue, response_queue, + queued_snapshots) + if not continue_inner_loop: + break + time.sleep(MEASURE_MANAGER_LOOP_TIMEOUT) + logger.info('All trials ended. Ending measure manager loop') + def main(): """Measure the experiment.""" diff --git a/experiment/measurer/measure_worker.py b/experiment/measurer/measure_worker.py new file mode 100644 index 000000000..214ff73a2 --- /dev/null +++ b/experiment/measurer/measure_worker.py @@ -0,0 +1,87 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Module for measurer workers logic.""" +import time +from typing import Dict +from common import logs +from database.models import Snapshot +from experiment.measurer import datatypes +from experiment.measurer import measure_manager + +MEASUREMENT_TIMEOUT = 1 +logger = logs.Logger() # pylint: disable=invalid-name + + +class BaseMeasureWorker: + """Base class for measure worker. 
Encapsulates core methods that will be + implemented for Local and Google Cloud measure workers.""" + + def __init__(self, config: Dict): + self.request_queue = config['request_queue'] + self.response_queue = config['response_queue'] + self.region_coverage = config['region_coverage'] + logs.initialize(default_extras={ + 'component': 'measurer', + 'subcomponent': 'worker', + }) + logger.info('Starting one measure worker loop') + + def get_task_from_request_queue(self): + """"Get task from request queue""" + raise NotImplementedError + + def put_result_in_response_queue(self, measured_snapshot, request): + """Save measurement result in response queue, for the measure manager to + retrieve""" + raise NotImplementedError + + def measure_worker_loop(self): + """Periodically retrieves request from request queue, measure it, and + put result in response queue""" + while True: + # 'SnapshotMeasureRequest', ['fuzzer', 'benchmark', 'trial_id', + # 'cycle'] + request = self.get_task_from_request_queue() + logger.info( + 'Measurer worker: Got request %s %s %d %d from request queue', + request.fuzzer, request.benchmark, + request.trial_id, request.cycle + ) + measured_snapshot = measure_manager.measure_snapshot_coverage( + request.fuzzer, request.benchmark, request.trial_id, + request.cycle, self.region_coverage) + self.put_result_in_response_queue(measured_snapshot, request) + time.sleep(MEASUREMENT_TIMEOUT) + + +class LocalMeasureWorker(BaseMeasureWorker): + """Class that holds implementations of core methods for running a measure + worker locally.""" + def get_task_from_request_queue(self) -> datatypes.SnapshotMeasureRequest: + """Get item from request multiprocessing queue, block if necessary until + an item is available""" + request = self.request_queue.get(block=True) + return request + + def put_result_in_response_queue(self, measured_snapshot: Snapshot, + request: datatypes.SnapshotMeasureRequest): + if measured_snapshot: + logger.info('Put measured snapshot in response_queue') + self.response_queue.put(measured_snapshot) + else: + reeschedule_request = datatypes.ReescheduleRequest(request.fuzzer, + request.benchmark, + request.trial_id, + request.cycle) + self.response_queue.put(reeschedule_request) diff --git a/experiment/measurer/test_measure_manager.py b/experiment/measurer/test_measure_manager.py index 69e6400a6..f00f4ed37 100644 --- a/experiment/measurer/test_measure_manager.py +++ b/experiment/measurer/test_measure_manager.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
"""Tests for measure_manager.py.""" - import os import shutil from unittest import mock @@ -25,6 +24,7 @@ from database import models from database import utils as db_utils from experiment.build import build_utils +from experiment.measurer import datatypes from experiment.measurer import measure_manager from test_libs import utils as test_utils @@ -174,8 +174,8 @@ def test_measure_trial_coverage(mocked_measure_snapshot_coverage, mocked_queue, """Tests that measure_trial_coverage works as expected.""" min_cycle = 1 max_cycle = 10 - measure_request = measure_manager.SnapshotMeasureRequest( - FUZZER, BENCHMARK, TRIAL_NUM, min_cycle) + measure_request = datatypes.SnapshotMeasureRequest(FUZZER, BENCHMARK, + TRIAL_NUM, min_cycle) measure_manager.measure_trial_coverage(measure_request, max_cycle, mocked_queue(), False) expected_calls = [ @@ -409,3 +409,132 @@ def test_path_exists_in_experiment_filestore(mocked_execute, environ): mocked_execute.assert_called_with( ['gsutil', 'ls', 'gs://cloud-bucket/example-experiment'], expect_zero=False) + + +def test_consume_unmapped_type_from_response_queue(): + """Tests the scenario where an unmapped type is retrieved from the response + queue. This scenario is not expected to happen, so in this case no snapshots + are returned""" + # Use normal queue here as multiprocessing queue gives flaky tests + response_queue = queue.Queue() + response_queue.put('unexpected string') + snapshots = measure_manager.consume_snapshots_from_response_queue( + response_queue, set()) + assert not snapshots + + +def test_consume_reeschedule_type_from_response_queue(): + """Tests the secnario where a reeschedule object is retrieved from the + response queue. In this scenario, we want to remove the snapshot identifier + from the queued_snapshots set, as this allows the measurement task to be + reescheduled in the future""" + # Use normal queue here as multiprocessing queue gives flaky tests + response_queue = queue.Queue() + TRIAL_ID = 1 + CYCLE = 0 + reeschedule_request_object = datatypes.ReescheduleRequest( + 'fuzzer','benchmark',TRIAL_ID, CYCLE) + snapshot_identifier = (TRIAL_ID, CYCLE) + response_queue.put(reeschedule_request_object) + queued_snapshots_set = set([snapshot_identifier]) + snapshots = measure_manager.consume_snapshots_from_response_queue( + response_queue, queued_snapshots_set) + assert not snapshots + assert len(queued_snapshots_set) == 0 + + +def test_consume_snapshot_type_from_response_queue(): + """Tests the scenario where a measured snapshot is retrieved from the + response queue. 
In this scenario, we want to return the snapshot in the + function.""" + # Use normal queue here as multiprocessing queue gives flaky tests + response_queue = queue.Queue() + TRIAL_ID = 1 + CYCLE = 0 + snapshot_identifier = (TRIAL_ID, CYCLE) + queued_snapshots_set = set([snapshot_identifier]) + measured_snapshot = models.Snapshot(trial_id=TRIAL_ID) + response_queue.put(measured_snapshot) + assert response_queue.qsize() == 1 + snapshots = measure_manager.consume_snapshots_from_response_queue( + response_queue, queued_snapshots_set) + assert len(snapshots) == 1 + + +@mock.patch('experiment.measurer.measure_manager.get_unmeasured_snapshots') +def test_measure_manager_inner_loop_break_condition( + mocked_get_unmeasured_snapshots): + """Tests that the measure manager inner loop returns False when there's no + more snapshots left to be measured""" + # Empty list means no more snapshots left to be measured + mocked_get_unmeasured_snapshots.return_value = [] + request_queue = queue.Queue() + response_queue = queue.Queue() + continue_inner_loop = measure_manager.measure_manager_inner_loop( + 'experiment', 1, request_queue, response_queue, set()) + assert not continue_inner_loop + + +@mock.patch('experiment.measurer.measure_manager.get_unmeasured_snapshots') +@mock.patch( + 'experiment.measurer.measure_manager.consume_snapshots_from_response_queue' +) +def test_measure_manager_inner_loop_writes_to_request_queue( + mocked_consume_snapshots_from_response_queue, + mocked_get_unmeasured_snapshots): + """Tests that the measure manager inner loop is writing measurement tasks to + request queue""" + mocked_get_unmeasured_snapshots.return_value = [ + datatypes.SnapshotMeasureRequest('fuzzer', 'benchmark', 0, 0) + ] + mocked_consume_snapshots_from_response_queue.return_value = [] + request_queue = queue.Queue() + response_queue = queue.Queue() + measure_manager.measure_manager_inner_loop('experiment', 1, request_queue, + response_queue, set()) + assert request_queue.qsize() == 1 + + +@mock.patch('experiment.measurer.measure_manager.get_unmeasured_snapshots') +@mock.patch( + 'experiment.measurer.measure_manager.consume_snapshots_from_response_queue' +) +@mock.patch('database.utils.add_all') +def test_measure_manager_inner_loop_dont_write_to_db( + mocked_add_all, mocked_consume_snapshots_from_response_queue, + mocked_get_unmeasured_snapshots): + """Tests that the measure manager inner loop does not call add_all to write + to the database, when there are no measured snapshots to be written""" + mocked_get_unmeasured_snapshots.return_value = [ + datatypes.SnapshotMeasureRequest('fuzzer', 'benchmark', 0, 0) + ] + request_queue = queue.Queue() + response_queue = queue.Queue() + mocked_consume_snapshots_from_response_queue.return_value = [] + measure_manager.measure_manager_inner_loop('experiment', 1, request_queue, + response_queue, set()) + mocked_add_all.not_called() + + +@mock.patch('experiment.measurer.measure_manager.get_unmeasured_snapshots') +@mock.patch( + 'experiment.measurer.measure_manager.consume_snapshots_from_response_queue' +) +@mock.patch('database.utils.add_all') +def test_measure_manager_inner_loop_writes_to_db( + mocked_add_all, mocked_consume_snapshots_from_response_queue, + mocked_get_unmeasured_snapshots): + """Tests that the measure manager inner loop calls add_all to write + to the database, when there are measured snapshots to be written""" + mocked_get_unmeasured_snapshots.return_value = [ + datatypes.SnapshotMeasureRequest('fuzzer', 'benchmark', 0, 0) + ] + request_queue = queue.Queue() 
+ response_queue = queue.Queue() + snapshot_model = models.Snapshot(trial_id=1) + mocked_consume_snapshots_from_response_queue.return_value = [ + snapshot_model + ] + measure_manager.measure_manager_inner_loop('experiment', 1, request_queue, + response_queue, set()) + mocked_add_all.assert_called_with([snapshot_model]) diff --git a/experiment/measurer/test_measure_worker.py b/experiment/measurer/test_measure_worker.py new file mode 100644 index 000000000..63a24a77c --- /dev/null +++ b/experiment/measurer/test_measure_worker.py @@ -0,0 +1,56 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for measure_worker.py.""" +import multiprocessing +import pytest + +from database.models import Snapshot +from experiment.measurer import datatypes +from experiment.measurer import measure_worker + + +@pytest.fixture +def local_measure_worker(): + """Fixture for instantiating a local measure worker object""" + request_queue = multiprocessing.Queue() + response_queue = multiprocessing.Queue() + region_coverage = False + config = { + 'request_queue': request_queue, + 'response_queue': response_queue, + 'region_coverage': region_coverage + } + return measure_worker.LocalMeasureWorker(config) + + +def test_put_snapshot_in_response_queue(local_measure_worker): # pylint: disable=redefined-outer-name + """Tests the scenario where measure_snapshot is not None, so snapshot is put + in response_queue""" + request = datatypes.SnapshotMeasureRequest('fuzzer', 'benchmark', 1, 0) + snapshot = Snapshot(trial_id=1) + local_measure_worker.put_result_in_response_queue(snapshot, request) + response_queue = local_measure_worker.response_queue + assert response_queue.qsize() == 1 + assert isinstance(response_queue.get(), Snapshot) + + +def test_put_reeschedule_in_response_queue(local_measure_worker): # pylint: disable=redefined-outer-name + """Tests the scenario where measure_snapshot is None, so task needs to be + reescheduled""" + request = datatypes.SnapshotMeasureRequest('fuzzer', 'benchmark', 1, 0) + snapshot = None + local_measure_worker.put_result_in_response_queue(snapshot, request) + response_queue = local_measure_worker.response_queue + assert response_queue.qsize() == 1 + assert isinstance(response_queue.get(), datatypes.ReescheduleRequest) From 748b47d30739bebbbf619cc40925f486e64b52e5 Mon Sep 17 00:00:00 2001 From: gustavogaldinoo Date: Thu, 20 Jun 2024 14:06:59 +0000 Subject: [PATCH 2/8] Fixing tests and format --- experiment/measurer/measure_manager.py | 83 ++++---- experiment/measurer/measure_worker.py | 18 +- experiment/measurer/test_measure_manager.py | 25 +-- experiment/measurer/test_measure_worker.py | 4 +- service/test_automatic_run_experiment.py | 220 ++++++++++++++++++++ 5 files changed, 282 insertions(+), 68 deletions(-) create mode 100644 service/test_automatic_run_experiment.py diff --git a/experiment/measurer/measure_manager.py b/experiment/measurer/measure_manager.py index 81ef8058a..e8f3b1592 100644 --- a/experiment/measurer/measure_manager.py +++ 
b/experiment/measurer/measure_manager.py @@ -81,10 +81,10 @@ def measure_main(experiment_config): local_experiment = experiment_utils.is_local_experiment() if local_experiment: measure_manager_loop(experiment, max_total_time, measurers_cpus, - runners_cpus, region_coverage) + runners_cpus, region_coverage) else: - measure_loop(experiment, max_total_time, measurers_cpus, - runners_cpus, region_coverage) + measure_loop(experiment, max_total_time, measurers_cpus, runners_cpus, + region_coverage) # Clean up resources. gc.collect() @@ -284,8 +284,9 @@ def _query_measured_latest_snapshots(experiment: str): return (SnapshotWithTime(*snapshot) for snapshot in snapshots_query) -def _get_unmeasured_next_snapshots(experiment: str, max_cycle: int - ) -> List[datatypes.SnapshotMeasureRequest]: +def _get_unmeasured_next_snapshots( + experiment: str, + max_cycle: int) -> List[datatypes.SnapshotMeasureRequest]: """Returns a list of the latest unmeasured SnapshotMeasureRequests of trials in |experiment| that have been measured at least once in |experiment|. |max_total_time| is used to determine if a trial has another @@ -301,16 +302,15 @@ def _get_unmeasured_next_snapshots(experiment: str, max_cycle: int if next_cycle > max_cycle: continue - snapshot_with_cycle = datatypes.SnapshotMeasureRequest(snapshot.fuzzer, - snapshot.benchmark, - snapshot.trial_id, - next_cycle) + snapshot_with_cycle = datatypes.SnapshotMeasureRequest( + snapshot.fuzzer, snapshot.benchmark, snapshot.trial_id, next_cycle) next_snapshots.append(snapshot_with_cycle) return next_snapshots -def get_unmeasured_snapshots(experiment: str, max_cycle: int - ) -> List[datatypes.SnapshotMeasureRequest]: +def get_unmeasured_snapshots( + experiment: str, + max_cycle: int) -> List[datatypes.SnapshotMeasureRequest]: """Returns a list of SnapshotMeasureRequests that need to be measured (assuming they have been saved already).""" # Measure the first snapshot of every started trial without any measured @@ -678,32 +678,32 @@ def initialize_logs(): 'subcomponent': 'measurer', }) -def consume_snapshots_from_response_queue(response_queue, queued_snapshots - ) -> List[models.Snapshot]: + +def consume_snapshots_from_response_queue( + response_queue, queued_snapshots) -> List[models.Snapshot]: """Consume response_queue, allows reeschedule objects to reescheduled, and return all measured snapshots in a list.""" measured_snapshots = [] while True: try: response_object = response_queue.get_nowait() - match type(response_object): - case datatypes.ReescheduleRequest: - # Need to reeschedule measurement task, remove from the set - snapshot_identifier = (response_object.trial_id, - response_object.cycle) - queued_snapshots.remove(snapshot_identifier) - logger.info( - 'Reescheduling task for trial %s and cycle %s', - response_object.trial_id, response_object.cycle) - case models.Snapshot: - measured_snapshots.append( response_object ) - case _: - logger.error('Type of response object not mapped! %s', - type(response_object)) + if isinstance(response_object, datatypes.ReescheduleRequest): + # Need to reeschedule measurement task, remove from the set + snapshot_identifier = (response_object.trial_id, + response_object.cycle) + queued_snapshots.remove(snapshot_identifier) + logger.info('Reescheduling task for trial %s and cycle %s', + response_object.trial_id, response_object.cycle) + elif isinstance(response_object, models.Snapshot): + measured_snapshots.append(response_object) + else: + logger.error('Type of response object not mapped! 
%s', + type(response_object)) except queue.Empty: break return measured_snapshots + def measure_manager_inner_loop(experiment: str, max_cycle: int, request_queue, response_queue, queued_snapshots): """Reads from database to determine which snapshots needs measuring. Write @@ -714,7 +714,7 @@ def measure_manager_inner_loop(experiment: str, max_cycle: int, request_queue, # Read database to determine which snapshots needs measuring. unmeasured_snapshots = get_unmeasured_snapshots(experiment, max_cycle) logger.info('Retrieved %d unmeasured snapshots from measure manager', - {len(unmeasured_snapshots)}) + len(unmeasured_snapshots)) # When there are no more snapshots left to be measured, should break loop if not unmeasured_snapshots: return False @@ -723,8 +723,8 @@ def measure_manager_inner_loop(experiment: str, max_cycle: int, request_queue, for unmeasured_snapshot in unmeasured_snapshots: # No need to insert fuzzer and benchmark info here as it's redundant # (Can be retrieved through trial_id) - unmeasured_snapshot_identifier = ( unmeasured_snapshot.trial_id, - unmeasured_snapshot.cycle ) + unmeasured_snapshot_identifier = (unmeasured_snapshot.trial_id, + unmeasured_snapshot.cycle) # Checking if snapshot already was queued so workers will not repeat # measurement for same snapshot if unmeasured_snapshot_identifier not in queued_snapshots: @@ -734,10 +734,10 @@ def measure_manager_inner_loop(experiment: str, max_cycle: int, request_queue, queued_snapshots.add(unmeasured_snapshot_identifier) # Read results from response queue - measured_snapshots = consume_snapshots_from_response_queue(response_queue, - queued_snapshots) + measured_snapshots = consume_snapshots_from_response_queue( + response_queue, queued_snapshots) logger.info('Retrieved %d measured snapshots from response queue', - {len(measured_snapshots)}) + len(measured_snapshots)) # Save measured snapshots to database if measured_snapshots: @@ -745,6 +745,7 @@ def measure_manager_inner_loop(experiment: str, max_cycle: int, request_queue, return True + def get_pool_args(measurers_cpus, runners_cpus): """Return pool args based on measurer cpus and runner cpus arguments.""" pool_args = () @@ -761,17 +762,20 @@ def get_pool_args(measurers_cpus, runners_cpus): pool_args = (measurers_cpus,) return pool_args + # pylint: disable=too-many-locals -def measure_manager_loop( - experiment: str, max_total_time: int,measurers_cpus=None, - runners_cpus=None, region_coverage=False): +def measure_manager_loop(experiment: str, + max_total_time: int, + measurers_cpus=None, + runners_cpus=None, + region_coverage=False): """Measure manager loop. Creates request and response queues, request measurements tasks from workers, retrieve measurement results from response queue and writes measured snapshots in database.""" logger.info('Starting measure manager loop.') pool_args = get_pool_args(measurers_cpus, runners_cpus) with multiprocessing.Pool( - *pool_args) as pool, multiprocessing.Manager() as manager: + *pool_args) as pool, multiprocessing.Manager() as manager: logger.info('Setting up coverage binaries') set_up_coverage_binaries(pool, experiment) request_queue = manager.Queue() @@ -780,8 +784,7 @@ def measure_manager_loop( # Since each worker is gonna be in forever loop, we dont need result # return. Workers life scope will end automatically when there are no # more snapshots left to measure. 
- logger.info('Starting measure worker loop for %d workers', - NUM_WORKERS) + logger.info('Starting measure worker loop for %d workers', NUM_WORKERS) config = { 'request_queue': request_queue, 'response_queue': response_queue, @@ -796,7 +799,7 @@ def measure_manager_loop( queued_snapshots = set() while not scheduler.all_trials_ended(experiment): continue_inner_loop = measure_manager_inner_loop( - experiment,max_cycle, request_queue, response_queue, + experiment, max_cycle, request_queue, response_queue, queued_snapshots) if not continue_inner_loop: break diff --git a/experiment/measurer/measure_worker.py b/experiment/measurer/measure_worker.py index 214ff73a2..937000c1e 100644 --- a/experiment/measurer/measure_worker.py +++ b/experiment/measurer/measure_worker.py @@ -13,7 +13,7 @@ # limitations under the License. """Module for measurer workers logic.""" import time -from typing import Dict +from typing import Dict, Optional from common import logs from database.models import Snapshot from experiment.measurer import datatypes @@ -55,9 +55,8 @@ def measure_worker_loop(self): request = self.get_task_from_request_queue() logger.info( 'Measurer worker: Got request %s %s %d %d from request queue', - request.fuzzer, request.benchmark, - request.trial_id, request.cycle - ) + request.fuzzer, request.benchmark, request.trial_id, + request.cycle) measured_snapshot = measure_manager.measure_snapshot_coverage( request.fuzzer, request.benchmark, request.trial_id, request.cycle, self.region_coverage) @@ -68,20 +67,21 @@ def measure_worker_loop(self): class LocalMeasureWorker(BaseMeasureWorker): """Class that holds implementations of core methods for running a measure worker locally.""" + def get_task_from_request_queue(self) -> datatypes.SnapshotMeasureRequest: """Get item from request multiprocessing queue, block if necessary until an item is available""" request = self.request_queue.get(block=True) return request - def put_result_in_response_queue(self, measured_snapshot: Snapshot, + def put_result_in_response_queue(self, + measured_snapshot: Optional[Snapshot], request: datatypes.SnapshotMeasureRequest): if measured_snapshot: logger.info('Put measured snapshot in response_queue') self.response_queue.put(measured_snapshot) else: - reeschedule_request = datatypes.ReescheduleRequest(request.fuzzer, - request.benchmark, - request.trial_id, - request.cycle) + reeschedule_request = datatypes.ReescheduleRequest( + request.fuzzer, request.benchmark, request.trial_id, + request.cycle) self.response_queue.put(reeschedule_request) diff --git a/experiment/measurer/test_measure_manager.py b/experiment/measurer/test_measure_manager.py index f00f4ed37..aedaeb649 100644 --- a/experiment/measurer/test_measure_manager.py +++ b/experiment/measurer/test_measure_manager.py @@ -430,11 +430,9 @@ def test_consume_reeschedule_type_from_response_queue(): reescheduled in the future""" # Use normal queue here as multiprocessing queue gives flaky tests response_queue = queue.Queue() - TRIAL_ID = 1 - CYCLE = 0 reeschedule_request_object = datatypes.ReescheduleRequest( - 'fuzzer','benchmark',TRIAL_ID, CYCLE) - snapshot_identifier = (TRIAL_ID, CYCLE) + 'fuzzer', 'benchmark', TRIAL_NUM, CYCLE) + snapshot_identifier = (TRIAL_NUM, CYCLE) response_queue.put(reeschedule_request_object) queued_snapshots_set = set([snapshot_identifier]) snapshots = measure_manager.consume_snapshots_from_response_queue( @@ -449,11 +447,9 @@ def test_consume_snapshot_type_from_response_queue(): function.""" # Use normal queue here as multiprocessing queue 
gives flaky tests response_queue = queue.Queue() - TRIAL_ID = 1 - CYCLE = 0 - snapshot_identifier = (TRIAL_ID, CYCLE) + snapshot_identifier = (TRIAL_NUM, CYCLE) queued_snapshots_set = set([snapshot_identifier]) - measured_snapshot = models.Snapshot(trial_id=TRIAL_ID) + measured_snapshot = models.Snapshot(trial_id=TRIAL_NUM) response_queue.put(measured_snapshot) assert response_queue.qsize() == 1 snapshots = measure_manager.consume_snapshots_from_response_queue( @@ -477,8 +473,7 @@ def test_measure_manager_inner_loop_break_condition( @mock.patch('experiment.measurer.measure_manager.get_unmeasured_snapshots') @mock.patch( - 'experiment.measurer.measure_manager.consume_snapshots_from_response_queue' -) + 'experiment.measurer.measure_manager.consume_snapshots_from_response_queue') def test_measure_manager_inner_loop_writes_to_request_queue( mocked_consume_snapshots_from_response_queue, mocked_get_unmeasured_snapshots): @@ -497,8 +492,7 @@ def test_measure_manager_inner_loop_writes_to_request_queue( @mock.patch('experiment.measurer.measure_manager.get_unmeasured_snapshots') @mock.patch( - 'experiment.measurer.measure_manager.consume_snapshots_from_response_queue' -) + 'experiment.measurer.measure_manager.consume_snapshots_from_response_queue') @mock.patch('database.utils.add_all') def test_measure_manager_inner_loop_dont_write_to_db( mocked_add_all, mocked_consume_snapshots_from_response_queue, @@ -518,8 +512,7 @@ def test_measure_manager_inner_loop_dont_write_to_db( @mock.patch('experiment.measurer.measure_manager.get_unmeasured_snapshots') @mock.patch( - 'experiment.measurer.measure_manager.consume_snapshots_from_response_queue' -) + 'experiment.measurer.measure_manager.consume_snapshots_from_response_queue') @mock.patch('database.utils.add_all') def test_measure_manager_inner_loop_writes_to_db( mocked_add_all, mocked_consume_snapshots_from_response_queue, @@ -532,9 +525,7 @@ def test_measure_manager_inner_loop_writes_to_db( request_queue = queue.Queue() response_queue = queue.Queue() snapshot_model = models.Snapshot(trial_id=1) - mocked_consume_snapshots_from_response_queue.return_value = [ - snapshot_model - ] + mocked_consume_snapshots_from_response_queue.return_value = [snapshot_model] measure_manager.measure_manager_inner_loop('experiment', 1, request_queue, response_queue, set()) mocked_add_all.assert_called_with([snapshot_model]) diff --git a/experiment/measurer/test_measure_worker.py b/experiment/measurer/test_measure_worker.py index 63a24a77c..fa61d3c75 100644 --- a/experiment/measurer/test_measure_worker.py +++ b/experiment/measurer/test_measure_worker.py @@ -34,7 +34,7 @@ def local_measure_worker(): return measure_worker.LocalMeasureWorker(config) -def test_put_snapshot_in_response_queue(local_measure_worker): # pylint: disable=redefined-outer-name +def test_put_snapshot_in_response_queue(local_measure_worker): # pylint: disable=redefined-outer-name """Tests the scenario where measure_snapshot is not None, so snapshot is put in response_queue""" request = datatypes.SnapshotMeasureRequest('fuzzer', 'benchmark', 1, 0) @@ -45,7 +45,7 @@ def test_put_snapshot_in_response_queue(local_measure_worker): # pylint: disable assert isinstance(response_queue.get(), Snapshot) -def test_put_reeschedule_in_response_queue(local_measure_worker): # pylint: disable=redefined-outer-name +def test_put_reeschedule_in_response_queue(local_measure_worker): # pylint: disable=redefined-outer-name """Tests the scenario where measure_snapshot is None, so task needs to be reescheduled""" request = 
datatypes.SnapshotMeasureRequest('fuzzer', 'benchmark', 1, 0) diff --git a/service/test_automatic_run_experiment.py b/service/test_automatic_run_experiment.py new file mode 100644 index 000000000..fc9accc92 --- /dev/null +++ b/service/test_automatic_run_experiment.py @@ -0,0 +1,220 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for automatic_run_experiment.py""" +import os +import datetime +from unittest import mock + +import pytest + +from common import utils +from service import automatic_run_experiment + +# pylint: disable=invalid-name,unused-argument + +# A valid experiment name. +EXPERIMENT = '2020-01-01' + +EXPERIMENT_REQUESTS = [{ + 'experiment': datetime.date(2020, 6, 8), + 'fuzzers': ['aflplusplus', 'libfuzzer'], +}, { + 'experiment': datetime.date(2020, 6, 5), + 'fuzzers': ['honggfuzz', 'afl'], + 'description': 'Test experiment', + 'oss-fuzz-corpus': True, +}] + + +@mock.patch('experiment.run_experiment.start_experiment') +@mock.patch('common.logs.warning') +@mock.patch('service.automatic_run_experiment._get_requested_experiments') +def test_run_requested_experiment_pause_service( + mocked_get_requested_experiments, mocked_warning, + mocked_start_experiment, db): + """Tests that run_requested_experiment doesn't run an experiment when a + pause is requested.""" + experiment_requests_with_pause = EXPERIMENT_REQUESTS.copy() + experiment_requests_with_pause.append( + automatic_run_experiment.PAUSE_SERVICE_KEYWORD) + mocked_get_requested_experiments.return_value = ( + experiment_requests_with_pause) + + assert (automatic_run_experiment.run_requested_experiment(dry_run=False) is + None) + mocked_warning.assert_called_with( + 'Pause service requested, not running experiment.') + assert mocked_start_experiment.call_count == 0 + + +@mock.patch('experiment.run_experiment.start_experiment') +@mock.patch('service.automatic_run_experiment._get_requested_experiments') +def test_run_requested_experiment(mocked_get_requested_experiments, + mocked_start_experiment, db): + """Tests that run_requested_experiment starts and stops the experiment + properly.""" + mocked_get_requested_experiments.return_value = EXPERIMENT_REQUESTS + expected_experiment_name = '2020-06-05' + expected_fuzzers = ['honggfuzz', 'afl'] + automatic_run_experiment.run_requested_experiment(dry_run=False) + expected_config_file = os.path.join(utils.ROOT_DIR, 'service', + 'experiment-config.yaml') + + expected_benchmarks = sorted([ + 'bloaty_fuzz_target', + 'curl_curl_fuzzer_http', + 'jsoncpp_jsoncpp_fuzzer', + 'libpcap_fuzz_both', + 'libxslt_xpath', + 'mbedtls_fuzz_dtlsclient', + 'openssl_x509', + 'sqlite3_ossfuzz', + 'systemd_fuzz-link-parser', + 'zlib_zlib_uncompress_fuzzer', + 'freetype2_ftfuzzer', + 'harfbuzz_hb-shape-fuzzer', + 'lcms_cms_transform_fuzzer', + 'libjpeg-turbo_libjpeg_turbo_fuzzer', + 'libpng_libpng_read_fuzzer', + 'libxml2_xml', + 'openh264_decoder_fuzzer', + 'openthread_ot-ip6-send-fuzzer', + 'proj4_proj_crs_to_crs_fuzzer', + 're2_fuzzer', + 'stb_stbi_read_fuzzer', + 
'vorbis_decode_fuzzer', + 'woff2_convert_woff2ttf_fuzzer', + ]) + expected_call = mock.call( + expected_experiment_name, + expected_config_file, + expected_benchmarks, + expected_fuzzers, + description='Test experiment', + concurrent_builds=(automatic_run_experiment.CONCURRENT_BUILDS), + oss_fuzz_corpus=True) + start_experiment_call_args = mocked_start_experiment.call_args_list + assert len(start_experiment_call_args) == 1 + start_experiment_call_args = start_experiment_call_args[0] + assert start_experiment_call_args == expected_call + + +@pytest.mark.parametrize( + ('name', 'expected_result'), [('02000-1-1', False), ('2020-1-1', False), + ('2020-01-01', True), + ('2020-01-01-aflplusplus', True), + ('2020-01-01-1', True)]) +def test_validate_experiment_name(name, expected_result): + """Tests that validate experiment name returns True for valid names and + False for names that are not valid.""" + assert (automatic_run_experiment.validate_experiment_name(name) == + expected_result) + + +# Call the parameter exp_request instead of request because pytest reserves it. +@pytest.mark.parametrize( + ('exp_request', 'expected_result'), + [ + ({ + 'experiment': EXPERIMENT, + 'fuzzers': ['afl'] + }, True), + # Not a dict. + (1, False), + # No fuzzers. + ({ + 'experiment': EXPERIMENT, + 'fuzzers': [] + }, False), + # No fuzzers. + ({ + 'experiment': EXPERIMENT + }, False), + # No experiment. + ({ + 'fuzzers': ['afl'] + }, False), + # Invalid experiment name for request. + ({ + 'experiment': 'invalid', + 'fuzzers': ['afl'] + }, False), + # Invalid experiment name. + ({ + 'experiment': 'i' * 100, + 'fuzzers': ['afl'] + }, False), + # Nonexistent fuzzers. + ({ + 'experiment': EXPERIMENT, + 'fuzzers': ['nonexistent-fuzzer'] + }, False), + # Invalid fuzzers. + ( + { + 'experiment': EXPERIMENT, + 'fuzzers': ['1'] # Need to make this exist. + }, + False), + # Invalid description. + ({ + 'experiment': EXPERIMENT, + 'fuzzers': ['afl'], + 'description': 1, + }, False), + # Invalid oss_fuzz_corpus flag. 
+ ({ + 'experiment': EXPERIMENT, + 'fuzzers': ['afl'], + 'oss_fuzz_corpus': 'invalid', + }, False), + ]) +def test_validate_experiment_requests(exp_request, expected_result): + """Tests that validate_experiment_requests returns True for valid fuzzres + and False for invalid ones.""" + assert (automatic_run_experiment.validate_experiment_requests([exp_request]) + is expected_result) + + +def test_validate_experiment_requests_duplicate_experiments(): + """Tests that validate_experiment_requests returns False if the experiment + names are duplicated.""" + requests = [ + { + 'experiment': EXPERIMENT, + 'fuzzers': ['afl'] + }, + { + 'experiment': EXPERIMENT, + 'fuzzers': ['libfuzzer'] + }, + ] + assert not automatic_run_experiment.validate_experiment_requests(requests) + + +def test_validate_experiment_requests_one_valid_one_invalid(): + """Tests that validate_experiment_requests returns False even if some + requests are valid.""" + requests = [ + { + 'experiment': EXPERIMENT, + 'fuzzers': ['afl'] + }, + { + 'experiment': '2020-02-02', + 'fuzzers': [] + }, + ] + assert not automatic_run_experiment.validate_experiment_requests(requests) + From 4570abb2b8a2679b9766fa310ad6f2fa1ec84764 Mon Sep 17 00:00:00 2001 From: gustavogaldinoo Date: Thu, 20 Jun 2024 16:03:56 +0000 Subject: [PATCH 3/8] Initializing logs for every local worker --- experiment/measurer/measure_worker.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/experiment/measurer/measure_worker.py b/experiment/measurer/measure_worker.py index 937000c1e..6874455f5 100644 --- a/experiment/measurer/measure_worker.py +++ b/experiment/measurer/measure_worker.py @@ -31,11 +31,6 @@ def __init__(self, config: Dict): self.request_queue = config['request_queue'] self.response_queue = config['response_queue'] self.region_coverage = config['region_coverage'] - logs.initialize(default_extras={ - 'component': 'measurer', - 'subcomponent': 'worker', - }) - logger.info('Starting one measure worker loop') def get_task_from_request_queue(self): """"Get task from request queue""" @@ -49,6 +44,11 @@ def put_result_in_response_queue(self, measured_snapshot, request): def measure_worker_loop(self): """Periodically retrieves request from request queue, measure it, and put result in response queue""" + logs.initialize(default_extras={ + 'component': 'measurer', + 'subcomponent': 'worker', + }) + logger.info('Starting one measure worker loop') while True: # 'SnapshotMeasureRequest', ['fuzzer', 'benchmark', 'trial_id', # 'cycle'] From 77f1aa6b064c2ef83ee56ba966cde8b59b20ae97 Mon Sep 17 00:00:00 2001 From: gustavogaldinoo Date: Thu, 20 Jun 2024 17:21:42 +0000 Subject: [PATCH 4/8] Fixes based on PR feedback --- experiment/measurer/datatypes.py | 4 +- experiment/measurer/measure_manager.py | 66 ++++++++++----------- experiment/measurer/measure_worker.py | 13 ++-- experiment/measurer/test_measure_manager.py | 23 ++++--- experiment/measurer/test_measure_worker.py | 12 ++-- 5 files changed, 55 insertions(+), 63 deletions(-) diff --git a/experiment/measurer/datatypes.py b/experiment/measurer/datatypes.py index bd564f1e5..7acd0036a 100644 --- a/experiment/measurer/datatypes.py +++ b/experiment/measurer/datatypes.py @@ -17,5 +17,5 @@ SnapshotMeasureRequest = collections.namedtuple( 'SnapshotMeasureRequest', ['fuzzer', 'benchmark', 'trial_id', 'cycle']) -ReescheduleRequest = collections.namedtuple( - 'ReescheduleRequest', ['fuzzer', 'benchmarck', 'trial_id', 'cycle']) +RetryRequest = collections.namedtuple( + 'RetryRequest', ['fuzzer', 
'benchmarck', 'trial_id', 'cycle']) diff --git a/experiment/measurer/measure_manager.py b/experiment/measurer/measure_manager.py index e8f3b1592..710b714f8 100644 --- a/experiment/measurer/measure_manager.py +++ b/experiment/measurer/measure_manager.py @@ -44,7 +44,7 @@ from database import models from experiment.build import build_utils from experiment.measurer import coverage_utils -from experiment.measurer import datatypes +from experiment.measurer.datatypes import (RetryRequest, SnapshotMeasureRequest) from experiment.measurer import measure_worker from experiment.measurer import run_coverage from experiment.measurer import run_crashes @@ -78,13 +78,8 @@ def measure_main(experiment_config): measurers_cpus = experiment_config['measurers_cpus'] runners_cpus = experiment_config['runners_cpus'] region_coverage = experiment_config['region_coverage'] - local_experiment = experiment_utils.is_local_experiment() - if local_experiment: - measure_manager_loop(experiment, max_total_time, measurers_cpus, - runners_cpus, region_coverage) - else: - measure_loop(experiment, max_total_time, measurers_cpus, runners_cpus, - region_coverage) + measure_manager_loop(experiment, max_total_time, measurers_cpus, + runners_cpus, region_coverage) # Clean up resources. gc.collect() @@ -251,13 +246,12 @@ def _query_unmeasured_trials(experiment: str): def _get_unmeasured_first_snapshots( - experiment: str) -> List[datatypes.SnapshotMeasureRequest]: + experiment: str) -> List[SnapshotMeasureRequest]: """Returns a list of unmeasured SnapshotMeasureRequests that are the first snapshot for their trial. The trials are trials in |experiment|.""" trials_without_snapshots = _query_unmeasured_trials(experiment) return [ - datatypes.SnapshotMeasureRequest(trial.fuzzer, trial.benchmark, - trial.id, 0) + SnapshotMeasureRequest(trial.fuzzer, trial.benchmark, trial.id, 0) for trial in trials_without_snapshots ] @@ -285,8 +279,7 @@ def _query_measured_latest_snapshots(experiment: str): def _get_unmeasured_next_snapshots( - experiment: str, - max_cycle: int) -> List[datatypes.SnapshotMeasureRequest]: + experiment: str, max_cycle: int) -> List[SnapshotMeasureRequest]: """Returns a list of the latest unmeasured SnapshotMeasureRequests of trials in |experiment| that have been measured at least once in |experiment|. 
|max_total_time| is used to determine if a trial has another @@ -302,15 +295,16 @@ def _get_unmeasured_next_snapshots( if next_cycle > max_cycle: continue - snapshot_with_cycle = datatypes.SnapshotMeasureRequest( - snapshot.fuzzer, snapshot.benchmark, snapshot.trial_id, next_cycle) + snapshot_with_cycle = SnapshotMeasureRequest(snapshot.fuzzer, + snapshot.benchmark, + snapshot.trial_id, + next_cycle) next_snapshots.append(snapshot_with_cycle) return next_snapshots -def get_unmeasured_snapshots( - experiment: str, - max_cycle: int) -> List[datatypes.SnapshotMeasureRequest]: +def get_unmeasured_snapshots(experiment: str, + max_cycle: int) -> List[SnapshotMeasureRequest]: """Returns a list of SnapshotMeasureRequests that need to be measured (assuming they have been saved already).""" # Measure the first snapshot of every started trial without any measured @@ -681,14 +675,15 @@ def initialize_logs(): def consume_snapshots_from_response_queue( response_queue, queued_snapshots) -> List[models.Snapshot]: - """Consume response_queue, allows reeschedule objects to reescheduled, and + """Consume response_queue, allows retry objects to retried, and return all measured snapshots in a list.""" measured_snapshots = [] while True: try: response_object = response_queue.get_nowait() - if isinstance(response_object, datatypes.ReescheduleRequest): - # Need to reeschedule measurement task, remove from the set + if isinstance(response_object, RetryRequest): + # Need to retry measurement task, will remove identifier from + # the set so task can be retried in next loop iteration. snapshot_identifier = (response_object.trial_id, response_object.cycle) queued_snapshots.remove(snapshot_identifier) @@ -715,31 +710,31 @@ def measure_manager_inner_loop(experiment: str, max_cycle: int, request_queue, unmeasured_snapshots = get_unmeasured_snapshots(experiment, max_cycle) logger.info('Retrieved %d unmeasured snapshots from measure manager', len(unmeasured_snapshots)) - # When there are no more snapshots left to be measured, should break loop + # When there are no more snapshots left to be measured, should break loop. if not unmeasured_snapshots: return False # Write measurements requests to request queue for unmeasured_snapshot in unmeasured_snapshots: # No need to insert fuzzer and benchmark info here as it's redundant - # (Can be retrieved through trial_id) + # (Can be retrieved through trial_id). unmeasured_snapshot_identifier = (unmeasured_snapshot.trial_id, unmeasured_snapshot.cycle) # Checking if snapshot already was queued so workers will not repeat # measurement for same snapshot if unmeasured_snapshot_identifier not in queued_snapshots: # If corpus does not exist, don't put in measurer workers request - # queue + # queue. request_queue.put(unmeasured_snapshot) queued_snapshots.add(unmeasured_snapshot_identifier) - # Read results from response queue + # Read results from response queue. measured_snapshots = consume_snapshots_from_response_queue( response_queue, queued_snapshots) logger.info('Retrieved %d measured snapshots from response queue', len(measured_snapshots)) - # Save measured snapshots to database + # Save measured snapshots to database. 
if measured_snapshots: db_utils.add_all(measured_snapshots) @@ -751,24 +746,23 @@ def get_pool_args(measurers_cpus, runners_cpus): pool_args = () if measurers_cpus is not None and runners_cpus is not None: local_experiment = experiment_utils.is_local_experiment() - if local_experiment: - cores_queue = multiprocessing.Queue() - logger.info('Scheduling measurers from core %d to %d.', - runners_cpus, runners_cpus + measurers_cpus - 1) - for cpu in range(runners_cpus, runners_cpus + measurers_cpus): - cores_queue.put(cpu) - pool_args = (measurers_cpus, _process_init, (cores_queue,)) - else: - pool_args = (measurers_cpus,) + if not local_experiment: + return (measurers_cpus,) + cores_queue = multiprocessing.Queue() + logger.info('Scheduling measurers from core %d to %d.', runners_cpus, + runners_cpus + measurers_cpus - 1) + for cpu in range(runners_cpus, runners_cpus + measurers_cpus): + cores_queue.put(cpu) + pool_args = (measurers_cpus, _process_init, (cores_queue,)) return pool_args -# pylint: disable=too-many-locals def measure_manager_loop(experiment: str, max_total_time: int, measurers_cpus=None, runners_cpus=None, region_coverage=False): + # pylint: disable=too-many-locals """Measure manager loop. Creates request and response queues, request measurements tasks from workers, retrieve measurement results from response queue and writes measured snapshots in database.""" diff --git a/experiment/measurer/measure_worker.py b/experiment/measurer/measure_worker.py index 6874455f5..43561e4e5 100644 --- a/experiment/measurer/measure_worker.py +++ b/experiment/measurer/measure_worker.py @@ -16,7 +16,7 @@ from typing import Dict, Optional from common import logs from database.models import Snapshot -from experiment.measurer import datatypes +from experiment.measurer.datatypes import (RetryRequest, SnapshotMeasureRequest) from experiment.measurer import measure_manager MEASUREMENT_TIMEOUT = 1 @@ -68,7 +68,7 @@ class LocalMeasureWorker(BaseMeasureWorker): """Class that holds implementations of core methods for running a measure worker locally.""" - def get_task_from_request_queue(self) -> datatypes.SnapshotMeasureRequest: + def get_task_from_request_queue(self) -> SnapshotMeasureRequest: """Get item from request multiprocessing queue, block if necessary until an item is available""" request = self.request_queue.get(block=True) @@ -76,12 +76,11 @@ def get_task_from_request_queue(self) -> datatypes.SnapshotMeasureRequest: def put_result_in_response_queue(self, measured_snapshot: Optional[Snapshot], - request: datatypes.SnapshotMeasureRequest): + request: SnapshotMeasureRequest): if measured_snapshot: logger.info('Put measured snapshot in response_queue') self.response_queue.put(measured_snapshot) else: - reeschedule_request = datatypes.ReescheduleRequest( - request.fuzzer, request.benchmark, request.trial_id, - request.cycle) - self.response_queue.put(reeschedule_request) + retry_request = RetryRequest(request.fuzzer, request.benchmark, + request.trial_id, request.cycle) + self.response_queue.put(retry_request) diff --git a/experiment/measurer/test_measure_manager.py b/experiment/measurer/test_measure_manager.py index aedaeb649..b08f5d15b 100644 --- a/experiment/measurer/test_measure_manager.py +++ b/experiment/measurer/test_measure_manager.py @@ -24,7 +24,7 @@ from database import models from database import utils as db_utils from experiment.build import build_utils -from experiment.measurer import datatypes +from experiment.measurer.datatypes import (RetryRequest, SnapshotMeasureRequest) from 
experiment.measurer import measure_manager from test_libs import utils as test_utils @@ -174,8 +174,8 @@ def test_measure_trial_coverage(mocked_measure_snapshot_coverage, mocked_queue, """Tests that measure_trial_coverage works as expected.""" min_cycle = 1 max_cycle = 10 - measure_request = datatypes.SnapshotMeasureRequest(FUZZER, BENCHMARK, - TRIAL_NUM, min_cycle) + measure_request = SnapshotMeasureRequest(FUZZER, BENCHMARK, TRIAL_NUM, + min_cycle) measure_manager.measure_trial_coverage(measure_request, max_cycle, mocked_queue(), False) expected_calls = [ @@ -423,17 +423,16 @@ def test_consume_unmapped_type_from_response_queue(): assert not snapshots -def test_consume_reeschedule_type_from_response_queue(): - """Tests the secnario where a reeschedule object is retrieved from the +def test_consume_retry_type_from_response_queue(): + """Tests the scenario where a retry object is retrieved from the response queue. In this scenario, we want to remove the snapshot identifier from the queued_snapshots set, as this allows the measurement task to be - reescheduled in the future""" + retried in the future""" # Use normal queue here as multiprocessing queue gives flaky tests response_queue = queue.Queue() - reeschedule_request_object = datatypes.ReescheduleRequest( - 'fuzzer', 'benchmark', TRIAL_NUM, CYCLE) + retry_request_object = RetryRequest('fuzzer', 'benchmark', TRIAL_NUM, CYCLE) snapshot_identifier = (TRIAL_NUM, CYCLE) - response_queue.put(reeschedule_request_object) + response_queue.put(retry_request_object) queued_snapshots_set = set([snapshot_identifier]) snapshots = measure_manager.consume_snapshots_from_response_queue( response_queue, queued_snapshots_set) @@ -480,7 +479,7 @@ def test_measure_manager_inner_loop_writes_to_request_queue( """Tests that the measure manager inner loop is writing measurement tasks to request queue""" mocked_get_unmeasured_snapshots.return_value = [ - datatypes.SnapshotMeasureRequest('fuzzer', 'benchmark', 0, 0) + SnapshotMeasureRequest('fuzzer', 'benchmark', 0, 0) ] mocked_consume_snapshots_from_response_queue.return_value = [] request_queue = queue.Queue() @@ -500,7 +499,7 @@ def test_measure_manager_inner_loop_dont_write_to_db( """Tests that the measure manager inner loop does not call add_all to write to the database, when there are no measured snapshots to be written""" mocked_get_unmeasured_snapshots.return_value = [ - datatypes.SnapshotMeasureRequest('fuzzer', 'benchmark', 0, 0) + SnapshotMeasureRequest('fuzzer', 'benchmark', 0, 0) ] request_queue = queue.Queue() response_queue = queue.Queue() @@ -520,7 +519,7 @@ def test_measure_manager_inner_loop_writes_to_db( """Tests that the measure manager inner loop calls add_all to write to the database, when there are measured snapshots to be written""" mocked_get_unmeasured_snapshots.return_value = [ - datatypes.SnapshotMeasureRequest('fuzzer', 'benchmark', 0, 0) + SnapshotMeasureRequest('fuzzer', 'benchmark', 0, 0) ] request_queue = queue.Queue() response_queue = queue.Queue() diff --git a/experiment/measurer/test_measure_worker.py b/experiment/measurer/test_measure_worker.py index fa61d3c75..f4fc4b4c7 100644 --- a/experiment/measurer/test_measure_worker.py +++ b/experiment/measurer/test_measure_worker.py @@ -16,7 +16,7 @@ import pytest from database.models import Snapshot -from experiment.measurer import datatypes +from experiment.measurer.datatypes import (RetryRequest, SnapshotMeasureRequest) from experiment.measurer import measure_worker @@ -37,7 +37,7 @@ def local_measure_worker(): def 
test_put_snapshot_in_response_queue(local_measure_worker): # pylint: disable=redefined-outer-name """Tests the scenario where measure_snapshot is not None, so snapshot is put in response_queue""" - request = datatypes.SnapshotMeasureRequest('fuzzer', 'benchmark', 1, 0) + request = SnapshotMeasureRequest('fuzzer', 'benchmark', 1, 0) snapshot = Snapshot(trial_id=1) local_measure_worker.put_result_in_response_queue(snapshot, request) response_queue = local_measure_worker.response_queue @@ -45,12 +45,12 @@ def test_put_snapshot_in_response_queue(local_measure_worker): # pylint: disabl assert isinstance(response_queue.get(), Snapshot) -def test_put_reeschedule_in_response_queue(local_measure_worker): # pylint: disable=redefined-outer-name +def test_put_retry_in_response_queue(local_measure_worker): # pylint: disable=redefined-outer-name """Tests the scenario where measure_snapshot is None, so task needs to be - reescheduled""" - request = datatypes.SnapshotMeasureRequest('fuzzer', 'benchmark', 1, 0) + retried""" + request = SnapshotMeasureRequest('fuzzer', 'benchmark', 1, 0) snapshot = None local_measure_worker.put_result_in_response_queue(snapshot, request) response_queue = local_measure_worker.response_queue assert response_queue.qsize() == 1 - assert isinstance(response_queue.get(), datatypes.ReescheduleRequest) + assert isinstance(response_queue.get(), RetryRequest) From bf47d793ca2a9c01412ab20bbfe37edd88fb5f9f Mon Sep 17 00:00:00 2001 From: gustavogaldinoo Date: Mon, 24 Jun 2024 18:37:24 +0000 Subject: [PATCH 5/8] Removing hardcoded value for num workers --- experiment/measurer/measure_manager.py | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/experiment/measurer/measure_manager.py b/experiment/measurer/measure_manager.py index 710b714f8..7053aeab1 100644 --- a/experiment/measurer/measure_manager.py +++ b/experiment/measurer/measure_manager.py @@ -57,7 +57,6 @@ FAIL_WAIT_SECONDS = 30 SNAPSHOT_QUEUE_GET_TIMEOUT = 1 SNAPSHOTS_BATCH_SAVE_SIZE = 100 -NUM_WORKERS = 4 MEASURE_MANAGER_LOOP_TIMEOUT = 10 @@ -76,10 +75,9 @@ def measure_main(experiment_config): experiment = experiment_config['experiment'] max_total_time = experiment_config['max_total_time'] measurers_cpus = experiment_config['measurers_cpus'] - runners_cpus = experiment_config['runners_cpus'] region_coverage = experiment_config['region_coverage'] measure_manager_loop(experiment, max_total_time, measurers_cpus, - runners_cpus, region_coverage) + region_coverage) # Clean up resources. gc.collect() @@ -723,8 +721,6 @@ def measure_manager_inner_loop(experiment: str, max_cycle: int, request_queue, # Checking if snapshot already was queued so workers will not repeat # measurement for same snapshot if unmeasured_snapshot_identifier not in queued_snapshots: - # If corpus does not exist, don't put in measurer workers request - # queue. request_queue.put(unmeasured_snapshot) queued_snapshots.add(unmeasured_snapshot_identifier) @@ -760,16 +756,17 @@ def get_pool_args(measurers_cpus, runners_cpus): def measure_manager_loop(experiment: str, max_total_time: int, measurers_cpus=None, - runners_cpus=None, region_coverage=False): # pylint: disable=too-many-locals """Measure manager loop. 
Creates request and response queues, request measurements tasks from workers, retrieve measurement results from response queue and writes measured snapshots in database.""" logger.info('Starting measure manager loop.') - pool_args = get_pool_args(measurers_cpus, runners_cpus) - with multiprocessing.Pool( - *pool_args) as pool, multiprocessing.Manager() as manager: + if not measurers_cpus: + logger.info('Number of measurer CPUs not passed as argument. using %d', + multiprocessing.cpu_count()) + measurers_cpus = multiprocessing.cpu_count() + with multiprocessing.Pool() as pool, multiprocessing.Manager() as manager: logger.info('Setting up coverage binaries') set_up_coverage_binaries(pool, experiment) request_queue = manager.Queue() @@ -778,14 +775,15 @@ def measure_manager_loop(experiment: str, # Since each worker is gonna be in forever loop, we dont need result # return. Workers life scope will end automatically when there are no # more snapshots left to measure. - logger.info('Starting measure worker loop for %d workers', NUM_WORKERS) + logger.info('Starting measure worker loop for %d workers', + measurers_cpus) config = { 'request_queue': request_queue, 'response_queue': response_queue, 'region_coverage': region_coverage, } local_measure_worker = measure_worker.LocalMeasureWorker(config) - measure_trial_coverage_args = [()] * NUM_WORKERS + measure_trial_coverage_args = [()] * measurers_cpus _result = pool.starmap_async(local_measure_worker.measure_worker_loop, measure_trial_coverage_args) From 4d7d0acf1a26e26c0c3958eba9567ec646da5a1e Mon Sep 17 00:00:00 2001 From: gustavogaldinoo Date: Wed, 26 Jun 2024 14:39:37 +0000 Subject: [PATCH 6/8] Remove outdated test file to match master branch --- service/test_automatic_run_experiment.py | 220 ----------------------- 1 file changed, 220 deletions(-) delete mode 100644 service/test_automatic_run_experiment.py diff --git a/service/test_automatic_run_experiment.py b/service/test_automatic_run_experiment.py deleted file mode 100644 index fc9accc92..000000000 --- a/service/test_automatic_run_experiment.py +++ /dev/null @@ -1,220 +0,0 @@ -# Copyright 2020 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Tests for automatic_run_experiment.py""" -import os -import datetime -from unittest import mock - -import pytest - -from common import utils -from service import automatic_run_experiment - -# pylint: disable=invalid-name,unused-argument - -# A valid experiment name. 
-EXPERIMENT = '2020-01-01' - -EXPERIMENT_REQUESTS = [{ - 'experiment': datetime.date(2020, 6, 8), - 'fuzzers': ['aflplusplus', 'libfuzzer'], -}, { - 'experiment': datetime.date(2020, 6, 5), - 'fuzzers': ['honggfuzz', 'afl'], - 'description': 'Test experiment', - 'oss-fuzz-corpus': True, -}] - - -@mock.patch('experiment.run_experiment.start_experiment') -@mock.patch('common.logs.warning') -@mock.patch('service.automatic_run_experiment._get_requested_experiments') -def test_run_requested_experiment_pause_service( - mocked_get_requested_experiments, mocked_warning, - mocked_start_experiment, db): - """Tests that run_requested_experiment doesn't run an experiment when a - pause is requested.""" - experiment_requests_with_pause = EXPERIMENT_REQUESTS.copy() - experiment_requests_with_pause.append( - automatic_run_experiment.PAUSE_SERVICE_KEYWORD) - mocked_get_requested_experiments.return_value = ( - experiment_requests_with_pause) - - assert (automatic_run_experiment.run_requested_experiment(dry_run=False) is - None) - mocked_warning.assert_called_with( - 'Pause service requested, not running experiment.') - assert mocked_start_experiment.call_count == 0 - - -@mock.patch('experiment.run_experiment.start_experiment') -@mock.patch('service.automatic_run_experiment._get_requested_experiments') -def test_run_requested_experiment(mocked_get_requested_experiments, - mocked_start_experiment, db): - """Tests that run_requested_experiment starts and stops the experiment - properly.""" - mocked_get_requested_experiments.return_value = EXPERIMENT_REQUESTS - expected_experiment_name = '2020-06-05' - expected_fuzzers = ['honggfuzz', 'afl'] - automatic_run_experiment.run_requested_experiment(dry_run=False) - expected_config_file = os.path.join(utils.ROOT_DIR, 'service', - 'experiment-config.yaml') - - expected_benchmarks = sorted([ - 'bloaty_fuzz_target', - 'curl_curl_fuzzer_http', - 'jsoncpp_jsoncpp_fuzzer', - 'libpcap_fuzz_both', - 'libxslt_xpath', - 'mbedtls_fuzz_dtlsclient', - 'openssl_x509', - 'sqlite3_ossfuzz', - 'systemd_fuzz-link-parser', - 'zlib_zlib_uncompress_fuzzer', - 'freetype2_ftfuzzer', - 'harfbuzz_hb-shape-fuzzer', - 'lcms_cms_transform_fuzzer', - 'libjpeg-turbo_libjpeg_turbo_fuzzer', - 'libpng_libpng_read_fuzzer', - 'libxml2_xml', - 'openh264_decoder_fuzzer', - 'openthread_ot-ip6-send-fuzzer', - 'proj4_proj_crs_to_crs_fuzzer', - 're2_fuzzer', - 'stb_stbi_read_fuzzer', - 'vorbis_decode_fuzzer', - 'woff2_convert_woff2ttf_fuzzer', - ]) - expected_call = mock.call( - expected_experiment_name, - expected_config_file, - expected_benchmarks, - expected_fuzzers, - description='Test experiment', - concurrent_builds=(automatic_run_experiment.CONCURRENT_BUILDS), - oss_fuzz_corpus=True) - start_experiment_call_args = mocked_start_experiment.call_args_list - assert len(start_experiment_call_args) == 1 - start_experiment_call_args = start_experiment_call_args[0] - assert start_experiment_call_args == expected_call - - -@pytest.mark.parametrize( - ('name', 'expected_result'), [('02000-1-1', False), ('2020-1-1', False), - ('2020-01-01', True), - ('2020-01-01-aflplusplus', True), - ('2020-01-01-1', True)]) -def test_validate_experiment_name(name, expected_result): - """Tests that validate experiment name returns True for valid names and - False for names that are not valid.""" - assert (automatic_run_experiment.validate_experiment_name(name) == - expected_result) - - -# Call the parameter exp_request instead of request because pytest reserves it. 
-@pytest.mark.parametrize( - ('exp_request', 'expected_result'), - [ - ({ - 'experiment': EXPERIMENT, - 'fuzzers': ['afl'] - }, True), - # Not a dict. - (1, False), - # No fuzzers. - ({ - 'experiment': EXPERIMENT, - 'fuzzers': [] - }, False), - # No fuzzers. - ({ - 'experiment': EXPERIMENT - }, False), - # No experiment. - ({ - 'fuzzers': ['afl'] - }, False), - # Invalid experiment name for request. - ({ - 'experiment': 'invalid', - 'fuzzers': ['afl'] - }, False), - # Invalid experiment name. - ({ - 'experiment': 'i' * 100, - 'fuzzers': ['afl'] - }, False), - # Nonexistent fuzzers. - ({ - 'experiment': EXPERIMENT, - 'fuzzers': ['nonexistent-fuzzer'] - }, False), - # Invalid fuzzers. - ( - { - 'experiment': EXPERIMENT, - 'fuzzers': ['1'] # Need to make this exist. - }, - False), - # Invalid description. - ({ - 'experiment': EXPERIMENT, - 'fuzzers': ['afl'], - 'description': 1, - }, False), - # Invalid oss_fuzz_corpus flag. - ({ - 'experiment': EXPERIMENT, - 'fuzzers': ['afl'], - 'oss_fuzz_corpus': 'invalid', - }, False), - ]) -def test_validate_experiment_requests(exp_request, expected_result): - """Tests that validate_experiment_requests returns True for valid fuzzres - and False for invalid ones.""" - assert (automatic_run_experiment.validate_experiment_requests([exp_request]) - is expected_result) - - -def test_validate_experiment_requests_duplicate_experiments(): - """Tests that validate_experiment_requests returns False if the experiment - names are duplicated.""" - requests = [ - { - 'experiment': EXPERIMENT, - 'fuzzers': ['afl'] - }, - { - 'experiment': EXPERIMENT, - 'fuzzers': ['libfuzzer'] - }, - ] - assert not automatic_run_experiment.validate_experiment_requests(requests) - - -def test_validate_experiment_requests_one_valid_one_invalid(): - """Tests that validate_experiment_requests returns False even if some - requests are valid.""" - requests = [ - { - 'experiment': EXPERIMENT, - 'fuzzers': ['afl'] - }, - { - 'experiment': '2020-02-02', - 'fuzzers': [] - }, - ] - assert not automatic_run_experiment.validate_experiment_requests(requests) - From db0e45cb63066296c8ce75f8a747442c87071919 Mon Sep 17 00:00:00 2001 From: gustavogaldinoo Date: Wed, 26 Jun 2024 17:20:58 +0000 Subject: [PATCH 7/8] Addressing some PR feedback --- experiment/measurer/datatypes.py | 3 - experiment/measurer/measure_manager.py | 76 +++++++++++---------- experiment/measurer/measure_worker.py | 16 +++-- experiment/measurer/test_measure_manager.py | 35 +++++----- experiment/measurer/test_measure_worker.py | 11 +-- 5 files changed, 73 insertions(+), 68 deletions(-) diff --git a/experiment/measurer/datatypes.py b/experiment/measurer/datatypes.py index 7acd0036a..df4eea398 100644 --- a/experiment/measurer/datatypes.py +++ b/experiment/measurer/datatypes.py @@ -16,6 +16,3 @@ SnapshotMeasureRequest = collections.namedtuple( 'SnapshotMeasureRequest', ['fuzzer', 'benchmark', 'trial_id', 'cycle']) - -RetryRequest = collections.namedtuple( - 'RetryRequest', ['fuzzer', 'benchmarck', 'trial_id', 'cycle']) diff --git a/experiment/measurer/measure_manager.py b/experiment/measurer/measure_manager.py index 7053aeab1..a6ec60e51 100644 --- a/experiment/measurer/measure_manager.py +++ b/experiment/measurer/measure_manager.py @@ -44,11 +44,11 @@ from database import models from experiment.build import build_utils from experiment.measurer import coverage_utils -from experiment.measurer.datatypes import (RetryRequest, SnapshotMeasureRequest) from experiment.measurer import measure_worker from experiment.measurer import run_coverage 
from experiment.measurer import run_crashes from experiment import scheduler +import experiment.measurer.datatypes as measurer_datatypes logger = logs.Logger() @@ -57,7 +57,7 @@ FAIL_WAIT_SECONDS = 30 SNAPSHOT_QUEUE_GET_TIMEOUT = 1 SNAPSHOTS_BATCH_SAVE_SIZE = 100 -MEASURE_MANAGER_LOOP_TIMEOUT = 10 +MEASUREMENT_LOOP_WAIT = 10 def exists_in_experiment_filestore(path: pathlib.Path) -> bool: @@ -244,12 +244,13 @@ def _query_unmeasured_trials(experiment: str): def _get_unmeasured_first_snapshots( - experiment: str) -> List[SnapshotMeasureRequest]: + experiment: str) -> List[measurer_datatypes.SnapshotMeasureRequest]: """Returns a list of unmeasured SnapshotMeasureRequests that are the first snapshot for their trial. The trials are trials in |experiment|.""" trials_without_snapshots = _query_unmeasured_trials(experiment) return [ - SnapshotMeasureRequest(trial.fuzzer, trial.benchmark, trial.id, 0) + measurer_datatypes.SnapshotMeasureRequest(trial.fuzzer, trial.benchmark, + trial.id, 0) for trial in trials_without_snapshots ] @@ -277,7 +278,8 @@ def _query_measured_latest_snapshots(experiment: str): def _get_unmeasured_next_snapshots( - experiment: str, max_cycle: int) -> List[SnapshotMeasureRequest]: + experiment: str, + max_cycle: int) -> List[measurer_datatypes.SnapshotMeasureRequest]: """Returns a list of the latest unmeasured SnapshotMeasureRequests of trials in |experiment| that have been measured at least once in |experiment|. |max_total_time| is used to determine if a trial has another @@ -293,16 +295,15 @@ def _get_unmeasured_next_snapshots( if next_cycle > max_cycle: continue - snapshot_with_cycle = SnapshotMeasureRequest(snapshot.fuzzer, - snapshot.benchmark, - snapshot.trial_id, - next_cycle) + snapshot_with_cycle = measurer_datatypes.SnapshotMeasureRequest( + snapshot.fuzzer, snapshot.benchmark, snapshot.trial_id, next_cycle) next_snapshots.append(snapshot_with_cycle) return next_snapshots -def get_unmeasured_snapshots(experiment: str, - max_cycle: int) -> List[SnapshotMeasureRequest]: +def get_unmeasured_snapshots( + experiment: str, + max_cycle: int) -> List[measurer_datatypes.SnapshotMeasureRequest]: """Returns a list of SnapshotMeasureRequests that need to be measured (assuming they have been saved already).""" # Measure the first snapshot of every started trial without any measured @@ -679,7 +680,8 @@ def consume_snapshots_from_response_queue( while True: try: response_object = response_queue.get_nowait() - if isinstance(response_object, RetryRequest): + if isinstance(response_object, + measurer_datatypes.SnapshotMeasureRequest): # Need to retry measurement task, will remove identifier from # the set so task can be retried in next loop iteration. 
snapshot_identifier = (response_object.trial_id, @@ -739,53 +741,53 @@ def measure_manager_inner_loop(experiment: str, max_cycle: int, request_queue, def get_pool_args(measurers_cpus, runners_cpus): """Return pool args based on measurer cpus and runner cpus arguments.""" - pool_args = () - if measurers_cpus is not None and runners_cpus is not None: - local_experiment = experiment_utils.is_local_experiment() - if not local_experiment: - return (measurers_cpus,) - cores_queue = multiprocessing.Queue() - logger.info('Scheduling measurers from core %d to %d.', runners_cpus, - runners_cpus + measurers_cpus - 1) - for cpu in range(runners_cpus, runners_cpus + measurers_cpus): - cores_queue.put(cpu) - pool_args = (measurers_cpus, _process_init, (cores_queue,)) - return pool_args + if measurers_cpus is None or runners_cpus is None: + return () + + local_experiment = experiment_utils.is_local_experiment() + if not local_experiment: + return (measurers_cpus,) + + cores_queue = multiprocessing.Queue() + logger.info('Scheduling measurers from core %d to %d.', runners_cpus, + runners_cpus + measurers_cpus - 1) + for cpu in range(runners_cpus, runners_cpus + measurers_cpus): + cores_queue.put(cpu) + return (measurers_cpus, _process_init, (cores_queue,)) def measure_manager_loop(experiment: str, max_total_time: int, measurers_cpus=None, - region_coverage=False): - # pylint: disable=too-many-locals + region_coverage=False): # pylint: disable=too-many-locals """Measure manager loop. Creates request and response queues, request measurements tasks from workers, retrieve measurement results from response queue and writes measured snapshots in database.""" logger.info('Starting measure manager loop.') if not measurers_cpus: - logger.info('Number of measurer CPUs not passed as argument. using %d', - multiprocessing.cpu_count()) measurers_cpus = multiprocessing.cpu_count() + logger.info('Number of measurer CPUs not passed as argument. using %d', + measurers_cpus) with multiprocessing.Pool() as pool, multiprocessing.Manager() as manager: logger.info('Setting up coverage binaries') set_up_coverage_binaries(pool, experiment) request_queue = manager.Queue() response_queue = manager.Queue() - # Since each worker is gonna be in forever loop, we dont need result - # return. Workers life scope will end automatically when there are no - # more snapshots left to measure. - logger.info('Starting measure worker loop for %d workers', - measurers_cpus) config = { 'request_queue': request_queue, 'response_queue': response_queue, 'region_coverage': region_coverage, } local_measure_worker = measure_worker.LocalMeasureWorker(config) - measure_trial_coverage_args = [()] * measurers_cpus - _result = pool.starmap_async(local_measure_worker.measure_worker_loop, - measure_trial_coverage_args) + + # Since each worker is going to be in an infinite loop, we dont need + # result return. Workers' life scope will end automatically when there + # are no more snapshots left to measure. + logger.info('Starting measure worker loop for %d workers', + measurers_cpus) + for _ in range(measurers_cpus): + _result = pool.apply_async(local_measure_worker.measure_worker_loop) max_cycle = _time_to_cycle(max_total_time) queued_snapshots = set() @@ -795,7 +797,7 @@ def measure_manager_loop(experiment: str, queued_snapshots) if not continue_inner_loop: break - time.sleep(MEASURE_MANAGER_LOOP_TIMEOUT) + time.sleep(MEASUREMENT_LOOP_WAIT) logger.info('All trials ended. 
Ending measure manager loop') diff --git a/experiment/measurer/measure_worker.py b/experiment/measurer/measure_worker.py index 43561e4e5..1be9a662f 100644 --- a/experiment/measurer/measure_worker.py +++ b/experiment/measurer/measure_worker.py @@ -16,7 +16,7 @@ from typing import Dict, Optional from common import logs from database.models import Snapshot -from experiment.measurer.datatypes import (RetryRequest, SnapshotMeasureRequest) +import experiment.measurer.datatypes as measurer_datatypes from experiment.measurer import measure_manager MEASUREMENT_TIMEOUT = 1 @@ -68,19 +68,21 @@ class LocalMeasureWorker(BaseMeasureWorker): """Class that holds implementations of core methods for running a measure worker locally.""" - def get_task_from_request_queue(self) -> SnapshotMeasureRequest: + def get_task_from_request_queue( + self) -> measurer_datatypes.SnapshotMeasureRequest: """Get item from request multiprocessing queue, block if necessary until an item is available""" request = self.request_queue.get(block=True) return request - def put_result_in_response_queue(self, - measured_snapshot: Optional[Snapshot], - request: SnapshotMeasureRequest): + def put_result_in_response_queue( + self, measured_snapshot: Optional[Snapshot], + request: measurer_datatypes.SnapshotMeasureRequest): if measured_snapshot: logger.info('Put measured snapshot in response_queue') self.response_queue.put(measured_snapshot) else: - retry_request = RetryRequest(request.fuzzer, request.benchmark, - request.trial_id, request.cycle) + retry_request = measurer_datatypes.SnapshotMeasureRequest( + request.fuzzer, request.benchmark, request.trial_id, + request.cycle) self.response_queue.put(retry_request) diff --git a/experiment/measurer/test_measure_manager.py b/experiment/measurer/test_measure_manager.py index b08f5d15b..bb1127565 100644 --- a/experiment/measurer/test_measure_manager.py +++ b/experiment/measurer/test_measure_manager.py @@ -24,9 +24,9 @@ from database import models from database import utils as db_utils from experiment.build import build_utils -from experiment.measurer.datatypes import (RetryRequest, SnapshotMeasureRequest) from experiment.measurer import measure_manager from test_libs import utils as test_utils +import experiment.measurer.datatypes as measurer_datatypes TEST_DATA_PATH = os.path.join(os.path.dirname(__file__), 'test_data') @@ -174,8 +174,8 @@ def test_measure_trial_coverage(mocked_measure_snapshot_coverage, mocked_queue, """Tests that measure_trial_coverage works as expected.""" min_cycle = 1 max_cycle = 10 - measure_request = SnapshotMeasureRequest(FUZZER, BENCHMARK, TRIAL_NUM, - min_cycle) + measure_request = measurer_datatypes.SnapshotMeasureRequest( + FUZZER, BENCHMARK, TRIAL_NUM, min_cycle) measure_manager.measure_trial_coverage(measure_request, max_cycle, mocked_queue(), False) expected_calls = [ @@ -414,8 +414,8 @@ def test_path_exists_in_experiment_filestore(mocked_execute, environ): def test_consume_unmapped_type_from_response_queue(): """Tests the scenario where an unmapped type is retrieved from the response queue. This scenario is not expected to happen, so in this case no snapshots - are returned""" - # Use normal queue here as multiprocessing queue gives flaky tests + are returned.""" + # Use normal queue here as multiprocessing queue gives flaky tests. 
response_queue = queue.Queue() response_queue.put('unexpected string') snapshots = measure_manager.consume_snapshots_from_response_queue( @@ -427,10 +427,11 @@ def test_consume_retry_type_from_response_queue(): """Tests the scenario where a retry object is retrieved from the response queue. In this scenario, we want to remove the snapshot identifier from the queued_snapshots set, as this allows the measurement task to be - retried in the future""" - # Use normal queue here as multiprocessing queue gives flaky tests + retried in the future.""" + # Use normal queue here as multiprocessing queue gives flaky tests. response_queue = queue.Queue() - retry_request_object = RetryRequest('fuzzer', 'benchmark', TRIAL_NUM, CYCLE) + retry_request_object = measurer_datatypes.SnapshotMeasureRequest( + 'fuzzer', 'benchmark', TRIAL_NUM, CYCLE) snapshot_identifier = (TRIAL_NUM, CYCLE) response_queue.put(retry_request_object) queued_snapshots_set = set([snapshot_identifier]) @@ -444,7 +445,7 @@ def test_consume_snapshot_type_from_response_queue(): """Tests the scenario where a measured snapshot is retrieved from the response queue. In this scenario, we want to return the snapshot in the function.""" - # Use normal queue here as multiprocessing queue gives flaky tests + # Use normal queue here as multiprocessing queue gives flaky tests. response_queue = queue.Queue() snapshot_identifier = (TRIAL_NUM, CYCLE) queued_snapshots_set = set([snapshot_identifier]) @@ -460,8 +461,8 @@ def test_consume_snapshot_type_from_response_queue(): def test_measure_manager_inner_loop_break_condition( mocked_get_unmeasured_snapshots): """Tests that the measure manager inner loop returns False when there's no - more snapshots left to be measured""" - # Empty list means no more snapshots left to be measured + more snapshots left to be measured.""" + # Empty list means no more snapshots left to be measured. 
mocked_get_unmeasured_snapshots.return_value = [] request_queue = queue.Queue() response_queue = queue.Queue() @@ -477,9 +478,9 @@ def test_measure_manager_inner_loop_writes_to_request_queue( mocked_consume_snapshots_from_response_queue, mocked_get_unmeasured_snapshots): """Tests that the measure manager inner loop is writing measurement tasks to - request queue""" + request queue.""" mocked_get_unmeasured_snapshots.return_value = [ - SnapshotMeasureRequest('fuzzer', 'benchmark', 0, 0) + measurer_datatypes.SnapshotMeasureRequest('fuzzer', 'benchmark', 0, 0) ] mocked_consume_snapshots_from_response_queue.return_value = [] request_queue = queue.Queue() @@ -497,9 +498,9 @@ def test_measure_manager_inner_loop_dont_write_to_db( mocked_add_all, mocked_consume_snapshots_from_response_queue, mocked_get_unmeasured_snapshots): """Tests that the measure manager inner loop does not call add_all to write - to the database, when there are no measured snapshots to be written""" + to the database, when there are no measured snapshots to be written.""" mocked_get_unmeasured_snapshots.return_value = [ - SnapshotMeasureRequest('fuzzer', 'benchmark', 0, 0) + measurer_datatypes.SnapshotMeasureRequest('fuzzer', 'benchmark', 0, 0) ] request_queue = queue.Queue() response_queue = queue.Queue() @@ -517,9 +518,9 @@ def test_measure_manager_inner_loop_writes_to_db( mocked_add_all, mocked_consume_snapshots_from_response_queue, mocked_get_unmeasured_snapshots): """Tests that the measure manager inner loop calls add_all to write - to the database, when there are measured snapshots to be written""" + to the database, when there are measured snapshots to be written.""" mocked_get_unmeasured_snapshots.return_value = [ - SnapshotMeasureRequest('fuzzer', 'benchmark', 0, 0) + measurer_datatypes.SnapshotMeasureRequest('fuzzer', 'benchmark', 0, 0) ] request_queue = queue.Queue() response_queue = queue.Queue() diff --git a/experiment/measurer/test_measure_worker.py b/experiment/measurer/test_measure_worker.py index f4fc4b4c7..edff0adca 100644 --- a/experiment/measurer/test_measure_worker.py +++ b/experiment/measurer/test_measure_worker.py @@ -16,8 +16,8 @@ import pytest from database.models import Snapshot -from experiment.measurer.datatypes import (RetryRequest, SnapshotMeasureRequest) from experiment.measurer import measure_worker +import experiment.measurer.datatypes as measurer_datatypes @pytest.fixture @@ -37,7 +37,8 @@ def local_measure_worker(): def test_put_snapshot_in_response_queue(local_measure_worker): # pylint: disable=redefined-outer-name """Tests the scenario where measure_snapshot is not None, so snapshot is put in response_queue""" - request = SnapshotMeasureRequest('fuzzer', 'benchmark', 1, 0) + request = measurer_datatypes.SnapshotMeasureRequest('fuzzer', 'benchmark', + 1, 0) snapshot = Snapshot(trial_id=1) local_measure_worker.put_result_in_response_queue(snapshot, request) response_queue = local_measure_worker.response_queue @@ -48,9 +49,11 @@ def test_put_snapshot_in_response_queue(local_measure_worker): # pylint: disabl def test_put_retry_in_response_queue(local_measure_worker): # pylint: disable=redefined-outer-name """Tests the scenario where measure_snapshot is None, so task needs to be retried""" - request = SnapshotMeasureRequest('fuzzer', 'benchmark', 1, 0) + request = measurer_datatypes.SnapshotMeasureRequest('fuzzer', 'benchmark', + 1, 0) snapshot = None local_measure_worker.put_result_in_response_queue(snapshot, request) response_queue = local_measure_worker.response_queue assert 
response_queue.qsize() == 1 - assert isinstance(response_queue.get(), RetryRequest) + assert isinstance(response_queue.get(), + measurer_datatypes.SnapshotMeasureRequest) From 503a1b123b872d5342fbb05f0ceacdecdfc1ef94 Mon Sep 17 00:00:00 2001 From: gustavogaldinoo Date: Thu, 27 Jun 2024 15:24:45 +0000 Subject: [PATCH 8/8] Recreating retry request datatype --- experiment/measurer/datatypes.py | 3 +++ experiment/measurer/measure_manager.py | 3 +-- experiment/measurer/measure_worker.py | 2 +- experiment/measurer/test_measure_manager.py | 2 +- experiment/measurer/test_measure_worker.py | 6 ++---- 5 files changed, 8 insertions(+), 8 deletions(-) diff --git a/experiment/measurer/datatypes.py b/experiment/measurer/datatypes.py index df4eea398..21415b336 100644 --- a/experiment/measurer/datatypes.py +++ b/experiment/measurer/datatypes.py @@ -16,3 +16,6 @@ SnapshotMeasureRequest = collections.namedtuple( 'SnapshotMeasureRequest', ['fuzzer', 'benchmark', 'trial_id', 'cycle']) + +RetryRequest = collections.namedtuple( + 'RetryRequest', ['fuzzer', 'benchmark', 'trial_id', 'cycle']) diff --git a/experiment/measurer/measure_manager.py b/experiment/measurer/measure_manager.py index a6ec60e51..288148401 100644 --- a/experiment/measurer/measure_manager.py +++ b/experiment/measurer/measure_manager.py @@ -680,8 +680,7 @@ def consume_snapshots_from_response_queue( while True: try: response_object = response_queue.get_nowait() - if isinstance(response_object, - measurer_datatypes.SnapshotMeasureRequest): + if isinstance(response_object, measurer_datatypes.RetryRequest): # Need to retry measurement task, will remove identifier from # the set so task can be retried in next loop iteration. snapshot_identifier = (response_object.trial_id, diff --git a/experiment/measurer/measure_worker.py b/experiment/measurer/measure_worker.py index 1be9a662f..cfa033d06 100644 --- a/experiment/measurer/measure_worker.py +++ b/experiment/measurer/measure_worker.py @@ -82,7 +82,7 @@ def put_result_in_response_queue( logger.info('Put measured snapshot in response_queue') self.response_queue.put(measured_snapshot) else: - retry_request = measurer_datatypes.SnapshotMeasureRequest( + retry_request = measurer_datatypes.RetryRequest( request.fuzzer, request.benchmark, request.trial_id, request.cycle) self.response_queue.put(retry_request) diff --git a/experiment/measurer/test_measure_manager.py b/experiment/measurer/test_measure_manager.py index bb1127565..7b6521869 100644 --- a/experiment/measurer/test_measure_manager.py +++ b/experiment/measurer/test_measure_manager.py @@ -430,7 +430,7 @@ def test_consume_retry_type_from_response_queue(): retried in the future.""" # Use normal queue here as multiprocessing queue gives flaky tests. 
response_queue = queue.Queue() - retry_request_object = measurer_datatypes.SnapshotMeasureRequest( + retry_request_object = measurer_datatypes.RetryRequest( 'fuzzer', 'benchmark', TRIAL_NUM, CYCLE) snapshot_identifier = (TRIAL_NUM, CYCLE) response_queue.put(retry_request_object) diff --git a/experiment/measurer/test_measure_worker.py b/experiment/measurer/test_measure_worker.py index edff0adca..4e5bd7b05 100644 --- a/experiment/measurer/test_measure_worker.py +++ b/experiment/measurer/test_measure_worker.py @@ -49,11 +49,9 @@ def test_put_snapshot_in_response_queue(local_measure_worker): # pylint: disabl def test_put_retry_in_response_queue(local_measure_worker): # pylint: disable=redefined-outer-name """Tests the scenario where measure_snapshot is None, so task needs to be retried""" - request = measurer_datatypes.SnapshotMeasureRequest('fuzzer', 'benchmark', - 1, 0) + request = measurer_datatypes.RetryRequest('fuzzer', 'benchmark', 1, 0) snapshot = None local_measure_worker.put_result_in_response_queue(snapshot, request) response_queue = local_measure_worker.response_queue assert response_queue.qsize() == 1 - assert isinstance(response_queue.get(), - measurer_datatypes.SnapshotMeasureRequest) + assert isinstance(response_queue.get(), measurer_datatypes.RetryRequest)
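
For readers following the queue-based design introduced across these patches, the sketch below shows the request/response protocol end to end in one runnable file. It is an illustration only, not FuzzBench code: the namedtuples mirror experiment/measurer/datatypes.py, but fake_measure(), the thread-based workers, the 'afl'/'zlib' arguments and the sentinel shutdown are assumptions made for this example; the real manager and workers use multiprocessing queues, measure actual coverage, and persist Snapshot rows to the database.

# Illustrative sketch only -- mirrors the request/response queue protocol in
# these patches; fake_measure() and the threading setup are stand-ins, not
# FuzzBench code.
import collections
import queue
import threading
import time

SnapshotMeasureRequest = collections.namedtuple(
    'SnapshotMeasureRequest', ['fuzzer', 'benchmark', 'trial_id', 'cycle'])
RetryRequest = collections.namedtuple(
    'RetryRequest', ['fuzzer', 'benchmark', 'trial_id', 'cycle'])


def fake_measure(request):
    """Stand-in for real measurement; odd cycles simulate a missing corpus."""
    if request.cycle % 2:
        return None
    return ('snapshot', request.trial_id, request.cycle)


def worker_loop(request_queue, response_queue):
    """Worker: block on the request queue, answer with a snapshot or a retry."""
    while True:
        request = request_queue.get()
        if request is None:  # Sentinel used by this sketch to stop the worker.
            return
        snapshot = fake_measure(request)
        if snapshot is not None:
            response_queue.put(snapshot)
        else:
            response_queue.put(RetryRequest(*request))


def consume_responses(response_queue, queued_snapshots):
    """Manager side: drain responses; a retry frees its (trial_id, cycle)
    identifier so the same snapshot can be queued again next iteration."""
    measured = []
    while True:
        try:
            response = response_queue.get_nowait()
        except queue.Empty:
            return measured
        if isinstance(response, RetryRequest):
            queued_snapshots.discard((response.trial_id, response.cycle))
        else:
            measured.append(response)


if __name__ == '__main__':
    request_queue, response_queue = queue.Queue(), queue.Queue()
    workers = [
        threading.Thread(target=worker_loop,
                         args=(request_queue, response_queue))
        for _ in range(2)
    ]
    for thread in workers:
        thread.start()

    queued_snapshots = set()
    for cycle in (0, 1, 2):
        identifier = (1, cycle)
        if identifier not in queued_snapshots:
            request_queue.put(SnapshotMeasureRequest('afl', 'zlib', 1, cycle))
            queued_snapshots.add(identifier)

    time.sleep(0.5)  # The real manager instead polls on a loop timeout.
    print(consume_responses(response_queue, queued_snapshots))

    for _ in workers:
        request_queue.put(None)
    for thread in workers:
        thread.join()

The invariant this sketch exercises is the same one the tests above check: a RetryRequest removes its (trial_id, cycle) identifier from queued_snapshots, so the manager's next inner-loop pass can re-queue that measurement instead of silently dropping it.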