1 /* 2 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines.reactor 6 7 import reactor.core.publisher.Flux 8 import reactor.core.publisher.Mono 9 checkMonoValuenull10fun <T> checkMonoValue( 11 mono: Mono<T>, 12 checker: (T) -> Unit 13 ) { 14 val monoValue = mono.block() 15 checker(monoValue) 16 } 17 checkErroneousnull18fun checkErroneous( 19 mono: Mono<*>, 20 checker: (Throwable) -> Unit 21 ) { 22 try { 23 mono.block() 24 error("Should have failed") 25 } catch (e: Throwable) { 26 checker(e) 27 } 28 } 29 checkSingleValuenull30fun <T> checkSingleValue( 31 flux: Flux<T>, 32 checker: (T) -> Unit 33 ) { 34 val singleValue = flux.toIterable().single() 35 checker(singleValue) 36 } 37 checkErroneousnull38fun checkErroneous( 39 flux: Flux<*>, 40 checker: (Throwable) -> Unit 41 ) { 42 val singleNotification = flux.materialize().toIterable().single() 43 checker(singleNotification.throwable) 44 } 45