• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.reactive
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.exceptions.*
9 import org.junit.Test
10 import org.junit.runner.*
11 import org.junit.runners.*
12 import org.reactivestreams.*
13 import java.lang.IllegalStateException
14 import java.lang.RuntimeException
15 import kotlin.contracts.*
16 import kotlin.coroutines.*
17 import kotlin.test.*
18 
19 @RunWith(Parameterized::class)
20 class IntegrationTest(
21     private val ctx: Ctx,
22     private val delay: Boolean
23 ) : TestBase() {
24 
25     enum class Ctx {
26         MAIN        { override fun invoke(context: CoroutineContext): CoroutineContext = context.minusKey(Job) },
27         DEFAULT     { override fun invoke(context: CoroutineContext): CoroutineContext = Dispatchers.Default },
28         UNCONFINED  { override fun invoke(context: CoroutineContext): CoroutineContext = Dispatchers.Unconfined };
29 
30         abstract operator fun invoke(context: CoroutineContext): CoroutineContext
31     }
32 
33     companion object {
34         @Parameterized.Parameters(name = "ctx={0}, delay={1}")
35         @JvmStatic
36         fun params(): Collection<Array<Any>> = Ctx.values().flatMap { ctx ->
37             listOf(false, true).map { delay ->
38                 arrayOf(ctx, delay)
39             }
40         }
41     }
42 
43     @Test
44     fun testEmpty(): Unit = runBlocking {
45         val pub = publish<String>(ctx(coroutineContext)) {
46             if (delay) delay(1)
47             // does not send anything
48         }
49         assertFailsWith<NoSuchElementException> { pub.awaitFirst() }
50         assertEquals("OK", pub.awaitFirstOrDefault("OK"))
51         assertNull(pub.awaitFirstOrNull())
52         assertEquals("ELSE", pub.awaitFirstOrElse { "ELSE" })
53         assertFailsWith<NoSuchElementException> { pub.awaitLast() }
54         assertFailsWith<NoSuchElementException> { pub.awaitSingle() }
55         var cnt = 0
56         pub.collect { cnt++ }
57         assertEquals(0, cnt)
58     }
59 
60     @Test
61     fun testSingle() = runBlocking {
62         val pub = publish(ctx(coroutineContext)) {
63             if (delay) delay(1)
64             send("OK")
65         }
66         assertEquals("OK", pub.awaitFirst())
67         assertEquals("OK", pub.awaitFirstOrDefault("!"))
68         assertEquals("OK", pub.awaitFirstOrNull())
69         assertEquals("OK", pub.awaitFirstOrElse { "ELSE" })
70         assertEquals("OK", pub.awaitLast())
71         assertEquals("OK", pub.awaitSingle())
72         var cnt = 0
73         pub.collect {
74             assertEquals("OK", it)
75             cnt++
76         }
77         assertEquals(1, cnt)
78     }
79 
80     @Test
81     fun testCancelWithoutValue() = runTest {
82         val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) {
83             publish<String> {
84                 hang {}
85             }.awaitFirst()
86         }
87 
88         job.cancel()
89         job.join()
90     }
91 
92     @Test
93     fun testEmptySingle() = runTest(unhandled = listOf { e -> e is NoSuchElementException }) {
94         expect(1)
95         val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) {
96             publish<String> {
97                 yield()
98                 expect(2)
99                 // Nothing to emit
100             }.awaitFirst()
101         }
102 
103         job.join()
104         finish(3)
105     }
106 
107     /**
108      * Test that the continuation is not being resumed after it has already failed due to there having been too many
109      * values passed.
110      */
111     @Test
112     fun testNotCompletingFailedAwait() = runTest {
113         try {
114             expect(1)
115             Publisher<Int> { sub ->
116                 sub.onSubscribe(object: Subscription {
117                     override fun request(n: Long) {
118                         expect(2)
119                         sub.onNext(1)
120                         sub.onNext(2)
121                         expect(4)
122                         sub.onComplete()
123                     }
124 
125                     override fun cancel() {
126                         expect(3)
127                     }
128                 })
129             }.awaitSingle()
130         } catch (e: java.lang.IllegalArgumentException) {
131             expect(5)
132         }
133         finish(6)
134     }
135 
136     /**
137      * Test the behavior of [awaitOne] on unconforming publishers.
138      */
139     @Test
140     fun testAwaitOnNonconformingPublishers() = runTest {
141         fun <T> publisher(block: Subscriber<in T>.(n: Long) -> Unit) =
142             Publisher<T> { subscriber ->
143                 subscriber.onSubscribe(object: Subscription {
144                     override fun request(n: Long) {
145                         subscriber.block(n)
146                     }
147 
148                     override fun cancel() {
149                     }
150                 })
151             }
152         val dummyMessage = "dummy"
153         val dummyThrowable = RuntimeException(dummyMessage)
154         suspend fun <T> assertDetectsBadPublisher(
155             operation: suspend Publisher<T>.() -> T,
156             message: String,
157             block: Subscriber<in T>.(n: Long) -> Unit,
158         ) {
159             assertCallsExceptionHandlerWith<IllegalStateException> {
160                 try {
161                     publisher(block).operation()
162                 } catch (e: Throwable) {
163                     if (e.message != dummyMessage)
164                         throw e
165                 }
166             }.let {
167                 assertTrue("Expected the message to contain '$message', got '${it.message}'") {
168                     it.message?.contains(message) ?: false
169                 }
170             }
171         }
172 
173         // Rule 1.1 broken: the publisher produces more values than requested.
174         assertDetectsBadPublisher<Int>({ awaitFirst() }, "provided more") {
175             onNext(1)
176             onNext(2)
177             onComplete()
178         }
179 
180         // Rule 1.7 broken: the publisher calls a method on a subscriber after reaching the terminal state.
181         assertDetectsBadPublisher<Int>({ awaitSingle() }, "terminal state") {
182             onNext(1)
183             onError(dummyThrowable)
184             onComplete()
185         }
186         assertDetectsBadPublisher<Int>({ awaitFirst() }, "terminal state") {
187             onNext(0)
188             onComplete()
189             onComplete()
190         }
191         assertDetectsBadPublisher<Int>({ awaitFirstOrDefault(1) }, "terminal state") {
192             onComplete()
193             onNext(3)
194         }
195         assertDetectsBadPublisher<Int>({ awaitSingle() }, "terminal state") {
196             onError(dummyThrowable)
197             onNext(3)
198         }
199 
200         // Rule 1.9 broken (the first signal to the subscriber was not 'onSubscribe')
201         assertCallsExceptionHandlerWith<IllegalStateException> {
202             try {
203                 Publisher<Int> { subscriber ->
204                     subscriber.onNext(3)
205                     subscriber.onComplete()
206                 }.awaitFirst()
207             } catch (e: NoSuchElementException) {
208                 // intentionally blank
209             }
210         }.let { assertTrue(it.message?.contains("onSubscribe") ?: false) }
211     }
212 
213     @Test
214     fun testPublishWithTimeout() = runTest {
215         val publisher = publish<Int> {
216             expect(2)
217             withTimeout(1) { delay(100) }
218         }
219         try {
220             expect(1)
221             publisher.awaitFirstOrNull()
222         } catch (e: CancellationException) {
223             expect(3)
224         }
225         finish(4)
226     }
227 
228 }
229 
230 @OptIn(ExperimentalContracts::class)
assertCallsExceptionHandlerWithnull231 internal suspend inline fun <reified E: Throwable> assertCallsExceptionHandlerWith(
232     crossinline operation: suspend (CoroutineExceptionHandler) -> Unit): E {
233     contract {
234         callsInPlace(operation, InvocationKind.EXACTLY_ONCE)
235     }
236     val handler = CapturingHandler()
237     return withContext(handler) {
238         operation(handler)
239         handler.getException().let {
240             assertTrue(it is E, it.toString())
241             it
242         }
243     }
244 }
245