|
12 | 12 | import DequeModule |
13 | 13 |
|
14 | 14 | @available(AsyncAlgorithms 1.0, *) |
15 | | -extension AsyncSequence where Self: Sendable { |
16 | | - /// Transforms elements into new asynchronous sequences, emitting elements |
17 | | - /// from the most recent inner sequence. |
18 | | - /// |
19 | | - /// When a new element is emitted by this sequence, the `transform` |
20 | | - /// is called to produce a new inner sequence. Iteration on the |
21 | | - /// previous inner sequence is cancelled, and iteration begins |
22 | | - /// on the new one. |
23 | | - public func flatMapLatest<T: AsyncSequence & Sendable>( |
24 | | - _ transform: @escaping @Sendable (Element) -> T |
25 | | - ) -> AsyncFlatMapLatestSequence<Self, T> { |
26 | | - return AsyncFlatMapLatestSequence(self, transform: transform) |
27 | | - } |
28 | | -} |
29 | | - |
30 | | -@available(AsyncAlgorithms 1.0, *) |
31 | | -public struct AsyncFlatMapLatestSequence<Base: AsyncSequence & Sendable, Inner: AsyncSequence & Sendable>: AsyncSequence, Sendable where Base.Element: Sendable, Inner.Element: Sendable { |
32 | | - public typealias Element = Inner.Element |
33 | | - |
34 | | - let base: Base |
35 | | - let transform: @Sendable (Base.Element) -> Inner |
36 | | - |
37 | | - init(_ base: Base, transform: @escaping @Sendable (Base.Element) -> Inner) { |
38 | | - self.base = base |
39 | | - self.transform = transform |
40 | | - } |
41 | | - |
42 | | - public func makeAsyncIterator() -> Iterator { |
43 | | - return Iterator(base: base, transform: transform) |
44 | | - } |
45 | | - |
46 | | - public struct Iterator: AsyncIteratorProtocol, Sendable { |
47 | | - fileprivate let storage: FlatMapLatestStorage<Base, Inner> |
48 | | - |
49 | | - init(base: Base, transform: @escaping @Sendable (Base.Element) -> Inner) { |
50 | | - self.storage = FlatMapLatestStorage(base: base, transform: transform) |
51 | | - } |
52 | | - |
53 | | - public func next() async throws -> Element? { |
54 | | - return try await storage.next() |
55 | | - } |
56 | | - } |
57 | | -} |
58 | | - |
59 | | -@available(AsyncAlgorithms 1.0, *) |
60 | | -private final class FlatMapLatestStorage<Base: AsyncSequence & Sendable, Inner: AsyncSequence & Sendable>: @unchecked Sendable where Base.Element: Sendable, Inner.Element: Sendable { |
61 | | - typealias Element = Inner.Element |
62 | | - |
63 | | - private let lock = Lock.allocate() |
64 | | - private var stateMachine: FlatMapLatestStateMachine<Base, Inner> |
65 | | - |
66 | | - init(base: Base, transform: @escaping @Sendable (Base.Element) -> Inner) { |
67 | | - self.stateMachine = FlatMapLatestStateMachine(base: base, transform: transform) |
68 | | - } |
69 | | - |
70 | | - deinit { |
71 | | - lock.deinitialize() |
72 | | - } |
73 | | - |
74 | | - func next() async throws -> Element? { |
75 | | - return try await withTaskCancellationHandler { |
76 | | - lock.lock() |
77 | | - let action = stateMachine.next() |
78 | | - |
79 | | - switch action { |
80 | | - case .returnElement(let element): |
81 | | - lock.unlock() |
82 | | - return element |
83 | | - |
84 | | - case .returnNil: |
85 | | - lock.unlock() |
86 | | - return nil |
87 | | - |
88 | | - case .throwError(let error): |
89 | | - lock.unlock() |
90 | | - throw error |
91 | | - |
92 | | - case .startOuterTask(let base): |
93 | | - // We need to start the outer task and then suspend |
94 | | - startOuterTask(base) |
95 | | - |
96 | | - return try await suspend() |
97 | | - |
98 | | - case .suspend: |
99 | | - lock.unlock() |
100 | | - return try await suspend() |
101 | | - } |
102 | | - } onCancel: { |
103 | | - let action = lock.withLock { stateMachine.cancelled() } |
104 | | - handleAction(action) |
105 | | - } |
106 | | - } |
107 | | - |
108 | | - private func suspend() async throws -> Element? { |
109 | | - return try await withUnsafeThrowingContinuation { continuation in |
110 | | - let action = lock.withLock { stateMachine.next(for: continuation) } |
111 | | - |
112 | | - switch action { |
113 | | - case .resumeOuterContinuation(let continuation): |
114 | | - continuation.resume() |
115 | | - case .resumeInnerContinuation(let continuation): |
116 | | - continuation.resume() |
117 | | - case .none: |
118 | | - break |
119 | | - } |
120 | | - } |
121 | | - } |
122 | | - |
123 | | - private func startOuterTask(_ base: Base) { |
124 | | - let task = Task { |
125 | | - var iterator = base.makeAsyncIterator() |
126 | | - |
127 | | - loop: while true { |
128 | | - // Create a continuation to wait for demand |
129 | | - do { |
130 | | - try await withUnsafeThrowingContinuation { continuation in |
131 | | - let action = lock.withLock { stateMachine.outerTaskSuspended(continuation) } |
132 | | - |
133 | | - switch action { |
134 | | - case .resumeOuterContinuation(let continuation): |
135 | | - continuation.resume() |
136 | | - case .resumeInnerContinuation(let continuation): |
137 | | - continuation.resume() |
138 | | - case .none: |
139 | | - break |
140 | | - } |
141 | | - } |
142 | | - } catch { |
143 | | - // Cancellation or other error during suspension |
144 | | - let action = lock.withLock { stateMachine.outerThrew(error) } |
145 | | - handleAction(action) |
146 | | - break loop |
147 | | - } |
148 | | - |
149 | | - do { |
150 | | - if let element = try await iterator.next() { |
151 | | - let action = lock.withLock { stateMachine.outerElementProduced(element) } |
152 | | - handleAction(action) |
153 | | - } else { |
154 | | - let action = lock.withLock { stateMachine.outerFinished() } |
155 | | - handleAction(action) |
156 | | - break loop |
157 | | - } |
158 | | - } catch { |
159 | | - let action = lock.withLock { stateMachine.outerThrew(error) } |
160 | | - handleAction(action) |
161 | | - break loop |
162 | | - } |
163 | | - } |
164 | | - } |
165 | | - stateMachine.outerTaskStarted(task) |
166 | | - lock.unlock() |
167 | | - } |
168 | | - |
169 | | - private func startInnerTask(_ inner: Inner, generation: Int) { |
170 | | - let task = Task { |
171 | | - var iterator = inner.makeAsyncIterator() |
172 | | - |
173 | | - loop: while true { |
174 | | - // Wait for demand |
175 | | - do { |
176 | | - try await withUnsafeThrowingContinuation { continuation in |
177 | | - let action = lock.withLock { stateMachine.innerTaskSuspended(continuation, generation: generation) } |
178 | | - |
179 | | - switch action { |
180 | | - case .resumeInnerContinuation(let continuation): |
181 | | - continuation.resume() |
182 | | - case .resumeOuterContinuation(let continuation): |
183 | | - continuation.resume() |
184 | | - case .none: |
185 | | - break |
186 | | - } |
187 | | - } |
188 | | - } catch { |
189 | | - // Cancellation |
190 | | - let action = lock.withLock { stateMachine.innerThrew(error, generation: generation) } |
191 | | - handleAction(action) |
192 | | - break loop |
193 | | - } |
194 | | - |
195 | | - do { |
196 | | - if let element = try await iterator.next() { |
197 | | - let action = lock.withLock { stateMachine.innerElementProduced(element, generation: generation) } |
198 | | - handleAction(action) |
199 | | - } else { |
200 | | - let action = lock.withLock { stateMachine.innerFinished(generation: generation) } |
201 | | - handleAction(action) |
202 | | - break loop |
203 | | - } |
204 | | - } catch { |
205 | | - let action = lock.withLock { stateMachine.innerThrew(error, generation: generation) } |
206 | | - handleAction(action) |
207 | | - break loop |
208 | | - } |
209 | | - } |
210 | | - } |
211 | | - stateMachine.innerTaskStarted(task, generation: generation) |
212 | | - } |
213 | | - |
214 | | - private func handleAction(_ action: FlatMapLatestStateMachine<Base, Inner>.Action) { |
215 | | - switch action { |
216 | | - case .startInnerTask(let inner, let generation, let previousTask, let previousCont): |
217 | | - if let previousTask = previousTask { |
218 | | - previousTask.cancel() |
219 | | - previousCont?.resume(throwing: CancellationError()) |
220 | | - } |
221 | | - startInnerTask(inner, generation: generation) |
222 | | - |
223 | | - case .cancelInnerTask(let task, let continuation): |
224 | | - task.cancel() |
225 | | - continuation?.resume(throwing: CancellationError()) |
226 | | - |
227 | | - case .resumeDownstream(let continuation, let result): |
228 | | - continuation.resume(with: result) |
229 | | - |
230 | | - case .resumeOuterContinuation(let continuation): |
231 | | - continuation.resume() |
232 | | - |
233 | | - case .cancelTasks(let outer, let inner, let outerCont, let innerCont): |
234 | | - outer?.cancel() |
235 | | - inner?.cancel() |
236 | | - outerCont?.resume(throwing: CancellationError()) |
237 | | - innerCont?.resume(throwing: CancellationError()) |
238 | | - |
239 | | - case .none: |
240 | | - break |
241 | | - } |
242 | | - } |
243 | | -} |
244 | | - |
245 | | -@available(AsyncAlgorithms 1.0, *) |
246 | | -private struct FlatMapLatestStateMachine<Base: AsyncSequence & Sendable, Inner: AsyncSequence & Sendable> where Base.Element: Sendable, Inner.Element: Sendable { |
| 15 | +struct FlatMapLatestStateMachine<Base: AsyncSequence & Sendable, Inner: AsyncSequence & Sendable> where Base.Element: Sendable, Inner.Element: Sendable { |
247 | 16 | typealias Element = Inner.Element |
248 | 17 |
|
249 | 18 | private enum State { |
|
0 commit comments