• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 
5 package kotlinx.coroutines.jdk9
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.channels.*
9 import kotlinx.coroutines.flow.*
10 import kotlin.test.*
11 
/**
 * Tests for consuming a publisher created with `flowPublish` through `asFlow()`:
 * cancellation, the various `buffer(...)` capacities, `conflate()` and `produceIn(...)`.
 *
 * The `expect(n)` / `finish(n)` helpers are inherited from [TestBase]; they appear to
 * assert the order in which numbered steps are reached, so the numeric literals in
 * these tests encode the expected interleaving of producer and collector.
 */
class PublisherAsFlowTest : TestBase() {
    /**
     * A collector that throws on the first element must see exactly one element and one
     * error, and the publisher's job must complete with a [CancellationException].
     */
    @Test
    fun testCancellation() = runTest {
        var deliveredCount = 0
        var cancelledCount = 0
        var caughtCount = 0

        val source = flowPublish(currentDispatcher()) {
            // Record whether the producing coroutine finished because of cancellation.
            coroutineContext[Job]?.invokeOnCompletion { cause ->
                if (cause is CancellationException) cancelledCount++
            }

            for (value in 0 until 100) {
                send(value)
            }
        }

        val collector = source.asFlow().launchIn(CoroutineScope(Dispatchers.Unconfined)) {
            onEach {
                deliveredCount++
                throw RuntimeException()
            }
            catch<Throwable> {
                caughtCount++
            }
        }
        collector.join()

        assertEquals(1, deliveredCount)
        assertEquals(1, caughtCount)
        assertEquals(1, cancelledCount)
    }

    /**
     * With `buffer(1)` the producer steps (1, 2, 4, 6) and the collector steps
     * (3, 5, 7 - the emitted values themselves) must interleave in numeric order.
     */
    @Test
    fun testBufferSize1() = runTest {
        val source = flowPublish(currentDispatcher()) {
            expect(1)
            send(3)

            expect(2)
            send(5)

            expect(4)
            send(7)
            expect(6)
        }

        // Each emitted value doubles as the step index the collector is expected to hit.
        source.asFlow().buffer(1).collect { step ->
            expect(step)
        }

        finish(8)
    }

    /**
     * Without an explicit `buffer(...)`, all 64 producer steps are expected before any of
     * the collector's steps (65..128), and a further `trySend` must not succeed.
     */
    @Test
    fun testBufferSizeDefault() = runTest {
        val source = flowPublish(currentDispatcher()) {
            for (step in 1..64) {
                send(step)
                expect(step)
            }
            assertFalse(trySend(-1).isSuccess)
        }

        source.asFlow().collect { value ->
            expect(64 + value)
        }

        finish(129)
    }

    /**
     * Same interleaving as [testBufferSize1], but with `flowOn(...)` applied before
     * `buffer(1)`: the explicit capacity must still take effect.
     */
    @Test
    fun testDefaultCapacityIsProperlyOverwritten() = runTest {
        val source = flowPublish(currentDispatcher()) {
            expect(1)
            send(3)
            expect(2)
            send(5)
            expect(4)
            send(7)
            expect(6)
        }

        val collected = source.asFlow().flowOn(wrapperDispatcher()).buffer(1)
        collected.collect { step ->
            expect(step)
        }

        finish(8)
    }

    /**
     * With `buffer(10)` all producer steps (1..4) are expected before the collector
     * steps (5..7 - the emitted values themselves).
     */
    @Test
    fun testBufferSize10() = runTest {
        val source = flowPublish(currentDispatcher()) {
            expect(1)
            send(5)

            expect(2)
            send(6)

            expect(3)
            send(7)
            expect(4)
        }

        source.asFlow().buffer(10).collect { step ->
            expect(step)
        }

        finish(8)
    }

    /** A conflated collection of 1..5 is expected to yield only the first and last value. */
    @Test
    fun testConflated() = runTest {
        val source = flowPublish(currentDispatcher()) {
            (1..5).forEach { send(it) }
        }
        assertEquals(listOf(1, 5), source.asFlow().conflate().toList())
    }

    /**
     * `produceIn` must deliver every element for default, fixed and unlimited buffers,
     * and only the first and last element when conflated.
     */
    @Test
    fun testProduce() = runTest {
        val digits = flowPublish(currentDispatcher()) {
            for (i in 0 until 10) send(i)
        }.asFlow()
        val everything = (0..9).toList()

        check(everything, digits.produceIn(this))
        check(everything, digits.buffer(2).produceIn(this))
        check(everything, digits.buffer(Channel.UNLIMITED).produceIn(this))
        check(listOf(0, 9), digits.conflate().produceIn(this))
    }

    /** Drains [channel] completely and asserts that it produced exactly [expected]. */
    private suspend fun check(expected: List<Int>, channel: ReceiveChannel<Int>) {
        val received = mutableListOf<Int>()
        channel.consumeEach { received += it }
        assertEquals(expected, received)
    }

    /**
     * A consumer failing with [TestException] on value 5 must cancel the publisher,
     * which is expected to observe the cancellation while sending value 7.
     */
    @Test
    fun testProduceCancellation() = runTest {
        expect(1)
        // publisher is an async coroutine, so it overproduces to the channel, but still gets cancelled
        val flow = flowPublish(currentDispatcher()) {
            expect(3)
            for (value in 0 until 10) {
                if (value in 0..6) {
                    send(value)
                } else if (value == 7) {
                    try {
                        send(value)
                    } catch (e: CancellationException) {
                        expect(5)
                        // Cancellation must keep propagating after being observed.
                        throw e
                    }
                } else {
                    expectUnreached()
                }
            }
        }.asFlow().buffer(1)

        assertFailsWith<TestException> {
            coroutineScope {
                expect(2)
                flow.produceIn(this).consumeEach { value ->
                    if (value == 5) {
                        expect(4)
                        throw TestException()
                    }
                    if (value !in 0..4) expectUnreached()
                }
            }
        }
        finish(6)
    }
}
185