Skip to content

Commit f53c249

Browse files
authored
Improve publish delivering to subscribers part 2 (#130)
1 parent f406600 commit f53c249

18 files changed

+874
-152
lines changed

core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos1MqttPublishOutMessageHandler.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,22 +40,29 @@ protected boolean handleReceivedTrackableMessageImpl(
4040
if (trackedMessageMeta == null) {
4141
log.warning(clientId, messageId, "[%s] No any stored information for messageId:[%d]"::formatted);
4242
return true;
43-
} else if (trackedMessageMeta.messageType() != MqttMessageType.PUBLISH) {
43+
}
44+
45+
MqttMessageType trackedMessageType = trackedMessageMeta.messageType();
46+
if (trackedMessageType != MqttMessageType.PUBLISH) {
4447
log.warning(clientId, trackedMessageMeta, messageId,
4548
"[%s] No expected message meta:[%s] for messageId:[%d]"::formatted);
49+
handleNotExpectedFlowState(user, trackedMessageType, MqttMessageType.PUBLISH);
4650
return true;
4751
}
52+
4853
if (!(message instanceof PublishAckMqttInMessage publishAck)) {
4954
log.warning(clientId, message.messageType(), messageId,
50-
"[%s] Not expected message type:[%s] for messageId:[%d]"::formatted);
55+
"[%s] Not expected message type:%s for messageId:[%d]"::formatted);
56+
handleNotExpectedResponseMessage(user, message, MqttMessageType.PUBLISH_ACK);
5157
return true;
5258
}
5359

5460
PublishAckReasonCode reasonCode = publishAck.reasonCode();
5561
if (reasonCode != PublishAckReasonCode.SUCCESS) {
62+
// just to note in logs, we can't do anything with this
5663
log.warning(clientId, reasonCode, messageId, "[%s] Received error response:[%s] for publish:[%s]"::formatted);
5764
}
58-
65+
5966
MessageTacker messageTacker = session.outMessageTracker();
6067
messageTacker.remove(messageId);
6168

core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos2MqttPublishOutMessageHandler.java

Lines changed: 46 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import javasabr.mqtt.service.MessageOutFactoryService;
1616
import javasabr.mqtt.service.SubscriptionService;
1717
import lombok.CustomLog;
18+
import org.jspecify.annotations.NonNull;
1819
import org.jspecify.annotations.Nullable;
1920

2021
@CustomLog
@@ -38,18 +39,22 @@ protected boolean handleReceivedTrackableMessageImpl(
3839
TrackableMqttMessage message,
3940
@Nullable TrackedMessageMeta trackedMessageMeta) {
4041
if (message instanceof PublishReceivedMqttInMessage publishReceived) {
41-
return handlePublishRelease(user, session, message, trackedMessageMeta, publishReceived);
42+
return handlePublishReceive(user, session, message, trackedMessageMeta, publishReceived);
4243
} else if (message instanceof PublishCompleteMqttInMessage publishComplete) {
4344
handlePublishComplete(user, session, message, trackedMessageMeta, publishComplete);
4445
return true;
4546
} else {
4647
log.warning(user.clientId(), message.messageType(), message.messageId(),
4748
"[%s] Not expected message type:[%s] for messageId:[%d]"::formatted);
49+
handleNotExpectedResponseMessage(user, message, calculateExpectedMessageType(trackedMessageMeta));
4850
return true;
4951
}
5052
}
5153

52-
private boolean handlePublishRelease(
54+
/**
55+
* @return true if need to cancel the flow
56+
*/
57+
private boolean handlePublishReceive(
5358
ExternalNetworkMqttUser user,
5459
MqttSession session,
5560
TrackableMqttMessage message,
@@ -58,39 +63,45 @@ private boolean handlePublishRelease(
5863

5964
int messageId = message.messageId();
6065
String clientId = user.clientId();
61-
if (trackedMessageMeta != null && trackedMessageMeta.messageType() != MqttMessageType.PUBLISH) {
66+
PublishReceivedReasonCode reasonCode = publishReceived.reasonCode();
67+
68+
// if we unknown this flow
69+
if (trackedMessageMeta == null) {
70+
log.warning(clientId, messageId, "[%s] No any stored information for messageId:[%d]"::formatted);
71+
// for success reason code we should answer that we don't know what this flow
72+
if (reasonCode == PublishReceivedReasonCode.SUCCESS) {
73+
user.sendInBackground(messageOutFactoryService
74+
.resolveFactory(user)
75+
.newPublishRelease(messageId, PublishReleaseReasonCode.PACKET_IDENTIFIER_NOT_FOUND));
76+
}
77+
return true;
78+
}
79+
80+
MqttMessageType trackedMessageType = trackedMessageMeta.messageType();
81+
if (trackedMessageType != MqttMessageType.PUBLISH) {
6282
log.warning(clientId, trackedMessageMeta, messageId,
6383
"[%s] No expected message meta:[%s] for messageId:[%d]"::formatted);
84+
handleNotExpectedFlowState(user, trackedMessageType, MqttMessageType.PUBLISH);
6485
return true;
6586
}
6687

6788
MessageTacker messageTacker = session.outMessageTracker();
68-
PublishReceivedReasonCode reasonCode = publishReceived.reasonCode();
6989
if (reasonCode != PublishReceivedReasonCode.SUCCESS) {
7090
log.warning(clientId, reasonCode, messageId,
7191
"[%s] Received error response:[%s] for publish:[%s]"::formatted);
7292
// we can cancel the flow
73-
if (trackedMessageMeta != null) {
74-
messageTacker.remove(messageId);
75-
}
93+
messageTacker.remove(messageId);
7694
return true;
7795
}
7896

79-
PublishReleaseReasonCode releaseResult;
80-
// we unknown this flow
81-
if (trackedMessageMeta == null) {
82-
releaseResult = PublishReleaseReasonCode.PACKET_IDENTIFIER_NOT_FOUND;
83-
} else {
84-
releaseResult = PublishReleaseReasonCode.SUCCESS;
85-
messageTacker.update(messageId, MqttMessageType.PUBLISH_RELEASE, reasonCode);
86-
}
87-
97+
// switch flow from publish to release phase
98+
messageTacker.update(messageId, MqttMessageType.PUBLISH_RELEASE, reasonCode);
99+
100+
// completed this phase
88101
user.sendInBackground(messageOutFactoryService
89102
.resolveFactory(user)
90-
.newPublishRelease(messageId, releaseResult));
91-
92-
// cancel this flow if it's not success
93-
return releaseResult != PublishReleaseReasonCode.SUCCESS;
103+
.newPublishRelease(messageId, PublishReleaseReasonCode.SUCCESS));
104+
return false;
94105
}
95106

96107
private void handlePublishComplete(
@@ -102,12 +113,18 @@ private void handlePublishComplete(
102113

103114
int messageId = message.messageId();
104115
String clientId = user.clientId();
116+
117+
// if we unknown this flow
105118
if (trackedMessageMeta == null) {
106119
log.warning(clientId, messageId, "[%s] No any stored information for messageId:[%d]"::formatted);
107120
return;
108-
} else if (trackedMessageMeta.messageType() != MqttMessageType.PUBLISH_RELEASE) {
121+
}
122+
123+
MqttMessageType trackedMessageType = trackedMessageMeta.messageType();
124+
if (trackedMessageType != MqttMessageType.PUBLISH_RELEASE) {
109125
log.warning(clientId, trackedMessageMeta, messageId,
110126
"[%s] No expected message meta:[%s] for messageId:[%d]"::formatted);
127+
handleNotExpectedFlowState(user, trackedMessageType, MqttMessageType.PUBLISH_RELEASE);
111128
return;
112129
}
113130

@@ -121,4 +138,12 @@ private void handlePublishComplete(
121138
MessageTacker messageTacker = session.outMessageTracker();
122139
messageTacker.remove(messageId);
123140
}
141+
142+
private static MqttMessageType calculateExpectedMessageType(@Nullable TrackedMessageMeta trackedMessageMeta) {
143+
if (trackedMessageMeta != null && trackedMessageMeta.messageType() == MqttMessageType.PUBLISH_RELEASE) {
144+
return MqttMessageType.PUBLISH_COMPLETE;
145+
}
146+
// by default, we expect 'PUBLISH_RECEIVED'
147+
return MqttMessageType.PUBLISH_RECEIVED;
148+
}
124149
}

core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/TrackableMqttPublishOutMessageHandler.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package javasabr.mqtt.service.publish.handler.impl;
22

33
import javasabr.mqtt.model.MqttProperties;
4+
import javasabr.mqtt.model.MqttProtocolErrors;
45
import javasabr.mqtt.model.MqttUser;
56
import javasabr.mqtt.model.message.MqttMessageType;
67
import javasabr.mqtt.model.message.TrackableMqttMessage;
78
import javasabr.mqtt.model.publishing.Publish;
9+
import javasabr.mqtt.model.reason.code.DisconnectReasonCode;
810
import javasabr.mqtt.model.session.MessageTacker;
911
import javasabr.mqtt.model.session.MqttSession;
1012
import javasabr.mqtt.model.session.ProcessingPublishes;
@@ -14,6 +16,7 @@
1416
import javasabr.mqtt.network.impl.ExternalNetworkMqttUser;
1517
import javasabr.mqtt.service.MessageOutFactoryService;
1618
import javasabr.mqtt.service.SubscriptionService;
19+
import javasabr.mqtt.service.message.out.factory.MqttMessageOutFactory;
1720
import javasabr.mqtt.service.publish.handler.PublishHandlingResult;
1821
import lombok.AccessLevel;
1922
import lombok.CustomLog;
@@ -100,4 +103,26 @@ protected void retryDeliveringImpl(ExternalNetworkMqttUser user, MqttSession ses
100103
send(user, publish.withDuplicated());
101104
}
102105
}
106+
107+
protected void handleNotExpectedFlowState(
108+
ExternalNetworkMqttUser user,
109+
MqttMessageType trackedMessageType,
110+
MqttMessageType expectedTrackedMessageType) {
111+
MqttMessageOutFactory messageOutFactory = messageOutFactoryService.resolveFactory(user);
112+
String reason = MqttProtocolErrors.UNEXPECTED_FLOW_STATE.formatted(
113+
trackedMessageType,
114+
expectedTrackedMessageType);
115+
user.closeWithReason(messageOutFactory.newDisconnect(user, DisconnectReasonCode.PROTOCOL_ERROR, reason));
116+
}
117+
118+
protected void handleNotExpectedResponseMessage(
119+
ExternalNetworkMqttUser user,
120+
TrackableMqttMessage receivedMessage,
121+
MqttMessageType expectedMessageType) {
122+
MqttMessageOutFactory messageOutFactory = messageOutFactoryService.resolveFactory(user);
123+
String reason = MqttProtocolErrors.UNEXPECTED_RESPONSE_MESSAGE.formatted(
124+
receivedMessage.messageType(),
125+
expectedMessageType);
126+
user.closeWithReason(messageOutFactory.newDisconnect(user, DisconnectReasonCode.PROTOCOL_ERROR, reason));
127+
}
103128
}

core-service/src/main/java/javasabr/mqtt/service/session/impl/InMemoryProcessingPublishes.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,17 @@ public boolean remove(TrackableMqttMessage message) {
9090
lock.unlockWrite(stamp);
9191
}
9292
}
93-
93+
94+
@Override
95+
public int size() {
96+
long stamp = lock.readLock();
97+
try {
98+
return processing.size();
99+
} finally {
100+
lock.unlockRead(stamp);
101+
}
102+
}
103+
94104
public void clear() {
95105
long stamp = lock.writeLock();
96106
try {

core-service/src/main/java/javasabr/mqtt/service/session/impl/InMemoryTrackedMessageMeta.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package javasabr.mqtt.service.session.impl;
22

3+
import javasabr.mqtt.base.util.DebugUtils;
34
import javasabr.mqtt.model.message.MqttMessageType;
45
import javasabr.mqtt.model.reason.code.ReasonCode;
56
import javasabr.mqtt.model.session.TrackedMessageMeta;
@@ -18,7 +19,16 @@
1819
@FieldDefaults(level = AccessLevel.PRIVATE)
1920
public class InMemoryTrackedMessageMeta implements TrackedMessageMeta {
2021

22+
static {
23+
DebugUtils.registerIncludedFields("messageType", "reasonCode");
24+
}
25+
2126
MqttMessageType messageType;
2227
@Nullable
2328
ReasonCode reasonCode;
29+
30+
@Override
31+
public String toString() {
32+
return DebugUtils.toJsonString(this);
33+
}
2434
}

core-service/src/test/groovy/javasabr/mqtt/service/publish/handler/impl/Qos0MqttPublishOutMessageHandlerTest.groovy

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@ class Qos0MqttPublishOutMessageHandlerTest extends QosMqttPublishOutMessageHandl
2424
def subscription = Subscription.minimal(topicFilter, QoS.AT_MOST_ONCE)
2525
def subscriber = new SingleSubscriber(user, subscription)
2626
def originalMessageId = 60
27-
def publish = Publish.minimal(originalMessageId, QoS.EXACTLY_ONCE, testTopicName, testPayload)
27+
def testPublish = Publish.minimal(originalMessageId, QoS.EXACTLY_ONCE, testTopicName, testPayload)
2828
.withDuplicated()
2929
when:
30-
def result = publishOutHandler.handle(publish, subscriber)
30+
def result = publishOutHandler.handle(testPublish, subscriber)
3131
then:
3232
result == PublishHandlingResult.SUCCESS
3333
with(user.nextSentMessage(PublishMqtt5OutMessage)) {

core-service/src/test/groovy/javasabr/mqtt/service/publish/handler/impl/Qos1MqttPublishInMessageHandlerTest.groovy

Lines changed: 33 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ class Qos1MqttPublishInMessageHandlerTest extends QosMqttPublishInMessageHandler
3131
def client2 = subscriber2.user() as TestExternalNetworkMqttUser
3232
def client3 = publisher.user() as TestExternalNetworkMqttUser
3333
def topicFilter = defaultTopicService.createTopicFilter(client1, "Qos1MqttPublishInMessageHandlerTest/1")
34-
def topicName = defaultTopicService.createTopicName(client1, "Qos1MqttPublishInMessageHandlerTest/1")
34+
def expectedTopicName = defaultTopicService.createTopicName(client1, "Qos1MqttPublishInMessageHandlerTest/1")
3535
def expectedMessageId = 35
3636
defaultSubscriptionService.subscribe(
3737
client1,
@@ -45,19 +45,22 @@ class Qos1MqttPublishInMessageHandlerTest extends QosMqttPublishInMessageHandler
4545
.session()
4646
.inMessageTracker()
4747
when:
48-
publishInHandler.handle(client3, Publish.minimal(expectedMessageId, QoS.AT_MOST_ONCE, topicName, testPayload))
48+
publishInHandler.handle(client3, Publish.minimal(expectedMessageId, QoS.AT_MOST_ONCE, expectedTopicName, testPayload))
4949
then: 'sender should have feedback'
50-
def publishAck = client3.nextSentMessage(PublishAckMqtt5OutMessage)
51-
publishAck.reasonCode() == PublishAckReasonCode.SUCCESS
52-
publishAck.messageId() == expectedMessageId
53-
publishAck.reason() == null
54-
publishAck.userProperties() == MqttOutMessage.EMPTY_USER_PROPERTIES
50+
with(client3.nextSentMessage(PublishAckMqtt5OutMessage)) {
51+
reasonCode() == PublishAckReasonCode.SUCCESS
52+
messageId() == expectedMessageId
53+
reason() == null
54+
userProperties() == MqttOutMessage.EMPTY_USER_PROPERTIES
55+
}
5556
inMessageTracker.stored(expectedMessageId) == null
5657
then: 'subscribers should receive the publish'
57-
def message1 = client1.nextSentMessage(PublishMqtt5OutMessage)
58-
message1.topicName() == topicName
59-
def message2 = client2.nextSentMessage(PublishMqtt5OutMessage)
60-
message2.topicName() == topicName
58+
with(client1.nextSentMessage(PublishMqtt5OutMessage)) {
59+
topicName() == expectedTopicName
60+
}
61+
with(client2.nextSentMessage(PublishMqtt5OutMessage)) {
62+
topicName() == expectedTopicName
63+
}
6164
}
6265

6366
def "should provide feedback for accepted publish without any subscriber"() {
@@ -75,11 +78,12 @@ class Qos1MqttPublishInMessageHandlerTest extends QosMqttPublishInMessageHandler
7578
when:
7679
publishInHandler.handle(user, Publish.minimal(expectedMessageId, QoS.AT_MOST_ONCE, topicName, testPayload))
7780
then: 'sender should have feedback that no matched subscribers'
78-
def publishAck = user.nextSentMessage(PublishAckMqtt5OutMessage)
79-
publishAck.reasonCode() == PublishAckReasonCode.NO_MATCHING_SUBSCRIBERS
80-
publishAck.messageId() == expectedMessageId
81-
publishAck.reason() == null
82-
publishAck.userProperties() == MqttOutMessage.EMPTY_USER_PROPERTIES
81+
with(user.nextSentMessage(PublishAckMqtt5OutMessage)) {
82+
reasonCode() == PublishAckReasonCode.NO_MATCHING_SUBSCRIBERS
83+
messageId() == expectedMessageId
84+
reason() == null
85+
userProperties() == MqttOutMessage.EMPTY_USER_PROPERTIES
86+
}
8387
inMessageTracker.stored(expectedMessageId) == null
8488
}
8589

@@ -99,10 +103,11 @@ class Qos1MqttPublishInMessageHandlerTest extends QosMqttPublishInMessageHandler
99103
topicName,
100104
testPayload))
101105
then:
102-
def disconnect = user.nextSentMessage(DisconnectMqtt5OutMessage)
103-
disconnect.reasonCode() == DisconnectReasonCode.PROTOCOL_ERROR
104-
disconnect.reason() == MqttProtocolErrors.MISSED_REQUIRED_MESSAGE_ID
105-
disconnect.userProperties() == MqttOutMessage.EMPTY_USER_PROPERTIES
106+
with(user.nextSentMessage(DisconnectMqtt5OutMessage)) {
107+
reasonCode() == DisconnectReasonCode.PROTOCOL_ERROR
108+
reason() == MqttProtocolErrors.MISSED_REQUIRED_MESSAGE_ID
109+
userProperties() == MqttOutMessage.EMPTY_USER_PROPERTIES
110+
}
106111
}
107112

108113
def "should provide feedback that message id is already used"() {
@@ -121,13 +126,14 @@ class Qos1MqttPublishInMessageHandlerTest extends QosMqttPublishInMessageHandler
121126
when:
122127
publishInHandler.handle(user, Publish.minimal(expectedMessageId, QoS.AT_MOST_ONCE, topicName, testPayload))
123128
then:
124-
def publishAck = user.nextSentMessage(PublishAckMqtt5OutMessage)
125-
publishAck.reasonCode() == PublishAckReasonCode.PACKET_IDENTIFIER_IN_USE
126-
publishAck.reason() == null
127-
publishAck.userProperties() == MqttOutMessage.EMPTY_USER_PROPERTIES
128-
def trackedMessageMeta = inMessageTracker.stored(expectedMessageId)
129-
trackedMessageMeta != null
130-
trackedMessageMeta.messageType() == MqttMessageType.SUBSCRIBE
129+
with(user.nextSentMessage(PublishAckMqtt5OutMessage)) {
130+
reasonCode() == PublishAckReasonCode.PACKET_IDENTIFIER_IN_USE
131+
reason() == null
132+
userProperties() == MqttOutMessage.EMPTY_USER_PROPERTIES
133+
}
134+
with(inMessageTracker.stored(expectedMessageId)) {
135+
messageType() == MqttMessageType.SUBSCRIBE
136+
}
131137
}
132138

133139
def "should skip handling duplicated publish"() {

0 commit comments

Comments
 (0)