• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.reactive
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.channels.*
9 import kotlinx.coroutines.selects.*
10 import org.junit.Test
11 import org.junit.runner.*
12 import org.junit.runners.*
13 import kotlin.test.*
14 
/**
 * Checks that two channel subscriptions opened on the same publisher can be drained
 * side by side through `select`, once for every parameterized [request] value.
 *
 * [request] is forwarded unchanged to `toChannel`; presumably it controls how many elements
 * are requested from the publisher at a time (TODO confirm against `toChannel`).
 */
@RunWith(Parameterized::class)
class PublisherSubscriptionSelectTest(private val request: Int) : TestBase() {
    companion object {
        /** Supplies the `request` values (0, 1 and 10) the test class is instantiated with. */
        @Parameterized.Parameters(name = "request = {0}")
        @JvmStatic
        fun params(): Collection<Array<Any>> = listOf(0, 1, 10).map { arrayOf<Any>(it) }
    }

    /**
     * Opens two channels on one publisher that sends the ints `0 until n` and alternates between
     * them: a `select` picks whichever channel has a result, then one element is taken from the
     * other channel. Each channel is expected to deliver its own sequence in order, starting at 0,
     * and at least one of the two must have delivered all `n` elements when the loop ends.
     */
    @Test
    fun testSelect() = runTest {
        // source with n ints
        val n = 1000 * stressTestMultiplier
        val source = publish { repeat(n) { send(it) } }
        // a and b count the elements received so far from channelA and channelB respectively;
        // they double as the next value expected from each channel
        var a = 0
        var b = 0
        // open two subs
        val channelA = source.toChannel(request)
        val channelB = source.toChannel(request)
        try {
            loop@ while (true) {
                // 1 -> element taken from A, 2 -> element taken from B, 0 -> the selected result was not a success
                val done: Int = select {
                    channelA.onReceiveCatching { result ->
                        result.onSuccess { assertEquals(a++, it) }
                        if (result.isSuccess) 1 else 0
                    }
                    channelB.onReceiveCatching { result ->
                        result.onSuccess { assertEquals(b++, it) }
                        if (result.isSuccess) 2 else 0
                    }
                }
                when (done) {
                    0 -> break@loop
                    1 -> {
                        // A just produced an element, so take the next one from B (if it has any left)
                        val r = channelB.receiveCatching().getOrNull()
                        if (r != null) assertEquals(b++, r)
                    }
                    2 -> {
                        // B just produced an element, so take the next one from A (if it has any left)
                        val r = channelA.receiveCatching().getOrNull()
                        if (r != null) assertEquals(a++, r)
                    }
                }
            }
        } finally {
            // Cancel in a finally block so that a failed assertion inside the loop
            // does not leave either subscription open.
            channelA.cancel()
            channelB.cancel()
        }
        // should receive one of them fully
        assertTrue(a == n || b == n)
    }
}
63