Skip to content

Commit a4ccb0e

Browse files
committed
🎨 improve mqtt module structure
1 parent 3d5657f commit a4ccb0e

File tree

13 files changed

+252
-107
lines changed

13 files changed

+252
-107
lines changed

IOT-Guide-MQTT/pom.xml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@
3939
</properties>
4040

4141
<dependencies>
42+
<dependency>
43+
<groupId>org.springframework.boot</groupId>
44+
<artifactId>spring-boot-starter-web</artifactId>
45+
</dependency>
4246
<dependency>
4347
<groupId>io.netty</groupId>
4448
<artifactId>netty-all</artifactId>
@@ -67,5 +71,17 @@
6771
</dependency>
6872
</dependencies>
6973

74+
<build>
75+
<plugins>
76+
<plugin>
77+
<groupId>org.springframework.boot</groupId>
78+
<artifactId>spring-boot-maven-plugin</artifactId>
79+
<configuration>
80+
<mainClass>iot.technology.mqtt.MqttServerApp</mainClass>
81+
</configuration>
82+
</plugin>
83+
</plugins>
84+
</build>
85+
7086

7187
</project>

IOT-Guide-MQTT/src/main/java/iot/technology/mqtt/IOTMqttServer.java

Lines changed: 58 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,37 +8,76 @@
88
import io.netty.handler.logging.LogLevel;
99
import io.netty.handler.logging.LoggingHandler;
1010
import io.netty.util.ResourceLeakDetector;
11+
import lombok.extern.slf4j.Slf4j;
12+
import org.springframework.beans.factory.annotation.Value;
13+
import org.springframework.stereotype.Service;
14+
15+
import javax.annotation.PostConstruct;
16+
import javax.annotation.PreDestroy;
1117

1218
/**
1319
* @Author: 穆书伟
14-
* @Date: 19-4-3 下午3:14
15-
* @Version 1.0
20+
* @Date: 21-3-9 下午20:07
21+
* @Version 2.0
1622
*/
17-
public class IOTMqttServer {
23+
@Slf4j
24+
@Service("iotMqttServer")
25+
public class IOTMqttServer {
26+
27+
@Value("${server.bind_port}")
28+
private Integer port;
29+
@Value("${server.netty.boss_group_thread_count}")
30+
private Integer bossGroupThreadCount;
31+
@Value("${server.netty.worker_group_thread_count}")
32+
private Integer workerGroupThreadCount;
33+
@Value("${server.netty.leak_detector_level}")
34+
private String leakDetectorLevel;
35+
@Value("${server.netty.max_payload_size}")
36+
private Integer maxPayloadSize;
1837

19-
private static final int PORT = 1883;
20-
private static final String leakDetectorLevel = "DISABLED";
21-
private static final Integer bossGroupThreadCount = 1;
22-
private static final Integer workerGroupThreadCount = 12;
23-
private static final Integer maxPayloadSize = 65536;
38+
private ChannelFuture channelFuture;
39+
private EventLoopGroup bossGroup;
40+
private EventLoopGroup workerGroup;
2441

25-
public static void main(String[] args) throws Exception {
42+
@PostConstruct
43+
public void init() throws Exception {
44+
log.info("Setting resource leak detector level to {}", leakDetectorLevel);
2645
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase()));
2746

28-
EventLoopGroup bossGroup = new NioEventLoopGroup(bossGroupThreadCount);
29-
EventLoopGroup workerGroup = new NioEventLoopGroup(workerGroupThreadCount);
47+
log.info("Starting Server");
48+
//创建boss线程组 用于服务端接受客户端的连接
49+
bossGroup = new NioEventLoopGroup(bossGroupThreadCount);
50+
// 创建 worker 线程组 用于进行 SocketChannel 的数据读写
51+
workerGroup = new NioEventLoopGroup(workerGroupThreadCount);
52+
// 创建 ServerBootstrap 对象
53+
ServerBootstrap b = new ServerBootstrap();
54+
//设置使用的EventLoopGroup
55+
b.group(bossGroup, workerGroup)
56+
//设置要被实例化的为 NioServerSocketChannel 类
57+
.channel(NioServerSocketChannel.class)
58+
// 设置 NioServerSocketChannel 的处理器
59+
.handler(new LoggingHandler(LogLevel.INFO))
60+
// 设置连入服务端的 Client 的 SocketChannel 的处理器
61+
.childHandler(new MqttTransportServerInitializer(maxPayloadSize));
62+
// 绑定端口,并同步等待成功,即启动服务端
63+
channelFuture = b.bind(port).sync();
64+
65+
log.info("Server started!");
66+
67+
}
3068

69+
@PreDestroy
70+
public void shutdown() throws InterruptedException {
71+
log.info("Stopping Server");
3172
try {
32-
ServerBootstrap b = new ServerBootstrap();
33-
b.group(bossGroup,workerGroup)
34-
.channel(NioServerSocketChannel.class)
35-
.handler(new LoggingHandler(LogLevel.INFO))
36-
.childHandler(new MqttTransportServerInitializer(maxPayloadSize));
37-
ChannelFuture f = b.bind(PORT);
38-
f.channel().closeFuture().sync();
73+
// 监听服务端关闭,并阻塞等待
74+
channelFuture.channel().closeFuture().sync();
3975
} finally {
40-
bossGroup.shutdownGracefully();
76+
// 优雅关闭两个 EventLoopGroup 对象
4177
workerGroup.shutdownGracefully();
78+
bossGroup.shutdownGracefully();
4279
}
80+
log.info("server stopped!");
81+
4382
}
4483
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package iot.technology.mqtt;
2+
3+
import org.springframework.boot.SpringApplication;
4+
import org.springframework.boot.autoconfigure.SpringBootApplication;
5+
6+
/**
7+
* @author jamesmsw
8+
* @date 2021/3/9 8:41 下午
9+
*/
10+
@SpringBootApplication(scanBasePackages = "iot.technology.mqtt")
11+
public class MqttServerApp {
12+
13+
/**
14+
* @param args
15+
*/
16+
public static void main(String[] args) {
17+
SpringApplication.run(MqttServerApp.class, args);
18+
}
19+
}

IOT-Guide-MQTT/src/main/java/iot/technology/mqtt/MqttTransportHandler.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
package iot.technology.mqtt;
22

3-
import iot.technology.mqtt.adapter.JsonMqttAdaptor;
4-
import iot.technology.tsl.adaptor.AdaptorException;
53
import io.netty.channel.ChannelHandlerContext;
64
import io.netty.channel.ChannelInboundHandlerAdapter;
75
import io.netty.handler.codec.mqtt.*;
86
import io.netty.util.concurrent.Future;
97
import io.netty.util.concurrent.GenericFutureListener;
8+
import iot.technology.mqtt.adapter.JsonMqttAdaptor;
9+
import iot.technology.tsl.adaptor.AdaptorException;
1010
import lombok.extern.slf4j.Slf4j;
1111

1212
import java.net.InetSocketAddress;
@@ -15,23 +15,24 @@
1515
import java.util.concurrent.ConcurrentHashMap;
1616
import java.util.concurrent.ConcurrentMap;
1717

18-
import static iot.technology.mqtt.MqttTopics.*;
1918
import static io.netty.handler.codec.mqtt.MqttMessageType.*;
2019
import static io.netty.handler.codec.mqtt.MqttQoS.*;
20+
import static iot.technology.mqtt.MqttTopics.*;
2121
import static iot.technology.tsl.session.SessionMsgType.*;
2222

2323
/**
2424
* @Author: 穆书伟
25-
* @Date: 19-4-3 下午3:35
26-
* @Version 1.0
25+
* @Date: 21-3-9 下午20:22
26+
* @Version 2.0
2727
*/
28+
@Slf4j
2829
public class MqttTransportHandler extends ChannelInboundHandlerAdapter implements GenericFutureListener<Future<? super Void>> {
2930

3031
public static final MqttQoS MAX_SUPPORTED_QOS_LVL = MqttQoS.AT_LEAST_ONCE;
3132

3233
private volatile boolean connected;
3334
private volatile InetSocketAddress address;
34-
private final ConcurrentMap<MqttTopicMatcher,Integer> mqttQoSMap;
35+
private final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap;
3536

3637
public MqttTransportHandler() {
3738
this.mqttQoSMap = new ConcurrentHashMap<>();
@@ -40,7 +41,7 @@ public MqttTransportHandler() {
4041
@Override
4142
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
4243
if (msg instanceof MqttMessage) {
43-
processMqttMsg(ctx,(MqttMessage) msg);
44+
processMqttMsg(ctx, (MqttMessage) msg);
4445
} else {
4546
ctx.close();
4647
}
@@ -68,7 +69,7 @@ private void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) {
6869
break;
6970
case PINGREQ:
7071
if (checkConnected(ctx)) {
71-
ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP,false,AT_MOST_ONCE, false, 0)));
72+
ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
7273
}
7374
break;
7475
case DISCONNECT:
@@ -98,9 +99,9 @@ private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage
9899
try {
99100
if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_TOPIC)) {
100101
System.out.println(JsonMqttAdaptor.validatePayload(mqttMsg.payload()));
101-
} else if(topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
102+
} else if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
102103
JsonMqttAdaptor.convertToMsg(POST_ATTRIBUTES_REQUEST, mqttMsg);
103-
} else if(topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) {
104+
} else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) {
104105
JsonMqttAdaptor.convertToMsg(GET_ATTRIBUTES_REQUEST, mqttMsg);
105106
}
106107
ctx.writeAndFlush(createMqttPubAckMsg(msgId));

IOT-Guide-MQTT/src/main/java/iot/technology/mqtt/MqttTransportServerInitializer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ public class MqttTransportServerInitializer extends ChannelInitializer<SocketCh
1515

1616
private final int maxPayloadSize;
1717

18-
public MqttTransportServerInitializer(int maxPayloadSize){
18+
public MqttTransportServerInitializer(int maxPayloadSize) {
1919
this.maxPayloadSize = maxPayloadSize;
2020
}
2121

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package iot.technology.mqtt.adapter;
2+
3+
/**
4+
* @author jamesmsw
5+
* @date 2021/2/19 3:09 下午
6+
*/
7+
public class MsgListener {
8+
}

IOT-Guide-MQTT/src/main/java/iot/technology/mqtt/storage/Consumer.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package iot.technology.mqtt.storage;
22

33
import iot.technology.mqtt.storage.msg.QueueMsg;
4-
import iot.technology.mqtt.storage.queue.TopicPartitionInfo;
54
import lombok.extern.slf4j.Slf4j;
65

76
import java.util.Collections;
@@ -16,7 +15,7 @@
1615
@Slf4j
1716
public class Consumer<T extends QueueMsg> {
1817
private final InMemoryStorage storage = InMemoryStorage.getInstance();
19-
private volatile Set<TopicPartitionInfo> partitions;
18+
private volatile Set<String> topics;
2019
private volatile boolean stopped;
2120
private volatile boolean subscribed;
2221
private final String topic;
@@ -31,12 +30,12 @@ public String getTopic() {
3130
}
3231

3332
public void subscribe() {
34-
partitions = Collections.singleton(new TopicPartitionInfo(topic, null, true));
33+
topics = Collections.singleton(topic);
3534
subscribed = true;
3635
}
3736

38-
public void subscribe(Set<TopicPartitionInfo> partitions) {
39-
this.partitions = partitions;
37+
public void subscribe(Set<String> topics) {
38+
this.topics = topics;
4039
subscribed = true;
4140
}
4241

@@ -46,11 +45,9 @@ public void unsubscribe() {
4645

4746
public List<T> poll(long durationInMillis) {
4847
if (subscribed) {
49-
List<T> messages = partitions
48+
List<T> messages = topics
5049
.stream()
51-
.map(tpi -> {
52-
return storage.get(tpi.getFullTopicName());
53-
})
50+
.map(storage::get)
5451
.flatMap(List::stream)
5552
.map(msg -> (T) msg).collect(Collectors.toList());
5653

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package iot.technology.mqtt.storage;
2+
3+
import iot.technology.mqtt.storage.msg.ProtoQueueMsg;
4+
import lombok.extern.slf4j.Slf4j;
5+
import org.springframework.stereotype.Service;
6+
7+
import javax.annotation.PostConstruct;
8+
import javax.annotation.PreDestroy;
9+
import java.util.List;
10+
import java.util.concurrent.ExecutorService;
11+
import java.util.concurrent.Executors;
12+
13+
/**
14+
* @author jamesmsw
15+
* @date 2021/2/19 9:24 上午
16+
*/
17+
@Slf4j
18+
@Service
19+
public class DefaultTransportService {
20+
21+
private ExecutorService mainConsumerExecutor = Executors.newSingleThreadExecutor();
22+
private volatile boolean stopped = false;
23+
protected Consumer<ProtoQueueMsg> transportConsumer;
24+
25+
public DefaultTransportService() {
26+
}
27+
28+
@PostConstruct
29+
public void init() {
30+
transportConsumer = new Consumer<>("transport.notifications");
31+
mainConsumerExecutor.execute(() -> {
32+
while (!stopped) {
33+
try {
34+
List<ProtoQueueMsg> records = transportConsumer.poll(25);
35+
if (records.size() == 0) {
36+
continue;
37+
}
38+
records.forEach(record -> {
39+
try {
40+
//processToTransportMsg(record.getValue());
41+
} catch (Throwable e) {
42+
log.warn("Failed to process the notification.", e);
43+
}
44+
});
45+
transportConsumer.commit();
46+
} catch (Exception e) {
47+
if (!stopped) {
48+
log.warn("Failed to obtain messages from queue.", e);
49+
try {
50+
Thread.sleep(25);
51+
} catch (InterruptedException e2) {
52+
log.trace("Failed to wait until the server has capacity to handle new requests", e2);
53+
}
54+
}
55+
}
56+
}
57+
58+
});
59+
}
60+
61+
@PreDestroy
62+
public void destroy() {
63+
stopped = true;
64+
if (transportConsumer != null) {
65+
transportConsumer.unsubscribe();
66+
}
67+
}
68+
}

IOT-Guide-MQTT/src/main/java/iot/technology/mqtt/storage/InMemoryStorage.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import java.util.ArrayList;
66
import java.util.Collections;
77
import java.util.List;
8-
import java.util.concurrent.BlockingDeque;
8+
import java.util.concurrent.BlockingQueue;
99
import java.util.concurrent.ConcurrentHashMap;
1010
import java.util.concurrent.LinkedBlockingDeque;
1111

@@ -15,8 +15,10 @@
1515
*/
1616
public final class InMemoryStorage {
1717

18+
19+
private final ConcurrentHashMap<String, BlockingQueue<QueueMsg>> storage;
20+
1821
private static InMemoryStorage instance;
19-
private final ConcurrentHashMap<String, BlockingDeque<QueueMsg>> storage;
2022

2123
private InMemoryStorage() {
2224
storage = new ConcurrentHashMap<>();

0 commit comments

Comments
 (0)