From cebab90b2d4d66960d7f670a7f9515972079908a Mon Sep 17 00:00:00 2001 From: shiroko Date: Tue, 12 Sep 2017 18:26:54 +0900 Subject: [PATCH] Add prefetch count configuration in AMQPBackend (#257) --- Backend/AMQPBackend.php | 16 +++- DependencyInjection/Configuration.php | 12 +++ .../SonataNotificationExtension.php | 8 +- Tests/Backend/AMQPBackendTest.php | 86 ++++++++++++------- 4 files changed, 86 insertions(+), 36 deletions(-) diff --git a/Backend/AMQPBackend.php b/Backend/AMQPBackend.php index 81afc3ed..8118b7fd 100644 --- a/Backend/AMQPBackend.php +++ b/Backend/AMQPBackend.php @@ -27,6 +27,11 @@ */ class AMQPBackend implements BackendInterface { + /** + * @var AMQPBackendDispatcher + */ + protected $dispatcher = null; + /** * @var string */ @@ -63,9 +68,9 @@ class AMQPBackend implements BackendInterface protected $ttl; /** - * @var AMQPBackendDispatcher + * @var null|int */ - protected $dispatcher = null; + private $prefetchCount; /** * @param string $exchange @@ -76,7 +81,7 @@ class AMQPBackend implements BackendInterface * @param string $deadLetterRoutingKey * @param null|int $ttl */ - public function __construct($exchange, $queue, $recover, $key, $deadLetterExchange = null, $deadLetterRoutingKey = null, $ttl = null) + public function __construct($exchange, $queue, $recover, $key, $deadLetterExchange = null, $deadLetterRoutingKey = null, $ttl = null, $prefetchCount = null) { $this->exchange = $exchange; $this->queue = $queue; @@ -85,6 +90,7 @@ public function __construct($exchange, $queue, $recover, $key, $deadLetterExchan $this->deadLetterExchange = $deadLetterExchange; $this->deadLetterRoutingKey = $deadLetterRoutingKey; $this->ttl = $ttl; + $this->prefetchCount = $prefetchCount; if (!class_exists('PhpAmqpLib\Message\AMQPMessage')) { throw new \RuntimeException('Please install php-amqplib/php-amqplib dependency'); @@ -192,6 +198,10 @@ public function createAndPublish($type, array $body) */ public function getIterator() { + if ($this->prefetchCount !== null) { + $this->getChannel()->basic_qos(null, $this->prefetchCount, null); + } + return new AMQPMessageIterator($this->getChannel(), $this->queue); } diff --git a/DependencyInjection/Configuration.php b/DependencyInjection/Configuration.php index 43634eb8..8f91926b 100644 --- a/DependencyInjection/Configuration.php +++ b/DependencyInjection/Configuration.php @@ -221,6 +221,12 @@ protected function getQueueNode() Only used by RabbitMQ Defines the per-queue message time-to-live (milliseconds) +EOF; + + $prefetchCountInfo = <<<'EOF' +Only used by RabbitMQ + +Defines the number of messages which will be delivered to the customer at a time. EOF; $typesInfo = <<<'EOF' @@ -269,6 +275,12 @@ protected function getQueueNode() ->min(0) ->defaultValue(null) ->end() + ->integerNode('prefetch_count') + ->info($prefetchCountInfo) + ->min(0) + ->max(65535) + ->defaultValue(null) + ->end() // Database configuration (Doctrine) ->arrayNode('types') diff --git a/DependencyInjection/SonataNotificationExtension.php b/DependencyInjection/SonataNotificationExtension.php index b4652bb5..3dfc1b04 100644 --- a/DependencyInjection/SonataNotificationExtension.php +++ b/DependencyInjection/SonataNotificationExtension.php @@ -335,6 +335,7 @@ protected function configureRabbitmq(ContainerBuilder $container, array $config) 'dead_letter_exchange' => null, 'dead_letter_routing_key' => null, 'ttl' => null, + 'prefetch_count' => null, )); } @@ -381,7 +382,8 @@ protected function configureRabbitmq(ContainerBuilder $container, array $config) $queue['routing_key'], $queue['dead_letter_exchange'], $queue['dead_letter_routing_key'], - $queue['ttl'] + $queue['ttl'], + $queue['prefetch_count'] ); $amqBackends[$pos] = array( @@ -419,10 +421,11 @@ protected function configureRabbitmq(ContainerBuilder $container, array $config) * @param string $deadLetterExchange * @param string $deadLetterRoutingKey * @param int|null $ttl + * @param int|null $prefetchCount * * @return string */ - protected function createAMQPBackend(ContainerBuilder $container, $exchange, $name, $recover, $key = '', $deadLetterExchange = null, $deadLetterRoutingKey = null, $ttl = null) + protected function createAMQPBackend(ContainerBuilder $container, $exchange, $name, $recover, $key = '', $deadLetterExchange = null, $deadLetterRoutingKey = null, $ttl = null, $prefetchCount = null) { $id = 'sonata.notification.backend.rabbitmq.'.$this->amqpCounter++; @@ -436,6 +439,7 @@ protected function createAMQPBackend(ContainerBuilder $container, $exchange, $na $deadLetterExchange, $deadLetterRoutingKey, $ttl, + $prefetchCount, ) ); $definition->setPublic(false); diff --git a/Tests/Backend/AMQPBackendTest.php b/Tests/Backend/AMQPBackendTest.php index 8b3a7fcf..1180ad34 100644 --- a/Tests/Backend/AMQPBackendTest.php +++ b/Tests/Backend/AMQPBackendTest.php @@ -11,6 +11,7 @@ namespace Sonata\NotificationBundle\Tests\Backend; +use Sonata\NotificationBundle\Backend\AMQPBackend; use Sonata\NotificationBundle\Tests\Helpers\PHPUnit_Framework_TestCase; class AMQPBackendTest extends PHPUnit_Framework_TestCase @@ -21,6 +22,7 @@ class AMQPBackendTest extends PHPUnit_Framework_TestCase const DEAD_LETTER_EXCHANGE = 'dlx'; const DEAD_LETTER_ROUTING_KEY = 'message.type.dl'; const TTL = 60000; + const PREFETCH_COUNT = 1; protected function setUp() { @@ -41,7 +43,7 @@ public function testInitializeWithNoDeadLetterExchangeAndNoDeadLetterRoutingKey( $this->isType('boolean'), $this->isType('boolean'), $this->isType('boolean') - ); + ); $channelMock->expects($this->once()) ->method('queue_declare') ->with( @@ -52,14 +54,14 @@ public function testInitializeWithNoDeadLetterExchangeAndNoDeadLetterRoutingKey( $this->isType('boolean'), $this->isType('boolean'), $this->equalTo(array()) - ); + ); $channelMock->expects($this->once()) ->method('queue_bind') ->with( $this->equalTo(self::QUEUE), $this->equalTo(self::EXCHANGE), $this->equalTo(self::KEY) - ); + ); $backend->initialize(); } @@ -85,7 +87,7 @@ public function testInitializeWithDeadLetterExchangeAndNoDeadLetterRoutingKey() $this->isType('boolean'), $this->isType('boolean'), ) - ); + ); $channelMock->expects($this->once()) ->method('queue_declare') ->with( @@ -98,7 +100,7 @@ public function testInitializeWithDeadLetterExchangeAndNoDeadLetterRoutingKey() $this->equalTo(array( 'x-dead-letter-exchange' => array('S', self::DEAD_LETTER_EXCHANGE), )) - ); + ); $channelMock->expects($this->exactly(2)) ->method('queue_bind') ->withConsecutive( @@ -112,7 +114,7 @@ public function testInitializeWithDeadLetterExchangeAndNoDeadLetterRoutingKey() $this->equalTo(self::DEAD_LETTER_EXCHANGE), $this->equalTo(self::KEY), ) - ); + ); $backend->initialize(); } @@ -129,7 +131,7 @@ public function testInitializeWithDeadLetterExchangeAndDeadLetterRoutingKey() $this->isType('boolean'), $this->isType('boolean'), $this->isType('boolean') - ); + ); $channelMock->expects($this->once()) ->method('queue_declare') ->with( @@ -143,14 +145,14 @@ public function testInitializeWithDeadLetterExchangeAndDeadLetterRoutingKey() 'x-dead-letter-exchange' => array('S', self::DEAD_LETTER_EXCHANGE), 'x-dead-letter-routing-key' => array('S', self::DEAD_LETTER_ROUTING_KEY), )) - ); + ); $channelMock->expects($this->once()) ->method('queue_bind') ->with( $this->equalTo(self::QUEUE), $this->equalTo(self::EXCHANGE), $this->equalTo(self::KEY) - ); + ); $backend->initialize(); } @@ -167,7 +169,7 @@ public function testInitializeWithTTL() $this->isType('boolean'), $this->isType('boolean'), $this->isType('boolean') - ); + ); $channelMock->expects($this->once()) ->method('queue_declare') ->with( @@ -180,32 +182,55 @@ public function testInitializeWithTTL() $this->equalTo(array( 'x-message-ttl' => array('I', self::TTL), )) - ); + ); $channelMock->expects($this->once()) ->method('queue_bind') ->with( $this->equalTo(self::QUEUE), $this->equalTo(self::EXCHANGE), $this->equalTo(self::KEY) - ); + ); $backend->initialize(); } - protected function getBackendAndChannelMock($recover = false, $deadLetterExchange = null, $deadLetterRoutingKey = null, $ttl = null) + public function testGetIteratorWithNoPrefetchCount() { - $mock = $this->getMockBuilder('\Sonata\NotificationBundle\Backend\AMQPBackend') - ->setConstructorArgs(array( - self::EXCHANGE, - self::QUEUE, - $recover, - self::KEY, - $deadLetterExchange, - $deadLetterRoutingKey, - $ttl, - )) - ->setMethods(array('getIterator')) - ->getMock(); + list($backend, $channelMock) = $this->getBackendAndChannelMock(); + + $channelMock->expects($this->never()) + ->method('basic_qos'); + + $backend->getIterator(); + } + + public function testGetIteratorWithPrefetchCount() + { + list($backend, $channelMock) = $this->getBackendAndChannelMock(false, null, null, null, self::PREFETCH_COUNT); + + $channelMock->expects($this->once()) + ->method('basic_qos') + ->with( + $this->isNull(), + $this->equalTo(self::PREFETCH_COUNT), + $this->isNull() + ); + + $backend->getIterator(); + } + + protected function getBackendAndChannelMock($recover = false, $deadLetterExchange = null, $deadLetterRoutingKey = null, $ttl = null, $prefetchCount = null) + { + $backend = new AMQPBackend( + self::EXCHANGE, + self::QUEUE, + $recover, + self::KEY, + $deadLetterExchange, + $deadLetterRoutingKey, + $ttl, + $prefetchCount + ); $settings = array( 'host' => 'foo', @@ -221,20 +246,19 @@ protected function getBackendAndChannelMock($recover = false, $deadLetterExchang $channelMock = $this->getMockBuilder('\PhpAmqpLib\Channel\AMQPChannel') ->disableOriginalConstructor() - ->setMethods(array('queue_declare', 'exchange_declare', 'queue_bind')) - ->getMock() - ; + ->setMethods(array('queue_declare', 'exchange_declare', 'queue_bind', 'basic_qos')) + ->getMock(); $dispatcherMock = $this->getMockBuilder('\Sonata\NotificationBundle\Backend\AMQPBackendDispatcher') - ->setConstructorArgs(array($settings, $queues, 'default', array(array('type' => self::KEY, 'backend' => $mock)))) + ->setConstructorArgs(array($settings, $queues, 'default', array(array('type' => self::KEY, 'backend' => $backend)))) ->setMethods(array('getChannel')) ->getMock(); $dispatcherMock->method('getChannel') ->willReturn($channelMock); - $mock->setDispatcher($dispatcherMock); + $backend->setDispatcher($dispatcherMock); - return array($mock, $channelMock); + return array($backend, $channelMock); } }