1 /* 2 * 3 * Copyright 2017 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #ifndef GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H 20 #define GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H 21 22 #include <grpc/support/port_platform.h> 23 24 #include <stddef.h> 25 26 #include <grpc/support/atm.h> 27 28 #include "src/core/lib/gpr/mpscq.h" 29 #include "src/core/lib/gprpp/inlined_vector.h" 30 #include "src/core/lib/iomgr/closure.h" 31 32 // A simple, lock-free mechanism for serializing activity related to a 33 // single call. This is similar to a combiner but is more lightweight. 34 // 35 // It requires the callback (or, in the common case where the callback 36 // actually kicks off a chain of callbacks, the last callback in that 37 // chain) to explicitly indicate (by calling GRPC_CALL_COMBINER_STOP()) 38 // when it is done with the action that was kicked off by the original 39 // callback. 40 41 extern grpc_core::TraceFlag grpc_call_combiner_trace; 42 43 typedef struct { 44 gpr_atm size; // size_t, num closures in queue or currently executing 45 gpr_mpscq queue; 46 // Either 0 (if not cancelled and no cancellation closure set), 47 // a grpc_closure* (if the lowest bit is 0), 48 // or a grpc_error* (if the lowest bit is 1). 49 gpr_atm cancel_state; 50 } grpc_call_combiner; 51 52 // Assumes memory was initialized to zero. 53 void grpc_call_combiner_init(grpc_call_combiner* call_combiner); 54 55 void grpc_call_combiner_destroy(grpc_call_combiner* call_combiner); 56 57 #ifndef NDEBUG 58 #define GRPC_CALL_COMBINER_START(call_combiner, closure, error, reason) \ 59 grpc_call_combiner_start((call_combiner), (closure), (error), __FILE__, \ 60 __LINE__, (reason)) 61 #define GRPC_CALL_COMBINER_STOP(call_combiner, reason) \ 62 grpc_call_combiner_stop((call_combiner), __FILE__, __LINE__, (reason)) 63 /// Starts processing \a closure on \a call_combiner. 64 void grpc_call_combiner_start(grpc_call_combiner* call_combiner, 65 grpc_closure* closure, grpc_error* error, 66 const char* file, int line, const char* reason); 67 /// Yields the call combiner to the next closure in the queue, if any. 68 void grpc_call_combiner_stop(grpc_call_combiner* call_combiner, 69 const char* file, int line, const char* reason); 70 #else 71 #define GRPC_CALL_COMBINER_START(call_combiner, closure, error, reason) \ 72 grpc_call_combiner_start((call_combiner), (closure), (error), (reason)) 73 #define GRPC_CALL_COMBINER_STOP(call_combiner, reason) \ 74 grpc_call_combiner_stop((call_combiner), (reason)) 75 /// Starts processing \a closure on \a call_combiner. 76 void grpc_call_combiner_start(grpc_call_combiner* call_combiner, 77 grpc_closure* closure, grpc_error* error, 78 const char* reason); 79 /// Yields the call combiner to the next closure in the queue, if any. 80 void grpc_call_combiner_stop(grpc_call_combiner* call_combiner, 81 const char* reason); 82 #endif 83 84 /// Registers \a closure to be invoked by \a call_combiner when 85 /// grpc_call_combiner_cancel() is called. 86 /// 87 /// Once a closure is registered, it will always be scheduled exactly 88 /// once; this allows the closure to hold references that will be freed 89 /// regardless of whether or not the call was cancelled. If a cancellation 90 /// does occur, the closure will be scheduled with the cancellation error; 91 /// otherwise, it will be scheduled with GRPC_ERROR_NONE. 92 /// 93 /// The closure will be scheduled in the following cases: 94 /// - If grpc_call_combiner_cancel() was called prior to registering the 95 /// closure, it will be scheduled immediately with the cancelation error. 96 /// - If grpc_call_combiner_cancel() is called after registering the 97 /// closure, the closure will be scheduled with the cancellation error. 98 /// - If grpc_call_combiner_set_notify_on_cancel() is called again to 99 /// register a new cancellation closure, the previous cancellation 100 /// closure will be scheduled with GRPC_ERROR_NONE. 101 /// 102 /// If \a closure is NULL, then no closure will be invoked on 103 /// cancellation; this effectively unregisters the previously set closure. 104 /// However, most filters will not need to explicitly unregister their 105 /// callbacks, as this is done automatically when the call is destroyed. Filters 106 /// that schedule the cancellation closure on ExecCtx do not need to take a ref 107 /// on the call stack to guarantee closure liveness. This is done by explicitly 108 /// flushing ExecCtx after the unregistration during call destruction. 109 void grpc_call_combiner_set_notify_on_cancel(grpc_call_combiner* call_combiner, 110 grpc_closure* closure); 111 112 /// Indicates that the call has been cancelled. 113 void grpc_call_combiner_cancel(grpc_call_combiner* call_combiner, 114 grpc_error* error); 115 116 namespace grpc_core { 117 118 // Helper for running a list of closures in a call combiner. 119 // 120 // Each callback running in the call combiner will eventually be 121 // returned to the surface, at which point the surface will yield the 122 // call combiner. So when we are running in the call combiner and have 123 // more than one callback to return to the surface, we need to re-enter 124 // the call combiner for all but one of those callbacks. 125 class CallCombinerClosureList { 126 public: CallCombinerClosureList()127 CallCombinerClosureList() {} 128 129 // Adds a closure to the list. The closure must eventually result in 130 // the call combiner being yielded. Add(grpc_closure * closure,grpc_error * error,const char * reason)131 void Add(grpc_closure* closure, grpc_error* error, const char* reason) { 132 closures_.emplace_back(closure, error, reason); 133 } 134 135 // Runs all closures in the call combiner and yields the call combiner. 136 // 137 // All but one of the closures in the list will be scheduled via 138 // GRPC_CALL_COMBINER_START(), and the remaining closure will be 139 // scheduled via GRPC_CLOSURE_SCHED(), which will eventually result in 140 // yielding the call combiner. If the list is empty, then the call 141 // combiner will be yielded immediately. RunClosures(grpc_call_combiner * call_combiner)142 void RunClosures(grpc_call_combiner* call_combiner) { 143 if (closures_.empty()) { 144 GRPC_CALL_COMBINER_STOP(call_combiner, "no closures to schedule"); 145 return; 146 } 147 for (size_t i = 1; i < closures_.size(); ++i) { 148 auto& closure = closures_[i]; 149 GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error, 150 closure.reason); 151 } 152 if (grpc_call_combiner_trace.enabled()) { 153 gpr_log(GPR_INFO, 154 "CallCombinerClosureList executing closure while already " 155 "holding call_combiner %p: closure=%p error=%s reason=%s", 156 call_combiner, closures_[0].closure, 157 grpc_error_string(closures_[0].error), closures_[0].reason); 158 } 159 // This will release the call combiner. 160 GRPC_CLOSURE_SCHED(closures_[0].closure, closures_[0].error); 161 closures_.clear(); 162 } 163 164 // Runs all closures in the call combiner, but does NOT yield the call 165 // combiner. All closures will be scheduled via GRPC_CALL_COMBINER_START(). RunClosuresWithoutYielding(grpc_call_combiner * call_combiner)166 void RunClosuresWithoutYielding(grpc_call_combiner* call_combiner) { 167 for (size_t i = 0; i < closures_.size(); ++i) { 168 auto& closure = closures_[i]; 169 GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error, 170 closure.reason); 171 } 172 closures_.clear(); 173 } 174 size()175 size_t size() const { return closures_.size(); } 176 177 private: 178 struct CallCombinerClosure { 179 grpc_closure* closure; 180 grpc_error* error; 181 const char* reason; 182 CallCombinerClosureCallCombinerClosure183 CallCombinerClosure(grpc_closure* closure, grpc_error* error, 184 const char* reason) 185 : closure(closure), error(error), reason(reason) {} 186 }; 187 188 // There are generally a maximum of 6 closures to run in the call 189 // combiner, one for each pending op. 190 InlinedVector<CallCombinerClosure, 6> closures_; 191 }; 192 193 } // namespace grpc_core 194 195 #endif /* GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H */ 196