• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.reactor
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.channels.*
9 import kotlinx.coroutines.flow.*
10 import kotlinx.coroutines.reactive.*
11 import org.junit.Test
12 import reactor.core.publisher.*
13 import kotlin.test.*
14 
15 class BackpressureTest : TestBase() {
16     @Test
<lambda>null17     fun testBackpressureDropDirect() = runTest {
18         expect(1)
19         Flux.fromArray(arrayOf(1))
20             .onBackpressureDrop()
21             .collect {
22                 assertEquals(1, it)
23                 expect(2)
24             }
25         finish(3)
26     }
27 
28     @Test
<lambda>null29     fun testBackpressureDropFlow() = runTest {
30         expect(1)
31         Flux.fromArray(arrayOf(1))
32             .onBackpressureDrop()
33             .asFlow()
34             .collect {
35                 assertEquals(1, it)
36                 expect(2)
37             }
38         finish(3)
39     }
40 
41     @Test
<lambda>null42     fun testCooperativeCancellation() = runTest {
43         val flow = Flux.fromIterable((0L..Long.MAX_VALUE)).asFlow()
44         flow.onEach { if (it > 10) currentCoroutineContext().cancel() }.launchIn(this + Dispatchers.Default).join()
45     }
46 
47     @Test
<lambda>null48     fun testCooperativeCancellationForBuffered() = runTest(expected = { it is CancellationException }) {
49         val flow = Flux.fromIterable((0L..Long.MAX_VALUE)).asFlow()
<lambda>null50         val channel = flow.onEach { if (it > 10) currentCoroutineContext().cancel() }.produceIn(this + Dispatchers.Default)
<lambda>null51         channel.consumeEach { /* Do nothing, just consume elements */ }
52     }
53 }