Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
b369463
working on publish delivering
JavaSaBr Dec 1, 2025
7301ada
working on publish delivering
JavaSaBr Dec 2, 2025
c6c7012
Merge remote-tracking branch 'origin/develop' into improve-publish-de…
JavaSaBr Dec 2, 2025
d69e8b9
working on publish delivering
JavaSaBr Dec 2, 2025
abdc101
working on publish delivering
JavaSaBr Dec 2, 2025
19ba642
working on publish delivering
JavaSaBr Dec 2, 2025
0ed5c09
finish first part of upgrading delivering publishes to subscribers
JavaSaBr Dec 3, 2025
9263c8e
working on improving testing
JavaSaBr Dec 4, 2025
bfc3ea8
resolve comments
JavaSaBr Dec 4, 2025
0011896
Merge branch 'improve-publish-delivering-to-subscribers' into improve…
JavaSaBr Dec 4, 2025
f8752d0
continuer add tests
JavaSaBr Dec 5, 2025
94766c6
Merge remote-tracking branch 'origin/develop' into improve-publish-de…
JavaSaBr Dec 5, 2025
1164f82
finish adding tests
JavaSaBr Dec 5, 2025
6c47ae6
update tests for messages
JavaSaBr Dec 5, 2025
bdc6380
fix on code review
JavaSaBr Dec 7, 2025
eaf8d3a
start working on integration ACL
JavaSaBr Dec 8, 2025
dd72fad
Merge branch 'improve-publish-delivering-to-subscribers-part-2' into …
JavaSaBr Dec 8, 2025
49acfdd
Merge remote-tracking branch 'origin/develop' into improve-publish-de…
JavaSaBr Dec 8, 2025
5776855
Merge branch 'improve-publish-delivering-to-subscribers-part-3' into …
JavaSaBr Dec 8, 2025
7cd86f5
working on updating message validation
JavaSaBr Dec 8, 2025
0bbe921
Merge remote-tracking branch 'origin/develop' into improve-publish-de…
JavaSaBr Dec 8, 2025
8f0945b
working on updating message validation
JavaSaBr Dec 8, 2025
08244dd
working on updating message validation
JavaSaBr Dec 8, 2025
2266e33
rename ACL service
JavaSaBr Dec 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@

import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.List;
import javasabr.mqtt.model.MqttProperties;
import javasabr.mqtt.model.MqttServerConnectionConfig;
import javasabr.mqtt.model.QoS;
import javasabr.mqtt.network.MqttConnection;
import javasabr.mqtt.network.MqttConnectionFactory;
import javasabr.mqtt.network.handler.NetworkMqttUserReleaseHandler;
import javasabr.mqtt.network.impl.ExternalNetworkMqttUser;
import javasabr.mqtt.network.message.in.PublishMqttInMessage;
import javasabr.mqtt.network.user.NetworkMqttUserFactory;
import javasabr.mqtt.service.AuthorizationService;
import javasabr.mqtt.service.AuthenticationService;
import javasabr.mqtt.service.ClientIdRegistry;
import javasabr.mqtt.service.ConnectionService;
Expand All @@ -26,6 +29,7 @@
import javasabr.mqtt.service.impl.DefaultPublishDeliveringService;
import javasabr.mqtt.service.impl.DefaultPublishReceivingService;
import javasabr.mqtt.service.impl.DefaultTopicService;
import javasabr.mqtt.service.impl.DisabledAuthorizationService;
import javasabr.mqtt.service.impl.ExternalNetworkMqttUserFactory;
import javasabr.mqtt.service.impl.FileCredentialsSource;
import javasabr.mqtt.service.impl.InMemoryClientIdRegistry;
Expand All @@ -44,6 +48,13 @@
import javasabr.mqtt.service.message.out.factory.Mqtt311MessageOutFactory;
import javasabr.mqtt.service.message.out.factory.Mqtt5MessageOutFactory;
import javasabr.mqtt.service.message.out.factory.MqttMessageOutFactory;
import javasabr.mqtt.service.message.validator.MqttInMessageFieldValidator;
import javasabr.mqtt.service.message.validator.PublishMessageExpiryIntervalMqttInMessageFieldValidator;
import javasabr.mqtt.service.message.validator.PublishPayloadMqttInMessageFieldValidator;
import javasabr.mqtt.service.message.validator.PublishQosMqttInMessageFieldValidator;
import javasabr.mqtt.service.message.validator.PublishResponseTopicMqttInMessageFieldValidator;
import javasabr.mqtt.service.message.validator.PublishRetainMqttInMessageFieldValidator;
import javasabr.mqtt.service.message.validator.PublishTopicAliasMqttInMessageFieldValidator;
import javasabr.mqtt.service.publish.handler.MqttPublishInMessageHandler;
import javasabr.mqtt.service.publish.handler.MqttPublishOutMessageHandler;
import javasabr.mqtt.service.publish.handler.impl.Qos0MqttPublishInMessageHandler;
Expand Down Expand Up @@ -96,6 +107,11 @@ AuthenticationService authenticationService(
@Value("${authentication.allow.anonymous:false}") boolean allowAnonymousAuth) {
return new SimpleAuthenticationService(credentialSource, allowAnonymousAuth);
}

@Bean
AuthorizationService authorizationService() {
return new DisabledAuthorizationService();
}

@Bean
SubscriptionService subscriptionService() {
Expand Down Expand Up @@ -147,16 +163,55 @@ MqttInMessageHandler publishAckMqttInMessageHandler(MessageOutFactoryService mes
MqttInMessageHandler publishCompleteMqttInMessageHandler(MessageOutFactoryService messageOutFactoryService) {
return new PublishCompleteMqttInMessageHandler(messageOutFactoryService);
}

@Bean
PublishPayloadMqttInMessageFieldValidator publishPayloadMqttInMessageFieldValidator(
MessageOutFactoryService messageOutFactoryService) {
return new PublishPayloadMqttInMessageFieldValidator(messageOutFactoryService);
}

@Bean
PublishQosMqttInMessageFieldValidator publishQosMqttInMessageFieldValidator(
MessageOutFactoryService messageOutFactoryService) {
return new PublishQosMqttInMessageFieldValidator(messageOutFactoryService);
}

@Bean
PublishRetainMqttInMessageFieldValidator publishRetainMqttInMessageFieldValidator(
MessageOutFactoryService messageOutFactoryService) {
return new PublishRetainMqttInMessageFieldValidator(messageOutFactoryService);
}

@Bean
PublishMessageExpiryIntervalMqttInMessageFieldValidator publishMessageExpiryIntervalMqttInMessageFieldValidator(
MessageOutFactoryService messageOutFactoryService) {
return new PublishMessageExpiryIntervalMqttInMessageFieldValidator(messageOutFactoryService);
}

@Bean
PublishResponseTopicMqttInMessageFieldValidator publishResponseTopicMqttInMessageFieldValidator(
MessageOutFactoryService messageOutFactoryService) {
return new PublishResponseTopicMqttInMessageFieldValidator(messageOutFactoryService);
}

@Bean
PublishTopicAliasMqttInMessageFieldValidator publishTopicAliasMqttInMessageFieldValidator(
MessageOutFactoryService messageOutFactoryService) {
return new PublishTopicAliasMqttInMessageFieldValidator(messageOutFactoryService);
}

@Bean
MqttInMessageHandler publishMqttInMessageHandler(
PublishReceivingService publishReceivingService,
MessageOutFactoryService messageOutFactoryService,
TopicService topicService) {
TopicService topicService,
AuthorizationService authorizationService,
List<? extends MqttInMessageFieldValidator<? super ExternalNetworkMqttUser, PublishMqttInMessage>> fieldValidators) {
return new PublishMqttInMessageHandler(
publishReceivingService,
messageOutFactoryService,
topicService);
topicService, authorizationService,
fieldValidators);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package javasabr.mqtt.service;

import javasabr.mqtt.model.MqttUser;
import javasabr.mqtt.model.topic.TopicFilter;
import javasabr.mqtt.model.topic.TopicName;

public interface AuthorizationService {

boolean authorizePublish(MqttUser mqttUser, TopicName topicName);

boolean authorizeSubscribe(MqttUser mqttUser, TopicFilter topicFilter);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package javasabr.mqtt.service.impl;

import javasabr.mqtt.model.MqttUser;
import javasabr.mqtt.model.topic.TopicFilter;
import javasabr.mqtt.model.topic.TopicName;
import javasabr.mqtt.service.AuthorizationService;

public class DisabledAuthorizationService implements AuthorizationService {
@Override
public boolean authorizePublish(MqttUser mqttUser, TopicName topicName) {
return true;
}

@Override
public boolean authorizeSubscribe(MqttUser mqttUser, TopicFilter topicFilter) {
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package javasabr.mqtt.service.message.handler.impl;

import java.util.Comparator;
import java.util.List;
import javasabr.mqtt.network.MqttConnection;
import javasabr.mqtt.network.message.in.MqttInMessage;
import javasabr.mqtt.network.session.NetworkMqttSession;
import javasabr.mqtt.network.user.NetworkMqttUser;
import javasabr.mqtt.service.MessageOutFactoryService;
import javasabr.mqtt.service.message.validator.MqttInMessageFieldValidator;
import lombok.AccessLevel;
import lombok.CustomLog;
import lombok.experimental.FieldDefaults;

@CustomLog
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
public abstract class FieldsValidatedMqttInMessageHandler<U extends NetworkMqttUser, M extends MqttInMessage>
extends AbstractMqttInMessageHandler<U, M> {

MqttInMessageFieldValidator<? super U, M>[] fieldValidators;

protected FieldsValidatedMqttInMessageHandler(
Class<U> expectedUser,
Class<M> expectedMessage,
MessageOutFactoryService messageOutFactoryService,
List<? extends MqttInMessageFieldValidator<? super U, M>> fieldValidators) {
super(expectedUser, expectedMessage, messageOutFactoryService);
//noinspection unchecked
this.fieldValidators = fieldValidators
.stream()
.sorted(Comparator.comparingInt((MqttInMessageFieldValidator<? super U, M> validator) -> validator.order()))
.toArray(MqttInMessageFieldValidator[]::new);
}

@Override
protected final void processValidMessage(MqttConnection connection, U user, M message) {
for (MqttInMessageFieldValidator<? super U, M> fieldValidator : fieldValidators) {
if (fieldValidator.isNotValid(connection, user, message)) {
return;
}
}
processMessageWithValidFields(connection, user, message);
}

protected void processMessageWithValidFields(MqttConnection connection, U user, M message) {}

@Override
protected final void processValidMessage(MqttConnection connection, U user, NetworkMqttSession session, M message) {
for (MqttInMessageFieldValidator<? super U, M> fieldValidator : fieldValidators) {
if (fieldValidator.isNotValid(connection, user, message)) {
return;
}
}
processMessageWithValidFields(connection, user, session, message);
}

protected void processMessageWithValidFields(
MqttConnection connection,
U user,
NetworkMqttSession session,
M message) {}
}
Loading
Loading