From 05d36685fcd450de8b78ef21287d9f1816fa10ac Mon Sep 17 00:00:00 2001 From: gustavogaldinoo Date: Thu, 22 Aug 2024 15:44:26 +0000 Subject: [PATCH] Refactoring gcloud workers to initialize a subscriber and publisher client for each worker --- experiment/measurer/measure_manager.py | 32 +++++++++++++++------ experiment/measurer/measure_worker.py | 11 +++---- experiment/measurer/test_measure_manager.py | 5 +++- 3 files changed, 33 insertions(+), 15 deletions(-) diff --git a/experiment/measurer/measure_manager.py b/experiment/measurer/measure_manager.py index c9b08041f..467222872 100644 --- a/experiment/measurer/measure_manager.py +++ b/experiment/measurer/measure_manager.py @@ -915,7 +915,7 @@ def initialize_queues(self, manager) -> Tuple[Optional[str], Optional[str]]: def _create_response_queue_subscription(self): """Creates a new Pub/Sub subscription for the response queue.""" try: - subscription = self.subscriber_client.create_subscription( + subscription = pubsub_v1.SubscriberClient().create_subscription( request={ 'name': self.subscription_path, 'topic': self.response_queue_topic_path @@ -929,14 +929,6 @@ def _create_response_queue_subscription(self): def start_workers(self, request_queue, response_queue, pool): self._create_response_queue_subscription() - config = { - 'request_queue_topic_id': self.request_queue_topic_id, - 'response_queue_topic_id': self.response_queue_topic_id, - 'region_coverage': self.region_coverage, - 'project_id': self.project_id, - 'experiment': self.experiment, - } - google_cloud_worker = measure_worker.GoogleCloudMeasureWorker(config) # Since each worker is going to be in an infinite loop, we dont need # result return. Workers' life scope will end automatically when @@ -946,7 +938,29 @@ def start_workers(self, request_queue, response_queue, pool): f'{self.measurers_cpus}' ' workers in google cloud measure manager') logger.info(log_message) + + config = { + 'request_queue_topic_id': self.request_queue_topic_id, + 'response_queue_topic_id': self.response_queue_topic_id, + 'region_coverage': self.region_coverage, + 'project_id': self.project_id, + 'experiment': self.experiment, + } + + # Create the worker request queue subscription once, before starting all + # workers + worker_request_queue_subscription = ('request-queue-subscription-' + f'{self.experiment}') + worker_subscription_path = self.subscriber_client.subscription_path( + self.project_id, worker_request_queue_subscription) + worker_request_queue_topic_path = self.subscriber_client.topic_path( + self.project_id, self.request_queue_topic_id) + measure_worker.GoogleCloudMeasureWorker.create_request_queue_subscription( # pylint: disable=line-too-long + worker_subscription_path, worker_request_queue_topic_path) + for _ in range(self.measurers_cpus): + google_cloud_worker = measure_worker.GoogleCloudMeasureWorker( + config) pool.apply_async(google_cloud_worker.measure_worker_loop) def get_result_from_response_queue(self, response_queue: str): diff --git a/experiment/measurer/measure_worker.py b/experiment/measurer/measure_worker.py index d44037ada..3bdc96a68 100644 --- a/experiment/measurer/measure_worker.py +++ b/experiment/measurer/measure_worker.py @@ -145,15 +145,16 @@ def __init__(self, config: Dict): f'{self.experiment}') self.subscription_path = self.subscriber_client.subscription_path( self.project_id, self.request_queue_subscription) - self._create_request_queue_subscription() - def _create_request_queue_subscription(self): + @staticmethod + def create_request_queue_subscription(subscription_path, + request_queue_topic_path): """Creates a new Pub/Sub subscription for the request queue.""" try: - subscription = self.subscriber_client.create_subscription( + subscription = pubsub_v1.SubscriberClient().create_subscription( request={ - 'name': self.subscription_path, - 'topic': self.request_queue_topic_path, + 'name': subscription_path, + 'topic': request_queue_topic_path, 'enable_message_ordering': True, }) logger.info('Subscription %s created successfully.', diff --git a/experiment/measurer/test_measure_manager.py b/experiment/measurer/test_measure_manager.py index 995acda35..7826210d9 100644 --- a/experiment/measurer/test_measure_manager.py +++ b/experiment/measurer/test_measure_manager.py @@ -659,7 +659,10 @@ def test_gcloud_measure_manager_get_snapshot_from_response_queue( @mock.patch('experiment.measurer.measure_worker.GoogleCloudMeasureWorker') -def test_gcloud_measure_manager_start_workers(mock_gcloud_measure_worker, +@mock.patch('google.cloud.pubsub_v1.PublisherClient') +@mock.patch('google.cloud.pubsub_v1.SubscriberClient') +def test_gcloud_measure_manager_start_workers(_mock_subscriber, _mock_publisher, + mock_gcloud_measure_worker, gcloud_measure_manager): """Tests that the start workers method is calling the measure worker loop method, a number of times equal to the number of measurers CPUs."""