<lambda>null1 package kotlinx.coroutines.rx3
2
3 import kotlinx.coroutines.testing.*
4 import io.reactivex.rxjava3.core.*
5 import io.reactivex.rxjava3.disposables.*
6 import kotlinx.coroutines.*
7 import kotlinx.coroutines.flow.*
8 import org.junit.Test
9 import java.util.concurrent.*
10 import kotlin.test.*
11
/**
 * Tests for converting a [Flow] into an RxJava 3 [Observable] via `asObservable`.
 *
 * Most tests use the `expect(n)` / `finish(n)` helpers inherited from [TestBase] to pin
 * the exact order in which the flow body and the observer callbacks are executed.
 */
class FlowAsObservableTest : TestBase() {
    /**
     * The flow body is not started by `asObservable()` itself (step 2 precedes step 3);
     * it runs on subscription and its single value reaches `onNext`.
     */
    @Test
    fun testBasicSuccess() = runTest {
        expect(1)
        val observable = flow {
            expect(3)
            emit("OK")
        }.asObservable()

        expect(2)
        observable.subscribe { value ->
            expect(4)
            assertEquals("OK", value)
        }

        finish(5)
    }

    /**
     * An exception thrown from the flow body is delivered to `onError` with its type and
     * message intact; `onNext` must never be called.
     */
    @Test
    fun testBasicFailure() = runTest {
        expect(1)
        val observable = flow<Int> {
            expect(3)
            throw RuntimeException("OK")
        }.asObservable()

        expect(2)
        observable.subscribe({ expectUnreached() }, { error ->
            expect(4)
            assertIs<RuntimeException>(error)
            assertEquals("OK", error.message)
        })
        finish(5)
    }

    /**
     * Disposing the subscription while the flow is suspended in `hang` must run the block
     * passed to `hang` (step 4) before `dispose()` returns control to step 5, and must not
     * invoke either `onNext` or `onError`.
     */
    @Test
    fun testBasicUnsubscribe() = runTest {
        expect(1)
        val observable = flow<Int> {
            expect(3)
            hang {
                expect(4)
            }
        }.asObservable()

        expect(2)
        val sub = observable.subscribe({ expectUnreached() }, { expectUnreached() })
        sub.dispose() // will cancel coroutine
        finish(5)
    }

    /**
     * Cancelling a job that collects the observable must trigger `doOnDispose` exactly once
     * (a second call would fail the `expect(6)` sequencing), followed by the `hang` block
     * of the upstream flow (step 7).
     */
    @Test
    fun testNotifyOnceOnCancellation() = runTest {
        val observable =
            flow {
                expect(3)
                emit("OK")
                hang {
                    expect(7)
                }
            }.asObservable()
                .doOnNext {
                    expect(4)
                    assertEquals("OK", it)
                }
                .doOnDispose {
                    expect(6) // notified once!
                }

        expect(1)
        // UNDISPATCHED so that steps 2..5 run before launch() returns.
        val job = launch(start = CoroutineStart.UNDISPATCHED) {
            expect(2)
            observable.collect {
                expect(5)
                assertEquals("OK", it)
            }
        }

        yield()
        job.cancelAndJoin()
        finish(8)
    }

    /**
     * When the collector throws, the `hang` block of the upstream flow runs (step 4)
     * before the [TestException] is observed by the caller (step 5).
     */
    @Test
    fun testFailingConsumer() = runTest {
        expect(1)
        val observable = flow {
            expect(2)
            emit("OK")
            hang {
                expect(4)
            }

        }.asObservable()

        try {
            observable.collect {
                expect(3)
                throw TestException()
            }
        } catch (e: TestException) {
            finish(5)
        }
    }

    /**
     * Subscribes and immediately disposes while running in [Dispatchers.Unconfined].
     * The flow body is still expected to run (step 1), but none of `onNext`, `onError`
     * or `onComplete` may be invoked.
     */
    @Test
    fun testNonAtomicStart() = runTest {
        withContext(Dispatchers.Unconfined) {
            val observable = flow<Int> {
                expect(1)
            }.asObservable()

            val disposable = observable.subscribe({ expectUnreached() }, { expectUnreached() }, { expectUnreached() })
            disposable.dispose()
        }
        finish(2)
    }

    /**
     * A flow that cancels its own coroutine context after emitting is expected to end with
     * `onComplete` (step 3) rather than `onError`, after the emitted value reached `onNext`.
     */
    @Test
    fun testFlowCancelledFromWithin() = runTest {
        val observable = flow {
            expect(1)
            emit(1)
            kotlin.coroutines.coroutineContext.cancel()
            kotlin.coroutines.coroutineContext.ensureActive()
            expectUnreached()
        }.asObservable()

        observable.subscribe({ expect(2) }, { expectUnreached() }, { finish(3) })
    }

    /**
     * With no context passed to `asObservable()`, `onNext` and `onComplete` are expected to
     * be invoked on the thread that called `subscribe`, and synchronously (step 5 comes last).
     */
    @Test
    fun testUnconfinedDefaultContext() {
        expect(1)
        val thread = Thread.currentThread()
        fun checkThread() {
            assertSame(thread, Thread.currentThread())
        }
        flowOf(42).asObservable().subscribe(object : Observer<Int> {
            override fun onSubscribe(d: Disposable) {
                expect(2)
            }

            override fun onNext(t: Int) {
                checkThread()
                expect(3)
                assertEquals(42, t)
            }

            override fun onComplete() {
                checkThread()
                expect(4)
            }

            override fun onError(e: Throwable) {
                expectUnreached()
            }
        })
        finish(5)
    }

    /**
     * With a single-thread dispatcher passed to `asObservable(dispatcher)`, `onNext` and
     * `onComplete` are expected to be invoked on a thread whose name starts with the
     * dispatcher's name.
     */
    @Test
    fun testConfinedContext() {
        expect(1)
        val threadName = "FlowAsObservableTest.testConfinedContext"
        fun checkThread() {
            val currentThread = Thread.currentThread()
            assertTrue(currentThread.name.startsWith(threadName), "Unexpected thread $currentThread")
        }
        val completed = CountDownLatch(1)
        newSingleThreadContext(threadName).use { dispatcher ->
            flowOf(42).asObservable(dispatcher).subscribe(object : Observer<Int> {
                override fun onSubscribe(d: Disposable) {
                    expect(2)
                }

                override fun onNext(t: Int) {
                    checkThread()
                    expect(3)
                    assertEquals(42, t)
                }

                override fun onComplete() {
                    checkThread()
                    expect(4)
                    completed.countDown()
                }

                override fun onError(e: Throwable) {
                    expectUnreached()
                }
            })
            // Bounded wait: the latch is only released from onComplete, so an unbounded
            // await() would hang the whole test run if the observable errors or stalls.
            // The wait stays inside use { } so the dispatcher is not closed prematurely.
            assertTrue(
                completed.await(10, TimeUnit.SECONDS),
                "Observable did not complete within 10 seconds"
            )
        }
        finish(5)
    }
}
209