/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.reactor

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.reactive.*
import org.junit.*
import org.junit.Assert.*

/**
 * Tests for the conversions of coroutine primitives to Reactor types:
 * `asMono` applied to the results of `launch`/`async`, and `asFlux` applied to
 * a channel created by `produce`.
 *
 * The numbered `expect`/`finish` calls (inherited helpers, not defined in this file)
 * spell out the execution order each test intends to observe.
 */
class ConvertTest : TestBase() {
    /**
     * A launched job converted with `asMono`: the job body is step 3 and the
     * subscriber's value callback is step 4, both running only after the test
     * yields at step 2.
     */
    @Test
    fun testJobToMonoSuccess() = runBlocking<Unit> { // explicit <Unit> keeps the JUnit method void
        expect(1)
        val job = launch {
            expect(3)
        }
        // The conversion gets this coroutine's context with the Job element removed
        // (presumably so the mono is not tied to the runBlocking job -- confirm against asMono).
        val mono = job.asMono(coroutineContext.minusKey(Job))
        mono.subscribe {
            expect(4)
        }
        expect(2)
        yield()
        finish(5)
    }

    /**
     * A failing `async` converted with `asMono`: the value callback must never
     * run, and the error callback is step 4.
     */
    @Test
    fun testJobToMonoFail() = runBlocking<Unit> { // explicit <Unit> keeps the JUnit method void
        expect(1)
        val job = async(NonCancellable) {
            expect(3)
            throw RuntimeException("OK")
        }
        val mono = job.asMono(coroutineContext.minusKey(Job))
        mono.subscribe(
            { fail("no item should be emitted") },
            { expect(4) }
        )
        expect(2)
        yield()
        finish(5)
    }

    /**
     * Converts the same deferred (completing with "OK" after a short delay) to a
     * mono twice and checks the value seen through each conversion.
     */
    @Test
    fun testDeferredToMono() {
        val d = GlobalScope.async {
            delay(50)
            "OK"
        }
        val mono1 = d.asMono(Dispatchers.Unconfined)
        checkMonoValue(mono1) {
            assertEquals("OK", it)
        }
        val mono2 = d.asMono(Dispatchers.Unconfined)
        checkMonoValue(mono2) {
            assertEquals("OK", it)
        }
    }

    /**
     * Same as [testDeferredToMono], but the deferred completes with `null` and
     * both conversions are checked with `assertNull`.
     */
    @Test
    fun testDeferredToMonoEmpty() {
        val d = GlobalScope.async {
            delay(50)
            null
        }
        val mono1 = d.asMono(Dispatchers.Unconfined)
        checkMonoValue(mono1, ::assertNull)
        val mono2 = d.asMono(Dispatchers.Unconfined)
        checkMonoValue(mono2, ::assertNull)
    }

    /**
     * Converts a deferred that throws `TestRuntimeException("OK")` to a mono twice
     * and checks that each conversion reports that exception type and message.
     */
    @Test
    fun testDeferredToMonoFail() {
        val d = GlobalScope.async {
            delay(50)
            throw TestRuntimeException("OK")
        }
        val mono1 = d.asMono(Dispatchers.Unconfined)
        checkErroneous(mono1) {
            check(it is TestRuntimeException && it.message == "OK") { "$it" }
        }
        val mono2 = d.asMono(Dispatchers.Unconfined)
        checkErroneous(mono2) {
            check(it is TestRuntimeException && it.message == "OK") { "$it" }
        }
    }

    /**
     * A produced channel sending "O" then "K" is converted with `asFlux`; the
     * flux items are concatenated with `reduce` and the result must be "OK".
     */
    @Test
    fun testToFlux() {
        val c = GlobalScope.produce {
            delay(50)
            send("O")
            delay(50)
            send("K")
        }
        val flux = c.asFlux(Dispatchers.Unconfined)
        checkMonoValue(flux.reduce { t1, t2 -> t1 + t2 }) {
            assertEquals("OK", it)
        }
    }

    /**
     * A produced channel sends "O" and then throws `TestException("K")`. The flux
     * is collected inside a `mono` coroutine that appends each item and then the
     * caught exception's message, so the expected result is "OK".
     */
    @Test
    fun testToFluxFail() {
        val c = GlobalScope.produce {
            delay(50)
            send("O")
            delay(50)
            throw TestException("K")
        }
        val flux = c.asFlux(Dispatchers.Unconfined)
        val mono = mono(Dispatchers.Unconfined) {
            var result = ""
            try {
                flux.collect { result += it }
            } catch (e: Throwable) {
                // Report the unexpected exception in the failure message,
                // matching the checks in testDeferredToMonoFail.
                check(e is TestException) { "$e" }
                result += e.message
            }
            result
        }
        checkMonoValue(mono) {
            assertEquals("OK", it)
        }
    }
}