Skip to content
This repository has been archived by the owner on Jul 28, 2022. It is now read-only.

Commit

Permalink
Add prefetch count configuration in AMQPBackend (#257)
Browse files Browse the repository at this point in the history
  • Loading branch information
shiroko authored and greg0ire committed Sep 12, 2017
1 parent f365c7e commit cebab90
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 36 deletions.
16 changes: 13 additions & 3 deletions Backend/AMQPBackend.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@
*/
class AMQPBackend implements BackendInterface
{
/**
* @var AMQPBackendDispatcher
*/
protected $dispatcher = null;

/**
* @var string
*/
Expand Down Expand Up @@ -63,9 +68,9 @@ class AMQPBackend implements BackendInterface
protected $ttl;

/**
* @var AMQPBackendDispatcher
* @var null|int
*/
protected $dispatcher = null;
private $prefetchCount;

/**
* @param string $exchange
Expand All @@ -76,7 +81,7 @@ class AMQPBackend implements BackendInterface
* @param string $deadLetterRoutingKey
* @param null|int $ttl
*/
public function __construct($exchange, $queue, $recover, $key, $deadLetterExchange = null, $deadLetterRoutingKey = null, $ttl = null)
public function __construct($exchange, $queue, $recover, $key, $deadLetterExchange = null, $deadLetterRoutingKey = null, $ttl = null, $prefetchCount = null)
{
$this->exchange = $exchange;
$this->queue = $queue;
Expand All @@ -85,6 +90,7 @@ public function __construct($exchange, $queue, $recover, $key, $deadLetterExchan
$this->deadLetterExchange = $deadLetterExchange;
$this->deadLetterRoutingKey = $deadLetterRoutingKey;
$this->ttl = $ttl;
$this->prefetchCount = $prefetchCount;

if (!class_exists('PhpAmqpLib\Message\AMQPMessage')) {
throw new \RuntimeException('Please install php-amqplib/php-amqplib dependency');
Expand Down Expand Up @@ -192,6 +198,10 @@ public function createAndPublish($type, array $body)
*/
public function getIterator()
{
if ($this->prefetchCount !== null) {
$this->getChannel()->basic_qos(null, $this->prefetchCount, null);
}

return new AMQPMessageIterator($this->getChannel(), $this->queue);
}

Expand Down
12 changes: 12 additions & 0 deletions DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,12 @@ protected function getQueueNode()
Only used by RabbitMQ
Defines the per-queue message time-to-live (milliseconds)
EOF;

$prefetchCountInfo = <<<'EOF'
Only used by RabbitMQ
Defines the number of messages which will be delivered to the customer at a time.
EOF;

$typesInfo = <<<'EOF'
Expand Down Expand Up @@ -269,6 +275,12 @@ protected function getQueueNode()
->min(0)
->defaultValue(null)
->end()
->integerNode('prefetch_count')
->info($prefetchCountInfo)
->min(0)
->max(65535)
->defaultValue(null)
->end()

// Database configuration (Doctrine)
->arrayNode('types')
Expand Down
8 changes: 6 additions & 2 deletions DependencyInjection/SonataNotificationExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ protected function configureRabbitmq(ContainerBuilder $container, array $config)
'dead_letter_exchange' => null,
'dead_letter_routing_key' => null,
'ttl' => null,
'prefetch_count' => null,
));
}

Expand Down Expand Up @@ -381,7 +382,8 @@ protected function configureRabbitmq(ContainerBuilder $container, array $config)
$queue['routing_key'],
$queue['dead_letter_exchange'],
$queue['dead_letter_routing_key'],
$queue['ttl']
$queue['ttl'],
$queue['prefetch_count']
);

$amqBackends[$pos] = array(
Expand Down Expand Up @@ -419,10 +421,11 @@ protected function configureRabbitmq(ContainerBuilder $container, array $config)
* @param string $deadLetterExchange
* @param string $deadLetterRoutingKey
* @param int|null $ttl
* @param int|null $prefetchCount
*
* @return string
*/
protected function createAMQPBackend(ContainerBuilder $container, $exchange, $name, $recover, $key = '', $deadLetterExchange = null, $deadLetterRoutingKey = null, $ttl = null)
protected function createAMQPBackend(ContainerBuilder $container, $exchange, $name, $recover, $key = '', $deadLetterExchange = null, $deadLetterRoutingKey = null, $ttl = null, $prefetchCount = null)
{
$id = 'sonata.notification.backend.rabbitmq.'.$this->amqpCounter++;

Expand All @@ -436,6 +439,7 @@ protected function createAMQPBackend(ContainerBuilder $container, $exchange, $na
$deadLetterExchange,
$deadLetterRoutingKey,
$ttl,
$prefetchCount,
)
);
$definition->setPublic(false);
Expand Down
86 changes: 55 additions & 31 deletions Tests/Backend/AMQPBackendTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

namespace Sonata\NotificationBundle\Tests\Backend;

use Sonata\NotificationBundle\Backend\AMQPBackend;
use Sonata\NotificationBundle\Tests\Helpers\PHPUnit_Framework_TestCase;

class AMQPBackendTest extends PHPUnit_Framework_TestCase
Expand All @@ -21,6 +22,7 @@ class AMQPBackendTest extends PHPUnit_Framework_TestCase
const DEAD_LETTER_EXCHANGE = 'dlx';
const DEAD_LETTER_ROUTING_KEY = 'message.type.dl';
const TTL = 60000;
const PREFETCH_COUNT = 1;

protected function setUp()
{
Expand All @@ -41,7 +43,7 @@ public function testInitializeWithNoDeadLetterExchangeAndNoDeadLetterRoutingKey(
$this->isType('boolean'),
$this->isType('boolean'),
$this->isType('boolean')
);
);
$channelMock->expects($this->once())
->method('queue_declare')
->with(
Expand All @@ -52,14 +54,14 @@ public function testInitializeWithNoDeadLetterExchangeAndNoDeadLetterRoutingKey(
$this->isType('boolean'),
$this->isType('boolean'),
$this->equalTo(array())
);
);
$channelMock->expects($this->once())
->method('queue_bind')
->with(
$this->equalTo(self::QUEUE),
$this->equalTo(self::EXCHANGE),
$this->equalTo(self::KEY)
);
);

$backend->initialize();
}
Expand All @@ -85,7 +87,7 @@ public function testInitializeWithDeadLetterExchangeAndNoDeadLetterRoutingKey()
$this->isType('boolean'),
$this->isType('boolean'),
)
);
);
$channelMock->expects($this->once())
->method('queue_declare')
->with(
Expand All @@ -98,7 +100,7 @@ public function testInitializeWithDeadLetterExchangeAndNoDeadLetterRoutingKey()
$this->equalTo(array(
'x-dead-letter-exchange' => array('S', self::DEAD_LETTER_EXCHANGE),
))
);
);
$channelMock->expects($this->exactly(2))
->method('queue_bind')
->withConsecutive(
Expand All @@ -112,7 +114,7 @@ public function testInitializeWithDeadLetterExchangeAndNoDeadLetterRoutingKey()
$this->equalTo(self::DEAD_LETTER_EXCHANGE),
$this->equalTo(self::KEY),
)
);
);

$backend->initialize();
}
Expand All @@ -129,7 +131,7 @@ public function testInitializeWithDeadLetterExchangeAndDeadLetterRoutingKey()
$this->isType('boolean'),
$this->isType('boolean'),
$this->isType('boolean')
);
);
$channelMock->expects($this->once())
->method('queue_declare')
->with(
Expand All @@ -143,14 +145,14 @@ public function testInitializeWithDeadLetterExchangeAndDeadLetterRoutingKey()
'x-dead-letter-exchange' => array('S', self::DEAD_LETTER_EXCHANGE),
'x-dead-letter-routing-key' => array('S', self::DEAD_LETTER_ROUTING_KEY),
))
);
);
$channelMock->expects($this->once())
->method('queue_bind')
->with(
$this->equalTo(self::QUEUE),
$this->equalTo(self::EXCHANGE),
$this->equalTo(self::KEY)
);
);

$backend->initialize();
}
Expand All @@ -167,7 +169,7 @@ public function testInitializeWithTTL()
$this->isType('boolean'),
$this->isType('boolean'),
$this->isType('boolean')
);
);
$channelMock->expects($this->once())
->method('queue_declare')
->with(
Expand All @@ -180,32 +182,55 @@ public function testInitializeWithTTL()
$this->equalTo(array(
'x-message-ttl' => array('I', self::TTL),
))
);
);
$channelMock->expects($this->once())
->method('queue_bind')
->with(
$this->equalTo(self::QUEUE),
$this->equalTo(self::EXCHANGE),
$this->equalTo(self::KEY)
);
);

$backend->initialize();
}

protected function getBackendAndChannelMock($recover = false, $deadLetterExchange = null, $deadLetterRoutingKey = null, $ttl = null)
public function testGetIteratorWithNoPrefetchCount()
{
$mock = $this->getMockBuilder('\Sonata\NotificationBundle\Backend\AMQPBackend')
->setConstructorArgs(array(
self::EXCHANGE,
self::QUEUE,
$recover,
self::KEY,
$deadLetterExchange,
$deadLetterRoutingKey,
$ttl,
))
->setMethods(array('getIterator'))
->getMock();
list($backend, $channelMock) = $this->getBackendAndChannelMock();

$channelMock->expects($this->never())
->method('basic_qos');

$backend->getIterator();
}

public function testGetIteratorWithPrefetchCount()
{
list($backend, $channelMock) = $this->getBackendAndChannelMock(false, null, null, null, self::PREFETCH_COUNT);

$channelMock->expects($this->once())
->method('basic_qos')
->with(
$this->isNull(),
$this->equalTo(self::PREFETCH_COUNT),
$this->isNull()
);

$backend->getIterator();
}

protected function getBackendAndChannelMock($recover = false, $deadLetterExchange = null, $deadLetterRoutingKey = null, $ttl = null, $prefetchCount = null)
{
$backend = new AMQPBackend(
self::EXCHANGE,
self::QUEUE,
$recover,
self::KEY,
$deadLetterExchange,
$deadLetterRoutingKey,
$ttl,
$prefetchCount
);

$settings = array(
'host' => 'foo',
Expand All @@ -221,20 +246,19 @@ protected function getBackendAndChannelMock($recover = false, $deadLetterExchang

$channelMock = $this->getMockBuilder('\PhpAmqpLib\Channel\AMQPChannel')
->disableOriginalConstructor()
->setMethods(array('queue_declare', 'exchange_declare', 'queue_bind'))
->getMock()
;
->setMethods(array('queue_declare', 'exchange_declare', 'queue_bind', 'basic_qos'))
->getMock();

$dispatcherMock = $this->getMockBuilder('\Sonata\NotificationBundle\Backend\AMQPBackendDispatcher')
->setConstructorArgs(array($settings, $queues, 'default', array(array('type' => self::KEY, 'backend' => $mock))))
->setConstructorArgs(array($settings, $queues, 'default', array(array('type' => self::KEY, 'backend' => $backend))))
->setMethods(array('getChannel'))
->getMock();

$dispatcherMock->method('getChannel')
->willReturn($channelMock);

$mock->setDispatcher($dispatcherMock);
$backend->setDispatcher($dispatcherMock);

return array($mock, $channelMock);
return array($backend, $channelMock);
}
}

0 comments on commit cebab90

Please sign in to comment.