Skip to content

Commit 5770e15

Browse files
authored
feat: Allow config file to be provided at runtime
* feat: Allow config file to be provided at runtime * feat: Handle promise completion in start methods * feat: Update README for new properties file * fix: Don't use future for synchronous code Signed-off-by: Katherine Stanley <katheris@uk.ibm.com>
1 parent b2cbf3e commit 5770e15

File tree

4 files changed

+59
-33
lines changed

4 files changed

+59
-33
lines changed

README.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,5 +35,7 @@ Test out the app by connecting to the websocket endpoints (UI will be added soon
3535
- Disconnect from the consume websocket
3636

3737
## Configuration
38-
To configure the application to connect to your Kafka edit the properties file in `src/main/resources`.
39-
If your Kafka is secured you will need to enable the security configuration options and add your certificate to `src/main/resources`.
38+
To configure the application to connect to your Kafka edit the properties file called `kafka.properties`.
39+
Alternatively you can provide a custom path to the properties file using `-Dproperties_path=<path>` when starting the application.
40+
41+
If your Kafka is secured you will need to enable the security configuration options in your properties file

src/main/java/kafka/vertx/demo/PeriodicProducer.java

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,17 @@
99
import io.vertx.config.ConfigRetrieverOptions;
1010
import io.vertx.config.ConfigStoreOptions;
1111
import io.vertx.core.AbstractVerticle;
12+
import io.vertx.core.Future;
1213
import io.vertx.core.Promise;
1314
import io.vertx.core.TimeoutStream;
1415
import io.vertx.core.eventbus.Message;
16+
import io.vertx.core.file.FileSystem;
1517
import io.vertx.core.json.JsonObject;
1618
import io.vertx.kafka.client.producer.KafkaProducer;
1719
import io.vertx.kafka.client.producer.KafkaProducerRecord;
1820
import org.slf4j.Logger;
1921
import org.slf4j.LoggerFactory;
2022

21-
import java.time.Instant;
2223
import java.util.HashMap;
2324
import java.util.Optional;
2425

@@ -29,18 +30,34 @@ public class PeriodicProducer extends AbstractVerticle {
2930

3031
@Override
3132
public void start(Promise<Void> startPromise) {
32-
ConfigRetriever.create(vertx,
33+
loadKafkaConfig()
34+
.onSuccess(config -> {
35+
setup(config);
36+
startPromise.complete();
37+
})
38+
.onFailure(startPromise::fail);
39+
}
40+
41+
private Future<JsonObject> loadKafkaConfig() {
42+
String path = Optional.ofNullable(System.getProperty("properties_path")).orElse("kafka.properties");
43+
ConfigRetriever configRetriever = ConfigRetriever.create(vertx,
3344
new ConfigRetrieverOptions().addStore(
3445
new ConfigStoreOptions()
3546
.setType("file")
3647
.setFormat("properties")
37-
.setConfig(new JsonObject().put("path", "kafka.properties").put("raw-data", true))))
38-
.getConfig()
39-
.onSuccess(config -> setup(config, startPromise))
40-
.onFailure(startPromise::fail);
48+
.setConfig(new JsonObject().put("path", path).put("raw-data", true))));
49+
FileSystem fileSystem = vertx.fileSystem();
50+
return fileSystem.exists(path)
51+
.compose(exists -> {
52+
if (exists) {
53+
return configRetriever.getConfig();
54+
} else {
55+
return Future.failedFuture("Kafka properties file is missing. Either specify using -Dproperties_path=<path> or use the default path of kafka.properties.");
56+
}
57+
});
4158
}
4259

43-
private void setup(JsonObject config, Promise<Void> startPromise) {
60+
private void setup(JsonObject config) {
4461
HashMap<String, String> props = new HashMap<>();
4562
config.forEach(entry -> props.put(entry.getKey(), entry.getValue().toString()));
4663
KafkaProducer<String, String> kafkaProducer = KafkaProducer.create(vertx, props);
@@ -55,7 +72,6 @@ private void setup(JsonObject config, Promise<Void> startPromise) {
5572

5673
vertx.eventBus().<JsonObject>consumer(Main.PERIODIC_PRODUCER_ADDRESS, message -> handleCommand(timerStream, message));
5774
logger.info("🚀 PeriodicProducer started");
58-
startPromise.complete();
5975
}
6076

6177
private void handleCommand(TimeoutStream timerStream, Message<JsonObject> message) {

src/main/java/kafka/vertx/demo/WebSocketServer.java

Lines changed: 31 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,12 @@
99
import io.vertx.config.ConfigRetrieverOptions;
1010
import io.vertx.config.ConfigStoreOptions;
1111
import io.vertx.core.AbstractVerticle;
12+
import io.vertx.core.Future;
1213
import io.vertx.core.Promise;
1314
import io.vertx.core.eventbus.EventBus;
1415
import io.vertx.core.eventbus.MessageConsumer;
16+
import io.vertx.core.file.FileSystem;
17+
import io.vertx.core.http.HttpServer;
1518
import io.vertx.core.http.ServerWebSocket;
1619
import io.vertx.core.json.JsonObject;
1720
import io.vertx.ext.web.Router;
@@ -35,38 +38,43 @@ public class WebSocketServer extends AbstractVerticle {
3538

3639
@Override
3740
public void start(Promise<Void> startPromise) {
38-
loadKafkaConfig(startPromise);
39-
4041
Router router = Router.router(vertx);
4142
router.get().handler(StaticHandler.create());
4243

43-
vertx.createHttpServer()
44-
.requestHandler(router)
45-
.webSocketHandler(this::handleWebSocket)
46-
.listen(8080, "localhost")
47-
.onSuccess(ok -> {
48-
logger.info("🚀 WebSocketServer started");
49-
startPromise.complete();
50-
})
51-
.onFailure(err -> {
52-
logger.error("❌ WebSocketServer failed to listen", err);
53-
startPromise.fail(err);
54-
});
44+
loadKafkaConfig()
45+
.compose(config -> startWebSocket(router, config))
46+
.onSuccess(ok -> startPromise.complete())
47+
.onFailure(startPromise::fail);
5548
}
5649

57-
private void loadKafkaConfig(Promise<Void> startPromise) {
58-
ConfigRetriever.create(vertx,
50+
private Future<JsonObject> loadKafkaConfig() {
51+
String path = Optional.ofNullable(System.getProperty("properties_path")).orElse("kafka.properties");
52+
ConfigRetriever configRetriever = ConfigRetriever.create(vertx,
5953
new ConfigRetrieverOptions().addStore(
6054
new ConfigStoreOptions()
6155
.setType("file")
6256
.setFormat("properties")
63-
.setConfig(new JsonObject().put("path", "kafka.properties").put("raw-data", true))))
64-
.getConfig()
65-
.onSuccess(config -> {
66-
kafkaConfig = new HashMap<>();
67-
config.forEach(entry -> kafkaConfig.put(entry.getKey(), entry.getValue().toString()));
68-
})
69-
.onFailure(startPromise::fail);
57+
.setConfig(new JsonObject().put("path", path).put("raw-data", true))));
58+
FileSystem fileSystem = vertx.fileSystem();
59+
return fileSystem.exists(path)
60+
.compose(exists -> {
61+
if (exists) {
62+
return configRetriever.getConfig();
63+
} else {
64+
return Future.failedFuture("Kafka properties file is missing. Either specify using -Dproperties_path=<path> or use the default path of kafka.properties.");
65+
}
66+
});
67+
}
68+
69+
private Future<HttpServer> startWebSocket(Router router, JsonObject config) {
70+
kafkaConfig = new HashMap<>();
71+
config.forEach(entry -> kafkaConfig.put(entry.getKey(), entry.getValue().toString()));
72+
return vertx.createHttpServer()
73+
.requestHandler(router)
74+
.webSocketHandler(this::handleWebSocket)
75+
.listen(8080, "localhost")
76+
.onSuccess(ok -> logger.info("🚀 WebSocketServer started"))
77+
.onFailure(err -> logger.error("❌ WebSocketServer failed to listen", err));
7078
}
7179

7280
private void handleWebSocket(ServerWebSocket webSocket) {

0 commit comments

Comments
 (0)