Skip to content

Commit 6d016f0

Browse files
authored
Merge pull request #23 from sideeffect-io/refactor
flatMapLatest: uncomment tests
2 parents ac37743 + 0394947 commit 6d016f0

File tree

3 files changed

+117
-68
lines changed

3 files changed

+117
-68
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* [AsyncThrowingBufferedChannel](./Sources/AsyncChannels/AsyncThrowingBufferedChannel.swift): Throwing buffered communication channel between tasks
1717

1818
### Subjects
19-
* [AsyncPassthroughSubject](./Sources/AsyncSubjects/AsyncPassthroughSubject.swift): Subject with a shared output
19+
* [AsyncPassthroughSubject](./Sources/AsyncSubjects/AsyncPassthroughSubject.swift): Subject with a shared output. The elements are not shared and will be spread across consumers (same as AsyncStream)
2020
* [AsyncThrowingPassthroughSubject](./Sources/AsyncSubjects/AsyncThrowingPassthroughSubject.swift): Throwing subject with a shared output
2121
* [AsyncCurrentValueSubject](./Sources/AsyncSubjects/AsyncCurrentValueSubject.swift): Subject with a shared output. Maintain an replays its current value
2222
* [AsyncThrowingCurrentValueSubject](./Sources/AsyncSubjects/AsyncThrowingCurrentValueSubject.swift): Throwing subject with a shared output. Maintain an replays its current value
@@ -45,13 +45,13 @@
4545
### Operators
4646
* [`handleEvents()`](./Sources/Operators/AsyncHandleEventsSequence.swift): Executes closures during the lifecycle of the self
4747
* [`mapToResult()`](./Sources/Operators/AsyncMapToResultSequence.swift): Maps elements and failure from self to a `Result` type
48-
* [`multicast(_:)`](./Sources/Operators/AsyncMulticastSequence.swift): Shares values from self to several consumers thanks to a provided Suject
4948
* [`prepend(_:)`](./Sources/Operators/AsyncPrependSequence.swift): Prepends an element to self
5049
* [`scan(_:_:)`](./Sources/Operators/AsyncScanSequence.swift): Transforms elements from self by providing the current element to a closure along with the last value returned by the closure
5150
* [`assign(_:)`](./Sources/Operators/AsyncSequence+Assign.swift): Assigns elements from self to a property
5251
* [`collect(_:)`](./Sources/Operators/AsyncSequence+Collect.swift): Iterate over elements from self and execute a closure
5352
* [`eraseToAnyAsyncSequence()`](./Sources/Operators/AsyncSequence+EraseToAnyAsyncSequence.swift): Erases to AnyAsyncSequence
5453
* [`flatMapLatest(_:)`](./Sources/Operators/AsyncSequence+FlatMapLatest.swift): Transforms elements from self into a `AsyncSequence` and republishes elements sent by the most recently received `AsyncSequence` when self is an `AsyncSequence` of `AsyncSequence`
54+
* [`multicast(_:)`](./Sources/Operators/AsyncMulticastSequence.swift): Shares values from self to several consumers thanks to a provided Suject
5555
* [`share()`](./Sources/Operators/AsyncSequence+Share.swift): Shares values from self to several consumers
5656
* [`switchToLatest()`](./Sources/Operators/AsyncSwitchToLatestSequence.swift): Republishes elements sent by the most recently received `AsyncSequence` when self is an `AsyncSequence` of `AsyncSequence`
5757

Tests/Operators/AsyncMulticastSequenceTests.swift

Lines changed: 43 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,16 @@ import XCTest
1111
private struct SpyAsyncSequenceForOnNextCall<Element>: AsyncSequence {
1212
typealias Element = Element
1313
typealias AsyncIterator = Iterator
14-
14+
1515
let onNext: () -> Void
16-
16+
1717
func makeAsyncIterator() -> AsyncIterator {
1818
Iterator(onNext: self.onNext)
1919
}
20-
20+
2121
struct Iterator: AsyncIteratorProtocol {
2222
let onNext: () -> Void
23-
23+
2424
func next() async throws -> Element? {
2525
self.onNext()
2626
try await Task.sleep(nanoseconds: 100_000_000_000)
@@ -32,26 +32,26 @@ private struct SpyAsyncSequenceForOnNextCall<Element>: AsyncSequence {
3232
private class SpyAsyncSequenceForNumberOfIterators<Element>: AsyncSequence {
3333
typealias Element = Element
3434
typealias AsyncIterator = Iterator
35-
35+
3636
let element: Element
3737
let numberOfTimes: Int
38-
38+
3939
var numberOfIterators = 0
40-
40+
4141
init(element: Element, numberOfTimes: Int) {
4242
self.element = element
4343
self.numberOfTimes = numberOfTimes
4444
}
45-
45+
4646
func makeAsyncIterator() -> AsyncIterator {
4747
self.numberOfIterators += 1
4848
return Iterator(element: self.element, numberOfTimes: self.numberOfTimes)
4949
}
50-
50+
5151
struct Iterator: AsyncIteratorProtocol {
5252
let element: Element
5353
var numberOfTimes: Int
54-
54+
5555
mutating func next() async throws -> Element? {
5656
guard self.numberOfTimes > 0 else { return nil }
5757
self.numberOfTimes -= 1
@@ -64,14 +64,14 @@ final class AsyncMulticastSequenceTests: XCTestCase {
6464
func test_multiple_loops_receive_elements_from_single_baseIterator() {
6565
let taskHaveIterators = expectation(description: "All tasks have their iterator")
6666
taskHaveIterators.expectedFulfillmentCount = 2
67-
67+
6868
let tasksHaveFinishedExpectation = expectation(description: "Tasks have finished")
6969
tasksHaveFinishedExpectation.expectedFulfillmentCount = 2
70-
70+
7171
let spyUpstreamSequence = SpyAsyncSequenceForNumberOfIterators(element: 1, numberOfTimes: 3)
7272
let stream = AsyncThrowingPassthroughSubject<Int, Error>()
7373
let sut = spyUpstreamSequence.multicast(stream)
74-
74+
7575
Task {
7676
var receivedElement = [Int]()
7777
var iterator = sut.makeAsyncIterator()
@@ -82,7 +82,7 @@ final class AsyncMulticastSequenceTests: XCTestCase {
8282
XCTAssertEqual(receivedElement, [1, 1, 1])
8383
tasksHaveFinishedExpectation.fulfill()
8484
}
85-
85+
8686
Task {
8787
var receivedElement = [Int]()
8888
var iterator = sut.makeAsyncIterator()
@@ -93,27 +93,27 @@ final class AsyncMulticastSequenceTests: XCTestCase {
9393
XCTAssertEqual(receivedElement, [1, 1, 1])
9494
tasksHaveFinishedExpectation.fulfill()
9595
}
96-
96+
9797
wait(for: [taskHaveIterators], timeout: 1)
98-
98+
9999
sut.connect()
100-
100+
101101
wait(for: [tasksHaveFinishedExpectation], timeout: 1)
102-
102+
103103
XCTAssertEqual(spyUpstreamSequence.numberOfIterators, 1)
104104
}
105-
105+
106106
func test_multiple_loops_uses_provided_stream() {
107107
let taskHaveIterators = expectation(description: "All tasks have their iterator")
108108
taskHaveIterators.expectedFulfillmentCount = 3
109-
109+
110110
let tasksHaveFinishedExpectation = expectation(description: "Tasks have finished")
111111
tasksHaveFinishedExpectation.expectedFulfillmentCount = 3
112-
112+
113113
let stream = AsyncThrowingPassthroughSubject<Int, Error>()
114114
let spyUpstreamSequence = SpyAsyncSequenceForNumberOfIterators(element: 1, numberOfTimes: 3)
115115
let sut = spyUpstreamSequence.multicast(stream)
116-
116+
117117
Task {
118118
var receivedElement = [Int]()
119119
var iterator = sut.makeAsyncIterator()
@@ -124,7 +124,7 @@ final class AsyncMulticastSequenceTests: XCTestCase {
124124
XCTAssertEqual(receivedElement, [1, 1, 1])
125125
tasksHaveFinishedExpectation.fulfill()
126126
}
127-
127+
128128
Task {
129129
var receivedElement = [Int]()
130130
var iterator = sut.makeAsyncIterator()
@@ -135,7 +135,7 @@ final class AsyncMulticastSequenceTests: XCTestCase {
135135
XCTAssertEqual(receivedElement, [1, 1, 1])
136136
tasksHaveFinishedExpectation.fulfill()
137137
}
138-
138+
139139
Task {
140140
var receivedElement = [Int]()
141141
var iterator = sut.makeAsyncIterator()
@@ -146,26 +146,26 @@ final class AsyncMulticastSequenceTests: XCTestCase {
146146
XCTAssertEqual(receivedElement, [1, 1, 1])
147147
tasksHaveFinishedExpectation.fulfill()
148148
}
149-
149+
150150
wait(for: [taskHaveIterators], timeout: 1)
151-
151+
152152
sut.connect()
153-
153+
154154
wait(for: [tasksHaveFinishedExpectation], timeout: 1)
155-
155+
156156
XCTAssertEqual(spyUpstreamSequence.numberOfIterators, 1)
157157
}
158-
158+
159159
func test_multicast_propagates_error_when_autoconnect() async {
160160
let expectedError = MockError(code: Int.random(in: 0...100))
161-
161+
162162
let stream = AsyncThrowingPassthroughSubject<Int, Error>()
163-
163+
164164
let sut = AsyncFailSequence<Int>(expectedError)
165165
.prepend(1)
166166
.multicast(stream)
167167
.autoconnect()
168-
168+
169169
var receivedElement = [Int]()
170170
do {
171171
for try await element in sut {
@@ -177,45 +177,45 @@ final class AsyncMulticastSequenceTests: XCTestCase {
177177
XCTAssertEqual(error as? MockError, expectedError)
178178
}
179179
}
180-
180+
181181
func test_multicast_finishes_when_task_is_cancelled() {
182182
let taskHasFinishedExpectation = expectation(description: "Task has finished")
183-
183+
184184
let stream = AsyncThrowingPassthroughSubject<Int, Error>()
185185
let sut = AsyncLazySequence<[Int]>([1, 2, 3, 4, 5])
186186
.multicast(stream)
187187
.autoconnect()
188-
188+
189189
Task {
190190
for try await _ in sut {}
191191
taskHasFinishedExpectation.fulfill()
192192
}.cancel()
193-
193+
194194
wait(for: [taskHasFinishedExpectation], timeout: 1)
195195
}
196-
196+
197197
func test_multicast_finishes_when_task_is_cancelled_while_waiting_for_next() {
198198
let canCancelExpectation = expectation(description: "the task can be cancelled")
199199
let taskHasFinishedExpectation = expectation(description: "Task has finished")
200-
200+
201201
let spyAsyncSequence = SpyAsyncSequenceForOnNextCall<Int> {
202202
canCancelExpectation.fulfill()
203203
}
204-
204+
205205
let stream = AsyncThrowingPassthroughSubject<Int, Error>()
206206
let sut = spyAsyncSequence
207207
.multicast(stream)
208208
.autoconnect()
209-
209+
210210
let task = Task {
211211
for try await _ in sut {}
212212
taskHasFinishedExpectation.fulfill()
213213
}
214-
214+
215215
wait(for: [canCancelExpectation], timeout: 1)
216-
216+
217217
task.cancel()
218-
218+
219219
wait(for: [taskHasFinishedExpectation], timeout: 1)
220220
}
221221
}

Tests/Operators/AsyncSequence+ShareTests.swift

Lines changed: 72 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -8,30 +8,79 @@
88
import AsyncExtensions
99
import XCTest
1010

11-
final class AsyncSequence_ShareTests: XCTestCase {
12-
func test_share_multicasts_values_to_clientLoops() {
13-
let tasksHaveFinishedExpectation = expectation(description: "the tasks have finished")
14-
tasksHaveFinishedExpectation.expectedFulfillmentCount = 2
15-
16-
let sut = AsyncLazySequence(["first", "second", "third"])
17-
.share()
18-
19-
Task {
20-
var received = [String]()
21-
await sut
22-
.collect { received.append($0) }
23-
XCTAssertEqual(received, ["first", "second", "third"])
24-
tasksHaveFinishedExpectation.fulfill()
25-
}
11+
private extension DispatchTimeInterval {
12+
var nanoseconds: UInt64 {
13+
switch self {
14+
case .nanoseconds(let value) where value >= 0: return UInt64(value)
15+
case .microseconds(let value) where value >= 0: return UInt64(value) * 1000
16+
case .milliseconds(let value) where value >= 0: return UInt64(value) * 1_000_000
17+
case .seconds(let value) where value >= 0: return UInt64(value) * 1_000_000_000
18+
case .never: return .zero
19+
default: return .zero
20+
}
21+
}
22+
}
2623

27-
Task {
28-
var received = [String]()
29-
await sut
30-
.collect { received.append($0) }
31-
XCTAssertEqual(received, ["first", "second", "third"])
32-
tasksHaveFinishedExpectation.fulfill()
33-
}
24+
private struct LongAsyncSequence<Element>: AsyncSequence, AsyncIteratorProtocol {
25+
typealias Element = Element
26+
typealias AsyncIterator = LongAsyncSequence
27+
28+
var elements: IndexingIterator<[Element]>
29+
let interval: DispatchTimeInterval
30+
var currentIndex = -1
31+
let failAt: Int?
32+
var hasEmitted = false
33+
let onCancel: () -> Void
34+
35+
init(elements: [Element], interval: DispatchTimeInterval = .seconds(0), failAt: Int? = nil, onCancel: @escaping () -> Void = {}) {
36+
self.onCancel = onCancel
37+
self.elements = elements.makeIterator()
38+
self.failAt = failAt
39+
self.interval = interval
40+
}
41+
42+
mutating func next() async throws -> Element? {
43+
return try await withTaskCancellationHandler { [onCancel] in
44+
onCancel()
45+
} operation: {
46+
try await Task.sleep(nanoseconds: self.interval.nanoseconds)
47+
self.currentIndex += 1
48+
if self.currentIndex == self.failAt {
49+
throw MockError(code: 0)
50+
}
51+
return self.elements.next()
52+
}
53+
}
54+
55+
func makeAsyncIterator() -> AsyncIterator {
56+
self
57+
}
58+
}
3459

35-
waitForExpectations(timeout: 5)
60+
final class AsyncSequence_ShareTests: XCTestCase {
61+
func test_share_multicasts_values_to_clientLoops() {
62+
let tasksHaveFinishedExpectation = expectation(description: "the tasks have finished")
63+
tasksHaveFinishedExpectation.expectedFulfillmentCount = 2
64+
65+
let sut = LongAsyncSequence(
66+
elements: ["1", "2", "3"],
67+
interval: .milliseconds(200)
68+
).share()
69+
70+
Task(priority: .high) {
71+
var received = [String]()
72+
try await sut.collect { received.append($0) }
73+
XCTAssertEqual(received, ["1", "2", "3"])
74+
tasksHaveFinishedExpectation.fulfill()
75+
}
76+
77+
Task(priority: .high) {
78+
var received = [String]()
79+
try await sut.collect { received.append($0) }
80+
XCTAssertEqual(received, ["1", "2", "3"])
81+
tasksHaveFinishedExpectation.fulfill()
3682
}
83+
84+
waitForExpectations(timeout: 5)
85+
}
3786
}

0 commit comments

Comments
 (0)