Skip to content

Commit a49e6ee

Browse files
authored
SWIFT-674 Make MongoSwift.ClientSession async, and implement MongoSwiftSync.ClientSession (#391)
1 parent 0059a7a commit a49e6ee

16 files changed

+478
-281
lines changed

.swiftlint.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ opt_in_rules:
2525
- missing_docs
2626
- modifier_order
2727
- multiline_arguments
28-
- multiline_function_chains
2928
- multiline_literal_brackets
3029
- multiline_parameters
3130
- operator_usage_whitespace

Package.resolved

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Sources/MongoSwift/ClientSession.swift

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import CLibMongoC
22
import Foundation
3+
import NIO
34

45
/**
56
* A MongoDB client session.
@@ -111,23 +112,23 @@ public final class ClientSession {
111112
public let options: ClientSessionOptions?
112113

113114
/// Initializes a new client session.
114-
internal init(client: MongoClient, options: ClientSessionOptions? = nil) throws {
115+
internal init(client: MongoClient, options: ClientSessionOptions? = nil) {
115116
self.options = options
116117
self.client = client
117118
self.state = .notStarted(opTime: nil, clusterTime: nil)
118119
}
119120

120121
/// Starts this session's corresponding libmongoc session, if it has not been started already. Throws an error if
121122
/// this session has already been ended.
122-
internal func startIfNeeded() throws {
123+
internal func startIfNeeded() -> EventLoopFuture<Void> {
123124
switch self.state {
124125
case .notStarted:
125126
let operation = StartSessionOperation(session: self)
126-
try self.client.executeOperation(operation)
127+
return self.client.operationExecutor.execute(operation, client: self.client, session: nil)
127128
case .started:
128-
return
129+
return self.client.operationExecutor.makeSucceededFuture(Void())
129130
case .ended:
130-
throw ClientSession.SessionInactiveError
131+
return self.client.operationExecutor.makeFailedFuture(ClientSession.SessionInactiveError)
131132
}
132133
}
133134

@@ -143,18 +144,28 @@ public final class ClientSession {
143144
return connection
144145
}
145146

146-
/// Destroy the underlying `mongoc_client_session_t` and ends this session. Has no effect if this session is
147-
/// already ended.
148-
internal func end() {
149-
if case let .started(session, _) = self.state {
150-
mongoc_client_session_destroy(session)
147+
/// Ends this `ClientSession`. Call this method when you are finished using the session. You must ensure that all
148+
/// operations using this session have completed before calling this. The returned future must be fulfilled before
149+
/// this session's parent `MongoClient` is closed.
150+
public func end() -> EventLoopFuture<Void> {
151+
switch self.state {
152+
case .notStarted, .ended:
153+
self.state = .ended
154+
return self.client.operationExecutor.makeSucceededFuture(Void())
155+
case let .started(session, _):
156+
return self.client.operationExecutor.execute {
157+
mongoc_client_session_destroy(session)
158+
self.state = .ended
159+
}
151160
}
152-
self.state = .ended
153161
}
154162

155163
/// Cleans up internal state.
156164
deinit {
157-
self.end()
165+
guard case .ended = self.state else {
166+
assertionFailure("ClientSession was not ended before going out of scope; please call ClientSession.end()")
167+
return
168+
}
158169
}
159170

160171
/**

Sources/MongoSwift/MongoClient.swift

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,7 @@ public class MongoClient {
292292

293293
/// Closes this `MongoClient`. Call this method exactly once when you are finished using the client. You must
294294
/// ensure that all operations using the client have completed before calling this. The returned future must be
295-
/// fulfilled before the client goes out of scope.
295+
/// fulfilled before the `EventLoopGroup` provided to this client's constructor is shut down.
296296
public func close() -> EventLoopFuture<Void> {
297297
return self.operationExecutor.execute {
298298
self.connectionPool.close()
@@ -303,14 +303,9 @@ public class MongoClient {
303303
}
304304
}
305305

306-
/**
307-
* Starts a new `ClientSession` with the provided options.
308-
*
309-
* - Throws:
310-
* - `RuntimeError.compatibilityError` if the deployment does not support sessions.
311-
*/
312-
public func startSession(options: ClientSessionOptions? = nil) throws -> ClientSession {
313-
return try ClientSession(client: self, options: options)
306+
/// Starts a new `ClientSession` with the provided options.
307+
public func startSession(options: ClientSessionOptions? = nil) -> ClientSession {
308+
return ClientSession(client: self, options: options)
314309
}
315310

316311
/**
@@ -322,11 +317,30 @@ public class MongoClient {
322317
*/
323318
public func withSession<T>(
324319
options: ClientSessionOptions? = nil,
325-
_ sessionBody: (ClientSession) throws -> T
326-
) throws -> T {
327-
let session = try ClientSession(client: self, options: options)
328-
defer { session.end() }
329-
return try sessionBody(session)
320+
_ sessionBody: (ClientSession) throws -> EventLoopFuture<T>
321+
) -> EventLoopFuture<T> {
322+
let promise = self.operationExecutor.makePromise(of: T.self)
323+
let session = self.startSession(options: options)
324+
do {
325+
let bodyFuture = try sessionBody(session)
326+
// regardless of whether body's returned future succeeds we want to call session.end() once its complete.
327+
// only once session.end() finishes can we fulfill the returned promise. otherwise the user can't tell if
328+
// it is safe to close the parent client of this session, and they could inadvertently close it before the
329+
// session is actually ended and its parent `mongoc_client_t` is returned to the pool.
330+
bodyFuture.flatMap { _ in
331+
session.end()
332+
}.flatMapError { _ in
333+
session.end()
334+
}.whenComplete { _ in
335+
promise.completeWith(bodyFuture)
336+
}
337+
} catch {
338+
session.end().whenComplete { _ in
339+
promise.fail(error)
340+
}
341+
}
342+
343+
return promise.futureResult
330344
}
331345

332346
/**

Sources/MongoSwift/Operations/Operation.swift

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -47,34 +47,46 @@ internal class OperationExecutor {
4747
return self.makeFailedFuture(MongoClient.ClosedClientError)
4848
}
4949

50+
let doOperation = { () -> T.OperationResult in
51+
// select a connection in following order of priority:
52+
// 1. connection specifically provided for use with this operation
53+
// 2. if a session was provided, use its underlying connection
54+
// 3. a new connection from the pool
55+
let connection = try connection ?? resolveConnection(client: client, session: session)
56+
return try operation.execute(using: connection, session: session)
57+
}
58+
5059
if let session = session {
5160
if case .ended = session.state {
5261
return self.makeFailedFuture(ClientSession.SessionInactiveError)
5362
}
5463
guard session.client == client else {
5564
return self.makeFailedFuture(ClientSession.ClientMismatchError)
5665
}
57-
}
5866

59-
return self.execute {
60-
// if a session was provided, start it if it hasn't been started already.
61-
try session?.startIfNeeded()
62-
// select a connection in following order of priority:
63-
// 1. connection specifically provided for use with this operation
64-
// 2. if a session was provided, use its underlying connection
65-
// 3. a new connection from the pool
66-
let connection = try connection ?? resolveConnection(client: client, session: session)
67-
return try operation.execute(using: connection, session: session)
67+
// start the session if needed (which generates a new operation itself), and then execute the operation.
68+
return session.startIfNeeded().flatMap { self.execute(doOperation) }
6869
}
70+
71+
// no session was provided, so we can just jump to executing the operation.
72+
return self.execute(doOperation)
6973
}
7074

71-
internal func execute<T>(body: @escaping () throws -> T) -> EventLoopFuture<T> {
75+
internal func execute<T>(_ body: @escaping () throws -> T) -> EventLoopFuture<T> {
7276
return self.threadPool.runIfActive(eventLoop: self.eventLoopGroup.next(), body)
7377
}
7478

7579
internal func makeFailedFuture<T>(_ error: Error) -> EventLoopFuture<T> {
7680
return self.eventLoopGroup.next().makeFailedFuture(error)
7781
}
82+
83+
internal func makeSucceededFuture<T>(_ value: T) -> EventLoopFuture<T> {
84+
return self.eventLoopGroup.next().makeSucceededFuture(value)
85+
}
86+
87+
internal func makePromise<T>(of type: T.Type) -> EventLoopPromise<T> {
88+
return self.eventLoopGroup.next().makePromise(of: type)
89+
}
7890
}
7991

8092
/// Given a client and optionally a session associated which are to be associated with an operation, returns a

Sources/MongoSwiftSync/ClientSession.swift

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -37,29 +37,40 @@ public final class ClientSession {
3737
/// The most recent cluster time seen by this session. This value will be nil if either of the following are true:
3838
/// - No operations have been executed using this session and `advanceClusterTime` has not been called.
3939
/// - This session has been ended.
40-
public var clusterTime: Document? {
41-
fatalError("unimplemented")
42-
}
40+
public var clusterTime: Document? { return self.asyncSession.clusterTime }
4341

4442
/// The operation time of the most recent operation performed using this session. This value will be nil if either
4543
/// of the following are true:
4644
/// - No operations have been performed using this session and `advanceOperationTime` has not been called.
4745
/// - This session has been ended.
48-
public var operationTime: Timestamp? {
49-
fatalError("unimplemented")
50-
}
46+
public var operationTime: Timestamp? { return self.asyncSession.operationTime }
5147

5248
/// The options used to start this session.
53-
public let options: ClientSessionOptions?
49+
public var options: ClientSessionOptions? { return self.asyncSession.options }
5450

5551
/// Initializes a new client session.
56-
internal init(wrapping session: MongoSwift.ClientSession) throws {
57-
fatalError("unimplemented")
52+
internal init(client: MongoClient, options: ClientSessionOptions?) {
53+
self.client = client
54+
self.asyncSession = client.asyncClient.startSession(options: options)
55+
}
56+
57+
/// Ends the underlying async session.
58+
internal func end() {
59+
// we only call this method from places that we can't throw (deinit, defers) so we handle the error here
60+
// instead. the async method will only fail if the async client, thread pool, or event loop group have been
61+
// closed/ended. we manage the lifetimes of all of those ourselves, so if we hit the assertionFailure it's due
62+
// to a bug in our own code.
63+
do {
64+
try self.asyncSession.end().wait()
65+
} catch {
66+
assertionFailure("Error ending async session: \(error)")
67+
}
5868
}
5969

6070
/// Cleans up internal state.
6171
deinit {
62-
fatalError("unimplemented")
72+
// a repeated call to `end` is a no-op so it's ok to call this even if `end()` was already called explicitly.
73+
self.end()
6374
}
6475

6576
/**
@@ -71,7 +82,7 @@ public final class ClientSession {
7182
* - clusterTime: The session's new cluster time, as a `Document` like `["cluster time": Timestamp(...)]`
7283
*/
7384
public func advanceClusterTime(to clusterTime: Document) {
74-
fatalError("unimplemented")
85+
self.asyncSession.advanceClusterTime(to: clusterTime)
7586
}
7687

7788
/**
@@ -83,6 +94,6 @@ public final class ClientSession {
8394
* - operationTime: The session's new operationTime
8495
*/
8596
public func advanceOperationTime(to operationTime: Timestamp) {
86-
fatalError("unimplemented")
97+
self.asyncSession.advanceOperationTime(to: operationTime)
8798
}
8899
}

Sources/MongoSwiftSync/MongoClient.swift

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public class MongoClient {
2222
private let eventLoopGroup: MultiThreadedEventLoopGroup
2323

2424
/// The underlying async client.
25-
private let asyncClient: MongoSwift.MongoClient
25+
internal let asyncClient: MongoSwift.MongoClient
2626

2727
/**
2828
* Create a new client connection to a MongoDB server. For options that included in both the connection string URI
@@ -65,14 +65,9 @@ public class MongoClient {
6565
}
6666
}
6767

68-
/**
69-
* Starts a new `ClientSession` with the provided options.
70-
*
71-
* - Throws:
72-
* - `RuntimeError.compatibilityError` if the deployment does not support sessions.
73-
*/
74-
public func startSession(options: ClientSessionOptions? = nil) throws -> ClientSession {
75-
fatalError("unimplemented")
68+
/// Starts a new `ClientSession` with the provided options.
69+
public func startSession(options: ClientSessionOptions? = nil) -> ClientSession {
70+
return ClientSession(client: self, options: options)
7671
}
7772

7873
/**
@@ -85,8 +80,10 @@ public class MongoClient {
8580
public func withSession<T>(
8681
options: ClientSessionOptions? = nil,
8782
_ sessionBody: (ClientSession) throws -> T
88-
) throws -> T {
89-
fatalError("unimplemented")
83+
) rethrows -> T {
84+
let session = self.startSession(options: options)
85+
defer { session.end() }
86+
return try sessionBody(session)
9087
}
9188

9289
/**
@@ -160,7 +157,7 @@ public class MongoClient {
160157
* - Returns: a `MongoDatabase` corresponding to the provided database name
161158
*/
162159
public func db(_ name: String, options: DatabaseOptions? = nil) -> MongoDatabase {
163-
return MongoDatabase(asyncDB: self.asyncClient.db(name, options: options))
160+
return MongoDatabase(client: self, asyncDB: self.asyncClient.db(name, options: options))
164161
}
165162

166163
/**

Sources/MongoSwiftSync/MongoCollection.swift

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,13 @@ public struct MongoCollection<T: Codable> {
3838
/// The underlying asynchronous collection.
3939
internal let asyncColl: MongoSwift.MongoCollection<T>
4040

41+
/// The client this collection was derived from. We store this to ensure it remains open for as long as this object
42+
/// is in scope.
43+
private let client: MongoClient
44+
4145
/// Initializes a new `MongoCollection` instance wrapping the provided async collection.
42-
internal init(asyncCollection: MongoSwift.MongoCollection<T>) {
46+
internal init(client: MongoClient, asyncCollection: MongoSwift.MongoCollection<T>) {
47+
self.client = client
4348
self.asyncColl = asyncCollection
4449
}
4550

Sources/MongoSwiftSync/MongoDatabase.swift

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,13 @@ public struct MongoDatabase {
2424
/// The underlying asynchronous database.
2525
private let asyncDB: MongoSwift.MongoDatabase
2626

27+
/// The client this database was derived from. We store this to ensure it remains open for as long as this object
28+
/// is in scope.
29+
private let client: MongoClient
30+
2731
/// Initializes a new `MongoDatabase` instance wrapping the provided async database.
28-
internal init(asyncDB: MongoSwift.MongoDatabase) {
32+
internal init(client: MongoClient, asyncDB: MongoSwift.MongoDatabase) {
33+
self.client = client
2934
self.asyncDB = asyncDB
3035
}
3136

@@ -78,7 +83,7 @@ public struct MongoDatabase {
7883
options: CollectionOptions? = nil
7984
) -> MongoCollection<T> {
8085
let asyncColl = self.asyncDB.collection(name, withType: T.self, options: options)
81-
return MongoCollection(asyncCollection: asyncColl)
86+
return MongoCollection(client: self.client, asyncCollection: asyncColl)
8287
}
8388

8489
/**
@@ -135,7 +140,7 @@ public struct MongoDatabase {
135140
options: options,
136141
session: session?.asyncSession)
137142
.wait()
138-
return MongoCollection(asyncCollection: asyncColl)
143+
return MongoCollection(client: self.client, asyncCollection: asyncColl)
139144
}
140145

141146
/**

0 commit comments

Comments
 (0)