@file:Suppress("UNCHECKED_CAST")

package kotlinx.coroutines.reactive

import kotlinx.coroutines.flow.*
import org.junit.Ignore
import org.junit.Test
import org.reactivestreams.*
import org.reactivestreams.tck.*
import java.util.concurrent.*
import java.util.concurrent.ForkJoinPool.*
import kotlin.test.*

/**
 * Runs the Reactive Streams TCK [PublisherVerification] suite against a [Publisher] obtained by
 * converting an iterable to a flow and then to a publisher (`asIterable().asFlow().asPublisher()`),
 * plus two extra tests that exercise re-entrant and concurrent `request` calls.
 */
class IterableFlowTckTest : PublisherVerification<Long>(TestEnvironment()) {

    /**
     * Builds the array `[0, 1, ..., size - 1]`.
     *
     * @param num requested number of elements; values at or above [Int.MAX_VALUE] are capped
     * to 1,000,000 elements.
     */
    private fun generate(num: Long): Array<Long> {
        val size = if (num >= Int.MAX_VALUE) 1000000 else num.toInt()
        return Array(size) { it.toLong() }
    }

    /** Creates a publisher that emits the values produced by [generate] for [elements]. */
    override fun createPublisher(elements: Long): Publisher<Long> {
        return generate(elements).asIterable().asFlow().asPublisher()
    }

    /**
     * Creates a publisher whose underlying flow throws as soon as it is collected.
     *
     * The returned publisher wraps every subscriber so that one element is requested right after
     * `onSubscribe` has been forwarded; all other signals are delegated to the original subscriber.
     */
    @Suppress("SubscriberImplementation")
    override fun createFailedPublisher(): Publisher<Long>? {
        /*
         * This is a hack for our adapter structure:
         * the tests assume that calling "collect" is enough for the publisher to fail,
         * which is not true for our implementation.
         */
        val pub = { error(42) }.asFlow().asPublisher()
        return Publisher { subscriber ->
            pub.subscribe(object : Subscriber<Long> by subscriber as Subscriber<Long> {
                override fun onSubscribe(s: Subscription) {
                    subscriber.onSubscribe(s)
                    s.request(1)
                }
            })
        }
    }

    /**
     * Requests one element at a time, issuing each new `request(1)` from inside `onNext`,
     * and checks that all generated elements arrive in order.
     */
    @Test
    fun testStackOverflowTrampoline() {
        val latch = CountDownLatch(1)
        val collected = ArrayList<Long>()
        // Written by the subscriber before countDown(), read by the test thread after await().
        var failure: Throwable? = null
        val toRequest = 1000L
        val array = generate(toRequest)
        val publisher = array.asIterable().asFlow().asPublisher()

        publisher.subscribe(object : Subscriber<Long> {
            private lateinit var subscription: Subscription

            override fun onSubscribe(s: Subscription) {
                // Store the subscription before requesting: onNext needs it.
                subscription = s
                s.request(1)
            }

            override fun onNext(aLong: Long) {
                collected.add(aLong)
                subscription.request(1)
            }

            override fun onError(t: Throwable) {
                // Record the error and release the waiting test instead of swallowing the signal.
                failure = t
                latch.countDown()
            }

            override fun onComplete() {
                latch.countDown()
            }
        })

        // Fail explicitly on timeout rather than inspecting a list that may still be mutated.
        assertTrue(latch.await(5, TimeUnit.SECONDS), "Publisher did not terminate within 5 seconds")
        failure?.let { throw AssertionError("Publisher signalled an unexpected error", it) }
        assertEquals(array.toList(), collected)
    }

    /**
     * Issues `request(1)` calls from the common fork-join pool and checks that all generated
     * elements arrive in order.
     */
    @Test
    fun testConcurrentRequest() {
        val latch = CountDownLatch(1)
        val collected = ArrayList<Long>()
        // Written by the subscriber before countDown(), read by the test thread after await().
        var failure: Throwable? = null
        val n = 50000L
        val array = generate(n)
        val publisher = array.asIterable().asFlow().asPublisher()

        publisher.subscribe(object : Subscriber<Long> {
            override fun onSubscribe(s: Subscription) {
                // n + 1 requests, same count as the original inclusive range 0..n.
                // NOTE(review): presumably the extra request lets the publisher reach
                // completion after the last element - confirm against the adapter.
                repeat(n.toInt() + 1) {
                    commonPool().execute { s.request(1) }
                }
            }

            override fun onNext(aLong: Long) {
                collected.add(aLong)
            }

            override fun onError(t: Throwable) {
                // Without this the latch is never released on failure and the test hangs.
                failure = t
                latch.countDown()
            }

            override fun onComplete() {
                latch.countDown()
            }
        })

        latch.await()
        failure?.let { throw AssertionError("Publisher signalled an unexpected error", it) }
        assertEquals(array.toList(), collected)
    }

    /** Overridden with an empty body and marked [Ignore], so the inherited check does nothing. */
    @Ignore
    override fun required_spec309_requestZeroMustSignalIllegalArgumentException() {
    }

    /** Overridden with an empty body and marked [Ignore], so the inherited check does nothing. */
    @Ignore
    override fun required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() {
    }

    /** Overridden with an empty body and marked [Ignore], so the inherited check does nothing. */
    @Ignore
    override fun required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling() {
        // This test has a bug in it
    }
}