/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.reactive

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*
import org.junit.Test
import org.junit.runner.*
import org.junit.runners.*
import kotlin.test.*

/**
 * Checks that two subscriptions to the same publisher can be consumed through `select`.
 *
 * The test is parameterized over [request] (0, 1 and 10), which is handed to `toChannel`
 * when each subscription is opened.
 * NOTE(review): `request` presumably controls how many elements are requested from the
 * publisher at a time -- confirm against the `toChannel` declaration.
 */
@RunWith(Parameterized::class)
class PublisherSubscriptionSelectTest(private val request: Int) : TestBase() {
    /**
     * Opens two channels over one publisher of `0 until total` and drains them in lock-step:
     * every iteration selects one element from whichever channel is ready and then receives
     * one element from the other channel. Each channel must deliver its elements in order
     * (0, 1, 2, ...), and by the time the loop ends at least one channel must have delivered
     * all `total` elements.
     */
    @Test
    fun testSelect() = runTest {
        // Publisher that sends the integers 0 until total.
        val total = 1000 * stressTestMultiplier
        val source = publish { repeat(total) { send(it) } }
        // Next value expected from each subscription.
        var expectedA = 0
        var expectedB = 0
        // Two independent subscriptions to the same source.
        val channelA = source.toChannel(request)
        val channelB = source.toChannel(request)
        while (true) {
            // 1 = channelA delivered an element, 2 = channelB delivered an element,
            // 0 = the selected channel produced an unsuccessful result.
            val winner: Int = select {
                channelA.onReceiveCatching { result ->
                    if (result.isSuccess) {
                        assertEquals(expectedA++, result.getOrThrow())
                        1
                    } else {
                        0
                    }
                }
                channelB.onReceiveCatching { result ->
                    if (result.isSuccess) {
                        assertEquals(expectedB++, result.getOrThrow())
                        2
                    } else {
                        0
                    }
                }
            }
            if (winner == 0) break
            // Follow up with one receive from the channel that did not win the select;
            // an unsuccessful result here is ignored and left for the next select to observe.
            when (winner) {
                1 -> channelB.receiveCatching().onSuccess { assertEquals(expectedB++, it) }
                2 -> channelA.receiveCatching().onSuccess { assertEquals(expectedA++, it) }
            }
        }

        channelA.cancel()
        channelB.cancel()
        // At least one of the subscriptions must have been received in full.
        assertTrue(expectedA == total || expectedB == total)
    }

    companion object {
        /** Supplies the `request` values the test class is instantiated with. */
        @Parameterized.Parameters(name = "request = {0}")
        @JvmStatic
        fun params(): Collection<Array<Any>> = listOf(0, 1, 10).map { arrayOf<Any>(it) }
    }
}