@@ -5,9 +5,12 @@ import javasabr.mqtt.model.QoS
55import javasabr.mqtt.model.SubscribeRetainHandling
66import javasabr.mqtt.model.reason.code.SubscribeAckReasonCode
77import javasabr.mqtt.model.reason.code.UnsubscribeAckReasonCode
8+ import javasabr.mqtt.model.subscriber.SingleSubscriber
89import javasabr.mqtt.model.subscription.Subscription
10+ import javasabr.mqtt.model.subscription.TestPublishFactory
11+ import javasabr.mqtt.network.message.out.PublishMqtt5OutMessage
912import javasabr.mqtt.service.IntegrationServiceSpecification
10- import javasabr.mqtt.service.SubscriptionService
13+ import javasabr.mqtt.service.TestExternalNetworkMqttUser
1114import javasabr.rlib.collections.array.Array
1215
1316class InMemorySubscriptionServiceTest extends IntegrationServiceSpecification {
@@ -323,4 +326,131 @@ class InMemorySubscriptionServiceTest extends IntegrationServiceSpecification {
323326 storedSubscriptions. size() == 3
324327 storedSubscriptions ==~ resultSubscriptions
325328 }
329+
330+ def " should only deliver 'send-if-subscription-does-not-exist' Subscribe Retain Handling once" () {
331+ given :
332+ def serverConfig = defaultExternalServerConnectionConfig
333+ def mqttConnection = mockedExternalConnection(serverConfig, MqttVersion . MQTT_5 )
334+ def mqttUser = mqttConnection. user() as TestExternalNetworkMqttUser
335+ def subscription = new Subscription (
336+ defaultTopicService. createTopicFilter(mqttUser, " topic/filter/1" ),
337+ 30 ,
338+ QoS . AT_MOST_ONCE ,
339+ SubscribeRetainHandling . SEND_IF_SUBSCRIPTION_DOES_NOT_EXIST ,
340+ true ,
341+ true )
342+ def subscriptions = Array . of(
343+ subscription,
344+ new Subscription (
345+ defaultTopicService. createTopicFilter(mqttUser, " topic/filter/2" ),
346+ 30 ,
347+ QoS . AT_LEAST_ONCE ,
348+ SubscribeRetainHandling . SEND ,
349+ true ,
350+ true ),
351+ new Subscription (
352+ defaultTopicService. createTopicFilter(mqttUser, " topic/filter/3" ),
353+ 30 ,
354+ QoS . EXACTLY_ONCE ,
355+ SubscribeRetainHandling . DO_NOT_SEND ,
356+ true ,
357+ true ))
358+ and :
359+ def publishWithRetain = TestPublishFactory . makePublishWithRetain(" topic/filter/1" , " payload1" )
360+ def publishWithoutRetain = TestPublishFactory . makePublishWithoutRetain(" topic/filter/1" , " payload2" )
361+ defaultPublishDeliveringService. startDelivering(publishWithRetain, new SingleSubscriber (mqttUser, subscription))
362+ defaultPublishDeliveringService. startDelivering(publishWithoutRetain, new SingleSubscriber (mqttUser, subscription))
363+ defaultSubscriptionService. subscribe(mqttUser, mqttUser. session(), subscriptions)
364+ when :
365+ defaultSubscriptionService. subscribe(mqttUser, mqttUser. session(), subscriptions)
366+ then :
367+ def firstPublishMessage = mqttUser. nextSentMessage(PublishMqtt5OutMessage )
368+ firstPublishMessage. payload() == publishWithRetain. payload()
369+ and :
370+ def secondPublishMessage = mqttUser. nextSentMessage(PublishMqtt5OutMessage )
371+ secondPublishMessage. payload() == publishWithoutRetain. payload()
372+ and :
373+ def thirdPublishMessage = mqttUser. nextSentMessage(PublishMqtt5OutMessage )
374+ thirdPublishMessage. payload() == publishWithRetain. payload()
375+ and :
376+ mqttUser. isEmpty()
377+ }
378+
379+ def " should always deliver 'send' Subscribe Retain Handling" () {
380+ given :
381+ def serverConfig = defaultExternalServerConnectionConfig
382+ def mqttConnection = mockedExternalConnection(serverConfig, MqttVersion . MQTT_5 )
383+ def mqttUser = mqttConnection. user() as TestExternalNetworkMqttUser
384+ def subscription = new Subscription (
385+ defaultTopicService. createTopicFilter(mqttUser, " topic/filter/1" ),
386+ 30 ,
387+ QoS . AT_MOST_ONCE ,
388+ SubscribeRetainHandling . SEND ,
389+ true ,
390+ true )
391+ def subscriptions = Array . of(subscription)
392+ and :
393+ def publishWithRetain = TestPublishFactory . makePublishWithRetain(" topic/filter/1" , " payload1" )
394+ defaultPublishDeliveringService. startDelivering(publishWithRetain, new SingleSubscriber (mqttUser, subscription))
395+ defaultSubscriptionService. subscribe(mqttUser, mqttUser. session(), subscriptions)
396+ when :
397+ defaultSubscriptionService. subscribe(mqttUser, mqttUser. session(), subscriptions)
398+ then :
399+ def firstSentMessage = mqttUser. nextSentMessage(PublishMqtt5OutMessage )
400+ firstSentMessage. payload() == publishWithRetain. payload()
401+ and :
402+ def thirdSentMessage = mqttUser. nextSentMessage(PublishMqtt5OutMessage )
403+ thirdSentMessage. payload() == publishWithRetain. payload()
404+ and :
405+ def fourthSentMessage = mqttUser. nextSentMessage(PublishMqtt5OutMessage )
406+ fourthSentMessage. payload() == publishWithRetain. payload()
407+ and :
408+ mqttUser. isEmpty()
409+ }
410+
411+ def " should not deliver 'do-not-send' Subscribe Retain Handling" () {
412+ given :
413+ def serverConfig = defaultExternalServerConnectionConfig
414+ def mqttConnection = mockedExternalConnection(serverConfig, MqttVersion . MQTT_5 )
415+ def mqttUser = mqttConnection. user() as TestExternalNetworkMqttUser
416+ def subscription = new Subscription (
417+ defaultTopicService. createTopicFilter(mqttUser, " topic/filter/1" ),
418+ 30 ,
419+ QoS . AT_MOST_ONCE ,
420+ SubscribeRetainHandling . DO_NOT_SEND ,
421+ true ,
422+ true )
423+ def subscriptions = Array . of(
424+ subscription,
425+ new Subscription (
426+ defaultTopicService. createTopicFilter(mqttUser, " topic/filter/2" ),
427+ 30 ,
428+ QoS . AT_LEAST_ONCE ,
429+ SubscribeRetainHandling . SEND_IF_SUBSCRIPTION_DOES_NOT_EXIST ,
430+ true ,
431+ true ),
432+ new Subscription (
433+ defaultTopicService. createTopicFilter(mqttUser, " topic/filter/3" ),
434+ 30 ,
435+ QoS . EXACTLY_ONCE ,
436+ SubscribeRetainHandling . SEND ,
437+ true ,
438+ true ))
439+ and :
440+ def publishWithRetain = TestPublishFactory . makePublishWithRetain(" topic/filter/1" , " payload1" )
441+ def publishWithoutRetain = TestPublishFactory . makePublishWithoutRetain(" topic/filter/1" , " payload2" )
442+ defaultPublishDeliveringService. startDelivering(publishWithRetain, new SingleSubscriber (mqttUser, subscription))
443+ defaultPublishDeliveringService. startDelivering(publishWithoutRetain, new SingleSubscriber (mqttUser, subscription))
444+ defaultSubscriptionService. subscribe(mqttUser, mqttUser. session(), subscriptions)
445+ when :
446+ defaultSubscriptionService. subscribe(mqttUser, mqttUser. session(), subscriptions)
447+ then :
448+ def firstPublishMessage = mqttUser. nextSentMessage(PublishMqtt5OutMessage )
449+ firstPublishMessage. payload() == publishWithRetain. payload()
450+ and :
451+ def secondPublishMessage = mqttUser. nextSentMessage(PublishMqtt5OutMessage )
452+ secondPublishMessage. payload() == publishWithoutRetain. payload()
453+ and :
454+ mqttUser. isEmpty()
455+ }
326456}
0 commit comments