Skip to content

Commit b829090

Browse files
committed
[broker-30] Add tests
1 parent c3ab5c5 commit b829090

File tree

2 files changed

+89
-26
lines changed

2 files changed

+89
-26
lines changed

core-service/src/main/java/javasabr/mqtt/service/impl/InMemorySubscriptionService.java

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import javasabr.mqtt.model.MqttClientConnectionConfig;
99
import javasabr.mqtt.model.MqttUser;
10+
import javasabr.mqtt.model.QoS;
1011
import javasabr.mqtt.model.reason.code.SubscribeAckReasonCode;
1112
import javasabr.mqtt.model.reason.code.UnsubscribeAckReasonCode;
1213
import javasabr.mqtt.model.session.ActiveSubscriptions;
@@ -81,13 +82,13 @@ private SubscribeAckReasonCode addSubscription(MqttUser user, MqttSession sessio
8182
if (previous != null) {
8283
activeSubscriptions.remove(previous.subscription());
8384
}
84-
if ((subscription.retainHandling() == SEND_IF_SUBSCRIPTION_DOES_NOT_EXIST && previous == null)
85-
|| subscription.retainHandling() == SEND) {
85+
QoS subscriptionQoS = subscription.qos();
86+
if (subscriptionQoS.ordinal() <= 2 && (subscription.retainHandling() == SEND ||
87+
(subscription.retainHandling() == SEND_IF_SUBSCRIPTION_DOES_NOT_EXIST && previous == null))) {
8688
sendRetainedMessages(user, subscription);
8789
}
8890
activeSubscriptions.add(subscription);
89-
return subscription
90-
.qos()
91+
return subscriptionQoS
9192
.subscribeAckReasonCode();
9293
}
9394

@@ -142,19 +143,8 @@ public void restoreSubscriptions(MqttUser user, MqttSession session) {
142143
}
143144

144145
private void sendRetainedMessages(MqttUser user, Subscription subscription) {
145-
SubscribeAckReasonCode subscribeAckReasonCode = subscription
146-
.qos()
147-
.subscribeAckReasonCode();
148-
String clientId = user.clientId();
149-
if (subscribeAckReasonCode.ordinal() > 2) {
150-
log.debug(
151-
clientId,
152-
subscription,
153-
subscribeAckReasonCode,
154-
"[%s] Unable to send retained messages for [%s] due to wrong subscribeAckReasonCode [%s]"::formatted);
155-
return;
156-
}
157146
int count = 0;
147+
String clientId = user.clientId();
158148
PublishHandlingResult errorResult = null;
159149
SingleSubscriber singleSubscriber = new SingleSubscriber(user, subscription);
160150
var results = publishDeliveringService.deliverRetainedMessages(singleSubscriber);
@@ -167,7 +157,7 @@ private void sendRetainedMessages(MqttUser user, Subscription subscription) {
167157
if (errorResult != null) {
168158
log.debug(clientId, errorResult, "[%s] Error occurred [%s] during sending retained messages"::formatted);
169159
} else {
170-
log.debug(clientId, count, "[%s] Delivering [%s] retained messages has been started"::formatted);
160+
log.debug(clientId, count, "[%s] Delivering of [%s] retained message has been started"::formatted);
171161
}
172162
}
173163
}

core-service/src/test/groovy/javasabr/mqtt/service/impl/InMemorySubscriptionServiceTest.groovy

Lines changed: 82 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ import javasabr.mqtt.model.reason.code.UnsubscribeAckReasonCode
88
import javasabr.mqtt.model.subscriber.SingleSubscriber
99
import javasabr.mqtt.model.subscription.Subscription
1010
import javasabr.mqtt.model.subscription.TestPublishFactory
11+
import javasabr.mqtt.model.topic.TopicFilter
12+
import javasabr.mqtt.model.topic.TopicName
13+
import javasabr.mqtt.network.handler.NetworkMqttUserReleaseHandler
14+
import javasabr.mqtt.network.impl.InternalNetworkMqttUser
1115
import javasabr.mqtt.network.message.out.PublishMqtt5OutMessage
1216
import javasabr.mqtt.service.IntegrationServiceSpecification
1317
import javasabr.mqtt.service.TestExternalNetworkMqttUser
@@ -514,21 +518,90 @@ class InMemorySubscriptionServiceTest extends IntegrationServiceSpecification {
514518
secondSentMessage.retain()
515519
and:
516520
mqttUser.isEmpty()
521+
}
517522

523+
def "should not send retained messages in case of invalid QoS"() {
524+
given:
525+
def serverConfig = defaultExternalServerConnectionConfig
526+
def mqttConnection = mockedExternalConnection(serverConfig, MqttVersion.MQTT_5)
527+
def mqttUser = mqttConnection.user() as TestExternalNetworkMqttUser
528+
def subscription = new Subscription(
529+
defaultTopicService.createTopicFilter(mqttUser, "topic/filter/1"),
530+
30,
531+
QoS.INVALID,
532+
SubscribeRetainHandling.SEND,
533+
true,
534+
true)
535+
def subscriptions = Array.of(subscription)
536+
and:
537+
def publishWithRetain = TestPublishFactory.makePublishWithRetain("topic/filter/1", "payload1")
538+
defaultPublishDeliveringService.startDelivering(publishWithRetain, new SingleSubscriber(mqttUser, subscription))
518539
when:
519-
def publishWithRetain2 = TestPublishFactory.makePublishWithRetain("topic/filter/1", "payload2")
520-
defaultPublishDeliveringService.startDelivering(publishWithRetain2, new SingleSubscriber(mqttUser, subscription))
521540
defaultSubscriptionService.subscribe(mqttUser, mqttUser.session(), subscriptions)
522541
then:
523-
def thirdSentMessage = mqttUser.nextSentMessage(PublishMqtt5OutMessage)
524-
thirdSentMessage.payload() == publishWithRetain2.payload()
525-
thirdSentMessage.retain()
542+
mqttUser.isEmpty()
543+
}
544+
545+
def "should clean and restore subscriptions"() {
546+
given:
547+
def serverConfig = defaultExternalServerConnectionConfig
548+
def mqttConnection = mockedExternalConnection(serverConfig, MqttVersion.MQTT_5)
549+
def expectedUser = mqttConnection.user() as TestExternalNetworkMqttUser
550+
def expectedSubscription = new Subscription(
551+
TopicFilter.valueOf("topic"),
552+
30,
553+
QoS.AT_MOST_ONCE,
554+
SubscribeRetainHandling.SEND,
555+
true,
556+
true)
557+
when:
558+
defaultSubscriptionService.subscribe(expectedUser, expectedUser.session(), Array.of(expectedSubscription))
559+
def subscribers = defaultSubscriptionService.findSubscribers(TopicName.valueOf("topic"))
560+
then:
561+
!subscribers.isEmpty()
562+
with(subscribers[0]) {
563+
user() == expectedUser
564+
subscription() == expectedSubscription
565+
}
566+
when:
567+
defaultSubscriptionService.cleanSubscriptions(expectedUser, expectedUser.session())
568+
subscribers = defaultSubscriptionService.findSubscribers(TopicName.valueOf("topic"))
569+
then:
570+
subscribers.isEmpty()
571+
572+
when:
573+
defaultSubscriptionService.restoreSubscriptions(expectedUser, expectedUser.session())
574+
subscribers = defaultSubscriptionService.findSubscribers(TopicName.valueOf("topic"))
575+
then:
576+
!subscribers.isEmpty()
577+
with(subscribers[0]) {
578+
user() == expectedUser
579+
subscription() == expectedSubscription
580+
}
581+
}
582+
def "should suppress retained message delivering failure"() {
583+
given:
584+
def serverConfig = defaultExternalServerConnectionConfig
585+
def mqttConnection = mockedExternalConnection(serverConfig, MqttVersion.MQTT_5)
586+
def mqttUser = mqttConnection.user() as TestExternalNetworkMqttUser
587+
def anotherUser = new InternalNetworkMqttUser(mqttConnection, Mock(NetworkMqttUserReleaseHandler))
588+
def subscription = new Subscription(
589+
defaultTopicService.createTopicFilter(mqttUser, "topic/filter/1"),
590+
30,
591+
QoS.AT_MOST_ONCE,
592+
SubscribeRetainHandling.SEND,
593+
true,
594+
true)
595+
def subscriptions = Array.of(subscription)
526596
and:
527-
def fourthSentMessage = mqttUser.nextSentMessage(PublishMqtt5OutMessage)
528-
fourthSentMessage.payload() == publishWithRetain2.payload()
529-
fourthSentMessage.retain()
597+
def publishWithRetain = TestPublishFactory.makePublishWithRetain("topic/filter/1", "payload1")
598+
defaultPublishDeliveringService.startDelivering(publishWithRetain, new SingleSubscriber(mqttUser, subscription))
599+
when:
600+
defaultSubscriptionService.subscribe(anotherUser, mqttUser.session(), subscriptions)
601+
then:
602+
def firstSentMessage = mqttUser.nextSentMessage(PublishMqtt5OutMessage)
603+
firstSentMessage.payload() == publishWithRetain.payload()
530604
and:
531605
mqttUser.isEmpty()
532-
533606
}
534607
}

0 commit comments

Comments
 (0)