• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 
5 package kotlinx.coroutines.rx3
6 
import io.reactivex.rxjava3.core.ObservableSource
import io.reactivex.rxjava3.disposables.*
import kotlinx.coroutines.*
import org.junit.Test
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.test.*
12 
/**
 * Tests for [collect] applied to RxJava 3 observables, covering a failure raised by the
 * source and a failure raised by the collecting action.
 */
class ObservableCollectTest: TestBase() {

    /** Tests the behavior of [collect] when the publisher raises an error. */
    @Test
    fun testObservableCollectThrowingObservable() = runTest {
        expect(1)
        var sum = 0
        try {
            rxObservable {
                for (i in 0..100) {
                    send(i)
                }
                // Fail the source only after every element has been sent.
                throw TestException()
            }.collect {
                sum += it
            }
        } catch (e: TestException) {
            // The source's exception must reach the caller of collect, and some elements
            // must have been collected before it did.
            assertTrue(sum > 0)
            finish(2)
        }
    }

    /**
     * Tests the behavior of [collect] when the collecting action raises an error.
     *
     * The source emits `1` until it is disposed. The action throws once the running sum
     * reaches `expectedSum`; the test then checks that the disposable was disposed before
     * the exception reached the caller, and that no further element was collected.
     */
    @Test
    fun testObservableCollectThrowingAction() = runTest {
        expect(1)
        var sum = 0
        val expectedSum = 5
        try {
            // Polled by the emitter loop running on Dispatchers.Default and set from dispose(),
            // which may be invoked from another thread. An atomic flag guarantees the loop
            // observes the update; a plain captured `var` gives no such visibility guarantee.
            val disposed = AtomicBoolean(false)
            ObservableSource<Int> { observer ->
                launch(Dispatchers.Default) {
                    observer.onSubscribe(object : Disposable {
                        override fun dispose() {
                            disposed.set(true)
                            // Disposal has to come right after the last collected element.
                            expect(expectedSum + 2)
                        }

                        override fun isDisposed(): Boolean = disposed.get()
                    })
                    while (!disposed.get()) {
                        observer.onNext(1)
                    }
                }
            }.collect {
                // Elements arrive one at a time: indices 2..(expectedSum + 1).
                expect(sum + 2)
                sum += it
                if (sum == expectedSum) {
                    throw TestException()
                }
            }
        } catch (e: TestException) {
            assertEquals(expectedSum, sum)
            finish(expectedSum + 3)
        }
    }
}