• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines
6 
7 import kotlinx.coroutines.channels.*
8 import org.junit.Test
9 import kotlin.test.*
10 
/**
 * Stress test that pumps [Leak] instances through a channel and then verifies that none of them
 * is still reachable from the consuming coroutine's job (simplified reproduction of issue #2564).
 */
class ReusableCancellableContinuationLeakStressTest : TestBase() {

    /** Payload sent through the channel; the final assertion looks for reachable instances of it. */
    class Leak(val i: Int)

    /** Number of elements produced and consumed, scaled by `stressTestMultiplier`. */
    private val iterations = 100_000 * stressTestMultiplier

    /**
     * Receives a single element from this channel and returns it.
     *
     * The result is intentionally stored in a local before being returned: per the note below,
     * collapsing the two statements would let TCE (presumably tail-call elimination) kick in,
     * which this test must avoid.
     */
    @Suppress("UnnecessaryVariable")
    private suspend fun <T : Any> ReceiveChannel<T>.receiveBatch(): T {
        val element = receive() // DO NOT MERGE LINES, otherwise TCE will kick in
        return element
    }

    /**
     * Produces `iterations` numbered [Leak]s, consumes them in order on [Dispatchers.Default],
     * waits for the producer to finish and asserts that no [Leak] is reachable from the
     * consumer's job.
     */
    @Test // Simplified version of #2564
    fun testReusableContinuationLeak() = runTest {
        val channel = produce(capacity = 1) { // from the main thread
            for (index in 0 until iterations) {
                send(Leak(index))
            }
        }

        launch(Dispatchers.Default) {
            // Elements must arrive in the same order in which they were sent.
            for (expected in 0 until iterations) {
                val received = channel.receiveBatch()
                assertEquals(expected, received.i)
            }
            // Wait for the producer coroutine itself to complete before walking the object graph.
            (channel as Job).join()

            FieldWalker.assertReachableCount(0, coroutineContext.job, false) { it is Leak }
        }
    }
}
42