File tree Expand file tree Collapse file tree 2 files changed +22
-2
lines changed
src/main/java/kafka/vertx/demo Expand file tree Collapse file tree 2 files changed +22
-2
lines changed Original file line number Diff line number Diff line change 1717import io .vertx .core .json .JsonObject ;
1818import io .vertx .kafka .client .producer .KafkaProducer ;
1919import io .vertx .kafka .client .producer .KafkaProducerRecord ;
20+ import org .apache .kafka .common .config .SslConfigs ;
2021import org .slf4j .Logger ;
2122import org .slf4j .LoggerFactory ;
2223
24+ import java .io .File ;
2325import java .util .HashMap ;
2426import java .util .Optional ;
2527
@@ -59,7 +61,15 @@ private Future<JsonObject> loadKafkaConfig() {
5961
6062 private void setup (JsonObject config ) {
6163 HashMap <String , String > props = new HashMap <>();
62- config .forEach (entry -> props .put (entry .getKey (), entry .getValue ().toString ()));
64+ config .forEach (entry -> {
65+ String key = entry .getKey ();
66+ String value = entry .getValue ().toString ();
67+ if (SslConfigs .SSL_TRUSTSTORE_LOCATION_CONFIG .equals (key ) || SslConfigs .SSL_KEYSTORE_LOCATION_CONFIG .equals (key )) {
68+ File trustStorefile = new File (value );
69+ value = trustStorefile .toPath ().toAbsolutePath ().toString ();
70+ }
71+ props .put (key , value );
72+ });
6373 KafkaProducer <String , String > kafkaProducer = KafkaProducer .create (vertx , props );
6474
6575 String topic = Optional .ofNullable (config .getString ("topic" )).orElse (Main .TOPIC );
Original file line number Diff line number Diff line change 2121import io .vertx .ext .web .handler .StaticHandler ;
2222import io .vertx .kafka .client .common .TopicPartition ;
2323import io .vertx .kafka .client .consumer .KafkaConsumer ;
24+ import org .apache .kafka .common .config .SslConfigs ;
2425import org .slf4j .Logger ;
2526import org .slf4j .LoggerFactory ;
2627
28+ import java .io .File ;
2729import java .util .HashMap ;
2830import java .util .Optional ;
2931
@@ -68,7 +70,15 @@ private Future<JsonObject> loadKafkaConfig() {
6870
6971 private Future <HttpServer > startWebSocket (Router router , JsonObject config ) {
7072 kafkaConfig = new HashMap <>();
71- config .forEach (entry -> kafkaConfig .put (entry .getKey (), entry .getValue ().toString ()));
73+ config .forEach (entry -> {
74+ String key = entry .getKey ();
75+ String value = entry .getValue ().toString ();
76+ if (SslConfigs .SSL_TRUSTSTORE_LOCATION_CONFIG .equals (key ) || SslConfigs .SSL_KEYSTORE_LOCATION_CONFIG .equals (key )) {
77+ File trustStorefile = new File (value );
78+ value = trustStorefile .toPath ().toAbsolutePath ().toString ();
79+ }
80+ kafkaConfig .put (key , value );
81+ });
7282 return vertx .createHttpServer ()
7383 .requestHandler (router )
7484 .webSocketHandler (this ::handleWebSocket )
You can’t perform that action at this time.
0 commit comments