1 /* <lambda>null2 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines.reactive 6 7 import kotlinx.coroutines.* 8 import kotlinx.coroutines.flow.* 9 import org.junit.Test 10 import org.reactivestreams.* 11 import java.util.concurrent.* 12 import kotlin.test.* 13 14 class FlowAsPublisherTest : TestBase() { 15 @Test 16 fun testErrorOnCancellationIsReported() { 17 expect(1) 18 flow<Int> { 19 try { 20 emit(2) 21 } finally { 22 expect(3) 23 throw TestException() 24 } 25 }.asPublisher().subscribe(object : Subscriber<Int> { 26 private lateinit var subscription: Subscription 27 28 override fun onComplete() { 29 expectUnreached() 30 } 31 32 override fun onSubscribe(s: Subscription?) { 33 subscription = s!! 34 subscription.request(2) 35 } 36 37 override fun onNext(t: Int) { 38 expect(t) 39 subscription.cancel() 40 } 41 42 override fun onError(t: Throwable?) { 43 assertTrue(t is TestException) 44 expect(4) 45 } 46 }) 47 finish(5) 48 } 49 50 @Test 51 fun testCancellationIsNotReported() { 52 expect(1) 53 flow<Int> { 54 emit(2) 55 }.asPublisher().subscribe(object : Subscriber<Int> { 56 private lateinit var subscription: Subscription 57 58 override fun onComplete() { 59 expect(3) 60 } 61 62 override fun onSubscribe(s: Subscription?) { 63 subscription = s!! 64 subscription.request(2) 65 } 66 67 override fun onNext(t: Int) { 68 expect(t) 69 subscription.cancel() 70 } 71 72 override fun onError(t: Throwable?) { 73 expectUnreached() 74 } 75 }) 76 finish(4) 77 } 78 79 @Test 80 fun testUnconfinedDefaultContext() { 81 expect(1) 82 val thread = Thread.currentThread() 83 fun checkThread() { 84 assertSame(thread, Thread.currentThread()) 85 } 86 flowOf(42).asPublisher().subscribe(object : Subscriber<Int> { 87 private lateinit var subscription: Subscription 88 89 override fun onSubscribe(s: Subscription) { 90 expect(2) 91 subscription = s 92 subscription.request(2) 93 } 94 95 override fun onNext(t: Int) { 96 checkThread() 97 expect(3) 98 assertEquals(42, t) 99 } 100 101 override fun onComplete() { 102 checkThread() 103 expect(4) 104 } 105 106 override fun onError(t: Throwable?) { 107 expectUnreached() 108 } 109 }) 110 finish(5) 111 } 112 113 @Test 114 fun testConfinedContext() { 115 expect(1) 116 val threadName = "FlowAsPublisherTest.testConfinedContext" 117 fun checkThread() { 118 val currentThread = Thread.currentThread() 119 assertTrue(currentThread.name.startsWith(threadName), "Unexpected thread $currentThread") 120 } 121 val completed = CountDownLatch(1) 122 newSingleThreadContext(threadName).use { dispatcher -> 123 flowOf(42).asPublisher(dispatcher).subscribe(object : Subscriber<Int> { 124 private lateinit var subscription: Subscription 125 126 override fun onSubscribe(s: Subscription) { 127 expect(2) 128 subscription = s 129 subscription.request(2) 130 } 131 132 override fun onNext(t: Int) { 133 checkThread() 134 expect(3) 135 assertEquals(42, t) 136 } 137 138 override fun onComplete() { 139 checkThread() 140 expect(4) 141 completed.countDown() 142 } 143 144 override fun onError(t: Throwable?) { 145 expectUnreached() 146 } 147 }) 148 completed.await() 149 } 150 finish(5) 151 } 152 } 153