Skip to content

Commit 1a7c950

Browse files
gljrobinsonkatheris
authored andcommitted
feat: Enable custom message for produce
Signed-off-by: Grace Jansen <grace.jansen1@ibm.com>
1 parent 35fffd9 commit 1a7c950

File tree

4 files changed

+15
-15
lines changed

4 files changed

+15
-15
lines changed

src/main/java/kafka/vertx/demo/Main.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ public static void main(String[] args) {
2727

2828
// Set vertx timeout to deal with slow DNS connections
2929
System.setProperty(ResolverProvider.DISABLE_DNS_RESOLVER_PROP_NAME, "true");
30-
3130
Vertx vertx = Vertx.vertx(
3231
new VertxOptions()
3332
.setWarningExceptionTime(10).setWarningExceptionTimeUnit(TimeUnit.SECONDS)

src/main/java/kafka/vertx/demo/PeriodicProducer.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
public class PeriodicProducer extends AbstractVerticle {
2525

2626
private static final Logger logger = LoggerFactory.getLogger(PeriodicProducer.class);
27+
private String customMessage;
2728

2829
@Override
2930
public void start(Promise<Void> startPromise) {
@@ -41,34 +42,33 @@ public void start(Promise<Void> startPromise) {
4142
private void setup(JsonObject config, Promise<Void> startPromise) {
4243
HashMap<String, String> props = new HashMap<>();
4344
config.forEach(entry -> props.put(entry.getKey(), entry.getValue().toString()));
44-
KafkaProducer<String, JsonObject> kafkaProducer = KafkaProducer.create(vertx, props);
45+
KafkaProducer<String, String> kafkaProducer = KafkaProducer.create(vertx, props);
4546

4647
TimeoutStream timerStream = vertx.periodicStream(2000);
4748
timerStream.handler(tick -> produceKafkaRecord(kafkaProducer));
4849
timerStream.pause();
4950

50-
vertx.eventBus().<String>consumer(Main.PERIODIC_PRODUCER_ADDRESS, message -> handleCommand(timerStream, message));
51+
vertx.eventBus().<JsonObject>consumer(Main.PERIODIC_PRODUCER_ADDRESS, message -> handleCommand(timerStream, message));
5152
logger.info("🚀 PeriodicConsumer started");
5253
startPromise.complete();
5354
}
5455

55-
private void handleCommand(TimeoutStream timerStream, Message<String> message) {
56-
String command = message.body();
56+
private void handleCommand(TimeoutStream timerStream, Message<JsonObject> message) {
57+
String command = message.body().getString("action", "none");
5758
if ("start".equals(command)) {
5859
logger.info("Producing Kafka records");
60+
customMessage = message.body().getString("custom", "Hello World");
5961
timerStream.resume();
6062
} else if ("stop".equals(command)) {
6163
logger.info("Stopping producing Kafka records");
6264
timerStream.pause();
6365
}
6466
}
6567

66-
private void produceKafkaRecord(KafkaProducer<String, JsonObject> kafkaProducer) {
67-
JsonObject payload = new JsonObject()
68-
.put("type", "tick")
69-
.put("when", Instant.now().toString());
68+
private void produceKafkaRecord(KafkaProducer<String, String> kafkaProducer) {
69+
String payload = customMessage;
7070

71-
KafkaProducerRecord<String, JsonObject> record = KafkaProducerRecord.create(Main.TOPIC, payload);
71+
KafkaProducerRecord<String, String> record = KafkaProducerRecord.create(Main.TOPIC, payload);
7272

7373
kafkaProducer
7474
.send(record)
@@ -81,6 +81,6 @@ private void produceKafkaRecord(KafkaProducer<String, JsonObject> kafkaProducer)
8181
.put("value", payload);
8282
vertx.eventBus().send(Main.PERIODIC_PRODUCER_BROADCAST, kafkaMetaData);
8383
})
84-
.onFailure(err -> logger.error("Error sending {}", payload.encode(), err));
84+
.onFailure(err -> logger.error("Error sending {}", payload, err));
8585
}
8686
}

src/main/java/kafka/vertx/demo/WebSocketServer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,10 @@ private void handleProduceSocket(ServerWebSocket webSocket) {
8585
EventBus eventBus = vertx.eventBus();
8686

8787
webSocket.handler(buffer -> {
88-
String action = buffer.toJsonObject().getString("action", "none");
88+
JsonObject message = buffer.toJsonObject();
89+
String action = message.getString("action", "none");
8990
if ("start".equals(action) || "stop".equals(action)) {
90-
eventBus.send(Main.PERIODIC_PRODUCER_ADDRESS, action);
91+
eventBus.send(Main.PERIODIC_PRODUCER_ADDRESS, message);
9192
}
9293
});
9394

src/main/resources/kafka.properties

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,12 @@ group.id=my_group
55
auto.offset.reset=earliest
66
enable.auto.commit=false
77
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
8-
value.deserializer=io.vertx.kafka.client.serialization.JsonObjectDeserializer
8+
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
99

1010
## Producer specific properties
1111
acks=1
1212
key.serializer=org.apache.kafka.common.serialization.StringSerializer
13-
value.serializer=io.vertx.kafka.client.serialization.JsonObjectSerializer
13+
value.serializer=org.apache.kafka.common.serialization.StringSerializer
1414

1515
## Optional security options
1616
# security.protocol=SSL

0 commit comments

Comments
 (0)