/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.jdk9

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import org.junit.Test
import java.util.concurrent.Flow as JFlow
import kotlin.test.*

/**
 * Tests for the `flowPublish` builder.
 *
 * Most tests pin the exact interleaving of the publishing coroutine and the subscriber callbacks
 * with the `expect(n)` / `finish(n)` helpers, so the order of statements inside each test matters.
 */
class PublishTest : TestBase() {
    /** A publisher whose block sends nothing only signals `onSubscribe` and then `onComplete`. */
    @Test
    fun testBasicEmpty() = runTest {
        expect(1)
        val publisher = flowPublish<Int>(currentDispatcher()) {
            expect(5)
        }
        expect(2)
        publisher.subscribe(object : JFlow.Subscriber<Int> {
            override fun onSubscribe(s: JFlow.Subscription?) { expect(3) }
            override fun onNext(t: Int?) { expectUnreached() }
            override fun onComplete() { expect(6) }
            override fun onError(t: Throwable?) { expectUnreached() }
        })
        expect(4)
        yield() // to publish coroutine
        finish(7)
    }

    /** A single `send` is delivered to `onNext` once the subscriber has requested one element. */
    @Test
    fun testBasicSingle() = runTest {
        expect(1)
        val publisher = flowPublish(currentDispatcher()) {
            expect(5)
            send(42)
            expect(7)
        }
        expect(2)
        publisher.subscribe(object : JFlow.Subscriber<Int> {
            override fun onSubscribe(s: JFlow.Subscription) {
                expect(3)
                s.request(1)
            }
            override fun onNext(t: Int) {
                expect(6)
                assertEquals(42, t)
            }
            override fun onComplete() { expect(8) }
            override fun onError(t: Throwable?) { expectUnreached() }
        })
        expect(4)
        yield() // to publish coroutine
        finish(9)
    }

    /** An exception thrown from the publisher block is delivered to the subscriber's `onError`. */
    @Test
    fun testBasicError() = runTest {
        expect(1)
        val publisher = flowPublish<Int>(currentDispatcher()) {
            expect(5)
            throw RuntimeException("OK")
        }
        expect(2)
        publisher.subscribe(object : JFlow.Subscriber<Int> {
            override fun onSubscribe(s: JFlow.Subscription) {
                expect(3)
                s.request(1)
            }
            override fun onNext(t: Int) { expectUnreached() }
            override fun onComplete() { expectUnreached() }
            override fun onError(t: Throwable) {
                expect(6)
                assertTrue(t is RuntimeException)
                assertEquals("OK", t.message)
            }
        })
        expect(4)
        yield() // to publish coroutine
        finish(7)
    }

    /**
     * An exception thrown by the publisher block after the subscription was cancelled goes to the
     * [CoroutineExceptionHandler] from the context and not to the subscriber's `onError`.
     */
    @Test
    fun testHandleFailureAfterCancel() = runTest {
        expect(1)

        val eh = CoroutineExceptionHandler { _, t ->
            assertTrue(t is RuntimeException)
            expect(6)
        }
        val publisher = flowPublish<Unit>(Dispatchers.Unconfined + eh) {
            try {
                expect(3)
                delay(10000)
            } finally {
                expect(5)
                throw RuntimeException("FAILED") // crash after cancel
            }
        }
        var sub: JFlow.Subscription? = null
        publisher.subscribe(object : JFlow.Subscriber<Unit> {
            override fun onComplete() {
                expectUnreached()
            }

            override fun onSubscribe(s: JFlow.Subscription) {
                expect(2)
                sub = s
            }

            override fun onNext(t: Unit?) {
                expectUnreached()
            }

            override fun onError(t: Throwable?) {
                expectUnreached()
            }
        })
        expect(4)
        checkNotNull(sub) { "onSubscribe was not invoked during subscribe()" }.cancel()
        finish(7)
    }

    /** Tests that, as soon as `ProducerScope.close` is called, `isClosedForSend` starts returning `true`. */
    @Test
    fun testChannelClosing() = runTest {
        expect(1)
        val publisher = flowPublish<Int>(Dispatchers.Unconfined) {
            expect(3)
            close()
            // `assertTrue` rather than `assert`: the latter is skipped when JVM assertions are disabled.
            assertTrue(isClosedForSend)
            expect(4)
        }
        try {
            expect(2)
            publisher.awaitFirstOrNull()
        } catch (e: CancellationException) {
            expect(5)
        }
        finish(6)
    }

    /**
     * When the subscriber's `onNext` throws, the pending `send` fails with that exception, the channel
     * is closed for sending, and the exception is also reported to the coroutine exception handler.
     */
    @Test
    fun testOnNextError() = runTest {
        val latch = CompletableDeferred<Unit>()
        expect(1)
        assertCallsExceptionHandlerWith<TestException> { exceptionHandler ->
            val publisher = flowPublish(currentDispatcher() + exceptionHandler) {
                expect(4)
                try {
                    send("OK")
                } catch (e: Throwable) {
                    expect(6)
                    // `assertTrue` rather than `assert`: the latter is skipped when JVM assertions are disabled.
                    assertTrue(e is TestException)
                    assertTrue(isClosedForSend)
                    latch.complete(Unit)
                }
            }
            expect(2)
            publisher.subscribe(object : JFlow.Subscriber<String> {
                override fun onComplete() {
                    expectUnreached()
                }

                override fun onSubscribe(s: JFlow.Subscription) {
                    expect(3)
                    s.request(1)
                }

                override fun onNext(t: String) {
                    expect(5)
                    assertEquals("OK", t)
                    throw TestException()
                }

                override fun onError(t: Throwable) {
                    expectUnreached()
                }
            })
            latch.await()
        }
        finish(7)
    }

    /** Tests the behavior when a call to `onNext` fails after the channel is already closed. */
    @Test
    fun testOnNextErrorAfterCancellation() = runTest {
        assertCallsExceptionHandlerWith<TestException> { handler ->
            var producerScope: ProducerScope<Int>? = null
            expect(1)
            var job: Job? = null
            val publisher = flowPublish<Int>(handler + Dispatchers.Unconfined) {
                producerScope = this
                expect(4)
                job = launch {
                    delay(Long.MAX_VALUE)
                }
            }
            expect(2)
            publisher.subscribe(object : JFlow.Subscriber<Int> {
                override fun onSubscribe(s: JFlow.Subscription) {
                    expect(3)
                    s.request(Long.MAX_VALUE)
                }
                override fun onNext(t: Int) {
                    expect(6)
                    assertEquals(1, t)
                    checkNotNull(job) { "the publisher block did not launch its child job" }.cancel()
                    throw TestException()
                }
                override fun onError(t: Throwable?) {
                    /* Correct changes to the implementation could lead to us entering or not entering this method, but
                    it only matters that if we do, it is the "correct" exception that was validly used to cancel the
                    coroutine that gets passed here and not `TestException`. */
                    assertTrue(t is CancellationException)
                }
                override fun onComplete() { expectUnreached() }
            })
            expect(5)
            val scope = checkNotNull(producerScope) { "the publisher block did not start" }
            val result: ChannelResult<Unit> = scope.trySend(1)
            val e = result.exceptionOrNull()!!
            assertTrue(e is CancellationException, "The actual error: $e")
            assertTrue(scope.isClosedForSend)
            assertTrue(result.isFailure)
        }
        finish(7)
    }

    /** An exception thrown by the collector passed to `collect` is rethrown to the caller of `collect`. */
    @Test
    fun testFailingConsumer() = runTest {
        val pub = flowPublish(currentDispatcher()) {
            repeat(3) {
                expect(it + 1) // expect(1), expect(2) *should* be invoked
                send(it)
            }
        }
        try {
            pub.collect {
                throw TestException()
            }
        } catch (e: TestException) {
            finish(3)
        }
    }

    /** Passing a [Job] in the context of `flowPublish` is rejected with [IllegalArgumentException]. */
    @Test
    fun testIllegalArgumentException() {
        assertFailsWith<IllegalArgumentException> { flowPublish<Int>(Job()) { } }
    }

    /** Tests that `trySend` doesn't throw in `flowPublish`. */
    @Test
    fun testTrySendNotThrowing() = runTest {
        var producerScope: ProducerScope<Int>? = null
        expect(1)
        val publisher = flowPublish<Int>(Dispatchers.Unconfined) {
            producerScope = this
            expect(3)
            delay(Long.MAX_VALUE)
        }
        val job = launch(start = CoroutineStart.UNDISPATCHED) {
            expect(2)
            publisher.awaitFirstOrNull()
            expectUnreached()
        }
        job.cancel()
        expect(4)
        val result = checkNotNull(producerScope) { "the publisher block did not start" }.trySend(1)
        assertTrue(result.isFailure)
        finish(5)
    }

    /** Tests that all methods on `flowPublish` fail without closing the channel when attempting to emit `null`. */
    @Test
    fun testEmittingNull() = runTest {
        val publisher = flowPublish {
            assertFailsWith<NullPointerException> { send(null) }
            assertFailsWith<NullPointerException> { trySend(null) }
            send("OK")
        }
        assertEquals("OK", publisher.awaitFirstOrNull())
    }
}