Skip to content

Commit c2ac2ad

Browse files
committed
Merge branch 'develop' into feature-broker-30
# Conflicts: # core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/TrackableMqttPublishOutMessageHandler.java
2 parents bacefb7 + f53c249 commit c2ac2ad

18 files changed

+874
-152
lines changed

core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos1MqttPublishOutMessageHandler.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,22 +37,29 @@ protected boolean handleReceivedTrackableMessageImpl(
3737
if (trackedMessageMeta == null) {
3838
log.warning(clientId, messageId, "[%s] No any stored information for messageId:[%d]"::formatted);
3939
return true;
40-
} else if (trackedMessageMeta.messageType() != MqttMessageType.PUBLISH) {
40+
}
41+
42+
MqttMessageType trackedMessageType = trackedMessageMeta.messageType();
43+
if (trackedMessageType != MqttMessageType.PUBLISH) {
4144
log.warning(clientId, trackedMessageMeta, messageId,
4245
"[%s] No expected message meta:[%s] for messageId:[%d]"::formatted);
46+
handleNotExpectedFlowState(user, trackedMessageType, MqttMessageType.PUBLISH);
4347
return true;
4448
}
49+
4550
if (!(message instanceof PublishAckMqttInMessage publishAck)) {
4651
log.warning(clientId, message.messageType(), messageId,
47-
"[%s] Not expected message type:[%s] for messageId:[%d]"::formatted);
52+
"[%s] Not expected message type:%s for messageId:[%d]"::formatted);
53+
handleNotExpectedResponseMessage(user, message, MqttMessageType.PUBLISH_ACK);
4854
return true;
4955
}
5056

5157
PublishAckReasonCode reasonCode = publishAck.reasonCode();
5258
if (reasonCode != PublishAckReasonCode.SUCCESS) {
59+
// just to note in logs, we can't do anything with this
5360
log.warning(clientId, reasonCode, messageId, "[%s] Received error response:[%s] for publish:[%s]"::formatted);
5461
}
55-
62+
5663
MessageTacker messageTacker = session.outMessageTracker();
5764
messageTacker.remove(messageId);
5865

core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos2MqttPublishOutMessageHandler.java

Lines changed: 46 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import javasabr.mqtt.network.message.in.PublishReceivedMqttInMessage;
1515
import javasabr.mqtt.service.MessageOutFactoryService;
1616
import lombok.CustomLog;
17+
import org.jspecify.annotations.NonNull;
1718
import org.jspecify.annotations.Nullable;
1819

1920
@CustomLog
@@ -35,18 +36,22 @@ protected boolean handleReceivedTrackableMessageImpl(
3536
TrackableMqttMessage message,
3637
@Nullable TrackedMessageMeta trackedMessageMeta) {
3738
if (message instanceof PublishReceivedMqttInMessage publishReceived) {
38-
return handlePublishRelease(user, session, message, trackedMessageMeta, publishReceived);
39+
return handlePublishReceive(user, session, message, trackedMessageMeta, publishReceived);
3940
} else if (message instanceof PublishCompleteMqttInMessage publishComplete) {
4041
handlePublishComplete(user, session, message, trackedMessageMeta, publishComplete);
4142
return true;
4243
} else {
4344
log.warning(user.clientId(), message.messageType(), message.messageId(),
4445
"[%s] Not expected message type:[%s] for messageId:[%d]"::formatted);
46+
handleNotExpectedResponseMessage(user, message, calculateExpectedMessageType(trackedMessageMeta));
4547
return true;
4648
}
4749
}
4850

49-
private boolean handlePublishRelease(
51+
/**
52+
* @return true if need to cancel the flow
53+
*/
54+
private boolean handlePublishReceive(
5055
ExternalNetworkMqttUser user,
5156
MqttSession session,
5257
TrackableMqttMessage message,
@@ -55,39 +60,45 @@ private boolean handlePublishRelease(
5560

5661
int messageId = message.messageId();
5762
String clientId = user.clientId();
58-
if (trackedMessageMeta != null && trackedMessageMeta.messageType() != MqttMessageType.PUBLISH) {
63+
PublishReceivedReasonCode reasonCode = publishReceived.reasonCode();
64+
65+
// if we unknown this flow
66+
if (trackedMessageMeta == null) {
67+
log.warning(clientId, messageId, "[%s] No any stored information for messageId:[%d]"::formatted);
68+
// for success reason code we should answer that we don't know what this flow
69+
if (reasonCode == PublishReceivedReasonCode.SUCCESS) {
70+
user.sendInBackground(messageOutFactoryService
71+
.resolveFactory(user)
72+
.newPublishRelease(messageId, PublishReleaseReasonCode.PACKET_IDENTIFIER_NOT_FOUND));
73+
}
74+
return true;
75+
}
76+
77+
MqttMessageType trackedMessageType = trackedMessageMeta.messageType();
78+
if (trackedMessageType != MqttMessageType.PUBLISH) {
5979
log.warning(clientId, trackedMessageMeta, messageId,
6080
"[%s] No expected message meta:[%s] for messageId:[%d]"::formatted);
81+
handleNotExpectedFlowState(user, trackedMessageType, MqttMessageType.PUBLISH);
6182
return true;
6283
}
6384

6485
MessageTacker messageTacker = session.outMessageTracker();
65-
PublishReceivedReasonCode reasonCode = publishReceived.reasonCode();
6686
if (reasonCode != PublishReceivedReasonCode.SUCCESS) {
6787
log.warning(clientId, reasonCode, messageId,
6888
"[%s] Received error response:[%s] for publish:[%s]"::formatted);
6989
// we can cancel the flow
70-
if (trackedMessageMeta != null) {
71-
messageTacker.remove(messageId);
72-
}
90+
messageTacker.remove(messageId);
7391
return true;
7492
}
7593

76-
PublishReleaseReasonCode releaseResult;
77-
// we unknown this flow
78-
if (trackedMessageMeta == null) {
79-
releaseResult = PublishReleaseReasonCode.PACKET_IDENTIFIER_NOT_FOUND;
80-
} else {
81-
releaseResult = PublishReleaseReasonCode.SUCCESS;
82-
messageTacker.update(messageId, MqttMessageType.PUBLISH_RELEASE, reasonCode);
83-
}
84-
94+
// switch flow from publish to release phase
95+
messageTacker.update(messageId, MqttMessageType.PUBLISH_RELEASE, reasonCode);
96+
97+
// completed this phase
8598
user.sendInBackground(messageOutFactoryService
8699
.resolveFactory(user)
87-
.newPublishRelease(messageId, releaseResult));
88-
89-
// cancel this flow if it's not success
90-
return releaseResult != PublishReleaseReasonCode.SUCCESS;
100+
.newPublishRelease(messageId, PublishReleaseReasonCode.SUCCESS));
101+
return false;
91102
}
92103

93104
private void handlePublishComplete(
@@ -99,12 +110,18 @@ private void handlePublishComplete(
99110

100111
int messageId = message.messageId();
101112
String clientId = user.clientId();
113+
114+
// if we unknown this flow
102115
if (trackedMessageMeta == null) {
103116
log.warning(clientId, messageId, "[%s] No any stored information for messageId:[%d]"::formatted);
104117
return;
105-
} else if (trackedMessageMeta.messageType() != MqttMessageType.PUBLISH_RELEASE) {
118+
}
119+
120+
MqttMessageType trackedMessageType = trackedMessageMeta.messageType();
121+
if (trackedMessageType != MqttMessageType.PUBLISH_RELEASE) {
106122
log.warning(clientId, trackedMessageMeta, messageId,
107123
"[%s] No expected message meta:[%s] for messageId:[%d]"::formatted);
124+
handleNotExpectedFlowState(user, trackedMessageType, MqttMessageType.PUBLISH_RELEASE);
108125
return;
109126
}
110127

@@ -118,4 +135,12 @@ private void handlePublishComplete(
118135
MessageTacker messageTacker = session.outMessageTracker();
119136
messageTacker.remove(messageId);
120137
}
138+
139+
private static MqttMessageType calculateExpectedMessageType(@Nullable TrackedMessageMeta trackedMessageMeta) {
140+
if (trackedMessageMeta != null && trackedMessageMeta.messageType() == MqttMessageType.PUBLISH_RELEASE) {
141+
return MqttMessageType.PUBLISH_COMPLETE;
142+
}
143+
// by default, we expect 'PUBLISH_RECEIVED'
144+
return MqttMessageType.PUBLISH_RECEIVED;
145+
}
121146
}

core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/TrackableMqttPublishOutMessageHandler.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package javasabr.mqtt.service.publish.handler.impl;
22

33
import javasabr.mqtt.model.MqttProperties;
4+
import javasabr.mqtt.model.MqttProtocolErrors;
45
import javasabr.mqtt.model.MqttUser;
56
import javasabr.mqtt.model.message.MqttMessageType;
67
import javasabr.mqtt.model.message.TrackableMqttMessage;
78
import javasabr.mqtt.model.publishing.Publish;
9+
import javasabr.mqtt.model.reason.code.DisconnectReasonCode;
810
import javasabr.mqtt.model.session.MessageTacker;
911
import javasabr.mqtt.model.session.MqttSession;
1012
import javasabr.mqtt.model.session.ProcessingPublishes;
@@ -13,6 +15,7 @@
1315
import javasabr.mqtt.model.session.TrackedMessageMeta;
1416
import javasabr.mqtt.network.impl.ExternalNetworkMqttUser;
1517
import javasabr.mqtt.service.MessageOutFactoryService;
18+
import javasabr.mqtt.service.message.out.factory.MqttMessageOutFactory;
1619
import javasabr.mqtt.service.publish.handler.PublishHandlingResult;
1720
import lombok.AccessLevel;
1821
import lombok.CustomLog;
@@ -98,4 +101,26 @@ protected void retryDeliveringImpl(ExternalNetworkMqttUser user, MqttSession ses
98101
send(user, publish.withDuplicated());
99102
}
100103
}
104+
105+
protected void handleNotExpectedFlowState(
106+
ExternalNetworkMqttUser user,
107+
MqttMessageType trackedMessageType,
108+
MqttMessageType expectedTrackedMessageType) {
109+
MqttMessageOutFactory messageOutFactory = messageOutFactoryService.resolveFactory(user);
110+
String reason = MqttProtocolErrors.UNEXPECTED_FLOW_STATE.formatted(
111+
trackedMessageType,
112+
expectedTrackedMessageType);
113+
user.closeWithReason(messageOutFactory.newDisconnect(user, DisconnectReasonCode.PROTOCOL_ERROR, reason));
114+
}
115+
116+
protected void handleNotExpectedResponseMessage(
117+
ExternalNetworkMqttUser user,
118+
TrackableMqttMessage receivedMessage,
119+
MqttMessageType expectedMessageType) {
120+
MqttMessageOutFactory messageOutFactory = messageOutFactoryService.resolveFactory(user);
121+
String reason = MqttProtocolErrors.UNEXPECTED_RESPONSE_MESSAGE.formatted(
122+
receivedMessage.messageType(),
123+
expectedMessageType);
124+
user.closeWithReason(messageOutFactory.newDisconnect(user, DisconnectReasonCode.PROTOCOL_ERROR, reason));
125+
}
101126
}

core-service/src/main/java/javasabr/mqtt/service/session/impl/InMemoryProcessingPublishes.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,17 @@ public boolean remove(TrackableMqttMessage message) {
9090
lock.unlockWrite(stamp);
9191
}
9292
}
93-
93+
94+
@Override
95+
public int size() {
96+
long stamp = lock.readLock();
97+
try {
98+
return processing.size();
99+
} finally {
100+
lock.unlockRead(stamp);
101+
}
102+
}
103+
94104
public void clear() {
95105
long stamp = lock.writeLock();
96106
try {

core-service/src/main/java/javasabr/mqtt/service/session/impl/InMemoryTrackedMessageMeta.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package javasabr.mqtt.service.session.impl;
22

3+
import javasabr.mqtt.base.util.DebugUtils;
34
import javasabr.mqtt.model.message.MqttMessageType;
45
import javasabr.mqtt.model.reason.code.ReasonCode;
56
import javasabr.mqtt.model.session.TrackedMessageMeta;
@@ -18,7 +19,16 @@
1819
@FieldDefaults(level = AccessLevel.PRIVATE)
1920
public class InMemoryTrackedMessageMeta implements TrackedMessageMeta {
2021

22+
static {
23+
DebugUtils.registerIncludedFields("messageType", "reasonCode");
24+
}
25+
2126
MqttMessageType messageType;
2227
@Nullable
2328
ReasonCode reasonCode;
29+
30+
@Override
31+
public String toString() {
32+
return DebugUtils.toJsonString(this);
33+
}
2434
}

core-service/src/test/groovy/javasabr/mqtt/service/publish/handler/impl/Qos0MqttPublishOutMessageHandlerTest.groovy

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@ class Qos0MqttPublishOutMessageHandlerTest extends QosMqttPublishOutMessageHandl
2222
def subscription = Subscription.minimal(topicFilter, QoS.AT_MOST_ONCE)
2323
def subscriber = new SingleSubscriber(user, subscription)
2424
def originalMessageId = 60
25-
def publish = Publish.minimal(originalMessageId, QoS.EXACTLY_ONCE, testTopicName, testPayload)
25+
def testPublish = Publish.minimal(originalMessageId, QoS.EXACTLY_ONCE, testTopicName, testPayload)
2626
.withDuplicated()
2727
when:
28-
def result = publishOutHandler.handle(publish, subscriber)
28+
def result = publishOutHandler.handle(testPublish, subscriber)
2929
then:
3030
result == PublishHandlingResult.SUCCESS
3131
with(user.nextSentMessage(PublishMqtt5OutMessage)) {

core-service/src/test/groovy/javasabr/mqtt/service/publish/handler/impl/Qos1MqttPublishInMessageHandlerTest.groovy

Lines changed: 33 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ class Qos1MqttPublishInMessageHandlerTest extends QosMqttPublishInMessageHandler
3131
def client2 = subscriber2.user() as TestExternalNetworkMqttUser
3232
def client3 = publisher.user() as TestExternalNetworkMqttUser
3333
def topicFilter = defaultTopicService.createTopicFilter(client1, "Qos1MqttPublishInMessageHandlerTest/1")
34-
def topicName = defaultTopicService.createTopicName(client1, "Qos1MqttPublishInMessageHandlerTest/1")
34+
def expectedTopicName = defaultTopicService.createTopicName(client1, "Qos1MqttPublishInMessageHandlerTest/1")
3535
def expectedMessageId = 35
3636
defaultSubscriptionService.subscribe(
3737
client1,
@@ -45,19 +45,22 @@ class Qos1MqttPublishInMessageHandlerTest extends QosMqttPublishInMessageHandler
4545
.session()
4646
.inMessageTracker()
4747
when:
48-
publishInHandler.handle(client3, Publish.minimal(expectedMessageId, QoS.AT_MOST_ONCE, topicName, testPayload))
48+
publishInHandler.handle(client3, Publish.minimal(expectedMessageId, QoS.AT_MOST_ONCE, expectedTopicName, testPayload))
4949
then: 'sender should have feedback'
50-
def publishAck = client3.nextSentMessage(PublishAckMqtt5OutMessage)
51-
publishAck.reasonCode() == PublishAckReasonCode.SUCCESS
52-
publishAck.messageId() == expectedMessageId
53-
publishAck.reason() == null
54-
publishAck.userProperties() == MqttOutMessage.EMPTY_USER_PROPERTIES
50+
with(client3.nextSentMessage(PublishAckMqtt5OutMessage)) {
51+
reasonCode() == PublishAckReasonCode.SUCCESS
52+
messageId() == expectedMessageId
53+
reason() == null
54+
userProperties() == MqttOutMessage.EMPTY_USER_PROPERTIES
55+
}
5556
inMessageTracker.stored(expectedMessageId) == null
5657
then: 'subscribers should receive the publish'
57-
def message1 = client1.nextSentMessage(PublishMqtt5OutMessage)
58-
message1.topicName() == topicName
59-
def message2 = client2.nextSentMessage(PublishMqtt5OutMessage)
60-
message2.topicName() == topicName
58+
with(client1.nextSentMessage(PublishMqtt5OutMessage)) {
59+
topicName() == expectedTopicName
60+
}
61+
with(client2.nextSentMessage(PublishMqtt5OutMessage)) {
62+
topicName() == expectedTopicName
63+
}
6164
}
6265

6366
def "should provide feedback for accepted publish without any subscriber"() {
@@ -75,11 +78,12 @@ class Qos1MqttPublishInMessageHandlerTest extends QosMqttPublishInMessageHandler
7578
when:
7679
publishInHandler.handle(user, Publish.minimal(expectedMessageId, QoS.AT_MOST_ONCE, topicName, testPayload))
7780
then: 'sender should have feedback that no matched subscribers'
78-
def publishAck = user.nextSentMessage(PublishAckMqtt5OutMessage)
79-
publishAck.reasonCode() == PublishAckReasonCode.NO_MATCHING_SUBSCRIBERS
80-
publishAck.messageId() == expectedMessageId
81-
publishAck.reason() == null
82-
publishAck.userProperties() == MqttOutMessage.EMPTY_USER_PROPERTIES
81+
with(user.nextSentMessage(PublishAckMqtt5OutMessage)) {
82+
reasonCode() == PublishAckReasonCode.NO_MATCHING_SUBSCRIBERS
83+
messageId() == expectedMessageId
84+
reason() == null
85+
userProperties() == MqttOutMessage.EMPTY_USER_PROPERTIES
86+
}
8387
inMessageTracker.stored(expectedMessageId) == null
8488
}
8589

@@ -99,10 +103,11 @@ class Qos1MqttPublishInMessageHandlerTest extends QosMqttPublishInMessageHandler
99103
topicName,
100104
testPayload))
101105
then:
102-
def disconnect = user.nextSentMessage(DisconnectMqtt5OutMessage)
103-
disconnect.reasonCode() == DisconnectReasonCode.PROTOCOL_ERROR
104-
disconnect.reason() == MqttProtocolErrors.MISSED_REQUIRED_MESSAGE_ID
105-
disconnect.userProperties() == MqttOutMessage.EMPTY_USER_PROPERTIES
106+
with(user.nextSentMessage(DisconnectMqtt5OutMessage)) {
107+
reasonCode() == DisconnectReasonCode.PROTOCOL_ERROR
108+
reason() == MqttProtocolErrors.MISSED_REQUIRED_MESSAGE_ID
109+
userProperties() == MqttOutMessage.EMPTY_USER_PROPERTIES
110+
}
106111
}
107112

108113
def "should provide feedback that message id is already used"() {
@@ -121,13 +126,14 @@ class Qos1MqttPublishInMessageHandlerTest extends QosMqttPublishInMessageHandler
121126
when:
122127
publishInHandler.handle(user, Publish.minimal(expectedMessageId, QoS.AT_MOST_ONCE, topicName, testPayload))
123128
then:
124-
def publishAck = user.nextSentMessage(PublishAckMqtt5OutMessage)
125-
publishAck.reasonCode() == PublishAckReasonCode.PACKET_IDENTIFIER_IN_USE
126-
publishAck.reason() == null
127-
publishAck.userProperties() == MqttOutMessage.EMPTY_USER_PROPERTIES
128-
def trackedMessageMeta = inMessageTracker.stored(expectedMessageId)
129-
trackedMessageMeta != null
130-
trackedMessageMeta.messageType() == MqttMessageType.SUBSCRIBE
129+
with(user.nextSentMessage(PublishAckMqtt5OutMessage)) {
130+
reasonCode() == PublishAckReasonCode.PACKET_IDENTIFIER_IN_USE
131+
reason() == null
132+
userProperties() == MqttOutMessage.EMPTY_USER_PROPERTIES
133+
}
134+
with(inMessageTracker.stored(expectedMessageId)) {
135+
messageType() == MqttMessageType.SUBSCRIBE
136+
}
131137
}
132138

133139
def "should skip handling duplicated publish"() {

0 commit comments

Comments
 (0)