• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.reactive
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.selects.*
9 import org.junit.Test
10 import kotlin.test.*
11 
12 class PublisherMultiTest : TestBase() {
13     @Test
<lambda>null14     fun testConcurrentStress() = runBlocking {
15         val n = 10_000 * stressTestMultiplier
16         val observable = publish {
17             // concurrent emitters (many coroutines)
18             val jobs = List(n) {
19                 // launch
20                 launch(Dispatchers.Default) {
21                     send(it)
22                 }
23             }
24             jobs.forEach { it.join() }
25         }
26         val resultSet = mutableSetOf<Int>()
27         observable.collect {
28             assertTrue(resultSet.add(it))
29         }
30         assertEquals(n, resultSet.size)
31     }
32 
33     @Test
<lambda>null34     fun testConcurrentStressOnSend() = runBlocking {
35         val n = 10_000 * stressTestMultiplier
36         val observable = publish<Int> {
37             // concurrent emitters (many coroutines)
38             val jobs = List(n) {
39                 // launch
40                 launch(Dispatchers.Default) {
41                     select<Unit> {
42                         onSend(it) {}
43                     }
44                 }
45             }
46             jobs.forEach { it.join() }
47         }
48         val resultSet = mutableSetOf<Int>()
49         observable.collect {
50             assertTrue(resultSet.add(it))
51         }
52         assertEquals(n, resultSet.size)
53     }
54 }
55