• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 package kotlinx.coroutines.rx3
2 
3 import kotlinx.coroutines.testing.*
4 import kotlinx.coroutines.*
5 import kotlinx.coroutines.flow.*
6 import org.junit.Test
7 import org.reactivestreams.*
8 import java.util.concurrent.*
9 import kotlin.test.*
10 
11 @Suppress("ReactiveStreamsSubscriberImplementation")
12 class FlowAsFlowableTest : TestBase() {
13     @Test
14     fun testUnconfinedDefaultContext() {
15         expect(1)
16         val thread = Thread.currentThread()
17         fun checkThread() {
18             assertSame(thread, Thread.currentThread())
19         }
20         flowOf(42).asFlowable().subscribe(object : Subscriber<Int> {
21             private lateinit var subscription: Subscription
22 
23             override fun onSubscribe(s: Subscription) {
24                 expect(2)
25                 subscription = s
26                 subscription.request(2)
27             }
28 
29             override fun onNext(t: Int) {
30                 checkThread()
31                 expect(3)
32                 assertEquals(42, t)
33             }
34 
35             override fun onComplete() {
36                 checkThread()
37                 expect(4)
38             }
39 
40             override fun onError(t: Throwable?) {
41                 expectUnreached()
42             }
43         })
44         finish(5)
45     }
46 
47     @Test
48     fun testConfinedContext() {
49         expect(1)
50         val threadName = "FlowAsFlowableTest.testConfinedContext"
51         fun checkThread() {
52             val currentThread = Thread.currentThread()
53             assertTrue(currentThread.name.startsWith(threadName), "Unexpected thread $currentThread")
54         }
55         val completed = CountDownLatch(1)
56         newSingleThreadContext(threadName).use { dispatcher ->
57             flowOf(42).asFlowable(dispatcher).subscribe(object : Subscriber<Int> {
58                 private lateinit var subscription: Subscription
59 
60                 override fun onSubscribe(s: Subscription) {
61                     expect(2)
62                     subscription = s
63                     subscription.request(2)
64                 }
65 
66                 override fun onNext(t: Int) {
67                     checkThread()
68                     expect(3)
69                     assertEquals(42, t)
70                 }
71 
72                 override fun onComplete() {
73                     checkThread()
74                     expect(4)
75                     completed.countDown()
76                 }
77 
78                 override fun onError(t: Throwable?) {
79                     expectUnreached()
80                 }
81             })
82             completed.await()
83         }
84         finish(5)
85     }
86 }
87