diff --git a/taskqueue/gcloud/aio/taskqueue/taskmanager.py b/taskqueue/gcloud/aio/taskqueue/taskmanager.py index a84d43395..8e2700d0e 100644 --- a/taskqueue/gcloud/aio/taskqueue/taskmanager.py +++ b/taskqueue/gcloud/aio/taskqueue/taskmanager.py @@ -34,7 +34,7 @@ def __init__(self, event, executor, headers, task, lease_seconds): self.lease_seconds = lease_seconds def start(self, loop=None): - loop = asyncio.get_event_loop() + loop = loop or asyncio.get_event_loop() self.future = loop.run_in_executor( self.executor, self.autorenew, self.event, self.headers, self.task, self.lease_seconds) @@ -151,9 +151,13 @@ async def process(self, task): name = task['name'] payload = decode(task['pullMessage']['payload']) - autorenew = LeaseManager(self.manager.Event(), self.executor, - await self.tq.headers(), task, - self.lease_seconds).start() + try: + autorenew = LeaseManager(self.manager.Event(), self.executor, + await self.tq.headers(), task, + self.lease_seconds).start() + except concurrent.futures.process.BrokenProcessPool: + log.error('process pool broke, quitting TaskManager') + self.running = False try: async with self.semaphore: @@ -189,6 +193,8 @@ async def process(self, task): log.info('successfully processed task: %s', name) task = await autorenew.stop() await self.tq.ack(task) + except Exception as e: # pylint: disable=broad-except + log.exception(e) finally: await autorenew.stop() diff --git a/taskqueue/setup.py b/taskqueue/setup.py index f13434c52..983c63ab0 100644 --- a/taskqueue/setup.py +++ b/taskqueue/setup.py @@ -13,7 +13,7 @@ setuptools.setup( name='gcloud-aio-taskqueue', - version='1.2.1', + version='1.2.2', description='Asyncio Python Client for Google Cloud Task Queue', long_description=README, namespace_packages=[