Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: fix type inconsistency and more #12

Merged
merged 9 commits into from
Dec 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 0 additions & 40 deletions phpstan.neon.dist
Original file line number Diff line number Diff line change
Expand Up @@ -8,37 +8,13 @@ parameters:
- vendor/codeigniter4/framework/system/Test/bootstrap.php
excludePaths:
ignoreErrors:
-
message: '#Cannot use \+\+ on array\|bool\|float\|int\|object\|string\|null.#'
paths:
- src/Commands/QueueWork.php
-
message: '#Variable \$config on left side of \?\?\= always exists and is not nullable.#'
paths:
- src/Config/Services.php
-
message: '#Call to an undefined method CodeIgniter\\Queue\\Handlers\\BaseHandler::push\(\).#'
paths:
- src/Handlers/BaseHandler.php
-
message: '#Call to deprecated function random_string\(\):#'
paths:
- src/Handlers/RedisHandler.php
- src/Handlers/PredisHandler.php
-
message: '#Cannot access property \$timestamp on array\|bool\|float\|int\|object\|string.#'
paths:
- tests/_support/Database/Seeds/TestRedisQueueSeeder.php
-
message: '#Access to an undefined property CodeIgniter\\I18n\\Time::\$timestamp.#'
paths:
- src/Handlers/BaseHandler.php
- src/Handlers/DatabaseHandler.php
- src/Handlers/RedisHandler.php
- src/Handlers/PredisHandler.php
- src/Models/QueueJobModel.php
- tests/RedisHandlerTest.php
- tests/PredisHandlerTest.php
-
message: '#Call to an undefined method CodeIgniter\\Queue\\Models\\QueueJobFailedModel::affectedRows\(\).#'
paths:
Expand All @@ -47,22 +23,6 @@ parameters:
message: '#Call to an undefined method CodeIgniter\\Queue\\Models\\QueueJobFailedModel::truncate\(\).#'
paths:
- src/Handlers/BaseHandler.php
-
message: '#Parameter \#3 \$tries of method CodeIgniter\\Queue\\Commands\\QueueWork::handleWork\(\) expects int\|null, string\|true\|null given.#'
paths:
- src/Commands/QueueWork.php
-
message: '#Parameter \#4 \$retryAfter of method CodeIgniter\\Queue\\Commands\\QueueWork::handleWork\(\) expects int\|null, string\|true\|null given.#'
paths:
- src/Commands/QueueWork.php
-
message: '#Expression on left side of \?\? is not nullable.#'
paths:
- src/Commands/QueueWork.php
-
message: '#Variable \$job might not be defined.#'
paths:
- src/Commands/QueueWork.php
universalObjectCratesClasses:
- CodeIgniter\Entity
- CodeIgniter\Entity\Entity
Expand Down
70 changes: 58 additions & 12 deletions src/Commands/QueueWork.php
Original file line number Diff line number Diff line change
Expand Up @@ -79,23 +79,33 @@ public function run(array $params)
$stopWhenEmpty = false;
$waiting = false;

// Read params
// Read queue name from params
if (! $queue = array_shift($params)) {
CLI::error('The queueName is not specified.');

return EXIT_ERROR;
}

// Read options
$sleep = $params['sleep'] ?? CLI::getOption('sleep') ?? 10;
$rest = $params['rest'] ?? CLI::getOption('rest') ?? 0;
$maxJobs = $params['max-jobs'] ?? CLI::getOption('max-jobs') ?? 0;
$maxTime = $params['max-time'] ?? CLI::getOption('max-time') ?? 0;
$memory = $params['memory'] ?? CLI::getOption('memory') ?? 128;
$priority = $params['priority'] ?? CLI::getOption('priority') ?? $config->getQueuePriorities($queue) ?? 'default';
$tries = $params['tries'] ?? CLI::getOption('tries');
$retryAfter = $params['retry-after'] ?? CLI::getOption('retry-after');
$countJobs = 0;
[
$error,
$sleep,
$rest,
$maxJobs,
$maxTime,
$memory,
$priority,
$tries,
$retryAfter
] = $this->readOptions($params, $config, $queue);

if ($error !== null) {
CLI::write($error, 'red');

return EXIT_ERROR;
}

$countJobs = 0;

if (array_key_exists('stop-when-empty', $params) || CLI::getOption('stop-when-empty')) {
$stopWhenEmpty = true;
Expand All @@ -111,7 +121,7 @@ public function run(array $params)

CLI::write(PHP_EOL);

$priority = array_map('trim', explode(',', $priority));
$priority = array_map('trim', explode(',', (string) $priority));

while (true) {
$work = service('queue')->pop($queue, $priority);
Expand Down Expand Up @@ -175,6 +185,42 @@ public function run(array $params)
}
}

private function readOptions(array $params, QueueConfig $config, string $queue): array
{
$options = [
'error' => null,
'sleep' => $params['sleep'] ?? CLI::getOption('sleep') ?? 10,
'rest' => $params['rest'] ?? CLI::getOption('rest') ?? 0,
'maxJobs' => $params['max-jobs'] ?? CLI::getOption('max-jobs') ?? 0,
'maxTime' => $params['max-time'] ?? CLI::getOption('max-time') ?? 0,
'memory' => $params['memory'] ?? CLI::getOption('memory') ?? 128,
'priority' => $params['priority'] ?? CLI::getOption('priority') ?? $config->getQueuePriorities($queue) ?? 'default',
'tries' => $params['tries'] ?? CLI::getOption('tries'),
'retryAfter' => $params['retry-after'] ?? CLI::getOption('retry-after'),
];

// Options that, being defined, cannot be `true`
$keys = ['sleep', 'rest', 'maxJobs', 'maxTime', 'memory', 'priority', 'tries', 'retyAfter'];

foreach ($keys as $key) {
if ($options[$key] === true) {
$options['error'] = sprintf('Option: "-%s" must have a defined value.', $key);

return array_values($options);
}
}
// Options that, being defined, have to be `int`
$keys = array_diff($keys, ['priority']);

foreach ($keys as $key) {
if ($options[$key] !== null && ! is_int($options[$key])) {
$options[$key] = (int) $options[$key];
}
}

return array_values($options);
}

private function handleWork(QueueJob $work, QueueConfig $config, ?int $tries, ?int $retryAfter): void
{
timer()->start('work');
Expand All @@ -190,7 +236,7 @@ private function handleWork(QueueJob $work, QueueConfig $config, ?int $tries, ?i

CLI::write('The processing of this job was successful', 'green');
} catch (Throwable $err) {
if (isset($job) && ++$work->attempts < $tries ?? $job->getTries()) {
if (isset($job) && ++$work->attempts < ($tries ?? $job->getTries())) {
// Schedule for later
service('queue')->later($work, $retryAfter ?? $job->getRetryAfter());
} else {
Expand Down
8 changes: 8 additions & 0 deletions src/Config/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
use CodeIgniter\Queue\Handlers\DatabaseHandler;
use CodeIgniter\Queue\Handlers\PredisHandler;
use CodeIgniter\Queue\Handlers\RedisHandler;
use CodeIgniter\Queue\Interfaces\JobInterface;
use CodeIgniter\Queue\Interfaces\QueueInterface;

class Queue extends BaseConfig
{
Expand All @@ -17,6 +19,8 @@ class Queue extends BaseConfig

/**
* Available handlers.
*
* @var array<string, class-string<QueueInterface>>
*/
public array $handlers = [
'database' => DatabaseHandler::class,
Expand Down Expand Up @@ -81,6 +85,8 @@ class Queue extends BaseConfig

/**
* Your jobs handlers.
*
* @var array<string, class-string<JobInterface>>
*/
public array $jobHandlers = [];

Expand All @@ -95,6 +101,8 @@ public function __construct()

/**
* Resolve job class name.
*
* @return class-string<JobInterface>
*/
public function resolveJobClass(string $name): string
{
Expand Down
2 changes: 1 addition & 1 deletion src/Config/Services.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public static function queue(?QueueConfig $config = null, $getShared = true): Qu
return static::getSharedInstance('queue', $config);
}

/** @var QueueConfig $config */
/** @var QueueConfig|null $config */
$config ??= config('Queue');

return (new Queue($config))->init();
Expand Down
11 changes: 11 additions & 0 deletions src/Entities/QueueJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,18 @@
namespace CodeIgniter\Queue\Entities;

use CodeIgniter\Entity\Entity;
use CodeIgniter\I18n\Time;

/**
* @property int $attempts
* @property Time $available_at
* @property Time $created_at
* @property int $id
* @property array $payload
* @property string $priority
* @property string $queue
* @property int $status
*/
class QueueJob extends Entity
{
protected $dates = ['available_at', 'created_at'];
Expand Down
12 changes: 12 additions & 0 deletions src/Handlers/BaseHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,18 @@ abstract class BaseHandler
protected QueueConfig $config;
protected ?string $priority = null;

abstract public function push(string $queue, string $job, array $data): bool;

abstract public function pop(string $queue, array $priorities): ?QueueJob;

abstract public function later(QueueJob $queueJob, int $seconds): bool;

abstract public function failed(QueueJob $queueJob, Throwable $err, bool $keepJob): bool;

abstract public function done(QueueJob $queueJob, bool $keepJob): bool;

abstract public function clear(?string $queue = null): bool;

/**
* Set priority for job queue.
*/
Expand Down
8 changes: 4 additions & 4 deletions src/Handlers/PredisHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public function pop(string $queue, array $priorities): ?QueueJob
$queueJob->status = Status::RESERVED->value;
$queueJob->syncOriginal();

$this->predis->hset("queues:{$queue}::reserved", $queueJob->id, json_encode($queueJob));
$this->predis->hset("queues:{$queue}::reserved", (string) $queueJob->id, json_encode($queueJob));

return $queueJob;
}
Expand All @@ -94,7 +94,7 @@ public function later(QueueJob $queueJob, int $seconds): bool
$queueJob->available_at = Time::now()->addSeconds($seconds)->timestamp;

if ($result = $this->predis->zadd("queues:{$queueJob->queue}:{$queueJob->priority}", [json_encode($queueJob) => $queueJob->available_at->timestamp])) {
$this->predis->hdel("queues:{$queueJob->queue}::reserved", $queueJob->id);
$this->predis->hdel("queues:{$queueJob->queue}::reserved", [$queueJob->id]);
}

return $result > 0;
Expand All @@ -109,7 +109,7 @@ public function failed(QueueJob $queueJob, Throwable $err, bool $keepJob): bool
$this->logFailed($queueJob, $err);
}

return (bool) $this->predis->hdel("queues:{$queueJob->queue}::reserved", $queueJob->id);
return (bool) $this->predis->hdel("queues:{$queueJob->queue}::reserved", [$queueJob->id]);
}

/**
Expand All @@ -122,7 +122,7 @@ public function done(QueueJob $queueJob, bool $keepJob): bool
$this->predis->lpush("queues:{$queueJob->queue}::done", [json_encode($queueJob)]);
}

return (bool) $this->predis->hdel("queues:{$queueJob->queue}::reserved", $queueJob->id);
return (bool) $this->predis->hdel("queues:{$queueJob->queue}::reserved", [$queueJob->id]);
}

/**
Expand Down
8 changes: 4 additions & 4 deletions src/Handlers/RedisHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public function pop(string $queue, array $priorities): ?QueueJob
$queueJob->status = Status::RESERVED->value;
$queueJob->syncOriginal();

$this->redis->hSet("queues:{$queue}::reserved", $queueJob->id, json_encode($queueJob));
$this->redis->hSet("queues:{$queue}::reserved", (string) $queueJob->id, json_encode($queueJob));

return $queueJob;
}
Expand All @@ -115,7 +115,7 @@ public function later(QueueJob $queueJob, int $seconds): bool
$queueJob->available_at = Time::now()->addSeconds($seconds)->timestamp;

if ($result = (int) $this->redis->zAdd("queues:{$queueJob->queue}:{$queueJob->priority}", $queueJob->available_at->timestamp, json_encode($queueJob))) {
$this->redis->hDel("queues:{$queueJob->queue}::reserved", $queueJob->id);
$this->redis->hDel("queues:{$queueJob->queue}::reserved", (string) $queueJob->id);
}

return $result > 0;
Expand All @@ -130,7 +130,7 @@ public function failed(QueueJob $queueJob, Throwable $err, bool $keepJob): bool
$this->logFailed($queueJob, $err);
}

return (bool) $this->redis->hDel("queues:{$queueJob->queue}::reserved", $queueJob->id);
return (bool) $this->redis->hDel("queues:{$queueJob->queue}::reserved", (string) $queueJob->id);
}

/**
Expand All @@ -145,7 +145,7 @@ public function done(QueueJob $queueJob, bool $keepJob): bool
$this->redis->lPush("queues:{$queueJob->queue}::done", json_encode($queueJob));
}

return (bool) $this->redis->hDel("queues:{$queueJob->queue}::reserved", $queueJob->id);
return (bool) $this->redis->hDel("queues:{$queueJob->queue}::reserved", (string) $queueJob->id);
}

/**
Expand Down
4 changes: 4 additions & 0 deletions src/Interfaces/JobInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,8 @@ interface JobInterface
public function __construct(array $data);

public function process();

public function getRetryAfter(): int;

public function getTries(): int;
}
2 changes: 1 addition & 1 deletion tests/_support/Database/Seeds/TestRedisQueueSeeder.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public function run(): void
'attempts' => 0,
'available_at' => 1_697_269_864,
]);
$redis->hSet("queues:{$jobQueue->queue}::reserved", $jobQueue->id, json_encode($jobQueue));
$redis->hSet("queues:{$jobQueue->queue}::reserved", (string) $jobQueue->id, json_encode($jobQueue));

$jobQueue = new QueueJob([
'id' => '1234567890654321',
Expand Down
2 changes: 1 addition & 1 deletion tests/_support/Jobs/Success.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
class Success extends BaseJob implements JobInterface
{
protected int $retryAfter = 6;
protected int $retries = 3;
protected int $tries = 3;

public function process(): bool
{
Expand Down