From d96b78295282e989bcc1ef3fe5e3425663ebad06 Mon Sep 17 00:00:00 2001 From: fcostaoliveira Date: Tue, 4 Nov 2025 12:39:45 +0000 Subject: [PATCH 01/17] Track RTT in nanos. --- subscriber.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/subscriber.go b/subscriber.go index 2eabefd..a4b39fa 100644 --- a/subscriber.go +++ b/subscriber.go @@ -31,7 +31,7 @@ const ( redisTLSCert = "tls_cert" redisTLSKey = "tls_key" redisTLSInsecureSkipVerify = "tls_insecure_skip_verify" - timestampSize = 13 // UnixMilli() produces 13-digit number + timestampSize = 19 // UnixNano() produces 19-digit number ) const Inf = rate.Limit(math.MaxFloat64) @@ -75,7 +75,7 @@ func publisherRoutine(clientName string, channels []string, mode string, measure // Pre-generate payload once per goroutine // For RTT mode: we'll use a template with padding that we'll prepend timestamp to - // Timestamp format: 13 bytes (e.g., "1762249648882") + // Timestamp format: 19 bytes (e.g., "1762259663660769761") // Format: " " to reach dataSize var paddingPayload string if measureRTT && dataSize > timestampSize+1 { @@ -101,7 +101,7 @@ func publisherRoutine(clientName string, channels []string, mode string, measure time.Sleep(r.Delay()) } if measureRTT { - now := time.Now().UnixMilli() + now := time.Now().UnixNano() if dataSize > timestampSize+1 { // Format: " " msg = strconv.FormatInt(int64(now), 10) + " " + paddingPayload @@ -215,9 +215,9 @@ func subscriberRoutine(clientName, mode string, channels []string, verbose bool, log.Println(fmt.Sprintf("received message in channel %s. Message: %s", msg.Channel, msg.Payload)) } if measureRTT { - now := time.Now().UnixMicro() + now := time.Now().UnixNano() // Extract timestamp from payload (format: " " or just "") - // Timestamp is always 13 bytes (UnixMilli) + // Timestamp is always 19 bytes (UnixNano) timestampStr := msg.Payload if len(msg.Payload) > timestampSize { timestampStr = msg.Payload[:timestampSize] @@ -226,7 +226,7 @@ func subscriberRoutine(clientName, mode string, channels []string, verbose bool, rtt := now - ts rttLatencyChannel <- rtt if verbose { - log.Printf("RTT measured: %d ms\n", rtt/1000) + log.Printf("RTT measured: %d ns\n", rtt) } } else { log.Printf("Invalid timestamp in message: %s, err: %v\n", timestampStr, err) @@ -244,7 +244,7 @@ func main() { rps := flag.Int64("rps", 0, "Max rps for publisher mode. If 0 no limit is applied and the DB is stressed up to maximum.") rpsburst := flag.Int64("rps-burst", 0, "Max rps burst for publisher mode. If 0 the allowed burst will be the amount of clients.") password := flag.String("a", "", "Password for Redis Auth.") - dataSize := flag.Int("data-size", 128, "Payload size in bytes. In RTT mode, timestamp (13 bytes) + space + padding to reach this size.") + dataSize := flag.Int("data-size", 128, "Payload size in bytes. In RTT mode, timestamp (19 bytes) + space + padding to reach this size.") mode := flag.String("mode", "subscribe", "Mode: 'subscribe', 'ssubscribe', 'publish', or 'spublish'.") username := flag.String("user", "", "Used to send ACL style 'AUTH username pass'. Needs -a.") subscribers_placement := flag.String("subscribers-placement-per-channel", "dense", "(dense,sparse) dense - Place all subscribers to channel in a specific shard. sparse- spread the subscribers across as many shards possible, in a round-robin manner.") From 70bb8935a544711bbdec5af33a10d8317f5f7180 Mon Sep 17 00:00:00 2001 From: fcostaoliveira Date: Tue, 4 Nov 2025 12:57:07 +0000 Subject: [PATCH 02/17] Adjusted histogram conversion. --- subscriber.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/subscriber.go b/subscriber.go index a4b39fa..9459498 100644 --- a/subscriber.go +++ b/subscriber.go @@ -568,11 +568,11 @@ func main() { p95 := hist.ValueAtQuantile(95.0) p99 := hist.ValueAtQuantile(99.0) p999 := hist.ValueAtQuantile(99.9) - fmt.Fprintf(w, "Avg RTT %.3f ms\n", avg/1000.0) - fmt.Fprintf(w, "P50 RTT %.3f ms\n", float64(p50)/1000.0) - fmt.Fprintf(w, "P95 RTT %.3f ms\n", float64(p95)/1000.0) - fmt.Fprintf(w, "P99 RTT %.3f ms\n", float64(p99)/1000.0) - fmt.Fprintf(w, "P999 RTT %.3f ms\n", float64(p999)/1000.0) + fmt.Fprintf(w, "Avg RTT %.3f ms\n", avg/1000000.0) + fmt.Fprintf(w, "P50 RTT %.3f ms\n", float64(p50)/1000000.0) + fmt.Fprintf(w, "P95 RTT %.3f ms\n", float64(p95)/1000000.0) + fmt.Fprintf(w, "P99 RTT %.3f ms\n", float64(p99)/1000000.0) + fmt.Fprintf(w, "P999 RTT %.3f ms\n", float64(p999)/1000000.0) } else { } fmt.Fprintf(w, "#################################################\n") @@ -745,7 +745,7 @@ func updateCLI( for _, v := range tickRttValues { total += v } - avgRTTms = float64(total) / float64(len(tickRttValues)) / 1000.0 + avgRTTms = float64(total) / float64(len(tickRttValues)) / 1000000.0 tickRttValues = tickRttValues[:0] fmt.Fprintf(w, "%.3f\t", avgRTTms) } else { From 7e6396957a67f4e091ff0d58351a3d0440233173 Mon Sep 17 00:00:00 2001 From: fcostaoliveira Date: Tue, 4 Nov 2025 16:07:03 +0000 Subject: [PATCH 03/17] Include publish latency and avg. sub count per channel --- subscriber.go | 171 ++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 132 insertions(+), 39 deletions(-) diff --git a/subscriber.go b/subscriber.go index 9459498..58ee779 100644 --- a/subscriber.go +++ b/subscriber.go @@ -58,7 +58,7 @@ type testResult struct { Addresses []string `json:"Addresses"` } -func publisherRoutine(clientName string, channels []string, mode string, measureRTT bool, verbose bool, dataSize int, ctx context.Context, wg *sync.WaitGroup, client *redis.Client, useLimiter bool, rateLimiter *rate.Limiter) { +func publisherRoutine(clientName string, channels []string, mode string, measureRTT bool, verbose bool, dataSize int, ctx context.Context, wg *sync.WaitGroup, client *redis.Client, useLimiter bool, rateLimiter *rate.Limiter, publishLatencyChannel chan int64, subscriberCountChannel chan int64) { defer wg.Done() if verbose { @@ -112,15 +112,28 @@ func publisherRoutine(clientName string, channels []string, mode string, measure } else { msg = paddingPayload } + + // Measure publish latency + startPublish := time.Now().UnixNano() + var subscriberCount int64 var err error switch mode { case "spublish": - err = client.SPublish(ctx, ch, msg).Err() + subscriberCount, err = client.SPublish(ctx, ch, msg).Result() default: - err = client.Publish(ctx, ch, msg).Err() + subscriberCount, err = client.Publish(ctx, ch, msg).Result() } + publishLatency := time.Now().UnixNano() - startPublish + if err != nil { log.Printf("Error publishing to channel %s: %v", ch, err) + } else { + // Send metrics to channels + publishLatencyChannel <- publishLatency + subscriberCountChannel <- subscriberCount + if verbose { + log.Printf("Published to %s: %d subscribers, latency: %d ns", ch, subscriberCount, publishLatency) + } } atomic.AddUint64(&totalMessages, 1) } @@ -414,7 +427,9 @@ func main() { } pprof.StartCPUProfile(f) } - rttLatencyChannel := make(chan int64, 100000) // Channel for RTT measurements. buffer of 100K messages to process + rttLatencyChannel := make(chan int64, 1000000) // Channel for RTT measurements. buffer of 1M messages to process + publishLatencyChannel := make(chan int64, 1000000) // Channel for publish latency measurements + subscriberCountChannel := make(chan int64, 1000000) // Channel for subscriber count tracking totalCreatedClients := 0 if strings.Contains(*mode, "publish") { var requestRate = Inf @@ -472,7 +487,7 @@ func main() { } wg.Add(1) - go publisherRoutine(publisherName, channels, *mode, *measureRTT, *verbose, *dataSize, ctx, &wg, client, useRateLimiter, rateLimiter) + go publisherRoutine(publisherName, channels, *mode, *measureRTT, *verbose, *dataSize, ctx, &wg, client, useRateLimiter, rateLimiter, publishLatencyChannel, subscriberCountChannel) atomic.AddInt64(&totalPublishers, 1) atomic.AddUint64(&totalConnects, 1) } @@ -548,7 +563,7 @@ func main() { w := new(tabwriter.Writer) tick := time.NewTicker(time.Duration(*client_update_tick) * time.Second) - closed, start_time, duration, totalMessages, messageRateTs, rttValues := updateCLI(tick, c, total_messages, w, *test_time, *measureRTT, *mode, rttLatencyChannel, *verbose) + closed, start_time, duration, totalMessages, messageRateTs, rttValues, publishLatencyValues, subscriberCountValues := updateCLI(tick, c, total_messages, w, *test_time, *measureRTT, *mode, rttLatencyChannel, publishLatencyChannel, subscriberCountChannel, *verbose) messageRate := float64(totalMessages) / float64(duration.Seconds()) if *cpuprofile != "" { @@ -558,22 +573,60 @@ func main() { fmt.Fprintf(w, "Mode: %s\n", *mode) fmt.Fprintf(w, "Total Duration: %f Seconds\n", duration.Seconds()) fmt.Fprintf(w, "Message Rate: %f msg/sec\n", messageRate) - if *measureRTT && (*mode != "publish" && *mode != "spublish") { - hist := hdrhistogram.New(1, 10_000_000, 3) // 1us to 10s, 3 sig digits - for _, rtt := range rttValues { - _ = hist.RecordValue(rtt) + + if strings.Contains(*mode, "publish") { + // Publisher mode: show publish latency and subscriber count stats + if len(publishLatencyValues) > 0 { + hist := hdrhistogram.New(1, 10_000_000, 3) // 1ns to 10s, 3 sig digits + for _, latency := range publishLatencyValues { + _ = hist.RecordValue(latency) + } + avg := hist.Mean() + p50 := hist.ValueAtQuantile(50.0) + p95 := hist.ValueAtQuantile(95.0) + p99 := hist.ValueAtQuantile(99.0) + p999 := hist.ValueAtQuantile(99.9) + fmt.Fprintf(w, "Avg Publish Latency %.3f ms\n", avg/1000000.0) + fmt.Fprintf(w, "P50 Publish Latency %.3f ms\n", float64(p50)/1000000.0) + fmt.Fprintf(w, "P95 Publish Latency %.3f ms\n", float64(p95)/1000000.0) + fmt.Fprintf(w, "P99 Publish Latency %.3f ms\n", float64(p99)/1000000.0) + fmt.Fprintf(w, "P999 Publish Latency %.3f ms\n", float64(p999)/1000000.0) + } + + if len(subscriberCountValues) > 0 { + hist := hdrhistogram.New(0, 1_000_000, 3) // 0 to 1M subscribers, 3 sig digits + for _, count := range subscriberCountValues { + _ = hist.RecordValue(count) + } + avg := hist.Mean() + p50 := hist.ValueAtQuantile(50.0) + p95 := hist.ValueAtQuantile(95.0) + p99 := hist.ValueAtQuantile(99.0) + p999 := hist.ValueAtQuantile(99.9) + fmt.Fprintf(w, "Avg Subscribers %.1f (per-node in cluster mode)\n", avg) + fmt.Fprintf(w, "P50 Subscribers %d\n", p50) + fmt.Fprintf(w, "P95 Subscribers %d\n", p95) + fmt.Fprintf(w, "P99 Subscribers %d\n", p99) + fmt.Fprintf(w, "P999 Subscribers %d\n", p999) + } + } else if *measureRTT { + // Subscriber mode with RTT measurement + if len(rttValues) > 0 { + hist := hdrhistogram.New(1, 10_000_000, 3) // 1ns to 10s, 3 sig digits + for _, rtt := range rttValues { + _ = hist.RecordValue(rtt) + } + avg := hist.Mean() + p50 := hist.ValueAtQuantile(50.0) + p95 := hist.ValueAtQuantile(95.0) + p99 := hist.ValueAtQuantile(99.0) + p999 := hist.ValueAtQuantile(99.9) + fmt.Fprintf(w, "Avg RTT %.3f ms\n", avg/1000000.0) + fmt.Fprintf(w, "P50 RTT %.3f ms\n", float64(p50)/1000000.0) + fmt.Fprintf(w, "P95 RTT %.3f ms\n", float64(p95)/1000000.0) + fmt.Fprintf(w, "P99 RTT %.3f ms\n", float64(p99)/1000000.0) + fmt.Fprintf(w, "P999 RTT %.3f ms\n", float64(p999)/1000000.0) } - avg := hist.Mean() - p50 := hist.ValueAtQuantile(50.0) - p95 := hist.ValueAtQuantile(95.0) - p99 := hist.ValueAtQuantile(99.0) - p999 := hist.ValueAtQuantile(99.9) - fmt.Fprintf(w, "Avg RTT %.3f ms\n", avg/1000000.0) - fmt.Fprintf(w, "P50 RTT %.3f ms\n", float64(p50)/1000000.0) - fmt.Fprintf(w, "P95 RTT %.3f ms\n", float64(p95)/1000000.0) - fmt.Fprintf(w, "P99 RTT %.3f ms\n", float64(p99)/1000000.0) - fmt.Fprintf(w, "P999 RTT %.3f ms\n", float64(p999)/1000000.0) - } else { } fmt.Fprintf(w, "#################################################\n") fmt.Fprint(w, "\r\n") @@ -656,8 +709,10 @@ func updateCLI( measureRTT bool, mode string, rttLatencyChannel chan int64, + publishLatencyChannel chan int64, + subscriberCountChannel chan int64, verbose bool, -) (bool, time.Time, time.Duration, uint64, []float64, []int64) { +) (bool, time.Time, time.Duration, uint64, []float64, []int64, []int64, []int64) { start := time.Now() prevTime := time.Now() @@ -666,27 +721,28 @@ func updateCLI( messageRateTs := []float64{} tickRttValues := []int64{} rttValues := []int64{} + tickPublishLatencyValues := []int64{} + publishLatencyValues := []int64{} + tickSubscriberCountValues := []int64{} + subscriberCountValues := []int64{} w.Init(os.Stdout, 25, 0, 1, ' ', tabwriter.AlignRight) // Header - if measureRTT { - fmt.Fprint(w, "Test Time\tTotal Messages\t Message Rate \tConnect Rate \t") + fmt.Fprint(w, "Test Time\tTotal Messages\t Message Rate \tConnect Rate \t") - if strings.Contains(mode, "subscribe") { - fmt.Fprint(w, "Active subscriptions\t") - } else { - fmt.Fprint(w, "Active publishers\t") + if strings.Contains(mode, "subscribe") { + fmt.Fprint(w, "Active subscriptions\t") + if measureRTT { + fmt.Fprint(w, "Avg RTT (ms)\t") } - fmt.Fprint(w, "Avg RTT (ms)\t\n") } else { - fmt.Fprint(w, "Test Time\tTotal Messages\t Message Rate \tConnect Rate \t") - if strings.Contains(mode, "subscribe") { - fmt.Fprint(w, "Active subscriptions\t\n") - } else { - fmt.Fprint(w, "Active publishers\t\n") - } + // Publisher mode + fmt.Fprint(w, "Active publishers\t") + fmt.Fprint(w, "Pub Latency (ms)\t") + fmt.Fprint(w, "Avg Subs per channel per node\t") } + fmt.Fprint(w, "\n") w.Flush() // Main loop @@ -696,6 +752,14 @@ func updateCLI( rttValues = append(rttValues, rtt) tickRttValues = append(tickRttValues, rtt) + case publishLatency := <-publishLatencyChannel: + publishLatencyValues = append(publishLatencyValues, publishLatency) + tickPublishLatencyValues = append(tickPublishLatencyValues, publishLatency) + + case subscriberCount := <-subscriberCountChannel: + subscriberCountValues = append(subscriberCountValues, subscriberCount) + tickSubscriberCountValues = append(tickSubscriberCountValues, subscriberCount) + case <-tick.C: now := time.Now() took := now.Sub(prevTime) @@ -725,7 +789,7 @@ func updateCLI( if verbose { fmt.Printf("[DEBUG] Test time reached! Stopping after %.2f seconds\n", elapsed.Seconds()) } - return true, start, time.Since(start), totalMessages, messageRateTs, rttValues + return true, start, time.Since(start), totalMessages, messageRateTs, rttValues, publishLatencyValues, subscriberCountValues } } @@ -738,7 +802,36 @@ func updateCLI( fmt.Fprintf(w, "%d\t", atomic.LoadInt64(&totalPublishers)) } - if measureRTT { + // For publisher mode, show publish latency instead of RTT + if strings.Contains(mode, "publish") { + var avgPublishLatencyMs float64 + if len(tickPublishLatencyValues) > 0 { + var total int64 + for _, v := range tickPublishLatencyValues { + total += v + } + avgPublishLatencyMs = float64(total) / float64(len(tickPublishLatencyValues)) / 1000000.0 + tickPublishLatencyValues = tickPublishLatencyValues[:0] + fmt.Fprintf(w, "%.3f\t", avgPublishLatencyMs) + } else { + fmt.Fprintf(w, "--\t") + } + + // Show average subscriber count + var avgSubscriberCount float64 + if len(tickSubscriberCountValues) > 0 { + var total int64 + for _, v := range tickSubscriberCountValues { + total += v + } + avgSubscriberCount = float64(total) / float64(len(tickSubscriberCountValues)) + tickSubscriberCountValues = tickSubscriberCountValues[:0] + fmt.Fprintf(w, "%.1f\t", avgSubscriberCount) + } else { + fmt.Fprintf(w, "--\t") + } + } else if measureRTT { + // For subscriber mode with RTT measurement var avgRTTms float64 if len(tickRttValues) > 0 { var total int64 @@ -757,12 +850,12 @@ func updateCLI( w.Flush() if message_limit > 0 && totalMessages >= uint64(message_limit) { - return true, start, time.Since(start), totalMessages, messageRateTs, rttValues + return true, start, time.Since(start), totalMessages, messageRateTs, rttValues, publishLatencyValues, subscriberCountValues } case <-c: fmt.Println("received Ctrl-c - shutting down") - return true, start, time.Since(start), totalMessages, messageRateTs, rttValues + return true, start, time.Since(start), totalMessages, messageRateTs, rttValues, publishLatencyValues, subscriberCountValues } } } From a1ea9e4d1f91265642b36482f94a8a8c4e47d5d8 Mon Sep 17 00:00:00 2001 From: fcostaoliveira Date: Tue, 4 Nov 2025 16:22:10 +0000 Subject: [PATCH 04/17] Updated go-redis to 9.16.0 (https://github.com/redis/go-redis/releases/tag/v9.16.0) --- README.md | 2 +- go.mod | 8 ++++---- go.sum | 20 ++++++++++---------- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index 9be80f5..62f8f0c 100644 --- a/README.md +++ b/README.md @@ -97,7 +97,7 @@ Usage of ./pubsub-sub-bench: -cpuprofile string write cpu profile to file -data-size int - Payload size in bytes. In RTT mode, timestamp (13 bytes) + space + padding to reach this size. (default 128) + Payload size in bytes. In RTT mode, timestamp (19 bytes) + space + padding to reach this size. (default 128) -host string redis host. (default "127.0.0.1") -json-out-file string diff --git a/go.mod b/go.mod index 7be750d..2e5b4ea 100644 --- a/go.mod +++ b/go.mod @@ -1,16 +1,16 @@ module github.com/RedisLabs/pubsub-sub-bench -go 1.23.0 +go 1.24.0 toolchain go1.24.1 require ( github.com/HdrHistogram/hdrhistogram-go v1.1.2 - github.com/redis/go-redis/v9 v9.0.5 - golang.org/x/time v0.11.0 + github.com/redis/go-redis/v9 v9.16.0 + golang.org/x/time v0.14.0 ) require ( - github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect ) diff --git a/go.sum b/go.sum index b1b423a..e7cdf66 100644 --- a/go.sum +++ b/go.sum @@ -3,12 +3,12 @@ github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym github.com/HdrHistogram/hdrhistogram-go v1.1.2 h1:5IcZpTvzydCQeHzK4Ef/D5rrSqwxob0t8PQPMybUNFM= github.com/HdrHistogram/hdrhistogram-go v1.1.2/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo= github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= -github.com/bsm/ginkgo/v2 v2.7.0 h1:ItPMPH90RbmZJt5GtkcNvIRuGEdwlBItdNVoyzaNQao= -github.com/bsm/ginkgo/v2 v2.7.0/go.mod h1:AiKlXPm7ItEHNc/2+OkrNG4E0ITzojb9/xWzvQ9XZ9w= -github.com/bsm/gomega v1.26.0 h1:LhQm+AFcgV2M0WyKroMASzAzCAJVpAxQXv4SaI9a69Y= -github.com/bsm/gomega v1.26.0/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= -github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= -github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -27,8 +27,8 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/redis/go-redis/v9 v9.0.5 h1:CuQcn5HIEeK7BgElubPP8CGtE0KakrnbBSTLjathl5o= -github.com/redis/go-redis/v9 v9.0.5/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDOjzMvcuQHk= +github.com/redis/go-redis/v9 v9.16.0 h1:OotgqgLSRCmzfqChbQyG1PHC3tLNR89DG4jdOERSEP4= +github.com/redis/go-redis/v9 v9.16.0/go.mod h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -52,8 +52,8 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/time v0.11.0 h1:/bpjEDfN9tkoN/ryeYHnv5hcMlc8ncjMcM4XBk5NWV0= -golang.org/x/time v0.11.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg= +golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI= +golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4= golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190206041539-40960b6deb8e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= From 43403d8b1a792747a8f655728536a633ee2f7495 Mon Sep 17 00:00:00 2001 From: fcostaoliveira Date: Tue, 4 Nov 2025 17:49:39 +0000 Subject: [PATCH 05/17] Handling errors on pubsub --- .gitignore | 2 + Makefile | 25 ++- subscriber.go | 30 ++- subscriber_test.go | 444 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 494 insertions(+), 7 deletions(-) create mode 100644 subscriber_test.go diff --git a/.gitignore b/.gitignore index 6d1b19f..1fee62f 100644 --- a/.gitignore +++ b/.gitignore @@ -87,6 +87,8 @@ Thumbs.db # Coverage Results # #################### coverage.txt +.coverdata/ +pubsub-sub-bench-test # Profiler Results # #################### diff --git a/Makefile b/Makefile index 628f2fd..e85c9cc 100644 --- a/Makefile +++ b/Makefile @@ -33,6 +33,11 @@ build-race: $(GOBUILDRACE) \ -ldflags=$(LDFLAGS) . +build-cover: + @echo "Building binary with coverage instrumentation..." + $(GOBUILD) -cover \ + -ldflags=$(LDFLAGS) . + checkfmt: @echo 'Checking gofmt';\ bash -c "diff -u <(echo -n) <(go fmt .)";\ @@ -52,9 +57,21 @@ fmt: get: $(GOGET) -t -v ./... -test: get +test: get build-cover $(GOFMT) ./... - $(GOTEST) -race -covermode=atomic ./... + @rm -rf .coverdata + @mkdir -p .coverdata + $(GOTEST) -v -race -covermode=atomic ./... -coverage: get test - $(GOTEST) -race -coverprofile=coverage.txt -covermode=atomic . +coverage: get build-cover + $(GOFMT) ./... + @rm -rf .coverdata + @mkdir -p .coverdata + $(GOTEST) -v -race -covermode=atomic . + @if [ -d .coverdata ] && [ -n "$$(ls -A .coverdata 2>/dev/null)" ]; then \ + echo "Converting coverage data..."; \ + go tool covdata textfmt -i=.coverdata -o coverage.txt; \ + else \ + echo "No coverage data found, creating empty coverage file"; \ + touch coverage.txt; \ + fi diff --git a/subscriber.go b/subscriber.go index 58ee779..fdc376a 100644 --- a/subscriber.go +++ b/subscriber.go @@ -126,7 +126,8 @@ func publisherRoutine(clientName string, channels []string, mode string, measure publishLatency := time.Now().UnixNano() - startPublish if err != nil { - log.Printf("Error publishing to channel %s: %v", ch, err) + log.Printf("Publisher %s: error publishing to channel %s: %v", clientName, ch, err) + // Don't send metrics on error, but still count the message attempt } else { // Send metrics to channels publishLatencyChannel <- publishLatency @@ -218,11 +219,34 @@ func subscriberRoutine(clientName, mode string, channels []string, verbose bool, // Handle messages msg, err := pubsub.ReceiveMessage(ctx) if err != nil { - // Handle Redis connection errors, e.g., reconnect immediately + // Handle Redis connection errors if err == redis.Nil || err == context.DeadlineExceeded || err == context.Canceled { continue } - panic(err) + // Connection error (EOF, network error, etc.) - attempt to reconnect + log.Printf("Subscriber %s: connection error: %v - attempting to reconnect\n", clientName, err) + + // Close the bad connection + if pubsub != nil { + pubsub.Close() + atomic.AddInt64(&totalSubscribedChannels, int64(-len(channels))) + } + + // Wait a bit before reconnecting + time.Sleep(100 * time.Millisecond) + + // Resubscribe + switch mode { + case "ssubscribe": + pubsub = client.SSubscribe(ctx, channels...) + default: + pubsub = client.Subscribe(ctx, channels...) + } + atomic.AddInt64(&totalSubscribedChannels, int64(len(channels))) + atomic.AddUint64(&totalConnects, 1) + + log.Printf("Subscriber %s: reconnected successfully\n", clientName) + continue } if verbose { log.Println(fmt.Sprintf("received message in channel %s. Message: %s", msg.Channel, msg.Payload)) diff --git a/subscriber_test.go b/subscriber_test.go new file mode 100644 index 0000000..477fc50 --- /dev/null +++ b/subscriber_test.go @@ -0,0 +1,444 @@ +package main + +import ( + "bytes" + "context" + "fmt" + "os" + "os/exec" + "syscall" + "testing" + "time" + + "github.com/redis/go-redis/v9" +) + +// TestMain sets up the test environment +func TestMain(m *testing.M) { + // Create coverage directory + coverDir := ".coverdata" + os.MkdirAll(coverDir, 0755) + + // Check if binary exists (should be built by make) + if _, err := os.Stat("./pubsub-sub-bench"); err != nil { + fmt.Fprintf(os.Stderr, "Binary ./pubsub-sub-bench not found. Run 'make build' first.\n") + os.Exit(1) + } + + // Run tests + exitCode := m.Run() + + os.Exit(exitCode) +} + +func getBinaryPath() string { + // Use the binary built by make + return "./pubsub-sub-bench" +} + +func getTestConnectionDetails() (string, string) { + value, exists := os.LookupEnv("REDIS_TEST_HOST") + host := "127.0.0.1" + port := "6379" + password := "" + valuePassword, existsPassword := os.LookupEnv("REDIS_TEST_PASSWORD") + if exists && value != "" { + host = value + } + valuePort, existsPort := os.LookupEnv("REDIS_TEST_PORT") + if existsPort && valuePort != "" { + port = valuePort + } + if existsPassword && valuePassword != "" { + password = valuePassword + } + return host + ":" + port, password +} + +func TestSubscriberMode(t *testing.T) { + var tests = []struct { + name string + wantExitCode int + args []string + timeout time.Duration + }{ + { + "simple subscribe", + 0, + []string{ + "--host", "127.0.0.1", + "--port", "6379", + "--mode", "subscribe", + "--clients", "2", + "--channel-minimum", "1", + "--channel-maximum", "2", + }, + 2 * time.Second, // Just verify it can connect and subscribe + }, + { + "ssubscribe mode", + 0, + []string{ + "--host", "127.0.0.1", + "--port", "6379", + "--mode", "ssubscribe", + "--clients", "2", + "--channel-minimum", "1", + "--channel-maximum", "2", + }, + 2 * time.Second, + }, + { + "subscribe with RTT", + 0, + []string{ + "--host", "127.0.0.1", + "--port", "6379", + "--mode", "subscribe", + "--clients", "2", + "--channel-minimum", "1", + "--channel-maximum", "2", + "--measure-rtt-latency", + }, + 2 * time.Second, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cmd := exec.Command(getBinaryPath(), tt.args...) + cmd.Env = os.Environ() + cmd.Env = append(cmd.Env, "GOCOVERDIR=.coverdata") + var out bytes.Buffer + cmd.Stdout = &out + cmd.Stderr = &out + + // Start the command + err := cmd.Start() + if err != nil { + t.Fatalf("Failed to start command: %v", err) + } + + // Wait for timeout, then kill + time.Sleep(tt.timeout) + cmd.Process.Signal(os.Interrupt) + + // Wait for process to finish + err = cmd.Wait() + exitCode := 0 + if err != nil { + if exitError, ok := err.(*exec.ExitError); ok { + ws := exitError.Sys().(syscall.WaitStatus) + exitCode = ws.ExitStatus() + } + } + + if exitCode != tt.wantExitCode { + t.Errorf("got exit code = %v, want %v\nOutput: %s", exitCode, tt.wantExitCode, out.String()) + } + }) + } +} + +func TestPublisherMode(t *testing.T) { + hostPort, password := getTestConnectionDetails() + + // Create a Redis client for verification + client := redis.NewClient(&redis.Options{ + Addr: hostPort, + Password: password, + DB: 0, + }) + defer client.Close() + + ctx := context.Background() + + // Test connection + if err := client.Ping(ctx).Err(); err != nil { + t.Skipf("Redis not available at %s: %v", hostPort, err) + } + + var tests = []struct { + name string + wantExitCode int + args []string + }{ + { + "simple publish", + 0, + []string{ + "--host", "127.0.0.1", + "--port", "6379", + "--mode", "publish", + "--clients", "2", + "--channel-minimum", "1", + "--channel-maximum", "2", + "--test-time", "1", + "--data-size", "128", + }, + }, + { + "publish with rate limit", + 0, + []string{ + "--host", "127.0.0.1", + "--port", "6379", + "--mode", "publish", + "--clients", "2", + "--channel-minimum", "1", + "--channel-maximum", "2", + "--test-time", "1", + "--rps", "100", + "--data-size", "256", + }, + }, + { + "publish with RTT measurement", + 0, + []string{ + "--host", "127.0.0.1", + "--port", "6379", + "--mode", "publish", + "--clients", "2", + "--channel-minimum", "1", + "--channel-maximum", "2", + "--test-time", "1", + "--measure-rtt-latency", + "--data-size", "512", + }, + }, + { + "spublish mode", + 0, + []string{ + "--host", "127.0.0.1", + "--port", "6379", + "--mode", "spublish", + "--clients", "2", + "--channel-minimum", "1", + "--channel-maximum", "2", + "--test-time", "1", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cmd := exec.Command(getBinaryPath(), tt.args...) + cmd.Env = os.Environ() + cmd.Env = append(cmd.Env, "GOCOVERDIR=.coverdata") + var out bytes.Buffer + cmd.Stdout = &out + cmd.Stderr = &out + + // Run the command and wait for it to complete (--test-time will make it exit) + err := cmd.Run() + exitCode := 0 + if err != nil { + if exitError, ok := err.(*exec.ExitError); ok { + ws := exitError.Sys().(syscall.WaitStatus) + exitCode = ws.ExitStatus() + } + } + + if exitCode != tt.wantExitCode { + t.Errorf("got exit code = %v, want %v\nOutput: %s", exitCode, tt.wantExitCode, out.String()) + } + }) + } +} + +func TestPublisherSubscriberIntegration(t *testing.T) { + hostPort, password := getTestConnectionDetails() + + // Create a Redis client for verification + client := redis.NewClient(&redis.Options{ + Addr: hostPort, + Password: password, + DB: 0, + }) + defer client.Close() + + ctx := context.Background() + + // Test connection + if err := client.Ping(ctx).Err(); err != nil { + t.Skipf("Redis not available at %s: %v", hostPort, err) + } + + t.Run("publisher and subscriber together", func(t *testing.T) { + // Start subscriber first + subCmd := exec.Command(getBinaryPath(), + "--host", "127.0.0.1", + "--port", "6379", + "--mode", "subscribe", + "--clients", "2", + "--channel-minimum", "1", + "--channel-maximum", "2", + "--test-time", "2", + ) + subCmd.Env = os.Environ() + subCmd.Env = append(subCmd.Env, "GOCOVERDIR=.coverdata") + var subOut bytes.Buffer + subCmd.Stdout = &subOut + subCmd.Stderr = &subOut + + err := subCmd.Start() + if err != nil { + t.Fatalf("Failed to start subscriber: %v", err) + } + + // Give subscriber time to connect + time.Sleep(500 * time.Millisecond) + + // Start publisher (will run for 1 second and exit) + pubCmd := exec.Command(getBinaryPath(), + "--host", "127.0.0.1", + "--port", "6379", + "--mode", "publish", + "--clients", "1", + "--channel-minimum", "1", + "--channel-maximum", "2", + "--test-time", "1", + "--rps", "50", + "--data-size", "128", + ) + pubCmd.Env = os.Environ() + pubCmd.Env = append(pubCmd.Env, "GOCOVERDIR=.coverdata") + var pubOut bytes.Buffer + pubCmd.Stdout = &pubOut + pubCmd.Stderr = &pubOut + + // Run publisher and wait for it to complete + err = pubCmd.Run() + pubExitCode := 0 + if err != nil { + if exitError, ok := err.(*exec.ExitError); ok { + ws := exitError.Sys().(syscall.WaitStatus) + pubExitCode = ws.ExitStatus() + } + } + + // Stop subscriber + time.Sleep(500 * time.Millisecond) + subCmd.Process.Signal(os.Interrupt) + err = subCmd.Wait() + subExitCode := 0 + if err != nil { + if exitError, ok := err.(*exec.ExitError); ok { + ws := exitError.Sys().(syscall.WaitStatus) + subExitCode = ws.ExitStatus() + } + } + + if pubExitCode != 0 { + t.Errorf("Publisher exit code = %v, want 0\nOutput: %s", pubExitCode, pubOut.String()) + } + if subExitCode != 0 { + t.Errorf("Subscriber exit code = %v, want 0\nOutput: %s", subExitCode, subOut.String()) + } + + t.Logf("Subscriber output:\n%s", subOut.String()) + t.Logf("Publisher output:\n%s", pubOut.String()) + }) +} + +func TestErrorCases(t *testing.T) { + var tests = []struct { + name string + wantExitCode int + args []string + }{ + { + "invalid mode", + 1, + []string{ + "--host", "127.0.0.1", + "--port", "6379", + "--mode", "invalid_mode", + }, + }, + { + "invalid port", + 1, + []string{ + "--host", "127.0.0.1", + "--port", "99999", + "--mode", "subscribe", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cmd := exec.Command(getBinaryPath(), tt.args...) + cmd.Env = os.Environ() + cmd.Env = append(cmd.Env, "GOCOVERDIR=.coverdata") + var out bytes.Buffer + cmd.Stdout = &out + cmd.Stderr = &out + + err := cmd.Run() + exitCode := 0 + if err != nil { + if exitError, ok := err.(*exec.ExitError); ok { + ws := exitError.Sys().(syscall.WaitStatus) + exitCode = ws.ExitStatus() + } + } + + // For error cases, we expect non-zero exit code + if tt.wantExitCode != 0 && exitCode == 0 { + t.Errorf("expected non-zero exit code, got 0\nOutput: %s", out.String()) + } else if tt.wantExitCode == 0 && exitCode != 0 { + t.Errorf("expected exit code 0, got %d\nOutput: %s", exitCode, out.String()) + } + }) + } +} + +func TestDataSizeVariations(t *testing.T) { + var tests = []struct { + name string + dataSize string + wantExitCode int + }{ + {"small payload 64 bytes", "64", 0}, + {"medium payload 512 bytes", "512", 0}, + {"large payload 4096 bytes", "4096", 0}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cmd := exec.Command(getBinaryPath(), + "--host", "127.0.0.1", + "--port", "6379", + "--mode", "publish", + "--clients", "1", + "--channel-minimum", "1", + "--channel-maximum", "1", + "--test-time", "1", + "--data-size", tt.dataSize, + ) + cmd.Env = os.Environ() + cmd.Env = append(cmd.Env, "GOCOVERDIR=.coverdata") + var out bytes.Buffer + cmd.Stdout = &out + cmd.Stderr = &out + + // Run the command and wait for it to complete (--test-time will make it exit) + err := cmd.Run() + exitCode := 0 + if err != nil { + if exitError, ok := err.(*exec.ExitError); ok { + ws := exitError.Sys().(syscall.WaitStatus) + exitCode = ws.ExitStatus() + } + } + + if exitCode != tt.wantExitCode { + t.Errorf("got exit code = %v, want %v\nOutput: %s", exitCode, tt.wantExitCode, out.String()) + } + }) + } +} From a33483375809db83911726f2756011c4c2ba6efd Mon Sep 17 00:00:00 2001 From: fcostaoliveira Date: Fri, 28 Nov 2025 21:35:28 +0000 Subject: [PATCH 06/17] Respect payload size with RTT measurment on the js tool --- js/ioredis/lib/publisher.js | 79 +++++++++++++++++++++++++++--------- js/ioredis/lib/subscriber.js | 10 ++++- 2 files changed, 69 insertions(+), 20 deletions(-) diff --git a/js/ioredis/lib/publisher.js b/js/ioredis/lib/publisher.js index 8c65a5a..20c6f5a 100644 --- a/js/ioredis/lib/publisher.js +++ b/js/ioredis/lib/publisher.js @@ -18,31 +18,72 @@ async function publisherRoutine( ); } - const payload = !measureRTT ? 'A'.repeat(dataSize) : ''; + // Pre-allocate payload once per publisher to avoid repeated allocations + // Timestamp format: 13 bytes for milliseconds (e.g., "1730745600000") + // Format: " " to reach dataSize + const timestampSize = 13; // Date.now() returns milliseconds (13 digits) + let paddingPayload = ''; + + if (measureRTT && dataSize > timestampSize + 1) { + // +1 for space separator + const paddingSize = dataSize - timestampSize - 1; + paddingPayload = 'A'.repeat(paddingSize); + } else if (!measureRTT) { + paddingPayload = 'A'.repeat(dataSize); + } + const duplicatedClient = client.duplicate(); // Create a duplicated connection for this publisher try { - while (isRunningRef.value) { - for (const channel of channels) { - try { - // Apply rate limiting if configured - if (rateLimiter) { - await rateLimiter.removeTokens(1); - } - - let msg = payload; - if (measureRTT) { - msg = Date.now().toString(); + if (measureRTT) { + // RTT mode: generate timestamp for each message with padding to reach dataSize + while (isRunningRef.value) { + for (const channel of channels) { + try { + // Apply rate limiting if configured + if (rateLimiter) { + await rateLimiter.removeTokens(1); + } + + let msg; + if (dataSize > timestampSize + 1) { + // Format: " " + msg = Date.now().toString() + ' ' + paddingPayload; + } else { + // Just timestamp if dataSize is too small + msg = Date.now().toString(); + } + + if (mode === 'spublish') { + await duplicatedClient.spublish(channel, msg); + } else { + await duplicatedClient.publish(channel, msg); + } + totalMessagesRef.value++; + } catch (err) { + console.error(`Error publishing to channel ${channel}:`, err); } + } + } + } else { + // Fixed payload mode: reuse pre-allocated payload + while (isRunningRef.value) { + for (const channel of channels) { + try { + // Apply rate limiting if configured + if (rateLimiter) { + await rateLimiter.removeTokens(1); + } - if (mode === 'spublish') { - await duplicatedClient.spublish(channel, msg); - } else { - await duplicatedClient.publish(channel, msg); + if (mode === 'spublish') { + await duplicatedClient.spublish(channel, payload); + } else { + await duplicatedClient.publish(channel, payload); + } + totalMessagesRef.value++; + } catch (err) { + console.error(`Error publishing to channel ${channel}:`, err); } - totalMessagesRef.value++; - } catch (err) { - console.error(`Error publishing to channel ${channel}:`, err); } } } diff --git a/js/ioredis/lib/subscriber.js b/js/ioredis/lib/subscriber.js index 5d296f3..1b9c638 100644 --- a/js/ioredis/lib/subscriber.js +++ b/js/ioredis/lib/subscriber.js @@ -67,7 +67,15 @@ async function subscriberRoutine( if (measureRTT) { try { const now = Date.now(); - const timestamp = Number(message); // Timestamp from publisher + // Extract timestamp from payload (format: " " or just "") + // Timestamp is always 13 bytes for milliseconds (Date.now()) + const timestampSize = 13; + let timestampStr = message; + if (message.length > timestampSize) { + // Extract just the timestamp part (first 13 characters) + timestampStr = message.substring(0, timestampSize); + } + const timestamp = Number(timestampStr); const rtt = now - timestamp; if (rtt >= 0) { // Add to accumulator for per-tick average calculation From 9a2efa6a7b1621c2fc025d82b93bbdbd2abfdaae Mon Sep 17 00:00:00 2001 From: fcostaoliveira Date: Mon, 1 Dec 2025 10:12:15 +0000 Subject: [PATCH 07/17] wait for cluster ready --- js/ioredis/lib/redisManager.js | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/js/ioredis/lib/redisManager.js b/js/ioredis/lib/redisManager.js index e739be2..b04c8e6 100644 --- a/js/ioredis/lib/redisManager.js +++ b/js/ioredis/lib/redisManager.js @@ -73,11 +73,26 @@ async function runBenchmark(argv) { } ); + // Wait for cluster to be ready and discover all nodes + await new Promise((resolve) => { + cluster.on('ready', resolve); + }); + + // Get all master nodes from the cluster + const nodes = cluster.nodes('master'); + console.log(`Cluster mode - discovered ${nodes.length} master nodes`); + // Populate slotClientMap by using the cluster client for all slots + // The cluster client will automatically route commands to the correct node + // based on the key's hash slot, handling MOVED/ASK redirects automatically + for (let slot = 0; slot <= 16383; slot++) { + slotClientMap.set(slot, cluster); + } clients.push(cluster); - - console.log(`Cluster mode - using ${nodeAddresses.length} unique nodes`); + nodeAddresses = nodes.map(node => `${node.options.host}:${node.options.port}`); + + console.log(`Cluster mode - using ${nodeAddresses.length} unique nodes: ${nodeAddresses.join(', ')}`); } else { const client = new Redis(redisOptions); clients.push(client); From 8c1f1c4b6baeecbe60a8e1b2cc3fbab26e6b58fb Mon Sep 17 00:00:00 2001 From: fcostaoliveira Date: Mon, 1 Dec 2025 10:30:23 +0000 Subject: [PATCH 08/17] bumped io-redis to 5.6.0 --- js/ioredis/lib/subscriber.js | 10 ++++++---- js/ioredis/package.json | 2 +- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/js/ioredis/lib/subscriber.js b/js/ioredis/lib/subscriber.js index 1b9c638..3ec51be 100644 --- a/js/ioredis/lib/subscriber.js +++ b/js/ioredis/lib/subscriber.js @@ -31,12 +31,14 @@ async function subscriberRoutine( } totalSubscribedRef.value -= channels.slice(1).length; } - // Duplicate connection afresh. - pubsub = client.duplicate(); - } else { - pubsub = client.duplicate(); + // Close existing connection before creating new one + await pubsub.quit(); } + // Duplicate connection afresh. + // For cluster clients, duplicate() creates a new cluster-aware client + pubsub = client.duplicate(); + // Set up error logging. pubsub.on('error', (err) => { console.error(`[${clientName}] Redis error: ${err.message}`); diff --git a/js/ioredis/package.json b/js/ioredis/package.json index a9521e9..70a727f 100644 --- a/js/ioredis/package.json +++ b/js/ioredis/package.json @@ -6,7 +6,7 @@ "dependencies": { "cluster-key-slot": "^1.1.0", "hdr-histogram-js": "^2.0.1", - "ioredis": "^4.30.0", + "ioredis": "^5.6.0", "seedrandom": "^3.0.5", "yargs": "^17.7.2" }, From ca68f573304a7705bba014f9e2bcdea1ce771e4e Mon Sep 17 00:00:00 2001 From: fcostaoliveira Date: Mon, 1 Dec 2025 10:32:15 +0000 Subject: [PATCH 09/17] Fixed wrong naming on var --- js/ioredis/lib/publisher.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/js/ioredis/lib/publisher.js b/js/ioredis/lib/publisher.js index 20c6f5a..d35bfa1 100644 --- a/js/ioredis/lib/publisher.js +++ b/js/ioredis/lib/publisher.js @@ -76,9 +76,9 @@ async function publisherRoutine( } if (mode === 'spublish') { - await duplicatedClient.spublish(channel, payload); + await duplicatedClient.spublish(channel, paddingPayload); } else { - await duplicatedClient.publish(channel, payload); + await duplicatedClient.publish(channel, paddingPayload); } totalMessagesRef.value++; } catch (err) { From 8f7c92948134e723b72239f6fd79dc707316cd75 Mon Sep 17 00:00:00 2001 From: fcostaoliveira Date: Mon, 1 Dec 2025 10:34:31 +0000 Subject: [PATCH 10/17] Fixed wrong naming on var --- js/ioredis/lib/redisManager.js | 32 ++++++++++++++++++++++++++------ 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/js/ioredis/lib/redisManager.js b/js/ioredis/lib/redisManager.js index b04c8e6..3772300 100644 --- a/js/ioredis/lib/redisManager.js +++ b/js/ioredis/lib/redisManager.js @@ -82,17 +82,37 @@ async function runBenchmark(argv) { const nodes = cluster.nodes('master'); console.log(`Cluster mode - discovered ${nodes.length} master nodes`); - // Populate slotClientMap by using the cluster client for all slots - // The cluster client will automatically route commands to the correct node - // based on the key's hash slot, handling MOVED/ASK redirects automatically - for (let slot = 0; slot <= 16383; slot++) { - slotClientMap.set(slot, cluster); + // Get the cluster slots mapping to determine which node serves which slots + const slotsMapping = await cluster.cluster('SLOTS'); + + // slotsMapping format: [[startSlot, endSlot, [host, port, nodeId], ...], ...] + // For each slot range, create a direct connection to the master node + for (const slotRange of slotsMapping) { + const startSlot = slotRange[0]; + const endSlot = slotRange[1]; + const masterInfo = slotRange[2]; // [host, port, nodeId] + const host = masterInfo[0]; + const port = masterInfo[1]; + + // Create a standalone Redis client for this node + const nodeClient = new Redis({ + ...redisOptions, + host, + port + }); + + clients.push(nodeClient); + + // Map all slots in this range to this node's client + for (let slot = startSlot; slot <= endSlot; slot++) { + slotClientMap.set(slot, nodeClient); + } } - clients.push(cluster); nodeAddresses = nodes.map(node => `${node.options.host}:${node.options.port}`); console.log(`Cluster mode - using ${nodeAddresses.length} unique nodes: ${nodeAddresses.join(', ')}`); + console.log(`Cluster mode - mapped ${slotClientMap.size} slots to individual node clients`); } else { const client = new Redis(redisOptions); clients.push(client); From b47b88b89e11e5832bcd6d471e3568a8e7efafe9 Mon Sep 17 00:00:00 2001 From: fcostaoliveira Date: Mon, 1 Dec 2025 10:36:43 +0000 Subject: [PATCH 11/17] Fixed wrong naming on var --- js/ioredis/lib/redisManager.js | 50 +++++++++++++++++++++++++++------- 1 file changed, 40 insertions(+), 10 deletions(-) diff --git a/js/ioredis/lib/redisManager.js b/js/ioredis/lib/redisManager.js index 3772300..e5f2b0b 100644 --- a/js/ioredis/lib/redisManager.js +++ b/js/ioredis/lib/redisManager.js @@ -85,8 +85,16 @@ async function runBenchmark(argv) { // Get the cluster slots mapping to determine which node serves which slots const slotsMapping = await cluster.cluster('SLOTS'); + // Build a map from "host:port" to the actual node client + const nodeMap = new Map(); + for (const node of nodes) { + const key = `${node.options.host}:${node.options.port}`; + nodeMap.set(key, node); + clients.push(node); + } + // slotsMapping format: [[startSlot, endSlot, [host, port, nodeId], ...], ...] - // For each slot range, create a direct connection to the master node + // For each slot range, map slots to the corresponding node client for (const slotRange of slotsMapping) { const startSlot = slotRange[0]; const endSlot = slotRange[1]; @@ -94,14 +102,26 @@ async function runBenchmark(argv) { const host = masterInfo[0]; const port = masterInfo[1]; - // Create a standalone Redis client for this node - const nodeClient = new Redis({ - ...redisOptions, - host, - port - }); + // Find the node client for this host:port + const nodeKey = `${host}:${port}`; + let nodeClient = nodeMap.get(nodeKey); + + if (!nodeClient) { + // If not found by exact match, try to find by port only + // (useful when cluster returns internal IPs but we connect via external IP) + for (const [key, client] of nodeMap.entries()) { + if (key.endsWith(`:${port}`)) { + nodeClient = client; + console.log(`Matched node ${nodeKey} to ${key} by port`); + break; + } + } + } - clients.push(nodeClient); + if (!nodeClient) { + console.warn(`Warning: No node client found for ${nodeKey}, using first available node`); + nodeClient = nodes[0]; + } // Map all slots in this range to this node's client for (let slot = startSlot; slot <= endSlot; slot++) { @@ -112,7 +132,7 @@ async function runBenchmark(argv) { nodeAddresses = nodes.map(node => `${node.options.host}:${node.options.port}`); console.log(`Cluster mode - using ${nodeAddresses.length} unique nodes: ${nodeAddresses.join(', ')}`); - console.log(`Cluster mode - mapped ${slotClientMap.size} slots to individual node clients`); + console.log(`Cluster mode - mapped ${slotClientMap.size} slots to node clients`); } else { const client = new Redis(redisOptions); clients.push(client); @@ -164,7 +184,17 @@ async function runBenchmark(argv) { const publisherName = `publisher#${clientId}`; let client; - client = clients[0] + // For sharded pub/sub in cluster mode, get the client for the first channel's slot + if (argv.mode.startsWith('s') && argv['oss-cluster-api-distribute-subscribers']) { + const slot = clusterKeySlot(channels[0]); + client = slotClientMap.get(slot); + if (!client) { + console.error(`No client found for slot ${slot} (channel: ${channels[0]})`); + client = clients[0]; // Fallback + } + } else { + client = clients[0]; + } if (argv.verbose) { console.log(`Publisher ${clientId} targeting channels ${channels}`); From ff2c1dd9c84609d3b38f29261d3f8ada79f215ce Mon Sep 17 00:00:00 2001 From: fcostaoliveira Date: Mon, 1 Dec 2025 10:47:41 +0000 Subject: [PATCH 12/17] cluster slots fix --- js/ioredis/lib/publisher.js | 6 ++-- js/ioredis/lib/redisManager.js | 59 ++++++++++++++++------------------ js/ioredis/lib/subscriber.js | 7 ++-- 3 files changed, 35 insertions(+), 37 deletions(-) diff --git a/js/ioredis/lib/publisher.js b/js/ioredis/lib/publisher.js index d35bfa1..69c84d0 100644 --- a/js/ioredis/lib/publisher.js +++ b/js/ioredis/lib/publisher.js @@ -8,7 +8,8 @@ async function publisherRoutine( client, isRunningRef, totalMessagesRef, - rateLimiter + rateLimiter, + skipDuplicate = false ) { if (verbose) { console.log( @@ -32,7 +33,8 @@ async function publisherRoutine( paddingPayload = 'A'.repeat(dataSize); } - const duplicatedClient = client.duplicate(); // Create a duplicated connection for this publisher + // For cluster node clients, don't duplicate to preserve cluster routing + const duplicatedClient = skipDuplicate ? client : client.duplicate(); try { if (measureRTT) { diff --git a/js/ioredis/lib/redisManager.js b/js/ioredis/lib/redisManager.js index e5f2b0b..51162ae 100644 --- a/js/ioredis/lib/redisManager.js +++ b/js/ioredis/lib/redisManager.js @@ -85,53 +85,41 @@ async function runBenchmark(argv) { // Get the cluster slots mapping to determine which node serves which slots const slotsMapping = await cluster.cluster('SLOTS'); - // Build a map from "host:port" to the actual node client - const nodeMap = new Map(); - for (const node of nodes) { - const key = `${node.options.host}:${node.options.port}`; - nodeMap.set(key, node); - clients.push(node); - } - // slotsMapping format: [[startSlot, endSlot, [host, port, nodeId], ...], ...] - // For each slot range, map slots to the corresponding node client + // For each slot range, create a direct standalone connection to the master node + const nodeClientsMap = new Map(); // Map from "host:port" to standalone client + for (const slotRange of slotsMapping) { const startSlot = slotRange[0]; const endSlot = slotRange[1]; const masterInfo = slotRange[2]; // [host, port, nodeId] - const host = masterInfo[0]; + const host = masterInfo[0]; // Use internal IP from CLUSTER SLOTS const port = masterInfo[1]; - // Find the node client for this host:port + // Create or reuse a standalone client for this node using internal IP const nodeKey = `${host}:${port}`; - let nodeClient = nodeMap.get(nodeKey); - - if (!nodeClient) { - // If not found by exact match, try to find by port only - // (useful when cluster returns internal IPs but we connect via external IP) - for (const [key, client] of nodeMap.entries()) { - if (key.endsWith(`:${port}`)) { - nodeClient = client; - console.log(`Matched node ${nodeKey} to ${key} by port`); - break; - } - } - } - + let nodeClient = nodeClientsMap.get(nodeKey); if (!nodeClient) { - console.warn(`Warning: No node client found for ${nodeKey}, using first available node`); - nodeClient = nodes[0]; + console.log(`Creating standalone client for node ${host}:${port} (slots ${startSlot}-${endSlot})`); + nodeClient = new Redis({ + ...redisOptions, + host, + port + }); + nodeClientsMap.set(nodeKey, nodeClient); + clients.push(nodeClient); } - // Map all slots in this range to this node's client + // Map all slots in this range to this node's standalone client for (let slot = startSlot; slot <= endSlot; slot++) { slotClientMap.set(slot, nodeClient); } } - nodeAddresses = nodes.map(node => `${node.options.host}:${node.options.port}`); + nodeAddresses = Array.from(nodeClientsMap.keys()); - console.log(`Cluster mode - using ${nodeAddresses.length} unique nodes: ${nodeAddresses.join(', ')}`); + console.log(`Cluster mode - created ${nodeClientsMap.size} standalone node clients`); + console.log(`Cluster mode - node addresses: ${nodeAddresses.join(', ')}`); console.log(`Cluster mode - mapped ${slotClientMap.size} slots to node clients`); } else { const client = new Redis(redisOptions); @@ -200,6 +188,8 @@ async function runBenchmark(argv) { console.log(`Publisher ${clientId} targeting channels ${channels}`); } + const skipDuplicate = argv.mode.startsWith('s') && argv['oss-cluster-api-distribute-subscribers']; + promises.push( publisherRoutine( publisherName, @@ -210,7 +200,9 @@ async function runBenchmark(argv) { argv['data-size'], client, isRunningRef, - totalMessagesRef + totalMessagesRef, + null, // rateLimiter + skipDuplicate ) ); @@ -245,6 +237,8 @@ async function runBenchmark(argv) { console.log(`Reconnect interval for ${subscriberName}: ${reconnectInterval}ms`); } + const skipDuplicate = argv.mode.startsWith('s') && argv['oss-cluster-api-distribute-subscribers']; + promises.push( subscriberRoutine( subscriberName, @@ -261,7 +255,8 @@ async function runBenchmark(argv) { totalSubscribedRef, totalConnectsRef, argv.verbose, - argv.clients + argv.clients, + skipDuplicate ) ); } diff --git a/js/ioredis/lib/subscriber.js b/js/ioredis/lib/subscriber.js index 3ec51be..31558a8 100644 --- a/js/ioredis/lib/subscriber.js +++ b/js/ioredis/lib/subscriber.js @@ -13,7 +13,8 @@ async function subscriberRoutine( totalSubscribedRef, totalConnectsRef, verbose, - totalClients + totalClients, + skipDuplicate = false ) { let pubsub = null; let reconnectTimer = null; @@ -35,9 +36,9 @@ async function subscriberRoutine( await pubsub.quit(); } - // Duplicate connection afresh. + // For cluster node clients, don't duplicate to preserve cluster routing // For cluster clients, duplicate() creates a new cluster-aware client - pubsub = client.duplicate(); + pubsub = skipDuplicate ? client : client.duplicate(); // Set up error logging. pubsub.on('error', (err) => { From 8f85e3e6e99a9095151b63b1588d950c56197f85 Mon Sep 17 00:00:00 2001 From: fcostaoliveira Date: Mon, 1 Dec 2025 10:50:08 +0000 Subject: [PATCH 13/17] cluster slots fix --- js/ioredis/lib/redisManager.js | 50 ++++++++++------------------------ 1 file changed, 14 insertions(+), 36 deletions(-) diff --git a/js/ioredis/lib/redisManager.js b/js/ioredis/lib/redisManager.js index 51162ae..29bce3d 100644 --- a/js/ioredis/lib/redisManager.js +++ b/js/ioredis/lib/redisManager.js @@ -69,7 +69,11 @@ async function runBenchmark(argv) { enableReadyCheck: true, lazyConnect: false, connectTimeout: argv['redis-timeout'], - slotsRefreshInterval: argv['slot-refresh-interval'] + slotsRefreshInterval: argv['slot-refresh-interval'], + enableOfflineQueue: true, + retryDelayOnClusterDown: 300, + retryDelayOnFailover: 100, + maxRedirections: 16 } ); @@ -82,45 +86,19 @@ async function runBenchmark(argv) { const nodes = cluster.nodes('master'); console.log(`Cluster mode - discovered ${nodes.length} master nodes`); - // Get the cluster slots mapping to determine which node serves which slots - const slotsMapping = await cluster.cluster('SLOTS'); - - // slotsMapping format: [[startSlot, endSlot, [host, port, nodeId], ...], ...] - // For each slot range, create a direct standalone connection to the master node - const nodeClientsMap = new Map(); // Map from "host:port" to standalone client - - for (const slotRange of slotsMapping) { - const startSlot = slotRange[0]; - const endSlot = slotRange[1]; - const masterInfo = slotRange[2]; // [host, port, nodeId] - const host = masterInfo[0]; // Use internal IP from CLUSTER SLOTS - const port = masterInfo[1]; - - // Create or reuse a standalone client for this node using internal IP - const nodeKey = `${host}:${port}`; - let nodeClient = nodeClientsMap.get(nodeKey); - if (!nodeClient) { - console.log(`Creating standalone client for node ${host}:${port} (slots ${startSlot}-${endSlot})`); - nodeClient = new Redis({ - ...redisOptions, - host, - port - }); - nodeClientsMap.set(nodeKey, nodeClient); - clients.push(nodeClient); - } - - // Map all slots in this range to this node's standalone client - for (let slot = startSlot; slot <= endSlot; slot++) { - slotClientMap.set(slot, nodeClient); - } + // For cluster mode, use the cluster client itself for all slots + // The cluster client will automatically route commands to the correct node + // and handle MOVED/ASK redirects + for (let slot = 0; slot <= 16383; slot++) { + slotClientMap.set(slot, cluster); } - nodeAddresses = Array.from(nodeClientsMap.keys()); + clients.push(cluster); + nodeAddresses = nodes.map(node => `${node.options.host}:${node.options.port}`); - console.log(`Cluster mode - created ${nodeClientsMap.size} standalone node clients`); + console.log(`Cluster mode - using cluster client with ${nodes.length} master nodes`); console.log(`Cluster mode - node addresses: ${nodeAddresses.join(', ')}`); - console.log(`Cluster mode - mapped ${slotClientMap.size} slots to node clients`); + console.log(`Cluster mode - mapped all slots to cluster client (auto-routing enabled)`); } else { const client = new Redis(redisOptions); clients.push(client); From f040559c2ccfa995d5095065f8d6ddd315f7bb3a Mon Sep 17 00:00:00 2001 From: fcostaoliveira Date: Mon, 1 Dec 2025 10:52:24 +0000 Subject: [PATCH 14/17] cluster slots fix --- js/ioredis/lib/publisher.js | 15 +++++++++++- js/ioredis/lib/redisManager.js | 45 ++++++++++++++++++++++++++++------ 2 files changed, 51 insertions(+), 9 deletions(-) diff --git a/js/ioredis/lib/publisher.js b/js/ioredis/lib/publisher.js index 69c84d0..14bcc20 100644 --- a/js/ioredis/lib/publisher.js +++ b/js/ioredis/lib/publisher.js @@ -33,9 +33,22 @@ async function publisherRoutine( paddingPayload = 'A'.repeat(dataSize); } - // For cluster node clients, don't duplicate to preserve cluster routing + // For cluster mode, use the client directly without duplication + // The cluster client handles routing and can be shared across publishers const duplicatedClient = skipDuplicate ? client : client.duplicate(); + // Wait for client to be ready + if (!skipDuplicate) { + await new Promise((resolve, reject) => { + if (duplicatedClient.status === 'ready') { + resolve(); + } else { + duplicatedClient.once('ready', resolve); + duplicatedClient.once('error', reject); + } + }); + } + try { if (measureRTT) { // RTT mode: generate timestamp for each message with padding to reach dataSize diff --git a/js/ioredis/lib/redisManager.js b/js/ioredis/lib/redisManager.js index 29bce3d..e7234ac 100644 --- a/js/ioredis/lib/redisManager.js +++ b/js/ioredis/lib/redisManager.js @@ -86,19 +86,48 @@ async function runBenchmark(argv) { const nodes = cluster.nodes('master'); console.log(`Cluster mode - discovered ${nodes.length} master nodes`); - // For cluster mode, use the cluster client itself for all slots - // The cluster client will automatically route commands to the correct node - // and handle MOVED/ASK redirects - for (let slot = 0; slot <= 16383; slot++) { - slotClientMap.set(slot, cluster); + // Get the cluster slots mapping to determine which node serves which slots + const slotsMapping = await cluster.cluster('SLOTS'); + + console.log(`Cluster SLOTS mapping:`, JSON.stringify(slotsMapping, null, 2)); + + // Build a map from slot ranges to the actual node Redis clients + // The nodes returned by cluster.nodes() are the actual connected Redis instances + for (const slotRange of slotsMapping) { + const startSlot = slotRange[0]; + const endSlot = slotRange[1]; + const masterInfo = slotRange[2]; // [host, port, nodeId] + const host = masterInfo[0]; + const port = masterInfo[1]; + + // Find the matching node client by port + let nodeClient = null; + for (const node of nodes) { + if (node.options.port === port) { + nodeClient = node; + console.log(`Mapped slots ${startSlot}-${endSlot} to node ${node.options.host}:${node.options.port}`); + break; + } + } + + if (!nodeClient) { + console.warn(`Warning: No node found for ${host}:${port}, using first node`); + nodeClient = nodes[0]; + } + + // Map all slots in this range to the node client + for (let slot = startSlot; slot <= endSlot; slot++) { + slotClientMap.set(slot, nodeClient); + } } - clients.push(cluster); + // Add all node clients to the clients array + clients.push(...nodes); nodeAddresses = nodes.map(node => `${node.options.host}:${node.options.port}`); - console.log(`Cluster mode - using cluster client with ${nodes.length} master nodes`); + console.log(`Cluster mode - using ${nodes.length} node clients from cluster`); console.log(`Cluster mode - node addresses: ${nodeAddresses.join(', ')}`); - console.log(`Cluster mode - mapped all slots to cluster client (auto-routing enabled)`); + console.log(`Cluster mode - mapped ${slotClientMap.size} slots to node clients`); } else { const client = new Redis(redisOptions); clients.push(client); From 657eee28146247d2e25adb623885cc3bfd9eba42 Mon Sep 17 00:00:00 2001 From: fcostaoliveira Date: Fri, 5 Dec 2025 12:27:16 +0000 Subject: [PATCH 15/17] Using 4.30.2-v4-beta.1 io-redis --- js/ioredis/lib/config.js | 8 ++--- js/ioredis/lib/publisher.js | 2 ++ js/ioredis/lib/redisManager.js | 63 ++++++++++++++++------------------ js/ioredis/package-lock.json | 9 +++-- js/ioredis/package.json | 2 +- 5 files changed, 41 insertions(+), 43 deletions(-) diff --git a/js/ioredis/lib/config.js b/js/ioredis/lib/config.js index c5be263..50d1c87 100644 --- a/js/ioredis/lib/config.js +++ b/js/ioredis/lib/config.js @@ -29,11 +29,11 @@ function parseArgs() { .option('test-time', { default: 0 }) .option('rand-seed', { default: 12345 }) .option('subscriber-prefix', { default: 'channel-' }) - .option('oss-cluster-api-distribute-subscribers', { default: false }) + .option('oss-cluster-api-distribute-subscribers', { type: 'boolean', default: false }) .option('slot-refresh-interval', { default: -1 }) - .option('print-messages', { default: false }) - .option('verbose', { default: false }) - .option('measure-rtt-latency', { default: false }) + .option('print-messages', { type: 'boolean', default: false }) + .option('verbose', { type: 'boolean', default: false }) + .option('measure-rtt-latency', { type: 'boolean', default: false }) .option('redis-timeout', { default: 120000 }) .option('pool-size', { default: 0 }) .help().argv; diff --git a/js/ioredis/lib/publisher.js b/js/ioredis/lib/publisher.js index 14bcc20..4588780 100644 --- a/js/ioredis/lib/publisher.js +++ b/js/ioredis/lib/publisher.js @@ -77,6 +77,7 @@ async function publisherRoutine( totalMessagesRef.value++; } catch (err) { console.error(`Error publishing to channel ${channel}:`, err); + process.exit(1); } } } @@ -98,6 +99,7 @@ async function publisherRoutine( totalMessagesRef.value++; } catch (err) { console.error(`Error publishing to channel ${channel}:`, err); + process.exit(1); } } } diff --git a/js/ioredis/lib/redisManager.js b/js/ioredis/lib/redisManager.js index e7234ac..1918914 100644 --- a/js/ioredis/lib/redisManager.js +++ b/js/ioredis/lib/redisManager.js @@ -73,7 +73,8 @@ async function runBenchmark(argv) { enableOfflineQueue: true, retryDelayOnClusterDown: 300, retryDelayOnFailover: 100, - maxRedirections: 16 + maxRedirections: 16, + maxRetriesPerRequest: null } ); @@ -89,43 +90,37 @@ async function runBenchmark(argv) { // Get the cluster slots mapping to determine which node serves which slots const slotsMapping = await cluster.cluster('SLOTS'); - console.log(`Cluster SLOTS mapping:`, JSON.stringify(slotsMapping, null, 2)); - - // Build a map from slot ranges to the actual node Redis clients - // The nodes returned by cluster.nodes() are the actual connected Redis instances + console.log(`\nCluster SLOTS mapping:`); for (const slotRange of slotsMapping) { - const startSlot = slotRange[0]; - const endSlot = slotRange[1]; - const masterInfo = slotRange[2]; // [host, port, nodeId] - const host = masterInfo[0]; - const port = masterInfo[1]; - - // Find the matching node client by port - let nodeClient = null; - for (const node of nodes) { - if (node.options.port === port) { - nodeClient = node; - console.log(`Mapped slots ${startSlot}-${endSlot} to node ${node.options.host}:${node.options.port}`); - break; - } - } - - if (!nodeClient) { - console.warn(`Warning: No node found for ${host}:${port}, using first node`); - nodeClient = nodes[0]; - } + console.log(` Slots ${slotRange[0]}-${slotRange[1]}: ${slotRange[2][0]}:${slotRange[2][1]}`); + } + console.log(''); - // Map all slots in this range to the node client - for (let slot = startSlot; slot <= endSlot; slot++) { - slotClientMap.set(slot, nodeClient); + // Use the cluster's internal slot mapping to get the node for each slot + // cluster.slots[slot] contains an array of node keys (e.g., ["127.0.0.1:6379"]) + // cluster.connectionPool.getInstanceByKey(key) returns the Redis instance for that node + console.log('Mapping slots to cluster nodes...'); + for (let slot = 0; slot <= 16383; slot++) { + const nodeKeys = cluster.slots[slot]; + if (nodeKeys && nodeKeys.length > 0) { + // Get the master node (first in the array) + const masterKey = nodeKeys[0]; + const nodeClient = cluster.connectionPool.getInstanceByKey(masterKey); + if (nodeClient) { + slotClientMap.set(slot, nodeClient); + + // Track unique nodes + if (!nodeAddresses.includes(masterKey)) { + nodeAddresses.push(masterKey); + clients.push(nodeClient); + // Increase max listeners since multiple publishers/subscribers will share this client + nodeClient.setMaxListeners(0); // 0 = unlimited + } + } } } - // Add all node clients to the clients array - clients.push(...nodes); - nodeAddresses = nodes.map(node => `${node.options.host}:${node.options.port}`); - - console.log(`Cluster mode - using ${nodes.length} node clients from cluster`); + console.log(`\nCluster mode - using ${clients.length} cluster node connections`); console.log(`Cluster mode - node addresses: ${nodeAddresses.join(', ')}`); console.log(`Cluster mode - mapped ${slotClientMap.size} slots to node clients`); } else { @@ -186,6 +181,8 @@ async function runBenchmark(argv) { if (!client) { console.error(`No client found for slot ${slot} (channel: ${channels[0]})`); client = clients[0]; // Fallback + } else if (argv.verbose) { + console.log(`Publisher ${clientId}: channel=${channels[0]}, slot=${slot}, node=${client.options.host}:${client.options.port}`); } } else { client = clients[0]; diff --git a/js/ioredis/package-lock.json b/js/ioredis/package-lock.json index 142e3b1..7419264 100644 --- a/js/ioredis/package-lock.json +++ b/js/ioredis/package-lock.json @@ -11,7 +11,7 @@ "dependencies": { "cluster-key-slot": "^1.1.0", "hdr-histogram-js": "^2.0.1", - "ioredis": "^4.30.0", + "ioredis": "4.30.2-v4-beta.1", "seedrandom": "^3.0.5", "yargs": "^17.7.2" }, @@ -481,10 +481,9 @@ } }, "node_modules/ioredis": { - "version": "4.30.0", - "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-4.30.0.tgz", - "integrity": "sha512-P9F4Eo6zicYsIJbEy/mPJmSxKY0rVcmiy5H8oXPxPDotQRCvCBjBuI5QWoQQanVE9jdeocnum5iqYAHl4pHdLA==", - "license": "MIT", + "version": "4.30.2-v4-beta.1", + "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-4.30.2-v4-beta.1.tgz", + "integrity": "sha512-vOTYX8T9cq/KUrpE270OoaDPDuQOUP9U9UwlTRrn82ke3ytBh4rWv8ghJckffCkPUHT12Mant93s0DGiyiKbRQ==", "dependencies": { "@ioredis/commands": "^1.0.2", "cluster-key-slot": "^1.1.0", diff --git a/js/ioredis/package.json b/js/ioredis/package.json index 70a727f..0007537 100644 --- a/js/ioredis/package.json +++ b/js/ioredis/package.json @@ -6,7 +6,7 @@ "dependencies": { "cluster-key-slot": "^1.1.0", "hdr-histogram-js": "^2.0.1", - "ioredis": "^5.6.0", + "ioredis": "4.30.2-v4-beta.1", "seedrandom": "^3.0.5", "yargs": "^17.7.2" }, From 2624c2b569ce794d72960b7281825c7dcde220e3 Mon Sep 17 00:00:00 2001 From: fcostaoliveira Date: Fri, 5 Dec 2025 21:12:08 +0000 Subject: [PATCH 16/17] Ensuring indivual cluster clients --- js/ioredis/lib/redisManager.js | 190 ++++++++++++++++++++------------- 1 file changed, 113 insertions(+), 77 deletions(-) diff --git a/js/ioredis/lib/redisManager.js b/js/ioredis/lib/redisManager.js index 1918914..5105328 100644 --- a/js/ioredis/lib/redisManager.js +++ b/js/ioredis/lib/redisManager.js @@ -56,73 +56,71 @@ async function runBenchmark(argv) { console.log(`Using ${argv['redis-timeout']} redis-timeout`); if (argv['oss-cluster-api-distribute-subscribers']) { - cluster = new Redis.Cluster( - [ - { - host: argv.host, - port: argv.port - } - ], - { - redisOptions, - scaleReads: 'master', - enableReadyCheck: true, - lazyConnect: false, - connectTimeout: argv['redis-timeout'], - slotsRefreshInterval: argv['slot-refresh-interval'], - enableOfflineQueue: true, - retryDelayOnClusterDown: 300, - retryDelayOnFailover: 100, - maxRedirections: 16, - maxRetriesPerRequest: null - } - ); + // Create N independent cluster clients for publishers + const numClusterClients = argv.clients; + + console.log(`\nCluster mode - creating ${numClusterClients} cluster clients...`); + + const clusterOptions = { + redisOptions, + scaleReads: 'master', + enableReadyCheck: true, + lazyConnect: false, + connectTimeout: argv['redis-timeout'], + slotsRefreshInterval: argv['slot-refresh-interval'], + enableOfflineQueue: true, + retryDelayOnClusterDown: 300, + retryDelayOnFailover: 100, + maxRedirections: 16, + maxRetriesPerRequest: null + }; + + // Create N cluster clients + for (let i = 0; i < numClusterClients; i++) { + const clusterClient = new Redis.Cluster( + [{ host: argv.host, port: argv.port }], + clusterOptions + ); - // Wait for cluster to be ready and discover all nodes - await new Promise((resolve) => { - cluster.on('ready', resolve); - }); + clusterClient.setMaxListeners(0); // Unlimited listeners + clients.push(clusterClient); - // Get all master nodes from the cluster - const nodes = cluster.nodes('master'); - console.log(`Cluster mode - discovered ${nodes.length} master nodes`); + // Wait for cluster client to be ready + await new Promise((resolve, reject) => { + clusterClient.on('ready', resolve); + clusterClient.on('error', reject); + }); - // Get the cluster slots mapping to determine which node serves which slots - const slotsMapping = await cluster.cluster('SLOTS'); + // Use the first cluster client to discover topology + if (i === 0) { + cluster = clusterClient; + const slotsMapping = await clusterClient.cluster('SLOTS'); - console.log(`\nCluster SLOTS mapping:`); - for (const slotRange of slotsMapping) { - console.log(` Slots ${slotRange[0]}-${slotRange[1]}: ${slotRange[2][0]}:${slotRange[2][1]}`); - } - console.log(''); + console.log(`Cluster mode - discovered ${slotsMapping.length} master nodes\n`); + console.log(`Cluster SLOTS mapping:`); - // Use the cluster's internal slot mapping to get the node for each slot - // cluster.slots[slot] contains an array of node keys (e.g., ["127.0.0.1:6379"]) - // cluster.connectionPool.getInstanceByKey(key) returns the Redis instance for that node - console.log('Mapping slots to cluster nodes...'); - for (let slot = 0; slot <= 16383; slot++) { - const nodeKeys = cluster.slots[slot]; - if (nodeKeys && nodeKeys.length > 0) { - // Get the master node (first in the array) - const masterKey = nodeKeys[0]; - const nodeClient = cluster.connectionPool.getInstanceByKey(masterKey); - if (nodeClient) { - slotClientMap.set(slot, nodeClient); - - // Track unique nodes - if (!nodeAddresses.includes(masterKey)) { - nodeAddresses.push(masterKey); - clients.push(nodeClient); - // Increase max listeners since multiple publishers/subscribers will share this client - nodeClient.setMaxListeners(0); // 0 = unlimited + for (const slotRange of slotsMapping) { + const startSlot = slotRange[0]; + const endSlot = slotRange[1]; + const host = slotRange[2][0]; + const port = slotRange[2][1]; + const nodeAddr = `${host}:${port}`; + + console.log(` Slots ${startSlot}-${endSlot}: ${nodeAddr}`); + + if (!nodeAddresses.includes(nodeAddr)) { + nodeAddresses.push(nodeAddr); } } + console.log(''); + } + + if ((i + 1) % 10 === 0 || i === numClusterClients - 1) { + console.log(` Created ${i + 1}/${numClusterClients} cluster clients...`); } } - console.log(`\nCluster mode - using ${clients.length} cluster node connections`); - console.log(`Cluster mode - node addresses: ${nodeAddresses.join(', ')}`); - console.log(`Cluster mode - mapped ${slotClientMap.size} slots to node clients`); + console.log(`\nCluster mode - created ${clients.length} cluster clients`); } else { const client = new Redis(redisOptions); clients.push(client); @@ -172,27 +170,18 @@ async function runBenchmark(argv) { } const publisherName = `publisher#${clientId}`; - let client; - - // For sharded pub/sub in cluster mode, get the client for the first channel's slot - if (argv.mode.startsWith('s') && argv['oss-cluster-api-distribute-subscribers']) { - const slot = clusterKeySlot(channels[0]); - client = slotClientMap.get(slot); - if (!client) { - console.error(`No client found for slot ${slot} (channel: ${channels[0]})`); - client = clients[0]; // Fallback - } else if (argv.verbose) { - console.log(`Publisher ${clientId}: channel=${channels[0]}, slot=${slot}, node=${client.options.host}:${client.options.port}`); - } - } else { - client = clients[0]; - } + + // Round-robin through the cluster clients + const client = clients[(clientId - 1) % clients.length]; if (argv.verbose) { - console.log(`Publisher ${clientId} targeting channels ${channels}`); + const slot = argv.mode.startsWith('s') && argv['oss-cluster-api-distribute-subscribers'] + ? clusterKeySlot(channels[0]) + : 'N/A'; + console.log(`Publisher ${clientId}: channel=${channels[0]}, slot=${slot}, client=${(clientId - 1) % clients.length}/${clients.length}`); } - const skipDuplicate = argv.mode.startsWith('s') && argv['oss-cluster-api-distribute-subscribers']; + const skipDuplicate = true; // Don't duplicate cluster clients promises.push( publisherRoutine( @@ -229,8 +218,55 @@ async function runBenchmark(argv) { } const subscriberName = `subscriber#${clientId}`; - const slot = clusterKeySlot(channels[0]); - const client = slotClientMap.get(slot); + let client; + + // For sharded pub/sub in cluster mode, we need standalone clients for subscribers + // because ioredis Cluster clients don't support SSUBSCRIBE properly + if (argv.mode.startsWith('s') && argv['oss-cluster-api-distribute-subscribers']) { + const slot = clusterKeySlot(channels[0]); + + // Get node info from the first cluster client + const nodeKeys = cluster.slots[slot]; + if (!nodeKeys || nodeKeys.length === 0) { + console.error(`No node found for slot ${slot} (channel: ${channels[0]})`); + process.exit(1); + } + + const masterKey = nodeKeys[0]; + const [host, port] = masterKey.split(':'); + + // Create a standalone Redis client connected directly to the node + client = new Redis({ + host, + port: parseInt(port), + username: argv.user || undefined, + password: argv.a || undefined, + connectTimeout: argv['redis-timeout'], + commandTimeout: argv['redis-timeout'], + maxRetriesPerRequest: 1, + enableReadyCheck: true, + lazyConnect: false + }); + + client.setMaxListeners(0); + + // Wait for client to be ready + await new Promise((resolve, reject) => { + if (client.status === 'ready') { + resolve(); + } else { + client.once('ready', resolve); + client.once('error', reject); + } + }); + + if (argv.verbose) { + console.log(`Subscriber ${clientId}: channel=${channels[0]}, slot=${slot}, node=${host}:${port}`); + } + } else { + // Standalone mode - use first client + client = clients[0]; + } const reconnectInterval = randomInt( argv['min-reconnect-interval'], @@ -241,7 +277,7 @@ async function runBenchmark(argv) { console.log(`Reconnect interval for ${subscriberName}: ${reconnectInterval}ms`); } - const skipDuplicate = argv.mode.startsWith('s') && argv['oss-cluster-api-distribute-subscribers']; + const skipDuplicate = true; // Don't duplicate - we already created a dedicated client promises.push( subscriberRoutine( From d3f746c5024fa4ff55257dc8f413aa2690a0ed1e Mon Sep 17 00:00:00 2001 From: fcostaoliveira Date: Tue, 9 Dec 2025 23:35:49 +0000 Subject: [PATCH 17/17] Added --rps to spublish --- js/ioredis/lib/config.js | 1 + js/ioredis/lib/redisManager.js | 27 +++++++++++++++++++++++---- js/ioredis/package-lock.json | 6 ++++++ js/ioredis/package.json | 3 ++- 4 files changed, 32 insertions(+), 5 deletions(-) diff --git a/js/ioredis/lib/config.js b/js/ioredis/lib/config.js index 50d1c87..a4fa612 100644 --- a/js/ioredis/lib/config.js +++ b/js/ioredis/lib/config.js @@ -36,6 +36,7 @@ function parseArgs() { .option('measure-rtt-latency', { type: 'boolean', default: false }) .option('redis-timeout', { default: 120000 }) .option('pool-size', { default: 0 }) + .option('rps', { description: 'Target requests per second (RPS) for publishers', default: 0 }) .help().argv; } diff --git a/js/ioredis/lib/redisManager.js b/js/ioredis/lib/redisManager.js index 5105328..4106c72 100644 --- a/js/ioredis/lib/redisManager.js +++ b/js/ioredis/lib/redisManager.js @@ -4,6 +4,7 @@ const { publisherRoutine } = require('./publisher'); const { subscriberRoutine } = require('./subscriber'); const { updateCLI, writeFinalResults, createRttHistogram, RttAccumulator } = require('./metrics'); const seedrandom = require('seedrandom'); +const { RateLimiter } = require('limiter'); async function runBenchmark(argv) { console.log(`pubsub-sub-bench (JavaScript version)`); @@ -158,7 +159,13 @@ async function runBenchmark(argv) { // Run publishers totalPublishersRef.value = argv.clients; console.log(`Starting ${argv.clients} publishers in ${argv.mode} mode`); - + + // Log rate limiting if RPS is specified + if (argv.rps > 0) { + const rpsPerPublisher = argv.rps / argv.clients; + console.log(`Rate limiting enabled: Target ${argv.rps} RPS total (${rpsPerPublisher.toFixed(2)} RPS per publisher)`); + } + for (let clientId = 1; clientId <= argv.clients; clientId++) { const channels = []; const numChannels = pickChannelCount(argv); @@ -183,6 +190,18 @@ async function runBenchmark(argv) { const skipDuplicate = true; // Don't duplicate cluster clients + // Create a rate limiter for this publisher if RPS is specified + let publisherRateLimiter = null; + if (argv.rps > 0) { + const rpsPerPublisher = argv.rps / argv.clients; + // RateLimiter takes tokensPerInterval and interval + // For RPS, we want rpsPerPublisher tokens per second (1000ms) + publisherRateLimiter = new RateLimiter({ + tokensPerInterval: rpsPerPublisher, + interval: 'second' + }); + } + promises.push( publisherRoutine( publisherName, @@ -194,13 +213,13 @@ async function runBenchmark(argv) { client, isRunningRef, totalMessagesRef, - null, // rateLimiter + publisherRateLimiter, skipDuplicate ) ); - + totalConnectsRef.value++; - + if (clientId % 100 === 0) { console.log(`Created ${clientId} publishers so far.`); } diff --git a/js/ioredis/package-lock.json b/js/ioredis/package-lock.json index 7419264..884e8d3 100644 --- a/js/ioredis/package-lock.json +++ b/js/ioredis/package-lock.json @@ -12,6 +12,7 @@ "cluster-key-slot": "^1.1.0", "hdr-histogram-js": "^2.0.1", "ioredis": "4.30.2-v4-beta.1", + "limiter": "^3.0.0", "seedrandom": "^3.0.5", "yargs": "^17.7.2" }, @@ -561,6 +562,11 @@ "url": "https://github.com/sponsors/antonk52" } }, + "node_modules/limiter": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/limiter/-/limiter-3.0.0.tgz", + "integrity": "sha512-hev7DuXojsTFl2YwyzUJMDnZ/qBDd3yZQLSH3aD4tdL1cqfc3TMnoecEJtWFaQFdErZsKoFMBTxF/FBSkgDbEg==" + }, "node_modules/lint-staged": { "version": "15.5.1", "resolved": "https://registry.npmjs.org/lint-staged/-/lint-staged-15.5.1.tgz", diff --git a/js/ioredis/package.json b/js/ioredis/package.json index 0007537..322034e 100644 --- a/js/ioredis/package.json +++ b/js/ioredis/package.json @@ -7,6 +7,7 @@ "cluster-key-slot": "^1.1.0", "hdr-histogram-js": "^2.0.1", "ioredis": "4.30.2-v4-beta.1", + "limiter": "^3.0.0", "seedrandom": "^3.0.5", "yargs": "^17.7.2" }, @@ -21,4 +22,4 @@ "lint-staged": { "js/**/*.js": "prettier --write" } -} \ No newline at end of file +}