Skip to content

Commit

Permalink
ci: pass custom prefix for dragonfly
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Feb 28, 2025
1 parent c284207 commit 23d303c
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 82 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -265,4 +265,4 @@ jobs:
- name: Test with pytest
run: |
cd python
./run_tests.sh
./run_tests_dragonfly.sh
1 change: 1 addition & 0 deletions python/bullmq/flow_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class FlowProducer:
Instantiate a FlowProducer object
"""

#pass only queueOpts, no need 2 parameters in next breaking change
def __init__(self, redisOpts: dict | str = {}, opts: QueueBaseOptions = {}):
"""
Initialize a connection
Expand Down
8 changes: 8 additions & 0 deletions python/run_tests_dragonfly.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/bin/bash
BULLMQ_TEST_PREFIX="{b}"
python3 -m unittest -v tests.bulk_tests
python3 -m unittest -v tests.delay_tests
python3 -m unittest -v tests.flow_tests
python3 -m unittest -v tests.job_tests
python3 -m unittest -v tests.queue_tests
python3 -m unittest -v tests.worker_tests
8 changes: 5 additions & 3 deletions python/tests/bulk_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,28 @@
"""

import unittest
import os

from asyncio import Future
from bullmq import Queue, Job, Worker
from uuid import uuid4

queueName = f"__test_queue__{uuid4().hex}"
prefix = os.environ.get('BULLMQ_TEST_PREFIX') or "bull"

class TestJob(unittest.IsolatedAsyncioTestCase):

async def asyncSetUp(self):
print("Setting up test queue")
# Delete test queue
queue = Queue(queueName)
queue = Queue(queueName, {"prefix": prefix})
await queue.pause()
await queue.obliterate()
await queue.close()

async def test_process_jobs(self):
name = "test"
queue = Queue(queueName)
queue = Queue(queueName, {"prefix": prefix})

async def process(job: Job, token: str):
if job.data.get("idx") == 0:
Expand All @@ -34,7 +36,7 @@ async def process(job: Job, token: str):
self.assertEqual(job.data.get("foo"), "baz")
return "done"

worker = Worker(queueName, process)
worker = Worker(queueName, process, {"prefix": prefix})

completed_events = Future()

Expand Down
8 changes: 5 additions & 3 deletions python/tests/delay_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,21 @@

import unittest
import time
import os

from asyncio import Future
from bullmq import Queue, Job, Worker
from uuid import uuid4

queueName = f"__test_queue__{uuid4().hex}"
prefix = os.environ.get('BULLMQ_TEST_PREFIX') or "bull"

class TestJob(unittest.IsolatedAsyncioTestCase):

async def asyncSetUp(self):
print("Setting up test queue")
# Delete test queue
queue = Queue(queueName)
queue = Queue(queueName, {"prefix": prefix})
await queue.pause()
await queue.obliterate()
await queue.close()
Expand All @@ -27,12 +29,12 @@ async def test_progress_delayed_job_only_after_delayed_time(self):
delay = 1000
margin = 1.2
timestamp = round(time.time() * 1000)
queue = Queue(queueName)
queue = Queue(queueName, {"prefix": prefix})

async def process(job: Job, token: str):
return "done"

worker = Worker(queueName, process)
worker = Worker(queueName, process, {"prefix": prefix})

completed_events = Future()

Expand Down
30 changes: 16 additions & 14 deletions python/tests/flow_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,22 @@
"""

from asyncio import Future
import os

from bullmq import Queue, Job, FlowProducer, Worker
from uuid import uuid4

import unittest

queue_name = f"__test_queue__{uuid4().hex}"
prefix = os.environ.get('BULLMQ_TEST_PREFIX') or "bull"

class TestJob(unittest.IsolatedAsyncioTestCase):

async def asyncSetUp(self):
print("Setting up test queue")
# Delete test queue
queue = Queue(queue_name)
queue = Queue(queue_name, {"prefix": prefix})
await queue.pause()
await queue.obliterate()
await queue.close()
Expand Down Expand Up @@ -48,10 +50,10 @@ async def process2(job: Job, token: str):
processing_parent.set_result(None)
return 1

parent_worker = Worker(parent_queue_name, process2)
children_worker = Worker(queue_name, process1)
parent_worker = Worker(parent_queue_name, process2, {"prefix": prefix})
children_worker = Worker(queue_name, process1, {"prefix": prefix})

flow = FlowProducer()
flow = FlowProducer({}, {"prefix": prefix})
await flow.add(
{
"name": 'parent-job',
Expand All @@ -72,7 +74,7 @@ async def process2(job: Job, token: str):
await children_worker.close()
await flow.close()

parent_queue = Queue(parent_queue_name)
parent_queue = Queue(parent_queue_name, {"prefix": prefix})
await parent_queue.pause()
await parent_queue.obliterate()
await parent_queue.close()
Expand Down Expand Up @@ -105,10 +107,10 @@ async def process2(job: Job, token: str):
processing_parents.set_result(None)
return 1

parent_worker = Worker(parent_queue_name, process2)
children_worker = Worker(queue_name, process1)
parent_worker = Worker(parent_queue_name, process2, {"prefix": prefix})
children_worker = Worker(queue_name, process1, {"prefix": prefix})

flow = FlowProducer()
flow = FlowProducer({},{"prefix": prefix})
await flow.addBulk([
{
"name": 'parent-job-1',
Expand All @@ -135,7 +137,7 @@ async def process2(job: Job, token: str):
await children_worker.close()
await flow.close()

parent_queue = Queue(parent_queue_name)
parent_queue = Queue(parent_queue_name, {"prefix": prefix})
await parent_queue.pause()
await parent_queue.obliterate()
await parent_queue.close()
Expand Down Expand Up @@ -166,10 +168,10 @@ async def process2(job: Job, token: str):
processing_parent.set_result(children_values)
return 1

parent_worker = Worker(parent_queue_name, process2)
children_worker = Worker(queue_name, process1)
parent_worker = Worker(parent_queue_name, process2, {"prefix": prefix})
children_worker = Worker(queue_name, process1, {"prefix": prefix})

flow = FlowProducer()
flow = FlowProducer({},{"prefix": prefix})
await flow.add(
{
"name": 'parent-job',
Expand Down Expand Up @@ -197,13 +199,13 @@ def on_parent_processed(future):
await children_worker.close()
await flow.close()

parent_queue = Queue(parent_queue_name)
parent_queue = Queue(parent_queue_name, {"prefix": prefix})
await parent_queue.pause()
await parent_queue.obliterate()
await parent_queue.close()

async def test_get_children_values_on_simple_jobs(self):
queue = Queue(queue_name)
queue = Queue(queue_name, {"prefix": prefix})
job = await queue.add("test", {"foo": "bar"}, {"delay": 1500})
children_values = await job.getChildrenValues()
self.assertEqual(children_values, {})
Expand Down
20 changes: 11 additions & 9 deletions python/tests/job_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,26 @@
"""

import unittest
import os

from bullmq import Queue, Job
from uuid import uuid4

queueName = f"__test_queue__{uuid4().hex}"
prefix = os.environ.get('BULLMQ_TEST_PREFIX') or "bull"

class TestJob(unittest.IsolatedAsyncioTestCase):

async def asyncSetUp(self):
print("Setting up test queue")
# Delete test queue
queue = Queue(queueName)
queue = Queue(queueName, {"prefix": prefix})
await queue.pause()
await queue.obliterate()
await queue.close()

async def test_set_and_get_progress_as_number(self):
queue = Queue(queueName)
queue = Queue(queueName, {"prefix": prefix})
job = await queue.add("test-job", {"foo": "bar"}, {})
await job.updateProgress(42)
stored_job = await Job.fromId(queue, job.id)
Expand All @@ -31,7 +33,7 @@ async def test_set_and_get_progress_as_number(self):
await queue.close()

async def test_set_and_get_progress_as_object(self):
queue = Queue(queueName)
queue = Queue(queueName, {"prefix": prefix})
job = await queue.add("test-job", {"foo": "bar"}, {})
await job.updateProgress({"total": 120, "completed": 40})
stored_job = await Job.fromId(queue, job.id)
Expand All @@ -40,7 +42,7 @@ async def test_set_and_get_progress_as_object(self):
await queue.close()

async def test_get_job_state(self):
queue = Queue(queueName)
queue = Queue(queueName, {"prefix": prefix})
job = await queue.add("test-job", {"foo": "bar"}, {})
state = await job.getState()

Expand All @@ -49,7 +51,7 @@ async def test_get_job_state(self):
await queue.close()

async def test_job_log(self):
queue = Queue(queueName)
queue = Queue(queueName, {"prefix": prefix})
firstLog = 'some log text 1'
secondLog = 'some log text 2'
job = await queue.add("test-job", {"foo": "bar"}, {})
Expand All @@ -63,7 +65,7 @@ async def test_job_log(self):
await queue.close()

async def test_update_job_data(self):
queue = Queue(queueName)
queue = Queue(queueName, {"prefix": prefix})
job = await queue.add("test", {"foo": "bar"}, {})
await job.updateData({"baz": "qux"})
stored_job = await Job.fromId(queue, job.id)
Expand All @@ -73,15 +75,15 @@ async def test_update_job_data(self):
await queue.close()

async def test_job_data_json_compliant(self):
queue = Queue(queueName)
queue = Queue(queueName, {"prefix": prefix})
job = await queue.add("test", {"foo": "bar"}, {})
with self.assertRaises(ValueError):
await job.updateData({"baz": float('nan')})

await queue.close()

async def test_update_job_data_when_is_removed(self):
queue = Queue(queueName)
queue = Queue(queueName, {"prefix": prefix})
job = await queue.add("test", {"foo": "bar"}, {})
await job.remove()
with self.assertRaises(TypeError):
Expand All @@ -90,7 +92,7 @@ async def test_update_job_data_when_is_removed(self):
await queue.close()

async def test_promote_delayed_job(self):
queue = Queue(queueName)
queue = Queue(queueName, {"prefix": prefix})
job = await queue.add("test", {"foo": "bar"}, {"delay": 1500})
isDelayed = await job.isDelayed()
self.assertEqual(isDelayed, True)
Expand Down
Loading

0 comments on commit 23d303c

Please sign in to comment.