• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPC_CORE_LIB_IOMGR_EXEC_CTX_H
20 #define GRPC_CORE_LIB_IOMGR_EXEC_CTX_H
21 
22 #include <grpc/support/port_platform.h>
23 
24 #include <limits>
25 
26 #include <grpc/impl/codegen/grpc_types.h>
27 #include <grpc/support/atm.h>
28 #include <grpc/support/cpu.h>
29 #include <grpc/support/log.h>
30 
31 #include "src/core/lib/gpr/time_precise.h"
32 #include "src/core/lib/gpr/tls.h"
33 #include "src/core/lib/gprpp/debug_location.h"
34 #include "src/core/lib/gprpp/fork.h"
35 #include "src/core/lib/iomgr/closure.h"
36 
/** Time value expressed in milliseconds, stored as a signed 64-bit integer. */
using grpc_millis = int64_t;

/** Largest and smallest representable grpc_millis values, used as
    "infinitely far in the future" / "infinitely far in the past" sentinels. */
#define GRPC_MILLIS_INF_FUTURE INT64_MAX
#define GRPC_MILLIS_INF_PAST INT64_MIN
41 
42 /** A combiner represents a list of work to be executed later.
43     Forward declared here to avoid a circular dependency with combiner.h.
       Only the incomplete type is introduced by this declaration.
       NOTE(review): the visible part of this header stores grpc_core::Combiner
       (not grpc_combiner) in ExecCtx::CombinerData - confirm whether includers
       still rely on this C-style name. */
44 typedef struct grpc_combiner grpc_combiner;
45 
/* Bit flags held in grpc_core::ExecCtx::flags_. Each value is a distinct bit,
   so flags may be combined with bitwise OR and tested with bitwise AND. */
46 /* This exec_ctx is ready to return: either pre-populated, or cached as soon as
47    the finish_check returns true.
      Set by the default ExecCtx constructor, by ExecCtx::IsReadyToFinish() once
      CheckReadyToFinish() returns true, and unconditionally by the destructor. */
48 #define GRPC_EXEC_CTX_FLAG_IS_FINISHED 1
49 /* The exec_ctx's thread is (potentially) owned by a call or channel: care
50    should be given to not delete said call/channel from this exec_ctx */
51 #define GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP 2
52 /* This exec ctx was initialized by an internal thread, and should not
53    be counted by fork handlers.
      When set, ExecCtx skips the Fork::IncExecCtxCount()/DecExecCtxCount()
      calls in its parameterised constructor and in its destructor. */
54 #define GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD 4
55 
56 /* This application callback exec ctx was initialized by an internal thread, and
57    should not be counted by fork handlers.
      This flag belongs to grpc_core::ApplicationCallbackExecCtx and lives in a
      separate flag space: its value (1) coincides with
      GRPC_EXEC_CTX_FLAG_IS_FINISHED, so the two families must not be mixed. */
58 #define GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD 1
59 
/* Conversions between grpc_millis and other time representations
   (gpr_timespec and gpr_cycle_counter). Only the declarations are visible
   in this header.
   NOTE(review): going by the names, the *_round_down / *_round_up variants
   differ in how a sub-millisecond remainder is rounded - confirm against the
   definitions before relying on the exact rounding behaviour. */
60 gpr_timespec grpc_millis_to_timespec(grpc_millis millis, gpr_clock_type clock);
61 grpc_millis grpc_timespec_to_millis_round_down(gpr_timespec ts);
62 grpc_millis grpc_timespec_to_millis_round_up(gpr_timespec ts);
63 grpc_millis grpc_cycle_counter_to_millis_round_down(gpr_cycle_counter cycles);
64 grpc_millis grpc_cycle_counter_to_millis_round_up(gpr_cycle_counter cycles);
65 
66 namespace grpc_core {
67 class Combiner;
68 /** Execution context.
69  *  A bag of data that collects information along a callstack.
70  *  It is created on the stack at core entry points (public API or iomgr), and
71  *  stored internally as a thread-local variable.
72  *
73  *  Generally, to create an exec_ctx instance, add the following line at the top
74  *  of the public API entry point or at the start of a thread's work function :
75  *
76  *  grpc_core::ExecCtx exec_ctx;
77  *
78  *  Access the created ExecCtx instance using :
79  *  grpc_core::ExecCtx::Get()
80  *
81  *  Specific responsibilities (this may grow in the future):
82  *  - track a list of core work that needs to be delayed until the base of the
83  *    call stack (this provides a convenient mechanism to run callbacks
84  *    without worrying about locking issues)
85  *  - provide a decision maker (via IsReadyToFinish) that provides a
86  *    signal as to whether a borrowed thread should continue to do work or
87  *    should actively try to finish up and get this thread back to its owner
88  *
89  *  CONVENTIONS:
90  *  - Instances of this must ALWAYS be constructed on the stack, never
91  *    heap allocated.
92  *  - Do not pass exec_ctx as a parameter to a function. Always access it using
93  *    grpc_core::ExecCtx::Get().
94  *  - NOTE: In the future, the convention is likely to change to allow only one
95  *          ExecCtx on a thread's stack at the same time. The TODO below
96  *          discusses this plan in more detail.
97  *
98  * TODO(yashykt): Only allow one "active" ExecCtx on a thread at the same time.
99  *                Stage 1: If a new one is created on the stack, it should just
100  *                pass-through to the underlying ExecCtx deeper in the thread's
101  *                stack.
102  *                Stage 2: Assert if a 2nd one is ever created on the stack
103  *                since that implies a core re-entry outside of application
104  *                callbacks.
105  */
106 class ExecCtx {
107  public:
108   /** Default Constructor */
109 
ExecCtx()110   ExecCtx() : flags_(GRPC_EXEC_CTX_FLAG_IS_FINISHED) {
111     grpc_core::Fork::IncExecCtxCount();
112     Set(this);
113   }
114 
115   /** Parameterised Constructor */
ExecCtx(uintptr_t fl)116   explicit ExecCtx(uintptr_t fl) : flags_(fl) {
117     if (!(GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD & flags_)) {
118       grpc_core::Fork::IncExecCtxCount();
119     }
120     Set(this);
121   }
122 
123   /** Destructor */
~ExecCtx()124   virtual ~ExecCtx() {
125     flags_ |= GRPC_EXEC_CTX_FLAG_IS_FINISHED;
126     Flush();
127     Set(last_exec_ctx_);
128     if (!(GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD & flags_)) {
129       grpc_core::Fork::DecExecCtxCount();
130     }
131   }
132 
133   /** Disallow copy and assignment operators */
134   ExecCtx(const ExecCtx&) = delete;
135   ExecCtx& operator=(const ExecCtx&) = delete;
136 
starting_cpu()137   unsigned starting_cpu() {
138     if (starting_cpu_ == std::numeric_limits<unsigned>::max()) {
139       starting_cpu_ = gpr_cpu_current_cpu();
140     }
141     return starting_cpu_;
142   }
143 
144   struct CombinerData {
145     /* currently active combiner: updated only via combiner.c */
146     Combiner* active_combiner;
147     /* last active combiner in the active combiner list */
148     Combiner* last_combiner;
149   };
150 
151   /** Only to be used by grpc-combiner code */
combiner_data()152   CombinerData* combiner_data() { return &combiner_data_; }
153 
154   /** Return pointer to grpc_closure_list */
closure_list()155   grpc_closure_list* closure_list() { return &closure_list_; }
156 
157   /** Return flags */
flags()158   uintptr_t flags() { return flags_; }
159 
160   /** Checks if there is work to be done */
HasWork()161   bool HasWork() {
162     return combiner_data_.active_combiner != nullptr ||
163            !grpc_closure_list_empty(closure_list_);
164   }
165 
166   /** Flush any work that has been enqueued onto this grpc_exec_ctx.
167    *  Caller must guarantee that no interfering locks are held.
168    *  Returns true if work was performed, false otherwise.
169    */
170   bool Flush();
171 
172   /** Returns true if we'd like to leave this execution context as soon as
173    *  possible: useful for deciding whether to do something more or not
174    *  depending on outside context.
175    */
IsReadyToFinish()176   bool IsReadyToFinish() {
177     if ((flags_ & GRPC_EXEC_CTX_FLAG_IS_FINISHED) == 0) {
178       if (CheckReadyToFinish()) {
179         flags_ |= GRPC_EXEC_CTX_FLAG_IS_FINISHED;
180         return true;
181       }
182       return false;
183     } else {
184       return true;
185     }
186   }
187 
188   /** Returns the stored current time relative to start if valid,
189    *  otherwise refreshes the stored time, sets it valid and returns the new
190    *  value.
191    */
192   grpc_millis Now();
193 
194   /** Invalidates the stored time value. A new time value will be set on calling
195    *  Now().
196    */
InvalidateNow()197   void InvalidateNow() { now_is_valid_ = false; }
198 
199   /** To be used only by shutdown code in iomgr */
SetNowIomgrShutdown()200   void SetNowIomgrShutdown() {
201     now_ = GRPC_MILLIS_INF_FUTURE;
202     now_is_valid_ = true;
203   }
204 
205   /** To be used only for testing.
206    *  Sets the now value.
207    */
TestOnlySetNow(grpc_millis new_val)208   void TestOnlySetNow(grpc_millis new_val) {
209     now_ = new_val;
210     now_is_valid_ = true;
211   }
212 
213   static void TestOnlyGlobalInit(gpr_timespec new_val);
214 
215   /** Global initialization for ExecCtx. Called by iomgr. */
216   static void GlobalInit(void);
217 
218   /** Global shutdown for ExecCtx. Called by iomgr. */
GlobalShutdown(void)219   static void GlobalShutdown(void) { gpr_tls_destroy(&exec_ctx_); }
220 
221   /** Gets pointer to current exec_ctx. */
Get()222   static ExecCtx* Get() {
223     return reinterpret_cast<ExecCtx*>(gpr_tls_get(&exec_ctx_));
224   }
225 
Set(ExecCtx * exec_ctx)226   static void Set(ExecCtx* exec_ctx) {
227     gpr_tls_set(&exec_ctx_, reinterpret_cast<intptr_t>(exec_ctx));
228   }
229 
230   static void Run(const DebugLocation& location, grpc_closure* closure,
231                   grpc_error* error);
232 
233   static void RunList(const DebugLocation& location, grpc_closure_list* list);
234 
235  protected:
236   /** Check if ready to finish. */
CheckReadyToFinish()237   virtual bool CheckReadyToFinish() { return false; }
238 
239   /** Disallow delete on ExecCtx. */
delete(void *)240   static void operator delete(void* /* p */) { abort(); }
241 
242  private:
243   /** Set exec_ctx_ to exec_ctx. */
244 
245   grpc_closure_list closure_list_ = GRPC_CLOSURE_LIST_INIT;
246   CombinerData combiner_data_ = {nullptr, nullptr};
247   uintptr_t flags_;
248 
249   unsigned starting_cpu_ = std::numeric_limits<unsigned>::max();
250 
251   bool now_is_valid_ = false;
252   grpc_millis now_ = 0;
253 
254   GPR_TLS_CLASS_DECL(exec_ctx_);
255   ExecCtx* last_exec_ctx_ = Get();
256 };
257 
258 /** Application-callback execution context.
259  *  A bag of data that collects information along a callstack.
260  *  It is created on the stack at core entry points, and stored internally
261  *  as a thread-local variable.
262  *
263  *  There are three key differences between this structure and ExecCtx:
264  *    1. ApplicationCallbackExecCtx builds a list of application-level
265  *       callbacks, but ExecCtx builds a list of internal callbacks to invoke.
266  *    2. ApplicationCallbackExecCtx invokes its callbacks only at destruction;
267  *       there is no explicit Flush method.
268  *    3. If more than one ApplicationCallbackExecCtx is created on the thread's
269  *       stack, only the one closest to the base of the stack is actually
270  *       active and this is the only one that enqueues application callbacks.
271  *       (Unlike ExecCtx, it is not feasible to prevent multiple of these on the
272  *       stack since the executing application callback may itself enter core.
273  *       However, the new one created will just pass callbacks through to the
274  *       base one and those will not be executed until the return to the
275  *       destructor of the base one, preventing unlimited stack growth.)
276  *
277  *  This structure exists because application callbacks may themselves cause a
278  *  core re-entry (e.g., through a public API call) and if that call in turn
279  *  causes another application-callback, there could be arbitrarily growing
280  *  stacks of core re-entries. Instead, any application callbacks should
281  *  not be invoked until other core work is done and other application callbacks
282  *  have completed. To accomplish this, any application callback should be
283  *  enqueued using grpc_core::ApplicationCallbackExecCtx::Enqueue .
284  *
285  *  CONVENTIONS:
286  *  - Instances of this must ALWAYS be constructed on the stack, never
287  *    heap allocated.
288  *  - Instances of this are generally constructed before ExecCtx when needed.
289  *    The only exception is for ExecCtx's that are explicitly flushed and
290  *    that survive beyond the scope of the function that can cause application
291  *    callbacks to be invoked (e.g., in the timer thread).
292  *
293  *  Generally, core entry points that may trigger application-level callbacks
294  *  will have the following declarations:
295  *
296  *  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
297  *  grpc_core::ExecCtx exec_ctx;
298  *
299  *  This ordering is important to make sure that the ApplicationCallbackExecCtx
300  *  is destroyed after the ExecCtx (to prevent the re-entry problem described
301  *  above, as well as making sure that ExecCtx core callbacks are invoked first)
302  *
303  */
304 
305 class ApplicationCallbackExecCtx {
306  public:
307   /** Default Constructor */
ApplicationCallbackExecCtx()308   ApplicationCallbackExecCtx() { Set(this, flags_); }
309 
310   /** Parameterised Constructor */
ApplicationCallbackExecCtx(uintptr_t fl)311   explicit ApplicationCallbackExecCtx(uintptr_t fl) : flags_(fl) {
312     Set(this, flags_);
313   }
314 
~ApplicationCallbackExecCtx()315   ~ApplicationCallbackExecCtx() {
316     if (reinterpret_cast<ApplicationCallbackExecCtx*>(
317             gpr_tls_get(&callback_exec_ctx_)) == this) {
318       while (head_ != nullptr) {
319         auto* f = head_;
320         head_ = f->internal_next;
321         if (f->internal_next == nullptr) {
322           tail_ = nullptr;
323         }
324         (*f->functor_run)(f, f->internal_success);
325       }
326       gpr_tls_set(&callback_exec_ctx_, reinterpret_cast<intptr_t>(nullptr));
327       if (!(GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD & flags_)) {
328         grpc_core::Fork::DecExecCtxCount();
329       }
330     } else {
331       GPR_DEBUG_ASSERT(head_ == nullptr);
332       GPR_DEBUG_ASSERT(tail_ == nullptr);
333     }
334   }
335 
Flags()336   uintptr_t Flags() { return flags_; }
337 
Get()338   static ApplicationCallbackExecCtx* Get() {
339     return reinterpret_cast<ApplicationCallbackExecCtx*>(
340         gpr_tls_get(&callback_exec_ctx_));
341   }
342 
Set(ApplicationCallbackExecCtx * exec_ctx,uintptr_t flags)343   static void Set(ApplicationCallbackExecCtx* exec_ctx, uintptr_t flags) {
344     if (Get() == nullptr) {
345       if (!(GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD & flags)) {
346         grpc_core::Fork::IncExecCtxCount();
347       }
348       gpr_tls_set(&callback_exec_ctx_, reinterpret_cast<intptr_t>(exec_ctx));
349     }
350   }
351 
Enqueue(grpc_experimental_completion_queue_functor * functor,int is_success)352   static void Enqueue(grpc_experimental_completion_queue_functor* functor,
353                       int is_success) {
354     functor->internal_success = is_success;
355     functor->internal_next = nullptr;
356 
357     ApplicationCallbackExecCtx* ctx = Get();
358 
359     if (ctx->head_ == nullptr) {
360       ctx->head_ = functor;
361     }
362     if (ctx->tail_ != nullptr) {
363       ctx->tail_->internal_next = functor;
364     }
365     ctx->tail_ = functor;
366   }
367 
368   /** Global initialization for ApplicationCallbackExecCtx. Called by init. */
GlobalInit(void)369   static void GlobalInit(void) { gpr_tls_init(&callback_exec_ctx_); }
370 
371   /** Global shutdown for ApplicationCallbackExecCtx. Called by init. */
GlobalShutdown(void)372   static void GlobalShutdown(void) { gpr_tls_destroy(&callback_exec_ctx_); }
373 
Available()374   static bool Available() { return Get() != nullptr; }
375 
376  private:
377   uintptr_t flags_{0u};
378   grpc_experimental_completion_queue_functor* head_{nullptr};
379   grpc_experimental_completion_queue_functor* tail_{nullptr};
380   GPR_TLS_CLASS_DECL(callback_exec_ctx_);
381 };
382 }  // namespace grpc_core
383 
384 #endif /* GRPC_CORE_LIB_IOMGR_EXEC_CTX_H */
385