Skip to content

Commit 26f45dc

Browse files
committed
[broker-30] Introduce RetainMessageService
1 parent b829090 commit 26f45dc

18 files changed

+211
-133
lines changed

application/src/main/java/javasabr/mqtt/broker/application/config/MqttBrokerSpringConfig.java

Lines changed: 35 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import javasabr.mqtt.service.MessageOutFactoryService;
1818
import javasabr.mqtt.service.PublishDeliveringService;
1919
import javasabr.mqtt.service.PublishReceivingService;
20+
import javasabr.mqtt.service.RetainMessageService;
2021
import javasabr.mqtt.service.SubscriptionService;
2122
import javasabr.mqtt.service.TopicService;
2223
import javasabr.mqtt.service.handler.client.ExternalNetworkMqttUserReleaseHandler;
@@ -25,6 +26,7 @@
2526
import javasabr.mqtt.service.impl.DefaultMqttConnectionFactory;
2627
import javasabr.mqtt.service.impl.DefaultPublishDeliveringService;
2728
import javasabr.mqtt.service.impl.DefaultPublishReceivingService;
29+
import javasabr.mqtt.service.impl.DefaultRetainMessageService;
2830
import javasabr.mqtt.service.impl.DefaultTopicService;
2931
import javasabr.mqtt.service.impl.ExternalNetworkMqttUserFactory;
3032
import javasabr.mqtt.service.impl.FileCredentialsSource;
@@ -99,8 +101,13 @@ AuthenticationService authenticationService(
99101
}
100102

101103
@Bean
102-
SubscriptionService subscriptionService(PublishDeliveringService publishDeliveringService) {
103-
return new InMemorySubscriptionService(publishDeliveringService);
104+
SubscriptionService subscriptionService(RetainMessageService retainMessageService) {
105+
return new InMemorySubscriptionService(retainMessageService);
106+
}
107+
108+
@Bean
109+
RetainMessageService retainMessageService(PublishDeliveringService publishDeliveringService) {
110+
return new DefaultRetainMessageService(publishDeliveringService);
104111
}
105112

106113
@Bean
@@ -218,24 +225,39 @@ PublishDeliveringService publishDeliveringService(
218225
MqttPublishInMessageHandler qos0MqttPublishInMessageHandler(
219226
SubscriptionService subscriptionService,
220227
PublishDeliveringService publishDeliveringService,
221-
MessageOutFactoryService messageOutFactoryService) {
222-
return new Qos0MqttPublishInMessageHandler(subscriptionService, publishDeliveringService, messageOutFactoryService);
228+
MessageOutFactoryService messageOutFactoryService,
229+
RetainMessageService retainMessageService) {
230+
return new Qos0MqttPublishInMessageHandler(
231+
subscriptionService,
232+
publishDeliveringService,
233+
messageOutFactoryService,
234+
retainMessageService);
223235
}
224236

225237
@Bean
226238
MqttPublishInMessageHandler qos1MqttPublishInMessageHandler(
227239
SubscriptionService subscriptionService,
228240
PublishDeliveringService publishDeliveringService,
229-
MessageOutFactoryService messageOutFactoryService) {
230-
return new Qos1MqttPublishInMessageHandler(subscriptionService, publishDeliveringService, messageOutFactoryService);
241+
MessageOutFactoryService messageOutFactoryService,
242+
RetainMessageService retainMessageService) {
243+
return new Qos1MqttPublishInMessageHandler(
244+
subscriptionService,
245+
publishDeliveringService,
246+
messageOutFactoryService,
247+
retainMessageService);
231248
}
232249

233250
@Bean
234251
MqttPublishInMessageHandler qos2MqttPublishInMessageHandler(
235252
SubscriptionService subscriptionService,
236253
PublishDeliveringService publishDeliveringService,
237-
MessageOutFactoryService messageOutFactoryService) {
238-
return new Qos2MqttPublishInMessageHandler(subscriptionService, publishDeliveringService, messageOutFactoryService);
254+
MessageOutFactoryService messageOutFactoryService,
255+
RetainMessageService retainMessageService) {
256+
return new Qos2MqttPublishInMessageHandler(
257+
subscriptionService,
258+
publishDeliveringService,
259+
messageOutFactoryService,
260+
retainMessageService);
239261
}
240262

241263
@Bean
@@ -268,10 +290,7 @@ MqttServerConnectionConfig externalConnectionConfig(Environment env) {
268290
"mqtt.external.connection.receive.maximum",
269291
int.class,
270292
MqttProperties.RECEIVE_MAXIMUM_PUBLISHES_DEFAULT),
271-
env.getProperty(
272-
"mqtt.external.connection.topic.alias.maximum",
273-
int.class,
274-
0),
293+
env.getProperty("mqtt.external.connection.topic.alias.maximum", int.class, 0),
275294
env.getProperty(
276295
"mqtt.external.connection.default.session.expiration.time",
277296
long.class,
@@ -284,18 +303,14 @@ MqttServerConnectionConfig externalConnectionConfig(Environment env) {
284303
"mqtt.external.connection.sessions.enabled",
285304
boolean.class,
286305
MqttProperties.SESSIONS_ENABLED_DEFAULT),
287-
env.getProperty(
288-
"mqtt.external.connection.retain.available",
289-
boolean.class,
290-
false), // set false because currently it's not implemented and we should not allow for clients to use it
306+
env.getProperty("mqtt.external.connection.retain.available", boolean.class, false),
307+
// set false because currently it's not implemented and we should not allow for clients to use it
291308
env.getProperty(
292309
"mqtt.external.connection.wildcard.subscription.available",
293310
boolean.class,
294311
MqttProperties.WILDCARD_SUBSCRIPTION_AVAILABLE_DEFAULT),
295-
env.getProperty(
296-
"mqtt.external.connection.subscription.id.available",
297-
boolean.class,
298-
false), // set false because currently it's not implemented and we should not allow for clients to use it
312+
env.getProperty("mqtt.external.connection.subscription.id.available", boolean.class, false),
313+
// set false because currently it's not implemented and we should not allow for clients to use it
299314
env.getProperty(
300315
"mqtt.external.connection.shared.subscription.available",
301316
boolean.class,

core-service/src/main/java/javasabr/mqtt/service/PublishDeliveringService.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,4 @@
88
public interface PublishDeliveringService {
99

1010
PublishHandlingResult startDelivering(Publish publish, SingleSubscriber subscriber);
11-
12-
Array<PublishHandlingResult> deliverRetainedMessages(SingleSubscriber subscriber);
1311
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package javasabr.mqtt.service;
2+
3+
import javasabr.mqtt.model.publishing.Publish;
4+
import javasabr.mqtt.model.subscriber.SingleSubscriber;
5+
import javasabr.mqtt.model.subscription.Subscription;
6+
import javasabr.mqtt.service.publish.handler.PublishHandlingResult;
7+
import javasabr.rlib.collections.array.Array;
8+
9+
public interface RetainMessageService {
10+
11+
void retainMessage(Publish publish, Subscription subscription);
12+
13+
Array<PublishHandlingResult> deliverRetainedMessages(SingleSubscriber subscriber);
14+
}

core-service/src/main/java/javasabr/mqtt/service/impl/DefaultPublishDeliveringService.java

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,12 @@
11
package javasabr.mqtt.service.impl;
22

33
import java.util.Collection;
4-
import java.util.function.Function;
54
import javasabr.mqtt.model.QoS;
65
import javasabr.mqtt.model.publishing.Publish;
76
import javasabr.mqtt.model.subscriber.SingleSubscriber;
8-
import javasabr.mqtt.model.subscription.Subscription;
9-
import javasabr.mqtt.model.topic.tree.ConcurrentRetainedMessageTree;
107
import javasabr.mqtt.service.PublishDeliveringService;
118
import javasabr.mqtt.service.publish.handler.MqttPublishOutMessageHandler;
129
import javasabr.mqtt.service.publish.handler.PublishHandlingResult;
13-
import javasabr.rlib.collections.array.Array;
14-
import javasabr.rlib.collections.array.MutableArray;
1510
import lombok.AccessLevel;
1611
import lombok.CustomLog;
1712
import lombok.experimental.FieldDefaults;
@@ -23,7 +18,6 @@ public class DefaultPublishDeliveringService implements PublishDeliveringService
2318

2419
@Nullable
2520
MqttPublishOutMessageHandler[] publishOutMessageHandlers;
26-
ConcurrentRetainedMessageTree retainedMessageTree;
2721

2822
public DefaultPublishDeliveringService(
2923
Collection<? extends MqttPublishOutMessageHandler> knownPublishOutHandlers) {
@@ -45,34 +39,12 @@ public DefaultPublishDeliveringService(
4539
}
4640
handlers[qos.level()] = knownPublishOutHandler;
4741
}
48-
this.retainedMessageTree = new ConcurrentRetainedMessageTree();
4942
this.publishOutMessageHandlers = handlers;
5043
log.info(publishOutMessageHandlers, DefaultPublishDeliveringService::buildServiceDescription);
5144
}
5245

5346
@Override
5447
public PublishHandlingResult startDelivering(Publish publish, SingleSubscriber subscriber) {
55-
if (publish.retained()) {
56-
Subscription subscription = subscriber.subscription();
57-
boolean retainAsPublished = subscription.retainAsPublished();
58-
Function<Publish, Publish> transformer = retainAsPublished ? Function.identity() : Publish::withoutRetain;
59-
retainedMessageTree.retainMessage(transformer.apply(publish));
60-
}
61-
return startDeliveringWithoutRetain(publish, subscriber);
62-
}
63-
64-
@Override
65-
public Array<PublishHandlingResult> deliverRetainedMessages(SingleSubscriber subscriber) {
66-
Subscription subscription = subscriber.subscription();
67-
Array<Publish> retainedMessages = retainedMessageTree.getRetainedMessage(subscription.topicFilter());
68-
MutableArray<PublishHandlingResult> result = MutableArray.ofType(PublishHandlingResult.class);
69-
for (Publish message : retainedMessages) {
70-
result.add(startDeliveringWithoutRetain(message, subscriber));
71-
}
72-
return Array.copyOf(result);
73-
}
74-
75-
private PublishHandlingResult startDeliveringWithoutRetain(Publish publish, SingleSubscriber subscriber) {
7648
try {
7749
//noinspection DataFlowIssue
7850
return publishOutMessageHandlers[subscriber.qos().level()].handle(publish, subscriber);
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package javasabr.mqtt.service.impl;
2+
3+
import java.util.function.Function;
4+
import javasabr.mqtt.model.publishing.Publish;
5+
import javasabr.mqtt.model.subscriber.SingleSubscriber;
6+
import javasabr.mqtt.model.subscription.Subscription;
7+
import javasabr.mqtt.model.topic.tree.ConcurrentRetainedMessageTree;
8+
import javasabr.mqtt.service.PublishDeliveringService;
9+
import javasabr.mqtt.service.RetainMessageService;
10+
import javasabr.mqtt.service.publish.handler.PublishHandlingResult;
11+
import javasabr.rlib.collections.array.Array;
12+
import javasabr.rlib.collections.array.MutableArray;
13+
import lombok.AccessLevel;
14+
import lombok.experimental.FieldDefaults;
15+
16+
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
17+
public class DefaultRetainMessageService implements RetainMessageService {
18+
19+
PublishDeliveringService defaultPublishDeliveringService;
20+
ConcurrentRetainedMessageTree retainedMessageTree;
21+
22+
public DefaultRetainMessageService(PublishDeliveringService defaultPublishDeliveringService) {
23+
this.defaultPublishDeliveringService = defaultPublishDeliveringService;
24+
this.retainedMessageTree = new ConcurrentRetainedMessageTree();
25+
}
26+
27+
@Override
28+
public void retainMessage(Publish publish, Subscription subscription) {
29+
if (publish.retained()) {
30+
boolean retainAsPublished = subscription.retainAsPublished();
31+
Function<Publish, Publish> transformer = retainAsPublished ? Function.identity() : Publish::withoutRetain;
32+
retainedMessageTree.retainMessage(transformer.apply(publish));
33+
}
34+
}
35+
36+
@Override
37+
public Array<PublishHandlingResult> deliverRetainedMessages(SingleSubscriber subscriber) {
38+
Subscription subscription = subscriber.subscription();
39+
Array<Publish> retainedMessages = retainedMessageTree.getRetainedMessage(subscription.topicFilter());
40+
MutableArray<PublishHandlingResult> result = MutableArray.ofType(PublishHandlingResult.class);
41+
for (Publish message : retainedMessages) {
42+
result.add(defaultPublishDeliveringService.startDelivering(message, subscriber));
43+
}
44+
return Array.copyOf(result);
45+
}
46+
}

core-service/src/main/java/javasabr/mqtt/service/impl/InMemorySubscriptionService.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import javasabr.mqtt.model.topic.SharedTopicFilter;
1919
import javasabr.mqtt.model.topic.TopicFilter;
2020
import javasabr.mqtt.model.topic.TopicName;
21-
import javasabr.mqtt.service.PublishDeliveringService;
21+
import javasabr.mqtt.service.RetainMessageService;
2222
import javasabr.mqtt.service.SubscriptionService;
2323
import javasabr.mqtt.service.publish.handler.PublishHandlingResult;
2424
import javasabr.rlib.collections.array.Array;
@@ -35,12 +35,12 @@
3535
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
3636
public class InMemorySubscriptionService implements SubscriptionService {
3737

38-
PublishDeliveringService publishDeliveringService;
38+
RetainMessageService retainMessageService;
3939
ConcurrentSubscriberTree subscriberTree;
4040

41-
public InMemorySubscriptionService(PublishDeliveringService publishDeliveringService) {
41+
public InMemorySubscriptionService(RetainMessageService retainMessageService) {
4242
this.subscriberTree = new ConcurrentSubscriberTree();
43-
this.publishDeliveringService = publishDeliveringService;
43+
this.retainMessageService = retainMessageService;
4444
}
4545

4646
@Override
@@ -147,7 +147,7 @@ private void sendRetainedMessages(MqttUser user, Subscription subscription) {
147147
String clientId = user.clientId();
148148
PublishHandlingResult errorResult = null;
149149
SingleSubscriber singleSubscriber = new SingleSubscriber(user, subscription);
150-
var results = publishDeliveringService.deliverRetainedMessages(singleSubscriber);
150+
var results = retainMessageService.deliverRetainedMessages(singleSubscriber);
151151
for (PublishHandlingResult result : results) {
152152
if (result.error()) {
153153
errorResult = result;

core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/AbstractMqttPublishInMessageHandler.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import javasabr.mqtt.network.user.NetworkMqttUser;
1111
import javasabr.mqtt.service.MessageOutFactoryService;
1212
import javasabr.mqtt.service.PublishDeliveringService;
13+
import javasabr.mqtt.service.RetainMessageService;
1314
import javasabr.mqtt.service.SubscriptionService;
1415
import javasabr.mqtt.service.publish.handler.MqttPublishInMessageHandler;
1516
import javasabr.mqtt.service.publish.handler.PublishHandlingResult;
@@ -29,6 +30,7 @@ public abstract class AbstractMqttPublishInMessageHandler<U extends NetworkMqttU
2930
SubscriptionService subscriptionService;
3031
PublishDeliveringService publishDeliveringService;
3132
MessageOutFactoryService messageOutFactoryService;
33+
RetainMessageService retainMessageService;
3234

3335
@Override
3436
public void handle(NetworkMqttUser user, Publish publish) {
@@ -103,6 +105,7 @@ protected PublishHandlingResult checkSubscriber(
103105
}
104106

105107
protected void startDelivering(Publish publish, SingleSubscriber subscriber) {
108+
retainMessageService.retainMessage(publish, subscriber.subscription());
106109
publishDeliveringService.startDelivering(publish, subscriber);
107110
}
108111

core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos0MqttPublishInMessageHandler.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,22 @@
1010
import javasabr.mqtt.network.session.NetworkMqttSession;
1111
import javasabr.mqtt.service.MessageOutFactoryService;
1212
import javasabr.mqtt.service.PublishDeliveringService;
13+
import javasabr.mqtt.service.RetainMessageService;
1314
import javasabr.mqtt.service.SubscriptionService;
1415

1516
public class Qos0MqttPublishInMessageHandler extends AbstractMqttPublishInMessageHandler<ExternalNetworkMqttUser> {
1617

1718
public Qos0MqttPublishInMessageHandler(
1819
SubscriptionService subscriptionService,
1920
PublishDeliveringService publishDeliveringService,
20-
MessageOutFactoryService messageOutFactoryService) {
21-
super(ExternalNetworkMqttUser.class, subscriptionService, publishDeliveringService, messageOutFactoryService);
21+
MessageOutFactoryService messageOutFactoryService,
22+
RetainMessageService retainMessageService) {
23+
super(
24+
ExternalNetworkMqttUser.class,
25+
subscriptionService,
26+
publishDeliveringService,
27+
messageOutFactoryService,
28+
retainMessageService);
2229
}
2330

2431
@Override

core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos1MqttPublishInMessageHandler.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import javasabr.mqtt.network.session.NetworkMqttSession;
1212
import javasabr.mqtt.service.MessageOutFactoryService;
1313
import javasabr.mqtt.service.PublishDeliveringService;
14+
import javasabr.mqtt.service.RetainMessageService;
1415
import javasabr.mqtt.service.SubscriptionService;
1516
import javasabr.mqtt.service.publish.handler.PublishHandlingResult;
1617
import lombok.AccessLevel;
@@ -24,8 +25,9 @@ public class Qos1MqttPublishInMessageHandler extends TrackableMqttPublishInMessa
2425
public Qos1MqttPublishInMessageHandler(
2526
SubscriptionService subscriptionService,
2627
PublishDeliveringService publishDeliveringService,
27-
MessageOutFactoryService messageOutFactoryService) {
28-
super(ExternalNetworkMqttUser.class, subscriptionService, publishDeliveringService, messageOutFactoryService);
28+
MessageOutFactoryService messageOutFactoryService,
29+
RetainMessageService retainMessageService) {
30+
super(ExternalNetworkMqttUser.class, subscriptionService, publishDeliveringService, messageOutFactoryService, retainMessageService);
2931
}
3032

3133
@Override

0 commit comments

Comments
 (0)