Skip to content

Commit 94b32db

Browse files
committed
Merge branch 'develop' into feature-broker-30
# Conflicts: # model/src/main/java/javasabr/mqtt/model/subscriber/tree/ConcurrentRetainedMessageTree.java # model/src/main/java/javasabr/mqtt/model/subscriber/tree/ConcurrentSubscriberTree.java # model/src/main/java/javasabr/mqtt/model/subscriber/tree/RetainedMessageNode.java # model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberNode.java # model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberTreeBase.java # model/src/main/java/javasabr/mqtt/model/subscribtion/tree/ConcurrentSubscriptionTree.java # model/src/main/java/javasabr/mqtt/model/subscribtion/tree/TopicFilterNode.java # model/src/main/java/javasabr/mqtt/model/subscribtion/tree/TopicFilterTreeBase.java # model/src/main/java/javasabr/mqtt/model/topic/tree/ConcurrentTopicTree.java # model/src/main/java/javasabr/mqtt/model/topic/tree/TopicNode.java # model/src/main/java/javasabr/mqtt/model/topic/tree/TopicTreeBase.java # model/src/test/groovy/javasabr/mqtt/model/topic/tree/SubscriberTreeTest.groovy # service/src/main/java/javasabr/mqtt/service/impl/InMemorySubscriptionService.java
2 parents c6ada4d + 0bd0dab commit 94b32db

File tree

6 files changed

+119
-113
lines changed

6 files changed

+119
-113
lines changed

model/src/main/java/javasabr/mqtt/model/subscribtion/tree/ConcurrentSubscriptionTree.java renamed to model/src/main/java/javasabr/mqtt/model/subscriber/tree/ConcurrentSubscriberTree.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package javasabr.mqtt.model.subscribtion.tree;
1+
package javasabr.mqtt.model.subscriber.tree;
22

33
import javasabr.mqtt.model.subscriber.SingleSubscriber;
44
import javasabr.mqtt.model.subscribtion.Subscription;
@@ -13,12 +13,13 @@
1313
import org.jspecify.annotations.Nullable;
1414

1515
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
16-
public class ConcurrentSubscriptionTree implements ThreadSafe {
1716

18-
TopicFilterNode rootNode;
17+
public class ConcurrentSubscriberTree implements ThreadSafe {
1918

20-
public ConcurrentSubscriptionTree() {
21-
this.rootNode = new TopicFilterNode();
19+
SubscriberNode rootNode;
20+
21+
public ConcurrentSubscriberTree() {
22+
this.rootNode = new SubscriberNode();
2223
}
2324

2425
@Nullable

model/src/main/java/javasabr/mqtt/model/subscribtion/tree/TopicFilterNode.java renamed to model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberNode.java

Lines changed: 31 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1-
package javasabr.mqtt.model.subscribtion.tree;
1+
package javasabr.mqtt.model.subscriber.tree;
22

33
import java.util.function.Supplier;
44
import javasabr.mqtt.base.util.DebugUtils;
55
import javasabr.mqtt.model.subscriber.SingleSubscriber;
66
import javasabr.mqtt.model.subscriber.Subscriber;
7+
import javasabr.mqtt.model.subscriber.tree.SubscriberTreeBase;
78
import javasabr.mqtt.model.subscribtion.Subscription;
89
import javasabr.mqtt.model.subscribtion.SubscriptionOwner;
910
import javasabr.mqtt.model.topic.TopicFilter;
@@ -22,16 +23,16 @@
2223
@Getter(AccessLevel.PACKAGE)
2324
@Accessors(fluent = true, chain = false)
2425
@FieldDefaults(level = AccessLevel.PRIVATE)
25-
class TopicFilterNode extends TopicFilterTreeBase {
26+
class SubscriberNode extends SubscriberTreeBase {
2627

27-
private final static Supplier<TopicFilterNode> TOPIC_NODE_FACTORY = TopicFilterNode::new;
28+
private final static Supplier<SubscriberNode> SUBSCRIBER_NODE_FACTORY = SubscriberNode::new;
2829

2930
static {
3031
DebugUtils.registerIncludedFields("childNodes", "subscribers");
3132
}
3233

3334
@Nullable
34-
volatile LockableRefToRefDictionary<String, TopicFilterNode> childNodes;
35+
volatile LockableRefToRefDictionary<String, SubscriberNode> childNodes;
3536
@Nullable
3637
volatile LockableArray<Subscriber> subscribers;
3738

@@ -43,15 +44,17 @@ public SingleSubscriber subscribe(int level, SubscriptionOwner owner, Subscripti
4344
if (level == topicFilter.levelsCount()) {
4445
return addSubscriber(getOrCreateSubscribers(), owner, subscription, topicFilter);
4546
}
46-
TopicFilterNode childNode = getOrCreateChildNode(topicFilter.segment(level));
47+
48+
SubscriberNode childNode = getOrCreateChildNode(topicFilter.segment(level));
4749
return childNode.subscribe(level + 1, owner, subscription, topicFilter);
4850
}
4951

5052
public boolean unsubscribe(int level, SubscriptionOwner owner, TopicFilter topicFilter) {
5153
if (level == topicFilter.levelsCount()) {
5254
return removeSubscriber(subscribers(), owner, topicFilter);
5355
}
54-
TopicFilterNode childNode = getOrCreateChildNode(topicFilter.segment(level));
56+
57+
SubscriberNode childNode = getOrCreateChildNode(topicFilter.segment(level));
5558
return childNode.unsubscribe(level + 1, owner, topicFilter);
5659
}
5760

@@ -67,14 +70,14 @@ private void exactlyTopicMatch(
6770
int lastLevel,
6871
MutableArray<SingleSubscriber> result) {
6972
String segment = topicName.segment(level);
70-
TopicFilterNode topicFilterNode = childNode(segment);
71-
if (topicFilterNode == null) {
73+
SubscriberNode subscriberNode = childNode(segment);
74+
if (subscriberNode == null) {
7275
return;
7376
}
7477
if (level == lastLevel) {
75-
appendSubscribersTo(result, topicFilterNode);
78+
appendSubscribersTo(result, subscriberNode);
7679
} else if (level < lastLevel) {
77-
topicFilterNode.matchesTo(level + 1, topicName, lastLevel, result);
80+
subscriberNode.matchesTo(level + 1, topicName, lastLevel, result);
7881
}
7982
}
8083

@@ -83,47 +86,49 @@ private void singleWildcardTopicMatch(
8386
TopicName topicName,
8487
int lastLevel,
8588
MutableArray<SingleSubscriber> result) {
86-
TopicFilterNode topicFilterNode = childNode(TopicFilter.SINGLE_LEVEL_WILDCARD);
87-
if (topicFilterNode == null) {
89+
90+
SubscriberNode subscriberNode = childNode(TopicFilter.SINGLE_LEVEL_WILDCARD);
91+
if (subscriberNode == null) {
8892
return;
8993
}
9094
if (level == lastLevel) {
91-
appendSubscribersTo(result, topicFilterNode);
95+
appendSubscribersTo(result, subscriberNode);
9296
} else if (level < lastLevel) {
93-
topicFilterNode.matchesTo(level + 1, topicName, lastLevel, result);
97+
subscriberNode.matchesTo(level + 1, topicName, lastLevel, result);
9498
}
9599
}
96100

97101
private void multiWildcardTopicMatch(MutableArray<SingleSubscriber> result) {
98-
TopicFilterNode topicFilterNode = childNode(TopicFilter.MULTI_LEVEL_WILDCARD);
99-
if (topicFilterNode != null) {
100-
appendSubscribersTo(result, topicFilterNode);
102+
103+
SubscriberNode subscriberNode = childNode(TopicFilter.MULTI_LEVEL_WILDCARD);
104+
if (subscriberNode != null) {
105+
appendSubscribersTo(result, subscriberNode);
101106
}
102107
}
103108

104-
private TopicFilterNode getOrCreateChildNode(String segment) {
105-
LockableRefToRefDictionary<String, TopicFilterNode> childNodes = getOrCreateChildNodes();
109+
private SubscriberNode getOrCreateChildNode(String segment) {
110+
LockableRefToRefDictionary<String, SubscriberNode> childNodes = getOrCreateChildNodes();
106111
long stamp = childNodes.readLock();
107112
try {
108-
TopicFilterNode topicFilterNode = childNodes.get(segment);
109-
if (topicFilterNode != null) {
110-
return topicFilterNode;
113+
SubscriberNode subscriberNode = childNodes.get(segment);
114+
if (subscriberNode != null) {
115+
return subscriberNode;
111116
}
112117
} finally {
113118
childNodes.readUnlock(stamp);
114119
}
115120
stamp = childNodes.writeLock();
116121
try {
117122
//noinspection DataFlowIssue
118-
return childNodes.getOrCompute(segment, TOPIC_NODE_FACTORY);
123+
return childNodes.getOrCompute(segment, SUBSCRIBER_NODE_FACTORY);
119124
} finally {
120125
childNodes.writeUnlock(stamp);
121126
}
122127
}
123128

124129
@Nullable
125-
private TopicFilterNode childNode(String segment) {
126-
LockableRefToRefDictionary<String, TopicFilterNode> childNodes = childNodes();
130+
private SubscriberNode childNode(String segment) {
131+
LockableRefToRefDictionary<String, SubscriberNode> childNodes = childNodes();
127132
if (childNodes == null) {
128133
return null;
129134
}
@@ -135,7 +140,7 @@ private TopicFilterNode childNode(String segment) {
135140
}
136141
}
137142

138-
private LockableRefToRefDictionary<String, TopicFilterNode> getOrCreateChildNodes() {
143+
private LockableRefToRefDictionary<String, SubscriberNode> getOrCreateChildNodes() {
139144
if (childNodes == null) {
140145
synchronized (this) {
141146
if (childNodes == null) {

model/src/main/java/javasabr/mqtt/model/subscribtion/tree/TopicFilterTreeBase.java renamed to model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberTreeBase.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package javasabr.mqtt.model.subscribtion.tree;
1+
package javasabr.mqtt.model.subscriber.tree;
22

33
import java.util.Objects;
44
import javasabr.mqtt.model.QoS;
@@ -18,7 +18,7 @@
1818

1919
@RequiredArgsConstructor
2020
@FieldDefaults(level = AccessLevel.PROTECTED, makeFinal = true)
21-
abstract class TopicFilterTreeBase {
21+
abstract class SubscriberTreeBase {
2222

2323
/**
2424
* @return previous subscriber with the same owner
@@ -66,7 +66,7 @@ private static void addSharedSubscriber(
6666
String group = sharedTopicFilter.shareName();
6767
SharedSubscriber sharedSubscriber = (SharedSubscriber) subscribers
6868
.iterations()
69-
.findAny(group, TopicFilterTreeBase::isSharedSubscriberWithGroup);
69+
.findAny(group, SubscriberTreeBase::isSharedSubscriberWithGroup);
7070

7171
if (sharedSubscriber == null) {
7272
sharedSubscriber = new SharedSubscriber(sharedTopicFilter);
@@ -76,8 +76,8 @@ private static void addSharedSubscriber(
7676
sharedSubscriber.addSubscriber(new SingleSubscriber(owner, subscription));
7777
}
7878

79-
protected static void appendSubscribersTo(MutableArray<SingleSubscriber> result, TopicFilterNode topicFilterNode) {
80-
LockableArray<Subscriber> subscribers = topicFilterNode.subscribers();
79+
protected static void appendSubscribersTo(MutableArray<SingleSubscriber> result, SubscriberNode subscriberNode) {
80+
LockableArray<Subscriber> subscribers = subscriberNode.subscribers();
8181
if (subscribers == null) {
8282
return;
8383
}
@@ -125,7 +125,7 @@ private static boolean removeSharedSubscriber(
125125
String group = sharedTopicFilter.shareName();
126126
SharedSubscriber sharedSubscriber = (SharedSubscriber) subscribers
127127
.iterations()
128-
.findAny(group, TopicFilterTreeBase::isSharedSubscriberWithGroup);
128+
.findAny(group, SubscriberTreeBase::isSharedSubscriberWithGroup);
129129
if (sharedSubscriber != null) {
130130
boolean removed = sharedSubscriber.removeSubscriberWithOwner(owner);
131131
if (sharedSubscriber.isEmpty()) {

model/src/main/java/javasabr/mqtt/model/subscribtion/tree/package-info.java renamed to model/src/main/java/javasabr/mqtt/model/subscriber/tree/package-info.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
@NullMarked
2-
package javasabr.mqtt.model.subscribtion.tree;
2+
package javasabr.mqtt.model.subscriber.tree;
33

44
import org.jspecify.annotations.NullMarked;

0 commit comments

Comments
 (0)