• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.rx2
6 
7 import io.reactivex.*
8 import io.reactivex.functions.Consumer
9 import io.reactivex.plugins.*
10 
/**
 * Blocks until [observable] produces its single value and hands that value to [checker].
 *
 * The value is obtained with `blockingSingle()`, so whatever that call throws propagates
 * to the caller before [checker] is invoked.
 */
fun <T> checkSingleValue(
    observable: Observable<T>,
    checker: (T) -> Unit
) = checker(observable.blockingSingle())
18 
/**
 * Materializes [observable], blocks for its single notification and passes the error
 * carried by that notification to [checker].
 *
 * @throws IllegalStateException if the notification carries no error.
 */
fun checkErroneous(
    observable: Observable<*>,
    checker: (Throwable) -> Unit
) {
    val notification = observable.materialize().blockingSingle()
    // A notification without an error means the observable did not fail as the caller expected.
    val failure = notification.error ?: error("Expected error")
    checker(failure)
}
27 
/**
 * Blocks until [single] produces its value and hands that value to [checker].
 *
 * The value is obtained with `blockingGet()`, so whatever that call throws propagates
 * to the caller before [checker] is invoked.
 */
fun <T> checkSingleValue(
    single: Single<T>,
    checker: (T) -> Unit
) = checker(single.blockingGet())
35 
/**
 * Blocks on [single] and passes the exception thrown by `blockingGet()` to [checker].
 *
 * The "did not fail" check is made outside of the `try` block, so a [single] that
 * completes successfully fails this function directly instead of having the
 * "Should have failed" exception caught and handed to [checker].
 *
 * @throws IllegalStateException if [single] completes without throwing.
 */
fun checkErroneous(
    single: Single<*>,
    checker: (Throwable) -> Unit
) {
    // Capture only what blockingGet() throws; null means the single succeeded.
    val failure: Throwable? = try {
        single.blockingGet()
        null
    } catch (e: Throwable) {
        e
    }
    checker(failure ?: error("Should have failed"))
}
47 
/**
 * Converts [maybe] to a flowable, takes the first element of its blocking iterable
 * (or `null` when there is none) and hands the result to [checker].
 */
fun <T> checkMaybeValue(
    maybe: Maybe<T>,
    checker: (T?) -> Unit
) {
    val emitted = maybe.toFlowable().blockingIterable()
    checker(emitted.firstOrNull())
}
55 
/**
 * Blocks on [maybe] and passes the exception thrown by `blockingGet()` to [checker].
 *
 * The "did not fail" check is made outside of the `try` block, so a [maybe] that
 * completes without throwing fails this function directly instead of having the
 * "Should have failed" exception caught and handed to [checker].
 *
 * @throws IllegalStateException if `blockingGet()` returns without throwing.
 */
@Suppress("UNCHECKED_CAST")
fun checkErroneous(
    maybe: Maybe<*>,
    checker: (Throwable) -> Unit
) {
    // Capture only what blockingGet() throws; null means the call returned normally.
    val failure: Throwable? = try {
        (maybe as Maybe<Any>).blockingGet()
        null
    } catch (e: Throwable) {
        e
    }
    checker(failure ?: error("Should have failed"))
}
68 
/**
 * Runs [block] with [handler] installed as the `RxJavaPlugins` error handler.
 *
 * The handler that was registered before the call is read up front and set again in a
 * `finally` block, so it is restored whether [block] returns normally or throws.
 */
inline fun withExceptionHandler(noinline handler: (Throwable) -> Unit, block: () -> Unit) {
    val previousHandler = RxJavaPlugins.getErrorHandler()
    RxJavaPlugins.setErrorHandler { failure -> handler(failure) }
    try {
        block()
    } finally {
        RxJavaPlugins.setErrorHandler(previousHandler)
    }
}