Skip to content

Commit 7648abb

Browse files
committed
add slotnumber to commands
1 parent e186a13 commit 7648abb

File tree

3 files changed

+32
-10
lines changed

3 files changed

+32
-10
lines changed

packages/client/lib/client/commands-queue.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@ export interface CommandOptions<T = TypeMapping> {
1919
* Timeout for the command in milliseconds
2020
*/
2121
timeout?: number;
22+
/**
23+
* @internal
24+
* The slot the command is targeted to (if any)
25+
*/
26+
slotNumber?: number;
2227
}
2328

2429
export interface CommandToWrite extends CommandWaitingForReply {
@@ -33,6 +38,7 @@ export interface CommandToWrite extends CommandWaitingForReply {
3338
listener: () => unknown;
3439
originalTimeout: number | undefined;
3540
} | undefined;
41+
slotNumber?: number
3642
}
3743

3844
interface CommandWaitingForReply {
@@ -219,6 +225,7 @@ export default class RedisCommandsQueue {
219225
channelsCounter: undefined,
220226
typeMapping: options?.typeMapping
221227
};
228+
value.slotNumber = options?.slotNumber
222229

223230
// If #maintenanceCommandTimeout was explicitly set, we should
224231
// use it instead of the timeout provided by the command

packages/client/lib/cluster/cluster-slots.ts

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -453,7 +453,7 @@ export default class RedisClusterSlots<
453453
nodeClient(node: ShardNode<M, F, S, RESP, TYPE_MAPPING>) {
454454
return (
455455
node.connectPromise ?? // if the node is connecting
456-
node.client ?? // if the node is connected
456+
(node.client ? Promise.resolve(node.client) : undefined) ?? // if the node is connected
457457
this.#createNodeClient(node) // if the not is disconnected
458458
);
459459
}
@@ -547,20 +547,30 @@ export default class RedisClusterSlots<
547547
this.#emit('disconnect');
548548
}
549549

550-
getClient(
550+
async getClientAndSlotNumber(
551551
firstKey: RedisArgument | undefined,
552552
isReadonly: boolean | undefined
553-
) {
553+
): Promise<{
554+
client: RedisClientType<M, F, S, RESP, TYPE_MAPPING>,
555+
slotNumber?: number
556+
}> {
554557
if (!firstKey) {
555-
return this.nodeClient(this.getRandomNode());
558+
return {
559+
client: await this.nodeClient(this.getRandomNode())
560+
};
556561
}
557562

558563
const slotNumber = calculateSlot(firstKey);
559564
if (!isReadonly) {
560-
return this.nodeClient(this.slots[slotNumber].master);
565+
return {
566+
client: await this.nodeClient(this.slots[slotNumber].master),
567+
slotNumber
568+
};
561569
}
562570

563-
return this.nodeClient(this.getSlotRandomNode(slotNumber));
571+
return {
572+
client: await this.nodeClient(this.getSlotRandomNode(slotNumber))
573+
};
564574
}
565575

566576
*#iterateAllNodes() {

packages/client/lib/cluster/index.ts

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -416,13 +416,16 @@ export default class RedisCluster<
416416
fn: (client: RedisClientType<M, F, S, RESP, TYPE_MAPPING>, opts?: ClusterCommandOptions) => Promise<T>
417417
): Promise<T> {
418418
const maxCommandRedirections = this._options.maxCommandRedirections ?? 16;
419-
let client = await this._slots.getClient(firstKey, isReadonly);
419+
let { client, slotNumber } = await this._slots.getClientAndSlotNumber(firstKey, isReadonly);
420420
let i = 0;
421421

422422
let myFn = fn;
423423

424424
while (true) {
425425
try {
426+
if(options !== undefined) {
427+
options.slotNumber = slotNumber;
428+
}
426429
return await myFn(client, options);
427430
} catch (err) {
428431
myFn = fn;
@@ -451,7 +454,9 @@ export default class RedisCluster<
451454

452455
if (err.message.startsWith('MOVED')) {
453456
await this._slots.rediscover(client);
454-
client = await this._slots.getClient(firstKey, isReadonly);
457+
const clientAndSlot = await this._slots.getClientAndSlotNumber(firstKey, isReadonly);
458+
client = clientAndSlot.client;
459+
slotNumber = clientAndSlot.slotNumber;
455460
continue;
456461
}
457462

@@ -485,11 +490,11 @@ export default class RedisCluster<
485490
type Multi = new (...args: ConstructorParameters<typeof RedisClusterMultiCommand>) => RedisClusterMultiCommandType<[], M, F, S, RESP, TYPE_MAPPING>;
486491
return new ((this as any).Multi as Multi)(
487492
async (firstKey, isReadonly, commands) => {
488-
const client = await this._self._slots.getClient(firstKey, isReadonly);
493+
const { client } = await this._self._slots.getClientAndSlotNumber(firstKey, isReadonly);
489494
return client._executeMulti(commands);
490495
},
491496
async (firstKey, isReadonly, commands) => {
492-
const client = await this._self._slots.getClient(firstKey, isReadonly);
497+
const { client } = await this._self._slots.getClientAndSlotNumber(firstKey, isReadonly);
493498
return client._executePipeline(commands);
494499
},
495500
routing,

0 commit comments

Comments
 (0)