Skip to content

Commit f406600

Browse files
authored
Improve publish delivering to subscribers, part 1 (#68)
1 parent c73dc2a commit f406600

File tree

48 files changed

+536
-384
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+536
-384
lines changed

acl-groovy-dsl/src/main/groovy/javasabr/mqtt/service/acl/AclRulesLoader.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package javasabr.mqtt.service.acl
22

3-
import groovy.transform.Field
3+
44
import javasabr.mqtt.model.acl.Operation
55
import javasabr.mqtt.model.acl.rule.Rule
66
import javasabr.mqtt.model.exception.AclConfigurationException

acl-groovy-dsl/src/main/java/javasabr/mqtt/model/acl/rule/AllowSubscribeRule.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,10 @@
33
import static javasabr.mqtt.model.acl.Action.ALLOW;
44
import static javasabr.mqtt.model.acl.Operation.SUBSCRIBE;
55

6-
import javasabr.mqtt.model.MqttUser;
76
import javasabr.mqtt.model.acl.Action;
87
import javasabr.mqtt.model.acl.Operation;
98
import javasabr.mqtt.model.acl.condition.MqttUserCondition;
109
import javasabr.mqtt.model.acl.condition.TopicCondition;
11-
import javasabr.mqtt.model.topic.AbstractTopic;
1210
import lombok.EqualsAndHashCode;
1311

1412
@EqualsAndHashCode(callSuper = true)

acl-groovy-dsl/src/main/java/javasabr/mqtt/model/acl/rule/DenyPublishRule.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,10 @@
33
import static javasabr.mqtt.model.acl.Action.DENY;
44
import static javasabr.mqtt.model.acl.Operation.PUBLISH;
55

6-
import javasabr.mqtt.model.MqttUser;
76
import javasabr.mqtt.model.acl.Action;
87
import javasabr.mqtt.model.acl.Operation;
98
import javasabr.mqtt.model.acl.condition.MqttUserCondition;
109
import javasabr.mqtt.model.acl.condition.TopicCondition;
11-
import javasabr.mqtt.model.topic.AbstractTopic;
1210
import lombok.EqualsAndHashCode;
1311

1412
@EqualsAndHashCode(callSuper = true)

acl-groovy-dsl/src/main/java/javasabr/mqtt/model/acl/rule/DenySubscribeRule.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,10 @@
33
import static javasabr.mqtt.model.acl.Action.DENY;
44
import static javasabr.mqtt.model.acl.Operation.SUBSCRIBE;
55

6-
import javasabr.mqtt.model.MqttUser;
76
import javasabr.mqtt.model.acl.Action;
87
import javasabr.mqtt.model.acl.Operation;
98
import javasabr.mqtt.model.acl.condition.MqttUserCondition;
109
import javasabr.mqtt.model.acl.condition.TopicCondition;
11-
import javasabr.mqtt.model.topic.AbstractTopic;
1210
import lombok.EqualsAndHashCode;
1311

1412
@EqualsAndHashCode(callSuper = true)

application/src/main/java/javasabr/mqtt/broker/application/config/MqttBrokerSpringConfig.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ MqttServerConnectionConfig externalConnectionConfig(Environment env) {
303303
env.getProperty(
304304
"mqtt.external.connection.topic.alias.maximum",
305305
int.class,
306-
MqttProperties.TOPIC_ALIAS_MAX_DEFAULT),
306+
0),
307307
env.getProperty(
308308
"mqtt.external.connection.default.session.expiration.time",
309309
long.class,
@@ -319,15 +319,15 @@ MqttServerConnectionConfig externalConnectionConfig(Environment env) {
319319
env.getProperty(
320320
"mqtt.external.connection.retain.available",
321321
boolean.class,
322-
MqttProperties.RETAIN_AVAILABLE_DEFAULT),
322+
false), // set false because currently it's not implemented and we should not allow for clients to use it
323323
env.getProperty(
324324
"mqtt.external.connection.wildcard.subscription.available",
325325
boolean.class,
326326
MqttProperties.WILDCARD_SUBSCRIPTION_AVAILABLE_DEFAULT),
327327
env.getProperty(
328328
"mqtt.external.connection.subscription.id.available",
329329
boolean.class,
330-
MqttProperties.SUBSCRIPTION_IDENTIFIER_AVAILABLE_DEFAULT),
330+
false), // set false because currently it's not implemented and we should not allow for clients to use it
331331
env.getProperty(
332332
"mqtt.external.connection.shared.subscription.available",
333333
boolean.class,
Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,21 @@
11
package javasabr.mqtt.service;
22

3+
import javasabr.mqtt.model.MqttUser;
34
import javasabr.mqtt.model.reason.code.SubscribeAckReasonCode;
45
import javasabr.mqtt.model.reason.code.UnsubscribeAckReasonCode;
56
import javasabr.mqtt.model.session.MqttSession;
67
import javasabr.mqtt.model.subscriber.SingleSubscriber;
7-
import javasabr.mqtt.model.subscriber.Subscriber;
88
import javasabr.mqtt.model.subscription.Subscription;
99
import javasabr.mqtt.model.topic.TopicFilter;
1010
import javasabr.mqtt.model.topic.TopicName;
11-
import javasabr.mqtt.network.user.NetworkMqttUser;
1211
import javasabr.rlib.collections.array.Array;
1312
import javasabr.rlib.collections.array.MutableArray;
1413

1514
/**
1615
* Subscription service
1716
*/
1817
public interface SubscriptionService {
19-
20-
NetworkMqttUser resolveClient(Subscriber subscriber);
21-
18+
2219
default Array<SingleSubscriber> findSubscribers(TopicName topicName) {
2320
return findSubscribersTo(MutableArray.ofType(SingleSubscriber.class), topicName);
2421
}
@@ -32,7 +29,7 @@ default Array<SingleSubscriber> findSubscribers(TopicName topicName) {
3229
* @param subscriptions the list of request to subscribe topics
3330
* @return array of subscribe ack reason codes
3431
*/
35-
Array<SubscribeAckReasonCode> subscribe(NetworkMqttUser user, MqttSession session, Array<Subscription> subscriptions);
32+
Array<SubscribeAckReasonCode> subscribe(MqttUser user, MqttSession session, Array<Subscription> subscriptions);
3633

3734
/**
3835
* Removes MQTT client from listening to the topics.
@@ -41,9 +38,9 @@ default Array<SingleSubscriber> findSubscribers(TopicName topicName) {
4138
* @param topicFilters topic filters
4239
* @return array of unsubscribe ack reason codes
4340
*/
44-
Array<UnsubscribeAckReasonCode> unsubscribe(NetworkMqttUser user, MqttSession session, Array<TopicFilter> topicFilters);
41+
Array<UnsubscribeAckReasonCode> unsubscribe(MqttUser user, MqttSession session, Array<TopicFilter> topicFilters);
4542

46-
void cleanSubscriptions(NetworkMqttUser user, MqttSession session);
43+
void cleanSubscriptions(MqttUser user, MqttSession session);
4744

48-
void restoreSubscriptions(NetworkMqttUser user, MqttSession session);
45+
void restoreSubscriptions(MqttUser user, MqttSession session);
4946
}

core-service/src/main/java/javasabr/mqtt/service/impl/InMemorySubscriptionService.java

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,17 @@
44
import static javasabr.mqtt.model.reason.code.UnsubscribeAckReasonCode.SUCCESS;
55

66
import javasabr.mqtt.model.MqttClientConnectionConfig;
7+
import javasabr.mqtt.model.MqttUser;
78
import javasabr.mqtt.model.reason.code.SubscribeAckReasonCode;
89
import javasabr.mqtt.model.reason.code.UnsubscribeAckReasonCode;
910
import javasabr.mqtt.model.session.ActiveSubscriptions;
1011
import javasabr.mqtt.model.session.MqttSession;
1112
import javasabr.mqtt.model.subscriber.SingleSubscriber;
12-
import javasabr.mqtt.model.subscriber.Subscriber;
1313
import javasabr.mqtt.model.subscriber.tree.ConcurrentSubscriberTree;
1414
import javasabr.mqtt.model.subscription.Subscription;
1515
import javasabr.mqtt.model.topic.SharedTopicFilter;
1616
import javasabr.mqtt.model.topic.TopicFilter;
1717
import javasabr.mqtt.model.topic.TopicName;
18-
import javasabr.mqtt.network.user.NetworkMqttUser;
1918
import javasabr.mqtt.service.SubscriptionService;
2019
import javasabr.rlib.collections.array.Array;
2120
import javasabr.rlib.collections.array.ArrayFactory;
@@ -37,14 +36,6 @@ public InMemorySubscriptionService() {
3736
this.subscriberTree = new ConcurrentSubscriberTree();
3837
}
3938

40-
@Override
41-
public NetworkMqttUser resolveClient(Subscriber subscriber) {
42-
if (subscriber instanceof SingleSubscriber single) {
43-
return (NetworkMqttUser) single.user();
44-
}
45-
throw new IllegalArgumentException("Unexpected subscriber: " + subscriber);
46-
}
47-
4839
@Override
4940
public Array<SingleSubscriber> findSubscribersTo(MutableArray<SingleSubscriber> container, TopicName topicName) {
5041
Array<SingleSubscriber> matched = subscriberTree.matches(topicName);
@@ -54,7 +45,7 @@ public Array<SingleSubscriber> findSubscribersTo(MutableArray<SingleSubscriber>
5445

5546
@Override
5647
public Array<SubscribeAckReasonCode> subscribe(
57-
NetworkMqttUser user,
48+
MqttUser user,
5849
MqttSession session,
5950
Array<Subscription> subscriptions) {
6051

@@ -69,7 +60,7 @@ public Array<SubscribeAckReasonCode> subscribe(
6960
return subscribeResults;
7061
}
7162

72-
private SubscribeAckReasonCode addSubscription(NetworkMqttUser user, MqttSession session, Subscription subscription) {
63+
private SubscribeAckReasonCode addSubscription(MqttUser user, MqttSession session, Subscription subscription) {
7364
MqttClientConnectionConfig connectionConfig = user.connectionConfig();
7465
TopicFilter topicFilter = subscription.topicFilter();
7566
if (topicFilter.isInvalid()) {
@@ -90,7 +81,7 @@ private SubscribeAckReasonCode addSubscription(NetworkMqttUser user, MqttSession
9081

9182
@Override
9283
public Array<UnsubscribeAckReasonCode> unsubscribe(
93-
NetworkMqttUser user,
84+
MqttUser user,
9485
MqttSession session,
9586
Array<TopicFilter> topicFilters) {
9687

@@ -105,7 +96,7 @@ public Array<UnsubscribeAckReasonCode> unsubscribe(
10596
return unsubscribeResults;
10697
}
10798

108-
private UnsubscribeAckReasonCode removeSubscription(NetworkMqttUser user, MqttSession session, TopicFilter topicFilter) {
99+
private UnsubscribeAckReasonCode removeSubscription(MqttUser user, MqttSession session, TopicFilter topicFilter) {
109100
if (topicFilter.isInvalid()) {
110101
return UnsubscribeAckReasonCode.TOPIC_FILTER_INVALID;
111102
} else if (subscriberTree.unsubscribe(user, topicFilter)) {
@@ -119,7 +110,7 @@ private UnsubscribeAckReasonCode removeSubscription(NetworkMqttUser user, MqttSe
119110
}
120111

121112
@Override
122-
public void cleanSubscriptions(NetworkMqttUser user, MqttSession session) {
113+
public void cleanSubscriptions(MqttUser user, MqttSession session) {
123114
Array<Subscription> subscriptions = session
124115
.activeSubscriptions()
125116
.subscriptions();
@@ -129,7 +120,7 @@ public void cleanSubscriptions(NetworkMqttUser user, MqttSession session) {
129120
}
130121

131122
@Override
132-
public void restoreSubscriptions(NetworkMqttUser user, MqttSession session) {
123+
public void restoreSubscriptions(MqttUser user, MqttSession session) {
133124
Array<Subscription> subscriptions = session
134125
.activeSubscriptions()
135126
.subscriptions();

core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/AbstractMqttInMessageHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ protected void malformedProtocolError(MqttConnection connection, U user, Excepti
111111
MqttOutMessage feedback = messageOutFactoryService
112112
.resolveFactory(user)
113113
.newDisconnect(user, DisconnectReasonCode.MALFORMED_PACKET, exception.getMessage());
114-
user.send(feedback)
114+
user.sendAsync(feedback)
115115
.thenAccept(_ -> connection.close());
116116
}
117117

core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/ConnectInMqttInMessageHandler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ protected void processValidMessage(
8585
}
8686

8787
private void reject(ExternalNetworkMqttUser user, ConnectAckReasonCode connectAckReasonCode) {
88-
user.sendAsync(messageOutFactoryService
88+
user.sendInBackground(messageOutFactoryService
8989
.resolveFactory(user)
9090
.newConnectAck(user, connectAckReasonCode));
9191
}
@@ -209,7 +209,7 @@ private Mono<Boolean> onConnected(
209209
subscriptionService.restoreSubscriptions(user, session);
210210

211211
return Mono.fromFuture(user
212-
.send(connectAck)
212+
.sendAsync(connectAck)
213213
.thenApply(result -> onSentConnAck(user, session, result)));
214214
}
215215

@@ -220,7 +220,7 @@ private boolean onSentConnAck(ConfigurableNetworkMqttUser user, NetworkMqttSessi
220220
return false;
221221
}
222222

223-
session.resendPendingPackets(user);
223+
session.resendNotConfirmedPublishesTo(user);
224224
return true;
225225
}
226226

core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/PendingOutResponseMqttInMessageHandler.java renamed to core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/ProcessingOutPublishesMqttInMessageHandler.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,18 @@
11
package javasabr.mqtt.service.message.handler.impl;
22

33
import javasabr.mqtt.model.message.TrackableMqttMessage;
4+
import javasabr.mqtt.model.session.ProcessingPublishes;
45
import javasabr.mqtt.network.MqttConnection;
56
import javasabr.mqtt.network.impl.ExternalNetworkMqttUser;
67
import javasabr.mqtt.network.message.in.MqttInMessage;
78
import javasabr.mqtt.network.session.NetworkMqttSession;
89
import javasabr.mqtt.service.MessageOutFactoryService;
910

10-
public abstract class PendingOutResponseMqttInMessageHandler<P extends MqttInMessage & TrackableMqttMessage>
11-
extends AbstractMqttInMessageHandler<ExternalNetworkMqttUser, P> {
11+
public abstract class ProcessingOutPublishesMqttInMessageHandler<M extends MqttInMessage & TrackableMqttMessage>
12+
extends AbstractMqttInMessageHandler<ExternalNetworkMqttUser, M> {
1213

13-
protected PendingOutResponseMqttInMessageHandler(
14-
Class<P> expectedNetworkPacket,
14+
protected ProcessingOutPublishesMqttInMessageHandler(
15+
Class<M> expectedNetworkPacket,
1516
MessageOutFactoryService messageOutFactoryService) {
1617
super(ExternalNetworkMqttUser.class, expectedNetworkPacket, messageOutFactoryService);
1718
}
@@ -21,7 +22,8 @@ protected void processValidMessage(
2122
MqttConnection connection,
2223
ExternalNetworkMqttUser user,
2324
NetworkMqttSession session,
24-
P message) {
25-
session.updateOutPendingPacket(user, message);
25+
M message) {
26+
ProcessingPublishes processingPublishes = session.outProcessingPublishes();
27+
processingPublishes.apply(user, message);
2628
}
2729
}

0 commit comments

Comments
 (0)