Skip to content

Commit 294a6c6

Browse files
authored
Improve publish delivering to subscribers part 3 (#133)
1 parent f53c249 commit 294a6c6

File tree

38 files changed

+421
-357
lines changed

38 files changed

+421
-357
lines changed

network/src/main/java/javasabr/mqtt/network/message/in/MqttInMessage.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ public abstract class MqttInMessage extends AbstractReadableNetworkPacket<MqttCo
3939
DebugUtils.registerIncludedFields("userProperties");
4040
}
4141

42-
public static final Array<StringPair> EMPTY_USER_PROPERTIES = Array.empty(StringPair.class);
4342
protected static final Array<String> EMPTY_STRINGS = Array.empty(String.class);
4443

4544
private record Utf8Decoder(CharsetDecoder decoder, ByteBuffer inBuffer, CharBuffer outBuffer) {}

network/src/main/java/javasabr/mqtt/network/message/in/PublishMqttInMessage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
* Publish message.
2525
*/
2626
@Getter
27-
@Accessors(fluent = true)
27+
@Accessors
2828
@FieldDefaults(level = AccessLevel.PROTECTED)
2929
public class PublishMqttInMessage extends TrackableMqttInMessage {
3030

network/src/main/java/javasabr/mqtt/network/message/out/PublishCompleteMqtt311OutMessage.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,10 @@
22

33
import javasabr.mqtt.model.message.MqttMessageType;
44
import javasabr.mqtt.network.MqttConnection;
5-
import lombok.AccessLevel;
6-
import lombok.experimental.FieldDefaults;
75

86
/**
97
* Publish complete (QoS 2 delivery part 3).
108
*/
11-
@FieldDefaults(level = AccessLevel.PROTECTED, makeFinal = true)
129
public class PublishCompleteMqtt311OutMessage extends TrackableMqttOutMessage {
1310

1411
private static final byte MESSAGE_TYPE = (byte) MqttMessageType.PUBLISH_COMPLETE.ordinal();

network/src/main/java/javasabr/mqtt/network/message/out/PublishReceivedMqtt311OutMessage.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,10 @@
22

33
import javasabr.mqtt.model.message.MqttMessageType;
44
import javasabr.mqtt.network.MqttConnection;
5-
import lombok.AccessLevel;
6-
import lombok.experimental.FieldDefaults;
75

86
/**
97
* Publish received (QoS 2 delivery part 1).
108
*/
11-
@FieldDefaults(level = AccessLevel.PROTECTED, makeFinal = true)
129
public class PublishReceivedMqtt311OutMessage extends TrackableMqttOutMessage {
1310

1411
private static final byte MESSAGE_TYPE = (byte) MqttMessageType.PUBLISH_RECEIVED.ordinal();

network/src/main/java/javasabr/mqtt/network/message/out/PublishReleaseMqtt311OutMessage.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,10 @@
33
import java.nio.ByteBuffer;
44
import javasabr.mqtt.model.message.MqttMessageType;
55
import javasabr.mqtt.network.MqttConnection;
6-
import lombok.AccessLevel;
7-
import lombok.experimental.FieldDefaults;
86

97
/**
108
* Publish release (QoS 2 delivery part 2).
119
*/
12-
@FieldDefaults(level = AccessLevel.PROTECTED, makeFinal = true)
1310
public class PublishReleaseMqtt311OutMessage extends TrackableMqttOutMessage {
1411

1512
public static final int MESSAGE_FLAGS = 0b0000_0010;

network/src/test/groovy/javasabr/mqtt/network/message/in/AuthenticationMqttInMessageTest.groovy

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ class AuthenticationMqttInMessageTest extends BaseMqttInMessageTest {
1414
it.putProperty(MqttMessageProperty.AUTHENTICATION_METHOD, authMethod)
1515
it.putProperty(MqttMessageProperty.AUTHENTICATION_DATA, authData)
1616
it.putProperty(MqttMessageProperty.REASON_STRING, reasonString)
17-
it.putProperty(MqttMessageProperty.USER_PROPERTY, userProperties)
17+
it.putProperty(MqttMessageProperty.USER_PROPERTY, testUserProperties)
1818
}
1919
def dataBuffer = BufferUtils.prepareBuffer(512) {
2020
it.put(AuthenticateReasonCode.SUCCESS)
@@ -30,12 +30,12 @@ class AuthenticationMqttInMessageTest extends BaseMqttInMessageTest {
3030
inMessage.authenticationMethod() == authMethod
3131
inMessage.authenticationData() == authData
3232
inMessage.reason() == reasonString
33-
inMessage.userProperties() == userProperties
33+
inMessage.userProperties() == testUserProperties
3434
when:
3535
def propertiesBuffer2 = BufferUtils.prepareBuffer(512) {
3636
it.putProperty(MqttMessageProperty.AUTHENTICATION_METHOD, authMethod)
3737
it.putProperty(MqttMessageProperty.REASON_STRING, reasonString)
38-
it.putProperty(MqttMessageProperty.USER_PROPERTY, userProperties)
38+
it.putProperty(MqttMessageProperty.USER_PROPERTY, testUserProperties)
3939
it.putProperty(MqttMessageProperty.AUTHENTICATION_DATA, authData)
4040
}
4141
def dataBuffer2 = BufferUtils.prepareBuffer(512) {
@@ -51,7 +51,7 @@ class AuthenticationMqttInMessageTest extends BaseMqttInMessageTest {
5151
inMessage2.authenticationMethod() == authMethod
5252
inMessage2.authenticationData() == authData
5353
inMessage2.reason() == reasonString
54-
inMessage2.userProperties() == userProperties
54+
inMessage2.userProperties() == testUserProperties
5555
when:
5656
def propertiesBuffer3 = BufferUtils.prepareBuffer(512) {
5757
it.putProperty(MqttMessageProperty.AUTHENTICATION_METHOD, authMethod)

network/src/test/groovy/javasabr/mqtt/network/message/in/ConnectAckMqttInMessageTest.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ class ConnectAckMqttInMessageTest extends BaseMqttInMessageTest {
191191
def "should not allow invalid message flags"() {
192192
given:
193193
def dataBuffer = BufferUtils.prepareBuffer(512) {
194-
it.putShort(messageId)
194+
it.putShort(testMessageId)
195195
it.put(PublishAckReasonCode.SUCCESS)
196196
it.putMbi(0)
197197
}

network/src/test/groovy/javasabr/mqtt/network/message/in/ConnectMqttInMessageTest.groovy

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ class ConnectMqttInMessageTest extends BaseMqttInMessageTest {
4444
it.putProperty(MqttMessageProperty.REQUEST_PROBLEM_INFORMATION, requestProblemInformation ? 1 : 0)
4545
it.putProperty(MqttMessageProperty.AUTHENTICATION_METHOD, authMethod)
4646
it.putProperty(MqttMessageProperty.AUTHENTICATION_DATA, authData)
47-
it.putProperty(MqttMessageProperty.USER_PROPERTY, userProperties)
47+
it.putProperty(MqttMessageProperty.USER_PROPERTY, testUserProperties)
4848
}
4949
def dataBuffer = BufferUtils.prepareBuffer(512) {
5050
it.putString("MQTT")
@@ -76,7 +76,7 @@ class ConnectMqttInMessageTest extends BaseMqttInMessageTest {
7676
packet.willTopic() == ""
7777
packet.willQos() == 0
7878
packet.willPayload() == ArrayUtils.EMPTY_BYTE_ARRAY
79-
packet.userProperties() == userProperties
79+
packet.userProperties() == testUserProperties
8080
}
8181

8282
def "should not read packet correctly with invalid UTF8 strings"(byte[] stringBytes) {

network/src/test/groovy/javasabr/mqtt/network/message/in/DisconnectMqttInMessageTest.groovy

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ class DisconnectMqttInMessageTest extends BaseMqttInMessageTest {
1212
it.putProperty(MqttMessageProperty.SESSION_EXPIRY_INTERVAL, sessionExpiryInterval)
1313
it.putProperty(MqttMessageProperty.REASON_STRING, reasonString)
1414
it.putProperty(MqttMessageProperty.SERVER_REFERENCE, serverReference)
15-
it.putProperty(MqttMessageProperty.USER_PROPERTY, userProperties)
15+
it.putProperty(MqttMessageProperty.USER_PROPERTY, testUserProperties)
1616
}
1717
def dataBuffer = BufferUtils.prepareBuffer(512) {
1818
it.putByte(DisconnectReasonCode.QUOTA_EXCEEDED.code())
@@ -28,7 +28,7 @@ class DisconnectMqttInMessageTest extends BaseMqttInMessageTest {
2828
inMessage.serverReference() == serverReference
2929
inMessage.reasonCode() == DisconnectReasonCode.QUOTA_EXCEEDED
3030
inMessage.sessionExpiryInterval() == sessionExpiryInterval
31-
inMessage.userProperties() == userProperties
31+
inMessage.userProperties() == testUserProperties
3232
when:
3333
propertiesBuffer = BufferUtils.prepareBuffer(512) {
3434
it.putProperty(MqttMessageProperty.SESSION_EXPIRY_INTERVAL, sessionExpiryInterval)

network/src/test/groovy/javasabr/mqtt/network/message/in/PublishAckMqttInMessageTest.groovy

Lines changed: 35 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -11,28 +11,30 @@ class PublishAckMqttInMessageTest extends BaseMqttInMessageTest {
1111
def "should read message correctly as MQTT 3.1.1"() {
1212
given:
1313
def dataBuffer = BufferUtils.prepareBuffer(512) {
14-
it.putShort(messageId)
14+
it.putShort(testMessageId)
1515
}
1616
when:
1717
def inMessage = new PublishAckMqttInMessage(PublishAckMqttInMessage.MESSAGE_FLAGS)
1818
def result = inMessage.read(defaultMqtt311Connection, dataBuffer, dataBuffer.limit())
1919
then:
2020
result
21-
inMessage.exception() == null
22-
inMessage.reason() == null
23-
inMessage.messageId() == messageId
24-
inMessage.reasonCode() == PublishAckReasonCode.SUCCESS
25-
inMessage.userProperties() == MqttInMessage.EMPTY_USER_PROPERTIES
21+
with(inMessage) {
22+
exception() == null
23+
reason() == null
24+
messageId() == testMessageId
25+
reasonCode() == PublishAckReasonCode.SUCCESS
26+
userProperties() == MqttInMessage.EMPTY_USER_PROPERTIES
27+
}
2628
}
2729

2830
def "should read message correctly as MQTT 5.0"() {
2931
given:
3032
def propertiesBuffer = BufferUtils.prepareBuffer(512) {
3133
it.putProperty(MqttMessageProperty.REASON_STRING, reasonString)
32-
it.putProperty(MqttMessageProperty.USER_PROPERTY, userProperties)
34+
it.putProperty(MqttMessageProperty.USER_PROPERTY, testUserProperties)
3335
}
3436
def dataBuffer = BufferUtils.prepareBuffer(512) {
35-
it.putShort(messageId)
37+
it.putShort(testMessageId)
3638
it.put(PublishAckReasonCode.PAYLOAD_FORMAT_INVALID)
3739
it.putMbi(propertiesBuffer.limit())
3840
it.put(propertiesBuffer)
@@ -42,26 +44,30 @@ class PublishAckMqttInMessageTest extends BaseMqttInMessageTest {
4244
def result = inMessage.read(defaultMqtt5Connection, dataBuffer, dataBuffer.limit())
4345
then:
4446
result
45-
inMessage.exception() == null
46-
inMessage.reason() == reasonString
47-
inMessage.messageId() == messageId
48-
inMessage.reasonCode() == PublishAckReasonCode.PAYLOAD_FORMAT_INVALID
49-
inMessage.userProperties() == userProperties
47+
with(inMessage) {
48+
exception() == null
49+
reason() == reasonString
50+
messageId() == testMessageId
51+
reasonCode() == PublishAckReasonCode.PAYLOAD_FORMAT_INVALID
52+
userProperties() == userProperties
53+
}
5054
when:
5155
def dataBuffer2 = BufferUtils.prepareBuffer(512) {
52-
it.putShort(messageId)
56+
it.putShort(testMessageId)
5357
it.put(PublishAckReasonCode.UNSPECIFIED_ERROR)
5458
it.putMbi(0)
5559
}
5660
def inMessage2 = new PublishAckMqttInMessage(PublishAckMqttInMessage.MESSAGE_FLAGS)
5761
def result2 = inMessage2.read(defaultMqtt5Connection, dataBuffer2, dataBuffer2.limit())
5862
then:
5963
result2
60-
inMessage2.exception() == null
61-
inMessage2.reason() == null
62-
inMessage2.messageId() == messageId
63-
inMessage2.reasonCode() == PublishAckReasonCode.UNSPECIFIED_ERROR
64-
inMessage2.userProperties() == MqttInMessage.EMPTY_USER_PROPERTIES
64+
with(inMessage2) {
65+
exception() == null
66+
reason() == null
67+
messageId() == testMessageId
68+
reasonCode() == PublishAckReasonCode.UNSPECIFIED_ERROR
69+
userProperties() == MqttInMessage.EMPTY_USER_PROPERTIES
70+
}
6571
}
6672

6773
def "should not allow to put reason 2 times"() {
@@ -71,7 +77,7 @@ class PublishAckMqttInMessageTest extends BaseMqttInMessageTest {
7177
it.putProperty(MqttMessageProperty.REASON_STRING, "reason1")
7278
}
7379
def dataBuffer = BufferUtils.prepareBuffer(512) {
74-
it.putShort(messageId)
80+
it.putShort(testMessageId)
7581
it.put(PublishAckReasonCode.SUCCESS)
7682
it.putMbi(propertiesBuffer.limit())
7783
it.put(propertiesBuffer)
@@ -81,14 +87,16 @@ class PublishAckMqttInMessageTest extends BaseMqttInMessageTest {
8187
def result = inMessage.read(defaultMqtt5Connection, dataBuffer, dataBuffer.limit())
8288
then:
8389
!result
84-
inMessage.exception() instanceof MalformedProtocolMqttException
85-
inMessage.exception().message == "Property:[$MqttMessageProperty.REASON_STRING] is already presented in message:[$MqttMessageType.PUBLISH_ACK]"
90+
with(inMessage) {
91+
exception() instanceof MalformedProtocolMqttException
92+
exception().message == "Property:[$MqttMessageProperty.REASON_STRING] is already presented in message:[$MqttMessageType.PUBLISH_ACK]"
93+
}
8694
}
8795

8896
def "should not allow invalid message flags"() {
8997
given:
9098
def dataBuffer = BufferUtils.prepareBuffer(512) {
91-
it.putShort(messageId)
99+
it.putShort(testMessageId)
92100
it.put(PublishAckReasonCode.SUCCESS)
93101
it.putMbi(0)
94102
}
@@ -97,7 +105,9 @@ class PublishAckMqttInMessageTest extends BaseMqttInMessageTest {
97105
def result = inMessage.read(defaultMqtt5Connection, dataBuffer, dataBuffer.limit())
98106
then:
99107
!result
100-
inMessage.exception() instanceof MalformedProtocolMqttException
101-
inMessage.exception().message == "Unexpected message flags:[0b0101_0101] in message:[$MqttMessageType.PUBLISH_ACK]"
108+
with(inMessage) {
109+
exception() instanceof MalformedProtocolMqttException
110+
exception().message == "Unexpected message flags:[0b0101_0101] in message:[$MqttMessageType.PUBLISH_ACK]"
111+
}
102112
}
103113
}

0 commit comments

Comments
 (0)