diff --git a/phpstan.neon.dist b/phpstan.neon.dist index d8d5352..decefa1 100644 --- a/phpstan.neon.dist +++ b/phpstan.neon.dist @@ -8,37 +8,13 @@ parameters: - vendor/codeigniter4/framework/system/Test/bootstrap.php excludePaths: ignoreErrors: - - - message: '#Cannot use \+\+ on array\|bool\|float\|int\|object\|string\|null.#' - paths: - - src/Commands/QueueWork.php - - - message: '#Variable \$config on left side of \?\?\= always exists and is not nullable.#' - paths: - - src/Config/Services.php - - - message: '#Call to an undefined method CodeIgniter\\Queue\\Handlers\\BaseHandler::push\(\).#' - paths: - - src/Handlers/BaseHandler.php - message: '#Call to deprecated function random_string\(\):#' paths: - src/Handlers/RedisHandler.php - src/Handlers/PredisHandler.php - - - message: '#Cannot access property \$timestamp on array\|bool\|float\|int\|object\|string.#' - paths: - - tests/_support/Database/Seeds/TestRedisQueueSeeder.php - message: '#Access to an undefined property CodeIgniter\\I18n\\Time::\$timestamp.#' - paths: - - src/Handlers/BaseHandler.php - - src/Handlers/DatabaseHandler.php - - src/Handlers/RedisHandler.php - - src/Handlers/PredisHandler.php - - src/Models/QueueJobModel.php - - tests/RedisHandlerTest.php - - tests/PredisHandlerTest.php - message: '#Call to an undefined method CodeIgniter\\Queue\\Models\\QueueJobFailedModel::affectedRows\(\).#' paths: @@ -47,22 +23,6 @@ parameters: message: '#Call to an undefined method CodeIgniter\\Queue\\Models\\QueueJobFailedModel::truncate\(\).#' paths: - src/Handlers/BaseHandler.php - - - message: '#Parameter \#3 \$tries of method CodeIgniter\\Queue\\Commands\\QueueWork::handleWork\(\) expects int\|null, string\|true\|null given.#' - paths: - - src/Commands/QueueWork.php - - - message: '#Parameter \#4 \$retryAfter of method CodeIgniter\\Queue\\Commands\\QueueWork::handleWork\(\) expects int\|null, string\|true\|null given.#' - paths: - - src/Commands/QueueWork.php - - - message: '#Expression on left side of \?\? is not nullable.#' - paths: - - src/Commands/QueueWork.php - - - message: '#Variable \$job might not be defined.#' - paths: - - src/Commands/QueueWork.php universalObjectCratesClasses: - CodeIgniter\Entity - CodeIgniter\Entity\Entity diff --git a/src/Commands/QueueWork.php b/src/Commands/QueueWork.php index c4a228d..50de84e 100644 --- a/src/Commands/QueueWork.php +++ b/src/Commands/QueueWork.php @@ -79,7 +79,7 @@ public function run(array $params) $stopWhenEmpty = false; $waiting = false; - // Read params + // Read queue name from params if (! $queue = array_shift($params)) { CLI::error('The queueName is not specified.'); @@ -87,15 +87,25 @@ public function run(array $params) } // Read options - $sleep = $params['sleep'] ?? CLI::getOption('sleep') ?? 10; - $rest = $params['rest'] ?? CLI::getOption('rest') ?? 0; - $maxJobs = $params['max-jobs'] ?? CLI::getOption('max-jobs') ?? 0; - $maxTime = $params['max-time'] ?? CLI::getOption('max-time') ?? 0; - $memory = $params['memory'] ?? CLI::getOption('memory') ?? 128; - $priority = $params['priority'] ?? CLI::getOption('priority') ?? $config->getQueuePriorities($queue) ?? 'default'; - $tries = $params['tries'] ?? CLI::getOption('tries'); - $retryAfter = $params['retry-after'] ?? CLI::getOption('retry-after'); - $countJobs = 0; + [ + $error, + $sleep, + $rest, + $maxJobs, + $maxTime, + $memory, + $priority, + $tries, + $retryAfter + ] = $this->readOptions($params, $config, $queue); + + if ($error !== null) { + CLI::write($error, 'red'); + + return EXIT_ERROR; + } + + $countJobs = 0; if (array_key_exists('stop-when-empty', $params) || CLI::getOption('stop-when-empty')) { $stopWhenEmpty = true; @@ -111,7 +121,7 @@ public function run(array $params) CLI::write(PHP_EOL); - $priority = array_map('trim', explode(',', $priority)); + $priority = array_map('trim', explode(',', (string) $priority)); while (true) { $work = service('queue')->pop($queue, $priority); @@ -175,6 +185,42 @@ public function run(array $params) } } + private function readOptions(array $params, QueueConfig $config, string $queue): array + { + $options = [ + 'error' => null, + 'sleep' => $params['sleep'] ?? CLI::getOption('sleep') ?? 10, + 'rest' => $params['rest'] ?? CLI::getOption('rest') ?? 0, + 'maxJobs' => $params['max-jobs'] ?? CLI::getOption('max-jobs') ?? 0, + 'maxTime' => $params['max-time'] ?? CLI::getOption('max-time') ?? 0, + 'memory' => $params['memory'] ?? CLI::getOption('memory') ?? 128, + 'priority' => $params['priority'] ?? CLI::getOption('priority') ?? $config->getQueuePriorities($queue) ?? 'default', + 'tries' => $params['tries'] ?? CLI::getOption('tries'), + 'retryAfter' => $params['retry-after'] ?? CLI::getOption('retry-after'), + ]; + + // Options that, being defined, cannot be `true` + $keys = ['sleep', 'rest', 'maxJobs', 'maxTime', 'memory', 'priority', 'tries', 'retyAfter']; + + foreach ($keys as $key) { + if ($options[$key] === true) { + $options['error'] = sprintf('Option: "-%s" must have a defined value.', $key); + + return array_values($options); + } + } + // Options that, being defined, have to be `int` + $keys = array_diff($keys, ['priority']); + + foreach ($keys as $key) { + if ($options[$key] !== null && ! is_int($options[$key])) { + $options[$key] = (int) $options[$key]; + } + } + + return array_values($options); + } + private function handleWork(QueueJob $work, QueueConfig $config, ?int $tries, ?int $retryAfter): void { timer()->start('work'); @@ -190,7 +236,7 @@ private function handleWork(QueueJob $work, QueueConfig $config, ?int $tries, ?i CLI::write('The processing of this job was successful', 'green'); } catch (Throwable $err) { - if (isset($job) && ++$work->attempts < $tries ?? $job->getTries()) { + if (isset($job) && ++$work->attempts < ($tries ?? $job->getTries())) { // Schedule for later service('queue')->later($work, $retryAfter ?? $job->getRetryAfter()); } else { diff --git a/src/Config/Queue.php b/src/Config/Queue.php index 7241578..db407e0 100644 --- a/src/Config/Queue.php +++ b/src/Config/Queue.php @@ -7,6 +7,8 @@ use CodeIgniter\Queue\Handlers\DatabaseHandler; use CodeIgniter\Queue\Handlers\PredisHandler; use CodeIgniter\Queue\Handlers\RedisHandler; +use CodeIgniter\Queue\Interfaces\JobInterface; +use CodeIgniter\Queue\Interfaces\QueueInterface; class Queue extends BaseConfig { @@ -17,6 +19,8 @@ class Queue extends BaseConfig /** * Available handlers. + * + * @var array> */ public array $handlers = [ 'database' => DatabaseHandler::class, @@ -81,6 +85,8 @@ class Queue extends BaseConfig /** * Your jobs handlers. + * + * @var array> */ public array $jobHandlers = []; @@ -95,6 +101,8 @@ public function __construct() /** * Resolve job class name. + * + * @return class-string */ public function resolveJobClass(string $name): string { diff --git a/src/Config/Services.php b/src/Config/Services.php index 3624e46..353e8ff 100644 --- a/src/Config/Services.php +++ b/src/Config/Services.php @@ -15,7 +15,7 @@ public static function queue(?QueueConfig $config = null, $getShared = true): Qu return static::getSharedInstance('queue', $config); } - /** @var QueueConfig $config */ + /** @var QueueConfig|null $config */ $config ??= config('Queue'); return (new Queue($config))->init(); diff --git a/src/Entities/QueueJob.php b/src/Entities/QueueJob.php index 4d79d97..28fac48 100644 --- a/src/Entities/QueueJob.php +++ b/src/Entities/QueueJob.php @@ -3,7 +3,18 @@ namespace CodeIgniter\Queue\Entities; use CodeIgniter\Entity\Entity; +use CodeIgniter\I18n\Time; +/** + * @property int $attempts + * @property Time $available_at + * @property Time $created_at + * @property int $id + * @property array $payload + * @property string $priority + * @property string $queue + * @property int $status + */ class QueueJob extends Entity { protected $dates = ['available_at', 'created_at']; diff --git a/src/Handlers/BaseHandler.php b/src/Handlers/BaseHandler.php index b6c3a35..16da9bc 100644 --- a/src/Handlers/BaseHandler.php +++ b/src/Handlers/BaseHandler.php @@ -19,6 +19,18 @@ abstract class BaseHandler protected QueueConfig $config; protected ?string $priority = null; + abstract public function push(string $queue, string $job, array $data): bool; + + abstract public function pop(string $queue, array $priorities): ?QueueJob; + + abstract public function later(QueueJob $queueJob, int $seconds): bool; + + abstract public function failed(QueueJob $queueJob, Throwable $err, bool $keepJob): bool; + + abstract public function done(QueueJob $queueJob, bool $keepJob): bool; + + abstract public function clear(?string $queue = null): bool; + /** * Set priority for job queue. */ diff --git a/src/Handlers/PredisHandler.php b/src/Handlers/PredisHandler.php index bdfeadd..32f52f8 100644 --- a/src/Handlers/PredisHandler.php +++ b/src/Handlers/PredisHandler.php @@ -80,7 +80,7 @@ public function pop(string $queue, array $priorities): ?QueueJob $queueJob->status = Status::RESERVED->value; $queueJob->syncOriginal(); - $this->predis->hset("queues:{$queue}::reserved", $queueJob->id, json_encode($queueJob)); + $this->predis->hset("queues:{$queue}::reserved", (string) $queueJob->id, json_encode($queueJob)); return $queueJob; } @@ -94,7 +94,7 @@ public function later(QueueJob $queueJob, int $seconds): bool $queueJob->available_at = Time::now()->addSeconds($seconds)->timestamp; if ($result = $this->predis->zadd("queues:{$queueJob->queue}:{$queueJob->priority}", [json_encode($queueJob) => $queueJob->available_at->timestamp])) { - $this->predis->hdel("queues:{$queueJob->queue}::reserved", $queueJob->id); + $this->predis->hdel("queues:{$queueJob->queue}::reserved", [$queueJob->id]); } return $result > 0; @@ -109,7 +109,7 @@ public function failed(QueueJob $queueJob, Throwable $err, bool $keepJob): bool $this->logFailed($queueJob, $err); } - return (bool) $this->predis->hdel("queues:{$queueJob->queue}::reserved", $queueJob->id); + return (bool) $this->predis->hdel("queues:{$queueJob->queue}::reserved", [$queueJob->id]); } /** @@ -122,7 +122,7 @@ public function done(QueueJob $queueJob, bool $keepJob): bool $this->predis->lpush("queues:{$queueJob->queue}::done", [json_encode($queueJob)]); } - return (bool) $this->predis->hdel("queues:{$queueJob->queue}::reserved", $queueJob->id); + return (bool) $this->predis->hdel("queues:{$queueJob->queue}::reserved", [$queueJob->id]); } /** diff --git a/src/Handlers/RedisHandler.php b/src/Handlers/RedisHandler.php index b3bd0f1..3036893 100644 --- a/src/Handlers/RedisHandler.php +++ b/src/Handlers/RedisHandler.php @@ -99,7 +99,7 @@ public function pop(string $queue, array $priorities): ?QueueJob $queueJob->status = Status::RESERVED->value; $queueJob->syncOriginal(); - $this->redis->hSet("queues:{$queue}::reserved", $queueJob->id, json_encode($queueJob)); + $this->redis->hSet("queues:{$queue}::reserved", (string) $queueJob->id, json_encode($queueJob)); return $queueJob; } @@ -115,7 +115,7 @@ public function later(QueueJob $queueJob, int $seconds): bool $queueJob->available_at = Time::now()->addSeconds($seconds)->timestamp; if ($result = (int) $this->redis->zAdd("queues:{$queueJob->queue}:{$queueJob->priority}", $queueJob->available_at->timestamp, json_encode($queueJob))) { - $this->redis->hDel("queues:{$queueJob->queue}::reserved", $queueJob->id); + $this->redis->hDel("queues:{$queueJob->queue}::reserved", (string) $queueJob->id); } return $result > 0; @@ -130,7 +130,7 @@ public function failed(QueueJob $queueJob, Throwable $err, bool $keepJob): bool $this->logFailed($queueJob, $err); } - return (bool) $this->redis->hDel("queues:{$queueJob->queue}::reserved", $queueJob->id); + return (bool) $this->redis->hDel("queues:{$queueJob->queue}::reserved", (string) $queueJob->id); } /** @@ -145,7 +145,7 @@ public function done(QueueJob $queueJob, bool $keepJob): bool $this->redis->lPush("queues:{$queueJob->queue}::done", json_encode($queueJob)); } - return (bool) $this->redis->hDel("queues:{$queueJob->queue}::reserved", $queueJob->id); + return (bool) $this->redis->hDel("queues:{$queueJob->queue}::reserved", (string) $queueJob->id); } /** diff --git a/src/Interfaces/JobInterface.php b/src/Interfaces/JobInterface.php index 0ae7aeb..eb466a6 100644 --- a/src/Interfaces/JobInterface.php +++ b/src/Interfaces/JobInterface.php @@ -7,4 +7,8 @@ interface JobInterface public function __construct(array $data); public function process(); + + public function getRetryAfter(): int; + + public function getTries(): int; } diff --git a/tests/_support/Database/Seeds/TestRedisQueueSeeder.php b/tests/_support/Database/Seeds/TestRedisQueueSeeder.php index 76164b1..abaebed 100644 --- a/tests/_support/Database/Seeds/TestRedisQueueSeeder.php +++ b/tests/_support/Database/Seeds/TestRedisQueueSeeder.php @@ -40,7 +40,7 @@ public function run(): void 'attempts' => 0, 'available_at' => 1_697_269_864, ]); - $redis->hSet("queues:{$jobQueue->queue}::reserved", $jobQueue->id, json_encode($jobQueue)); + $redis->hSet("queues:{$jobQueue->queue}::reserved", (string) $jobQueue->id, json_encode($jobQueue)); $jobQueue = new QueueJob([ 'id' => '1234567890654321', diff --git a/tests/_support/Jobs/Success.php b/tests/_support/Jobs/Success.php index 894cff5..dd2125d 100644 --- a/tests/_support/Jobs/Success.php +++ b/tests/_support/Jobs/Success.php @@ -8,7 +8,7 @@ class Success extends BaseJob implements JobInterface { protected int $retryAfter = 6; - protected int $retries = 3; + protected int $tries = 3; public function process(): bool {