• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.rx2
6 
7 import io.reactivex.*
8 import io.reactivex.disposables.*
9 import kotlinx.coroutines.*
10 import org.junit.Test
11 import kotlin.test.*
12 
13 class ObservableCollectTest: TestBase() {
14 
15     /** Tests the behavior of [collect] when the publisher raises an error. */
16     @Test
17     fun testObservableCollectThrowingObservable() = runTest {
18         expect(1)
19         var sum = 0
20         try {
21             rxObservable {
22                 for (i in 0..100) {
23                     send(i)
24                 }
25                 throw TestException()
26             }.collect {
27                 sum += it
28             }
29         } catch (e: TestException) {
30             assertTrue(sum > 0)
31             finish(2)
32         }
33     }
34 
35     /** Tests the behavior of [collect] when the action throws. */
36     @Test
37     fun testObservableCollectThrowingAction() = runTest {
38         expect(1)
39         var sum = 0
40         val expectedSum = 5
41         try {
42             var disposed = false
43             ObservableSource<Int> { observer ->
44                 launch(Dispatchers.Default) {
45                     observer.onSubscribe(object : Disposable {
46                         override fun dispose() {
47                             disposed = true
48                             expect(expectedSum + 2)
49                         }
50 
51                         override fun isDisposed(): Boolean = disposed
52                     })
53                     while (!disposed) {
54                         observer.onNext(1)
55                     }
56                 }
57             }.collect {
58                 expect(sum + 2)
59                 sum += it
60                 if (sum == expectedSum) {
61                     throw TestException()
62                 }
63             }
64         } catch (e: TestException) {
65             assertEquals(expectedSum, sum)
66             finish(expectedSum + 3)
67         }
68     }
69 }