diff --git a/application/src/main/java/javasabr/mqtt/broker/application/config/MqttBrokerSpringConfig.java b/application/src/main/java/javasabr/mqtt/broker/application/config/MqttBrokerSpringConfig.java index 731fb107..a4f79d94 100644 --- a/application/src/main/java/javasabr/mqtt/broker/application/config/MqttBrokerSpringConfig.java +++ b/application/src/main/java/javasabr/mqtt/broker/application/config/MqttBrokerSpringConfig.java @@ -2,6 +2,7 @@ import java.net.InetSocketAddress; import java.util.Collection; +import java.util.List; import javasabr.mqtt.model.MqttProperties; import javasabr.mqtt.model.MqttServerConnectionConfig; import javasabr.mqtt.model.QoS; @@ -9,7 +10,9 @@ import javasabr.mqtt.network.MqttConnectionFactory; import javasabr.mqtt.network.handler.NetworkMqttUserReleaseHandler; import javasabr.mqtt.network.impl.ExternalNetworkMqttUser; +import javasabr.mqtt.network.message.in.PublishMqttInMessage; import javasabr.mqtt.network.user.NetworkMqttUserFactory; +import javasabr.mqtt.service.AuthorizationService; import javasabr.mqtt.service.AuthenticationService; import javasabr.mqtt.service.ClientIdRegistry; import javasabr.mqtt.service.ConnectionService; @@ -26,6 +29,7 @@ import javasabr.mqtt.service.impl.DefaultPublishDeliveringService; import javasabr.mqtt.service.impl.DefaultPublishReceivingService; import javasabr.mqtt.service.impl.DefaultTopicService; +import javasabr.mqtt.service.impl.DisabledAuthorizationService; import javasabr.mqtt.service.impl.ExternalNetworkMqttUserFactory; import javasabr.mqtt.service.impl.FileCredentialsSource; import javasabr.mqtt.service.impl.InMemoryClientIdRegistry; @@ -44,6 +48,13 @@ import javasabr.mqtt.service.message.out.factory.Mqtt311MessageOutFactory; import javasabr.mqtt.service.message.out.factory.Mqtt5MessageOutFactory; import javasabr.mqtt.service.message.out.factory.MqttMessageOutFactory; +import javasabr.mqtt.service.message.validator.MqttInMessageFieldValidator; +import javasabr.mqtt.service.message.validator.PublishMessageExpiryIntervalMqttInMessageFieldValidator; +import javasabr.mqtt.service.message.validator.PublishPayloadMqttInMessageFieldValidator; +import javasabr.mqtt.service.message.validator.PublishQosMqttInMessageFieldValidator; +import javasabr.mqtt.service.message.validator.PublishResponseTopicMqttInMessageFieldValidator; +import javasabr.mqtt.service.message.validator.PublishRetainMqttInMessageFieldValidator; +import javasabr.mqtt.service.message.validator.PublishTopicAliasMqttInMessageFieldValidator; import javasabr.mqtt.service.publish.handler.MqttPublishInMessageHandler; import javasabr.mqtt.service.publish.handler.MqttPublishOutMessageHandler; import javasabr.mqtt.service.publish.handler.impl.Qos0MqttPublishInMessageHandler; @@ -96,6 +107,11 @@ AuthenticationService authenticationService( @Value("${authentication.allow.anonymous:false}") boolean allowAnonymousAuth) { return new SimpleAuthenticationService(credentialSource, allowAnonymousAuth); } + + @Bean + AuthorizationService authorizationService() { + return new DisabledAuthorizationService(); + } @Bean SubscriptionService subscriptionService() { @@ -147,16 +163,55 @@ MqttInMessageHandler publishAckMqttInMessageHandler(MessageOutFactoryService mes MqttInMessageHandler publishCompleteMqttInMessageHandler(MessageOutFactoryService messageOutFactoryService) { return new PublishCompleteMqttInMessageHandler(messageOutFactoryService); } + + @Bean + PublishPayloadMqttInMessageFieldValidator publishPayloadMqttInMessageFieldValidator( + MessageOutFactoryService messageOutFactoryService) { + return new PublishPayloadMqttInMessageFieldValidator(messageOutFactoryService); + } + + @Bean + PublishQosMqttInMessageFieldValidator publishQosMqttInMessageFieldValidator( + MessageOutFactoryService messageOutFactoryService) { + return new PublishQosMqttInMessageFieldValidator(messageOutFactoryService); + } + + @Bean + PublishRetainMqttInMessageFieldValidator publishRetainMqttInMessageFieldValidator( + MessageOutFactoryService messageOutFactoryService) { + return new PublishRetainMqttInMessageFieldValidator(messageOutFactoryService); + } + + @Bean + PublishMessageExpiryIntervalMqttInMessageFieldValidator publishMessageExpiryIntervalMqttInMessageFieldValidator( + MessageOutFactoryService messageOutFactoryService) { + return new PublishMessageExpiryIntervalMqttInMessageFieldValidator(messageOutFactoryService); + } + + @Bean + PublishResponseTopicMqttInMessageFieldValidator publishResponseTopicMqttInMessageFieldValidator( + MessageOutFactoryService messageOutFactoryService) { + return new PublishResponseTopicMqttInMessageFieldValidator(messageOutFactoryService); + } + + @Bean + PublishTopicAliasMqttInMessageFieldValidator publishTopicAliasMqttInMessageFieldValidator( + MessageOutFactoryService messageOutFactoryService) { + return new PublishTopicAliasMqttInMessageFieldValidator(messageOutFactoryService); + } @Bean MqttInMessageHandler publishMqttInMessageHandler( PublishReceivingService publishReceivingService, MessageOutFactoryService messageOutFactoryService, - TopicService topicService) { + TopicService topicService, + AuthorizationService authorizationService, + List> fieldValidators) { return new PublishMqttInMessageHandler( publishReceivingService, messageOutFactoryService, - topicService); + topicService, authorizationService, + fieldValidators); } @Bean diff --git a/core-service/src/main/java/javasabr/mqtt/service/AuthorizationService.java b/core-service/src/main/java/javasabr/mqtt/service/AuthorizationService.java new file mode 100644 index 00000000..d6fbbd02 --- /dev/null +++ b/core-service/src/main/java/javasabr/mqtt/service/AuthorizationService.java @@ -0,0 +1,12 @@ +package javasabr.mqtt.service; + +import javasabr.mqtt.model.MqttUser; +import javasabr.mqtt.model.topic.TopicFilter; +import javasabr.mqtt.model.topic.TopicName; + +public interface AuthorizationService { + + boolean authorizePublish(MqttUser mqttUser, TopicName topicName); + + boolean authorizeSubscribe(MqttUser mqttUser, TopicFilter topicFilter); +} diff --git a/core-service/src/main/java/javasabr/mqtt/service/impl/DisabledAuthorizationService.java b/core-service/src/main/java/javasabr/mqtt/service/impl/DisabledAuthorizationService.java new file mode 100644 index 00000000..e40270ee --- /dev/null +++ b/core-service/src/main/java/javasabr/mqtt/service/impl/DisabledAuthorizationService.java @@ -0,0 +1,18 @@ +package javasabr.mqtt.service.impl; + +import javasabr.mqtt.model.MqttUser; +import javasabr.mqtt.model.topic.TopicFilter; +import javasabr.mqtt.model.topic.TopicName; +import javasabr.mqtt.service.AuthorizationService; + +public class DisabledAuthorizationService implements AuthorizationService { + @Override + public boolean authorizePublish(MqttUser mqttUser, TopicName topicName) { + return true; + } + + @Override + public boolean authorizeSubscribe(MqttUser mqttUser, TopicFilter topicFilter) { + return true; + } +} diff --git a/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/FieldsValidatedMqttInMessageHandler.java b/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/FieldsValidatedMqttInMessageHandler.java new file mode 100644 index 00000000..967628b9 --- /dev/null +++ b/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/FieldsValidatedMqttInMessageHandler.java @@ -0,0 +1,62 @@ +package javasabr.mqtt.service.message.handler.impl; + +import java.util.Comparator; +import java.util.List; +import javasabr.mqtt.network.MqttConnection; +import javasabr.mqtt.network.message.in.MqttInMessage; +import javasabr.mqtt.network.session.NetworkMqttSession; +import javasabr.mqtt.network.user.NetworkMqttUser; +import javasabr.mqtt.service.MessageOutFactoryService; +import javasabr.mqtt.service.message.validator.MqttInMessageFieldValidator; +import lombok.AccessLevel; +import lombok.CustomLog; +import lombok.experimental.FieldDefaults; + +@CustomLog +@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true) +public abstract class FieldsValidatedMqttInMessageHandler + extends AbstractMqttInMessageHandler { + + MqttInMessageFieldValidator[] fieldValidators; + + protected FieldsValidatedMqttInMessageHandler( + Class expectedUser, + Class expectedMessage, + MessageOutFactoryService messageOutFactoryService, + List> fieldValidators) { + super(expectedUser, expectedMessage, messageOutFactoryService); + //noinspection unchecked + this.fieldValidators = fieldValidators + .stream() + .sorted(Comparator.comparingInt((MqttInMessageFieldValidator validator) -> validator.order())) + .toArray(MqttInMessageFieldValidator[]::new); + } + + @Override + protected final void processValidMessage(MqttConnection connection, U user, M message) { + for (MqttInMessageFieldValidator fieldValidator : fieldValidators) { + if (fieldValidator.isNotValid(connection, user, message)) { + return; + } + } + processMessageWithValidFields(connection, user, message); + } + + protected void processMessageWithValidFields(MqttConnection connection, U user, M message) {} + + @Override + protected final void processValidMessage(MqttConnection connection, U user, NetworkMqttSession session, M message) { + for (MqttInMessageFieldValidator fieldValidator : fieldValidators) { + if (fieldValidator.isNotValid(connection, user, message)) { + return; + } + } + processMessageWithValidFields(connection, user, session, message); + } + + protected void processMessageWithValidFields( + MqttConnection connection, + U user, + NetworkMqttSession session, + M message) {} +} diff --git a/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/PublishMqttInMessageHandler.java b/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/PublishMqttInMessageHandler.java index af2a2d6e..e846fc1e 100644 --- a/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/PublishMqttInMessageHandler.java +++ b/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/PublishMqttInMessageHandler.java @@ -1,10 +1,8 @@ package javasabr.mqtt.service.message.handler.impl; -import javasabr.mqtt.model.MqttClientConnectionConfig; +import java.util.List; import javasabr.mqtt.model.MqttProperties; import javasabr.mqtt.model.MqttProtocolErrors; -import javasabr.mqtt.model.PayloadFormat; -import javasabr.mqtt.model.QoS; import javasabr.mqtt.model.message.MqttMessageType; import javasabr.mqtt.model.publishing.Publish; import javasabr.mqtt.model.reason.code.DisconnectReasonCode; @@ -14,31 +12,37 @@ import javasabr.mqtt.network.MqttConnection; import javasabr.mqtt.network.impl.ExternalNetworkMqttUser; import javasabr.mqtt.network.message.in.PublishMqttInMessage; -import javasabr.mqtt.network.message.out.MqttOutMessage; import javasabr.mqtt.network.session.NetworkMqttSession; +import javasabr.mqtt.service.AuthorizationService; import javasabr.mqtt.service.MessageOutFactoryService; import javasabr.mqtt.service.PublishReceivingService; import javasabr.mqtt.service.TopicService; +import javasabr.mqtt.service.message.validator.MqttInMessageFieldValidator; import javasabr.rlib.common.util.StringUtils; import lombok.AccessLevel; import lombok.CustomLog; import lombok.experimental.FieldDefaults; +import org.jspecify.annotations.Nullable; @CustomLog @FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true) public class PublishMqttInMessageHandler - extends AbstractMqttInMessageHandler { + extends FieldsValidatedMqttInMessageHandler { PublishReceivingService publishReceivingService; TopicService topicService; + AuthorizationService authorizationService; public PublishMqttInMessageHandler( PublishReceivingService publishReceivingService, MessageOutFactoryService messageOutFactoryService, - TopicService topicService) { - super(ExternalNetworkMqttUser.class, PublishMqttInMessage.class, messageOutFactoryService); + TopicService topicService, + AuthorizationService authorizationService, + List> fieldValidators) { + super(ExternalNetworkMqttUser.class, PublishMqttInMessage.class, messageOutFactoryService, fieldValidators); this.publishReceivingService = publishReceivingService; this.topicService = topicService; + this.authorizationService = authorizationService; } @Override @@ -47,77 +51,28 @@ public MqttMessageType messageType() { } @Override - protected void processValidMessage( + protected void processMessageWithValidFields( MqttConnection connection, ExternalNetworkMqttUser user, NetworkMqttSession session, PublishMqttInMessage publishMessage) { - - if (!validateBaseFields(connection, user, publishMessage)) { + + TopicName finalTopicName = resolveFinalTopicName(user, session, publishMessage); + if (finalTopicName == null) { + return; + } else if (!authorizationService.authorizePublish(user, finalTopicName)) { + handleNotAuthorize(user); return; - } - - String rawResponseTopicName = publishMessage.rawResponseTopicName(); - TopicName responseTopicName = null; - if (rawResponseTopicName != null) { - if (!TopicValidator.validateTopicName(rawResponseTopicName)) { - log.warning(user.clientId(), rawResponseTopicName, "[%s] Provided invalid response TopicName:[%s]"::formatted); - handleInvalidResponseTopicName(user); - return; - } - responseTopicName = topicService.createTopicName(user, rawResponseTopicName); - } - - MqttClientConnectionConfig connectionConfig = connection.clientConnectionConfig(); - int topicAliasMaxValue = connectionConfig.topicAliasMaxValue(); - TopicNameMapping topicNameMapping = session.topicNameMapping(); - TopicName topicNameByAlias = null; - - String rawTopicName = publishMessage.rawTopicName(); - boolean providedRawTopicName = !StringUtils.isEmpty(rawTopicName); - int topicAlias = publishMessage.topicAlias(); - - if (!providedRawTopicName) { - if (topicAlias == MqttProperties.TOPIC_ALIAS_NOT_SET) { - log.warning(user.clientId(), "[%s] Not provided any information about TopicName"::formatted); - handleNotProvidedTopicName(user); - return; - } else if (topicAlias < MqttProperties.TOPIC_ALIAS_MIN || topicAlias > topicAliasMaxValue) { - log.warning(user.clientId(), topicAlias, "[%s] Provided invalid TopicAlias:[%d]"::formatted); - handleInvalidTopicAlias(user); - return; - } - topicNameByAlias = topicNameMapping.resolve(topicAlias); - if (topicNameByAlias == null) { - log.warning(user.clientId(), topicAlias, "[%s] Unknown TopicAlias:[%d]"::formatted); - handleNotProvidedTopicName(user); - return; - } - } - - TopicName topicName; - - if (providedRawTopicName) { - if (!TopicValidator.validateTopicName(rawTopicName)) { - handleInvalidTopicName(user, session, publishMessage); - log.warning(user.clientId(), publishMessage.rawTopicName(), "[%s] TopicName:[%s] is invalid"::formatted); - return; - } - topicName = topicService.createTopicName(user, rawTopicName); - if (topicAlias != MqttProperties.TOPIC_ALIAS_NOT_SET) { - topicNameMapping.update(topicAlias, topicName); - } - } else { - topicName = topicNameByAlias; } byte[] payload = publishMessage.payload(); + TopicName responseTopicName = resolveResponseTopic(user, publishMessage); //noinspection DataFlowIssue everything is already validated Publish publish = new Publish( publishMessage.messageId(), publishMessage.qos(), - topicName, + finalTopicName, responseTopicName, payload, publishMessage.duplicate(), @@ -126,119 +81,73 @@ protected void processValidMessage( publishMessage.subscriptionIds(), publishMessage.correlationData(), publishMessage.messageExpiryInterval(), - topicAlias, + publishMessage.topicAlias(), publishMessage.payloadFormat(), publishMessage.userProperties()); publishReceivingService.processPublish(user, publish); } - - private boolean validateBaseFields( - MqttConnection connection, + + @Nullable + private TopicName resolveFinalTopicName( ExternalNetworkMqttUser user, + NetworkMqttSession session, PublishMqttInMessage publishMessage) { - byte[] payload = publishMessage.payload(); - if (payload == null) { - log.warning(user.clientId(), "[%s] Unexpected missed payload"::formatted); - return false; - } - QoS requestedQos = publishMessage.qos(); - MqttClientConnectionConfig connectionConfig = connection.clientConnectionConfig(); - if (connectionConfig.maxQos().isLowerThan(requestedQos)) { - log.warning(user.clientId(), requestedQos, "[%s] Requested QoS:[%s] is not supported"::formatted); - handleNotSupportedQos(user); - return false; - } + TopicNameMapping topicNameMapping = session.topicNameMapping(); + String rawTopicName = publishMessage.rawTopicName(); + boolean providedRawTopicName = !StringUtils.isEmpty(rawTopicName); + int topicAlias = publishMessage.topicAlias(); - boolean retain = publishMessage.retain(); - if (retain && !connectionConfig.retainAvailable()) { - log.warning(user.clientId(), "[%s] 'RETAIN' option is not supported"::formatted); - handleNotSupportedRetain(user); - return false; - } + TopicName topicNameByAlias; + TopicName finalTopicName; - PayloadFormat payloadFormat = publishMessage.payloadFormat(); - if (payloadFormat == PayloadFormat.INVALID) { - log.warning(user.clientId(), "[%s] Provided invalid PayloadFormat"::formatted); - handleInvalidPayloadFormat(user); - return false; + if (!providedRawTopicName) { + topicNameByAlias = topicNameMapping.resolve(topicAlias); + if (topicNameByAlias == null) { + log.warning(user.clientId(), topicAlias, "[%s] Unknown TopicAlias:[%d]"::formatted); + handleNotProvidedTopicName(user); + return null; + } + finalTopicName = topicNameByAlias; + } else { + if (!TopicValidator.validateTopicName(rawTopicName)) { + handleInvalidTopicName(user); + log.warning(user.clientId(), rawTopicName, "[%s] TopicName:[%s] is invalid"::formatted); + return null; + } + finalTopicName = topicService.createTopicName(user, rawTopicName); + if (topicAlias != MqttProperties.TOPIC_ALIAS_NOT_SET) { + topicNameMapping.update(topicAlias, finalTopicName); + } } + return finalTopicName; + } - long messageExpiryInterval = publishMessage.messageExpiryInterval(); - if (messageExpiryInterval != MqttProperties.MESSAGE_EXPIRY_INTERVAL_IS_NOT_SET - && messageExpiryInterval < MqttProperties.MESSAGE_EXPIRY_INTERVAL_MIN) { - log.warning(user.clientId(), "[%s] Provided invalid MessageExpiryInterval"::formatted); - handleInvalidMessageExpiryInterval(user); - return false; + @Nullable + private TopicName resolveResponseTopic(ExternalNetworkMqttUser user, PublishMqttInMessage publishMessage) { + String rawResponseTopicName = publishMessage.rawResponseTopicName(); + if (rawResponseTopicName != null) { + return topicService.createTopicName(user, rawResponseTopicName); } - return true; + return null; } - private void handleNotSupportedQos(ExternalNetworkMqttUser user) { + private void handleNotProvidedTopicName(ExternalNetworkMqttUser user) { user.closeWithReason(messageOutFactoryService .resolveFactory(user) - .newDisconnect(user, DisconnectReasonCode.QOS_NOT_SUPPORTED)); + .newDisconnect(user, DisconnectReasonCode.PROTOCOL_ERROR, MqttProtocolErrors.NO_ANY_TOPIC_NANE)); } - - private void handleNotSupportedRetain(ExternalNetworkMqttUser user) { + + private void handleNotAuthorize(ExternalNetworkMqttUser user) { user.closeWithReason(messageOutFactoryService .resolveFactory(user) - .newDisconnect(user, DisconnectReasonCode.RETAIN_NOT_SUPPORTED)); - } - - private void handleNotProvidedTopicName(ExternalNetworkMqttUser user) { - MqttOutMessage response = messageOutFactoryService - .resolveFactory(user) - .newDisconnect(user, DisconnectReasonCode.PROTOCOL_ERROR, MqttProtocolErrors.NO_ANY_TOPIC_NANE); - user.closeWithReason(response); - } - - private void handleInvalidTopicAlias(ExternalNetworkMqttUser user) { - MqttOutMessage response = messageOutFactoryService - .resolveFactory(user) - .newDisconnect(user, DisconnectReasonCode.TOPIC_ALIAS_INVALID); - user.closeWithReason(response); - } - - private void handleInvalidPayloadFormat(ExternalNetworkMqttUser user) { - MqttOutMessage response = messageOutFactoryService - .resolveFactory(user) - .newDisconnect(user, DisconnectReasonCode.PROTOCOL_ERROR, MqttProtocolErrors.PROVIDED_INVALID_PAYLOAD_FORMAT); - user.closeWithReason(response); - } - - private void handleInvalidResponseTopicName(ExternalNetworkMqttUser user) { - MqttOutMessage response = messageOutFactoryService - .resolveFactory(user) - .newDisconnect(user, DisconnectReasonCode.PROTOCOL_ERROR, MqttProtocolErrors.INVALID_RESPONSE_TOPIC_NAME); - user.closeWithReason(response); + .newDisconnect(user, DisconnectReasonCode.NOT_AUTHORIZED)); } - private void handleInvalidMessageExpiryInterval(ExternalNetworkMqttUser user) { - MqttOutMessage response = messageOutFactoryService - .resolveFactory(user) - .newDisconnect(user, DisconnectReasonCode.PROTOCOL_ERROR, MqttProtocolErrors.PROVIDED_INVALID_MESSAGE_EXPIRY_INTERVAL); - user.closeWithReason(response); - } - - private void handleInvalidTopicName( - ExternalNetworkMqttUser user, - NetworkMqttSession session, - PublishMqttInMessage publishMessage) { - int messagedId = publishMessage.messageId(); - MqttOutMessage response = messageOutFactoryService + private void handleInvalidTopicName(ExternalNetworkMqttUser user) { + user.closeWithReason(messageOutFactoryService .resolveFactory(user) - .newDisconnect(user, DisconnectReasonCode.TOPIC_NAME_INVALID); - // without messageId we do not need to clean it - if (messagedId == MqttProperties.MESSAGE_ID_IS_NOT_SET) { - user.closeWithReason(response); - return; - } - user - .closeWithReason(response) - .thenAccept(_ -> session - .inMessageTracker() - .remove(messagedId)); + .newDisconnect(user, DisconnectReasonCode.TOPIC_NAME_INVALID)); } } diff --git a/core-service/src/main/java/javasabr/mqtt/service/message/validator/MqttInMessageFieldValidator.java b/core-service/src/main/java/javasabr/mqtt/service/message/validator/MqttInMessageFieldValidator.java new file mode 100644 index 00000000..6bcd29f2 --- /dev/null +++ b/core-service/src/main/java/javasabr/mqtt/service/message/validator/MqttInMessageFieldValidator.java @@ -0,0 +1,14 @@ +package javasabr.mqtt.service.message.validator; + +import javasabr.mqtt.model.MqttUser; +import javasabr.mqtt.network.MqttConnection; +import javasabr.mqtt.network.message.in.MqttInMessage; + +public abstract class MqttInMessageFieldValidator { + + public abstract boolean isNotValid(MqttConnection connection, U user, M message); + + public int order() { + return 0; + } +} diff --git a/core-service/src/main/java/javasabr/mqtt/service/message/validator/PublishMessageExpiryIntervalMqttInMessageFieldValidator.java b/core-service/src/main/java/javasabr/mqtt/service/message/validator/PublishMessageExpiryIntervalMqttInMessageFieldValidator.java new file mode 100644 index 00000000..3092bcba --- /dev/null +++ b/core-service/src/main/java/javasabr/mqtt/service/message/validator/PublishMessageExpiryIntervalMqttInMessageFieldValidator.java @@ -0,0 +1,50 @@ +package javasabr.mqtt.service.message.validator; + +import javasabr.mqtt.model.MqttProperties; +import javasabr.mqtt.model.MqttProtocolErrors; +import javasabr.mqtt.model.reason.code.DisconnectReasonCode; +import javasabr.mqtt.network.MqttConnection; +import javasabr.mqtt.network.message.in.PublishMqttInMessage; +import javasabr.mqtt.network.user.NetworkMqttUser; +import javasabr.mqtt.service.MessageOutFactoryService; +import lombok.AccessLevel; +import lombok.CustomLog; +import lombok.RequiredArgsConstructor; +import lombok.experimental.FieldDefaults; + +@CustomLog +@RequiredArgsConstructor +@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true) +public class PublishMessageExpiryIntervalMqttInMessageFieldValidator extends + MqttInMessageFieldValidator { + + public static final int ORDER = PublishRetainMqttInMessageFieldValidator.ORDER + 1; + + MessageOutFactoryService messageOutFactoryService; + + @Override + public boolean isNotValid(MqttConnection connection, NetworkMqttUser user, PublishMqttInMessage message) { + long messageExpiryInterval = message.messageExpiryInterval(); + if (messageExpiryInterval != MqttProperties.MESSAGE_EXPIRY_INTERVAL_IS_NOT_SET + && messageExpiryInterval < MqttProperties.MESSAGE_EXPIRY_INTERVAL_MIN) { + log.warning(user.clientId(), "[%s] Provided invalid MessageExpiryInterval"::formatted); + handleInvalidMessageExpiryInterval(user); + return true; + } + return false; + } + + @Override + public int order() { + return ORDER; + } + + private void handleInvalidMessageExpiryInterval(NetworkMqttUser user) { + user.closeWithReason(messageOutFactoryService + .resolveFactory(user) + .newDisconnect( + user, + DisconnectReasonCode.PROTOCOL_ERROR, + MqttProtocolErrors.PROVIDED_INVALID_MESSAGE_EXPIRY_INTERVAL)); + } +} diff --git a/core-service/src/main/java/javasabr/mqtt/service/message/validator/PublishPayloadMqttInMessageFieldValidator.java b/core-service/src/main/java/javasabr/mqtt/service/message/validator/PublishPayloadMqttInMessageFieldValidator.java new file mode 100644 index 00000000..63b8ce46 --- /dev/null +++ b/core-service/src/main/java/javasabr/mqtt/service/message/validator/PublishPayloadMqttInMessageFieldValidator.java @@ -0,0 +1,54 @@ +package javasabr.mqtt.service.message.validator; + +import javasabr.mqtt.model.MqttProtocolErrors; +import javasabr.mqtt.model.PayloadFormat; +import javasabr.mqtt.model.reason.code.DisconnectReasonCode; +import javasabr.mqtt.network.MqttConnection; +import javasabr.mqtt.network.message.in.PublishMqttInMessage; +import javasabr.mqtt.network.user.NetworkMqttUser; +import javasabr.mqtt.service.MessageOutFactoryService; +import lombok.AccessLevel; +import lombok.CustomLog; +import lombok.RequiredArgsConstructor; +import lombok.experimental.FieldDefaults; + +@CustomLog +@RequiredArgsConstructor +@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true) +public class PublishPayloadMqttInMessageFieldValidator extends + MqttInMessageFieldValidator { + + public static final int ORDER = 10; + + MessageOutFactoryService messageOutFactoryService; + + @Override + public boolean isNotValid(MqttConnection connection, NetworkMqttUser user, PublishMqttInMessage message) { + byte[] payload = message.payload(); + if (payload == null) { + log.warning(user.clientId(), "[%s] Missed payload"::formatted); + return true; + } + PayloadFormat payloadFormat = message.payloadFormat(); + if (payloadFormat == PayloadFormat.INVALID) { + log.warning(user.clientId(), "[%s] Provided invalid PayloadFormat"::formatted); + handleInvalidPayloadFormat(user); + return true; + } + return false; + } + + @Override + public int order() { + return ORDER; + } + + private void handleInvalidPayloadFormat(NetworkMqttUser user) { + user.closeWithReason(messageOutFactoryService + .resolveFactory(user) + .newDisconnect( + user, + DisconnectReasonCode.PROTOCOL_ERROR, + MqttProtocolErrors.PROVIDED_INVALID_PAYLOAD_FORMAT)); + } +} diff --git a/core-service/src/main/java/javasabr/mqtt/service/message/validator/PublishQosMqttInMessageFieldValidator.java b/core-service/src/main/java/javasabr/mqtt/service/message/validator/PublishQosMqttInMessageFieldValidator.java new file mode 100644 index 00000000..2076b711 --- /dev/null +++ b/core-service/src/main/java/javasabr/mqtt/service/message/validator/PublishQosMqttInMessageFieldValidator.java @@ -0,0 +1,47 @@ +package javasabr.mqtt.service.message.validator; + +import javasabr.mqtt.model.MqttClientConnectionConfig; +import javasabr.mqtt.model.QoS; +import javasabr.mqtt.model.reason.code.DisconnectReasonCode; +import javasabr.mqtt.network.MqttConnection; +import javasabr.mqtt.network.message.in.PublishMqttInMessage; +import javasabr.mqtt.network.user.NetworkMqttUser; +import javasabr.mqtt.service.MessageOutFactoryService; +import lombok.AccessLevel; +import lombok.CustomLog; +import lombok.RequiredArgsConstructor; +import lombok.experimental.FieldDefaults; + +@CustomLog +@RequiredArgsConstructor +@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true) +public class PublishQosMqttInMessageFieldValidator extends + MqttInMessageFieldValidator { + + public static final int ORDER = PublishPayloadMqttInMessageFieldValidator.ORDER + 1; + + MessageOutFactoryService messageOutFactoryService; + + @Override + public boolean isNotValid(MqttConnection connection, NetworkMqttUser user, PublishMqttInMessage message) { + QoS requestedQos = message.qos(); + MqttClientConnectionConfig connectionConfig = connection.clientConnectionConfig(); + if (connectionConfig.maxQos().isLowerThan(requestedQos)) { + log.warning(user.clientId(), requestedQos, "[%s] Requested QoS:[%s] is not supported"::formatted); + handleNotSupportedQos(user); + return true; + } + return false; + } + + @Override + public int order() { + return ORDER; + } + + private void handleNotSupportedQos(NetworkMqttUser user) { + user.closeWithReason(messageOutFactoryService + .resolveFactory(user) + .newDisconnect(user, DisconnectReasonCode.QOS_NOT_SUPPORTED)); + } +} diff --git a/core-service/src/main/java/javasabr/mqtt/service/message/validator/PublishResponseTopicMqttInMessageFieldValidator.java b/core-service/src/main/java/javasabr/mqtt/service/message/validator/PublishResponseTopicMqttInMessageFieldValidator.java new file mode 100644 index 00000000..299d16d7 --- /dev/null +++ b/core-service/src/main/java/javasabr/mqtt/service/message/validator/PublishResponseTopicMqttInMessageFieldValidator.java @@ -0,0 +1,51 @@ +package javasabr.mqtt.service.message.validator; + +import javasabr.mqtt.model.MqttProtocolErrors; +import javasabr.mqtt.model.reason.code.DisconnectReasonCode; +import javasabr.mqtt.model.topic.TopicValidator; +import javasabr.mqtt.network.MqttConnection; +import javasabr.mqtt.network.message.in.PublishMqttInMessage; +import javasabr.mqtt.network.user.NetworkMqttUser; +import javasabr.mqtt.service.MessageOutFactoryService; +import lombok.AccessLevel; +import lombok.CustomLog; +import lombok.RequiredArgsConstructor; +import lombok.experimental.FieldDefaults; + +@CustomLog +@RequiredArgsConstructor +@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true) +public class PublishResponseTopicMqttInMessageFieldValidator extends + MqttInMessageFieldValidator { + + public static final int ORDER = PublishMessageExpiryIntervalMqttInMessageFieldValidator.ORDER + 1; + + MessageOutFactoryService messageOutFactoryService; + + @Override + public boolean isNotValid(MqttConnection connection, NetworkMqttUser user, PublishMqttInMessage message) { + String rawResponseTopicName = message.rawResponseTopicName(); + if (rawResponseTopicName != null) { + if (!TopicValidator.validateTopicName(rawResponseTopicName)) { + log.warning(user.clientId(), rawResponseTopicName, "[%s] Provided invalid ResponseTopic:[%s]"::formatted); + handleInvalidResponseTopicName(user); + return true; + } + } + return false; + } + + @Override + public int order() { + return ORDER; + } + + private void handleInvalidResponseTopicName(NetworkMqttUser user) { + user.closeWithReason(messageOutFactoryService + .resolveFactory(user) + .newDisconnect( + user, + DisconnectReasonCode.PROTOCOL_ERROR, + MqttProtocolErrors.PROVIDED_INVALID_RESPONSE_TOPIC)); + } +} diff --git a/core-service/src/main/java/javasabr/mqtt/service/message/validator/PublishRetainMqttInMessageFieldValidator.java b/core-service/src/main/java/javasabr/mqtt/service/message/validator/PublishRetainMqttInMessageFieldValidator.java new file mode 100644 index 00000000..6fff81d2 --- /dev/null +++ b/core-service/src/main/java/javasabr/mqtt/service/message/validator/PublishRetainMqttInMessageFieldValidator.java @@ -0,0 +1,46 @@ +package javasabr.mqtt.service.message.validator; + +import javasabr.mqtt.model.MqttClientConnectionConfig; +import javasabr.mqtt.model.reason.code.DisconnectReasonCode; +import javasabr.mqtt.network.MqttConnection; +import javasabr.mqtt.network.message.in.PublishMqttInMessage; +import javasabr.mqtt.network.user.NetworkMqttUser; +import javasabr.mqtt.service.MessageOutFactoryService; +import lombok.AccessLevel; +import lombok.CustomLog; +import lombok.RequiredArgsConstructor; +import lombok.experimental.FieldDefaults; + +@CustomLog +@RequiredArgsConstructor +@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true) +public class PublishRetainMqttInMessageFieldValidator extends + MqttInMessageFieldValidator { + + public static final int ORDER = PublishQosMqttInMessageFieldValidator.ORDER + 1; + + MessageOutFactoryService messageOutFactoryService; + + @Override + public boolean isNotValid(MqttConnection connection, NetworkMqttUser user, PublishMqttInMessage message) { + MqttClientConnectionConfig connectionConfig = connection.clientConnectionConfig(); + boolean retain = message.retain(); + if (retain && !connectionConfig.retainAvailable()) { + log.warning(user.clientId(), "[%s] 'RETAIN' option is not supported"::formatted); + handleNotSupportedRetain(user); + return true; + } + return false; + } + + @Override + public int order() { + return ORDER; + } + + private void handleNotSupportedRetain(NetworkMqttUser user) { + user.closeWithReason(messageOutFactoryService + .resolveFactory(user) + .newDisconnect(user, DisconnectReasonCode.RETAIN_NOT_SUPPORTED)); + } +} diff --git a/core-service/src/main/java/javasabr/mqtt/service/message/validator/PublishTopicAliasMqttInMessageFieldValidator.java b/core-service/src/main/java/javasabr/mqtt/service/message/validator/PublishTopicAliasMqttInMessageFieldValidator.java new file mode 100644 index 00000000..9e8d023f --- /dev/null +++ b/core-service/src/main/java/javasabr/mqtt/service/message/validator/PublishTopicAliasMqttInMessageFieldValidator.java @@ -0,0 +1,64 @@ +package javasabr.mqtt.service.message.validator; + +import javasabr.mqtt.model.MqttClientConnectionConfig; +import javasabr.mqtt.model.MqttProperties; +import javasabr.mqtt.model.MqttProtocolErrors; +import javasabr.mqtt.model.reason.code.DisconnectReasonCode; +import javasabr.mqtt.network.MqttConnection; +import javasabr.mqtt.network.message.in.PublishMqttInMessage; +import javasabr.mqtt.network.user.NetworkMqttUser; +import javasabr.mqtt.service.MessageOutFactoryService; +import javasabr.rlib.common.util.StringUtils; +import lombok.AccessLevel; +import lombok.CustomLog; +import lombok.RequiredArgsConstructor; +import lombok.experimental.FieldDefaults; + +@CustomLog +@RequiredArgsConstructor +@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true) +public class PublishTopicAliasMqttInMessageFieldValidator extends + MqttInMessageFieldValidator { + + public static final int ORDER = PublishResponseTopicMqttInMessageFieldValidator.ORDER + 1; + + MessageOutFactoryService messageOutFactoryService; + + @Override + public boolean isNotValid(MqttConnection connection, NetworkMqttUser user, PublishMqttInMessage message) { + boolean providedRawTopicName = !StringUtils.isEmpty(message.rawTopicName()); + if (!providedRawTopicName) { + int topicAlias = message.topicAlias(); + if (topicAlias == MqttProperties.TOPIC_ALIAS_NOT_SET) { + log.warning(user.clientId(), "[%s] Not provided any information about TopicName"::formatted); + handleNotProvidedTopicName(user); + return true; + } + MqttClientConnectionConfig connectionConfig = connection.clientConnectionConfig(); + int topicAliasMaxValue = connectionConfig.topicAliasMaxValue(); + if (topicAlias < MqttProperties.TOPIC_ALIAS_MIN || topicAlias > topicAliasMaxValue) { + log.warning(user.clientId(), topicAlias, "[%s] Provided invalid TopicAlias:[%d]"::formatted); + handleInvalidTopicAlias(user); + return true; + } + } + return false; + } + + @Override + public int order() { + return ORDER; + } + + private void handleNotProvidedTopicName(NetworkMqttUser user) { + user.closeWithReason(messageOutFactoryService + .resolveFactory(user) + .newDisconnect(user, DisconnectReasonCode.PROTOCOL_ERROR, MqttProtocolErrors.NO_ANY_TOPIC_NANE)); + } + + private void handleInvalidTopicAlias(NetworkMqttUser user) { + user.closeWithReason(messageOutFactoryService + .resolveFactory(user) + .newDisconnect(user, DisconnectReasonCode.TOPIC_ALIAS_INVALID)); + } +} diff --git a/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos2MqttPublishOutMessageHandler.java b/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos2MqttPublishOutMessageHandler.java index 86bbf3b9..040a8721 100644 --- a/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos2MqttPublishOutMessageHandler.java +++ b/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos2MqttPublishOutMessageHandler.java @@ -15,7 +15,6 @@ import javasabr.mqtt.service.MessageOutFactoryService; import javasabr.mqtt.service.SubscriptionService; import lombok.CustomLog; -import org.jspecify.annotations.NonNull; import org.jspecify.annotations.Nullable; @CustomLog diff --git a/core-service/src/test/groovy/javasabr/mqtt/service/IntegrationServiceSpecification.groovy b/core-service/src/test/groovy/javasabr/mqtt/service/IntegrationServiceSpecification.groovy index f1054d75..2da0a9f0 100644 --- a/core-service/src/test/groovy/javasabr/mqtt/service/IntegrationServiceSpecification.groovy +++ b/core-service/src/test/groovy/javasabr/mqtt/service/IntegrationServiceSpecification.groovy @@ -1,6 +1,5 @@ package javasabr.mqtt.service - import javasabr.mqtt.model.MqttClientConnectionConfig import javasabr.mqtt.model.MqttProperties import javasabr.mqtt.model.MqttServerConnectionConfig @@ -8,14 +7,24 @@ import javasabr.mqtt.model.MqttVersion import javasabr.mqtt.model.QoS import javasabr.mqtt.network.MqttConnection import javasabr.mqtt.network.handler.NetworkMqttUserReleaseHandler +import javasabr.mqtt.network.message.in.PublishMqttInMessage +import javasabr.mqtt.network.user.NetworkMqttUser import javasabr.mqtt.service.impl.DefaultMessageOutFactoryService import javasabr.mqtt.service.impl.DefaultPublishDeliveringService import javasabr.mqtt.service.impl.DefaultPublishReceivingService import javasabr.mqtt.service.impl.DefaultTopicService +import javasabr.mqtt.service.impl.DisabledAuthorizationService import javasabr.mqtt.service.impl.InMemorySubscriptionService import javasabr.mqtt.service.message.handler.impl.PublishReleaseMqttInMessageHandler import javasabr.mqtt.service.message.out.factory.Mqtt311MessageOutFactory import javasabr.mqtt.service.message.out.factory.Mqtt5MessageOutFactory +import javasabr.mqtt.service.message.validator.MqttInMessageFieldValidator +import javasabr.mqtt.service.message.validator.PublishMessageExpiryIntervalMqttInMessageFieldValidator +import javasabr.mqtt.service.message.validator.PublishPayloadMqttInMessageFieldValidator +import javasabr.mqtt.service.message.validator.PublishQosMqttInMessageFieldValidator +import javasabr.mqtt.service.message.validator.PublishResponseTopicMqttInMessageFieldValidator +import javasabr.mqtt.service.message.validator.PublishRetainMqttInMessageFieldValidator +import javasabr.mqtt.service.message.validator.PublishTopicAliasMqttInMessageFieldValidator import javasabr.mqtt.service.publish.handler.impl.Qos0MqttPublishInMessageHandler import javasabr.mqtt.service.publish.handler.impl.Qos0MqttPublishOutMessageHandler import javasabr.mqtt.service.publish.handler.impl.Qos1MqttPublishInMessageHandler @@ -87,6 +96,19 @@ abstract class IntegrationServiceSpecification extends Specification { @Shared def defaultMqttSessionService = new InMemoryMqttSessionService(60_000); + + @Shared + def disabledAclService = new DisabledAuthorizationService() + + @Shared + List> publishInFieldValidators = [ + new PublishRetainMqttInMessageFieldValidator(defaultMessageOutFactoryService), + new PublishQosMqttInMessageFieldValidator(defaultMessageOutFactoryService), + new PublishPayloadMqttInMessageFieldValidator(defaultMessageOutFactoryService), + new PublishMessageExpiryIntervalMqttInMessageFieldValidator(defaultMessageOutFactoryService), + new PublishResponseTopicMqttInMessageFieldValidator(defaultMessageOutFactoryService), + new PublishTopicAliasMqttInMessageFieldValidator(defaultMessageOutFactoryService) + ] @Shared def defaultExternalServerConnectionConfig = new MqttServerConnectionConfig( diff --git a/core-service/src/test/groovy/javasabr/mqtt/service/message/handler/impl/PublishMqttInMessageHandlerTest.groovy b/core-service/src/test/groovy/javasabr/mqtt/service/message/handler/impl/PublishMqttInMessageHandlerTest.groovy index cc47a675..daaca7b1 100644 --- a/core-service/src/test/groovy/javasabr/mqtt/service/message/handler/impl/PublishMqttInMessageHandlerTest.groovy +++ b/core-service/src/test/groovy/javasabr/mqtt/service/message/handler/impl/PublishMqttInMessageHandlerTest.groovy @@ -23,7 +23,9 @@ class PublishMqttInMessageHandlerTest extends IntegrationServiceSpecification { def messageHandler = new PublishMqttInMessageHandler( publishReceivingService, defaultMessageOutFactoryService, - defaultTopicService) + defaultTopicService, + disabledAclService, + publishInFieldValidators) def expectedMessageId = 15 def expectedTopicAlias = 5 def mqttUser = mqttConnection.user() as TestExternalNetworkMqttUser @@ -51,7 +53,9 @@ class PublishMqttInMessageHandlerTest extends IntegrationServiceSpecification { def messageHandler = new PublishMqttInMessageHandler( publishReceivingService, defaultMessageOutFactoryService, - defaultTopicService) + defaultTopicService, + disabledAclService, + publishInFieldValidators) def expectedMessageId = 15 def mqttUser = mqttConnection.user() as TestExternalNetworkMqttUser mqttUser.returnCompletedFeatures(false) @@ -77,7 +81,9 @@ class PublishMqttInMessageHandlerTest extends IntegrationServiceSpecification { def messageHandler = new PublishMqttInMessageHandler( publishReceivingService, defaultMessageOutFactoryService, - defaultTopicService) + defaultTopicService, + disabledAclService, + publishInFieldValidators) def mqttUser = mqttConnection.user() as TestExternalNetworkMqttUser mqttUser.session(null) when: @@ -96,7 +102,9 @@ class PublishMqttInMessageHandlerTest extends IntegrationServiceSpecification { def messageHandler = new PublishMqttInMessageHandler( publishReceivingService, defaultMessageOutFactoryService, - defaultTopicService) + defaultTopicService, + disabledAclService, + publishInFieldValidators) def mqttUser = mqttConnection.user() as TestExternalNetworkMqttUser when: def publishMessage = new PublishMqttInMessage(0b0110_0010 as byte) {{ @@ -117,7 +125,9 @@ class PublishMqttInMessageHandlerTest extends IntegrationServiceSpecification { def messageHandler = new PublishMqttInMessageHandler( publishReceivingService, defaultMessageOutFactoryService, - defaultTopicService) + defaultTopicService, + disabledAclService, + publishInFieldValidators) def expectedMessageId = 15 def mqttUser = mqttConnection.user() as TestExternalNetworkMqttUser mqttUser @@ -145,7 +155,9 @@ class PublishMqttInMessageHandlerTest extends IntegrationServiceSpecification { def messageHandler = new PublishMqttInMessageHandler( publishReceivingService, defaultMessageOutFactoryService, - defaultTopicService) + defaultTopicService, + disabledAclService, + publishInFieldValidators) def mqttUser = mqttConnection.user() as TestExternalNetworkMqttUser when: def publishMessage = new PublishMqttInMessage(0 as byte) @@ -162,7 +174,9 @@ class PublishMqttInMessageHandlerTest extends IntegrationServiceSpecification { def messageHandler = new PublishMqttInMessageHandler( publishReceivingService, defaultMessageOutFactoryService, - defaultTopicService) + defaultTopicService, + disabledAclService, + publishInFieldValidators) def expectedMessageId = 15 def mqttUser = mqttConnection.user() as TestExternalNetworkMqttUser when: @@ -186,7 +200,9 @@ class PublishMqttInMessageHandlerTest extends IntegrationServiceSpecification { def messageHandler = new PublishMqttInMessageHandler( publishReceivingService, defaultMessageOutFactoryService, - defaultTopicService) + defaultTopicService, + disabledAclService, + publishInFieldValidators) def expectedMessageId = 15 def mqttUser = mqttConnection.user() as TestExternalNetworkMqttUser when: @@ -208,7 +224,9 @@ class PublishMqttInMessageHandlerTest extends IntegrationServiceSpecification { def messageHandler = new PublishMqttInMessageHandler( publishReceivingService, defaultMessageOutFactoryService, - defaultTopicService) + defaultTopicService, + disabledAclService, + publishInFieldValidators) def expectedMessageId = 15 def mqttUser = mqttConnection.user() as TestExternalNetworkMqttUser when: @@ -231,7 +249,9 @@ class PublishMqttInMessageHandlerTest extends IntegrationServiceSpecification { def messageHandler = new PublishMqttInMessageHandler( publishReceivingService, defaultMessageOutFactoryService, - defaultTopicService) + defaultTopicService, + disabledAclService, + publishInFieldValidators) def expectedMessageId = 15 def mqttUser = mqttConnection.user() as TestExternalNetworkMqttUser when: @@ -254,7 +274,9 @@ class PublishMqttInMessageHandlerTest extends IntegrationServiceSpecification { def messageHandler = new PublishMqttInMessageHandler( publishReceivingService, defaultMessageOutFactoryService, - defaultTopicService) + defaultTopicService, + disabledAclService, + publishInFieldValidators) def expectedMessageId = 15 def mqttUser = mqttConnection.user() as TestExternalNetworkMqttUser when: @@ -267,7 +289,7 @@ class PublishMqttInMessageHandlerTest extends IntegrationServiceSpecification { then: def disconnectReason = mqttUser.nextSentMessage(DisconnectMqtt5OutMessage) disconnectReason.reasonCode() == DisconnectReasonCode.PROTOCOL_ERROR - disconnectReason.reason() == MqttProtocolErrors.INVALID_RESPONSE_TOPIC_NAME + disconnectReason.reason() == MqttProtocolErrors.PROVIDED_INVALID_RESPONSE_TOPIC disconnectReason.serverReference() == null } @@ -277,7 +299,9 @@ class PublishMqttInMessageHandlerTest extends IntegrationServiceSpecification { def messageHandler = new PublishMqttInMessageHandler( publishReceivingService, defaultMessageOutFactoryService, - defaultTopicService) + defaultTopicService, + disabledAclService, + publishInFieldValidators) def expectedMessageId = 15 def mqttUser = mqttConnection.user() as TestExternalNetworkMqttUser when: @@ -299,7 +323,9 @@ class PublishMqttInMessageHandlerTest extends IntegrationServiceSpecification { def messageHandler = new PublishMqttInMessageHandler( publishReceivingService, defaultMessageOutFactoryService, - defaultTopicService) + defaultTopicService, + disabledAclService, + publishInFieldValidators) def expectedMessageId = 15 def mqttUser = mqttConnection.user() as TestExternalNetworkMqttUser when: 'topic alias is too high' @@ -334,7 +360,9 @@ class PublishMqttInMessageHandlerTest extends IntegrationServiceSpecification { def messageHandler = new PublishMqttInMessageHandler( publishReceivingService, defaultMessageOutFactoryService, - defaultTopicService) + defaultTopicService, + disabledAclService, + publishInFieldValidators) def expectedMessageId = 15 def mqttUser = mqttConnection.user() as TestExternalNetworkMqttUser when: @@ -357,7 +385,9 @@ class PublishMqttInMessageHandlerTest extends IntegrationServiceSpecification { def messageHandler = new PublishMqttInMessageHandler( publishReceivingService, defaultMessageOutFactoryService, - defaultTopicService) + defaultTopicService, + disabledAclService, + publishInFieldValidators) def expectedMessageId = 15 def mqttUser = mqttConnection.user() as TestExternalNetworkMqttUser when: diff --git a/model/src/main/java/javasabr/mqtt/model/MqttProtocolErrors.java b/model/src/main/java/javasabr/mqtt/model/MqttProtocolErrors.java index f59e317f..f66d1dc0 100644 --- a/model/src/main/java/javasabr/mqtt/model/MqttProtocolErrors.java +++ b/model/src/main/java/javasabr/mqtt/model/MqttProtocolErrors.java @@ -2,11 +2,11 @@ public interface MqttProtocolErrors { String NO_ANY_TOPIC_FILTERS = "Not provided any information about 'Topic Filters'"; - String NO_ANY_TOPIC_NANE = "Not provided any information about TopicName"; + String NO_ANY_TOPIC_NANE = "Not provided any information about `Topic Name`"; //String INVALID_TOPIC_ALIAS = "Provided invalid TopicAlias"; - String PROVIDED_INVALID_PAYLOAD_FORMAT = "Provided invalid PayloadFormat"; - String PROVIDED_INVALID_MESSAGE_EXPIRY_INTERVAL = "Provided invalid MessageExpiryInterval"; + String PROVIDED_INVALID_PAYLOAD_FORMAT = "Provided invalid 'Payload Format Indicator'"; + String PROVIDED_INVALID_MESSAGE_EXPIRY_INTERVAL = "Provided invalid 'Message Expiry Interval'"; String PROVIDED_INVALID_SESSION_EXPIRY_INTERVAL = "Provided invalid 'Session Expiry Interval'"; String PROVIDED_INVALID_RECEIVED_MAX_PUBLISHES = "Provided invalid 'Receive Maximum'"; String PROVIDED_INVALID_MAX_QOS = "Provided invalid 'Maximum QoS'"; @@ -17,8 +17,8 @@ public interface MqttProtocolErrors { String PROVIDED_INVALID_SUBSCRIPTION_IDENTIFIERS_AVAILABLE = "Provided invalid 'Subscription Identifiers Available'"; String PROVIDED_INVALID_SHARED_SUBSCRIPTION_AVAILABLE = "Provided invalid 'Shared Subscription Available'"; String PROVIDED_INVALID_SERVER_KEEP_ALIVE = "Provided invalid 'Server Keep Alive'"; - - String INVALID_RESPONSE_TOPIC_NAME = "Provided invalid ResponseTopicName"; + String PROVIDED_INVALID_RESPONSE_TOPIC = "Provided invalid 'Response Topic'"; + String UNSUPPORTED_QOS_OR_RETAIN_HANDLING = "Provided unsupported 'QoS' or 'RetainHandling'"; String MISSED_REQUIRED_MESSAGE_ID = "'Packet Identifier' must be presented'"; String NOT_EXPECTED_MESSAGE_ID = "'Packet Identifier' must be zero'";