package kotlinx.coroutines

import kotlinx.coroutines.testing.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*
import org.junit.Test
import kotlin.coroutines.*
import kotlin.test.*

/**
 * Tests for continuations obtained through `suspendCancellableCoroutineReusable`, contrasted with
 * the ones obtained through `suspendCancellableCoroutine`.
 *
 * Most checks use [FieldWalker] to inspect which objects are reachable from a [Job], so the exact
 * order of statements and the `expect`/`finish` indices in each test are significant.
 */
class ReusableCancellableContinuationTest : TestBase() {
    /**
     * Ten suspensions through `suspendCancellableCoroutineReusable` are expected to be served by
     * a single continuation instance.
     */
    @Test
    fun testReusable() = runTest {
        testContinuationsCount(10, 1, ::suspendCancellableCoroutineReusable)
    }

    /**
     * Ten suspensions through `suspendCancellableCoroutine` are expected to produce ten distinct
     * continuation instances.
     */
    @Test
    fun testRegular() = runTest {
        testContinuationsCount(10, 10, ::suspendCancellableCoroutine)
    }

    /**
     * Suspends [iterations] times through [suspender] and asserts how many distinct
     * [CancellableContinuation] instances were observed while doing so.
     *
     * Each suspension hands its continuation over a channel to a helper coroutine. Before resuming
     * it, the helper walks the object graph of the receiver scope's [Job] and records every
     * [CancellableContinuation] it finds.
     *
     * @param iterations how many times [suspender] is invoked.
     * @param expectedInstances expected number of distinct recorded continuations, minus one.
     * @param suspender the suspension primitive under test.
     */
    private suspend inline fun CoroutineScope.testContinuationsCount(
        iterations: Int,
        expectedInstances: Int,
        suspender: suspend ((CancellableContinuation<Unit>) -> Unit) -> Unit
    ) {
        val result = mutableSetOf<CancellableContinuation<*>>()
        val job = coroutineContext[Job]!!
        val channel = Channel<Continuation<Unit>>(1)
        launch {
            channel.consumeEach {
                // Snapshot the reachable continuations before the suspended caller is resumed
                val f = FieldWalker.walk(job)
                result.addAll(f.filterIsInstance<CancellableContinuation<*>>())
                it.resumeWith(Result.success(Unit))
            }
        }

        repeat(iterations) {
            suspender {
                assertTrue(channel.trySend(it).isSuccess)
            }
        }
        channel.close()
        // NOTE(review): one recorded instance is discounted; presumably it is a continuation that
        // does not come from `suspender` -- confirm which one.
        assertEquals(expectedInstances, result.size - 1)
    }

    /**
     * Cancelling the continuation from inside the block makes the suspension point throw
     * [CancellationException].
     */
    @Test
    fun testCancelledOnClaimedCancel() = runTest {
        expect(1)
        try {
            suspendCancellableCoroutineReusable<Unit> {
                it.cancel()
            }
            expectUnreached()
        } catch (e: CancellationException) {
            finish(2)
        }
    }

    /**
     * After a first suspension has been resumed from a child coroutine, the continuation is
     * reachable from the job exactly once. A second suspension whose block cancels the job before
     * resuming then throws [CancellationException].
     *
     * The predicate passed to `runTest` matches [CancellationException]; presumably that is the
     * expected outcome of the cancelled test coroutine -- confirm against `TestBase`.
     */
    @Test
    fun testNotCancelledOnClaimedResume() = runTest({ it is CancellationException }) {
        expect(1)
        // Bind child at first
        var continuation: Continuation<*>? = null
        suspendCancellableCoroutineReusable<Unit> {
            expect(2)
            continuation = it
            launch { // Attach to the parent, avoid fast path
                expect(3)
                it.resume(Unit)
            }
        }
        expect(4)
        ensureActive()
        // Verify child was bound
        FieldWalker.assertReachableCount(1, coroutineContext[Job]) { it === continuation }
        try {
            suspendCancellableCoroutineReusable<Unit> {
                expect(5)
                coroutineContext[Job]!!.cancel()
                it.resume(Unit) // will not dispatch, will get CancellationException
            }
        } catch (e: CancellationException) {
            assertFalse(isActive)
            finish(6)
        }
    }

    /**
     * A continuation obtained through `suspendCancellableCoroutineReusable` is still reachable
     * from the job after it has been resumed.
     */
    @Test
    fun testResumeReusablePreservesReference() = runTest {
        expect(1)
        var cont: Continuation<Unit>? = null
        launch {
            cont!!.resumeWith(Result.success(Unit))
        }
        suspendCancellableCoroutineReusable<Unit> {
            cont = it
        }
        ensureActive()
        assertTrue { FieldWalker.walk(coroutineContext[Job]).contains(cont!!) }
        finish(2)
    }

    /**
     * A continuation obtained through `suspendCancellableCoroutine` is no longer reachable from
     * the job after it has been resumed.
     */
    @Test
    fun testResumeRegularDoesntPreservesReference() = runTest {
        expect(1)
        var cont: Continuation<Unit>? = null
        launch { // Attach to the parent, avoid fast path
            cont!!.resumeWith(Result.success(Unit))
        }
        suspendCancellableCoroutine<Unit> {
            cont = it
        }
        ensureActive()
        FieldWalker.assertReachableCount(0, coroutineContext[Job]) { it === cont }
        finish(2)
    }

    /**
     * A reusable continuation that was cancelled from inside its block is not reachable from the
     * job once the resulting [CancellationException] has been caught.
     */
    @Test
    fun testDetachedOnCancel() = runTest {
        expect(1)
        var cont: Continuation<*>? = null
        try {
            suspendCancellableCoroutineReusable<Unit> {
                cont = it
                it.cancel()
            }
            expectUnreached()
        } catch (e: CancellationException) {
            FieldWalker.assertReachableCount(0, coroutineContext[Job]) { it === cont }
            finish(2)
        }
    }

    /**
     * Cancelling the job removes the previously bound reusable continuation from the job's object
     * graph. Subsequent reusable suspensions throw [CancellationException], whether or not their
     * block resumes the continuation, and leave no [CancellableContinuation] reachable.
     */
    @Test
    fun testPropagatedCancel() = runTest({ it is CancellationException }) {
        val currentJob = coroutineContext[Job]!!
        expect(1)
        // Bind child at first
        suspendCancellableCoroutineReusable<Unit> {
            expect(2)
            // Attach to the parent, avoid fast path
            launch {
                expect(3)
                it.resume(Unit)
            }
        }
        expect(4)
        ensureActive()
        // Verify child was bound
        FieldWalker.assertReachableCount(1, currentJob) { it is CancellableContinuation<*> }
        currentJob.cancel()
        assertFalse(isActive)
        // Child detached
        FieldWalker.assertReachableCount(0, currentJob) { it is CancellableContinuation<*> }
        expect(5)
        try {
            // Resume is non-atomic, so it throws cancellation exception
            suspendCancellableCoroutineReusable<Unit> {
                expect(6) // but the code inside the block is executed
                it.resume(Unit)
            }
        } catch (e: CancellationException) {
            FieldWalker.assertReachableCount(0, currentJob) { it is CancellableContinuation<*> }
            expect(7)
        }
        try {
            // No resume -- still cancellation exception
            suspendCancellableCoroutineReusable<Unit> {}
        } catch (e: CancellationException) {
            FieldWalker.assertReachableCount(0, currentJob) { it is CancellableContinuation<*> }
            finish(8)
        }
    }

    /**
     * Runs 100 rendezvous exchanges between a `select { onSend }` sender and a `receive()` loop,
     * then checks the number of [CancellableContinuation] instances reachable from the receiver's
     * job: two while the receiver is still running, none after it has been joined.
     */
    @Test
    fun testChannelMemoryLeak() = runTest {
        val iterations = 100
        val channel = Channel<Unit>()
        launch {
            repeat(iterations) {
                select {
                    channel.onSend(Unit) {}
                }
            }
        }

        val receiver = launch {
            repeat(iterations) {
                channel.receive()
            }
            expect(2)
            val job = coroutineContext[Job]!!
            // 1 for reusable CC, another one for outer joiner
            FieldWalker.assertReachableCount(2, job) { it is CancellableContinuation<*> }
        }
        expect(1)
        receiver.join()
        // Reference should be claimed at this point
        FieldWalker.assertReachableCount(0, receiver) { it is CancellableContinuation<*> }
        finish(3)
    }

    /**
     * Iterates over a `produce` channel with a `delay(1)` between elements, then asserts that
     * exactly one object whose simple class name is `ChildContinuation` is reachable from the
     * test's job.
     */
    @Test
    fun testReusableAndRegularSuspendCancellableCoroutineMemoryLeak() = runTest {
        val channel = produce {
            repeat(10) {
                send(Unit)
            }
        }
        for (value in channel) {
            delay(1)
        }
        FieldWalker.assertReachableCount(1, coroutineContext[Job]) {
            // could be `it is ChildContinuation` if `ChildContinuation` wasn't private
            it::class.simpleName == "ChildContinuation"
        }
    }
}