/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#ifndef GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H
#define GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H

#include <grpc/support/port_platform.h>

#include <stddef.h>

#include <atomic>

#include "absl/container/inlined_vector.h"

#include <grpc/support/atm.h>

#include "src/core/lib/gprpp/mpscq.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/dynamic_annotations.h"
#include "src/core/lib/iomgr/exec_ctx.h"

// A simple, lock-free mechanism for serializing activity related to a
// single call.  This is similar to a combiner but is more lightweight.
//
// It requires the callback (or, in the common case where the callback
// actually kicks off a chain of callbacks, the last callback in that
// chain) to explicitly indicate (by calling GRPC_CALL_COMBINER_STOP())
// when it is done with the action that was kicked off by the original
// callback.
45 46 namespace grpc_core { 47 48 extern DebugOnlyTraceFlag grpc_call_combiner_trace; 49 50 class CallCombiner { 51 public: 52 CallCombiner(); 53 ~CallCombiner(); 54 55 #ifndef NDEBUG 56 #define GRPC_CALL_COMBINER_START(call_combiner, closure, error, reason) \ 57 (call_combiner)->Start((closure), (error), __FILE__, __LINE__, (reason)) 58 #define GRPC_CALL_COMBINER_STOP(call_combiner, reason) \ 59 (call_combiner)->Stop(__FILE__, __LINE__, (reason)) 60 /// Starts processing \a closure. 61 void Start(grpc_closure* closure, grpc_error* error, const char* file, 62 int line, const char* reason); 63 /// Yields the call combiner to the next closure in the queue, if any. 64 void Stop(const char* file, int line, const char* reason); 65 #else 66 #define GRPC_CALL_COMBINER_START(call_combiner, closure, error, reason) \ 67 (call_combiner)->Start((closure), (error), (reason)) 68 #define GRPC_CALL_COMBINER_STOP(call_combiner, reason) \ 69 (call_combiner)->Stop((reason)) 70 /// Starts processing \a closure. 71 void Start(grpc_closure* closure, grpc_error* error, const char* reason); 72 /// Yields the call combiner to the next closure in the queue, if any. 73 void Stop(const char* reason); 74 #endif 75 76 /// Registers \a closure to be invoked when Cancel() is called. 77 /// 78 /// Once a closure is registered, it will always be scheduled exactly 79 /// once; this allows the closure to hold references that will be freed 80 /// regardless of whether or not the call was cancelled. If a cancellation 81 /// does occur, the closure will be scheduled with the cancellation error; 82 /// otherwise, it will be scheduled with GRPC_ERROR_NONE. 83 /// 84 /// The closure will be scheduled in the following cases: 85 /// - If Cancel() was called prior to registering the closure, it will be 86 /// scheduled immediately with the cancelation error. 87 /// - If Cancel() is called after registering the closure, the closure will 88 /// be scheduled with the cancellation error. 
89 /// - If SetNotifyOnCancel() is called again to register a new cancellation 90 /// closure, the previous cancellation closure will be scheduled with 91 /// GRPC_ERROR_NONE. 92 /// 93 /// If \a closure is NULL, then no closure will be invoked on 94 /// cancellation; this effectively unregisters the previously set closure. 95 /// However, most filters will not need to explicitly unregister their 96 /// callbacks, as this is done automatically when the call is destroyed. 97 /// Filters that schedule the cancellation closure on ExecCtx do not need 98 /// to take a ref on the call stack to guarantee closure liveness. This is 99 /// done by explicitly flushing ExecCtx after the unregistration during 100 /// call destruction. 101 void SetNotifyOnCancel(grpc_closure* closure); 102 103 /// Indicates that the call has been cancelled. 104 void Cancel(grpc_error* error); 105 106 private: 107 void ScheduleClosure(grpc_closure* closure, grpc_error* error); 108 #ifdef GRPC_TSAN_ENABLED 109 static void TsanClosure(void* arg, grpc_error* error); 110 #endif 111 112 gpr_atm size_ = 0; // size_t, num closures in queue or currently executing 113 MultiProducerSingleConsumerQueue queue_; 114 // Either 0 (if not cancelled and no cancellation closure set), 115 // a grpc_closure* (if the lowest bit is 0), 116 // or a grpc_error* (if the lowest bit is 1). 117 gpr_atm cancel_state_ = 0; 118 #ifdef GRPC_TSAN_ENABLED 119 // A fake ref-counted lock that is kept alive after the destruction of 120 // grpc_call_combiner, when we are running the original closure. 121 // 122 // Ideally we want to lock and unlock the call combiner as a pointer, when the 123 // callback is called. However, original_closure is free to trigger 124 // anything on the call combiner (including destruction of grpc_call). 125 // Thus, we need a ref-counted structure that can outlive the call combiner. 
126 struct TsanLock : public RefCounted<TsanLock, NonPolymorphicRefCount> { TsanLockTsanLock127 TsanLock() { TSAN_ANNOTATE_RWLOCK_CREATE(&taken); } ~TsanLockTsanLock128 ~TsanLock() { TSAN_ANNOTATE_RWLOCK_DESTROY(&taken); } 129 // To avoid double-locking by the same thread, we should acquire/release 130 // the lock only when taken is false. On each acquire taken must be set to 131 // true. 132 std::atomic<bool> taken{false}; 133 }; 134 RefCountedPtr<TsanLock> tsan_lock_ = MakeRefCounted<TsanLock>(); 135 grpc_closure tsan_closure_; 136 grpc_closure* original_closure_; 137 #endif 138 }; 139 140 // Helper for running a list of closures in a call combiner. 141 // 142 // Each callback running in the call combiner will eventually be 143 // returned to the surface, at which point the surface will yield the 144 // call combiner. So when we are running in the call combiner and have 145 // more than one callback to return to the surface, we need to re-enter 146 // the call combiner for all but one of those callbacks. 147 class CallCombinerClosureList { 148 public: CallCombinerClosureList()149 CallCombinerClosureList() {} 150 151 // Adds a closure to the list. The closure must eventually result in 152 // the call combiner being yielded. Add(grpc_closure * closure,grpc_error * error,const char * reason)153 void Add(grpc_closure* closure, grpc_error* error, const char* reason) { 154 closures_.emplace_back(closure, error, reason); 155 } 156 157 // Runs all closures in the call combiner and yields the call combiner. 158 // 159 // All but one of the closures in the list will be scheduled via 160 // GRPC_CALL_COMBINER_START(), and the remaining closure will be 161 // scheduled via ExecCtx::Run(), which will eventually result 162 // in yielding the call combiner. If the list is empty, then the call 163 // combiner will be yielded immediately. 
RunClosures(CallCombiner * call_combiner)164 void RunClosures(CallCombiner* call_combiner) { 165 if (closures_.empty()) { 166 GRPC_CALL_COMBINER_STOP(call_combiner, "no closures to schedule"); 167 return; 168 } 169 for (size_t i = 1; i < closures_.size(); ++i) { 170 auto& closure = closures_[i]; 171 GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error, 172 closure.reason); 173 } 174 if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) { 175 gpr_log(GPR_INFO, 176 "CallCombinerClosureList executing closure while already " 177 "holding call_combiner %p: closure=%p error=%s reason=%s", 178 call_combiner, closures_[0].closure, 179 grpc_error_string(closures_[0].error), closures_[0].reason); 180 } 181 // This will release the call combiner. 182 ExecCtx::Run(DEBUG_LOCATION, closures_[0].closure, closures_[0].error); 183 closures_.clear(); 184 } 185 186 // Runs all closures in the call combiner, but does NOT yield the call 187 // combiner. All closures will be scheduled via GRPC_CALL_COMBINER_START(). RunClosuresWithoutYielding(CallCombiner * call_combiner)188 void RunClosuresWithoutYielding(CallCombiner* call_combiner) { 189 for (size_t i = 0; i < closures_.size(); ++i) { 190 auto& closure = closures_[i]; 191 GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error, 192 closure.reason); 193 } 194 closures_.clear(); 195 } 196 size()197 size_t size() const { return closures_.size(); } 198 199 private: 200 struct CallCombinerClosure { 201 grpc_closure* closure; 202 grpc_error* error; 203 const char* reason; 204 CallCombinerClosureCallCombinerClosure205 CallCombinerClosure(grpc_closure* closure, grpc_error* error, 206 const char* reason) 207 : closure(closure), error(error), reason(reason) {} 208 }; 209 210 // There are generally a maximum of 6 closures to run in the call 211 // combiner, one for each pending op. 
212 absl::InlinedVector<CallCombinerClosure, 6> closures_; 213 }; 214 215 } // namespace grpc_core 216 217 #endif /* GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H */ 218