diff --git a/docs/commands.md b/docs/commands.md index 8d5a5b7..b539f6d 100644 --- a/docs/commands.md +++ b/docs/commands.md @@ -60,6 +60,7 @@ Allows you to consume jobs from a specific queue. * `-max-jobs` - The maximum number of jobs to handle before worker should exit. Disabled by default. * `-max-time` - The maximum number of seconds worker should run. Disabled by default. * `-memory` - The maximum memory in MB that worker can take. Default value: `128`. +* `-priority` - The priority for the jobs from the queue (comma separated). If not provided explicit, will follow the priorities defined in the config via `$queuePriorities` for the given queue. Disabled by default. * `-tries` - The number of attempts after which the job will be considered as failed. Overrides settings from the Job class. Disabled by default. * `-retry-after` - The number of seconds after which the job is to be restarted in case of failure. Overrides settings from the Job class. Disabled by default. * `--stop-when-empty` - Stop when the queue is empty. @@ -70,6 +71,10 @@ Allows you to consume jobs from a specific queue. It will listen for 5 jobs from the `emails` queue and then stop. + php spark queue:work emails -max-jobs 5 -priority low,high + +It will work the same as the previous command but will first consume jobs from the `emails` queue that were added with the `low` priority. + ### queue:stop Allows you to stop a specific queue in a safe way. It does this as soon as the job that is running in the queue is completed. diff --git a/docs/configuration.md b/docs/configuration.md index d5c5279..c872654 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -17,6 +17,8 @@ Available options: - [$database](#database) - [$keepDoneJobs](#keepDoneJobs) - [$keepFailedJobs](#keepFailedJobs) +- [$queueDefaultPriority](#queueDefaultPriority) +- [$queuePriorities](#queuePriorities) - [$jobHandlers](#jobHandlers) ### $defaultHandler @@ -44,6 +46,38 @@ If the job failed, should we move it to the failed jobs table? Default value: `t This is very useful when you want to be able to see which tasks are failing and why. +### $queueDefaultPriority + +The default priority for the `queue` if non default `queuePriorities` are set. Not set by default. + +This is needed only if you have defined non default priorities for the queue and the default priority should be different from the `default` value. + +Example: + +```php +public array $queueDefaultPriority = [ + 'emails' => 'low', +]; +``` + +This means that all the jobs added to the `emails` queue will have the default priority set to `low`. + +### $queuePriorities + +The valid priorities for the `queue` in the order they will be consumed first. Not set by default. + +By default, the priority is set to `['default']`. If you want to have multiple priorities in the queue, you can define them here. + +Example: + +```php +public array $queuePriorities = [ + 'emails' => ['high', 'low'], +]; +``` + +This means that the jobs added to the `emails` queue can have either `high` or `low` priority. + ### $jobHandlers An array of available jobs as key-value. Every job that you want to use with the queue has to be defined here. diff --git a/docs/running_queues.md b/docs/running_queues.md index e8b3266..10d4139 100644 --- a/docs/running_queues.md +++ b/docs/running_queues.md @@ -24,6 +24,45 @@ So choosing the right command is not so obvious. We have to estimate how many jo You might use CodeIgniter [Tasks](https://github.com/codeigniter4/tasks) library to schedule queue worker instead of working directly with CRON. +### Working with priorities + +By default, every job in the queue has the same priority. However, we can send the jobs to the queue with different priorities. This way some jobs may be handled earlier. + +As an example, we will define priorities for the `emails` queue: + +```php +// app/Config/Queue.php + +public array $queueDefaultPriority = [ + 'emails' => 'low', +]; + +public array $queuePriorities = [ + 'emails' => ['high', 'low'], +]; +``` + +With this configuration, we can now add new jobs to the queue like this: + +```php +// This job will have low priority: +service('queue')->push('emails', 'email', ['message' => 'Email message with low priority']); +// But this one will have high priority +service('queue')->setPriority('high')->push('emails', 'email', ['message' => 'Email message with high priority']); +``` + +Now, if we run the worker: + + php spark queue:work emails + +It will consume the jobs from the queue based on priority set in the config: `$queuePriorities`. So, first `high` priority and then `low` priority. + +But we can also run the worker like this: + + php spark queue:work emails -priority low,high + +This way, worker will consume jobs with the `low` priority and then with `high`. The order set in the config file is override. + ### Running many instances of the same queue As mentioned above, sometimes we may want to have multiple instances of the same command running at the same time. The queue is safe to use in that scenario with all databases except `SQLite3` since it doesn't guarantee that the job will be selected only by one process. diff --git a/phpstan.neon.dist b/phpstan.neon.dist index 7237eff..d1d87e5 100644 --- a/phpstan.neon.dist +++ b/phpstan.neon.dist @@ -16,19 +16,24 @@ parameters: message: '#Variable \$config on left side of \?\?\= always exists and is not nullable.#' paths: - src/Config/Services.php + - + message: '#Call to an undefined method Michalsn\\CodeIgniterQueue\\Handlers\\BaseHandler::push\(\).#' + paths: + - src/Handlers/BaseHandler.php - message: '#Access to an undefined property CodeIgniter\\I18n\\Time::\$timestamp.#' paths: + - src/Handlers/BaseHandler.php - src/Handlers/DatabaseHandler.php - src/Models/QueueJobModel.php - message: '#Call to an undefined method Michalsn\\CodeIgniterQueue\\Models\\QueueJobFailedModel::affectedRows\(\).#' paths: - - src/Handlers/DatabaseHandler.php + - src/Handlers/BaseHandler.php - message: '#Call to an undefined method Michalsn\\CodeIgniterQueue\\Models\\QueueJobFailedModel::truncate\(\).#' paths: - - src/Handlers/DatabaseHandler.php + - src/Handlers/BaseHandler.php - message: '#Parameter \#3 \$tries of method Michalsn\\CodeIgniterQueue\\Commands\\QueueWork::handleWork\(\) expects int\|null, string\|true\|null given.#' paths: diff --git a/src/Commands/QueuePublish.php b/src/Commands/QueuePublish.php index 6f76e22..606cb60 100644 --- a/src/Commands/QueuePublish.php +++ b/src/Commands/QueuePublish.php @@ -59,6 +59,18 @@ public function resolveJobClass(string $name): string return $this->jobHandlers[$name]; } + + /** + * Stringify queue priorities. + */ + public function getQueuePriorities(string $name): ?string + { + if (! isset($this->queuePriorities[$name])) { + return null; + } + + return implode(',', $this->queuePriorities[$name]); + } EOT; $contents = str_replace($method, '', $contents); file_put_contents($file, $contents); diff --git a/src/Commands/QueueWork.php b/src/Commands/QueueWork.php index 74c2b4e..5053993 100644 --- a/src/Commands/QueueWork.php +++ b/src/Commands/QueueWork.php @@ -59,6 +59,7 @@ class QueueWork extends BaseCommand '-max-jobs' => 'The maximum number of jobs to handle before worker should exit. Disabled by default.', '-max-time' => 'The maximum number of seconds worker should run. Disabled by default.', '-memory' => 'The maximum memory in MB that worker can take. Default value: 128', + '-priority' => 'The priority for the jobs from the queue (comma separated). If not provided explicit, will follow the priorities defined in the config via $queuePriorities for the given queue. Disabled by default.', '-tries' => 'The number of attempts after which the job will be considered as failed. Overrides settings from the Job class. Disabled by default.', '-retry-after' => 'The number of seconds after which the job is to be restarted in case of failure. Overrides settings from the Job class. Disabled by default.', '--stop-when-empty' => 'Stop when the queue is empty.', @@ -71,6 +72,8 @@ class QueueWork extends BaseCommand */ public function run(array $params) { + set_time_limit(0); + /** @var QueueConfig $config */ $config = config('Queue'); $stopWhenEmpty = false; @@ -89,6 +92,7 @@ public function run(array $params) $maxJobs = $params['max-jobs'] ?? CLI::getOption('max-jobs') ?? 0; $maxTime = $params['max-time'] ?? CLI::getOption('max-time') ?? 0; $memory = $params['memory'] ?? CLI::getOption('memory') ?? 128; + $priority = $params['priority'] ?? CLI::getOption('priority') ?? $config->getQueuePriorities($queue) ?? 'default'; $tries = $params['tries'] ?? CLI::getOption('tries'); $retryAfter = $params['retry-after'] ?? CLI::getOption('retry-after'); $countJobs = 0; @@ -99,10 +103,18 @@ public function run(array $params) $startTime = microtime(true); - CLI::write('Listening for the jobs with the queue: ' . CLI::color($queue, 'light_cyan') . PHP_EOL, 'cyan'); + CLI::write('Listening for the jobs with the queue: ' . CLI::color($queue, 'light_cyan'), 'cyan'); + + if ($priority !== 'default') { + CLI::write('Jobs will be consumed according to priority: ' . CLI::color($priority, 'light_cyan'), 'cyan'); + } + + CLI::write(PHP_EOL); + + $priority = array_map('trim', explode(',', $priority)); while (true) { - $work = service('queue')->pop($queue); + $work = service('queue')->pop($queue, $priority); if ($work === null) { if ($stopWhenEmpty) { @@ -216,7 +228,7 @@ private function maxTimeCheck(int $maxTime, float $startTime): bool private function checkMemory(int $memory): bool { - if (memory_get_peak_usage() > $memory * 1024 * 1024) { + if (memory_get_usage(true) > $memory * 1024 * 1024) { CLI::write(sprintf('The memory limit of %s MB was reached. Stopping.', $memory), 'yellow'); return true; @@ -234,7 +246,7 @@ private function checkStop(string $queue, float $startTime): bool } if ($startTime < (float) $time) { - CLI::write('This worker has been scheduled to end. Stopping.', 'yellow'); + CLI::write('The termination of this worker has been planned. Stopping.', 'yellow'); return true; } diff --git a/src/Config/Queue.php b/src/Config/Queue.php index edb5f78..fc8a37b 100644 --- a/src/Config/Queue.php +++ b/src/Config/Queue.php @@ -38,6 +38,18 @@ class Queue extends BaseConfig */ public bool $keepFailedJobs = true; + /** + * Default priorities for the queue + * if different from the "default". + */ + public array $queueDefaultPriority = []; + + /** + * Valid priorities in the order for the queue, + * if different from the "default". + */ + public array $queuePriorities = []; + /** * Your jobs handlers. */ @@ -63,4 +75,16 @@ public function resolveJobClass(string $name): string return $this->jobHandlers[$name]; } + + /** + * Stringify queue priorities. + */ + public function getQueuePriorities(string $name): ?string + { + if (! isset($this->queuePriorities[$name])) { + return null; + } + + return implode(',', $this->queuePriorities[$name]); + } } diff --git a/src/Config/Services.php b/src/Config/Services.php index 99502ca..fe5e158 100644 --- a/src/Config/Services.php +++ b/src/Config/Services.php @@ -4,11 +4,12 @@ use CodeIgniter\Config\BaseService; use Michalsn\CodeIgniterQueue\Config\Queue as QueueConfig; +use Michalsn\CodeIgniterQueue\Interfaces\QueueInterface; use Michalsn\CodeIgniterQueue\Queue; class Services extends BaseService { - public static function queue(?QueueConfig $config = null, $getShared = true) + public static function queue(?QueueConfig $config = null, $getShared = true): QueueInterface { if ($getShared) { return static::getSharedInstance('queue', $config); diff --git a/src/Database/Migrations/2023-11-05-064053_AddPriorityField.php b/src/Database/Migrations/2023-11-05-064053_AddPriorityField.php new file mode 100644 index 0000000..1f9fe82 --- /dev/null +++ b/src/Database/Migrations/2023-11-05-064053_AddPriorityField.php @@ -0,0 +1,61 @@ + [ + 'type' => 'varchar', + 'constraint' => 64, + 'null' => false, + 'default' => 'default', + 'after' => 'payload', + ], + ]; + + $this->forge->addColumn('queue_jobs', $fields); + $this->forge->addColumn('queue_jobs_failed', $fields); + + // Ugly fix for dropping the correct index + // since it had no name given + $keys = $this->db->getIndexData('queue_jobs'); + + foreach ($keys as $key) { + if ($key->fields === ['queue', 'status', 'available_at']) { + $this->forge->dropKey('queue_jobs', $key->name, false); + break; + } + } + + $this->forge->addKey(['queue', 'priority', 'status', 'available_at'], false, false, 'queue_priority_status_available_at'); + $this->forge->processIndexes('queue_jobs'); + } + + public function down() + { + // Ugly fix for dropping the correct index + $keys = $this->db->getIndexData('queue_jobs'); + + foreach ($keys as $key) { + if ($key->fields === ['queue', 'priority', 'status', 'available_at']) { + $this->forge->dropKey('queue_jobs', $key->name, false); + break; + } + } + + $this->forge->addKey(['queue', 'status', 'available_at']); + $this->forge->processIndexes('queue_jobs'); + + $this->forge->dropColumn('queue_jobs', 'priority'); + $this->forge->dropColumn('queue_jobs_failed', 'priority'); + } +} diff --git a/src/Entities/QueueJob.php b/src/Entities/QueueJob.php index ae49c53..59beb3a 100644 --- a/src/Entities/QueueJob.php +++ b/src/Entities/QueueJob.php @@ -11,6 +11,7 @@ class QueueJob extends Entity 'id' => 'integer', 'queue' => 'string', 'payload' => 'json-array', + 'priority' => 'string', 'status' => 'integer', 'attempts' => 'integer', ]; diff --git a/src/Entities/QueueJobFailed.php b/src/Entities/QueueJobFailed.php index b8dc0ed..06c1a97 100644 --- a/src/Entities/QueueJobFailed.php +++ b/src/Entities/QueueJobFailed.php @@ -12,6 +12,7 @@ class QueueJobFailed extends Entity 'connection' => 'string', 'queue' => 'string', 'payload' => 'json-array', + 'priority' => 'string', 'exceptions' => 'string', ]; } diff --git a/src/Exceptions/QueueException.php b/src/Exceptions/QueueException.php index 0da0cab..93b240c 100644 --- a/src/Exceptions/QueueException.php +++ b/src/Exceptions/QueueException.php @@ -15,4 +15,19 @@ public static function forIncorrectJobHandler(): static { return new self(lang('Queue.incorrectJobHandler')); } + + public static function forIncorrectPriorityFormat(): static + { + return new self(lang('Queue.incorrectPriorityFormat')); + } + + public static function forTooLongPriorityName(): static + { + return new self(lang('Queue.tooLongPriorityName')); + } + + public static function forIncorrectQueuePriority(string $priority, string $queue): static + { + return new self(lang('Queue.incorrectQueuePriority', [$priority, $queue])); + } } diff --git a/src/Handlers/BaseHandler.php b/src/Handlers/BaseHandler.php new file mode 100644 index 0000000..aad22b4 --- /dev/null +++ b/src/Handlers/BaseHandler.php @@ -0,0 +1,153 @@ + 64) { + throw QueueException::forTooLongPriorityName(); + } + + $this->priority = $priority; + + return $this; + } + + /** + * Retry failed job. + * + * @throws ReflectionException + */ + public function retry(?int $id, ?string $queue): int + { + $jobs = model(QueueJobFailedModel::class) + ->when( + $id !== null, + static fn ($query) => $query->where('id', $id) + ) + ->when( + $queue !== null, + static fn ($query) => $query->where('queue', $queue) + ) + ->findAll(); + + foreach ($jobs as $job) { + $this->setPriority($job->priority)->push($job->queue, $job->payload['job'], $job->payload['data']); + $this->forget($job->id); + } + + return count($jobs); + } + + /** + * Delete failed job by ID. + */ + public function forget(int $id): bool + { + if (model(QueueJobFailedModel::class)->delete($id)) { + return model(QueueJobFailedModel::class)->affectedRows() > 0; + } + + return false; + } + + /** + * Delete many failed jobs at once. + */ + public function flush(?int $hours, ?string $queue): bool + { + if ($hours === null && $queue === null) { + return model(QueueJobFailedModel::class)->truncate(); + } + + return model(QueueJobFailedModel::class) + ->when( + $hours !== null, + static fn ($query) => $query->where('failed_at <=', Time::now()->subHours($hours)->timestamp) + ) + ->when( + $queue !== null, + static fn ($query) => $query->where('queue', $queue) + ) + ->delete(); + } + + /** + * List failed queue jobs. + */ + public function listFailed(?string $queue): array + { + return model(QueueJobFailedModel::class) + ->when( + $queue !== null, + static fn ($query) => $query->where('queue', $queue) + ) + ->orderBy('failed_at', 'desc') + ->findAll(); + } + + /** + * Log failed job. + * + * @throws ReflectionException + */ + protected function logFailed(QueueJob $queueJob, Throwable $err): bool + { + $exception = "Exception: {$err->getCode()} - {$err->getMessage()}" . PHP_EOL . + "file: {$err->getFile()}:{$err->getLine()}"; + + $queueJobFailed = new QueueJobFailed([ + 'connection' => 'database', + 'queue' => $queueJob->queue, + 'payload' => $queueJob->payload, + 'priority' => $queueJob->priority, + 'exception' => $exception, + ]); + + return model(QueueJobFailedModel::class)->insert($queueJobFailed, false); + } + + /** + * Validate job and priority. + */ + protected function validateJobAndPriority(string $queue, string $job): void + { + // Validate jobHandler. + if (! in_array($job, array_keys($this->config->jobHandlers), true)) { + throw QueueException::forIncorrectJobHandler(); + } + + if ($this->priority === null) { + $this->setPriority($this->config->queueDefaultPriority[$queue] ?? 'default'); + } + + // Validate non-standard priority. + if (! in_array($this->priority, $this->config->queuePriorities[$queue] ?? ['default'], true)) { + throw QueueException::forIncorrectQueuePriority($this->priority, $queue); + } + } +} diff --git a/src/Handlers/DatabaseHandler.php b/src/Handlers/DatabaseHandler.php index 94d357b..bf74dc5 100644 --- a/src/Handlers/DatabaseHandler.php +++ b/src/Handlers/DatabaseHandler.php @@ -5,17 +5,14 @@ use CodeIgniter\I18n\Time; use Michalsn\CodeIgniterQueue\Config\Queue as QueueConfig; use Michalsn\CodeIgniterQueue\Entities\QueueJob; -use Michalsn\CodeIgniterQueue\Entities\QueueJobFailed; use Michalsn\CodeIgniterQueue\Enums\Status; -use Michalsn\CodeIgniterQueue\Exceptions\QueueException; use Michalsn\CodeIgniterQueue\Interfaces\QueueInterface; -use Michalsn\CodeIgniterQueue\Models\QueueJobFailedModel; use Michalsn\CodeIgniterQueue\Models\QueueJobModel; use Michalsn\CodeIgniterQueue\Payload; use ReflectionException; use Throwable; -class DatabaseHandler implements QueueInterface +class DatabaseHandler extends BaseHandler implements QueueInterface { private readonly QueueJobModel $jobModel; @@ -32,18 +29,19 @@ public function __construct(protected QueueConfig $config) */ public function push(string $queue, string $job, array $data): bool { - if (! in_array($job, array_keys($this->config->jobHandlers), true)) { - throw QueueException::forIncorrectJobHandler(); - } + $this->validateJobAndPriority($queue, $job); $queueJob = new QueueJob([ 'queue' => $queue, 'payload' => new Payload($job, $data), + 'priority' => $this->priority, 'status' => Status::PENDING->value, 'attempts' => 0, 'available_at' => Time::now()->timestamp, ]); + $this->priority = null; + return $this->jobModel->insert($queueJob, false); } @@ -52,9 +50,9 @@ public function push(string $queue, string $job, array $data): bool * * @throws ReflectionException */ - public function pop(string $queue): ?QueueJob + public function pop(string $queue, array $priorities): ?QueueJob { - $queueJob = $this->jobModel->getFromQueue($queue); + $queueJob = $this->jobModel->getFromQueue($queue, $priorities); if ($queueJob === null) { return null; @@ -88,16 +86,7 @@ public function later(QueueJob $queueJob, int $seconds): bool public function failed(QueueJob $queueJob, Throwable $err, bool $keepJob): bool { if ($keepJob) { - $exception = "Exception: {$err->getCode()} - {$err->getMessage()}" . PHP_EOL . - "file: {$err->getFile()}:{$err->getLine()}"; - - $queueJobFailed = new QueueJobFailed([ - 'connection' => 'database', - 'queue' => $queueJob->queue, - 'payload' => $queueJob->payload, - 'exception' => $exception, - ]); - model(QueueJobFailedModel::class)->insert($queueJobFailed, false); + $this->logFailed($queueJob, $err); } return $this->jobModel->delete($queueJob->id); @@ -128,78 +117,4 @@ public function clear(?string $queue = null): bool return $this->jobModel->delete(); } - - /** - * Retry failed job. - * ∂ - * - * @throws ReflectionException - */ - public function retry(?int $id, ?string $queue): int - { - $jobs = model(QueueJobFailedModel::class) - ->when( - $id !== null, - static fn ($query) => $query->where('id', $id) - ) - ->when( - $queue !== null, - static fn ($query) => $query->where('queue', $queue) - ) - ->findAll(); - - foreach ($jobs as $job) { - $this->push($job->queue, $job->payload['job'], $job->payload['data']); - $this->forget($job->id); - } - - return count($jobs); - } - - /** - * Delete failed job by ID. - */ - public function forget(int $id): bool - { - if (model(QueueJobFailedModel::class)->delete($id)) { - return model(QueueJobFailedModel::class)->affectedRows() > 0; - } - - return false; - } - - /** - * Delete many failed jobs at once. - */ - public function flush(?int $hours, ?string $queue): bool - { - if ($hours === null && $queue === null) { - return model(QueueJobFailedModel::class)->truncate(); - } - - return model(QueueJobFailedModel::class) - ->when( - $hours !== null, - static fn ($query) => $query->where('failed_at <=', Time::now()->subHours($hours)->timestamp) - ) - ->when( - $queue !== null, - static fn ($query) => $query->where('queue', $queue) - ) - ->delete(); - } - - /** - * List failed queue jobs. - */ - public function listFailed(?string $queue) - { - return model(QueueJobFailedModel::class) - ->when( - $queue !== null, - static fn ($query) => $query->where('queue', $queue) - ) - ->orderBy('failed_at', 'desc') - ->findAll(); - } } diff --git a/src/Interfaces/QueueInterface.php b/src/Interfaces/QueueInterface.php index aef9125..2f6be86 100644 --- a/src/Interfaces/QueueInterface.php +++ b/src/Interfaces/QueueInterface.php @@ -9,7 +9,7 @@ interface QueueInterface { public function push(string $queue, string $job, array $data); - public function pop(string $queue); + public function pop(string $queue, array $priorities); public function later(QueueJob $queueJob, int $seconds); diff --git a/src/Language/en/Queue.php b/src/Language/en/Queue.php index ab54ff9..d7c1a09 100644 --- a/src/Language/en/Queue.php +++ b/src/Language/en/Queue.php @@ -6,6 +6,9 @@ 'job' => 'Job class name', ], ], - 'incorrectHandler' => 'This queue handler is incorrect.', - 'incorrectJobHandler' => 'This job name is not defined in the $jobHandlers array.', + 'incorrectHandler' => 'This queue handler is incorrect.', + 'incorrectJobHandler' => 'This job name is not defined in the $jobHandlers array.', + 'incorrectPriorityFormat' => 'The priority name should consists only lowercase letters.', + 'tooLongPriorityName' => 'The priority name is too long. It should be no longer than 64 letters.', + 'incorrectQueuePriority' => 'This queue has incorrectly defined priority: "{0}" for the queue: "{1}".', ]; diff --git a/src/Models/QueueJobFailedModel.php b/src/Models/QueueJobFailedModel.php index 9a51c5f..25932d2 100644 --- a/src/Models/QueueJobFailedModel.php +++ b/src/Models/QueueJobFailedModel.php @@ -13,7 +13,7 @@ class QueueJobFailedModel extends Model protected $returnType = QueueJobFailed::class; protected $useSoftDeletes = false; protected $protectFields = true; - protected $allowedFields = ['connection', 'queue', 'payload', 'exception']; + protected $allowedFields = ['connection', 'queue', 'payload', 'priority', 'exception']; // Dates protected $useTimestamps = true; diff --git a/src/Models/QueueJobModel.php b/src/Models/QueueJobModel.php index 0bf5a0d..58d2fb0 100644 --- a/src/Models/QueueJobModel.php +++ b/src/Models/QueueJobModel.php @@ -2,6 +2,8 @@ namespace Michalsn\CodeIgniterQueue\Models; +use CodeIgniter\Database\BaseBuilder; +use CodeIgniter\Database\RawSql; use CodeIgniter\I18n\Time; use CodeIgniter\Model; use Michalsn\CodeIgniterQueue\Entities\QueueJob; @@ -16,7 +18,7 @@ class QueueJobModel extends Model protected $returnType = QueueJob::class; protected $useSoftDeletes = false; protected $protectFields = true; - protected $allowedFields = ['queue', 'payload', 'status', 'attempts', 'available_at']; + protected $allowedFields = ['queue', 'payload', 'priority', 'status', 'attempts', 'available_at']; // Dates protected $useTimestamps = true; @@ -35,7 +37,7 @@ class QueueJobModel extends Model * * @throws ReflectionException */ - public function getFromQueue(string $name): ?QueueJob + public function getFromQueue(string $name, array $priority): ?QueueJob { // For SQLite3 memory database this will cause problems // so check if we're not in the testing environment first. @@ -47,14 +49,14 @@ public function getFromQueue(string $name): ?QueueJob $this->db->transStart(); // Prepare SQL - $sql = $this->builder() + $builder = $this->builder() ->where('queue', $name) ->where('status', Status::PENDING->value) ->where('available_at <=', Time::now()->timestamp) - ->orderBy('available_at', 'asc') - ->orderBy('id', 'asc') - ->limit(1) - ->getCompiledSelect(); + ->limit(1); + + $builder = $this->setPriority($builder, $priority); + $sql = $builder->getCompiledSelect(); $query = $this->db->query($this->skipLocked($sql)); if ($query === false) { @@ -90,4 +92,26 @@ private function skipLocked(string $sql): string return $sql .= ' FOR UPDATE SKIP LOCKED'; } + + /** + * Handle priority of the queue. + */ + private function setPriority(BaseBuilder $builder, array $priority): BaseBuilder + { + $builder->whereIn('priority', $priority); + + if ($priority !== ['default']) { + if ($this->db->DBDriver === 'SQLite3') { + $builder->orderBy(new RawSql('CASE priority ' . implode(' ', array_map(static fn ($value, $key) => "WHEN '{$value}' THEN {$key}", $priority, array_keys($priority))) . ' END')); + } else { + $builder->orderBy(new RawSql('FIELD(priority, ' . implode(',', array_map(static fn ($value) => "'{$value}'", $priority)) . ')')); + } + } + + $builder + ->orderBy('available_at', 'asc') + ->orderBy('id', 'asc'); + + return $builder; + } } diff --git a/tests/DatabaseHandlerTest.php b/tests/DatabaseHandlerTest.php index 1932ca6..26eb20a 100644 --- a/tests/DatabaseHandlerTest.php +++ b/tests/DatabaseHandlerTest.php @@ -2,6 +2,7 @@ namespace Tests; +use CodeIgniter\Test\ReflectionHelper; use Exception; use Michalsn\CodeIgniterQueue\Entities\QueueJob; use Michalsn\CodeIgniterQueue\Enums\Status; @@ -18,6 +19,8 @@ */ final class DatabaseHandlerTest extends TestCase { + use ReflectionHelper; + protected $seed = TestQueueSeeder::class; private QueueConfig $config; @@ -34,6 +37,32 @@ public function testDatabaseHandler() $this->assertInstanceOf(DatabaseHandler::class, $handler); } + public function testPriority() + { + $handler = new DatabaseHandler($this->config); + $handler->setPriority('high'); + + $this->assertSame('high', self::getPrivateProperty($handler, 'priority')); + } + + public function testPriorityNameException() + { + $this->expectException(QueueException::class); + $this->expectExceptionMessage('The priority name should consists only lowercase letters.'); + + $handler = new DatabaseHandler($this->config); + $handler->setPriority('high_:'); + } + + public function testPriorityNameLengthException() + { + $this->expectException(QueueException::class); + $this->expectExceptionMessage('The priority name is too long. It should be no longer than 64 letters.'); + + $handler = new DatabaseHandler($this->config); + $handler->setPriority(str_repeat('a', 65)); + } + /** * @throws ReflectionException */ @@ -49,6 +78,22 @@ public function testPush() ]); } + /** + * @throws ReflectionException + */ + public function testPushWithPriority() + { + $handler = new DatabaseHandler($this->config); + $result = $handler->setPriority('high')->push('queue', 'success', ['key' => 'value']); + + $this->assertTrue($result); + $this->seeInDatabase('queue_jobs', [ + 'queue' => 'queue', + 'payload' => json_encode(['job' => 'success', 'data' => ['key' => 'value']]), + 'priority' => 'high', + ]); + } + /** * @throws ReflectionException */ @@ -61,13 +106,25 @@ public function testPushException() $handler->push('queue', 'not-exists', ['key' => 'value']); } + /** + * @throws ReflectionException + */ + public function testPushWithPriorityException() + { + $this->expectException(QueueException::class); + $this->expectExceptionMessage('This queue has incorrectly defined priority: "invalid" for the queue: "queue".'); + + $handler = new DatabaseHandler($this->config); + $handler->setPriority('invalid')->push('queue', 'success', ['key' => 'value']); + } + /** * @throws ReflectionException */ public function testPop() { $handler = new DatabaseHandler($this->config); - $result = $handler->pop('queue1'); + $result = $handler->pop('queue1', ['default']); $this->assertInstanceOf(QueueJob::class, $result); $this->seeInDatabase('queue_jobs', [ @@ -82,7 +139,7 @@ public function testPop() public function testPopEmpty() { $handler = new DatabaseHandler($this->config); - $result = $handler->pop('queue123'); + $result = $handler->pop('queue123', ['default']); $this->assertNull($result); } @@ -93,7 +150,7 @@ public function testPopEmpty() public function testLater() { $handler = new DatabaseHandler($this->config); - $queueJob = $handler->pop('queue1'); + $queueJob = $handler->pop('queue1', ['default']); $this->seeInDatabase('queue_jobs', [ 'id' => 2, @@ -115,7 +172,7 @@ public function testLater() public function testFailedAndKeepJob() { $handler = new DatabaseHandler($this->config); - $queueJob = $handler->pop('queue1'); + $queueJob = $handler->pop('queue1', ['default']); $err = new Exception('Sample exception'); $result = $handler->failed($queueJob, $err, true); @@ -134,7 +191,7 @@ public function testFailedAndKeepJob() public function testFailedAndDontKeepJob() { $handler = new DatabaseHandler($this->config); - $queueJob = $handler->pop('queue1'); + $queueJob = $handler->pop('queue1', ['default']); $err = new Exception('Sample exception'); $result = $handler->failed($queueJob, $err, false); @@ -156,7 +213,7 @@ public function testFailedAndDontKeepJob() public function testDoneAndKeepJob() { $handler = new DatabaseHandler($this->config); - $queueJob = $handler->pop('queue1'); + $queueJob = $handler->pop('queue1', ['default']); $result = $handler->done($queueJob, true); @@ -173,7 +230,7 @@ public function testDoneAndKeepJob() public function testDoneAndDontKeepJob() { $handler = new DatabaseHandler($this->config); - $queueJob = $handler->pop('queue1'); + $queueJob = $handler->pop('queue1', ['default']); $result = $handler->done($queueJob, false); @@ -233,7 +290,7 @@ public function testForget() public function testFlush() { $handler = new DatabaseHandler($this->config); - $queueJob = $handler->pop('queue1'); + $queueJob = $handler->pop('queue1', ['default']); $err = new Exception('Sample exception here'); $result = $handler->failed($queueJob, $err, true); diff --git a/tests/_support/Config/Queue.php b/tests/_support/Config/Queue.php index 508a8e0..57f7256 100644 --- a/tests/_support/Config/Queue.php +++ b/tests/_support/Config/Queue.php @@ -39,6 +39,22 @@ class Queue extends BaseQueue */ public bool $keepFailedJobs = true; + /** + * Default priorities for the queue + * if different from the "default". + */ + public array $queueDefaultPriority = [ + 'queue' => 'low', + ]; + + /** + * Valid priorities for the queue, + * if different from the "default". + */ + public array $queuePriorities = [ + 'queue' => ['high', 'low'], + ]; + /** * Your jobs handlers. */ diff --git a/tests/_support/Database/Seeds/TestQueueSeeder.php b/tests/_support/Database/Seeds/TestQueueSeeder.php index 9dff812..5d3b315 100644 --- a/tests/_support/Database/Seeds/TestQueueSeeder.php +++ b/tests/_support/Database/Seeds/TestQueueSeeder.php @@ -16,6 +16,7 @@ public function run(): void model(QueueJobModel::class)->insert(new QueueJob([ 'queue' => 'queue1', 'payload' => ['job' => 'success', 'data' => []], + 'priority' => 'default', 'status' => Status::RESERVED->value, 'attempts' => 0, 'available_at' => 1_697_269_864, @@ -24,6 +25,7 @@ public function run(): void model(QueueJobModel::class)->insert(new QueueJob([ 'queue' => 'queue1', 'payload' => ['job' => 'failure', 'data' => []], + 'priority' => 'default', 'status' => Status::PENDING->value, 'attempts' => 0, 'available_at' => 1_697_269_860, @@ -33,6 +35,7 @@ public function run(): void 'connection' => 'database', 'queue' => 'queue1', 'payload' => ['job' => 'failure', 'data' => []], + 'priority' => 'default', 'exception' => 'Exception info', ])); }