Skip to content

Commit fc54885

Browse files
committed
Merge pull request #176 from papoola/patch
timeout migrated from getReplies() to addRequest() as expiration #2
2 parents b39c6f9 + 86d3102 commit fc54885

File tree

2 files changed

+29
-10
lines changed

2 files changed

+29
-10
lines changed

README.md

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -393,13 +393,7 @@ public function indexAction($name)
393393
{
394394
$client = $this->get('old_sound_rabbit_mq.integer_store_rpc');
395395
$client->addRequest(serialize(array('min' => 0, 'max' => 10)), 'random_int', 'request_id');
396-
$timeout = 5; // seconds
397-
try {
398-
$replies = $client->getReplies($timeout);
399-
// process $replies['request_id'];
400-
} catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
401-
// handle timeout
402-
}
396+
$replies = $client->getReplies();
403397
}
404398
```
405399
@@ -413,6 +407,23 @@ The arguments we are sending are the __min__ and __max__ values for the `rand()`
413407
414408
The final piece is to get the reply. Our PHP script will block till the server returns a value. The __$replies__ variable will be an associative array where each reply from the server will contained in the respective __request\_id__ key.
415409
410+
You can also set a expiration for request in seconds, after which message will no longer be handled by server and client request will simply time out. Setting expiration for messages works only for RabbitMQ 3.x and above. Visit http://www.rabbitmq.com/ttl.html#per-message-ttl for more information.
411+
412+
```php
413+
public function indexAction($name)
414+
{
415+
$expiration = 5; // seconds
416+
$client = $this->get('old_sound_rabbit_mq.integer_store_rpc');
417+
$client->addRequest($body, $server, $requestId, $expiration);
418+
try {
419+
$replies = $client->getReplies();
420+
// process $replies['request_id'];
421+
} catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
422+
// handle timeout
423+
}
424+
}
425+
```
426+
416427
As you can guess, we can also make __parallel RPC calls__.
417428
418429
### Parallel RPC ###

RabbitMq/RpcClient.php

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,38 +10,46 @@ class RpcClient extends BaseAmqp
1010
protected $requests = 0;
1111
protected $replies = array();
1212
protected $queueName;
13+
protected $timeout = 0;
1314

1415
public function initClient()
1516
{
1617
list($this->queueName, ,) = $this->getChannel()->queue_declare("", false, false, true, true);
1718
}
1819

19-
public function addRequest($msgBody, $server, $requestId = null, $routingKey = '')
20+
public function addRequest($msgBody, $server, $requestId = null, $routingKey = '', $expiration = 0)
2021
{
2122
if (empty($requestId)) {
2223
throw new \InvalidArgumentException('You must provide a $requestId');
2324
}
2425

2526
$msg = new AMQPMessage($msgBody, array('content_type' => 'text/plain',
2627
'reply_to' => $this->queueName,
28+
'delivery_mode' => 1, // non durable
29+
'expiration' => $expiration*1000,
2730
'correlation_id' => $requestId));
2831

2932
$this->getChannel()->basic_publish($msg, $server, $routingKey);
3033

3134
$this->requests++;
35+
36+
if ($expiration > $this->timeout) {
37+
$this->timeout = $expiration;
38+
}
3239
}
3340

34-
public function getReplies($timeout = 0)
41+
public function getReplies()
3542
{
3643
$this->replies = array();
3744
$this->getChannel()->basic_consume($this->queueName, '', false, true, false, false, array($this, 'processMessage'));
3845

3946
while (count($this->replies) < $this->requests) {
40-
$this->getChannel()->wait(null, false, $timeout);
47+
$this->getChannel()->wait(null, false, $this->timeout);
4148
}
4249

4350
$this->getChannel()->basic_cancel($this->queueName);
4451
$this->requests = 0;
52+
$this->timeout = 0;
4553

4654
return $this->replies;
4755
}

0 commit comments

Comments
 (0)