Skip to content

Commit ce1bf5d

Browse files
committed
re-add variadic merge
1 parent c9e1a62 commit ce1bf5d

File tree

3 files changed

+307
-0
lines changed

3 files changed

+307
-0
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ AsyncStream)
4444
* [AsyncThrowingReplaySubject](./Sources/AsyncSubjects/AsyncThrowingReplaySubject.swift): Throwing subject with a shared output. Maintain an replays a buffered amount of values
4545

4646
### Combiners
47+
* [`merge(_:)`](./Sources/Combiners/Merge/AsyncMergeSequence.swift): Merges any `AsyncSequence` into an AsyncSequence of elements
4748
* [`withLatest(_:)`](./Sources/Combiners/WithLatestFrom/AsyncWithLatestFromSequence.swift): Combines elements from self with the last known element from an other `AsyncSequence`
4849
* [`withLatest(_:_:)`](./Sources/Combiners/WithLatestFrom/AsyncWithLatestFrom2Sequence.swift): Combines elements from self with the last known elements from two other async sequences
4950

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
//
2+
// AsyncMergeSequence.swift
3+
//
4+
//
5+
// Created by Thibault Wittemberg on 31/03/2022.
6+
//
7+
8+
/// Creates an asynchronous sequence of elements from many underlying asynchronous sequences
9+
public func merge<Base: AsyncSequence>(
10+
_ bases: Base...
11+
) -> AsyncMergeSequence<Base> {
12+
AsyncMergeSequence(bases)
13+
}
14+
15+
/// An asynchronous sequence of elements from many underlying asynchronous sequences
16+
///
17+
/// In a `AsyncMergeSequence` instance, the *i*th element is the *i*th element
18+
/// resolved in sequential order out of the two underlying asynchronous sequences.
19+
/// Use the `merge(...)` function to create an `AsyncMergeSequence`.
20+
public struct AsyncMergeSequence<Base: AsyncSequence>: AsyncSequence {
21+
public typealias Element = Base.Element
22+
public typealias AsyncIterator = Iterator
23+
24+
let bases: [Base]
25+
26+
public init(_ bases: [Base]) {
27+
self.bases = bases
28+
}
29+
30+
public func makeAsyncIterator() -> Iterator {
31+
Iterator(
32+
bases: self.bases
33+
)
34+
}
35+
36+
public struct Iterator: AsyncIteratorProtocol {
37+
let mergeStateMachine: MergeStateMachine<Element>
38+
39+
init(bases: [Base]) {
40+
self.mergeStateMachine = MergeStateMachine(
41+
bases
42+
)
43+
}
44+
45+
public mutating func next() async rethrows -> Element? {
46+
let mergedElement = await self.mergeStateMachine.next()
47+
switch mergedElement {
48+
case .element(let result):
49+
return try result._rethrowGet()
50+
case .termination:
51+
return nil
52+
}
53+
}
54+
}
55+
}
56+
57+
extension AsyncMergeSequence: Sendable where Base: Sendable {}
Lines changed: 249 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,249 @@
1+
//
2+
// MergeStateMachine.swift
3+
//
4+
//
5+
// Created by Thibault Wittemberg on 08/09/2022.
6+
//
7+
8+
import DequeModule
9+
10+
struct MergeStateMachine<Element>: Sendable {
11+
enum BufferState {
12+
case idle
13+
case queued(Deque<RegulatedElement<Element>>)
14+
case awaiting(UnsafeContinuation<RegulatedElement<Element>, Never>)
15+
case closed
16+
}
17+
18+
struct State {
19+
var buffer: BufferState
20+
var basesToTerminate: Int
21+
}
22+
23+
struct OnNextDecision {
24+
let continuation: UnsafeContinuation<RegulatedElement<Element>, Never>
25+
let regulatedElement: RegulatedElement<Element>
26+
}
27+
28+
let requestNextRegulatedElements: @Sendable () -> Void
29+
let state: ManagedCriticalState<State>
30+
let task: Task<Void, Never>
31+
32+
init<Base1: AsyncSequence, Base2: AsyncSequence>(
33+
_ base1: Base1,
34+
_ base2: Base2
35+
) where Base1.Element == Element, Base2.Element == Element {
36+
self.state = ManagedCriticalState(State(buffer: .idle, basesToTerminate: 2))
37+
38+
let regulator1 = Regulator(base1, onNextRegulatedElement: { [state] in Self.onNextRegulatedElement($0, state: state) })
39+
let regulator2 = Regulator(base2, onNextRegulatedElement: { [state] in Self.onNextRegulatedElement($0, state: state) })
40+
41+
self.requestNextRegulatedElements = {
42+
regulator1.requestNextRegulatedElement()
43+
regulator2.requestNextRegulatedElement()
44+
}
45+
46+
self.task = Task {
47+
await withTaskGroup(of: Void.self) { group in
48+
group.addTask {
49+
await regulator1.iterate()
50+
}
51+
52+
group.addTask {
53+
await regulator2.iterate()
54+
}
55+
}
56+
}
57+
}
58+
59+
init<Base1: AsyncSequence, Base2: AsyncSequence, Base3: AsyncSequence>(
60+
_ base1: Base1,
61+
_ base2: Base2,
62+
_ base3: Base3
63+
) where Base1.Element == Element, Base2.Element == Element, Base3.Element == Base1.Element {
64+
self.state = ManagedCriticalState(State(buffer: .idle, basesToTerminate: 3))
65+
66+
let regulator1 = Regulator(base1, onNextRegulatedElement: { [state] in Self.onNextRegulatedElement($0, state: state) })
67+
let regulator2 = Regulator(base2, onNextRegulatedElement: { [state] in Self.onNextRegulatedElement($0, state: state) })
68+
let regulator3 = Regulator(base3, onNextRegulatedElement: { [state] in Self.onNextRegulatedElement($0, state: state) })
69+
70+
self.requestNextRegulatedElements = {
71+
regulator1.requestNextRegulatedElement()
72+
regulator2.requestNextRegulatedElement()
73+
regulator3.requestNextRegulatedElement()
74+
}
75+
76+
self.task = Task {
77+
await withTaskGroup(of: Void.self) { group in
78+
group.addTask {
79+
await regulator1.iterate()
80+
}
81+
82+
group.addTask {
83+
await regulator2.iterate()
84+
}
85+
86+
group.addTask {
87+
await regulator3.iterate()
88+
}
89+
}
90+
}
91+
}
92+
93+
init<Base: AsyncSequence>(
94+
_ bases: [Base]
95+
) where Base.Element == Element {
96+
self.state = ManagedCriticalState(State(buffer: .idle, basesToTerminate: bases.count))
97+
98+
var regulators = [Regulator<Base>]()
99+
100+
for base in bases {
101+
let regulator = Regulator<Base>(base, onNextRegulatedElement: { [state] in Self.onNextRegulatedElement($0, state: state) })
102+
regulators.append(regulator)
103+
}
104+
105+
let immutableRegulators = regulators
106+
self.requestNextRegulatedElements = {
107+
for regulator in immutableRegulators {
108+
regulator.requestNextRegulatedElement()
109+
}
110+
}
111+
112+
self.task = Task {
113+
await withTaskGroup(of: Void.self) { group in
114+
for regulators in immutableRegulators {
115+
group.addTask {
116+
await regulators.iterate()
117+
}
118+
}
119+
}
120+
}
121+
}
122+
123+
@Sendable
124+
static func onNextRegulatedElement(_ element: RegulatedElement<Element>, state: ManagedCriticalState<State>) {
125+
let decision = state.withCriticalRegion { state -> OnNextDecision? in
126+
switch (state.buffer, element) {
127+
case (.idle, .element):
128+
state.buffer = .queued([element])
129+
return nil
130+
case (.queued(var elements), .element):
131+
elements.append(element)
132+
state.buffer = .queued(elements)
133+
return nil
134+
case (.awaiting(let continuation), .element(.success)):
135+
state.buffer = .idle
136+
return OnNextDecision(continuation: continuation, regulatedElement: element)
137+
case (.awaiting(let continuation), .element(.failure)):
138+
state.buffer = .closed
139+
return OnNextDecision(continuation: continuation, regulatedElement: element)
140+
141+
case (.idle, .termination):
142+
state.basesToTerminate -= 1
143+
if state.basesToTerminate == 0 {
144+
state.buffer = .closed
145+
} else {
146+
state.buffer = .idle
147+
}
148+
return nil
149+
150+
case (.queued(var elements), .termination):
151+
state.basesToTerminate -= 1
152+
if state.basesToTerminate == 0 {
153+
elements.append(.termination)
154+
state.buffer = .queued(elements)
155+
}
156+
return nil
157+
158+
case (.awaiting(let continuation), .termination):
159+
state.basesToTerminate -= 1
160+
if state.basesToTerminate == 0 {
161+
state.buffer = .closed
162+
return OnNextDecision(continuation: continuation, regulatedElement: .termination)
163+
} else {
164+
state.buffer = .awaiting(continuation)
165+
return nil
166+
}
167+
168+
case (.closed, _):
169+
return nil
170+
}
171+
}
172+
173+
if let decision = decision {
174+
decision.continuation.resume(returning: decision.regulatedElement)
175+
}
176+
}
177+
178+
@Sendable
179+
func unsuspendAndClearOnCancel() {
180+
let continuation = self.state.withCriticalRegion { state -> UnsafeContinuation<RegulatedElement<Element>, Never>? in
181+
switch state.buffer {
182+
case .awaiting(let continuation):
183+
state.basesToTerminate = 0
184+
state.buffer = .closed
185+
return continuation
186+
default:
187+
state.basesToTerminate = 0
188+
state.buffer = .closed
189+
return nil
190+
}
191+
}
192+
193+
continuation?.resume(returning: .termination)
194+
self.task.cancel()
195+
}
196+
197+
func next() async -> RegulatedElement<Element> {
198+
await withTaskCancellationHandler {
199+
self.unsuspendAndClearOnCancel()
200+
} operation: {
201+
self.requestNextRegulatedElements()
202+
203+
let regulatedElement = await withUnsafeContinuation { (continuation: UnsafeContinuation<RegulatedElement<Element>, Never>) in
204+
let decision = self.state.withCriticalRegion { state -> OnNextDecision? in
205+
switch state.buffer {
206+
case .queued(var elements):
207+
guard let regulatedElement = elements.popFirst() else {
208+
assertionFailure("The buffer cannot by empty, it should be idle in this case")
209+
return OnNextDecision(continuation: continuation, regulatedElement: .termination)
210+
}
211+
switch regulatedElement {
212+
case .termination:
213+
state.buffer = .closed
214+
return OnNextDecision(continuation: continuation, regulatedElement: .termination)
215+
case .element(.success):
216+
if elements.isEmpty {
217+
state.buffer = .idle
218+
} else {
219+
state.buffer = .queued(elements)
220+
}
221+
return OnNextDecision(continuation: continuation, regulatedElement: regulatedElement)
222+
case .element(.failure):
223+
state.buffer = .closed
224+
return OnNextDecision(continuation: continuation, regulatedElement: regulatedElement)
225+
}
226+
case .idle:
227+
state.buffer = .awaiting(continuation)
228+
return nil
229+
case .awaiting:
230+
assertionFailure("The next function cannot be called concurrently")
231+
return OnNextDecision(continuation: continuation, regulatedElement: .termination)
232+
case .closed:
233+
return OnNextDecision(continuation: continuation, regulatedElement: .termination)
234+
}
235+
}
236+
237+
if let decision = decision {
238+
decision.continuation.resume(returning: decision.regulatedElement)
239+
}
240+
}
241+
242+
if case .termination = regulatedElement, case .element(.failure) = regulatedElement {
243+
self.task.cancel()
244+
}
245+
246+
return regulatedElement
247+
}
248+
}
249+
}

0 commit comments

Comments
 (0)