• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 @file:Suppress("UNCHECKED_CAST")
2 
3 package kotlinx.coroutines.reactive
4 
5 import kotlinx.coroutines.flow.*
6 import org.junit.Ignore
7 import org.junit.Test
8 import org.reactivestreams.*
9 import org.reactivestreams.tck.*
10 import java.util.concurrent.*
11 import java.util.concurrent.ForkJoinPool.*
12 import kotlin.test.*
13 
/**
 * Runs the Reactive Streams TCK [PublisherVerification] suite against publishers obtained by
 * converting an [Iterable] to a flow and then to a [Publisher]
 * (`asIterable().asFlow().asPublisher()`), plus two extra tests that exercise specific
 * `request` patterns.
 */
class IterableFlowTckTest : PublisherVerification<Long>(TestEnvironment()) {

    /**
     * Returns an array holding `0, 1, ..., size - 1`.
     *
     * `size` is [num] itself, except that any value at or above [Int.MAX_VALUE] is replaced by
     * 1,000,000 elements.
     */
    private fun generate(num: Long): Array<Long> {
        val size = if (num >= Int.MAX_VALUE) 1000000 else num.toInt()
        return Array(size) { it.toLong() }
    }

    /** Publisher under test: the generated array exposed as an iterable-backed flow. */
    override fun createPublisher(elements: Long): Publisher<Long> {
        return generate(elements).asIterable().asFlow().asPublisher()
    }

    /**
     * Publisher whose underlying flow throws as soon as it is collected.
     *
     * The returned publisher wraps every subscriber so that one element is requested right after
     * `onSubscribe` has been forwarded to it.
     */
    @Suppress("SubscriberImplementation")
    override fun createFailedPublisher(): Publisher<Long>? {
        /*
         * This is a hack for our adapter structure:
         * Tests assume that calling "collect" is enough for publisher to fail and it is not
         * true for our implementation
         */
        val pub = { error(42) }.asFlow().asPublisher()
        return Publisher { subscriber ->
            pub.subscribe(object : Subscriber<Long> by subscriber as Subscriber<Long> {
                override fun onSubscribe(s: Subscription) {
                    subscriber.onSubscribe(s)
                    // Issue demand on the subscriber's behalf (see the note above).
                    s.request(1)
                }
            })
        }
    }

    /**
     * Requests elements one at a time, issuing each follow-up `request(1)` from inside `onNext`,
     * and verifies that every element arrives in order.
     */
    @Test
    fun testStackOverflowTrampoline() {
        val latch = CountDownLatch(1)
        val collected = ArrayList<Long>()
        // Written before latch.countDown() and read after latch.await(), so the latch orders the access.
        var failure: Throwable? = null
        val toRequest = 1000L
        val array = generate(toRequest)
        val publisher = array.asIterable().asFlow().asPublisher()

        publisher.subscribe(object : Subscriber<Long> {
            private lateinit var s: Subscription

            override fun onSubscribe(s: Subscription) {
                this.s = s
                s.request(1)
            }

            override fun onNext(aLong: Long) {
                collected.add(aLong)

                s.request(1)
            }

            override fun onError(t: Throwable) {
                // Record the failure and release the waiting test thread instead of swallowing it.
                failure = t
                latch.countDown()
            }

            override fun onComplete() {
                latch.countDown()
            }
        })

        // Fail on timeout rather than falling through to a comparison of a half-filled list.
        assertTrue(latch.await(5, TimeUnit.SECONDS), "Publisher did not terminate within 5 seconds")
        failure?.let { throw AssertionError("Publisher signalled onError", it) }
        assertEquals(array.toList(), collected)
    }

    /**
     * Issues all demand as single-element requests submitted to the common fork-join pool and
     * verifies that every element still arrives exactly once and in order.
     */
    @Test
    fun testConcurrentRequest() {
        val latch = CountDownLatch(1)
        val collected = ArrayList<Long>()
        // Written before latch.countDown() and read after latch.await(), so the latch orders the access.
        var failure: Throwable? = null
        val n = 50000L
        val array = generate(n)
        val publisher = array.asIterable().asFlow().asPublisher()

        publisher.subscribe(object : Subscriber<Long> {
            override fun onSubscribe(s: Subscription) {
                // n + 1 requests in total, same count as the original `0..n` loop.
                // NOTE(review): presumably the extra request lets the publisher reach completion
                // after the last element - confirm against the adapter before changing it.
                repeat((n + 1).toInt()) {
                    commonPool().execute { s.request(1) }
                }
            }

            override fun onNext(aLong: Long) {
                collected.add(aLong)
            }

            override fun onError(t: Throwable) {
                // Record the failure and release the waiting test thread instead of hanging forever.
                failure = t
                latch.countDown()
            }

            override fun onComplete() {
                latch.countDown()
            }
        })

        // Bounded wait: a lost terminal signal should fail the test, not hang the build.
        assertTrue(latch.await(60, TimeUnit.SECONDS), "Publisher did not terminate within 60 seconds")
        failure?.let { throw AssertionError("Publisher signalled onError", it) }
        assertEquals(array.toList(), collected)
    }

    /** Disabled TCK check: overridden with an empty body; the reason is not recorded here. */
    @Ignore
    override fun required_spec309_requestZeroMustSignalIllegalArgumentException() {
    }

    /** Disabled TCK check: overridden with an empty body; the reason is not recorded here. */
    @Ignore
    override fun required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() {
    }

    /** Disabled TCK check: overridden with an empty body. */
    @Ignore
    override fun required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling() {
        // This test has a bug in it
    }
}
125