From 2f331d043d0d7e6c06fd214dac351d4ae5271840 Mon Sep 17 00:00:00 2001 From: michalsn Date: Tue, 7 Jan 2025 18:15:44 +0100 Subject: [PATCH 1/5] use lua script to pop the task atomically --- src/Handlers/PredisHandler.php | 42 ++++++++++++++++++---------------- src/Handlers/RedisHandler.php | 35 +++++++++++++++++----------- src/Lua/pop_task.lua | 17 ++++++++++++++ 3 files changed, 61 insertions(+), 33 deletions(-) create mode 100644 src/Lua/pop_task.lua diff --git a/src/Handlers/PredisHandler.php b/src/Handlers/PredisHandler.php index c06c9d3..e3f81ef 100644 --- a/src/Handlers/PredisHandler.php +++ b/src/Handlers/PredisHandler.php @@ -13,6 +13,7 @@ namespace CodeIgniter\Queue\Handlers; +use CodeIgniter\Autoloader\FileLocator; use CodeIgniter\Exceptions\CriticalError; use CodeIgniter\I18n\Time; use CodeIgniter\Queue\Config\Queue as QueueConfig; @@ -27,12 +28,20 @@ class PredisHandler extends BaseHandler implements QueueInterface { private readonly Client $predis; + private readonly string $luaScript; public function __construct(protected QueueConfig $config) { try { $this->predis = new Client($config->predis, ['prefix' => $config->predis['prefix']]); $this->predis->time(); + + $locator = new FileLocator(service('autoloader')); + $luaScript = $locator->locateFile('CodeIgniter\Queue\Lua\pop_task', null, 'lua'); + if ($luaScript === false) { + throw new CriticalError('Queue: LUA script for Predis is not available.'); + } + $this->luaScript = file_get_contents($luaScript); } catch (Exception $e) { throw new CriticalError('Queue: Predis connection refused (' . $e->getMessage() . ').'); } @@ -77,30 +86,23 @@ public function push(string $queue, string $job, array $data): bool */ public function pop(string $queue, array $priorities): ?QueueJob { - $tasks = []; - $now = Time::now()->timestamp; - - foreach ($priorities as $priority) { - $tasks = $this->predis->zrangebyscore( - "queues:{$queue}:{$priority}", - '-inf', - $now, - ['LIMIT' => [0, 1]] - ); - if ($tasks !== []) { - $removed = $this->predis->zrem("queues:{$queue}:{$priority}", ...$tasks); - if ($removed !== 0) { - break; - } - $tasks = []; - } - } + $now = Time::now()->timestamp; + + // Prepare the arguments for the Lua script + $args = [ + 'queues:' . $queue, // KEYS[1] + $now, // ARGV[2] + json_encode($priorities), // ARGV[3] + ]; + + // Execute the Lua script + $task = $this->predis->eval($this->luaScript, 1, ...$args); - if ($tasks === []) { + if ($task === null) { return null; } - $queueJob = new QueueJob(json_decode((string) $tasks[0], true)); + $queueJob = new QueueJob(json_decode((string) $task, true)); // Set the actual status as in DB. $queueJob->status = Status::RESERVED->value; diff --git a/src/Handlers/RedisHandler.php b/src/Handlers/RedisHandler.php index 6d4e537..06255ad 100644 --- a/src/Handlers/RedisHandler.php +++ b/src/Handlers/RedisHandler.php @@ -13,6 +13,7 @@ namespace CodeIgniter\Queue\Handlers; +use CodeIgniter\Autoloader\FileLocator; use CodeIgniter\Exceptions\CriticalError; use CodeIgniter\I18n\Time; use CodeIgniter\Queue\Config\Queue as QueueConfig; @@ -27,6 +28,7 @@ class RedisHandler extends BaseHandler implements QueueInterface { private readonly Redis $redis; + private readonly string $luaScript; public function __construct(protected QueueConfig $config) { @@ -48,6 +50,13 @@ public function __construct(protected QueueConfig $config) if (isset($config->redis['prefix']) && ! $this->redis->setOption(Redis::OPT_PREFIX, $config->redis['prefix'])) { throw new CriticalError('Queue: Redis setting prefix failed.'); } + + $locator = new FileLocator(service('autoloader')); + $luaScript = $locator->locateFile('CodeIgniter\Queue\Lua\pop_task', null, 'lua'); + if ($luaScript === false) { + throw new CriticalError('Queue: LUA script for Redis is not available.'); + } + $this->luaScript = file_get_contents($luaScript); } catch (RedisException $e) { throw new CriticalError('Queue: RedisException occurred with message (' . $e->getMessage() . ').'); } @@ -96,23 +105,23 @@ public function push(string $queue, string $job, array $data): bool */ public function pop(string $queue, array $priorities): ?QueueJob { - $tasks = []; - $now = Time::now()->timestamp; - - foreach ($priorities as $priority) { - if ($tasks = $this->redis->zRangeByScore("queues:{$queue}:{$priority}", '-inf', (string) $now, ['limit' => [0, 1]])) { - if ($this->redis->zRem("queues:{$queue}:{$priority}", ...$tasks)) { - break; - } - $tasks = []; - } - } + $now = Time::now()->timestamp; + + // Prepare the arguments for the Lua script + $args = [ + 'queues:' . $queue, // KEYS[1] + $now, // ARGV[2] + json_encode($priorities), // ARGV[3] + ]; + + // Execute the Lua script + $task = $this->redis->eval($this->luaScript, $args, 1); - if ($tasks === []) { + if ($task === false) { return null; } - $queueJob = new QueueJob(json_decode((string) $tasks[0], true)); + $queueJob = new QueueJob(json_decode((string) $task, true)); // Set the actual status as in DB. $queueJob->status = Status::RESERVED->value; diff --git a/src/Lua/pop_task.lua b/src/Lua/pop_task.lua new file mode 100644 index 0000000..8ddeb79 --- /dev/null +++ b/src/Lua/pop_task.lua @@ -0,0 +1,17 @@ +local queue = KEYS[1] +local now = tonumber(ARGV[1]) +local priorities = cjson.decode(ARGV[2]) +local task = nil + +for _, priority in ipairs(priorities) do + local key = queue .. ':' .. priority + local tasks = redis.call('ZRANGEBYSCORE', key, '-inf', tostring(now), 'LIMIT', 0, 1) + + if #tasks > 0 then + redis.call('ZREM', key, tasks[1]) + task = tasks[1] + break + end +end + +return task From 6f6696905a94fac99b7c019f3f117ebda06f1b45 Mon Sep 17 00:00:00 2001 From: michalsn Date: Tue, 7 Jan 2025 18:15:51 +0100 Subject: [PATCH 2/5] docs update --- docs/index.md | 5 +++++ docs/running-queues.md | 5 +++-- mkdocs.yml | 2 +- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/docs/index.md b/docs/index.md index 7f835b6..ab3643c 100644 --- a/docs/index.md +++ b/docs/index.md @@ -26,6 +26,11 @@ If you use `database` handler: - Oracle 12.1+ - SQLite3 +If you use `Redis` (you still need a relational database to store failed jobs): + +- PHPRedis +- Predis + ### Table of Contents * [Installation](installation.md) diff --git a/docs/running-queues.md b/docs/running-queues.md index 776289f..7e1dd68 100644 --- a/docs/running-queues.md +++ b/docs/running-queues.md @@ -65,11 +65,12 @@ This way, worker will consume jobs with the `low` priority and then with `high`. ### Running many instances of the same queue -As mentioned above, sometimes we may want to have multiple instances of the same command running at the same time. The queue is safe to use in that scenario with all databases if you keep the `skipLocked` to `true` in the config file. Only for SQLite3 driver this setting is not relevant. +As mentioned above, sometimes we may want to have multiple instances of the same command running at the same time. The queue is safe to use in that scenario with all databases if you keep the `skipLocked` to `true` in the config file. +Only for SQLite3, PHPRedis and Predis driver, this setting is not relevant. ### Handling long-running process -If we decide to run the long process e.g. with the command: +If we decide to run the long process, e.g., with the command: php spark queue:work emails -wait 10 diff --git a/mkdocs.yml b/mkdocs.yml index feb90ad..b93a433 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -53,7 +53,7 @@ extra: site_url: https://queue.codeigniter.com/ repo_url: https://github.com/codeigniter4/queue edit_uri: edit/develop/docs/ -copyright: Copyright © 2023 CodeIgniter Foundation. +copyright: Copyright © 2025 CodeIgniter Foundation. markdown_extensions: - admonition From 498da7a1be820583659b804103419a7815d0f4ae Mon Sep 17 00:00:00 2001 From: michalsn Date: Tue, 7 Jan 2025 18:24:35 +0100 Subject: [PATCH 3/5] fix phpstan --- src/Handlers/PredisHandler.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Handlers/PredisHandler.php b/src/Handlers/PredisHandler.php index e3f81ef..39256bf 100644 --- a/src/Handlers/PredisHandler.php +++ b/src/Handlers/PredisHandler.php @@ -86,7 +86,7 @@ public function push(string $queue, string $job, array $data): bool */ public function pop(string $queue, array $priorities): ?QueueJob { - $now = Time::now()->timestamp; + $now = (string) Time::now()->timestamp; // Prepare the arguments for the Lua script $args = [ From 9cdc8e8c27b9fc48b2ca6fec1e4201715f60e770 Mon Sep 17 00:00:00 2001 From: michalsn Date: Fri, 17 Jan 2025 12:12:57 +0100 Subject: [PATCH 4/5] update docs --- docs/running-queues.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/docs/running-queues.md b/docs/running-queues.md index 7e1dd68..c32d74f 100644 --- a/docs/running-queues.md +++ b/docs/running-queues.md @@ -65,8 +65,9 @@ This way, worker will consume jobs with the `low` priority and then with `high`. ### Running many instances of the same queue -As mentioned above, sometimes we may want to have multiple instances of the same command running at the same time. The queue is safe to use in that scenario with all databases if you keep the `skipLocked` to `true` in the config file. -Only for SQLite3, PHPRedis and Predis driver, this setting is not relevant. +As mentioned above, sometimes we may want to have multiple instances of the same command running at the same time. The queue is safe to use in that scenario with all databases as long as you keep the `skipLocked` to `true` in the config file. Only for SQLite3 driver, this setting is not relevant as it provides atomicity without the need for explicit concurrency control. + +The PHPRedis and Predis drivers are also safe to use with multiple instances of the same command. ### Handling long-running process From db615f1e606013afb660dec3c69eb9e6a1fcc9ad Mon Sep 17 00:00:00 2001 From: michalsn Date: Fri, 17 Jan 2025 12:18:14 +0100 Subject: [PATCH 5/5] cs fix --- .php-cs-fixer.dist.php | 2 +- src/Commands/QueueWork.php | 2 +- src/Handlers/BaseHandler.php | 10 +++++----- src/Handlers/PredisHandler.php | 2 +- src/Handlers/RedisHandler.php | 2 +- src/Models/QueueJobModel.php | 8 ++++---- 6 files changed, 13 insertions(+), 13 deletions(-) diff --git a/.php-cs-fixer.dist.php b/.php-cs-fixer.dist.php index f82bdbf..001fd00 100644 --- a/.php-cs-fixer.dist.php +++ b/.php-cs-fixer.dist.php @@ -37,5 +37,5 @@ return Factory::create(new CodeIgniter4(), $overrides, $options)->forLibrary( 'CodeIgniter Queue', 'CodeIgniter Foundation', - 'admin@codeigniter.com' + 'admin@codeigniter.com', ); diff --git a/src/Commands/QueueWork.php b/src/Commands/QueueWork.php index 391917d..6a4d4fb 100644 --- a/src/Commands/QueueWork.php +++ b/src/Commands/QueueWork.php @@ -108,7 +108,7 @@ public function run(array $params) $memory, $priority, $tries, - $retryAfter + $retryAfter, ] = $this->readOptions($params, $config, $queue); if ($error !== null) { diff --git a/src/Handlers/BaseHandler.php b/src/Handlers/BaseHandler.php index 6e29e45..07aeb3a 100644 --- a/src/Handlers/BaseHandler.php +++ b/src/Handlers/BaseHandler.php @@ -72,11 +72,11 @@ public function retry(?int $id, ?string $queue): int $jobs = model(QueueJobFailedModel::class) ->when( $id !== null, - static fn ($query) => $query->where('id', $id) + static fn ($query) => $query->where('id', $id), ) ->when( $queue !== null, - static fn ($query) => $query->where('queue', $queue) + static fn ($query) => $query->where('queue', $queue), ) ->findAll(); @@ -112,11 +112,11 @@ public function flush(?int $hours, ?string $queue): bool return model(QueueJobFailedModel::class) ->when( $hours !== null, - static fn ($query) => $query->where('failed_at <=', Time::now()->subHours($hours)->timestamp) + static fn ($query) => $query->where('failed_at <=', Time::now()->subHours($hours)->timestamp), ) ->when( $queue !== null, - static fn ($query) => $query->where('queue', $queue) + static fn ($query) => $query->where('queue', $queue), ) ->delete(); } @@ -129,7 +129,7 @@ public function listFailed(?string $queue): array return model(QueueJobFailedModel::class) ->when( $queue !== null, - static fn ($query) => $query->where('queue', $queue) + static fn ($query) => $query->where('queue', $queue), ) ->orderBy('failed_at', 'desc') ->findAll(); diff --git a/src/Handlers/PredisHandler.php b/src/Handlers/PredisHandler.php index 39256bf..4c2114d 100644 --- a/src/Handlers/PredisHandler.php +++ b/src/Handlers/PredisHandler.php @@ -123,7 +123,7 @@ public function later(QueueJob $queueJob, int $seconds): bool $result = $this->predis->zadd( "queues:{$queueJob->queue}:{$queueJob->priority}", - [json_encode($queueJob) => $queueJob->available_at->timestamp] + [json_encode($queueJob) => $queueJob->available_at->timestamp], ); if ($result !== 0) { $this->predis->hdel("queues:{$queueJob->queue}::reserved", [$queueJob->id]); diff --git a/src/Handlers/RedisHandler.php b/src/Handlers/RedisHandler.php index 06255ad..8fcf27f 100644 --- a/src/Handlers/RedisHandler.php +++ b/src/Handlers/RedisHandler.php @@ -145,7 +145,7 @@ public function later(QueueJob $queueJob, int $seconds): bool $result = (int) $this->redis->zAdd( "queues:{$queueJob->queue}:{$queueJob->priority}", $queueJob->available_at->timestamp, - json_encode($queueJob) + json_encode($queueJob), ); if ($result !== 0) { $this->redis->hDel("queues:{$queueJob->queue}::reserved", (string) $queueJob->id); diff --git a/src/Models/QueueJobModel.php b/src/Models/QueueJobModel.php index 6aaf910..4df5bd1 100644 --- a/src/Models/QueueJobModel.php +++ b/src/Models/QueueJobModel.php @@ -141,22 +141,22 @@ private function setPriority(BaseBuilder $builder, array $priority): BaseBuilder sprintf('CASE %s ', $this->db->protectIdentifiers('priority')) . implode( ' ', - array_map(static fn ($value, $key) => "WHEN '{$value}' THEN {$key}", $priority, array_keys($priority)) + array_map(static fn ($value, $key) => "WHEN '{$value}' THEN {$key}", $priority, array_keys($priority)), ) . ' END', '', - false + false, ); } else { $builder->orderBy( 'FIELD(priority, ' . implode( ',', - array_map(static fn ($value) => "'{$value}'", $priority) + array_map(static fn ($value) => "'{$value}'", $priority), ) . ')', '', - false + false, ); } }