Skip to content

Commit b2cbf3e

Browse files
authored
feat: Make topic configurable (#12)
Signed-off-by: Katherine Stanley <katheris@uk.ibm.com>
1 parent fdcd3ff commit b2cbf3e

File tree

4 files changed

+15
-9
lines changed

4 files changed

+15
-9
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
A starter app for testing out your [Apache Kafka](https://kafka.apache.org) deployment and trying out the [Vert.x Kafka client](https://vertx.io/docs/vertx-kafka-client/java/).
44

5-
The app allows you to send records to a topic in Kafka called `test` every two seconds and consume back those records.
5+
The app allows you to send records to a topic in Kafka called `demo` every two seconds and consume back those records.
66

77
## Running the app
88

src/main/java/kafka/vertx/demo/PeriodicProducer.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.time.Instant;
2222
import java.util.HashMap;
23+
import java.util.Optional;
2324

2425
public class PeriodicProducer extends AbstractVerticle {
2526

@@ -44,10 +45,12 @@ private void setup(JsonObject config, Promise<Void> startPromise) {
4445
config.forEach(entry -> props.put(entry.getKey(), entry.getValue().toString()));
4546
KafkaProducer<String, String> kafkaProducer = KafkaProducer.create(vertx, props);
4647

48+
String topic = Optional.ofNullable(config.getString("topic")).orElse(Main.TOPIC);
49+
4750
kafkaProducer.exceptionHandler(err -> logger.error("Kafka error: {}", err));
4851

4952
TimeoutStream timerStream = vertx.periodicStream(2000);
50-
timerStream.handler(tick -> produceKafkaRecord(kafkaProducer));
53+
timerStream.handler(tick -> produceKafkaRecord(kafkaProducer, topic));
5154
timerStream.pause();
5255

5356
vertx.eventBus().<JsonObject>consumer(Main.PERIODIC_PRODUCER_ADDRESS, message -> handleCommand(timerStream, message));
@@ -67,10 +70,9 @@ private void handleCommand(TimeoutStream timerStream, Message<JsonObject> messag
6770
}
6871
}
6972

70-
private void produceKafkaRecord(KafkaProducer<String, String> kafkaProducer) {
73+
private void produceKafkaRecord(KafkaProducer<String, String> kafkaProducer, String topic) {
7174
String payload = customMessage;
72-
73-
KafkaProducerRecord<String, String> record = KafkaProducerRecord.create(Main.TOPIC, payload);
75+
KafkaProducerRecord<String, String> record = KafkaProducerRecord.create(topic, payload);
7476

7577
kafkaProducer
7678
.send(record)

src/main/java/kafka/vertx/demo/WebSocketServer.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.slf4j.LoggerFactory;
2323

2424
import java.util.HashMap;
25+
import java.util.Optional;
2526

2627
public class WebSocketServer extends AbstractVerticle {
2728

@@ -120,11 +121,13 @@ private void handleConsumeSocket(ServerWebSocket webSocket) {
120121
vertx.eventBus().send(webSocket.textHandlerID(), payload.encode());
121122
});
122123

123-
kafkaConsumer.subscribe(Main.TOPIC)
124-
.onSuccess(ok -> logger.info("Subscribed to {}", Main.TOPIC))
125-
.onFailure(err -> logger.error("Could not subscribe to {}", Main.TOPIC, err));
124+
String topic = Optional.ofNullable(kafkaConfig.get("topic")).orElse(Main.TOPIC);
126125

127-
TopicPartition partition = new TopicPartition().setTopic(Main.TOPIC);
126+
kafkaConsumer.subscribe(topic)
127+
.onSuccess(ok -> logger.info("Subscribed to {}", topic))
128+
.onFailure(err -> logger.error("Could not subscribe to {}", topic, err));
129+
130+
TopicPartition partition = new TopicPartition().setTopic(topic);
128131

129132
webSocket.handler(buffer -> {
130133
String action = buffer.toJsonObject().getString("action", "none");

src/main/resources/kafka.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
bootstrap.servers=localhost:9092
2+
# topic=
23

34
## Consumer specific properties
45
group.id=my_group

0 commit comments

Comments
 (0)