• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 
5 package kotlinx.coroutines.jdk9
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.flow.*
9 import org.junit.Test
10 import java.util.concurrent.Flow as JFlow
11 import kotlin.test.*
12 
/**
 * Tests for flows converted with [asPublisher] and observed through a [JFlow.Subscriber]
 * or through `awaitFirstOrNull`.
 *
 * The `expect`, `finish` and `expectUnreached` helpers come from [TestBase]; presumably they
 * verify that the numbered steps run in the given order — confirm against TestBase.
 */
class FlowAsPublisherTest : TestBase() {

    /**
     * Subscribes to a flow that emits `2` and throws [TestException] from its `finally` block.
     * The subscriber cancels its subscription on the first element and asserts in `onError`
     * that the received throwable is a [TestException]; `onComplete` is marked as unreached.
     */
    @Test
    fun testErrorOnCancellationIsReported() {
        expect(1)
        val failingOnCleanup = flow {
            try {
                emit(2)
            } finally {
                expect(3)
                throw TestException()
            }
        }
        val publisher = failingOnCleanup.asPublisher()

        val subscriber = object : JFlow.Subscriber<Int> {
            private lateinit var upstream: JFlow.Subscription

            override fun onSubscribe(subscription: JFlow.Subscription?) {
                upstream = subscription!!
                upstream.request(2)
            }

            override fun onNext(item: Int) {
                // The emitted value doubles as the step number being recorded.
                expect(item)
                upstream.cancel()
            }

            override fun onError(error: Throwable?) {
                assertTrue(error is TestException)
                expect(4)
            }

            override fun onComplete() {
                expectUnreached()
            }
        }

        publisher.subscribe(subscriber)
        finish(5)
    }

    /**
     * Subscribes to a flow that emits a single `2` and cancels the subscription on that element.
     * Both `onError` and `onComplete` are marked as unreached.
     */
    @Test
    fun testCancellationIsNotReported() {
        expect(1)
        val singleElement = flow {
            emit(2)
        }
        val publisher = singleElement.asPublisher()

        val subscriber = object : JFlow.Subscriber<Int> {
            private lateinit var upstream: JFlow.Subscription

            override fun onSubscribe(subscription: JFlow.Subscription?) {
                upstream = subscription!!
                upstream.request(2)
            }

            override fun onNext(item: Int) {
                // The emitted value doubles as the step number being recorded.
                expect(item)
                upstream.cancel()
            }

            override fun onError(error: Throwable?) {
                expectUnreached()
            }

            override fun onComplete() {
                expectUnreached()
            }
        }

        publisher.subscribe(subscriber)
        finish(3)
    }

    /**
     * Awaits the first element of a publisher whose flow body wraps an unbounded `delay`
     * in `withTimeout(1)`. Step 3 is recorded in the handler for [CancellationException].
     */
    @Test
    fun testFlowWithTimeout() = runTest {
        val timingOut = flow<Int> {
            expect(2)
            withTimeout(1) { delay(Long.MAX_VALUE) }
        }
        val publisher = timingOut.asPublisher()

        try {
            expect(1)
            publisher.awaitFirstOrNull()
        } catch (expected: CancellationException) {
            expect(3)
        }
        finish(4)
    }
}
94