Skip to content

Commit

Permalink
Add support for ack, nack, and requeue methods
Browse files Browse the repository at this point in the history
  • Loading branch information
msmakouz committed Jun 24, 2024
1 parent 0013e1e commit 3204a58
Show file tree
Hide file tree
Showing 5 changed files with 206 additions and 10 deletions.
33 changes: 30 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,34 @@ var_dump($task->getId() . ' has been queued');

### Consumer

The following code will allow you to read and process the task from the RoadRunner server.
The Consumer processes tasks from RoadRunner server and responds based on the processing outcome:

- `ack` - is used for positive acknowledgements.
- `nack` - is used for negative acknowledgements.
- `requeue` - is used for requeuing the task.

The behavior of the `nack` method depends on its implementation by the queue driver. It can accept an additional
parameter **redelivery**; if it is passed and set to **true**, the task will be requeued. However, not all drivers
support this functionality. If the redelivery parameter is not passed, set to **false**, or the queue driver's
implementation does not support it, the task will not be requeued.

```php
$task->nack(message: $reason, redelivery: true);
```

The `requeue` method is implemented by RoadRunner and does not depend on the queue driver. It allows you to resend
the task to **the end of the queue** and add additional headers to the task.

```php
$task->withHeader('attempts', (string) ($attempts + 1))->requeue($exception);
```

The `nack` and `requeue` methods have the ability to specify a **delay** for requeuing the task. To do this, call
the `withDelay` method and pass the desired value before invoking the `nack` or `requeue` methods.

```php
$task->withDelay(10)->requeue($exception);
```

```php
<?php
Expand All @@ -115,9 +142,9 @@ while ($task = $consumer->waitTask()) {
// Process task
$task->complete();
$task->ack();
} catch (\Throwable $e) {
$task->fail($e, requeue: true);
$task->requeue($e);
}
}
```
Expand Down
40 changes: 38 additions & 2 deletions src/Task/ReceivedTask.php
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,17 @@ public function getQueue(): string
return $this->queue;
}

/**
* @deprecated Since v4.5.0, use {@see ack()} instead.
*/
public function complete(): void
{
$this->respond(Type::SUCCESS);
}

/**
* @deprecated Since v4.5.0, use {@see nack()} or {@see requeue()} instead.
*/
public function fail(string|\Stringable|\Throwable $error, bool $requeue = false): void
{
$data = [
Expand All @@ -82,19 +88,49 @@ public function fail(string|\Stringable|\Throwable $error, bool $requeue = false
$this->respond(Type::ERROR, $data);
}

public function ack(): void
{
$this->respond(Type::ACK);
}

public function nack(string|\Stringable|\Throwable $message, bool $redelivery = false): void
{
$this->respond(Type::NACK, [
'message' => (string) $message,
'redelivery' => $redelivery,
'delay_seconds' => $this->delay,
]);
}

public function requeue(string|\Stringable|\Throwable $message): void
{
$data = [
'message' => (string) $message,
'delay_seconds' => $this->delay,
];

if (!empty($this->headers)) {
$data['headers'] = $this->headers;
}

$this->respond(Type::REQUEUE, $data);
}

public function isCompleted(): bool
{
return $this->completed !== null;
}

public function isSuccessful(): bool
{
return $this->completed === Type::SUCCESS;
return $this->completed === Type::SUCCESS || $this->completed === Type::ACK;
}

public function isFails(): bool
{
return $this->completed === Type::ERROR;
return $this->completed === Type::ERROR ||
$this->completed === Type::NACK ||
$this->completed === Type::REQUEUE;
}

/**
Expand Down
5 changes: 5 additions & 0 deletions src/Task/ReceivedTaskInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@

/**
* @psalm-suppress MissingImmutableAnnotation The implementation of this task is mutable.
* @method void ack()
* @method void nack(string|\Stringable|\Throwable $message, bool $redelivery = false)
* @method void requeue(string|\Stringable|\Throwable $message)
*/
interface ReceivedTaskInterface extends
QueuedTaskInterface,
Expand All @@ -19,13 +22,15 @@ interface ReceivedTaskInterface extends
* Marks the current task as completed.
*
* @throws JobsException
* @deprecated Since v4.5.0, use {@see ack()} instead.
*/
public function complete(): void;

/**
* Marks the current task as failed.
*
* @throws JobsException
* @deprecated Since v4.5.0, use {@see nack()} or {@see requeue()} instead.
*/
public function fail(string|\Stringable|\Throwable $error, bool $requeue = false): void;

Expand Down
17 changes: 17 additions & 0 deletions src/Task/Type.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,28 @@ interface Type
{
/**
* @var TypeEnum
* @deprecated Since v4.5.0, use {@see Type::ACK} instead.
*/
public const SUCCESS = 0;

/**
* @var TypeEnum
* @deprecated Since v4.5.0, use {@see Type::NACK} or {@see Type::REQUEUE} instead.
*/
public const ERROR = 1;

/**
* @var TypeEnum
*/
public const ACK = 2;

/**
* @var TypeEnum
*/
public const NACK = 3;

/**
* @var TypeEnum
*/
public const REQUEUE = 4;
}
121 changes: 116 additions & 5 deletions tests/Unit/Task/ReceivedTaskTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Spiral\RoadRunner\Jobs\Tests\Unit\Task;

use Generator;
use PHPUnit\Framework\Attributes\DataProvider;
use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
use Spiral\RoadRunner\Jobs\Queue\Driver;
Expand Down Expand Up @@ -38,8 +39,6 @@ public function createTask(
);
}



public function testGetsQueue(): void
{
$task = $this->createTask(queue: 'broker-queue-name');
Expand Down Expand Up @@ -81,6 +80,30 @@ public function testComplete(): void
$this->assertFalse($task->isFails());
}

public function testAck(): void
{
$task = $this->createTask();

$this->assertFalse($task->isCompleted());
$this->assertFalse($task->isFails());
$this->assertFalse($task->isSuccessful());

$this->worker->expects($this->once())
->method('respond')
->with(
$this->callback(function (Payload $payload) {
$this->assertEquals('{"type":2,"data":[]}', $payload->body);

return true;
}),
);

$task->ack();

$this->assertTrue($task->isCompleted());
$this->assertTrue($task->isSuccessful());
$this->assertFalse($task->isFails());
}

public static function provideFailData(): Generator
{
Expand All @@ -90,9 +113,97 @@ public static function provideFailData(): Generator
yield 'headers' => ['Some error message', false, null, ['foo' => 'bar']];
}

/**
* @dataProvider provideFailData
*/
#[DataProvider('provideFailData')]
public function testNack(string $error, bool $redelivery, int|null $delay): void
{
$task = $this->createTask();

if ($delay !== null) {
$task = $task->withDelay($delay);
}

$this->assertFalse($task->isCompleted());
$this->assertFalse($task->isFails());
$this->assertFalse($task->isSuccessful());

$this->worker->expects($this->once())
->method('respond')
->with(
$this->callback(function (Payload $payload) use ($delay, $redelivery, $error) {
$result = [
'type' => Type::NACK,
'data' => [
'message' => $error,
'redelivery' => $redelivery,
'delay_seconds' => (int) $delay,
],
];

$this->assertEquals(
\json_encode($result),
$payload->body,
);

return true;
}),
);

$task->nack(message: $error, redelivery: $redelivery);

$this->assertTrue($task->isFails());
$this->assertFalse($task->isSuccessful());
$this->assertTrue($task->isCompleted());
}


#[DataProvider('provideFailData')]
public function testRequeue(string $error, bool $requeue, int|null $delay, array $headers): void
{
$task = $this->createTask();

if ($delay !== null) {
$task = $task->withDelay($delay);
}

foreach ($headers as $key => $value) {
$task = $task->withHeader($key, $value);
$headers[$key] = [$value];
}

$this->assertFalse($task->isCompleted());
$this->assertFalse($task->isFails());
$this->assertFalse($task->isSuccessful());

$this->worker->expects($this->once())
->method('respond')
->with(
$this->callback(function (Payload $payload) use ($delay, $error, $headers) {
$result = [
'type' => Type::REQUEUE,
'data' => [
'message' => $error,
'delay_seconds' => (int) $delay,
],
];

if (!empty($headers)) {
$result['data']['headers'] = $headers;
}

$this->assertEquals(\json_encode($result), $payload->body,);

return true;
}),
);

$task->requeue(message: $error);

$this->assertTrue($task->isFails());
$this->assertFalse($task->isSuccessful());
$this->assertTrue($task->isCompleted());
}

#[DataProvider('provideFailData')]
public function testFail($error, bool $requeue, int|null $delay, array $headers): void
{
$task = $this->createTask();
Expand Down

0 comments on commit 3204a58

Please sign in to comment.