package kotlinx.coroutines

import kotlinx.coroutines.testing.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*
import org.junit.Test
import kotlin.coroutines.*
import kotlin.test.*

/**
 * Tests for `suspendCancellableCoroutineReusable`, contrasted with the regular
 * [suspendCancellableCoroutine].
 *
 * Most tests inspect the current [Job] with [FieldWalker] to check whether a
 * [CancellableContinuation] is still reported for that job after a suspension is resumed,
 * after the continuation is cancelled, or after the job itself is cancelled.
 *
 * NOTE(review): the assertions depend on which objects [FieldWalker] finds from the job, so
 * the locals captured by the test lambdas are presumably part of what is measured -- keep
 * that in mind before restructuring a test body.
 */
class ReusableCancellableContinuationTest : TestBase() {
    /** Ten suspensions through the reusable suspender are expected to surface one continuation instance. */
    @Test
    fun testReusable() = runTest {
        testContinuationsCount(iterations = 10, expectedInstances = 1, suspender = ::suspendCancellableCoroutineReusable)
    }

    /** Ten suspensions through the regular suspender are expected to surface ten continuation instances. */
    @Test
    fun testRegular() = runTest {
        testContinuationsCount(iterations = 10, expectedInstances = 10, suspender = ::suspendCancellableCoroutine)
    }

    /**
     * Suspends [iterations] times through [suspender] and counts the distinct
     * [CancellableContinuation] instances that [FieldWalker] reports for the current job.
     *
     * Each suspension hands its continuation to a launched consumer over a channel; the consumer
     * walks the job, records every [CancellableContinuation] it finds, and only then resumes the
     * continuation.
     *
     * @param iterations how many times [suspender] is invoked.
     * @param expectedInstances the expected number of distinct continuations, compared against
     * the recorded set size minus one.
     * @param suspender the suspending primitive under test; it receives the block that publishes
     * the continuation to the consumer.
     */
    private suspend inline fun CoroutineScope.testContinuationsCount(
        iterations: Int,
        expectedInstances: Int,
        suspender: suspend ((CancellableContinuation<Unit>) -> Unit) -> Unit
    ) {
        val result = mutableSetOf<CancellableContinuation<*>>()
        val job = coroutineContext[Job]!!
        val channel = Channel<Continuation<Unit>>(1)
        launch {
            channel.consumeEach {
                // Record first, resume second: the walk has to happen while the caller is suspended.
                val f = FieldWalker.walk(job)
                result.addAll(f.filterIsInstance<CancellableContinuation<*>>())
                it.resumeWith(Result.success(Unit))
            }
        }

        repeat(iterations) {
            suspender {
                // The consumer drains the channel before each resume, so the single buffer slot is free.
                assertTrue(channel.trySend(it).isSuccess)
            }
        }
        channel.close()
        // NOTE(review): the `- 1` presumably discounts one continuation that is found from the job
        // but was not created by `suspender` (compare the "outer joiner" remark in
        // testChannelMemoryLeak) -- TODO confirm.
        assertEquals(expectedInstances, result.size - 1)
    }

    /** Cancelling the continuation inside the reusable block must surface a [CancellationException]. */
    @Test
    fun testCancelledOnClaimedCancel() = runTest {
        expect(1)
        try {
            suspendCancellableCoroutineReusable<Unit> {
                it.cancel()
            }
            expectUnreached()
        } catch (e: CancellationException) {
            finish(2)
        }
    }

    /**
     * Resumes a first reusable suspension from a launched child, checks that this continuation is
     * counted exactly once for the job, then cancels the job from inside a second reusable
     * suspension and expects a [CancellationException] with the scope no longer active.
     */
    @Test
    fun testNotCancelledOnClaimedResume() = runTest({ it is CancellationException }) {
        expect(1)
        // Bind child at first
        var continuation: Continuation<*>? = null
        suspendCancellableCoroutineReusable<Unit> {
            expect(2)
            continuation = it
            launch {  // Attach to the parent, avoid fast path
                expect(3)
                it.resume(Unit)
            }
        }
        expect(4)
        ensureActive()
        // Verify child was bound
        FieldWalker.assertReachableCount(1, coroutineContext[Job]) { it === continuation }
        try {
            suspendCancellableCoroutineReusable<Unit> {
                expect(5)
                coroutineContext[Job]!!.cancel()
                it.resume(Unit) // will not dispatch, will get CancellationException
            }
        } catch (e: CancellationException) {
            assertFalse(isActive)
            finish(6)
        }
    }

    /** After a reusable suspension is resumed, its continuation must still be found by walking the job. */
    @Test
    fun testResumeReusablePreservesReference() = runTest {
        expect(1)
        var cont: Continuation<Unit>? = null
        // The launched child runs after this coroutine suspends below, so `cont` is set by then.
        launch {
            cont!!.resumeWith(Result.success(Unit))
        }
        suspendCancellableCoroutineReusable<Unit> {
            cont = it
        }
        ensureActive()
        assertTrue { FieldWalker.walk(coroutineContext[Job]).contains(cont!!) }
        finish(2)
    }

    /** After a regular suspension is resumed, its continuation must no longer be counted for the job. */
    @Test
    fun testResumeRegularDoesntPreservesReference() = runTest {
        expect(1)
        var cont: Continuation<Unit>? = null
        launch { // Attach to the parent, avoid fast path
            cont!!.resumeWith(Result.success(Unit))
        }
        suspendCancellableCoroutine<Unit> {
            cont = it
        }
        ensureActive()
        FieldWalker.assertReachableCount(0, coroutineContext[Job]) { it === cont }
        finish(2)
    }

    /** A reusable continuation cancelled inside its block must no longer be counted for the job. */
    @Test
    fun testDetachedOnCancel() = runTest {
        expect(1)
        var cont: Continuation<*>? = null
        try {
            suspendCancellableCoroutineReusable<Unit> {
                cont = it
                it.cancel()
            }
            expectUnreached()
        } catch (e: CancellationException) {
            FieldWalker.assertReachableCount(0, coroutineContext[Job]) { it === cont }
            finish(2)
        }
    }

    /**
     * Cancels the job after a reusable continuation has been bound and checks that no
     * [CancellableContinuation] is counted for the job afterwards, including after two further
     * reusable suspensions that each end in a [CancellationException].
     */
    @Test
    fun testPropagatedCancel() = runTest({ it is CancellationException }) {
        val currentJob = coroutineContext[Job]!!
        expect(1)
        // Bind child at first
        suspendCancellableCoroutineReusable<Unit> {
            expect(2)
            // Attach to the parent, avoid fast path
            launch {
                expect(3)
                it.resume(Unit)
            }
        }
        expect(4)
        ensureActive()
        // Verify child was bound
        FieldWalker.assertReachableCount(1, currentJob) { it is CancellableContinuation<*> }
        currentJob.cancel()
        assertFalse(isActive)
        // Child detached
        FieldWalker.assertReachableCount(0, currentJob) { it is CancellableContinuation<*> }
        expect(5)
        try {
            // Resume is non-atomic, so it throws cancellation exception
            suspendCancellableCoroutineReusable<Unit> {
                expect(6) // but the code inside the block is executed
                it.resume(Unit)
            }
        } catch (e: CancellationException) {
            FieldWalker.assertReachableCount(0, currentJob) { it is CancellableContinuation<*> }
            expect(7)
        }
        try {
            // No resume -- still cancellation exception
            suspendCancellableCoroutineReusable<Unit> {}
        } catch (e: CancellationException) {
            FieldWalker.assertReachableCount(0, currentJob) { it is CancellableContinuation<*> }
            finish(8)
        }
    }

    /**
     * Exchanges 100 elements between a `select`-based sender and a receiver over a rendezvous
     * channel, then checks how many [CancellableContinuation] instances are counted for the
     * receiver job: two while it is still running, none once it has been joined.
     */
    @Test
    fun testChannelMemoryLeak() = runTest {
        val iterations = 100
        val channel = Channel<Unit>()
        launch {
            repeat(iterations) {
                select {
                    channel.onSend(Unit) {}
                }
            }
        }

        val receiver = launch {
            repeat(iterations) {
                channel.receive()
            }
            expect(2)
            val job = coroutineContext[Job]!!
            // 1 for reusable CC, another one for outer joiner
            FieldWalker.assertReachableCount(2, job) { it is CancellableContinuation<*> }
        }
        expect(1)
        receiver.join()
        // Reference should be claimed at this point
        FieldWalker.assertReachableCount(0, receiver) { it is CancellableContinuation<*> }
        finish(3)
    }

    /**
     * Alternates receiving from a `produce` channel with `delay(1)` and then checks that exactly
     * one object whose class is named `ChildContinuation` is counted for the job.
     */
    @Test
    fun testReusableAndRegularSuspendCancellableCoroutineMemoryLeak() = runTest {
        val channel = produce {
            repeat(10) {
                send(Unit)
            }
        }
        for (value in channel) {
            delay(1)
        }
        FieldWalker.assertReachableCount(1, coroutineContext[Job]) {
            // could be `it is ChildContinuation` if `ChildContinuation` wasn't private
            it::class.simpleName == "ChildContinuation"
        }
    }
}
219