/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.rx2

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import org.junit.Test
import org.reactivestreams.*
import java.util.concurrent.*
import kotlin.test.*

/**
 * Checks on which thread a [Subscriber] receives its signals when a flow is
 * converted with `asFlowable`, both without and with an explicit context argument.
 */
@Suppress("ReactiveStreamsSubscriberImplementation")
class FlowAsFlowableTest : TestBase() {
    /**
     * Subscribes to `flowOf(42).asFlowable()` (no context argument) and asserts that
     * `onNext` and `onComplete` are invoked on the same thread that called `subscribe`,
     * and that all signals arrive before `subscribe` returns (steps 2-4 precede `finish(5)`).
     */
    @Test
    fun testUnconfinedDefaultContext() {
        expect(1)
        val thread = Thread.currentThread()
        fun checkThread() {
            assertSame(thread, Thread.currentThread())
        }
        flowOf(42).asFlowable().subscribe(object : Subscriber<Int> {
            override fun onSubscribe(s: Subscription) {
                expect(2)
                // Request more than the single element the flow emits.
                s.request(2)
            }

            override fun onNext(t: Int) {
                checkThread()
                expect(3)
                assertEquals(42, t)
            }

            override fun onComplete() {
                checkThread()
                expect(4)
            }

            override fun onError(t: Throwable?) {
                expectUnreached()
            }
        })
        finish(5)
    }

    /**
     * Subscribes to `flowOf(42).asFlowable(dispatcher)` with a single-thread dispatcher and
     * asserts that `onNext` and `onComplete` are invoked on a thread whose name starts with
     * the dispatcher's name. The test thread waits on a latch for `onComplete` before finishing.
     */
    @Test
    fun testConfinedContext() {
        expect(1)
        val threadName = "FlowAsFlowableTest.testConfinedContext"
        fun checkThread() {
            val currentThread = Thread.currentThread()
            assertTrue(currentThread.name.startsWith(threadName), "Unexpected thread $currentThread")
        }
        val completed = CountDownLatch(1)
        newSingleThreadContext(threadName).use { dispatcher ->
            flowOf(42).asFlowable(dispatcher).subscribe(object : Subscriber<Int> {
                override fun onSubscribe(s: Subscription) {
                    expect(2)
                    // Request more than the single element the flow emits.
                    s.request(2)
                }

                override fun onNext(t: Int) {
                    checkThread()
                    expect(3)
                    assertEquals(42, t)
                }

                override fun onComplete() {
                    checkThread()
                    expect(4)
                    completed.countDown()
                }

                override fun onError(t: Throwable?) {
                    expectUnreached()
                }
            })
            // Bounded wait: the latch is only released at the end of onComplete, so a failed
            // assertion on the dispatcher thread (or an onError signal) would otherwise hang
            // this test forever instead of failing it.
            assertTrue(
                completed.await(10, TimeUnit.SECONDS),
                "Timed out waiting for onComplete on $threadName"
            )
        }
        finish(5)
    }
}