Skip to content

Commit

Permalink
adding changes
Browse files Browse the repository at this point in the history
  • Loading branch information
fcollman committed May 7, 2024
1 parent 889c727 commit 53cc6f3
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 8 deletions.
1 change: 1 addition & 0 deletions AUTHORS
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
Chris Roat <1053153+chrisroat@users.noreply.github.com>
Forrest Collman <forrest.collman@gmail.com>
Jingpeng Wu <jingpeng.wu@gmail.com>
Nico Kemnitz <nkemnitz@princeton.edu>
William Silversmith <william.silversmith@gmail.com>
Expand Down
24 changes: 16 additions & 8 deletions taskqueue/goog_pubsub_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

from google.cloud import pubsub_v1
from google.api_core.exceptions import ClientError
from google.pubsub_v1.types import PullRequest
from google.pubsub_v1.types import AcknowledgeRequest


from .lib import toiter, sip, jsonify

Expand Down Expand Up @@ -54,9 +57,7 @@ def __init__(self, qurl, **kwargs):

self.subscriber = pubsub_v1.SubscriberClient()
self.publisher = pubsub_v1.PublisherClient()
self._topic_path = self.publisher.topic_path(
self.project_id, self.subscription_id
)
self._topic_path = self.publisher.topic_path(self.project_id, self.topic_id)
self._subscription_path = self.subscriber.subscription_path(
self.project_id, self.subscription_id
)
Expand Down Expand Up @@ -138,7 +139,10 @@ def release_all(self):

def lease(self, seconds, num_tasks=1, wait_sec=20):
# Pull messages from the subscription
response = self.subscriber.pull(self.subscription_path, max_messages=num_tasks)
request = PullRequest(
subscription=self._subscription_path, max_messages=num_tasks
)
response = self.subscriber.pull(request)

tasks = []
for received_message in response.received_messages:
Expand All @@ -158,9 +162,10 @@ def delete(self, task):
ack_id = task._id
except AttributeError:
ack_id = task["id"]

self.subscriber.acknowledge(self._subscription_path, [ack_id])

request = AcknowledgeRequest(
subscription=self._subscription_path, ack_ids=[ack_id]
)
self.subscriber.acknowledge(request=request)
return 1

def tally(self):
Expand All @@ -179,7 +184,10 @@ def purge(self, native=False):

# Acknowledge all received messages
ack_ids = [msg.ack_id for msg in response.received_messages]
self.subscriber.acknowledge(self._subscription_path, ack_ids)
request = AcknowledgeRequest(
subscription=self._subscription_path, ack_ids=ack_ids
)
self.subscriber.acknowledge(request=request)

def __iter__(self):
return iter(self.lease(num_tasks=10, seconds=0))

0 comments on commit 53cc6f3

Please sign in to comment.