Skip to content

Commit e186a13

Browse files
committed
deduplicate notifications based on sequence id
1 parent 4ec6771 commit e186a13

File tree

2 files changed

+8
-0
lines changed

2 files changed

+8
-0
lines changed

packages/client/lib/client/enterprise-maintenance-manager.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ type RedisType = RedisClient<any, any, any, any, any>;
1212

1313
export const SMIGRATED_EVENT = "__SMIGRATED";
1414
export interface SMigratedEvent {
15+
seqId: number,
1516
source: { host: string, port: number };
1617
destination: { host: string, port: number };
1718
ranges: (number | [number, number])[]

packages/client/lib/cluster/cluster-slots.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ export default class RedisClusterSlots<
114114
readonly nodeByAddress = new Map<string, MasterNode<M, F, S, RESP, TYPE_MAPPING> | ShardNode<M, F, S, RESP, TYPE_MAPPING>>();
115115
pubSubNode?: PubSubNode<M, F, S, RESP, TYPE_MAPPING>;
116116
clientSideCache?: PooledClientSideCacheProvider;
117+
smigratedSeqIdsSeen = new Set<number>;
117118

118119
#isOpen = false;
119120

@@ -256,6 +257,12 @@ export default class RedisClusterSlots<
256257
#handleSmigrated = async (event: SMigratedEvent) => {
257258
dbgMaintenance(`[CSlots]: handle smigrated`, event);
258259

260+
if(this.smigratedSeqIdsSeen.has(event.seqId)) {
261+
dbgMaintenance(`[CSlots]: sequence id ${event.seqId} already seen, abort`)
262+
return
263+
}
264+
this.smigratedSeqIdsSeen.add(event.seqId);
265+
259266
// slots = new Array<Shard<M, F, S, RESP, TYPE_MAPPING>>(RedisClusterSlots.#SLOTS);
260267
// masters = new Array<MasterNode<M, F, S, RESP, TYPE_MAPPING>>();
261268
// replicas = new Array<ShardNode<M, F, S, RESP, TYPE_MAPPING>>();

0 commit comments

Comments
 (0)