<lambda>null1package kotlinx.coroutines.rx3 2 3 import kotlinx.coroutines.testing.* 4 import kotlinx.coroutines.* 5 import kotlinx.coroutines.flow.* 6 import org.junit.Test 7 import org.reactivestreams.* 8 import java.util.concurrent.* 9 import kotlin.test.* 10 11 @Suppress("ReactiveStreamsSubscriberImplementation") 12 class FlowAsFlowableTest : TestBase() { 13 @Test 14 fun testUnconfinedDefaultContext() { 15 expect(1) 16 val thread = Thread.currentThread() 17 fun checkThread() { 18 assertSame(thread, Thread.currentThread()) 19 } 20 flowOf(42).asFlowable().subscribe(object : Subscriber<Int> { 21 private lateinit var subscription: Subscription 22 23 override fun onSubscribe(s: Subscription) { 24 expect(2) 25 subscription = s 26 subscription.request(2) 27 } 28 29 override fun onNext(t: Int) { 30 checkThread() 31 expect(3) 32 assertEquals(42, t) 33 } 34 35 override fun onComplete() { 36 checkThread() 37 expect(4) 38 } 39 40 override fun onError(t: Throwable?) { 41 expectUnreached() 42 } 43 }) 44 finish(5) 45 } 46 47 @Test 48 fun testConfinedContext() { 49 expect(1) 50 val threadName = "FlowAsFlowableTest.testConfinedContext" 51 fun checkThread() { 52 val currentThread = Thread.currentThread() 53 assertTrue(currentThread.name.startsWith(threadName), "Unexpected thread $currentThread") 54 } 55 val completed = CountDownLatch(1) 56 newSingleThreadContext(threadName).use { dispatcher -> 57 flowOf(42).asFlowable(dispatcher).subscribe(object : Subscriber<Int> { 58 private lateinit var subscription: Subscription 59 60 override fun onSubscribe(s: Subscription) { 61 expect(2) 62 subscription = s 63 subscription.request(2) 64 } 65 66 override fun onNext(t: Int) { 67 checkThread() 68 expect(3) 69 assertEquals(42, t) 70 } 71 72 override fun onComplete() { 73 checkThread() 74 expect(4) 75 completed.countDown() 76 } 77 78 override fun onError(t: Throwable?) { 79 expectUnreached() 80 } 81 }) 82 completed.await() 83 } 84 finish(5) 85 } 86 } 87