Skip to content


Repository files navigation

minique /miniːk/

A minimal Redis 4.0+ job queue for Python 3.7 and above.


  • Python 3.7+
  • Redis 4.0+


  • Have a Redis 4.0+ server running.


from redis import StrictRedis
from minique.api import enqueue, get_job

# Get a Redis connection, somehow.
redis = StrictRedis.from_url('redis://localhost:6379/4')

job = enqueue(
    callable='my_jobs.calcumacalate',  # Dotted path to your callable.
    kwargs={'a': 5, 'b': 5},  # Only kwargs supported.
    # You can also set a `job_id` yourself (but it must be unique)

job_id =  # Save the job ID somewhere, maybe?

while not job.has_finished:
    pass  # Twiddle thumbs...

print(job.result)  # Okay!

# Get the same job later (though not later than 7 days (by default)):
job = get_job(redis, job_id)


  • Ensure your workers are able to import the functions you wish to run.
  • Set the callables the worker will allow with --allow-callable.
    • Alternately, you may wish to subclass to specify an entirely different lookup mechanism.
$ minique -u redis://localhost:6379/4 -q work -q anotherqueue -q thirdqueue --allow-callable 'my_jobs.*'

Priority Queues

Minique supports priority queueing as an optional feature using the enqueue_priority API.

Priority queues are compatible with standard workers. However, priority is implemented using a helper data structure, requiring the client needs to call job.cleanup() after each job and/or PriorityQueue(...).periodic_clean() to prune this structure of jobs that have already been processed.

Priority queue requires Lua scripting permissions from the Redis queue service.

from redis import StrictRedis
from minique.api import enqueue_priority, get_job

# Get a Redis connection, somehow.
redis = StrictRedis.from_url('redis://localhost:6379/4')

job = enqueue_priority(
    callable='my_jobs.calcumacalate',  # Dotted path to your callable.
    kwargs={'a': 5, 'b': 5},  # Only kwargs supported.
    priority=1,  # Integer
    # You can also set a `job_id` yourself (but it must be unique)

job_id =  # Save the job ID somewhere, maybe?

while not job.has_finished:
    pass  # Twiddle thumbs...

print(job.result)  # Okay!

# Job priorities are stored in a helper hash table which should be cleaned using this method
# after the job has left the queue.

# Get the same job later (though not later than 7 days (by default)):
job = get_job(redis, job_id)

Sentry Support

Minique automatically integrates with the Sentry exception tracking service.

You can use the [sentry] installation extra to install sentry-sdk along with Minique, or you can do it manually.

Simply set the SENTRY_DSN environment variable; if all goes well, you should see a "Sentry configured with a valid DSN" message at CLI boot.

The other environment-configurable options also work as you would expect.

Exceptions occurring during job execution will be sent to Sentry and annotated with minique context describing the job ID and queue name.


# install `minique` in editable mode with development dependencies
pip install -e .[sentry,test] pre-commit mypy==1.0.0 types-redis && pre-commit install

# run lints
pre-commit run --all-files

# run type checks
mypy --strict --install-types --show-error-codes minique

# run tests against the specified Redis database
REDIS_URL=redis://localhost:6379/0 pytest .


# decide on a new version number and set it
vim minique/
__version__ = "0.9.0"

npx auto-changelog --commit-limit=0 -v 0.9.0

# undo changes changelog generation did to the older entries

git add -u
git commit -m "Become 0.9.0"
git tag -m "v0.9.0" -a v0.9.0

git push --follow-tags


A minimal Redis job queue for Python 3.







No packages published
