1 /* 2 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines.rx3 6 7 import io.reactivex.rxjava3.core.* 8 import io.reactivex.rxjava3.plugins.* 9 checkSingleValuenull10fun <T : Any> checkSingleValue( 11 observable: Observable<T>, 12 checker: (T) -> Unit 13 ) { 14 val singleValue = observable.blockingSingle() 15 checker(singleValue) 16 } 17 checkErroneousnull18fun checkErroneous( 19 observable: Observable<*>, 20 checker: (Throwable) -> Unit 21 ) { 22 val singleNotification = observable.materialize().blockingSingle() 23 val error = singleNotification.error ?: error("Excepted error") 24 checker(error) 25 } 26 checkSingleValuenull27fun <T : Any> checkSingleValue( 28 single: Single<T>, 29 checker: (T) -> Unit 30 ) { 31 val singleValue = single.blockingGet() 32 checker(singleValue) 33 } 34 checkErroneousnull35fun checkErroneous( 36 single: Single<*>, 37 checker: (Throwable) -> Unit 38 ) { 39 try { 40 single.blockingGet() 41 error("Should have failed") 42 } catch (e: Throwable) { 43 checker(e) 44 } 45 } 46 checkMaybeValuenull47fun <T> checkMaybeValue( 48 maybe: Maybe<T>, 49 checker: (T?) -> Unit 50 ) { 51 val maybeValue = maybe.toFlowable().blockingIterable().firstOrNull() 52 checker(maybeValue) 53 } 54 55 @Suppress("UNCHECKED_CAST") checkErroneousnull56fun checkErroneous( 57 maybe: Maybe<*>, 58 checker: (Throwable) -> Unit 59 ) { 60 try { 61 (maybe as Maybe<Any>).blockingGet() 62 error("Should have failed") 63 } catch (e: Throwable) { 64 checker(e) 65 } 66 } 67 withExceptionHandlernull68inline fun withExceptionHandler(noinline handler: (Throwable) -> Unit, block: () -> Unit) { 69 val original = RxJavaPlugins.getErrorHandler() 70 RxJavaPlugins.setErrorHandler { handler(it) } 71 try { 72 block() 73 } finally { 74 RxJavaPlugins.setErrorHandler(original) 75 } 76 } 77