
Commit bc2d1c9

jponge authored and katheris committed
A rewrite without service proxies
Also:
- direct JsonObject rather than POJO mapping
- one Kafka consumer per (consumer) websocket, which is more correct (one can stop while the others keep consuming events)

Signed-off-by: Julien Ponge <jponge@redhat.com>
Co-authored-by: Grace Jansen <grace.jansen1@ibm.com>
1 parent 516395b commit bc2d1c9

16 files changed: +228 −610 lines
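
The first bullet of the commit message, direct JsonObject handling rather than POJO mapping, is easiest to see side by side. A minimal sketch of the difference; the Tick POJO and its fields are hypothetical names invented for illustration:

import io.vertx.core.json.JsonObject;

public class JsonObjectSketch {
  public static void main(String[] args) {
    // Before (hypothetical): deserialize through a mapping class.
    //   Tick tick = new ObjectMapper().readValue(json, Tick.class);
    // This is what the jackson-databind dependency (removed from pom.xml
    // below) was needed for.

    // After: build and read JSON directly, no mapping classes required.
    JsonObject payload = new JsonObject()
      .put("type", "tick")
      .put("when", "2020-01-01T00:00:00Z");

    // Field access by name, with a default for missing keys.
    System.out.println(payload.getString("type", "none"));
  }
}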

pom.xml

Lines changed: 0 additions & 19 deletions
@@ -39,29 +39,10 @@
       <groupId>io.vertx</groupId>
       <artifactId>vertx-kafka-client</artifactId>
     </dependency>
-    <dependency>
-      <groupId>io.vertx</groupId>
-      <artifactId>vertx-service-proxy</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>io.vertx</groupId>
-      <artifactId>vertx-codegen</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>io.vertx</groupId>
-      <artifactId>vertx-codegen</artifactId>
-      <classifier>processor</classifier>
-    </dependency>
     <dependency>
       <groupId>io.vertx</groupId>
       <artifactId>vertx-config</artifactId>
     </dependency>
-    <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-databind</artifactId>
-      <version>${jackson-databind.version}</version>
-    </dependency>
     <dependency>
       <groupId>ch.qos.logback</groupId>
       <artifactId>logback-classic</artifactId>
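
For context on the removed dependencies: vertx-service-proxy and vertx-codegen generated typed proxy classes over the event bus, and the rewrite drops them in favour of plain event-bus messaging. A minimal sketch of that style, using the addresses defined in Main below; the handler body is illustrative:

import io.vertx.core.AbstractVerticle;

public class EventBusSketch extends AbstractVerticle {
  @Override
  public void start() {
    // Receiver side: PeriodicProducer registers a consumer like this.
    vertx.eventBus().<String>consumer("demo.periodic.producer",
      message -> System.out.println("command: " + message.body()));

    // Caller side: WebSocketServer sends plain String commands like this.
    vertx.eventBus().send("demo.periodic.producer", "start");
  }
}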

src/main/java/kafka/vertx/demo/Main.java

Lines changed: 10 additions & 44 deletions
@@ -5,14 +5,8 @@
  */
 package kafka.vertx.demo;
 
-import io.vertx.core.CompositeFuture;
-import io.vertx.core.Future;
-import io.vertx.core.Vertx;
-import io.vertx.core.VertxOptions;
+import io.vertx.core.*;
 import io.vertx.core.spi.resolver.ResolverProvider;
-import kafka.vertx.demo.consumer.ConsumerVerticle;
-import kafka.vertx.demo.producer.ProducerVerticle;
-import kafka.vertx.demo.websocket.WebsocketVerticle;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -22,9 +16,9 @@
 
 public class Main {
 
-  private static String websocketVerticleName = "websocket";
-  private static String producerVerticleName = "producer";
-  private static String consumerVerticleName = "consumer";
+  public static final String TOPIC = "demo";
+  public static final String PERIODIC_PRODUCER_ADDRESS = "demo.periodic.producer";
+  public static final String PERIODIC_PRODUCER_BROADCAST = "demo.periodic.producer.broadcast";
 
   private static final Logger logger = LoggerFactory.getLogger(Main.class);
 
@@ -33,45 +27,17 @@ public static void main(String[] args) {
 
     // Set vertx timeout to deal with slow DNS connections
     System.setProperty(ResolverProvider.DISABLE_DNS_RESOLVER_PROP_NAME, "true");
+
     Vertx vertx = Vertx.vertx(
       new VertxOptions()
         .setWarningExceptionTime(10).setWarningExceptionTimeUnit(TimeUnit.SECONDS)
         .setMaxEventLoopExecuteTime(20).setMaxEventLoopExecuteTimeUnit(TimeUnit.SECONDS));
 
-    Future<String> webSocketDeployment = vertx
-      .deployVerticle(new WebsocketVerticle())
-      .onSuccess(id -> verticleDeployed(websocketVerticleName, id))
-      .onFailure(err -> verticleFailedToDeploy(websocketVerticleName, err));
-
-    Future<String> producerDeployment = vertx
-      .deployVerticle(new ProducerVerticle())
-      .onSuccess(id -> verticleDeployed(producerVerticleName, id))
-      .onFailure(err -> verticleFailedToDeploy(producerVerticleName, err));
-
-    Future<String> consumerDeployment = vertx
-      .deployVerticle(new ConsumerVerticle())
-      .onSuccess(id -> verticleDeployed(consumerVerticleName, id))
-      .onFailure(err -> verticleFailedToDeploy(consumerVerticleName, err));
-
-    // Create CompositeFuture to wait for verticles to start
-    CompositeFuture.join(webSocketDeployment, producerDeployment, consumerDeployment)
-      .onSuccess(res -> appStarted(currentTimeMillis() - startTime))
-      .onFailure(Main::appFailedToStart);
-  }
-
-  private static void appFailedToStart(Throwable t) {
-    logger.error("❌ Application failed to start", t);
-  }
-
-  private static void appStarted(long duration) {
-    logger.info("✅ Application has started in {}ms, go to http://localhost:8080 to see the app running.", duration);
-  }
-
-  private static void verticleDeployed(String name, String id) {
-    logger.info("🚀 Verticle {} deployed: {}", name, id);
-  }
+    Future<String> periodicProducerDeployment = vertx.deployVerticle(new PeriodicProducer());
+    Future<String> webSocketServerDeployment = vertx.deployVerticle(new WebSocketServer());
 
-  private static void verticleFailedToDeploy(String name, Throwable err) {
-    logger.error("❌ Verticle {} failed to deploy: {}", name, err.getMessage());
+    CompositeFuture.all(periodicProducerDeployment, webSocketServerDeployment)
+      .onSuccess(ok -> logger.info("✅ Application started in {}ms", currentTimeMillis() - startTime))
+      .onFailure(err -> logger.error("❌ Application failed to start", err));
   }
 }
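
Note the switch from CompositeFuture.join to CompositeFuture.all: all fails as soon as any of its futures fails, while join waits for every future to complete before reporting the failure. A small sketch of the difference, using two pre-completed futures:

import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;

public class AllVsJoinSketch {
  public static void main(String[] args) {
    Future<String> ok = Future.succeededFuture("deployed");
    Future<String> bad = Future.failedFuture("deployment failed");

    // all: completes as failed immediately once any future fails.
    CompositeFuture.all(ok, bad)
      .onFailure(err -> System.out.println("all: " + err.getMessage()));

    // join: waits until every future has completed, then fails if any did.
    CompositeFuture.join(ok, bad)
      .onFailure(err -> System.out.println("join: " + err.getMessage()));
  }
}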
src/main/java/kafka/vertx/demo/PeriodicProducer.java

Lines changed: 86 additions & 0 deletions (new file)

/*
(C) Copyright IBM Corp. 2020 All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/
package kafka.vertx.demo;

import io.vertx.config.ConfigRetriever;
import io.vertx.config.ConfigRetrieverOptions;
import io.vertx.config.ConfigStoreOptions;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.TimeoutStream;
import io.vertx.core.eventbus.Message;
import io.vertx.core.json.JsonObject;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.util.HashMap;

public class PeriodicProducer extends AbstractVerticle {

  private static final Logger logger = LoggerFactory.getLogger(PeriodicProducer.class);

  @Override
  public void start(Promise<Void> startPromise) {
    ConfigRetriever.create(vertx,
      new ConfigRetrieverOptions().addStore(
        new ConfigStoreOptions()
          .setType("file")
          .setFormat("properties")
          .setConfig(new JsonObject().put("path", "kafka.properties").put("raw-data", true))))
      .getConfig()
      .onSuccess(config -> setup(config, startPromise))
      .onFailure(startPromise::fail);
  }

  private void setup(JsonObject config, Promise<Void> startPromise) {
    HashMap<String, String> props = new HashMap<>();
    config.forEach(entry -> props.put(entry.getKey(), entry.getValue().toString()));
    KafkaProducer<String, JsonObject> kafkaProducer = KafkaProducer.create(vertx, props);

    TimeoutStream timerStream = vertx.periodicStream(2000);
    timerStream.handler(tick -> produceKafkaRecord(kafkaProducer));
    timerStream.pause();

    vertx.eventBus().<String>consumer(Main.PERIODIC_PRODUCER_ADDRESS, message -> handleCommand(timerStream, message));
    logger.info("🚀 PeriodicProducer started");
    startPromise.complete();
  }

  private void handleCommand(TimeoutStream timerStream, Message<String> message) {
    String command = message.body();
    if ("start".equals(command)) {
      logger.info("Producing Kafka records");
      timerStream.resume();
    } else if ("stop".equals(command)) {
      logger.info("Stopping producing Kafka records");
      timerStream.pause();
    }
  }

  private void produceKafkaRecord(KafkaProducer<String, JsonObject> kafkaProducer) {
    JsonObject payload = new JsonObject()
      .put("type", "tick")
      .put("when", Instant.now().toString());

    KafkaProducerRecord<String, JsonObject> record = KafkaProducerRecord.create(Main.TOPIC, payload);

    kafkaProducer
      .send(record)
      .onSuccess(metadata -> {
        JsonObject kafkaMetaData = new JsonObject()
          .put("topic", metadata.getTopic())
          .put("partition", metadata.getPartition())
          .put("offset", metadata.getOffset())
          .put("timestamp", metadata.getTimestamp())
          .put("value", payload);
        vertx.eventBus().send(Main.PERIODIC_PRODUCER_BROADCAST, kafkaMetaData);
      })
      .onFailure(err -> logger.error("Error sending {}", payload.encode(), err));
  }
}
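
Any verticle can drive this timer by sending "start" or "stop" to Main.PERIODIC_PRODUCER_ADDRESS, and can observe delivery metadata on the broadcast address. A minimal usage sketch (WebSocketServer below does exactly this on behalf of browser clients):

package kafka.vertx.demo;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.json.JsonObject;

public class ProducerClientSketch extends AbstractVerticle {
  @Override
  public void start() {
    // Watch the metadata PeriodicProducer broadcasts after each send.
    vertx.eventBus().<JsonObject>consumer(Main.PERIODIC_PRODUCER_BROADCAST,
      message -> System.out.println("sent: " + message.body().encode()));

    // Begin the 2-second tick; sending "stop" pauses it again.
    vertx.eventBus().send(Main.PERIODIC_PRODUCER_ADDRESS, "start");
  }
}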
src/main/java/kafka/vertx/demo/WebSocketServer.java

Lines changed: 130 additions & 0 deletions (new file)

/*
(C) Copyright IBM Corp. 2020 All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/
package kafka.vertx.demo;

import io.vertx.config.ConfigRetriever;
import io.vertx.config.ConfigRetrieverOptions;
import io.vertx.config.ConfigStoreOptions;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.StaticHandler;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;

public class WebSocketServer extends AbstractVerticle {

  private static final String PRODUCE_PATH = "/demoproduce";
  private static final String CONSUME_PATH = "/democonsume";

  private static final Logger logger = LoggerFactory.getLogger(WebSocketServer.class);

  private HashMap<String, String> kafkaConfig;

  @Override
  public void start(Promise<Void> startPromise) {
    loadKafkaConfig(startPromise);

    Router router = Router.router(vertx);
    router.get().handler(StaticHandler.create());

    vertx.createHttpServer()
      .requestHandler(router)
      .webSocketHandler(this::handleWebSocket)
      .listen(8080, "localhost")
      .onSuccess(ok -> {
        logger.info("🚀 WebSocketServer started");
        startPromise.complete();
      })
      .onFailure(err -> {
        logger.error("❌ WebSocketServer failed to listen", err);
        startPromise.fail(err);
      });
  }

  private void loadKafkaConfig(Promise<Void> startPromise) {
    ConfigRetriever.create(vertx,
      new ConfigRetrieverOptions().addStore(
        new ConfigStoreOptions()
          .setType("file")
          .setFormat("properties")
          .setConfig(new JsonObject().put("path", "kafka.properties").put("raw-data", true))))
      .getConfig()
      .onSuccess(config -> {
        kafkaConfig = new HashMap<>();
        config.forEach(entry -> kafkaConfig.put(entry.getKey(), entry.getValue().toString()));
      })
      .onFailure(startPromise::fail);
  }

  private void handleWebSocket(ServerWebSocket webSocket) {
    switch (webSocket.path()) {
      case PRODUCE_PATH:
        handleProduceSocket(webSocket);
        break;
      case CONSUME_PATH:
        handleConsumeSocket(webSocket);
        break;
      default:
        webSocket.reject();
    }
  }

  private void handleProduceSocket(ServerWebSocket webSocket) {
    EventBus eventBus = vertx.eventBus();

    webSocket.handler(buffer -> {
      String action = buffer.toJsonObject().getString("action", "none");
      if ("start".equals(action) || "stop".equals(action)) {
        eventBus.send(Main.PERIODIC_PRODUCER_ADDRESS, action);
      }
    });

    eventBus.<JsonObject>consumer(Main.PERIODIC_PRODUCER_BROADCAST, message ->
      eventBus.send(webSocket.textHandlerID(), message.body().encode()));
  }

  private void handleConsumeSocket(ServerWebSocket webSocket) {
    KafkaConsumer<String, JsonObject> kafkaConsumer = KafkaConsumer.create(vertx, kafkaConfig);

    kafkaConsumer.exceptionHandler(err -> logger.error("Kafka error", err));

    kafkaConsumer.handler(record -> {
      JsonObject payload = new JsonObject()
        .put("topic", record.topic())
        .put("partition", record.partition())
        .put("offset", record.offset())
        .put("timestamp", record.timestamp())
        .put("value", record.value());
      vertx.eventBus().send(webSocket.textHandlerID(), payload.encode());
    });

    kafkaConsumer.subscribe(Main.TOPIC)
      .onSuccess(ok -> logger.info("Subscribed to {}", Main.TOPIC))
      .onFailure(err -> logger.error("Could not subscribe to {}", Main.TOPIC, err));

    TopicPartition partition = new TopicPartition().setTopic(Main.TOPIC);

    webSocket.handler(buffer -> {
      String action = buffer.toJsonObject().getString("action", "none");
      if ("start".equals(action)) {
        kafkaConsumer.resume(partition)
          .onFailure(err -> logger.error("Cannot resume consumer", err));
      } else if ("stop".equals(action)) {
        kafkaConsumer.pause(partition)
          .onFailure(err -> logger.error("Cannot pause consumer", err));
      }
    });
  }
}
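
One consequence of the one-consumer-per-websocket design: nothing here closes a KafkaConsumer when its websocket goes away. A hedged sketch of cleanup one might add at the end of handleConsumeSocket; this is not part of the commit:

// Hypothetical addition, not in this commit: tie the consumer's lifetime
// to its websocket so a closed socket releases its Kafka resources.
webSocket.closeHandler(done ->
  kafkaConsumer.close()
    .onSuccess(ok -> logger.info("Consumer closed with its websocket"))
    .onFailure(err -> logger.error("Could not close consumer", err)));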

src/main/java/kafka/vertx/demo/consumer/ConsumerService.java

Lines changed: 0 additions & 24 deletions
This file was deleted.
