Skip to content

Commit 8d7cb2d

Browse files
Add support for pubsub-sub-bench tool to client runner
Core Implementation: - Added prepare_pubsub_sub_bench_parameters() function with auto-filled connection parameters - Auto-fills: -host, -port, -a (auth), -user (ACL), -json-out-file, -resp, -test-time - Handles cluster mode with -oss-cluster-api-distribute-subscribers flag - Supports both single clientconfig and multiple clientconfigs formats - Updated main runner logic to detect and handle pubsub-sub-bench tool - Updated run_multiple_clients() to support pubsub-sub-bench in multi-client scenarios Features Supported: - Basic connection parameters (host, port, authentication) - RESP protocol version selection (2 or 3) - Cluster mode support - Test time override - JSON output file generation - User-defined arguments from YAML configuration - Unix socket fallback (logs warning, uses host/port) - TLS placeholder (logs warning for future implementation) Testing: - Added 5 comprehensive test cases for pubsub-sub-bench functionality - Tests cover: basic parameters, authentication (password + ACL), cluster mode, unix socket handling - Tests verify proper argument parsing and command generation - All existing tests continue to pass (24/24 passing) Example Configurations: - Single tool: pubsub-sub-bench-subscribe-1K-channels-128B-multiple-subscribers.yml - Mixed workload: pubsub-mixed-workload-memtier-publish-pubsub-subscribe.yml (memtier doing PUBLISH + pubsub-sub-bench doing SUBSCRIBE simultaneously) This enables realistic pub/sub testing scenarios with dedicated subscriber tools while maintaining full compatibility with existing memtier-based workflows.
1 parent 5579e33 commit 8d7cb2d

File tree

5 files changed

+960
-44
lines changed

5 files changed

+960
-44
lines changed

redis_benchmarks_specification/__runner__/runner.py

Lines changed: 166 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,29 @@ def run_single_client(client_config, client_image, client_tool, client_index):
158158
override_test_runs,
159159
unix_socket,
160160
)
161+
elif "pubsub-sub-bench" in client_tool:
162+
(
163+
_,
164+
benchmark_command_str,
165+
arbitrary_command,
166+
) = prepare_pubsub_sub_bench_parameters(
167+
client_config,
168+
client_tool,
169+
port,
170+
host,
171+
password,
172+
local_benchmark_output_filename,
173+
oss_cluster_api_enabled,
174+
tls_enabled,
175+
tls_skip_verify,
176+
test_tls_cert,
177+
test_tls_key,
178+
test_tls_cacert,
179+
resp_version,
180+
override_memtier_test_time,
181+
unix_socket,
182+
None, # username
183+
)
161184
else:
162185
# Handle other benchmark tools
163186
(
@@ -175,12 +198,11 @@ def run_single_client(client_config, client_image, client_tool, client_index):
175198
)
176199

177200
if args.benchmark_local_install:
178-
logging.info(f"Running client {client_index} benchmark outside of docker")
201+
logging.info(
202+
f"Running client {client_index} benchmark outside of docker"
203+
)
179204
benchmark_command_str = (
180-
"taskset -c "
181-
+ client_cpuset_cpus
182-
+ " "
183-
+ benchmark_command_str
205+
"taskset -c " + client_cpuset_cpus + " " + benchmark_command_str
184206
)
185207
stream = os.popen(benchmark_command_str)
186208
client_stdout = stream.read()
@@ -206,29 +228,34 @@ def run_single_client(client_config, client_image, client_tool, client_index):
206228
cpuset_cpus=client_cpuset_cpus,
207229
)
208230

209-
results.append({
210-
"client_index": client_index,
211-
"stdout": client_stdout,
212-
"config": client_config,
213-
"tool": client_tool,
214-
"image": client_image
215-
})
231+
results.append(
232+
{
233+
"client_index": client_index,
234+
"stdout": client_stdout,
235+
"config": client_config,
236+
"tool": client_tool,
237+
"image": client_image,
238+
}
239+
)
216240

217241
except Exception as e:
218242
logging.error(f"Error running client {client_index}: {e}")
219-
results.append({
220-
"client_index": client_index,
221-
"error": str(e),
222-
"config": client_config,
223-
"tool": client_tool,
224-
"image": client_image
225-
})
243+
results.append(
244+
{
245+
"client_index": client_index,
246+
"error": str(e),
247+
"config": client_config,
248+
"tool": client_tool,
249+
"image": client_image,
250+
}
251+
)
226252

227253
# Start all client threads
228-
for i, (client_config, client_image, client_tool) in enumerate(zip(client_configs, client_images, client_tools)):
254+
for i, (client_config, client_image, client_tool) in enumerate(
255+
zip(client_configs, client_images, client_tools)
256+
):
229257
thread = threading.Thread(
230-
target=run_single_client,
231-
args=(client_config, client_image, client_tool, i)
258+
target=run_single_client, args=(client_config, client_image, client_tool, i)
232259
)
233260
threads.append(thread)
234261
thread.start()
@@ -511,6 +538,80 @@ def prepare_memtier_benchmark_parameters(
511538
return None, benchmark_command_str, arbitrary_command
512539

513540

541+
def prepare_pubsub_sub_bench_parameters(
542+
clientconfig,
543+
full_benchmark_path,
544+
port,
545+
server,
546+
password,
547+
local_benchmark_output_filename,
548+
oss_cluster_api_enabled=False,
549+
tls_enabled=False,
550+
tls_skip_verify=False,
551+
tls_cert=None,
552+
tls_key=None,
553+
tls_cacert=None,
554+
resp_version=None,
555+
override_test_time=0,
556+
unix_socket="",
557+
username=None,
558+
):
559+
"""
560+
Prepare pubsub-sub-bench command parameters
561+
"""
562+
arbitrary_command = False
563+
benchmark_command = [
564+
full_benchmark_path,
565+
"-json-out-file",
566+
local_benchmark_output_filename,
567+
]
568+
569+
# Connection parameters
570+
if unix_socket != "":
571+
# pubsub-sub-bench doesn't support unix sockets directly
572+
# Fall back to host/port
573+
logging.warning("pubsub-sub-bench doesn't support unix sockets, using host/port")
574+
benchmark_command.extend(["-host", server, "-port", str(port)])
575+
else:
576+
benchmark_command.extend(["-host", server, "-port", str(port)])
577+
578+
# Authentication
579+
if username and password:
580+
# ACL style authentication
581+
benchmark_command.extend(["-user", username, "-a", password])
582+
elif password:
583+
# Password-only authentication
584+
benchmark_command.extend(["-a", password])
585+
586+
# TLS support (if the tool supports it in future versions)
587+
if tls_enabled:
588+
logging.warning("pubsub-sub-bench TLS support not implemented yet")
589+
590+
# RESP version
591+
if resp_version:
592+
if resp_version == "3":
593+
benchmark_command.extend(["-resp", "3"])
594+
elif resp_version == "2":
595+
benchmark_command.extend(["-resp", "2"])
596+
597+
# Cluster mode
598+
if oss_cluster_api_enabled:
599+
benchmark_command.append("-oss-cluster-api-distribute-subscribers")
600+
601+
# Test time override
602+
if override_test_time and override_test_time > 0:
603+
benchmark_command.extend(["-test-time", str(override_test_time)])
604+
605+
logging.info(f"Preparing pubsub-sub-bench parameters: {benchmark_command}")
606+
benchmark_command_str = " ".join(benchmark_command)
607+
608+
# Append user-defined arguments from YAML
609+
if "arguments" in clientconfig:
610+
benchmark_command_str = benchmark_command_str + " " + clientconfig["arguments"]
611+
612+
return benchmark_command, benchmark_command_str, arbitrary_command
613+
614+
514615
def process_self_contained_coordinator_stream(
515616
args,
516617
datasink_push_results_redistimeseries,
@@ -895,27 +996,35 @@ def delete_temporary_files(
895996
benchmark_command_str = "multiple_clients" # Placeholder
896997
else:
897998
# Legacy single client preparation
898-
if "memtier_benchmark" not in benchmark_tool:
899-
# prepare the benchmark command
999+
if "memtier_benchmark" in benchmark_tool:
9001000
(
901-
benchmark_command,
1001+
_,
9021002
benchmark_command_str,
903-
) = prepare_benchmark_parameters(
904-
benchmark_config,
1003+
arbitrary_command,
1004+
) = prepare_memtier_benchmark_parameters(
1005+
benchmark_config["clientconfig"],
9051006
full_benchmark_path,
9061007
port,
9071008
host,
1009+
password,
9081010
local_benchmark_output_filename,
909-
False,
910-
benchmark_tool_workdir,
911-
False,
1011+
oss_cluster_api_enabled,
1012+
tls_enabled,
1013+
tls_skip_verify,
1014+
test_tls_cert,
1015+
test_tls_key,
1016+
test_tls_cacert,
1017+
resp_version,
1018+
override_memtier_test_time,
1019+
override_test_runs,
1020+
unix_socket,
9121021
)
913-
else:
1022+
elif "pubsub-sub-bench" in benchmark_tool:
9141023
(
9151024
_,
9161025
benchmark_command_str,
9171026
arbitrary_command,
918-
) = prepare_memtier_benchmark_parameters(
1027+
) = prepare_pubsub_sub_bench_parameters(
9191028
benchmark_config["clientconfig"],
9201029
full_benchmark_path,
9211030
port,
@@ -930,8 +1039,23 @@ def delete_temporary_files(
9301039
test_tls_cacert,
9311040
resp_version,
9321041
override_memtier_test_time,
933-
override_test_runs,
9341042
unix_socket,
1043+
None, # username - could be extracted from config if needed
1044+
)
1045+
else:
1046+
# prepare the benchmark command for other tools
1047+
(
1048+
benchmark_command,
1049+
benchmark_command_str,
1050+
) = prepare_benchmark_parameters(
1051+
benchmark_config,
1052+
full_benchmark_path,
1053+
port,
1054+
host,
1055+
local_benchmark_output_filename,
1056+
False,
1057+
benchmark_tool_workdir,
1058+
False,
9351059
)
9361060

9371061
if (
@@ -954,7 +1078,9 @@ def delete_temporary_files(
9541078
is_multiple_clients = len(client_configs) > 1
9551079

9561080
if is_multiple_clients:
957-
logging.info(f"Running test with {len(client_configs)} client configurations")
1081+
logging.info(
1082+
f"Running test with {len(client_configs)} client configurations"
1083+
)
9581084
else:
9591085
# Legacy single client mode
9601086
client_container_image = extract_client_container_image(
@@ -983,7 +1109,9 @@ def delete_temporary_files(
9831109

9841110
if is_multiple_clients:
9851111
# Run multiple client configurations
986-
logging.info("Running multiple client configurations simultaneously")
1112+
logging.info(
1113+
"Running multiple client configurations simultaneously"
1114+
)
9871115
client_container_stdout, client_results = run_multiple_clients(
9881116
benchmark_config,
9891117
docker_client,
@@ -1006,7 +1134,9 @@ def delete_temporary_files(
10061134
unix_socket,
10071135
args,
10081136
)
1009-
logging.info(f"Completed {len(client_results)} client configurations")
1137+
logging.info(
1138+
f"Completed {len(client_results)} client configurations"
1139+
)
10101140
else:
10111141
# Legacy single client execution
10121142
if args.benchmark_local_install:
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
version: 0.4
2+
name: pubsub-mixed-workload-memtier-publish-pubsub-subscribe
3+
description: Mixed workload with memtier publishing messages and pubsub-sub-bench subscribing to channels simultaneously.
4+
dbconfig:
5+
configuration-parameters:
6+
save: '""'
7+
resources:
8+
requests:
9+
memory: 2g
10+
tested-groups:
11+
- pubsub
12+
tested-commands:
13+
- publish
14+
- subscribe
15+
redis-topologies:
16+
- oss-standalone
17+
build-variants:
18+
- gcc:8.5.0-amd64-debian-buster-default
19+
- dockerhub
20+
clientconfigs:
21+
- run_image: redislabs/memtier_benchmark:edge
22+
tool: memtier_benchmark
23+
arguments: --test-time 60 --pipeline 1 -d 128 --key-maximum 100 --command "PUBLISH channel-__key__ __data__" --command-key-pattern="R" -c 20 -t 2 --hide-histogram
24+
resources:
25+
requests:
26+
cpus: '2'
27+
memory: 1g
28+
- run_image: filipe958/pubsub-sub-bench:latest
29+
tool: pubsub-sub-bench
30+
arguments: -clients 5 -channel-minimum 1 -channel-maximum 100 -subscriber-prefix "channel-" -mode subscribe -test-time 60 -subscribers-per-channel 3 -verbose
31+
resources:
32+
requests:
33+
cpus: '2'
34+
memory: 1g
35+
priority: 23
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
version: 0.4
2+
name: pubsub-sub-bench-subscribe-1K-channels-128B-multiple-subscribers
3+
description: Runs pubsub-sub-bench for subscribing to 1000 channels with 128B messages and multiple subscribers per channel.
4+
dbconfig:
5+
configuration-parameters:
6+
save: '""'
7+
resources:
8+
requests:
9+
memory: 1g
10+
tested-groups:
11+
- pubsub
12+
tested-commands:
13+
- subscribe
14+
redis-topologies:
15+
- oss-standalone
16+
build-variants:
17+
- gcc:8.5.0-amd64-debian-buster-default
18+
- dockerhub
19+
clientconfig:
20+
run_image: filipe958/pubsub-sub-bench:latest
21+
tool: pubsub-sub-bench
22+
arguments: -clients 10 -messages 1000 -subscribers-per-channel 2 -channel-minimum 1 -channel-maximum 1000 -test-time 30 -data-size 128 -subscriber-prefix "bench-channel-" -mode subscribe -verbose
23+
resources:
24+
requests:
25+
cpus: '2'
26+
memory: 1g
27+
priority: 22

utils/generate_memory_requirements.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -218,10 +218,14 @@ def main():
218218
# Single client config
219219
arguments = config["clientconfig"]["arguments"]
220220
else:
221-
print(f"Skipping {yaml_file_path}: Missing client configuration.")
221+
print(
222+
f"Skipping {yaml_file_path}: Missing client configuration."
223+
)
222224
continue
223225
except (KeyError, IndexError):
224-
print(f"Skipping {yaml_file_path}: Invalid client configuration format.")
226+
print(
227+
f"Skipping {yaml_file_path}: Invalid client configuration format."
228+
)
225229
continue
226230

227231
if not isinstance(arguments, str):

0 commit comments

Comments
 (0)