• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 package kotlinx.coroutines.reactive
2 
3 import kotlinx.coroutines.testing.*
4 import kotlinx.coroutines.*
5 import kotlinx.coroutines.testing.exceptions.*
6 import org.junit.Test
7 import org.junit.runner.*
8 import org.junit.runners.*
9 import org.reactivestreams.*
10 import java.lang.IllegalStateException
11 import java.lang.RuntimeException
12 import kotlin.coroutines.*
13 import kotlin.test.*
14 
15 @RunWith(Parameterized::class)
16 class IntegrationTest(
17     private val ctx: Ctx,
18     private val delay: Boolean
19 ) : TestBase() {
20 
21     enum class Ctx {
22         MAIN        { override fun invoke(context: CoroutineContext): CoroutineContext = context.minusKey(Job) },
23         DEFAULT     { override fun invoke(context: CoroutineContext): CoroutineContext = Dispatchers.Default },
24         UNCONFINED  { override fun invoke(context: CoroutineContext): CoroutineContext = Dispatchers.Unconfined };
25 
26         abstract operator fun invoke(context: CoroutineContext): CoroutineContext
27     }
28 
29     companion object {
30         @Parameterized.Parameters(name = "ctx={0}, delay={1}")
31         @JvmStatic
32         fun params(): Collection<Array<Any>> = Ctx.values().flatMap { ctx ->
33             listOf(false, true).map { delay ->
34                 arrayOf(ctx, delay)
35             }
36         }
37     }
38 
39     @Test
40     fun testEmpty(): Unit = runBlocking {
41         val pub = publish<String>(ctx(coroutineContext)) {
42             if (delay) delay(1)
43             // does not send anything
44         }
45         assertFailsWith<NoSuchElementException> { pub.awaitFirst() }
46         assertEquals("OK", pub.awaitFirstOrDefault("OK"))
47         assertNull(pub.awaitFirstOrNull())
48         assertEquals("ELSE", pub.awaitFirstOrElse { "ELSE" })
49         assertFailsWith<NoSuchElementException> { pub.awaitLast() }
50         assertFailsWith<NoSuchElementException> { pub.awaitSingle() }
51         var cnt = 0
52         pub.collect { cnt++ }
53         assertEquals(0, cnt)
54     }
55 
56     @Test
57     fun testSingle() = runBlocking {
58         val pub = publish(ctx(coroutineContext)) {
59             if (delay) delay(1)
60             send("OK")
61         }
62         assertEquals("OK", pub.awaitFirst())
63         assertEquals("OK", pub.awaitFirstOrDefault("!"))
64         assertEquals("OK", pub.awaitFirstOrNull())
65         assertEquals("OK", pub.awaitFirstOrElse { "ELSE" })
66         assertEquals("OK", pub.awaitLast())
67         assertEquals("OK", pub.awaitSingle())
68         var cnt = 0
69         pub.collect {
70             assertEquals("OK", it)
71             cnt++
72         }
73         assertEquals(1, cnt)
74     }
75 
76     @Test
77     fun testCancelWithoutValue() = runTest {
78         val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) {
79             publish<String> {
80                 hang {}
81             }.awaitFirst()
82         }
83 
84         job.cancel()
85         job.join()
86     }
87 
88     @Test
89     fun testEmptySingle() = runTest(unhandled = listOf { e -> e is NoSuchElementException }) {
90         expect(1)
91         val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) {
92             publish<String> {
93                 yield()
94                 expect(2)
95                 // Nothing to emit
96             }.awaitFirst()
97         }
98 
99         job.join()
100         finish(3)
101     }
102 
103     /**
104      * Test that the continuation is not being resumed after it has already failed due to there having been too many
105      * values passed.
106      */
107     @Test
108     fun testNotCompletingFailedAwait() = runTest {
109         try {
110             expect(1)
111             Publisher<Int> { sub ->
112                 sub.onSubscribe(object: Subscription {
113                     override fun request(n: Long) {
114                         expect(2)
115                         sub.onNext(1)
116                         sub.onNext(2)
117                         expect(4)
118                         sub.onComplete()
119                     }
120 
121                     override fun cancel() {
122                         expect(3)
123                     }
124                 })
125             }.awaitSingle()
126         } catch (e: java.lang.IllegalArgumentException) {
127             expect(5)
128         }
129         finish(6)
130     }
131 
132     /**
133      * Test the behavior of [awaitOne] on unconforming publishers.
134      */
135     @Test
136     fun testAwaitOnNonconformingPublishers() = runTest {
137         fun <T> publisher(block: Subscriber<in T>.(n: Long) -> Unit) =
138             Publisher<T> { subscriber ->
139                 subscriber.onSubscribe(object: Subscription {
140                     override fun request(n: Long) {
141                         subscriber.block(n)
142                     }
143 
144                     override fun cancel() {
145                     }
146                 })
147             }
148         val dummyMessage = "dummy"
149         val dummyThrowable = RuntimeException(dummyMessage)
150         suspend fun <T> assertDetectsBadPublisher(
151             operation: suspend Publisher<T>.() -> T,
152             message: String,
153             block: Subscriber<in T>.(n: Long) -> Unit,
154         ) {
155             assertCallsExceptionHandlerWith<IllegalStateException> {
156                 try {
157                     publisher(block).operation()
158                 } catch (e: Throwable) {
159                     if (e.message != dummyMessage)
160                         throw e
161                 }
162             }.let {
163                 assertTrue("Expected the message to contain '$message', got '${it.message}'") {
164                     it.message?.contains(message) ?: false
165                 }
166             }
167         }
168 
169         // Rule 1.1 broken: the publisher produces more values than requested.
170         assertDetectsBadPublisher<Int>({ awaitFirst() }, "provided more") {
171             onNext(1)
172             onNext(2)
173             onComplete()
174         }
175 
176         // Rule 1.7 broken: the publisher calls a method on a subscriber after reaching the terminal state.
177         assertDetectsBadPublisher<Int>({ awaitSingle() }, "terminal state") {
178             onNext(1)
179             onError(dummyThrowable)
180             onComplete()
181         }
182         assertDetectsBadPublisher<Int>({ awaitFirst() }, "terminal state") {
183             onNext(0)
184             onComplete()
185             onComplete()
186         }
187         assertDetectsBadPublisher<Int>({ awaitFirstOrDefault(1) }, "terminal state") {
188             onComplete()
189             onNext(3)
190         }
191         assertDetectsBadPublisher<Int>({ awaitSingle() }, "terminal state") {
192             onError(dummyThrowable)
193             onNext(3)
194         }
195 
196         // Rule 1.9 broken (the first signal to the subscriber was not 'onSubscribe')
197         assertCallsExceptionHandlerWith<IllegalStateException> {
198             try {
199                 Publisher<Int> { subscriber ->
200                     subscriber.onNext(3)
201                     subscriber.onComplete()
202                 }.awaitFirst()
203             } catch (e: NoSuchElementException) {
204                 // intentionally blank
205             }
206         }.let { assertTrue(it.message?.contains("onSubscribe") ?: false) }
207     }
208 
209     @Test
210     fun testPublishWithTimeout() = runTest {
211         val publisher = publish<Int> {
212             expect(2)
213             withTimeout(1) { delay(100) }
214         }
215         try {
216             expect(1)
217             publisher.awaitFirstOrNull()
218         } catch (e: CancellationException) {
219             expect(3)
220         }
221         finish(4)
222     }
223 
224 }
225 
226