• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.reactive
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.flow.*
9 import org.junit.Test
10 import org.reactivestreams.*
11 import java.util.concurrent.*
12 import kotlin.test.*
13 
14 class FlowAsPublisherTest : TestBase() {
15     @Test
16     fun testErrorOnCancellationIsReported() {
17         expect(1)
18         flow<Int> {
19             try {
20                 emit(2)
21             } finally {
22                 expect(3)
23                 throw TestException()
24             }
25         }.asPublisher().subscribe(object : Subscriber<Int> {
26             private lateinit var subscription: Subscription
27 
28             override fun onComplete() {
29                 expectUnreached()
30             }
31 
32             override fun onSubscribe(s: Subscription?) {
33                 subscription = s!!
34                 subscription.request(2)
35             }
36 
37             override fun onNext(t: Int) {
38                 expect(t)
39                 subscription.cancel()
40             }
41 
42             override fun onError(t: Throwable?) {
43                 assertTrue(t is TestException)
44                 expect(4)
45             }
46         })
47         finish(5)
48     }
49 
50     @Test
51     fun testCancellationIsNotReported() {
52         expect(1)
53         flow<Int>    {
54             emit(2)
55         }.asPublisher().subscribe(object : Subscriber<Int> {
56             private lateinit var subscription: Subscription
57 
58             override fun onComplete() {
59                 expect(3)
60             }
61 
62             override fun onSubscribe(s: Subscription?) {
63                 subscription = s!!
64                 subscription.request(2)
65             }
66 
67             override fun onNext(t: Int) {
68                 expect(t)
69                 subscription.cancel()
70             }
71 
72             override fun onError(t: Throwable?) {
73                 expectUnreached()
74             }
75         })
76         finish(4)
77     }
78 
79     @Test
80     fun testUnconfinedDefaultContext() {
81         expect(1)
82         val thread = Thread.currentThread()
83         fun checkThread() {
84             assertSame(thread, Thread.currentThread())
85         }
86         flowOf(42).asPublisher().subscribe(object : Subscriber<Int> {
87             private lateinit var subscription: Subscription
88 
89             override fun onSubscribe(s: Subscription) {
90                 expect(2)
91                 subscription = s
92                 subscription.request(2)
93             }
94 
95             override fun onNext(t: Int) {
96                 checkThread()
97                 expect(3)
98                 assertEquals(42, t)
99             }
100 
101             override fun onComplete() {
102                 checkThread()
103                 expect(4)
104             }
105 
106             override fun onError(t: Throwable?) {
107                 expectUnreached()
108             }
109         })
110         finish(5)
111     }
112 
113     @Test
114     fun testConfinedContext() {
115         expect(1)
116         val threadName = "FlowAsPublisherTest.testConfinedContext"
117         fun checkThread() {
118             val currentThread = Thread.currentThread()
119             assertTrue(currentThread.name.startsWith(threadName), "Unexpected thread $currentThread")
120         }
121         val completed = CountDownLatch(1)
122         newSingleThreadContext(threadName).use { dispatcher ->
123             flowOf(42).asPublisher(dispatcher).subscribe(object : Subscriber<Int> {
124                 private lateinit var subscription: Subscription
125 
126                 override fun onSubscribe(s: Subscription) {
127                     expect(2)
128                     subscription = s
129                     subscription.request(2)
130                 }
131 
132                 override fun onNext(t: Int) {
133                     checkThread()
134                     expect(3)
135                     assertEquals(42, t)
136                 }
137 
138                 override fun onComplete() {
139                     checkThread()
140                     expect(4)
141                     completed.countDown()
142                 }
143 
144                 override fun onError(t: Throwable?) {
145                     expectUnreached()
146                 }
147             })
148             completed.await()
149         }
150         finish(5)
151     }
152 }
153