• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPC_SRC_CORE_LIB_IOMGR_CLOSURE_H
20 #define GRPC_SRC_CORE_LIB_IOMGR_CLOSURE_H
21 
22 #include <grpc/support/port_platform.h>
23 
24 #include <assert.h>
25 #include <stdbool.h>
26 
27 #include <grpc/support/alloc.h>
28 #include <grpc/support/log.h>
29 
30 #include "src/core/lib/gprpp/crash.h"
31 #include "src/core/lib/gprpp/debug_location.h"
32 #include "src/core/lib/gprpp/manual_constructor.h"
33 #include "src/core/lib/gprpp/mpscq.h"
34 #include "src/core/lib/iomgr/error.h"
35 
36 struct grpc_closure;
37 typedef struct grpc_closure grpc_closure;
38 
39 extern grpc_core::DebugOnlyTraceFlag grpc_trace_closure;
40 
41 typedef struct grpc_closure_list {
42   grpc_closure* head;
43   grpc_closure* tail;
44 } grpc_closure_list;
45 
46 /// gRPC Callback definition.
47 ///
48 ///\param arg Arbitrary input.
49 ///\param error absl::OkStatus() if no error occurred, otherwise some grpc_error
50 ///             describing what went wrong.
51 ///             Error contract: it is not the cb's job to unref this error;
52 ///             the closure scheduler will do that after the cb returns
53 typedef void (*grpc_iomgr_cb_func)(void* arg, grpc_error_handle error);
54 
55 /// A closure over a grpc_iomgr_cb_func.
56 struct grpc_closure {
57   /// Once queued, next indicates the next queued closure; before then, scratch
58   /// space
59   union {
60     grpc_closure* next;
61     grpc_core::ManualConstructor<
62         grpc_core::MultiProducerSingleConsumerQueue::Node>
63         mpscq_node;
64     uintptr_t scratch;
65   } next_data;
66 
67   /// Bound callback.
68   grpc_iomgr_cb_func cb;
69 
70   /// Arguments to be passed to "cb".
71   void* cb_arg;
72 
73   /// Once queued, the result of the closure. Before then: scratch space
74   union {
75     uintptr_t error;
76     uintptr_t scratch;
77   } error_data;
78 
79 // extra tracing and debugging for grpc_closure. This incurs a decent amount of
80 // overhead per closure, so it must be enabled at compile time.
81 #ifndef NDEBUG
82   bool scheduled;
83   bool run;  // true = run, false = scheduled
84   const char* file_created;
85   int line_created;
86   const char* file_initiated;
87   int line_initiated;
88 #endif
89 
90   std::string DebugString() const;
91 };
92 
93 #ifndef NDEBUG
grpc_closure_init(const char * file,int line,grpc_closure * closure,grpc_iomgr_cb_func cb,void * cb_arg)94 inline grpc_closure* grpc_closure_init(const char* file, int line,
95                                        grpc_closure* closure,
96                                        grpc_iomgr_cb_func cb, void* cb_arg) {
97 #else
98 inline grpc_closure* grpc_closure_init(grpc_closure* closure,
99                                        grpc_iomgr_cb_func cb, void* cb_arg) {
100 #endif
101   closure->cb = cb;
102   closure->cb_arg = cb_arg;
103   closure->error_data.error = 0;
104 #ifndef NDEBUG
105   closure->scheduled = false;
106   closure->file_initiated = nullptr;
107   closure->line_initiated = 0;
108   closure->run = false;
109   closure->file_created = file;
110   closure->line_created = line;
111 #endif
112   return closure;
113 }
114 
115 /// Initializes \a closure with \a cb and \a cb_arg. Returns \a closure.
116 #ifndef NDEBUG
117 #define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler) \
118   grpc_closure_init(__FILE__, __LINE__, closure, cb, cb_arg)
119 #else
120 #define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler) \
121   grpc_closure_init(closure, cb, cb_arg)
122 #endif
123 
124 namespace grpc_core {
125 template <typename T, void (T::*cb)(grpc_error_handle)>
126 grpc_closure MakeMemberClosure(T* p, DebugLocation location = DebugLocation()) {
127   grpc_closure out;
128   GRPC_CLOSURE_INIT(
129       &out, [](void* p, grpc_error_handle e) { (static_cast<T*>(p)->*cb)(e); },
130       p, nullptr);
131 #ifndef NDEBUG
132   out.file_created = location.file();
133   out.line_created = location.line();
134 #else
135   (void)location;
136 #endif
137   return out;
138 }
139 
140 template <typename T, void (T::*cb)()>
141 grpc_closure MakeMemberClosure(T* p, DebugLocation location = DebugLocation()) {
142   grpc_closure out;
143   GRPC_CLOSURE_INIT(
144       &out, [](void* p, grpc_error_handle) { (static_cast<T*>(p)->*cb)(); }, p,
145       nullptr);
146 #ifndef NDEBUG
147   out.file_created = location.file();
148   out.line_created = location.line();
149 #else
150   (void)location;
151 #endif
152   return out;
153 }
154 
155 template <typename F>
156 grpc_closure* NewClosure(F f) {
157   struct Closure : public grpc_closure {
158     explicit Closure(F f) : f(std::move(f)) {}
159     F f;
160     static void Run(void* arg, grpc_error_handle error) {
161       auto self = static_cast<Closure*>(arg);
162       self->f(error);
163       delete self;
164     }
165   };
166   Closure* c = new Closure(std::move(f));
167   GRPC_CLOSURE_INIT(c, Closure::Run, c, nullptr);
168   return c;
169 }
170 }  // namespace grpc_core
171 
172 namespace closure_impl {
173 
174 struct wrapped_closure {
175   grpc_iomgr_cb_func cb;
176   void* cb_arg;
177   grpc_closure wrapper;
178 };
179 inline void closure_wrapper(void* arg, grpc_error_handle error) {
180   wrapped_closure* wc = static_cast<wrapped_closure*>(arg);
181   grpc_iomgr_cb_func cb = wc->cb;
182   void* cb_arg = wc->cb_arg;
183   gpr_free(wc);
184   cb(cb_arg, error);
185 }
186 
187 }  // namespace closure_impl
188 
189 #ifndef NDEBUG
190 inline grpc_closure* grpc_closure_create(const char* file, int line,
191                                          grpc_iomgr_cb_func cb, void* cb_arg) {
192 #else
193 inline grpc_closure* grpc_closure_create(grpc_iomgr_cb_func cb, void* cb_arg) {
194 #endif
195   closure_impl::wrapped_closure* wc =
196       static_cast<closure_impl::wrapped_closure*>(gpr_malloc(sizeof(*wc)));
197   wc->cb = cb;
198   wc->cb_arg = cb_arg;
199 #ifndef NDEBUG
200   grpc_closure_init(file, line, &wc->wrapper, closure_impl::closure_wrapper,
201                     wc);
202 #else
203   grpc_closure_init(&wc->wrapper, closure_impl::closure_wrapper, wc);
204 #endif
205   return &wc->wrapper;
206 }
207 
208 // Create a heap allocated closure: try to avoid except for very rare events
209 #ifndef NDEBUG
210 #define GRPC_CLOSURE_CREATE(cb, cb_arg, scheduler) \
211   grpc_closure_create(__FILE__, __LINE__, cb, cb_arg)
212 #else
213 #define GRPC_CLOSURE_CREATE(cb, cb_arg, scheduler) \
214   grpc_closure_create(cb, cb_arg)
215 #endif
216 
217 #define GRPC_CLOSURE_LIST_INIT \
218   { nullptr, nullptr }
219 
220 inline void grpc_closure_list_init(grpc_closure_list* closure_list) {
221   closure_list->head = closure_list->tail = nullptr;
222 }
223 
224 /// add \a closure to the end of \a list
225 /// Returns true if \a list becomes non-empty
226 inline bool grpc_closure_list_append(grpc_closure_list* closure_list,
227                                      grpc_closure* closure) {
228   if (closure == nullptr) {
229     return false;
230   }
231   closure->next_data.next = nullptr;
232   bool was_empty = (closure_list->head == nullptr);
233   if (was_empty) {
234     closure_list->head = closure;
235   } else {
236     closure_list->tail->next_data.next = closure;
237   }
238   closure_list->tail = closure;
239   return was_empty;
240 }
241 
242 /// add \a closure to the end of \a list
243 /// and set \a closure's result to \a error
244 /// Returns true if \a list becomes non-empty
245 inline bool grpc_closure_list_append(grpc_closure_list* closure_list,
246                                      grpc_closure* closure,
247                                      grpc_error_handle error) {
248   if (closure == nullptr) {
249     return false;
250   }
251   closure->error_data.error = grpc_core::internal::StatusAllocHeapPtr(error);
252   return grpc_closure_list_append(closure_list, closure);
253 }
254 
255 /// force all success bits in \a list to false
256 inline void grpc_closure_list_fail_all(grpc_closure_list* list,
257                                        grpc_error_handle forced_failure) {
258   for (grpc_closure* c = list->head; c != nullptr; c = c->next_data.next) {
259     if (c->error_data.error == 0) {
260       c->error_data.error =
261           grpc_core::internal::StatusAllocHeapPtr(forced_failure);
262     }
263   }
264 }
265 
266 /// append all closures from \a src to \a dst and empty \a src.
267 inline void grpc_closure_list_move(grpc_closure_list* src,
268                                    grpc_closure_list* dst) {
269   if (src->head == nullptr) {
270     return;
271   }
272   if (dst->head == nullptr) {
273     *dst = *src;
274   } else {
275     dst->tail->next_data.next = src->head;
276     dst->tail = src->tail;
277   }
278   src->head = src->tail = nullptr;
279 }
280 
281 /// return whether \a list is empty.
282 inline bool grpc_closure_list_empty(grpc_closure_list closure_list) {
283   return closure_list.head == nullptr;
284 }
285 
286 namespace grpc_core {
287 class Closure {
288  public:
289   static void Run(const DebugLocation& location, grpc_closure* closure,
290                   grpc_error_handle error) {
291     (void)location;
292     if (closure == nullptr) {
293       return;
294     }
295 #ifndef NDEBUG
296     if (grpc_trace_closure.enabled()) {
297       gpr_log(GPR_DEBUG, "running closure %p: created [%s:%d]: run [%s:%d]",
298               closure, closure->file_created, closure->line_created,
299               location.file(), location.line());
300     }
301     GPR_ASSERT(closure->cb != nullptr);
302 #endif
303     closure->cb(closure->cb_arg, error);
304 #ifndef NDEBUG
305     if (grpc_trace_closure.enabled()) {
306       gpr_log(GPR_DEBUG, "closure %p finished", closure);
307     }
308 #endif
309   }
310 };
311 }  // namespace grpc_core
312 
313 #endif  // GRPC_SRC_CORE_LIB_IOMGR_CLOSURE_H
314