package kotlinx.coroutines.reactor

import kotlinx.coroutines.testing.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
import org.junit.Test
import reactor.core.publisher.*
import kotlin.test.*

/**
 * Tests covering Reactor [Flux] sources consumed from coroutines: directly via `collect`,
 * through `asFlow()`, and under cancellation triggered from inside the collecting coroutine.
 *
 * NOTE(review): `expect(n)` / `finish(n)` come from [TestBase]; they presumably assert the
 * order in which the numbered checkpoints are reached — confirm against `TestBase`.
 */
class BackpressureTest : TestBase() {
    /**
     * Collects a single-element [Flux] with `onBackpressureDrop()` applied directly
     * (no conversion to a flow) and checks that the element `1` reaches the collector.
     */
    @Test
    fun testBackpressureDropDirect() = runTest {
        expect(1)
        val source = Flux.fromArray(arrayOf(1)).onBackpressureDrop()
        source.collect { value ->
            assertEquals(1, value)
            expect(2)
        }
        finish(3)
    }

    /**
     * Same scenario as the direct variant, but the [Flux] is first converted with `asFlow()`
     * and then collected as a flow.
     */
    @Test
    fun testBackpressureDropFlow() = runTest {
        expect(1)
        val source = Flux.fromArray(arrayOf(1))
            .onBackpressureDrop()
            .asFlow()
        source.collect { value ->
            assertEquals(1, value)
            expect(2)
        }
        finish(3)
    }

    /**
     * Launches collection of a flow backed by the range `0L..Long.MAX_VALUE` on
     * [Dispatchers.Default], cancels the collecting coroutine's context once a value
     * greater than 10 is seen, and waits for the launched job to complete.
     */
    @Test
    fun testCooperativeCancellation() = runTest {
        val numbers = Flux.fromIterable(0L..Long.MAX_VALUE).asFlow()
        val collector = numbers
            .onEach { value ->
                // Cancel from within the collector as soon as the threshold is crossed.
                if (value > 10) currentCoroutineContext().cancel()
            }
            .launchIn(this + Dispatchers.Default)
        collector.join()
    }

    /**
     * Variant of the cancellation scenario that goes through `produceIn`: the flow is turned
     * into a channel on [Dispatchers.Default] and drained with `consumeEach`. The test is
     * declared to expect a [CancellationException] via the `expected` predicate of `runTest`.
     */
    @Test
    fun testCooperativeCancellationForBuffered() = runTest(
        expected = { cause -> cause is CancellationException }
    ) {
        val numbers = Flux.fromIterable(0L..Long.MAX_VALUE).asFlow()
        val cancellingNumbers = numbers.onEach { value ->
            // Cancel from within the producer as soon as the threshold is crossed.
            if (value > 10) currentCoroutineContext().cancel()
        }
        val channel = cancellingNumbers.produceIn(this + Dispatchers.Default)
        channel.consumeEach {
            // Intentionally empty: elements are only drained, never inspected.
        }
    }
}