Skip to content

Commit 908fe41

Browse files
author
steveYeah
committed
Merge remote-tracking branch 'upstream/master'
Conflicts: README.md RabbitMq/RpcClient.php
2 parents d7fc18a + fc54885 commit 908fe41

File tree

2 files changed

+30
-11
lines changed

2 files changed

+30
-11
lines changed

README.md

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -393,13 +393,7 @@ public function indexAction($name)
393393
{
394394
$client = $this->get('old_sound_rabbit_mq.integer_store_rpc');
395395
$client->addRequest(serialize(array('min' => 0, 'max' => 10)), 'random_int', 'request_id');
396-
$timeout = 5; // seconds
397-
try {
398-
$replies = $client->getReplies($timeout);
399-
// process $replies['request_id'];
400-
} catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
401-
// handle timeout
402-
}
396+
$replies = $client->getReplies();
403397
}
404398
```
405399
@@ -421,7 +415,24 @@ rpc_clients:
421415
connection: default
422416
expect_serialized_response: false
423417
```
424-
418+
419+
You can also set a expiration for request in seconds, after which message will no longer be handled by server and client request will simply time out. Setting expiration for messages works only for RabbitMQ 3.x and above. Visit http://www.rabbitmq.com/ttl.html#per-message-ttl for more information.
420+
421+
```php
422+
public function indexAction($name)
423+
{
424+
$expiration = 5; // seconds
425+
$client = $this->get('old_sound_rabbit_mq.integer_store_rpc');
426+
$client->addRequest($body, $server, $requestId, $expiration);
427+
try {
428+
$replies = $client->getReplies();
429+
// process $replies['request_id'];
430+
} catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
431+
// handle timeout
432+
}
433+
}
434+
```
435+
425436
As you can guess, we can also make __parallel RPC calls__.
426437
427438
### Parallel RPC ###

RabbitMq/RpcClient.php

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,39 +11,47 @@ class RpcClient extends BaseAmqp
1111
protected $replies = array();
1212
protected $queueName;
1313
protected $expectSerializedResponse;
14+
protected $timeout = 0;
1415

1516
public function initClient($expectSerializedResponse = true)
1617
{
1718
list($this->queueName, ,) = $this->getChannel()->queue_declare("", false, false, true, true);
1819
$this->expectSerializedResponse = $expectSerializedResponse;
1920
}
2021

21-
public function addRequest($msgBody, $server, $requestId = null, $routingKey = '')
22+
public function addRequest($msgBody, $server, $requestId = null, $routingKey = '', $expiration = 0)
2223
{
2324
if (empty($requestId)) {
2425
throw new \InvalidArgumentException('You must provide a $requestId');
2526
}
2627

2728
$msg = new AMQPMessage($msgBody, array('content_type' => 'text/plain',
2829
'reply_to' => $this->queueName,
30+
'delivery_mode' => 1, // non durable
31+
'expiration' => $expiration*1000,
2932
'correlation_id' => $requestId));
3033

3134
$this->getChannel()->basic_publish($msg, $server, $routingKey);
3235

3336
$this->requests++;
37+
38+
if ($expiration > $this->timeout) {
39+
$this->timeout = $expiration;
40+
}
3441
}
3542

36-
public function getReplies($timeout = 0)
43+
public function getReplies()
3744
{
3845
$this->replies = array();
3946
$this->getChannel()->basic_consume($this->queueName, '', false, true, false, false, array($this, 'processMessage'));
4047

4148
while (count($this->replies) < $this->requests) {
42-
$this->getChannel()->wait(null, false, $timeout);
49+
$this->getChannel()->wait(null, false, $this->timeout);
4350
}
4451

4552
$this->getChannel()->basic_cancel($this->queueName);
4653
$this->requests = 0;
54+
$this->timeout = 0;
4755

4856
return $this->replies;
4957
}

0 commit comments

Comments
 (0)