• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.reactor
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.reactive.*
9 import org.junit.*
10 import org.junit.Test
11 import reactor.core.publisher.*
12 import java.io.*
13 import kotlin.test.*
14 
/**
 * Tests for the `flux` coroutine builder when it emits more than one element:
 * sequential emission, emission from many child coroutines, re-emitting another
 * publisher's elements, and failing after an element has already been sent.
 */
class FluxMultiTest : TestBase() {
    /** Sends `0 until count` sequentially and expects them back in the same order. */
    @Test
    fun testNumbers() {
        val count = 100 * stressTestMultiplier
        val expected = (0 until count).toList()
        val numbers = flux {
            for (value in 0 until count) send(value)
        }
        checkMonoValue(numbers.collectList()) { actual -> assertEquals(expected, actual) }
    }

    /**
     * Sends each value from its own child coroutine. Arrival order is not asserted,
     * so the collected list is sorted before being compared with the expected range.
     */
    @Test
    fun testConcurrentStress() {
        val emitters = 10_000 * stressTestMultiplier
        val expected = (0 until emitters).toList()
        val merged = flux {
            // Launch every emitter before joining any of them, then wait for all to finish.
            val pending = (0 until emitters).map { index ->
                launch { send(index) }
            }
            for (job in pending) job.join()
        }
        checkMonoValue(merged.collectList()) { actual ->
            assertEquals(emitters, actual.size)
            assertEquals(expected, actual.sorted())
        }
    }

    /** Re-sends the elements of `Flux.range` from a builder started with [Dispatchers.Unconfined]. */
    @Test
    fun testIteratorResendUnconfined() {
        val count = 10_000 * stressTestMultiplier
        val expected = (0 until count).toList()
        val resent = flux(Dispatchers.Unconfined) {
            val source = Flux.range(0, count)
            source.collect { item -> send(item) }
        }
        checkMonoValue(resent.collectList()) { actual -> assertEquals(expected, actual) }
    }

    /** Re-sends the elements of `Flux.range` from a builder started without an explicit context. */
    @Test
    fun testIteratorResendPool() {
        val count = 10_000 * stressTestMultiplier
        val expected = (0 until count).toList()
        val resent = flux {
            val source = Flux.range(0, count)
            source.collect { item -> send(item) }
        }
        checkMonoValue(resent.collectList()) { actual -> assertEquals(expected, actual) }
    }

    /**
     * Sends one element and then throws. The collector appends the received element
     * and the message of the caught [IOException], which together must read "OK".
     */
    @Test
    fun testSendAndCrash() {
        val failing = flux {
            send("O")
            throw IOException("K")
        }
        val observed = mono {
            val received = StringBuilder()
            try {
                failing.collect { element -> received.append(element) }
            } catch (failure: IOException) {
                received.append(failure.message)
            }
            received.toString()
        }
        checkMonoValue(observed) { value -> assertEquals("OK", value) }
    }
}