Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions driver-kafka/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ TODO: Document these changes.

### Zone-aware workers

To pass a zone/rack ID (e.g. cloud region availability zone name) to the Kafka clients (producer, consumer) client-id configuration, use the system property `zone.id`, and use the template `{zone.id}` on the `client.id` config, either on the `commonConfig`, `producerConfig`, or `consumerConfig` Driver values.
To pass a zone/rack ID (e.g. cloud region availability zone name) to the Kafka clients (producer, consumer) client-id configuration, use the system property `zone.id`, and use the template `{zone.id}` on the `client.id` or `client.rack` configs, either on the `commonConfig`, `producerConfig`, or `consumerConfig` Driver values.

When running workers, pass the `zone.id`:

Expand All @@ -28,16 +28,19 @@ Then pass the `client.id` template:
commonConfig: |
bootstrap.servers=localhost:9092
client.id=omb-client_az={zone.id}
client.rack={zone.id}
```

This generates producer and consumer `client.id=omb-client_az=value`
This generates producer and consumer `client.id=omb-client_az=value` and `client.rack=omb-client_az=value`

```yaml
producerConfig: |
client.id=omb-producer_az={zone.id}
client.rack={zone.id}
consumerConfig: |
auto.offset.reset=earliest
client.id=omb-consumer_az={zone.id}
client.rack={zone.id}
```

This generates producer `client.id=omb-producer_az=value` and consumer `client.id=omb-consumer_az=value`
This generates producer `client.id=omb-producer_az=value` and `client.rack=value`; consumer `client.id=omb-consumer_az=value` and `client.rack=value`
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class KafkaBenchmarkDriver implements BenchmarkDriver {
private static final String ZONE_ID_CONFIG = "zone.id";
private static final String ZONE_ID_TEMPLATE = "{zone.id}";
private static final String KAFKA_CLIENT_ID = "client.id";
private static final String KAFKA_CLIENT_RACK = "client.rack";
private Config config;

private List<BenchmarkProducer> producers = Collections.synchronizedList(new ArrayList<>());
Expand All @@ -67,23 +68,15 @@ public void initialize(File configurationFile, StatsLogger statsLogger) throws I
Properties commonProperties = new Properties();
commonProperties.load(new StringReader(config.commonConfig));

if (commonProperties.containsKey(KAFKA_CLIENT_ID)) {
commonProperties.put(
KAFKA_CLIENT_ID,
applyZoneId(
commonProperties.getProperty(KAFKA_CLIENT_ID), System.getProperty(ZONE_ID_CONFIG)));
}
applyZoneIdIfNeeded(commonProperties, KAFKA_CLIENT_ID);
applyZoneIdIfNeeded(commonProperties, KAFKA_CLIENT_RACK);

producerProperties = new Properties();
commonProperties.forEach((key, value) -> producerProperties.put(key, value));
producerProperties.load(new StringReader(config.producerConfig));

if (producerProperties.containsKey(KAFKA_CLIENT_ID)) {
producerProperties.put(
KAFKA_CLIENT_ID,
applyZoneId(
producerProperties.getProperty(KAFKA_CLIENT_ID), System.getProperty(ZONE_ID_CONFIG)));
}
applyZoneIdIfNeeded(producerProperties, KAFKA_CLIENT_ID);
applyZoneIdIfNeeded(producerProperties, KAFKA_CLIENT_RACK);

producerProperties.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Expand All @@ -94,12 +87,8 @@ public void initialize(File configurationFile, StatsLogger statsLogger) throws I
commonProperties.forEach((key, value) -> consumerProperties.put(key, value));
consumerProperties.load(new StringReader(config.consumerConfig));

if (consumerProperties.containsKey(KAFKA_CLIENT_ID)) {
consumerProperties.put(
KAFKA_CLIENT_ID,
applyZoneId(
consumerProperties.getProperty(KAFKA_CLIENT_ID), System.getProperty(ZONE_ID_CONFIG)));
}
applyZoneIdIfNeeded(consumerProperties, KAFKA_CLIENT_ID);
applyZoneIdIfNeeded(consumerProperties, KAFKA_CLIENT_RACK);

consumerProperties.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
Expand All @@ -112,6 +101,13 @@ public void initialize(File configurationFile, StatsLogger statsLogger) throws I
admin = AdminClient.create(commonProperties);
}

private static void applyZoneIdIfNeeded(Properties props, String propKey) {
if (props.containsKey(propKey)) {
props.put(
propKey, applyZoneId(props.getProperty(propKey), System.getProperty(ZONE_ID_CONFIG)));
}
}

@Override
public String getTopicNamePrefix() {
return "test-topic";
Expand Down Expand Up @@ -178,8 +174,8 @@ public void close() throws Exception {
admin.close();
}

private static String applyZoneId(String clientId, String zoneId) {
return clientId.replace(ZONE_ID_TEMPLATE, zoneId);
private static String applyZoneId(String propValue, String zoneId) {
return propValue.replace(ZONE_ID_TEMPLATE, zoneId);
}

// Visible for testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,52 @@ void testInitClientIdWithZoneId(
}
}
}

@ParameterizedTest
@CsvSource({
"client.rack=test_az={zone.id},\"\",\"\",test_az=az0,test_az=az0",
"client.rack=test_az={zone.id},"
+ "client.rack=prod_az={zone.id},"
+ "client.rack=cons_az={zone.id},prod_az=az0,cons_az=az0",
"\"\",client.rack=prod_az={zone.id},client.rack=cons_az={zone.id},prod_az=az0,cons_az=az0",
"\"\",client.rack=prod_az={zone.id},\"\",prod_az=az0,",
"\"\",\"\",client.rack=cons_az={zone.id},,cons_az=az0"
})
void testInitClientRackWithZoneId(
String commonConfig,
String producerConfig,
String consumerConfig,
String producerClientRack,
String consumerClientRack)
throws Exception {
// Given these configs
final Path configPath = tempDir.resolve("config");
Config config = new Config();
config.replicationFactor = 1;
config.commonConfig = "bootstrap.servers=localhost:9092\n" + commonConfig;
config.producerConfig = producerConfig;
config.consumerConfig = consumerConfig;
config.topicConfig = "";

// and the system property set for zone id
System.setProperty("zone.id", "az0");

try (KafkaBenchmarkDriver driver = new KafkaBenchmarkDriver()) {
// When initializing kafka driver
Files.write(configPath, KafkaBenchmarkDriver.mapper.writeValueAsBytes(config));
driver.initialize(configPath.toFile(), null);

// Then
if (producerClientRack != null) {
assertThat(driver.producerProperties).containsEntry("client.rack", producerClientRack);
} else {
assertThat(driver.producerProperties).doesNotContainKey("client.rack");
}
if (consumerClientRack != null) {
assertThat(driver.consumerProperties).containsEntry("client.rack", consumerClientRack);
} else {
assertThat(driver.consumerProperties).doesNotContainKey("client.rack");
}
}
}
}