1 /* <lambda>null2 * Copyright 2016-2023 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 @file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") 6 7 package kotlinx.coroutines.debug 8 9 import kotlinx.coroutines.scheduling.* 10 import reactor.blockhound.* 11 import reactor.blockhound.integration.* 12 13 /** 14 * @suppress 15 */ 16 public class CoroutinesBlockHoundIntegration : BlockHoundIntegration { 17 18 override fun applyTo(builder: BlockHound.Builder): Unit = with(builder) { 19 allowBlockingCallsInPrimitiveImplementations() 20 allowBlockingWhenEnqueuingTasks() 21 allowServiceLoaderInvocationsOnInit() 22 allowBlockingCallsInReflectionImpl() 23 allowBlockingCallsInDebugProbes() 24 allowBlockingCallsInWorkQueue() 25 // Stacktrace recovery cache is guarded by lock 26 allowBlockingCallsInside("kotlinx.coroutines.internal.ExceptionsConstructorKt", "tryCopyException") 27 /* The predicates that define that BlockHound should only report blocking calls from threads that are part of 28 the coroutine thread pool and currently execute a CPU-bound coroutine computation. */ 29 addDynamicThreadPredicate { isSchedulerWorker(it) } 30 nonBlockingThreadPredicate { p -> p.or { mayNotBlock(it) } } 31 } 32 33 /** 34 * Allows blocking calls in various coroutine structures, such as flows and channels. 35 * 36 * They use locks in implementations, though only for protecting short pieces of fast and well-understood code, so 37 * locking in such places doesn't affect the program liveness. 38 */ 39 private fun BlockHound.Builder.allowBlockingCallsInPrimitiveImplementations() { 40 allowBlockingCallsInJobSupport() 41 allowBlockingCallsInThreadSafeHeap() 42 allowBlockingCallsInFlow() 43 allowBlockingCallsInChannels() 44 } 45 46 /** 47 * Allows blocking inside [kotlinx.coroutines.JobSupport]. 48 */ 49 private fun BlockHound.Builder.allowBlockingCallsInJobSupport() { 50 for (method in listOf("finalizeFinishingState", "invokeOnCompletion", "makeCancelling", 51 "tryMakeCompleting")) 52 { 53 allowBlockingCallsInside("kotlinx.coroutines.JobSupport", method) 54 } 55 } 56 57 /** 58 * Allow blocking calls inside [kotlinx.coroutines.debug.internal.DebugProbesImpl]. 59 */ 60 private fun BlockHound.Builder.allowBlockingCallsInDebugProbes() { 61 for (method in listOf("install", "uninstall", "hierarchyToString", "dumpCoroutinesInfo", "dumpDebuggerInfo", 62 "dumpCoroutinesSynchronized", "updateRunningState", "updateState")) 63 { 64 allowBlockingCallsInside("kotlinx.coroutines.debug.internal.DebugProbesImpl", method) 65 } 66 } 67 68 /** 69 * Allow blocking calls inside [kotlinx.coroutines.scheduling.WorkQueue] 70 */ 71 private fun BlockHound.Builder.allowBlockingCallsInWorkQueue() { 72 /** uses [Thread.yield] in a benign way. */ 73 allowBlockingCallsInside("kotlinx.coroutines.scheduling.WorkQueue", "addLast") 74 } 75 76 /** 77 * Allows blocking inside [kotlinx.coroutines.internal.ThreadSafeHeap]. 78 */ 79 private fun BlockHound.Builder.allowBlockingCallsInThreadSafeHeap() { 80 for (method in listOf("clear", "peek", "removeFirstOrNull", "addLast")) { 81 allowBlockingCallsInside("kotlinx.coroutines.internal.ThreadSafeHeap", method) 82 } 83 } 84 85 private fun BlockHound.Builder.allowBlockingCallsInFlow() { 86 allowBlockingCallsInsideStateFlow() 87 allowBlockingCallsInsideSharedFlow() 88 } 89 90 /** 91 * Allows blocking inside the implementation of [kotlinx.coroutines.flow.StateFlow]. 92 */ 93 private fun BlockHound.Builder.allowBlockingCallsInsideStateFlow() { 94 allowBlockingCallsInside("kotlinx.coroutines.flow.StateFlowImpl", "updateState") 95 } 96 97 /** 98 * Allows blocking inside the implementation of [kotlinx.coroutines.flow.SharedFlow]. 99 */ 100 private fun BlockHound.Builder.allowBlockingCallsInsideSharedFlow() { 101 for (method in listOf("emitSuspend", "awaitValue", "getReplayCache", "tryEmit", "cancelEmitter", 102 "tryTakeValue", "resetReplayCache")) 103 { 104 allowBlockingCallsInside("kotlinx.coroutines.flow.SharedFlowImpl", method) 105 } 106 for (method in listOf("getSubscriptionCount", "allocateSlot", "freeSlot")) { 107 allowBlockingCallsInside("kotlinx.coroutines.flow.internal.AbstractSharedFlow", method) 108 } 109 } 110 111 private fun BlockHound.Builder.allowBlockingCallsInChannels() { 112 allowBlockingCallsInBroadcastChannels() 113 allowBlockingCallsInConflatedChannels() 114 } 115 116 /** 117 * Allows blocking inside [kotlinx.coroutines.channels.BroadcastChannel]. 118 */ 119 private fun BlockHound.Builder.allowBlockingCallsInBroadcastChannels() { 120 for (method in listOf("openSubscription", "removeSubscriber", "send", "trySend", "registerSelectForSend", 121 "close", "cancelImpl", "isClosedForSend", "value", "valueOrNull")) 122 { 123 allowBlockingCallsInside("kotlinx.coroutines.channels.BroadcastChannelImpl", method) 124 } 125 for (method in listOf("cancelImpl")) { 126 allowBlockingCallsInside("kotlinx.coroutines.channels.BroadcastChannelImpl\$SubscriberConflated", method) 127 } 128 for (method in listOf("cancelImpl")) { 129 allowBlockingCallsInside("kotlinx.coroutines.channels.BroadcastChannelImpl\$SubscriberBuffered", method) 130 } 131 } 132 133 /** 134 * Allows blocking inside [kotlinx.coroutines.channels.ConflatedBufferedChannel]. 135 */ 136 private fun BlockHound.Builder.allowBlockingCallsInConflatedChannels() { 137 for (method in listOf("receive", "receiveCatching", "tryReceive", "registerSelectForReceive", 138 "send", "trySend", "sendBroadcast", "registerSelectForSend", 139 "close", "cancelImpl", "isClosedForSend", "isClosedForReceive", "isEmpty")) 140 { 141 allowBlockingCallsInside("kotlinx.coroutines.channels.ConflatedBufferedChannel", method) 142 } 143 for (method in listOf("hasNext")) { 144 allowBlockingCallsInside("kotlinx.coroutines.channels.ConflatedBufferedChannel\$ConflatedChannelIterator", method) 145 } 146 } 147 148 /** 149 * Allows blocking when enqueuing tasks into a thread pool. 150 * 151 * Without this, the following code breaks: 152 * ``` 153 * withContext(Dispatchers.Default) { 154 * withContext(newSingleThreadContext("singleThreadedContext")) { 155 * } 156 * } 157 * ``` 158 */ 159 private fun BlockHound.Builder.allowBlockingWhenEnqueuingTasks() { 160 /* This method may block as part of its implementation, but is probably safe. */ 161 allowBlockingCallsInside("java.util.concurrent.ScheduledThreadPoolExecutor", "execute") 162 } 163 164 /** 165 * Allows instances of [java.util.ServiceLoader] being called. 166 * 167 * Each instance is listed separately; another approach could be to generally allow the operations performed by 168 * service loaders, as they can generally be considered safe. This was not done here because ServiceLoader has a 169 * large API surface, with some methods being hidden as implementation details (in particular, the implementation of 170 * its iterator is completely opaque). Relying on particular names being used in ServiceLoader's implementation 171 * would be brittle, so here we only provide clearance rules for some specific instances. 172 */ 173 private fun BlockHound.Builder.allowServiceLoaderInvocationsOnInit() { 174 allowBlockingCallsInside("kotlinx.coroutines.reactive.ReactiveFlowKt", "<clinit>") 175 allowBlockingCallsInside("kotlinx.coroutines.CoroutineExceptionHandlerImplKt", "<clinit>") 176 // not part of the coroutines library, but it would be nice if reflection also wasn't considered blocking 177 allowBlockingCallsInside("kotlin.reflect.jvm.internal.impl.resolve.OverridingUtil", "<clinit>") 178 } 179 180 /** 181 * Allows some blocking calls from the reflection API. 182 * 183 * The API is big, so surely some other blocking calls will show up, but with these rules in place, at least some 184 * simple examples work without problems. 185 */ 186 private fun BlockHound.Builder.allowBlockingCallsInReflectionImpl() { 187 allowBlockingCallsInside("kotlin.reflect.jvm.internal.impl.builtins.jvm.JvmBuiltInsPackageFragmentProvider", "findPackage") 188 } 189 190 } 191