/*
 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.reactor

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
import org.junit.Test
import reactor.core.publisher.*
import kotlin.test.*

/**
 * Tests for consuming Reactor [Flux] publishers from coroutines: direct collection, collection
 * through a `Flow` adapter, and cancellation of a collector while the upstream still has elements.
 *
 * The `expect(n)` / `finish(n)` / `runTest` helpers come from `TestBase`, which is not defined in
 * this file; they are presumably the usual ordered-checkpoint helpers (TODO confirm against TestBase).
 */
class BackpressureTest : TestBase() {
    /**
     * Collects a single-element [Flux] with `onBackpressureDrop()` applied, using `collect`
     * directly on the publisher, and asserts that the only element received is `1`.
     */
    @Test
    fun testBackpressureDropDirect() = runTest {
        expect(1)
        Flux.fromArray(arrayOf(1))
            .onBackpressureDrop()
            .collect {
                assertEquals(1, it)
                expect(2)
            }
        finish(3)
    }

    /**
     * Same scenario as [testBackpressureDropDirect], but the publisher is first converted with
     * `asFlow()` and then collected as a `Flow`.
     */
    @Test
    fun testBackpressureDropFlow() = runTest {
        expect(1)
        Flux.fromArray(arrayOf(1))
            .onBackpressureDrop()
            .asFlow()
            .collect {
                assertEquals(1, it)
                expect(2)
            }
        finish(3)
    }

    /**
     * Launches collection of a flow backed by the range `0L..Long.MAX_VALUE` on
     * [Dispatchers.Default] and cancels the collecting coroutine's context from `onEach` as soon
     * as an element greater than 10 is seen. The test then joins the launched job, so it can only
     * finish in reasonable time if that cancellation actually stops the collection.
     */
    @Test
    fun testCooperativeCancellation() = runTest {
        val flow = Flux.fromIterable(0L..Long.MAX_VALUE).asFlow()
        flow
            .onEach { if (it > 10) currentCoroutineContext().cancel() }
            .launchIn(this + Dispatchers.Default)
            .join()
    }

    /**
     * Variant of [testCooperativeCancellation] that goes through a channel: the flow is turned
     * into a channel with `produceIn` on [Dispatchers.Default] and drained with `consumeEach`.
     *
     * The `expected` predicate passed to `runTest` accepts a [CancellationException]; presumably
     * draining the channel of the cancelled producer rethrows it (TODO confirm against
     * TestBase.runTest).
     */
    @Test
    fun testCooperativeCancellationForBuffered() = runTest(expected = { it is CancellationException }) {
        val flow = Flux.fromIterable(0L..Long.MAX_VALUE).asFlow()
        val channel = flow
            .onEach { if (it > 10) currentCoroutineContext().cancel() }
            .produceIn(this + Dispatchers.Default)
        channel.consumeEach { /* Do nothing, just consume elements */ }
    }
}