Skip to content

Conversation

@crazyrokr
Copy link
Collaborator

@crazyrokr crazyrokr commented Nov 19, 2025

Initial implementation of retain message feature

3 MQTT Control Packets
3.3 PUBLISH – Publish message
    └ 3.3.1 PUBLISH Fixed Header
        └ 3.3.1.3 RETAIN

  1. If a client publishes a message with RETAIN flag, then the message is retained within the corresponding topic name
  2. If a client subscribes to a topic filter, then all retained messages within topic names matching the topic filter are automatically released to the client
  3. If during the releasing of retained messages a target subscription has retainAsPublished=0 then RETAIN flag is set to 0 in the forwarder publish message

# Conflicts:
#	model/src/main/java/javasabr/mqtt/model/subscriber/tree/ConcurrentRetainedMessageTree.java
#	model/src/main/java/javasabr/mqtt/model/subscriber/tree/ConcurrentSubscriberTree.java
#	model/src/main/java/javasabr/mqtt/model/subscriber/tree/RetainedMessageNode.java
#	model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberNode.java
#	model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberTreeBase.java
#	model/src/main/java/javasabr/mqtt/model/subscribtion/tree/ConcurrentSubscriptionTree.java
#	model/src/main/java/javasabr/mqtt/model/subscribtion/tree/TopicFilterNode.java
#	model/src/main/java/javasabr/mqtt/model/subscribtion/tree/TopicFilterTreeBase.java
#	model/src/main/java/javasabr/mqtt/model/topic/tree/ConcurrentTopicTree.java
#	model/src/main/java/javasabr/mqtt/model/topic/tree/TopicNode.java
#	model/src/main/java/javasabr/mqtt/model/topic/tree/TopicTreeBase.java
#	model/src/test/groovy/javasabr/mqtt/model/topic/tree/SubscriberTreeTest.groovy
#	service/src/main/java/javasabr/mqtt/service/impl/InMemorySubscriptionService.java
crazyrokr and others added 5 commits November 20, 2025 08:15
# Conflicts:
#	application/src/main/java/javasabr/mqtt/broker/application/config/MqttBrokerSpringConfig.java
#	core-service/src/main/java/javasabr/mqtt/service/impl/InMemorySubscriptionService.java
#	core-service/src/test/groovy/javasabr/mqtt/service/message/handler/impl/SubscribeMqttInMessageHandlerTest.groovy
@github-actions
Copy link

github-actions bot commented Dec 1, 2025

@crazyrokr crazyrokr linked an issue Dec 5, 2025 that may be closed by this pull request
# Conflicts:
#	core-service/src/main/java/javasabr/mqtt/service/SubscriptionService.java
#	core-service/src/main/java/javasabr/mqtt/service/impl/InMemorySubscriptionService.java
#	core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/AbstractMqttPublishOutMessageHandler.java
#	core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/PersistedMqttPublishOutMessageHandler.java
#	core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos0MqttPublishOutMessageHandler.java
#	core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos1MqttPublishOutMessageHandler.java
#	core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos2MqttPublishOutMessageHandler.java
@crazyrokr crazyrokr marked this pull request as ready for review December 7, 2025 14:18
@crazyrokr crazyrokr requested a review from JavaSaBr December 7, 2025 16:04
@crazyrokr crazyrokr added enhancement New feature or request core labels Dec 7, 2025
publishReceivingService,
messageOutFactoryService,
topicService);
return new PublishMqttInMessageHandler(publishReceivingService, messageOutFactoryService, topicService);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wrong formating

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's after code reformatting with the latest code style from master https://github.com/JavaSaBr/settings/blob/master/intellij-idea/javasabr-intellij-codestyle.xml
Have you configured something else in code style?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

subscriptionService,
messageOutFactoryService,
topicService);
return new UnsubscribeMqttInMessageHandler(subscriptionService, messageOutFactoryService, topicService);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wrong formatting

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you clarify what exactly is wrong?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

env.getProperty("mqtt.external.connection.max.string.length", int.class, MqttProperties.MAXIMUM_STRING_LENGTH),
env.getProperty("mqtt.external.connection.max.binary.size", int.class, MqttProperties.MAXIMUM_BINARY_SIZE),
env.getProperty("mqtt.external.connection.max.topic.levels", int.class, MqttProperties.MAXIMUM_TOPIC_LEVELS),
env.getProperty("mqtt.external.connection.min.keep.alive", int.class, MqttProperties.SERVER_KEEP_ALIVE_DEFAULT),
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bad formatting

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you clarify what exactly is wrong?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed


PublishHandlingResult startDelivering(Publish publish, SingleSubscriber subscriber);

Array<PublishHandlingResult> deliverRetainedMessages(SingleSubscriber subscriber);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PublishDeliveringService is responsible only for delivering a publish to a user. That is it.
I think you need to have a separated service like "RetainMessagesService" which will have a topic tree + with retain messages (instead of subscribers) and will use 'PublishDeliveringService' to deliver retain messages

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

import javasabr.rlib.collections.dictionary.LockableRefToRefDictionary;
import org.jspecify.annotations.Nullable;

public abstract class AbstractTrieNode<T> {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like typo -> AbstractTreeNode

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's actually the name of a prefix tree: https://drstearns.github.io/tutorials/trie/

return;
}
if (level == lastLevel) {
if (level == lastLevel || TopicFilter.MULTI_LEVEL_WILDCARD.equals(segment)) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to have it here?

Copy link
Collaborator Author

@crazyrokr crazyrokr Dec 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This condition allows us to merge the exactlyTopicMatch method with the following one

private void multiWildcardTopicMatch(MutableArray<SingleSubscriber> result) {
SubscriberNode subscriberNode = childNode(TopicFilter.MULTI_LEVEL_WILDCARD);
if (subscriberNode != null) {
appendSubscribersTo(result, subscriberNode);
}
}

I think in this case one method is better than the three, don't you think so?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@crazyrokr
I isolated 3 cases separately because:
a. Avoid one extra check for each segment (you did it 3 times per node, for deep trees it produces 100+ extra checks)
b. To have the clear separation of looking for subscribers because the exactly match and the single wildcard can be tuned in different ways

The TopcTree is generaly one of the high loaded data structures in our project.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I said, I believe your approach reduces readability and increases cognitive load. However, the performance-improvement argument may be valid, and if you compare the performance of the two approaches, I'd be happy to see the results.

} else if (level < lastLevel) {
subscriberNode.matchesTo(level + 1, topicName, lastLevel, result);
}
collectMatchingSubscribers(topicName.segment(level), level, topicName, lastLevel, container);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so you returned the old way which I specially removed :D

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a reduction of duplication, and it seems more readable to me

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@crazyrokr I had clearly separated 3 matching methods, it cannot be less readable... more code? yes, but not less readable

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A developer has to keep in mind 3 different cases with your approach, despite the fact that these three cases differ only slightly. I regard this as duplication of trivial code.


if (existeQos.ordinal() < candidateQos.ordinal()) {
QoS existedQos = result
.get(found)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wrong formattimg

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then it seems like the following option has to be set to 1
<option name="METHOD_CALL_CHAIN_WRAP" value="2" />
https://github.com/JavaSaBr/settings/blob/28bafa79d458c1cbeb76d6c8e24980ce81a0c491/intellij-idea/javasabr-intellij-codestyle.xml#L80-L80

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return ArrayFactory.mutableArray(RetainedMessageNode.class);
}

final AtomicReference<@Nullable Publish> retainedMessage = new AtomicReference<>();
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can have only 1 message per topic? I think here should be a list

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, just a message per topic name, if I haven't messed anything up
please check yourself https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901104

Repository owner deleted a comment from github-actions bot Dec 9, 2025
Repository owner deleted a comment from github-actions bot Dec 9, 2025
Repository owner deleted a comment from github-actions bot Dec 9, 2025
Repository owner deleted a comment from github-actions bot Dec 9, 2025
Repository owner deleted a comment from github-actions bot Dec 9, 2025
Repository owner deleted a comment from github-actions bot Dec 9, 2025
Repository owner deleted a comment from github-actions bot Dec 9, 2025
@crazyrokr crazyrokr requested a review from JavaSaBr December 9, 2025 14:36
# Conflicts:
#	application/src/main/java/javasabr/mqtt/broker/application/config/MqttBrokerSpringConfig.java
#	core-service/src/test/groovy/javasabr/mqtt/service/IntegrationServiceSpecification.groovy
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
public class DefaultRetainMessageService implements RetainMessageService {

PublishDeliveringService defaultPublishDeliveringService;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

defaultPublishDeliveringService -> publishDeliveringService

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

}

@Override
public void retainMessage(Publish publish) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we don't need to check it inside this method, the caller shuold check it before calling

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

if (previousSubscriber != null) {
activeSubscriptions.remove(previousSubscriber.subscription());
}
QoS subscriptionQoS = subscription.qos();
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

at least I think it should be extracted to a method

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

@Nullable
public SingleSubscriber subscribe(MqttUser user, Subscription subscription) {
return rootNode.subscribe(0, user, subscription, subscription.topicFilter());
public SingleSubscriber subscribe(SingleSubscriber subscriber) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you break the abstraction

Copy link
Collaborator Author

@crazyrokr crazyrokr Dec 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe SingleSubscriber has to be created by SubscriptionService, not by SubscriberTree

var child = getOrCreateChildNode(topicName.segment(level));
boolean isLastLevel = (level + 1 == topicName.levelsCount());
if (isLastLevel) {
child.retainedMessage.set(message.payload().length == 0 ? null : message);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updating the message should be done by a separated method, without direct external access

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

core enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Implement Retained Messages

3 participants