<lambda>null1 package kotlinx.coroutines.rx3
2
3 import kotlinx.coroutines.testing.*
4 import io.reactivex.rxjava3.core.*
5 import io.reactivex.rxjava3.disposables.*
6 import io.reactivex.rxjava3.exceptions.*
7 import io.reactivex.rxjava3.internal.functions.Functions.*
8 import kotlinx.coroutines.*
9 import kotlinx.coroutines.CancellationException
10 import org.junit.*
11 import org.junit.Test
12 import java.util.concurrent.*
13 import kotlin.test.*
14
15 class MaybeTest : TestBase() {
16 @Before
17 fun setup() {
18 ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
19 }
20
21 @Test
22 fun testBasicSuccess() = runBlocking {
23 expect(1)
24 val maybe = rxMaybe(currentDispatcher()) {
25 expect(4)
26 "OK"
27 }
28 expect(2)
29 maybe.subscribe { value ->
30 expect(5)
31 assertEquals("OK", value)
32 }
33 expect(3)
34 yield() // to started coroutine
35 finish(6)
36 }
37
38 @Test
39 fun testBasicEmpty() = runBlocking {
40 expect(1)
41 val maybe = rxMaybe(currentDispatcher()) {
42 expect(4)
43 null
44 }
45 expect(2)
46 maybe.subscribe (emptyConsumer(), ON_ERROR_MISSING, {
47 expect(5)
48 })
49 expect(3)
50 yield() // to started coroutine
51 finish(6)
52 }
53
54 @Test
55 fun testBasicFailure() = runBlocking {
56 expect(1)
57 val maybe = rxMaybe(currentDispatcher()) {
58 expect(4)
59 throw RuntimeException("OK")
60 }
61 expect(2)
62 maybe.subscribe({
63 expectUnreached()
64 }, { error ->
65 expect(5)
66 assertIs<RuntimeException>(error)
67 assertEquals("OK", error.message)
68 })
69 expect(3)
70 yield() // to started coroutine
71 finish(6)
72 }
73
74
75 @Test
76 fun testBasicUnsubscribe() = runBlocking {
77 expect(1)
78 val maybe = rxMaybe(currentDispatcher()) {
79 expect(4)
80 yield() // back to main, will get cancelled
81 expectUnreached()
82 }
83 expect(2)
84 // nothing is called on a disposed rx2 maybe
85 val sub = maybe.subscribe({
86 expectUnreached()
87 }, {
88 expectUnreached()
89 })
90 expect(3)
91 yield() // to started coroutine
92 expect(5)
93 sub.dispose() // will cancel coroutine
94 yield()
95 finish(6)
96 }
97
98 @Test
99 fun testMaybeNoWait() {
100 val maybe = rxMaybe {
101 "OK"
102 }
103
104 checkMaybeValue(maybe) {
105 assertEquals("OK", it)
106 }
107 }
108
109 @Test
110 fun testMaybeAwait() = runBlocking {
111 assertEquals("OK", Maybe.just("O").awaitSingleOrNull() + "K")
112 assertEquals("OK", Maybe.just("O").awaitSingle() + "K")
113 }
114
115 @Test
116 fun testMaybeAwaitForNull(): Unit = runBlocking {
117 assertNull(Maybe.empty<String>().awaitSingleOrNull())
118 assertFailsWith<NoSuchElementException> { Maybe.empty<String>().awaitSingle() }
119 }
120
121 /** Tests that calls to [awaitSingleOrNull] throw [CancellationException] and dispose of the subscription when their
122 * [Job] is cancelled. */
123 @Test
124 fun testMaybeAwaitCancellation() = runTest {
125 expect(1)
126 val maybe = MaybeSource<Int> { s ->
127 s.onSubscribe(object: Disposable {
128 override fun dispose() { expect(4) }
129 override fun isDisposed(): Boolean { expectUnreached(); return false }
130 })
131 }
132 val job = launch(start = CoroutineStart.UNDISPATCHED) {
133 try {
134 expect(2)
135 maybe.awaitSingleOrNull()
136 } catch (e: CancellationException) {
137 expect(5)
138 throw e
139 }
140 }
141 expect(3)
142 job.cancelAndJoin()
143 finish(6)
144 }
145
146 @Test
147 fun testMaybeEmitAndAwait() {
148 val maybe = rxMaybe {
149 Maybe.just("O").awaitSingleOrNull() + "K"
150 }
151
152 checkMaybeValue(maybe) {
153 assertEquals("OK", it)
154 }
155 }
156
157 @Test
158 fun testMaybeWithDelay() {
159 val maybe = rxMaybe {
160 Observable.timer(50, TimeUnit.MILLISECONDS).map { "O" }.awaitSingle() + "K"
161 }
162
163 checkMaybeValue(maybe) {
164 assertEquals("OK", it)
165 }
166 }
167
168 @Test
169 fun testMaybeException() {
170 val maybe = rxMaybe {
171 Observable.just("O", "K").awaitSingle() + "K"
172 }
173
174 checkErroneous(maybe) {
175 assert(it is IllegalArgumentException)
176 }
177 }
178
179 @Test
180 fun testAwaitFirst() {
181 val maybe = rxMaybe {
182 Observable.just("O", "#").awaitFirst() + "K"
183 }
184
185 checkMaybeValue(maybe) {
186 assertEquals("OK", it)
187 }
188 }
189
190 @Test
191 fun testAwaitLast() {
192 val maybe = rxMaybe {
193 Observable.just("#", "O").awaitLast() + "K"
194 }
195
196 checkMaybeValue(maybe) {
197 assertEquals("OK", it)
198 }
199 }
200
201 @Test
202 fun testExceptionFromObservable() {
203 val maybe = rxMaybe {
204 try {
205 Observable.error<String>(RuntimeException("O")).awaitFirst()
206 } catch (e: RuntimeException) {
207 Observable.just(e.message!!).awaitLast() + "K"
208 }
209 }
210
211 checkMaybeValue(maybe) {
212 assertEquals("OK", it)
213 }
214 }
215
216 @Test
217 fun testExceptionFromCoroutine() {
218 val maybe = rxMaybe<String> {
219 throw IllegalStateException(Observable.just("O").awaitSingle() + "K")
220 }
221
222 checkErroneous(maybe) {
223 assert(it is IllegalStateException)
224 assertEquals("OK", it.message)
225 }
226 }
227
228 @Test
229 fun testCancelledConsumer() = runTest {
230 expect(1)
231 val maybe = rxMaybe(currentDispatcher()) {
232 expect(4)
233 try {
234 delay(Long.MAX_VALUE)
235 } catch (e: CancellationException) {
236 expect(6)
237 }
238 42
239 }
240 expect(2)
241 val timeout = withTimeoutOrNull(100) {
242 expect(3)
243 maybe.collect {
244 expectUnreached()
245 }
246 expectUnreached()
247 }
248 assertNull(timeout)
249 expect(5)
250 yield() // must cancel code inside maybe!!!
251 finish(7)
252 }
253
254 /** Tests the simple scenario where the Maybe doesn't output a value. */
255 @Test
256 fun testMaybeCollectEmpty() = runTest {
257 expect(1)
258 Maybe.empty<Int>().collect {
259 expectUnreached()
260 }
261 finish(2)
262 }
263
264 /** Tests the simple scenario where the Maybe doesn't output a value. */
265 @Test
266 fun testMaybeCollectSingle() = runTest {
267 expect(1)
268 Maybe.just("OK").collect {
269 assertEquals("OK", it)
270 expect(2)
271 }
272 finish(3)
273 }
274
275 /** Tests the behavior of [collect] when the Maybe raises an error. */
276 @Test
277 fun testMaybeCollectThrowingMaybe() = runTest {
278 expect(1)
279 try {
280 Maybe.error<Int>(TestException()).collect {
281 expectUnreached()
282 }
283 } catch (e: TestException) {
284 expect(2)
285 }
286 finish(3)
287 }
288
289 /** Tests the behavior of [collect] when the action throws. */
290 @Test
291 fun testMaybeCollectThrowingAction() = runTest {
292 expect(1)
293 try {
294 Maybe.just("OK").collect {
295 expect(2)
296 throw TestException()
297 }
298 } catch (e: TestException) {
299 expect(3)
300 }
301 finish(4)
302 }
303
304 @Test
305 fun testSuppressedException() = runTest {
306 val maybe = rxMaybe(currentDispatcher()) {
307 launch(start = CoroutineStart.ATOMIC) {
308 throw TestException() // child coroutine fails
309 }
310 try {
311 delay(Long.MAX_VALUE)
312 } finally {
313 throw TestException2() // but parent throws another exception while cleaning up
314 }
315 }
316 try {
317 maybe.awaitSingleOrNull()
318 expectUnreached()
319 } catch (e: TestException) {
320 assertIs<TestException2>(e.suppressed[0])
321 }
322 }
323
324 @Test
325 fun testUnhandledException() = runTest {
326 expect(1)
327 var disposable: Disposable? = null
328 val handler = { e: Throwable ->
329 assertTrue(e is UndeliverableException && e.cause is TestException)
330 expect(5)
331 }
332 val maybe = rxMaybe(currentDispatcher()) {
333 expect(4)
334 disposable!!.dispose() // cancel our own subscription, so that delay will get cancelled
335 try {
336 delay(Long.MAX_VALUE)
337 } finally {
338 throw TestException() // would not be able to handle it since mono is disposed
339 }
340 }
341 withExceptionHandler(handler) {
342 maybe.subscribe(object : MaybeObserver<Unit> {
343 override fun onSubscribe(d: Disposable) {
344 expect(2)
345 disposable = d
346 }
347
348 override fun onComplete() {
349 expectUnreached()
350 }
351
352 override fun onSuccess(t: Unit) {
353 expectUnreached()
354 }
355
356 override fun onError(t: Throwable) {
357 expectUnreached()
358 }
359 })
360 expect(3)
361 yield() // run coroutine
362 finish(6)
363 }
364 }
365
366 @Test
367 fun testFatalExceptionInSubscribe() = runTest {
368 val handler = { e: Throwable ->
369 assertTrue(e is UndeliverableException && e.cause is LinkageError)
370 expect(2)
371 }
372
373 withExceptionHandler(handler) {
374 rxMaybe(Dispatchers.Unconfined) {
375 expect(1)
376 42
377 }.subscribe { throw LinkageError() }
378 finish(3)
379 }
380 }
381
382 @Test
383 fun testFatalExceptionInSingle() = runTest {
384 rxMaybe(Dispatchers.Unconfined) {
385 throw LinkageError()
386 }.subscribe({ expectUnreached() }, { expect(1); assertIs<LinkageError>(it) })
387 finish(2)
388 }
389 }
390