1 /* <lambda>null2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines.reactive 6 7 import kotlinx.coroutines.* 8 import org.junit.* 9 import org.reactivestreams.* 10 11 class AwaitTest: TestBase() { 12 13 /** Tests that calls to [awaitFirst] (and, thus, to the rest of these functions) throw [CancellationException] and 14 * unsubscribe from the publisher when their [Job] is cancelled. */ 15 @Test 16 fun testAwaitCancellation() = runTest { 17 expect(1) 18 val publisher = Publisher<Int> { s -> 19 s.onSubscribe(object: Subscription { 20 override fun request(n: Long) { 21 expect(3) 22 } 23 24 override fun cancel() { 25 expect(5) 26 } 27 }) 28 } 29 val job = launch(start = CoroutineStart.UNDISPATCHED) { 30 try { 31 expect(2) 32 publisher.awaitFirst() 33 } catch (e: CancellationException) { 34 expect(6) 35 throw e 36 } 37 } 38 expect(4) 39 job.cancelAndJoin() 40 finish(7) 41 } 42 43 }