• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.jdk9
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.exceptions.*
9 import org.junit.Test
10 import kotlinx.coroutines.flow.flowOn
11 import org.junit.runner.*
12 import org.junit.runners.*
13 import kotlin.contracts.*
14 import java.util.concurrent.Flow as JFlow
15 import kotlin.coroutines.*
16 import kotlin.test.*
17 
/**
 * Integration tests for publishers created with `flowPublish` and for the `await*` / `collect`
 * operations applied to them.
 *
 * The suite is parameterized (see [params]): each test runs once for every combination of a [Ctx]
 * entry, which selects the coroutine context passed to `flowPublish`, and a [delay] flag, which
 * makes the publisher bodies in [testEmpty], [testSingle] and [testNumbers] call `delay(1)`.
 */
@RunWith(Parameterized::class)
class IntegrationTest(
    private val ctx: Ctx,
    private val delay: Boolean
) : TestBase() {

    /** Ways of choosing the publisher's context from the context of the calling coroutine. */
    enum class Ctx {
        /** The supplied context with its [Job] element removed. */
        MAIN,

        /** Always [Dispatchers.Default]; the supplied context is ignored. */
        DEFAULT,

        /** Always [Dispatchers.Unconfined]; the supplied context is ignored. */
        UNCONFINED;

        /** Returns the context to publish in, given the [context] of the caller. */
        operator fun invoke(context: CoroutineContext): CoroutineContext = when (this) {
            MAIN -> context.minusKey(Job)
            DEFAULT -> Dispatchers.Default
            UNCONFINED -> Dispatchers.Unconfined
        }
    }

    companion object {
        /**
         * Builds the parameter sets: for every [Ctx] entry, first the pair with `delay = false`,
         * then the pair with `delay = true`.
         */
        @Parameterized.Parameters(name = "ctx={0}, delay={1}")
        @JvmStatic
        fun params(): Collection<Array<Any>> {
            val combinations = mutableListOf<Array<Any>>()
            for (context in Ctx.values()) {
                for (withDelay in listOf(false, true)) {
                    combinations.add(arrayOf<Any>(context, withDelay))
                }
            }
            return combinations
        }
    }

    /**
     * A publisher that sends nothing: the strict `await*` calls are expected to fail with
     * [NoSuchElementException], the lenient ones to return their fallback, and `collect` to see
     * no elements.
     */
    @Test
    fun testEmpty(): Unit = runBlocking {
        val publisher = flowPublish<String>(ctx(coroutineContext)) {
            // intentionally sends no elements
            if (delay) delay(1)
        }
        assertFailsWith<NoSuchElementException> { publisher.awaitFirst() }
        assertEquals("OK", publisher.awaitFirstOrDefault("OK"))
        assertNull(publisher.awaitFirstOrNull())
        assertEquals("ELSE", publisher.awaitFirstOrElse { "ELSE" })
        assertFailsWith<NoSuchElementException> { publisher.awaitLast() }
        assertFailsWith<NoSuchElementException> { publisher.awaitSingle() }
        var collected = 0
        publisher.collect { collected += 1 }
        assertEquals(0, collected)
    }

    /**
     * A publisher that sends exactly one element, "OK": every `await*` variant is expected to
     * return it and `collect` to observe it exactly once.
     */
    @Test
    fun testSingle(): Unit = runBlocking {
        val publisher = flowPublish(ctx(coroutineContext)) {
            if (delay) delay(1)
            send("OK")
        }
        assertEquals("OK", publisher.awaitFirst())
        assertEquals("OK", publisher.awaitFirstOrDefault("!"))
        assertEquals("OK", publisher.awaitFirstOrNull())
        assertEquals("OK", publisher.awaitFirstOrElse { "ELSE" })
        assertEquals("OK", publisher.awaitLast())
        assertEquals("OK", publisher.awaitSingle())
        var collected = 0
        publisher.collect { element ->
            assertEquals("OK", element)
            collected += 1
        }
        assertEquals(1, collected)
    }

    /**
     * A publisher that sends `1..n`: checks the first/last accessors, expects `awaitSingle` to
     * fail with [IllegalArgumentException], and verifies the full sequence both on the publisher
     * itself and after converting it to a flow, applying `flowOn`, and converting back.
     */
    @Test
    fun testNumbers(): Unit = runBlocking {
        val n = 100 * stressTestMultiplier
        val publisher = flowPublish(ctx(coroutineContext)) {
            for (number in 1..n) {
                send(number)
                if (delay) delay(1)
            }
        }
        assertEquals(1, publisher.awaitFirst())
        assertEquals(1, publisher.awaitFirstOrDefault(0))
        assertEquals(n, publisher.awaitLast())
        assertEquals(1, publisher.awaitFirstOrNull())
        assertEquals(1, publisher.awaitFirstOrElse { 0 })
        assertFailsWith<IllegalArgumentException> { publisher.awaitSingle() }
        checkNumbers(n, publisher)
        val republished = publisher.asFlow().flowOn(ctx(coroutineContext)).asPublisher()
        checkNumbers(n, republished)
    }

    /**
     * Starts, undispatched and under a separate [Job], a coroutine that awaits the first element
     * of a publisher whose body only calls `hang {}` (presumably never producing a value), then
     * cancels that coroutine and waits for it to finish.
     */
    @Test
    fun testCancelWithoutValue() = runTest {
        val awaiting = launch(Job(), start = CoroutineStart.UNDISPATCHED) {
            val publisher = flowPublish<String> {
                hang {}
            }
            publisher.awaitFirst()
        }

        awaiting.cancelAndJoin()
    }

    /**
     * Awaits the first element of a publisher that completes without emitting, inside a coroutine
     * launched under a separate [Job]. The test is run with an `unhandled` predicate matching
     * [NoSuchElementException]; `expect`/`finish` mark the steps 1, 2, 3 in that order.
     */
    @Test
    fun testEmptySingle() = runTest(unhandled = listOf { failure -> failure is NoSuchElementException }) {
        expect(1)
        val awaiting = launch(Job(), start = CoroutineStart.UNDISPATCHED) {
            val publisher = flowPublish<String> {
                yield()
                expect(2)
                // completes here without emitting anything
            }
            publisher.awaitFirst()
        }

        awaiting.join()
        finish(3)
    }

    /** Collects [pub] and asserts that it delivers exactly the values `1, 2, ..., n` in order. */
    private suspend fun checkNumbers(n: Int, pub: JFlow.Publisher<Int>) {
        var previous = 0
        pub.collect { value ->
            previous += 1
            assertEquals(previous, value)
        }
        assertEquals(n, previous)
    }

}
135 
/**
 * Runs [operation] inside a context that contains a freshly created [CapturingHandler], passing
 * that same handler to [operation], and then returns the exception obtained from the handler.
 *
 * Fails the assertion (with the exception's `toString()` as the message) if the captured
 * exception is not an instance of [E].
 */
@OptIn(ExperimentalContracts::class)
internal suspend inline fun <reified E: Throwable> assertCallsExceptionHandlerWith(
    crossinline operation: suspend (CoroutineExceptionHandler) -> Unit
): E {
    contract {
        callsInPlace(operation, InvocationKind.EXACTLY_ONCE)
    }
    val capturing = CapturingHandler()
    return withContext(capturing) {
        operation(capturing)
        val captured = capturing.getException()
        // assertTrue's contract smart-casts `captured` to E for the return below.
        assertTrue(captured is E, captured.toString())
        captured
    }
}
151