@@ -41,8 +41,7 @@ private class SubscriptionChannel<T>(
4141 require(request >= 0 ) { " Invalid request size: $request " }
4242 }
4343
44- @Volatile
45- private var subscription: Subscription ? = null
44+ private val _subscription = atomic<Subscription ?>(null )
4645
4746 // requested from subscription minus number of received minus number of enqueued receivers,
4847 // can be negative if we have receivers, but no subscription yet
@@ -52,7 +51,7 @@ private class SubscriptionChannel<T>(
5251 @Suppress(" CANNOT_OVERRIDE_INVISIBLE_MEMBER" )
5352 override fun onReceiveEnqueued () {
5453 _requested .loop { wasRequested ->
55- val subscription = this .subscription
54+ val subscription = _subscription .value
5655 val needRequested = wasRequested - 1
5756 if (subscription != null && needRequested < 0 ) { // need to request more from subscription
5857 // try to fixup by making request
@@ -73,13 +72,12 @@ private class SubscriptionChannel<T>(
7372
7473 @Suppress(" CANNOT_OVERRIDE_INVISIBLE_MEMBER" )
7574 override fun onClosedIdempotent (closed : LockFreeLinkedListNode ) {
76- subscription?.cancel()
77- subscription = null // optimization -- no need to cancel it again
75+ _subscription .getAndSet(null )?.cancel() // cancel exactly once
7876 }
7977
8078 // Subscriber overrides
8179 override fun onSubscribe (s : Subscription ) {
82- subscription = s
80+ _subscription .value = s
8381 while (true ) { // lock-free loop on _requested
8482 if (isClosedForSend) {
8583 s.cancel()
0 commit comments