Skip to content

Commit c5235f7

Browse files
committed
feat: Add initial naive implementation of flatMapLatest
This introduces a simplified version of the flatMapLatest operator for AsyncSequence, along with a basic unit test. Note that this implementation uses unstructured concurrency and may have race conditions regarding task cancellation.
1 parent 2773d41 commit c5235f7

File tree

2 files changed

+106
-0
lines changed

2 files changed

+106
-0
lines changed
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Swift Async Algorithms open source project
4+
//
5+
// Copyright (c) 2022 Apple Inc. and the Swift project authors
6+
// Licensed under Apache License v2.0 with Runtime Library Exception
7+
//
8+
// See https://swift.org/LICENSE.txt for license information
9+
//
10+
//===----------------------------------------------------------------------===//
11+
12+
import Foundation
13+
14+
@available(AsyncAlgorithms 1.0, *)
15+
extension AsyncSequence where Self: Sendable {
16+
17+
/// Transforms elements into new asynchronous sequences, emitting elements
18+
/// from the most recent inner sequence.
19+
///
20+
/// When a new element is emitted by this sequence, the `transform`
21+
/// is called to produce a new inner sequence. Iteration on the
22+
/// previous inner sequence is cancelled, and iteration begins
23+
/// on the new one.
24+
public func flatMapLatest<T: AsyncSequence & Sendable>(
25+
_ transform: @escaping @Sendable (Element) -> T
26+
) -> AsyncThrowingStream<T.Element, Error>
27+
where T.Element: Sendable {
28+
29+
AsyncThrowingStream { continuation in
30+
let outerIterationTask = Task {
31+
var innerIterationTask: Task<Void, Never>? = nil
32+
33+
do {
34+
for try await element in self {
35+
innerIterationTask?.cancel()
36+
37+
let innerSequence = transform(element)
38+
39+
innerIterationTask = Task {
40+
do {
41+
for try await innerElement in innerSequence {
42+
try Task.checkCancellation()
43+
continuation.yield(innerElement)
44+
}
45+
} catch is CancellationError {
46+
// Inner task was cancelled, this is normal
47+
} catch {
48+
// Inner sequence threw an error
49+
continuation.finish(throwing: error)
50+
}
51+
}
52+
}
53+
} catch {
54+
// Outer sequence threw an error
55+
continuation.finish(throwing: error)
56+
}
57+
58+
// Outer sequence finished
59+
await innerIterationTask?.value
60+
continuation.finish()
61+
}
62+
63+
continuation.onTermination = { @Sendable _ in
64+
outerIterationTask.cancel()
65+
}
66+
}
67+
}
68+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Swift Async Algorithms open source project
4+
//
5+
// Copyright (c) 2022 Apple Inc. and the Swift project authors
6+
// Licensed under Apache License v2.0 with Runtime Library Exception
7+
//
8+
// See https://swift.org/LICENSE.txt for license information
9+
//
10+
//===----------------------------------------------------------------------===//
11+
12+
import XCTest
13+
import AsyncAlgorithms
14+
15+
@available(macOS 15.0, *)
16+
final class TestFlatMapLatest: XCTestCase {
17+
18+
func test_simple_sequence() async throws {
19+
let source = [1, 2, 3].async
20+
let transformed = source.flatMapLatest { intValue in
21+
return [intValue, intValue * 10].async
22+
}
23+
24+
var expected = [3, 30]
25+
do {
26+
for try await element in transformed {
27+
let (e, ex) = (element, expected.removeFirst())
28+
print("\(e) == \(ex)")
29+
30+
XCTAssertEqual(e, ex)
31+
}
32+
} catch {
33+
XCTFail("Unexpected error: \(error)")
34+
}
35+
XCTAssertTrue(expected.isEmpty)
36+
}
37+
38+
}

0 commit comments

Comments
 (0)