Skip to content

Commit 5cf46be

Browse files
authored
#69 Improve publish delivering to subscribers part 4 (#135)
1 parent 294a6c6 commit 5cf46be

16 files changed

+614
-181
lines changed

application/src/main/java/javasabr/mqtt/broker/application/config/MqttBrokerSpringConfig.java

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,17 @@
22

33
import java.net.InetSocketAddress;
44
import java.util.Collection;
5+
import java.util.List;
56
import javasabr.mqtt.model.MqttProperties;
67
import javasabr.mqtt.model.MqttServerConnectionConfig;
78
import javasabr.mqtt.model.QoS;
89
import javasabr.mqtt.network.MqttConnection;
910
import javasabr.mqtt.network.MqttConnectionFactory;
1011
import javasabr.mqtt.network.handler.NetworkMqttUserReleaseHandler;
1112
import javasabr.mqtt.network.impl.ExternalNetworkMqttUser;
13+
import javasabr.mqtt.network.message.in.PublishMqttInMessage;
1214
import javasabr.mqtt.network.user.NetworkMqttUserFactory;
15+
import javasabr.mqtt.service.AuthorizationService;
1316
import javasabr.mqtt.service.AuthenticationService;
1417
import javasabr.mqtt.service.ClientIdRegistry;
1518
import javasabr.mqtt.service.ConnectionService;
@@ -26,6 +29,7 @@
2629
import javasabr.mqtt.service.impl.DefaultPublishDeliveringService;
2730
import javasabr.mqtt.service.impl.DefaultPublishReceivingService;
2831
import javasabr.mqtt.service.impl.DefaultTopicService;
32+
import javasabr.mqtt.service.impl.DisabledAuthorizationService;
2933
import javasabr.mqtt.service.impl.ExternalNetworkMqttUserFactory;
3034
import javasabr.mqtt.service.impl.FileCredentialsSource;
3135
import javasabr.mqtt.service.impl.InMemoryClientIdRegistry;
@@ -44,6 +48,13 @@
4448
import javasabr.mqtt.service.message.out.factory.Mqtt311MessageOutFactory;
4549
import javasabr.mqtt.service.message.out.factory.Mqtt5MessageOutFactory;
4650
import javasabr.mqtt.service.message.out.factory.MqttMessageOutFactory;
51+
import javasabr.mqtt.service.message.validator.MqttInMessageFieldValidator;
52+
import javasabr.mqtt.service.message.validator.PublishMessageExpiryIntervalMqttInMessageFieldValidator;
53+
import javasabr.mqtt.service.message.validator.PublishPayloadMqttInMessageFieldValidator;
54+
import javasabr.mqtt.service.message.validator.PublishQosMqttInMessageFieldValidator;
55+
import javasabr.mqtt.service.message.validator.PublishResponseTopicMqttInMessageFieldValidator;
56+
import javasabr.mqtt.service.message.validator.PublishRetainMqttInMessageFieldValidator;
57+
import javasabr.mqtt.service.message.validator.PublishTopicAliasMqttInMessageFieldValidator;
4758
import javasabr.mqtt.service.publish.handler.MqttPublishInMessageHandler;
4859
import javasabr.mqtt.service.publish.handler.MqttPublishOutMessageHandler;
4960
import javasabr.mqtt.service.publish.handler.impl.Qos0MqttPublishInMessageHandler;
@@ -96,6 +107,11 @@ AuthenticationService authenticationService(
96107
@Value("${authentication.allow.anonymous:false}") boolean allowAnonymousAuth) {
97108
return new SimpleAuthenticationService(credentialSource, allowAnonymousAuth);
98109
}
110+
111+
@Bean
112+
AuthorizationService authorizationService() {
113+
return new DisabledAuthorizationService();
114+
}
99115

100116
@Bean
101117
SubscriptionService subscriptionService() {
@@ -147,16 +163,55 @@ MqttInMessageHandler publishAckMqttInMessageHandler(MessageOutFactoryService mes
147163
MqttInMessageHandler publishCompleteMqttInMessageHandler(MessageOutFactoryService messageOutFactoryService) {
148164
return new PublishCompleteMqttInMessageHandler(messageOutFactoryService);
149165
}
166+
167+
@Bean
168+
PublishPayloadMqttInMessageFieldValidator publishPayloadMqttInMessageFieldValidator(
169+
MessageOutFactoryService messageOutFactoryService) {
170+
return new PublishPayloadMqttInMessageFieldValidator(messageOutFactoryService);
171+
}
172+
173+
@Bean
174+
PublishQosMqttInMessageFieldValidator publishQosMqttInMessageFieldValidator(
175+
MessageOutFactoryService messageOutFactoryService) {
176+
return new PublishQosMqttInMessageFieldValidator(messageOutFactoryService);
177+
}
178+
179+
@Bean
180+
PublishRetainMqttInMessageFieldValidator publishRetainMqttInMessageFieldValidator(
181+
MessageOutFactoryService messageOutFactoryService) {
182+
return new PublishRetainMqttInMessageFieldValidator(messageOutFactoryService);
183+
}
184+
185+
@Bean
186+
PublishMessageExpiryIntervalMqttInMessageFieldValidator publishMessageExpiryIntervalMqttInMessageFieldValidator(
187+
MessageOutFactoryService messageOutFactoryService) {
188+
return new PublishMessageExpiryIntervalMqttInMessageFieldValidator(messageOutFactoryService);
189+
}
190+
191+
@Bean
192+
PublishResponseTopicMqttInMessageFieldValidator publishResponseTopicMqttInMessageFieldValidator(
193+
MessageOutFactoryService messageOutFactoryService) {
194+
return new PublishResponseTopicMqttInMessageFieldValidator(messageOutFactoryService);
195+
}
196+
197+
@Bean
198+
PublishTopicAliasMqttInMessageFieldValidator publishTopicAliasMqttInMessageFieldValidator(
199+
MessageOutFactoryService messageOutFactoryService) {
200+
return new PublishTopicAliasMqttInMessageFieldValidator(messageOutFactoryService);
201+
}
150202

151203
@Bean
152204
MqttInMessageHandler publishMqttInMessageHandler(
153205
PublishReceivingService publishReceivingService,
154206
MessageOutFactoryService messageOutFactoryService,
155-
TopicService topicService) {
207+
TopicService topicService,
208+
AuthorizationService authorizationService,
209+
List<? extends MqttInMessageFieldValidator<? super ExternalNetworkMqttUser, PublishMqttInMessage>> fieldValidators) {
156210
return new PublishMqttInMessageHandler(
157211
publishReceivingService,
158212
messageOutFactoryService,
159-
topicService);
213+
topicService, authorizationService,
214+
fieldValidators);
160215
}
161216

162217
@Bean
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package javasabr.mqtt.service;
2+
3+
import javasabr.mqtt.model.MqttUser;
4+
import javasabr.mqtt.model.topic.TopicFilter;
5+
import javasabr.mqtt.model.topic.TopicName;
6+
7+
public interface AuthorizationService {
8+
9+
boolean authorizePublish(MqttUser mqttUser, TopicName topicName);
10+
11+
boolean authorizeSubscribe(MqttUser mqttUser, TopicFilter topicFilter);
12+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package javasabr.mqtt.service.impl;
2+
3+
import javasabr.mqtt.model.MqttUser;
4+
import javasabr.mqtt.model.topic.TopicFilter;
5+
import javasabr.mqtt.model.topic.TopicName;
6+
import javasabr.mqtt.service.AuthorizationService;
7+
8+
public class DisabledAuthorizationService implements AuthorizationService {
9+
@Override
10+
public boolean authorizePublish(MqttUser mqttUser, TopicName topicName) {
11+
return true;
12+
}
13+
14+
@Override
15+
public boolean authorizeSubscribe(MqttUser mqttUser, TopicFilter topicFilter) {
16+
return true;
17+
}
18+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package javasabr.mqtt.service.message.handler.impl;
2+
3+
import java.util.Comparator;
4+
import java.util.List;
5+
import javasabr.mqtt.network.MqttConnection;
6+
import javasabr.mqtt.network.message.in.MqttInMessage;
7+
import javasabr.mqtt.network.session.NetworkMqttSession;
8+
import javasabr.mqtt.network.user.NetworkMqttUser;
9+
import javasabr.mqtt.service.MessageOutFactoryService;
10+
import javasabr.mqtt.service.message.validator.MqttInMessageFieldValidator;
11+
import lombok.AccessLevel;
12+
import lombok.CustomLog;
13+
import lombok.experimental.FieldDefaults;
14+
15+
@CustomLog
16+
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
17+
public abstract class FieldsValidatedMqttInMessageHandler<U extends NetworkMqttUser, M extends MqttInMessage>
18+
extends AbstractMqttInMessageHandler<U, M> {
19+
20+
MqttInMessageFieldValidator<? super U, M>[] fieldValidators;
21+
22+
protected FieldsValidatedMqttInMessageHandler(
23+
Class<U> expectedUser,
24+
Class<M> expectedMessage,
25+
MessageOutFactoryService messageOutFactoryService,
26+
List<? extends MqttInMessageFieldValidator<? super U, M>> fieldValidators) {
27+
super(expectedUser, expectedMessage, messageOutFactoryService);
28+
//noinspection unchecked
29+
this.fieldValidators = fieldValidators
30+
.stream()
31+
.sorted(Comparator.comparingInt((MqttInMessageFieldValidator<? super U, M> validator) -> validator.order()))
32+
.toArray(MqttInMessageFieldValidator[]::new);
33+
}
34+
35+
@Override
36+
protected final void processValidMessage(MqttConnection connection, U user, M message) {
37+
for (MqttInMessageFieldValidator<? super U, M> fieldValidator : fieldValidators) {
38+
if (fieldValidator.isNotValid(connection, user, message)) {
39+
return;
40+
}
41+
}
42+
processMessageWithValidFields(connection, user, message);
43+
}
44+
45+
protected void processMessageWithValidFields(MqttConnection connection, U user, M message) {}
46+
47+
@Override
48+
protected final void processValidMessage(MqttConnection connection, U user, NetworkMqttSession session, M message) {
49+
for (MqttInMessageFieldValidator<? super U, M> fieldValidator : fieldValidators) {
50+
if (fieldValidator.isNotValid(connection, user, message)) {
51+
return;
52+
}
53+
}
54+
processMessageWithValidFields(connection, user, session, message);
55+
}
56+
57+
protected void processMessageWithValidFields(
58+
MqttConnection connection,
59+
U user,
60+
NetworkMqttSession session,
61+
M message) {}
62+
}

0 commit comments

Comments
 (0)