/*
 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.reactive

import kotlinx.coroutines.*
import org.junit.*
import org.reactivestreams.*

/**
 * Backpressure tests for publishers created with the `publish` coroutine builder.
 */
class PublisherBackpressureTest : TestBase() {
    /**
     * Cancels a subscription while the producing coroutine is suspended in `send`
     * because the subscriber's demand (two items) is exhausted.
     *
     * The numbered `expect` calls pin the exact interleaving: the producer delivers
     * "A" and "B", suspends on "C", and after `cancel()` only its `finally` block runs.
     * Neither `onComplete` nor `onError` may be signalled to the subscriber.
     */
    @Test
    fun testCancelWhileBPSuspended() = runBlocking {
        expect(1)
        val publisher = publish(currentDispatcher()) {
            expect(5)
            send("A") // demand available: does not suspend
            expect(7)
            send("B") // consumes the second (last) requested item
            expect(9)
            try {
                send("C") // no demand left: suspends here until cancelled
            } finally {
                expect(12)
            }
            expectUnreached()
        }
        expect(2)

        // Captured in onSubscribe so the test body can cancel later.
        var activeSubscription: Subscription? = null
        val subscriber = object : Subscriber<String> {
            override fun onSubscribe(s: Subscription) {
                activeSubscription = s
                expect(3)
                s.request(2) // demand exactly two items
            }

            override fun onNext(t: String) {
                // Map each permitted item to its position in the expected sequence.
                val step = when (t) {
                    "A" -> 6
                    "B" -> 8
                    else -> error("Should not happen")
                }
                expect(step)
            }

            override fun onComplete() {
                expectUnreached()
            }

            override fun onError(e: Throwable) {
                expectUnreached()
            }
        }
        publisher.subscribe(subscriber)

        expect(4)
        yield() // let the producer coroutine run until it suspends on "C"
        expect(10)
        activeSubscription!!.cancel() // unsubscribe: cancels the coroutine without any terminal signal
        expect(11)
        yield() // give the cancelled producer a chance to run its finally block
        finish(13)
    }
}