Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/sdam/server_selection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ export function sameServerSelector(description?: ServerDescription): ServerSelec
* server potentially for a write on a secondary.
*/
export function secondaryWritableServerSelector(
wireVersion?: number,
wireVersion?: number | null,
readPreference?: ReadPreference
): ServerSelector {
// If server version < 5.0, read preference always primary.
Expand Down
2 changes: 1 addition & 1 deletion src/sdam/topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -771,7 +771,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
return result;
}

get commonWireVersion(): number | undefined {
get commonWireVersion(): number | null {
return this.description.commonWireVersion;
}

Expand Down
4 changes: 2 additions & 2 deletions src/sdam/topology_description.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export class TopologyDescription {
logicalSessionTimeoutMinutes: number | null;
heartbeatFrequencyMS: number;
localThresholdMS: number;
commonWireVersion: number;
commonWireVersion: number | null;
/**
* Create a TopologyDescription
*/
Expand All @@ -66,7 +66,7 @@ export class TopologyDescription {
this.setName = setName ?? null;
this.maxElectionId = maxElectionId ?? null;
this.maxSetVersion = maxSetVersion ?? null;
this.commonWireVersion = commonWireVersion ?? 0;
this.commonWireVersion = commonWireVersion ?? null;

// determine server compatibility
for (const serverDescription of this.servers.values()) {
Expand Down
69 changes: 69 additions & 0 deletions test/integration/crud/aggregation.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -870,4 +870,73 @@ describe('Aggregation', function () {
.finally(() => client.close());
}
});

it(
'should perform aggregations with a write stage on secondary when readPreference is secondary',
{
metadata: { requires: { topology: 'replicaset' } },
async test() {
const databaseName = this.configuration.db;
const client = this.configuration.newClient(this.configuration.writeConcernMax(), {
maxPoolSize: 1,
monitorCommands: true
});

const events = [];
client.on('commandStarted', filterForCommands(['hello', 'aggregate'], events));

// Discover primary to be able to check the actual server address
await client.db('admin').command({ hello: 1 });
const [helloEvent] = events;
const primaryAddress = helloEvent.address;

// Clear events
events.length = 0;

const src = client.db(databaseName).collection('read_pref_src');
const outMerge = client.db(databaseName).collection('read_pref_merge_out');
const outOut = client.db(databaseName).collection('read_pref_out_out');

await Promise.all([src.deleteMany({}), outMerge.deleteMany({}), outOut.deleteMany({})]);
await src.insertMany([{ a: 1 }, { a: 2 }]);
await Promise.all([
src
.aggregate(
[
{
$merge: {
into: 'read_pref_merge_out',
whenMatched: 'replace',
whenNotMatched: 'insert'
}
}
],
{ readPreference: 'secondary' }
)
.toArray(),
src
.aggregate(
[
{
$out: 'read_pref_out_out'
}
],
{ readPreference: 'secondary' }
)
.toArray()
]);

expect(events).to.have.length(2);
events.forEach(event => {
expect(event).to.have.property('commandName', 'aggregate');
expect(event.address).to.not.equal(primaryAddress);
expect(event).to.have.deep.nested.property('command.$readPreference', {
mode: 'secondary'
});
});

await client.close();
}
}
);
});
63 changes: 63 additions & 0 deletions test/unit/sdam/topology_description.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import { expect } from 'chai';

import { TopologyType } from '../../../src/sdam/common';
import { ServerDescription } from '../../../src/sdam/server_description';
import { TopologyDescription } from '../../../src/sdam/topology_description';

describe('TopologyDescription', function () {
describe('#constructor', function () {
it('sets commonWireVersion to null', function () {
const initial = new TopologyDescription(TopologyType.ReplicaSetWithPrimary);

expect(initial.commonWireVersion).to.equal(null);
});
});

describe('update()', function () {
it('initializes commonWireVersion from first non-zero maxWireVersion', function () {
const initial = new TopologyDescription(TopologyType.ReplicaSetWithPrimary);

const sd1 = new ServerDescription('a:27017', {
maxWireVersion: 25
});

const updated = initial.update(sd1);

expect(updated.commonWireVersion).to.equal(25);
});

it('tracks the minimum non-zero maxWireVersion across updates in commonWireVersion', function () {
const initial = new TopologyDescription(TopologyType.ReplicaSetWithPrimary);

const sd1 = new ServerDescription('a:27017', {
maxWireVersion: 25
});

const sd2 = new ServerDescription('b:27017', {
maxWireVersion: 21
});

let updated = initial.update(sd1);
updated = updated.update(sd2);

expect(updated.commonWireVersion).to.equal(21);
});

it('ignores servers with maxWireVersion === 0 when computing commonWireVersion', function () {
const initial = new TopologyDescription(TopologyType.ReplicaSetWithPrimary);

const sd1 = new ServerDescription('a:27017', {
maxWireVersion: 25
});

const sdUnknown = new ServerDescription('b:27017', {
maxWireVersion: 0
});

let updated = initial.update(sd1);
updated = updated.update(sdUnknown);

expect(updated.commonWireVersion).to.equal(25);
});
});
});