/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.rx2

import io.reactivex.*
import io.reactivex.functions.Consumer
import io.reactivex.plugins.*

/**
 * Blocks until [observable] yields its single value and hands that value to [checker].
 *
 * Whatever `blockingSingle()` throws propagates to the caller unchanged.
 */
fun <T> checkSingleValue(
    observable: Observable<T>,
    checker: (T) -> Unit
) {
    val singleValue = observable.blockingSingle()
    checker(singleValue)
}

/**
 * Verifies that [observable] terminates with an error and passes that error to [checker].
 *
 * The stream is materialized so that the terminal event arrives as a notification
 * instead of being thrown by the blocking call.
 *
 * @throws IllegalStateException if the single notification carries no error.
 */
fun checkErroneous(
    observable: Observable<*>,
    checker: (Throwable) -> Unit
) {
    val singleNotification = observable.materialize().blockingSingle()
    // Named `failure` so the local does not shadow the stdlib `error()` function used on this line.
    val failure = singleNotification.error ?: error("Expected error")
    checker(failure)
}

/**
 * Blocks until [single] produces its value and hands that value to [checker].
 *
 * Whatever `blockingGet()` throws propagates to the caller unchanged.
 */
fun <T> checkSingleValue(
    single: Single<T>,
    checker: (T) -> Unit
) {
    val singleValue = single.blockingGet()
    checker(singleValue)
}

/**
 * Verifies that [single] fails and passes the thrown exception to [checker].
 *
 * @throws IllegalStateException if `blockingGet()` returns normally. This exception is
 * raised outside the `try`, so it is never handed to [checker] by mistake.
 */
fun checkErroneous(
    single: Single<*>,
    checker: (Throwable) -> Unit
) {
    checker(captureFailure { single.blockingGet() })
}

/**
 * Blocks on [maybe] and hands [checker] the first element of its flowable view,
 * or `null` when that view yields no element.
 */
fun <T> checkMaybeValue(
    maybe: Maybe<T>,
    checker: (T?) -> Unit
) {
    val maybeValue = maybe.toFlowable().blockingIterable().firstOrNull()
    checker(maybeValue)
}

/**
 * Verifies that [maybe] fails and passes the thrown exception to [checker].
 *
 * @throws IllegalStateException if `blockingGet()` returns normally. This exception is
 * raised outside the `try`, so it is never handed to [checker] by mistake.
 */
@Suppress("UNCHECKED_CAST")
fun checkErroneous(
    maybe: Maybe<*>,
    checker: (Throwable) -> Unit
) {
    checker(captureFailure { (maybe as Maybe<Any>).blockingGet() })
}

/**
 * Runs [block] with [handler] installed through `RxJavaPlugins.setErrorHandler`.
 *
 * The handler that was installed beforehand is read first and restored in `finally`,
 * so it is put back even when [block] throws.
 */
inline fun withExceptionHandler(noinline handler: (Throwable) -> Unit, block: () -> Unit) {
    val original = RxJavaPlugins.getErrorHandler()
    RxJavaPlugins.setErrorHandler { handler(it) }
    try {
        block()
    } finally {
        RxJavaPlugins.setErrorHandler(original)
    }
}

/**
 * Runs [action] and returns the [Throwable] it throws.
 *
 * The "did not fail" sentinel is raised after the `try`/`catch`, so it cannot be
 * caught here and mistaken for a failure of [action].
 */
private inline fun captureFailure(action: () -> Unit): Throwable {
    try {
        action()
    } catch (e: Throwable) {
        return e
    }
    error("Should have failed")
}