/*
 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

@file:Suppress("UNCHECKED_CAST")

package kotlinx.coroutines.reactive

import kotlinx.coroutines.flow.*
import org.junit.Ignore
import org.junit.Test
import org.reactivestreams.*
import org.reactivestreams.tck.*
import java.util.concurrent.*
import java.util.concurrent.ForkJoinPool.*
import kotlin.test.*

/**
 * Runs the Reactive Streams TCK [PublisherVerification] suite against a publisher obtained from an
 * iterable-backed flow (`asIterable().asFlow().asPublisher()`), plus two extra request-pattern tests.
 */
class IterableFlowTckTest : PublisherVerification<Long>(TestEnvironment()) {

    /**
     * Builds an array holding `0, 1, ..., size - 1`.
     *
     * @param num requested element count; values of [Integer.MAX_VALUE] or more are capped at 1,000,000 elements.
     * @return the array of consecutive longs starting at zero.
     */
    private fun generate(num: Long): Array<Long> {
        val size = if (num >= Integer.MAX_VALUE) 1000000 else num.toInt()
        return Array(size) { it.toLong() }
    }

    /**
     * Publisher under test: [elements] consecutive longs (see [generate]) exposed as a flow and
     * converted with `asPublisher()`.
     */
    override fun createPublisher(elements: Long): Publisher<Long> {
        return generate(elements).asIterable().asFlow().asPublisher()
    }

    /**
     * Publisher whose underlying flow always fails with `error(42)`.
     */
    @Suppress("SubscriberImplementation")
    override fun createFailedPublisher(): Publisher<Long>? {
        /*
         * This is a hack for our adapter structure:
         * the tests assume that subscribing alone is enough for the publisher to fail, which is not
         * true for our implementation. The wrapper below therefore forwards onSubscribe and then
         * immediately requests one element itself.
         */
        val pub = { error(42) }.asFlow().asPublisher()
        return Publisher { subscriber ->
            pub.subscribe(object : Subscriber<Long> by subscriber as Subscriber<Long> {
                override fun onSubscribe(s: Subscription) {
                    subscriber.onSubscribe(s)
                    s.request(1)
                }
            })
        }
    }

    /**
     * Requests elements one at a time, re-requesting from inside every `onNext` callback, and checks
     * that all 1000 elements arrive in order and the stream completes within five seconds.
     */
    @Test
    fun testStackOverflowTrampoline() {
        val latch = CountDownLatch(1)
        val collected = ArrayList<Long>()
        val toRequest = 1000L
        val array = generate(toRequest)
        val publisher = array.asIterable().asFlow().asPublisher()
        // Written by the subscriber before countDown() and read only after await(), so the latch orders the accesses.
        var failure: Throwable? = null

        publisher.subscribe(object : Subscriber<Long> {
            private lateinit var s: Subscription

            override fun onSubscribe(s: Subscription) {
                this.s = s
                s.request(1)
            }

            override fun onNext(aLong: Long) {
                collected.add(aLong)

                s.request(1)
            }

            override fun onError(t: Throwable) {
                // Record the error and release the waiting test thread instead of swallowing it.
                failure = t
                latch.countDown()
            }

            override fun onComplete() {
                latch.countDown()
            }
        })

        val finished = latch.await(5, TimeUnit.SECONDS)
        failure?.let { throw AssertionError("Publisher unexpectedly signalled onError", it) }
        assertTrue(finished, "Publisher did not complete within 5 seconds")
        assertEquals(array.toList(), collected)
    }

    /**
     * Issues single-element requests concurrently from the common [ForkJoinPool] and checks that every
     * element is still delivered exactly once and in order.
     */
    @Test
    fun testConcurrentRequest() {
        val latch = CountDownLatch(1)
        val collected = ArrayList<Long>()
        val n = 50000L
        val array = generate(n)
        val publisher = array.asIterable().asFlow().asPublisher()
        // Written by the subscriber before countDown() and read only after await(), so the latch orders the accesses.
        var failure: Throwable? = null

        publisher.subscribe(object : Subscriber<Long> {
            override fun onSubscribe(s: Subscription) {
                // 0..n is inclusive, so this submits n + 1 requests of one element each.
                for (i in 0..n) {
                    commonPool().execute { s.request(1) }
                }
            }

            override fun onNext(aLong: Long) {
                collected.add(aLong)
            }

            override fun onError(t: Throwable) {
                // Without this the latch would never be released and the test would hang on failure.
                failure = t
                latch.countDown()
            }

            override fun onComplete() {
                latch.countDown()
            }
        })

        // Bounded wait so that a stalled publisher fails the test instead of blocking the build forever.
        val finished = latch.await(60, TimeUnit.SECONDS)
        failure?.let { throw AssertionError("Publisher unexpectedly signalled onError", it) }
        assertTrue(finished, "Publisher did not complete within 60 seconds")
        assertEquals(array.toList(), collected)
    }

    // Disabled TCK check: overridden with an empty body.
    @Ignore
    override fun required_spec309_requestZeroMustSignalIllegalArgumentException() {
    }

    // Disabled TCK check: overridden with an empty body.
    @Ignore
    override fun required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() {
    }

    @Ignore
    override fun required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling() {
        // This test has a bug in it
    }
}