Skip to content

Commit 0748f25

Browse files
authored
feat: Fancier start
* Various Maven POM improvements - Upgraded versions - Use the vertx-dependencies BOM to align dependencies * Use webSocketHandler instead of websocketHanbler Deprecated in 3.8.x, removed in v4 * Move to Vertx.next() * Need jackson-databind as a separate dependency It is not included in the Vert.x stack by default anymore. * Fancier and simplified Main / start method * Replace printf by logger calls * Kafka is too noisy Signed-off-by: Julien Ponge <jponge@redhat.com>
1 parent 6aad5f2 commit 0748f25

File tree

7 files changed

+194
-142
lines changed

7 files changed

+194
-142
lines changed

pom.xml

Lines changed: 108 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -2,104 +2,118 @@
22
<project xmlns="http://maven.apache.org/POM/4.0.0"
33
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
44
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5-
<modelVersion>4.0.0</modelVersion>
5+
<modelVersion>4.0.0</modelVersion>
66

7-
<groupId>kafka.vertx</groupId>
8-
<artifactId>demo</artifactId>
9-
<version>0.0.2-SNAPSHOT</version>
7+
<groupId>kafka.vertx</groupId>
8+
<artifactId>demo</artifactId>
9+
<version>0.0.2-SNAPSHOT</version>
1010

11-
<properties>
12-
<kafka.version>2.3.0</kafka.version>
13-
<vertx.version>4.0.0-SNAPSHOT</vertx.version>
14-
</properties>
11+
<properties>
12+
<kafka.version>2.3.0</kafka.version>
13+
<vertx.version>4.0.0-SNAPSHOT</vertx.version>
14+
<jackson-databind.version>2.10.2</jackson-databind.version>
15+
</properties>
16+
17+
<dependencyManagement>
18+
<dependencies>
19+
<dependency>
20+
<groupId>io.vertx</groupId>
21+
<artifactId>vertx-dependencies</artifactId>
22+
<version>${vertx.version}</version>
23+
<type>pom</type>
24+
<scope>import</scope>
25+
</dependency>
26+
</dependencies>
27+
</dependencyManagement>
1528

16-
<dependencyManagement>
1729
<dependencies>
18-
<dependency>
19-
<groupId>io.vertx</groupId>
20-
<artifactId>vertx-dependencies</artifactId>
21-
<version>${vertx.version}</version>
22-
<type>pom</type>
23-
<scope>import</scope>
24-
</dependency>
30+
<dependency>
31+
<groupId>io.vertx</groupId>
32+
<artifactId>vertx-core</artifactId>
33+
</dependency>
34+
<dependency>
35+
<groupId>io.vertx</groupId>
36+
<artifactId>vertx-web</artifactId>
37+
</dependency>
38+
<dependency>
39+
<groupId>io.vertx</groupId>
40+
<artifactId>vertx-kafka-client</artifactId>
41+
</dependency>
42+
<dependency>
43+
<groupId>io.vertx</groupId>
44+
<artifactId>vertx-service-proxy</artifactId>
45+
</dependency>
46+
<dependency>
47+
<groupId>io.vertx</groupId>
48+
<artifactId>vertx-codegen</artifactId>
49+
<scope>provided</scope>
50+
</dependency>
51+
<dependency>
52+
<groupId>io.vertx</groupId>
53+
<artifactId>vertx-codegen</artifactId>
54+
<classifier>processor</classifier>
55+
</dependency>
56+
<dependency>
57+
<groupId>io.vertx</groupId>
58+
<artifactId>vertx-config</artifactId>
59+
</dependency>
60+
<dependency>
61+
<groupId>com.fasterxml.jackson.core</groupId>
62+
<artifactId>jackson-databind</artifactId>
63+
<version>${jackson-databind.version}</version>
64+
</dependency>
65+
<dependency>
66+
<groupId>ch.qos.logback</groupId>
67+
<artifactId>logback-classic</artifactId>
68+
<version>1.2.3</version>
69+
</dependency>
2570
</dependencies>
26-
</dependencyManagement>
27-
28-
<dependencies>
29-
<dependency>
30-
<groupId>io.vertx</groupId>
31-
<artifactId>vertx-core</artifactId>
32-
</dependency>
33-
<dependency>
34-
<groupId>io.vertx</groupId>
35-
<artifactId>vertx-web</artifactId>
36-
</dependency>
37-
<dependency>
38-
<groupId>io.vertx</groupId>
39-
<artifactId>vertx-kafka-client</artifactId>
40-
</dependency>
41-
<dependency>
42-
<groupId>io.vertx</groupId>
43-
<artifactId>vertx-service-proxy</artifactId>
44-
</dependency>
45-
<dependency>
46-
<groupId>io.vertx</groupId>
47-
<artifactId>vertx-codegen</artifactId>
48-
<scope>provided</scope>
49-
</dependency>
50-
<dependency>
51-
<groupId>io.vertx</groupId>
52-
<artifactId>vertx-codegen</artifactId>
53-
<classifier>processor</classifier>
54-
</dependency>
55-
<dependency>
56-
<groupId>io.vertx</groupId>
57-
<artifactId>vertx-config</artifactId>
58-
</dependency>
59-
</dependencies>
6071

61-
<build>
62-
<pluginManagement>
63-
<plugins>
64-
<plugin>
65-
<artifactId>maven-compiler-plugin</artifactId>
66-
<version>3.8.1</version>
67-
<configuration>
68-
<source>1.8</source>
69-
<target>1.8</target>
70-
</configuration>
71-
</plugin>
72-
</plugins>
73-
</pluginManagement>
74-
<plugins>
75-
<plugin>
76-
<groupId>org.apache.maven.plugins</groupId>
77-
<artifactId>maven-shade-plugin</artifactId>
78-
<version>3.2.2</version>
79-
<executions>
80-
<execution>
81-
<phase>package</phase>
82-
<goals>
83-
<goal>shade</goal>
84-
</goals>
85-
<configuration>
86-
<transformers>
87-
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
88-
<manifestEntries>
89-
<Main-Class>kafka.vertx.demo.Main</Main-Class>
90-
</manifestEntries>
91-
</transformer>
92-
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
93-
<resource>META-INF/services/io.vertx.core.spi.VerticleFactory</resource>
94-
</transformer>
95-
</transformers>
96-
<artifactSet>
97-
</artifactSet>
98-
<outputFile>${project.build.directory}/${project.artifactId}-${project.version}-all.jar</outputFile>
99-
</configuration>
100-
</execution>
101-
</executions>
102-
</plugin>
103-
</plugins>
104-
</build>
72+
<build>
73+
<pluginManagement>
74+
<plugins>
75+
<plugin>
76+
<artifactId>maven-compiler-plugin</artifactId>
77+
<version>3.8.1</version>
78+
<configuration>
79+
<source>1.8</source>
80+
<target>1.8</target>
81+
</configuration>
82+
</plugin>
83+
</plugins>
84+
</pluginManagement>
85+
<plugins>
86+
<plugin>
87+
<groupId>org.apache.maven.plugins</groupId>
88+
<artifactId>maven-shade-plugin</artifactId>
89+
<version>3.2.2</version>
90+
<executions>
91+
<execution>
92+
<phase>package</phase>
93+
<goals>
94+
<goal>shade</goal>
95+
</goals>
96+
<configuration>
97+
<transformers>
98+
<transformer
99+
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
100+
<manifestEntries>
101+
<Main-Class>kafka.vertx.demo.Main</Main-Class>
102+
</manifestEntries>
103+
</transformer>
104+
<transformer
105+
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
106+
<resource>META-INF/services/io.vertx.core.spi.VerticleFactory</resource>
107+
</transformer>
108+
</transformers>
109+
<artifactSet>
110+
</artifactSet>
111+
<outputFile>${project.build.directory}/${project.artifactId}-${project.version}-all.jar
112+
</outputFile>
113+
</configuration>
114+
</execution>
115+
</executions>
116+
</plugin>
117+
</plugins>
118+
</build>
105119
</project>

src/main/java/kafka/vertx/demo/Main.java

Lines changed: 36 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -7,68 +7,71 @@
77

88
import io.vertx.core.CompositeFuture;
99
import io.vertx.core.Future;
10-
import io.vertx.core.Promise;
1110
import io.vertx.core.Vertx;
1211
import io.vertx.core.VertxOptions;
13-
import io.vertx.core.dns.AddressResolverOptions;
1412
import io.vertx.core.spi.resolver.ResolverProvider;
1513
import kafka.vertx.demo.consumer.ConsumerVerticle;
1614
import kafka.vertx.demo.producer.ProducerVerticle;
1715
import kafka.vertx.demo.websocket.WebsocketVerticle;
16+
import org.slf4j.Logger;
17+
import org.slf4j.LoggerFactory;
1818

1919
import java.util.concurrent.TimeUnit;
2020

21+
import static java.lang.System.currentTimeMillis;
22+
2123
public class Main {
2224

2325
private static String websocketVerticleName = "websocket";
2426
private static String producerVerticleName = "producer";
2527
private static String consumerVerticleName = "consumer";
2628

29+
private static final Logger logger = LoggerFactory.getLogger(Main.class);
30+
2731
public static void main(String[] args) {
32+
long startTime = currentTimeMillis();
33+
2834
// Set vertx timeout to deal with slow DNS connections
2935
System.setProperty(ResolverProvider.DISABLE_DNS_RESOLVER_PROP_NAME, "true");
3036
Vertx vertx = Vertx.vertx(
3137
new VertxOptions()
3238
.setWarningExceptionTime(10).setWarningExceptionTimeUnit(TimeUnit.SECONDS)
3339
.setMaxEventLoopExecuteTime(20).setMaxEventLoopExecuteTimeUnit((TimeUnit.SECONDS)));
3440

35-
// Instantiating WebSocket Verticle
36-
Future websocketVerticleFuture = Future.future(promise -> {
37-
WebsocketVerticle websocketVerticle = new WebsocketVerticle();
38-
vertx.deployVerticle(websocketVerticle, res -> res.map(id -> handleSuccessfulDeploy(websocketVerticleName, id, promise))
39-
.otherwise(t -> handleFailedDeploy(websocketVerticleName, t, promise)));
40-
});
41-
42-
// Instantiating Producer Verticle to enable consuming Kafka event
43-
Future producerVerticleFuture = Future.future(promise -> {
44-
ProducerVerticle producerVerticle = new ProducerVerticle();
45-
vertx.deployVerticle(producerVerticle, res -> res.map(id -> handleSuccessfulDeploy(producerVerticleName, id, promise))
46-
.otherwise(t -> handleFailedDeploy(producerVerticleName, t, promise)));
47-
});
48-
49-
// Instantiating Consumer Verticle to enable consume Kafka events
50-
Future consumerVerticleFuture = Future.future(promise -> {
51-
ConsumerVerticle consumerVerticle = new ConsumerVerticle();
52-
vertx.deployVerticle(consumerVerticle, res -> res.map(id -> handleSuccessfulDeploy(consumerVerticleName, id, promise))
53-
.otherwise(t -> handleFailedDeploy(consumerVerticleName, t, promise)));
54-
});
41+
Future<String> webSocketDeployment = vertx
42+
.deployVerticle(new WebsocketVerticle())
43+
.onSuccess(id -> verticleDeployed(websocketVerticleName, id))
44+
.onFailure(err -> verticleFailedToDeploy(websocketVerticleName, err));
45+
46+
Future<String> producerDeployment = vertx
47+
.deployVerticle(new ProducerVerticle())
48+
.onSuccess(id -> verticleDeployed(producerVerticleName, id))
49+
.onFailure(err -> verticleFailedToDeploy(producerVerticleName, err));
50+
51+
Future<String> consumerDeployment = vertx
52+
.deployVerticle(new ConsumerVerticle())
53+
.onSuccess(id -> verticleDeployed(consumerVerticleName, id))
54+
.onFailure(err -> verticleFailedToDeploy(consumerVerticleName, err));
5555

5656
// Create CompositeFuture to wait for verticles to start
57-
CompositeFuture.join(websocketVerticleFuture, producerVerticleFuture, consumerVerticleFuture)
58-
.onSuccess(res -> System.out.printf("Application has started, go to localhost:8080 to see the app running.\n"))
59-
.onFailure(t -> System.out.printf("Application failed to start: %s%n", t));
57+
CompositeFuture.join(webSocketDeployment, producerDeployment, consumerDeployment)
58+
.onSuccess(res -> appStarted(currentTimeMillis() - startTime))
59+
.onFailure(Main::appFailedToStart);
60+
}
61+
62+
private static void appFailedToStart(Throwable t) {
63+
logger.error("❌ Application failed to start", t);
64+
}
6065

66+
private static void appStarted(long duration) {
67+
logger.info("✅ Application has started in {}ms, go to http://localhost:8080 to see the app running.", duration);
6168
}
6269

63-
private static Void handleSuccessfulDeploy(String verticleName, String id, Promise promise) {
64-
System.out.printf("Verticle %s deployed: %s%n", verticleName, id);
65-
promise.complete();
66-
return null;
70+
private static void verticleDeployed(String name, String id) {
71+
logger.info("🚀 Verticle {} deployed: {}", name, id);
6772
}
6873

69-
private static Void handleFailedDeploy(String verticleName, Throwable t, Promise promise) {
70-
System.out.printf("Verticle %s failed to deploy: %s%n", verticleName, t);
71-
promise.fail(t);
72-
return null;
74+
private static void verticleFailedToDeploy(String name, Throwable err) {
75+
logger.error("❌ Verticle {} failed to deploy: {}", name, err.getMessage());
7376
}
7477
}

src/main/java/kafka/vertx/demo/consumer/ConsumerServiceImpl.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,26 @@
1212
import io.vertx.core.json.JsonObject;
1313
import io.vertx.kafka.client.common.TopicPartition;
1414
import io.vertx.kafka.client.consumer.KafkaConsumer;
15+
import org.slf4j.Logger;
16+
import org.slf4j.LoggerFactory;
17+
1518
import java.util.Map;
1619

1720
public class ConsumerServiceImpl implements ConsumerService {
1821

1922
private KafkaConsumer<String, String> consumer;
2023
private Vertx vertx;
24+
2125
//Creating boolean to track whether consumer has already been instantiated and is paused.
2226
private boolean paused = false;
2327
private String eventBusId;
2428

29+
private static final Logger logger = LoggerFactory.getLogger(ConsumerServiceImpl.class);
30+
2531
public ConsumerServiceImpl(Vertx vertx, Map<String, String> config) {
2632
this.vertx = vertx;
2733
consumer = KafkaConsumer.create(vertx, config);
28-
consumer.exceptionHandler(t -> System.out.printf("KafkaConsumer Exception: %s%n", t));
34+
consumer.exceptionHandler(t -> logger.error("KafkaConsumer Exception", t));
2935
}
3036

3137
@Override

src/main/java/kafka/vertx/demo/producer/PeriodicProducer.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
import io.vertx.core.AbstractVerticle;
99
import io.vertx.core.json.JsonObject;
10+
import org.slf4j.Logger;
11+
import org.slf4j.LoggerFactory;
1012

1113
import java.util.concurrent.TimeUnit;
1214

@@ -16,6 +18,8 @@ public class PeriodicProducer extends AbstractVerticle {
1618
private String message;
1719
private String eventBusId;
1820

21+
private static final Logger logger = LoggerFactory.getLogger(PeriodicProducer.class);
22+
1923
public PeriodicProducer(String topicName, String message, String eventBusId) {
2024
this.topicName = topicName;
2125
this.message = message;
@@ -31,7 +35,7 @@ public void start() {
3135
vertx.eventBus().send(eventBusId, JsonObject.mapFrom(recordData).encode());
3236
return null;
3337
}).otherwise(t -> {
34-
System.out.printf("Failure in producing record: %s%n", t);
38+
logger.error("Failure in producing record", t);
3539
return null;
3640
})));
3741
}

src/main/java/kafka/vertx/demo/producer/ProducerServiceImpl.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,20 @@
1111
import io.vertx.kafka.client.producer.KafkaProducer;
1212
import io.vertx.kafka.client.producer.KafkaProducerRecord;
1313
import io.vertx.kafka.client.producer.RecordMetadata;
14+
import org.slf4j.Logger;
15+
import org.slf4j.LoggerFactory;
1416

1517
import java.util.Map;
1618

17-
public class ProducerServiceImpl implements ProducerService{
19+
public class ProducerServiceImpl implements ProducerService {
1820

1921
private KafkaProducer<String, String> producer;
2022

23+
private static final Logger logger = LoggerFactory.getLogger(ProducerServiceImpl.class);
24+
2125
public ProducerServiceImpl(Vertx vertx, Map<String, String> config) {
2226
producer = KafkaProducer.create(vertx, config);
23-
producer.exceptionHandler(t -> System.out.printf("KafkaProducer Exception: %s%n", t));
27+
producer.exceptionHandler(t -> logger.error("KafkaProducer Exception", t));
2428
}
2529

2630
@Override

0 commit comments

Comments
 (0)