diff --git a/.php-cs-fixer.dist.php b/.php-cs-fixer.dist.php index f82bdbf..001fd00 100644 --- a/.php-cs-fixer.dist.php +++ b/.php-cs-fixer.dist.php @@ -37,5 +37,5 @@ return Factory::create(new CodeIgniter4(), $overrides, $options)->forLibrary( 'CodeIgniter Queue', 'CodeIgniter Foundation', - 'admin@codeigniter.com' + 'admin@codeigniter.com', ); diff --git a/docs/index.md b/docs/index.md index 7f835b6..ab3643c 100644 --- a/docs/index.md +++ b/docs/index.md @@ -26,6 +26,11 @@ If you use `database` handler: - Oracle 12.1+ - SQLite3 +If you use `Redis` (you still need a relational database to store failed jobs): + +- PHPRedis +- Predis + ### Table of Contents * [Installation](installation.md) diff --git a/docs/running-queues.md b/docs/running-queues.md index 776289f..c32d74f 100644 --- a/docs/running-queues.md +++ b/docs/running-queues.md @@ -65,11 +65,13 @@ This way, worker will consume jobs with the `low` priority and then with `high`. ### Running many instances of the same queue -As mentioned above, sometimes we may want to have multiple instances of the same command running at the same time. The queue is safe to use in that scenario with all databases if you keep the `skipLocked` to `true` in the config file. Only for SQLite3 driver this setting is not relevant. +As mentioned above, sometimes we may want to have multiple instances of the same command running at the same time. The queue is safe to use in that scenario with all databases as long as you keep the `skipLocked` to `true` in the config file. Only for SQLite3 driver, this setting is not relevant as it provides atomicity without the need for explicit concurrency control. + +The PHPRedis and Predis drivers are also safe to use with multiple instances of the same command. ### Handling long-running process -If we decide to run the long process e.g. with the command: +If we decide to run the long process, e.g., with the command: php spark queue:work emails -wait 10 diff --git a/mkdocs.yml b/mkdocs.yml index feb90ad..b93a433 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -53,7 +53,7 @@ extra: site_url: https://queue.codeigniter.com/ repo_url: https://github.com/codeigniter4/queue edit_uri: edit/develop/docs/ -copyright: Copyright © 2023 CodeIgniter Foundation. +copyright: Copyright © 2025 CodeIgniter Foundation. markdown_extensions: - admonition diff --git a/src/Commands/QueueWork.php b/src/Commands/QueueWork.php index 391917d..6a4d4fb 100644 --- a/src/Commands/QueueWork.php +++ b/src/Commands/QueueWork.php @@ -108,7 +108,7 @@ public function run(array $params) $memory, $priority, $tries, - $retryAfter + $retryAfter, ] = $this->readOptions($params, $config, $queue); if ($error !== null) { diff --git a/src/Handlers/BaseHandler.php b/src/Handlers/BaseHandler.php index 6e29e45..07aeb3a 100644 --- a/src/Handlers/BaseHandler.php +++ b/src/Handlers/BaseHandler.php @@ -72,11 +72,11 @@ public function retry(?int $id, ?string $queue): int $jobs = model(QueueJobFailedModel::class) ->when( $id !== null, - static fn ($query) => $query->where('id', $id) + static fn ($query) => $query->where('id', $id), ) ->when( $queue !== null, - static fn ($query) => $query->where('queue', $queue) + static fn ($query) => $query->where('queue', $queue), ) ->findAll(); @@ -112,11 +112,11 @@ public function flush(?int $hours, ?string $queue): bool return model(QueueJobFailedModel::class) ->when( $hours !== null, - static fn ($query) => $query->where('failed_at <=', Time::now()->subHours($hours)->timestamp) + static fn ($query) => $query->where('failed_at <=', Time::now()->subHours($hours)->timestamp), ) ->when( $queue !== null, - static fn ($query) => $query->where('queue', $queue) + static fn ($query) => $query->where('queue', $queue), ) ->delete(); } @@ -129,7 +129,7 @@ public function listFailed(?string $queue): array return model(QueueJobFailedModel::class) ->when( $queue !== null, - static fn ($query) => $query->where('queue', $queue) + static fn ($query) => $query->where('queue', $queue), ) ->orderBy('failed_at', 'desc') ->findAll(); diff --git a/src/Handlers/PredisHandler.php b/src/Handlers/PredisHandler.php index c06c9d3..4c2114d 100644 --- a/src/Handlers/PredisHandler.php +++ b/src/Handlers/PredisHandler.php @@ -13,6 +13,7 @@ namespace CodeIgniter\Queue\Handlers; +use CodeIgniter\Autoloader\FileLocator; use CodeIgniter\Exceptions\CriticalError; use CodeIgniter\I18n\Time; use CodeIgniter\Queue\Config\Queue as QueueConfig; @@ -27,12 +28,20 @@ class PredisHandler extends BaseHandler implements QueueInterface { private readonly Client $predis; + private readonly string $luaScript; public function __construct(protected QueueConfig $config) { try { $this->predis = new Client($config->predis, ['prefix' => $config->predis['prefix']]); $this->predis->time(); + + $locator = new FileLocator(service('autoloader')); + $luaScript = $locator->locateFile('CodeIgniter\Queue\Lua\pop_task', null, 'lua'); + if ($luaScript === false) { + throw new CriticalError('Queue: LUA script for Predis is not available.'); + } + $this->luaScript = file_get_contents($luaScript); } catch (Exception $e) { throw new CriticalError('Queue: Predis connection refused (' . $e->getMessage() . ').'); } @@ -77,30 +86,23 @@ public function push(string $queue, string $job, array $data): bool */ public function pop(string $queue, array $priorities): ?QueueJob { - $tasks = []; - $now = Time::now()->timestamp; - - foreach ($priorities as $priority) { - $tasks = $this->predis->zrangebyscore( - "queues:{$queue}:{$priority}", - '-inf', - $now, - ['LIMIT' => [0, 1]] - ); - if ($tasks !== []) { - $removed = $this->predis->zrem("queues:{$queue}:{$priority}", ...$tasks); - if ($removed !== 0) { - break; - } - $tasks = []; - } - } + $now = (string) Time::now()->timestamp; + + // Prepare the arguments for the Lua script + $args = [ + 'queues:' . $queue, // KEYS[1] + $now, // ARGV[2] + json_encode($priorities), // ARGV[3] + ]; + + // Execute the Lua script + $task = $this->predis->eval($this->luaScript, 1, ...$args); - if ($tasks === []) { + if ($task === null) { return null; } - $queueJob = new QueueJob(json_decode((string) $tasks[0], true)); + $queueJob = new QueueJob(json_decode((string) $task, true)); // Set the actual status as in DB. $queueJob->status = Status::RESERVED->value; @@ -121,7 +123,7 @@ public function later(QueueJob $queueJob, int $seconds): bool $result = $this->predis->zadd( "queues:{$queueJob->queue}:{$queueJob->priority}", - [json_encode($queueJob) => $queueJob->available_at->timestamp] + [json_encode($queueJob) => $queueJob->available_at->timestamp], ); if ($result !== 0) { $this->predis->hdel("queues:{$queueJob->queue}::reserved", [$queueJob->id]); diff --git a/src/Handlers/RedisHandler.php b/src/Handlers/RedisHandler.php index 6d4e537..8fcf27f 100644 --- a/src/Handlers/RedisHandler.php +++ b/src/Handlers/RedisHandler.php @@ -13,6 +13,7 @@ namespace CodeIgniter\Queue\Handlers; +use CodeIgniter\Autoloader\FileLocator; use CodeIgniter\Exceptions\CriticalError; use CodeIgniter\I18n\Time; use CodeIgniter\Queue\Config\Queue as QueueConfig; @@ -27,6 +28,7 @@ class RedisHandler extends BaseHandler implements QueueInterface { private readonly Redis $redis; + private readonly string $luaScript; public function __construct(protected QueueConfig $config) { @@ -48,6 +50,13 @@ public function __construct(protected QueueConfig $config) if (isset($config->redis['prefix']) && ! $this->redis->setOption(Redis::OPT_PREFIX, $config->redis['prefix'])) { throw new CriticalError('Queue: Redis setting prefix failed.'); } + + $locator = new FileLocator(service('autoloader')); + $luaScript = $locator->locateFile('CodeIgniter\Queue\Lua\pop_task', null, 'lua'); + if ($luaScript === false) { + throw new CriticalError('Queue: LUA script for Redis is not available.'); + } + $this->luaScript = file_get_contents($luaScript); } catch (RedisException $e) { throw new CriticalError('Queue: RedisException occurred with message (' . $e->getMessage() . ').'); } @@ -96,23 +105,23 @@ public function push(string $queue, string $job, array $data): bool */ public function pop(string $queue, array $priorities): ?QueueJob { - $tasks = []; - $now = Time::now()->timestamp; - - foreach ($priorities as $priority) { - if ($tasks = $this->redis->zRangeByScore("queues:{$queue}:{$priority}", '-inf', (string) $now, ['limit' => [0, 1]])) { - if ($this->redis->zRem("queues:{$queue}:{$priority}", ...$tasks)) { - break; - } - $tasks = []; - } - } + $now = Time::now()->timestamp; + + // Prepare the arguments for the Lua script + $args = [ + 'queues:' . $queue, // KEYS[1] + $now, // ARGV[2] + json_encode($priorities), // ARGV[3] + ]; + + // Execute the Lua script + $task = $this->redis->eval($this->luaScript, $args, 1); - if ($tasks === []) { + if ($task === false) { return null; } - $queueJob = new QueueJob(json_decode((string) $tasks[0], true)); + $queueJob = new QueueJob(json_decode((string) $task, true)); // Set the actual status as in DB. $queueJob->status = Status::RESERVED->value; @@ -136,7 +145,7 @@ public function later(QueueJob $queueJob, int $seconds): bool $result = (int) $this->redis->zAdd( "queues:{$queueJob->queue}:{$queueJob->priority}", $queueJob->available_at->timestamp, - json_encode($queueJob) + json_encode($queueJob), ); if ($result !== 0) { $this->redis->hDel("queues:{$queueJob->queue}::reserved", (string) $queueJob->id); diff --git a/src/Lua/pop_task.lua b/src/Lua/pop_task.lua new file mode 100644 index 0000000..8ddeb79 --- /dev/null +++ b/src/Lua/pop_task.lua @@ -0,0 +1,17 @@ +local queue = KEYS[1] +local now = tonumber(ARGV[1]) +local priorities = cjson.decode(ARGV[2]) +local task = nil + +for _, priority in ipairs(priorities) do + local key = queue .. ':' .. priority + local tasks = redis.call('ZRANGEBYSCORE', key, '-inf', tostring(now), 'LIMIT', 0, 1) + + if #tasks > 0 then + redis.call('ZREM', key, tasks[1]) + task = tasks[1] + break + end +end + +return task diff --git a/src/Models/QueueJobModel.php b/src/Models/QueueJobModel.php index 6aaf910..4df5bd1 100644 --- a/src/Models/QueueJobModel.php +++ b/src/Models/QueueJobModel.php @@ -141,22 +141,22 @@ private function setPriority(BaseBuilder $builder, array $priority): BaseBuilder sprintf('CASE %s ', $this->db->protectIdentifiers('priority')) . implode( ' ', - array_map(static fn ($value, $key) => "WHEN '{$value}' THEN {$key}", $priority, array_keys($priority)) + array_map(static fn ($value, $key) => "WHEN '{$value}' THEN {$key}", $priority, array_keys($priority)), ) . ' END', '', - false + false, ); } else { $builder->orderBy( 'FIELD(priority, ' . implode( ',', - array_map(static fn ($value) => "'{$value}'", $priority) + array_map(static fn ($value) => "'{$value}'", $priority), ) . ')', '', - false + false, ); } }