1 /* <lambda>null2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines 6 7 import kotlinx.coroutines.flow.* 8 import org.junit.* 9 10 class ReusableContinuationStressTest : TestBase() { 11 12 private val iterations = 1000 * stressTestMultiplierSqrt 13 14 @Test // Originally reported by @denis-bezrukov in #2736 15 fun testDebounceWithStateFlow() = runBlocking<Unit> { 16 withContext(Dispatchers.Default) { 17 repeat(iterations) { 18 launch { // <- load the dispatcher and OS scheduler 19 runStressTestOnce(1, 1) 20 } 21 } 22 } 23 } 24 25 private suspend fun runStressTestOnce(delay: Int, debounce: Int) = coroutineScope { 26 val stateFlow = MutableStateFlow(0) 27 val emitter = launch { 28 repeat(1000) { i -> 29 stateFlow.emit(i) 30 delay(delay.toLong()) 31 } 32 } 33 var last = 0 34 stateFlow.debounce(debounce.toLong()).take(100).collect { i -> 35 if (i - last > 100) { 36 last = i 37 } 38 } 39 emitter.cancel() 40 } 41 } 42