• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 package kotlinx.coroutines.reactive
2 
3 import kotlinx.coroutines.testing.*
4 import kotlinx.coroutines.*
5 import org.junit.*
6 import org.reactivestreams.*
7 
/**
 * Backpressure test for the `publish` coroutine builder.
 *
 * The numbered `expect(...)` / `finish(...)` calls mark the order in which the individual
 * steps are intended to execute (helpers presumably inherited from [TestBase]).
 */
class PublisherBackpressureTest : TestBase() {
    /**
     * Cancels the subscription while the producing coroutine is parked in `send` with no
     * outstanding demand, then verifies that:
     *  - the coroutine's `finally` block runs,
     *  - the code after the suspended `send` is never reached,
     *  - neither `onComplete` nor `onError` is delivered to the subscriber.
     */
    @Test
    fun testCancelWhileBPSuspended() = runBlocking<Unit> {
        expect(1)
        val publisher = publish(currentDispatcher()) {
            expect(5)
            // Demand for this item already exists, so the send does not suspend.
            send("A")
            expect(7)
            // Consumes the second (and last) requested item.
            send("B")
            expect(9)
            try {
                // Nothing more was requested: this send suspends until cancellation.
                send("C")
            } finally {
                expect(12)
            }
            expectUnreached()
        }
        expect(2)

        // Captured in onSubscribe so the test body can cancel later.
        var subscription: Subscription? = null
        val subscriber = object : Subscriber<String> {
            override fun onSubscribe(s: Subscription) {
                subscription = s
                expect(3)
                // Ask for exactly two items; the third send must therefore suspend.
                s.request(2)
            }

            override fun onNext(t: String) {
                // Map each permitted item to its position in the expected sequence.
                val step = when (t) {
                    "A" -> 6
                    "B" -> 8
                    else -> error("Should not happen")
                }
                expect(step)
            }

            // No terminal signal may be delivered after cancellation.
            override fun onComplete() {
                expectUnreached()
            }

            override fun onError(e: Throwable) {
                expectUnreached()
            }
        }
        publisher.subscribe(subscriber)

        expect(4)
        // Hand control to the publisher coroutine.
        yield()
        expect(10)
        // Unsubscribe: the coroutine shall be cancelled and nothing signalled downstream.
        subscription!!.cancel()
        expect(11)
        // Give the cancelled coroutine a chance to run its finally block.
        yield()
        finish(13)
    }
}