diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index a9e0ec29e6..89b7d61fa4 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -265,4 +265,4 @@ jobs: - name: Test with pytest run: | cd python - ./run_tests.sh + ./run_tests_dragonfly.sh diff --git a/python/bullmq/flow_producer.py b/python/bullmq/flow_producer.py index 5c0d80af8f..c5c5e4eee7 100644 --- a/python/bullmq/flow_producer.py +++ b/python/bullmq/flow_producer.py @@ -29,6 +29,7 @@ class FlowProducer: Instantiate a FlowProducer object """ + #pass only queueOpts, no need 2 parameters in next breaking change def __init__(self, redisOpts: dict | str = {}, opts: QueueBaseOptions = {}): """ Initialize a connection diff --git a/python/run_tests_dragonfly.sh b/python/run_tests_dragonfly.sh new file mode 100755 index 0000000000..60de87b06b --- /dev/null +++ b/python/run_tests_dragonfly.sh @@ -0,0 +1,8 @@ +#!/bin/bash +BULLMQ_TEST_PREFIX="{b}" +python3 -m unittest -v tests.bulk_tests +python3 -m unittest -v tests.delay_tests +python3 -m unittest -v tests.flow_tests +python3 -m unittest -v tests.job_tests +python3 -m unittest -v tests.queue_tests +python3 -m unittest -v tests.worker_tests \ No newline at end of file diff --git a/python/tests/bulk_tests.py b/python/tests/bulk_tests.py index cccb267b6b..fe565b22b4 100644 --- a/python/tests/bulk_tests.py +++ b/python/tests/bulk_tests.py @@ -5,26 +5,28 @@ """ import unittest +import os from asyncio import Future from bullmq import Queue, Job, Worker from uuid import uuid4 queueName = f"__test_queue__{uuid4().hex}" +prefix = os.environ.get('BULLMQ_TEST_PREFIX') or "bull" class TestJob(unittest.IsolatedAsyncioTestCase): async def asyncSetUp(self): print("Setting up test queue") # Delete test queue - queue = Queue(queueName) + queue = Queue(queueName, {"prefix": prefix}) await queue.pause() await queue.obliterate() await queue.close() async def test_process_jobs(self): name = "test" - queue = Queue(queueName) + queue = Queue(queueName, {"prefix": prefix}) async def process(job: Job, token: str): if job.data.get("idx") == 0: @@ -34,7 +36,7 @@ async def process(job: Job, token: str): self.assertEqual(job.data.get("foo"), "baz") return "done" - worker = Worker(queueName, process) + worker = Worker(queueName, process, {"prefix": prefix}) completed_events = Future() diff --git a/python/tests/delay_tests.py b/python/tests/delay_tests.py index 89c48580e5..f5d54b53c5 100644 --- a/python/tests/delay_tests.py +++ b/python/tests/delay_tests.py @@ -6,19 +6,21 @@ import unittest import time +import os from asyncio import Future from bullmq import Queue, Job, Worker from uuid import uuid4 queueName = f"__test_queue__{uuid4().hex}" +prefix = os.environ.get('BULLMQ_TEST_PREFIX') or "bull" class TestJob(unittest.IsolatedAsyncioTestCase): async def asyncSetUp(self): print("Setting up test queue") # Delete test queue - queue = Queue(queueName) + queue = Queue(queueName, {"prefix": prefix}) await queue.pause() await queue.obliterate() await queue.close() @@ -27,12 +29,12 @@ async def test_progress_delayed_job_only_after_delayed_time(self): delay = 1000 margin = 1.2 timestamp = round(time.time() * 1000) - queue = Queue(queueName) + queue = Queue(queueName, {"prefix": prefix}) async def process(job: Job, token: str): return "done" - worker = Worker(queueName, process) + worker = Worker(queueName, process, {"prefix": prefix}) completed_events = Future() diff --git a/python/tests/flow_tests.py b/python/tests/flow_tests.py index 7f0a9a6428..91162f44a4 100644 --- a/python/tests/flow_tests.py +++ b/python/tests/flow_tests.py @@ -5,6 +5,7 @@ """ from asyncio import Future +import os from bullmq import Queue, Job, FlowProducer, Worker from uuid import uuid4 @@ -12,13 +13,14 @@ import unittest queue_name = f"__test_queue__{uuid4().hex}" +prefix = os.environ.get('BULLMQ_TEST_PREFIX') or "bull" class TestJob(unittest.IsolatedAsyncioTestCase): async def asyncSetUp(self): print("Setting up test queue") # Delete test queue - queue = Queue(queue_name) + queue = Queue(queue_name, {"prefix": prefix}) await queue.pause() await queue.obliterate() await queue.close() @@ -48,10 +50,10 @@ async def process2(job: Job, token: str): processing_parent.set_result(None) return 1 - parent_worker = Worker(parent_queue_name, process2) - children_worker = Worker(queue_name, process1) + parent_worker = Worker(parent_queue_name, process2, {"prefix": prefix}) + children_worker = Worker(queue_name, process1, {"prefix": prefix}) - flow = FlowProducer() + flow = FlowProducer({}, {"prefix": prefix}) await flow.add( { "name": 'parent-job', @@ -72,7 +74,7 @@ async def process2(job: Job, token: str): await children_worker.close() await flow.close() - parent_queue = Queue(parent_queue_name) + parent_queue = Queue(parent_queue_name, {"prefix": prefix}) await parent_queue.pause() await parent_queue.obliterate() await parent_queue.close() @@ -105,10 +107,10 @@ async def process2(job: Job, token: str): processing_parents.set_result(None) return 1 - parent_worker = Worker(parent_queue_name, process2) - children_worker = Worker(queue_name, process1) + parent_worker = Worker(parent_queue_name, process2, {"prefix": prefix}) + children_worker = Worker(queue_name, process1, {"prefix": prefix}) - flow = FlowProducer() + flow = FlowProducer({},{"prefix": prefix}) await flow.addBulk([ { "name": 'parent-job-1', @@ -135,7 +137,7 @@ async def process2(job: Job, token: str): await children_worker.close() await flow.close() - parent_queue = Queue(parent_queue_name) + parent_queue = Queue(parent_queue_name, {"prefix": prefix}) await parent_queue.pause() await parent_queue.obliterate() await parent_queue.close() @@ -166,10 +168,10 @@ async def process2(job: Job, token: str): processing_parent.set_result(children_values) return 1 - parent_worker = Worker(parent_queue_name, process2) - children_worker = Worker(queue_name, process1) + parent_worker = Worker(parent_queue_name, process2, {"prefix": prefix}) + children_worker = Worker(queue_name, process1, {"prefix": prefix}) - flow = FlowProducer() + flow = FlowProducer({},{"prefix": prefix}) await flow.add( { "name": 'parent-job', @@ -197,13 +199,13 @@ def on_parent_processed(future): await children_worker.close() await flow.close() - parent_queue = Queue(parent_queue_name) + parent_queue = Queue(parent_queue_name, {"prefix": prefix}) await parent_queue.pause() await parent_queue.obliterate() await parent_queue.close() async def test_get_children_values_on_simple_jobs(self): - queue = Queue(queue_name) + queue = Queue(queue_name, {"prefix": prefix}) job = await queue.add("test", {"foo": "bar"}, {"delay": 1500}) children_values = await job.getChildrenValues() self.assertEqual(children_values, {}) diff --git a/python/tests/job_tests.py b/python/tests/job_tests.py index 576b7d4815..51476416fd 100644 --- a/python/tests/job_tests.py +++ b/python/tests/job_tests.py @@ -5,24 +5,26 @@ """ import unittest +import os from bullmq import Queue, Job from uuid import uuid4 queueName = f"__test_queue__{uuid4().hex}" +prefix = os.environ.get('BULLMQ_TEST_PREFIX') or "bull" class TestJob(unittest.IsolatedAsyncioTestCase): async def asyncSetUp(self): print("Setting up test queue") # Delete test queue - queue = Queue(queueName) + queue = Queue(queueName, {"prefix": prefix}) await queue.pause() await queue.obliterate() await queue.close() async def test_set_and_get_progress_as_number(self): - queue = Queue(queueName) + queue = Queue(queueName, {"prefix": prefix}) job = await queue.add("test-job", {"foo": "bar"}, {}) await job.updateProgress(42) stored_job = await Job.fromId(queue, job.id) @@ -31,7 +33,7 @@ async def test_set_and_get_progress_as_number(self): await queue.close() async def test_set_and_get_progress_as_object(self): - queue = Queue(queueName) + queue = Queue(queueName, {"prefix": prefix}) job = await queue.add("test-job", {"foo": "bar"}, {}) await job.updateProgress({"total": 120, "completed": 40}) stored_job = await Job.fromId(queue, job.id) @@ -40,7 +42,7 @@ async def test_set_and_get_progress_as_object(self): await queue.close() async def test_get_job_state(self): - queue = Queue(queueName) + queue = Queue(queueName, {"prefix": prefix}) job = await queue.add("test-job", {"foo": "bar"}, {}) state = await job.getState() @@ -49,7 +51,7 @@ async def test_get_job_state(self): await queue.close() async def test_job_log(self): - queue = Queue(queueName) + queue = Queue(queueName, {"prefix": prefix}) firstLog = 'some log text 1' secondLog = 'some log text 2' job = await queue.add("test-job", {"foo": "bar"}, {}) @@ -63,7 +65,7 @@ async def test_job_log(self): await queue.close() async def test_update_job_data(self): - queue = Queue(queueName) + queue = Queue(queueName, {"prefix": prefix}) job = await queue.add("test", {"foo": "bar"}, {}) await job.updateData({"baz": "qux"}) stored_job = await Job.fromId(queue, job.id) @@ -73,7 +75,7 @@ async def test_update_job_data(self): await queue.close() async def test_job_data_json_compliant(self): - queue = Queue(queueName) + queue = Queue(queueName, {"prefix": prefix}) job = await queue.add("test", {"foo": "bar"}, {}) with self.assertRaises(ValueError): await job.updateData({"baz": float('nan')}) @@ -81,7 +83,7 @@ async def test_job_data_json_compliant(self): await queue.close() async def test_update_job_data_when_is_removed(self): - queue = Queue(queueName) + queue = Queue(queueName, {"prefix": prefix}) job = await queue.add("test", {"foo": "bar"}, {}) await job.remove() with self.assertRaises(TypeError): @@ -90,7 +92,7 @@ async def test_update_job_data_when_is_removed(self): await queue.close() async def test_promote_delayed_job(self): - queue = Queue(queueName) + queue = Queue(queueName, {"prefix": prefix}) job = await queue.add("test", {"foo": "bar"}, {"delay": 1500}) isDelayed = await job.isDelayed() self.assertEqual(isDelayed, True) diff --git a/python/tests/queue_tests.py b/python/tests/queue_tests.py index 5ffeaf5f38..3ed284bd1e 100644 --- a/python/tests/queue_tests.py +++ b/python/tests/queue_tests.py @@ -11,30 +11,31 @@ import asyncio import unittest +import os import time queueName = f"__test_queue__{uuid4().hex}" - +prefix = os.environ.get('BULLMQ_TEST_PREFIX') or "bull" class TestQueue(unittest.IsolatedAsyncioTestCase): async def asyncSetUp(self): print("Setting up test queue") # Delete test queue - queue = Queue(queueName) + queue = Queue(queueName, {"prefix": prefix}) await queue.pause() await queue.obliterate() await queue.close() async def test_add_job(self): - queue = Queue(queueName) + queue = Queue(queueName, {"prefix": prefix}) job = await queue.add("test-job", {"foo": "bar"}, {}) self.assertEqual(job.id, "1") await queue.close() async def test_get_jobs(self): - queue = Queue(queueName) + queue = Queue(queueName, {"prefix": prefix}) job1 = await queue.add("test-job", {"foo": "bar"}, {}) job2 = await queue.add("test-job", {"foo": "bar"}, {}) jobs = await queue.getJobs(["wait"]) @@ -44,7 +45,7 @@ async def test_get_jobs(self): await queue.close() async def test_get_job_state(self): - queue = Queue(queueName) + queue = Queue(queueName, {"prefix": prefix}) job = await queue.add("test-job", {"foo": "bar"}, {}) state = await queue.getJobState(job.id) @@ -52,7 +53,7 @@ async def test_get_job_state(self): await queue.close() async def test_add_job_with_options(self): - queue = Queue(queueName) + queue = Queue(queueName, {"prefix": prefix}) data = {"foo": "bar"} attempts = 3 delay = 1000 @@ -66,7 +67,7 @@ async def test_add_job_with_options(self): await queue.close() async def test_is_paused(self): - queue = Queue(queueName) + queue = Queue(queueName, {"prefix": prefix}) await queue.pause() isPaused = await queue.isPaused() @@ -81,7 +82,8 @@ async def test_is_paused(self): await queue.close() async def test_is_paused_with_custom_prefix(self): - queue = Queue(queueName, {"prefix": "test"}) + custom_prefix = "{" + prefix + "}" + queue = Queue(queueName, {"prefix": custom_prefix}) await queue.pause() isPaused = await queue.isPaused() @@ -97,7 +99,7 @@ async def test_is_paused_with_custom_prefix(self): await queue.close() async def test_trim_events_manually(self): - queue = Queue(queueName) + queue = Queue(queueName, {"prefix": prefix}) await queue.add("test", data={}, opts={}) await queue.add("test", data={}, opts={}) await queue.add("test", data={}, opts={}) @@ -115,7 +117,8 @@ async def test_trim_events_manually(self): await queue.close() async def test_trim_events_manually_with_custom_prefix(self): - queue = Queue(queueName, {"prefix": "test"}) + custom_prefix = "{" + prefix + "}" + queue = Queue(queueName, {"prefix": custom_prefix}) await queue.add("test", data={}, opts={}) await queue.add("test", data={}, opts={}) await queue.add("test", data={}, opts={}) @@ -134,7 +137,7 @@ async def test_trim_events_manually_with_custom_prefix(self): await queue.close() async def test_get_delayed_count(self): - queue = Queue(queueName) + queue = Queue(queueName, {"prefix": prefix}) data = {"foo": "bar"} delay = 1000 await queue.add("test-job", data=data, opts={"delay": delay}) @@ -146,7 +149,7 @@ async def test_get_delayed_count(self): await queue.close() async def test_retry_failed_jobs(self): - queue = Queue(queueName) + queue = Queue(queueName, {"prefix": prefix}) job_count = 8 fail = True @@ -158,7 +161,7 @@ async def process(job: Job, token: str): return order = 0 - worker = Worker(queueName, process) + worker = Worker(queueName, process, {"prefix": prefix}) failed_events = Future() @@ -209,7 +212,7 @@ def completing(job: Job, result): await worker.close() async def test_retry_completed_jobs(self): - queue = Queue(queueName) + queue = Queue(queueName, {"prefix": prefix}) job_count = 8 async def process(job: Job, token: str): @@ -217,7 +220,7 @@ async def process(job: Job, token: str): return order = 0 - worker = Worker(queueName, process) + worker = Worker(queueName, process, {"prefix": prefix}) completed_events1 = Future() @@ -265,7 +268,7 @@ def completing2(job: Job, result): await worker.close() async def test_retry_failed_jobs_before_timestamp(self): - queue = Queue(queueName) + queue = Queue(queueName, {"prefix": prefix}) job_count = 8 fail = True @@ -277,7 +280,7 @@ async def process(job: Job, token: str): return order = 0 - worker = Worker(queueName, process) + worker = Worker(queueName, process, {"prefix": prefix}) failed_events = Future() timestamp = 0 @@ -332,7 +335,7 @@ def completing(job: Job, result): await worker.close() async def test_retry_jobs_when_queue_is_paused(self): - queue = Queue(queueName) + queue = Queue(queueName, {"prefix": prefix}) job_count = 8 fail = True @@ -344,7 +347,7 @@ async def process(job: Job, token: str): return order = 0 - worker = Worker(queueName, process) + worker = Worker(queueName, process, {"prefix": prefix}) failed_events = Future() @@ -382,7 +385,7 @@ def failing(job: Job, result): await worker.close() async def test_promote_all_delayed_jobs(self): - queue = Queue(queueName) + queue = Queue(queueName, {"prefix": prefix}) job_count = 8 for index in range(job_count): @@ -402,7 +405,7 @@ async def process(job: Job, token: str): return order = 0 - worker = Worker(queueName, process) + worker = Worker(queueName, process, {"prefix": prefix}) completed_events = Future() @@ -426,7 +429,7 @@ def completing(job: Job, result): await worker.close() async def test_remove_job(self): - queue = Queue(queueName) + queue = Queue(queueName, {"prefix": prefix}) job = await queue.add("test", {"foo": "bar"}, {}) await queue.remove(job.id) job = await Job.fromId(queue, job.id) @@ -435,7 +438,7 @@ async def test_remove_job(self): await queue.close() async def test_get_counts_per_priority(self): - queue = Queue(queueName) + queue = Queue(queueName, {"prefix": prefix}) jobs = [{ "name": "test", "data": {}, @@ -456,7 +459,7 @@ async def test_get_counts_per_priority(self): async def test_reusable_redis(self): conn = redis.Redis(decode_responses=True, host="localhost", port="6379", db=0) - queue = Queue(queueName, {"connection": conn}) + queue = Queue(queueName, {"connection": conn, "prefix": prefix}) job = await queue.add("test-job", {"foo": "bar"}, {}) self.assertEqual(job.id, "1") diff --git a/python/tests/worker_tests.py b/python/tests/worker_tests.py index 11ae8c99fd..71ee7da9e7 100644 --- a/python/tests/worker_tests.py +++ b/python/tests/worker_tests.py @@ -13,21 +13,23 @@ import asyncio import unittest import time +import os queueName = f"__test_queue__{uuid4().hex}" +prefix = os.environ.get('BULLMQ_TEST_PREFIX') or "bull" class TestWorker(unittest.IsolatedAsyncioTestCase): async def asyncSetUp(self): print("Setting up test queue") # Delete test queue - queue = Queue(queueName) + queue = Queue(queueName, {"prefix": prefix}) await queue.pause() await queue.obliterate() await queue.close() async def test_process_jobs(self): - queue = Queue(queueName) + queue = Queue(queueName, {"prefix": prefix}) data = {"foo": "bar"} job = await queue.add("test-job", data, {"removeOnComplete": False}) @@ -35,7 +37,7 @@ async def process(job: Job, token: str): print("Processing job", job) return "done" - worker = Worker(queueName, process) + worker = Worker(queueName, process, {"prefix": prefix}) processing = Future() worker.on("completed", lambda job, result: processing.set_result(None)) @@ -54,7 +56,7 @@ async def process(job: Job, token: str): await queue.close() async def test_process_job_with_array_as_return_value(self): - queue = Queue(queueName) + queue = Queue(queueName, {"prefix": prefix}) data = {"foo": "bar"} job = await queue.add("test-job", data, {"removeOnComplete": False}) @@ -62,7 +64,7 @@ async def process(job: Job, token: str): print("Processing job", job) return ['foo'] - worker = Worker(queueName, process) + worker = Worker(queueName, process, {"prefix": prefix}) processing = Future() worker.on("completed", lambda job, result: processing.set_result(None)) @@ -81,7 +83,7 @@ async def process(job: Job, token: str): await queue.close() async def test_process_job_with_boolean_as_return_value(self): - queue = Queue(queueName) + queue = Queue(queueName, {"prefix": prefix}) data = {"foo": "bar"} job = await queue.add("test-job", data, {"removeOnComplete": False}) @@ -89,7 +91,7 @@ async def process(job: Job, token: str): print("Processing job", job) return True - worker = Worker(queueName, process) + worker = Worker(queueName, process, {"prefix": prefix}) processing = Future() worker.on("completed", lambda job, result: processing.set_result(None)) @@ -108,7 +110,7 @@ async def process(job: Job, token: str): await queue.close() async def test_process_job_fail_with_nan_as_return_value(self): - queue = Queue(queueName) + queue = Queue(queueName, {"prefix": prefix}) data = {"foo": "bar"} job = await queue.add("test-job", data, {"removeOnComplete": False}) @@ -118,7 +120,7 @@ async def process(job: Job, token: str): print("Processing job", job) return float('nan') - worker = Worker(queueName, process) + worker = Worker(queueName, process, {"prefix": prefix}) processing = Future() worker.on("failed", lambda job, result: processing.set_result(None)) @@ -138,7 +140,7 @@ async def process(job: Job, token: str): await queue.close() async def test_process_jobs_fail(self): - queue = Queue(queueName) + queue = Queue(queueName, {"prefix": prefix}) data = {"foo": "bar"} job = await queue.add("test-job", data, {"removeOnComplete": False}) @@ -148,7 +150,7 @@ async def process(job: Job, token: str): print("Processing job", job) raise Exception(failedReason) - worker = Worker(queueName, process) + worker = Worker(queueName, process, {"prefix": prefix}) processing = Future() worker.on("failed", lambda job, result: processing.set_result(None)) @@ -169,7 +171,7 @@ async def process(job: Job, token: str): await queue.close() async def test_process_renews_lock(self): - queue = Queue(queueName) + queue = Queue(queueName, {"prefix": prefix}) data = {"foo": "bar"} job = await queue.add("test-job", data, {"removeOnComplete": False}) @@ -177,7 +179,7 @@ async def process(job: Job, token: str): await asyncio.sleep(3) return "done" - worker = Worker(queueName, process, {"lockDuration": 1000}) + worker = Worker(queueName, process, {"lockDuration": 1000 "prefix": prefix}) processing = Future() worker.on("completed", lambda job, result: processing.set_result(None)) @@ -196,7 +198,7 @@ async def process(job: Job, token: str): await queue.close() async def test_process_stalled_jobs(self): - queue = Queue(queueName) + queue = Queue(queueName, {"prefix": prefix}) data = {"foo": "bar"} job = await queue.add("test-job", data, {"removeOnComplete": False}) @@ -208,7 +210,7 @@ async def process1(job: Job, token: str): await asyncio.sleep(2) return "done1" - worker = Worker(queueName, process1, {"lockDuration": 1000}) + worker = Worker(queueName, process1, {"lockDuration": 1000, "prefix": prefix}) await startProcessing await worker.close(force=True) @@ -217,7 +219,7 @@ async def process2(job: Job, token: str): return "done2" worker2 = Worker(queueName, process2, { - "lockDuration": 1000, "stalledInterval": 1000}) + "lockDuration": 1000, "stalledInterval": 1000, "prefix": prefix}) processing = Future() worker2.on("completed", lambda job, @@ -241,14 +243,14 @@ async def process2(job: Job, token: str): await queue.close() async def test_retry_job_after_delay_with_fixed_backoff(self): - queue = Queue(queueName) + queue = Queue(queueName, {"prefix": prefix}) async def process1(job: Job, token: str): if job.attemptsMade < 2: raise Exception("Not yet!") return None - worker = Worker(queueName, process1) + worker = Worker(queueName, process1, {"prefix": prefix}) start = round(time.time() * 1000) await queue.add("test", { "foo": "bar" }, @@ -269,7 +271,7 @@ def completing(job: Job, result): await worker.close() async def test_retry_job_after_delay_with_custom_backoff(self): - queue = Queue(queueName) + queue = Queue(queueName, {"prefix": prefix}) async def process1(job: Job, token: str): if job.attemptsMade < 2: @@ -281,7 +283,7 @@ def backoff_strategy(attempts_made, type, err, job): worker = Worker(queueName, process1, {"settings": { "backoffStrategy": backoff_strategy - }}) + }, "prefix": prefix}) start = round(time.time() * 1000) await queue.add("test", { "foo": "bar" }, @@ -303,8 +305,8 @@ def completing(job: Job, result): async def test_create_children_at_runtime(self): parent_queue_name = f"__parent_queue__{uuid4().hex}" - parent_queue = Queue(parent_queue_name) - queue = Queue(queueName) + parent_queue = Queue(parent_queue_name, {"prefix": prefix}) + queue = Queue(queueName, {"prefix": prefix}) class Step(int, Enum): Initial = 1 @@ -358,8 +360,8 @@ async def children_process(job: Job, token: str): await asyncio.sleep(0.2) return None - worker = Worker(parent_queue_name, parent_process, {}) - children_worker = Worker(queueName, children_process, {}) + worker = Worker(parent_queue_name, parent_process, {"prefix": prefix}) + children_worker = Worker(queueName, children_process, {"prefix": prefix}) await parent_queue.add( "test", {"step": Step.Initial}, { @@ -390,7 +392,7 @@ async def test_process_job_respecting_the_concurrency_set(self): pending_message_to_process = 8 wait = 0.01 job_count = 0 - queue = Queue(queueName) + queue = Queue(queueName, {"prefix": prefix}) async def process(job: Job, token: str): nonlocal num_jobs_processing @@ -409,7 +411,7 @@ async def process(job: Job, token: str): for _ in range(8): await queue.add("test", data={}) - worker = Worker(queueName, process, {"concurrency": 4 }) + worker = Worker(queueName, process, {"concurrency": 4, "prefix": prefix}) completed_events = Future() @@ -428,7 +430,7 @@ def completing(job: Job, result): async def test_reusable_redis(self): conn = redis.Redis(decode_responses=True, host="localhost", port="6379", db=0) - queue = Queue(queueName, {"connection": conn}) + queue = Queue(queueName, {"connection": conn, "prefix": prefix}) data = {"foo": "bar"} job = await queue.add("test-job", data, {"removeOnComplete": False}) @@ -436,7 +438,7 @@ async def process(job: Job, token: str): print("Processing job", job) return "done" - worker = Worker(queueName, process, {"connection": conn}) + worker = Worker(queueName, process, {"connection": conn, "prefix": prefix}) processing = Future() worker.on("completed", lambda job, result: processing.set_result(None))