• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 @file:Suppress("UNCHECKED_CAST")
6 
7 package kotlinx.coroutines.reactive
8 
9 import kotlinx.coroutines.flow.*
10 import org.junit.Ignore
11 import org.junit.Test
12 import org.reactivestreams.*
13 import org.reactivestreams.tck.*
14 import java.util.concurrent.*
15 import java.util.concurrent.ForkJoinPool.*
16 import kotlin.test.*
17 
18 class IterableFlowTckTest : PublisherVerification<Long>(TestEnvironment()) {
19 
generatenull20     private fun generate(num: Long): Array<Long> {
21         return Array(if (num >= Integer.MAX_VALUE) 1000000 else num.toInt()) { it.toLong() }
22     }
23 
createPublishernull24     override fun createPublisher(elements: Long): Publisher<Long> {
25         return generate(elements).asIterable().asFlow().asPublisher()
26     }
27 
28     @Suppress("SubscriberImplementation")
createFailedPublishernull29     override fun createFailedPublisher(): Publisher<Long>? {
30         /*
31          * This is a hack for our adapter structure:
32          * Tests assume that calling "collect" is enough for publisher to fail and it is not
33          * true for our implementation
34          */
35         val pub = { error(42) }.asFlow().asPublisher()
36         return Publisher { subscriber ->
37             pub.subscribe(object : Subscriber<Long> by subscriber as Subscriber<Long> {
38                 override fun onSubscribe(s: Subscription) {
39                     subscriber.onSubscribe(s)
40                     s.request(1)
41                 }
42             })
43         }
44     }
45 
46     @Test
testStackOverflowTrampolinenull47     fun testStackOverflowTrampoline() {
48         val latch = CountDownLatch(1)
49         val collected = ArrayList<Long>()
50         val toRequest = 1000L
51         val array = generate(toRequest)
52         val publisher = array.asIterable().asFlow().asPublisher()
53 
54         publisher.subscribe(object : Subscriber<Long> {
55             private lateinit var s: Subscription
56 
57             override fun onSubscribe(s: Subscription) {
58                 this.s = s
59                 s.request(1)
60             }
61 
62             override fun onNext(aLong: Long) {
63                 collected.add(aLong)
64 
65                 s.request(1)
66             }
67 
68             override fun onError(t: Throwable) {
69 
70             }
71 
72             override fun onComplete() {
73                 latch.countDown()
74             }
75         })
76 
77         latch.await(5, TimeUnit.SECONDS)
78         assertEquals(collected, array.toList())
79     }
80 
81     @Test
testConcurrentRequestnull82     fun testConcurrentRequest() {
83         val latch = CountDownLatch(1)
84         val collected = ArrayList<Long>()
85         val n = 50000L
86         val array = generate(n)
87         val publisher = array.asIterable().asFlow().asPublisher()
88 
89         publisher.subscribe(object : Subscriber<Long> {
90             private var s: Subscription? = null
91 
92             override fun onSubscribe(s: Subscription) {
93                 this.s = s
94                 for (i in 0..n) {
95                     commonPool().execute { s.request(1) }
96                 }
97             }
98 
99             override fun onNext(aLong: Long) {
100                 collected.add(aLong)
101             }
102 
103             override fun onError(t: Throwable) {
104 
105             }
106 
107             override fun onComplete() {
108                 latch.countDown()
109             }
110         })
111 
112         latch.await()
113         assertEquals(array.toList(), collected)
114     }
115 
116     @Ignore
required_spec309_requestZeroMustSignalIllegalArgumentExceptionnull117     override fun required_spec309_requestZeroMustSignalIllegalArgumentException() {
118     }
119 
120     @Ignore
required_spec309_requestNegativeNumberMustSignalIllegalArgumentExceptionnull121     override fun required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() {
122     }
123 
124     @Ignore
required_spec312_cancelMustMakeThePublisherToEventuallyStopSignalingnull125     override fun required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling() {
126         // This test has a bug in it
127     }
128 }
129