Skip to content

Propagate errors to main asyncio loop #788

Answered by agronholm
recycletechno asked this question in Q&A
Discussion options

You must be logged in to vote

What you can do is add an event listener to the scheduler to react to job errors, and have that set the exception on a future that your application awaits on:

import asyncio

from apscheduler.events import EVENT_JOB_ERROR
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.date import DateTrigger

def raise_error():
    raise RuntimeError

async def main():
    future = asyncio.get_running_loop().create_future()
    scheduler = AsyncIOScheduler()
    scheduler.add_listener(lambda event: future.set_exception(event.exception), EVENT_JOB_ERROR)
    scheduler.add_job(raise_error, DateTrigger())
    scheduler.start()
    await future

asyncio.run(main())

Replies: 1 comment 1 reply

Comment options

You must be logged in to vote
1 reply
@recycletechno
Comment options

Answer selected by recycletechno
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Category
Q&A
Labels
None yet
2 participants