File tree Expand file tree Collapse file tree 8 files changed +228
-30
lines changed
kotlinx-coroutines-reactive
kotlinx-coroutines-reactor/test Expand file tree Collapse file tree 8 files changed +228
-30
lines changed Original file line number Diff line number Diff line change @@ -30,9 +30,8 @@ public fun <T> Publisher<T>.openSubscription(request: Int = 0): ReceiveChannel<T
3030/* *
3131 * Subscribes to this [Publisher] and performs the specified action for each received element.
3232 */
33- public suspend inline fun <T > Publisher<T>.consumeEach (action : (T ) -> Unit ) {
33+ public suspend inline fun <T > Publisher<T>.consumeEach (action : (T ) -> Unit ) =
3434 openSubscription().consumeEach(action)
35- }
3635
3736@Suppress(" INVISIBLE_REFERENCE" , " INVISIBLE_MEMBER" )
3837private class SubscriptionChannel <T >(
@@ -75,6 +74,7 @@ private class SubscriptionChannel<T>(
7574 @Suppress(" CANNOT_OVERRIDE_INVISIBLE_MEMBER" )
7675 override fun onClosedIdempotent (closed : LockFreeLinkedListNode ) {
7776 subscription?.cancel()
77+ subscription = null // optimization -- no need to cancel it again
7878 }
7979
8080 // Subscriber overrides
Original file line number Diff line number Diff line change @@ -74,24 +74,6 @@ class IntegrationTest(
7474 assertThat(cnt, IsEqual (1 ))
7575 }
7676
77- @Test
78- fun testFailingConsumer () = runTest {
79- val pub = publish {
80- repeat(3 ) {
81- expect(it + 1 ) // expect(1), expect(2) *should* be invoked
82- send(it)
83- }
84- }
85-
86- try {
87- pub.consumeEach {
88- throw TestException ()
89- }
90- } catch (e: TestException ) {
91- finish(3 )
92- }
93- }
94-
9577 @Test
9678 fun testNumbers () = runBlocking<Unit > {
9779 val n = 100 * stressTestMultiplier
Original file line number Diff line number Diff line change @@ -252,4 +252,21 @@ class PublishTest : TestBase() {
252252 latch.await()
253253 finish(8 )
254254 }
255+
256+ @Test
257+ fun testFailingConsumer () = runTest {
258+ val pub = publish {
259+ repeat(3 ) {
260+ expect(it + 1 ) // expect(1), expect(2) *should* be invoked
261+ send(it)
262+ }
263+ }
264+ try {
265+ pub.consumeEach {
266+ throw TestException ()
267+ }
268+ } catch (e: TestException ) {
269+ finish(3 )
270+ }
271+ }
255272}
Original file line number Diff line number Diff line change 55package kotlinx.coroutines.reactor
66
77import kotlinx.coroutines.*
8+ import kotlinx.coroutines.reactive.*
89import org.hamcrest.core.*
910import org.junit.*
11+ import org.junit.Test
12+ import kotlin.test.*
1013
1114class FluxTest : TestBase () {
1215 @Test
@@ -80,4 +83,59 @@ class FluxTest : TestBase() {
8083 { assert (it is RuntimeException ) }
8184 )
8285 }
86+
87+ @Test
88+ fun testNotifyOnceOnCancellation () = runTest {
89+ expect(1 )
90+ val observable =
91+ flux {
92+ expect(5 )
93+ send(" OK" )
94+ try {
95+ delay(Long .MAX_VALUE )
96+ } catch (e: CancellationException ) {
97+ expect(11 )
98+ }
99+ }
100+ .doOnNext {
101+ expect(6 )
102+ assertEquals(" OK" , it)
103+ }
104+ .doOnCancel {
105+ expect(10 ) // notified once!
106+ }
107+ expect(2 )
108+ val job = launch(start = CoroutineStart .UNDISPATCHED ) {
109+ expect(3 )
110+ observable.consumeEach {
111+ expect(8 )
112+ assertEquals(" OK" , it)
113+ }
114+ }
115+ expect(4 )
116+ yield () // to observable code
117+ expect(7 )
118+ yield () // to consuming coroutines
119+ expect(9 )
120+ job.cancel()
121+ job.join()
122+ finish(12 )
123+ }
124+
125+ @Test
126+ fun testFailingConsumer () = runTest {
127+ val pub = flux {
128+ repeat(3 ) {
129+ expect(it + 1 ) // expect(1), expect(2) *should* be invoked
130+ send(it)
131+ }
132+ }
133+ try {
134+ pub.consumeEach {
135+ throw TestException ()
136+ }
137+ } catch (e: TestException ) {
138+ finish(3 )
139+ }
140+ }
83141}
Original file line number Diff line number Diff line change @@ -43,20 +43,14 @@ public fun <T> ObservableSource<T>.openSubscription(): ReceiveChannel<T> {
4343/* *
4444 * Subscribes to this [MaybeSource] and performs the specified action for each received element.
4545 */
46- public suspend inline fun <T > MaybeSource<T>.consumeEach (action : (T ) -> Unit ) {
47- val channel = openSubscription()
48- for (x in channel) action(x)
49- channel.cancel()
50- }
46+ public suspend inline fun <T > MaybeSource<T>.consumeEach (action : (T ) -> Unit ) =
47+ openSubscription().consumeEach(action)
5148
5249/* *
5350 * Subscribes to this [ObservableSource] and performs the specified action for each received element.
5451 */
55- public suspend inline fun <T > ObservableSource<T>.consumeEach (action : (T ) -> Unit ) {
56- val channel = openSubscription()
57- for (x in channel) action(x)
58- channel.cancel()
59- }
52+ public suspend inline fun <T > ObservableSource<T>.consumeEach (action : (T ) -> Unit ) =
53+ openSubscription().consumeEach(action)
6054
6155@Suppress(" INVISIBLE_REFERENCE" , " INVISIBLE_MEMBER" )
6256private class SubscriptionChannel <T > :
@@ -68,6 +62,7 @@ private class SubscriptionChannel<T> :
6862 @Suppress(" CANNOT_OVERRIDE_INVISIBLE_MEMBER" )
6963 override fun onClosedIdempotent (closed : LockFreeLinkedListNode ) {
7064 subscription?.dispose()
65+ subscription = null // optimization -- no need to dispose it again
7166 }
7267
7368 // Observer overrider
Original file line number Diff line number Diff line change @@ -8,6 +8,8 @@ import kotlinx.coroutines.*
88import kotlinx.coroutines.reactive.*
99import org.hamcrest.core.*
1010import org.junit.*
11+ import org.junit.Test
12+ import kotlin.test.*
1113
1214class FlowableTest : TestBase () {
1315 @Test
@@ -81,4 +83,59 @@ class FlowableTest : TestBase() {
8183 { assert (it is RuntimeException ) }
8284 )
8385 }
86+
87+ @Test
88+ fun testNotifyOnceOnCancellation () = runTest {
89+ expect(1 )
90+ val observable =
91+ rxFlowable {
92+ expect(5 )
93+ send(" OK" )
94+ try {
95+ delay(Long .MAX_VALUE )
96+ } catch (e: CancellationException ) {
97+ expect(11 )
98+ }
99+ }
100+ .doOnNext {
101+ expect(6 )
102+ assertEquals(" OK" , it)
103+ }
104+ .doOnCancel {
105+ expect(10 ) // notified once!
106+ }
107+ expect(2 )
108+ val job = launch(start = CoroutineStart .UNDISPATCHED ) {
109+ expect(3 )
110+ observable.consumeEach{
111+ expect(8 )
112+ assertEquals(" OK" , it)
113+ }
114+ }
115+ expect(4 )
116+ yield () // to observable code
117+ expect(7 )
118+ yield () // to consuming coroutines
119+ expect(9 )
120+ job.cancel()
121+ job.join()
122+ finish(12 )
123+ }
124+
125+ @Test
126+ fun testFailingConsumer () = runTest {
127+ val pub = rxFlowable {
128+ repeat(3 ) {
129+ expect(it + 1 ) // expect(1), expect(2) *should* be invoked
130+ send(it)
131+ }
132+ }
133+ try {
134+ pub.consumeEach {
135+ throw TestException ()
136+ }
137+ } catch (e: TestException ) {
138+ finish(3 )
139+ }
140+ }
84141}
Original file line number Diff line number Diff line change @@ -12,6 +12,7 @@ import org.hamcrest.core.*
1212import org.junit.*
1313import org.junit.Assert.*
1414import java.util.concurrent.*
15+ import java.util.concurrent.CancellationException
1516
1617class MaybeTest : TestBase () {
1718 @Before
@@ -211,4 +212,30 @@ class MaybeTest : TestBase() {
211212 { assert (it is RuntimeException ) }
212213 )
213214 }
215+
216+ @Test
217+ fun testCancelledConsumer () = runTest {
218+ expect(1 )
219+ val maybe = rxMaybe<Int > {
220+ expect(4 )
221+ try {
222+ delay(Long .MAX_VALUE )
223+ } catch (e: CancellationException ) {
224+ expect(6 )
225+ }
226+ 42
227+ }
228+ expect(2 )
229+ val timeout = withTimeoutOrNull(100 ) {
230+ expect(3 )
231+ maybe.consumeEach {
232+ expectUnreached()
233+ }
234+ expectUnreached()
235+ }
236+ assertNull(timeout)
237+ expect(5 )
238+ yield () // must cancel code inside maybe!!!
239+ finish(7 )
240+ }
214241}
Original file line number Diff line number Diff line change @@ -7,6 +7,8 @@ package kotlinx.coroutines.rx2
77import kotlinx.coroutines.*
88import org.hamcrest.core.*
99import org.junit.*
10+ import org.junit.Test
11+ import kotlin.test.*
1012
1113class ObservableTest : TestBase () {
1214 @Test
@@ -80,4 +82,64 @@ class ObservableTest : TestBase() {
8082 { assert (it is RuntimeException ) }
8183 )
8284 }
85+
86+ @Test
87+ fun testNotifyOnceOnCancellation () = runTest {
88+ expect(1 )
89+ val observable =
90+ rxObservable {
91+ expect(5 )
92+ send(" OK" )
93+ try {
94+ delay(Long .MAX_VALUE )
95+ } catch (e: CancellationException ) {
96+ expect(11 )
97+ }
98+ }
99+ .doOnNext {
100+ expect(6 )
101+ assertEquals(" OK" , it)
102+ }
103+ .doOnDispose {
104+ expect(10 ) // notified once!
105+ }
106+ expect(2 )
107+ val job = launch(start = CoroutineStart .UNDISPATCHED ) {
108+ expect(3 )
109+ observable.consumeEach{
110+ expect(8 )
111+ assertEquals(" OK" , it)
112+ }
113+ }
114+ expect(4 )
115+ yield () // to observable code
116+ expect(7 )
117+ yield () // to consuming coroutines
118+ expect(9 )
119+ job.cancel()
120+ job.join()
121+ finish(12 )
122+ }
123+
124+ @Test
125+ fun testFailingConsumer () = runTest {
126+ expect(1 )
127+ val pub = rxObservable {
128+ expect(2 )
129+ send(" OK" )
130+ try {
131+ delay(Long .MAX_VALUE )
132+ } catch (e: CancellationException ) {
133+ finish(5 )
134+ }
135+ }
136+ try {
137+ pub.consumeEach {
138+ expect(3 )
139+ throw TestException ()
140+ }
141+ } catch (e: TestException ) {
142+ expect(4 )
143+ }
144+ }
83145}
You can’t perform that action at this time.
0 commit comments