• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPC_SRC_CORE_LIB_IOMGR_CLOSURE_H
20 #define GRPC_SRC_CORE_LIB_IOMGR_CLOSURE_H
21 
22 #include <assert.h>
23 #include <grpc/support/alloc.h>
24 #include <grpc/support/port_platform.h>
25 #include <stdbool.h>
26 
27 #include "absl/log/check.h"
28 #include "absl/log/log.h"
29 #include "src/core/lib/iomgr/error.h"
30 #include "src/core/util/crash.h"
31 #include "src/core/util/debug_location.h"
32 #include "src/core/util/manual_constructor.h"
33 #include "src/core/util/mpscq.h"
34 
35 struct grpc_closure;
36 typedef struct grpc_closure grpc_closure;
37 
38 typedef struct grpc_closure_list {
39   grpc_closure* head;
40   grpc_closure* tail;
41 } grpc_closure_list;
42 
43 /// gRPC Callback definition.
44 ///
45 ///\param arg Arbitrary input.
46 ///\param error absl::OkStatus() if no error occurred, otherwise some grpc_error
47 ///             describing what went wrong.
48 ///             Error contract: it is not the cb's job to unref this error;
49 ///             the closure scheduler will do that after the cb returns
50 typedef void (*grpc_iomgr_cb_func)(void* arg, grpc_error_handle error);
51 
52 /// A closure over a grpc_iomgr_cb_func.
53 struct grpc_closure {
54   /// Once queued, next indicates the next queued closure; before then, scratch
55   /// space
56   union {
57     grpc_closure* next;
58     grpc_core::ManualConstructor<
59         grpc_core::MultiProducerSingleConsumerQueue::Node>
60         mpscq_node;
61     uintptr_t scratch;
62   } next_data;
63 
64   /// Bound callback.
65   grpc_iomgr_cb_func cb;
66 
67   /// Arguments to be passed to "cb".
68   void* cb_arg;
69 
70   /// Once queued, the result of the closure. Before then: scratch space
71   union {
72     uintptr_t error;
73     uintptr_t scratch;
74   } error_data;
75 
76 // extra tracing and debugging for grpc_closure. This incurs a decent amount of
77 // overhead per closure, so it must be enabled at compile time.
78 #ifndef NDEBUG
79   bool scheduled;
80   bool run;  // true = run, false = scheduled
81   const char* file_created;
82   int line_created;
83   const char* file_initiated;
84   int line_initiated;
85 #endif
86 
87   std::string DebugString() const;
88 };
89 
90 #ifndef NDEBUG
grpc_closure_init(const char * file,int line,grpc_closure * closure,grpc_iomgr_cb_func cb,void * cb_arg)91 inline grpc_closure* grpc_closure_init(const char* file, int line,
92                                        grpc_closure* closure,
93                                        grpc_iomgr_cb_func cb, void* cb_arg) {
94 #else
95 inline grpc_closure* grpc_closure_init(grpc_closure* closure,
96                                        grpc_iomgr_cb_func cb, void* cb_arg) {
97 #endif
98   closure->cb = cb;
99   closure->cb_arg = cb_arg;
100   closure->error_data.error = 0;
101 #ifndef NDEBUG
102   closure->scheduled = false;
103   closure->file_initiated = nullptr;
104   closure->line_initiated = 0;
105   closure->run = false;
106   closure->file_created = file;
107   closure->line_created = line;
108 #endif
109   return closure;
110 }
111 
112 /// Initializes \a closure with \a cb and \a cb_arg. Returns \a closure.
113 #ifndef NDEBUG
114 #define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler) \
115   grpc_closure_init(__FILE__, __LINE__, closure, cb, cb_arg)
116 #else
117 #define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler) \
118   grpc_closure_init(closure, cb, cb_arg)
119 #endif
120 
121 namespace grpc_core {
122 template <typename T, void (T::*cb)(grpc_error_handle)>
123 grpc_closure MakeMemberClosure(T* p, DebugLocation location = DebugLocation()) {
124   grpc_closure out;
125   GRPC_CLOSURE_INIT(
126       &out, [](void* p, grpc_error_handle e) { (static_cast<T*>(p)->*cb)(e); },
127       p, nullptr);
128 #ifndef NDEBUG
129   out.file_created = location.file();
130   out.line_created = location.line();
131 #else
132   (void)location;
133 #endif
134   return out;
135 }
136 
137 template <typename T, void (T::*cb)()>
138 grpc_closure MakeMemberClosure(T* p, DebugLocation location = DebugLocation()) {
139   grpc_closure out;
140   GRPC_CLOSURE_INIT(
141       &out, [](void* p, grpc_error_handle) { (static_cast<T*>(p)->*cb)(); }, p,
142       nullptr);
143 #ifndef NDEBUG
144   out.file_created = location.file();
145   out.line_created = location.line();
146 #else
147   (void)location;
148 #endif
149   return out;
150 }
151 
152 template <typename F>
153 grpc_closure* NewClosure(F f) {
154   struct Closure : public grpc_closure {
155     explicit Closure(F f) : f(std::move(f)) {}
156     F f;
157     static void Run(void* arg, grpc_error_handle error) {
158       auto self = static_cast<Closure*>(arg);
159       self->f(error);
160       delete self;
161     }
162   };
163   Closure* c = new Closure(std::move(f));
164   GRPC_CLOSURE_INIT(c, Closure::Run, c, nullptr);
165   return c;
166 }
167 }  // namespace grpc_core
168 
169 namespace closure_impl {
170 
171 struct wrapped_closure {
172   grpc_iomgr_cb_func cb;
173   void* cb_arg;
174   grpc_closure wrapper;
175 };
176 inline void closure_wrapper(void* arg, grpc_error_handle error) {
177   wrapped_closure* wc = static_cast<wrapped_closure*>(arg);
178   grpc_iomgr_cb_func cb = wc->cb;
179   void* cb_arg = wc->cb_arg;
180   gpr_free(wc);
181   cb(cb_arg, error);
182 }
183 
184 }  // namespace closure_impl
185 
186 #ifndef NDEBUG
187 inline grpc_closure* grpc_closure_create(const char* file, int line,
188                                          grpc_iomgr_cb_func cb, void* cb_arg) {
189 #else
190 inline grpc_closure* grpc_closure_create(grpc_iomgr_cb_func cb, void* cb_arg) {
191 #endif
192   closure_impl::wrapped_closure* wc =
193       static_cast<closure_impl::wrapped_closure*>(gpr_malloc(sizeof(*wc)));
194   wc->cb = cb;
195   wc->cb_arg = cb_arg;
196 #ifndef NDEBUG
197   grpc_closure_init(file, line, &wc->wrapper, closure_impl::closure_wrapper,
198                     wc);
199 #else
200   grpc_closure_init(&wc->wrapper, closure_impl::closure_wrapper, wc);
201 #endif
202   return &wc->wrapper;
203 }
204 
205 // Create a heap allocated closure: try to avoid except for very rare events
206 #ifndef NDEBUG
207 #define GRPC_CLOSURE_CREATE(cb, cb_arg, scheduler) \
208   grpc_closure_create(__FILE__, __LINE__, cb, cb_arg)
209 #else
210 #define GRPC_CLOSURE_CREATE(cb, cb_arg, scheduler) \
211   grpc_closure_create(cb, cb_arg)
212 #endif
213 
214 #define GRPC_CLOSURE_LIST_INIT \
215   {                            \
216     nullptr, nullptr           \
217   }
218 
219 inline void grpc_closure_list_init(grpc_closure_list* closure_list) {
220   closure_list->head = closure_list->tail = nullptr;
221 }
222 
223 /// add \a closure to the end of \a list
224 /// Returns true if \a list becomes non-empty
225 inline bool grpc_closure_list_append(grpc_closure_list* closure_list,
226                                      grpc_closure* closure) {
227   if (closure == nullptr) {
228     return false;
229   }
230   closure->next_data.next = nullptr;
231   bool was_empty = (closure_list->head == nullptr);
232   if (was_empty) {
233     closure_list->head = closure;
234   } else {
235     closure_list->tail->next_data.next = closure;
236   }
237   closure_list->tail = closure;
238   return was_empty;
239 }
240 
241 /// add \a closure to the end of \a list
242 /// and set \a closure's result to \a error
243 /// Returns true if \a list becomes non-empty
244 inline bool grpc_closure_list_append(grpc_closure_list* closure_list,
245                                      grpc_closure* closure,
246                                      grpc_error_handle error) {
247   if (closure == nullptr) {
248     return false;
249   }
250   closure->error_data.error = grpc_core::internal::StatusAllocHeapPtr(error);
251   return grpc_closure_list_append(closure_list, closure);
252 }
253 
254 /// force all success bits in \a list to false
255 inline void grpc_closure_list_fail_all(grpc_closure_list* list,
256                                        grpc_error_handle forced_failure) {
257   for (grpc_closure* c = list->head; c != nullptr; c = c->next_data.next) {
258     if (c->error_data.error == 0) {
259       c->error_data.error =
260           grpc_core::internal::StatusAllocHeapPtr(forced_failure);
261     }
262   }
263 }
264 
265 /// append all closures from \a src to \a dst and empty \a src.
266 inline void grpc_closure_list_move(grpc_closure_list* src,
267                                    grpc_closure_list* dst) {
268   if (src->head == nullptr) {
269     return;
270   }
271   if (dst->head == nullptr) {
272     *dst = *src;
273   } else {
274     dst->tail->next_data.next = src->head;
275     dst->tail = src->tail;
276   }
277   src->head = src->tail = nullptr;
278 }
279 
280 /// return whether \a list is empty.
281 inline bool grpc_closure_list_empty(grpc_closure_list closure_list) {
282   return closure_list.head == nullptr;
283 }
284 
285 namespace grpc_core {
286 class Closure {
287  public:
288   static void Run(const DebugLocation& location, grpc_closure* closure,
289                   grpc_error_handle error) {
290     (void)location;
291     if (closure == nullptr) {
292       return;
293     }
294 #ifndef NDEBUG
295     GRPC_TRACE_VLOG(closure, 2)
296         << "running closure " << closure << ": created ["
297         << closure->file_created << ":" << closure->line_created << "]: run ["
298         << location.file() << ":" << location.line() << "]";
299     CHECK_NE(closure->cb, nullptr);
300 #endif
301     closure->cb(closure->cb_arg, error);
302 #ifndef NDEBUG
303     GRPC_TRACE_VLOG(closure, 2) << "closure " << closure << " finished";
304 #endif
305   }
306 };
307 }  // namespace grpc_core
308 
309 #endif  // GRPC_SRC_CORE_LIB_IOMGR_CLOSURE_H
310