-
Notifications
You must be signed in to change notification settings - Fork 277
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adding local implementation for queue based measuring #1998
Changes from 6 commits
0afcea2
748b47d
4570abb
77f1aa6
bf47d79
4d7d0ac
db0e45c
503a1b1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
# Copyright 2024 Google LLC | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
"""Module for common data types shared under the measurer module.""" | ||
import collections | ||
|
||
# Describes one snapshot measurement task: the fuzzer/benchmark pair plus the
# trial and cycle identifying the snapshot.
SnapshotMeasureRequest = collections.namedtuple(
    'SnapshotMeasureRequest', ['fuzzer', 'benchmark', 'trial_id', 'cycle'])

# Placed on the response queue by a worker when a measurement produced no
# snapshot, so the task can be scheduled again. Field names deliberately mirror
# SnapshotMeasureRequest (the original misspelled 'benchmark' here, which made
# the two types inconsistent).
RetryRequest = collections.namedtuple(
    'RetryRequest', ['fuzzer', 'benchmark', 'trial_id', 'cycle'])
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -44,20 +44,20 @@ | |
from database import models | ||
from experiment.build import build_utils | ||
from experiment.measurer import coverage_utils | ||
from experiment.measurer.datatypes import (RetryRequest, SnapshotMeasureRequest) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No please don't import classes or functions. We import modules. See the styleguide. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oops, sorry. I thought this is what you meant before in another comment. Just changed it to the import you suggested now |
||
from experiment.measurer import measure_worker | ||
from experiment.measurer import run_coverage | ||
from experiment.measurer import run_crashes | ||
from experiment import scheduler | ||
|
||
logger = logs.Logger() | ||
|
||
SnapshotMeasureRequest = collections.namedtuple( | ||
'SnapshotMeasureRequest', ['fuzzer', 'benchmark', 'trial_id', 'cycle']) | ||
|
||
NUM_RETRIES = 3 | ||
RETRY_DELAY = 3 | ||
FAIL_WAIT_SECONDS = 30 | ||
SNAPSHOT_QUEUE_GET_TIMEOUT = 1 | ||
SNAPSHOTS_BATCH_SAVE_SIZE = 100 | ||
MEASURE_MANAGER_LOOP_TIMEOUT = 10 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is not well named. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you have any suggestions? I changed it to "MEASUREMENT_LOOP_WAIT" now, but I'm not sure if that's any better. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, WAIT is better. |
||
|
||
|
||
def exists_in_experiment_filestore(path: pathlib.Path) -> bool: | ||
|
@@ -75,10 +75,9 @@ def measure_main(experiment_config): | |
experiment = experiment_config['experiment'] | ||
max_total_time = experiment_config['max_total_time'] | ||
measurers_cpus = experiment_config['measurers_cpus'] | ||
runners_cpus = experiment_config['runners_cpus'] | ||
region_coverage = experiment_config['region_coverage'] | ||
measure_loop(experiment, max_total_time, measurers_cpus, runners_cpus, | ||
region_coverage) | ||
measure_manager_loop(experiment, max_total_time, measurers_cpus, | ||
region_coverage) | ||
|
||
# Clean up resources. | ||
gc.collect() | ||
|
@@ -104,18 +103,7 @@ def measure_loop(experiment: str, | |
"""Continuously measure trials for |experiment|.""" | ||
logger.info('Start measure_loop.') | ||
|
||
pool_args = () | ||
if measurers_cpus is not None and runners_cpus is not None: | ||
local_experiment = experiment_utils.is_local_experiment() | ||
if local_experiment: | ||
cores_queue = multiprocessing.Queue() | ||
logger.info('Scheduling measurers from core %d to %d.', | ||
runners_cpus, runners_cpus + measurers_cpus - 1) | ||
for cpu in range(runners_cpus, runners_cpus + measurers_cpus): | ||
cores_queue.put(cpu) | ||
pool_args = (measurers_cpus, _process_init, (cores_queue,)) | ||
else: | ||
pool_args = (measurers_cpus,) | ||
pool_args = get_pool_args(measurers_cpus, runners_cpus) | ||
|
||
with multiprocessing.Pool( | ||
*pool_args) as pool, multiprocessing.Manager() as manager: | ||
|
@@ -683,6 +671,134 @@ def initialize_logs(): | |
}) | ||
|
||
|
||
def consume_snapshots_from_response_queue(
        response_queue, queued_snapshots) -> List[models.Snapshot]:
    """Drain |response_queue| without blocking and return the measured
    snapshots found in it.

    For every RetryRequest found in the queue, the (trial_id, cycle)
    identifier is removed from |queued_snapshots| so the snapshot can be
    queued again on a later iteration. Objects of any other type are logged as
    errors and dropped.
    """
    measured_snapshots = []
    while True:
        # Keep the try body minimal: get_nowait() is the only call expected to
        # raise queue.Empty, which signals that the queue has been drained.
        try:
            response_object = response_queue.get_nowait()
        except queue.Empty:
            break

        if isinstance(response_object, RetryRequest):
            # Need to retry measurement task, will remove identifier from
            # the set so task can be retried in next loop iteration.
            snapshot_identifier = (response_object.trial_id,
                                   response_object.cycle)
            queued_snapshots.remove(snapshot_identifier)
            logger.info('Rescheduling task for trial %s and cycle %s',
                        response_object.trial_id, response_object.cycle)
        elif isinstance(response_object, models.Snapshot):
            measured_snapshots.append(response_object)
        else:
            logger.error('Type of response object not mapped! %s',
                         type(response_object))
    return measured_snapshots
|
||
|
||
def measure_manager_inner_loop(experiment: str, max_cycle: int, request_queue, | ||
response_queue, queued_snapshots): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: This is not your mistake but rather a common confusion caused by legacy FuzzBench code. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For the queues specifically, I will probably try to come up with a more generic type in the future, as we'll use it for local experiments and also for cloud experiments, with a cloud type of queue. |
||
"""Reads from database to determine which snapshots needs measuring. Write | ||
measurements tasks to request queue, get results from response queue, and | ||
write measured snapshots to database. Returns False if there's no more | ||
snapshots left to be measured""" | ||
initialize_logs() | ||
# Read database to determine which snapshots needs measuring. | ||
unmeasured_snapshots = get_unmeasured_snapshots(experiment, max_cycle) | ||
logger.info('Retrieved %d unmeasured snapshots from measure manager', | ||
len(unmeasured_snapshots)) | ||
# When there are no more snapshots left to be measured, should break loop. | ||
if not unmeasured_snapshots: | ||
return False | ||
|
||
# Write measurements requests to request queue | ||
for unmeasured_snapshot in unmeasured_snapshots: | ||
# No need to insert fuzzer and benchmark info here as it's redundant | ||
# (Can be retrieved through trial_id). | ||
unmeasured_snapshot_identifier = (unmeasured_snapshot.trial_id, | ||
unmeasured_snapshot.cycle) | ||
# Checking if snapshot already was queued so workers will not repeat | ||
# measurement for same snapshot | ||
if unmeasured_snapshot_identifier not in queued_snapshots: | ||
request_queue.put(unmeasured_snapshot) | ||
queued_snapshots.add(unmeasured_snapshot_identifier) | ||
|
||
# Read results from response queue. | ||
measured_snapshots = consume_snapshots_from_response_queue( | ||
response_queue, queued_snapshots) | ||
logger.info('Retrieved %d measured snapshots from response queue', | ||
len(measured_snapshots)) | ||
|
||
# Save measured snapshots to database. | ||
if measured_snapshots: | ||
db_utils.add_all(measured_snapshots) | ||
|
||
return True | ||
|
||
|
||
def get_pool_args(measurers_cpus, runners_cpus): | ||
"""Return pool args based on measurer cpus and runner cpus arguments.""" | ||
pool_args = () | ||
if measurers_cpus is not None and runners_cpus is not None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Change this so we return early if they are None, and then we can have less nesting. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually, I think we no longer need that function; it's currently only being used in the old measure_loop method. Should I remove them? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can fix the nesting nonetheless. |
||
local_experiment = experiment_utils.is_local_experiment() | ||
if not local_experiment: | ||
return (measurers_cpus,) | ||
cores_queue = multiprocessing.Queue() | ||
logger.info('Scheduling measurers from core %d to %d.', runners_cpus, | ||
runners_cpus + measurers_cpus - 1) | ||
for cpu in range(runners_cpus, runners_cpus + measurers_cpus): | ||
cores_queue.put(cpu) | ||
pool_args = (measurers_cpus, _process_init, (cores_queue,)) | ||
return pool_args | ||
|
||
|
||
def measure_manager_loop(experiment: str, | ||
max_total_time: int, | ||
measurers_cpus=None, | ||
region_coverage=False): | ||
# pylint: disable=too-many-locals | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it has to go on the same line as the colon. |
||
"""Measure manager loop. Creates request and response queues, request | ||
measurements tasks from workers, retrieve measurement results from response | ||
queue and writes measured snapshots in database.""" | ||
logger.info('Starting measure manager loop.') | ||
if not measurers_cpus: | ||
logger.info('Number of measurer CPUs not passed as argument. using %d', | ||
multiprocessing.cpu_count()) | ||
measurers_cpus = multiprocessing.cpu_count() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Take this variable and use it in the log instead of calling the function again. |
||
with multiprocessing.Pool() as pool, multiprocessing.Manager() as manager: | ||
logger.info('Setting up coverage binaries') | ||
set_up_coverage_binaries(pool, experiment) | ||
request_queue = manager.Queue() | ||
response_queue = manager.Queue() | ||
|
||
# Since each worker is gonna be in forever loop, we dont need result | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you write "going to be in an infinite loop" |
||
# return. Workers life scope will end automatically when there are no | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
# more snapshots left to measure. | ||
logger.info('Starting measure worker loop for %d workers', | ||
measurers_cpus) | ||
config = { | ||
'request_queue': request_queue, | ||
'response_queue': response_queue, | ||
'region_coverage': region_coverage, | ||
} | ||
local_measure_worker = measure_worker.LocalMeasureWorker(config) | ||
measure_trial_coverage_args = [()] * measurers_cpus | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is this? |
||
_result = pool.starmap_async(local_measure_worker.measure_worker_loop, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You don't need map if you are not passing any arguments. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Changed to pool.apply_async calls! |
||
measure_trial_coverage_args) | ||
|
||
max_cycle = _time_to_cycle(max_total_time) | ||
queued_snapshots = set() | ||
while not scheduler.all_trials_ended(experiment): | ||
continue_inner_loop = measure_manager_inner_loop( | ||
experiment, max_cycle, request_queue, response_queue, | ||
queued_snapshots) | ||
if not continue_inner_loop: | ||
break | ||
time.sleep(MEASURE_MANAGER_LOOP_TIMEOUT) | ||
logger.info('All trials ended. Ending measure manager loop') | ||
|
||
|
||
def main(): | ||
"""Measure the experiment.""" | ||
initialize_logs() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
# Copyright 2024 Google LLC | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
"""Module for measurer workers logic.""" | ||
import time | ||
from typing import Dict, Optional | ||
from common import logs | ||
from database.models import Snapshot | ||
from experiment.measurer.datatypes import (RetryRequest, SnapshotMeasureRequest) | ||
from experiment.measurer import measure_manager | ||
|
||
MEASUREMENT_TIMEOUT = 1 | ||
logger = logs.Logger() # pylint: disable=invalid-name | ||
|
||
|
||
class BaseMeasureWorker:
    """Base class for measure worker. Encapsulates core methods that will be
    implemented for Local and Google Cloud measure workers."""

    def __init__(self, config: Dict):
        """Store the queues and coverage setting read from |config|.

        |config| must provide the keys 'request_queue', 'response_queue' and
        'region_coverage'; a missing key raises KeyError.
        """
        self.request_queue = config['request_queue']
        self.response_queue = config['response_queue']
        self.region_coverage = config['region_coverage']

    def get_task_from_request_queue(self):
        """Get task from request queue. Must be implemented by subclasses."""
        raise NotImplementedError

    def put_result_in_response_queue(self, measured_snapshot, request):
        """Save measurement result in response queue, for the measure manager
        to retrieve. Must be implemented by subclasses."""
        raise NotImplementedError

    def measure_worker_loop(self):
        """Periodically retrieves request from request queue, measure it, and
        put result in response queue. This loop never returns."""
        logs.initialize(default_extras={
            'component': 'measurer',
            'subcomponent': 'worker',
        })
        logger.info('Starting one measure worker loop')
        while True:
            # 'SnapshotMeasureRequest', ['fuzzer', 'benchmark', 'trial_id',
            # 'cycle']
            request = self.get_task_from_request_queue()
            logger.info(
                'Measurer worker: Got request %s %s %d %d from request queue',
                request.fuzzer, request.benchmark, request.trial_id,
                request.cycle)
            measured_snapshot = measure_manager.measure_snapshot_coverage(
                request.fuzzer, request.benchmark, request.trial_id,
                request.cycle, self.region_coverage)
            # The request is handed over together with the result so the
            # subclass can build its response from the original task.
            self.put_result_in_response_queue(measured_snapshot, request)
            time.sleep(MEASUREMENT_TIMEOUT)
|
||
|
||
class LocalMeasureWorker(BaseMeasureWorker):
    """Measure worker for local runs: implements the core queue methods of
    BaseMeasureWorker on top of the request and response queues it was
    configured with."""

    def get_task_from_request_queue(self) -> SnapshotMeasureRequest:
        """Return the next item from the request queue, blocking until one is
        available."""
        return self.request_queue.get(block=True)

    def put_result_in_response_queue(self,
                                     measured_snapshot: Optional[Snapshot],
                                     request: SnapshotMeasureRequest):
        """Put |measured_snapshot| in the response queue, or a RetryRequest
        built from |request| when there is no snapshot to report."""
        if not measured_snapshot:
            # Nothing was measured: report the task back so it can be retried.
            retry = RetryRequest(request.fuzzer, request.benchmark,
                                 request.trial_id, request.cycle)
            self.response_queue.put(retry)
            return

        logger.info('Put measured snapshot in response_queue')
        self.response_queue.put(measured_snapshot)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why didn't we make a single datatype?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a good idea. I thought it would be more explicit to create a datatype specifically for the retry, but since both of them are measurement requests, and hold the same fields, I guess it's not necessary. Just removed the RetryRequest datatype
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh shoot. I messed up here.
I think there should be a response object that includes a snapshot or retry bool.
But response and request should be different. Sorry.
After you undo this, you can land.