package kotlinx.coroutines.reactive

import kotlinx.coroutines.testing.*
import kotlinx.coroutines.*
import org.junit.*
import org.reactivestreams.*

/**
 * Checks how a `publish { ... }` coroutine behaves when its subscription is cancelled
 * while the coroutine is suspended in `send` waiting for demand.
 */
class PublisherBackpressureTest : TestBase() {
    /**
     * The subscriber requests exactly two items, so the third `send` suspends.
     * Cancelling the subscription at that point must run the coroutine's `finally`
     * block without delivering `onComplete` or `onError` to the subscriber.
     *
     * The `expect(n)` calls pin the exact interleaving between the test body,
     * the publisher coroutine, and the subscriber callbacks.
     */
    @Test
    fun testCancelWhileBPSuspended() = runBlocking {
        expect(1)
        val publisher = publish(currentDispatcher()) {
            expect(5)
            send("A") // will not suspend, because an item was requested
            expect(7)
            send("B") // second requested item
            expect(9)
            try {
                send("C") // will suspend (no more requested)
            } finally {
                // Reached only after the subscription is cancelled below.
                expect(12)
            }
            expectUnreached()
        }
        expect(2)
        // Assigned from onSubscribe; nullable because it is set inside the callback.
        var sub: Subscription? = null
        publisher.subscribe(object : Subscriber<String> {
            override fun onSubscribe(s: Subscription) {
                sub = s
                expect(3)
                s.request(2) // request two items
            }

            override fun onNext(t: String) {
                when (t) {
                    "A" -> expect(6)
                    "B" -> expect(8)
                    else -> error("Should not happen")
                }
            }

            override fun onComplete() {
                expectUnreached()
            }

            override fun onError(e: Throwable) {
                expectUnreached()
            }
        })
        expect(4)
        yield() // yield to publisher coroutine
        expect(10)
        // Fail with a descriptive message (rather than a bare NPE) if onSubscribe never ran.
        val subscription = checkNotNull(sub) { "onSubscribe was not invoked before cancellation" }
        subscription.cancel() // now unsubscribe -- shall cancel coroutine (& do not signal)
        expect(11)
        yield() // shall perform finally in coroutine
        finish(13)
    }
}