• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.reactive
6 
7 import kotlinx.coroutines.*
8 import org.junit.Test
9 import org.reactivestreams.*
10 import kotlin.test.*
11 
12 class PublisherCollectTest: TestBase() {
13 
14     /** Tests the simple scenario where the publisher outputs a bounded stream of values to collect. */
15     @Test
16     fun testCollect() = runTest {
17         val x = 100
18         val xSum = x * (x + 1) / 2
19         val publisher = Publisher<Int> { subscriber ->
20             var requested = 0L
21             var lastOutput = 0
22             subscriber.onSubscribe(object: Subscription {
23 
24                 override fun request(n: Long) {
25                     requested += n
26                     if (n <= 0) {
27                         subscriber.onError(IllegalArgumentException())
28                         return
29                     }
30                     while (lastOutput < x && lastOutput < requested) {
31                         lastOutput += 1
32                         subscriber.onNext(lastOutput)
33                     }
34                     if (lastOutput == x)
35                         subscriber.onComplete()
36                 }
37 
38                 override fun cancel() {
39                     /** According to rule 3.5 of the
40                      * [reactive spec](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#3.5),
41                      * this method can be called by the subscriber at any point, so it's not an error if it's called
42                      * in this scenario. */
43                 }
44 
45             })
46         }
47         var sum = 0
48         publisher.collect {
49             sum += it
50         }
51         assertEquals(xSum, sum)
52     }
53 
54     /** Tests the behavior of [collect] when the publisher raises an error. */
55     @Test
56     fun testCollectThrowingPublisher() = runTest {
57         val errorString = "Too many elements requested"
58         val x = 100
59         val xSum = x * (x + 1) / 2
60         val publisher = Publisher<Int> { subscriber ->
61             var requested = 0L
62             var lastOutput = 0
63             subscriber.onSubscribe(object: Subscription {
64 
65                 override fun request(n: Long) {
66                     requested += n
67                     if (n <= 0) {
68                         subscriber.onError(IllegalArgumentException())
69                         return
70                     }
71                     while (lastOutput < x && lastOutput < requested) {
72                         lastOutput += 1
73                         subscriber.onNext(lastOutput)
74                     }
75                     if (lastOutput == x)
76                         subscriber.onError(IllegalArgumentException(errorString))
77                 }
78 
79                 override fun cancel() {
80                     /** See the comment for the corresponding part of [testCollect]. */
81                 }
82 
83             })
84         }
85         var sum = 0
86         try {
87             publisher.collect {
88                 sum += it
89             }
90         } catch (e: IllegalArgumentException) {
91             assertEquals(errorString, e.message)
92         }
93         assertEquals(xSum, sum)
94     }
95 
96     /** Tests the behavior of [collect] when the action throws. */
97     @Test
98     fun testCollectThrowingAction() = runTest {
99         val errorString = "Too many elements produced"
100         val x = 100
101         val xSum = x * (x + 1) / 2
102         val publisher = Publisher<Int> { subscriber ->
103             var requested = 0L
104             var lastOutput = 0
105             subscriber.onSubscribe(object: Subscription {
106 
107                 override fun request(n: Long) {
108                     requested += n
109                     if (n <= 0) {
110                         subscriber.onError(IllegalArgumentException())
111                         return
112                     }
113                     while (lastOutput < x && lastOutput < requested) {
114                         lastOutput += 1
115                         subscriber.onNext(lastOutput)
116                     }
117                 }
118 
119                 override fun cancel() {
120                     assertEquals(x, lastOutput)
121                     expect(x + 2)
122                 }
123 
124             })
125         }
126         var sum = 0
127         try {
128             expect(1)
129             var i = 1
130             publisher.collect {
131                 sum += it
132                 i += 1
133                 expect(i)
134                 if (sum >= xSum) {
135                     throw IllegalArgumentException(errorString)
136                 }
137             }
138         } catch (e: IllegalArgumentException) {
139             expect(x + 3)
140             assertEquals(errorString, e.message)
141         }
142         finish(x + 4)
143     }
144 }