Skip to content

Commit c6ada4d

Browse files
committed
[broker-30] Rewrite retained messages collecting
1 parent 26129f5 commit c6ada4d

File tree

7 files changed

+187
-122
lines changed

7 files changed

+187
-122
lines changed

model/src/main/java/javasabr/mqtt/model/topic/AbstractTopic.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public int levelsCount() {
4141
return segments.length;
4242
}
4343

44-
String lastSegment() {
44+
public String lastSegment() {
4545
return segments[segments.length - 1];
4646
}
4747

model/src/main/java/javasabr/mqtt/model/topic/tree/ConcurrentRetainedMessageTree.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,30 +2,30 @@
22

33
import javasabr.mqtt.model.publishing.Publish;
44
import javasabr.mqtt.model.topic.TopicFilter;
5-
import javasabr.mqtt.model.topic.TopicName;
5+
import javasabr.rlib.collections.array.Array;
6+
import javasabr.rlib.collections.array.MutableArray;
67
import javasabr.rlib.common.ThreadSafe;
78
import lombok.AccessLevel;
89
import lombok.experimental.FieldDefaults;
9-
import org.jspecify.annotations.Nullable;
1010

1111
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
1212
public class ConcurrentRetainedMessageTree implements ThreadSafe {
1313

14-
TopicMessageNode rootNode;
14+
RetainedMessageNode rootNode;
1515

1616
public ConcurrentRetainedMessageTree() {
17-
this.rootNode = new TopicMessageNode();
17+
this.rootNode = new RetainedMessageNode();
1818
}
1919

2020
public void retainMessage(Publish message) {
21-
rootNode.retainMessage(0, message, message.topicName());
21+
if (message.retained()) {
22+
rootNode.retainMessage(0, message, message.topicName());
23+
}
2224
}
2325

24-
public @Nullable Publish getRetainedMessage(TopicName topicName) {
25-
return rootNode.getRetainedMessage(0, topicName);
26-
}
27-
28-
public @Nullable Publish getRetainedMessage(TopicFilter topicFilter) {
29-
return rootNode.getRetainedMessage(0, topicFilter);
26+
public Array<Publish> getRetainedMessage(TopicFilter topicFilter) {
27+
var resultArray = MutableArray.ofType(Publish.class);
28+
rootNode.collectRetainedMessages(0, topicFilter, topicFilter.levelsCount() - 1, resultArray);
29+
return resultArray;
3030
}
3131
}
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
package javasabr.mqtt.model.topic.tree;
2+
3+
import static javasabr.mqtt.model.topic.TopicFilter.MULTI_LEVEL_WILDCARD;
4+
import static javasabr.mqtt.model.topic.TopicFilter.SINGLE_LEVEL_WILDCARD;
5+
6+
import java.util.Objects;
7+
import java.util.PriorityQueue;
8+
import java.util.Queue;
9+
import java.util.concurrent.atomic.AtomicReference;
10+
import java.util.function.Supplier;
11+
import javasabr.mqtt.base.util.DebugUtils;
12+
import javasabr.mqtt.model.publishing.Publish;
13+
import javasabr.mqtt.model.topic.TopicFilter;
14+
import javasabr.mqtt.model.topic.TopicName;
15+
import javasabr.rlib.collections.array.MutableArray;
16+
import javasabr.rlib.collections.dictionary.DictionaryFactory;
17+
import javasabr.rlib.collections.dictionary.LockableRefToRefDictionary;
18+
import lombok.AccessLevel;
19+
import lombok.Getter;
20+
import lombok.experimental.Accessors;
21+
import lombok.experimental.FieldDefaults;
22+
import org.jspecify.annotations.Nullable;
23+
24+
@Getter(AccessLevel.PACKAGE)
25+
@Accessors(fluent = true, chain = false)
26+
@FieldDefaults(level = AccessLevel.PRIVATE)
27+
class RetainedMessageNode {
28+
29+
private final static Supplier<RetainedMessageNode> TOPIC_NODE_FACTORY = RetainedMessageNode::new;
30+
31+
static {
32+
DebugUtils.registerIncludedFields("childNodes", "retainedMessage");
33+
}
34+
35+
@Nullable
36+
volatile LockableRefToRefDictionary<String, RetainedMessageNode> childNodes;
37+
final AtomicReference<@Nullable Publish> retainedMessage = new AtomicReference<>();
38+
39+
public void retainMessage(int level, Publish message, TopicName topicName) {
40+
if (level + 1 == topicName.levelsCount()) {
41+
retainedMessage.set(message.payload().length == 0 ? null : message);
42+
return;
43+
}
44+
RetainedMessageNode childNode = getOrCreateChildNode(topicName.segment(level));
45+
childNode.retainMessage(level + 1, message, topicName);
46+
}
47+
48+
public void collectRetainedMessages(int level, TopicFilter topicFilter, int lastLevel, MutableArray<Publish> result) {
49+
String segment = topicFilter.segment(level);
50+
Publish publish = retainedMessage.get();
51+
if (Objects.equals(segment, MULTI_LEVEL_WILDCARD)) {
52+
collectAllMessages(this, result);
53+
} else if (Objects.equals(segment, SINGLE_LEVEL_WILDCARD)) {
54+
var childNodes = childNodes();
55+
if (childNodes == null) {
56+
return;
57+
}
58+
long stamp = childNodes.readLock();
59+
try {
60+
for (RetainedMessageNode n : childNodes) {
61+
n.collectRetainedMessages(level + 1, topicFilter, lastLevel, result);
62+
}
63+
} finally {
64+
childNodes.readUnlock(stamp);
65+
}
66+
} else if (level == lastLevel && publish != null && Objects.equals(segment, publish.topicName().lastSegment())) {
67+
result.add(publish);
68+
} else {
69+
RetainedMessageNode topicFilterNode = childNode(segment);
70+
if (topicFilterNode == null) {
71+
return;
72+
}
73+
topicFilterNode.collectRetainedMessages(level + 1, topicFilter, lastLevel, result);
74+
}
75+
}
76+
77+
private void collectAllMessages(RetainedMessageNode node, MutableArray<Publish> result) {
78+
Queue<RetainedMessageNode> queue = new PriorityQueue<>();
79+
queue.add(node);
80+
while (!queue.isEmpty()) {
81+
RetainedMessageNode poll = queue.poll();
82+
Publish message = poll.retainedMessage.get();
83+
if (message != null) {
84+
result.add(message);
85+
}
86+
var childNodes = poll.childNodes();
87+
if (childNodes == null) {
88+
continue;
89+
}
90+
long stamp = childNodes.readLock();
91+
try {
92+
for (RetainedMessageNode n : childNodes) {
93+
queue.add(n);
94+
}
95+
} finally {
96+
childNodes.readUnlock(stamp);
97+
}
98+
}
99+
}
100+
101+
@Nullable
102+
private RetainedMessageNode childNode(String segment) {
103+
LockableRefToRefDictionary<String, RetainedMessageNode> childNodes = childNodes();
104+
if (childNodes == null) {
105+
return null;
106+
}
107+
long stamp = childNodes.readLock();
108+
try {
109+
return childNodes.get(segment);
110+
} finally {
111+
childNodes.readUnlock(stamp);
112+
}
113+
}
114+
115+
private RetainedMessageNode getOrCreateChildNode(String segment) {
116+
LockableRefToRefDictionary<String, RetainedMessageNode> childNodes = getOrCreateChildNodes();
117+
long stamp = childNodes.readLock();
118+
try {
119+
RetainedMessageNode topicFilterNode = childNodes.get(segment);
120+
if (topicFilterNode != null) {
121+
return topicFilterNode;
122+
}
123+
} finally {
124+
childNodes.readUnlock(stamp);
125+
}
126+
stamp = childNodes.writeLock();
127+
try {
128+
return childNodes.getOrCompute(segment, TOPIC_NODE_FACTORY);
129+
} finally {
130+
childNodes.writeUnlock(stamp);
131+
}
132+
}
133+
134+
private LockableRefToRefDictionary<String, RetainedMessageNode> getOrCreateChildNodes() {
135+
if (childNodes == null) {
136+
synchronized (this) {
137+
if (childNodes == null) {
138+
childNodes = DictionaryFactory.stampedLockBasedRefToRefDictionary();
139+
}
140+
}
141+
}
142+
//noinspection ConstantConditions
143+
return childNodes;
144+
}
145+
146+
@Override
147+
public String toString() {
148+
return DebugUtils.toJsonString(this);
149+
}
150+
}

model/src/main/java/javasabr/mqtt/model/topic/tree/TopicMessageNode.java

Lines changed: 0 additions & 94 deletions
This file was deleted.

service/src/main/java/javasabr/mqtt/service/PublishDeliveringService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,11 @@
44
import javasabr.mqtt.model.subscriber.SingleSubscriber;
55
import javasabr.mqtt.model.topic.TopicFilter;
66
import javasabr.mqtt.service.publish.handler.PublishHandlingResult;
7+
import javasabr.rlib.collections.array.Array;
78

89
public interface PublishDeliveringService {
910

1011
PublishHandlingResult startDelivering(Publish publish, SingleSubscriber subscriber);
1112

12-
PublishHandlingResult deliverRetainedMessages(TopicFilter topicFilter, SingleSubscriber subscriber);
13+
Array<PublishHandlingResult> deliverRetainedMessages(TopicFilter topicFilter, SingleSubscriber subscriber);
1314
}

service/src/main/java/javasabr/mqtt/service/impl/DefaultPublishDeliveringService.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
import javasabr.mqtt.service.PublishDeliveringService;
1010
import javasabr.mqtt.service.publish.handler.MqttPublishOutMessageHandler;
1111
import javasabr.mqtt.service.publish.handler.PublishHandlingResult;
12+
import javasabr.rlib.collections.array.Array;
13+
import javasabr.rlib.collections.array.MutableArray;
1214
import lombok.AccessLevel;
1315
import lombok.CustomLog;
1416
import lombok.experimental.FieldDefaults;
@@ -20,7 +22,7 @@ public class DefaultPublishDeliveringService implements PublishDeliveringService
2022

2123
@Nullable
2224
MqttPublishOutMessageHandler[] publishOutMessageHandlers;
23-
ConcurrentRetainedMessageTree topicTree;
25+
ConcurrentRetainedMessageTree retainedMessageTree;
2426

2527
public DefaultPublishDeliveringService(
2628
Collection<? extends MqttPublishOutMessageHandler> knownPublishOutHandlers) {
@@ -42,17 +44,15 @@ public DefaultPublishDeliveringService(
4244
}
4345
handlers[qos.level()] = knownPublishOutHandler;
4446
}
45-
this.topicTree = new ConcurrentRetainedMessageTree();
47+
this.retainedMessageTree = new ConcurrentRetainedMessageTree();
4648
this.publishOutMessageHandlers = handlers;
4749
log.info(publishOutMessageHandlers, DefaultPublishDeliveringService::buildServiceDescription);
4850
}
4951

5052
@Override
5153
public PublishHandlingResult startDelivering(Publish publish, SingleSubscriber subscriber) {
5254
try {
53-
if (publish.retained()) {
54-
topicTree.retainMessage(publish);
55-
}
55+
retainedMessageTree.retainMessage(publish);
5656
//noinspection DataFlowIssue
5757
return publishOutMessageHandlers[subscriber.qos().level()].handle(publish, subscriber);
5858
} catch (IndexOutOfBoundsException | NullPointerException ex) {
@@ -62,9 +62,13 @@ public PublishHandlingResult startDelivering(Publish publish, SingleSubscriber s
6262
}
6363

6464
@Override
65-
public PublishHandlingResult deliverRetainedMessages(TopicFilter topicFilter, SingleSubscriber subscriber) {
66-
Publish retainedMessage = topicTree.getRetainedMessage(topicFilter);
67-
return startDelivering(retainedMessage, subscriber);
65+
public Array<PublishHandlingResult> deliverRetainedMessages(TopicFilter topicFilter, SingleSubscriber subscriber) {
66+
Array<Publish> retainedMessage = retainedMessageTree.getRetainedMessage(topicFilter);
67+
MutableArray<PublishHandlingResult> result = MutableArray.ofType(PublishHandlingResult.class);
68+
for (Publish message : retainedMessage) {
69+
result.add(startDelivering(message, subscriber));
70+
}
71+
return result;
6872
}
6973

7074
private static String buildServiceDescription(

service/src/main/java/javasabr/mqtt/service/message/handler/impl/SubscribeMqttInMessageHandler.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -194,23 +194,27 @@ private void sendRetainedMessages(
194194
RequestedSubscription requestedSubscription = subscriptions.get(i);
195195
SubscribeAckReasonCode subscribeAckReasonCode = subscribeResults.get(i);
196196
Subscription subscription = subs.get(i);
197-
if (subscribeAckReasonCode.ordinal() < 3) {
198-
TopicFilter topicFilter = TopicFilter.valueOf(requestedSubscription.rawTopicFilter());
199-
SingleSubscriber singleSubscriber = new SingleSubscriber(client, subscription);
200-
PublishHandlingResult result = publishDeliveringService.deliverRetainedMessages(topicFilter, singleSubscriber);
197+
if (subscribeAckReasonCode.ordinal() > 2) {
198+
// TODO handle error
199+
continue;
200+
}
201+
TopicFilter topicFilter = TopicFilter.valueOf(requestedSubscription.rawTopicFilter());
202+
SingleSubscriber singleSubscriber = new SingleSubscriber(client, subscription);
203+
var results = publishDeliveringService.deliverRetainedMessages(topicFilter, singleSubscriber);
204+
for (PublishHandlingResult result : results) {
201205
if (result.error()) {
202206
errorResult = result;
203-
} else if(result == PublishHandlingResult.SUCCESS) {
207+
} else if (result == PublishHandlingResult.SUCCESS) {
204208
count++;
205209
}
206210
if (errorResult != null) {
207211
log.debug(client.clientId(), errorResult,
208212
"[%s] Found final error:[%s] during sending retained messages"::formatted);
209-
// handleError(client, publish, errorResult);
213+
// TODO handleError(client, publish, errorResult);
210214
} else {
211215
log.debug(client.clientId(), count,
212216
"[%s] Successfully started delivering retained messages to [%s] subscribers"::formatted);
213-
// handleSuccessfulResult(client, publish, count);
217+
// TODO handleSuccessfulResult(client, publish, count);
214218
}
215219
}
216220
}

0 commit comments

Comments
 (0)