Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
26129f5
[broker-30] Implement delivering retained messages
crazyrokr Nov 19, 2025
c6ada4d
[broker-30] Rewrite retained messages collecting
crazyrokr Nov 19, 2025
94b32db
Merge branch 'develop' into feature-broker-30
crazyrokr Nov 19, 2025
01bac5b
[broker-30] Cleanup code after merge
crazyrokr Nov 19, 2025
c903510
[broker-30] Fix corner cases in retained messages
crazyrokr Nov 20, 2025
e1f5c2a
[broker-30] Handle SubscribeRetainHandling
crazyrokr Nov 20, 2025
a8829ac
Merge branch 'develop' into feature-broker-30
crazyrokr Dec 1, 2025
a1b87dd
[broker-30] Fix build
crazyrokr Dec 1, 2025
fd848eb
Merge branch 'develop' into feature-broker-30
crazyrokr Dec 1, 2025
be8070b
[broker-30] SubscriberNode refactoring
crazyrokr Dec 2, 2025
0c219dd
[broker-30] Revert wrong access level modifier
crazyrokr Dec 2, 2025
62900ca
[broker-30] Avoid tree branch locking
crazyrokr Dec 3, 2025
dc1d443
Merge branch 'develop' into feature-broker-30
crazyrokr Dec 6, 2025
7314ed1
[broker-30] Introduce AbstractTrieNode
crazyrokr Dec 7, 2025
901e865
[broker-30] Remove redundant code from SubscriberNode
crazyrokr Dec 7, 2025
cc91db2
[broker-30] Revert redundant changes
crazyrokr Dec 7, 2025
42af014
[broker-30] Improve AbstractTrieNode
crazyrokr Dec 7, 2025
b1aa456
[broker-30] Implement retainAsPublished support
crazyrokr Dec 7, 2025
f2f2f67
[broker-30] Update debug message
crazyrokr Dec 7, 2025
bf837eb
[broker-30] Small refactoring
crazyrokr Dec 7, 2025
cc08339
[broker-30] Avoid double message retaining
crazyrokr Dec 7, 2025
bacefb7
[broker-30] Add more tests
crazyrokr Dec 8, 2025
c2ac2ad
Merge branch 'develop' into feature-broker-30
crazyrokr Dec 8, 2025
1d37242
[broker-30] Fix tests
crazyrokr Dec 8, 2025
c3ab5c5
[broker-30] Fix retainAsPublished logic
crazyrokr Dec 8, 2025
b829090
[broker-30] Add tests
crazyrokr Dec 8, 2025
26f45dc
[broker-30] Introduce RetainMessageService
crazyrokr Dec 8, 2025
6c16962
[broker-30] Rework retainAsPublished handling
crazyrokr Dec 9, 2025
12f8069
Merge branch 'develop' into feature-broker-30
crazyrokr Dec 9, 2025
31ff664
[broker-30] Fix formatting
crazyrokr Dec 9, 2025
8191bc8
[broker-30] Improve RetainedMessageTreeTest
crazyrokr Dec 9, 2025
c7c72d3
[broker-30] Improve code readability
crazyrokr Dec 9, 2025
8a23e97
[broker-30] Extract creation of SingleSubscriber to SubscriptionService
crazyrokr Dec 9, 2025
57eeb77
[broker-30] Revert unnecessary changes
crazyrokr Dec 9, 2025
28e73a5
[broker-30] Fix QoS comparison
crazyrokr Dec 9, 2025
2fb16a5
Merge branch 'develop' into feature-broker-30
crazyrokr Dec 10, 2025
f4a6727
[broker-30] Update tests
crazyrokr Dec 10, 2025
751624f
[broker-30] Introduce setRetainedMessage and clearRetainedMessage
crazyrokr Dec 10, 2025
649f57e
[broker-30] Move publish.retained() check to caller
crazyrokr Dec 10, 2025
b4e1232
[broker-30] Extract isRetainHandlingSatisfied() method
crazyrokr Dec 10, 2025
1f3d50a
[broker-30] Revert formatting
crazyrokr Dec 10, 2025
588aae4
[broker-30] Revert SubscriberNode
crazyrokr Dec 10, 2025
07ce54d
[broker-30] Reduce memory allocation of RetainedMessageNode
crazyrokr Dec 11, 2025
3163a8a
Merge branch 'develop' into feature-broker-30
crazyrokr Dec 12, 2025
17359d3
[broker-30] Refactoring
crazyrokr Dec 12, 2025
ffb1c83
[broker-30] Update tests
crazyrokr Dec 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion acl-groovy-dsl/src/main/resources/acl.gdsl
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ contributor(context(scope: scriptScope())) {
&& !enclosingCall("allOf") && !enclosingCall("anyOf")) {
method name: 'topicFilter',
type: 'javasabr.mqtt.service.acl.builder.SubscribeRuleBuilder',
params: [string: 'javasabr.mqtt.model.acl.matcher.ValueMatcher<String>...'],
params: [string: 'javasabr.mqtt.model.acl.matcher.TopicFilterMatcher...'],
doc: 'Set of topic filters matching by rule'
method name: 'match',
type: 'javasabr.mqtt.model.acl.matcher.TopicFilterMatcher',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import javasabr.mqtt.service.MessageOutFactoryService;
import javasabr.mqtt.service.PublishDeliveringService;
import javasabr.mqtt.service.PublishReceivingService;
import javasabr.mqtt.service.RetainMessageService;
import javasabr.mqtt.service.SubscriptionService;
import javasabr.mqtt.service.TopicService;
import javasabr.mqtt.service.handler.client.ExternalNetworkMqttUserReleaseHandler;
Expand All @@ -28,6 +29,7 @@
import javasabr.mqtt.service.impl.DefaultMqttConnectionFactory;
import javasabr.mqtt.service.impl.DefaultPublishDeliveringService;
import javasabr.mqtt.service.impl.DefaultPublishReceivingService;
import javasabr.mqtt.service.impl.DefaultRetainMessageService;
import javasabr.mqtt.service.impl.DefaultTopicService;
import javasabr.mqtt.service.impl.DisabledAuthorizationService;
import javasabr.mqtt.service.impl.ExternalNetworkMqttUserFactory;
Expand Down Expand Up @@ -114,8 +116,13 @@ AuthorizationService authorizationService() {
}

@Bean
SubscriptionService subscriptionService() {
return new InMemorySubscriptionService();
SubscriptionService subscriptionService(RetainMessageService retainMessageService) {
return new InMemorySubscriptionService(retainMessageService);
}

@Bean
RetainMessageService retainMessageService(PublishDeliveringService publishDeliveringService) {
return new DefaultRetainMessageService(publishDeliveringService);
}

@Bean
Expand Down Expand Up @@ -210,7 +217,8 @@ MqttInMessageHandler publishMqttInMessageHandler(
return new PublishMqttInMessageHandler(
publishReceivingService,
messageOutFactoryService,
topicService, authorizationService,
topicService,
authorizationService,
fieldValidators);
}

Expand Down Expand Up @@ -254,24 +262,18 @@ ConnectionService externalMqttConnectionService(Collection<? extends MqttInMessa
}

@Bean
MqttPublishOutMessageHandler qos0MqttPublishOutMessageHandler(
SubscriptionService subscriptionService,
MessageOutFactoryService messageOutFactoryService) {
return new Qos0MqttPublishOutMessageHandler(subscriptionService, messageOutFactoryService);
MqttPublishOutMessageHandler qos0MqttPublishOutMessageHandler(MessageOutFactoryService messageOutFactoryService) {
return new Qos0MqttPublishOutMessageHandler(messageOutFactoryService);
}

@Bean
MqttPublishOutMessageHandler qos1MqttPublishOutMessageHandler(
SubscriptionService subscriptionService,
MessageOutFactoryService messageOutFactoryService) {
return new Qos1MqttPublishOutMessageHandler(subscriptionService, messageOutFactoryService);
MqttPublishOutMessageHandler qos1MqttPublishOutMessageHandler(MessageOutFactoryService messageOutFactoryService) {
return new Qos1MqttPublishOutMessageHandler(messageOutFactoryService);
}

@Bean
MqttPublishOutMessageHandler qos2MqttPublishOutMessageHandler(
SubscriptionService subscriptionService,
MessageOutFactoryService messageOutFactoryService) {
return new Qos2MqttPublishOutMessageHandler(subscriptionService, messageOutFactoryService);
MqttPublishOutMessageHandler qos2MqttPublishOutMessageHandler(MessageOutFactoryService messageOutFactoryService) {
return new Qos2MqttPublishOutMessageHandler(messageOutFactoryService);
}

@Bean
Expand All @@ -284,33 +286,39 @@ PublishDeliveringService publishDeliveringService(
MqttPublishInMessageHandler qos0MqttPublishInMessageHandler(
SubscriptionService subscriptionService,
PublishDeliveringService publishDeliveringService,
MessageOutFactoryService messageOutFactoryService) {
MessageOutFactoryService messageOutFactoryService,
RetainMessageService retainMessageService) {
return new Qos0MqttPublishInMessageHandler(
subscriptionService,
publishDeliveringService,
messageOutFactoryService);
messageOutFactoryService,
retainMessageService);
}

@Bean
MqttPublishInMessageHandler qos1MqttPublishInMessageHandler(
SubscriptionService subscriptionService,
PublishDeliveringService publishDeliveringService,
MessageOutFactoryService messageOutFactoryService) {
MessageOutFactoryService messageOutFactoryService,
RetainMessageService retainMessageService) {
return new Qos1MqttPublishInMessageHandler(
subscriptionService,
publishDeliveringService,
messageOutFactoryService);
messageOutFactoryService,
retainMessageService);
}

@Bean
MqttPublishInMessageHandler qos2MqttPublishInMessageHandler(
SubscriptionService subscriptionService,
PublishDeliveringService publishDeliveringService,
MessageOutFactoryService messageOutFactoryService) {
MessageOutFactoryService messageOutFactoryService,
RetainMessageService retainMessageService) {
return new Qos2MqttPublishInMessageHandler(
subscriptionService,
publishDeliveringService,
messageOutFactoryService);
messageOutFactoryService,
retainMessageService);
}

@Bean
Expand Down
3 changes: 2 additions & 1 deletion core-service/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ dependencies {

testImplementation projects.testSupport
testImplementation testFixtures(projects.network)
}
testImplementation testFixtures(projects.model)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package javasabr.mqtt.service;

import javasabr.mqtt.model.publishing.Publish;
import javasabr.mqtt.model.subscriber.SingleSubscriber;
import javasabr.mqtt.model.subscriber.Subscriber;
import javasabr.mqtt.model.subscription.Subscription;
import javasabr.mqtt.service.publish.handler.PublishHandlingResult;
import javasabr.rlib.collections.array.Array;

public interface RetainMessageService {

void retainMessage(Publish publish);

void deliverRetainedMessages(Subscriber subscriber);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* Subscription service
*/
public interface SubscriptionService {

default Array<SingleSubscriber> findSubscribers(TopicName topicName) {
return findSubscribersTo(MutableArray.ofType(SingleSubscriber.class), topicName);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package javasabr.mqtt.service.impl;

import javasabr.mqtt.model.publishing.Publish;
import javasabr.mqtt.model.subscriber.SingleSubscriber;
import javasabr.mqtt.model.subscriber.Subscriber;
import javasabr.mqtt.model.subscription.Subscription;
import javasabr.mqtt.model.topic.tree.ConcurrentRetainedMessageTree;
import javasabr.mqtt.service.PublishDeliveringService;
import javasabr.mqtt.service.RetainMessageService;
import javasabr.mqtt.service.publish.handler.PublishHandlingResult;
import javasabr.rlib.collections.array.Array;
import javasabr.rlib.collections.array.MutableArray;
import lombok.AccessLevel;
import lombok.experimental.FieldDefaults;

@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
public class DefaultRetainMessageService implements RetainMessageService {

PublishDeliveringService publishDeliveringService;
ConcurrentRetainedMessageTree retainedMessageTree;

public DefaultRetainMessageService(PublishDeliveringService publishDeliveringService) {
this.publishDeliveringService = publishDeliveringService;
this.retainedMessageTree = new ConcurrentRetainedMessageTree();
}

@Override
public void retainMessage(Publish publish) {
retainedMessageTree.retainMessage(publish);

}

@Override
public void deliverRetainedMessages(Subscriber subscriber) {
SingleSubscriber singleSubscriber = subscriber.resolveSingle();
Subscription subscription = singleSubscriber.subscription();
boolean retainAsPublished = subscription.retainAsPublished();
Array<Publish> retainedMessages = retainedMessageTree.getRetainedMessage(subscription.topicFilter());
MutableArray<PublishHandlingResult> result = MutableArray.ofType(PublishHandlingResult.class);
for (Publish message : retainedMessages) {
if (!retainAsPublished) {
message = message.withoutRetain();
}
publishDeliveringService.startDelivering(message, singleSubscriber);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,27 +1,33 @@
package javasabr.mqtt.service.impl;

import static javasabr.mqtt.model.SubscribeRetainHandling.SEND;
import static javasabr.mqtt.model.SubscribeRetainHandling.SEND_IF_SUBSCRIPTION_DOES_NOT_EXIST;
import static javasabr.mqtt.model.reason.code.UnsubscribeAckReasonCode.NO_SUBSCRIPTION_EXISTED;
import static javasabr.mqtt.model.reason.code.UnsubscribeAckReasonCode.SUCCESS;

import javasabr.mqtt.model.MqttClientConnectionConfig;
import javasabr.mqtt.model.MqttUser;
import javasabr.mqtt.model.SubscribeRetainHandling;
import javasabr.mqtt.model.reason.code.SubscribeAckReasonCode;
import javasabr.mqtt.model.reason.code.UnsubscribeAckReasonCode;
import javasabr.mqtt.model.session.ActiveSubscriptions;
import javasabr.mqtt.model.session.MqttSession;
import javasabr.mqtt.model.subscriber.SingleSubscriber;
import javasabr.mqtt.model.subscriber.Subscriber;
import javasabr.mqtt.model.subscriber.tree.ConcurrentSubscriberTree;
import javasabr.mqtt.model.subscription.Subscription;
import javasabr.mqtt.model.topic.SharedTopicFilter;
import javasabr.mqtt.model.topic.TopicFilter;
import javasabr.mqtt.model.topic.TopicName;
import javasabr.mqtt.service.RetainMessageService;
import javasabr.mqtt.service.SubscriptionService;
import javasabr.rlib.collections.array.Array;
import javasabr.rlib.collections.array.ArrayFactory;
import javasabr.rlib.collections.array.MutableArray;
import lombok.AccessLevel;
import lombok.CustomLog;
import lombok.experimental.FieldDefaults;
import org.jspecify.annotations.Nullable;

/**
* In memory subscription service based on {@link ConcurrentSubscriberTree}
Expand All @@ -30,10 +36,12 @@
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
public class InMemorySubscriptionService implements SubscriptionService {

RetainMessageService retainMessageService;
ConcurrentSubscriberTree subscriberTree;

public InMemorySubscriptionService() {
public InMemorySubscriptionService(RetainMessageService retainMessageService) {
this.subscriberTree = new ConcurrentSubscriberTree();
this.retainMessageService = retainMessageService;
}

@Override
Expand Down Expand Up @@ -71,9 +79,13 @@ private SubscribeAckReasonCode addSubscription(MqttUser user, MqttSession sessio
return SubscribeAckReasonCode.WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED;
}
ActiveSubscriptions activeSubscriptions = session.activeSubscriptions();
SingleSubscriber previous = subscriberTree.subscribe(user, subscription);
if (previous != null) {
activeSubscriptions.remove(previous.subscription());
SingleSubscriber newSubscriber = new SingleSubscriber(user, subscription);
SingleSubscriber previousSubscriber = subscriberTree.subscribe(newSubscriber);
if (previousSubscriber != null) {
activeSubscriptions.remove(previousSubscriber.subscription());
}
if (isRetainHandlingRequired(subscription, previousSubscriber)) {
retainMessageService.deliverRetainedMessages(newSubscriber);
}
activeSubscriptions.add(subscription);
return subscription.qos().subscribeAckReasonCode();
Expand Down Expand Up @@ -125,7 +137,18 @@ public void restoreSubscriptions(MqttUser user, MqttSession session) {
.activeSubscriptions()
.subscriptions();
for (Subscription subscription : subscriptions) {
subscriberTree.subscribe(user, subscription);
subscriberTree.subscribe(new SingleSubscriber(user, subscription));
}
}

private static boolean isRetainHandlingRequired(
Subscription newSubscription,
@Nullable Subscriber previousSubscriber) {
if (newSubscription.topicFilter().isShared() || !newSubscription.qos().isValid()) {
return false;
}
SubscribeRetainHandling retainHandling = newSubscription.retainHandling();
return retainHandling == SEND || (retainHandling == SEND_IF_SUBSCRIPTION_DOES_NOT_EXIST
&& previousSubscriber == null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import javasabr.mqtt.network.user.NetworkMqttUser;
import javasabr.mqtt.service.MessageOutFactoryService;
import javasabr.mqtt.service.PublishDeliveringService;
import javasabr.mqtt.service.RetainMessageService;
import javasabr.mqtt.service.SubscriptionService;
import javasabr.mqtt.service.publish.handler.MqttPublishInMessageHandler;
import javasabr.mqtt.service.publish.handler.PublishHandlingResult;
Expand All @@ -29,6 +30,7 @@ public abstract class AbstractMqttPublishInMessageHandler<U extends NetworkMqttU
SubscriptionService subscriptionService;
PublishDeliveringService publishDeliveringService;
MessageOutFactoryService messageOutFactoryService;
RetainMessageService retainMessageService;

@Override
public void handle(NetworkMqttUser user, Publish publish) {
Expand All @@ -52,6 +54,9 @@ protected boolean validateImpl(U user, NetworkMqttSession session, Publish publi
}

protected void handleImpl(U user, NetworkMqttSession session, Publish publish) {
if (publish.retained()) {
retainMessageService.retainMessage(publish);
}
TopicName topicName = publish.topicName();
Array<SingleSubscriber> subscribers = subscriptionService.findSubscribers(topicName);
if (subscribers.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import javasabr.mqtt.network.message.out.MqttOutMessage;
import javasabr.mqtt.network.user.NetworkMqttUser;
import javasabr.mqtt.service.MessageOutFactoryService;
import javasabr.mqtt.service.SubscriptionService;
import javasabr.mqtt.service.publish.handler.MqttPublishOutMessageHandler;
import javasabr.mqtt.service.publish.handler.PublishHandlingResult;
import lombok.AccessLevel;
Expand All @@ -23,7 +22,6 @@ public abstract class AbstractMqttPublishOutMessageHandler<U extends NetworkMqtt
implements MqttPublishOutMessageHandler {

Class<U> expectedUserType;
SubscriptionService subscriptionService;
MessageOutFactoryService messageOutFactoryService;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,22 @@
import javasabr.mqtt.network.session.NetworkMqttSession;
import javasabr.mqtt.service.MessageOutFactoryService;
import javasabr.mqtt.service.PublishDeliveringService;
import javasabr.mqtt.service.RetainMessageService;
import javasabr.mqtt.service.SubscriptionService;

public class Qos0MqttPublishInMessageHandler extends AbstractMqttPublishInMessageHandler<ExternalNetworkMqttUser> {

public Qos0MqttPublishInMessageHandler(
SubscriptionService subscriptionService,
PublishDeliveringService publishDeliveringService,
MessageOutFactoryService messageOutFactoryService) {
super(ExternalNetworkMqttUser.class, subscriptionService, publishDeliveringService, messageOutFactoryService);
MessageOutFactoryService messageOutFactoryService,
RetainMessageService retainMessageService) {
super(
ExternalNetworkMqttUser.class,
subscriptionService,
publishDeliveringService,
messageOutFactoryService,
retainMessageService);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,12 @@
import javasabr.mqtt.model.session.MqttSession;
import javasabr.mqtt.network.impl.ExternalNetworkMqttUser;
import javasabr.mqtt.service.MessageOutFactoryService;
import javasabr.mqtt.service.SubscriptionService;
import org.jspecify.annotations.Nullable;

public class Qos0MqttPublishOutMessageHandler
extends AbstractMqttPublishOutMessageHandler<ExternalNetworkMqttUser> {
public class Qos0MqttPublishOutMessageHandler extends AbstractMqttPublishOutMessageHandler<ExternalNetworkMqttUser> {

public Qos0MqttPublishOutMessageHandler(
SubscriptionService subscriptionService,
MessageOutFactoryService messageOutFactoryService) {
super(ExternalNetworkMqttUser.class, subscriptionService, messageOutFactoryService);
public Qos0MqttPublishOutMessageHandler(MessageOutFactoryService messageOutFactoryService) {
super(ExternalNetworkMqttUser.class, messageOutFactoryService);
}

@Override
Expand Down
Loading
Loading