Skip to content

Commit cc08339

Browse files
committed
[broker-30] Avoid double message retaining
1 parent bf837eb commit cc08339

File tree

2 files changed

+14
-11
lines changed

2 files changed

+14
-11
lines changed

core-service/src/main/java/javasabr/mqtt/service/PublishDeliveringService.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import javasabr.mqtt.model.publishing.Publish;
44
import javasabr.mqtt.model.subscriber.SingleSubscriber;
5-
import javasabr.mqtt.model.topic.TopicFilter;
65
import javasabr.mqtt.service.publish.handler.PublishHandlingResult;
76
import javasabr.rlib.collections.array.Array;
87

core-service/src/main/java/javasabr/mqtt/service/impl/DefaultPublishDeliveringService.java

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -52,16 +52,10 @@ public DefaultPublishDeliveringService(
5252

5353
@Override
5454
public PublishHandlingResult startDelivering(Publish publish, SingleSubscriber subscriber) {
55-
try {
56-
if (publish.retained()) {
57-
retainedMessageTree.retainMessage(publish);
58-
}
59-
//noinspection DataFlowIssue
60-
return publishOutMessageHandlers[subscriber.qos().level()].handle(publish, subscriber);
61-
} catch (IndexOutOfBoundsException | NullPointerException ex) {
62-
log.warning(publish, "Received not supported publish message:[%s]"::formatted);
63-
return PublishHandlingResult.UNSPECIFIED_ERROR;
55+
if (publish.retained()) {
56+
retainedMessageTree.retainMessage(publish);
6457
}
58+
return startDeliveringWithoutRetain(publish, subscriber);
6559
}
6660

6761
@Override
@@ -72,11 +66,21 @@ public Array<PublishHandlingResult> deliverRetainedMessages(SingleSubscriber sub
7266
Array<Publish> retainedMessages = retainedMessageTree.getRetainedMessage(subscription.topicFilter(), transformer);
7367
MutableArray<PublishHandlingResult> result = MutableArray.ofType(PublishHandlingResult.class);
7468
for (Publish message : retainedMessages) {
75-
result.add(startDelivering(message, subscriber));
69+
result.add(startDeliveringWithoutRetain(message, subscriber));
7670
}
7771
return Array.copyOf(result);
7872
}
7973

74+
private PublishHandlingResult startDeliveringWithoutRetain(Publish publish, SingleSubscriber subscriber) {
75+
try {
76+
//noinspection DataFlowIssue
77+
return publishOutMessageHandlers[subscriber.qos().level()].handle(publish, subscriber);
78+
} catch (IndexOutOfBoundsException | NullPointerException ex) {
79+
log.warning(publish, "Received not supported publish message:[%s]"::formatted);
80+
return PublishHandlingResult.UNSPECIFIED_ERROR;
81+
}
82+
}
83+
8084
private static String buildServiceDescription(
8185
@Nullable MqttPublishOutMessageHandler[] publishOutMessageHandlers) {
8286
var builder = new StringBuilder();

0 commit comments

Comments
 (0)