/*
 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.reactive

import kotlinx.coroutines.*
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.flow.*
import org.junit.Test
import org.reactivestreams.*
import java.util.concurrent.*
import kotlin.test.*

/**
 * Tests for publishers obtained from a flow via `asPublisher`.
 *
 * Each test drives a hand-written [Subscriber] and uses the `expect(n)` / `finish(n)` /
 * `expectUnreached()` helpers inherited from [TestBase] to pin the order in which the
 * subscriber callbacks are invoked.
 */
class FlowAsPublisherTest : TestBase() {
    /**
     * The subscriber cancels its subscription from `onNext` while the flow body throws
     * [TestException] from its `finally` block. The test expects that exception to be
     * delivered to `onError` (and `onComplete` never to be called).
     */
    @Test
    fun testErrorOnCancellationIsReported() {
        expect(1)
        flow {
            try {
                emit(2)
            } finally {
                expect(3)
                throw TestException()
            }
        }.asPublisher().subscribe(object : Subscriber<Int> {
            private lateinit var subscription: Subscription

            override fun onComplete() {
                expectUnreached()
            }

            // Non-null parameter type, consistent with the other subscribers in this class;
            // avoids the `!!` on a value that is never expected to be null.
            override fun onSubscribe(s: Subscription) {
                subscription = s
                subscription.request(2)
            }

            override fun onNext(t: Int) {
                // The emitted value (2) doubles as the expected step index.
                expect(t)
                subscription.cancel()
            }

            override fun onError(t: Throwable?) {
                assertTrue(t is TestException)
                expect(4)
            }
        })
        finish(5)
    }

    /**
     * The subscriber cancels its subscription from `onNext` and the flow body does not
     * throw. The test expects neither `onComplete` nor `onError` to be called afterwards.
     */
    @Test
    fun testCancellationIsNotReported() {
        expect(1)
        flow {
            emit(2)
        }.asPublisher().subscribe(object : Subscriber<Int> {
            private lateinit var subscription: Subscription

            override fun onComplete() {
                expectUnreached()
            }

            // Non-null parameter type, consistent with the other subscribers in this class.
            override fun onSubscribe(s: Subscription) {
                subscription = s
                subscription.request(2)
            }

            override fun onNext(t: Int) {
                // The emitted value (2) doubles as the expected step index.
                expect(t)
                subscription.cancel()
            }

            override fun onError(t: Throwable?) {
                expectUnreached()
            }
        })
        finish(3)
    }

    /**
     * With no context passed to `asPublisher()`, `onNext` and `onComplete` must run on the
     * thread that called `subscribe`. The test does not wait after `subscribe`, so every
     * callback has to have happened by the time `finish(5)` is reached.
     */
    @Test
    fun testUnconfinedDefaultContext() {
        expect(1)
        val thread = Thread.currentThread()
        fun checkThread() {
            assertSame(thread, Thread.currentThread())
        }
        flowOf(42).asPublisher().subscribe(object : Subscriber<Int> {
            private lateinit var subscription: Subscription

            override fun onSubscribe(s: Subscription) {
                expect(2)
                subscription = s
                subscription.request(2)
            }

            override fun onNext(t: Int) {
                checkThread()
                expect(3)
                assertEquals(42, t)
            }

            override fun onComplete() {
                checkThread()
                expect(4)
            }

            override fun onError(t: Throwable?) {
                expectUnreached()
            }
        })
        finish(5)
    }

    /**
     * With a single-thread dispatcher passed to `asPublisher(dispatcher)`, `onNext` and
     * `onComplete` must run on a thread whose name starts with the dispatcher's name.
     * The test thread blocks on a latch until the subscriber reaches a terminal signal.
     */
    @Test
    fun testConfinedContext() {
        expect(1)
        val threadName = "FlowAsPublisherTest.testConfinedContext"
        fun checkThread() {
            val currentThread = Thread.currentThread()
            assertTrue(currentThread.name.startsWith(threadName), "Unexpected thread $currentThread")
        }
        val completed = CountDownLatch(1)
        newSingleThreadContext(threadName).use { dispatcher ->
            flowOf(42).asPublisher(dispatcher).subscribe(object : Subscriber<Int> {
                private lateinit var subscription: Subscription

                override fun onSubscribe(s: Subscription) {
                    expect(2)
                    subscription = s
                    subscription.request(2)
                }

                override fun onNext(t: Int) {
                    checkThread()
                    expect(3)
                    assertEquals(42, t)
                }

                override fun onComplete() {
                    checkThread()
                    expect(4)
                    completed.countDown()
                }

                override fun onError(t: Throwable?) {
                    try {
                        expectUnreached()
                    } finally {
                        // Release the waiting test thread on the error path as well, so an
                        // unexpected error does not leave `completed.await()` blocked forever.
                        // NOTE(review): relies on TestBase surfacing the failure via
                        // expectUnreached()/finish(5) — confirm against TestBase.
                        completed.countDown()
                    }
                }
            })
            // Callbacks run on the dispatcher thread; wait for the terminal signal before
            // the dispatcher is closed by `use`.
            completed.await()
        }
        finish(5)
    }

    /**
     * A flow whose body times out inside `withTimeout(1)` is awaited through
     * `awaitFirstOrNull()`; the test expects the await to fail with a
     * [CancellationException].
     */
    @Test
    fun testFlowWithTimeout() = runTest {
        val publisher = flow<Int> {
            expect(2)
            withTimeout(1) { delay(Long.MAX_VALUE) }
        }.asPublisher()
        try {
            expect(1)
            publisher.awaitFirstOrNull()
        } catch (e: CancellationException) {
            // Swallowed on purpose: the exception is the outcome under test. If it is not
            // thrown, step 3 is skipped and finish(4) is reached out of order.
            expect(3)
        }
        finish(4)
    }
}