Skip to content

Commit 2770f2a

Browse files
authored
Reorganize mqtt client to mqtt network user (#64)
1 parent 9466c81 commit 2770f2a

File tree

71 files changed

+879
-874
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

71 files changed

+879
-874
lines changed

application/src/main/java/javasabr/mqtt/broker/application/config/MqttBrokerSpringConfig.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@
55
import javasabr.mqtt.model.MqttProperties;
66
import javasabr.mqtt.model.MqttServerConnectionConfig;
77
import javasabr.mqtt.model.QoS;
8-
import javasabr.mqtt.network.MqttClientFactory;
98
import javasabr.mqtt.network.MqttConnection;
109
import javasabr.mqtt.network.MqttConnectionFactory;
11-
import javasabr.mqtt.network.handler.MqttClientReleaseHandler;
12-
import javasabr.mqtt.network.impl.ExternalMqttClient;
10+
import javasabr.mqtt.network.handler.NetworkMqttUserReleaseHandler;
11+
import javasabr.mqtt.network.impl.ExternalNetworkMqttUser;
12+
import javasabr.mqtt.network.user.NetworkMqttUserFactory;
1313
import javasabr.mqtt.service.AuthenticationService;
1414
import javasabr.mqtt.service.ClientIdRegistry;
1515
import javasabr.mqtt.service.ConnectionService;
@@ -19,14 +19,14 @@
1919
import javasabr.mqtt.service.PublishReceivingService;
2020
import javasabr.mqtt.service.SubscriptionService;
2121
import javasabr.mqtt.service.TopicService;
22-
import javasabr.mqtt.service.handler.client.ExternalMqttClientReleaseHandler;
22+
import javasabr.mqtt.service.handler.client.ExternalNetworkMqttUserReleaseHandler;
2323
import javasabr.mqtt.service.impl.DefaultConnectionService;
2424
import javasabr.mqtt.service.impl.DefaultMessageOutFactoryService;
2525
import javasabr.mqtt.service.impl.DefaultMqttConnectionFactory;
2626
import javasabr.mqtt.service.impl.DefaultPublishDeliveringService;
2727
import javasabr.mqtt.service.impl.DefaultPublishReceivingService;
2828
import javasabr.mqtt.service.impl.DefaultTopicService;
29-
import javasabr.mqtt.service.impl.ExternalMqttClientFactory;
29+
import javasabr.mqtt.service.impl.ExternalNetworkMqttUserFactory;
3030
import javasabr.mqtt.service.impl.FileCredentialsSource;
3131
import javasabr.mqtt.service.impl.InMemoryClientIdRegistry;
3232
import javasabr.mqtt.service.impl.InMemorySubscriptionService;
@@ -195,7 +195,7 @@ MqttInMessageHandler unsubscribeMqttInMessageHandler(
195195

196196
@Bean
197197
ConnectionService externalMqttConnectionService(Collection<? extends MqttInMessageHandler> inMessageHandlers) {
198-
return new DefaultConnectionService(ExternalMqttClient.class, inMessageHandlers);
198+
return new DefaultConnectionService(ExternalNetworkMqttUser.class, inMessageHandlers);
199199
}
200200

201201
@Bean
@@ -265,11 +265,11 @@ PublishReceivingService publishReceivingService(
265265
}
266266

267267
@Bean
268-
MqttClientReleaseHandler externalMqttClientReleaseHandler(
268+
NetworkMqttUserReleaseHandler externalMqttClientReleaseHandler(
269269
ClientIdRegistry clientIdRegistry,
270270
MqttSessionService sessionService,
271271
SubscriptionService subscriptionService) {
272-
return new ExternalMqttClientReleaseHandler(clientIdRegistry, sessionService, subscriptionService);
272+
return new ExternalNetworkMqttUserReleaseHandler(clientIdRegistry, sessionService, subscriptionService);
273273
}
274274

275275
@Bean
@@ -352,16 +352,16 @@ ServerNetworkConfig externalNetworkConfig(
352352
}
353353

354354
@Bean
355-
MqttClientFactory externalClientFactory(MqttClientReleaseHandler externalMqttClientReleaseHandler) {
356-
return new ExternalMqttClientFactory(externalMqttClientReleaseHandler);
355+
NetworkMqttUserFactory externalClientFactory(NetworkMqttUserReleaseHandler externalNetworkMqttUserReleaseHandler) {
356+
return new ExternalNetworkMqttUserFactory(externalNetworkMqttUserReleaseHandler);
357357
}
358358

359359
@Bean
360360
MqttConnectionFactory externalConnectionFactory(
361361
MqttServerConnectionConfig externalServerConnectionConfig,
362-
MqttClientFactory externalClientFactory,
362+
NetworkMqttUserFactory mqttUserFactory,
363363
@Value("${mqtt.external.connection.max.packets.by.read:100}") int maxPacketsByRead) {
364-
return new DefaultMqttConnectionFactory(externalServerConnectionConfig, externalClientFactory, maxPacketsByRead);
364+
return new DefaultMqttConnectionFactory(externalServerConnectionConfig, mqttUserFactory, maxPacketsByRead);
365365
}
366366

367367
@Bean

application/src/test/groovy/javasabr/mqtt/broker/application/IntegrationSpecification.groovy

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import javasabr.mqtt.model.MqttServerConnectionConfig
1010
import javasabr.mqtt.model.MqttVersion
1111
import javasabr.mqtt.network.MqttConnection
1212
import javasabr.mqtt.network.MqttMockClient
13+
import javasabr.mqtt.network.user.ConfigurableNetworkMqttUser
1314
import org.springframework.beans.factory.annotation.Autowired
1415
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig
1516
import spock.lang.Specification
@@ -18,8 +19,6 @@ import java.nio.charset.StandardCharsets
1819
import java.util.concurrent.atomic.AtomicInteger
1920
import java.util.concurrent.atomic.AtomicReference
2021

21-
import static javasabr.mqtt.network.MqttClient.UnsafeMqttClient
22-
2322
@SpringJUnitConfig(classes = MqttBrokerTestConfig)
2423
class IntegrationSpecification extends Specification {
2524

@@ -153,7 +152,7 @@ class IntegrationSpecification extends Specification {
153152
isSupported(MqttVersion.MQTT_3_1_1) >> true
154153
serverConnectionConfig() >> serverConnConfig
155154
clientConnectionConfig() >> clientConnConfig
156-
client() >> Stub(UnsafeMqttClient) {
155+
user() >> Stub(ConfigurableNetworkMqttUser) {
157156
connectionConfig() >> clientConnConfig
158157
connection() >> connectionRef.get()
159158
clientId() >> clientId
@@ -181,7 +180,7 @@ class IntegrationSpecification extends Specification {
181180
isSupported(MqttVersion.MQTT_3_1_1) >> true
182181
serverConnectionConfig() >> serverConnConfig
183182
clientConnectionConfig() >> clientConnConfig
184-
client() >> Stub(UnsafeMqttClient) {
183+
user() >> Stub(ConfigurableNetworkMqttUser) {
185184
connectionConfig() >> clientConnConfig
186185
connection() >> connectionRef.get()
187186
clientId() >> clientId

application/src/test/groovy/javasabr/mqtt/broker/application/service/SubscriptionServiceTest.groovy

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import com.hivemq.client.mqtt.datatypes.MqttQos
44
import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5SubAckException
55
import javasabr.mqtt.broker.application.IntegrationSpecification
66
import javasabr.mqtt.model.topic.TopicName
7-
import javasabr.mqtt.network.MqttClient
7+
import javasabr.mqtt.network.user.NetworkMqttUser
88
import javasabr.mqtt.service.ClientIdRegistry
99
import javasabr.mqtt.service.impl.InMemorySubscriptionService
1010
import org.springframework.beans.factory.annotation.Autowired
@@ -38,11 +38,11 @@ class SubscriptionServiceTest extends IntegrationSpecification {
3838
.findSubscribers(topicName)
3939
then: "should find the subscriber"
4040
subscribers.size() == 1
41-
subscribers.get(0).user() instanceof MqttClient
41+
subscribers.get(0).user() instanceof NetworkMqttUser
4242
when:
4343
def matchedSubscriber = subscribers.get(0)
4444
def subscription = matchedSubscriber.subscription()
45-
def owner = matchedSubscriber.user() as MqttClient
45+
def owner = matchedSubscriber.user() as NetworkMqttUser
4646
then:
4747
owner.clientId() == clientId
4848
subscription.topicFilter().rawTopic() == topicFilter
@@ -61,11 +61,11 @@ class SubscriptionServiceTest extends IntegrationSpecification {
6161
.findSubscribers(topicName)
6262
then: "should find the reconnected subscriber"
6363
subscribers3.size() == 1
64-
subscribers3.get(0).user() instanceof MqttClient
64+
subscribers3.get(0).user() instanceof NetworkMqttUser
6565
when:
6666
matchedSubscriber = subscribers3.get(0)
6767
subscription = matchedSubscriber.subscription()
68-
owner = matchedSubscriber.user() as MqttClient
68+
owner = matchedSubscriber.user() as NetworkMqttUser
6969
then:
7070
owner.clientId() == clientId
7171
subscription.topicFilter().rawTopic() == topicFilter
@@ -147,8 +147,8 @@ class SubscriptionServiceTest extends IntegrationSpecification {
147147
def subscribers = subscriptionService.findSubscribers(TopicName.valueOf(topicName))
148148
then:
149149
subscribers.size() == targetCount
150-
(subscribers[0].user() as MqttClient).clientId() == clientId1
151-
(subscribers[1].user() as MqttClient).clientId() == clientId2
150+
(subscribers[0].user() as NetworkMqttUser).clientId() == clientId1
151+
(subscribers[1].user() as NetworkMqttUser).clientId() == clientId2
152152
cleanup:
153153
subscriber1.disconnect().join()
154154
subscriber2.disconnect().join()

application/src/test/resources/log4j2.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
</Console>
77
</Appenders>
88
<Loggers>
9-
<Logger name="javasabr.mqtt.network.impl.AbstractMqttClient" level="DEBUG" additivity="false">
9+
<Logger name="javasabr.mqtt.network.impl.AbstractNetworkMqttUser" level="DEBUG" additivity="false">
1010
<AppenderRef ref="BrokerConsoleTest"/>
1111
</Logger>
1212
<Logger name="javasabr.mqtt.service.impl.DefaultConnectionService" level="DEBUG" additivity="false">
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
package javasabr.mqtt.service;
22

3-
import javasabr.mqtt.network.MqttClient;
43
import javasabr.mqtt.network.MqttConnection;
4+
import javasabr.mqtt.network.user.NetworkMqttUser;
55
import javasabr.mqtt.service.message.out.factory.MqttMessageOutFactory;
66

77
public interface MessageOutFactoryService {
88

9-
MqttMessageOutFactory resolveFactory(MqttClient client);
9+
MqttMessageOutFactory resolveFactory(NetworkMqttUser user);
1010

1111
MqttMessageOutFactory resolveFactory(MqttConnection connection);
1212
}
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
package javasabr.mqtt.service;
22

33
import javasabr.mqtt.model.publishing.Publish;
4-
import javasabr.mqtt.network.MqttClient;
4+
import javasabr.mqtt.network.user.NetworkMqttUser;
55

66
public interface PublishReceivingService {
77

8-
void processPublish(MqttClient client, Publish publish);
8+
void processPublish(NetworkMqttUser user, Publish publish);
99
}

core-service/src/main/java/javasabr/mqtt/service/SubscriptionService.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77
import javasabr.mqtt.model.subscription.Subscription;
88
import javasabr.mqtt.model.topic.TopicFilter;
99
import javasabr.mqtt.model.topic.TopicName;
10-
import javasabr.mqtt.network.MqttClient;
11-
import javasabr.mqtt.network.session.MqttNetworkSession;
10+
import javasabr.mqtt.network.MqttNetworkSession;
11+
import javasabr.mqtt.network.user.NetworkMqttUser;
1212
import javasabr.rlib.collections.array.Array;
1313
import javasabr.rlib.collections.array.MutableArray;
1414

@@ -17,7 +17,7 @@
1717
*/
1818
public interface SubscriptionService {
1919

20-
MqttClient resolveClient(Subscriber subscriber);
20+
NetworkMqttUser resolveClient(Subscriber subscriber);
2121

2222
default Array<SingleSubscriber> findSubscribers(TopicName topicName) {
2323
return findSubscribersTo(MutableArray.ofType(SingleSubscriber.class), topicName);
@@ -28,22 +28,22 @@ default Array<SingleSubscriber> findSubscribers(TopicName topicName) {
2828
/**
2929
* Subscribes MQTT client to listen to topics.
3030
*
31-
* @param client MQTT client which requests subscriptions
31+
* @param user MQTT client which requests subscriptions
3232
* @param subscriptions the list of request to subscribe topics
3333
* @return array of subscribe ack reason codes
3434
*/
35-
Array<SubscribeAckReasonCode> subscribe(MqttClient client, MqttNetworkSession session, Array<Subscription> subscriptions);
35+
Array<SubscribeAckReasonCode> subscribe(NetworkMqttUser user, MqttNetworkSession session, Array<Subscription> subscriptions);
3636

3737
/**
3838
* Removes MQTT client from listening to the topics.
3939
*
40-
* @param client MQTT client to be removed
40+
* @param user MQTT client to be removed
4141
* @param topicFilters topic filters
4242
* @return array of unsubscribe ack reason codes
4343
*/
44-
Array<UnsubscribeAckReasonCode> unsubscribe(MqttClient client, MqttNetworkSession session, Array<TopicFilter> topicFilters);
44+
Array<UnsubscribeAckReasonCode> unsubscribe(NetworkMqttUser user, MqttNetworkSession session, Array<TopicFilter> topicFilters);
4545

46-
void cleanSubscriptions(MqttClient client, MqttNetworkSession session);
46+
void cleanSubscriptions(NetworkMqttUser user, MqttNetworkSession session);
4747

48-
void restoreSubscriptions(MqttClient client, MqttNetworkSession session);
48+
void restoreSubscriptions(NetworkMqttUser user, MqttNetworkSession session);
4949
}

core-service/src/main/java/javasabr/mqtt/service/TopicService.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,13 @@
22

33
import javasabr.mqtt.model.topic.TopicFilter;
44
import javasabr.mqtt.model.topic.TopicName;
5-
import javasabr.mqtt.network.MqttClient;
5+
import javasabr.mqtt.network.user.NetworkMqttUser;
66

77
public interface TopicService {
88

9-
TopicFilter createTopicFilter(MqttClient client, String rawTopicFilter);
9+
TopicFilter createTopicFilter(NetworkMqttUser user, String rawTopicFilter);
1010

11-
boolean isValidTopicFilter(MqttClient client, String rawTopicFilter);
11+
boolean isValidTopicFilter(NetworkMqttUser user, String rawTopicFilter);
1212

13-
TopicName createTopicName(MqttClient client, String rawTopicName);
13+
TopicName createTopicName(NetworkMqttUser user, String rawTopicName);
1414
}

core-service/src/main/java/javasabr/mqtt/service/handler/client/AbstractMqttClientReleaseHandler.java renamed to core-service/src/main/java/javasabr/mqtt/service/handler/client/AbstractNetworkMqttUserReleaseHandler.java

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package javasabr.mqtt.service.handler.client;
22

33
import javasabr.mqtt.model.MqttClientConnectionConfig;
4-
import javasabr.mqtt.network.MqttClient.UnsafeMqttClient;
5-
import javasabr.mqtt.network.handler.MqttClientReleaseHandler;
6-
import javasabr.mqtt.network.impl.AbstractMqttClient;
7-
import javasabr.mqtt.network.session.MqttNetworkSession;
4+
import javasabr.mqtt.network.MqttNetworkSession;
5+
import javasabr.mqtt.network.handler.NetworkMqttUserReleaseHandler;
6+
import javasabr.mqtt.network.impl.AbstractNetworkMqttUser;
7+
import javasabr.mqtt.network.user.ConfigurableNetworkMqttUser;
88
import javasabr.mqtt.service.ClientIdRegistry;
99
import javasabr.mqtt.service.SubscriptionService;
1010
import javasabr.mqtt.service.session.MqttSessionService;
@@ -18,40 +18,40 @@
1818
@CustomLog
1919
@RequiredArgsConstructor
2020
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
21-
public abstract class AbstractMqttClientReleaseHandler<T extends AbstractMqttClient> implements
22-
MqttClientReleaseHandler {
21+
public abstract class AbstractNetworkMqttUserReleaseHandler<T extends AbstractNetworkMqttUser> implements
22+
NetworkMqttUserReleaseHandler {
2323

2424
ClientIdRegistry clientIdRegistry;
2525
MqttSessionService sessionService;
2626
SubscriptionService subscriptionService;
2727

2828
@Override
29-
public Mono<?> release(UnsafeMqttClient client) {
30-
var clientId = client.clientId();
29+
public Mono<?> release(ConfigurableNetworkMqttUser user) {
30+
var clientId = user.clientId();
3131
//noinspection unchecked
32-
return releaseImpl((T) client)
32+
return releaseImpl((T) user)
3333
.doOnNext(_ -> log.info(clientId, "[%s] Client was released"::formatted));
3434
}
3535

36-
protected Mono<?> releaseImpl(T client) {
36+
protected Mono<?> releaseImpl(T user) {
3737

38-
String clientId = client.clientId();
39-
client.clientId(StringUtils.EMPTY);
38+
String clientId = user.clientId();
39+
user.clientId(StringUtils.EMPTY);
4040

4141
if (StringUtils.isEmpty(clientId)) {
42-
log.warning(client.clientId(), "[%s] This client is already released or rejected"::formatted);
42+
log.warning(user.clientId(), "[%s] This client is already released or rejected"::formatted);
4343
return Mono.empty();
4444
}
4545

46-
MqttNetworkSession session = client.session();
46+
MqttNetworkSession session = user.session();
4747
Mono<?> asyncActions = null;
4848

4949
if (session != null) {
50-
subscriptionService.cleanSubscriptions(client, session);
51-
MqttClientConnectionConfig connectionConfig = client.connectionConfig();
50+
subscriptionService.cleanSubscriptions(user, session);
51+
MqttClientConnectionConfig connectionConfig = user.connectionConfig();
5252
if (connectionConfig.sessionsEnabled()) {
5353
asyncActions = sessionService.store(clientId, session, connectionConfig.sessionExpiryInterval());
54-
client.session(null);
54+
user.session(null);
5555
}
5656
}
5757

core-service/src/main/java/javasabr/mqtt/service/handler/client/ExternalMqttClientReleaseHandler.java renamed to core-service/src/main/java/javasabr/mqtt/service/handler/client/ExternalNetworkMqttUserReleaseHandler.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
package javasabr.mqtt.service.handler.client;
22

3-
import javasabr.mqtt.network.impl.ExternalMqttClient;
3+
import javasabr.mqtt.network.impl.ExternalNetworkMqttUser;
44
import javasabr.mqtt.service.ClientIdRegistry;
55
import javasabr.mqtt.service.SubscriptionService;
66
import javasabr.mqtt.service.session.MqttSessionService;
77

8-
public class ExternalMqttClientReleaseHandler extends AbstractMqttClientReleaseHandler<ExternalMqttClient> {
8+
public class ExternalNetworkMqttUserReleaseHandler extends
9+
AbstractNetworkMqttUserReleaseHandler<ExternalNetworkMqttUser> {
910

10-
public ExternalMqttClientReleaseHandler(
11+
public ExternalNetworkMqttUserReleaseHandler(
1112
ClientIdRegistry clientIdRegistry,
1213
MqttSessionService sessionService,
1314
SubscriptionService subscriptionService) {

0 commit comments

Comments
 (0)