diff --git a/README.md b/README.md index 86406a6..2efd47c 100644 --- a/README.md +++ b/README.md @@ -127,13 +127,13 @@ If a timeout value is supplied the call will wait `timeout` seconds until a `REA The method returns either a [Task](#tasks) object or `null`: ```php -$task = $queue->take(); +$taskOrNull = $queue->take(); // wait 2 seconds -$task = $queue->take(2); +$taskOrNull = $queue->take(2); // wait 100 milliseconds -$task = $queue->take(.1); +$taskOrNull = $queue->take(.1); ``` After successful execution, a task can be marked as acknowledged (that will also delete the task from a queue): @@ -151,7 +151,7 @@ Or put back into the queue in case it cannot be executed: ```php $task = $queue->release($task->getId()); -// for ttl-like queues you can specify a delay +// for *ttl queues you can specify a delay $task = $queue->release($task->getId(), ['delay' => 30]); ``` @@ -173,6 +173,12 @@ To reset buried task(s) back to `READY` state: $count = $queue->kick(3); // kick 3 buried tasks ``` +To increase TTR and/or TTL of a running task (only for *ttl queues): + +```php +$taskOrNull = $queue->touch($takenTask->getId(), 5); // increase ttr/ttl to 5 seconds +``` + A task (in any state) can be deleted permanently with `delete()`: ```php diff --git a/src/Queue.php b/src/Queue.php index 87f0fd3..c566cc8 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -74,6 +74,19 @@ public function take($timeout = null) return empty($result[0]) ? null : Task::createFromTuple($result[0]); } + /** + * @param int $taskId + * @param int|float $increment + * + * @return Task|null + */ + public function touch($taskId, $increment) + { + $result = $this->client->call("queue.tube.$this->name:touch", [$taskId, $increment]); + + return empty($result[0]) ? null : Task::createFromTuple($result[0]); + } + /** * @param int $taskId * diff --git a/tests/Integration/Ttl.php b/tests/Integration/Ttl.php index bdea4ec..ed7dd14 100644 --- a/tests/Integration/Ttl.php +++ b/tests/Integration/Ttl.php @@ -40,6 +40,51 @@ public function testTimeToLive() $this->assertNull($task); } + /** + * @eval queue.tube['%tube_name%']:put('touch_ttr_1', {ttr = 1}) + */ + public function testTouchTimeToRun() + { + $task1 = $this->queue->take(.1); + $task2 = $this->queue->touch($task1->getId(), 1); + sleep(1); + $task3 = $this->queue->take(.1); + + $this->assertTaskInstance($task2); + $this->assertSame('touch_ttr_1', $task2->getData()); + $this->assertEquals($task1, $task2); + $this->assertNull($task3); + } + + /** + * @eval queue.tube['%tube_name%']:put('touch_ttl_1', {ttl = 1}) + */ + public function testTouchTimeToLive() + { + $task1 = $this->queue->take(.1); + $task2 = $this->queue->touch($task1->getId(), 1); + $this->queue->release($task1->getId()); + sleep(1); + $task3 = $this->queue->take(.1); + + $this->assertTaskInstance($task2); + $this->assertSame('touch_ttl_1', $task2->getData()); + $this->assertEquals($task1, $task2); + $this->assertEquals($task2, $task3); + } + + /** + * @eval queue.tube['%tube_name%']:put('touch_invalid_interval') + */ + public function testTouchInvalidInterval() + { + $task = $this->queue->take(.1); + + foreach ([0, -1] as $interval) { + $this->assertNull($this->queue->touch($task->getId(), $interval)); + } + } + /** * @eval queue.tube['%tube_name%']:put('pri_low', {pri = 2}) * @eval queue.tube['%tube_name%']:put('pri_high', {pri = 1})