Skip to content

Commit 2c74066

Browse files
authored
SWIFT-1392 Add MongoCursor and ChangeStream async/await APIs (#706)
1 parent ea257f6 commit 2c74066

File tree

10 files changed

+540
-23
lines changed

10 files changed

+540
-23
lines changed
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
#if compiler(>=5.5) && canImport(_Concurrency) && os(Linux)
2+
/// Extension to `ChangeStream` to support async/await APIs.
3+
extension ChangeStream: AsyncSequence, AsyncIteratorProtocol {
4+
public typealias AsyncIterator = ChangeStream
5+
6+
public func makeAsyncIterator() -> ChangeStream<T> {
7+
self
8+
}
9+
10+
// TODO: SWIFT-1415 Make this a property rather than a method.
11+
/**
12+
* Indicates whether this change stream has the potential to return more data.
13+
*
14+
* This change stream will be dead after `next()` returns `nil`, but it may still be alive after `tryNext()`
15+
* returns `nil`.
16+
*
17+
* After either of `next()` or `tryNext()` throw a non-`DecodingError` error, this change stream will be dead.
18+
* It may still be alive after either returns a `DecodingError`, however.
19+
*/
20+
public func isAlive() async throws -> Bool {
21+
try await self.isAlive().get()
22+
}
23+
24+
/**
25+
* Get the next `T` from this change stream.
26+
*
27+
* This method will continue polling until an event is returned from the server, an error occurs,
28+
* or the current `Task` is cancelled. Each attempt to retrieve results will wait for a maximum of `maxAwaitTimeMS`
29+
* (specified on the `ChangeStreamOptions` passed to the method that created this change stream) before trying
30+
* again.
31+
*
32+
* We recommend to run change streams in their own `Task`s, and to terminate them by cancelling their `Task`s.
33+
*
34+
* - Warning: You *must not* call any change stream methods besides `isAlive()` while awaiting the result of this
35+
* method. Doing so will result in undefined behavior.
36+
*
37+
* - Returns:
38+
* The next `T` in this change stream, or `nil` if the change stream is exhausted or the current `Task` is
39+
* cancelled. This method will not return until one of those conditions is met, potentially after multiple
40+
* requests to the server.
41+
*
42+
* If an error is thrown, it is likely one of the following:
43+
* - `MongoError.CommandError` if an error occurs while fetching more results from the server.
44+
* - `MongoError.LogicError` if this function is called after the change stream has been exhausted.
45+
* - `MongoError.LogicError` if this function is called and the session associated with this change stream has
46+
* been ended.
47+
* - `DecodingError` if an error occurs decoding the server's response to a `T`.
48+
*/
49+
public func next() async throws -> T? {
50+
while try await self.isAlive() {
51+
if Task.isCancelled {
52+
return nil
53+
}
54+
if let doc = try await self.tryNext() {
55+
return doc
56+
}
57+
await Task.yield()
58+
}
59+
60+
return nil
61+
}
62+
63+
/**
64+
* Attempt to get the next `T` from the change stream, returning `nil` if there are no results.
65+
*
66+
* The change stream will wait server-side for a maximum of `maxAwaitTimeMS` (specified on the
67+
* `ChangeStreamOptions` passed to the method that created this change stream) before returning `nil`.
68+
*
69+
* This method may be called repeatedly while `isAlive()` is true to retrieve new data.
70+
*
71+
* - Warning: You *must not* call any change stream methods besides `isAlive()` while awaiting the result of this
72+
* method. Doing so will result in undefined behavior.
73+
*
74+
* - Returns:
75+
* The next `T` in this change stream, or `nil` if there is no new data.
76+
*
77+
* If an error is thrown, it is likely one of the following:
78+
* - `MongoError.CommandError` if an error occurs while fetching more results from the server.
79+
* - `MongoError.LogicError` if this function is called after the change stream has been exhausted.
80+
* - `MongoError.LogicError` if this function is called and the session associated with this change stream has
81+
* been ended.
82+
* - `DecodingError` if an error occurs decoding the server's response to a `T`.
83+
*/
84+
public func tryNext() async throws -> T? {
85+
try await self.tryNext().get()
86+
}
87+
88+
/**
89+
* Consolidate the currently available results of the change stream into an array of type `T`.
90+
*
91+
* Since `toArray` will only fetch the currently available results, it may return more data if it is called again
92+
* while the change stream is still alive.
93+
*
94+
* - Warning: You *must not* call any change stream methods besides `isAlive()` while awaiting the result of this
95+
* method. Doing so will result in undefined behavior.
96+
*
97+
* - Returns:
98+
* An `T` containing the results currently available in this change stream.
99+
*
100+
* If an error is thrown, it is likely one of the following:
101+
* - `MongoError.CommandError` if an error occurs while fetching more results from the server.
102+
* - `MongoError.LogicError` if this function is called after the change stream has been exhausted.
103+
* - `MongoError.LogicError` if this function is called and the session associated with this change stream has
104+
* been ended.
105+
* - `DecodingError` if an error occurs decoding the server's responses to `T`s.
106+
*/
107+
public func toArray() async throws -> [T] {
108+
try await self.toArray().get()
109+
}
110+
}
111+
#endif

Sources/MongoSwift/AsyncAwait/MongoCollection+ChangeStreams+AsyncAwait.swift

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,8 @@ extension MongoCollection {
3232
try await self.watch(
3333
pipeline,
3434
options: options,
35-
session: session,
36-
withEventType: ChangeStreamEvent<CollectionType>.self
37-
)
35+
session: session
36+
).get()
3837
}
3938

4039
/**
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
#if compiler(>=5.5) && canImport(_Concurrency) && os(Linux)
2+
/// Extension to `MongoCursor` to support async/await APIs.
3+
extension MongoCursor: AsyncSequence, AsyncIteratorProtocol {
4+
public typealias AsyncIterator = MongoCursor
5+
6+
public typealias Element = T
7+
8+
public func makeAsyncIterator() -> MongoCursor<T> {
9+
self
10+
}
11+
12+
// TODO: SWIFT-1415 Make this a property rather than a method.
13+
/**
14+
* Indicates whether this cursor has the potential to return more data.
15+
*
16+
* This method is mainly useful if this cursor is tailable, since in that case `tryNext()` may return more results
17+
* even after returning `nil`.
18+
*
19+
* If this cursor is non-tailable, it will always be dead after either `tryNext()` returns `nil` or a
20+
* non-`DecodingError` error.
21+
*
22+
* This cursor will be dead after `next()` returns `nil` or throws a non-`DecodingError` error, regardless of the
23+
* cursor type.
24+
*
25+
* This cursor may still be alive after `next()` or `tryNext()` throws a `DecodingError`.
26+
*/
27+
public func isAlive() async throws -> Bool {
28+
try await self.isAlive().get()
29+
}
30+
31+
/**
32+
* Returns the next `T` in this cursor, or `nil` if the cursor is exhausted or the current `Task` is cancelled.
33+
*
34+
* If this cursor is tailable, this method will continue polling until a non-empty batch is returned from the
35+
* server, or until the `Task` it is running in is cancelled. For this reason, we recommend to run tailable
36+
* cursors in their own `Task`s, and to terminate the cursor if/when needed by canceling the `Task`.
37+
*
38+
* - Warning: You *must not* call any cursor methods besides `isAlive()` while awaiting the result of this method.
39+
* Doing so will result in undefined behavior.
40+
*
41+
* - Returns:
42+
* The next `T` in this cursor, or `nil` if the cursor is exhausted or the current `Task` is cancelled.
43+
*
44+
* If an error is thrown, it is likely one of the following:
45+
* - `MongoError.CommandError` if an error occurs while fetching more results from the server.
46+
* - `MongoError.LogicError` if this function is called after the cursor has been exhausted.
47+
* - `MongoError.LogicError` if this function is called and the session associated with this cursor has been
48+
* ended.
49+
* - `DecodingError` if an error occurs decoding the server's response to a `T`.
50+
*/
51+
public func next() async throws -> T? {
52+
while try await self.isAlive() {
53+
if Task.isCancelled {
54+
return nil
55+
}
56+
if let doc = try await self.tryNext() {
57+
return doc
58+
}
59+
await Task.yield()
60+
}
61+
62+
return nil
63+
}
64+
65+
/**
66+
* Attempt to get the next `T` from the cursor, returning `nil` if there are no results.
67+
*
68+
* If this cursor is tailable, this method may be called repeatedly while `isAlive()` returns true to retrieve new
69+
* data.
70+
*
71+
* If this cursor is a tailable await cursor, it will wait for results server side for a maximum of `maxAwaitTimeMS`
72+
* before evaluating to `nil`. This option can be configured via options passed to the method that created this
73+
* cursor (e.g. the `maxAwaitTimeMS` option on the `FindOptions` passed to `find`).
74+
*
75+
* - Warning: You *must not* call any cursor methods besides `isAlive()` while awaiting the result of this method.
76+
* Doing so will result in undefined behavior.
77+
*
78+
* - Returns:
79+
* The next `T` in this cursor, or `nil` if there is no new data.
80+
*
81+
* If an error is thrown, it is likely one of the following:
82+
* - `MongoError.CommandError` if an error occurs while fetching more results from the server.
83+
* - `MongoError.LogicError` if this function is called after the cursor has been exhausted.
84+
* - `MongoError.LogicError` if this function is called and the session associated with this cursor has been
85+
* ended.
86+
* - `DecodingError` if an error occurs decoding the server's response to a `T`.
87+
*/
88+
public func tryNext() async throws -> T? {
89+
try await self.tryNext().get()
90+
}
91+
92+
/**
93+
* Consolidate the currently available results of the cursor into an array of type `T`.
94+
*
95+
* If this cursor is not tailable, this method will exhaust it.
96+
*
97+
* If this cursor is tailable, `toArray` will only fetch the currently available results, and it
98+
* may return more data if it is called again while the cursor is still alive.
99+
*
100+
* - Warning: You *must not* call any cursor methods besides `isAlive()` while awaiting the result of this method.
101+
* Doing so will result in undefined behavior.
102+
*
103+
* - Returns:
104+
* An `T` containing the results currently available in this cursor.
105+
*
106+
* If an error is thrown, it is likely one of the following:
107+
* - `MongoError.CommandError` if an error occurs while fetching more results from the server.
108+
* - `MongoError.LogicError` if this function is called after the cursor has been exhausted.
109+
* - `MongoError.LogicError` if this function is called and the session associated with this cursor has been
110+
* ended.
111+
* - `DecodingError` if an error occurs decoding the server's responses to `T`s.
112+
*/
113+
public func toArray() async throws -> [T] {
114+
try await self.toArray().get()
115+
}
116+
}
117+
#endif

Sources/MongoSwift/ChangeStream.swift

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ internal let changeStreamNamespaceKey = CodingUserInfoKey(rawValue: "namespace")
6161
/// A MongoDB change stream.
6262
/// - SeeAlso: https://docs.mongodb.com/manual/changeStreams/
6363
public class ChangeStream<T: Codable>: CursorProtocol {
64-
internal typealias Element = T
64+
public typealias Element = T
6565

6666
/// The client this change stream descended from.
6767
private let client: MongoClient
@@ -270,8 +270,10 @@ public class ChangeStream<T: Codable>: CursorProtocol {
270270
* This method MAY be called if the change stream is already dead. It will have no effect.
271271
*
272272
* - Warning:
273-
* If this change stream is alive when it goes out of scope, it will leak resources. To ensure it
274-
* is dead before it leaves scope, invoke this method.
273+
* On Swift versions and platforms where structured concurrency is not available, if a change stream is alive
274+
* when it goes out of scope, it will leak resources. On those Swift versions/platforms, you must invoke this
275+
* method to ensure resources are properly cleaned up. If structured concurrency is available, it is not
276+
* necessary to call this method as resources will be cleaned up automatically during deinitialization.
275277
*
276278
* - Returns:
277279
* An `EventLoopFuture` that evaluates when the change stream has completed closing. This future should not fail.
@@ -281,4 +283,18 @@ public class ChangeStream<T: Codable>: CursorProtocol {
281283
self.wrappedCursor.kill()
282284
}
283285
}
286+
287+
#if compiler(>=5.5) && canImport(_Concurrency) && os(Linux)
288+
/// When concurrency is available, we can ensure change streams are always cleaned up properly.
289+
deinit {
290+
let client = self.client
291+
let el = self.eventLoop
292+
let wrappedCursor = self.wrappedCursor
293+
Task {
294+
try await client.operationExecutor.execute(on: el) {
295+
wrappedCursor.kill()
296+
}
297+
}
298+
}
299+
#endif
284300
}

Sources/MongoSwift/MongoCursor.swift

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -251,8 +251,10 @@ public class MongoCursor<T: Codable>: CursorProtocol {
251251
* This method MAY be called if the cursor is already dead. It will have no effect.
252252
*
253253
* - Warning:
254-
* If this cursor is alive when it goes out of scope, it will leak resources. To ensure it
255-
* is dead before it leaves scope, invoke this method.
254+
* On Swift versions and platforms where structured concurrency is not available, if a cursor is alive when it
255+
* goes out of scope, it will leak resources. On those Swift versions/platforms, you must invoke this method
256+
* to ensure resources are properly cleaned up. If structured concurrency is available, it is not necessary to
257+
* call this method as resources will be cleaned up automatically during deinitialization.
256258
*
257259
* - Returns:
258260
* An `EventLoopFuture` that evaluates when the cursor has completed closing. This future should not fail.
@@ -262,4 +264,18 @@ public class MongoCursor<T: Codable>: CursorProtocol {
262264
self.wrappedCursor.kill()
263265
}
264266
}
267+
268+
#if compiler(>=5.5) && canImport(_Concurrency) && os(Linux)
269+
/// When concurrency is available, we can ensure cursors are always cleaned up properly.
270+
deinit {
271+
let client = self.client
272+
let el = self.eventLoop
273+
let wrappedCursor = self.wrappedCursor
274+
Task {
275+
try await client.operationExecutor.execute(on: el) {
276+
wrappedCursor.kill()
277+
}
278+
}
279+
}
280+
#endif
265281
}

Sources/MongoSwift/Operations/Operation.swift

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,12 @@ internal class OperationExecutor {
100100
self.threadPool.runIfActive(eventLoop: eventLoop ?? self.eventLoopGroup.next(), body)
101101
}
102102

103+
#if compiler(>=5.5) && canImport(_Concurrency) && os(Linux)
104+
internal func execute<T>(on eventLoop: EventLoop?, _ body: @escaping () throws -> T) async throws -> T {
105+
try await self.execute(on: eventLoop, body).get()
106+
}
107+
#endif
108+
103109
internal func makeFailedFuture<T>(_ error: Error, on eventLoop: EventLoop?) -> EventLoopFuture<T> {
104110
let ev = eventLoop ?? self.eventLoopGroup.next()
105111
return ev.makeFailedFuture(error)

0 commit comments

Comments
 (0)