Skip to content

Commit 62f08b5

Browse files
authored
ft: Properties path as dir (#16)
* feat: Cert path relative to kafka.properties Convert the truststore and keystore files to absolute path based on location of kafka.properties file. Signed-off-by: Katherine Stanley <katheris@uk.ibm.com> * feat: Refactor to use config loading in Main class Signed-off-by: Katherine Stanley <katheris@uk.ibm.com> * feat: Add workflow for PRs that change the code Signed-off-by: Katherine Stanley <katheris@uk.ibm.com> * feat: Use vert.x Config processor function Signed-off-by: Katherine Stanley <katheris@uk.ibm.com> * feat: Remove use of java.io.File class Signed-off-by: Katherine Stanley <katheris@uk.ibm.com>
1 parent b0be0c8 commit 62f08b5

File tree

9 files changed

+277
-86
lines changed

9 files changed

+277
-86
lines changed

.github/workflows/java-pr-jobs.yml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
name: Java
2+
3+
on:
4+
pull_request:
5+
paths:
6+
- 'src/**'
7+
- 'pom.xml'
8+
9+
jobs:
10+
build:
11+
runs-on: ubuntu-latest
12+
name: Test
13+
14+
steps:
15+
- uses: actions/checkout@v2
16+
- name: Set up JDK 1.8
17+
uses: actions/setup-java@v1
18+
with:
19+
java-version: 1.8
20+
- name: Test
21+
run: mvn test

pom.xml

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,11 @@
1111
<properties>
1212
<kafka.version>2.3.0</kafka.version>
1313
<vertx.version>4.0.0-milestone5</vertx.version>
14+
<logback.version>1.2.3</logback.version>
1415
<jackson-databind.version>2.10.2</jackson-databind.version>
16+
<junit.version>5.6.0</junit.version>
17+
<hamcrest.version>2.2</hamcrest.version>
18+
<maven.surefire.version>3.0.0-M4</maven.surefire.version>
1519
</properties>
1620

1721
<dependencyManagement>
@@ -23,6 +27,13 @@
2327
<type>pom</type>
2428
<scope>import</scope>
2529
</dependency>
30+
<dependency>
31+
<groupId>org.junit</groupId>
32+
<artifactId>junit-bom</artifactId>
33+
<version>${junit.version}</version>
34+
<type>pom</type>
35+
<scope>import</scope>
36+
</dependency>
2637
</dependencies>
2738
</dependencyManagement>
2839

@@ -35,6 +46,11 @@
3546
<groupId>io.vertx</groupId>
3647
<artifactId>vertx-web</artifactId>
3748
</dependency>
49+
<dependency>
50+
<groupId>com.fasterxml.jackson.core</groupId>
51+
<artifactId>jackson-databind</artifactId>
52+
<version>${jackson-databind.version}</version>
53+
</dependency>
3854
<dependency>
3955
<groupId>io.vertx</groupId>
4056
<artifactId>vertx-kafka-client</artifactId>
@@ -46,7 +62,23 @@
4662
<dependency>
4763
<groupId>ch.qos.logback</groupId>
4864
<artifactId>logback-classic</artifactId>
49-
<version>1.2.3</version>
65+
<version>${logback.version}</version>
66+
</dependency>
67+
<dependency>
68+
<groupId>org.junit.jupiter</groupId>
69+
<artifactId>junit-jupiter</artifactId>
70+
<scope>test</scope>
71+
</dependency>
72+
<dependency>
73+
<groupId>io.vertx</groupId>
74+
<artifactId>vertx-junit5</artifactId>
75+
<scope>test</scope>
76+
</dependency>
77+
<dependency>
78+
<groupId>org.hamcrest</groupId>
79+
<artifactId>hamcrest-core</artifactId>
80+
<version>${hamcrest.version}</version>
81+
<scope>test</scope>
5082
</dependency>
5183
</dependencies>
5284

@@ -67,6 +99,11 @@
6799
<target>1.8</target>
68100
</configuration>
69101
</plugin>
102+
<plugin>
103+
<groupId>org.apache.maven.plugins</groupId>
104+
<artifactId>maven-surefire-plugin</artifactId>
105+
<version>${maven.surefire.version}</version>
106+
</plugin>
70107
</plugins>
71108
</pluginManagement>
72109
<plugins>

src/main/java/kafka/vertx/demo/Main.java

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,20 +9,30 @@
99
import io.vertx.config.ConfigRetrieverOptions;
1010
import io.vertx.config.ConfigStoreOptions;
1111
import io.vertx.core.*;
12+
import io.vertx.core.file.FileSystem;
1213
import io.vertx.core.json.JsonObject;
1314
import io.vertx.core.spi.resolver.ResolverProvider;
15+
import org.apache.kafka.common.config.SslConfigs;
1416
import org.slf4j.Logger;
1517
import org.slf4j.LoggerFactory;
1618

19+
import java.nio.file.Files;
20+
import java.nio.file.Path;
21+
import java.nio.file.Paths;
1722
import java.util.concurrent.TimeUnit;
23+
import java.util.function.Function;
1824

1925
import static java.lang.System.currentTimeMillis;
2026

2127
public class Main {
2228

23-
public static final String TOPIC = "demo";
2429
public static final String PERIODIC_PRODUCER_ADDRESS = "demo.periodic.producer";
2530
public static final String PERIODIC_PRODUCER_BROADCAST = "demo.periodic.producer.broadcast";
31+
public static final String TOPIC_KEY = "topic";
32+
public static final String PROPERTIES_PATH_ENV_NAME = "properties_path";
33+
public static final String DEFAULT_PROPERTIES_PATH = "kafka.properties";
34+
35+
private static final String DEFAULT_TOPIC = "demo";
2636

2737
private static final Logger logger = LoggerFactory.getLogger(Main.class);
2838

@@ -48,8 +58,56 @@ public static void main(String[] args) {
4858
Future<String> periodicProducerDeployment = vertx.deployVerticle(new PeriodicProducer());
4959
Future<String> webSocketServerDeployment = vertx.deployVerticle(new WebSocketServer());
5060

51-
CompositeFuture.all(periodicProducerDeployment, webSocketServerDeployment)
61+
CompositeFuture.join(periodicProducerDeployment, webSocketServerDeployment)
5262
.onSuccess(ok -> logger.info("✅ Application started in {}ms", currentTimeMillis() - startTime))
5363
.onFailure(err -> logger.error("❌ Application failed to start", err));
5464
}
65+
66+
public static Future<JsonObject> loadKafkaConfig(Vertx vertx, String properties) {
67+
Path propertiesPath = Paths.get(properties);
68+
Path propertiesCompletePath = Files.isDirectory(propertiesPath) ? propertiesPath.resolve(DEFAULT_PROPERTIES_PATH) : propertiesPath;
69+
ConfigRetriever configRetriever = ConfigRetriever.create(vertx,
70+
new ConfigRetrieverOptions().addStore(
71+
new ConfigStoreOptions()
72+
.setType("file")
73+
.setFormat("properties")
74+
.setConfig(new JsonObject().put("path", propertiesCompletePath.toString()).put("raw-data", true)))).setConfigurationProcessor(configurationProcessor(propertiesPath));
75+
FileSystem fileSystem = vertx.fileSystem();
76+
return fileSystem.exists(properties)
77+
.compose(exists -> {
78+
if (exists) {
79+
return configRetriever.getConfig();
80+
} else {
81+
return Future.failedFuture(String.format("Kafka properties file at location %s is missing. Either specify using -D%s=<path> or use the default path of %s.", propertiesCompletePath, PROPERTIES_PATH_ENV_NAME, DEFAULT_PROPERTIES_PATH));
82+
}
83+
});
84+
}
85+
86+
protected static Function<JsonObject, JsonObject> configurationProcessor(Path propertiesPath) {
87+
Path propertiesAbsolutePath = propertiesPath.toAbsolutePath();
88+
Path propertiesDir;
89+
if (Files.isDirectory(propertiesAbsolutePath)) {
90+
propertiesDir = propertiesAbsolutePath;
91+
} else {
92+
propertiesDir = propertiesAbsolutePath.getParent();
93+
}
94+
return (properties) -> {
95+
JsonObject kafkaConfig = new JsonObject();
96+
properties.forEach(entry -> {
97+
String key = entry.getKey();
98+
String value = entry.getValue().toString();
99+
if (SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG.equals(key) || SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG.equals(key)) {
100+
Path truststorePath = Paths.get(value);
101+
if (!truststorePath.isAbsolute()) {
102+
value = propertiesDir.resolve(value).toString();
103+
}
104+
}
105+
kafkaConfig.put(key, value);
106+
});
107+
if (!kafkaConfig.containsKey(TOPIC_KEY)) {
108+
kafkaConfig.put(TOPIC_KEY, DEFAULT_TOPIC);
109+
}
110+
return kafkaConfig;
111+
};
112+
}
55113
}

src/main/java/kafka/vertx/demo/PeriodicProducer.java

Lines changed: 6 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -5,25 +5,17 @@
55
*/
66
package kafka.vertx.demo;
77

8-
import io.vertx.config.ConfigRetriever;
9-
import io.vertx.config.ConfigRetrieverOptions;
10-
import io.vertx.config.ConfigStoreOptions;
118
import io.vertx.core.AbstractVerticle;
12-
import io.vertx.core.Future;
139
import io.vertx.core.Promise;
1410
import io.vertx.core.TimeoutStream;
1511
import io.vertx.core.eventbus.Message;
16-
import io.vertx.core.file.FileSystem;
1712
import io.vertx.core.json.JsonObject;
1813
import io.vertx.kafka.client.producer.KafkaProducer;
1914
import io.vertx.kafka.client.producer.KafkaProducerRecord;
20-
import org.apache.kafka.common.config.SslConfigs;
2115
import org.slf4j.Logger;
2216
import org.slf4j.LoggerFactory;
2317

24-
import java.io.File;
2518
import java.util.HashMap;
26-
import java.util.Optional;
2719

2820
public class PeriodicProducer extends AbstractVerticle {
2921

@@ -32,52 +24,23 @@ public class PeriodicProducer extends AbstractVerticle {
3224

3325
@Override
3426
public void start(Promise<Void> startPromise) {
35-
loadKafkaConfig()
27+
String propertiesPath = System.getProperty(Main.PROPERTIES_PATH_ENV_NAME, Main.DEFAULT_PROPERTIES_PATH);
28+
Main.loadKafkaConfig(vertx, propertiesPath)
3629
.onSuccess(config -> {
37-
setup(config);
30+
HashMap<String, String> props = config.mapTo(HashMap.class);
31+
setup(props);
3832
startPromise.complete();
3933
})
4034
.onFailure(startPromise::fail);
4135
}
4236

43-
private Future<JsonObject> loadKafkaConfig() {
44-
String path = Optional.ofNullable(System.getProperty("properties_path")).orElse("kafka.properties");
45-
ConfigRetriever configRetriever = ConfigRetriever.create(vertx,
46-
new ConfigRetrieverOptions().addStore(
47-
new ConfigStoreOptions()
48-
.setType("file")
49-
.setFormat("properties")
50-
.setConfig(new JsonObject().put("path", path).put("raw-data", true))));
51-
FileSystem fileSystem = vertx.fileSystem();
52-
return fileSystem.exists(path)
53-
.compose(exists -> {
54-
if (exists) {
55-
return configRetriever.getConfig();
56-
} else {
57-
return Future.failedFuture("Kafka properties file is missing. Either specify using -Dproperties_path=<path> or use the default path of kafka.properties.");
58-
}
59-
});
60-
}
61-
62-
private void setup(JsonObject config) {
63-
HashMap<String, String> props = new HashMap<>();
64-
config.forEach(entry -> {
65-
String key = entry.getKey();
66-
String value = entry.getValue().toString();
67-
if (SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG.equals(key) || SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG.equals(key)) {
68-
File trustStorefile = new File(value);
69-
value = trustStorefile.toPath().toAbsolutePath().toString();
70-
}
71-
props.put(key, value);
72-
});
37+
private void setup(HashMap<String, String> props) {
7338
KafkaProducer<String, String> kafkaProducer = KafkaProducer.create(vertx, props);
7439

75-
String topic = Optional.ofNullable(config.getString("topic")).orElse(Main.TOPIC);
76-
7740
kafkaProducer.exceptionHandler(err -> logger.error("Kafka error: {}", err));
7841

7942
TimeoutStream timerStream = vertx.periodicStream(2000);
80-
timerStream.handler(tick -> produceKafkaRecord(kafkaProducer, topic));
43+
timerStream.handler(tick -> produceKafkaRecord(kafkaProducer, props.get(Main.TOPIC_KEY)));
8144
timerStream.pause();
8245

8346
vertx.eventBus().<JsonObject>consumer(Main.PERIODIC_PRODUCER_ADDRESS, message -> handleCommand(timerStream, message));

src/main/java/kafka/vertx/demo/WebSocketServer.java

Lines changed: 8 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -5,29 +5,22 @@
55
*/
66
package kafka.vertx.demo;
77

8-
import io.vertx.config.ConfigRetriever;
9-
import io.vertx.config.ConfigRetrieverOptions;
10-
import io.vertx.config.ConfigStoreOptions;
118
import io.vertx.core.AbstractVerticle;
129
import io.vertx.core.Future;
1310
import io.vertx.core.Promise;
1411
import io.vertx.core.eventbus.EventBus;
1512
import io.vertx.core.eventbus.MessageConsumer;
16-
import io.vertx.core.file.FileSystem;
1713
import io.vertx.core.http.HttpServer;
1814
import io.vertx.core.http.ServerWebSocket;
1915
import io.vertx.core.json.JsonObject;
2016
import io.vertx.ext.web.Router;
2117
import io.vertx.ext.web.handler.StaticHandler;
2218
import io.vertx.kafka.client.common.TopicPartition;
2319
import io.vertx.kafka.client.consumer.KafkaConsumer;
24-
import org.apache.kafka.common.config.SslConfigs;
2520
import org.slf4j.Logger;
2621
import org.slf4j.LoggerFactory;
2722

28-
import java.io.File;
2923
import java.util.HashMap;
30-
import java.util.Optional;
3124

3225
public class WebSocketServer extends AbstractVerticle {
3326

@@ -43,42 +36,17 @@ public void start(Promise<Void> startPromise) {
4336
Router router = Router.router(vertx);
4437
router.get().handler(StaticHandler.create());
4538

46-
loadKafkaConfig()
47-
.compose(config -> startWebSocket(router, config))
39+
String propertiesPath = System.getProperty(Main.PROPERTIES_PATH_ENV_NAME, Main.DEFAULT_PROPERTIES_PATH);
40+
Main.loadKafkaConfig(vertx, propertiesPath)
41+
.compose(config -> {
42+
kafkaConfig = config.mapTo(HashMap.class);
43+
return startWebSocket(router);
44+
})
4845
.onSuccess(ok -> startPromise.complete())
4946
.onFailure(startPromise::fail);
5047
}
5148

52-
private Future<JsonObject> loadKafkaConfig() {
53-
String path = Optional.ofNullable(System.getProperty("properties_path")).orElse("kafka.properties");
54-
ConfigRetriever configRetriever = ConfigRetriever.create(vertx,
55-
new ConfigRetrieverOptions().addStore(
56-
new ConfigStoreOptions()
57-
.setType("file")
58-
.setFormat("properties")
59-
.setConfig(new JsonObject().put("path", path).put("raw-data", true))));
60-
FileSystem fileSystem = vertx.fileSystem();
61-
return fileSystem.exists(path)
62-
.compose(exists -> {
63-
if (exists) {
64-
return configRetriever.getConfig();
65-
} else {
66-
return Future.failedFuture("Kafka properties file is missing. Either specify using -Dproperties_path=<path> or use the default path of kafka.properties.");
67-
}
68-
});
69-
}
70-
71-
private Future<HttpServer> startWebSocket(Router router, JsonObject config) {
72-
kafkaConfig = new HashMap<>();
73-
config.forEach(entry -> {
74-
String key = entry.getKey();
75-
String value = entry.getValue().toString();
76-
if (SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG.equals(key) || SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG.equals(key)) {
77-
File trustStorefile = new File(value);
78-
value = trustStorefile.toPath().toAbsolutePath().toString();
79-
}
80-
kafkaConfig.put(key, value);
81-
});
49+
private Future<HttpServer> startWebSocket(Router router) {
8250
return vertx.createHttpServer()
8351
.requestHandler(router)
8452
.webSocketHandler(this::handleWebSocket)
@@ -139,7 +107,7 @@ private void handleConsumeSocket(ServerWebSocket webSocket) {
139107
vertx.eventBus().send(webSocket.textHandlerID(), payload.encode());
140108
});
141109

142-
String topic = Optional.ofNullable(kafkaConfig.get("topic")).orElse(Main.TOPIC);
110+
String topic = kafkaConfig.get(Main.TOPIC_KEY);
143111

144112
kafkaConsumer.subscribe(topic)
145113
.onSuccess(ok -> logger.info("Subscribed to {}", topic))

0 commit comments

Comments
 (0)