Skip to content

Commit 8b10a1f

Browse files
committed
deduplicate smigrated notifications
1 parent 49328c1 commit 8b10a1f

File tree

2 files changed

+24
-6
lines changed

2 files changed

+24
-6
lines changed

packages/client/lib/client/enterprise-maintenance-manager.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ type RedisType = RedisClient<any, any, any, any, any>;
1212

1313
export const SMIGRATED_EVENT = "__SMIGRATED";
1414
export interface SMigratedEvent {
15+
seqId: number,
1516
source: { host: string, port: number };
1617
destination: { host: string, port: number };
1718
ranges: (number | [number, number])[]

packages/client/lib/cluster/cluster-slots.ts

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ export default class RedisClusterSlots<
114114
readonly nodeByAddress = new Map<string, MasterNode<M, F, S, RESP, TYPE_MAPPING> | ShardNode<M, F, S, RESP, TYPE_MAPPING>>();
115115
pubSubNode?: PubSubNode<M, F, S, RESP, TYPE_MAPPING>;
116116
clientSideCache?: PooledClientSideCacheProvider;
117+
smigratedSeqIdsSeen = new Set<number>;
117118

118119
#isOpen = false;
119120

@@ -256,6 +257,12 @@ export default class RedisClusterSlots<
256257
#handleSmigrated = async (event: SMigratedEvent) => {
257258
dbgMaintenance(`[CSlots]: handle smigrated`, event);
258259

260+
if(this.smigratedSeqIdsSeen.has(event.seqId)) {
261+
dbgMaintenance(`[CSlots]: sequence id ${event.seqId} already seen, abort`)
262+
return
263+
}
264+
this.smigratedSeqIdsSeen.add(event.seqId);
265+
259266
// slots = new Array<Shard<M, F, S, RESP, TYPE_MAPPING>>(RedisClusterSlots.#SLOTS);
260267
// masters = new Array<MasterNode<M, F, S, RESP, TYPE_MAPPING>>();
261268
// replicas = new Array<ShardNode<M, F, S, RESP, TYPE_MAPPING>>();
@@ -446,7 +453,7 @@ export default class RedisClusterSlots<
446453
nodeClient(node: ShardNode<M, F, S, RESP, TYPE_MAPPING>) {
447454
return (
448455
node.connectPromise ?? // if the node is connecting
449-
node.client ?? // if the node is connected
456+
(node.client ? Promise.resolve(node.client) : undefined) ?? // if the node is connected
450457
this.#createNodeClient(node) // if the not is disconnected
451458
);
452459
}
@@ -540,20 +547,30 @@ export default class RedisClusterSlots<
540547
this.#emit('disconnect');
541548
}
542549

543-
getClient(
550+
async getClientAndSlotNumber(
544551
firstKey: RedisArgument | undefined,
545552
isReadonly: boolean | undefined
546-
) {
553+
): Promise<{
554+
client: RedisClientType<M, F, S, RESP, TYPE_MAPPING>,
555+
slotNumber?: number
556+
}> {
547557
if (!firstKey) {
548-
return this.nodeClient(this.getRandomNode());
558+
return {
559+
client: await this.nodeClient(this.getRandomNode())
560+
};
549561
}
550562

551563
const slotNumber = calculateSlot(firstKey);
552564
if (!isReadonly) {
553-
return this.nodeClient(this.slots[slotNumber].master);
565+
return {
566+
client: await this.nodeClient(this.slots[slotNumber].master),
567+
slotNumber
568+
};
554569
}
555570

556-
return this.nodeClient(this.getSlotRandomNode(slotNumber));
571+
return {
572+
client: await this.nodeClient(this.getSlotRandomNode(slotNumber))
573+
};
557574
}
558575

559576
*#iterateAllNodes() {

0 commit comments

Comments
 (0)