• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 package kotlinx.coroutines.rx3
2 
3 import kotlinx.coroutines.testing.*
4 import io.reactivex.rxjava3.core.*
5 import io.reactivex.rxjava3.disposables.*
6 import kotlinx.coroutines.*
7 import kotlinx.coroutines.flow.*
8 import org.junit.Test
9 import java.util.concurrent.*
10 import kotlin.test.*
11 
12 class FlowAsObservableTest : TestBase() {
13     @Test
14     fun testBasicSuccess() = runTest {
15         expect(1)
16         val observable = flow {
17             expect(3)
18             emit("OK")
19         }.asObservable()
20 
21         expect(2)
22         observable.subscribe { value ->
23             expect(4)
24             assertEquals("OK", value)
25         }
26 
27         finish(5)
28     }
29 
30     @Test
31     fun testBasicFailure() = runTest {
32         expect(1)
33         val observable = flow<Int> {
34             expect(3)
35             throw RuntimeException("OK")
36         }.asObservable()
37 
38         expect(2)
39         observable.subscribe({ expectUnreached() }, { error ->
40             expect(4)
41             assertIs<RuntimeException>(error)
42             assertEquals("OK", error.message)
43         })
44         finish(5)
45     }
46 
47     @Test
48     fun testBasicUnsubscribe() = runTest {
49         expect(1)
50         val observable = flow<Int> {
51             expect(3)
52             hang {
53                 expect(4)
54             }
55         }.asObservable()
56 
57         expect(2)
58         val sub = observable.subscribe({ expectUnreached() }, { expectUnreached() })
59         sub.dispose() // will cancel coroutine
60         finish(5)
61     }
62 
63     @Test
64     fun testNotifyOnceOnCancellation() = runTest {
65         val observable =
66             flow {
67                 expect(3)
68                 emit("OK")
69                 hang {
70                     expect(7)
71                 }
72             }.asObservable()
73                 .doOnNext {
74                     expect(4)
75                     assertEquals("OK", it)
76                 }
77                 .doOnDispose {
78                     expect(6) // notified once!
79                 }
80 
81         expect(1)
82         val job = launch(start = CoroutineStart.UNDISPATCHED) {
83             expect(2)
84             observable.collect {
85                 expect(5)
86                 assertEquals("OK", it)
87             }
88         }
89 
90         yield()
91         job.cancelAndJoin()
92         finish(8)
93     }
94 
95     @Test
96     fun testFailingConsumer() = runTest {
97         expect(1)
98         val observable = flow {
99             expect(2)
100             emit("OK")
101             hang {
102                 expect(4)
103             }
104 
105         }.asObservable()
106 
107         try {
108             observable.collect {
109                 expect(3)
110                 throw TestException()
111             }
112         } catch (e: TestException) {
113             finish(5)
114         }
115     }
116 
117     @Test
118     fun testNonAtomicStart() = runTest {
119         withContext(Dispatchers.Unconfined) {
120             val observable = flow<Int> {
121                 expect(1)
122             }.asObservable()
123 
124             val disposable = observable.subscribe({ expectUnreached() }, { expectUnreached() }, { expectUnreached() })
125             disposable.dispose()
126         }
127         finish(2)
128     }
129 
130     @Test
131     fun testFlowCancelledFromWithin() = runTest {
132         val observable = flow {
133             expect(1)
134             emit(1)
135             kotlin.coroutines.coroutineContext.cancel()
136             kotlin.coroutines.coroutineContext.ensureActive()
137             expectUnreached()
138         }.asObservable()
139 
140         observable.subscribe({ expect(2) }, { expectUnreached() }, { finish(3) })
141     }
142 
143     @Test
144     fun testUnconfinedDefaultContext() {
145         expect(1)
146         val thread = Thread.currentThread()
147         fun checkThread() {
148             assertSame(thread, Thread.currentThread())
149         }
150         flowOf(42).asObservable().subscribe(object : Observer<Int> {
151             override fun onSubscribe(d: Disposable) {
152                 expect(2)
153             }
154 
155             override fun onNext(t: Int) {
156                 checkThread()
157                 expect(3)
158                 assertEquals(42, t)
159             }
160 
161             override fun onComplete() {
162                 checkThread()
163                 expect(4)
164             }
165 
166             override fun onError(t: Throwable) {
167                 expectUnreached()
168             }
169         })
170         finish(5)
171     }
172 
173     @Test
174     fun testConfinedContext() {
175         expect(1)
176         val threadName = "FlowAsObservableTest.testConfinedContext"
177         fun checkThread() {
178             val currentThread = Thread.currentThread()
179             assertTrue(currentThread.name.startsWith(threadName), "Unexpected thread $currentThread")
180         }
181         val completed = CountDownLatch(1)
182         newSingleThreadContext(threadName).use { dispatcher ->
183             flowOf(42).asObservable(dispatcher).subscribe(object : Observer<Int> {
184                 override fun onSubscribe(d: Disposable) {
185                     expect(2)
186                 }
187 
188                 override fun onNext(t: Int) {
189                     checkThread()
190                     expect(3)
191                     assertEquals(42, t)
192                 }
193 
194                 override fun onComplete() {
195                     checkThread()
196                     expect(4)
197                     completed.countDown()
198                 }
199 
200                 override fun onError(e: Throwable) {
201                     expectUnreached()
202                 }
203             })
204             completed.await()
205         }
206         finish(5)
207     }
208 }
209