Skip to content

Commit 5a82836

Browse files
committed
🍻 deal with mqtt broker
1 parent a4ccb0e commit 5a82836

File tree

3 files changed

+41
-34
lines changed

3 files changed

+41
-34
lines changed

IOT-Guide-MQTT/src/main/java/iot/technology/mqtt/MqttTransportHandler.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
import io.netty.util.concurrent.Future;
77
import io.netty.util.concurrent.GenericFutureListener;
88
import iot.technology.mqtt.adapter.JsonMqttAdaptor;
9+
import iot.technology.mqtt.storage.Producer;
10+
import iot.technology.mqtt.storage.msg.ProtoQueueMsg;
911
import iot.technology.tsl.adaptor.AdaptorException;
1012
import lombok.extern.slf4j.Slf4j;
1113

@@ -33,6 +35,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
3335
private volatile boolean connected;
3436
private volatile InetSocketAddress address;
3537
private final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap;
38+
private final Producer<ProtoQueueMsg> msgProducer = new Producer<>(DEVICE_TELEMETRY_TOPIC);
3639

3740
public MqttTransportHandler() {
3841
this.mqttQoSMap = new ConcurrentHashMap<>();
@@ -90,15 +93,16 @@ private void processPublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMs
9093
}
9194

9295
String topicName = mqttMsg.variableHeader().topicName();
93-
int msgId = mqttMsg.variableHeader().messageId();
96+
int msgId = mqttMsg.variableHeader().packetId();
9497
processDevicePublish(ctx, mqttMsg, topicName, msgId);
9598

9699
}
97100

98101
private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) {
99102
try {
100103
if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_TOPIC)) {
101-
System.out.println(JsonMqttAdaptor.validatePayload(mqttMsg.payload()));
104+
log.info(JsonMqttAdaptor.validatePayload(mqttMsg.payload()));
105+
msgProducer.send(DEVICE_TELEMETRY_TOPIC, new ProtoQueueMsg("msg", JsonMqttAdaptor.validatePayload(mqttMsg.payload())));
102106
} else if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
103107
JsonMqttAdaptor.convertToMsg(POST_ATTRIBUTES_REQUEST, mqttMsg);
104108
} else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) {

IOT-Guide-MQTT/src/main/java/iot/technology/mqtt/adapter/JsonMqttAdaptor.java

Lines changed: 20 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,16 @@
44
import com.google.gson.JsonElement;
55
import com.google.gson.JsonParser;
66
import com.google.gson.JsonSyntaxException;
7-
import iot.technology.tsl.adaptor.AdaptorException;
8-
import iot.technology.tsl.adaptor.JsonConverter;
9-
import iot.technology.tsl.data.kv.AttributeKvEntry;
10-
import iot.technology.tsl.data.kv.KvEntry;
11-
import iot.technology.tsl.session.SessionMsgType;
127
import io.netty.buffer.ByteBuf;
138
import io.netty.buffer.ByteBufAllocator;
149
import io.netty.buffer.UnpooledByteBufAllocator;
1510
import io.netty.handler.codec.mqtt.MqttMessage;
1611
import io.netty.handler.codec.mqtt.MqttPublishMessage;
12+
import iot.technology.tsl.adaptor.AdaptorException;
13+
import iot.technology.tsl.adaptor.JsonConverter;
14+
import iot.technology.tsl.data.kv.AttributeKvEntry;
15+
import iot.technology.tsl.data.kv.KvEntry;
16+
import iot.technology.tsl.session.SessionMsgType;
1717
import lombok.extern.slf4j.Slf4j;
1818

1919
import java.nio.charset.Charset;
@@ -50,10 +50,11 @@ public static void convertToMsg(SessionMsgType type, MqttMessage inbound) throws
5050
private static void convertToTelemetryUploadRequest(MqttPublishMessage inbound) throws AdaptorException {
5151
String payload = validatePayload(inbound.payload());
5252
try {
53-
Map<Long, List<KvEntry>> telemetryMaps = JsonConverter.convertToTelemetry(new JsonParser().parse(payload), inbound.variableHeader().messageId()).getData();
54-
for (Map.Entry<Long,List<KvEntry>> entry : telemetryMaps.entrySet()) {
55-
log.info("key: {}",entry.getKey());
56-
for (KvEntry kvEntry: entry.getValue()) {
53+
Map<Long, List<KvEntry>> telemetryMaps =
54+
JsonConverter.convertToTelemetry(new JsonParser().parse(payload), inbound.variableHeader().messageId()).getData();
55+
for (Map.Entry<Long, List<KvEntry>> entry : telemetryMaps.entrySet()) {
56+
log.info("key: {}", entry.getKey());
57+
for (KvEntry kvEntry : entry.getValue()) {
5758
log.info("属性名: {}. 属性值: {}", kvEntry.getKey(), kvEntry.getValueAsString());
5859
}
5960
}
@@ -65,9 +66,11 @@ private static void convertToTelemetryUploadRequest(MqttPublishMessage inbound)
6566
private static void convertToUpdateAttributesRequest(MqttPublishMessage inbound) throws AdaptorException {
6667
String payload = validatePayload(inbound.payload());
6768
try {
68-
Set<AttributeKvEntry> attributeKvEntrySet = JsonConverter.convertToAttributes(new JsonParser().parse(payload), inbound.variableHeader().messageId()).getAttributes();
69-
for (AttributeKvEntry attributeKvEntry : attributeKvEntrySet){
70-
log.info("属性名: {}. 属性值: {}",attributeKvEntry.getKey(), attributeKvEntry.getValueAsString());
69+
Set<AttributeKvEntry> attributeKvEntrySet =
70+
JsonConverter.convertToAttributes(new JsonParser().parse(payload), inbound.variableHeader().messageId())
71+
.getAttributes();
72+
for (AttributeKvEntry attributeKvEntry : attributeKvEntrySet) {
73+
log.info("属性名: {}. 属性值: {}", attributeKvEntry.getKey(), attributeKvEntry.getValueAsString());
7174
}
7275
} catch (IllegalStateException | JsonSyntaxException ex) {
7376
throw new AdaptorException(ex);
@@ -89,7 +92,7 @@ private static void convertToGetAttributesRequest(MqttPublishMessage inbound) th
8992
log.info("共享设备属性: {} ", sharedKey);
9093
}
9194
}
92-
}catch (RuntimeException e) {
95+
} catch (RuntimeException e) {
9396
throw new AdaptorException(e);
9497
}
9598
}
@@ -104,14 +107,10 @@ private static Set<String> toStringSet(JsonElement requestBody, String name) {
104107
}
105108

106109
public static String validatePayload(ByteBuf payloadData) throws AdaptorException {
107-
try {
108-
String payload = payloadData.toString(UTF8);
109-
if (payload == null) {
110-
throw new AdaptorException(new IllegalArgumentException("Payload is empty!"));
111-
}
112-
return payload;
113-
} finally {
114-
payloadData.release();
110+
String payload = payloadData.toString(UTF8);
111+
if (payload == null) {
112+
throw new AdaptorException(new IllegalArgumentException("Payload is empty!"));
115113
}
114+
return payload;
116115
}
117116
}

IOT-Guide-MQTT/src/main/java/iot/technology/mqtt/storage/Producer.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
package iot.technology.mqtt.storage;
22

33
import iot.technology.mqtt.storage.msg.QueueMsg;
4-
import iot.technology.mqtt.storage.queue.QueueCallback;
4+
import lombok.Data;
5+
import lombok.extern.slf4j.Slf4j;
56

67
/**
78
* @author james mu
89
* @date 2020/8/31 11:05
910
*/
11+
@Data
12+
@Slf4j
1013
public class Producer<T extends QueueMsg> {
1114

1215
private final InMemoryStorage storage = InMemoryStorage.getInstance();
@@ -21,17 +24,18 @@ public void init() {
2124

2225
}
2326

24-
public void send(String topicName, T msg, QueueCallback callback) {
27+
public void send(String topicName, T msg) {
28+
log.info("topic: {}, msg: {}", topicName, msg);
2529
boolean result = storage.put(topicName, msg);
26-
if (result) {
27-
if (callback != null) {
28-
callback.onSuccess(null);
29-
}
30-
} else {
31-
if (callback != null) {
32-
callback.onFailure(new RuntimeException("Failure add msg to InMemoryQueue"));
33-
}
34-
}
30+
// if (result) {
31+
// if (callback != null) {
32+
// callback.onSuccess(null);
33+
// }
34+
// } else {
35+
// if (callback != null) {
36+
// callback.onFailure(new RuntimeException("Failure add msg to InMemoryQueue"));
37+
// }
38+
// }
3539
}
3640

3741
public void stop() {

0 commit comments

Comments
 (0)