From 874db8a99f7bb69a79c2693f4db10820020de194 Mon Sep 17 00:00:00 2001 From: Jorge Quilcate Date: Sun, 30 Mar 2025 14:08:08 +0300 Subject: [PATCH 1/2] kafka: add client.id parsing per producer/consumer --- driver-kafka/README.md | 32 +++++++++ .../driver/kafka/KafkaBenchmarkDriver.java | 26 +++++-- .../kafka/KafkaBenchmarkDriverTest.java | 72 +++++++++++++++++++ 3 files changed, 126 insertions(+), 4 deletions(-) create mode 100644 driver-kafka/src/test/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriverTest.java diff --git a/driver-kafka/README.md b/driver-kafka/README.md index eef66e161..96b2c4b65 100644 --- a/driver-kafka/README.md +++ b/driver-kafka/README.md @@ -8,3 +8,35 @@ NOTE: This is a slightly modified version with two key differences: - there is a new argument that converts all output result json files into a single csv. TODO: Document these changes. + +## Features + +### Zone-aware workers + +To pass a zone/rack ID (e.g. cloud region availability zone name) to the Kafka clients (producer, consumer) client-id configuration, use the system property `zone.id`, and use the template `{zone.id}` on the `client.id` config, either on the `commonConfig`, `producerConfig`, or `consumerConfig` Driver values. + +When running workers, pass the `zone.id`: + +```bash +export JVM_OPTS=-Dzone.id=az0 +/opt/benchmark/bin/benchmark-worker +``` + +Then pass the `client.id` template: +```yaml +commonConfig: | + bootstrap.servers=localhost:9092 + client.id=omb-client_az={zone.id} +``` + +This generates producer and consumer `client.id=omb-client_az=value` + +```yaml +producerConfig: | + client.id=omb-producer_az={zone.id} +consumerConfig: | + auto.offset.reset=earliest + client.id=omb-consumer_az={zone.id} +``` + +This generates producer `client.id=omb-producer_az=value` and consumer `client.id=omb-consumer_az=value` diff --git a/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriver.java b/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriver.java index d506b5b37..a2244606d 100644 --- a/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriver.java +++ b/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriver.java @@ -53,9 +53,10 @@ public class KafkaBenchmarkDriver implements BenchmarkDriver { private List producers = Collections.synchronizedList(new ArrayList<>()); private List consumers = Collections.synchronizedList(new ArrayList<>()); - private Properties topicProperties; - private Properties producerProperties; - private Properties consumerProperties; + // Visible for testing + Properties topicProperties; + Properties producerProperties; + Properties consumerProperties; private AdminClient admin; @@ -76,6 +77,14 @@ public void initialize(File configurationFile, StatsLogger statsLogger) throws I producerProperties = new Properties(); commonProperties.forEach((key, value) -> producerProperties.put(key, value)); producerProperties.load(new StringReader(config.producerConfig)); + + if (producerProperties.containsKey(KAFKA_CLIENT_ID)) { + producerProperties.put( + KAFKA_CLIENT_ID, + applyZoneId( + producerProperties.getProperty(KAFKA_CLIENT_ID), System.getProperty(ZONE_ID_CONFIG))); + } + producerProperties.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); producerProperties.put( @@ -84,6 +93,14 @@ public void initialize(File configurationFile, StatsLogger statsLogger) throws I consumerProperties = new Properties(); commonProperties.forEach((key, value) -> consumerProperties.put(key, value)); consumerProperties.load(new StringReader(config.consumerConfig)); + + if (consumerProperties.containsKey(KAFKA_CLIENT_ID)) { + consumerProperties.put( + KAFKA_CLIENT_ID, + applyZoneId( + consumerProperties.getProperty(KAFKA_CLIENT_ID), System.getProperty(ZONE_ID_CONFIG))); + } + consumerProperties.put( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProperties.put( @@ -165,7 +182,8 @@ private static String applyZoneId(String clientId, String zoneId) { return clientId.replace(ZONE_ID_TEMPLATE, zoneId); } - private static final ObjectMapper mapper = + // Visible for testing + static final ObjectMapper mapper = new ObjectMapper(new YAMLFactory()) .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); } diff --git a/driver-kafka/src/test/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriverTest.java b/driver-kafka/src/test/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriverTest.java new file mode 100644 index 000000000..f6dd568e0 --- /dev/null +++ b/driver-kafka/src/test/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriverTest.java @@ -0,0 +1,72 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.benchmark.driver.kafka; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.nio.file.Files; +import java.nio.file.Path; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +class KafkaBenchmarkDriverTest { + @TempDir Path tempDir; + + @ParameterizedTest + @CsvSource({ + "client.id=test_az={zone.id},\"\",\"\",test_az=az0,test_az=az0", + "client.id=test_az={zone.id},client.id=prod_az={zone.id},client.id=cons_az={zone.id},prod_az=az0,cons_az=az0", + "\"\",client.id=prod_az={zone.id},client.id=cons_az={zone.id},prod_az=az0,cons_az=az0", + "\"\",client.id=prod_az={zone.id},\"\",prod_az=az0,", + "\"\",\"\",client.id=cons_az={zone.id},,cons_az=az0" + }) + void testInitClientIdWithZoneId( + String commonConfig, + String producerConfig, + String consumerConfig, + String producerClientId, + String consumerClientId) + throws Exception { + // Given these configs + final Path configPath = tempDir.resolve("config"); + Config config = new Config(); + config.replicationFactor = 1; + config.commonConfig = "bootstrap.servers=localhost:9092\n" + commonConfig; + config.producerConfig = producerConfig; + config.consumerConfig = consumerConfig; + config.topicConfig = ""; + + // and the system property set for zone id + System.setProperty("zone.id", "az0"); + + try (KafkaBenchmarkDriver driver = new KafkaBenchmarkDriver()) { + // When initializing kafka driver + Files.write(configPath, KafkaBenchmarkDriver.mapper.writeValueAsBytes(config)); + driver.initialize(configPath.toFile(), null); + + // Then + if (producerClientId != null) { + assertThat(driver.producerProperties).containsEntry("client.id", producerClientId); + } else { + assertThat(driver.producerProperties).doesNotContainKey("client.id"); + } + if (consumerClientId != null) { + assertThat(driver.consumerProperties).containsEntry("client.id", consumerClientId); + } else { + assertThat(driver.consumerProperties).doesNotContainKey("client.id"); + } + } + } +} From cdd41ceeff382371dc5f28102dbca56f076257e8 Mon Sep 17 00:00:00 2001 From: Jorge Quilcate Date: Sun, 30 Mar 2025 14:08:51 +0300 Subject: [PATCH 2/2] kafka: apply zone id to client.rack client.rack is used on features like follower fetching[1] and may land on producer side as well from the KIP-1123[2] proposal. It's valuable to have it as well as for client.id that is not fully supported across kafka implementations. [1]: https://cwiki.apache.org/confluence/display/KAFKA/KIP-392:+Allow+consumers+to+fetch+from+closest+replica [2]: https://cwiki.apache.org/confluence/display/KAFKA/KIP-1123%3A+Rack-aware+partitioning+for+Kafka+Producer --- driver-kafka/README.md | 9 ++-- .../driver/kafka/KafkaBenchmarkDriver.java | 38 ++++++++------- .../kafka/KafkaBenchmarkDriverTest.java | 46 +++++++++++++++++++ 3 files changed, 70 insertions(+), 23 deletions(-) diff --git a/driver-kafka/README.md b/driver-kafka/README.md index 96b2c4b65..e784266f1 100644 --- a/driver-kafka/README.md +++ b/driver-kafka/README.md @@ -13,7 +13,7 @@ TODO: Document these changes. ### Zone-aware workers -To pass a zone/rack ID (e.g. cloud region availability zone name) to the Kafka clients (producer, consumer) client-id configuration, use the system property `zone.id`, and use the template `{zone.id}` on the `client.id` config, either on the `commonConfig`, `producerConfig`, or `consumerConfig` Driver values. +To pass a zone/rack ID (e.g. cloud region availability zone name) to the Kafka clients (producer, consumer) client-id configuration, use the system property `zone.id`, and use the template `{zone.id}` on the `client.id` or `client.rack` configs, either on the `commonConfig`, `producerConfig`, or `consumerConfig` Driver values. When running workers, pass the `zone.id`: @@ -27,16 +27,19 @@ Then pass the `client.id` template: commonConfig: | bootstrap.servers=localhost:9092 client.id=omb-client_az={zone.id} + client.rack=omb-client_az={zone.id} ``` -This generates producer and consumer `client.id=omb-client_az=value` +This generates producer and consumer `client.id=omb-client_az=value` and `client.rack=omb-client_az=value` ```yaml producerConfig: | client.id=omb-producer_az={zone.id} + client.rack=omb-producer_az={zone.id} consumerConfig: | auto.offset.reset=earliest client.id=omb-consumer_az={zone.id} + client.rack=omb-consumer_az={zone.id} ``` -This generates producer `client.id=omb-producer_az=value` and consumer `client.id=omb-consumer_az=value` +This generates producer `client.id=omb-producer_az=value` and `client.rack=omb-producer_az=value`; consumer `client.id=omb-consumer_az=value` and `client.rack=omb-consumer_az=value` diff --git a/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriver.java b/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriver.java index a2244606d..08444207c 100644 --- a/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriver.java +++ b/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriver.java @@ -48,6 +48,7 @@ public class KafkaBenchmarkDriver implements BenchmarkDriver { private static final String ZONE_ID_CONFIG = "zone.id"; private static final String ZONE_ID_TEMPLATE = "{zone.id}"; private static final String KAFKA_CLIENT_ID = "client.id"; + private static final String KAFKA_CLIENT_RACK = "client.rack"; private Config config; private List producers = Collections.synchronizedList(new ArrayList<>()); @@ -67,23 +68,15 @@ public void initialize(File configurationFile, StatsLogger statsLogger) throws I Properties commonProperties = new Properties(); commonProperties.load(new StringReader(config.commonConfig)); - if (commonProperties.containsKey(KAFKA_CLIENT_ID)) { - commonProperties.put( - KAFKA_CLIENT_ID, - applyZoneId( - commonProperties.getProperty(KAFKA_CLIENT_ID), System.getProperty(ZONE_ID_CONFIG))); - } + applyZoneIdIfNeeded(commonProperties, KAFKA_CLIENT_ID); + applyZoneIdIfNeeded(commonProperties, KAFKA_CLIENT_RACK); producerProperties = new Properties(); commonProperties.forEach((key, value) -> producerProperties.put(key, value)); producerProperties.load(new StringReader(config.producerConfig)); - if (producerProperties.containsKey(KAFKA_CLIENT_ID)) { - producerProperties.put( - KAFKA_CLIENT_ID, - applyZoneId( - producerProperties.getProperty(KAFKA_CLIENT_ID), System.getProperty(ZONE_ID_CONFIG))); - } + applyZoneIdIfNeeded(producerProperties, KAFKA_CLIENT_ID); + applyZoneIdIfNeeded(producerProperties, KAFKA_CLIENT_RACK); producerProperties.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); @@ -94,12 +87,8 @@ public void initialize(File configurationFile, StatsLogger statsLogger) throws I commonProperties.forEach((key, value) -> consumerProperties.put(key, value)); consumerProperties.load(new StringReader(config.consumerConfig)); - if (consumerProperties.containsKey(KAFKA_CLIENT_ID)) { - consumerProperties.put( - KAFKA_CLIENT_ID, - applyZoneId( - consumerProperties.getProperty(KAFKA_CLIENT_ID), System.getProperty(ZONE_ID_CONFIG))); - } + applyZoneIdIfNeeded(consumerProperties, KAFKA_CLIENT_ID); + applyZoneIdIfNeeded(consumerProperties, KAFKA_CLIENT_RACK); consumerProperties.put( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); @@ -112,6 +101,15 @@ public void initialize(File configurationFile, StatsLogger statsLogger) throws I admin = AdminClient.create(commonProperties); } + private static void applyZoneIdIfNeeded(Properties props, String propKey) { + if (props.containsKey(propKey)) { + props.put( + propKey, + applyZoneId( + props.getProperty(propKey), System.getProperty(ZONE_ID_CONFIG))); + } + } + @Override public String getTopicNamePrefix() { return "test-topic"; @@ -178,8 +176,8 @@ public void close() throws Exception { admin.close(); } - private static String applyZoneId(String clientId, String zoneId) { - return clientId.replace(ZONE_ID_TEMPLATE, zoneId); + private static String applyZoneId(String propValue, String zoneId) { + return propValue.replace(ZONE_ID_TEMPLATE, zoneId); } // Visible for testing diff --git a/driver-kafka/src/test/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriverTest.java b/driver-kafka/src/test/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriverTest.java index f6dd568e0..f96d4df0f 100644 --- a/driver-kafka/src/test/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriverTest.java +++ b/driver-kafka/src/test/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriverTest.java @@ -69,4 +69,50 @@ void testInitClientIdWithZoneId( } } } + + @ParameterizedTest + @CsvSource({ + "client.rack=test_az={zone.id},\"\",\"\",test_az=az0,test_az=az0", + "client.rack=test_az={zone.id},client.rack=prod_az={zone.id},client.rack=cons_az={zone.id},prod_az=az0,cons_az=az0", + "\"\",client.rack=prod_az={zone.id},client.rack=cons_az={zone.id},prod_az=az0,cons_az=az0", + "\"\",client.rack=prod_az={zone.id},\"\",prod_az=az0,", + "\"\",\"\",client.rack=cons_az={zone.id},,cons_az=az0" + }) + void testInitClientRackWithZoneId( + String commonConfig, + String producerConfig, + String consumerConfig, + String producerClientRack, + String consumerClientRack) + throws Exception { + // Given these configs + final Path configPath = tempDir.resolve("config"); + Config config = new Config(); + config.replicationFactor = 1; + config.commonConfig = "bootstrap.servers=localhost:9092\n" + commonConfig; + config.producerConfig = producerConfig; + config.consumerConfig = consumerConfig; + config.topicConfig = ""; + + // and the system property set for zone id + System.setProperty("zone.id", "az0"); + + try (KafkaBenchmarkDriver driver = new KafkaBenchmarkDriver()) { + // When initializing kafka driver + Files.write(configPath, KafkaBenchmarkDriver.mapper.writeValueAsBytes(config)); + driver.initialize(configPath.toFile(), null); + + // Then + if (producerClientRack != null) { + assertThat(driver.producerProperties).containsEntry("client.rack", producerClientRack); + } else { + assertThat(driver.producerProperties).doesNotContainKey("client.rack"); + } + if (consumerClientRack != null) { + assertThat(driver.consumerProperties).containsEntry("client.rack", consumerClientRack); + } else { + assertThat(driver.consumerProperties).doesNotContainKey("client.rack"); + } + } + } }