• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.debug.internal
6 
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.*
9 import kotlinx.coroutines.debug.*
10 import kotlinx.coroutines.internal.*
11 import kotlinx.coroutines.internal.ScopeCoroutine
12 import java.io.*
13 import java.lang.StackTraceElement
14 import java.text.*
15 import java.util.concurrent.locks.*
16 import kotlin.collections.ArrayList
17 import kotlin.concurrent.*
18 import kotlin.coroutines.*
19 import kotlin.coroutines.jvm.internal.CoroutineStackFrame
20 import kotlin.synchronized
21 import kotlinx.coroutines.internal.artificialFrame as createArtificialFrame // IDEA bug workaround
22 
23 internal object DebugProbesImpl {
24     private const val ARTIFICIAL_FRAME_MESSAGE = "Coroutine creation stacktrace"
25     private val dateFormat = SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
26 
27     private var weakRefCleanerThread: Thread? = null
28 
29     // Values are boolean, so this map does not need to use a weak reference queue
30     private val capturedCoroutinesMap = ConcurrentWeakMap<CoroutineOwner<*>, Boolean>()
31     private val capturedCoroutines: Set<CoroutineOwner<*>> get() = capturedCoroutinesMap.keys
32 
33     @Volatile
34     private var installations = 0
35 
36     /**
37      * This internal method is used by IDEA debugger under the JVM name of
38      * "isInstalled$kotlinx_coroutines_debug".
39      */
40     internal val isInstalled: Boolean get() = installations > 0
41 
42     // To sort coroutines by creation order, used as unique id
43     private val sequenceNumber = atomic(0L)
44     /*
45      * RW-lock that guards all debug probes state changes.
46      * All individual coroutine state transitions are guarded by read-lock
47      * and do not interfere with each other.
48      * All state reads are guarded by the write lock to guarantee a strongly-consistent
49      * snapshot of the system.
50      */
51     private val coroutineStateLock = ReentrantReadWriteLock()
52 
53     public var sanitizeStackTraces: Boolean = true
54     public var enableCreationStackTraces: Boolean = true
55 
56     /*
57      * Substitute for service loader, DI between core and debug modules.
58      * If the agent was installed via command line -javaagent parameter, do not use byte-byddy to avoud
59      */
60     private val dynamicAttach = getDynamicAttach()
61 
62     @Suppress("UNCHECKED_CAST")
63     private fun getDynamicAttach(): Function1<Boolean, Unit>? = runCatching {
64         val clz = Class.forName("kotlinx.coroutines.debug.internal.ByteBuddyDynamicAttach")
65         val ctor = clz.constructors[0]
66         ctor.newInstance() as Function1<Boolean, Unit>
67     }.getOrNull()
68 
69     /*
70      * This is an optimization in the face of KT-29997:
71      * Consider suspending call stack a()->b()->c() and c() completes its execution and every call is
72      * "almost" in tail position.
73      *
74      * Then at least three RUNNING -> RUNNING transitions will occur consecutively and complexity of each is O(depth).
75      * To avoid that quadratic complexity, we are caching lookup result for such chains in this map and update it incrementally.
76      *
77      * [DebugCoroutineInfoImpl] keeps a lot of auxiliary information about a coroutine, so we use a weak reference queue
78      * to promptly release the corresponding memory when the reference to the coroutine itself was already collected.
79      */
80     private val callerInfoCache = ConcurrentWeakMap<CoroutineStackFrame, DebugCoroutineInfoImpl>(weakRefQueue = true)
81 
82     public fun install(): Unit = coroutineStateLock.write {
83         if (++installations > 1) return
84         startWeakRefCleanerThread()
85         if (AgentPremain.isInstalledStatically) return
86         dynamicAttach?.invoke(true) // attach
87     }
88 
89     public fun uninstall(): Unit = coroutineStateLock.write {
90         check(isInstalled) { "Agent was not installed" }
91         if (--installations != 0) return
92         stopWeakRefCleanerThread()
93         capturedCoroutinesMap.clear()
94         callerInfoCache.clear()
95         if (AgentPremain.isInstalledStatically) return
96         dynamicAttach?.invoke(false) // detach
97     }
98 
99     private fun startWeakRefCleanerThread() {
100         weakRefCleanerThread = thread(isDaemon = true, name = "Coroutines Debugger Cleaner") {
101             callerInfoCache.runWeakRefQueueCleaningLoopUntilInterrupted()
102         }
103     }
104 
105     private fun stopWeakRefCleanerThread() {
106         weakRefCleanerThread?.interrupt()
107         weakRefCleanerThread = null
108     }
109 
110     public fun hierarchyToString(job: Job): String = coroutineStateLock.write {
111         check(isInstalled) { "Debug probes are not installed" }
112         val jobToStack = capturedCoroutines
113             .filter { it.delegate.context[Job] != null }
114             .associateBy({ it.delegate.context.job }, { it.info })
115         return buildString {
116             job.build(jobToStack, this, "")
117         }
118     }
119 
120     private fun Job.build(map: Map<Job, DebugCoroutineInfoImpl>, builder: StringBuilder, indent: String) {
121         val info = map[this]
122         val newIndent: String
123         if (info == null) { // Append coroutine without stacktrace
124             // Do not print scoped coroutines and do not increase indentation level
125             @Suppress("INVISIBLE_REFERENCE")
126             if (this !is ScopeCoroutine<*>) {
127                 builder.append("$indent$debugString\n")
128                 newIndent = indent + "\t"
129             } else {
130                 newIndent = indent
131             }
132         } else {
133             // Append coroutine with its last stacktrace element
134             val element = info.lastObservedStackTrace().firstOrNull()
135             val state = info.state
136             builder.append("$indent$debugString, continuation is $state at line $element\n")
137             newIndent = indent + "\t"
138         }
139         // Append children with new indent
140         for (child in children) {
141             child.build(map, builder, newIndent)
142         }
143     }
144 
145     @Suppress("DEPRECATION_ERROR") // JobSupport
146     private val Job.debugString: String get() = if (this is JobSupport) toDebugString() else toString()
147 
148     /**
149      * Private method that dumps coroutines so that different public-facing method can use
150      * to produce different result types.
151      */
152     private inline fun <R : Any> dumpCoroutinesInfoImpl(create: (CoroutineOwner<*>, CoroutineContext) -> R): List<R> =
153         coroutineStateLock.write {
154             check(isInstalled) { "Debug probes are not installed" }
155             capturedCoroutines
156                 // Stable ordering of coroutines by their sequence number
157                 .sortedBy { it.info.sequenceNumber }
158                 // Leave in the dump only the coroutines that were not collected while we were dumping them
159                 .mapNotNull { owner ->
160                     // Fuse map and filter into one operation to save an inline
161                     if (owner.isFinished()) null
162                     else owner.info.context?.let { context -> create(owner, context) }
163                 }
164         }
165 
166     /*
167      * Internal (JVM-public) method used by IDEA debugger as of 1.4-M3.
168      */
169     public fun dumpCoroutinesInfo(): List<DebugCoroutineInfo> =
170         dumpCoroutinesInfoImpl { owner, context -> DebugCoroutineInfo(owner.info, context) }
171 
172     /*
173      * Internal (JVM-public) method to be used by IDEA debugger in the future (not used as of 1.4-M3).
174      * It is equivalent to [dumpCoroutinesInfo], but returns serializable (and thus less typed) objects.
175      */
176     public fun dumpDebuggerInfo(): List<DebuggerInfo> =
177         dumpCoroutinesInfoImpl { owner, context -> DebuggerInfo(owner.info, context) }
178 
179     public fun dumpCoroutines(out: PrintStream): Unit = synchronized(out) {
180         /*
181          * This method synchronizes both on `out` and `this` for a reason:
182          * 1) Taking a write lock is required to have a consistent snapshot of coroutines.
183          * 2) Synchronization on `out` is not required, but prohibits interleaving with any other
184          *    (asynchronous) attempt to write to this `out` (System.out by default).
185          * Yet this prevents the progress of coroutines until they are fully dumped to the out which we find acceptable compromise.
186          */
187         dumpCoroutinesSynchronized(out)
188     }
189 
190     /*
191      * Filters out coroutines that do not call probeCoroutineCompleted,
192      * are completed, but not yet garbage collected.
193      *
194      * Typically, we intercept completion of the coroutine so it invokes "probeCoroutineCompleted",
195      * but it's not the case for lazy coroutines that get cancelled before start.
196      */
197     private fun CoroutineOwner<*>.isFinished(): Boolean {
198         // Guarded by lock
199         val job = info.context?.get(Job) ?: return false
200         if (!job.isCompleted) return false
201         capturedCoroutinesMap.remove(this) // Clean it up by the way
202         return true
203     }
204 
205     private fun dumpCoroutinesSynchronized(out: PrintStream): Unit = coroutineStateLock.write {
206         check(isInstalled) { "Debug probes are not installed" }
207         out.print("Coroutines dump ${dateFormat.format(System.currentTimeMillis())}")
208         capturedCoroutines
209             .asSequence()
210             .filter { !it.isFinished() }
211             .sortedBy { it.info.sequenceNumber }
212             .forEach { owner ->
213                 val info = owner.info
214                 val observedStackTrace = info.lastObservedStackTrace()
215                 val enhancedStackTrace = enhanceStackTraceWithThreadDumpImpl(info.state, info.lastObservedThread, observedStackTrace)
216                 val state = if (info.state == RUNNING && enhancedStackTrace === observedStackTrace)
217                     "${info.state} (Last suspension stacktrace, not an actual stacktrace)"
218                 else
219                     info.state
220                 out.print("\n\nCoroutine ${owner.delegate}, state: $state")
221                 if (observedStackTrace.isEmpty()) {
222                     out.print("\n\tat ${createArtificialFrame(ARTIFICIAL_FRAME_MESSAGE)}")
223                     printStackTrace(out, info.creationStackTrace)
224                 } else {
225                     printStackTrace(out, enhancedStackTrace)
226                 }
227             }
228     }
229 
230     private fun printStackTrace(out: PrintStream, frames: List<StackTraceElement>) {
231         frames.forEach { frame ->
232             out.print("\n\tat $frame")
233         }
234     }
235 
236     /*
237      * Internal (JVM-public) method used by IDEA debugger as of 1.4-M3.
238      * It is similar to [enhanceStackTraceWithThreadDumpImpl], but uses debugger-facing [DebugCoroutineInfo] type.
239      */
240     @Suppress("unused")
241     public fun enhanceStackTraceWithThreadDump(
242         info: DebugCoroutineInfo,
243         coroutineTrace: List<StackTraceElement>
244     ): List<StackTraceElement> =
245         enhanceStackTraceWithThreadDumpImpl(info.state, info.lastObservedThread, coroutineTrace)
246 
247     /**
248      * Tries to enhance [coroutineTrace] (obtained by call to [DebugCoroutineInfoImpl.lastObservedStackTrace]) with
249      * thread dump of [DebugCoroutineInfoImpl.lastObservedThread].
250      *
251      * Returns [coroutineTrace] if enhancement was unsuccessful or the enhancement result.
252      */
253     private fun enhanceStackTraceWithThreadDumpImpl(
254         state: String,
255         thread: Thread?,
256         coroutineTrace: List<StackTraceElement>
257     ): List<StackTraceElement> {
258         if (state != RUNNING || thread == null) return coroutineTrace
259         // Avoid security manager issues
260         val actualTrace = runCatching { thread.stackTrace }.getOrNull()
261             ?: return coroutineTrace
262 
263         /*
264          * Here goes heuristic that tries to merge two stacktraces: real one
265          * (that has at least one but usually not so many suspend function frames)
266          * and coroutine one that has only suspend function frames.
267          *
268          * Heuristic:
269          * 1) Dump lastObservedThread
270          * 2) Find the next frame after BaseContinuationImpl.resumeWith (continuation machinery).
271          *   Invariant: this method is called under the lock, so such method **should** be present
272          *   in continuation stacktrace.
273          * 3) Find target method in continuation stacktrace (metadata-based)
274          * 4) Prepend dumped stacktrace (trimmed by target frame) to continuation stacktrace
275          *
276          * Heuristic may fail on recursion and overloads, but it will be automatically improved
277          * with KT-29997.
278          */
279         val indexOfResumeWith = actualTrace.indexOfFirst {
280             it.className == "kotlin.coroutines.jvm.internal.BaseContinuationImpl" &&
281                     it.methodName == "resumeWith" &&
282                     it.fileName == "ContinuationImpl.kt"
283         }
284 
285         val (continuationStartFrame, frameSkipped) = findContinuationStartIndex(
286             indexOfResumeWith,
287             actualTrace,
288             coroutineTrace
289         )
290 
291         if (continuationStartFrame == -1) return coroutineTrace
292 
293         val delta = if (frameSkipped) 1 else 0
294         val expectedSize = indexOfResumeWith + coroutineTrace.size - continuationStartFrame - 1 - delta
295         val result = ArrayList<StackTraceElement>(expectedSize)
296         for (index in 0 until indexOfResumeWith - delta) {
297             result += actualTrace[index]
298         }
299 
300         for (index in continuationStartFrame + 1 until coroutineTrace.size) {
301             result += coroutineTrace[index]
302         }
303 
304         return result
305     }
306 
307     /**
308      * Tries to find the lowest meaningful frame above `resumeWith` in the real stacktrace and
309      * its match in a coroutines stacktrace (steps 2-3 in heuristic).
310      *
311      * This method does more than just matching `realTrace.indexOf(resumeWith) - 1`:
312      * If method above `resumeWith` has no line number (thus it is `stateMachine.invokeSuspend`),
313      * it's skipped and attempt to match next one is made because state machine could have been missing in the original coroutine stacktrace.
314      *
315      * Returns index of such frame (or -1) and flag indicating whether frame with state machine was skipped
316      */
317     private fun findContinuationStartIndex(
318         indexOfResumeWith: Int,
319         actualTrace: Array<StackTraceElement>,
320         coroutineTrace: List<StackTraceElement>
321     ): Pair<Int, Boolean> {
322         val result = findIndexOfFrame(indexOfResumeWith - 1, actualTrace, coroutineTrace)
323         if (result == -1) return findIndexOfFrame(indexOfResumeWith - 2, actualTrace, coroutineTrace) to true
324         return result to false
325     }
326 
327     private fun findIndexOfFrame(
328         frameIndex: Int,
329         actualTrace: Array<StackTraceElement>,
330         coroutineTrace: List<StackTraceElement>
331     ): Int {
332         val continuationFrame = actualTrace.getOrNull(frameIndex)
333             ?: return -1
334 
335         return coroutineTrace.indexOfFirst {
336             it.fileName == continuationFrame.fileName &&
337                     it.className == continuationFrame.className &&
338                     it.methodName == continuationFrame.methodName
339         }
340     }
341 
342     internal fun probeCoroutineResumed(frame: Continuation<*>) = updateState(frame, RUNNING)
343 
344     internal fun probeCoroutineSuspended(frame: Continuation<*>) = updateState(frame, SUSPENDED)
345 
346     private fun updateState(frame: Continuation<*>, state: String) {
347         if (!isInstalled) return
348         // KT-29997 is here only since 1.3.30
349         if (state == RUNNING && KotlinVersion.CURRENT.isAtLeast(1, 3, 30)) {
350             val stackFrame = frame as? CoroutineStackFrame ?: return
351             updateRunningState(stackFrame, state)
352             return
353         }
354 
355         // Find ArtificialStackFrame of the coroutine
356         val owner = frame.owner() ?: return
357         updateState(owner, frame, state)
358     }
359 
360     // See comment to callerInfoCache
361     private fun updateRunningState(frame: CoroutineStackFrame, state: String): Unit = coroutineStateLock.read {
362         if (!isInstalled) return
363         // Lookup coroutine info in cache or by traversing stack frame
364         val info: DebugCoroutineInfoImpl
365         val cached = callerInfoCache.remove(frame)
366         if (cached != null) {
367             info = cached
368         } else {
369             info = frame.owner()?.info ?: return
370             // Guard against improper implementations of CoroutineStackFrame and bugs in the compiler
371             val realCaller = info.lastObservedFrame?.realCaller()
372             if (realCaller != null) callerInfoCache.remove(realCaller)
373         }
374 
375         info.updateState(state, frame as Continuation<*>)
376         // Do not cache it for proxy-classes such as ScopeCoroutines
377         val caller = frame.realCaller() ?: return
378         callerInfoCache[caller] = info
379     }
380 
381     private tailrec fun CoroutineStackFrame.realCaller(): CoroutineStackFrame? {
382         val caller = callerFrame ?: return null
383         return if (caller.getStackTraceElement() != null) caller else caller.realCaller()
384     }
385 
386     private fun updateState(owner: CoroutineOwner<*>, frame: Continuation<*>, state: String) = coroutineStateLock.read {
387         if (!isInstalled) return
388         owner.info.updateState(state, frame)
389     }
390 
391     private fun Continuation<*>.owner(): CoroutineOwner<*>? = (this as? CoroutineStackFrame)?.owner()
392 
393     private tailrec fun CoroutineStackFrame.owner(): CoroutineOwner<*>? =
394         if (this is CoroutineOwner<*>) this else callerFrame?.owner()
395 
396     // Not guarded by the lock at all, does not really affect consistency
397     internal fun <T> probeCoroutineCreated(completion: Continuation<T>): Continuation<T> {
398         if (!isInstalled) return completion
399         /*
400          * If completion already has an owner, it means that we are in scoped coroutine (coroutineScope, withContext etc.),
401          * then piggyback on its already existing owner and do not replace completion
402          */
403         val owner = completion.owner()
404         if (owner != null) return completion
405         /*
406          * Here we replace completion with a sequence of StackTraceFrame objects
407          * which represents creation stacktrace, thus making stacktrace recovery mechanism
408          * even more verbose (it will attach coroutine creation stacktrace to all exceptions),
409          * and then using CoroutineOwner completion as unique identifier of coroutineSuspended/resumed calls.
410          */
411         val frame = if (enableCreationStackTraces) {
412             sanitizeStackTrace(Exception()).toStackTraceFrame()
413         } else {
414             null
415         }
416         return createOwner(completion, frame)
417     }
418 
419     private fun List<StackTraceElement>.toStackTraceFrame(): StackTraceFrame? =
420         foldRight<StackTraceElement, StackTraceFrame?>(null) { frame, acc ->
421             StackTraceFrame(acc, frame)
422         }
423 
424     private fun <T> createOwner(completion: Continuation<T>, frame: StackTraceFrame?): Continuation<T> {
425         if (!isInstalled) return completion
426         val info = DebugCoroutineInfoImpl(completion.context, frame, sequenceNumber.incrementAndGet())
427         val owner = CoroutineOwner(completion, info, frame)
428         capturedCoroutinesMap[owner] = true
429         if (!isInstalled) capturedCoroutinesMap.clear()
430         return owner
431     }
432 
433     // Not guarded by the lock at all, does not really affect consistency
434     private fun probeCoroutineCompleted(owner: CoroutineOwner<*>) {
435         capturedCoroutinesMap.remove(owner)
436         /*
437          * This removal is a guard against improperly implemented CoroutineStackFrame
438          * and bugs in the compiler.
439          */
440         val caller = owner.info.lastObservedFrame?.realCaller() ?: return
441         callerInfoCache.remove(caller)
442     }
443 
444     /**
445      * This class is injected as completion of all continuations in [probeCoroutineCompleted].
446      * It is owning the coroutine info and responsible for managing all its external info related to debug agent.
447      */
448     private class CoroutineOwner<T>(
449         @JvmField val delegate: Continuation<T>,
450         @JvmField val info: DebugCoroutineInfoImpl,
451         private val frame: CoroutineStackFrame?
452     ) : Continuation<T> by delegate, CoroutineStackFrame {
453 
454         override val callerFrame: CoroutineStackFrame?
455             get() = frame?.callerFrame
456 
457         override fun getStackTraceElement(): StackTraceElement? = frame?.getStackTraceElement()
458 
459         override fun resumeWith(result: Result<T>) {
460             probeCoroutineCompleted(this)
461             delegate.resumeWith(result)
462         }
463 
464         override fun toString(): String = delegate.toString()
465     }
466 
467     private fun <T : Throwable> sanitizeStackTrace(throwable: T): List<StackTraceElement> {
468         val stackTrace = throwable.stackTrace
469         val size = stackTrace.size
470         val probeIndex = stackTrace.indexOfLast { it.className == "kotlin.coroutines.jvm.internal.DebugProbesKt" }
471 
472         if (!sanitizeStackTraces) {
473             return List(size - probeIndex) {
474                 if (it == 0) createArtificialFrame(ARTIFICIAL_FRAME_MESSAGE) else stackTrace[it + probeIndex]
475             }
476         }
477 
478         /*
479          * Trim intervals of internal methods from the stacktrace (bounds are excluded from trimming)
480          * E.g. for sequence [e, i1, i2, i3, e, i4, e, i5, i6, e7]
481          * output will be [e, i1, i3, e, i4, e, i5, i7]
482          */
483         val result = ArrayList<StackTraceElement>(size - probeIndex + 1)
484         result += createArtificialFrame(ARTIFICIAL_FRAME_MESSAGE)
485         var includeInternalFrame = true
486         for (i in (probeIndex + 1) until size - 1) {
487             val element = stackTrace[i]
488             if (!element.isInternalMethod) {
489                 includeInternalFrame = true
490                 result += element
491                 continue
492             }
493 
494             if (includeInternalFrame) {
495                 result += element
496                 includeInternalFrame = false
497             } else if (stackTrace[i + 1].isInternalMethod) {
498                 continue
499             } else {
500                 result += element
501                 includeInternalFrame = true
502             }
503 
504         }
505 
506         result += stackTrace[size - 1]
507         return result
508     }
509 
510     private val StackTraceElement.isInternalMethod: Boolean get() = className.startsWith("kotlinx.coroutines")
511 }
512