Skip to content

Commit 6c16962

Browse files
committed
[broker-30] Rework retainAsPublished handling
1 parent 26f45dc commit 6c16962

File tree

6 files changed

+67
-66
lines changed

6 files changed

+67
-66
lines changed

core-service/src/main/java/javasabr/mqtt/service/RetainMessageService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
public interface RetainMessageService {
1010

11-
void retainMessage(Publish publish, Subscription subscription);
11+
void retainMessage(Publish publish);
1212

1313
Array<PublishHandlingResult> deliverRetainedMessages(SingleSubscriber subscriber);
1414
}

core-service/src/main/java/javasabr/mqtt/service/impl/DefaultRetainMessageService.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package javasabr.mqtt.service.impl;
22

3-
import java.util.function.Function;
43
import javasabr.mqtt.model.publishing.Publish;
54
import javasabr.mqtt.model.subscriber.SingleSubscriber;
65
import javasabr.mqtt.model.subscription.Subscription;
@@ -25,20 +24,22 @@ public DefaultRetainMessageService(PublishDeliveringService defaultPublishDelive
2524
}
2625

2726
@Override
28-
public void retainMessage(Publish publish, Subscription subscription) {
27+
public void retainMessage(Publish publish) {
2928
if (publish.retained()) {
30-
boolean retainAsPublished = subscription.retainAsPublished();
31-
Function<Publish, Publish> transformer = retainAsPublished ? Function.identity() : Publish::withoutRetain;
32-
retainedMessageTree.retainMessage(transformer.apply(publish));
29+
retainedMessageTree.retainMessage(publish);
3330
}
3431
}
3532

3633
@Override
3734
public Array<PublishHandlingResult> deliverRetainedMessages(SingleSubscriber subscriber) {
3835
Subscription subscription = subscriber.subscription();
36+
boolean retainAsPublished = subscription.retainAsPublished();
3937
Array<Publish> retainedMessages = retainedMessageTree.getRetainedMessage(subscription.topicFilter());
4038
MutableArray<PublishHandlingResult> result = MutableArray.ofType(PublishHandlingResult.class);
4139
for (Publish message : retainedMessages) {
40+
if (!retainAsPublished) {
41+
message = message.withoutRetain();
42+
}
4243
result.add(defaultPublishDeliveringService.startDelivering(message, subscriber));
4344
}
4445
return Array.copyOf(result);

core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/AbstractMqttPublishInMessageHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ protected boolean validateImpl(U user, NetworkMqttSession session, Publish publi
5454
}
5555

5656
protected void handleImpl(U user, NetworkMqttSession session, Publish publish) {
57+
retainMessageService.retainMessage(publish);
58+
5759
TopicName topicName = publish.topicName();
5860
Array<SingleSubscriber> subscribers = subscriptionService.findSubscribers(topicName);
5961
if (subscribers.isEmpty()) {
@@ -105,7 +107,6 @@ protected PublishHandlingResult checkSubscriber(
105107
}
106108

107109
protected void startDelivering(Publish publish, SingleSubscriber subscriber) {
108-
retainMessageService.retainMessage(publish, subscriber.subscription());
109110
publishDeliveringService.startDelivering(publish, subscriber);
110111
}
111112

core-service/src/test/groovy/javasabr/mqtt/service/impl/InMemorySubscriptionServiceTest.groovy

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -360,11 +360,11 @@ class InMemorySubscriptionServiceTest extends IntegrationServiceSpecification {
360360
true,
361361
true))
362362
and:
363-
def publishWithRetain = TestPublishFactory.makePublishWithRetain("topic/filter/1", "payload1")
364-
defaultRetainMessageService.retainMessage(publishWithRetain, subscription)
363+
def publishWithRetain = TestPublishFactory.createPublishWithRetain("topic/filter/1", "payload1")
364+
defaultRetainMessageService.retainMessage(publishWithRetain)
365365
and:
366-
def publishWithoutRetain = TestPublishFactory.makePublishWithoutRetain("topic/filter/1", "payload2")
367-
defaultRetainMessageService.retainMessage(publishWithoutRetain, subscription)
366+
def publishWithoutRetain = TestPublishFactory.createPublishWithoutRetain("topic/filter/1", "payload2")
367+
defaultRetainMessageService.retainMessage(publishWithoutRetain)
368368
when:
369369
defaultSubscriptionService.subscribe(mqttUser, mqttUser.session(), subscriptions)
370370
defaultSubscriptionService.subscribe(mqttUser, mqttUser.session(), subscriptions)
@@ -389,7 +389,7 @@ class InMemorySubscriptionServiceTest extends IntegrationServiceSpecification {
389389
true)
390390
def subscriptions = Array.of(subscription)
391391
and:
392-
def publishWithRetain = TestPublishFactory.makePublishWithRetain("topic/filter/1", "payload1")
392+
def publishWithRetain = TestPublishFactory.createPublishWithRetain("topic/filter/1", "payload1")
393393
defaultPublishDeliveringService.startDelivering(publishWithRetain, new SingleSubscriber(mqttUser, subscription))
394394
when:
395395
defaultSubscriptionService.subscribe(mqttUser, mqttUser.session(), subscriptions)
@@ -436,10 +436,10 @@ class InMemorySubscriptionServiceTest extends IntegrationServiceSpecification {
436436
true,
437437
true))
438438
and:
439-
def publishWithRetain = TestPublishFactory.makePublishWithRetain("topic/filter/1", "payload1")
439+
def publishWithRetain = TestPublishFactory.createPublishWithRetain("topic/filter/1", "payload1")
440440
defaultPublishDeliveringService.startDelivering(publishWithRetain, new SingleSubscriber(mqttUser, subscription))
441441
and:
442-
def publishWithoutRetain = TestPublishFactory.makePublishWithoutRetain("topic/filter/1", "payload2")
442+
def publishWithoutRetain = TestPublishFactory.createPublishWithoutRetain("topic/filter/1", "payload2")
443443
defaultPublishDeliveringService.startDelivering(publishWithoutRetain, new SingleSubscriber(mqttUser, subscription))
444444
when:
445445
defaultSubscriptionService.subscribe(mqttUser, mqttUser.session(), subscriptions)
@@ -468,8 +468,8 @@ class InMemorySubscriptionServiceTest extends IntegrationServiceSpecification {
468468
false)
469469
def subscriptions = Array.of(subscription)
470470
and:
471-
def publishWithRetain = TestPublishFactory.makePublishWithRetain("topic/filter/1", "payload1")
472-
defaultRetainMessageService.retainMessage(publishWithRetain, subscription)
471+
def publishWithRetain = TestPublishFactory.createPublishWithRetain("topic/filter/1", "payload1")
472+
defaultRetainMessageService.retainMessage(publishWithRetain)
473473
when:
474474
defaultSubscriptionService.subscribe(mqttUser, mqttUser.session(), subscriptions)
475475
then:
@@ -495,8 +495,8 @@ class InMemorySubscriptionServiceTest extends IntegrationServiceSpecification {
495495
def subscriptions = Array.of(subscription)
496496

497497
when:
498-
def publishWithRetain = TestPublishFactory.makePublishWithRetain("topic/filter/1", "payload1")
499-
defaultRetainMessageService.retainMessage(publishWithRetain, subscription)
498+
def publishWithRetain = TestPublishFactory.createPublishWithRetain("topic/filter/1", "payload1")
499+
defaultRetainMessageService.retainMessage(publishWithRetain)
500500
defaultSubscriptionService.subscribe(mqttUser, mqttUser.session(), subscriptions)
501501
then:
502502
def secondSentMessage = mqttUser.nextSentMessage(PublishMqtt5OutMessage)
@@ -520,7 +520,7 @@ class InMemorySubscriptionServiceTest extends IntegrationServiceSpecification {
520520
true)
521521
def subscriptions = Array.of(subscription)
522522
and:
523-
def publishWithRetain = TestPublishFactory.makePublishWithRetain("topic/filter/1", "payload1")
523+
def publishWithRetain = TestPublishFactory.createPublishWithRetain("topic/filter/1", "payload1")
524524
defaultPublishDeliveringService.startDelivering(publishWithRetain, new SingleSubscriber(mqttUser, subscription))
525525
when:
526526
defaultSubscriptionService.subscribe(mqttUser, mqttUser.session(), subscriptions)
@@ -580,7 +580,7 @@ class InMemorySubscriptionServiceTest extends IntegrationServiceSpecification {
580580
true)
581581
def subscriptions = Array.of(subscription)
582582
and:
583-
def publishWithRetain = TestPublishFactory.makePublishWithRetain("topic/filter/1", "payload1")
583+
def publishWithRetain = TestPublishFactory.createPublishWithRetain("topic/filter/1", "payload1")
584584
defaultPublishDeliveringService.startDelivering(publishWithRetain, new SingleSubscriber(mqttUser, subscription))
585585
when:
586586
defaultSubscriptionService.subscribe(anotherUser, mqttUser.session(), subscriptions)

model/src/test/groovy/javasabr/mqtt/model/topic/tree/RetainedMessageTreeTest.groovy

Lines changed: 42 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,17 @@ import javasabr.mqtt.model.publishing.Publish
44
import javasabr.mqtt.model.topic.TopicFilter
55
import javasabr.mqtt.test.support.UnitSpecification
66

7-
import static javasabr.mqtt.model.subscription.TestPublishFactory.makePublish
7+
import static javasabr.mqtt.model.subscription.TestPublishFactory.createPublish
88

99
class RetainedMessageTreeTest extends UnitSpecification {
1010

1111
def "should fetch retained messages by topic filter"(
12-
List<Publish> messages,
12+
List<String> messages,
1313
String topicFilter,
14-
List<Publish> expectedMessages) {
14+
List<String> expectedMessages) {
1515
given:
1616
ConcurrentRetainedMessageTree retainedMessageTree = new ConcurrentRetainedMessageTree();
17-
messages.eachWithIndex { Publish message, int i ->
17+
messages.collect { createPublish(it) }.eachWithIndex { Publish message, int i ->
1818
retainedMessageTree.retainMessage(message)
1919
}
2020
when:
@@ -23,7 +23,7 @@ class RetainedMessageTreeTest extends UnitSpecification {
2323
then:
2424
retainedMessages.size() == expectedMessages.size()
2525
for (int i = 0; i < retainedMessages.size(); i++) {
26-
assert retainedMessages.get(i).topicName() == expectedMessages.get(i).topicName()
26+
assert retainedMessages[i].topicName().rawTopic() == expectedMessages[i]
2727
}
2828
where:
2929
topicFilter << [
@@ -35,63 +35,63 @@ class RetainedMessageTreeTest extends UnitSpecification {
3535
]
3636
messages << [
3737
[
38-
makePublish("/topic/segment1"),
39-
makePublish("/topic/segment2"),
40-
makePublish("/topic/segment1/segment2"),
41-
makePublish("/topic/"),
42-
makePublish("/topic")
38+
"/topic/segment1",
39+
"/topic/segment2",
40+
"/topic/segment1/segment2",
41+
"/topic/",
42+
"/topic"
4343
],
4444
[
45-
makePublish("/topic/segment1"),
46-
makePublish("/topic/segment2"),
47-
makePublish("/topic/segment1/segment2"),
48-
makePublish("/topic/"),
49-
makePublish("/topic/segment2"),
50-
makePublish("/"),
51-
makePublish("/topic/segment2/segment1")
45+
"/topic/segment1",
46+
"/topic/segment2",
47+
"/topic/segment1/segment2",
48+
"/topic/",
49+
"/topic/segment2",
50+
"/",
51+
"/topic/segment2/segment1"
5252
],
5353
[
54-
makePublish("/topic/segment1"),
55-
makePublish("/topic/segment2"),
56-
makePublish("/topic/segment3"),
57-
makePublish("/topic/segment3"),
58-
makePublish("/topic/segment3"),
59-
makePublish("/topic/segment3")
54+
"/topic/segment1",
55+
"/topic/segment2",
56+
"/topic/segment3",
57+
"/topic/segment3",
58+
"/topic/segment3",
59+
"/topic/segment3"
6060
],
6161
[
62-
makePublish("/topic/segment1"),
63-
makePublish("/topic/segment2"),
64-
makePublish("/topic/segment1/segment2"),
65-
makePublish("/topic/segment500/segment2"),
66-
makePublish("/topic/"),
67-
makePublish("/topic")
62+
"/topic/segment1",
63+
"/topic/segment2",
64+
"/topic/segment1/segment2",
65+
"/topic/segment500/segment2",
66+
"/topic/",
67+
"/topic"
6868
],
6969
[
70-
makePublish("/topic1/segment1"),
71-
makePublish("/topic/segment2"),
72-
makePublish("/topic2/segment1/segment2"),
73-
makePublish("/topic/segment3"),
74-
makePublish("/topic/segment1/segment2")
70+
"/topic1/segment1",
71+
"/topic/segment2",
72+
"/topic2/segment1/segment2",
73+
"/topic/segment3",
74+
"/topic/segment1/segment2"
7575
]
7676
]
7777
expectedMessages << [
7878
[
79-
makePublish("/topic/segment1")
79+
"/topic/segment1"
8080
],
8181
[
82-
makePublish("/topic/segment2")
82+
"/topic/segment2"
8383
],
8484
[
85-
makePublish("/topic/segment3")
85+
"/topic/segment3"
8686
],
8787
[
88-
makePublish("/topic/segment1/segment2"),
89-
makePublish("/topic/segment500/segment2")
88+
"/topic/segment1/segment2",
89+
"/topic/segment500/segment2"
9090
],
9191
[
92-
makePublish("/topic/segment2"),
93-
makePublish("/topic/segment3"),
94-
makePublish("/topic/segment1/segment2")
92+
"/topic/segment2",
93+
"/topic/segment3",
94+
"/topic/segment1/segment2"
9595
]
9696
]
9797
}

model/src/testFixtures/groovy/javasabr/mqtt/model/subscription/TestPublishFactory.groovy

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import static java.nio.charset.StandardCharsets.UTF_8
1111

1212
class TestPublishFactory {
1313

14-
static def makePublish(String topicName) {
14+
static def createPublish(String topicName) {
1515
return new Publish(
1616
1,
1717
QoS.AT_MOST_ONCE,
@@ -29,7 +29,7 @@ class TestPublishFactory {
2929
Array.of());
3030
}
3131

32-
static def makePublishWithRetain(String topicName, String payload) {
32+
static def createPublishWithRetain(String topicName, String payload) {
3333
return new Publish(
3434
1,
3535
QoS.AT_MOST_ONCE,
@@ -47,7 +47,7 @@ class TestPublishFactory {
4747
Array.of());
4848
}
4949

50-
static def makePublishWithoutRetain(String topicName, String payload) {
50+
static def createPublishWithoutRetain(String topicName, String payload) {
5151
return new Publish(
5252
1,
5353
QoS.AT_MOST_ONCE,
@@ -64,5 +64,4 @@ class TestPublishFactory {
6464
PayloadFormat.UTF8_STRING,
6565
Array.of());
6666
}
67-
6867
}

0 commit comments

Comments
 (0)