<lambda>null1package kotlinx.coroutines.reactive 2 3 import kotlinx.coroutines.testing.* 4 import kotlinx.coroutines.* 5 import kotlinx.coroutines.channels.* 6 import kotlinx.coroutines.selects.* 7 import org.junit.Test 8 import org.junit.runner.* 9 import org.junit.runners.* 10 import kotlin.test.* 11 12 @RunWith(Parameterized::class) 13 class PublisherSubscriptionSelectTest(private val request: Int) : TestBase() { 14 companion object { 15 @Parameterized.Parameters(name = "request = {0}") 16 @JvmStatic 17 fun params(): Collection<Array<Any>> = listOf(0, 1, 10).map { arrayOf<Any>(it) } 18 } 19 20 @Test 21 fun testSelect() = runTest { 22 // source with n ints 23 val n = 1000 * stressTestMultiplier 24 val source = publish { repeat(n) { send(it) } } 25 var a = 0 26 var b = 0 27 // open two subs 28 val channelA = source.toChannel(request) 29 val channelB = source.toChannel(request) 30 loop@ while (true) { 31 val done: Int = select { 32 channelA.onReceiveCatching { result -> 33 result.onSuccess { assertEquals(a++, it) } 34 if (result.isSuccess) 1 else 0 35 } 36 channelB.onReceiveCatching { result -> 37 result.onSuccess { assertEquals(b++, it) } 38 if (result.isSuccess) 2 else 0 39 } 40 } 41 when (done) { 42 0 -> break@loop 43 1 -> { 44 val r = channelB.receiveCatching().getOrNull() 45 if (r != null) assertEquals(b++, r) 46 } 47 2 -> { 48 val r = channelA.receiveCatching().getOrNull() 49 if (r != null) assertEquals(a++, r) 50 } 51 } 52 } 53 54 channelA.cancel() 55 channelB.cancel() 56 // should receive one of them fully 57 assertTrue(a == n || b == n) 58 } 59 } 60