1 /* 2 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines.reactive 6 7 import kotlinx.coroutines.* 8 import kotlinx.coroutines.selects.* 9 import org.junit.Test 10 import kotlin.test.* 11 12 class PublisherMultiTest : TestBase() { 13 @Test <lambda>null14 fun testConcurrentStress() = runBlocking { 15 val n = 10_000 * stressTestMultiplier 16 val observable = publish { 17 // concurrent emitters (many coroutines) 18 val jobs = List(n) { 19 // launch 20 launch(Dispatchers.Default) { 21 send(it) 22 } 23 } 24 jobs.forEach { it.join() } 25 } 26 val resultSet = mutableSetOf<Int>() 27 observable.collect { 28 assertTrue(resultSet.add(it)) 29 } 30 assertEquals(n, resultSet.size) 31 } 32 33 @Test <lambda>null34 fun testConcurrentStressOnSend() = runBlocking { 35 val n = 10_000 * stressTestMultiplier 36 val observable = publish<Int> { 37 // concurrent emitters (many coroutines) 38 val jobs = List(n) { 39 // launch 40 launch(Dispatchers.Default) { 41 select<Unit> { 42 onSend(it) {} 43 } 44 } 45 } 46 jobs.forEach { it.join() } 47 } 48 val resultSet = mutableSetOf<Int>() 49 observable.collect { 50 assertTrue(resultSet.add(it)) 51 } 52 assertEquals(n, resultSet.size) 53 } 54 } 55