Skip to content

Commit cad6305

Browse files
authored
feat: Refactor flow control of consumer + producer (#19)
Refactor the flow of the consumer and producer. Stop the producer when the websocket disconnects. Only subscribe to the consumer when start is sent. Signed-off-by: Katherine Stanley <katheris@uk.ibm.com>
1 parent 11e9f47 commit cad6305

File tree

3 files changed

+34
-30
lines changed

3 files changed

+34
-30
lines changed

src/main/java/kafka/vertx/demo/PeriodicProducer.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,12 @@ private void setup(HashMap<String, String> props) {
4848
}
4949

5050
private void handleCommand(TimeoutStream timerStream, Message<JsonObject> message) {
51-
String command = message.body().getString("action", "none");
52-
if ("start".equals(command)) {
51+
String command = message.body().getString(WebSocketServer.ACTION, "none");
52+
if (WebSocketServer.START_ACTION.equals(command)) {
5353
logger.info("Producing Kafka records");
5454
customMessage = message.body().getString("custom", "Hello World");
5555
timerStream.resume();
56-
} else if ("stop".equals(command)) {
56+
} else if (WebSocketServer.STOP_ACTION.equals(command)) {
5757
logger.info("Stopping producing Kafka records");
5858
timerStream.pause();
5959
}
@@ -62,6 +62,7 @@ private void handleCommand(TimeoutStream timerStream, Message<JsonObject> messag
6262
private void produceKafkaRecord(KafkaProducer<String, String> kafkaProducer, String topic) {
6363
String payload = customMessage;
6464
KafkaProducerRecord<String, String> record = KafkaProducerRecord.create(topic, payload);
65+
logger.debug("Producing record to topic {} with payload {}", topic, payload);
6566

6667
kafkaProducer
6768
.send(record)

src/main/java/kafka/vertx/demo/WebSocketServer.java

Lines changed: 29 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,14 @@
2121
import org.slf4j.LoggerFactory;
2222

2323
import java.util.HashMap;
24+
import java.util.Set;
2425

2526
public class WebSocketServer extends AbstractVerticle {
2627

28+
public static final String ACTION = "action";
29+
public static final String START_ACTION = "start";
30+
public static final String STOP_ACTION = "stop";
31+
2732
private static final String PRODUCE_PATH = "/demoproduce";
2833
private static final String CONSUME_PATH = "/democonsume";
2934

@@ -73,8 +78,8 @@ private void handleProduceSocket(ServerWebSocket webSocket) {
7378

7479
webSocket.handler(buffer -> {
7580
JsonObject message = buffer.toJsonObject();
76-
String action = message.getString("action", "none");
77-
if ("start".equals(action) || "stop".equals(action)) {
81+
String action = message.getString(ACTION, "none");
82+
if (START_ACTION.equals(action) || STOP_ACTION.equals(action)) {
7883
eventBus.send(Main.PERIODIC_PRODUCER_ADDRESS, message);
7984
}
8085
});
@@ -83,11 +88,13 @@ private void handleProduceSocket(ServerWebSocket webSocket) {
8388
eventBus.send(webSocket.textHandlerID(), message.body().encode()));
8489

8590
webSocket.endHandler(ended -> {
86-
logger.info("WebSocket closed from {}", webSocket.remoteAddress().host());
91+
logger.info("Producer WebSocket closed from {}", webSocket.remoteAddress().host());
92+
eventBus.send(Main.PERIODIC_PRODUCER_ADDRESS, new JsonObject().put(ACTION, STOP_ACTION));
8793
consumer.unregister();
8894
});
8995
webSocket.exceptionHandler(err -> {
90-
logger.error("WebSocket error", err);
96+
logger.error("Producer WebSocket error", err);
97+
eventBus.send(Main.PERIODIC_PRODUCER_ADDRESS, new JsonObject().put(ACTION, STOP_ACTION));
9198
consumer.unregister();
9299
});
93100
}
@@ -97,47 +104,43 @@ private void handleConsumeSocket(ServerWebSocket webSocket) {
97104

98105
kafkaConsumer.exceptionHandler(err -> logger.error("Kafka error", err));
99106

107+
String topic = kafkaConfig.get(Main.TOPIC_KEY);
108+
TopicPartition topicPartition = new TopicPartition().setTopic(topic);
109+
100110
kafkaConsumer.handler(record -> {
101111
JsonObject payload = new JsonObject()
102112
.put("topic", record.topic())
103113
.put("partition", record.partition())
104114
.put("offset", record.offset())
105115
.put("timestamp", record.timestamp())
106116
.put("value", record.value());
117+
logger.debug("Received record {}", payload);
107118
vertx.eventBus().send(webSocket.textHandlerID(), payload.encode());
108119
});
109120

110-
String topic = kafkaConfig.get(Main.TOPIC_KEY);
111-
112-
kafkaConsumer.subscribe(topic)
113-
.onSuccess(ok -> logger.info("Subscribed to {}", topic))
114-
.onFailure(err -> logger.error("Could not subscribe to {}", topic, err));
115-
116-
TopicPartition partition = new TopicPartition().setTopic(topic);
117-
118121
webSocket.handler(buffer -> {
119-
String action = buffer.toJsonObject().getString("action", "none");
120-
if ("start".equals(action)) {
121-
kafkaConsumer.resume(partition)
122-
.onFailure(err -> logger.error("Cannot resume consumer", err));
123-
} else if ("stop".equals(action)) {
124-
kafkaConsumer.pause(partition)
122+
String action = buffer.toJsonObject().getString(ACTION, "none");
123+
if (START_ACTION.equals(action)) {
124+
kafkaConsumer.subscription()
125+
.compose(sub -> (sub.size() > 0) ? kafkaConsumer.resume(topicPartition) : kafkaConsumer.subscribe(topic))
126+
.onSuccess(ok -> logger.info("Subscribed to {}", topic))
127+
.onFailure(err -> logger.error("Could not subscribe to {}", topic, err));
128+
} else if (STOP_ACTION.equals(action)) {
129+
kafkaConsumer.pause(topicPartition)
125130
.onFailure(err -> logger.error("Cannot pause consumer", err));
126131
}
127132
});
128133

129134
webSocket.endHandler(done -> {
130-
logger.info("WebSocket closed from {}", webSocket.remoteAddress().host());
131-
kafkaConsumer.close().onFailure(err -> {
132-
logger.error("Closing Kafka consumer failed", err);
133-
});
135+
logger.info("Consumer WebSocket closed from {}", webSocket.remoteAddress().host());
136+
kafkaConsumer.close()
137+
.onFailure(err -> logger.error("Closing Kafka consumer failed", err));
134138
});
135139

136140
webSocket.exceptionHandler(err -> {
137-
logger.error("WebSocket error", err);
138-
kafkaConsumer.close().onFailure(kerr -> {
139-
logger.error("Closing Kafka consumer failed", kerr);
140-
});
141+
logger.error("Consumer WebSocket error", err);
142+
kafkaConsumer.close()
143+
.onFailure(kerr -> logger.error("Closing Kafka consumer failed", kerr));
141144
});
142145
}
143146
}

src/main/resources/logback.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
<logger name="io.vertx" level="info"/>
1111
<logger name="org.apache.kafka" level="error"/>
1212

13-
<root level="debug">
13+
<root level="info">
1414
<appender-ref ref="STDOUT"/>
1515
</root>
1616

0 commit comments

Comments
 (0)