1 /*
<lambda>null2 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 package kotlinx.coroutines.reactive
6
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.exceptions.*
9 import org.junit.Test
10 import org.junit.runner.*
11 import org.junit.runners.*
12 import org.reactivestreams.*
13 import java.lang.IllegalStateException
14 import java.lang.RuntimeException
15 import kotlin.contracts.*
16 import kotlin.coroutines.*
17 import kotlin.test.*
18
19 @RunWith(Parameterized::class)
20 class IntegrationTest(
21 private val ctx: Ctx,
22 private val delay: Boolean
23 ) : TestBase() {
24
25 enum class Ctx {
26 MAIN { override fun invoke(context: CoroutineContext): CoroutineContext = context.minusKey(Job) },
27 DEFAULT { override fun invoke(context: CoroutineContext): CoroutineContext = Dispatchers.Default },
28 UNCONFINED { override fun invoke(context: CoroutineContext): CoroutineContext = Dispatchers.Unconfined };
29
30 abstract operator fun invoke(context: CoroutineContext): CoroutineContext
31 }
32
33 companion object {
34 @Parameterized.Parameters(name = "ctx={0}, delay={1}")
35 @JvmStatic
36 fun params(): Collection<Array<Any>> = Ctx.values().flatMap { ctx ->
37 listOf(false, true).map { delay ->
38 arrayOf(ctx, delay)
39 }
40 }
41 }
42
43 @Test
44 fun testEmpty(): Unit = runBlocking {
45 val pub = publish<String>(ctx(coroutineContext)) {
46 if (delay) delay(1)
47 // does not send anything
48 }
49 assertFailsWith<NoSuchElementException> { pub.awaitFirst() }
50 assertEquals("OK", pub.awaitFirstOrDefault("OK"))
51 assertNull(pub.awaitFirstOrNull())
52 assertEquals("ELSE", pub.awaitFirstOrElse { "ELSE" })
53 assertFailsWith<NoSuchElementException> { pub.awaitLast() }
54 assertFailsWith<NoSuchElementException> { pub.awaitSingle() }
55 var cnt = 0
56 pub.collect { cnt++ }
57 assertEquals(0, cnt)
58 }
59
60 @Test
61 fun testSingle() = runBlocking {
62 val pub = publish(ctx(coroutineContext)) {
63 if (delay) delay(1)
64 send("OK")
65 }
66 assertEquals("OK", pub.awaitFirst())
67 assertEquals("OK", pub.awaitFirstOrDefault("!"))
68 assertEquals("OK", pub.awaitFirstOrNull())
69 assertEquals("OK", pub.awaitFirstOrElse { "ELSE" })
70 assertEquals("OK", pub.awaitLast())
71 assertEquals("OK", pub.awaitSingle())
72 var cnt = 0
73 pub.collect {
74 assertEquals("OK", it)
75 cnt++
76 }
77 assertEquals(1, cnt)
78 }
79
80 @Test
81 fun testCancelWithoutValue() = runTest {
82 val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) {
83 publish<String> {
84 hang {}
85 }.awaitFirst()
86 }
87
88 job.cancel()
89 job.join()
90 }
91
92 @Test
93 fun testEmptySingle() = runTest(unhandled = listOf { e -> e is NoSuchElementException }) {
94 expect(1)
95 val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) {
96 publish<String> {
97 yield()
98 expect(2)
99 // Nothing to emit
100 }.awaitFirst()
101 }
102
103 job.join()
104 finish(3)
105 }
106
107 /**
108 * Test that the continuation is not being resumed after it has already failed due to there having been too many
109 * values passed.
110 */
111 @Test
112 fun testNotCompletingFailedAwait() = runTest {
113 try {
114 expect(1)
115 Publisher<Int> { sub ->
116 sub.onSubscribe(object: Subscription {
117 override fun request(n: Long) {
118 expect(2)
119 sub.onNext(1)
120 sub.onNext(2)
121 expect(4)
122 sub.onComplete()
123 }
124
125 override fun cancel() {
126 expect(3)
127 }
128 })
129 }.awaitSingle()
130 } catch (e: java.lang.IllegalArgumentException) {
131 expect(5)
132 }
133 finish(6)
134 }
135
136 /**
137 * Test the behavior of [awaitOne] on unconforming publishers.
138 */
139 @Test
140 fun testAwaitOnNonconformingPublishers() = runTest {
141 fun <T> publisher(block: Subscriber<in T>.(n: Long) -> Unit) =
142 Publisher<T> { subscriber ->
143 subscriber.onSubscribe(object: Subscription {
144 override fun request(n: Long) {
145 subscriber.block(n)
146 }
147
148 override fun cancel() {
149 }
150 })
151 }
152 val dummyMessage = "dummy"
153 val dummyThrowable = RuntimeException(dummyMessage)
154 suspend fun <T> assertDetectsBadPublisher(
155 operation: suspend Publisher<T>.() -> T,
156 message: String,
157 block: Subscriber<in T>.(n: Long) -> Unit,
158 ) {
159 assertCallsExceptionHandlerWith<IllegalStateException> {
160 try {
161 publisher(block).operation()
162 } catch (e: Throwable) {
163 if (e.message != dummyMessage)
164 throw e
165 }
166 }.let {
167 assertTrue("Expected the message to contain '$message', got '${it.message}'") {
168 it.message?.contains(message) ?: false
169 }
170 }
171 }
172
173 // Rule 1.1 broken: the publisher produces more values than requested.
174 assertDetectsBadPublisher<Int>({ awaitFirst() }, "provided more") {
175 onNext(1)
176 onNext(2)
177 onComplete()
178 }
179
180 // Rule 1.7 broken: the publisher calls a method on a subscriber after reaching the terminal state.
181 assertDetectsBadPublisher<Int>({ awaitSingle() }, "terminal state") {
182 onNext(1)
183 onError(dummyThrowable)
184 onComplete()
185 }
186 assertDetectsBadPublisher<Int>({ awaitFirst() }, "terminal state") {
187 onNext(0)
188 onComplete()
189 onComplete()
190 }
191 assertDetectsBadPublisher<Int>({ awaitFirstOrDefault(1) }, "terminal state") {
192 onComplete()
193 onNext(3)
194 }
195 assertDetectsBadPublisher<Int>({ awaitSingle() }, "terminal state") {
196 onError(dummyThrowable)
197 onNext(3)
198 }
199
200 // Rule 1.9 broken (the first signal to the subscriber was not 'onSubscribe')
201 assertCallsExceptionHandlerWith<IllegalStateException> {
202 try {
203 Publisher<Int> { subscriber ->
204 subscriber.onNext(3)
205 subscriber.onComplete()
206 }.awaitFirst()
207 } catch (e: NoSuchElementException) {
208 // intentionally blank
209 }
210 }.let { assertTrue(it.message?.contains("onSubscribe") ?: false) }
211 }
212
213 @Test
214 fun testPublishWithTimeout() = runTest {
215 val publisher = publish<Int> {
216 expect(2)
217 withTimeout(1) { delay(100) }
218 }
219 try {
220 expect(1)
221 publisher.awaitFirstOrNull()
222 } catch (e: CancellationException) {
223 expect(3)
224 }
225 finish(4)
226 }
227
228 }
229
230 @OptIn(ExperimentalContracts::class)
assertCallsExceptionHandlerWithnull231 internal suspend inline fun <reified E: Throwable> assertCallsExceptionHandlerWith(
232 crossinline operation: suspend (CoroutineExceptionHandler) -> Unit): E {
233 contract {
234 callsInPlace(operation, InvocationKind.EXACTLY_ONCE)
235 }
236 val handler = CapturingHandler()
237 return withContext(handler) {
238 operation(handler)
239 handler.getException().let {
240 assertTrue(it is E, it.toString())
241 it
242 }
243 }
244 }
245