Skip to content

Commit

Permalink
Merge pull request #2 from asiries335/mvp-2
Browse files Browse the repository at this point in the history
add use react/event-loop
  • Loading branch information
asiries335 authored Aug 29, 2020
2 parents a635c7b + 38e11fe commit cccea08
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 32 deletions.
12 changes: 9 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ This package is for working with Redis streams for php

#### **What is implemented in the current version**

At the moment, the package can work with methods: **xadd**, **xread**, **xrevrange**.
At the moment, the package can work with methods: **xadd**, **xread**, **xrevrange**, **xdel**.

The package has functions for adding messages to a stream,
get messages from a stream,
Expand Down Expand Up @@ -54,7 +54,13 @@ $client->stream('test')->add(
);
```

see more https://redis.io/commands/xadd
_Delete a message_

```php
$client->stream('test')->delete('key');
```

see more https://redis.io/commands/xdel

_Get a collection of messages from the stream_

Expand Down Expand Up @@ -98,4 +104,4 @@ $client->stream('test')->listen(
);
```

The method works as event-loop
functional works on a package basis https://github.com/reactphp/event-loop
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
"predis/predis": "^1.1",
"phpunit/phpunit": "^8.5",
"mockery/mockery": "^1.3",
"phpunit/php-code-coverage": "^7.0"
"phpunit/php-code-coverage": "^7.0",
"react/event-loop": "^1.1.1"
},
"autoload": {
"psr-4": {
Expand Down
43 changes: 43 additions & 0 deletions src/Data/Constants.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
<?php


namespace Asiries335\redisSteamPhp\Data;


final class Constants
{
/**
* COMMAND XADD
*
* @var string
*/
public const COMMAND_XADD = 'xadd';

/**
* COMMAND XREAD
*
* @var string
*/
public const COMMAND_XREAD = 'xread';

/**
* COMMAND XRANGE
*
* @var string
*/
public const COMMAND_XRANGE = 'xrange';

/**
* TIME TICK INTERVAL
*
* @var float
*/
public const TIME_TICK_INTERVAL = 1;

/**
* COMMAND XDEL
*
* @var string
*/
public const COMMAND_XDEL = 'xdel';
}
76 changes: 50 additions & 26 deletions src/Stream.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
namespace Asiries335\redisSteamPhp;

use Asiries335\redisSteamPhp\Data\Collection;
use Asiries335\redisSteamPhp\Data\Constants;
use Asiries335\redisSteamPhp\Data\Message;
use Asiries335\redisSteamPhp\Hydrator\CollectionHydrator;
use Asiries335\redisSteamPhp\Hydrator\MessageHydrator;
Expand Down Expand Up @@ -60,7 +61,7 @@ public function add(string $key, array $values) : string
{
try {
return (string) $this->_client->rawCommand(
'xadd',
Constants::COMMAND_XADD,
$this->_streamName,
'*',
$key,
Expand All @@ -71,6 +72,30 @@ public function add(string $key, array $values) : string
}
}

/**
* Removes the messages entries from a stream
*
* @param string $key Key Message
*
* @return int
*
* @throws \Exception
*
* @see https://redis.io/commands/xdel
*/
public function delete(string $key) : int
{
try {
return (int) $this->_client->rawCommand(
Constants::COMMAND_XDEL,
$this->_streamName,
$key
);
} catch (\Exception $exception) {
throw $exception;
}
}

/**
* Get data from stream
*
Expand All @@ -84,7 +109,7 @@ public function get() : Collection
{
try {
$items = $this->_client->rawCommand(
'xread',
Constants::COMMAND_XREAD,
'STREAMS',
$this->_streamName,
'0'
Expand Down Expand Up @@ -116,32 +141,31 @@ public function listen(\Closure $closure) : void
{
$messageHydrate = new MessageHydrator();

$lastMessageId = null;

while (true) {
$data = $this->_client->rawCommand(
'xrevrange',
$this->_streamName,
'+',
'-',
'COUNT',
1
);

if (empty($data) === true) {
usleep(1);
continue;
}

$message = $messageHydrate->hydrate($data[0], Message::class);

if ($message->getId() !== $lastMessageId) {
$lastMessageId = $message->getId();
$closure->call($this, $message);
$loop = \React\EventLoop\Factory::create();

$loop->addPeriodicTimer(
Constants::TIME_TICK_INTERVAL,
function () use ($closure, $messageHydrate, $loop) {
$rows = $this->_client->rawCommand(
Constants::COMMAND_XRANGE,
$this->_streamName,
'-',
'+'
);

if (empty($rows) === true) {
return;
}

foreach ($rows as $row) {
$message = $messageHydrate->hydrate($row, Message::class);
$closure->call($this, $message);
$this->delete($message->getId());
}
}
);

usleep(1);
}
$loop->run();
}

}
32 changes: 30 additions & 2 deletions tests/Unit/StreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ public function testAddDataToStream() : void

$this->client->shouldReceive('rawCommand')->andReturn($key);

$this->stream = new Stream($this->client, self::TEST_NAME_STREAM);
$stream = new Stream($this->client, self::TEST_NAME_STREAM);

$result = $this->stream->add($key, $values);
$result = $stream->add($key, $values);

$this->assertEquals($key, $result);
}
Expand Down Expand Up @@ -98,4 +98,32 @@ public function testReadStream() : void
$this->assertEquals(Collection::create($data), $collectionStream);

}

/**
* Delete message
*
* @throws \Exception
*
* @return void
*/
public function testDeleteMessage() : void
{
$key = 'name';

$values = [
'id' => 123,
'name' => 'Barney',
'age' => 25,
];

$this->client->shouldReceive('rawCommand')->andReturn($key);

$stream = new Stream($this->client, self::TEST_NAME_STREAM);

$stream->add($key, $values);

$result = $stream->delete($key);

$this->assertIsInt($result);
}
}

0 comments on commit cccea08

Please sign in to comment.