• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 
5 package kotlinx.coroutines.rx2
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.flow.*
9 import org.junit.Test
10 import org.reactivestreams.*
11 import java.util.concurrent.*
12 import kotlin.test.*
13 
/**
 * Checks on which thread a `Flow` converted with `asFlowable` invokes the
 * callbacks of a hand-written Reactive Streams [Subscriber].
 *
 * Event ordering is tracked with `expect`/`finish`/`expectUnreached`, which are
 * inherited from `TestBase` (not visible in this file).
 */
@Suppress("ReactiveStreamsSubscriberImplementation")
class FlowAsFlowableTest : TestBase() {
    /**
     * With no explicit context passed to `asFlowable()`, `onNext` and `onComplete`
     * must be invoked on the very thread that called `subscribe`.
     */
    @Test
    fun testUnconfinedDefaultContext() {
        expect(1)
        val thread = Thread.currentThread()
        fun checkThread() {
            assertSame(thread, Thread.currentThread())
        }
        flowOf(42).asFlowable().subscribe(object : Subscriber<Int> {
            private lateinit var subscription: Subscription

            override fun onSubscribe(s: Subscription) {
                expect(2)
                subscription = s
                // Demand exceeds the single element the flow emits.
                subscription.request(2)
            }

            override fun onNext(t: Int) {
                checkThread()
                expect(3)
                assertEquals(42, t)
            }

            override fun onComplete() {
                checkThread()
                expect(4)
            }

            override fun onError(t: Throwable?) {
                expectUnreached()
            }
        })
        // No latch here: the test relies on all callbacks having run before subscribe() returns.
        finish(5)
    }

    /**
     * With a single-thread dispatcher passed to `asFlowable(dispatcher)`, `onNext`
     * and `onComplete` must be invoked on a thread whose name starts with the
     * dispatcher's name. The test thread waits on a latch until the stream terminates.
     */
    @Test
    fun testConfinedContext() {
        expect(1)
        val threadName = "FlowAsFlowableTest.testConfinedContext"
        fun checkThread() {
            val currentThread = Thread.currentThread()
            assertTrue(currentThread.name.startsWith(threadName), "Unexpected thread $currentThread")
        }
        // Released by whichever terminal callback (onComplete or onError) fires.
        val completed = CountDownLatch(1)
        newSingleThreadContext(threadName).use { dispatcher ->
            flowOf(42).asFlowable(dispatcher).subscribe(object : Subscriber<Int> {
                private lateinit var subscription: Subscription

                override fun onSubscribe(s: Subscription) {
                    expect(2)
                    subscription = s
                    // Demand exceeds the single element the flow emits.
                    subscription.request(2)
                }

                override fun onNext(t: Int) {
                    checkThread()
                    expect(3)
                    assertEquals(42, t)
                }

                override fun onComplete() {
                    checkThread()
                    expect(4)
                    completed.countDown()
                }

                override fun onError(t: Throwable?) {
                    try {
                        expectUnreached()
                    } finally {
                        // Release the test thread on the failure path as well; without this,
                        // completed.await() below would block forever and the test would hang
                        // instead of failing. The finally also covers expectUnreached() throwing
                        // (presumably it does; TestBase is not visible here).
                        completed.countDown()
                    }
                }
            })
            // Keep the dispatcher open until a terminal signal has been delivered.
            completed.await()
        }
        finish(5)
    }
}
90