Adding local implementation for queue based measuring #1998
Thanks, @gustavogaldinoo!
Could you please have a look at the presubmit failure or run make format?
    return measured_snapshots

def measure_manager_inner_loop(experiment: str, max_cycle: int, request_queue,
                               response_queue, queued_snapshots):
nit: This is not your mistake but rather a common confusion caused by legacy FuzzBench code.
We have type hints for some functions but not for others. Sometimes the same inconsistency shows up at the parameter level.
It would be great to type-hint the new code, if it is not too much trouble.
It's also low priority, so feel free to leave it for later.
For the queues specifically, I will probably try to come up with a more generic type in the future, as we'll use it for local experiments and also for cloud experiments, with a cloud type of queue.
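A minimal sketch of what the type-hinted signature could look like, assuming a structural queue type. `MessageQueue` is a hypothetical name that does not exist in FuzzBench, and the element type of `queued_snapshots` and the `None` return type are assumptions, since the function body is not shown in this thread.

```python
import queue
from typing import Any, Hashable, Optional, Protocol, Set


class MessageQueue(Protocol):
    """Hypothetical minimal queue interface; not part of FuzzBench."""

    def put(self, item: Any) -> None:
        ...

    def get(self, block: bool = True, timeout: Optional[float] = None) -> Any:
        ...


def measure_manager_inner_loop(experiment: str, max_cycle: int,
                               request_queue: MessageQueue,
                               response_queue: MessageQueue,
                               queued_snapshots: Set[Hashable]) -> None:
    """Signature-only stub: the real body lives in the PR."""


# queue.Queue has compatible put/get methods, so it fits the protocol.
measure_manager_inner_loop('my-experiment', 10, queue.Queue(), queue.Queue(),
                           set())
```

A protocol like this would let a local queue and a future cloud-backed queue share one annotation without a common base class.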
@@ -44,20 +44,21 @@
from database import models
from experiment.build import build_utils
from experiment.measurer import coverage_utils
from experiment.measurer import datatypes
Let's import this as experiment.measurer.datatypes.
What's the thought process behind this?
datatypes can mean anything. Importing it as experiment.measurer.datatypes makes it much clearer what it contains.
NUM_RETRIES = 3
RETRY_DELAY = 3
FAIL_WAIT_SECONDS = 30
SNAPSHOT_QUEUE_GET_TIMEOUT = 1
SNAPSHOTS_BATCH_SAVE_SIZE = 100
NUM_WORKERS = 4
Why hardcode this?
Hmmm, I started like this because it was easier, and forgot to go back and change it.
Should it be an argument passed when starting the experiment? Any tips on possible default values?
                      region_coverage)
    local_experiment = experiment_utils.is_local_experiment()
    if local_experiment:
        measure_manager_loop(experiment, max_total_time, measurers_cpus,
Is the goal to start with this only in local experiments? Why?
I thought we would have to wait until we had implemented pub/sub queues before using this for non-local experiments, but I guess we can already allow it for both local and non-local experiments, right?
Yeah, it should be able to run in prod. But I guess the disadvantage of doing so is that once we have pub/sub queues, we will never need to run this in prod again.
Your choice: we will get better testing if we run this in prod, but it may reveal more problems than we care to fix.
    else:
        pool_args = (measurers_cpus,)
Let's do this condition first, and then return to avoid so much nesting.
    return pool_args


# pylint: disable=too-many-locals
This should be on line 771, otherwise it is applied to the rest of the file.
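For illustration, a small sketch of the two placements. Pylint applies a disable comment that sits on its own line to the rest of the enclosing scope, which at module level means the rest of the file. A trailing comment on the def line only covers that line, which is where too-many-locals is reported. The function name below is hypothetical.

```python
# A pragma on its own line at module level, such as
#     # pylint: disable=too-many-locals
# stays active for every function defined after it in the file.


# A trailing pragma on the def line is limited to this one function.
def summarize(values):  # pylint: disable=too-many-locals
    """Hypothetical function; only the pragma placement matters here."""
    total = sum(values)
    count = len(values)
    return total / count if count else 0.0
```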
        case _:
            logger.error('Type of response object not mapped! %s',
                         type(response_object))
    if isinstance(response_object, datatypes.ReescheduleRequest):
I think it makes more sense to have one data type, with a field to request reschedule.
This might be a little tricky to do, because before we had reschedules, some of our functions returned a models.Snapshot type. We might not want to add a reschedule field to it, right?
No, but we can return the response object instead, can't we?
            logger.error('Type of response object not mapped! %s',
                         type(response_object))
    if isinstance(response_object, datatypes.ReescheduleRequest):
        # Need to reeschedule measurement task, remove from the set
Let's avoid using the word "schedule" since it means something else in the scheduler. Maybe "retry".
Also, please end comments with a period.
And why are we removing it from the set? That is the more important thing for the comment to explain: I can see the line removing it, but the "why" is the important part.
Force-pushed from a3c43d2 to bf47d79.
SnapshotMeasureRequest = collections.namedtuple(
    'SnapshotMeasureRequest', ['fuzzer', 'benchmark', 'trial_id', 'cycle'])

RetryRequest = collections.namedtuple(
Why didn't we make a single datatype?
That's a good idea. I thought it would be more explicit to create a datatype specifically for the retry, but since both of them are measurement requests and hold the same fields, I guess it's not necessary. I just removed the RetryRequest datatype.
Oh shoot. I messed up here.
I think there should be a response object that includes a snapshot or a retry bool.
But response and request should be different. Sorry.
After you undo this, you can land.
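A rough sketch of the split being asked for: requests keep their own type, and a separate response type carries either a snapshot or a retry flag. `SnapshotMeasureResponse`, `handle_response`, and the `(trial_id, cycle)` key are hypothetical names and choices made for illustration, not code from the PR.

```python
import collections

# Request type, as it appears in the diff under review.
SnapshotMeasureRequest = collections.namedtuple(
    'SnapshotMeasureRequest', ['fuzzer', 'benchmark', 'trial_id', 'cycle'])

# Hypothetical response type: a snapshot on success, or retry=True.
SnapshotMeasureResponse = collections.namedtuple(
    'SnapshotMeasureResponse', ['snapshot', 'retry'])


def handle_response(response, request, queued_snapshots, measured_snapshots):
    """Records a measured snapshot, or makes the task eligible for a retry."""
    if response.retry:
        # Forget the task so a later loop iteration can enqueue it again.
        # The (trial_id, cycle) key is an assumption about the identifier.
        queued_snapshots.discard((request.trial_id, request.cycle))
        return
    measured_snapshots.append(response.snapshot)
```

With one response type, the manager no longer needs an isinstance check to tell a retry apart from a measured snapshot.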
@@ -44,20 +44,20 @@
from database import models
from experiment.build import build_utils
from experiment.measurer import coverage_utils
from experiment.measurer.datatypes import (RetryRequest, SnapshotMeasureRequest)
No, please don't import classes or functions. We import modules. See the style guide.
Also, can you import like this:
import experiment.measurer.datatypes as measurer_datatypes
Oops, sorry. I thought this was what you meant in an earlier comment. I just changed it to the import you suggested.
NUM_RETRIES = 3
RETRY_DELAY = 3
FAIL_WAIT_SECONDS = 30
SNAPSHOT_QUEUE_GET_TIMEOUT = 1
SNAPSHOTS_BATCH_SAVE_SIZE = 100
MEASURE_MANAGER_LOOP_TIMEOUT = 10
This is not well named.
Do you have any suggestions? I changed it to "MEASUREMENT_LOOP_WAIT" now, but I'm not sure if that's any better.
Yes, WAIT is better.
    request_queue = manager.Queue()
    response_queue = manager.Queue()

    # Since each worker is gonna be in forever loop, we dont need result
Can you write "going to be in an infinite loop"?
    response_queue = manager.Queue()

    # Since each worker is gonna be in forever loop, we dont need result
    # return. Workers life scope will end automatically when there are no
Workers'
    }
    local_measure_worker = measure_worker.LocalMeasureWorker(config)
    measure_trial_coverage_args = [()] * measurers_cpus
    _result = pool.starmap_async(local_measure_worker.measure_worker_loop,
You don't need map if you are not passing any arguments.
Changed to pool.apply_async calls!
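A self-contained sketch of the apply_async variant, one call per worker instead of mapping over a list of empty argument tuples. To keep it runnable anywhere it swaps in `multiprocessing.pool.ThreadPool`, which exposes the same `apply_async` method as `multiprocessing.Pool`. The toy worker loop, the `None` sentinel, and `run_workers` are assumptions made so that the example terminates; they are not the PR's code.

```python
import queue
from multiprocessing.pool import ThreadPool


def measure_worker_loop(request_queue, response_queue):
    """Toy worker loop: echoes requests until a None sentinel arrives."""
    while True:
        request = request_queue.get()
        if request is None:
            break
        response_queue.put(('measured', request))


def run_workers(num_workers, requests):
    """Starts num_workers loops, feeds them requests, returns responses."""
    request_queue = queue.Queue()
    response_queue = queue.Queue()
    with ThreadPool(num_workers) as pool:
        # One apply_async call per worker; there is nothing to map over.
        results = [
            pool.apply_async(measure_worker_loop,
                             (request_queue, response_queue))
            for _ in range(num_workers)
        ]
        for request in requests:
            request_queue.put(request)
        for _ in range(num_workers):
            request_queue.put(None)  # Sentinel: stop one worker each.
        for result in results:
            result.get(timeout=10)  # Re-raises any worker exception.
    responses = []
    while not response_queue.empty():
        responses.append(response_queue.get())
    return responses
```

Keeping the AsyncResult objects around is optional, but calling get() on them is one way to surface exceptions raised inside a worker.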
    if not measurers_cpus:
        logger.info('Number of measurer CPUs not passed as argument. using %d',
                    multiprocessing.cpu_count())
        measurers_cpus = multiprocessing.cpu_count()
Take this variable and use it in the log instead of calling the function again.
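One way to apply this, sketched with a hypothetical helper name (`resolve_measurers_cpus` is not in the PR): assign the default first, then log the variable, so cpu_count() is called once.

```python
import logging
import multiprocessing

logger = logging.getLogger(__name__)


def resolve_measurers_cpus(measurers_cpus=None):
    """Returns the measurer CPU count, defaulting to all available CPUs."""
    if not measurers_cpus:
        measurers_cpus = multiprocessing.cpu_count()
        # Log the variable instead of calling cpu_count() a second time.
        logger.info('Number of measurer CPUs not passed as argument. Using %d.',
                    measurers_cpus)
    return measurers_cpus
```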
def get_pool_args(measurers_cpus, runners_cpus):
    """Return pool args based on measurer cpus and runner cpus arguments."""
    pool_args = ()
    if measurers_cpus is not None and runners_cpus is not None:
Change this so we return early if they are None, and then we can have less nesting.
Actually, I think we no longer need that function. It's currently only being used in the old measure_loop method. Should I remove it?
I can fix the nesting nonetheless.
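A sketch of the early-return shape, assuming the rest of the function can stay as it is. The final return is a placeholder, because the nested body is not shown in full in this thread.

```python
from typing import Optional, Tuple


def get_pool_args(measurers_cpus: Optional[int],
                  runners_cpus: Optional[int]) -> Tuple:
    """Return pool args based on measurer cpus and runner cpus arguments."""
    # Guard clause: bail out first so the main path needs no extra nesting.
    if measurers_cpus is None or runners_cpus is None:
        return ()

    # Placeholder for the original nested body, which the diff does not show
    # in full. Returning both values here is purely illustrative.
    return (measurers_cpus, runners_cpus)
```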
    response queue. In this scenario, we want to remove the snapshot identifier
    from the queued_snapshots set, as this allows the measurement task to be
    retried in the future"""
    # Use normal queue here as multiprocessing queue gives flaky tests
End sentences with a period.
@@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for measure_manager.py."""

import os
These look like good tests!
Thx!
Adding local implementation for queue based measuring and tests