/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.reactive

import kotlinx.coroutines.*
import org.hamcrest.core.*
import org.junit.*
import org.junit.Assert.*

/**
 * Stress test for a publisher that is fed by many coroutines at once.
 */
class PublisherMultiTest : TestBase() {
    /**
     * Launches `10_000 * stressTestMultiplier` coroutines inside one `publish` block, each sending
     * its own index, then collects the publisher and checks that every value was received
     * exactly once.
     */
    @Test
    fun testConcurrentStress() = runBlocking {
        val emitterCount = 10_000 * stressTestMultiplier
        val publisher = publish {
            // concurrent emitters (many coroutines): one launched coroutine per value
            val emitters = (0 until emitterCount).map { value ->
                launch {
                    send(value)
                }
            }
            // keep the publish block alive until every emitter has finished sending
            emitters.joinAll()
        }
        val received = mutableSetOf<Int>()
        publisher.collect { item ->
            // `add` returns false for a duplicate, so this fails if a value is delivered twice
            assertTrue(received.add(item))
        }
        // no duplicates were seen above, so a matching size means nothing was lost either
        assertThat(received.size, IsEqual(emitterCount))
    }
}