Skip to content

Commit 653f1a1

Browse files
authored
Finalize extracting some API from network module to the model (#65)
1 parent 2770f2a commit 653f1a1

File tree

41 files changed

+240
-201
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+240
-201
lines changed

application/src/test/groovy/javasabr/mqtt/broker/application/service/MqttNetworkSessionServiceTest.groovy renamed to application/src/test/groovy/javasabr/mqtt/broker/application/service/NetworkMqttSessionServiceTest.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import javasabr.mqtt.service.ClientIdRegistry
66
import javasabr.mqtt.service.session.MqttSessionService
77
import org.springframework.beans.factory.annotation.Autowired
88

9-
class MqttNetworkSessionServiceTest extends IntegrationSpecification {
9+
class NetworkMqttSessionServiceTest extends IntegrationSpecification {
1010

1111
@Autowired
1212
ClientIdRegistry clientIdRegistry

core-service/src/main/java/javasabr/mqtt/service/SubscriptionService.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@
22

33
import javasabr.mqtt.model.reason.code.SubscribeAckReasonCode;
44
import javasabr.mqtt.model.reason.code.UnsubscribeAckReasonCode;
5+
import javasabr.mqtt.model.session.MqttSession;
56
import javasabr.mqtt.model.subscriber.SingleSubscriber;
67
import javasabr.mqtt.model.subscriber.Subscriber;
78
import javasabr.mqtt.model.subscription.Subscription;
89
import javasabr.mqtt.model.topic.TopicFilter;
910
import javasabr.mqtt.model.topic.TopicName;
10-
import javasabr.mqtt.network.MqttNetworkSession;
1111
import javasabr.mqtt.network.user.NetworkMqttUser;
1212
import javasabr.rlib.collections.array.Array;
1313
import javasabr.rlib.collections.array.MutableArray;
@@ -32,7 +32,7 @@ default Array<SingleSubscriber> findSubscribers(TopicName topicName) {
3232
* @param subscriptions the list of request to subscribe topics
3333
* @return array of subscribe ack reason codes
3434
*/
35-
Array<SubscribeAckReasonCode> subscribe(NetworkMqttUser user, MqttNetworkSession session, Array<Subscription> subscriptions);
35+
Array<SubscribeAckReasonCode> subscribe(NetworkMqttUser user, MqttSession session, Array<Subscription> subscriptions);
3636

3737
/**
3838
* Removes MQTT client from listening to the topics.
@@ -41,9 +41,9 @@ default Array<SingleSubscriber> findSubscribers(TopicName topicName) {
4141
* @param topicFilters topic filters
4242
* @return array of unsubscribe ack reason codes
4343
*/
44-
Array<UnsubscribeAckReasonCode> unsubscribe(NetworkMqttUser user, MqttNetworkSession session, Array<TopicFilter> topicFilters);
44+
Array<UnsubscribeAckReasonCode> unsubscribe(NetworkMqttUser user, MqttSession session, Array<TopicFilter> topicFilters);
4545

46-
void cleanSubscriptions(NetworkMqttUser user, MqttNetworkSession session);
46+
void cleanSubscriptions(NetworkMqttUser user, MqttSession session);
4747

48-
void restoreSubscriptions(NetworkMqttUser user, MqttNetworkSession session);
48+
void restoreSubscriptions(NetworkMqttUser user, MqttSession session);
4949
}

core-service/src/main/java/javasabr/mqtt/service/handler/client/AbstractNetworkMqttUserReleaseHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
package javasabr.mqtt.service.handler.client;
22

33
import javasabr.mqtt.model.MqttClientConnectionConfig;
4-
import javasabr.mqtt.network.MqttNetworkSession;
54
import javasabr.mqtt.network.handler.NetworkMqttUserReleaseHandler;
65
import javasabr.mqtt.network.impl.AbstractNetworkMqttUser;
6+
import javasabr.mqtt.network.session.NetworkMqttSession;
77
import javasabr.mqtt.network.user.ConfigurableNetworkMqttUser;
88
import javasabr.mqtt.service.ClientIdRegistry;
99
import javasabr.mqtt.service.SubscriptionService;
@@ -43,7 +43,7 @@ protected Mono<?> releaseImpl(T user) {
4343
return Mono.empty();
4444
}
4545

46-
MqttNetworkSession session = user.session();
46+
NetworkMqttSession session = user.session();
4747
Mono<?> asyncActions = null;
4848

4949
if (session != null) {

core-service/src/main/java/javasabr/mqtt/service/impl/DefaultConnectionService.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ public DefaultConnectionService(Class<? extends NetworkMqttUser> expectedUserTyp
3535
var inMessageHandlers = new MqttInMessageHandler[highestPacketType + 1];
3636

3737
for (MqttInMessageHandler knownInMessageHandler : knownInMessageHandlers) {
38-
Class<? extends NetworkMqttUser> clientType = knownInMessageHandler.expectedUserType();
39-
if (!expectedUserType.isAssignableFrom(clientType)) {
38+
Class<? extends NetworkMqttUser> userType = knownInMessageHandler.expectedUserType();
39+
if (!expectedUserType.isAssignableFrom(userType)) {
4040
continue;
4141
}
4242
MqttMessageType messageType = knownInMessageHandler.messageType();
@@ -106,7 +106,7 @@ protected void processReceivedInvalidMessage(
106106
}
107107

108108
private static String buildServiceDescription(
109-
Class<? extends NetworkMqttUser> expectedClientType,
109+
Class<? extends NetworkMqttUser> expectedUserType,
110110
@Nullable MqttInMessageHandler[] inMessageHandlers) {
111111
var builder = new StringBuilder();
112112
builder.append("{\n");
@@ -131,6 +131,6 @@ private static String buildServiceDescription(
131131
.append("\n}");
132132

133133
return "Registered [%d] for [%s] MqttInMessageHandlers: %s"
134-
.formatted(count, expectedClientType.getSimpleName(), builder);
134+
.formatted(count, expectedUserType.getSimpleName(), builder);
135135
}
136136
}

core-service/src/main/java/javasabr/mqtt/service/impl/DefaultMessageOutFactoryService.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import javasabr.mqtt.model.MqttClientConnectionConfig;
55
import javasabr.mqtt.model.MqttVersion;
66
import javasabr.mqtt.network.MqttConnection;
7-
import javasabr.mqtt.network.user.ConfigurableNetworkMqttUser;
87
import javasabr.mqtt.network.user.NetworkMqttUser;
98
import javasabr.mqtt.service.MessageOutFactoryService;
109
import javasabr.mqtt.service.message.out.factory.MqttMessageOutFactory;
@@ -45,10 +44,7 @@ public DefaultMessageOutFactoryService(Collection<? extends MqttMessageOutFactor
4544

4645
@Override
4746
public MqttMessageOutFactory resolveFactory(NetworkMqttUser user) {
48-
if (user instanceof ConfigurableNetworkMqttUser configurableUser) {
49-
return resolveFactory(configurableUser.connection());
50-
}
51-
throw new IllegalArgumentException("Unsupported user: " + user);
47+
return resolveFactory(user.connection());
5248
}
5349

5450
@Override

core-service/src/main/java/javasabr/mqtt/service/impl/InMemorySubscriptionService.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,14 @@
77
import javasabr.mqtt.model.reason.code.SubscribeAckReasonCode;
88
import javasabr.mqtt.model.reason.code.UnsubscribeAckReasonCode;
99
import javasabr.mqtt.model.session.ActiveSubscriptions;
10+
import javasabr.mqtt.model.session.MqttSession;
1011
import javasabr.mqtt.model.subscriber.SingleSubscriber;
1112
import javasabr.mqtt.model.subscriber.Subscriber;
1213
import javasabr.mqtt.model.subscriber.tree.ConcurrentSubscriberTree;
1314
import javasabr.mqtt.model.subscription.Subscription;
1415
import javasabr.mqtt.model.topic.SharedTopicFilter;
1516
import javasabr.mqtt.model.topic.TopicFilter;
1617
import javasabr.mqtt.model.topic.TopicName;
17-
import javasabr.mqtt.network.MqttNetworkSession;
1818
import javasabr.mqtt.network.user.NetworkMqttUser;
1919
import javasabr.mqtt.service.SubscriptionService;
2020
import javasabr.rlib.collections.array.Array;
@@ -55,7 +55,7 @@ public Array<SingleSubscriber> findSubscribersTo(MutableArray<SingleSubscriber>
5555
@Override
5656
public Array<SubscribeAckReasonCode> subscribe(
5757
NetworkMqttUser user,
58-
MqttNetworkSession session,
58+
MqttSession session,
5959
Array<Subscription> subscriptions) {
6060

6161
MutableArray<SubscribeAckReasonCode> subscribeResults = ArrayFactory.mutableArray(
@@ -69,7 +69,7 @@ public Array<SubscribeAckReasonCode> subscribe(
6969
return subscribeResults;
7070
}
7171

72-
private SubscribeAckReasonCode addSubscription(NetworkMqttUser user, MqttNetworkSession session, Subscription subscription) {
72+
private SubscribeAckReasonCode addSubscription(NetworkMqttUser user, MqttSession session, Subscription subscription) {
7373
MqttClientConnectionConfig connectionConfig = user.connectionConfig();
7474
TopicFilter topicFilter = subscription.topicFilter();
7575
if (topicFilter.isInvalid()) {
@@ -91,7 +91,7 @@ private SubscribeAckReasonCode addSubscription(NetworkMqttUser user, MqttNetwork
9191
@Override
9292
public Array<UnsubscribeAckReasonCode> unsubscribe(
9393
NetworkMqttUser user,
94-
MqttNetworkSession session,
94+
MqttSession session,
9595
Array<TopicFilter> topicFilters) {
9696

9797
MutableArray<UnsubscribeAckReasonCode> unsubscribeResults = ArrayFactory.mutableArray(
@@ -105,7 +105,7 @@ public Array<UnsubscribeAckReasonCode> unsubscribe(
105105
return unsubscribeResults;
106106
}
107107

108-
private UnsubscribeAckReasonCode removeSubscription(NetworkMqttUser user, MqttNetworkSession session, TopicFilter topicFilter) {
108+
private UnsubscribeAckReasonCode removeSubscription(NetworkMqttUser user, MqttSession session, TopicFilter topicFilter) {
109109
if (topicFilter.isInvalid()) {
110110
return UnsubscribeAckReasonCode.TOPIC_FILTER_INVALID;
111111
} else if (subscriberTree.unsubscribe(user, topicFilter)) {
@@ -119,7 +119,7 @@ private UnsubscribeAckReasonCode removeSubscription(NetworkMqttUser user, MqttNe
119119
}
120120

121121
@Override
122-
public void cleanSubscriptions(NetworkMqttUser user, MqttNetworkSession session) {
122+
public void cleanSubscriptions(NetworkMqttUser user, MqttSession session) {
123123
Array<Subscription> subscriptions = session
124124
.activeSubscriptions()
125125
.subscriptions();
@@ -129,7 +129,7 @@ public void cleanSubscriptions(NetworkMqttUser user, MqttNetworkSession session)
129129
}
130130

131131
@Override
132-
public void restoreSubscriptions(NetworkMqttUser user, MqttNetworkSession session) {
132+
public void restoreSubscriptions(NetworkMqttUser user, MqttSession session) {
133133
Array<Subscription> subscriptions = session
134134
.activeSubscriptions()
135135
.subscriptions();

core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/AbstractMqttInMessageHandler.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@
33
import javasabr.mqtt.model.exception.MalformedProtocolMqttException;
44
import javasabr.mqtt.model.reason.code.DisconnectReasonCode;
55
import javasabr.mqtt.network.MqttConnection;
6-
import javasabr.mqtt.network.MqttNetworkSession;
76
import javasabr.mqtt.network.message.in.MqttInMessage;
87
import javasabr.mqtt.network.message.out.MqttOutMessage;
8+
import javasabr.mqtt.network.session.NetworkMqttSession;
99
import javasabr.mqtt.network.user.NetworkMqttUser;
1010
import javasabr.mqtt.network.util.ExtraErrorReasons;
1111
import javasabr.mqtt.service.MessageOutFactoryService;
@@ -47,7 +47,7 @@ public void processValidMessage(MqttConnection connection, MqttInMessage mqttInM
4747
U castedUser = expectedUser.cast(user);
4848
M castedMessage = expectedMessage.cast(mqttInMessage);
4949
if (requireSession()) {
50-
MqttNetworkSession session = user.session();
50+
NetworkMqttSession session = user.session();
5151
if (session == null) {
5252
log.warning(user.clientId(), "[%s] Session is already closed"::formatted);
5353
handleSessionIsAlreadyClosed(user);
@@ -72,7 +72,7 @@ public void processInvalidMessage(MqttConnection connection, MqttInMessage mqttI
7272
U castedUser = expectedUser.cast(user);
7373
M castedMessage = expectedMessage.cast(mqttInMessage);
7474
if (requireSession()) {
75-
MqttNetworkSession session = user.session();
75+
NetworkMqttSession session = user.session();
7676
if (session == null) {
7777
log.warning(user.clientId(), "[%s] Session is already closed"::formatted);
7878
handleSessionIsAlreadyClosed(user);
@@ -86,7 +86,7 @@ public void processInvalidMessage(MqttConnection connection, MqttInMessage mqttI
8686

8787
protected void processValidMessage(MqttConnection connection, U user, M message) {}
8888

89-
protected void processValidMessage(MqttConnection connection, U user, MqttNetworkSession session, M message) {}
89+
protected void processValidMessage(MqttConnection connection, U user, NetworkMqttSession session, M message) {}
9090

9191
protected boolean processInvalidMessage(MqttConnection connection, U user, M message) {
9292
Exception exception = message.exception();
@@ -97,7 +97,7 @@ protected boolean processInvalidMessage(MqttConnection connection, U user, M mes
9797
return false;
9898
}
9999

100-
protected boolean processInvalidMessage(MqttConnection connection, U user, MqttNetworkSession session, M message) {
100+
protected boolean processInvalidMessage(MqttConnection connection, U user, NetworkMqttSession session, M message) {
101101
Exception exception = message.exception();
102102
if (exception instanceof MalformedProtocolMqttException) {
103103
malformedProtocolError(connection, user, exception);
@@ -111,7 +111,7 @@ protected void malformedProtocolError(MqttConnection connection, U user, Excepti
111111
MqttOutMessage feedback = messageOutFactoryService
112112
.resolveFactory(user)
113113
.newDisconnect(user, DisconnectReasonCode.MALFORMED_PACKET, exception.getMessage());
114-
user.sendWithFeedback(feedback)
114+
user.send(feedback)
115115
.thenAccept(_ -> connection.close());
116116
}
117117

core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/ConnectInMqttInMessageHandler.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@
1818
import javasabr.mqtt.model.message.MqttMessageType;
1919
import javasabr.mqtt.model.reason.code.ConnectAckReasonCode;
2020
import javasabr.mqtt.network.MqttConnection;
21-
import javasabr.mqtt.network.MqttNetworkSession;
2221
import javasabr.mqtt.network.impl.ExternalNetworkMqttUser;
2322
import javasabr.mqtt.network.message.in.ConnectMqttInMessage;
2423
import javasabr.mqtt.network.message.out.MqttOutMessage;
24+
import javasabr.mqtt.network.session.NetworkMqttSession;
2525
import javasabr.mqtt.network.user.ConfigurableNetworkMqttUser;
2626
import javasabr.mqtt.service.AuthenticationService;
2727
import javasabr.mqtt.service.ClientIdRegistry;
@@ -85,7 +85,7 @@ protected void processValidMessage(
8585
}
8686

8787
private void reject(ExternalNetworkMqttUser user, ConnectAckReasonCode connectAckReasonCode) {
88-
user.send(messageOutFactoryService
88+
user.sendAsync(messageOutFactoryService
8989
.resolveFactory(user)
9090
.newConnectAck(user, connectAckReasonCode));
9191
}
@@ -180,7 +180,7 @@ private void resolveClientConnectionConfig(ConfigurableNetworkMqttUser user, Con
180180
private Mono<Boolean> onConnected(
181181
ConfigurableNetworkMqttUser user,
182182
ConnectMqttInMessage message,
183-
MqttNetworkSession session,
183+
NetworkMqttSession session,
184184
boolean sessionRestored) {
185185

186186
MqttConnection connection = user.connection();
@@ -209,11 +209,11 @@ private Mono<Boolean> onConnected(
209209
subscriptionService.restoreSubscriptions(user, session);
210210

211211
return Mono.fromFuture(user
212-
.sendWithFeedback(connectAck)
212+
.send(connectAck)
213213
.thenApply(result -> onSentConnAck(user, session, result)));
214214
}
215215

216-
private boolean onSentConnAck(ConfigurableNetworkMqttUser user, MqttNetworkSession session, boolean result) {
216+
private boolean onSentConnAck(ConfigurableNetworkMqttUser user, NetworkMqttSession session, boolean result) {
217217

218218
if (!result) {
219219
log.warning(user.clientId(), "Was issue with sending conn ack packet to client:[%s]"::formatted);
@@ -228,7 +228,7 @@ private boolean onSentConnAck(ConfigurableNetworkMqttUser user, MqttNetworkSessi
228228
protected boolean processInvalidMessage(
229229
MqttConnection connection,
230230
ExternalNetworkMqttUser user,
231-
MqttNetworkSession session,
231+
NetworkMqttSession session,
232232
ConnectMqttInMessage message) {
233233
Exception exception = message.exception();
234234
if (exception instanceof ConnectionRejectException cre) {

core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/DisconnectMqttInMessageHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@
33
import javasabr.mqtt.model.message.MqttMessageType;
44
import javasabr.mqtt.model.reason.code.DisconnectReasonCode;
55
import javasabr.mqtt.network.MqttConnection;
6-
import javasabr.mqtt.network.MqttNetworkSession;
76
import javasabr.mqtt.network.impl.ExternalNetworkMqttUser;
87
import javasabr.mqtt.network.message.in.DisconnectMqttInMessage;
8+
import javasabr.mqtt.network.session.NetworkMqttSession;
99
import javasabr.mqtt.service.MessageOutFactoryService;
1010
import lombok.CustomLog;
1111

@@ -25,7 +25,7 @@ public MqttMessageType messageType() {
2525
protected void processValidMessage(
2626
MqttConnection connection,
2727
ExternalNetworkMqttUser user,
28-
MqttNetworkSession session,
28+
NetworkMqttSession session,
2929
DisconnectMqttInMessage message) {
3030
DisconnectReasonCode reasonCode = message.reasonCode();
3131
if (reasonCode == DisconnectReasonCode.NORMAL_DISCONNECTION) {

core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/PendingOutResponseMqttInMessageHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22

33
import javasabr.mqtt.model.message.TrackableMqttMessage;
44
import javasabr.mqtt.network.MqttConnection;
5-
import javasabr.mqtt.network.MqttNetworkSession;
65
import javasabr.mqtt.network.impl.ExternalNetworkMqttUser;
76
import javasabr.mqtt.network.message.in.MqttInMessage;
7+
import javasabr.mqtt.network.session.NetworkMqttSession;
88
import javasabr.mqtt.service.MessageOutFactoryService;
99

1010
public abstract class PendingOutResponseMqttInMessageHandler<P extends MqttInMessage & TrackableMqttMessage>
@@ -20,7 +20,7 @@ protected PendingOutResponseMqttInMessageHandler(
2020
protected void processValidMessage(
2121
MqttConnection connection,
2222
ExternalNetworkMqttUser user,
23-
MqttNetworkSession session,
23+
NetworkMqttSession session,
2424
P message) {
2525
session.updateOutPendingPacket(user, message);
2626
}

0 commit comments

Comments
 (0)