Skip to content

Commit f52b46a

Browse files
committed
Merge pull request #151 from paulkamer/master
Added optional AMQP-Messages Properties to publish method of the Producer (resolves merge conflicts of #55)
2 parents dd06773 + 17870ba commit f52b46a

File tree

5 files changed

+87
-16
lines changed

5 files changed

+87
-16
lines changed

CHANGELOG

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
- 2013-01-18
2+
* adds an an optional parameter for the AMQP Message Properties for the publish method of the Producer, so they can be set as well. For example, seeting the application_headers is now possible.
3+
14
- 2012-06-04
25
* Revert PR #46. It is still possible to override parameter classes but in a proper way.
36
* Some default options for exchanges declared in the "producers" config section

DependencyInjection/OldSoundRabbitMqExtension.php

Lines changed: 62 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public function load(array $configs, ContainerBuilder $container)
3535
{
3636
$this->container = $container;
3737

38-
$loader = new XmlFileLoader($this->container, new FileLocator(array(__DIR__.'/../Resources/config')));
38+
$loader = new XmlFileLoader($this->container, new FileLocator(array(__DIR__ . '/../Resources/config')));
3939
$loader->load('rabbitmq.xml');
4040

4141
$configuration = new Configuration();
@@ -90,7 +90,7 @@ protected function loadProducers()
9090
$definition = new Definition($producer['class']);
9191
$definition->addTag('old_sound_rabbit_mq.base_amqp');
9292
$definition->addTag('old_sound_rabbit_mq.producer');
93-
$definition->addMethodCall('setExchangeOptions', array($producer['exchange_options']));
93+
$definition->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($producer['exchange_options'])));
9494
//this producer doesn't define a queue
9595
if (!isset($producer['queue_options'])) {
9696
$producer['queue_options']['name'] = null;
@@ -121,8 +121,8 @@ protected function loadConsumers()
121121
$definition
122122
->addTag('old_sound_rabbit_mq.base_amqp')
123123
->addTag('old_sound_rabbit_mq.consumer')
124-
->addMethodCall('setExchangeOptions', array($consumer['exchange_options']))
125-
->addMethodCall('setQueueOptions', array($consumer['queue_options']))
124+
->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
125+
->addMethodCall('setQueueOptions', array($this->normalizeArgumentKeys($consumer['queue_options'])))
126126
->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));
127127

128128
if (array_key_exists('qos_options', $consumer)) {
@@ -156,9 +156,8 @@ protected function loadAnonConsumers()
156156
$definition
157157
->addTag('old_sound_rabbit_mq.base_amqp')
158158
->addTag('old_sound_rabbit_mq.anon_consumer')
159-
->addMethodCall('setExchangeOptions', array($anon['exchange_options']))
160-
->addMethodCall('setCallback', array(array(new Reference($anon['callback']), 'execute')))
161-
;
159+
->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($anon['exchange_options'])))
160+
->addMethodCall('setCallback', array(array(new Reference($anon['callback']), 'execute')));
162161
$this->injectConnection($definition, $anon['connection']);
163162
if ($this->collectorEnabled) {
164163
$this->injectLoggedChannel($definition, $key, $anon['connection']);
@@ -168,6 +167,59 @@ protected function loadAnonConsumers()
168167
}
169168
}
170169

170+
/**
171+
* Symfony 2 converts '-' to '_' when defined in the configuration. This leads to problems when using x-ha-policy
172+
* parameter. So we revert the change for right configurations.
173+
*
174+
* @param array $config
175+
*
176+
* @return array
177+
*/
178+
private function normalizeArgumentKeys(array $config)
179+
{
180+
if (isset($config['arguments'])) {
181+
$arguments = $config['arguments'];
182+
// support for old configuration
183+
if (is_string($arguments)) {
184+
$arguments = $this->argumentsStringAsArray($arguments);
185+
}
186+
187+
$newArguments = array();
188+
foreach ($arguments as $key => $value) {
189+
if (strstr($key, '_')) {
190+
$key = str_replace('_', '-', $key);
191+
}
192+
$newArguments[$key] = $value;
193+
}
194+
$config['arguments'] = $newArguments;
195+
}
196+
return $config;
197+
}
198+
199+
/**
200+
* Support for arguments provided as string. Support for old configuration files.
201+
*
202+
* @deprecated
203+
* @param string $arguments
204+
* @return array
205+
*/
206+
private function argumentsStringAsArray($arguments)
207+
{
208+
$argumentsArray = array();
209+
210+
$argumentPairs = explode(',', $arguments);
211+
foreach ($argumentPairs as $argument) {
212+
$argumentPair = explode(':', $argument);
213+
$type = 'S';
214+
if (isset($argumentPair[2])) {
215+
$type = $argumentPair[2];
216+
}
217+
$argumentsArray[$argumentPair[0]] = array($type, $argumentPair[1]);
218+
}
219+
220+
return $argumentsArray;
221+
}
222+
171223
protected function loadRpcClients()
172224
{
173225
foreach ($this->config['rpc_clients'] as $key => $client) {
@@ -193,8 +245,7 @@ protected function loadRpcServers()
193245
->addTag('old_sound_rabbit_mq.base_amqp')
194246
->addTag('old_sound_rabbit_mq.rpc_server')
195247
->addMethodCall('initServer', array($key))
196-
->addMethodCall('setCallback', array(array(new Reference($server['callback']), 'execute')))
197-
;
248+
->addMethodCall('setCallback', array(array(new Reference($server['callback']), 'execute')));
198249
$this->injectConnection($definition, $server['connection']);
199250
if ($this->collectorEnabled) {
200251
$this->injectLoggedChannel($definition, $key, $server['connection']);
@@ -206,12 +257,11 @@ protected function loadRpcServers()
206257

207258
protected function injectLoggedChannel(Definition $definition, $name, $connectionName)
208259
{
209-
$id = sprintf('old_sound_rabbit_mq.channel.%s', $name);
260+
$id = sprintf('old_sound_rabbit_mq.channel.%s', $name);
210261
$channel = new Definition('%old_sound_rabbit_mq.logged.channel.class%');
211262
$channel
212263
->setPublic(false)
213-
->addTag('old_sound_rabbit_mq.logged_channel')
214-
;
264+
->addTag('old_sound_rabbit_mq.logged_channel');
215265
$this->injectConnection($channel, $connectionName);
216266

217267
$this->container->setDefinition($id, $channel);

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,8 @@ public function indexAction($name)
198198
199199
As you can see, if in your configuration you have a producer called __upload\_picture__, then in the service container you will have a service called __old_sound_rabbit_mq.upload\_picture\_producer__.
200200
201+
Besides the message itself, the `OldSound\RabbitMqBundle\RabbitMq\Producer#publish()` method also accepts an optional routing key parameter and an optional array of additional properties. The array of additional properties allows you to alter the properties with which an `PhpAmqpLib\Message\AMQPMessage` object gets constructed by default. This way, for example, you can change the application headers.
202+
201203
You can use __setContentType__ and __setDeliveryMode__ methods in order to set the message content type and the message
202204
delivery mode respectively. Default values are __text/plain__ for content type and __2__ for delivery mode.
203205

RabbitMq/BaseAmqp.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ abstract class BaseAmqp
1414
protected $queueDeclared = false;
1515
protected $routingKey = '';
1616
protected $autoSetupFabric = true;
17-
17+
protected $basicProperties = array('content_type' => 'text/plain', 'delivery_mode' => 2);
18+
1819
protected $exchangeOptions = array(
1920
'passive' => false,
2021
'durable' => true,

RabbitMq/Producer.php

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55
use OldSound\RabbitMqBundle\RabbitMq\BaseAmqp;
66
use PhpAmqpLib\Message\AMQPMessage;
77

8+
/**
9+
* Prodcuer, that publishes AMQP Messages
10+
*/
811
class Producer extends BaseAmqp
912
{
1013
protected $contentType = 'text/plain';
@@ -24,13 +27,25 @@ public function setDeliveryMode($deliveryMode)
2427
return $this;
2528
}
2629

27-
public function publish($msgBody, $routingKey = '')
30+
protected function getBasicProperties()
31+
{
32+
return array('content_type' => $this->contentType, 'delivery_mode' => $this->deliveryMode);
33+
}
34+
35+
/**
36+
* Publishes the message and merges additional properties with basic properties
37+
*
38+
* @param string $msgBody
39+
* @param string $routingKey
40+
* @param array $additionalProperties
41+
*/
42+
public function publish($msgBody, $routingKey = '', $additionalProperties = array())
2843
{
2944
if ($this->autoSetupFabric) {
3045
$this->setupFabric();
3146
}
3247

33-
$msg = new AMQPMessage($msgBody, array('content_type' => $this->contentType, 'delivery_mode' => $this->deliveryMode));
34-
$this->getChannel()->basic_publish($msg, $this->exchangeOptions['name'], $routingKey);
48+
$msg = new AMQPMessage((string) $msgBody, array_merge($this->getBasicProperties(), $additionalProperties));
49+
$this->ch->basic_publish($msg, $this->exchangeOptions['name'], (string) $routingKey);
3550
}
3651
}

0 commit comments

Comments
 (0)