/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.reactive

import kotlinx.coroutines.*
import org.junit.Test
import org.reactivestreams.*
import kotlin.test.*

/**
 * Tests for [collect] on a reactive-streams [Publisher].
 *
 * Every test builds a hand-written publisher that emits the integers `1..x` strictly on demand
 * (never more than the total amount requested so far) and then differs only in how the stream ends:
 * normal completion, an error raised by the publisher, or an exception thrown by the collecting action.
 */
class PublisherCollectTest: TestBase() {

    /** Tests the simple scenario where the publisher outputs a bounded stream of values to collect. */
    @Test
    fun testCollect() = runTest {
        val x = 100
        val xSum = x * (x + 1) / 2
        val publisher = Publisher<Int> { subscriber ->
            var requested = 0L
            var lastOutput = 0
            subscriber.onSubscribe(object: Subscription {

                override fun request(n: Long) {
                    // Validate before touching the demand counter so that an invalid request
                    // cannot corrupt `requested`.
                    if (n <= 0) {
                        subscriber.onError(IllegalArgumentException())
                        return
                    }
                    requested += n
                    // Emit only as many elements as have been requested in total, up to `x`.
                    while (lastOutput < x && lastOutput < requested) {
                        lastOutput += 1
                        subscriber.onNext(lastOutput)
                    }
                    if (lastOutput == x)
                        subscriber.onComplete()
                }

                override fun cancel() {
                    /** According to rule 3.5 of the
                     * [reactive spec](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#3.5),
                     * this method can be called by the subscriber at any point, so it's not an error if it's called
                     * in this scenario. */
                }

            })
        }
        var sum = 0
        publisher.collect {
            sum += it
        }
        assertEquals(xSum, sum)
    }

    /**
     * Tests the behavior of [collect] when the publisher raises an error.
     *
     * The publisher emits all `x` elements and then signals an [IllegalArgumentException] instead of completing.
     * The test requires that [collect] rethrows that exception (it fails if [collect] returns normally) and that
     * every element emitted before the error reached the action.
     */
    @Test
    fun testCollectThrowingPublisher() = runTest {
        val errorString = "Too many elements requested"
        val x = 100
        val xSum = x * (x + 1) / 2
        val publisher = Publisher<Int> { subscriber ->
            var requested = 0L
            var lastOutput = 0
            subscriber.onSubscribe(object: Subscription {

                override fun request(n: Long) {
                    // Validate before touching the demand counter so that an invalid request
                    // cannot corrupt `requested`.
                    if (n <= 0) {
                        subscriber.onError(IllegalArgumentException())
                        return
                    }
                    requested += n
                    while (lastOutput < x && lastOutput < requested) {
                        lastOutput += 1
                        subscriber.onNext(lastOutput)
                    }
                    // Terminate with an error (rather than onComplete) once everything was emitted.
                    if (lastOutput == x)
                        subscriber.onError(IllegalArgumentException(errorString))
                }

                override fun cancel() {
                    /** According to rule 3.5 of the
                     * [reactive spec](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#3.5),
                     * this method can be called by the subscriber at any point, so it's not an error if it's called
                     * in this scenario. */
                }

            })
        }
        var sum = 0
        // `assertFailsWith` makes the test fail when no exception is thrown at all; a plain try/catch
        // would silently pass in that case.
        val e = assertFailsWith<IllegalArgumentException> {
            publisher.collect {
                sum += it
            }
        }
        // The message distinguishes the expected terminal error from the message-less exception
        // that the publisher raises for a non-positive request.
        assertEquals(errorString, e.message)
        assertEquals(xSum, sum)
    }

    /**
     * Tests the behavior of [collect] when the action throws.
     *
     * The publisher never terminates on its own; the action throws once it has seen all `x` elements.
     * The `expect`/`finish` calls pin the order: the action runs for each element (steps `2..x + 1`),
     * then the subscription is cancelled (step `x + 2`), and only then does the exception reach
     * the caller of [collect] (step `x + 3`).
     */
    @Test
    fun testCollectThrowingAction() = runTest {
        val errorString = "Too many elements produced"
        val x = 100
        val xSum = x * (x + 1) / 2
        val publisher = Publisher<Int> { subscriber ->
            var requested = 0L
            var lastOutput = 0
            subscriber.onSubscribe(object: Subscription {

                override fun request(n: Long) {
                    // Validate before touching the demand counter so that an invalid request
                    // cannot corrupt `requested`.
                    if (n <= 0) {
                        subscriber.onError(IllegalArgumentException())
                        return
                    }
                    requested += n
                    // No terminal signal here: the stream is ended by the failing action instead.
                    while (lastOutput < x && lastOutput < requested) {
                        lastOutput += 1
                        subscriber.onNext(lastOutput)
                    }
                }

                override fun cancel() {
                    // Cancellation must happen only after the last element was delivered.
                    assertEquals(x, lastOutput)
                    expect(x + 2)
                }

            })
        }
        var sum = 0
        try {
            expect(1)
            var i = 1
            publisher.collect {
                sum += it
                i += 1
                expect(i)
                // The running sum reaches `xSum` exactly on the `x`-th element.
                if (sum >= xSum) {
                    throw IllegalArgumentException(errorString)
                }
            }
        } catch (e: IllegalArgumentException) {
            expect(x + 3)
            assertEquals(errorString, e.message)
        }
        finish(x + 4)
    }
}