• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 package kotlinx.coroutines.reactor
2 
3 import kotlinx.coroutines.testing.*
4 import kotlinx.coroutines.*
5 import kotlinx.coroutines.channels.*
6 import kotlinx.coroutines.flow.*
7 import kotlinx.coroutines.reactive.*
8 import org.junit.Test
9 import reactor.core.publisher.*
10 import kotlin.test.*
11 
/**
 * Exercises collection of Reactor [Flux] publishers from coroutines, both directly and through
 * [asFlow], with `onBackpressureDrop` applied and with cancellation triggered from inside the collector.
 */
class BackpressureTest : TestBase() {
    /**
     * Collects a single-element [Flux] with `onBackpressureDrop` applied directly (no `asFlow`)
     * and checks that the element `1` is delivered between `expect(1)` and `finish(3)`.
     */
    @Test
    fun testBackpressureDropDirect() = runTest {
        expect(1)
        val source = Flux.fromArray(arrayOf(1)).onBackpressureDrop()
        source.collect { value ->
            assertEquals(1, value)
            expect(2)
        }
        finish(3)
    }

    /**
     * Same scenario as the direct variant, but the publisher is first converted with [asFlow]
     * and collected as a flow.
     */
    @Test
    fun testBackpressureDropFlow() = runTest {
        expect(1)
        val source = Flux.fromArray(arrayOf(1)).onBackpressureDrop()
        source.asFlow().collect { value ->
            assertEquals(1, value)
            expect(2)
        }
        finish(3)
    }

    /**
     * Launches collection of a flow spanning `0..Long.MAX_VALUE` on [Dispatchers.Default];
     * the collector cancels its own context once it sees a value above 10.
     * The test body finishes only after the launched job has been joined.
     */
    @Test
    fun testCooperativeCancellation() = runTest {
        val collector = selfCancellingNumbers().launchIn(this + Dispatchers.Default)
        collector.join()
    }

    /**
     * Buffered variant: the same self-cancelling flow is turned into a channel with [produceIn]
     * and drained with [consumeEach]. `runTest` is given an `expected` predicate that matches
     * [CancellationException] (presumably meaning the body is expected to fail with it — see `TestBase`).
     */
    @Test
    fun testCooperativeCancellationForBuffered() = runTest(expected = { cause -> cause is CancellationException }) {
        val producerScope = this + Dispatchers.Default
        val channel = selfCancellingNumbers().produceIn(producerScope)
        channel.consumeEach { /* Do nothing, just consume elements */ }
    }

    /**
     * Builds a flow over `0..Long.MAX_VALUE` backed by a [Flux]; each element is passed through,
     * and the collecting coroutine's context is cancelled as soon as an element greater than 10 is observed.
     */
    private fun selfCancellingNumbers(): Flow<Long> =
        Flux.fromIterable(0L..Long.MAX_VALUE)
            .asFlow()
            .onEach { value ->
                if (value > 10) currentCoroutineContext().cancel()
            }
}