• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.reactive
6 
7 import kotlinx.coroutines.*
8 import org.junit.*
9 import org.reactivestreams.*
10 
class AwaitTest : TestBase() {

    /**
     * Checks what happens when a coroutine suspended in [awaitFirst] has its [Job] cancelled:
     * the call is expected to fail with [CancellationException], and the subscription obtained
     * from the publisher is expected to be cancelled. Since the sibling await functions are
     * presumed to share this machinery, covering [awaitFirst] is taken to cover them as well.
     *
     * The numbered `expect` / `finish` calls pin the exact order in which each step is observed.
     */
    @Test
    fun testAwaitCancellation() = runTest {
        expect(1)
        // A publisher that never emits anything: it only hands each subscriber a subscription
        // that records when demand is requested and when the subscription is cancelled.
        val silentPublisher = Publisher<Int> { subscriber ->
            val subscription = object : Subscription {
                override fun request(n: Long) {
                    expect(3)
                }

                override fun cancel() {
                    expect(5)
                }
            }
            subscriber.onSubscribe(subscription)
        }
        // Started undispatched so that steps 2 and 3 are observed before control returns here
        // for step 4.
        val awaitingJob = launch(start = CoroutineStart.UNDISPATCHED) {
            try {
                expect(2)
                silentPublisher.awaitFirst()
            } catch (cause: CancellationException) {
                expect(6)
                // Rethrow so the coroutine still completes as cancelled.
                throw cause
            }
        }
        expect(4)
        awaitingJob.cancelAndJoin()
        finish(7)
    }
}