Skip to content

Commit

Permalink
Merge pull request #18 from ustudio/add-support-for-newer-redispy-ver…
Browse files Browse the repository at this point in the history
…sions

Support Latest Redis-Py Version
  • Loading branch information
spiralman authored Dec 1, 2023
2 parents 74715f0 + c016a72 commit e52ec79
Show file tree
Hide file tree
Showing 15 changed files with 409 additions and 153 deletions.
80 changes: 47 additions & 33 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
version: 2
version: 2.1
jobs:
test-python37:
test:
parameters:
python_version:
description: "The Python version to use for running the tests"
type: string
docker:
- image: cimg/python:3.7

working_directory: ~/repo

- image: cimg/python:<< parameters.python_version >>
- image: redis:5.0
environment:
REDIS_URI: redis://localhost:6379/0
steps:
- checkout

Expand All @@ -16,71 +20,81 @@ jobs:
- restore_cache:
keys:
- v2-python-{{ checksum "pythonversion" }}-dependencies-{{ checksum "setup.py" }}-{{ checksum "requirements.txt" }}
- v1-python-{{ checksum "pythonversion" }}-dependencies-{{ checksum "poetry.lock" }}

- run:
name: Install Dependencies
name: install dependencies
command: |
python -m virtualenv ~/venv
. ~/venv/bin/activate
pip install -e .
pip install -r requirements.txt
poetry self update --no-ansi -- 1.6.1
poetry install --no-ansi
mkdir -p test-reports
- save_cache:
paths:
- ~/venv
key: v2-python-{{ checksum "pythonversion" }}-dependencies-{{ checksum "setup.py" }}-{{ checksum "requirements.txt" }}
- ~/.cache/pypoetry/virtualenvs
key: v1-python-{{ checksum "pythonversion" }}-dependencies-{{ checksum "poetry.lock" }}

- run:
name: Run Tests
name: run tests
command: |
. ~/venv/bin/activate
pytest --verbose --junit-xml=test-reports/pytest-python37.xml
poetry run pytest --verbose --junit-xml=test-reports/pytest.xml
- run:
name: Run Linter
name: run lint
command: |
. ~/venv/bin/activate
flake8
poetry run flake8 | tee test-reports/flake8-errors
- store_artifacts:
path: test-reports
prefix: python-<< parameters.python_version >>

- store_test_results:
path: test-reports
prefix: python-<< parameters.python_version >>

publish:
docker:
- image: cimg/python:3.7
- image: cimg/python:3.11
working_directory: ~/repo
steps:
- checkout

- run:
name: Install Dependencies
command: |
python -m virtualenv ~/venv
. ~/venv/bin/activate
pip install twine
- run:
name: Publish to PyPI
command: |
. ~/venv/bin/activate
./publish_to_pypi.sh
export POETRY_HTTP_BASIC_PYPI_USERNAME=$PYPI_USERNAME
export POETRY_HTTP_BASIC_PYPI_PASSWORD=$PYPI_PASSWORD
poetry publish --build
workflows:
version: 2
test-and-publish:
test-and-build:
jobs:
- test-python37:
- test:
name: test-3.9
python_version: "3.9"
filters:
tags:
only: /.*/
- test:
name: test-3.10
python_version: "3.10"
filters:
tags:
only: /.*/
- test:
name: test-3.11
python_version: "3.11"
filters:
tags:
only: /.*/
- publish:
requires:
- test-python37
- test-3.9
- test-3.10
- test-3.11
filters:
tags:
only: /^v[0-9]+(\.[0-9]+)*.*/
Expand Down
1 change: 0 additions & 1 deletion MANIFEST.in

This file was deleted.

4 changes: 2 additions & 2 deletions blueque/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@


class Client(object):
def __init__(self, *args, **kwargs):
def __init__(self, url, **kwargs):
super(Client, self).__init__()

self._redis = redis.StrictRedis.from_url(*args, decode_responses=True, **kwargs)
self._redis = redis.StrictRedis.from_url(url, decode_responses=True, **kwargs)

def get_queue(self, name):
redis_queue = RedisQueue(name, self._redis)
Expand Down
29 changes: 15 additions & 14 deletions blueque/redis_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def add_listener(self, node_id):
self._log("adding listener %s" % (node_id))
with self._redis.pipeline() as pipeline:
pipeline.sadd(self._listeners_key, node_id)
pipeline.zincrby(self._queues_key, self._name, amount=1)
pipeline.zincrby(self._queues_key, 1, self._name)
pipeline.execute()

def remove_listener(self, node_id):
Expand All @@ -54,7 +54,7 @@ def remove_listener(self, node_id):

if removed > 0:
self._log("removed listener")
self._redis.zincrby(self._queues_key, self._name, amount=-1)
self._redis.zincrby(self._queues_key, -1, self._name)

return removed

Expand All @@ -78,9 +78,9 @@ def _generate_task(self, pipeline, status, parameters, **kwargs):

task_data.update(kwargs)

pipeline.hmset(RedisTask.task_key(task_id), task_data)
pipeline.hset(RedisTask.task_key(task_id), mapping=task_data)

pipeline.zincrby(self._key("queues"), self._name, amount=0)
pipeline.zincrby(self._key("queues"), 0, self._name)

return task_id

Expand All @@ -91,7 +91,7 @@ def schedule(self, parameters, eta):
with self._redis.pipeline() as pipeline:
task_id = self._generate_task(pipeline, "scheduled", parameters, eta=eta)

pipeline.zadd(self._scheduled_key, eta, task_id)
pipeline.zadd(self._scheduled_key, {task_id: eta})

pipeline.execute()

Expand Down Expand Up @@ -124,7 +124,8 @@ def enqueue_transaction(pipeline):

for task in due_tasks:
pipeline.lpush(self._pending_name, task)
pipeline.hmset(RedisTask.task_key(task), {"status": "pending", "updated": now})
pipeline.hset(
RedisTask.task_key(task), mapping={"status": "pending", "updated": now})

self._redis.transaction(enqueue_transaction, self._scheduled_key)

Expand All @@ -139,9 +140,9 @@ def dequeue(self, node_id):

self._log("got task %s" % (task_id))

self._redis.hmset(
self._redis.hset(
RedisTask.task_key(task_id),
{
mapping={
"status": "reserved",
"node": node_id,
"updated": time.time()
Expand All @@ -153,9 +154,9 @@ def start(self, task_id, node_id, pid):
self._log("starting task %s on %s, pid %i" % (task_id, node_id, pid))
with self._redis.pipeline() as pipeline:
pipeline.sadd(self._started_key, self._running_job(node_id, pid, task_id))
pipeline.hmset(
pipeline.hset(
RedisTask.task_key(task_id),
{"status": "started", "pid": pid, "updated": time.time()})
mapping={"status": "started", "pid": pid, "updated": time.time()})

pipeline.execute()

Expand All @@ -175,9 +176,9 @@ def complete(self, task_id, node_id, pid, result):
pipeline.lrem(self._reserved_key(node_id), 1, task_id)
pipeline.srem(self._started_key, self._running_job(node_id, pid, task_id))

pipeline.hmset(
pipeline.hset(
RedisTask.task_key(task_id),
{
mapping={
"status": "complete",
"result": result,
"updated": time.time()
Expand All @@ -194,9 +195,9 @@ def fail(self, task_id, node_id, pid, error):
pipeline.lrem(self._reserved_key(node_id), 1, task_id)
pipeline.srem(self._started_key, self._running_job(node_id, pid, task_id))

pipeline.hmset(
pipeline.hset(
RedisTask.task_key(task_id),
{
mapping={
"status": "failed",
"error": error,
"updated": time.time()
Expand Down
Loading

0 comments on commit e52ec79

Please sign in to comment.