• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 
5 package kotlinx.coroutines.jdk9
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.reactive.*
9 import org.junit.Test
10 import org.reactivestreams.*
11 import kotlin.test.*
12 import java.util.concurrent.Flow as JFlow
13 
/**
 * Tests for `collect` applied to a [JFlow.Publisher].
 *
 * Every test builds a hand-written publisher that emits the integers `1..x` in increasing order and never emits
 * more elements than the total demand signalled through `request`. The tests differ in how the stream ends:
 * normal completion, an error raised by the publisher, or an exception thrown by the collecting action.
 */
class PublisherCollectTest: TestBase() {

    /** Tests the simple scenario where the publisher outputs a bounded stream of values to collect. */
    @Test
    fun testCollect() = runTest {
        val x = 100
        val xSum = x * (x + 1) / 2
        val publisher = JFlow.Publisher<Int> { subscriber ->
            var requested = 0L // total demand signalled so far
            var lastOutput = 0 // the most recently emitted value
            subscriber.onSubscribe(object: JFlow.Subscription {

                override fun request(n: Long) {
                    requested += n
                    if (n <= 0) {
                        // Non-positive demand is reported to the subscriber as an error.
                        subscriber.onError(IllegalArgumentException())
                        return
                    }
                    // Emit the next values while there is both outstanding demand and something left to emit.
                    while (lastOutput < x && lastOutput < requested) {
                        lastOutput += 1
                        subscriber.onNext(lastOutput)
                    }
                    if (lastOutput == x)
                        subscriber.onComplete()
                }

                override fun cancel() {
                    /** According to rule 3.5 of the
                     * [reactive spec](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#3.5),
                     * this method can be called by the subscriber at any point, so it's not an error if it's called
                     * in this scenario. */
                }

            })
        }
        var sum = 0
        publisher.collect {
            sum += it
        }
        assertEquals(xSum, sum)
    }

    /** Tests the behavior of [collect] when the publisher raises an error. */
    @Test
    fun testCollectThrowingPublisher() = runTest {
        val errorString = "Too many elements requested"
        val x = 100
        val xSum = x * (x + 1) / 2
        val publisher = JFlow.Publisher<Int> { subscriber ->
            var requested = 0L // total demand signalled so far
            var lastOutput = 0 // the most recently emitted value
            subscriber.onSubscribe(object: JFlow.Subscription {

                override fun request(n: Long) {
                    requested += n
                    if (n <= 0) {
                        // Non-positive demand is reported to the subscriber as an error.
                        subscriber.onError(IllegalArgumentException())
                        return
                    }
                    while (lastOutput < x && lastOutput < requested) {
                        lastOutput += 1
                        subscriber.onNext(lastOutput)
                    }
                    // Once all `x` values have been emitted, the stream ends with an error instead of completing.
                    if (lastOutput == x)
                        subscriber.onError(IllegalArgumentException(errorString))
                }

                override fun cancel() {
                    /** According to rule 3.5 of the
                     * [reactive spec](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#3.5),
                     * this method can be called by the subscriber at any point, so it's not an error if it's called
                     * in this scenario. */
                }

            })
        }
        var sum = 0
        // `assertFailsWith` fails the test if `collect` returns normally, so a swallowed publisher error
        // can no longer go unnoticed.
        val e = assertFailsWith<IllegalArgumentException> {
            publisher.collect {
                sum += it
            }
        }
        assertEquals(errorString, e.message)
        // Every element emitted before the error must still have reached the action.
        assertEquals(xSum, sum)
    }

    /** Tests the behavior of [collect] when the action throws. */
    @Test
    fun testCollectThrowingAction() = runTest {
        val errorString = "Too many elements produced"
        val x = 100
        val xSum = x * (x + 1) / 2
        val publisher = JFlow.Publisher<Int> { subscriber ->
            var requested = 0L // total demand signalled so far
            var lastOutput = 0 // the most recently emitted value
            subscriber.onSubscribe(object: JFlow.Subscription {

                override fun request(n: Long) {
                    requested += n
                    if (n <= 0) {
                        // Non-positive demand is reported to the subscriber as an error.
                        subscriber.onError(IllegalArgumentException())
                        return
                    }
                    // This publisher never signals completion: the stream is ended by the throwing action.
                    while (lastOutput < x && lastOutput < requested) {
                        lastOutput += 1
                        subscriber.onNext(lastOutput)
                    }
                }

                override fun cancel() {
                    // Cancellation must arrive only after all `x` elements were emitted.
                    assertEquals(x, lastOutput)
                    // Checkpoint x + 2: after the last action invocation (x + 1), before the catch block (x + 3).
                    expect(x + 2)
                }

            })
        }
        var sum = 0
        try {
            expect(1)
            var i = 1
            publisher.collect {
                sum += it
                i += 1
                // Checkpoints 2..(x + 1): one per collected element.
                expect(i)
                if (sum >= xSum) {
                    throw IllegalArgumentException(errorString)
                }
            }
        } catch (e: IllegalArgumentException) {
            expect(x + 3)
            assertEquals(errorString, e.message)
        }
        // Reaching x + 4 requires that both the cancel and the catch checkpoints were hit.
        finish(x + 4)
    }
}