Skip to content

Commit 4e47af4

Browse files
committed
Optimize debounce operator allocation pressure by using conflated produce. Previously it was not possible due to not implemented #1235
1 parent 897f02e commit 4e47af4

File tree

2 files changed

+22
-19
lines changed

2 files changed

+22
-19
lines changed

kotlinx-coroutines-core/common/src/flow/internal/NullSurrogate.kt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,11 @@ import kotlin.jvm.*
1414
@JvmField
1515
@SharedImmutable
1616
internal val NULL = Symbol("NULL")
17+
18+
/*
19+
* Symbol used to indicate that the flow is complete.
20+
* It should never leak to the outside world.
21+
*/
22+
@JvmField
23+
@SharedImmutable
24+
internal val DONE = Symbol("DONE")

kotlinx-coroutines-core/common/src/flow/operators/Delay.kt

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -42,18 +42,21 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow
4242
public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T> {
4343
require(timeoutMillis > 0) { "Debounce timeout should be positive" }
4444
return scopedFlow { downstream ->
45-
val values = Channel<Any?>(Channel.CONFLATED) // Actually Any, KT-30796
46-
// Channel is not closed deliberately as there is no close with value
47-
val collector = async {
48-
collect { value -> values.send(value ?: NULL) }
45+
// Actually Any, KT-30796
46+
val values = produce<Any?>(capacity = Channel.CONFLATED) {
47+
collect { value -> send(value ?: NULL) }
4948
}
50-
51-
var isDone = false
5249
var lastValue: Any? = null
53-
while (!isDone) {
50+
while (lastValue !== DONE) {
5451
select<Unit> {
55-
values.onReceive {
56-
lastValue = it
52+
// Should be receiveOrClosed when boxing issues are fixed
53+
values.onReceiveOrNull {
54+
if (it == null) {
55+
if (lastValue != null) downstream.emit(NULL.unbox(lastValue))
56+
lastValue = DONE
57+
} else {
58+
lastValue = it
59+
}
5760
}
5861

5962
lastValue?.let { value ->
@@ -63,12 +66,6 @@ public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T> {
6366
downstream.emit(NULL.unbox(value))
6467
}
6568
}
66-
67-
// Close with value 'idiom'
68-
collector.onAwait {
69-
if (lastValue != null) downstream.emit(NULL.unbox(lastValue))
70-
isDone = true
71-
}
7269
}
7370
}
7471
}
@@ -98,16 +95,14 @@ public fun <T> Flow<T>.sample(periodMillis: Long): Flow<T> {
9895
// Actually Any, KT-30796
9996
collect { value -> send(value ?: NULL) }
10097
}
101-
102-
var isDone = false
10398
var lastValue: Any? = null
10499
val ticker = fixedPeriodTicker(periodMillis)
105-
while (!isDone) {
100+
while (lastValue !== DONE) {
106101
select<Unit> {
107102
values.onReceiveOrNull {
108103
if (it == null) {
109104
ticker.cancel(ChildCancelledException())
110-
isDone = true
105+
lastValue = DONE
111106
} else {
112107
lastValue = it
113108
}

0 commit comments

Comments
 (0)