<lambda>null1 package kotlinx.coroutines.rx3
2
3 import kotlinx.coroutines.testing.*
4 import io.reactivex.rxjava3.core.*
5 import io.reactivex.rxjava3.disposables.*
6 import io.reactivex.rxjava3.exceptions.*
7 import io.reactivex.rxjava3.functions.*
8 import kotlinx.coroutines.*
9 import kotlinx.coroutines.CancellationException
10 import org.junit.*
11 import org.junit.Test
12 import java.util.concurrent.*
13 import kotlin.test.*
14
15 class SingleTest : TestBase() {
16 @Before
17 fun setup() {
18 ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
19 }
20
21 @Test
22 fun testBasicSuccess() = runBlocking {
23 expect(1)
24 val single = rxSingle(currentDispatcher()) {
25 expect(4)
26 "OK"
27 }
28 expect(2)
29 single.subscribe { value ->
30 expect(5)
31 assertEquals("OK", value)
32 }
33 expect(3)
34 yield() // to started coroutine
35 finish(6)
36 }
37
38 @Test
39 fun testBasicFailure() = runBlocking {
40 expect(1)
41 val single = rxSingle(currentDispatcher()) {
42 expect(4)
43 throw RuntimeException("OK")
44 }
45 expect(2)
46 single.subscribe({
47 expectUnreached()
48 }, { error ->
49 expect(5)
50 assertIs<RuntimeException>(error)
51 assertEquals("OK", error.message)
52 })
53 expect(3)
54 yield() // to started coroutine
55 finish(6)
56 }
57
58
59 @Test
60 fun testBasicUnsubscribe() = runBlocking {
61 expect(1)
62 val single = rxSingle(currentDispatcher()) {
63 expect(4)
64 yield() // back to main, will get cancelled
65 expectUnreached()
66
67 }
68 expect(2)
69 // nothing is called on a disposed rx3 single
70 val sub = single.subscribe({
71 expectUnreached()
72 }, {
73 expectUnreached()
74 })
75 expect(3)
76 yield() // to started coroutine
77 expect(5)
78 sub.dispose() // will cancel coroutine
79 yield()
80 finish(6)
81 }
82
83 @Test
84 fun testSingleNoWait() {
85 val single = rxSingle {
86 "OK"
87 }
88
89 checkSingleValue(single) {
90 assertEquals("OK", it)
91 }
92 }
93
94 @Test
95 fun testSingleAwait() = runBlocking {
96 assertEquals("OK", Single.just("O").await() + "K")
97 }
98
99 /** Tests that calls to [await] throw [CancellationException] and dispose of the subscription when their
100 * [Job] is cancelled. */
101 @Test
102 fun testSingleAwaitCancellation() = runTest {
103 expect(1)
104 val single = SingleSource<Int> { s ->
105 s.onSubscribe(object: Disposable {
106 override fun dispose() { expect(4) }
107 override fun isDisposed(): Boolean { expectUnreached(); return false }
108 })
109 }
110 val job = launch(start = CoroutineStart.UNDISPATCHED) {
111 try {
112 expect(2)
113 single.await()
114 } catch (e: CancellationException) {
115 expect(5)
116 throw e
117 }
118 }
119 expect(3)
120 job.cancelAndJoin()
121 finish(6)
122 }
123
124 @Test
125 fun testSingleEmitAndAwait() {
126 val single = rxSingle {
127 Single.just("O").await() + "K"
128 }
129
130 checkSingleValue(single) {
131 assertEquals("OK", it)
132 }
133 }
134
135 @Test
136 fun testSingleWithDelay() {
137 val single = rxSingle {
138 Observable.timer(50, TimeUnit.MILLISECONDS).map { "O" }.awaitSingle() + "K"
139 }
140
141 checkSingleValue(single) {
142 assertEquals("OK", it)
143 }
144 }
145
146 @Test
147 fun testSingleException() {
148 val single = rxSingle {
149 Observable.just("O", "K").awaitSingle() + "K"
150 }
151
152 checkErroneous(single) {
153 assert(it is IllegalArgumentException)
154 }
155 }
156
157 @Test
158 fun testAwaitFirst() {
159 val single = rxSingle {
160 Observable.just("O", "#").awaitFirst() + "K"
161 }
162
163 checkSingleValue(single) {
164 assertEquals("OK", it)
165 }
166 }
167
168 @Test
169 fun testAwaitLast() {
170 val single = rxSingle {
171 Observable.just("#", "O").awaitLast() + "K"
172 }
173
174 checkSingleValue(single) {
175 assertEquals("OK", it)
176 }
177 }
178
179 @Test
180 fun testExceptionFromObservable() {
181 val single = rxSingle {
182 try {
183 Observable.error<String>(RuntimeException("O")).awaitFirst()
184 } catch (e: RuntimeException) {
185 Observable.just(e.message!!).awaitLast() + "K"
186 }
187 }
188
189 checkSingleValue(single) {
190 assertEquals("OK", it)
191 }
192 }
193
194 @Test
195 fun testExceptionFromCoroutine() {
196 val single = rxSingle<String> {
197 throw IllegalStateException(Observable.just("O").awaitSingle() + "K")
198 }
199
200 checkErroneous(single) {
201 assert(it is IllegalStateException)
202 assertEquals("OK", it.message)
203 }
204 }
205
206 @Test
207 fun testSuppressedException() = runTest {
208 val single = rxSingle(currentDispatcher()) {
209 launch(start = CoroutineStart.ATOMIC) {
210 throw TestException() // child coroutine fails
211 }
212 try {
213 delay(Long.MAX_VALUE)
214 } finally {
215 throw TestException2() // but parent throws another exception while cleaning up
216 }
217 }
218 try {
219 single.await()
220 expectUnreached()
221 } catch (e: TestException) {
222 assertIs<TestException2>(e.suppressed[0])
223 }
224 }
225
226 @Test
227 fun testFatalExceptionInSubscribe() = runTest {
228 val handler = { e: Throwable ->
229 assertTrue(e is UndeliverableException && e.cause is LinkageError)
230 expect(2)
231 }
232 withExceptionHandler(handler) {
233 rxSingle(Dispatchers.Unconfined) {
234 expect(1)
235 42
236 }.subscribe(Consumer {
237 throw LinkageError()
238 })
239 finish(3)
240 }
241 }
242
243 @Test
244 fun testFatalExceptionInSingle() = runTest {
245 rxSingle(Dispatchers.Unconfined) {
246 throw LinkageError()
247 }.subscribe { _, e -> assertIs<LinkageError>(e); expect(1) }
248
249 finish(2)
250 }
251
252 @Test
253 fun testUnhandledException() = runTest {
254 expect(1)
255 var disposable: Disposable? = null
256 val handler = { e: Throwable ->
257 assertTrue(e is UndeliverableException && e.cause is TestException)
258 expect(5)
259 }
260 val single = rxSingle(currentDispatcher()) {
261 expect(4)
262 disposable!!.dispose() // cancel our own subscription, so that delay will get cancelled
263 try {
264 delay(Long.MAX_VALUE)
265 } finally {
266 throw TestException() // would not be able to handle it since mono is disposed
267 }
268 }
269 withExceptionHandler(handler) {
270 single.subscribe(object : SingleObserver<Unit> {
271 override fun onSubscribe(d: Disposable) {
272 expect(2)
273 disposable = d
274 }
275
276 override fun onSuccess(t: Unit) {
277 expectUnreached()
278 }
279
280 override fun onError(t: Throwable) {
281 expectUnreached()
282 }
283 })
284 expect(3)
285 yield() // run coroutine
286 finish(6)
287 }
288 }
289 }
290