Skip to content

Commit 5db7fd7

Browse files
authored
Improve Publish Implementation, part 1 (#53)
Implement full validation of incoming publish messages
1 parent 5c31e14 commit 5db7fd7

File tree

46 files changed

+1218
-282
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+1218
-282
lines changed

application/src/main/java/javasabr/mqtt/broker/application/config/MqttBrokerSpringConfig.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import javasabr.mqtt.network.MqttConnection;
1010
import javasabr.mqtt.network.MqttConnectionFactory;
1111
import javasabr.mqtt.network.handler.MqttClientReleaseHandler;
12+
import javasabr.mqtt.network.impl.ExternalMqttClient;
1213
import javasabr.mqtt.service.AuthenticationService;
1314
import javasabr.mqtt.service.ClientIdRegistry;
1415
import javasabr.mqtt.service.ConnectionService;
@@ -193,8 +194,8 @@ MqttInMessageHandler unsubscribeMqttInMessageHandler(
193194
}
194195

195196
@Bean
196-
ConnectionService mqttConnectionService(Collection<? extends MqttInMessageHandler> inMessageHandlers) {
197-
return new DefaultConnectionService(inMessageHandlers);
197+
ConnectionService externalMqttConnectionService(Collection<? extends MqttInMessageHandler> inMessageHandlers) {
198+
return new DefaultConnectionService(ExternalMqttClient.class, inMessageHandlers);
198199
}
199200

200201
@Bean
@@ -298,7 +299,7 @@ MqttServerConnectionConfig externalConnectionConfig(Environment env) {
298299
env.getProperty(
299300
"mqtt.external.connection.topic.alias.maximum",
300301
int.class,
301-
MqttProperties.TOPIC_ALIAS_DEFAULT),
302+
MqttProperties.TOPIC_ALIAS_MAX_DEFAULT),
302303
env.getProperty(
303304
"mqtt.external.connection.default.session.expiration.time",
304305
long.class,

application/src/test/groovy/javasabr/mqtt/broker/application/IntegrationSpecification.groovy

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ class IntegrationSpecification extends Specification {
152152
isSupported(MqttVersion.MQTT_5) >> true
153153
isSupported(MqttVersion.MQTT_3_1_1) >> true
154154
serverConnectionConfig() >> serverConnConfig
155+
clientConnectionConfig() >> clientConnConfig
155156
client() >> Stub(UnsafeMqttClient) {
156157
connectionConfig() >> clientConnConfig
157158
connection() >> connectionRef.get()
@@ -179,6 +180,7 @@ class IntegrationSpecification extends Specification {
179180
isSupported(MqttVersion.MQTT_5) >> false
180181
isSupported(MqttVersion.MQTT_3_1_1) >> true
181182
serverConnectionConfig() >> serverConnConfig
183+
clientConnectionConfig() >> clientConnConfig
182184
client() >> Stub(UnsafeMqttClient) {
183185
connectionConfig() >> clientConnConfig
184186
connection() >> connectionRef.get()

application/src/test/groovy/javasabr/mqtt/broker/application/PublishRetryTest.groovy

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,15 @@ class PublishRetryTest extends IntegrationSpecification {
4040
subscriber.send(new ConnectMqtt311OutMessage(subscriberId, keepAlive))
4141
def connectAck = subscriber.readNext() as ConnectAckMqttInMessage
4242
then:
43-
connectAck.reasonCode == ConnectAckReasonCode.SUCCESS
43+
connectAck.reasonCode() == ConnectAckReasonCode.SUCCESS
4444
when:
4545
subscriber.send(new SubscribeMqtt311OutMessage(
4646
1,
4747
Array.of(Subscription.minimal(TopicFilter.valueOf("test/retry/$subscriberId"), QoS.AT_LEAST_ONCE))))
4848
def subscribeAck = subscriber.readNext() as SubscribeAckMqttInMessage
4949
then:
50-
subscribeAck.reasonCodes.stream()
50+
subscribeAck.reasonCodes()
51+
.stream()
5152
.allMatch({ it == SubscribeAckReasonCode.GRANTED_QOS_1 })
5253
when:
5354
publisher.publishWith()
@@ -58,7 +59,7 @@ class PublishRetryTest extends IntegrationSpecification {
5859
.join()
5960
def receivedPublish = subscriber.readNext() as PublishMqttInMessage
6061
then:
61-
receivedPublish.payload == publishPayload
62+
receivedPublish.payload() == publishPayload
6263
when:
6364
subscriber.disconnect()
6465
Thread.sleep(1_000)

gradle/libs.versions.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ moquette-broker = "0.17"
3232
# https://mvnrepository.com/artifact/tools.jackson.core/jackson-core
3333
jackson = "3.0.1"
3434

35+
3536
[libraries]
3637
rlib-network = { module = "javasabr.rlib:rlib-network", version.ref = "rlib" }
3738
rlib-logger-api = { module = "javasabr.rlib:rlib-logger-api", version.ref = "rlib" }

model/src/main/java/javasabr/mqtt/model/MqttClientConnectionConfig.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,8 @@ public int maxTopicLevels() {
3939
public int maxStringLength() {
4040
return server.maxStringLength();
4141
}
42+
43+
public int maxBinarySize() {
44+
return server.maxBinarySize();
45+
}
4246
}

model/src/main/java/javasabr/mqtt/model/MqttProperties.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,9 @@ public interface MqttProperties {
2727
int MAXIMUM_BINARY_SIZE = 2048;
2828
int MAXIMUM_TOPIC_LEVELS = 10;
2929

30-
long MESSAGE_EXPIRY_INTERVAL_UNDEFINED = -1;
30+
long MESSAGE_EXPIRY_INTERVAL_IS_NOT_SET = -1;
3131
long MESSAGE_EXPIRY_INTERVAL_INFINITY = 0;
32+
long MESSAGE_EXPIRY_INTERVAL_MIN = 0;
3233

3334
int TOPIC_ALIAS_MAXIMUM_UNDEFINED = -1;
3435
int TOPIC_ALIAS_MAXIMUM_DISABLED = 0;
@@ -39,11 +40,11 @@ public interface MqttProperties {
3940
int SERVER_KEEP_ALIVE_MIN = 0;
4041
int SERVER_KEEP_ALIVE_MAX = 0xFFFF;
4142

42-
int TOPIC_ALIAS_UNDEFINED = 0;
4343
int TOPIC_ALIAS_MIN = 1;
44-
int TOPIC_ALIAS_DEFAULT = 10;
44+
int TOPIC_ALIAS_MAX_DEFAULT = 10;
4545
int TOPIC_ALIAS_MAX = 0xFFFF;
46-
int TOPIC_ALIAS_NOT_SET = 0;
46+
int TOPIC_ALIAS_NOT_SET = Integer.MIN_VALUE;
47+
int TOPIC_ALIAS_INVALID = 0;
4748

4849
int SUBSCRIPTION_ID_IS_NOT_SET = 0;
4950
int MESSAGE_ID_IS_NOT_SET = 0;
Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,15 @@
11
package javasabr.mqtt.model;
22

33
public interface MqttProtocolErrors {
4-
String NO_ANY_TOPIC_FILTER = "No any topic filters";
5-
String UNSUPPORTED_QOS_OR_RETAIN_HANDLING = "Unsupported qos or retain handling";
4+
String NO_ANY_TOPIC_FILTER = "Not provided any information about TopicFilters";
5+
String NO_ANY_TOPIC_NANE = "Not provided any information about TopicName";
6+
//String INVALID_TOPIC_ALIAS = "Provided invalid TopicAlias";
7+
String INVALID_PAYLOAD_FORMAT = "Provided invalid PayloadFormat";
8+
String INVALID_MESSAGE_EXPIRY_INTERVAL = "Provided invalid MessageExpiryInterval";
9+
String INVALID_RESPONSE_TOPIC_NAME = "Provided invalid ResponseTopicName";
10+
String UNSUPPORTED_QOS_OR_RETAIN_HANDLING = "Unsupported 'QoS' or 'RetainHandling'";
11+
String MISSED_REQUIRED_MESSAGE_ID = "'Packet Identifier' must be presented'";
12+
String PROTOCOL_LEVEL_UNSUPPORTED_NO_LOCAL_OPTION = "'NoLocal' option is not available on this protocol level";
13+
String PROTOCOL_LEVEL_UNSUPPORTED_RETAIN_AS_PUBLISH_OPTION = "'RetainAsPublished' option is not available on this protocol level";
14+
String PROTOCOL_LEVEL_UNSUPPORTED_RETAIN_HANDLING_OPTION = "'RetainHandling' option is not available on this protocol level";
615
}
Lines changed: 20 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package javasabr.mqtt.model;
22

33
import javasabr.rlib.common.util.NumberUtils;
4+
import lombok.Builder;
45

6+
@Builder(toBuilder = true)
57
public record MqttServerConnectionConfig(
68
QoS maxQos,
79
int maxMessageSize,
@@ -71,78 +73,32 @@ public MqttServerConnectionConfig(
7173
}
7274

7375
public MqttServerConnectionConfig withMaxQos(QoS maxQos) {
74-
return new MqttServerConnectionConfig(
75-
maxQos,
76-
maxMessageSize,
77-
maxStringLength,
78-
maxBinarySize,
79-
maxTopicLevels,
80-
minKeepAliveTime,
81-
receiveMaxPublishes,
82-
topicAliasMaxValue,
83-
defaultSessionExpiryInterval,
84-
keepAliveEnabled,
85-
sessionsEnabled,
86-
retainAvailable,
87-
wildcardSubscriptionAvailable,
88-
subscriptionIdAvailable,
89-
sharedSubscriptionAvailable);
76+
return toBuilder()
77+
.maxQos(maxQos)
78+
.build();
9079
}
9180

9281
public MqttServerConnectionConfig withWildcardSubscriptionAvailable(boolean wildcardSubscriptionAvailable) {
93-
return new MqttServerConnectionConfig(
94-
maxQos,
95-
maxMessageSize,
96-
maxStringLength,
97-
maxBinarySize,
98-
maxTopicLevels,
99-
minKeepAliveTime,
100-
receiveMaxPublishes,
101-
topicAliasMaxValue,
102-
defaultSessionExpiryInterval,
103-
keepAliveEnabled,
104-
sessionsEnabled,
105-
retainAvailable,
106-
wildcardSubscriptionAvailable,
107-
subscriptionIdAvailable,
108-
sharedSubscriptionAvailable);
82+
return toBuilder()
83+
.wildcardSubscriptionAvailable(wildcardSubscriptionAvailable)
84+
.build();
10985
}
11086

11187
public MqttServerConnectionConfig withSharedSubscriptionAvailable(boolean sharedSubscriptionAvailable) {
112-
return new MqttServerConnectionConfig(
113-
maxQos,
114-
maxMessageSize,
115-
maxStringLength,
116-
maxBinarySize,
117-
maxTopicLevels,
118-
minKeepAliveTime,
119-
receiveMaxPublishes,
120-
topicAliasMaxValue,
121-
defaultSessionExpiryInterval,
122-
keepAliveEnabled,
123-
sessionsEnabled,
124-
retainAvailable,
125-
wildcardSubscriptionAvailable,
126-
subscriptionIdAvailable,
127-
sharedSubscriptionAvailable);
88+
return toBuilder()
89+
.sharedSubscriptionAvailable(sharedSubscriptionAvailable)
90+
.build();
12891
}
12992

13093
public MqttServerConnectionConfig withSubscriptionIdAvailable(boolean subscriptionIdAvailable) {
131-
return new MqttServerConnectionConfig(
132-
maxQos,
133-
maxMessageSize,
134-
maxStringLength,
135-
maxBinarySize,
136-
maxTopicLevels,
137-
minKeepAliveTime,
138-
receiveMaxPublishes,
139-
topicAliasMaxValue,
140-
defaultSessionExpiryInterval,
141-
keepAliveEnabled,
142-
sessionsEnabled,
143-
retainAvailable,
144-
wildcardSubscriptionAvailable,
145-
subscriptionIdAvailable,
146-
sharedSubscriptionAvailable);
94+
return toBuilder()
95+
.subscriptionIdAvailable(subscriptionIdAvailable)
96+
.build();
97+
}
98+
99+
public MqttServerConnectionConfig withRetainAvailable(boolean retainAvailable) {
100+
return toBuilder()
101+
.retainAvailable(retainAvailable)
102+
.build();
147103
}
148104
}

model/src/main/java/javasabr/mqtt/model/QoS.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,8 @@ public int number() {
3737
public QoS lower(QoS alternative) {
3838
return level > alternative.level ? alternative : this;
3939
}
40+
41+
public boolean isLower(QoS another) {
42+
return level < another.level;
43+
}
4044
}

model/src/main/java/javasabr/mqtt/model/topic/AbstractTopic.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package javasabr.mqtt.model.topic;
22

3+
import java.util.Objects;
34
import javasabr.mqtt.base.util.DebugUtils;
45
import javasabr.rlib.common.util.StringUtils;
56
import lombok.AccessLevel;
@@ -53,6 +54,20 @@ public String toString() {
5354
return rawTopic;
5455
}
5556

57+
@Override
58+
public boolean equals(Object object) {
59+
if (object == null || getClass() != object.getClass()) {
60+
return false;
61+
}
62+
var that = (AbstractTopic) object;
63+
return Objects.equals(rawTopic, that.rawTopic);
64+
}
65+
66+
@Override
67+
public int hashCode() {
68+
return Objects.hashCode(rawTopic);
69+
}
70+
5671
protected static String[] splitTopic(String topic) {
5772
int segmentCount = countOccurrencesOf(topic, AbstractTopic.DELIMITER) + 1;
5873
var segments = new String[segmentCount];

0 commit comments

Comments
 (0)