/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.reactor

import kotlinx.coroutines.*
import kotlinx.coroutines.reactive.*
import org.junit.*
import org.junit.Test
import reactor.core.publisher.*
import java.io.*
import kotlin.test.*

/**
 * Tests for the `flux` builder that emit more than one element: sequential emission,
 * emission from many child coroutines, re-emission of another publisher's elements,
 * and emission followed by a failure.
 */
class FluxMultiTest : TestBase() {
    /**
     * Sends the integers `0 until count` one after another and checks that the collected
     * list contains exactly those values in the same order.
     */
    @Test
    fun testNumbers() {
        val count = 100 * stressTestMultiplier
        val numbers = flux {
            for (value in 0 until count) send(value)
        }
        val expected = (0 until count).toList()
        checkMonoValue(numbers.collectList()) { collected ->
            assertEquals(expected, collected)
        }
    }

    /**
     * Launches one child coroutine per element, each sending a single value, and waits for
     * all of them before the builder block finishes. Arrival order is not asserted: only the
     * element count and the sorted content are compared.
     */
    @Test
    fun testConcurrentStress() {
        val count = 10_000 * stressTestMultiplier
        val numbers = flux {
            // One emitter coroutine per index; each sends exactly its own index.
            val emitters = (0 until count).map { index ->
                launch { send(index) }
            }
            // Keep the builder block alive until every emitter has completed.
            for (emitter in emitters) emitter.join()
        }
        val expected = (0 until count).toList()
        checkMonoValue(numbers.collectList()) { collected ->
            assertEquals(count, collected.size)
            assertEquals(expected, collected.sorted())
        }
    }

    /**
     * Re-sends every element of `Flux.range(0, count)` from a builder started with
     * [Dispatchers.Unconfined] and checks that the values arrive complete and in order.
     */
    @Test
    fun testIteratorResendUnconfined() {
        val count = 10_000 * stressTestMultiplier
        val resent = flux(Dispatchers.Unconfined) {
            Flux.range(0, count).collect { value -> send(value) }
        }
        val expected = (0 until count).toList()
        checkMonoValue(resent.collectList()) { collected ->
            assertEquals(expected, collected)
        }
    }

    /**
     * Same scenario as [testIteratorResendUnconfined], but the builder is started without an
     * explicit context argument (presumably a pooled dispatcher, judging by the test name).
     */
    @Test
    fun testIteratorResendPool() {
        val count = 10_000 * stressTestMultiplier
        val resent = flux {
            Flux.range(0, count).collect { value -> send(value) }
        }
        val expected = (0 until count).toList()
        checkMonoValue(resent.collectList()) { collected ->
            assertEquals(expected, collected)
        }
    }

    /**
     * Emits `"O"` and then throws an [IOException] with message `"K"`. The consumer appends
     * every received element and, on failure, the exception message, so the expected result
     * is `"OK"`.
     */
    @Test
    fun testSendAndCrash() {
        val crashing = flux {
            send("O")
            throw IOException("K")
        }
        val outcome = mono {
            val received = StringBuilder()
            try {
                crashing.collect { received.append(it) }
            } catch (failure: IOException) {
                // The producer's failure surfaces here; record its message after the elements.
                received.append(failure.message)
            }
            received.toString()
        }
        checkMonoValue(outcome) { text ->
            assertEquals("OK", text)
        }
    }
}