Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add timezone to cron scheduler #204

Merged
merged 4 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions saq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@
import threading
import typing as t
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone, tzinfo

from croniter import croniter

from saq.job import Status
from saq.queue import Queue
from saq.utils import cancel_tasks, millis, now, now_seconds, uuid1
from saq.utils import cancel_tasks, millis, now, uuid1

if t.TYPE_CHECKING:
from asyncio import Task
Expand Down Expand Up @@ -56,6 +57,7 @@ class Worker:
functions: list of async functions
concurrency: number of jobs to process concurrently
cron_jobs: List of CronJob instances.
cron_tz: timezone for cron scheduler
startup: async function to call on startup
shutdown: async function to call on shutdown
before_process: async function to call before a job processes
Expand All @@ -81,6 +83,7 @@ def __init__(
id: t.Optional[str] = None,
concurrency: int = 10,
cron_jobs: Collection[CronJob] | None = None,
cron_tz: tzinfo = timezone.utc,
startup: ReceivesContext | Collection[ReceivesContext] | None = None,
shutdown: ReceivesContext | Collection[ReceivesContext] | None = None,
before_process: ReceivesContext | Collection[ReceivesContext] | None = None,
Expand Down Expand Up @@ -114,6 +117,7 @@ def __init__(
functions = set(functions)
self.functions: dict[str, Function] = {}
self.cron_jobs: Collection[CronJob] = cron_jobs or []
self.cron_tz: tzinfo = cron_tz
self.context: Context = {"worker": self}
self.tasks: set[Task[t.Any]] = set()
self.job_task_contexts: dict[Job, JobTaskContext] = {}
Expand Down Expand Up @@ -210,7 +214,8 @@ async def schedule(self, lock: int = 1) -> None:
kwargs = cron_job.__dict__.copy()
function = kwargs.pop("function").__qualname__
kwargs["key"] = f"cron:{function}" if kwargs.pop("unique") else None
scheduled = croniter(kwargs.pop("cron"), now_seconds()).get_next()
start_time = datetime.now(self.cron_tz)
scheduled = croniter(kwargs.pop("cron"), start_time).get_next()

await self.queue.enqueue(
function,
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
"pre-commit",
"redis>=4.2,<6.0",
"ruff",
"time-machine",
"types-croniter",
"types-redis",
"types-setuptools",
Expand Down
53 changes: 30 additions & 23 deletions tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
import time
import typing as t
import unittest
from datetime import datetime, timedelta, timezone
from unittest import mock

from aiohttp import web
from aiohttp.test_utils import AioHTTPTestCase
from time_machine import travel

from saq.job import CronJob, Job, Status, ACTIVE_STATUSES
from saq.queue import Queue
Expand Down Expand Up @@ -368,35 +370,40 @@ async def test_schedule(self, mock_time: MagicMock) -> None:
self.assertEqual(job.result, 1)

@mock.patch("saq.worker.logger")
@mock.patch("saq.utils.time")
async def test_cron(self, mock_time: MagicMock, mock_logger: MagicMock) -> None:
async def test_cron(self, mock_logger: MagicMock) -> None:
with self.assertRaises(ValueError):
Worker(
self.queue,
functions=functions,
cron_jobs=[CronJob(cron, cron="x")],
cron_jobs=[CronJob(sleeper, cron="x")],
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this function changed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See line 404, one of the checks is if the job is running which requires it to take some time.

)

mock_time.time.return_value = 1
worker = Worker(
self.queue,
functions=functions,
cron_jobs=[CronJob(cron, cron="* * * * *", kwargs={"param": 1})],
)
self.assertEqual(await self.queue.count("queued"), 0)
self.assertEqual(await self.queue.count("incomplete"), 0)
await worker.schedule()
self.assertEqual(await self.queue.count("queued"), 0)
self.assertEqual(await self.queue.count("incomplete"), 1)

mock_time.time.return_value = 60
await asyncio.sleep(1)
await worker.schedule()
self.assertEqual(await self.queue.count("queued"), 1)
self.assertEqual(await self.queue.count("incomplete"), 1)
# Remove if statement when schedule is implemented for Postgres queue
if isinstance(self.queue, RedisQueue):
mock_logger.info.assert_any_call("Scheduled %s", ["saq:job:default:cron:cron"])
with travel(datetime(2025, 1, 1, 0, 0, 1, tzinfo=timezone.utc), tick=True) as traveller:
worker = Worker(
self.queue,
functions=functions,
cron_jobs=[CronJob(sleeper, cron="* 12 * * *", kwargs={"sleep": 3})],
cron_tz=timezone(offset=timedelta(hours=-3)), # 3 hours behind (UTC-3)
)
await worker.queue.connect()
self.assertEqual(await self.queue.count("incomplete"), 0)
tobymao marked this conversation as resolved.
Show resolved Hide resolved
self.assertEqual(await self.queue.count("queued"), 0)
asyncio.create_task(worker.start())
await asyncio.sleep(1)
self.assertEqual(await self.queue.count("incomplete"), 1)
self.assertEqual(await self.queue.count("queued"), 0)

traveller.move_to(datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc)) # noon UTC
await asyncio.sleep(2)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any way to make these tests faster? can you sleep for less time? perhaps reduce the cron timer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll play around a bit add see

self.assertEqual(await self.queue.count("active"), 0)

traveller.move_to(datetime(2025, 1, 1, 15, 0, 0, tzinfo=timezone.utc)) # noon in tz
await asyncio.sleep(2)
self.assertEqual(await self.queue.count("active"), 1)

# Remove if statement when schedule is implemented for Postgres queue
if isinstance(self.queue, RedisQueue):
mock_logger.info.assert_any_call("Scheduled %s", ["saq:job:default:cron:sleeper"])

@mock.patch("saq.worker.logger")
async def test_abort(self, mock_logger: MagicMock) -> None:
Expand Down