<lambda>null1 package kotlinx.coroutines.jdk9
2
3 import kotlinx.coroutines.testing.*
4 import kotlinx.coroutines.*
5 import kotlinx.coroutines.channels.*
6 import kotlinx.coroutines.testing.exceptions.*
7 import org.junit.Test
8 import java.util.concurrent.Flow as JFlow
9 import kotlin.test.*
10
/**
 * Tests for the `flowPublish` builder, driven through `java.util.concurrent.Flow` subscribers.
 *
 * Most tests number their checkpoints with `expect(n)` and close with `finish(n)`; the numbers spell out the order
 * in which the publisher block and the subscriber callbacks are expected to run.
 */
class PublishTest : TestBase() {
    /**
     * A publisher whose block emits nothing: the subscriber is expected to see `onSubscribe` and then `onComplete`,
     * and never `onNext` or `onError`.
     */
    @Test
    fun testBasicEmpty() = runTest {
        expect(1)
        val publisher = flowPublish<Int>(currentDispatcher()) {
            // Checkpoint 5 follows 4: the block is expected to start only once the test yields.
            expect(5)
        }
        expect(2)
        publisher.subscribe(object : JFlow.Subscriber<Int> {
            override fun onSubscribe(s: JFlow.Subscription?) { expect(3) }
            override fun onNext(t: Int?) { expectUnreached() }
            override fun onComplete() { expect(6) }
            override fun onError(t: Throwable?) { expectUnreached() }
        })
        expect(4)
        yield() // to publish coroutine
        finish(7)
    }

    /**
     * A publisher that sends one element to a subscriber that requested one: the subscriber is expected to receive
     * `42` in `onNext`, followed by `onComplete`.
     */
    @Test
    fun testBasicSingle() = runTest {
        expect(1)
        val publisher = flowPublish(currentDispatcher()) {
            expect(5)
            send(42)
            // Checkpoint 7 follows 6: `send` is expected to return after `onNext` has been invoked.
            expect(7)
        }
        expect(2)
        publisher.subscribe(object : JFlow.Subscriber<Int> {
            override fun onSubscribe(s: JFlow.Subscription) {
                expect(3)
                s.request(1)
            }
            override fun onNext(t: Int) {
                expect(6)
                assertEquals(42, t)
            }
            override fun onComplete() { expect(8) }
            override fun onError(t: Throwable?) { expectUnreached() }
        })
        expect(4)
        yield() // to publish coroutine
        finish(9)
    }

    /**
     * A publisher whose block throws: the subscriber is expected to get the same `RuntimeException("OK")` in
     * `onError`, and neither `onNext` nor `onComplete`.
     */
    @Test
    fun testBasicError() = runTest {
        expect(1)
        val publisher = flowPublish<Int>(currentDispatcher()) {
            expect(5)
            throw RuntimeException("OK")
        }
        expect(2)
        publisher.subscribe(object : JFlow.Subscriber<Int> {
            override fun onSubscribe(s: JFlow.Subscription) {
                expect(3)
                s.request(1)
            }
            override fun onNext(t: Int) { expectUnreached() }
            override fun onComplete() { expectUnreached() }
            override fun onError(t: Throwable) {
                expect(6)
                assertIs<RuntimeException>(t)
                assertEquals("OK", t.message)
            }
        })
        expect(4)
        yield() // to publish coroutine
        finish(7)
    }

    /**
     * The publisher block throws from its `finally` after the subscription has been cancelled: the exception is
     * expected to be delivered to the installed [CoroutineExceptionHandler] and not to any subscriber callback.
     */
    @Test
    fun testHandleFailureAfterCancel() = runTest {
        expect(1)

        val eh = CoroutineExceptionHandler { _, t ->
            assertIs<RuntimeException>(t)
            expect(6)
        }
        val publisher = flowPublish<Unit>(Dispatchers.Unconfined + eh) {
            try {
                expect(3)
                delay(10000)
            } finally {
                expect(5)
                throw RuntimeException("FAILED") // crash after cancel
            }
        }
        // Captured from `onSubscribe` so the test body can cancel the subscription later.
        var sub: JFlow.Subscription? = null
        publisher.subscribe(object : JFlow.Subscriber<Unit> {
            override fun onComplete() {
                expectUnreached()
            }

            override fun onSubscribe(s: JFlow.Subscription) {
                expect(2)
                sub = s
            }

            override fun onNext(t: Unit?) {
                expectUnreached()
            }

            override fun onError(t: Throwable?) {
                expectUnreached()
            }
        })
        expect(4)
        sub!!.cancel()
        finish(7)
    }

    /** Tests that, as soon as `ProducerScope.close` is called, `isClosedForSend` starts returning `true`. */
    @Test
    fun testChannelClosing() = runTest {
        expect(1)
        val publisher = flowPublish<Int>(Dispatchers.Unconfined) {
            expect(3)
            close()
            // `assertTrue` rather than `assert`: the check must run even when JVM assertions are disabled.
            assertTrue(isClosedForSend)
            expect(4)
        }
        try {
            expect(2)
            publisher.awaitFirstOrNull()
        } catch (e: CancellationException) {
            expect(5)
        }
        finish(6)
    }

    /**
     * The subscriber's `onNext` throws [TestException]: the suspended `send` in the publisher block is expected to
     * fail with that exception with the channel already closed for sending, and the exception handler is expected
     * to be called with a [TestException].
     */
    @Test
    fun testOnNextError() = runTest {
        // Completed by the publisher block once it has observed the failure, so the test body can wait for it.
        val latch = CompletableDeferred<Unit>()
        expect(1)
        assertCallsExceptionHandlerWith<TestException> { exceptionHandler ->
            val publisher = flowPublish(currentDispatcher() + exceptionHandler) {
                expect(4)
                try {
                    send("OK")
                } catch (e: Throwable) {
                    expect(6)
                    // kotlin.test assertions rather than `assert`: they run even when JVM assertions are disabled.
                    assertIs<TestException>(e)
                    assertTrue(isClosedForSend)
                    latch.complete(Unit)
                }
            }
            expect(2)
            publisher.subscribe(object : JFlow.Subscriber<String> {
                override fun onComplete() {
                    expectUnreached()
                }

                override fun onSubscribe(s: JFlow.Subscription) {
                    expect(3)
                    s.request(1)
                }

                override fun onNext(t: String) {
                    expect(5)
                    assertEquals("OK", t)
                    throw TestException()
                }

                override fun onError(t: Throwable) {
                    expectUnreached()
                }
            })
            latch.await()
        }
        finish(7)
    }

    /** Tests the behavior when a call to `onNext` fails after the channel is already closed. */
    @Test
    fun testOnNextErrorAfterCancellation() = runTest {
        assertCallsExceptionHandlerWith<TestException> { handler ->
            // Both are assigned inside the publisher block and read later from the test body / `onNext`.
            var producerScope: ProducerScope<Int>? = null
            expect(1)
            var job: Job? = null
            val publisher = flowPublish<Int>(handler + Dispatchers.Unconfined) {
                producerScope = this
                expect(4)
                job = launch {
                    delay(Long.MAX_VALUE)
                }
            }
            expect(2)
            publisher.subscribe(object : JFlow.Subscriber<Int> {
                override fun onSubscribe(s: JFlow.Subscription) {
                    expect(3)
                    s.request(Long.MAX_VALUE)
                }

                override fun onNext(t: Int) {
                    expect(6)
                    assertEquals(1, t)
                    job!!.cancel()
                    throw TestException()
                }

                override fun onError(t: Throwable?) {
                    /* Correct changes to the implementation could lead to us entering or not entering this method, but
                    it only matters that if we do, it is the "correct" exception that was validly used to cancel the
                    coroutine that gets passed here and not `TestException`. */
                    assertIs<CancellationException>(t)
                }

                override fun onComplete() {
                    expectUnreached()
                }
            })
            expect(5)
            val result: ChannelResult<Unit> = producerScope!!.trySend(1)
            val e = result.exceptionOrNull()!!
            assertIs<CancellationException>(e, "The actual error: $e")
            assertTrue(producerScope!!.isClosedForSend)
            assertTrue(result.isFailure)
        }
        finish(7)
    }

    /**
     * The collector throws [TestException] on the first element: the exception is expected to surface from
     * `collect`, with the publisher block having passed exactly its first two checkpoints by then.
     */
    @Test
    fun testFailingConsumer() = runTest {
        val pub = flowPublish(currentDispatcher()) {
            repeat(3) {
                expect(it + 1) // expect(1), expect(2) *should* be invoked
                send(it)
            }
        }
        try {
            pub.collect {
                throw TestException()
            }
        } catch (e: TestException) {
            finish(3)
        }
    }

    /** Tests that `flowPublish` throws [IllegalArgumentException] when the supplied context is a [Job]. */
    @Test
    fun testIllegalArgumentException() {
        assertFailsWith<IllegalArgumentException> { flowPublish<Int>(Job()) { } }
    }

    /** Tests that `trySend` doesn't throw in `flowPublish`. */
    @Test
    fun testTrySendNotThrowing() = runTest {
        // Assigned inside the publisher block so the test body can call `trySend` on it after cancellation.
        var producerScope: ProducerScope<Int>? = null
        expect(1)
        val publisher = flowPublish<Int>(Dispatchers.Unconfined) {
            producerScope = this
            expect(3)
            delay(Long.MAX_VALUE)
        }
        val job = launch(start = CoroutineStart.UNDISPATCHED) {
            expect(2)
            publisher.awaitFirstOrNull()
            expectUnreached()
        }
        job.cancel()
        expect(4)
        val result = producerScope!!.trySend(1)
        assertTrue(result.isFailure)
        finish(5)
    }

    /** Tests that all methods on `flowPublish` fail without closing the channel when attempting to emit `null`. */
    @Test
    fun testEmittingNull() = runTest {
        val publisher = flowPublish {
            assertFailsWith<NullPointerException> { send(null) }
            assertFailsWith<NullPointerException> { trySend(null) }
            // The channel must still be usable after the two rejected attempts.
            send("OK")
        }
        assertEquals("OK", publisher.awaitFirstOrNull())
    }
}
289