• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2023 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 @file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
6 
7 package kotlinx.coroutines.debug
8 
9 import kotlinx.coroutines.scheduling.*
10 import reactor.blockhound.*
11 import reactor.blockhound.integration.*
12 
13 /**
14  * @suppress
15  */
16 public class CoroutinesBlockHoundIntegration : BlockHoundIntegration {
17 
18     override fun applyTo(builder: BlockHound.Builder): Unit = with(builder) {
19         allowBlockingCallsInPrimitiveImplementations()
20         allowBlockingWhenEnqueuingTasks()
21         allowServiceLoaderInvocationsOnInit()
22         allowBlockingCallsInReflectionImpl()
23         allowBlockingCallsInDebugProbes()
24         allowBlockingCallsInWorkQueue()
25         // Stacktrace recovery cache is guarded by lock
26         allowBlockingCallsInside("kotlinx.coroutines.internal.ExceptionsConstructorKt", "tryCopyException")
27         /* The predicates that define that BlockHound should only report blocking calls from threads that are part of
28         the coroutine thread pool and currently execute a CPU-bound coroutine computation. */
29         addDynamicThreadPredicate { isSchedulerWorker(it) }
30         nonBlockingThreadPredicate { p -> p.or { mayNotBlock(it) } }
31     }
32 
33     /**
34      * Allows blocking calls in various coroutine structures, such as flows and channels.
35      *
36      * They use locks in implementations, though only for protecting short pieces of fast and well-understood code, so
37      * locking in such places doesn't affect the program liveness.
38      */
39     private fun BlockHound.Builder.allowBlockingCallsInPrimitiveImplementations() {
40         allowBlockingCallsInJobSupport()
41         allowBlockingCallsInThreadSafeHeap()
42         allowBlockingCallsInFlow()
43         allowBlockingCallsInChannels()
44     }
45 
46     /**
47      * Allows blocking inside [kotlinx.coroutines.JobSupport].
48      */
49     private fun BlockHound.Builder.allowBlockingCallsInJobSupport() {
50         for (method in listOf("finalizeFinishingState", "invokeOnCompletion", "makeCancelling",
51             "tryMakeCompleting"))
52         {
53             allowBlockingCallsInside("kotlinx.coroutines.JobSupport", method)
54         }
55     }
56 
57     /**
58      * Allow blocking calls inside [kotlinx.coroutines.debug.internal.DebugProbesImpl].
59      */
60     private fun BlockHound.Builder.allowBlockingCallsInDebugProbes() {
61         for (method in listOf("install", "uninstall", "hierarchyToString", "dumpCoroutinesInfo", "dumpDebuggerInfo",
62             "dumpCoroutinesSynchronized", "updateRunningState", "updateState"))
63         {
64             allowBlockingCallsInside("kotlinx.coroutines.debug.internal.DebugProbesImpl", method)
65         }
66     }
67 
68     /**
69      * Allow blocking calls inside [kotlinx.coroutines.scheduling.WorkQueue]
70      */
71     private fun BlockHound.Builder.allowBlockingCallsInWorkQueue() {
72         /** uses [Thread.yield] in a benign way. */
73         allowBlockingCallsInside("kotlinx.coroutines.scheduling.WorkQueue", "addLast")
74     }
75 
76     /**
77      * Allows blocking inside [kotlinx.coroutines.internal.ThreadSafeHeap].
78      */
79     private fun BlockHound.Builder.allowBlockingCallsInThreadSafeHeap() {
80         for (method in listOf("clear", "peek", "removeFirstOrNull", "addLast")) {
81             allowBlockingCallsInside("kotlinx.coroutines.internal.ThreadSafeHeap", method)
82         }
83     }
84 
85     private fun BlockHound.Builder.allowBlockingCallsInFlow() {
86         allowBlockingCallsInsideStateFlow()
87         allowBlockingCallsInsideSharedFlow()
88     }
89 
90     /**
91      * Allows blocking inside the implementation of [kotlinx.coroutines.flow.StateFlow].
92      */
93     private fun BlockHound.Builder.allowBlockingCallsInsideStateFlow() {
94         allowBlockingCallsInside("kotlinx.coroutines.flow.StateFlowImpl", "updateState")
95     }
96 
97     /**
98      * Allows blocking inside the implementation of [kotlinx.coroutines.flow.SharedFlow].
99      */
100     private fun BlockHound.Builder.allowBlockingCallsInsideSharedFlow() {
101         for (method in listOf("emitSuspend", "awaitValue", "getReplayCache", "tryEmit", "cancelEmitter",
102             "tryTakeValue", "resetReplayCache"))
103         {
104             allowBlockingCallsInside("kotlinx.coroutines.flow.SharedFlowImpl", method)
105         }
106         for (method in listOf("getSubscriptionCount", "allocateSlot", "freeSlot")) {
107             allowBlockingCallsInside("kotlinx.coroutines.flow.internal.AbstractSharedFlow", method)
108         }
109     }
110 
111     private fun BlockHound.Builder.allowBlockingCallsInChannels() {
112         allowBlockingCallsInBroadcastChannels()
113         allowBlockingCallsInConflatedChannels()
114     }
115 
116     /**
117      * Allows blocking inside [kotlinx.coroutines.channels.BroadcastChannel].
118      */
119     private fun BlockHound.Builder.allowBlockingCallsInBroadcastChannels() {
120         for (method in listOf("openSubscription", "removeSubscriber", "send", "trySend", "registerSelectForSend",
121                               "close", "cancelImpl", "isClosedForSend", "value", "valueOrNull"))
122         {
123             allowBlockingCallsInside("kotlinx.coroutines.channels.BroadcastChannelImpl", method)
124         }
125         for (method in listOf("cancelImpl")) {
126             allowBlockingCallsInside("kotlinx.coroutines.channels.BroadcastChannelImpl\$SubscriberConflated", method)
127         }
128         for (method in listOf("cancelImpl")) {
129             allowBlockingCallsInside("kotlinx.coroutines.channels.BroadcastChannelImpl\$SubscriberBuffered", method)
130         }
131     }
132 
133     /**
134      * Allows blocking inside [kotlinx.coroutines.channels.ConflatedBufferedChannel].
135      */
136     private fun BlockHound.Builder.allowBlockingCallsInConflatedChannels() {
137         for (method in listOf("receive", "receiveCatching", "tryReceive", "registerSelectForReceive",
138                               "send", "trySend", "sendBroadcast", "registerSelectForSend",
139                               "close", "cancelImpl", "isClosedForSend", "isClosedForReceive", "isEmpty"))
140         {
141             allowBlockingCallsInside("kotlinx.coroutines.channels.ConflatedBufferedChannel", method)
142         }
143         for (method in listOf("hasNext")) {
144             allowBlockingCallsInside("kotlinx.coroutines.channels.ConflatedBufferedChannel\$ConflatedChannelIterator", method)
145         }
146     }
147 
148     /**
149      * Allows blocking when enqueuing tasks into a thread pool.
150      *
151      * Without this, the following code breaks:
152      * ```
153      * withContext(Dispatchers.Default) {
154      *     withContext(newSingleThreadContext("singleThreadedContext")) {
155      *     }
156      * }
157      * ```
158      */
159     private fun BlockHound.Builder.allowBlockingWhenEnqueuingTasks() {
160         /* This method may block as part of its implementation, but is probably safe. */
161         allowBlockingCallsInside("java.util.concurrent.ScheduledThreadPoolExecutor", "execute")
162     }
163 
164     /**
165      * Allows instances of [java.util.ServiceLoader] being called.
166      *
167      * Each instance is listed separately; another approach could be to generally allow the operations performed by
168      * service loaders, as they can generally be considered safe. This was not done here because ServiceLoader has a
169      * large API surface, with some methods being hidden as implementation details (in particular, the implementation of
170      * its iterator is completely opaque). Relying on particular names being used in ServiceLoader's implementation
171      * would be brittle, so here we only provide clearance rules for some specific instances.
172      */
173     private fun BlockHound.Builder.allowServiceLoaderInvocationsOnInit() {
174         allowBlockingCallsInside("kotlinx.coroutines.reactive.ReactiveFlowKt", "<clinit>")
175         allowBlockingCallsInside("kotlinx.coroutines.CoroutineExceptionHandlerImplKt", "<clinit>")
176         // not part of the coroutines library, but it would be nice if reflection also wasn't considered blocking
177         allowBlockingCallsInside("kotlin.reflect.jvm.internal.impl.resolve.OverridingUtil", "<clinit>")
178     }
179 
180     /**
181      * Allows some blocking calls from the reflection API.
182      *
183      * The API is big, so surely some other blocking calls will show up, but with these rules in place, at least some
184      * simple examples work without problems.
185      */
186     private fun BlockHound.Builder.allowBlockingCallsInReflectionImpl() {
187         allowBlockingCallsInside("kotlin.reflect.jvm.internal.impl.builtins.jvm.JvmBuiltInsPackageFragmentProvider", "findPackage")
188     }
189 
190 }
191