Skip to content

Commit 9466c81

Browse files
authored
Move more classes from network to model modules (#63)
1 parent ce7c413 commit 9466c81

File tree

84 files changed

+705
-556
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

84 files changed

+705
-556
lines changed

application/src/test/groovy/javasabr/mqtt/broker/application/service/MqttSessionServiceTest.groovy renamed to application/src/test/groovy/javasabr/mqtt/broker/application/service/MqttNetworkSessionServiceTest.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import javasabr.mqtt.service.ClientIdRegistry
66
import javasabr.mqtt.service.session.MqttSessionService
77
import org.springframework.beans.factory.annotation.Autowired
88

9-
class MqttSessionServiceTest extends IntegrationSpecification {
9+
class MqttNetworkSessionServiceTest extends IntegrationSpecification {
1010

1111
@Autowired
1212
ClientIdRegistry clientIdRegistry

application/src/test/groovy/javasabr/mqtt/broker/application/service/SubscribtionServiceTest.groovy renamed to application/src/test/groovy/javasabr/mqtt/broker/application/service/SubscriptionServiceTest.groovy

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import spock.lang.Unroll
1212

1313
import java.util.concurrent.CompletionException
1414

15-
class SubscribtionServiceTest extends IntegrationSpecification {
15+
class SubscriptionServiceTest extends IntegrationSpecification {
1616

1717
@Autowired
1818
ClientIdRegistry clientIdRegistry
@@ -38,11 +38,11 @@ class SubscribtionServiceTest extends IntegrationSpecification {
3838
.findSubscribers(topicName)
3939
then: "should find the subscriber"
4040
subscribers.size() == 1
41-
subscribers.get(0).owner() instanceof MqttClient
41+
subscribers.get(0).user() instanceof MqttClient
4242
when:
4343
def matchedSubscriber = subscribers.get(0)
4444
def subscription = matchedSubscriber.subscription()
45-
def owner = matchedSubscriber.owner() as MqttClient
45+
def owner = matchedSubscriber.user() as MqttClient
4646
then:
4747
owner.clientId() == clientId
4848
subscription.topicFilter().rawTopic() == topicFilter
@@ -61,11 +61,11 @@ class SubscribtionServiceTest extends IntegrationSpecification {
6161
.findSubscribers(topicName)
6262
then: "should find the reconnected subscriber"
6363
subscribers3.size() == 1
64-
subscribers3.get(0).owner() instanceof MqttClient
64+
subscribers3.get(0).user() instanceof MqttClient
6565
when:
6666
matchedSubscriber = subscribers3.get(0)
6767
subscription = matchedSubscriber.subscription()
68-
owner = matchedSubscriber.owner() as MqttClient
68+
owner = matchedSubscriber.user() as MqttClient
6969
then:
7070
owner.clientId() == clientId
7171
subscription.topicFilter().rawTopic() == topicFilter
@@ -147,8 +147,8 @@ class SubscribtionServiceTest extends IntegrationSpecification {
147147
def subscribers = subscriptionService.findSubscribers(TopicName.valueOf(topicName))
148148
then:
149149
subscribers.size() == targetCount
150-
(subscribers[0].owner() as MqttClient).clientId() == clientId1
151-
(subscribers[1].owner() as MqttClient).clientId() == clientId2
150+
(subscribers[0].user() as MqttClient).clientId() == clientId1
151+
(subscribers[1].user() as MqttClient).clientId() == clientId2
152152
cleanup:
153153
subscriber1.disconnect().join()
154154
subscriber2.disconnect().join()

core-service/src/main/java/javasabr/mqtt/service/SubscriptionService.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import javasabr.mqtt.model.topic.TopicFilter;
99
import javasabr.mqtt.model.topic.TopicName;
1010
import javasabr.mqtt.network.MqttClient;
11-
import javasabr.mqtt.network.session.MqttSession;
11+
import javasabr.mqtt.network.session.MqttNetworkSession;
1212
import javasabr.rlib.collections.array.Array;
1313
import javasabr.rlib.collections.array.MutableArray;
1414

@@ -32,7 +32,7 @@ default Array<SingleSubscriber> findSubscribers(TopicName topicName) {
3232
* @param subscriptions the list of request to subscribe topics
3333
* @return array of subscribe ack reason codes
3434
*/
35-
Array<SubscribeAckReasonCode> subscribe(MqttClient client, MqttSession session, Array<Subscription> subscriptions);
35+
Array<SubscribeAckReasonCode> subscribe(MqttClient client, MqttNetworkSession session, Array<Subscription> subscriptions);
3636

3737
/**
3838
* Removes MQTT client from listening to the topics.
@@ -41,9 +41,9 @@ default Array<SingleSubscriber> findSubscribers(TopicName topicName) {
4141
* @param topicFilters topic filters
4242
* @return array of unsubscribe ack reason codes
4343
*/
44-
Array<UnsubscribeAckReasonCode> unsubscribe(MqttClient client, MqttSession session, Array<TopicFilter> topicFilters);
44+
Array<UnsubscribeAckReasonCode> unsubscribe(MqttClient client, MqttNetworkSession session, Array<TopicFilter> topicFilters);
4545

46-
void cleanSubscriptions(MqttClient client, MqttSession session);
46+
void cleanSubscriptions(MqttClient client, MqttNetworkSession session);
4747

48-
void restoreSubscriptions(MqttClient client, MqttSession session);
48+
void restoreSubscriptions(MqttClient client, MqttNetworkSession session);
4949
}

core-service/src/main/java/javasabr/mqtt/service/handler/client/AbstractMqttClientReleaseHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import javasabr.mqtt.network.MqttClient.UnsafeMqttClient;
55
import javasabr.mqtt.network.handler.MqttClientReleaseHandler;
66
import javasabr.mqtt.network.impl.AbstractMqttClient;
7-
import javasabr.mqtt.network.session.MqttSession;
7+
import javasabr.mqtt.network.session.MqttNetworkSession;
88
import javasabr.mqtt.service.ClientIdRegistry;
99
import javasabr.mqtt.service.SubscriptionService;
1010
import javasabr.mqtt.service.session.MqttSessionService;
@@ -43,7 +43,7 @@ protected Mono<?> releaseImpl(T client) {
4343
return Mono.empty();
4444
}
4545

46-
MqttSession session = client.session();
46+
MqttNetworkSession session = client.session();
4747
Mono<?> asyncActions = null;
4848

4949
if (session != null) {

core-service/src/main/java/javasabr/mqtt/service/impl/DefaultConnectionService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ protected void processReceivedValidMessage(
7373
"[%s] Received from client valid message:[%s] %s"::formatted);
7474

7575
try {
76-
MqttInMessageHandler messageHandler = inMessageHandlers[mqttInMessage.messageType()];
76+
MqttInMessageHandler messageHandler = inMessageHandlers[mqttInMessage.messageTypeId()];
7777
//noinspection DataFlowIssue
7878
messageHandler.processValidMessage(connection, mqttInMessage);
7979
} catch (IndexOutOfBoundsException | NullPointerException ex) {
@@ -97,7 +97,7 @@ protected void processReceivedInvalidMessage(
9797
"[%s] Received from client invalid message:[%s] %s"::formatted);
9898

9999
try {
100-
MqttInMessageHandler messageHandler = inMessageHandlers[mqttInMessage.messageType()];
100+
MqttInMessageHandler messageHandler = inMessageHandlers[mqttInMessage.messageTypeId()];
101101
//noinspection DataFlowIssue
102102
messageHandler.processInvalidMessage(connection, mqttInMessage);
103103
} catch (IndexOutOfBoundsException | NullPointerException ex) {

core-service/src/main/java/javasabr/mqtt/service/impl/InMemorySubscriptionService.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import javasabr.mqtt.model.topic.TopicFilter;
1616
import javasabr.mqtt.model.topic.TopicName;
1717
import javasabr.mqtt.network.MqttClient;
18-
import javasabr.mqtt.network.session.MqttSession;
18+
import javasabr.mqtt.network.session.MqttNetworkSession;
1919
import javasabr.mqtt.service.SubscriptionService;
2020
import javasabr.rlib.collections.array.Array;
2121
import javasabr.rlib.collections.array.ArrayFactory;
@@ -40,7 +40,7 @@ public InMemorySubscriptionService() {
4040
@Override
4141
public MqttClient resolveClient(Subscriber subscriber) {
4242
if (subscriber instanceof SingleSubscriber single) {
43-
return (MqttClient) single.owner();
43+
return (MqttClient) single.user();
4444
}
4545
throw new IllegalArgumentException("Unexpected subscriber: " + subscriber);
4646
}
@@ -55,7 +55,7 @@ public Array<SingleSubscriber> findSubscribersTo(MutableArray<SingleSubscriber>
5555
@Override
5656
public Array<SubscribeAckReasonCode> subscribe(
5757
MqttClient client,
58-
MqttSession session,
58+
MqttNetworkSession session,
5959
Array<Subscription> subscriptions) {
6060

6161
MutableArray<SubscribeAckReasonCode> subscribeResults = ArrayFactory.mutableArray(
@@ -69,7 +69,7 @@ public Array<SubscribeAckReasonCode> subscribe(
6969
return subscribeResults;
7070
}
7171

72-
private SubscribeAckReasonCode addSubscription(MqttClient client, MqttSession session, Subscription subscription) {
72+
private SubscribeAckReasonCode addSubscription(MqttClient client, MqttNetworkSession session, Subscription subscription) {
7373
MqttClientConnectionConfig connectionConfig = client.connectionConfig();
7474
TopicFilter topicFilter = subscription.topicFilter();
7575
if (topicFilter.isInvalid()) {
@@ -91,7 +91,7 @@ private SubscribeAckReasonCode addSubscription(MqttClient client, MqttSession se
9191
@Override
9292
public Array<UnsubscribeAckReasonCode> unsubscribe(
9393
MqttClient client,
94-
MqttSession session,
94+
MqttNetworkSession session,
9595
Array<TopicFilter> topicFilters) {
9696

9797
MutableArray<UnsubscribeAckReasonCode> unsubscribeResults = ArrayFactory.mutableArray(
@@ -105,7 +105,7 @@ public Array<UnsubscribeAckReasonCode> unsubscribe(
105105
return unsubscribeResults;
106106
}
107107

108-
private UnsubscribeAckReasonCode removeSubscription(MqttClient client, MqttSession session, TopicFilter topicFilter) {
108+
private UnsubscribeAckReasonCode removeSubscription(MqttClient client, MqttNetworkSession session, TopicFilter topicFilter) {
109109
if (topicFilter.isInvalid()) {
110110
return UnsubscribeAckReasonCode.TOPIC_FILTER_INVALID;
111111
} else if (subscriberTree.unsubscribe(client, topicFilter)) {
@@ -119,7 +119,7 @@ private UnsubscribeAckReasonCode removeSubscription(MqttClient client, MqttSessi
119119
}
120120

121121
@Override
122-
public void cleanSubscriptions(MqttClient client, MqttSession session) {
122+
public void cleanSubscriptions(MqttClient client, MqttNetworkSession session) {
123123
Array<Subscription> subscriptions = session
124124
.activeSubscriptions()
125125
.subscriptions();
@@ -129,7 +129,7 @@ public void cleanSubscriptions(MqttClient client, MqttSession session) {
129129
}
130130

131131
@Override
132-
public void restoreSubscriptions(MqttClient client, MqttSession session) {
132+
public void restoreSubscriptions(MqttClient client, MqttNetworkSession session) {
133133
Array<Subscription> subscriptions = session
134134
.activeSubscriptions()
135135
.subscriptions();

core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/AbstractMqttInMessageHandler.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import javasabr.mqtt.network.MqttConnection;
77
import javasabr.mqtt.network.message.in.MqttInMessage;
88
import javasabr.mqtt.network.message.out.MqttOutMessage;
9-
import javasabr.mqtt.network.session.MqttSession;
9+
import javasabr.mqtt.network.session.MqttNetworkSession;
1010
import javasabr.mqtt.network.util.ExtraErrorReasons;
1111
import javasabr.mqtt.service.MessageOutFactoryService;
1212
import javasabr.mqtt.service.message.handler.MqttInMessageHandler;
@@ -47,7 +47,7 @@ public void processValidMessage(MqttConnection connection, MqttInMessage mqttInM
4747
C castedClient = expectedClient.cast(client);
4848
M castedMessage = expectedNetworkPacket.cast(mqttInMessage);
4949
if (requireSession()) {
50-
MqttSession session = client.session();
50+
MqttNetworkSession session = client.session();
5151
if (session == null) {
5252
log.warning(client.clientId(), "[%s] Session is already closed"::formatted);
5353
handleSessionIsAlreadyClosed(client);
@@ -72,7 +72,7 @@ public void processInvalidMessage(MqttConnection connection, MqttInMessage mqttI
7272
C castedClient = expectedClient.cast(client);
7373
M castedMessage = expectedNetworkPacket.cast(mqttInMessage);
7474
if (requireSession()) {
75-
MqttSession session = client.session();
75+
MqttNetworkSession session = client.session();
7676
if (session == null) {
7777
log.warning(client.clientId(), "[%s] Session is already closed"::formatted);
7878
handleSessionIsAlreadyClosed(client);
@@ -86,7 +86,7 @@ public void processInvalidMessage(MqttConnection connection, MqttInMessage mqttI
8686

8787
protected void processValidMessage(MqttConnection connection, C client, M message) {}
8888

89-
protected void processValidMessage(MqttConnection connection, C client, MqttSession session, M message) {}
89+
protected void processValidMessage(MqttConnection connection, C client, MqttNetworkSession session, M message) {}
9090

9191
protected boolean processInvalidMessage(MqttConnection connection, C client, M message) {
9292
Exception exception = message.exception();
@@ -97,7 +97,7 @@ protected boolean processInvalidMessage(MqttConnection connection, C client, M m
9797
return false;
9898
}
9999

100-
protected boolean processInvalidMessage(MqttConnection connection, C client, MqttSession session, M message) {
100+
protected boolean processInvalidMessage(MqttConnection connection, C client, MqttNetworkSession session, M message) {
101101
Exception exception = message.exception();
102102
if (exception instanceof MalformedProtocolMqttException) {
103103
malformedProtocolError(connection, client, exception);

core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/ConnectInMqttInMessageHandler.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import javasabr.mqtt.network.impl.ExternalMqttClient;
2323
import javasabr.mqtt.network.message.in.ConnectMqttInMessage;
2424
import javasabr.mqtt.network.message.out.MqttOutMessage;
25-
import javasabr.mqtt.network.session.MqttSession;
25+
import javasabr.mqtt.network.session.MqttNetworkSession;
2626
import javasabr.mqtt.service.AuthenticationService;
2727
import javasabr.mqtt.service.ClientIdRegistry;
2828
import javasabr.mqtt.service.MessageOutFactoryService;
@@ -178,7 +178,7 @@ private void resolveClientConnectionConfig(MqttClient.UnsafeMqttClient client, C
178178
private Mono<Boolean> onConnected(
179179
MqttClient.UnsafeMqttClient client,
180180
ConnectMqttInMessage packet,
181-
MqttSession session,
181+
MqttNetworkSession session,
182182
boolean sessionRestored) {
183183

184184
MqttConnection connection = client.connection();
@@ -211,7 +211,7 @@ private Mono<Boolean> onConnected(
211211
.thenApply(result -> onSentConnAck(client, session, result)));
212212
}
213213

214-
private boolean onSentConnAck(MqttClient.UnsafeMqttClient client, MqttSession session, boolean result) {
214+
private boolean onSentConnAck(MqttClient.UnsafeMqttClient client, MqttNetworkSession session, boolean result) {
215215

216216
if (!result) {
217217
log.warning(client.clientId(), "Was issue with sending conn ack packet to client:[%s]"::formatted);
@@ -226,7 +226,7 @@ private boolean onSentConnAck(MqttClient.UnsafeMqttClient client, MqttSession se
226226
protected boolean processInvalidMessage(
227227
MqttConnection connection,
228228
ExternalMqttClient client,
229-
MqttSession session,
229+
MqttNetworkSession session,
230230
ConnectMqttInMessage message) {
231231
Exception exception = message.exception();
232232
if (exception instanceof ConnectionRejectException cre) {

core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/DisconnectMqttInMessageHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import javasabr.mqtt.network.MqttConnection;
66
import javasabr.mqtt.network.impl.ExternalMqttClient;
77
import javasabr.mqtt.network.message.in.DisconnectMqttInMessage;
8-
import javasabr.mqtt.network.session.MqttSession;
8+
import javasabr.mqtt.network.session.MqttNetworkSession;
99
import javasabr.mqtt.service.MessageOutFactoryService;
1010
import lombok.CustomLog;
1111

@@ -25,7 +25,7 @@ public MqttMessageType messageType() {
2525
protected void processValidMessage(
2626
MqttConnection connection,
2727
ExternalMqttClient client,
28-
MqttSession session,
28+
MqttNetworkSession session,
2929
DisconnectMqttInMessage message) {
3030
DisconnectReasonCode reasonCode = message.reasonCode();
3131
if (reasonCode == DisconnectReasonCode.NORMAL_DISCONNECTION) {

core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/PendingOutResponseMqttInMessageHandler.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
package javasabr.mqtt.service.message.handler.impl;
22

3-
import javasabr.mqtt.model.TrackableMessage;
3+
import javasabr.mqtt.model.message.TrackableMqttMessage;
44
import javasabr.mqtt.network.MqttConnection;
55
import javasabr.mqtt.network.impl.ExternalMqttClient;
66
import javasabr.mqtt.network.message.in.MqttInMessage;
7-
import javasabr.mqtt.network.session.MqttSession;
7+
import javasabr.mqtt.network.session.MqttNetworkSession;
88
import javasabr.mqtt.service.MessageOutFactoryService;
99

10-
public abstract class PendingOutResponseMqttInMessageHandler<P extends MqttInMessage & TrackableMessage>
10+
public abstract class PendingOutResponseMqttInMessageHandler<P extends MqttInMessage & TrackableMqttMessage>
1111
extends AbstractMqttInMessageHandler<ExternalMqttClient, P> {
1212

1313
protected PendingOutResponseMqttInMessageHandler(
@@ -20,7 +20,7 @@ protected PendingOutResponseMqttInMessageHandler(
2020
protected void processValidMessage(
2121
MqttConnection connection,
2222
ExternalMqttClient client,
23-
MqttSession session,
23+
MqttNetworkSession session,
2424
P message) {
2525
session.updateOutPendingPacket(client, message);
2626
}

0 commit comments

Comments
 (0)