• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 package kotlinx.coroutines.rx2
2 
3 import kotlinx.coroutines.testing.*
4 import io.reactivex.Observable
5 import io.reactivex.ObservableSource
6 import io.reactivex.Observer
7 import io.reactivex.disposables.Disposables
8 import io.reactivex.subjects.PublishSubject
9 import kotlinx.coroutines.*
10 import kotlinx.coroutines.channels.*
11 import kotlinx.coroutines.flow.*
12 import kotlinx.coroutines.testing.flow.*
13 import kotlin.test.*
14 
15 class ObservableAsFlowTest : TestBase() {
16     @Test
testCancellationnull17     fun testCancellation() = runTest {
18         var onNext = 0
19         var onCancelled = 0
20         var onError = 0
21 
22         val source = rxObservable(currentDispatcher()) {
23             coroutineContext[Job]?.invokeOnCompletion {
24                 if (it is CancellationException) ++onCancelled
25             }
26 
27             repeat(100) {
28                 send(it)
29             }
30         }
31 
32         source.asFlow().launchIn(CoroutineScope(Dispatchers.Unconfined)) {
33             onEach {
34                 ++onNext
35                 throw RuntimeException()
36             }
37             catch<Throwable> {
38                 ++onError
39             }
40         }.join()
41 
42 
43         assertEquals(1, onNext)
44         assertEquals(1, onError)
45         assertEquals(1, onCancelled)
46     }
47 
48     @Test
testImmediateCollectionnull49     fun testImmediateCollection() {
50         val source = PublishSubject.create<Int>()
51         val flow = source.asFlow()
52         GlobalScope.launch(Dispatchers.Unconfined) {
53             expect(1)
54             flow.collect { expect(it) }
55             expect(6)
56         }
57         expect(2)
58         source.onNext(3)
59         expect(4)
60         source.onNext(5)
61         source.onComplete()
62         finish(7)
63     }
64 
65     @Test
testOnErrorCancellationnull66     fun testOnErrorCancellation() {
67         val source = PublishSubject.create<Int>()
68         val flow = source.asFlow()
69         val exception = RuntimeException()
70         GlobalScope.launch(Dispatchers.Unconfined) {
71             try {
72                 expect(1)
73                 flow.collect { expect(it) }
74                 expectUnreached()
75             }
76             catch (e: Exception) {
77                 assertSame(exception, e.cause)
78                 expect(5)
79             }
80             expect(6)
81         }
82         expect(2)
83         source.onNext(3)
84         expect(4)
85         source.onError(exception)
86         finish(7)
87     }
88 
89     @Test
testUnsubscribeOnCollectionExceptionnull90     fun testUnsubscribeOnCollectionException() {
91         val source = PublishSubject.create<Int>()
92         val flow = source.asFlow()
93         val exception = RuntimeException()
94         GlobalScope.launch(Dispatchers.Unconfined) {
95             try {
96                 expect(1)
97                 flow.collect {
98                     expect(it)
99                     if (it == 3) throw exception
100                 }
101                 expectUnreached()
102             }
103             catch (e: Exception) {
104                 assertSame(exception, e.cause)
105                 expect(4)
106             }
107             expect(5)
108         }
109         expect(2)
110         assertTrue(source.hasObservers())
111         source.onNext(3)
112         assertFalse(source.hasObservers())
113         finish(6)
114     }
115 
116     @Test
testLateOnSubscribenull117     fun testLateOnSubscribe() {
118         var observer: Observer<in Int>? = null
119         val source = ObservableSource<Int> { observer = it }
120         val flow = source.asFlow()
121         assertNull(observer)
122         val job = GlobalScope.launch(Dispatchers.Unconfined) {
123             expect(1)
124             flow.collect { expectUnreached() }
125             expectUnreached()
126         }
127         expect(2)
128         assertNotNull(observer)
129         job.cancel()
130         val disposable = Disposables.empty()
131         observer!!.onSubscribe(disposable)
132         assertTrue(disposable.isDisposed)
133         finish(3)
134     }
135 
136     @Test
<lambda>null137     fun testBufferUnlimited() = runTest {
138         val source = rxObservable(currentDispatcher()) {
139             expect(1); send(10)
140             expect(2); send(11)
141             expect(3); send(12)
142             expect(4); send(13)
143             expect(5); send(14)
144             expect(6); send(15)
145             expect(7); send(16)
146             expect(8); send(17)
147             expect(9)
148         }
149         source.asFlow().buffer(Channel.UNLIMITED).collect { expect(it) }
150         finish(18)
151     }
152 
153     @Test
<lambda>null154     fun testConflated() = runTest {
155         val source = Observable.range(1, 5)
156         val list = source.asFlow().conflate().toList()
157         assertEquals(listOf(1, 5), list)
158     }
159 
160     @Test
<lambda>null161     fun testLongRange() = runTest {
162         val source = Observable.range(1, 10_000)
163         val count = source.asFlow().count()
164         assertEquals(10_000, count)
165     }
166 
167     @Test
<lambda>null168     fun testProduce() = runTest {
169         val source = Observable.range(0, 10)
170         val flow = source.asFlow()
171         check((0..9).toList(), flow.produceIn(this))
172         check((0..9).toList(), flow.buffer(Channel.UNLIMITED).produceIn(this))
173         check((0..9).toList(), flow.buffer(2).produceIn(this))
174         check((0..9).toList(), flow.buffer(0).produceIn(this))
175         check(listOf(0, 9), flow.conflate().produceIn(this))
176     }
177 
checknull178     private suspend fun check(expected: List<Int>, channel: ReceiveChannel<Int>) {
179         val result = ArrayList<Int>(10)
180         channel.consumeEach { result.add(it) }
181         assertEquals(expected, result)
182     }
183 }
184