/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.rx2

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*
import org.junit.Test
import kotlin.onSuccess
import kotlin.test.*

/**
 * Exercises `select` over two channels that are both opened from the same `rxObservable`.
 */
class ObservableSubscriptionSelectTest : TestBase() {
    /**
     * Opens two subscriptions to one observable that sends the ints `0 until n`, then
     * alternates between them: a `select` picks an element from either channel, after
     * which one element is received directly from the *other* channel.
     *
     * Every received element is checked against a per-channel counter, so each
     * subscription is required to observe the sequence in order, starting from 0.
     * The loop ends as soon as the `select` observes a closed channel; at that point
     * at least one of the two channels must have delivered all `n` elements.
     */
    @Test
    fun testSelect() = runTest {
        // Source with n ints: 0, 1, ..., n - 1.
        val n = 1000 * stressTestMultiplier
        val source = rxObservable { repeat(n) { send(it) } }
        // Number of elements received so far from channelA / channelB respectively.
        var a = 0
        var b = 0
        // Open two subscriptions to the same source.
        val channelA = source.toChannel()
        val channelB = source.toChannel()
        try {
            loop@ while (true) {
                // 1 -> element taken from A, 2 -> element taken from B, 0 -> selected channel was closed.
                val done: Int = select {
                    channelA.onReceiveCatching { result ->
                        result.onSuccess { assertEquals(a++, it) }
                        if (result.isSuccess) 1 else 0
                    }
                    channelB.onReceiveCatching { result ->
                        result.onSuccess { assertEquals(b++, it) }
                        if (result.isSuccess) 2 else 0
                    }
                }
                when (done) {
                    0 -> break@loop
                    1 -> {
                        // A was just served by select; take one element from B as well (null once B is closed).
                        val r = channelB.receiveCatching().getOrNull()
                        if (r != null) assertEquals(b++, r)
                    }
                    2 -> {
                        // B was just served by select; take one element from A as well (null once A is closed).
                        val r = channelA.receiveCatching().getOrNull()
                        if (r != null) assertEquals(a++, r)
                    }
                }
            }
        } finally {
            // Cancel both subscriptions even when an assertion above fails,
            // so a failing run does not leave them open.
            channelA.cancel()
            channelB.cancel()
        }
        // Should receive one of them fully.
        assertTrue(a == n || b == n, "Expected one channel to deliver all $n elements, but got a=$a, b=$b")
    }
}