1 /* <lambda>null2 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines.reactive 6 7 import kotlinx.coroutines.* 8 import kotlinx.coroutines.flow.* 9 import kotlinx.coroutines.flow.Flow 10 import org.junit.* 11 import org.reactivestreams.* 12 import java.util.concurrent.* 13 import java.util.concurrent.atomic.* 14 import kotlin.coroutines.* 15 import kotlin.random.* 16 17 /** 18 * This stress-test is self-contained reproducer for the race in [Flow.asPublisher] extension 19 * that was originally reported in the issue 20 * [#2109](https://github.com/Kotlin/kotlinx.coroutines/issues/2109). 21 * The original reproducer used a flow that loads a file using AsynchronousFileChannel 22 * (that issues completion callbacks from multiple threads) 23 * and uploads it to S3 via Amazon SDK, which internally uses netty for I/O 24 * (which uses a single thread for connection-related callbacks). 25 * 26 * This stress-test essentially mimics the logic in multiple interacting threads: several emitter threads that form 27 * the flow and a single requesting thread works on the subscriber's side to periodically request more 28 * values when the number of items requested drops below the threshold. 29 */ 30 @Suppress("ReactiveStreamsSubscriberImplementation") 31 class PublisherRequestStressTest : TestBase() { 32 33 private val testDurationSec = 3 * stressTestMultiplier 34 35 // Original code in Amazon SDK uses 4 and 16 as low/high watermarks. 36 // These constants were chosen so that problem reproduces asap with particular this code. 37 private val minDemand = 8L 38 private val maxDemand = 16L 39 40 private val nEmitThreads = 4 41 42 private val emitThreadNo = AtomicInteger() 43 44 private val emitPool = Executors.newFixedThreadPool(nEmitThreads) { r -> 45 Thread(r, "PublisherRequestStressTest-emit-${emitThreadNo.incrementAndGet()}") 46 } 47 48 private val reqPool = Executors.newSingleThreadExecutor { r -> 49 Thread(r, "PublisherRequestStressTest-req") 50 } 51 52 private val nextValue = AtomicLong(0) 53 54 @After 55 fun tearDown() { 56 emitPool.shutdown() 57 reqPool.shutdown() 58 emitPool.awaitTermination(10, TimeUnit.SECONDS) 59 reqPool.awaitTermination(10, TimeUnit.SECONDS) 60 } 61 62 private lateinit var subscription: Subscription 63 64 @Test 65 fun testRequestStress() { 66 val expectedValue = AtomicLong(0) 67 val requestedTill = AtomicLong(0) 68 val callingOnNext = AtomicInteger() 69 70 val publisher = mtFlow().asPublisher() 71 var error = false 72 73 publisher.subscribe(object : Subscriber<Long> { 74 private var demand = 0L // only updated from reqPool 75 76 override fun onComplete() { 77 // Typically unreached, but, rarely, `emitPool` may shut down before the cancellation is performed. 78 } 79 80 override fun onSubscribe(sub: Subscription) { 81 subscription = sub 82 maybeRequestMore() 83 } 84 85 private fun maybeRequestMore() { 86 if (demand >= minDemand) return 87 val nextDemand = Random.nextLong(minDemand + 1..maxDemand) 88 val more = nextDemand - demand 89 demand = nextDemand 90 requestedTill.addAndGet(more) 91 subscription.request(more) 92 } 93 94 override fun onNext(value: Long) { 95 check(callingOnNext.getAndIncrement() == 0) // make sure it is not concurrent 96 // check for expected value 97 check(value == expectedValue.get()) 98 // check that it does not exceed requested values 99 check(value < requestedTill.get()) 100 val nextExpected = value + 1 101 expectedValue.set(nextExpected) 102 // send more requests from request thread 103 reqPool.execute { 104 demand-- // processed an item 105 maybeRequestMore() 106 } 107 callingOnNext.decrementAndGet() 108 } 109 110 override fun onError(ex: Throwable?) { 111 error = true 112 error("Failed", ex) 113 } 114 }) 115 var prevExpected = -1L 116 for (second in 1..testDurationSec) { 117 if (error) break 118 Thread.sleep(1000) 119 val expected = expectedValue.get() 120 println("$second: expectedValue = $expected") 121 check(expected > prevExpected) // should have progress 122 prevExpected = expected 123 } 124 if (!error) { 125 subscription.cancel() 126 runBlocking { 127 (subscription as AbstractCoroutine<*>).join() 128 } 129 } 130 } 131 132 private fun mtFlow(): Flow<Long> = flow { 133 while (currentCoroutineContext().isActive) { 134 emit(aWait()) 135 } 136 } 137 138 private suspend fun aWait(): Long = suspendCancellableCoroutine { cont -> 139 emitPool.execute(Runnable { 140 cont.resume(nextValue.getAndIncrement()) 141 }) 142 } 143 }