1 /* <lambda>null2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines.jdk9 6 7 import kotlinx.coroutines.* 8 import kotlinx.coroutines.reactive.* 9 import org.junit.Test 10 import org.reactivestreams.* 11 import kotlin.test.* 12 import java.util.concurrent.Flow as JFlow 13 14 class PublisherCollectTest: TestBase() { 15 16 /** Tests the simple scenario where the publisher outputs a bounded stream of values to collect. */ 17 @Test 18 fun testCollect() = runTest { 19 val x = 100 20 val xSum = x * (x + 1) / 2 21 val publisher = JFlow.Publisher<Int> { subscriber -> 22 var requested = 0L 23 var lastOutput = 0 24 subscriber.onSubscribe(object: JFlow.Subscription { 25 26 override fun request(n: Long) { 27 requested += n 28 if (n <= 0) { 29 subscriber.onError(IllegalArgumentException()) 30 return 31 } 32 while (lastOutput < x && lastOutput < requested) { 33 lastOutput += 1 34 subscriber.onNext(lastOutput) 35 } 36 if (lastOutput == x) 37 subscriber.onComplete() 38 } 39 40 override fun cancel() { 41 /** According to rule 3.5 of the 42 * [reactive spec](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#3.5), 43 * this method can be called by the subscriber at any point, so it's not an error if it's called 44 * in this scenario. */ 45 } 46 47 }) 48 } 49 var sum = 0 50 publisher.collect { 51 sum += it 52 } 53 assertEquals(xSum, sum) 54 } 55 56 /** Tests the behavior of [collect] when the publisher raises an error. */ 57 @Test 58 fun testCollectThrowingPublisher() = runTest { 59 val errorString = "Too many elements requested" 60 val x = 100 61 val xSum = x * (x + 1) / 2 62 val publisher = Publisher<Int> { subscriber -> 63 var requested = 0L 64 var lastOutput = 0 65 subscriber.onSubscribe(object: Subscription { 66 67 override fun request(n: Long) { 68 requested += n 69 if (n <= 0) { 70 subscriber.onError(IllegalArgumentException()) 71 return 72 } 73 while (lastOutput < x && lastOutput < requested) { 74 lastOutput += 1 75 subscriber.onNext(lastOutput) 76 } 77 if (lastOutput == x) 78 subscriber.onError(IllegalArgumentException(errorString)) 79 } 80 81 override fun cancel() { 82 /** See the comment for the corresponding part of [testCollect]. */ 83 } 84 85 }) 86 } 87 var sum = 0 88 try { 89 publisher.collect { 90 sum += it 91 } 92 } catch (e: IllegalArgumentException) { 93 assertEquals(errorString, e.message) 94 } 95 assertEquals(xSum, sum) 96 } 97 98 /** Tests the behavior of [collect] when the action throws. */ 99 @Test 100 fun testCollectThrowingAction() = runTest { 101 val errorString = "Too many elements produced" 102 val x = 100 103 val xSum = x * (x + 1) / 2 104 val publisher = Publisher<Int> { subscriber -> 105 var requested = 0L 106 var lastOutput = 0 107 subscriber.onSubscribe(object: Subscription { 108 109 override fun request(n: Long) { 110 requested += n 111 if (n <= 0) { 112 subscriber.onError(IllegalArgumentException()) 113 return 114 } 115 while (lastOutput < x && lastOutput < requested) { 116 lastOutput += 1 117 subscriber.onNext(lastOutput) 118 } 119 } 120 121 override fun cancel() { 122 assertEquals(x, lastOutput) 123 expect(x + 2) 124 } 125 126 }) 127 } 128 var sum = 0 129 try { 130 expect(1) 131 var i = 1 132 publisher.collect { 133 sum += it 134 i += 1 135 expect(i) 136 if (sum >= xSum) { 137 throw IllegalArgumentException(errorString) 138 } 139 } 140 } catch (e: IllegalArgumentException) { 141 expect(x + 3) 142 assertEquals(errorString, e.message) 143 } 144 finish(x + 4) 145 } 146 }