/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines

import kotlinx.coroutines.channels.*
import org.junit.Test
import kotlin.test.*

/**
 * Stress test that streams [Leak] instances through a channel and then asserts that none of them
 * is still reachable from the consuming coroutine's job.
 */
class ReusableCancellableContinuationLeakStressTest : TestBase() {

    /**
     * Receives a single element from this channel.
     *
     * The result is intentionally stored in a local before being returned, so that the call to
     * [ReceiveChannel.receive] is not in tail position (see the inline note below).
     */
    @Suppress("UnnecessaryVariable")
    private suspend fun <T : Any> ReceiveChannel<T>.receiveBatch(): T {
        val r = receive() // DO NOT MERGE LINES, otherwise TCE will kick in
        return r
    }

    /** Number of elements sent and received, scaled by `stressTestMultiplier`. */
    private val iterations = 100_000 * stressTestMultiplier

    /** Marker payload; [i] is the sequence number that the consumer checks on receipt. */
    class Leak(val i: Int)

    /**
     * Sends [iterations] [Leak] objects in order, verifies that they arrive in the same order, waits
     * for the producer to complete, and then requires that zero [Leak] instances are reachable from
     * the consumer's job.
     */
    @Test // Simplified version of #2564
    fun testReusableContinuationLeak() = runTest {
        val channel = produce(capacity = 1) { // from the main thread
            (0 until iterations).forEach {
                send(Leak(it))
            }
        }

        launch(Dispatchers.Default) {
            repeat(iterations) {
                val value = channel.receiveBatch()
                // Elements must arrive in exactly the order in which they were sent.
                assertEquals(it, value.i)
            }
            // The value returned by `produce` is cast to Job so that the producer can be awaited.
            (channel as Job).join()

            // No Leak may remain reachable from this coroutine's job once the channel is drained.
            FieldWalker.assertReachableCount(0, coroutineContext.job, false) { it is Leak }
        }
    }
}