From 3829686d54d1838c6299795ef672905b5f0d500a Mon Sep 17 00:00:00 2001 From: aegorov Date: Sun, 7 Apr 2024 01:57:32 +0300 Subject: [PATCH 1/5] Add delay for queue message handle --- src/Message/Message.php | 43 +++++++++++++++++++++++++ src/QueueProvider.php | 9 +++++- tests/Integration/QueueProviderTest.php | 18 ++++++++++- tests/Message/MessageTest.php | 42 ++++++++++++++++++++++++ 4 files changed, 110 insertions(+), 2 deletions(-) create mode 100644 src/Message/Message.php create mode 100644 tests/Message/MessageTest.php diff --git a/src/Message/Message.php b/src/Message/Message.php new file mode 100644 index 0000000..612d620 --- /dev/null +++ b/src/Message/Message.php @@ -0,0 +1,43 @@ +delay > 0) { + $this->metadata['delay'] = $delay; + } + } + + public function withDelay(int $delay): self + { + $message = clone $this; + $message->metadata['delay'] = $delay; + return $message; + } + + public function getHandlerName(): string + { + return $this->handlerName; + } + + public function getData(): mixed + { + return $this->data; + } + + public function getMetadata(): array + { + return $this->metadata; + } +} diff --git a/src/QueueProvider.php b/src/QueueProvider.php index d065f4c..5ed38ce 100644 --- a/src/QueueProvider.php +++ b/src/QueueProvider.php @@ -28,7 +28,13 @@ public function pushMessage(string $message, array $metadata = []): int $this->checkConnection(); $id = $this->getId(); $this->redis->hset("$this->channelName.messages", (string) $id, $message); - $this->redis->lpush("$this->channelName.waiting", $id); + + $delay = isset($metadata['delay']) && is_int($metadata['delay']) ? $metadata['delay'] : 0; + if ($delay > 0) { + $this->redis->zadd("$this->channelName.delayed", time() + $delay, $id); + } else { + $this->redis->lpush("$this->channelName.waiting", $id); + } return $id; } @@ -93,6 +99,7 @@ public function delete(string $id): void { $this->checkConnection(); $this->redis->zrem("$this->channelName.reserved", $id); + $this->redis->zrem("$this->channelName.delayed", $id); $this->redis->hdel("$this->channelName.messages", $id); $this->redis->hdel("$this->channelName.attempts", $id); } diff --git a/tests/Integration/QueueProviderTest.php b/tests/Integration/QueueProviderTest.php index 0d0a0cd..09d2281 100644 --- a/tests/Integration/QueueProviderTest.php +++ b/tests/Integration/QueueProviderTest.php @@ -5,6 +5,7 @@ namespace Yiisoft\Queue\Redis\Tests\Integration; use PHPUnit\Framework\TestCase; +use Yiisoft\Queue\Redis\Message\Message; use Yiisoft\Queue\Redis\QueueProvider; use Yiisoft\Queue\Redis\QueueProviderInterface; @@ -19,7 +20,7 @@ public function testGetId(QueueProvider $provider) $this->assertGreaterThan(0, $id); } - public function test__construct() + public function test__construct(): QueueProvider { $redis = new \Redis(); $connected = $redis->connect('redis'); @@ -28,4 +29,19 @@ public function test__construct() $this->assertInstanceOf(QueueProviderInterface::class, $provider); return $provider; } + + /** + * @depends test__construct + */ + public function testDelay(QueueProvider $provider): void + { + $message = new Message('test', ['key' => 'value'], [], 2); + $id = $provider->pushMessage(json_encode($message->getData(), JSON_THROW_ON_ERROR), $message->getMetadata()); + $this->assertGreaterThan(0, $id); + $reserv = $provider->reserve($id); + $this->assertNull($reserv); + sleep(3); + $reserv = $provider->reserve($id); + $this->assertNotNull($reserv); + } } diff --git a/tests/Message/MessageTest.php b/tests/Message/MessageTest.php new file mode 100644 index 0000000..32376a5 --- /dev/null +++ b/tests/Message/MessageTest.php @@ -0,0 +1,42 @@ +assertEquals('handler', $message->getHandlerName()); + } + + public function testGetData(): void + { + $message = new Message('handler', 'data', []); + $this->assertEquals('data', $message->getData()); + } + + public function testGetMetadata(): void + { + $metadata = ['key' => 'value']; + $message = new Message('handler', 'data', $metadata); + $this->assertEquals($metadata, $message->getMetadata()); + + $message = new Message('handler', 'data', $metadata, 2); + $metadata['delay'] = 2; + $this->assertEquals($metadata, $message->getMetadata()); + } + + public function testWithDelay(): void + { + $message = new Message('handler', 'data', []); + $delayedMessage = $message->withDelay(5); + + $this->assertNotSame($message, $delayedMessage); + $this->assertEquals(5, $delayedMessage->getMetadata()['delay']); + } +} From 70f2c8f3195dce61bf98b002e62b02b39951a0e1 Mon Sep 17 00:00:00 2001 From: aegorov Date: Sun, 7 Apr 2024 02:21:16 +0300 Subject: [PATCH 2/5] move MessageTest.php to Unit --- tests/{ => Unit}/Message/MessageTest.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename tests/{ => Unit}/Message/MessageTest.php (96%) diff --git a/tests/Message/MessageTest.php b/tests/Unit/Message/MessageTest.php similarity index 96% rename from tests/Message/MessageTest.php rename to tests/Unit/Message/MessageTest.php index 32376a5..08958e4 100644 --- a/tests/Message/MessageTest.php +++ b/tests/Unit/Message/MessageTest.php @@ -1,7 +1,7 @@ Date: Tue, 10 Dec 2024 00:17:57 +0300 Subject: [PATCH 3/5] move MessageTest.php to Unit --- src/Adapter.php | 23 ++++++++++++++++------- src/QueueProvider.php | 8 +++++++- src/QueueProviderInterface.php | 2 ++ 3 files changed, 25 insertions(+), 8 deletions(-) diff --git a/src/Adapter.php b/src/Adapter.php index c503559..2d05b9e 100644 --- a/src/Adapter.php +++ b/src/Adapter.php @@ -14,11 +14,12 @@ final class Adapter implements AdapterInterface { public function __construct( - private QueueProviderInterface $provider, + private QueueProviderInterface $provider, private MessageSerializerInterface $serializer, - private LoopInterface $loop, - private int $timeout = 3 - ) { + private LoopInterface $loop, + private int $timeout = 3 + ) + { } public function runExisting(callable $handlerCallback): void @@ -66,15 +67,18 @@ public function push(MessageInterface $message): MessageInterface public function subscribe(callable $handlerCallback): void { - while ($this->loop->canContinue()) { + $continue = true; + while ($continue) { $message = $this->reserve(); if (null === $message) { + $continue = $this->loop->canContinue(); continue; } $result = $handlerCallback($message); - if ($result) { - $this->provider->delete((string) $message->getId()); + $this->provider->delete((string) $message->getId()); + if (!$result) { + $continue = false; } } } @@ -99,4 +103,9 @@ private function reserve(): ?IdEnvelope return $envelope; } + + public function getChannelName(): string + { + return $this->provider->getChannelName(); + } } diff --git a/src/QueueProvider.php b/src/QueueProvider.php index d065f4c..b684e2a 100644 --- a/src/QueueProvider.php +++ b/src/QueueProvider.php @@ -17,7 +17,8 @@ class QueueProvider implements QueueProviderInterface public function __construct( private \Redis $redis, //redis connection, private string $channelName = self::DEFAULT_CHANNEL_NAME - ) { + ) + { } /** @@ -145,4 +146,9 @@ private function checkConnection(): void throw new NotConnectedRedisException('Redis is not connected.'); } } + + public function getChannelName(): string + { + return $this->channelName; + } } diff --git a/src/QueueProviderInterface.php b/src/QueueProviderInterface.php index 1bce6fc..f59b523 100644 --- a/src/QueueProviderInterface.php +++ b/src/QueueProviderInterface.php @@ -20,4 +20,6 @@ public function existInWaiting(int $id): bool; public function existInReserved(int $id): bool; public function withChannelName(string $channelName): self; + + public function getChannelName(): string; } From cfaec4bab612d03312eea11eed5dea9324839396 Mon Sep 17 00:00:00 2001 From: aegorov Date: Tue, 10 Dec 2024 00:52:18 +0300 Subject: [PATCH 4/5] add implementation method Message::fromData --- src/Message/Message.php | 14 ++++++++++---- tests/Unit/Message/MessageTest.php | 11 +++++++++++ 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/src/Message/Message.php b/src/Message/Message.php index 2c5cc9d..52c72fd 100644 --- a/src/Message/Message.php +++ b/src/Message/Message.php @@ -10,10 +10,11 @@ final class Message implements MessageInterface { public function __construct( private string $handlerName, - private mixed $data, - private array $metadata, - private int $delay = 0 //delay in seconds - ) { + private mixed $data, + private array $metadata, + private int $delay = 0 //delay in seconds + ) + { if ($this->delay > 0) { $this->metadata['delay'] = $delay; } @@ -40,4 +41,9 @@ public function getMetadata(): array { return $this->metadata; } + + public static function fromData(string $handlerName, mixed $data, array $metadata = []): MessageInterface + { + return new self($handlerName, $data, $metadata, $metadata['delay'] ? (int)$metadata['delay'] : 0); + } } diff --git a/tests/Unit/Message/MessageTest.php b/tests/Unit/Message/MessageTest.php index d982c14..e47c87c 100644 --- a/tests/Unit/Message/MessageTest.php +++ b/tests/Unit/Message/MessageTest.php @@ -40,4 +40,15 @@ public function testWithDelay(): void $this->assertNotSame($message, $delayedMessage); $this->assertEquals(5, $delayedMessage->getMetadata()['delay']); } + + public function testFromData(): void + { + $message = Message::fromData('test-handler', ['data' => 'test-data'], ['delay' => 2]); + self::assertEquals('test-handler', $message->getHandlerName()); + self::assertEquals(['data' => 'test-data'], $message->getData()); + self::assertEquals(['delay' => 2], $message->getMetadata()); + + $message = Message::fromData('test-handler', ['data' => 'test-data'], ['delay' => '3']); + self::assertEquals(['delay' => 3], $message->getMetadata()); + } } From 19b17dcec2ba6c6c8f7edc8eca21de28e131109c Mon Sep 17 00:00:00 2001 From: aegorov Date: Wed, 11 Dec 2024 20:01:00 +0300 Subject: [PATCH 5/5] additional tests for 100% coverage --- tests/Integration/QueueProviderTest.php | 12 ++++++++++++ tests/Integration/QueueTest.php | 13 ++++++++++++- tests/Unit/QueueProviderTest.php | 8 ++++++++ 3 files changed, 32 insertions(+), 1 deletion(-) diff --git a/tests/Integration/QueueProviderTest.php b/tests/Integration/QueueProviderTest.php index 09d2281..9b42327 100644 --- a/tests/Integration/QueueProviderTest.php +++ b/tests/Integration/QueueProviderTest.php @@ -44,4 +44,16 @@ public function testDelay(QueueProvider $provider): void $reserv = $provider->reserve($id); $this->assertNotNull($reserv); } + + /** + * @depends test__construct + */ + public function testWithChannelName(QueueProvider $provider): void + { + self::assertEquals('test', $provider->getChannelName()); + $providerOther = $provider->withChannelName('test'); + self::assertEquals($providerOther->getChannelName(), $provider->getChannelName()); + $providerOther = $provider->withChannelName('test1'); + self::assertEquals('test1', $providerOther->getChannelName()); + } } diff --git a/tests/Integration/QueueTest.php b/tests/Integration/QueueTest.php index 2882e40..9276fbc 100644 --- a/tests/Integration/QueueTest.php +++ b/tests/Integration/QueueTest.php @@ -15,6 +15,7 @@ use Yiisoft\Queue\Redis\Adapter; use Yiisoft\Queue\Redis\QueueProvider; use Yiisoft\Queue\Redis\QueueProviderInterface; +use Yiisoft\Queue\Redis\Reserve; use Yiisoft\Queue\Redis\Tests\Support\FileHelper; use Yiisoft\Queue\Redis\Tests\Support\IntegrationTestCase; @@ -85,6 +86,7 @@ public function testListen(): void $mockLoop, ); $queue = $this->getDefaultQueue($adapter); + self::assertEquals('yii-queue', $adapter->getChannelName()); $queue->push( new Message('ext-simple', ['file_name' => 'test-listen' . $time, 'payload' => ['time' => $time]]) @@ -124,7 +126,10 @@ public function testAdapterStatusException() public function testAdapterNullMessage() { $provider = $this->createMock(QueueProviderInterface::class); - $provider->method('reserve')->willReturn(null); + $provider->method('reserve')->willReturnOnConsecutiveCalls( + null, null, null, new Reserve(1, '{"name":"handler"}') + ); + $provider->method('delete'); $mockLoop = $this->createMock(LoopInterface::class); $mockLoop->expects($this->exactly(2))->method('canContinue')->willReturn(true, false); @@ -145,5 +150,11 @@ public function testAdapterNullMessage() $notUseHandler = false; }); $this->assertTrue($notUseHandler); + + $adapter->subscribe(function (MessageInterface $message) use (&$notUseHandler): mixed { + $notUseHandler = false; + return null; + }); + $this->assertFalse($notUseHandler); } } diff --git a/tests/Unit/QueueProviderTest.php b/tests/Unit/QueueProviderTest.php index 8458639..9d44183 100644 --- a/tests/Unit/QueueProviderTest.php +++ b/tests/Unit/QueueProviderTest.php @@ -20,6 +20,14 @@ public function test__construct() return $provider; } + /** + * @depends test__construct + */ + public function testGetChannelName(QueueProvider $provider) + { + self::assertEquals('test', $provider->getChannelName()); + } + /** * @depends test__construct * @throws \PHPUnit\Framework\MockObject\Exception