diff --git a/docs/aws-provider.rst b/docs/aws-provider.rst index 8546a0b..508a4f6 100644 --- a/docs/aws-provider.rst +++ b/docs/aws-provider.rst @@ -74,9 +74,11 @@ fetch those messages. +--------------------------+-------------------------------------------------------------------------------------------+---------------+ | Option | Description | Default Value | +==========================+===========================================================================================+===============+ -| ``push_notifications`` | Whether or not to POST notifications to subscribers of a Queue | ``false`` | +| ``push_notifications`` | Whether or not to POST notifications to subscribers of a Queue | ``false`` | +--------------------------+-------------------------------------------------------------------------------------------+---------------+ -| ``message_delay`` | Time in seconds before a published Message is available to be read in a Queue | ``0`` | +| ``push_notifications_only`` | Use only SNS without SQS. Beware of max retention period of 1 hour for messages | ``false`` | ++--------------------------+-------------------------------------------------------------------------------------------+---------------+ +| ``message_delay`` | Time in seconds before a published Message is available to be read in a Queue | ``0`` | +--------------------------+-------------------------------------------------------------------------------------------+---------------+ .. code-block:: php diff --git a/docs/configuration.rst b/docs/configuration.rst index c05d51d..0e00e1e 100644 --- a/docs/configuration.rst +++ b/docs/configuration.rst @@ -66,6 +66,8 @@ The options and their descriptions are listed below. +---------------------------------+--------------------------------------------------------------------------------------------+---------------+ | ``push_notifications`` | Whether or not to POST notifications to subscribers of a Queue | ``false`` | +---------------------------------+--------------------------------------------------------------------------------------------+---------------+ +| ``push_notifications_only`` | AWS Only for now. Disable the Queue Polling if you do not require message for 1 hour+ | ``false`` | ++---------------------------------+--------------------------------------------------------------------------------------------+---------------+ | ``notification_retries`` | How many attempts notifications are resent in case of errors - if supported | ``3`` | +---------------------------------+--------------------------------------------------------------------------------------------+---------------+ | ``message_delay`` | Time in seconds before a published Message is available to be read in a Queue | ``0`` | @@ -137,6 +139,7 @@ A working configuration would look like the following options: queue_name: my_actual_queue_name push_notifications: true + push_notifications_only: false notification_retries: 3 message_delay: 0 message_timeout: 30 @@ -154,6 +157,7 @@ A working configuration would look like the following queue_name: my_actual_queue_name.fifo push_notifications: true notification_retries: 3 + push_notifications_only: false message_delay: 0 message_timeout: 30 message_expiration: 604800 diff --git a/integration_tests/Provider/IronMqProviderTest.php b/integration_tests/Provider/IronMqProviderTest.php index 33e128e..2efb232 100755 --- a/integration_tests/Provider/IronMqProviderTest.php +++ b/integration_tests/Provider/IronMqProviderTest.php @@ -57,6 +57,7 @@ private function getIronMqProvider(array $options = []) [ 'logging_enabled' => false, 'push_notifications' => true, + 'push_notifications_only' => false, 'push_type' => 'multicast', 'notification_retries' => 3, 'notification_retries_delay' => 60, diff --git a/src/DependencyInjection/Configuration.php b/src/DependencyInjection/Configuration.php index 11b03ed..1f214ab 100755 --- a/src/DependencyInjection/Configuration.php +++ b/src/DependencyInjection/Configuration.php @@ -143,6 +143,10 @@ private function getQueuesNode() ->defaultFalse() ->info('Whether notifications are sent to the subscribers') ->end() + ->booleanNode('push_notifications_only') + ->defaultFalse() + ->info('Where to disable queue polling') + ->end() ->scalarNode('push_type') ->defaultValue('multicast') ->info('Whether the push queue is multicast or unicast') diff --git a/src/Event/NotificationEvent.php b/src/Event/NotificationEvent.php index 0db73db..00bf801 100755 --- a/src/Event/NotificationEvent.php +++ b/src/Event/NotificationEvent.php @@ -109,4 +109,5 @@ public function getNotification() { return $this->notification; } + } diff --git a/src/Message/Notification.php b/src/Message/Notification.php index bec06bd..1642031 100755 --- a/src/Message/Notification.php +++ b/src/Message/Notification.php @@ -49,6 +49,7 @@ class Notification * @var ArrayCollection */ protected $metadata; + /** * Constructor. @@ -101,4 +102,5 @@ public function getMetadata() { return $this->metadata; } + } diff --git a/src/Provider/AwsProvider.php b/src/Provider/AwsProvider.php index c158cf1..f6a984e 100755 --- a/src/Provider/AwsProvider.php +++ b/src/Provider/AwsProvider.php @@ -115,12 +115,14 @@ public function create() // Create the SNS Topic $this->createTopic(); - // Add the SQS Queue as a Subscriber to the SNS Topic - $this->subscribeToTopic( - $this->topicArn, - 'sqs', - $this->sqs->getQueueArn($this->queueUrl) - ); + if(!$this->options['push_notifications_only']) { + // Add the SQS Queue as a Subscriber to the SNS Topic + $this->subscribeToTopic( + $this->topicArn, + 'sqs', + $this->sqs->getQueueArn($this->queueUrl) + ); + } // Add configured Subscribers to the SNS Topic foreach ($this->options['subscribers'] as $subscriber) { @@ -151,11 +153,12 @@ public function destroy() $this->log(200,"SQS Queue removed", ['QueueUrl' => $this->queueUrl]); } - + + $topicExists = $this->topicExists(); $key = $this->getNameWithPrefix() . '_arn'; $this->cache->delete($key); - if ($this->topicExists() || !empty($this->queueUrl)) { + if ($this->options['push_notifications_only'] || ($topicExists || !empty($this->queueUrl))) { // Delete the SNS Topic $topicArn = !empty($this->topicArn) ? $this->topicArn @@ -197,8 +200,10 @@ public function publish(array $message, array $options = []) $publishStart = microtime(true); // ensures that the SQS Queue and SNS Topic exist - if (!$this->queueExists()) { - $this->create(); + if(!$this->options['push_notifications_only']) { + if (!$this->queueExists()) { + $this->create(); + } } if ($options['push_notifications']) { @@ -206,13 +211,22 @@ public function publish(array $message, array $options = []) if (!$this->topicExists()) { $this->create(); } - - $message = [ - 'default' => $this->getNameWithPrefix(), - 'sqs' => json_encode($message), - 'http' => $this->getNameWithPrefix(), - 'https' => $this->getNameWithPrefix(), - ]; + + if($this->options['push_notifications_only']) { + $jsonMessage = json_encode($message); + $message = [ + 'default' => $jsonMessage, + 'http' => $jsonMessage, + 'https' => $jsonMessage, + ]; + } else { + $message = [ + 'default' => $this->getNameWithPrefix(), + 'sqs' => json_encode($message), + 'http' => $this->getNameWithPrefix(), + 'https' => $this->getNameWithPrefix(), + ]; + } $result = $this->sns->publish([ 'TopicArn' => $this->topicArn, @@ -325,22 +339,26 @@ public function receive(array $options = []) */ public function delete($id) { - if (!$this->queueExists()) { - return false; - } + if($this->options['push_notifications_only']) { + return true; + } else { + if (!$this->queueExists()) { + return false; + } - $this->sqs->deleteMessage([ - 'QueueUrl' => $this->queueUrl, - 'ReceiptHandle' => $id - ]); + $this->sqs->deleteMessage([ + 'QueueUrl' => $this->queueUrl, + 'ReceiptHandle' => $id + ]); - $context = [ - 'QueueUrl' => $this->queueUrl, - 'ReceiptHandle' => $id - ]; - $this->log(200,"Message deleted from SQS Queue", $context); + $context = [ + 'QueueUrl' => $this->queueUrl, + 'ReceiptHandle' => $id + ]; + $this->log(200,"Message deleted from SQS Queue", $context); - return true; + return true; + } } /** @@ -389,40 +407,42 @@ public function queueExists() */ public function createQueue() { - $attributes = [ - 'VisibilityTimeout' => $this->options['message_timeout'], - 'MessageRetentionPeriod' => $this->options['message_expiration'], - 'ReceiveMessageWaitTimeSeconds' => $this->options['receive_wait_time'] - ]; + if(!$this->options['push_notifications_only']) { + $attributes = [ + 'VisibilityTimeout' => $this->options['message_timeout'], + 'MessageRetentionPeriod' => $this->options['message_expiration'], + 'ReceiveMessageWaitTimeSeconds' => $this->options['receive_wait_time'] + ]; - if ($this->isQueueFIFO()) { - $attributes['FifoQueue'] = 'true'; - $attributes['ContentBasedDeduplication'] = $this->options['content_based_deduplication'] === true - ? 'true' - : 'false'; - } + if ($this->isQueueFIFO()) { + $attributes['FifoQueue'] = 'true'; + $attributes['ContentBasedDeduplication'] = $this->options['content_based_deduplication'] === true + ? 'true' + : 'false'; + } - $result = $this->sqs->createQueue(['QueueName' => $this->getNameWithPrefix(), 'Attributes' => $attributes]); + $result = $this->sqs->createQueue(['QueueName' => $this->getNameWithPrefix(), 'Attributes' => $attributes]); - $this->queueUrl = $result->get('QueueUrl'); + $this->queueUrl = $result->get('QueueUrl'); - $key = $this->getNameWithPrefix() . '_url'; - $this->cache->save($key, $this->queueUrl); + $key = $this->getNameWithPrefix() . '_url'; + $this->cache->save($key, $this->queueUrl); - $this->log(200, "Created SQS Queue", ['QueueUrl' => $this->queueUrl]); + $this->log(200, "Created SQS Queue", ['QueueUrl' => $this->queueUrl]); - if ($this->options['push_notifications']) { + if ($this->options['push_notifications']) { - $policy = $this->createSqsPolicy(); + $policy = $this->createSqsPolicy(); - $this->sqs->setQueueAttributes([ - 'QueueUrl' => $this->queueUrl, - 'Attributes' => [ - 'Policy' => $policy, - ] - ]); + $this->sqs->setQueueAttributes([ + 'QueueUrl' => $this->queueUrl, + 'Attributes' => [ + 'Policy' => $policy, + ] + ]); - $this->log(200, "Created Updated SQS Policy"); + $this->log(200, "Created Updated SQS Policy"); + } } } @@ -643,13 +663,20 @@ public function onNotification(NotificationEvent $event, $eventName, EventDispat return; } - - $messages = $this->receive(); - foreach ($messages as $message) { - + + if($this->options['push_notifications_only']) { + $notification = $event->getNotification(); + $message = new Message($notification->getId(), $notification->getBody(), (array)$notification->getMetadata()); $messageEvent = new MessageEvent($this->name, $message); $dispatcher->dispatch(Events::Message($this->name), $messageEvent); + } else { + $messages = $this->receive(); + foreach ($messages as $message) { + $messageEvent = new MessageEvent($this->name, $message); + $dispatcher->dispatch(Events::Message($this->name), $messageEvent); + } } + } /** diff --git a/src/Resources/config/config.yml b/src/Resources/config/config.yml index cb923f6..433cb63 100755 --- a/src/Resources/config/config.yml +++ b/src/Resources/config/config.yml @@ -16,6 +16,7 @@ uecode_qpush: provider: aws #or ironmq options: push_notifications: true + push_notifications_only: false notification_retries: 3 message_delay: 0 message_timeout: 30 diff --git a/tests/Fixtures/config_test.yml b/tests/Fixtures/config_test.yml index 6719679..b604e84 100644 --- a/tests/Fixtures/config_test.yml +++ b/tests/Fixtures/config_test.yml @@ -36,6 +36,7 @@ uecode_qpush: provider: aws options: push_notifications: true + push_notifications_only: false notification_retries: 3 message_delay: 0 message_timeout: 30 @@ -48,6 +49,7 @@ uecode_qpush: provider: secondary_aws options: push_notifications: true + push_notifications_only: false notification_retries: 3 message_delay: 0 message_timeout: 30 @@ -60,6 +62,7 @@ uecode_qpush: provider: aws options: push_notifications: true + push_notifications_only: false notification_retries: 3 message_delay: 0 message_timeout: 30 @@ -74,6 +77,7 @@ uecode_qpush: provider: ironmq options: push_notifications: true + push_notifications_only: false notification_retries: 3 message_delay: 0 message_timeout: 30 @@ -86,6 +90,7 @@ uecode_qpush: provider: secondary_ironmq options: push_notifications: true + push_notifications_only: false notification_retries: 3 message_delay: 0 message_timeout: 30 diff --git a/tests/Provider/AbstractProviderTest.php b/tests/Provider/AbstractProviderTest.php index 1180141..1cbda02 100644 --- a/tests/Provider/AbstractProviderTest.php +++ b/tests/Provider/AbstractProviderTest.php @@ -56,15 +56,16 @@ private function getTestProvider(array $options = []) $options = array_merge( [ - 'logging_enabled' => false, - 'push_notifications' => true, - 'notification_retries' => 3, - 'message_delay' => 0, - 'message_timeout' => 30, - 'message_expiration' => 604800, - 'messages_to_receive' => 1, - 'receive_wait_time' => 3, - 'subscribers' => [ + 'logging_enabled' => false, + 'push_notifications' => true, + 'push_notifications_only' => false, + 'notification_retries' => 3, + 'message_delay' => 0, + 'message_timeout' => 30, + 'message_expiration' => 604800, + 'messages_to_receive' => 1, + 'receive_wait_time' => 3, + 'subscribers' => [ [ 'protocol' => 'http', 'endpoint' => 'http://fake.com' ] ] ], diff --git a/tests/Provider/AwsProviderTest.php b/tests/Provider/AwsProviderTest.php index 0ab8786..1ab184b 100755 --- a/tests/Provider/AwsProviderTest.php +++ b/tests/Provider/AwsProviderTest.php @@ -65,6 +65,7 @@ private function getAwsProvider(array $options = []) [ 'logging_enabled' => false, 'push_notifications' => true, + 'push_notifications_only' => false, 'notification_retries' => 3, 'message_delay' => 0, 'message_timeout' => 30, diff --git a/tests/Provider/CustomProviderTest.php b/tests/Provider/CustomProviderTest.php index 9a5b2dd..e5a6cbb 100644 --- a/tests/Provider/CustomProviderTest.php +++ b/tests/Provider/CustomProviderTest.php @@ -71,15 +71,16 @@ public function testReceive() protected function getCustomProvider() { $options = [ - 'logging_enabled' => false, - 'push_notifications' => true, - 'notification_retries' => 3, - 'message_delay' => 0, - 'message_timeout' => 30, - 'message_expiration' => 604800, - 'messages_to_receive' => 1, - 'receive_wait_time' => 3, - 'subscribers' => [] + 'logging_enabled' => false, + 'push_notifications' => true, + 'push_notifications_only' => false, + 'notification_retries' => 3, + 'message_delay' => 0, + 'message_timeout' => 30, + 'message_expiration' => 604800, + 'messages_to_receive' => 1, + 'receive_wait_time' => 3, + 'subscribers' => [] ]; $cache = $this->getMock( diff --git a/tests/Provider/IronMqProviderTest.php b/tests/Provider/IronMqProviderTest.php index e843b5a..cd009b5 100755 --- a/tests/Provider/IronMqProviderTest.php +++ b/tests/Provider/IronMqProviderTest.php @@ -58,18 +58,19 @@ private function getIronMqProvider(array $options = []) { $options = array_merge( [ - 'logging_enabled' => false, - 'push_notifications' => true, - 'push_type' => 'multicast', - 'notification_retries' => 3, - 'notification_retries_delay' => 60, - 'message_delay' => 0, - 'message_timeout' => 30, - 'message_expiration' => 604800, - 'messages_to_receive' => 1, - 'rate_limit' => -1, - 'receive_wait_time' => 3, - 'subscribers' => [ + 'logging_enabled' => false, + 'push_notifications' => true, + 'push_notifications_only' => false, + 'push_type' => 'multicast', + 'notification_retries' => 3, + 'notification_retries_delay' => 60, + 'message_delay' => 0, + 'message_timeout' => 30, + 'message_expiration' => 604800, + 'messages_to_receive' => 1, + 'rate_limit' => -1, + 'receive_wait_time' => 3, + 'subscribers' => [ [ 'protocol' => 'http', 'endpoint' => 'http://fake.com' ] ] ], diff --git a/tests/Provider/SyncProviderTest.php b/tests/Provider/SyncProviderTest.php index 14e6c7f..8547fc9 100644 --- a/tests/Provider/SyncProviderTest.php +++ b/tests/Provider/SyncProviderTest.php @@ -84,15 +84,16 @@ public function testReceive() protected function getSyncProvider() { $options = [ - 'logging_enabled' => false, - 'push_notifications' => true, - 'notification_retries' => 3, - 'message_delay' => 0, - 'message_timeout' => 30, - 'message_expiration' => 604800, - 'messages_to_receive' => 1, - 'receive_wait_time' => 3, - 'subscribers' => [ + 'logging_enabled' => false, + 'push_notifications' => true, + 'push_notifications_only' => false, + 'notification_retries' => 3, + 'message_delay' => 0, + 'message_timeout' => 30, + 'message_expiration' => 604800, + 'messages_to_receive' => 1, + 'receive_wait_time' => 3, + 'subscribers' => [ [ 'protocol' => 'http', 'endpoint' => 'http://fake.com' ] ] ];