package kotlinx.coroutines.reactor

import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

/**
 * Blocks on [mono] and hands the value it produced to [checker].
 *
 * @param mono the publisher to block on.
 * @param checker assertions to run against the produced value.
 */
fun <T> checkMonoValue(
    mono: Mono<T>,
    checker: (T) -> Unit
) {
    // NOTE(review): block() is a Java call; per Reactor docs it yields null for an empty Mono,
    // in which case the checker receives that null through the platform type — confirm callers
    // never pass an empty Mono here.
    val monoValue = mono.block()
    checker(monoValue)
}

/**
 * Blocks on [mono], expecting it to fail, and hands the thrown exception to [checker].
 *
 * If blocking on [mono] does not throw, this function itself fails with an
 * [IllegalStateException] ("Should have failed") and [checker] is never invoked.
 *
 * @param mono the publisher that is expected to terminate with an error.
 * @param checker assertions to run against the exception thrown by `block()`.
 * @throws IllegalStateException if [mono] completed without throwing.
 */
fun checkErroneous(
    mono: Mono<*>,
    checker: (Throwable) -> Unit
) {
    // Capture only what block() throws. The "Should have failed" error must be raised
    // outside of this try so that it cannot be caught and mistaken for the Mono's own failure.
    val failure: Throwable? = try {
        mono.block()
        null
    } catch (e: Throwable) {
        e
    }
    checker(failure ?: error("Should have failed"))
}

/**
 * Collects [flux] into an iterable, requires it to contain exactly one element,
 * and hands that element to [checker].
 *
 * `single()` throws if the iterable is empty or has more than one element.
 *
 * @param flux the publisher expected to emit exactly one value.
 * @param checker assertions to run against the emitted value.
 */
fun <T> checkSingleValue(
    flux: Flux<T>,
    checker: (T) -> Unit
) {
    val singleValue = flux.toIterable().single()
    checker(singleValue)
}

/**
 * Materializes [flux] into its signals, requires exactly one signal, and hands the
 * throwable carried by that signal to [checker].
 *
 * `single()` throws if the materialized stream has zero or more than one signal
 * (for example, a value followed by an error).
 *
 * @param flux the publisher that is expected to terminate with an error and emit nothing else.
 * @param checker assertions to run against the error carried by the signal.
 * @throws IllegalStateException if the single signal carries no throwable.
 */
fun checkErroneous(
    flux: Flux<*>,
    checker: (Throwable) -> Unit
) {
    val singleNotification = flux.materialize().toIterable().single()
    // The throwable accessor is null for non-error signals; fail with a clear message
    // instead of leaking a null into a checker that declares a non-null Throwable.
    val failure = singleNotification.throwable
        ?: error("Should have failed, but got $singleNotification")
    checker(failure)
}