|
11 | 11 | import io.vertx.core.AbstractVerticle; |
12 | 12 | import io.vertx.core.Promise; |
13 | 13 | import io.vertx.core.eventbus.EventBus; |
| 14 | +import io.vertx.core.eventbus.MessageConsumer; |
14 | 15 | import io.vertx.core.http.ServerWebSocket; |
15 | 16 | import io.vertx.core.json.JsonObject; |
16 | 17 | import io.vertx.ext.web.Router; |
@@ -90,8 +91,17 @@ private void handleProduceSocket(ServerWebSocket webSocket) { |
90 | 91 | } |
91 | 92 | }); |
92 | 93 |
|
93 | | - eventBus.<JsonObject>consumer(Main.PERIODIC_PRODUCER_BROADCAST, message -> |
| 94 | + MessageConsumer<JsonObject> consumer = eventBus.<JsonObject>consumer(Main.PERIODIC_PRODUCER_BROADCAST, message -> |
94 | 95 | eventBus.send(webSocket.textHandlerID(), message.body().encode())); |
| 96 | + |
| 97 | + webSocket.endHandler(ended -> { |
| 98 | + logger.info("WebSocket closed from {}", webSocket.remoteAddress().host()); |
| 99 | + consumer.unregister(); |
| 100 | + }); |
| 101 | + webSocket.exceptionHandler(err -> { |
| 102 | + logger.error("WebSocket error", err); |
| 103 | + consumer.unregister(); |
| 104 | + }); |
95 | 105 | } |
96 | 106 |
|
97 | 107 | private void handleConsumeSocket(ServerWebSocket webSocket) { |
@@ -126,5 +136,18 @@ private void handleConsumeSocket(ServerWebSocket webSocket) { |
126 | 136 | } |
127 | 137 | }); |
128 | 138 |
|
| 139 | + webSocket.endHandler(done -> { |
| 140 | + logger.info("WebSocket closed from {}", webSocket.remoteAddress().host()); |
| 141 | + kafkaConsumer.close().onFailure(err -> { |
| 142 | + logger.error("Closing Kafka consumer failed", err); |
| 143 | + }); |
| 144 | + }); |
| 145 | + |
| 146 | + webSocket.exceptionHandler(err -> { |
| 147 | + logger.error("WebSocket error", err); |
| 148 | + kafkaConsumer.close().onFailure(kerr -> { |
| 149 | + logger.error("Closing Kafka consumer failed", kerr); |
| 150 | + }); |
| 151 | + }); |
129 | 152 | } |
130 | 153 | } |
0 commit comments