• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.reactor
6 
7 import reactor.core.publisher.Flux
8 import reactor.core.publisher.Mono
9 
/**
 * Calls `block()` on [mono] and hands the result to [checker].
 *
 * Nothing is caught here, so any exception thrown by `block()` or by [checker]
 * propagates to the caller.
 *
 * @param mono the publisher whose result is obtained synchronously.
 * @param checker callback that receives the obtained value, typically to assert on it.
 */
fun <T> checkMonoValue(
        mono: Mono<T>,
        checker: (T) -> Unit
) {
    checker(mono.block())
}
17 
/**
 * Verifies that blocking on [mono] fails, and passes the thrown exception to [checker].
 *
 * The "Should have failed" error is raised outside the `try`, so it is never
 * caught and handed to [checker] as if it were the Mono's own failure.
 *
 * @param mono the publisher that is expected to terminate with an error.
 * @param checker callback that receives the exception thrown by `mono.block()`.
 * @throws IllegalStateException if `mono.block()` returns without throwing.
 */
fun checkErroneous(
        mono: Mono<*>,
        checker: (Throwable) -> Unit
) {
    // Capture only what block() itself throws; null means it returned normally.
    val failure: Throwable? = try {
        mono.block()
        null
    } catch (e: Throwable) {
        e
    }
    checker(failure ?: error("Should have failed"))
}
29 
/**
 * Reads [flux] through `toIterable()`, takes its only element via `single()`,
 * and passes that element to [checker].
 *
 * `single()` throws when the iterable is empty or holds more than one element;
 * that exception is not caught here.
 *
 * @param flux the publisher expected to emit exactly one value.
 * @param checker callback that receives the emitted value, typically to assert on it.
 */
fun <T> checkSingleValue(
        flux: Flux<T>,
        checker: (T) -> Unit
) {
    val emitted = flux.toIterable()
    checker(emitted.single())
}
37 
/**
 * Verifies that [flux] terminates with an error as its only signal, and passes
 * that error to [checker].
 *
 * The flux is materialized into signals and `single()` requires exactly one of
 * them, so a flux that emits any value before terminating makes `single()` throw.
 *
 * @param flux the publisher that is expected to fail without emitting values.
 * @param checker callback that receives the throwable carried by the single signal.
 * @throws IllegalStateException if the single signal carries no throwable.
 */
fun checkErroneous(
        flux: Flux<*>,
        checker: (Throwable) -> Unit
) {
    val singleNotification = flux.materialize().toIterable().single()
    // A non-error signal has no throwable; fail explicitly instead of passing null to checker.
    val failure = checkNotNull(singleNotification.throwable) { "Should have failed" }
    checker(failure)
}
45