Skip to content

Commit d079f05

Browse files
committed
Merge remote-tracking branch 'lachenmayer/fix/switch-to-latest-leak-2' into linux-leak
2 parents fdad5fd + b093edc commit d079f05

11 files changed

+200
-123
lines changed

Sources/AsyncChannels/AsyncBufferedChannel.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public final class AsyncBufferedChannel<Element>: AsyncSequence, Sendable {
3535

3636
struct Awaiting: Hashable {
3737
let id: Int
38-
let continuation: UnsafeContinuation<Element?, Never>?
38+
let continuation: CheckedContinuation<Element?, Never>?
3939

4040
static func placeHolder(id: Int) -> Awaiting {
4141
Awaiting(id: id, continuation: nil)
@@ -178,7 +178,7 @@ public final class AsyncBufferedChannel<Element>: AsyncSequence, Sendable {
178178

179179
awaiting?.continuation?.resume(returning: nil)
180180
} operation: {
181-
await withUnsafeContinuation { [state] (continuation: UnsafeContinuation<Element?, Never>) in
181+
await withCheckedContinuation { [state] (continuation: CheckedContinuation<Element?, Never>) in
182182
let decision = state.withCriticalRegion { state -> AwaitingDecision in
183183
let isCancelled = cancellation.withCriticalRegion { $0 }
184184
guard !isCancelled else { return .resume(nil) }

Sources/AsyncSubjects/AsyncCurrentValueSubject.swift

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -67,24 +67,28 @@ public final class AsyncCurrentValueSubject<Element>: AsyncSubject where Element
6767
/// Sends a value to all consumers
6868
/// - Parameter element: the value to send
6969
public func send(_ element: Element) {
70-
self.state.withCriticalRegion { state in
70+
let channels = self.state.withCriticalRegion { state -> [AsyncBufferedChannel<Element>] in
7171
state.current = element
72-
for channel in state.channels.values {
73-
channel.send(element)
74-
}
72+
return Array(state.channels.values)
73+
}
74+
75+
for channel in channels {
76+
channel.send(element)
7577
}
7678
}
7779

7880
/// Finishes the async sequences with a normal ending.
7981
/// - Parameter termination: The termination to finish the subject.
8082
public func send(_ termination: Termination<Failure>) {
81-
self.state.withCriticalRegion { state in
83+
let channels = self.state.withCriticalRegion { state -> [AsyncBufferedChannel<Element>] in
8284
state.terminalState = termination
8385
let channels = Array(state.channels.values)
8486
state.channels.removeAll()
85-
for channel in channels {
86-
channel.finish()
87-
}
87+
return channels
88+
}
89+
90+
for channel in channels {
91+
channel.finish()
8892
}
8993
}
9094

@@ -133,10 +137,10 @@ public final class AsyncCurrentValueSubject<Element>: AsyncSubject where Element
133137
}
134138

135139
public mutating func next() async -> Element? {
136-
await withTaskCancellationHandler {
137-
await self.iterator.next()
138-
} onCancel: { [unregister] in
140+
await withTaskCancellationHandler { [unregister] in
139141
unregister()
142+
} operation: {
143+
await self.iterator.next()
140144
}
141145
}
142146
}

Sources/AsyncSubjects/AsyncPassthroughSubject.swift

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -52,23 +52,27 @@ public final class AsyncPassthroughSubject<Element>: AsyncSubject {
5252
/// Sends a value to all consumers
5353
/// - Parameter element: the value to send
5454
public func send(_ element: Element) {
55-
self.state.withCriticalRegion { state in
56-
for channel in state.channels.values {
57-
channel.send(element)
58-
}
55+
let channels = self.state.withCriticalRegion { state in
56+
state.channels.values
57+
}
58+
59+
for channel in channels {
60+
channel.send(element)
5961
}
6062
}
6163

6264
/// Finishes the subject with a normal ending.
6365
/// - Parameter termination: The termination to finish the subject
6466
public func send(_ termination: Termination<Failure>) {
65-
self.state.withCriticalRegion { state in
67+
let channels = self.state.withCriticalRegion { state -> [AsyncBufferedChannel<Element>] in
6668
state.terminalState = termination
6769
let channels = Array(state.channels.values)
6870
state.channels.removeAll()
69-
for channel in channels {
70-
channel.finish()
71-
}
71+
return channels
72+
}
73+
74+
for channel in channels {
75+
channel.finish()
7276
}
7377
}
7478

@@ -116,10 +120,10 @@ public final class AsyncPassthroughSubject<Element>: AsyncSubject {
116120
}
117121

118122
public mutating func next() async -> Element? {
119-
await withTaskCancellationHandler {
120-
await self.iterator.next()
121-
} onCancel: { [unregister] in
123+
await withTaskCancellationHandler { [unregister] in
122124
unregister()
125+
} operation: {
126+
await self.iterator.next()
123127
}
124128
}
125129
}

Sources/AsyncSubjects/AsyncReplaySubject.swift

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -46,29 +46,33 @@ public final class AsyncReplaySubject<Element>: AsyncSubject where Element: Send
4646
/// Sends a value to all consumers
4747
/// - Parameter element: the value to send
4848
public func send(_ element: Element) {
49-
self.state.withCriticalRegion { state in
49+
let channels = self.state.withCriticalRegion { state -> [AsyncBufferedChannel<Element>] in
5050
if state.buffer.count >= state.bufferSize && !state.buffer.isEmpty {
5151
state.buffer.removeFirst()
5252
}
5353
state.buffer.append(element)
54-
for channel in state.channels.values {
55-
channel.send(element)
56-
}
54+
return Array(state.channels.values)
55+
}
56+
57+
for channel in channels {
58+
channel.send(element)
5759
}
5860
}
5961

6062
/// Finishes the subject with a normal ending.
6163
/// - Parameter termination: The termination to finish the subject.
6264
public func send(_ termination: Termination<Failure>) {
63-
self.state.withCriticalRegion { state in
65+
let channels = self.state.withCriticalRegion { state -> [AsyncBufferedChannel<Element>] in
6466
state.terminalState = termination
6567
let channels = Array(state.channels.values)
6668
state.channels.removeAll()
6769
state.buffer.removeAll()
6870
state.bufferSize = 0
69-
for channel in channels {
70-
channel.finish()
71-
}
71+
return channels
72+
}
73+
74+
for channel in channels {
75+
channel.finish()
7276
}
7377
}
7478

@@ -120,10 +124,10 @@ public final class AsyncReplaySubject<Element>: AsyncSubject where Element: Send
120124
}
121125

122126
public mutating func next() async -> Element? {
123-
await withTaskCancellationHandler {
124-
await self.iterator.next()
125-
} onCancel: { [unregister] in
127+
await withTaskCancellationHandler { [unregister] in
126128
unregister()
129+
} operation: {
130+
await self.iterator.next()
127131
}
128132
}
129133
}

Sources/AsyncSubjects/AsyncThrowingCurrentValueSubject.swift

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -67,28 +67,32 @@ public final class AsyncThrowingCurrentValueSubject<Element, Failure: Error>: As
6767
/// Sends a value to all consumers
6868
/// - Parameter element: the value to send
6969
public func send(_ element: Element) {
70-
self.state.withCriticalRegion { state in
70+
let channels = self.state.withCriticalRegion { state -> [AsyncThrowingBufferedChannel<Element, Error>] in
7171
state.current = element
72-
for channel in state.channels.values {
73-
channel.send(element)
74-
}
72+
return Array(state.channels.values)
73+
}
74+
75+
for channel in channels {
76+
channel.send(element)
7577
}
7678
}
7779

7880
/// Finishes the subject with either a normal ending or an error.
7981
/// - Parameter termination: The termination to finish the subject.
8082
public func send(_ termination: Termination<Failure>) {
81-
self.state.withCriticalRegion { state in
83+
let channels = self.state.withCriticalRegion { state -> [AsyncThrowingBufferedChannel<Element, Error>] in
8284
state.terminalState = termination
8385
let channels = Array(state.channels.values)
8486
state.channels.removeAll()
85-
for channel in channels {
86-
switch termination {
87-
case .finished:
88-
channel.finish()
89-
case .failure(let error):
90-
channel.fail(error)
91-
}
87+
return channels
88+
}
89+
90+
for channel in channels {
91+
switch termination {
92+
case .finished:
93+
channel.finish()
94+
case .failure(let error):
95+
channel.fail(error)
9296
}
9397
}
9498
}
@@ -144,10 +148,10 @@ public final class AsyncThrowingCurrentValueSubject<Element, Failure: Error>: As
144148
}
145149

146150
public mutating func next() async throws -> Element? {
147-
try await withTaskCancellationHandler {
148-
try await self.iterator.next()
149-
} onCancel: { [unregister] in
151+
try await withTaskCancellationHandler { [unregister] in
150152
unregister()
153+
} operation: {
154+
try await self.iterator.next()
151155
}
152156
}
153157
}

Sources/AsyncSubjects/AsyncThrowingPassthroughSubject.swift

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -53,28 +53,31 @@ public final class AsyncThrowingPassthroughSubject<Element, Failure: Error>: Asy
5353
/// Sends a value to all consumers
5454
/// - Parameter element: the value to send
5555
public func send(_ element: Element) {
56-
self.state.withCriticalRegion { state in
57-
for channel in state.channels.values {
58-
channel.send(element)
59-
}
56+
let channels = self.state.withCriticalRegion { state in
57+
state.channels.values
58+
}
59+
60+
for channel in channels {
61+
channel.send(element)
6062
}
6163
}
6264

6365
/// Finishes the subject with either a normal ending or an error.
6466
/// - Parameter termination: The termination to finish the subject
6567
public func send(_ termination: Termination<Failure>) {
66-
self.state.withCriticalRegion { state in
68+
let channels = self.state.withCriticalRegion { state -> [AsyncThrowingBufferedChannel<Element, Error>] in
6769
state.terminalState = termination
6870
let channels = Array(state.channels.values)
6971
state.channels.removeAll()
72+
return channels
73+
}
7074

71-
for channel in channels {
72-
switch termination {
73-
case .finished:
74-
channel.finish()
75-
case .failure(let error):
76-
channel.fail(error)
77-
}
75+
for channel in channels {
76+
switch termination {
77+
case .finished:
78+
channel.finish()
79+
case .failure(let error):
80+
channel.fail(error)
7881
}
7982
}
8083
}
@@ -129,10 +132,10 @@ public final class AsyncThrowingPassthroughSubject<Element, Failure: Error>: Asy
129132
}
130133

131134
public mutating func next() async throws -> Element? {
132-
try await withTaskCancellationHandler {
133-
try await self.iterator.next()
134-
} onCancel: { [unregister] in
135+
try await withTaskCancellationHandler { [unregister] in
135136
unregister()
137+
} operation: {
138+
try await self.iterator.next()
136139
}
137140
}
138141
}

Sources/AsyncSubjects/AsyncThrowingReplaySubject.swift

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -45,33 +45,37 @@ public final class AsyncThrowingReplaySubject<Element, Failure: Error>: AsyncSub
4545
/// Sends a value to all consumers
4646
/// - Parameter element: the value to send
4747
public func send(_ element: Element) {
48-
self.state.withCriticalRegion { state in
48+
let channels = self.state.withCriticalRegion { state -> [AsyncThrowingBufferedChannel<Element, Error>] in
4949
if state.buffer.count >= state.bufferSize && !state.buffer.isEmpty {
5050
state.buffer.removeFirst()
5151
}
5252
state.buffer.append(element)
53-
for channel in state.channels.values {
54-
channel.send(element)
55-
}
53+
return Array(state.channels.values)
54+
}
55+
56+
for channel in channels {
57+
channel.send(element)
5658
}
5759
}
5860

5961
/// Finishes the subject with either a normal ending or an error.
6062
/// - Parameter termination: The termination to finish the subject
6163
public func send(_ termination: Termination<Failure>) {
62-
self.state.withCriticalRegion { state in
64+
let channels = self.state.withCriticalRegion { state -> [AsyncThrowingBufferedChannel<Element, Error>] in
6365
state.terminalState = termination
6466
let channels = Array(state.channels.values)
6567
state.channels.removeAll()
6668
state.buffer.removeAll()
6769
state.bufferSize = 0
68-
for channel in channels {
69-
switch termination {
70-
case .finished:
71-
channel.finish()
72-
case .failure(let error):
73-
channel.fail(error)
74-
}
70+
return channels
71+
}
72+
73+
for channel in channels {
74+
switch termination {
75+
case .finished:
76+
channel.finish()
77+
case .failure(let error):
78+
channel.fail(error)
7579
}
7680
}
7781
}
@@ -130,10 +134,10 @@ public final class AsyncThrowingReplaySubject<Element, Failure: Error>: AsyncSub
130134
}
131135

132136
public mutating func next() async throws -> Element? {
133-
try await withTaskCancellationHandler {
134-
try await self.iterator.next()
135-
} onCancel: { [unregister] in
137+
try await withTaskCancellationHandler { [unregister] in
136138
unregister()
139+
} operation: {
140+
try await self.iterator.next()
137141
}
138142
}
139143
}

0 commit comments

Comments
 (0)