<lambda>null1 package kotlinx.coroutines.reactive
2
3 import kotlinx.coroutines.testing.*
4 import kotlinx.coroutines.*
5 import kotlinx.coroutines.testing.exceptions.*
6 import org.junit.Test
7 import org.junit.runner.*
8 import org.junit.runners.*
9 import org.reactivestreams.*
10 import java.lang.IllegalStateException
11 import java.lang.RuntimeException
12 import kotlin.coroutines.*
13 import kotlin.test.*
14
15 @RunWith(Parameterized::class)
16 class IntegrationTest(
17 private val ctx: Ctx,
18 private val delay: Boolean
19 ) : TestBase() {
20
21 enum class Ctx {
22 MAIN { override fun invoke(context: CoroutineContext): CoroutineContext = context.minusKey(Job) },
23 DEFAULT { override fun invoke(context: CoroutineContext): CoroutineContext = Dispatchers.Default },
24 UNCONFINED { override fun invoke(context: CoroutineContext): CoroutineContext = Dispatchers.Unconfined };
25
26 abstract operator fun invoke(context: CoroutineContext): CoroutineContext
27 }
28
29 companion object {
30 @Parameterized.Parameters(name = "ctx={0}, delay={1}")
31 @JvmStatic
32 fun params(): Collection<Array<Any>> = Ctx.values().flatMap { ctx ->
33 listOf(false, true).map { delay ->
34 arrayOf(ctx, delay)
35 }
36 }
37 }
38
39 @Test
40 fun testEmpty(): Unit = runBlocking {
41 val pub = publish<String>(ctx(coroutineContext)) {
42 if (delay) delay(1)
43 // does not send anything
44 }
45 assertFailsWith<NoSuchElementException> { pub.awaitFirst() }
46 assertEquals("OK", pub.awaitFirstOrDefault("OK"))
47 assertNull(pub.awaitFirstOrNull())
48 assertEquals("ELSE", pub.awaitFirstOrElse { "ELSE" })
49 assertFailsWith<NoSuchElementException> { pub.awaitLast() }
50 assertFailsWith<NoSuchElementException> { pub.awaitSingle() }
51 var cnt = 0
52 pub.collect { cnt++ }
53 assertEquals(0, cnt)
54 }
55
56 @Test
57 fun testSingle() = runBlocking {
58 val pub = publish(ctx(coroutineContext)) {
59 if (delay) delay(1)
60 send("OK")
61 }
62 assertEquals("OK", pub.awaitFirst())
63 assertEquals("OK", pub.awaitFirstOrDefault("!"))
64 assertEquals("OK", pub.awaitFirstOrNull())
65 assertEquals("OK", pub.awaitFirstOrElse { "ELSE" })
66 assertEquals("OK", pub.awaitLast())
67 assertEquals("OK", pub.awaitSingle())
68 var cnt = 0
69 pub.collect {
70 assertEquals("OK", it)
71 cnt++
72 }
73 assertEquals(1, cnt)
74 }
75
76 @Test
77 fun testCancelWithoutValue() = runTest {
78 val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) {
79 publish<String> {
80 hang {}
81 }.awaitFirst()
82 }
83
84 job.cancel()
85 job.join()
86 }
87
88 @Test
89 fun testEmptySingle() = runTest(unhandled = listOf { e -> e is NoSuchElementException }) {
90 expect(1)
91 val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) {
92 publish<String> {
93 yield()
94 expect(2)
95 // Nothing to emit
96 }.awaitFirst()
97 }
98
99 job.join()
100 finish(3)
101 }
102
103 /**
104 * Test that the continuation is not being resumed after it has already failed due to there having been too many
105 * values passed.
106 */
107 @Test
108 fun testNotCompletingFailedAwait() = runTest {
109 try {
110 expect(1)
111 Publisher<Int> { sub ->
112 sub.onSubscribe(object: Subscription {
113 override fun request(n: Long) {
114 expect(2)
115 sub.onNext(1)
116 sub.onNext(2)
117 expect(4)
118 sub.onComplete()
119 }
120
121 override fun cancel() {
122 expect(3)
123 }
124 })
125 }.awaitSingle()
126 } catch (e: java.lang.IllegalArgumentException) {
127 expect(5)
128 }
129 finish(6)
130 }
131
132 /**
133 * Test the behavior of [awaitOne] on unconforming publishers.
134 */
135 @Test
136 fun testAwaitOnNonconformingPublishers() = runTest {
137 fun <T> publisher(block: Subscriber<in T>.(n: Long) -> Unit) =
138 Publisher<T> { subscriber ->
139 subscriber.onSubscribe(object: Subscription {
140 override fun request(n: Long) {
141 subscriber.block(n)
142 }
143
144 override fun cancel() {
145 }
146 })
147 }
148 val dummyMessage = "dummy"
149 val dummyThrowable = RuntimeException(dummyMessage)
150 suspend fun <T> assertDetectsBadPublisher(
151 operation: suspend Publisher<T>.() -> T,
152 message: String,
153 block: Subscriber<in T>.(n: Long) -> Unit,
154 ) {
155 assertCallsExceptionHandlerWith<IllegalStateException> {
156 try {
157 publisher(block).operation()
158 } catch (e: Throwable) {
159 if (e.message != dummyMessage)
160 throw e
161 }
162 }.let {
163 assertTrue("Expected the message to contain '$message', got '${it.message}'") {
164 it.message?.contains(message) ?: false
165 }
166 }
167 }
168
169 // Rule 1.1 broken: the publisher produces more values than requested.
170 assertDetectsBadPublisher<Int>({ awaitFirst() }, "provided more") {
171 onNext(1)
172 onNext(2)
173 onComplete()
174 }
175
176 // Rule 1.7 broken: the publisher calls a method on a subscriber after reaching the terminal state.
177 assertDetectsBadPublisher<Int>({ awaitSingle() }, "terminal state") {
178 onNext(1)
179 onError(dummyThrowable)
180 onComplete()
181 }
182 assertDetectsBadPublisher<Int>({ awaitFirst() }, "terminal state") {
183 onNext(0)
184 onComplete()
185 onComplete()
186 }
187 assertDetectsBadPublisher<Int>({ awaitFirstOrDefault(1) }, "terminal state") {
188 onComplete()
189 onNext(3)
190 }
191 assertDetectsBadPublisher<Int>({ awaitSingle() }, "terminal state") {
192 onError(dummyThrowable)
193 onNext(3)
194 }
195
196 // Rule 1.9 broken (the first signal to the subscriber was not 'onSubscribe')
197 assertCallsExceptionHandlerWith<IllegalStateException> {
198 try {
199 Publisher<Int> { subscriber ->
200 subscriber.onNext(3)
201 subscriber.onComplete()
202 }.awaitFirst()
203 } catch (e: NoSuchElementException) {
204 // intentionally blank
205 }
206 }.let { assertTrue(it.message?.contains("onSubscribe") ?: false) }
207 }
208
209 @Test
210 fun testPublishWithTimeout() = runTest {
211 val publisher = publish<Int> {
212 expect(2)
213 withTimeout(1) { delay(100) }
214 }
215 try {
216 expect(1)
217 publisher.awaitFirstOrNull()
218 } catch (e: CancellationException) {
219 expect(3)
220 }
221 finish(4)
222 }
223
224 }
225
226