• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPC_SRC_CORE_LIB_IOMGR_EXEC_CTX_H
20 #define GRPC_SRC_CORE_LIB_IOMGR_EXEC_CTX_H
21 
22 #include <grpc/support/port_platform.h>
23 
24 #include <limits>
25 
26 #if __APPLE__
27 // Provides TARGET_OS_IPHONE
28 #include <TargetConditionals.h>
29 #endif
30 
31 #include <grpc/impl/grpc_types.h>
32 #include <grpc/support/atm.h>
33 #include <grpc/support/cpu.h>
34 #include <grpc/support/time.h>
35 
36 #include "absl/log/check.h"
37 #include "src/core/lib/experiments/experiments.h"
38 #include "src/core/lib/iomgr/closure.h"
39 #include "src/core/util/debug_location.h"
40 #include "src/core/util/fork.h"
41 #include "src/core/util/latent_see.h"
42 #include "src/core/util/time.h"
43 #include "src/core/util/time_precise.h"
44 
45 #if !defined(_WIN32) || !defined(_DLL)
46 #define EXEC_CTX exec_ctx_
47 #define CALLBACK_EXEC_CTX callback_exec_ctx_
48 #else
49 #define EXEC_CTX exec_ctx()
50 #define CALLBACK_EXEC_CTX callback_exec_ctx()
51 #endif
52 
53 /// A combiner represents a list of work to be executed later.
54 /// Forward declared here to avoid a circular dependency with combiner.h.
55 typedef struct grpc_combiner grpc_combiner;
56 
57 // This exec_ctx is ready to return: either pre-populated, or cached as soon as
58 // the finish_check returns true
59 #define GRPC_EXEC_CTX_FLAG_IS_FINISHED 1
60 // The exec_ctx's thread is (potentially) owned by a call or channel: care
61 // should be given to not delete said call/channel from this exec_ctx
62 #define GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP 2
63 // This exec ctx was initialized by an internal thread, and should not
64 // be counted by fork handlers
65 #define GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD 4
66 
67 // This application callback exec ctx was initialized by an internal thread, and
68 // should not be counted by fork handlers
69 #define GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD 1
70 
71 namespace grpc_core {
72 class Combiner;
73 /// Execution context.
74 /// A bag of data that collects information along a callstack.
75 /// It is created on the stack at core entry points (public API or iomgr), and
76 /// stored internally as a thread-local variable.
77 ///
78 /// Generally, to create an exec_ctx instance, add the following line at the top
79 /// of the public API entry point or at the start of a thread's work function :
80 ///
81 /// ExecCtx exec_ctx;
82 ///
83 /// Access the created ExecCtx instance using :
84 /// ExecCtx::Get()
85 ///
86 /// Specific responsibilities (this may grow in the future):
87 /// - track a list of core work that needs to be delayed until the base of the
88 ///   call stack (this provides a convenient mechanism to run callbacks
89 ///   without worrying about locking issues)
90 /// - provide a decision maker (via IsReadyToFinish) that provides a
91 ///   signal as to whether a borrowed thread should continue to do work or
92 ///   should actively try to finish up and get this thread back to its owner
93 ///
94 /// CONVENTIONS:
95 /// - Instance of this must ALWAYS be constructed on the stack, never
96 ///   heap allocated.
97 /// - Do not pass exec_ctx as a parameter to a function. Always access it using
98 ///   ExecCtx::Get().
99 /// - NOTE: In the future, the convention is likely to change to allow only one
100 ///         ExecCtx on a thread's stack at the same time. The TODO below
101 ///         discusses this plan in more detail.
102 ///
103 /// TODO(yashykt): Only allow one "active" ExecCtx on a thread at the same time.
104 ///               Stage 1: If a new one is created on the stack, it should just
105 ///               pass-through to the underlying ExecCtx deeper in the thread's
106 ///               stack.
107 ///               Stage 2: Assert if a 2nd one is ever created on the stack
108 ///               since that implies a core re-entry outside of application
109 ///               callbacks.
110 ///
111 class GRPC_DLL ExecCtx : public latent_see::ParentScope {
112  public:
113   /// Default Constructor
114 
ExecCtx()115   ExecCtx()
116       : latent_see::ParentScope(GRPC_LATENT_SEE_METADATA("ExecCtx")),
117         flags_(GRPC_EXEC_CTX_FLAG_IS_FINISHED) {
118 #if !TARGET_OS_IPHONE
119     if (!IsTimeCachingInPartyEnabled()) {
120       time_cache_.emplace();
121     }
122 #endif
123     Fork::IncExecCtxCount();
124     Set(this);
125   }
126 
127   /// Parameterised Constructor
ExecCtx(uintptr_t fl)128   explicit ExecCtx(uintptr_t fl)
129       : ExecCtx(fl, GRPC_LATENT_SEE_METADATA("ExecCtx")) {}
130 
ExecCtx(uintptr_t fl,latent_see::Metadata * latent_see_metadata)131   explicit ExecCtx(uintptr_t fl, latent_see::Metadata* latent_see_metadata)
132       : latent_see::ParentScope(latent_see_metadata), flags_(fl) {
133 #if !TARGET_OS_IPHONE
134     if (!IsTimeCachingInPartyEnabled()) {
135       time_cache_.emplace();
136     }
137 #endif
138     if (!(GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD & flags_)) {
139       Fork::IncExecCtxCount();
140     }
141     Set(this);
142   }
143 
144   /// Destructor
~ExecCtx()145   virtual ~ExecCtx() {
146     flags_ |= GRPC_EXEC_CTX_FLAG_IS_FINISHED;
147     Flush();
148     Set(last_exec_ctx_);
149     if (!(GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD & flags_)) {
150       Fork::DecExecCtxCount();
151     }
152   }
153 
154   /// Disallow copy and assignment operators
155   ExecCtx(const ExecCtx&) = delete;
156   ExecCtx& operator=(const ExecCtx&) = delete;
157 
158   struct CombinerData {
159     // currently active combiner: updated only via combiner.c
160     Combiner* active_combiner;
161     // last active combiner in the active combiner list
162     Combiner* last_combiner;
163   };
164 
165   /// Only to be used by grpc-combiner code
combiner_data()166   CombinerData* combiner_data() { return &combiner_data_; }
167 
168   /// Return pointer to grpc_closure_list
closure_list()169   grpc_closure_list* closure_list() { return &closure_list_; }
170 
171   /// Return flags
flags()172   uintptr_t flags() { return flags_; }
173 
174   /// Checks if there is work to be done
HasWork()175   bool HasWork() {
176     return combiner_data_.active_combiner != nullptr ||
177            !grpc_closure_list_empty(closure_list_);
178   }
179 
180   /// Flush any work that has been enqueued onto this grpc_exec_ctx.
181   /// Caller must guarantee that no interfering locks are held.
182   /// Returns true if work was performed, false otherwise.
183   ///
184   bool Flush();
185 
186   /// Returns true if we'd like to leave this execution context as soon as
187   /// possible: useful for deciding whether to do something more or not
188   /// depending on outside context.
189   ///
IsReadyToFinish()190   bool IsReadyToFinish() {
191     if ((flags_ & GRPC_EXEC_CTX_FLAG_IS_FINISHED) == 0) {
192       if (CheckReadyToFinish()) {
193         flags_ |= GRPC_EXEC_CTX_FLAG_IS_FINISHED;
194         return true;
195       }
196       return false;
197     } else {
198       return true;
199     }
200   }
201 
SetReadyToFinishFlag()202   void SetReadyToFinishFlag() { flags_ |= GRPC_EXEC_CTX_FLAG_IS_FINISHED; }
203 
Now()204   Timestamp Now() { return Timestamp::Now(); }
205 
InvalidateNow()206   void InvalidateNow() {
207     if (time_cache_.has_value()) time_cache_->InvalidateCache();
208   }
209 
SetNowIomgrShutdown()210   void SetNowIomgrShutdown() {
211     // We get to do a test only set now on this path just because iomgr
212     // is getting removed and no point adding more interfaces for it.
213     TestOnlySetNow(Timestamp::InfFuture());
214   }
215 
TestOnlySetNow(Timestamp now)216   void TestOnlySetNow(Timestamp now) {
217     if (!time_cache_.has_value()) time_cache_.emplace();
218     time_cache_->TestOnlySetNow(now);
219   }
220 
221   /// Gets pointer to current exec_ctx.
Get()222   static ExecCtx* Get() { return EXEC_CTX; }
223 
224   static void Run(const DebugLocation& location, grpc_closure* closure,
225                   grpc_error_handle error);
226 
227   static void RunList(const DebugLocation& location, grpc_closure_list* list);
228 
229  protected:
230   /// Check if ready to finish.
CheckReadyToFinish()231   virtual bool CheckReadyToFinish() { return false; }
232 
233   /// Disallow delete on ExecCtx.
delete(void *)234   static void operator delete(void* /* p */) { abort(); }
235 
236  private:
237   /// Set EXEC_CTX to ctx.
Set(ExecCtx * ctx)238   static void Set(ExecCtx* ctx) { EXEC_CTX = ctx; }
239 
240   grpc_closure_list closure_list_ = GRPC_CLOSURE_LIST_INIT;
241   CombinerData combiner_data_ = {nullptr, nullptr};
242   uintptr_t flags_;
243 
244   absl::optional<ScopedTimeCache> time_cache_;
245 
246 #if !defined(_WIN32) || !defined(_DLL)
247   static thread_local ExecCtx* exec_ctx_;
248 #else
249   // cannot be thread_local data member (e.g. exec_ctx_) on windows
250   static ExecCtx*& exec_ctx();
251 #endif
252   ExecCtx* last_exec_ctx_ = Get();
253 };
254 
255 /// Application-callback execution context.
256 /// A bag of data that collects information along a callstack.
257 /// It is created on the stack at core entry points, and stored internally
258 /// as a thread-local variable.
259 ///
260 /// There are three key differences between this structure and ExecCtx:
261 ///   1. ApplicationCallbackExecCtx builds a list of application-level
262 ///      callbacks, but ExecCtx builds a list of internal callbacks to invoke.
263 ///   2. ApplicationCallbackExecCtx invokes its callbacks only at destruction;
264 ///      there is no explicit Flush method.
265 ///   3. If more than one ApplicationCallbackExecCtx is created on the thread's
266 ///      stack, only the one closest to the base of the stack is actually
267 ///      active and this is the only one that enqueues application callbacks.
268 ///      (Unlike ExecCtx, it is not feasible to prevent multiple of these on the
269 ///      stack since the executing application callback may itself enter core.
270 ///      However, the new one created will just pass callbacks through to the
271 ///      base one and those will not be executed until the return to the
272 ///      destructor of the base one, preventing unlimited stack growth.)
273 ///
274 /// This structure exists because application callbacks may themselves cause a
275 /// core re-entry (e.g., through a public API call) and if that call in turn
276 /// causes another application-callback, there could be arbitrarily growing
277 /// stacks of core re-entries. Instead, any application callbacks instead should
278 /// not be invoked until other core work is done and other application callbacks
279 /// have completed. To accomplish this, any application callback should be
280 /// enqueued using ApplicationCallbackExecCtx::Enqueue .
281 ///
282 /// CONVENTIONS:
283 /// - Instances of this must ALWAYS be constructed on the stack, never
284 ///   heap allocated.
285 /// - Instances of this are generally constructed before ExecCtx when needed.
286 ///   The only exception is for ExecCtx's that are explicitly flushed and
287 ///   that survive beyond the scope of the function that can cause application
288 ///   callbacks to be invoked (e.g., in the timer thread).
289 ///
290 /// Generally, core entry points that may trigger application-level callbacks
291 /// will have the following declarations:
292 ///
293 /// ApplicationCallbackExecCtx callback_exec_ctx;
294 /// ExecCtx exec_ctx;
295 ///
296 /// This ordering is important to make sure that the ApplicationCallbackExecCtx
297 /// is destroyed after the ExecCtx (to prevent the re-entry problem described
298 /// above, as well as making sure that ExecCtx core callbacks are invoked first)
299 ///
300 ///
301 
302 class GRPC_DLL ApplicationCallbackExecCtx {
303  public:
304   /// Default Constructor
ApplicationCallbackExecCtx()305   ApplicationCallbackExecCtx() { Set(this, flags_); }
306 
307   /// Parameterised Constructor
ApplicationCallbackExecCtx(uintptr_t fl)308   explicit ApplicationCallbackExecCtx(uintptr_t fl) : flags_(fl) {
309     Set(this, flags_);
310   }
311 
~ApplicationCallbackExecCtx()312   ~ApplicationCallbackExecCtx() {
313     if (Get() == this) {
314       while (head_ != nullptr) {
315         auto* f = head_;
316         head_ = f->internal_next;
317         if (f->internal_next == nullptr) {
318           tail_ = nullptr;
319         }
320         (*f->functor_run)(f, f->internal_success);
321       }
322       CALLBACK_EXEC_CTX = nullptr;
323       if (!(GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD & flags_)) {
324         Fork::DecExecCtxCount();
325       }
326     } else {
327       DCHECK_EQ(head_, nullptr);
328       DCHECK_EQ(tail_, nullptr);
329     }
330   }
331 
Flags()332   uintptr_t Flags() { return flags_; }
333 
Get()334   static ApplicationCallbackExecCtx* Get() { return CALLBACK_EXEC_CTX; }
335 
Set(ApplicationCallbackExecCtx * exec_ctx,uintptr_t flags)336   static void Set(ApplicationCallbackExecCtx* exec_ctx, uintptr_t flags) {
337     if (Get() == nullptr) {
338       if (!(GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD & flags)) {
339         Fork::IncExecCtxCount();
340       }
341       CALLBACK_EXEC_CTX = exec_ctx;
342     }
343   }
344 
Enqueue(grpc_completion_queue_functor * functor,int is_success)345   static void Enqueue(grpc_completion_queue_functor* functor, int is_success) {
346     functor->internal_success = is_success;
347     functor->internal_next = nullptr;
348 
349     ApplicationCallbackExecCtx* ctx = Get();
350 
351     if (ctx->head_ == nullptr) {
352       ctx->head_ = functor;
353     }
354     if (ctx->tail_ != nullptr) {
355       ctx->tail_->internal_next = functor;
356     }
357     ctx->tail_ = functor;
358   }
359 
Available()360   static bool Available() { return Get() != nullptr; }
361 
362  private:
363   uintptr_t flags_{0u};
364   grpc_completion_queue_functor* head_{nullptr};
365   grpc_completion_queue_functor* tail_{nullptr};
366 
367 #if !defined(_WIN32) || !defined(_DLL)
368   static thread_local ApplicationCallbackExecCtx* callback_exec_ctx_;
369 #else
370   // cannot be thread_local data member (e.g. callback_exec_ctx_) on windows
371   static ApplicationCallbackExecCtx*& callback_exec_ctx();
372 #endif
373 };
374 
375 template <typename F>
EnsureRunInExecCtx(F f)376 void EnsureRunInExecCtx(F f) {
377   if (ExecCtx::Get() == nullptr) {
378     ApplicationCallbackExecCtx app_ctx;
379     ExecCtx exec_ctx;
380     f();
381   } else {
382     f();
383   }
384 }
385 
386 #undef EXEC_CTX
387 #undef CALLBACK_EXEC_CTX
388 
389 }  // namespace grpc_core
390 
391 #endif  // GRPC_SRC_CORE_LIB_IOMGR_EXEC_CTX_H
392