/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#ifndef GRPC_CORE_LIB_IOMGR_EXEC_CTX_H
#define GRPC_CORE_LIB_IOMGR_EXEC_CTX_H

#include <grpc/support/port_platform.h>

#include <limits>

#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/support/atm.h>
#include <grpc/support/cpu.h>
#include <grpc/support/log.h>

#include "src/core/lib/gpr/time_precise.h"
#include "src/core/lib/gpr/tls.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/fork.h"
#include "src/core/lib/iomgr/closure.h"

/** Millisecond-resolution time type used by iomgr; converted to/from
    gpr_timespec and cycle counters via the helpers declared below. */
typedef int64_t grpc_millis;

#define GRPC_MILLIS_INF_FUTURE INT64_MAX
#define GRPC_MILLIS_INF_PAST INT64_MIN

/** A combiner represents a list of work to be executed later.
    Forward declared here to avoid a circular dependency with combiner.h. */
typedef struct grpc_combiner grpc_combiner;

/* This exec_ctx is ready to return: either pre-populated, or cached as soon as
   the finish_check returns true */
#define GRPC_EXEC_CTX_FLAG_IS_FINISHED 1
/* The exec_ctx's thread is (potentially) owned by a call or channel: care
   should be given to not delete said call/channel from this exec_ctx */
#define GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP 2
/* This exec ctx was initialized by an internal thread, and should not
   be counted by fork handlers */
#define GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD 4

/* This application callback exec ctx was initialized by an internal thread,
   and should not be counted by fork handlers */
#define GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD 1

/* Conversions between grpc_millis and other clock representations.
   Separate round-down/round-up variants exist so callers can pick the
   direction that is conservative for their deadline arithmetic. */
gpr_timespec grpc_millis_to_timespec(grpc_millis millis, gpr_clock_type clock);
grpc_millis grpc_timespec_to_millis_round_down(gpr_timespec ts);
grpc_millis grpc_timespec_to_millis_round_up(gpr_timespec ts);
grpc_millis grpc_cycle_counter_to_millis_round_down(gpr_cycle_counter cycles);
grpc_millis grpc_cycle_counter_to_millis_round_up(gpr_cycle_counter cycles);

namespace grpc_core {
class Combiner;
/** Execution context.
 *  A bag of data that collects information along a callstack.
 *  It is created on the stack at core entry points (public API or iomgr), and
 *  stored internally as a thread-local variable.
 *
 *  Generally, to create an exec_ctx instance, add the following line at the
 *  top of the public API entry point or at the start of a thread's work
 *  function:
 *
 *  grpc_core::ExecCtx exec_ctx;
 *
 *  Access the created ExecCtx instance using:
 *  grpc_core::ExecCtx::Get()
 *
 *  Specific responsibilities (this may grow in the future):
 *  - track a list of core work that needs to be delayed until the base of the
 *    call stack (this provides a convenient mechanism to run callbacks
 *    without worrying about locking issues)
 *  - provide a decision maker (via IsReadyToFinish) that provides a
 *    signal as to whether a borrowed thread should continue to do work or
 *    should actively try to finish up and get this thread back to its owner
 *
 *  CONVENTIONS:
 *  - Instances of this must ALWAYS be constructed on the stack, never
 *    heap allocated.
 *  - Do not pass exec_ctx as a parameter to a function. Always access it
 *    using grpc_core::ExecCtx::Get().
 *  - NOTE: In the future, the convention is likely to change to allow only
 *    one ExecCtx on a thread's stack at the same time. The TODO below
 *    discusses this plan in more detail.
 *
 *  TODO(yashykt): Only allow one "active" ExecCtx on a thread at the same
 *                 time. Stage 1: If a new one is created on the stack, it
 *                 should just pass-through to the underlying ExecCtx deeper
 *                 in the thread's stack.
 *                 Stage 2: Assert if a 2nd one is ever created on the stack
 *                 since that implies a core re-entry outside of application
 *                 callbacks.
 */
class ExecCtx {
 public:
  /** Default Constructor.
      Marks the context as already-finished (so borrowed threads see no
      pressure to keep working) and installs it as the thread's current
      ExecCtx. Note the ordering: the fork counter is incremented before
      Set(this) publishes the context in thread-local storage. */
  ExecCtx() : flags_(GRPC_EXEC_CTX_FLAG_IS_FINISHED) {
    grpc_core::Fork::IncExecCtxCount();
    Set(this);
  }

  /** Parameterised Constructor.
      \a fl is a bitmask of GRPC_EXEC_CTX_FLAG_* values. Contexts created on
      internal threads are excluded from the fork handler's exec_ctx count. */
  explicit ExecCtx(uintptr_t fl) : flags_(fl) {
    if (!(GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD & flags_)) {
      grpc_core::Fork::IncExecCtxCount();
    }
    Set(this);
  }

  /** Destructor.
      Ordering matters here: mark finished, flush any pending closures while
      this is still the current context, then restore the previously active
      context (captured in last_exec_ctx_) before decrementing the fork
      count incremented by the constructor. */
  virtual ~ExecCtx() {
    flags_ |= GRPC_EXEC_CTX_FLAG_IS_FINISHED;
    Flush();
    Set(last_exec_ctx_);
    if (!(GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD & flags_)) {
      grpc_core::Fork::DecExecCtxCount();
    }
  }

  /** Disallow copy and assignment operators */
  ExecCtx(const ExecCtx&) = delete;
  ExecCtx& operator=(const ExecCtx&) = delete;

  /** Returns the CPU this context started on, querying it lazily on first
      use (the max unsigned value acts as the "not yet sampled" sentinel). */
  unsigned starting_cpu() {
    if (starting_cpu_ == std::numeric_limits<unsigned>::max()) {
      starting_cpu_ = gpr_cpu_current_cpu();
    }
    return starting_cpu_;
  }

  struct CombinerData {
    /* currently active combiner: updated only via combiner.c */
    Combiner* active_combiner;
    /* last active combiner in the active combiner list */
    Combiner* last_combiner;
  };

  /** Only to be used by grpc-combiner code */
  CombinerData* combiner_data() { return &combiner_data_; }

  /** Return pointer to grpc_closure_list */
  grpc_closure_list* closure_list() { return &closure_list_; }

  /** Return flags */
  uintptr_t flags() { return flags_; }

  /** Checks if there is work to be done: either an active combiner or a
      non-empty closure list. */
  bool HasWork() {
    return combiner_data_.active_combiner != nullptr ||
           !grpc_closure_list_empty(closure_list_);
  }

  /** Flush any work that has been enqueued onto this grpc_exec_ctx.
   *  Caller must guarantee that no interfering locks are held.
   *  Returns true if work was performed, false otherwise.
   */
  bool Flush();

  /** Returns true if we'd like to leave this execution context as soon as
   *  possible: useful for deciding whether to do something more or not
   *  depending on outside context.
   *  Once CheckReadyToFinish() reports true, the result is latched in flags_
   *  so subsequent calls return true without re-checking.
   */
  bool IsReadyToFinish() {
    if ((flags_ & GRPC_EXEC_CTX_FLAG_IS_FINISHED) == 0) {
      if (CheckReadyToFinish()) {
        flags_ |= GRPC_EXEC_CTX_FLAG_IS_FINISHED;
        return true;
      }
      return false;
    } else {
      return true;
    }
  }

  /** Returns the stored current time relative to start if valid,
   *  otherwise refreshes the stored time, sets it valid and returns the new
   *  value.
   */
  grpc_millis Now();

  /** Invalidates the stored time value. A new time value will be set on
   *  calling Now().
   */
  void InvalidateNow() { now_is_valid_ = false; }

  /** To be used only by shutdown code in iomgr: pins Now() at
      GRPC_MILLIS_INF_FUTURE so no further deadline can expire. */
  void SetNowIomgrShutdown() {
    now_ = GRPC_MILLIS_INF_FUTURE;
    now_is_valid_ = true;
  }

  /** To be used only for testing.
   *  Sets the now value.
   */
  void TestOnlySetNow(grpc_millis new_val) {
    now_ = new_val;
    now_is_valid_ = true;
  }

  static void TestOnlyGlobalInit(gpr_timespec new_val);

  /** Global initialization for ExecCtx. Called by iomgr. */
  static void GlobalInit(void);

  /** Global shutdown for ExecCtx. Called by iomgr. Destroys the
      thread-local slot that holds the current ExecCtx. */
  static void GlobalShutdown(void) { gpr_tls_destroy(&exec_ctx_); }

  /** Gets pointer to current exec_ctx (may be null if none installed). */
  static ExecCtx* Get() {
    return reinterpret_cast<ExecCtx*>(gpr_tls_get(&exec_ctx_));
  }

  /** Installs \a exec_ctx as the thread's current context. */
  static void Set(ExecCtx* exec_ctx) {
    gpr_tls_set(&exec_ctx_, reinterpret_cast<intptr_t>(exec_ctx));
  }

  /** Schedules \a closure (with \a error) on the current exec_ctx. */
  static void Run(const DebugLocation& location, grpc_closure* closure,
                  grpc_error* error);

  /** Schedules every closure in \a list on the current exec_ctx. */
  static void RunList(const DebugLocation& location, grpc_closure_list* list);

 protected:
  /** Check if ready to finish. Default implementation never volunteers to
      finish; subclasses override to supply a real policy. */
  virtual bool CheckReadyToFinish() { return false; }

  /** Disallow delete on ExecCtx: instances are stack-only by convention. */
  static void operator delete(void* /* p */) { abort(); }

 private:
  grpc_closure_list closure_list_ = GRPC_CLOSURE_LIST_INIT;
  CombinerData combiner_data_ = {nullptr, nullptr};
  uintptr_t flags_;

  // max() is the sentinel for "not yet sampled"; see starting_cpu().
  unsigned starting_cpu_ = std::numeric_limits<unsigned>::max();

  bool now_is_valid_ = false;
  grpc_millis now_ = 0;

  GPR_TLS_CLASS_DECL(exec_ctx_);
  // Captures the previously active ExecCtx. This default member initializer
  // runs before the constructor body calls Set(this), so it records the
  // context this one is nesting over; the destructor restores it.
  ExecCtx* last_exec_ctx_ = Get();
};

/** Application-callback execution context.
 *  A bag of data that collects information along a callstack.
 *  It is created on the stack at core entry points, and stored internally
 *  as a thread-local variable.
 *
 *  There are three key differences between this structure and ExecCtx:
 *  1. ApplicationCallbackExecCtx builds a list of application-level
 *     callbacks, but ExecCtx builds a list of internal callbacks to invoke.
 *  2. ApplicationCallbackExecCtx invokes its callbacks only at destruction;
 *     there is no explicit Flush method.
 *  3. If more than one ApplicationCallbackExecCtx is created on the thread's
 *     stack, only the one closest to the base of the stack is actually
 *     active and this is the only one that enqueues application callbacks.
 *     (Unlike ExecCtx, it is not feasible to prevent multiple of these on
 *     the stack since the executing application callback may itself enter
 *     core. However, the new one created will just pass callbacks through to
 *     the base one and those will not be executed until the return to the
 *     destructor of the base one, preventing unlimited stack growth.)
 *
 *  This structure exists because application callbacks may themselves cause a
 *  core re-entry (e.g., through a public API call) and if that call in turn
 *  causes another application-callback, there could be arbitrarily growing
 *  stacks of core re-entries. Instead, application callbacks should not be
 *  invoked until other core work is done and other application callbacks
 *  have completed. To accomplish this, any application callback should be
 *  enqueued using grpc_core::ApplicationCallbackExecCtx::Enqueue.
 *
 *  CONVENTIONS:
 *  - Instances of this must ALWAYS be constructed on the stack, never
 *    heap allocated.
 *  - Instances of this are generally constructed before ExecCtx when needed.
 *    The only exception is for ExecCtx's that are explicitly flushed and
 *    that survive beyond the scope of the function that can cause application
 *    callbacks to be invoked (e.g., in the timer thread).
292 * 293 * Generally, core entry points that may trigger application-level callbacks 294 * will have the following declarations: 295 * 296 * grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; 297 * grpc_core::ExecCtx exec_ctx; 298 * 299 * This ordering is important to make sure that the ApplicationCallbackExecCtx 300 * is destroyed after the ExecCtx (to prevent the re-entry problem described 301 * above, as well as making sure that ExecCtx core callbacks are invoked first) 302 * 303 */ 304 305 class ApplicationCallbackExecCtx { 306 public: 307 /** Default Constructor */ ApplicationCallbackExecCtx()308 ApplicationCallbackExecCtx() { Set(this, flags_); } 309 310 /** Parameterised Constructor */ ApplicationCallbackExecCtx(uintptr_t fl)311 explicit ApplicationCallbackExecCtx(uintptr_t fl) : flags_(fl) { 312 Set(this, flags_); 313 } 314 ~ApplicationCallbackExecCtx()315 ~ApplicationCallbackExecCtx() { 316 if (reinterpret_cast<ApplicationCallbackExecCtx*>( 317 gpr_tls_get(&callback_exec_ctx_)) == this) { 318 while (head_ != nullptr) { 319 auto* f = head_; 320 head_ = f->internal_next; 321 if (f->internal_next == nullptr) { 322 tail_ = nullptr; 323 } 324 (*f->functor_run)(f, f->internal_success); 325 } 326 gpr_tls_set(&callback_exec_ctx_, reinterpret_cast<intptr_t>(nullptr)); 327 if (!(GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD & flags_)) { 328 grpc_core::Fork::DecExecCtxCount(); 329 } 330 } else { 331 GPR_DEBUG_ASSERT(head_ == nullptr); 332 GPR_DEBUG_ASSERT(tail_ == nullptr); 333 } 334 } 335 Flags()336 uintptr_t Flags() { return flags_; } 337 Get()338 static ApplicationCallbackExecCtx* Get() { 339 return reinterpret_cast<ApplicationCallbackExecCtx*>( 340 gpr_tls_get(&callback_exec_ctx_)); 341 } 342 Set(ApplicationCallbackExecCtx * exec_ctx,uintptr_t flags)343 static void Set(ApplicationCallbackExecCtx* exec_ctx, uintptr_t flags) { 344 if (Get() == nullptr) { 345 if (!(GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD & flags)) { 346 
grpc_core::Fork::IncExecCtxCount(); 347 } 348 gpr_tls_set(&callback_exec_ctx_, reinterpret_cast<intptr_t>(exec_ctx)); 349 } 350 } 351 Enqueue(grpc_experimental_completion_queue_functor * functor,int is_success)352 static void Enqueue(grpc_experimental_completion_queue_functor* functor, 353 int is_success) { 354 functor->internal_success = is_success; 355 functor->internal_next = nullptr; 356 357 ApplicationCallbackExecCtx* ctx = Get(); 358 359 if (ctx->head_ == nullptr) { 360 ctx->head_ = functor; 361 } 362 if (ctx->tail_ != nullptr) { 363 ctx->tail_->internal_next = functor; 364 } 365 ctx->tail_ = functor; 366 } 367 368 /** Global initialization for ApplicationCallbackExecCtx. Called by init. */ GlobalInit(void)369 static void GlobalInit(void) { gpr_tls_init(&callback_exec_ctx_); } 370 371 /** Global shutdown for ApplicationCallbackExecCtx. Called by init. */ GlobalShutdown(void)372 static void GlobalShutdown(void) { gpr_tls_destroy(&callback_exec_ctx_); } 373 Available()374 static bool Available() { return Get() != nullptr; } 375 376 private: 377 uintptr_t flags_{0u}; 378 grpc_experimental_completion_queue_functor* head_{nullptr}; 379 grpc_experimental_completion_queue_functor* tail_{nullptr}; 380 GPR_TLS_CLASS_DECL(callback_exec_ctx_); 381 }; 382 } // namespace grpc_core 383 384 #endif /* GRPC_CORE_LIB_IOMGR_EXEC_CTX_H */ 385