Skip to content

Commit dc6e65b

Browse files
committed
Merge pull request #179 from joelwurtz/multiple-consumers
Add multiple queues consumers
2 parents 05059f8 + 9aca5c8 commit dc6e65b

File tree

13 files changed

+461
-21
lines changed

13 files changed

+461
-21
lines changed
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<?php
2+
3+
namespace OldSound\RabbitMqBundle\Command;
4+
5+
class MultipleConsumerCommand extends BaseConsumerCommand
6+
{
7+
8+
protected function configure()
9+
{
10+
parent::configure();
11+
12+
$this->setName('rabbitmq:multiple-consumer');
13+
}
14+
15+
protected function getConsumerService()
16+
{
17+
return 'old_sound_rabbit_mq.%s_multiple';
18+
}
19+
}

DependencyInjection/Compiler/RegisterPartsPass.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ public function process(ContainerBuilder $container)
2020
'old_sound_rabbit_mq.base_amqp',
2121
'old_sound_rabbit_mq.producer',
2222
'old_sound_rabbit_mq.consumer',
23+
'old_sound_rabbit_mq.multi_consumer',
2324
'old_sound_rabbit_mq.anon_consumer',
2425
'old_sound_rabbit_mq.rpc_client',
2526
'old_sound_rabbit_mq.rpc_server',

DependencyInjection/Configuration.php

Lines changed: 128 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,31 @@ public function getConfigTreeBuilder()
1717
{
1818
$tree = new TreeBuilder();
1919

20-
$tree->root('old_sound_rabbit_mq')
20+
$rootNode = $tree->root('old_sound_rabbit_mq');
21+
22+
$rootNode
2123
->children()
2224
->booleanNode('debug')->defaultValue('%kernel.debug%')->end()
2325
->booleanNode('enable_collector')->defaultValue(false)->end()
2426
->booleanNode('sandbox')->defaultValue(false)->end()
27+
->end()
28+
;
29+
30+
$this->addConnections($rootNode);
31+
$this->addProducers($rootNode);
32+
$this->addConsumers($rootNode);
33+
$this->addMultipleConsumers($rootNode);
34+
$this->addAnonConsumers($rootNode);
35+
$this->addRpcClients($rootNode);
36+
$this->addRpcServers($rootNode);
37+
38+
return $tree;
39+
}
40+
41+
protected function addConnections(ArrayNodeDefinition $node)
42+
{
43+
$node
44+
->children()
2545
->arrayNode('connections')
2646
->useAttributeAsKey('key')
2747
->canBeUnset()
@@ -36,7 +56,14 @@ public function getConfigTreeBuilder()
3656
->end()
3757
->end()
3858
->end()
39-
// producers
59+
->end()
60+
;
61+
}
62+
63+
protected function addProducers(ArrayNodeDefinition $node)
64+
{
65+
$node
66+
->children()
4067
->arrayNode('producers')
4168
->canBeUnset()
4269
->useAttributeAsKey('key')
@@ -50,7 +77,14 @@ public function getConfigTreeBuilder()
5077
->end()
5178
->end()
5279
->end()
53-
// consumers
80+
->end()
81+
;
82+
}
83+
84+
protected function addConsumers(ArrayNodeDefinition $node)
85+
{
86+
$node
87+
->children()
5488
->arrayNode('consumers')
5589
->canBeUnset()
5690
->useAttributeAsKey('key')
@@ -73,6 +107,61 @@ public function getConfigTreeBuilder()
73107
->end()
74108
->end()
75109
->end()
110+
->end()
111+
;
112+
}
113+
114+
protected function addMultipleConsumers(ArrayNodeDefinition $node)
115+
{
116+
$node
117+
->children()
118+
->arrayNode('multiple_consumers')
119+
->canBeUnset()
120+
->useAttributeAsKey('key')
121+
->prototype('array')
122+
->append($this->getExchangeConfiguration())
123+
->children()
124+
->scalarNode('connection')->defaultValue('default')->end()
125+
->scalarNode('idle_timeout')->end()
126+
->scalarNode('auto_setup_fabric')->defaultTrue()->end()
127+
->append($this->getMultipleQueuesConfiguration())
128+
->arrayNode('qos_options')
129+
->canBeUnset()
130+
->children()
131+
->scalarNode('prefetch_size')->defaultValue(0)->end()
132+
->scalarNode('prefetch_count')->defaultValue(0)->end()
133+
->booleanNode('global')->defaultFalse()->end()
134+
->end()
135+
->end()
136+
->end()
137+
->end()
138+
->end()
139+
;
140+
}
141+
142+
protected function addAnonConsumers(ArrayNodeDefinition $node)
143+
{
144+
$node
145+
->children()
146+
->arrayNode('anon_consumers')
147+
->canBeUnset()
148+
->useAttributeAsKey('key')
149+
->prototype('array')
150+
->append($this->getExchangeConfiguration())
151+
->children()
152+
->scalarNode('connection')->defaultValue('default')->end()
153+
->scalarNode('callback')->isRequired()->end()
154+
->end()
155+
->end()
156+
->end()
157+
->end()
158+
;
159+
}
160+
161+
protected function addRpcClients(ArrayNodeDefinition $node)
162+
{
163+
$node
164+
->children()
76165
->arrayNode('rpc_clients')
77166
->canBeUnset()
78167
->useAttributeAsKey('key')
@@ -83,6 +172,14 @@ public function getConfigTreeBuilder()
83172
->end()
84173
->end()
85174
->end()
175+
->end()
176+
;
177+
}
178+
179+
protected function addRpcServers(ArrayNodeDefinition $node)
180+
{
181+
$node
182+
->children()
86183
->arrayNode('rpc_servers')
87184
->canBeUnset()
88185
->useAttributeAsKey('key')
@@ -101,31 +198,19 @@ public function getConfigTreeBuilder()
101198
->end()
102199
->end()
103200
->end()
104-
->arrayNode('anon_consumers')
105-
->canBeUnset()
106-
->useAttributeAsKey('key')
107-
->prototype('array')
108-
->append($this->getExchangeConfiguration())
109-
->children()
110-
->scalarNode('connection')->defaultValue('default')->end()
111-
->scalarNode('callback')->isRequired()->end()
112-
->end()
113-
->end()
114-
->end()
115201
->end()
116202
;
117-
118-
return $tree;
119203
}
120204

121205
protected function getExchangeConfiguration()
122206
{
123207
$node = new ArrayNodeDefinition('exchange_options');
124208

125209
return $node
210+
->isRequired()
126211
->children()
127-
->scalarNode('name')->end()
128-
->scalarNode('type')->end()
212+
->scalarNode('name')->isRequired()->end()
213+
->scalarNode('type')->isRequired()->end()
129214
->booleanNode('passive')->defaultValue(false)->end()
130215
->booleanNode('durable')->defaultValue(true)->end()
131216
->booleanNode('auto_delete')->defaultValue(false)->end()
@@ -141,9 +226,32 @@ protected function getQueueConfiguration()
141226
{
142227
$node = new ArrayNodeDefinition('queue_options');
143228

144-
return $node
229+
$this->addQueueNodeConfiguration($node);
230+
231+
return $node;
232+
}
233+
234+
protected function getMultipleQueuesConfiguration()
235+
{
236+
$node = new ArrayNodeDefinition('queues');
237+
$prototypeNode = $node->requiresAtLeastOneElement()->prototype('array');
238+
239+
$this->addQueueNodeConfiguration($prototypeNode);
240+
241+
$prototypeNode->children()
242+
->scalarNode('callback')->isRequired()->end()
243+
->end();
244+
245+
$prototypeNode->end();
246+
247+
return $node;
248+
}
249+
250+
protected function addQueueNodeConfiguration(ArrayNodeDefinition $node)
251+
{
252+
$node
145253
->children()
146-
->scalarNode('name')->end()
254+
->scalarNode('name')->isRequired()->end()
147255
->booleanNode('passive')->defaultFalse()->end()
148256
->booleanNode('durable')->defaultTrue()->end()
149257
->booleanNode('exclusive')->defaultFalse()->end()

DependencyInjection/OldSoundRabbitMqExtension.php

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
namespace OldSound\RabbitMqBundle\DependencyInjection;
44

5+
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
6+
use Symfony\Component\DependencyInjection\Exception\InvalidArgumentException;
57
use Symfony\Component\HttpKernel\DependencyInjection\Extension;
68
use Symfony\Component\DependencyInjection\ContainerBuilder;
79
use Symfony\Component\DependencyInjection\Definition;
@@ -46,6 +48,7 @@ public function load(array $configs, ContainerBuilder $container)
4648
$this->loadConnections();
4749
$this->loadProducers();
4850
$this->loadConsumers();
51+
$this->loadMultipleConsumers();
4952
$this->loadAnonConsumers();
5053
$this->loadRpcClients();
5154
$this->loadRpcServers();
@@ -149,6 +152,47 @@ protected function loadConsumers()
149152
}
150153
}
151154

155+
protected function loadMultipleConsumers()
156+
{
157+
foreach ($this->config['multiple_consumers'] as $key => $consumer) {
158+
$queues = array();
159+
160+
foreach ($consumer['queues'] as $queueName => $queueOptions) {
161+
$queues[$queueOptions['name']] = $queueOptions;
162+
$queues[$queueOptions['name']]['callback'] = array(new Reference($queueOptions['callback']), 'execute');
163+
}
164+
165+
$definition = new Definition('%old_sound_rabbit_mq.multi_consumer.class%');
166+
$definition
167+
->addTag('old_sound_rabbit_mq.base_amqp')
168+
->addTag('old_sound_rabbit_mq.multi_consumer')
169+
->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
170+
->addMethodCall('setQueues', array($this->normalizeArgumentKeys($queues)));
171+
172+
if (array_key_exists('qos_options', $consumer)) {
173+
$definition->addMethodCall('setQosOptions', array(
174+
$consumer['qos_options']['prefetch_size'],
175+
$consumer['qos_options']['prefetch_count'],
176+
$consumer['qos_options']['global']
177+
));
178+
}
179+
180+
if(isset($consumer['idle_timeout'])) {
181+
$definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
182+
}
183+
if (!$consumer['auto_setup_fabric']) {
184+
$definition->addMethodCall('disableAutoSetupFabric');
185+
}
186+
187+
$this->injectConnection($definition, $consumer['connection']);
188+
if ($this->collectorEnabled) {
189+
$this->injectLoggedChannel($definition, $key, $consumer['connection']);
190+
}
191+
192+
$this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_multiple', $key), $definition);
193+
}
194+
}
195+
152196
protected function loadAnonConsumers()
153197
{
154198
foreach ($this->config['anon_consumers'] as $key => $anon) {

README.md

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -466,6 +466,42 @@ public function indexAction($name)
466466
467467
Is very similar to the previous example, we just have an extra `addRequest` call. Also we provide meaningful request identifiers so later will be easier for us to find the reply we want in the __$replies__ array.
468468
469+
### Multiple Consumers ###
470+
471+
It's a good practice to have a lot of queues for logic separation. With a simple consumer you will have to create one worker (consumer) per queue and it can be hard to manage when dealing
472+
with many evolutions (forget to add a line in your supervisord configuration?). This is also useful for small queues as you may not want to have as many workers as queues, and want to regroup
473+
some tasks together without losing flexibility and separation principle.
474+
475+
Multiple consumers allow you to handle this use case by listening to multiple queues on the same consumer.
476+
477+
Here is how you can set a consumer with multiple queues:
478+
479+
```yaml
480+
multiple_consumers:
481+
upload:
482+
connection: default
483+
exchange_options: {name: 'upload', type: direct}
484+
queues:
485+
upload-picture:
486+
name: upload_picture
487+
callback: upload_picture_service
488+
routing_keys:
489+
- picture
490+
upload-video:
491+
name: upload_video
492+
callback: upload_video_service
493+
routing_keys:
494+
- video
495+
upload-stats:
496+
name: upload_stats
497+
callback: upload_stats
498+
```
499+
500+
The callback is now specified under each queues and must implement the `ConsumerInterface` like a simple consumer.
501+
All the options of `queues-options` in the consumer are available for each queue.
502+
503+
Be aware that all queues are under the same exchange, it's up to you to set the correct routing for callbacks.
504+
469505
### Anonymous Consumers ###
470506
471507
Now, why will we ever need anonymous consumers? This sounds like some internet threat or something… Keep reading.

RabbitMq/Consumer.php

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,13 @@ public function purge()
5959

6060
public function processMessage(AMQPMessage $msg)
6161
{
62-
6362
$processFlag = call_user_func($this->callback, $msg);
6463

64+
$this->handleProcessMessage($msg, $processFlag);
65+
}
66+
67+
protected function handleProcessMessage(AMQPMessage $msg, $processFlag)
68+
{
6569
if ($processFlag === ConsumerInterface::MSG_REJECT_REQUEUE || false === $processFlag) {
6670
// Reject and requeue message to RabbitMQ
6771
$msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], true);
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
<?php
2+
3+
namespace OldSound\RabbitMqBundle\RabbitMq\Exception;
4+
5+
class QueueNotFoundException extends \RuntimeException
6+
{
7+
}

0 commit comments

Comments
 (0)