/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#ifndef GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H
#define GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H

#include <grpc/support/port_platform.h>

#include <stddef.h>

#include <grpc/support/atm.h>

#include "src/core/lib/gpr/mpscq.h"
#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/iomgr/closure.h"

// A simple, lock-free mechanism for serializing activity related to a
// single call.  This is similar to a combiner but is more lightweight.
//
// It requires the callback (or, in the common case where the callback
// actually kicks off a chain of callbacks, the last callback in that
// chain) to explicitly indicate (by calling GRPC_CALL_COMBINER_STOP())
// when it is done with the action that was kicked off by the original
// callback.

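// A typical usage sketch (illustrative only; "calld", "elem", "MyCallback",
// and "start_op" are hypothetical names, not part of this API):
//
//   // Kick off an action under the call combiner.
//   GRPC_CLOSURE_INIT(&calld->start_op, MyCallback, elem,
//                     grpc_schedule_on_exec_ctx);
//   GRPC_CALL_COMBINER_START(calld->call_combiner, &calld->start_op,
//                            GRPC_ERROR_NONE, "starting op");
//
//   // In MyCallback (or the last callback in the chain it kicks off),
//   // yield the call combiner once the action is complete:
//   GRPC_CALL_COMBINER_STOP(calld->call_combiner, "op complete");
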
extern grpc_core::TraceFlag grpc_call_combiner_trace;

typedef struct {
  gpr_atm size;  // size_t, num closures in queue or currently executing
  gpr_mpscq queue;
  // Either 0 (if not cancelled and no cancellation closure set),
  // a grpc_closure* (if the lowest bit is 0),
  // or a grpc_error* (if the lowest bit is 1).
  gpr_atm cancel_state;
} grpc_call_combiner;

// Assumes memory was initialized to zero.
void grpc_call_combiner_init(grpc_call_combiner* call_combiner);

void grpc_call_combiner_destroy(grpc_call_combiner* call_combiner);

#ifndef NDEBUG
#define GRPC_CALL_COMBINER_START(call_combiner, closure, error, reason)   \
  grpc_call_combiner_start((call_combiner), (closure), (error), __FILE__, \
                           __LINE__, (reason))
#define GRPC_CALL_COMBINER_STOP(call_combiner, reason) \
  grpc_call_combiner_stop((call_combiner), __FILE__, __LINE__, (reason))
/// Starts processing \a closure on \a call_combiner.
void grpc_call_combiner_start(grpc_call_combiner* call_combiner,
                              grpc_closure* closure, grpc_error* error,
                              const char* file, int line, const char* reason);
/// Yields the call combiner to the next closure in the queue, if any.
void grpc_call_combiner_stop(grpc_call_combiner* call_combiner,
                             const char* file, int line, const char* reason);
#else
#define GRPC_CALL_COMBINER_START(call_combiner, closure, error, reason) \
  grpc_call_combiner_start((call_combiner), (closure), (error), (reason))
#define GRPC_CALL_COMBINER_STOP(call_combiner, reason) \
  grpc_call_combiner_stop((call_combiner), (reason))
/// Starts processing \a closure on \a call_combiner.
void grpc_call_combiner_start(grpc_call_combiner* call_combiner,
                              grpc_closure* closure, grpc_error* error,
                              const char* reason);
/// Yields the call combiner to the next closure in the queue, if any.
void grpc_call_combiner_stop(grpc_call_combiner* call_combiner,
                             const char* reason);
#endif

/// Registers \a closure to be invoked by \a call_combiner when
/// grpc_call_combiner_cancel() is called.
///
/// Once a closure is registered, it will always be scheduled exactly
/// once; this allows the closure to hold references that will be freed
/// regardless of whether or not the call was cancelled.  If a cancellation
/// does occur, the closure will be scheduled with the cancellation error;
/// otherwise, it will be scheduled with GRPC_ERROR_NONE.
///
/// The closure will be scheduled in the following cases:
/// - If grpc_call_combiner_cancel() was called prior to registering the
///   closure, it will be scheduled immediately with the cancellation error.
/// - If grpc_call_combiner_cancel() is called after registering the
///   closure, the closure will be scheduled with the cancellation error.
/// - If grpc_call_combiner_set_notify_on_cancel() is called again to
///   register a new cancellation closure, the previous cancellation
///   closure will be scheduled with GRPC_ERROR_NONE.
///
/// If \a closure is NULL, then no closure will be invoked on
/// cancellation; this effectively unregisters the previously set closure.
/// However, most filters will not need to explicitly unregister their
/// callbacks, as this is done automatically when the call is destroyed.
/// Filters that schedule the cancellation closure on ExecCtx do not need
/// to take a ref on the call stack to guarantee closure liveness; this is
/// handled by explicitly flushing ExecCtx after the unregistration during
/// call destruction.
void grpc_call_combiner_set_notify_on_cancel(grpc_call_combiner* call_combiner,
                                             grpc_closure* closure);
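
// Illustrative sketch of registering a cancellation callback (the names
// "calld", "elem", and "OnCancel" below are hypothetical):
//
//   GRPC_CLOSURE_INIT(&calld->on_cancel, OnCancel, elem,
//                     grpc_schedule_on_exec_ctx);
//   grpc_call_combiner_set_notify_on_cancel(calld->call_combiner,
//                                           &calld->on_cancel);
//   // OnCancel is guaranteed to run exactly once: with the cancellation
//   // error if the call is cancelled, and with GRPC_ERROR_NONE otherwise.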

/// Indicates that the call has been cancelled.
void grpc_call_combiner_cancel(grpc_call_combiner* call_combiner,
                               grpc_error* error);

namespace grpc_core {

// Helper for running a list of closures in a call combiner.
//
// Each callback running in the call combiner will eventually be
// returned to the surface, at which point the surface will yield the
// call combiner.  So when we are running in the call combiner and have
// more than one callback to return to the surface, we need to re-enter
// the call combiner for all but one of those callbacks.
class CallCombinerClosureList {
 public:
  CallCombinerClosureList() {}

  // Adds a closure to the list.  The closure must eventually result in
  // the call combiner being yielded.
  void Add(grpc_closure* closure, grpc_error* error, const char* reason) {
    closures_.emplace_back(closure, error, reason);
  }

  // Runs all closures in the call combiner and yields the call combiner.
  //
  // All but one of the closures in the list will be scheduled via
  // GRPC_CALL_COMBINER_START(), and the remaining closure will be
  // scheduled via GRPC_CLOSURE_SCHED(), which will eventually result in
  // yielding the call combiner.  If the list is empty, then the call
  // combiner will be yielded immediately.
  void RunClosures(grpc_call_combiner* call_combiner) {
    if (closures_.empty()) {
      GRPC_CALL_COMBINER_STOP(call_combiner, "no closures to schedule");
      return;
    }
    for (size_t i = 1; i < closures_.size(); ++i) {
      auto& closure = closures_[i];
      GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error,
                               closure.reason);
    }
    if (grpc_call_combiner_trace.enabled()) {
      gpr_log(GPR_INFO,
              "CallCombinerClosureList executing closure while already "
              "holding call_combiner %p: closure=%p error=%s reason=%s",
              call_combiner, closures_[0].closure,
              grpc_error_string(closures_[0].error), closures_[0].reason);
    }
    // This will release the call combiner.
    GRPC_CLOSURE_SCHED(closures_[0].closure, closures_[0].error);
    closures_.clear();
  }

  // Runs all closures in the call combiner, but does NOT yield the call
  // combiner.  All closures will be scheduled via GRPC_CALL_COMBINER_START().
  void RunClosuresWithoutYielding(grpc_call_combiner* call_combiner) {
    for (size_t i = 0; i < closures_.size(); ++i) {
      auto& closure = closures_[i];
      GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error,
                               closure.reason);
    }
    closures_.clear();
  }

  size_t size() const { return closures_.size(); }

 private:
  struct CallCombinerClosure {
    grpc_closure* closure;
    grpc_error* error;
    const char* reason;

    CallCombinerClosure(grpc_closure* closure, grpc_error* error,
                        const char* reason)
        : closure(closure), error(error), reason(reason) {}
  };

  // There are generally a maximum of 6 closures to run in the call
  // combiner, one for each pending op.
  InlinedVector<CallCombinerClosure, 6> closures_;
};
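
// Illustrative usage sketch, assuming we are inside the call combiner and
// need to return two pending callbacks to the surface ("calld" and the
// closure/error names below are hypothetical):
//
//   CallCombinerClosureList closures;
//   closures.Add(&calld->recv_initial_metadata_ready, GRPC_ERROR_REF(error),
//                "resuming recv_initial_metadata_ready");
//   closures.Add(&calld->recv_message_ready, GRPC_ERROR_REF(error),
//                "resuming recv_message_ready");
//   // All but one of the added closures are scheduled via
//   // GRPC_CALL_COMBINER_START(); the remaining one is scheduled via
//   // GRPC_CLOSURE_SCHED(), which yields the call combiner.
//   closures.RunClosures(calld->call_combiner);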

}  // namespace grpc_core

#endif /* GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H */