/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.rx2

import io.reactivex.*
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.*
import org.junit.Test
import java.io.*
import kotlin.test.*

/**
 * Test emitting multiple values with [rxObservable].
 */
class ObservableMultiTest : TestBase() {
    /**
     * Sends `n` consecutive integers from a single producer block and verifies that the
     * collected list equals `0 until n`, i.e. nothing is lost and the order is preserved.
     */
    @Test
    fun testNumbers() {
        val n = 100 * stressTestMultiplier
        val observable = rxObservable {
            repeat(n) { send(it) }
        }
        checkSingleValue(observable.toList()) { list ->
            assertEquals((0 until n).toList(), list)
        }
    }

    /**
     * Launches `n` child coroutines inside the producer block, each of which sends its own index,
     * and waits for all of them before the block completes.
     *
     * Arrival order is not asserted: the test checks the element count and compares the
     * sorted result against `0 until n`.
     */
    @Test
    fun testConcurrentStress() {
        val n = 10_000 * stressTestMultiplier
        val observable = rxObservable<Int> {
            // concurrent emitters (many coroutines)
            val jobs = List(n) { i ->
                // children inherit the context of the producer scope
                launch {
                    send(i)
                }
            }
            jobs.joinAll()
        }
        checkSingleValue(observable.toList()) { list ->
            assertEquals(n, list.size)
            assertEquals((0 until n).toList(), list.sorted())
        }
    }

    /**
     * Same shape as [testConcurrentStress], but every child coroutine runs on
     * [Dispatchers.Default] and emits through the `onSend` clause of a `select` expression
     * instead of calling `send` directly.
     *
     * Arrival order is not asserted: the test checks the element count and compares the
     * sorted result against `0 until n`.
     */
    @Test
    fun testConcurrentStressOnSend() {
        val n = 10_000 * stressTestMultiplier
        val observable = rxObservable<Int> {
            // concurrent emitters (many coroutines)
            val jobs = List(n) { i ->
                launch(Dispatchers.Default) {
                    select<Unit> {
                        // the clause body is intentionally empty; only the send itself matters
                        onSend(i) {}
                    }
                }
            }
            jobs.joinAll()
        }
        checkSingleValue(observable.toList()) { list ->
            assertEquals(n, list.size)
            assertEquals((0 until n).toList(), list.sorted())
        }
    }

    /**
     * Re-emits every element of `Observable.range(0, n)` through a producer started with
     * [Dispatchers.Unconfined] and verifies that the collected list equals `0 until n` in order.
     */
    @Test
    fun testIteratorResendUnconfined() {
        val n = 10_000 * stressTestMultiplier
        val observable = rxObservable(Dispatchers.Unconfined) {
            Observable.range(0, n).collect { send(it) }
        }
        checkSingleValue(observable.toList()) { list ->
            assertEquals((0 until n).toList(), list)
        }
    }

    /**
     * Same as [testIteratorResendUnconfined], but the producer is started without an explicit
     * context argument, so whatever default context [rxObservable] applies is used.
     */
    @Test
    fun testIteratorResendPool() {
        val n = 10_000 * stressTestMultiplier
        val observable = rxObservable {
            Observable.range(0, n).collect { send(it) }
        }
        checkSingleValue(observable.toList()) { list ->
            assertEquals((0 until n).toList(), list)
        }
    }

    /**
     * The producer sends `"O"` and then throws `IOException("K")`.
     *
     * The consumer appends every received element to a string, catches the [IOException]
     * raised out of `collect` and appends its message, so the expected result is `"OK"`:
     * the element sent before the failure is delivered, followed by the failure itself.
     */
    @Test
    fun testSendAndCrash() {
        val observable = rxObservable {
            send("O")
            throw IOException("K")
        }
        val single = rxSingle {
            var result = ""
            try {
                observable.collect { result += it }
            } catch (e: IOException) {
                result += e.message
            }
            result
        }
        checkSingleValue(single) {
            assertEquals("OK", it)
        }
    }
}