Skip to content

Commit da86e1b

Browse files
authored
SWIFT-673 Make MongoSwift.ChangeStream async, and implement MongoSwiftSync.ChangeStream (#395)
This also completes SWIFT-688
1 parent 142b2bb commit da86e1b

24 files changed

+977
-611
lines changed
Lines changed: 125 additions & 124 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,31 @@
11
import CLibMongoC
2+
import Foundation
3+
import NIO
24

3-
internal let ClosedChangeStreamError =
4-
LogicError(message: "Cannot advance a completed or failed change stream.")
5+
/// Direct wrapper of a `mongoc_change_stream_t`.
6+
private struct MongocChangeStream: MongocCursorWrapper {
7+
internal let pointer: OpaquePointer
8+
9+
fileprivate init(stealing ptr: OpaquePointer) {
10+
self.pointer = ptr
11+
}
12+
13+
internal func errorDocument(bsonError: inout bson_error_t, replyPtr: UnsafeMutablePointer<BSONPointer?>) -> Bool {
14+
return mongoc_change_stream_error_document(self.pointer, &bsonError, replyPtr)
15+
}
16+
17+
internal func next(outPtr: UnsafeMutablePointer<BSONPointer?>) -> Bool {
18+
return mongoc_change_stream_next(self.pointer, outPtr)
19+
}
20+
21+
internal func more() -> Bool {
22+
return true
23+
}
24+
25+
internal func destroy() {
26+
mongoc_change_stream_destroy(self.pointer)
27+
}
28+
}
529

630
/// A token used for manually resuming a change stream. Pass this to the `resumeAfter` field of
731
/// `ChangeStreamOptions` to resume or start a change stream where a previous one left off.
@@ -27,158 +51,135 @@ public struct ResumeToken: Codable, Equatable {
2751
// sourcery: skipSyncExport
2852
/// A MongoDB change stream.
2953
/// - SeeAlso: https://docs.mongodb.com/manual/changeStreams/
30-
public class ChangeStream<T: Codable>: Sequence, IteratorProtocol {
31-
/// Enum for tracking the state of a change stream.
32-
internal enum State {
33-
/// Indicates that the change stream is still open. Stores a pointer to the `mongoc_change_stream_t`, along
34-
/// with the source connection, client, and possibly session to ensure they are kept alive as long
35-
/// as the change stream is.
36-
case open(
37-
changeStream: OpaquePointer,
38-
connection: Connection,
39-
client: MongoClient,
40-
session: ClientSession?
41-
)
42-
case closed
43-
}
44-
45-
/// The state of this change stream.
46-
internal private(set) var state: State
54+
public class ChangeStream<T: Codable>: CursorProtocol {
55+
internal typealias Element = T
4756

48-
/// A `ResumeToken` associated with the most recent event seen by the change stream.
49-
public internal(set) var resumeToken: ResumeToken?
57+
/// The client this change stream descended from.
58+
private let client: MongoClient
5059

5160
/// Decoder for decoding documents into type `T`.
52-
internal let decoder: BSONDecoder
53-
54-
/// The error that occurred while iterating the change stream, if one exists. This should be used to check
55-
/// for errors after `next()` returns `nil`.
56-
public private(set) var error: Error?
57-
58-
/**
59-
* Retrieves any error that occured in mongoc or on the server while iterating the change stream. Returns nil if
60-
* this change stream is already closed, or if no error occurred.
61-
* - Errors:
62-
* - `DecodingError` if an error occurs while decoding the server's response.
63-
*/
64-
private func getChangeStreamError() -> Error? {
65-
guard case let .open(changeStream, _, _, _) = self.state else {
66-
return nil
67-
}
68-
69-
var replyPtr = UnsafeMutablePointer<BSONPointer?>.allocate(capacity: 1)
70-
defer {
71-
replyPtr.deinitialize(count: 1)
72-
replyPtr.deallocate()
73-
}
74-
75-
var error = bson_error_t()
76-
guard mongoc_change_stream_error_document(changeStream, &error, replyPtr) else {
77-
return nil
78-
}
79-
80-
// If a reply is present, it implies the error occurred on the server. This *should* always be a commandError,
81-
// but we will still parse the mongoc error to cover all cases.
82-
if let docPtr = replyPtr.pointee {
83-
// we have to copy because libmongoc owns the pointer.
84-
let reply = Document(copying: docPtr)
85-
return extractMongoError(error: error, reply: reply)
86-
}
61+
private let decoder: BSONDecoder
8762

88-
// Otherwise, the only feasible error is that the user tried to advance a dead change stream cursor,
89-
// which is a logic error. We will still parse the mongoc error to cover all cases.
90-
return extractMongoError(error: error)
91-
}
63+
/// The cursor this change stream is wrapping.
64+
private let wrappedCursor: Cursor<MongocChangeStream>
9265

93-
/// Returns the next `T` in the change stream or nil if there is no next value. Will block for a maximum of
94-
/// `maxAwaitTimeMS` milliseconds as specified in the `ChangeStreamOptions`, or for the server default timeout
95-
/// if omitted.
96-
public func next() -> T? {
97-
// We already closed the mongoc change stream, either because we reached the end or encountered an error.
98-
guard case let .open(_, connection, client, session) = self.state else {
99-
self.error = ClosedChangeStreamError
100-
return nil
101-
}
102-
do {
103-
let operation = NextOperation(target: .changeStream(self))
104-
guard let out = try client.executeOperation(operation, using: connection, session: session) else {
105-
self.error = self.getChangeStreamError()
106-
if self.error != nil {
107-
self.close()
108-
}
109-
return nil
110-
}
111-
return out
112-
} catch {
113-
self.error = error
114-
self.close()
66+
/// Process an event before returning it to the user.
67+
private func processEvent(_ event: Document?) throws -> T? {
68+
guard let event = event else {
11569
return nil
11670
}
117-
}
118-
119-
/**
120-
* Returns the next `T` in this change stream or `nil`, or throws an error if one occurs -- compared to `next()`,
121-
* which returns `nil` and requires manually checking for an error afterward. Will block for a maximum of
122-
* `maxAwaitTimeMS` milliseconds as specified in the `ChangeStreamOptions`, or for the server default timeout if
123-
* omitted.
124-
* - Returns: the next `T` in this change stream, or `nil` if at the end of the change stream cursor.
125-
* - Throws:
126-
* - `CommandError` if an error occurs on the server while iterating the change stream cursor.
127-
* - `LogicError` if this function is called and the session associated with this change stream is
128-
* inactive.
129-
* - `DecodingError` if an error occurs while decoding the server's response.
130-
*/
131-
public func nextOrError() throws -> T? {
132-
if let next = self.next() {
133-
return next
134-
}
135-
136-
if let error = self.error {
137-
throw error
71+
// Update the resumeToken with the `_id` field from the document.
72+
guard let resumeToken = event["_id"]?.documentValue else {
73+
throw InternalError(message: "_id field is missing from the change stream document.")
13874
}
139-
return nil
75+
self.resumeToken = ResumeToken(resumeToken)
76+
return try self.decoder.decode(T.self, from: event)
14077
}
14178

142-
/**
143-
* Initializes a `ChangeStream`.
144-
* - Throws:
145-
* - `CommandError` if an error occurred on the server when creating the `mongoc_change_stream_t`.
146-
* - `InvalidArgumentError` if the `mongoc_change_stream_t` was created with invalid options.
147-
*/
14879
internal init(
149-
stealing changeStream: OpaquePointer,
80+
stealing changeStreamPtr: OpaquePointer,
15081
connection: Connection,
15182
client: MongoClient,
15283
session: ClientSession?,
15384
decoder: BSONDecoder,
15485
options: ChangeStreamOptions?
15586
) throws {
156-
self.state = .open(changeStream: changeStream, connection: connection, client: client, session: session)
87+
let mongocChangeStream = MongocChangeStream(stealing: changeStreamPtr)
88+
self.wrappedCursor = try Cursor(
89+
mongocCursor: mongocChangeStream,
90+
connection: connection,
91+
session: session,
92+
type: .tailableAwait
93+
)
94+
self.client = client
15795
self.decoder = decoder
15896

15997
// TODO: SWIFT-519 - Starting 4.2, update resumeToken to startAfter (if set).
16098
// startAfter takes precedence over resumeAfter.
16199
if let resumeAfter = options?.resumeAfter {
162100
self.resumeToken = resumeAfter
163101
}
102+
}
103+
104+
deinit {
105+
assert(!self.isAlive, "change stream wasn't closed")
106+
}
107+
108+
/// Indicates whether this change stream has the potential to return more data.
109+
public var isAlive: Bool {
110+
return self.wrappedCursor.isAlive
111+
}
164112

165-
if let err = self.getChangeStreamError() {
166-
self.close()
167-
throw err
113+
/// The `ResumeToken` associated with the most recent event seen by the change stream.
114+
public internal(set) var resumeToken: ResumeToken?
115+
116+
/**
117+
* Get the next `T` from this change stream.
118+
*
119+
* This method will continue polling until an event is returned from the server, an error occurs,
120+
* or the change stream is killed. Each attempt to retrieve results will wait for a maximum of `maxAwaitTimeMS`
121+
* (specified on the `ChangeStreamOptions` passed to the method that created this change stream) before trying
122+
* again.
123+
*
124+
* A thread from the pool will be occupied until the returned future is completed, so performance degradation
125+
* is possible if the number of polling change streams is too close to the total number of threads in the thread
126+
* pool. To configure the total number of threads in the pool, set the `ClientOptions.threadPoolSize` option
127+
* during client creation.
128+
*
129+
* - Returns:
130+
* An `EventLoopFuture<T?>` evaluating to the next `T` in this change stream, `nil` if the change stream is
131+
* exhausted, or an error if one occurred. The returned future will not resolve until one of those conditions is
132+
* met, potentially after multiple requests to the server.
133+
*
134+
* If the future evaluates to an error, it is likely one of the following:
135+
* - `CommandError` if an error occurs while fetching more results from the server.
136+
* - `LogicError` if this function is called after the change stream has died.
137+
* - `LogicError` if this function is called and the session associated with this change stream is inactive.
138+
* - `DecodingError` if an error occurs decoding the server's response.
139+
*/
140+
public func next() -> EventLoopFuture<T?> {
141+
return self.client.operationExecutor.execute {
142+
try self.processEvent(self.wrappedCursor.next())
168143
}
169144
}
170145

171-
/// Cleans up internal state.
172-
private func close() {
173-
guard case let .open(changeStream, _, _, _) = self.state else {
174-
return
146+
/**
147+
* Attempt to get the next `T` from this change stream, returning `nil` if there are no results.
148+
*
149+
* The change stream will wait server-side for a maximum of `maxAwaitTimeMS` (specified on the `ChangeStreamOptions`
150+
* passed to the method that created this change stream) before returning `nil`.
151+
*
152+
* This method may be called repeatedly while `isAlive` is true to retrieve new data.
153+
*
154+
* - Returns:
155+
* An `EventLoopFuture<T?>` containing the next `T` in this change stream, an error if one occurred, or `nil` if
156+
* there was no data.
157+
*
158+
* If the future evaluates to an error, it is likely one of the following:
159+
* - `CommandError` if an error occurs while fetching more results from the server.
160+
* - `LogicError` if this function is called after the change stream has died.
161+
* - `LogicError` if this function is called and the session associated with this change stream is inactive.
162+
* - `DecodingError` if an error occurs decoding the server's response.
163+
*/
164+
public func tryNext() -> EventLoopFuture<T?> {
165+
return self.client.operationExecutor.execute {
166+
try self.processEvent(self.wrappedCursor.tryNext())
175167
}
176-
mongoc_change_stream_destroy(changeStream)
177-
self.state = .closed
178168
}
179169

180-
/// Closes the cursor if it hasn't been closed already.
181-
deinit {
182-
self.close()
170+
/**
171+
* Kill this change stream.
172+
*
173+
* This method MUST be called before this change stream goes out of scope to prevent leaking resources.
174+
* This method may be called even if there are unresolved futures created from other `ChangeStream` methods.
175+
* This method will have no effect if the change stream is already dead.
176+
*
177+
* - Returns:
178+
* An `EventLoopFuture` that evaluates when the change stream has completed closing. This future should not fail.
179+
*/
180+
public func kill() -> EventLoopFuture<Void> {
181+
return self.client.operationExecutor.execute {
182+
self.wrappedCursor.kill()
183+
}
183184
}
184185
}

0 commit comments

Comments
 (0)