/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.reactive

import kotlinx.coroutines.*
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.*
import org.junit.Test
import org.reactivestreams.*
import java.util.concurrent.*
import kotlin.test.*

/**
 * Tests for the `publish` coroutine builder.
 *
 * Most tests pin the exact interleaving of the publisher body and the subscriber callbacks with the
 * numbered `expect(n)` / `finish(n)` calls inherited from [TestBase].
 */
class PublishTest : TestBase() {
    /**
     * A publisher whose body emits nothing: `onSubscribe` is called during `subscribe`, the body only runs once
     * the test yields, and `onComplete` follows. `onNext` and `onError` must not be called.
     */
    @Test
    fun testBasicEmpty() = runTest {
        expect(1)
        val publisher = publish<Int>(currentDispatcher()) {
            expect(5)
        }
        expect(2)
        publisher.subscribe(object : Subscriber<Int> {
            override fun onSubscribe(s: Subscription?) { expect(3) }
            override fun onNext(t: Int?) { expectUnreached() }
            override fun onComplete() { expect(6) }
            override fun onError(t: Throwable?) { expectUnreached() }
        })
        expect(4)
        yield() // to publish coroutine
        finish(7)
    }

    /**
     * A publisher that sends a single value to a subscriber that requested one element: the value reaches
     * `onNext`, the body then runs to its end, and `onComplete` follows.
     */
    @Test
    fun testBasicSingle() = runTest {
        expect(1)
        val publisher = publish(currentDispatcher()) {
            expect(5)
            send(42)
            expect(7)
        }
        expect(2)
        publisher.subscribe(object : Subscriber<Int> {
            override fun onSubscribe(s: Subscription) {
                expect(3)
                s.request(1)
            }
            override fun onNext(t: Int) {
                expect(6)
                assertEquals(42, t)
            }
            override fun onComplete() { expect(8) }
            override fun onError(t: Throwable?) { expectUnreached() }
        })
        expect(4)
        yield() // to publish coroutine
        finish(9)
    }

    /**
     * A publisher whose body throws: the exception is delivered to `onError` with its type and message intact,
     * and neither `onNext` nor `onComplete` is called.
     */
    @Test
    fun testBasicError() = runTest {
        expect(1)
        val publisher = publish<Int>(currentDispatcher()) {
            expect(5)
            throw RuntimeException("OK")
        }
        expect(2)
        publisher.subscribe(object : Subscriber<Int> {
            override fun onSubscribe(s: Subscription) {
                expect(3)
                s.request(1)
            }
            override fun onNext(t: Int) { expectUnreached() }
            override fun onComplete() { expectUnreached() }
            override fun onError(t: Throwable) {
                expect(6)
                assertTrue(t is RuntimeException)
                assertEquals("OK", t.message)
            }
        })
        expect(4)
        yield() // to publish coroutine
        finish(7)
    }

    /**
     * An exception thrown from the publisher body after the subscription was cancelled is reported to the
     * [CoroutineExceptionHandler] from the context; none of the subscriber's terminal callbacks is called.
     */
    @Test
    fun testHandleFailureAfterCancel() = runTest {
        expect(1)

        val eh = CoroutineExceptionHandler { _, t ->
            assertTrue(t is RuntimeException)
            expect(6)
        }
        val publisher = publish<Unit>(Dispatchers.Unconfined + eh) {
            try {
                expect(3)
                delay(10000)
            } finally {
                expect(5)
                throw RuntimeException("FAILED") // crash after cancel
            }
        }
        var sub: Subscription? = null
        publisher.subscribe(object : Subscriber<Unit> {
            override fun onComplete() {
                expectUnreached()
            }

            override fun onSubscribe(s: Subscription) {
                expect(2)
                sub = s
            }

            override fun onNext(t: Unit?) {
                expectUnreached()
            }

            override fun onError(t: Throwable?) {
                expectUnreached()
            }
        })
        expect(4)
        sub!!.cancel()
        finish(7)
    }

    /** Tests that, as soon as `ProducerScope.close` is called, `isClosedForSend` starts returning `true`. */
    @Test
    fun testChannelClosing() = runTest {
        expect(1)
        val publisher = publish<Int>(Dispatchers.Unconfined) {
            expect(3)
            close()
            // `assertTrue` rather than `assert`: Kotlin's `assert` is only evaluated when JVM assertions are
            // enabled, so the check this test exists for could otherwise be skipped silently.
            assertTrue(isClosedForSend)
            expect(4)
        }
        try {
            expect(2)
            publisher.awaitFirstOrNull()
        } catch (e: CancellationException) {
            expect(5)
        }
        finish(6)
    }

    /**
     * When the subscriber's `onNext` throws, the suspended `send` in the publisher fails with that same exception
     * and the channel is closed for sending. The whole scenario runs under `assertCallsExceptionHandlerWith`,
     * which supplies the exception handler placed in the publisher's context.
     */
    @Test
    fun testOnNextError() = runTest {
        val latch = CompletableDeferred<Unit>()
        expect(1)
        assertCallsExceptionHandlerWith<TestException> { exceptionHandler ->
            val publisher = publish(currentDispatcher() + exceptionHandler) {
                expect(4)
                try {
                    send("OK")
                } catch (e: Throwable) {
                    expect(6)
                    // `assertTrue` is always evaluated, unlike `assert`, which depends on the JVM `-ea` flag.
                    assertTrue(e is TestException)
                    assertTrue(isClosedForSend)
                    latch.complete(Unit)
                }
            }
            expect(2)
            publisher.subscribe(object : Subscriber<String> {
                override fun onComplete() {
                    expectUnreached()
                }

                override fun onSubscribe(s: Subscription) {
                    expect(3)
                    s.request(1)
                }

                override fun onNext(t: String) {
                    expect(5)
                    assertEquals("OK", t)
                    throw TestException()
                }

                override fun onError(t: Throwable) {
                    expectUnreached()
                }
            })
            // Wait until the publisher has observed the failure before leaving the handler-checking block.
            latch.await()
        }
        finish(7)
    }

    /** Tests the behavior when a call to `onNext` fails after the channel is already closed. */
    @Test
    fun testOnNextErrorAfterCancellation() = runTest {
        assertCallsExceptionHandlerWith<TestException> { handler ->
            var producerScope: ProducerScope<Int>? = null
            expect(1)
            var job: Job? = null
            val publisher = publish<Int>(handler + Dispatchers.Unconfined) {
                producerScope = this
                expect(4)
                job = launch {
                    delay(Long.MAX_VALUE)
                }
            }
            expect(2)
            publisher.subscribe(object : Subscriber<Int> {
                override fun onSubscribe(s: Subscription) {
                    expect(3)
                    s.request(Long.MAX_VALUE)
                }
                override fun onNext(t: Int) {
                    expect(6)
                    assertEquals(1, t)
                    job!!.cancel()
                    throw TestException()
                }
                override fun onError(t: Throwable?) {
                    /* Correct changes to the implementation could lead to us entering or not entering this method, but
                    it only matters that if we do, it is the "correct" exception that was validly used to cancel the
                    coroutine that gets passed here and not `TestException`. */
                    assertTrue(t is CancellationException)
                }
                override fun onComplete() { expectUnreached() }
            })
            expect(5)
            val result: ChannelResult<Unit> = producerScope!!.trySend(1)
            val e = result.exceptionOrNull()!!
            assertTrue(e is CancellationException, "The actual error: $e")
            assertTrue(producerScope!!.isClosedForSend)
            assertTrue(result.isFailure)
        }
        finish(7)
    }

    /**
     * A `collect` block that throws on the first element: the exception reaches the caller of `collect`, and the
     * publisher body gets no further than its second loop iteration.
     */
    @Test
    fun testFailingConsumer() = runTest {
        val pub = publish(currentDispatcher()) {
            repeat(3) {
                expect(it + 1) // expect(1), expect(2) *should* be invoked
                send(it)
            }
        }
        try {
            pub.collect {
                throw TestException()
            }
        } catch (e: TestException) {
            finish(3)
        }
    }

    /** `publish` rejects a context that contains a [Job] with an [IllegalArgumentException]. */
    @Test
    fun testIllegalArgumentException() {
        assertFailsWith<IllegalArgumentException> { publish<Int>(Job()) { } }
    }

    /** Tests that `trySend` doesn't throw in `publish`. */
    @Test
    fun testTrySendNotThrowing() = runTest {
        var producerScope: ProducerScope<Int>? = null
        expect(1)
        val publisher = publish<Int>(Dispatchers.Unconfined) {
            producerScope = this
            expect(3)
            delay(Long.MAX_VALUE)
        }
        val job = launch(start = CoroutineStart.UNDISPATCHED) {
            expect(2)
            publisher.awaitFirstOrNull()
            expectUnreached()
        }
        job.cancel()
        expect(4)
        val result = producerScope!!.trySend(1)
        assertTrue(result.isFailure)
        finish(5)
    }

    /** Tests that all methods on `publish` fail without closing the channel when attempting to emit `null`. */
    @Test
    fun testEmittingNull() = runTest {
        val publisher = publish {
            assertFailsWith<NullPointerException> { send(null) }
            assertFailsWith<NullPointerException> { trySend(null) }
            send("OK")
        }
        assertEquals("OK", publisher.awaitFirstOrNull())
    }

    /**
     * Cancelling the collector while the publisher is inside its second `send` makes that `send` throw a
     * [CancellationException]. The test blocks on a latch until the publisher has observed it.
     */
    @Test
    fun testOnSendCancelled() = runTest {
        val latch = CountDownLatch(1)
        val published = publish(Dispatchers.Default) {
            expect(2)
            // Collector is ready
            send(1)
            try {
                send(2)
                expectUnreached()
            } catch (e: CancellationException) {
                // publisher cancellation is async
                latch.countDown()
                throw e
            }
        }

        expect(1)
        // Created locked; the collector unlocks it on the first element so the test can proceed.
        val collectorLatch = Mutex(true)
        val job = launch {
            published.asFlow().buffer(0).collect {
                collectorLatch.unlock()
                hang { expect(4) }
            }
        }
        collectorLatch.lock()
        expect(3)
        job.cancelAndJoin()
        latch.await()
        finish(5)
    }
}