/*
 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.reactive

import org.junit.*
import org.reactivestreams.example.unicast.AsyncIterablePublisher
import org.reactivestreams.Publisher
import org.reactivestreams.example.unicast.InfiniteIncrementNumberPublisher
import org.reactivestreams.tck.TestEnvironment
import java.util.concurrent.Executors
import java.util.concurrent.ExecutorService
import org.reactivestreams.tck.PublisherVerification
import org.testng.annotations.AfterClass
import org.testng.annotations.BeforeClass
import org.testng.annotations.Test

/**
 * Runs the Reactive Streams TCK [PublisherVerification] suite against an
 * [InfiniteIncrementNumberPublisher] that has been round-tripped through
 * `asFlow().asPublisher()`.
 *
 * A fixed pool of four threads is created once per class in [before] and shut down in [after].
 */
@Test
class UnboundedIntegerIncrementPublisherTest : PublisherVerification<Int>(TestEnvironment()) {

    // Assigned in before(); stays null if class set-up never ran.
    private var e: ExecutorService? = null

    /**
     * Non-null view of [e] used by the publisher factories.
     * Fails with a descriptive [IllegalStateException] when [before] has not run yet.
     */
    private val executor: ExecutorService
        get() = checkNotNull(e) { "Executor is not initialised: before() must run first" }

    /** Creates the four-thread executor shared by every publisher built in this class. */
    @BeforeClass
    internal fun before() {
        e = Executors.newFixedThreadPool(4)
    }

    /** Shuts the shared executor down; does nothing when it was never created. */
    @AfterClass
    internal fun after() {
        e?.shutdown()
    }

    /**
     * Builds the publisher under test.
     *
     * The [elements] argument is not used: the same infinite publisher is returned for every request.
     */
    override fun createPublisher(elements: Long): Publisher<Int> =
        InfiniteIncrementNumberPublisher(executor).asFlow().asPublisher()

    /**
     * Builds a publisher backed by an [Iterable] whose `iterator()` always throws
     * `RuntimeException("Error state signal!")`.
     */
    override fun createFailedPublisher(): Publisher<Int> {
        val failingIterable = object : Iterable<Int> {
            override fun iterator(): Iterator<Int> {
                throw RuntimeException("Error state signal!")
            }
        }
        return AsyncIterablePublisher(failingIterable, executor)
    }

    /**
     * Reports the TCK's `publisherUnableToSignalOnComplete()` value as the element limit.
     * NOTE(review): presumably this marks the publisher as never completing - confirm against the TCK docs.
     */
    override fun maxElementsFromPublisher(): Long {
        return super.publisherUnableToSignalOnComplete()
    }

    // NOTE(review): `Ignore` appears to resolve through the `org.junit.*` import rather than TestNG,
    // so the empty bodies below are presumably what actually disables the inherited checks - confirm.

    /** Overrides the inherited spec 3.9 request-zero check with an empty body. */
    @Ignore
    override fun required_spec309_requestZeroMustSignalIllegalArgumentException() {
    }

    /** Overrides the inherited spec 3.9 negative-request check with an empty body. */
    @Ignore
    override fun required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() {
    }
}