<lambda>null1package kotlinx.coroutines.rx3 2 3 import kotlinx.coroutines.testing.* 4 import kotlinx.coroutines.* 5 import kotlinx.coroutines.channels.* 6 import kotlinx.coroutines.selects.* 7 import org.junit.Test 8 import kotlin.onSuccess 9 import kotlin.test.* 10 11 class ObservableSubscriptionSelectTest : TestBase() { 12 @Test 13 fun testSelect() = runTest { 14 // source with n ints 15 val n = 1000 * stressTestMultiplier 16 val source = rxObservable { repeat(n) { send(it) } } 17 var a = 0 18 var b = 0 19 // open two subs 20 val channelA = source.openSubscription() 21 val channelB = source.openSubscription() 22 loop@ while (true) { 23 val done: Int = select { 24 channelA.onReceiveCatching { result -> 25 result.onSuccess { assertEquals(a++, it) } 26 if (result.isSuccess) 1 else 0 27 } 28 channelB.onReceiveCatching { result -> 29 result.onSuccess { assertEquals(b++, it) } 30 if (result.isSuccess) 2 else 0 31 } 32 } 33 when (done) { 34 0 -> break@loop 35 1 -> { 36 val r = channelB.receiveCatching().getOrNull() 37 if (r != null) assertEquals(b++, r) 38 } 39 2 -> { 40 val r = channelA.receiveCatching().getOrNull() 41 if (r != null) assertEquals(a++, r) 42 } 43 } 44 } 45 channelA.cancel() 46 channelB.cancel() 47 // should receive one of them fully 48 assertTrue(a == n || b == n) 49 } 50 } 51