• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.reactive
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.CancellationException
9 import kotlinx.coroutines.flow.*
10 import org.junit.Test
11 import org.reactivestreams.*
12 import java.util.concurrent.*
13 import kotlin.test.*
14 
15 class FlowAsPublisherTest : TestBase() {
16     @Test
17     fun testErrorOnCancellationIsReported() {
18         expect(1)
19         flow {
20             try {
21                 emit(2)
22             } finally {
23                 expect(3)
24                 throw TestException()
25             }
26         }.asPublisher().subscribe(object : Subscriber<Int> {
27             private lateinit var subscription: Subscription
28 
29             override fun onComplete() {
30                 expectUnreached()
31             }
32 
33             override fun onSubscribe(s: Subscription?) {
34                 subscription = s!!
35                 subscription.request(2)
36             }
37 
38             override fun onNext(t: Int) {
39                 expect(t)
40                 subscription.cancel()
41             }
42 
43             override fun onError(t: Throwable?) {
44                 assertTrue(t is TestException)
45                 expect(4)
46             }
47         })
48         finish(5)
49     }
50 
51     @Test
52     fun testCancellationIsNotReported() {
53         expect(1)
54         flow {
55             emit(2)
56         }.asPublisher().subscribe(object : Subscriber<Int> {
57             private lateinit var subscription: Subscription
58 
59             override fun onComplete() {
60                 expectUnreached()
61             }
62 
63             override fun onSubscribe(s: Subscription?) {
64                 subscription = s!!
65                 subscription.request(2)
66             }
67 
68             override fun onNext(t: Int) {
69                 expect(t)
70                 subscription.cancel()
71             }
72 
73             override fun onError(t: Throwable?) {
74                 expectUnreached()
75             }
76         })
77         finish(3)
78     }
79 
80     @Test
81     fun testUnconfinedDefaultContext() {
82         expect(1)
83         val thread = Thread.currentThread()
84         fun checkThread() {
85             assertSame(thread, Thread.currentThread())
86         }
87         flowOf(42).asPublisher().subscribe(object : Subscriber<Int> {
88             private lateinit var subscription: Subscription
89 
90             override fun onSubscribe(s: Subscription) {
91                 expect(2)
92                 subscription = s
93                 subscription.request(2)
94             }
95 
96             override fun onNext(t: Int) {
97                 checkThread()
98                 expect(3)
99                 assertEquals(42, t)
100             }
101 
102             override fun onComplete() {
103                 checkThread()
104                 expect(4)
105             }
106 
107             override fun onError(t: Throwable?) {
108                 expectUnreached()
109             }
110         })
111         finish(5)
112     }
113 
114     @Test
115     fun testConfinedContext() {
116         expect(1)
117         val threadName = "FlowAsPublisherTest.testConfinedContext"
118         fun checkThread() {
119             val currentThread = Thread.currentThread()
120             assertTrue(currentThread.name.startsWith(threadName), "Unexpected thread $currentThread")
121         }
122         val completed = CountDownLatch(1)
123         newSingleThreadContext(threadName).use { dispatcher ->
124             flowOf(42).asPublisher(dispatcher).subscribe(object : Subscriber<Int> {
125                 private lateinit var subscription: Subscription
126 
127                 override fun onSubscribe(s: Subscription) {
128                     expect(2)
129                     subscription = s
130                     subscription.request(2)
131                 }
132 
133                 override fun onNext(t: Int) {
134                     checkThread()
135                     expect(3)
136                     assertEquals(42, t)
137                 }
138 
139                 override fun onComplete() {
140                     checkThread()
141                     expect(4)
142                     completed.countDown()
143                 }
144 
145                 override fun onError(t: Throwable?) {
146                     expectUnreached()
147                 }
148             })
149             completed.await()
150         }
151         finish(5)
152     }
153 
154     @Test
155     fun testFlowWithTimeout() = runTest {
156         val publisher = flow<Int> {
157             expect(2)
158             withTimeout(1) { delay(Long.MAX_VALUE) }
159         }.asPublisher()
160         try {
161             expect(1)
162             publisher.awaitFirstOrNull()
163         } catch (e: CancellationException) {
164             expect(3)
165         }
166         finish(4)
167     }
168 }
169