Skip to content

Commit 26129f5

Browse files
committed
[broker-30] Implement delivering retained messages
1 parent 5db7fd7 commit 26129f5

File tree

14 files changed

+321
-64
lines changed

14 files changed

+321
-64
lines changed

application/src/main/java/javasabr/mqtt/broker/application/config/MqttBrokerSpringConfig.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import javasabr.mqtt.model.MqttProperties;
66
import javasabr.mqtt.model.MqttServerConnectionConfig;
77
import javasabr.mqtt.model.QoS;
8+
import javasabr.mqtt.model.topic.tree.ConcurrentRetainedMessageTree;
89
import javasabr.mqtt.network.MqttClientFactory;
910
import javasabr.mqtt.network.MqttConnection;
1011
import javasabr.mqtt.network.MqttConnectionFactory;
@@ -178,8 +179,9 @@ MqttInMessageHandler disconnectMqttInMessageHandler(MessageOutFactoryService mes
178179
MqttInMessageHandler subscribeMqttInMessageHandler(
179180
SubscriptionService subscriptionService,
180181
MessageOutFactoryService messageOutFactoryService,
181-
TopicService topicService) {
182-
return new SubscribeMqttInMessageHandler(subscriptionService, messageOutFactoryService, topicService);
182+
TopicService topicService,
183+
PublishDeliveringService publishDeliveringService) {
184+
return new SubscribeMqttInMessageHandler(subscriptionService, messageOutFactoryService, topicService, publishDeliveringService);
183185
}
184186

185187
@Bean

model/src/main/java/javasabr/mqtt/model/topic/tree/ConcurrentTopicTree.java renamed to model/src/main/java/javasabr/mqtt/model/subscribtion/tree/ConcurrentSubscriptionTree.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package javasabr.mqtt.model.topic.tree;
1+
package javasabr.mqtt.model.subscribtion.tree;
22

33
import javasabr.mqtt.model.subscriber.SingleSubscriber;
44
import javasabr.mqtt.model.subscribtion.Subscription;
@@ -13,12 +13,12 @@
1313
import org.jspecify.annotations.Nullable;
1414

1515
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
16-
public class ConcurrentTopicTree implements ThreadSafe {
16+
public class ConcurrentSubscriptionTree implements ThreadSafe {
1717

18-
TopicNode rootNode;
18+
TopicFilterNode rootNode;
1919

20-
public ConcurrentTopicTree() {
21-
this.rootNode = new TopicNode();
20+
public ConcurrentSubscriptionTree() {
21+
this.rootNode = new TopicFilterNode();
2222
}
2323

2424
@Nullable

model/src/main/java/javasabr/mqtt/model/topic/tree/TopicNode.java renamed to model/src/main/java/javasabr/mqtt/model/subscribtion/tree/TopicFilterNode.java

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package javasabr.mqtt.model.topic.tree;
1+
package javasabr.mqtt.model.subscribtion.tree;
22

33
import java.util.function.Supplier;
44
import javasabr.mqtt.base.util.DebugUtils;
@@ -22,16 +22,16 @@
2222
@Getter(AccessLevel.PACKAGE)
2323
@Accessors(fluent = true, chain = false)
2424
@FieldDefaults(level = AccessLevel.PRIVATE)
25-
class TopicNode extends TopicTreeBase {
25+
class TopicFilterNode extends TopicFilterTreeBase {
2626

27-
private final static Supplier<TopicNode> TOPIC_NODE_FACTORY = TopicNode::new;
27+
private final static Supplier<TopicFilterNode> TOPIC_NODE_FACTORY = TopicFilterNode::new;
2828

2929
static {
3030
DebugUtils.registerIncludedFields("childNodes", "subscribers");
3131
}
3232

3333
@Nullable
34-
volatile LockableRefToRefDictionary<String, TopicNode> childNodes;
34+
volatile LockableRefToRefDictionary<String, TopicFilterNode> childNodes;
3535
@Nullable
3636
volatile LockableArray<Subscriber> subscribers;
3737

@@ -43,15 +43,15 @@ public SingleSubscriber subscribe(int level, SubscriptionOwner owner, Subscripti
4343
if (level == topicFilter.levelsCount()) {
4444
return addSubscriber(getOrCreateSubscribers(), owner, subscription, topicFilter);
4545
}
46-
TopicNode childNode = getOrCreateChildNode(topicFilter.segment(level));
46+
TopicFilterNode childNode = getOrCreateChildNode(topicFilter.segment(level));
4747
return childNode.subscribe(level + 1, owner, subscription, topicFilter);
4848
}
4949

5050
public boolean unsubscribe(int level, SubscriptionOwner owner, TopicFilter topicFilter) {
5151
if (level == topicFilter.levelsCount()) {
5252
return removeSubscriber(subscribers(), owner, topicFilter);
5353
}
54-
TopicNode childNode = getOrCreateChildNode(topicFilter.segment(level));
54+
TopicFilterNode childNode = getOrCreateChildNode(topicFilter.segment(level));
5555
return childNode.unsubscribe(level + 1, owner, topicFilter);
5656
}
5757

@@ -67,14 +67,14 @@ private void exactlyTopicMatch(
6767
int lastLevel,
6868
MutableArray<SingleSubscriber> result) {
6969
String segment = topicName.segment(level);
70-
TopicNode topicNode = childNode(segment);
71-
if (topicNode == null) {
70+
TopicFilterNode topicFilterNode = childNode(segment);
71+
if (topicFilterNode == null) {
7272
return;
7373
}
7474
if (level == lastLevel) {
75-
appendSubscribersTo(result, topicNode);
75+
appendSubscribersTo(result, topicFilterNode);
7676
} else if (level < lastLevel) {
77-
topicNode.matchesTo(level + 1, topicName, lastLevel, result);
77+
topicFilterNode.matchesTo(level + 1, topicName, lastLevel, result);
7878
}
7979
}
8080

@@ -83,31 +83,31 @@ private void singleWildcardTopicMatch(
8383
TopicName topicName,
8484
int lastLevel,
8585
MutableArray<SingleSubscriber> result) {
86-
TopicNode topicNode = childNode(TopicFilter.SINGLE_LEVEL_WILDCARD);
87-
if (topicNode == null) {
86+
TopicFilterNode topicFilterNode = childNode(TopicFilter.SINGLE_LEVEL_WILDCARD);
87+
if (topicFilterNode == null) {
8888
return;
8989
}
9090
if (level == lastLevel) {
91-
appendSubscribersTo(result, topicNode);
91+
appendSubscribersTo(result, topicFilterNode);
9292
} else if (level < lastLevel) {
93-
topicNode.matchesTo(level + 1, topicName, lastLevel, result);
93+
topicFilterNode.matchesTo(level + 1, topicName, lastLevel, result);
9494
}
9595
}
9696

9797
private void multiWildcardTopicMatch(MutableArray<SingleSubscriber> result) {
98-
TopicNode topicNode = childNode(TopicFilter.MULTI_LEVEL_WILDCARD);
99-
if (topicNode != null) {
100-
appendSubscribersTo(result, topicNode);
98+
TopicFilterNode topicFilterNode = childNode(TopicFilter.MULTI_LEVEL_WILDCARD);
99+
if (topicFilterNode != null) {
100+
appendSubscribersTo(result, topicFilterNode);
101101
}
102102
}
103103

104-
private TopicNode getOrCreateChildNode(String segment) {
105-
LockableRefToRefDictionary<String, TopicNode> childNodes = getOrCreateChildNodes();
104+
private TopicFilterNode getOrCreateChildNode(String segment) {
105+
LockableRefToRefDictionary<String, TopicFilterNode> childNodes = getOrCreateChildNodes();
106106
long stamp = childNodes.readLock();
107107
try {
108-
TopicNode topicNode = childNodes.get(segment);
109-
if (topicNode != null) {
110-
return topicNode;
108+
TopicFilterNode topicFilterNode = childNodes.get(segment);
109+
if (topicFilterNode != null) {
110+
return topicFilterNode;
111111
}
112112
} finally {
113113
childNodes.readUnlock(stamp);
@@ -122,8 +122,8 @@ private TopicNode getOrCreateChildNode(String segment) {
122122
}
123123

124124
@Nullable
125-
private TopicNode childNode(String segment) {
126-
LockableRefToRefDictionary<String, TopicNode> childNodes = childNodes();
125+
private TopicFilterNode childNode(String segment) {
126+
LockableRefToRefDictionary<String, TopicFilterNode> childNodes = childNodes();
127127
if (childNodes == null) {
128128
return null;
129129
}
@@ -135,7 +135,7 @@ private TopicNode childNode(String segment) {
135135
}
136136
}
137137

138-
private LockableRefToRefDictionary<String, TopicNode> getOrCreateChildNodes() {
138+
private LockableRefToRefDictionary<String, TopicFilterNode> getOrCreateChildNodes() {
139139
if (childNodes == null) {
140140
synchronized (this) {
141141
if (childNodes == null) {

model/src/main/java/javasabr/mqtt/model/topic/tree/TopicTreeBase.java renamed to model/src/main/java/javasabr/mqtt/model/subscribtion/tree/TopicFilterTreeBase.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package javasabr.mqtt.model.topic.tree;
1+
package javasabr.mqtt.model.subscribtion.tree;
22

33
import java.util.Objects;
44
import javasabr.mqtt.model.QoS;
@@ -18,7 +18,7 @@
1818

1919
@RequiredArgsConstructor
2020
@FieldDefaults(level = AccessLevel.PROTECTED, makeFinal = true)
21-
abstract class TopicTreeBase {
21+
abstract class TopicFilterTreeBase {
2222

2323
/**
2424
* @return previous subscriber with the same owner
@@ -66,7 +66,7 @@ private static void addSharedSubscriber(
6666
String group = sharedTopicFilter.shareName();
6767
SharedSubscriber sharedSubscriber = (SharedSubscriber) subscribers
6868
.iterations()
69-
.findAny(group, TopicTreeBase::isSharedSubscriberWithGroup);
69+
.findAny(group, TopicFilterTreeBase::isSharedSubscriberWithGroup);
7070

7171
if (sharedSubscriber == null) {
7272
sharedSubscriber = new SharedSubscriber(sharedTopicFilter);
@@ -76,8 +76,8 @@ private static void addSharedSubscriber(
7676
sharedSubscriber.addSubscriber(new SingleSubscriber(owner, subscription));
7777
}
7878

79-
protected static void appendSubscribersTo(MutableArray<SingleSubscriber> result, TopicNode topicNode) {
80-
LockableArray<Subscriber> subscribers = topicNode.subscribers();
79+
protected static void appendSubscribersTo(MutableArray<SingleSubscriber> result, TopicFilterNode topicFilterNode) {
80+
LockableArray<Subscriber> subscribers = topicFilterNode.subscribers();
8181
if (subscribers == null) {
8282
return;
8383
}
@@ -125,7 +125,7 @@ private static boolean removeSharedSubscriber(
125125
String group = sharedTopicFilter.shareName();
126126
SharedSubscriber sharedSubscriber = (SharedSubscriber) subscribers
127127
.iterations()
128-
.findAny(group, TopicTreeBase::isSharedSubscriberWithGroup);
128+
.findAny(group, TopicFilterTreeBase::isSharedSubscriberWithGroup);
129129
if (sharedSubscriber != null) {
130130
boolean removed = sharedSubscriber.removeSubscriberWithOwner(owner);
131131
if (sharedSubscriber.isEmpty()) {
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
@NullMarked
2+
package javasabr.mqtt.model.subscribtion.tree;
3+
4+
import org.jspecify.annotations.NullMarked;

model/src/main/java/javasabr/mqtt/model/topic/TopicFilter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@
1111
public class TopicFilter extends AbstractTopic {
1212

1313
public static final String MULTI_LEVEL_WILDCARD = "#";
14-
public static final char MULTI_LEVEL_WILDCARD_CHAR = '#';
14+
public static final char MULTI_LEVEL_WILDCARD_CHAR = MULTI_LEVEL_WILDCARD.charAt(0);
1515
public static final String SINGLE_LEVEL_WILDCARD = "+";
16-
public static final char SINGLE_LEVEL_WILDCARD_CHAR = '+';
16+
public static final char SINGLE_LEVEL_WILDCARD_CHAR = SINGLE_LEVEL_WILDCARD.charAt(0);
1717
public static final String SPECIAL = "$";
1818

1919
public static final TopicFilter INVALID_TOPIC_FILTER = new TopicFilter("$invalid$") {
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package javasabr.mqtt.model.topic.tree;
2+
3+
import javasabr.mqtt.model.publishing.Publish;
4+
import javasabr.mqtt.model.topic.TopicFilter;
5+
import javasabr.mqtt.model.topic.TopicName;
6+
import javasabr.rlib.common.ThreadSafe;
7+
import lombok.AccessLevel;
8+
import lombok.experimental.FieldDefaults;
9+
import org.jspecify.annotations.Nullable;
10+
11+
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
12+
public class ConcurrentRetainedMessageTree implements ThreadSafe {
13+
14+
TopicMessageNode rootNode;
15+
16+
public ConcurrentRetainedMessageTree() {
17+
this.rootNode = new TopicMessageNode();
18+
}
19+
20+
public void retainMessage(Publish message) {
21+
rootNode.retainMessage(0, message, message.topicName());
22+
}
23+
24+
public @Nullable Publish getRetainedMessage(TopicName topicName) {
25+
return rootNode.getRetainedMessage(0, topicName);
26+
}
27+
28+
public @Nullable Publish getRetainedMessage(TopicFilter topicFilter) {
29+
return rootNode.getRetainedMessage(0, topicFilter);
30+
}
31+
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package javasabr.mqtt.model.topic.tree;
2+
3+
import java.util.concurrent.atomic.AtomicReference;
4+
import java.util.function.Supplier;
5+
import javasabr.mqtt.base.util.DebugUtils;
6+
import javasabr.mqtt.model.publishing.Publish;
7+
import javasabr.mqtt.model.topic.TopicFilter;
8+
import javasabr.mqtt.model.topic.TopicName;
9+
import javasabr.rlib.collections.dictionary.DictionaryFactory;
10+
import javasabr.rlib.collections.dictionary.LockableRefToRefDictionary;
11+
import lombok.AccessLevel;
12+
import lombok.Getter;
13+
import lombok.experimental.Accessors;
14+
import lombok.experimental.FieldDefaults;
15+
import org.jspecify.annotations.Nullable;
16+
17+
@Getter(AccessLevel.PACKAGE)
18+
@Accessors(fluent = true, chain = false)
19+
@FieldDefaults(level = AccessLevel.PRIVATE)
20+
class TopicMessageNode {
21+
22+
private final static Supplier<TopicMessageNode> TOPIC_NODE_FACTORY = TopicMessageNode::new;
23+
24+
static {
25+
DebugUtils.registerIncludedFields("childNodes", "retainedMessage");
26+
}
27+
28+
@Nullable
29+
volatile LockableRefToRefDictionary<String, TopicMessageNode> childNodes;
30+
final AtomicReference<@Nullable Publish> retainedMessage = new AtomicReference<>();
31+
32+
public void retainMessage(int level, Publish message, TopicName topicFilter) {
33+
if (level + 1 == topicFilter.levelsCount()) {
34+
retainedMessage.set(message);
35+
return;
36+
}
37+
TopicMessageNode childNode = getOrCreateChildNode(topicFilter.segment(level));
38+
childNode.retainMessage(level + 1, message, topicFilter);
39+
}
40+
41+
@Nullable
42+
public Publish getRetainedMessage(int level, TopicName topicName) {
43+
if (level + 1 == topicName.levelsCount()) {
44+
return retainedMessage.get();
45+
}
46+
TopicMessageNode childNode = getOrCreateChildNode(topicName.segment(level));
47+
return childNode.getRetainedMessage(level + 1, topicName);
48+
}
49+
50+
@Nullable
51+
public Publish getRetainedMessage(int level, TopicFilter topicName) {
52+
if (level + 1 == topicName.levelsCount()) {
53+
return retainedMessage.get();
54+
}
55+
TopicMessageNode childNode = getOrCreateChildNode(topicName.segment(level));
56+
return childNode.getRetainedMessage(level + 1, topicName);
57+
}
58+
59+
private TopicMessageNode getOrCreateChildNode(String segment) {
60+
LockableRefToRefDictionary<String, TopicMessageNode> childNodes = getOrCreateChildNodes();
61+
long stamp = childNodes.readLock();
62+
try {
63+
TopicMessageNode topicFilterNode = childNodes.get(segment);
64+
if (topicFilterNode != null) {
65+
return topicFilterNode;
66+
}
67+
} finally {
68+
childNodes.readUnlock(stamp);
69+
}
70+
stamp = childNodes.writeLock();
71+
try {
72+
return childNodes.getOrCompute(segment, TOPIC_NODE_FACTORY);
73+
} finally {
74+
childNodes.writeUnlock(stamp);
75+
}
76+
}
77+
78+
private LockableRefToRefDictionary<String, TopicMessageNode> getOrCreateChildNodes() {
79+
if (childNodes == null) {
80+
synchronized (this) {
81+
if (childNodes == null) {
82+
childNodes = DictionaryFactory.stampedLockBasedRefToRefDictionary();
83+
}
84+
}
85+
}
86+
//noinspection ConstantConditions
87+
return childNodes;
88+
}
89+
90+
@Override
91+
public String toString() {
92+
return DebugUtils.toJsonString(this);
93+
}
94+
}

0 commit comments

Comments
 (0)