@@ -43,12 +43,11 @@ final class AsyncSequenceWriter<Element: Sendable>: AsyncSequence, @unchecked Se
4343 case failed( Error , CheckedContinuation < Void , Never > ? )
4444 }
4545
46- private var _state = State . buffering ( . init( ) , nil )
47- private let lock = NIOLock ( )
46+ private let state = NIOLockedValueBox < State > ( . buffering( [ ] , nil ) )
4847
4948 public var hasDemand : Bool {
50- self . lock . withLock {
51- switch self . _state {
49+ self . state . withLockedValue { state in
50+ switch state {
5251 case . failed, . finished, . buffering:
5352 return false
5453 case . waiting:
@@ -59,67 +58,132 @@ final class AsyncSequenceWriter<Element: Sendable>: AsyncSequence, @unchecked Se
5958
6059 /// Wait until a downstream consumer has issued more demand by calling `next`.
6160 public func demand( ) async {
62- self . lock. lock ( )
61+ let shouldBuffer = self . state. withLockedValue { state in
62+ switch state {
63+ case . buffering( _, . none) :
64+ return true
65+ case . waiting:
66+ return false
67+ case . buffering( _, . some) , . failed( _, . some) :
68+ preconditionFailure ( " Already waiting for demand. Invalid state: \( state) " )
69+ case . finished, . failed:
70+ preconditionFailure ( " Invalid state: \( state) " )
71+ }
72+ }
6373
64- switch self . _state {
65- case . buffering( let buffer, . none) :
74+ if shouldBuffer {
6675 await withCheckedContinuation { ( continuation: CheckedContinuation < Void , Never > ) in
67- self . _state = . buffering( buffer, continuation)
68- self . lock. unlock ( )
76+ let shouldResumeContinuation = self . state. withLockedValue { state in
77+ switch state {
78+ case . buffering( let buffer, . none) :
79+ state = . buffering( buffer, continuation)
80+ return false
81+ case . waiting:
82+ return true
83+ case . buffering( _, . some) , . failed( _, . some) :
84+ preconditionFailure ( " Already waiting for demand. Invalid state: \( state) " )
85+ case . finished, . failed:
86+ preconditionFailure ( " Invalid state: \( state) " )
87+ }
88+ }
89+
90+ if shouldResumeContinuation {
91+ continuation. resume ( )
92+ }
6993 }
70-
71- case . waiting:
72- self . lock. unlock ( )
73- return
74-
75- case . buffering( _, . some) , . failed( _, . some) :
76- let state = self . _state
77- self . lock. unlock ( )
78- preconditionFailure ( " Already waiting for demand. Invalid state: \( state) " )
79-
80- case . finished, . failed:
81- let state = self . _state
82- self . lock. unlock ( )
83- preconditionFailure ( " Invalid state: \( state) " )
8494 }
8595 }
8696
97+ private enum NextAction {
98+ /// Resume the continuation if present, and return the result if present.
99+ case resumeAndReturn( CheckedContinuation < Void , Never > ? , Result < Element ? , Error > ? )
100+ /// Suspend the current task and wait for the next value.
101+ case suspend
102+ }
103+
87104 private func next( ) async throws -> Element ? {
88- self . lock. lock ( )
89- switch self . _state {
90- case . buffering( let buffer, let demandContinuation) where buffer. isEmpty:
91- return try await withCheckedThrowingContinuation { continuation in
92- self . _state = . waiting( continuation)
93- self . lock. unlock ( )
94- demandContinuation? . resume ( returning: ( ) )
95- }
105+ let action : NextAction = self . state. withLockedValue { state in
106+ switch state {
107+ case . buffering( var buffer, let demandContinuation) :
108+ if buffer. isEmpty {
109+ return . suspend
110+ } else {
111+ let first = buffer. removeFirst ( )
112+ if first != nil {
113+ state = . buffering( buffer, demandContinuation)
114+ } else {
115+ state = . finished
116+ }
117+ return . resumeAndReturn( nil , . success( first) )
118+ }
119+
120+ case . failed( let error, let demandContinuation) :
121+ state = . finished
122+ return . resumeAndReturn( demandContinuation, . failure( error) )
123+
124+ case . finished:
125+ return . resumeAndReturn( nil , . success( nil ) )
96126
97- case . buffering( var buffer, let demandContinuation) :
98- let first = buffer. removeFirst ( )
99- if first != nil {
100- self . _state = . buffering( buffer, demandContinuation)
101- } else {
102- self . _state = . finished
127+ case . waiting:
128+ preconditionFailure (
129+ " Expected that there is always only one concurrent call to next. Invalid state: \( state) "
130+ )
103131 }
104- self . lock. unlock ( )
105- return first
132+ }
106133
107- case . failed( let error, let demandContinuation) :
108- self . _state = . finished
109- self . lock. unlock ( )
134+ switch action {
135+ case . resumeAndReturn( let demandContinuation, let result) :
110136 demandContinuation? . resume ( )
111- throw error
112-
113- case . finished:
114- self . lock. unlock ( )
115- return nil
116-
117- case . waiting:
118- let state = self . _state
119- self . lock. unlock ( )
120- preconditionFailure (
121- " Expected that there is always only one concurrent call to next. Invalid state: \( state) "
122- )
137+ return try result? . get ( )
138+
139+ case . suspend:
140+ // Holding the lock here *should* be safe but because of a bug in the runtime
141+ // it isn't, so drop the lock, create the continuation and then try again.
142+ //
143+ // See https://github.com/swiftlang/swift/issues/85668
144+ return try await withCheckedThrowingContinuation {
145+ ( continuation: CheckedContinuation < Element ? , any Error > ) in
146+ let action : NextAction = self . state. withLockedValue { state in
147+ switch state {
148+ case . buffering( var buffer, let demandContinuation) :
149+ if buffer. isEmpty {
150+ state = . waiting( continuation)
151+ return . resumeAndReturn( demandContinuation, nil )
152+ } else {
153+ let first = buffer. removeFirst ( )
154+ if first != nil {
155+ state = . buffering( buffer, demandContinuation)
156+ } else {
157+ state = . finished
158+ }
159+ return . resumeAndReturn( nil , . success( first) )
160+ }
161+
162+ case . failed( let error, let demandContinuation) :
163+ state = . finished
164+ return . resumeAndReturn( demandContinuation, . failure( error) )
165+
166+ case . finished:
167+ return . resumeAndReturn( nil , . success( nil ) )
168+
169+ case . waiting:
170+ preconditionFailure (
171+ " Expected that there is always only one concurrent call to next. Invalid state: \( state) "
172+ )
173+ }
174+ }
175+
176+ switch action {
177+ case . resumeAndReturn( let demandContinuation, let result) :
178+ demandContinuation? . resume ( )
179+ // Resume the continuation rather than returning th result.
180+ if let result {
181+ continuation. resume ( with: result)
182+ }
183+ case . suspend:
184+ preconditionFailure ( ) // Not returned from the code above.
185+ }
186+ }
123187 }
124188 }
125189
@@ -137,19 +201,19 @@ final class AsyncSequenceWriter<Element: Sendable>: AsyncSequence, @unchecked Se
137201 }
138202
139203 private func writeBufferOrEnd( _ element: Element ? ) {
140- let writeAction = self . lock . withLock { ( ) -> WriteAction in
141- switch self . _state {
204+ let writeAction = self . state . withLockedValue { state -> WriteAction in
205+ switch state {
142206 case . buffering( var buffer, let continuation) :
143207 buffer. append ( element)
144- self . _state = . buffering( buffer, continuation)
208+ state = . buffering( buffer, continuation)
145209 return . none
146210
147211 case . waiting( let continuation) :
148- self . _state = . buffering( . init( ) , nil )
212+ state = . buffering( . init( ) , nil )
149213 return . succeedContinuation( continuation, element)
150214
151215 case . finished, . failed:
152- preconditionFailure ( " Invalid state: \( self . _state ) " )
216+ preconditionFailure ( " Invalid state: \( state ) " )
153217 }
154218 }
155219
@@ -170,17 +234,17 @@ final class AsyncSequenceWriter<Element: Sendable>: AsyncSequence, @unchecked Se
170234 /// Drops all buffered writes and emits an error on the waiting `next`. If there is no call to `next`
171235 /// waiting, will emit the error on the next call to `next`.
172236 public func fail( _ error: Error ) {
173- let errorAction = self . lock . withLock { ( ) -> ErrorAction in
174- switch self . _state {
237+ let errorAction = self . state . withLockedValue { state -> ErrorAction in
238+ switch state {
175239 case . buffering( _, let demandContinuation) :
176- self . _state = . failed( error, demandContinuation)
240+ state = . failed( error, demandContinuation)
177241 return . none
178242
179243 case . failed, . finished:
180244 return . none
181245
182246 case . waiting( let continuation) :
183- self . _state = . finished
247+ state = . finished
184248 return . failContinuation( continuation, error)
185249 }
186250 }
0 commit comments