1 /*
<lambda>null2 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 package kotlinx.coroutines.jdk9
6
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.exceptions.*
9 import org.junit.Test
10 import kotlinx.coroutines.flow.flowOn
11 import org.junit.runner.*
12 import org.junit.runners.*
13 import kotlin.contracts.*
14 import java.util.concurrent.Flow as JFlow
15 import kotlin.coroutines.*
16 import kotlin.test.*
17
18 @RunWith(Parameterized::class)
19 class IntegrationTest(
20 private val ctx: Ctx,
21 private val delay: Boolean
22 ) : TestBase() {
23
24 enum class Ctx {
25 MAIN { override fun invoke(context: CoroutineContext): CoroutineContext = context.minusKey(Job) },
26 DEFAULT { override fun invoke(context: CoroutineContext): CoroutineContext = Dispatchers.Default },
27 UNCONFINED { override fun invoke(context: CoroutineContext): CoroutineContext = Dispatchers.Unconfined };
28
29 abstract operator fun invoke(context: CoroutineContext): CoroutineContext
30 }
31
32 companion object {
33 @Parameterized.Parameters(name = "ctx={0}, delay={1}")
34 @JvmStatic
35 fun params(): Collection<Array<Any>> = Ctx.values().flatMap { ctx ->
36 listOf(false, true).map { delay ->
37 arrayOf(ctx, delay)
38 }
39 }
40 }
41
42 @Test
43 fun testEmpty(): Unit = runBlocking {
44 val pub = flowPublish<String>(ctx(coroutineContext)) {
45 if (delay) delay(1)
46 // does not send anything
47 }
48 assertFailsWith<NoSuchElementException> { pub.awaitFirst() }
49 assertEquals("OK", pub.awaitFirstOrDefault("OK"))
50 assertNull(pub.awaitFirstOrNull())
51 assertEquals("ELSE", pub.awaitFirstOrElse { "ELSE" })
52 assertFailsWith<NoSuchElementException> { pub.awaitLast() }
53 assertFailsWith<NoSuchElementException> { pub.awaitSingle() }
54 var cnt = 0
55 pub.collect { cnt++ }
56 assertEquals(0, cnt)
57 }
58
59 @Test
60 fun testSingle() = runBlocking {
61 val pub = flowPublish(ctx(coroutineContext)) {
62 if (delay) delay(1)
63 send("OK")
64 }
65 assertEquals("OK", pub.awaitFirst())
66 assertEquals("OK", pub.awaitFirstOrDefault("!"))
67 assertEquals("OK", pub.awaitFirstOrNull())
68 assertEquals("OK", pub.awaitFirstOrElse { "ELSE" })
69 assertEquals("OK", pub.awaitLast())
70 assertEquals("OK", pub.awaitSingle())
71 var cnt = 0
72 pub.collect {
73 assertEquals("OK", it)
74 cnt++
75 }
76 assertEquals(1, cnt)
77 }
78
79 @Test
80 fun testNumbers() = runBlocking {
81 val n = 100 * stressTestMultiplier
82 val pub = flowPublish(ctx(coroutineContext)) {
83 for (i in 1..n) {
84 send(i)
85 if (delay) delay(1)
86 }
87 }
88 assertEquals(1, pub.awaitFirst())
89 assertEquals(1, pub.awaitFirstOrDefault(0))
90 assertEquals(n, pub.awaitLast())
91 assertEquals(1, pub.awaitFirstOrNull())
92 assertEquals(1, pub.awaitFirstOrElse { 0 })
93 assertFailsWith<IllegalArgumentException> { pub.awaitSingle() }
94 checkNumbers(n, pub)
95 val flow = pub.asFlow()
96 checkNumbers(n, flow.flowOn(ctx(coroutineContext)).asPublisher())
97 }
98
99 @Test
100 fun testCancelWithoutValue() = runTest {
101 val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) {
102 flowPublish<String> {
103 hang {}
104 }.awaitFirst()
105 }
106
107 job.cancel()
108 job.join()
109 }
110
111 @Test
112 fun testEmptySingle() = runTest(unhandled = listOf { e -> e is NoSuchElementException}) {
113 expect(1)
114 val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) {
115 flowPublish<String> {
116 yield()
117 expect(2)
118 // Nothing to emit
119 }.awaitFirst()
120 }
121
122 job.join()
123 finish(3)
124 }
125
126 private suspend fun checkNumbers(n: Int, pub: JFlow.Publisher<Int>) {
127 var last = 0
128 pub.collect {
129 assertEquals(++last, it)
130 }
131 assertEquals(n, last)
132 }
133
134 }
135
136 @OptIn(ExperimentalContracts::class)
assertCallsExceptionHandlerWithnull137 internal suspend inline fun <reified E: Throwable> assertCallsExceptionHandlerWith(
138 crossinline operation: suspend (CoroutineExceptionHandler) -> Unit): E {
139 contract {
140 callsInPlace(operation, InvocationKind.EXACTLY_ONCE)
141 }
142 val handler = CapturingHandler()
143 return withContext(handler) {
144 operation(handler)
145 handler.getException().let {
146 assertTrue(it is E, it.toString())
147 it
148 }
149 }
150 }
151