package kotlinx.coroutines.rx3

import io.reactivex.rxjava3.core.*
import io.reactivex.rxjava3.plugins.*

/**
 * Obtains the single value of [observable] via `blockingSingle()` and passes it to [checker].
 *
 * @param observable the source to read the value from.
 * @param checker verification callback invoked with the obtained value.
 */
fun <T : Any> checkSingleValue(
    observable: Observable<T>,
    checker: (T) -> Unit
) {
    checker(observable.blockingSingle())
}

/**
 * Materializes [observable], takes its single notification via `blockingSingle()` and passes
 * the error carried by that notification to [checker].
 *
 * @param observable the source that is expected to terminate with an error.
 * @param checker verification callback invoked with the observed error.
 * @throws IllegalStateException if the single notification carries no error.
 */
fun checkErroneous(
    observable: Observable<*>,
    checker: (Throwable) -> Unit
) {
    val notification = observable.materialize().blockingSingle()
    val failure = notification.error ?: error("Expected error")
    checker(failure)
}

/**
 * Obtains the value of [single] via `blockingGet()` and passes it to [checker].
 *
 * @param single the source to read the value from.
 * @param checker verification callback invoked with the obtained value.
 */
fun <T : Any> checkSingleValue(
    single: Single<T>,
    checker: (T) -> Unit
) {
    checker(single.blockingGet())
}

/**
 * Calls `blockingGet()` on [single], expecting it to throw, and passes the thrown exception
 * to [checker].
 *
 * @param single the source that is expected to fail.
 * @param checker verification callback invoked with the thrown exception.
 * @throws IllegalStateException if `blockingGet()` returns normally.
 */
fun checkErroneous(
    single: Single<*>,
    checker: (Throwable) -> Unit
) {
    // The "Should have failed" error is raised outside of the capturing block on purpose:
    // otherwise it would itself be caught and handed to `checker` as if it came from `single`.
    val failure = runCatching { single.blockingGet() }.exceptionOrNull()
        ?: error("Should have failed")
    checker(failure)
}

/**
 * Converts [maybe] to a flowable, takes the first element of its blocking iterable
 * (or `null` when there is none) and passes it to [checker].
 *
 * @param maybe the source to read the value from.
 * @param checker verification callback invoked with the first element or `null`.
 */
fun <T> checkMaybeValue(
    maybe: Maybe<T>,
    checker: (T?) -> Unit
) {
    val firstOrNothing = maybe.toFlowable().blockingIterable().firstOrNull()
    checker(firstOrNothing)
}

/**
 * Calls `blockingGet()` on [maybe], expecting it to throw, and passes the thrown exception
 * to [checker].
 *
 * @param maybe the source that is expected to fail.
 * @param checker verification callback invoked with the thrown exception.
 * @throws IllegalStateException if `blockingGet()` returns normally.
 */
@Suppress("UNCHECKED_CAST")
fun checkErroneous(
    maybe: Maybe<*>,
    checker: (Throwable) -> Unit
) {
    // The "Should have failed" error is raised outside of the capturing block on purpose:
    // otherwise it would itself be caught and handed to `checker` as if it came from `maybe`.
    val failure = runCatching { (maybe as Maybe<Any>).blockingGet() }.exceptionOrNull()
        ?: error("Should have failed")
    checker(failure)
}

/**
 * Runs [block] with [handler] installed as the `RxJavaPlugins` error handler.
 *
 * The handler that was installed before the call is read up front and set back in a
 * `finally` clause, so it is restored even when [block] throws.
 *
 * @param handler callback that receives every error passed to the installed error handler.
 * @param block the code to run while [handler] is installed.
 */
inline fun withExceptionHandler(noinline handler: (Throwable) -> Unit, block: () -> Unit) {
    val original = RxJavaPlugins.getErrorHandler()
    RxJavaPlugins.setErrorHandler { handler(it) }
    try {
        block()
    } finally {
        RxJavaPlugins.setErrorHandler(original)
    }
}