-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Open
Description
In IJ we have a blocking API which reacts on its own cancellation handles. I'm attaching invokeOnCompletion(onCancelling = true) handler to cancel IJ own cancellation handle, but the invokeOnCompletion handler is never invoked despite both emitter and collector being executed in Dispatchers.Default, and collector launches a standalone coroutine for each collected element.
Provide a Reproducer
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.first
fun main() {
runBlocking(Dispatchers.Default) {
val flow = MutableSharedFlow<Int>()
val collector = launch {
flow.collectLatest { value ->
@OptIn(InternalCoroutinesApi::class)
currentCoroutineContext().job.invokeOnCompletion(onCancelling = true, handler = {
if (it == null) {
println("$value completed")
}
else {
println("$value canceled; t: $it")
}
})
Thread.sleep(100)
println("$value processed")
}
}
flow.subscriptionCount.first { it == 1 }
repeat(3) {
flow.emit(it)
}
collector.cancel()
}
}
Expected:
0 canceled; t: kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@xxxxxx
0 processed
1 canceled; t: kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@xxxxxx
1 processed
2 canceled; t: kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@5c79719e
2 processed
Actual:
0 processed
0 completed
1 processed
1 completed
2 canceled; t: kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@5c79719e
2 processed
art-shen