<lambda>null1package kotlinx.coroutines.reactor 2 3 import kotlinx.coroutines.testing.* 4 import kotlinx.coroutines.* 5 import kotlinx.coroutines.channels.* 6 import kotlinx.coroutines.flow.* 7 import kotlinx.coroutines.reactive.* 8 import org.junit.* 9 import org.junit.Test 10 import kotlin.test.* 11 12 class ConvertTest : TestBase() { 13 @Test 14 fun testJobToMonoSuccess() = runBlocking { 15 expect(1) 16 val job = launch { 17 expect(3) 18 } 19 val mono = job.asMono(coroutineContext.minusKey(Job)) 20 mono.subscribe { 21 expect(4) 22 } 23 expect(2) 24 yield() 25 finish(5) 26 } 27 28 @Test 29 fun testJobToMonoFail() = runBlocking { 30 expect(1) 31 val job = async(NonCancellable) { 32 expect(3) 33 throw RuntimeException("OK") 34 } 35 val mono = job.asMono(coroutineContext.minusKey(Job)) 36 mono.subscribe( 37 { fail("no item should be emitted") }, 38 { expect(4) } 39 ) 40 expect(2) 41 yield() 42 finish(5) 43 } 44 45 @Test 46 fun testDeferredToMono() { 47 val d = GlobalScope.async { 48 delay(50) 49 "OK" 50 } 51 val mono1 = d.asMono(Dispatchers.Unconfined) 52 checkMonoValue(mono1) { 53 assertEquals("OK", it) 54 } 55 val mono2 = d.asMono(Dispatchers.Unconfined) 56 checkMonoValue(mono2) { 57 assertEquals("OK", it) 58 } 59 } 60 61 @Test 62 fun testDeferredToMonoEmpty() { 63 val d = GlobalScope.async { 64 delay(50) 65 null 66 } 67 val mono1 = d.asMono(Dispatchers.Unconfined) 68 checkMonoValue(mono1, Assert::assertNull) 69 val mono2 = d.asMono(Dispatchers.Unconfined) 70 checkMonoValue(mono2, Assert::assertNull) 71 } 72 73 @Test 74 fun testDeferredToMonoFail() { 75 val d = GlobalScope.async { 76 delay(50) 77 throw TestRuntimeException("OK") 78 } 79 val mono1 = d.asMono(Dispatchers.Unconfined) 80 checkErroneous(mono1) { 81 check(it is TestRuntimeException && it.message == "OK") { "$it" } 82 } 83 val mono2 = d.asMono(Dispatchers.Unconfined) 84 checkErroneous(mono2) { 85 check(it is TestRuntimeException && it.message == "OK") { "$it" } 86 } 87 } 88 89 @Test 90 fun testToFlux() { 91 val c = GlobalScope.produce { 92 delay(50) 93 send("O") 94 delay(50) 95 send("K") 96 } 97 val flux = c.consumeAsFlow().asFlux(Dispatchers.Unconfined) 98 checkMonoValue(flux.reduce { t1, t2 -> t1 + t2 }) { 99 assertEquals("OK", it) 100 } 101 } 102 103 @Test 104 fun testToFluxFail() { 105 val c = GlobalScope.produce { 106 delay(50) 107 send("O") 108 delay(50) 109 throw TestException("K") 110 } 111 val flux = c.consumeAsFlow().asFlux(Dispatchers.Unconfined) 112 val mono = mono(Dispatchers.Unconfined) { 113 var result = "" 114 try { 115 flux.collect { result += it } 116 } catch(e: Throwable) { 117 check(e is TestException) 118 result += e.message 119 } 120 result 121 } 122 checkMonoValue(mono) { 123 assertEquals("OK", it) 124 } 125 } 126 } 127