Skip to content

Commit c3ab5c5

Browse files
committed
[broker-30] Fix retainAsPublished logic
1 parent 1d37242 commit c3ab5c5

File tree

6 files changed

+101
-23
lines changed

6 files changed

+101
-23
lines changed

core-service/src/main/java/javasabr/mqtt/service/impl/DefaultPublishDeliveringService.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,17 +53,18 @@ public DefaultPublishDeliveringService(
5353
@Override
5454
public PublishHandlingResult startDelivering(Publish publish, SingleSubscriber subscriber) {
5555
if (publish.retained()) {
56-
retainedMessageTree.retainMessage(publish);
56+
Subscription subscription = subscriber.subscription();
57+
boolean retainAsPublished = subscription.retainAsPublished();
58+
Function<Publish, Publish> transformer = retainAsPublished ? Function.identity() : Publish::withoutRetain;
59+
retainedMessageTree.retainMessage(transformer.apply(publish));
5760
}
5861
return startDeliveringWithoutRetain(publish, subscriber);
5962
}
6063

6164
@Override
6265
public Array<PublishHandlingResult> deliverRetainedMessages(SingleSubscriber subscriber) {
6366
Subscription subscription = subscriber.subscription();
64-
boolean retainAsPublished = subscription.retainAsPublished();
65-
Function<Publish, Publish> transformer = retainAsPublished ? Function.identity() : Publish::withoutRetain;
66-
Array<Publish> retainedMessages = retainedMessageTree.getRetainedMessage(subscription.topicFilter(), transformer);
67+
Array<Publish> retainedMessages = retainedMessageTree.getRetainedMessage(subscription.topicFilter());
6768
MutableArray<PublishHandlingResult> result = MutableArray.ofType(PublishHandlingResult.class);
6869
for (Publish message : retainedMessages) {
6970
result.add(startDeliveringWithoutRetain(message, subscriber));

core-service/src/test/groovy/javasabr/mqtt/service/impl/InMemorySubscriptionServiceTest.groovy

Lines changed: 83 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -357,12 +357,13 @@ class InMemorySubscriptionServiceTest extends IntegrationServiceSpecification {
357357
true))
358358
and:
359359
def publishWithRetain = TestPublishFactory.makePublishWithRetain("topic/filter/1", "payload1")
360-
def publishWithoutRetain = TestPublishFactory.makePublishWithoutRetain("topic/filter/1", "payload2")
361360
defaultPublishDeliveringService.startDelivering(publishWithRetain, new SingleSubscriber(mqttUser, subscription))
361+
and:
362+
def publishWithoutRetain = TestPublishFactory.makePublishWithoutRetain("topic/filter/1", "payload2")
362363
defaultPublishDeliveringService.startDelivering(publishWithoutRetain, new SingleSubscriber(mqttUser, subscription))
363-
defaultSubscriptionService.subscribe(mqttUser, mqttUser.session(), subscriptions)
364364
when:
365365
defaultSubscriptionService.subscribe(mqttUser, mqttUser.session(), subscriptions)
366+
defaultSubscriptionService.subscribe(mqttUser, mqttUser.session(), subscriptions)
366367
then:
367368
def firstPublishMessage = mqttUser.nextSentMessage(PublishMqtt5OutMessage)
368369
firstPublishMessage.payload() == publishWithRetain.payload()
@@ -392,9 +393,9 @@ class InMemorySubscriptionServiceTest extends IntegrationServiceSpecification {
392393
and:
393394
def publishWithRetain = TestPublishFactory.makePublishWithRetain("topic/filter/1", "payload1")
394395
defaultPublishDeliveringService.startDelivering(publishWithRetain, new SingleSubscriber(mqttUser, subscription))
395-
defaultSubscriptionService.subscribe(mqttUser, mqttUser.session(), subscriptions)
396396
when:
397397
defaultSubscriptionService.subscribe(mqttUser, mqttUser.session(), subscriptions)
398+
defaultSubscriptionService.subscribe(mqttUser, mqttUser.session(), subscriptions)
398399
then:
399400
def firstSentMessage = mqttUser.nextSentMessage(PublishMqtt5OutMessage)
400401
firstSentMessage.payload() == publishWithRetain.payload()
@@ -438,12 +439,13 @@ class InMemorySubscriptionServiceTest extends IntegrationServiceSpecification {
438439
true))
439440
and:
440441
def publishWithRetain = TestPublishFactory.makePublishWithRetain("topic/filter/1", "payload1")
441-
def publishWithoutRetain = TestPublishFactory.makePublishWithoutRetain("topic/filter/1", "payload2")
442442
defaultPublishDeliveringService.startDelivering(publishWithRetain, new SingleSubscriber(mqttUser, subscription))
443+
and:
444+
def publishWithoutRetain = TestPublishFactory.makePublishWithoutRetain("topic/filter/1", "payload2")
443445
defaultPublishDeliveringService.startDelivering(publishWithoutRetain, new SingleSubscriber(mqttUser, subscription))
444-
defaultSubscriptionService.subscribe(mqttUser, mqttUser.session(), subscriptions)
445446
when:
446447
defaultSubscriptionService.subscribe(mqttUser, mqttUser.session(), subscriptions)
448+
defaultSubscriptionService.subscribe(mqttUser, mqttUser.session(), subscriptions)
447449
then:
448450
def firstPublishMessage = mqttUser.nextSentMessage(PublishMqtt5OutMessage)
449451
firstPublishMessage.payload() == publishWithRetain.payload()
@@ -453,4 +455,80 @@ class InMemorySubscriptionServiceTest extends IntegrationServiceSpecification {
453455
and:
454456
mqttUser.isEmpty()
455457
}
458+
459+
def "should reset retain flag if 'retain as published' is false"() {
460+
given:
461+
def serverConfig = defaultExternalServerConnectionConfig
462+
def mqttConnection = mockedExternalConnection(serverConfig, MqttVersion.MQTT_5)
463+
def mqttUser = mqttConnection.user() as TestExternalNetworkMqttUser
464+
def subscription = new Subscription(
465+
defaultTopicService.createTopicFilter(mqttUser, "topic/filter/1"),
466+
30,
467+
QoS.AT_MOST_ONCE,
468+
SubscribeRetainHandling.SEND,
469+
true,
470+
false)
471+
def subscriptions = Array.of(subscription)
472+
and:
473+
def publishWithRetain = TestPublishFactory.makePublishWithRetain("topic/filter/1", "payload1")
474+
defaultPublishDeliveringService.startDelivering(publishWithRetain, new SingleSubscriber(mqttUser, subscription))
475+
when:
476+
defaultSubscriptionService.subscribe(mqttUser, mqttUser.session(), subscriptions)
477+
then:
478+
def firstSentMessage = mqttUser.nextSentMessage(PublishMqtt5OutMessage)
479+
firstSentMessage.payload() == publishWithRetain.payload()
480+
firstSentMessage.retain()
481+
and:
482+
def thirdSentMessage = mqttUser.nextSentMessage(PublishMqtt5OutMessage)
483+
thirdSentMessage.payload() == publishWithRetain.payload()
484+
!thirdSentMessage.retain()
485+
and:
486+
mqttUser.isEmpty()
487+
}
488+
489+
def "should keep retain flag if 'retain as published' is true"() {
490+
given:
491+
def serverConfig = defaultExternalServerConnectionConfig
492+
def mqttConnection = mockedExternalConnection(serverConfig, MqttVersion.MQTT_5)
493+
def mqttUser = mqttConnection.user() as TestExternalNetworkMqttUser
494+
def subscription = new Subscription(
495+
defaultTopicService.createTopicFilter(mqttUser, "topic/filter/1"),
496+
30,
497+
QoS.AT_MOST_ONCE,
498+
SubscribeRetainHandling.SEND,
499+
true,
500+
true)
501+
def subscriptions = Array.of(subscription)
502+
503+
when:
504+
def publishWithRetain = TestPublishFactory.makePublishWithRetain("topic/filter/1", "payload1")
505+
defaultPublishDeliveringService.startDelivering(publishWithRetain, new SingleSubscriber(mqttUser, subscription))
506+
defaultSubscriptionService.subscribe(mqttUser, mqttUser.session(), subscriptions)
507+
then:
508+
def firstSentMessage = mqttUser.nextSentMessage(PublishMqtt5OutMessage)
509+
firstSentMessage.payload() == publishWithRetain.payload()
510+
firstSentMessage.retain()
511+
and:
512+
def secondSentMessage = mqttUser.nextSentMessage(PublishMqtt5OutMessage)
513+
secondSentMessage.payload() == publishWithRetain.payload()
514+
secondSentMessage.retain()
515+
and:
516+
mqttUser.isEmpty()
517+
518+
when:
519+
def publishWithRetain2 = TestPublishFactory.makePublishWithRetain("topic/filter/1", "payload2")
520+
defaultPublishDeliveringService.startDelivering(publishWithRetain2, new SingleSubscriber(mqttUser, subscription))
521+
defaultSubscriptionService.subscribe(mqttUser, mqttUser.session(), subscriptions)
522+
then:
523+
def thirdSentMessage = mqttUser.nextSentMessage(PublishMqtt5OutMessage)
524+
thirdSentMessage.payload() == publishWithRetain2.payload()
525+
thirdSentMessage.retain()
526+
and:
527+
def fourthSentMessage = mqttUser.nextSentMessage(PublishMqtt5OutMessage)
528+
fourthSentMessage.payload() == publishWithRetain2.payload()
529+
fourthSentMessage.retain()
530+
and:
531+
mqttUser.isEmpty()
532+
533+
}
456534
}

model/src/main/java/javasabr/mqtt/model/subscription/Subscription.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,12 @@ public record Subscription(
3333
boolean noLocal,
3434
/*
3535
If true, Application Messages forwarded using this subscription keep the RETAIN flag they were published with. If
36-
false, Application Messages forwarded using this subscription have the RETAIN flag set to 0. Retained messages sent
37-
when the subscription is established have the RETAIN flag set to 1.
36+
false, Application Messages forwarded using this subscription have the RETAIN flag set to 0.
37+
38+
Bit 3 of the Subscription Options represents the Retain As Published option.
39+
If 1, Application Messages forwarded using this subscription keep the RETAIN flag they were published with.
40+
If 0, Application Messages forwarded using this subscription have the RETAIN flag set to 0.
41+
Retained messages sent when the subscription is established have the RETAIN flag set to 1.
3842
*/
3943
boolean retainAsPublished) {
4044

model/src/main/java/javasabr/mqtt/model/topic/tree/ConcurrentRetainedMessageTree.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@ public void retainMessage(Publish message) {
2222
rootNode.retainMessage(0, message, message.topicName());
2323
}
2424

25-
public Array<Publish> getRetainedMessage(TopicFilter topicFilter, Function<Publish, Publish> publishTransformer) {
25+
public Array<Publish> getRetainedMessage(TopicFilter topicFilter) {
2626
var resultArray = MutableArray.ofType(Publish.class);
27-
rootNode.collectRetainedMessages(0, topicFilter, resultArray, publishTransformer);
27+
rootNode.collectRetainedMessages(0, topicFilter, resultArray);
2828
return Array.copyOf(resultArray);
2929
}
3030
}

model/src/main/java/javasabr/mqtt/model/topic/tree/RetainedMessageNode.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import java.util.Queue;
44
import java.util.concurrent.atomic.AtomicReference;
5-
import java.util.function.Function;
65
import java.util.function.Supplier;
76
import javasabr.mqtt.base.util.DebugUtils;
87
import javasabr.mqtt.model.AbstractTrieNode;
@@ -53,12 +52,11 @@ public void retainMessage(int level, Publish message, TopicName topicName) {
5352
public void collectRetainedMessages(
5453
int level,
5554
TopicFilter topicFilter,
56-
MutableArray<Publish> result,
57-
Function<Publish, Publish> publishTransformer) {
55+
MutableArray<Publish> result) {
5856
if (level == topicFilter.levelsCount()) {
5957
Publish publish = retainedMessage.get();
6058
if (publish != null) {
61-
result.add(publishTransformer.apply(publish));
59+
result.add(publish);
6260
}
6361
return;
6462
}
@@ -72,13 +70,13 @@ public void collectRetainedMessages(
7270
var localChildNodes = getChildNodes(RetainedMessageNode::childNodesFactory);
7371
if (localChildNodes != null) {
7472
for (RetainedMessageNode childNode : localChildNodes) {
75-
childNode.collectRetainedMessages(level + 1, topicFilter, result, publishTransformer);
73+
childNode.collectRetainedMessages(level + 1, topicFilter, result);
7674
}
7775
}
7876
} else {
7977
RetainedMessageNode retainedMessageNode = getChildNode(segment);
8078
if (retainedMessageNode != null) {
81-
retainedMessageNode.collectRetainedMessages(level + 1, topicFilter, result, publishTransformer);
79+
retainedMessageNode.collectRetainedMessages(level + 1, topicFilter, result);
8280
}
8381
}
8482
}

model/src/test/groovy/javasabr/mqtt/model/topic/tree/RetainedMessageTreeTest.groovy

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,9 @@
11
package javasabr.mqtt.model.topic.tree
22

3-
43
import javasabr.mqtt.model.publishing.Publish
54
import javasabr.mqtt.model.topic.TopicFilter
65
import javasabr.mqtt.test.support.UnitSpecification
76

8-
import java.util.function.Function
9-
107
import static javasabr.mqtt.model.subscription.TestPublishFactory.makePublish
118

129
class RetainedMessageTreeTest extends UnitSpecification {
@@ -21,7 +18,7 @@ class RetainedMessageTreeTest extends UnitSpecification {
2118
retainedMessageTree.retainMessage(message)
2219
}
2320
when:
24-
def retainedMessages = retainedMessageTree.getRetainedMessage(TopicFilter.valueOf(topicFilter), Function.identity())
21+
def retainedMessages = retainedMessageTree.getRetainedMessage(TopicFilter.valueOf(topicFilter))
2522
.collect { it }
2623
then:
2724
retainedMessages.size() == expectedMessages.size()

0 commit comments

Comments
 (0)