Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/sdam/server_selection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ export function sameServerSelector(description?: ServerDescription): ServerSelec
* server potentially for a write on a secondary.
*/
export function secondaryWritableServerSelector(
wireVersion?: number,
wireVersion: number | null,
readPreference?: ReadPreference
): ServerSelector {
// If server version < 5.0, read preference always primary.
Expand Down
2 changes: 1 addition & 1 deletion src/sdam/topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -771,7 +771,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
return result;
}

get commonWireVersion(): number | undefined {
get commonWireVersion(): number | null {
return this.description.commonWireVersion;
}

Expand Down
4 changes: 2 additions & 2 deletions src/sdam/topology_description.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export class TopologyDescription {
logicalSessionTimeoutMinutes: number | null;
heartbeatFrequencyMS: number;
localThresholdMS: number;
commonWireVersion: number;
commonWireVersion: number | null;
/**
* Create a TopologyDescription
*/
Expand All @@ -66,7 +66,7 @@ export class TopologyDescription {
this.setName = setName ?? null;
this.maxElectionId = maxElectionId ?? null;
this.maxSetVersion = maxSetVersion ?? null;
this.commonWireVersion = commonWireVersion ?? 0;
this.commonWireVersion = commonWireVersion ?? null;

// determine server compatibility
for (const serverDescription of this.servers.values()) {
Expand Down
69 changes: 69 additions & 0 deletions test/integration/crud/aggregation.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -870,4 +870,73 @@ describe('Aggregation', function () {
.finally(() => client.close());
}
});

it(
'should perform aggregations with a write stage on secondary when readPreference is secondary',
{
metadata: { requires: { topology: 'replicaset' } },
async test() {
const databaseName = this.configuration.db;
const client = this.configuration.newClient(this.configuration.writeConcernMax(), {
maxPoolSize: 1,
monitorCommands: true
});

const events = [];
client.on('commandStarted', filterForCommands(['hello', 'aggregate'], events));

// Discover primary to be able to check the actual server address
await client.db('admin').command({ hello: 1 });
const [helloEvent] = events;
const primaryAddress = helloEvent.address;

// Clear events
events.length = 0;

const src = client.db(databaseName).collection('read_pref_src');
const outMerge = client.db(databaseName).collection('read_pref_merge_out');
const outOut = client.db(databaseName).collection('read_pref_out_out');

await Promise.all([src.deleteMany({}), outMerge.deleteMany({}), outOut.deleteMany({})]);
await src.insertMany([{ a: 1 }, { a: 2 }]);
await Promise.all([
src
.aggregate(
[
{
$merge: {
into: 'read_pref_merge_out',
whenMatched: 'replace',
whenNotMatched: 'insert'
}
}
],
{ readPreference: 'secondary' }
)
.toArray(),
src
.aggregate(
[
{
$out: 'read_pref_out_out'
}
],
{ readPreference: 'secondary' }
)
.toArray()
]);

expect(events).to.have.length(2);
events.forEach(event => {
expect(event).to.have.property('commandName', 'aggregate');
expect(event.address).to.not.equal(primaryAddress);
expect(event).to.have.deep.nested.property('command.$readPreference', {
mode: 'secondary'
});
});

await client.close();
}
}
);
});
55 changes: 2 additions & 53 deletions test/unit/sdam/server_selection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -345,23 +345,6 @@ describe('server selection', function () {
});
});
});

context('when a common wire version is not provided', function () {
const topologyDescription = new TopologyDescription(
TopologyType.Sharded,
serverDescriptions,
'test',
MIN_SECONDARY_WRITE_WIRE_VERSION,
new ObjectId(),
MIN_SECONDARY_WRITE_WIRE_VERSION
);
const selector = secondaryWritableServerSelector();
const servers = selector(topologyDescription, Array.from(serverDescriptions.values()));

it('selects a mongos', function () {
expect(servers).to.deep.equal([mongos]);
});
});
});

context('when the topology is load balanced', function () {
Expand Down Expand Up @@ -431,23 +414,6 @@ describe('server selection', function () {
});
});
});

context('when a common wire version is not provided', function () {
const topologyDescription = new TopologyDescription(
TopologyType.LoadBalanced,
serverDescriptions,
'test',
MIN_SECONDARY_WRITE_WIRE_VERSION,
new ObjectId(),
MIN_SECONDARY_WRITE_WIRE_VERSION
);
const selector = secondaryWritableServerSelector();
const servers = selector(topologyDescription, Array.from(serverDescriptions.values()));

it('selects a load balancer', function () {
expect(servers).to.deep.equal([loadBalancer]);
});
});
});

context('when the topology is single', function () {
Expand Down Expand Up @@ -517,23 +483,6 @@ describe('server selection', function () {
});
});
});

context('when a common wire version is not provided', function () {
const topologyDescription = new TopologyDescription(
TopologyType.Single,
serverDescriptions,
'test',
MIN_SECONDARY_WRITE_WIRE_VERSION,
new ObjectId(),
MIN_SECONDARY_WRITE_WIRE_VERSION
);
const selector = secondaryWritableServerSelector();
const servers = selector(topologyDescription, Array.from(serverDescriptions.values()));

it('selects a standalone', function () {
expect(servers).to.deep.equal([single]);
});
});
});

context('localThresholdMS is respected as an option', function () {
Expand Down Expand Up @@ -580,7 +529,7 @@ describe('server selection', function () {
new ObjectId(),
MIN_SECONDARY_WRITE_WIRE_VERSION
);
const selector = secondaryWritableServerSelector();
const selector = secondaryWritableServerSelector(MIN_SECONDARY_WRITE_WIRE_VERSION);
const servers = selector(topologyDescription, Array.from(serverDescriptions.values()));
expect(servers).to.have.lengthOf(2);
const selectedAddresses = new Set(servers.map(({ address }) => address));
Expand All @@ -599,7 +548,7 @@ describe('server selection', function () {
MIN_SECONDARY_WRITE_WIRE_VERSION,
{ localThresholdMS: 5 }
);
const selector = secondaryWritableServerSelector();
const selector = secondaryWritableServerSelector(MIN_SECONDARY_WRITE_WIRE_VERSION);
const servers = selector(topologyDescription, Array.from(serverDescriptions.values()));
expect(servers).to.have.lengthOf(1);
const selectedAddresses = new Set(servers.map(({ address }) => address));
Expand Down
63 changes: 63 additions & 0 deletions test/unit/sdam/topology_description.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import { expect } from 'chai';

import { TopologyType } from '../../../src/sdam/common';
import { ServerDescription } from '../../../src/sdam/server_description';
import { TopologyDescription } from '../../../src/sdam/topology_description';

describe('TopologyDescription', function () {
describe('#constructor', function () {
it('sets commonWireVersion to null', function () {
const initial = new TopologyDescription(TopologyType.ReplicaSetWithPrimary);

expect(initial.commonWireVersion).to.equal(null);
});
});

describe('update()', function () {
it('initializes commonWireVersion from first non-zero maxWireVersion', function () {
const initial = new TopologyDescription(TopologyType.ReplicaSetWithPrimary);

const sd1 = new ServerDescription('a:27017', {
maxWireVersion: 25
});

const updated = initial.update(sd1);

expect(updated.commonWireVersion).to.equal(25);
});

it('tracks the minimum non-zero maxWireVersion across updates in commonWireVersion', function () {
const initial = new TopologyDescription(TopologyType.ReplicaSetWithPrimary);

const sd1 = new ServerDescription('a:27017', {
maxWireVersion: 25
});

const sd2 = new ServerDescription('b:27017', {
maxWireVersion: 21
});

let updated = initial.update(sd1);
updated = updated.update(sd2);

expect(updated.commonWireVersion).to.equal(21);
});

it('ignores servers with maxWireVersion === 0 when computing commonWireVersion', function () {
const initial = new TopologyDescription(TopologyType.ReplicaSetWithPrimary);

const sd1 = new ServerDescription('a:27017', {
maxWireVersion: 25
});

const sdUnknown = new ServerDescription('b:27017', {
maxWireVersion: 0
});

let updated = initial.update(sd1);
updated = updated.update(sdUnknown);

expect(updated.commonWireVersion).to.equal(25);
});
});
});