Skip to content
Open
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
26129f5
[broker-30] Implement delivering retained messages
crazyrokr Nov 19, 2025
c6ada4d
[broker-30] Rewrite retained messages collecting
crazyrokr Nov 19, 2025
94b32db
Merge branch 'develop' into feature-broker-30
crazyrokr Nov 19, 2025
01bac5b
[broker-30] Cleanup code after merge
crazyrokr Nov 19, 2025
c903510
[broker-30] Fix corner cases in retained messages
crazyrokr Nov 20, 2025
e1f5c2a
[broker-30] Handle SubscribeRetainHandling
crazyrokr Nov 20, 2025
a8829ac
Merge branch 'develop' into feature-broker-30
crazyrokr Dec 1, 2025
a1b87dd
[broker-30] Fix build
crazyrokr Dec 1, 2025
fd848eb
Merge branch 'develop' into feature-broker-30
crazyrokr Dec 1, 2025
be8070b
[broker-30] SubscriberNode refactoring
crazyrokr Dec 2, 2025
0c219dd
[broker-30] Revert wrong access level modifier
crazyrokr Dec 2, 2025
62900ca
[broker-30] Avoid tree branch locking
crazyrokr Dec 3, 2025
dc1d443
Merge branch 'develop' into feature-broker-30
crazyrokr Dec 6, 2025
7314ed1
[broker-30] Introduce AbstractTrieNode
crazyrokr Dec 7, 2025
901e865
[broker-30] Remove redundant code from SubscriberNode
crazyrokr Dec 7, 2025
cc91db2
[broker-30] Revert redundant changes
crazyrokr Dec 7, 2025
42af014
[broker-30] Improve AbstractTrieNode
crazyrokr Dec 7, 2025
b1aa456
[broker-30] Implement retainAsPublished support
crazyrokr Dec 7, 2025
f2f2f67
[broker-30] Update debug message
crazyrokr Dec 7, 2025
bf837eb
[broker-30] Small refactoring
crazyrokr Dec 7, 2025
cc08339
[broker-30] Avoid double message retaining
crazyrokr Dec 7, 2025
bacefb7
[broker-30] Add more tests
crazyrokr Dec 8, 2025
c2ac2ad
Merge branch 'develop' into feature-broker-30
crazyrokr Dec 8, 2025
1d37242
[broker-30] Fix tests
crazyrokr Dec 8, 2025
c3ab5c5
[broker-30] Fix retainAsPublished logic
crazyrokr Dec 8, 2025
b829090
[broker-30] Add tests
crazyrokr Dec 8, 2025
26f45dc
[broker-30] Introduce RetainMessageService
crazyrokr Dec 8, 2025
6c16962
[broker-30] Rework retainAsPublished handling
crazyrokr Dec 9, 2025
12f8069
Merge branch 'develop' into feature-broker-30
crazyrokr Dec 9, 2025
31ff664
[broker-30] Fix formatting
crazyrokr Dec 9, 2025
8191bc8
[broker-30] Improve RetainedMessageTreeTest
crazyrokr Dec 9, 2025
c7c72d3
[broker-30] Improve code readability
crazyrokr Dec 9, 2025
8a23e97
[broker-30] Extract creation of SingleSubscriber to SubscriptionService
crazyrokr Dec 9, 2025
57eeb77
[broker-30] Revert unnecessary changes
crazyrokr Dec 9, 2025
28e73a5
[broker-30] Fix QoS comparison
crazyrokr Dec 9, 2025
2fb16a5
Merge branch 'develop' into feature-broker-30
crazyrokr Dec 10, 2025
f4a6727
[broker-30] Update tests
crazyrokr Dec 10, 2025
751624f
[broker-30] Introduce setRetainedMessage and clearRetainedMessage
crazyrokr Dec 10, 2025
649f57e
[broker-30] Move publish.retained() check to caller
crazyrokr Dec 10, 2025
b4e1232
[broker-30] Extract isRetainHandlingSatisfied() method
crazyrokr Dec 10, 2025
1f3d50a
[broker-30] Revert formatting
crazyrokr Dec 10, 2025
588aae4
[broker-30] Revert SubscriberNode
crazyrokr Dec 10, 2025
07ce54d
[broker-30] Reduce memory allocation of RetainedMessageNode
crazyrokr Dec 11, 2025
3163a8a
Merge branch 'develop' into feature-broker-30
crazyrokr Dec 12, 2025
17359d3
[broker-30] Refactoring
crazyrokr Dec 12, 2025
ffb1c83
[broker-30] Update tests
crazyrokr Dec 12, 2025
a568d4f
Merge branch 'develop' into feature-broker-30
crazyrokr Dec 14, 2025
94b7b17
[broker-30] Move logic from subscription service to subscription handler
crazyrokr Dec 14, 2025
e7c01a9
[broker-30] Separate addRetainedMessage and removeRetainedMessage API
crazyrokr Dec 15, 2025
d6f66eb
[broker-30] Use ArrayBuilder in getRetainedMessages API
crazyrokr Dec 15, 2025
bc83b99
Merge remote-tracking branch 'origin/feature-broker-30' into feature-…
crazyrokr Dec 15, 2025
595508e
[broker-30] Reduce memory allocation in runtime
crazyrokr Dec 15, 2025
c8b63fd
[broker-30] Apply formatting
crazyrokr Dec 15, 2025
b08fedb
[broker-30] Remove redundant method
crazyrokr Dec 15, 2025
1d2f71f
[broker-30] Refactoring
crazyrokr Dec 15, 2025
5bbbb84
[broker-30] Remove redundant method
crazyrokr Dec 16, 2025
9dad98a
Merge branch 'develop' into feature-broker-30
crazyrokr Dec 17, 2025
24abf4d
[broker-30] Exclude SingleSubscriber from publish delivering process
crazyrokr Dec 18, 2025
a0b4c30
[broker-30] Rename `registerRetainMessage` to `retain`
crazyrokr Dec 18, 2025
561d970
[broker-30] Return previousSubscription instead of isSubscriptionAlre…
crazyrokr Dec 19, 2025
b2f4470
[broker-30] Rename lookupRetainedMessages to findRetainedMessages
crazyrokr Dec 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion acl-groovy-dsl/src/main/resources/acl.gdsl
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ contributor(context(scope: scriptScope())) {
&& !enclosingCall("allOf") && !enclosingCall("anyOf")) {
method name: 'topicFilter',
type: 'javasabr.mqtt.service.acl.builder.SubscribeRuleBuilder',
params: [string: 'javasabr.mqtt.model.acl.matcher.ValueMatcher<String>...'],
params: [string: 'javasabr.mqtt.model.acl.matcher.TopicFilterMatcher...'],
doc: 'Set of topic filters matching by rule'
method name: 'match',
type: 'javasabr.mqtt.model.acl.matcher.TopicFilterMatcher',
Expand Down
2 changes: 1 addition & 1 deletion application/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@ tasks.withType(GroovyCompile).configureEach {
configurations.each {
it.exclude group: "org.slf4j", module: "slf4j-log4j12"
it.exclude group: "org.springframework.boot", module: "spring-boot-starter-logging"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,14 @@ CredentialSource credentialSource(
@Bean
AuthenticationService authenticationService(
CredentialSource credentialSource,
@Value("${authentication.allow.anonymous:false}") boolean allowAnonymousAuth) {
@Value("${authentication.allow.anonymous:false}")
boolean allowAnonymousAuth) {
return new SimpleAuthenticationService(credentialSource, allowAnonymousAuth);
}

@Bean
SubscriptionService subscriptionService() {
return new InMemorySubscriptionService();
SubscriptionService subscriptionService(PublishDeliveringService publishDeliveringService) {
return new InMemorySubscriptionService(publishDeliveringService);
}

@Bean
Expand Down Expand Up @@ -153,10 +154,7 @@ MqttInMessageHandler publishMqttInMessageHandler(
PublishReceivingService publishReceivingService,
MessageOutFactoryService messageOutFactoryService,
TopicService topicService) {
return new PublishMqttInMessageHandler(
publishReceivingService,
messageOutFactoryService,
topicService);
return new PublishMqttInMessageHandler(publishReceivingService, messageOutFactoryService, topicService);
}

@Bean
Expand Down Expand Up @@ -187,10 +185,7 @@ MqttInMessageHandler unsubscribeMqttInMessageHandler(
SubscriptionService subscriptionService,
MessageOutFactoryService messageOutFactoryService,
TopicService topicService) {
return new UnsubscribeMqttInMessageHandler(
subscriptionService,
messageOutFactoryService,
topicService);
return new UnsubscribeMqttInMessageHandler(subscriptionService, messageOutFactoryService, topicService);
}

@Bean
Expand All @@ -199,24 +194,18 @@ ConnectionService externalMqttConnectionService(Collection<? extends MqttInMessa
}

@Bean
MqttPublishOutMessageHandler qos0MqttPublishOutMessageHandler(
SubscriptionService subscriptionService,
MessageOutFactoryService messageOutFactoryService) {
return new Qos0MqttPublishOutMessageHandler(subscriptionService, messageOutFactoryService);
MqttPublishOutMessageHandler qos0MqttPublishOutMessageHandler(MessageOutFactoryService messageOutFactoryService) {
return new Qos0MqttPublishOutMessageHandler(messageOutFactoryService);
}

@Bean
MqttPublishOutMessageHandler qos1MqttPublishOutMessageHandler(
SubscriptionService subscriptionService,
MessageOutFactoryService messageOutFactoryService) {
return new Qos1MqttPublishOutMessageHandler(subscriptionService, messageOutFactoryService);
MqttPublishOutMessageHandler qos1MqttPublishOutMessageHandler(MessageOutFactoryService messageOutFactoryService) {
return new Qos1MqttPublishOutMessageHandler(messageOutFactoryService);
}

@Bean
MqttPublishOutMessageHandler qos2MqttPublishOutMessageHandler(
SubscriptionService subscriptionService,
MessageOutFactoryService messageOutFactoryService) {
return new Qos2MqttPublishOutMessageHandler(subscriptionService, messageOutFactoryService);
MqttPublishOutMessageHandler qos2MqttPublishOutMessageHandler(MessageOutFactoryService messageOutFactoryService) {
return new Qos2MqttPublishOutMessageHandler(messageOutFactoryService);
}

@Bean
Expand All @@ -230,32 +219,23 @@ MqttPublishInMessageHandler qos0MqttPublishInMessageHandler(
SubscriptionService subscriptionService,
PublishDeliveringService publishDeliveringService,
MessageOutFactoryService messageOutFactoryService) {
return new Qos0MqttPublishInMessageHandler(
subscriptionService,
publishDeliveringService,
messageOutFactoryService);
return new Qos0MqttPublishInMessageHandler(subscriptionService, publishDeliveringService, messageOutFactoryService);
}

@Bean
MqttPublishInMessageHandler qos1MqttPublishInMessageHandler(
SubscriptionService subscriptionService,
PublishDeliveringService publishDeliveringService,
MessageOutFactoryService messageOutFactoryService) {
return new Qos1MqttPublishInMessageHandler(
subscriptionService,
publishDeliveringService,
messageOutFactoryService);
return new Qos1MqttPublishInMessageHandler(subscriptionService, publishDeliveringService, messageOutFactoryService);
}

@Bean
MqttPublishInMessageHandler qos2MqttPublishInMessageHandler(
SubscriptionService subscriptionService,
PublishDeliveringService publishDeliveringService,
MessageOutFactoryService messageOutFactoryService) {
return new Qos2MqttPublishInMessageHandler(
subscriptionService,
publishDeliveringService,
messageOutFactoryService);
return new Qos2MqttPublishInMessageHandler(subscriptionService, publishDeliveringService, messageOutFactoryService);
}

@Bean
Expand All @@ -280,22 +260,10 @@ MqttServerConnectionConfig externalConnectionConfig(Environment env) {
"mqtt.external.connection.max.message.size",
int.class,
MqttProperties.MAXIMUM_MESSAGE_SIZE_DEFAULT),
env.getProperty(
"mqtt.external.connection.max.string.length",
int.class,
MqttProperties.MAXIMUM_STRING_LENGTH),
env.getProperty(
"mqtt.external.connection.max.binary.size",
int.class,
MqttProperties.MAXIMUM_BINARY_SIZE),
env.getProperty(
"mqtt.external.connection.max.topic.levels",
int.class,
MqttProperties.MAXIMUM_TOPIC_LEVELS),
env.getProperty(
"mqtt.external.connection.min.keep.alive",
int.class,
MqttProperties.SERVER_KEEP_ALIVE_DEFAULT),
env.getProperty("mqtt.external.connection.max.string.length", int.class, MqttProperties.MAXIMUM_STRING_LENGTH),
env.getProperty("mqtt.external.connection.max.binary.size", int.class, MqttProperties.MAXIMUM_BINARY_SIZE),
env.getProperty("mqtt.external.connection.max.topic.levels", int.class, MqttProperties.MAXIMUM_TOPIC_LEVELS),
env.getProperty("mqtt.external.connection.min.keep.alive", int.class, MqttProperties.SERVER_KEEP_ALIVE_DEFAULT),
env.getProperty(
"mqtt.external.connection.receive.maximum",
int.class,
Expand Down Expand Up @@ -360,7 +328,8 @@ NetworkMqttUserFactory externalClientFactory(NetworkMqttUserReleaseHandler exter
MqttConnectionFactory externalConnectionFactory(
MqttServerConnectionConfig externalServerConnectionConfig,
NetworkMqttUserFactory mqttUserFactory,
@Value("${mqtt.external.connection.max.packets.by.read:100}") int maxPacketsByRead) {
@Value("${mqtt.external.connection.max.packets.by.read:100}")
int maxPacketsByRead) {
return new DefaultMqttConnectionFactory(externalServerConnectionConfig, mqttUserFactory, maxPacketsByRead);
}

Expand Down
3 changes: 2 additions & 1 deletion core-service/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ dependencies {

testImplementation projects.testSupport
testImplementation testFixtures(projects.network)
}
testImplementation testFixtures(projects.model)
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
import javasabr.mqtt.model.publishing.Publish;
import javasabr.mqtt.model.subscriber.SingleSubscriber;
import javasabr.mqtt.service.publish.handler.PublishHandlingResult;
import javasabr.rlib.collections.array.Array;

public interface PublishDeliveringService {

PublishHandlingResult startDelivering(Publish publish, SingleSubscriber subscriber);

Array<PublishHandlingResult> deliverRetainedMessages(SingleSubscriber subscriber);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* Subscription service
*/
public interface SubscriptionService {

default Array<SingleSubscriber> findSubscribers(TopicName topicName) {
return findSubscribersTo(MutableArray.ofType(SingleSubscriber.class), topicName);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
package javasabr.mqtt.service.impl;

import java.util.Collection;
import java.util.function.Function;
import javasabr.mqtt.model.QoS;
import javasabr.mqtt.model.publishing.Publish;
import javasabr.mqtt.model.subscriber.SingleSubscriber;
import javasabr.mqtt.model.subscription.Subscription;
import javasabr.mqtt.model.topic.tree.ConcurrentRetainedMessageTree;
import javasabr.mqtt.service.PublishDeliveringService;
import javasabr.mqtt.service.publish.handler.MqttPublishOutMessageHandler;
import javasabr.mqtt.service.publish.handler.PublishHandlingResult;
import javasabr.rlib.collections.array.Array;
import javasabr.rlib.collections.array.MutableArray;
import lombok.AccessLevel;
import lombok.CustomLog;
import lombok.experimental.FieldDefaults;
Expand All @@ -18,6 +23,7 @@ public class DefaultPublishDeliveringService implements PublishDeliveringService

@Nullable
MqttPublishOutMessageHandler[] publishOutMessageHandlers;
ConcurrentRetainedMessageTree retainedMessageTree;

public DefaultPublishDeliveringService(
Collection<? extends MqttPublishOutMessageHandler> knownPublishOutHandlers) {
Expand All @@ -39,13 +45,34 @@ public DefaultPublishDeliveringService(
}
handlers[qos.level()] = knownPublishOutHandler;
}

this.retainedMessageTree = new ConcurrentRetainedMessageTree();
this.publishOutMessageHandlers = handlers;
log.info(publishOutMessageHandlers, DefaultPublishDeliveringService::buildServiceDescription);
}

@Override
public PublishHandlingResult startDelivering(Publish publish, SingleSubscriber subscriber) {
if (publish.retained()) {
Subscription subscription = subscriber.subscription();
boolean retainAsPublished = subscription.retainAsPublished();
Function<Publish, Publish> transformer = retainAsPublished ? Function.identity() : Publish::withoutRetain;
retainedMessageTree.retainMessage(transformer.apply(publish));
}
return startDeliveringWithoutRetain(publish, subscriber);
}

@Override
public Array<PublishHandlingResult> deliverRetainedMessages(SingleSubscriber subscriber) {
Subscription subscription = subscriber.subscription();
Array<Publish> retainedMessages = retainedMessageTree.getRetainedMessage(subscription.topicFilter());
MutableArray<PublishHandlingResult> result = MutableArray.ofType(PublishHandlingResult.class);
for (Publish message : retainedMessages) {
result.add(startDeliveringWithoutRetain(message, subscriber));
}
return Array.copyOf(result);
}

private PublishHandlingResult startDeliveringWithoutRetain(Publish publish, SingleSubscriber subscriber) {
try {
//noinspection DataFlowIssue
return publishOutMessageHandlers[subscriber.qos().level()].handle(publish, subscriber);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package javasabr.mqtt.service.impl;

import static javasabr.mqtt.model.SubscribeRetainHandling.SEND;
import static javasabr.mqtt.model.SubscribeRetainHandling.SEND_IF_SUBSCRIPTION_DOES_NOT_EXIST;
import static javasabr.mqtt.model.reason.code.UnsubscribeAckReasonCode.NO_SUBSCRIPTION_EXISTED;
import static javasabr.mqtt.model.reason.code.UnsubscribeAckReasonCode.SUCCESS;

import javasabr.mqtt.model.MqttClientConnectionConfig;
import javasabr.mqtt.model.MqttUser;
import javasabr.mqtt.model.QoS;
import javasabr.mqtt.model.reason.code.SubscribeAckReasonCode;
import javasabr.mqtt.model.reason.code.UnsubscribeAckReasonCode;
import javasabr.mqtt.model.session.ActiveSubscriptions;
Expand All @@ -15,7 +18,9 @@
import javasabr.mqtt.model.topic.SharedTopicFilter;
import javasabr.mqtt.model.topic.TopicFilter;
import javasabr.mqtt.model.topic.TopicName;
import javasabr.mqtt.service.PublishDeliveringService;
import javasabr.mqtt.service.SubscriptionService;
import javasabr.mqtt.service.publish.handler.PublishHandlingResult;
import javasabr.rlib.collections.array.Array;
import javasabr.rlib.collections.array.ArrayFactory;
import javasabr.rlib.collections.array.MutableArray;
Expand All @@ -30,10 +35,12 @@
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
public class InMemorySubscriptionService implements SubscriptionService {

PublishDeliveringService publishDeliveringService;
ConcurrentSubscriberTree subscriberTree;

public InMemorySubscriptionService() {
public InMemorySubscriptionService(PublishDeliveringService publishDeliveringService) {
this.subscriberTree = new ConcurrentSubscriberTree();
this.publishDeliveringService = publishDeliveringService;
}

@Override
Expand Down Expand Up @@ -75,8 +82,14 @@ private SubscribeAckReasonCode addSubscription(MqttUser user, MqttSession sessio
if (previous != null) {
activeSubscriptions.remove(previous.subscription());
}
QoS subscriptionQoS = subscription.qos();
if (subscriptionQoS.ordinal() <= 2 && (subscription.retainHandling() == SEND ||
(subscription.retainHandling() == SEND_IF_SUBSCRIPTION_DOES_NOT_EXIST && previous == null))) {
sendRetainedMessages(user, subscription);
}
activeSubscriptions.add(subscription);
return subscription.qos().subscribeAckReasonCode();
return subscriptionQoS
.subscribeAckReasonCode();
}

@Override
Expand Down Expand Up @@ -128,4 +141,24 @@ public void restoreSubscriptions(MqttUser user, MqttSession session) {
subscriberTree.subscribe(user, subscription);
}
}

private void sendRetainedMessages(MqttUser user, Subscription subscription) {
int count = 0;
String clientId = user.clientId();
PublishHandlingResult errorResult = null;
SingleSubscriber singleSubscriber = new SingleSubscriber(user, subscription);
var results = publishDeliveringService.deliverRetainedMessages(singleSubscriber);
for (PublishHandlingResult result : results) {
if (result.error()) {
errorResult = result;
} else if (result == PublishHandlingResult.SUCCESS) {
count++;
}
if (errorResult != null) {
log.debug(clientId, errorResult, "[%s] Error occurred [%s] during sending retained messages"::formatted);
} else {
log.debug(clientId, count, "[%s] Delivering of [%s] retained message has been started"::formatted);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import javasabr.mqtt.network.message.out.MqttOutMessage;
import javasabr.mqtt.network.user.NetworkMqttUser;
import javasabr.mqtt.service.MessageOutFactoryService;
import javasabr.mqtt.service.SubscriptionService;
import javasabr.mqtt.service.publish.handler.MqttPublishOutMessageHandler;
import javasabr.mqtt.service.publish.handler.PublishHandlingResult;
import lombok.AccessLevel;
Expand All @@ -23,7 +22,6 @@ public abstract class AbstractMqttPublishOutMessageHandler<U extends NetworkMqtt
implements MqttPublishOutMessageHandler {

Class<U> expectedUserType;
SubscriptionService subscriptionService;
MessageOutFactoryService messageOutFactoryService;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,12 @@
import javasabr.mqtt.model.session.MqttSession;
import javasabr.mqtt.network.impl.ExternalNetworkMqttUser;
import javasabr.mqtt.service.MessageOutFactoryService;
import javasabr.mqtt.service.SubscriptionService;
import org.jspecify.annotations.Nullable;

public class Qos0MqttPublishOutMessageHandler
extends AbstractMqttPublishOutMessageHandler<ExternalNetworkMqttUser> {
public class Qos0MqttPublishOutMessageHandler extends AbstractMqttPublishOutMessageHandler<ExternalNetworkMqttUser> {

public Qos0MqttPublishOutMessageHandler(
SubscriptionService subscriptionService,
MessageOutFactoryService messageOutFactoryService) {
super(ExternalNetworkMqttUser.class, subscriptionService, messageOutFactoryService);
public Qos0MqttPublishOutMessageHandler(MessageOutFactoryService messageOutFactoryService) {
super(ExternalNetworkMqttUser.class, messageOutFactoryService);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,14 @@
import javasabr.mqtt.network.impl.ExternalNetworkMqttUser;
import javasabr.mqtt.network.message.in.PublishAckMqttInMessage;
import javasabr.mqtt.service.MessageOutFactoryService;
import javasabr.mqtt.service.SubscriptionService;
import lombok.CustomLog;
import org.jspecify.annotations.Nullable;

@CustomLog
public class Qos1MqttPublishOutMessageHandler extends TrackableMqttPublishOutMessageHandler {

public Qos1MqttPublishOutMessageHandler(
SubscriptionService subscriptionService,
MessageOutFactoryService messageOutFactoryService) {
super(subscriptionService, messageOutFactoryService);
public Qos1MqttPublishOutMessageHandler(MessageOutFactoryService messageOutFactoryService) {
super(messageOutFactoryService);
}

@Override
Expand Down
Loading
Loading