1 /* 2 * 3 * Copyright 2017 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #ifndef GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H 20 #define GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H 21 22 #include <grpc/support/port_platform.h> 23 24 #include <stddef.h> 25 26 #include "absl/container/inlined_vector.h" 27 28 #include <grpc/support/atm.h> 29 30 #include "src/core/lib/gprpp/mpscq.h" 31 #include "src/core/lib/gprpp/ref_counted.h" 32 #include "src/core/lib/gprpp/ref_counted_ptr.h" 33 #include "src/core/lib/iomgr/closure.h" 34 #include "src/core/lib/iomgr/dynamic_annotations.h" 35 #include "src/core/lib/iomgr/exec_ctx.h" 36 37 // A simple, lock-free mechanism for serializing activity related to a 38 // single call. This is similar to a combiner but is more lightweight. 39 // 40 // It requires the callback (or, in the common case where the callback 41 // actually kicks off a chain of callbacks, the last callback in that 42 // chain) to explicitly indicate (by calling GRPC_CALL_COMBINER_STOP()) 43 // when it is done with the action that was kicked off by the original 44 // callback. 
namespace grpc_core {

// Debug-only trace flag; when enabled, combiner transitions are logged.
extern DebugOnlyTraceFlag grpc_call_combiner_trace;

// A simple, lock-free mechanism for serializing activity related to a
// single call.  Closures are queued via Start() and executed one at a
// time; each closure (or the last closure in a chain that it kicks off)
// must call GRPC_CALL_COMBINER_STOP() to yield the combiner to the next
// queued closure.
class CallCombiner {
 public:
  CallCombiner();
  ~CallCombiner();

// In debug builds, Start()/Stop() additionally record the call site
// (__FILE__/__LINE__) alongside the caller-supplied reason string, so
// trace output can attribute each combiner transition to its caller.
#ifndef NDEBUG
#define GRPC_CALL_COMBINER_START(call_combiner, closure, error, reason) \
  (call_combiner)->Start((closure), (error), __FILE__, __LINE__, (reason))
#define GRPC_CALL_COMBINER_STOP(call_combiner, reason) \
  (call_combiner)->Stop(__FILE__, __LINE__, (reason))
  /// Starts processing \a closure.
  void Start(grpc_closure* closure, grpc_error_handle error, const char* file,
             int line, const char* reason);
  /// Yields the call combiner to the next closure in the queue, if any.
  void Stop(const char* file, int line, const char* reason);
#else
#define GRPC_CALL_COMBINER_START(call_combiner, closure, error, reason) \
  (call_combiner)->Start((closure), (error), (reason))
#define GRPC_CALL_COMBINER_STOP(call_combiner, reason) \
  (call_combiner)->Stop((reason))
  /// Starts processing \a closure.
  void Start(grpc_closure* closure, grpc_error_handle error,
             const char* reason);
  /// Yields the call combiner to the next closure in the queue, if any.
  void Stop(const char* reason);
#endif

  /// Registers \a closure to be invoked when Cancel() is called.
  ///
  /// Once a closure is registered, it will always be scheduled exactly
  /// once; this allows the closure to hold references that will be freed
  /// regardless of whether or not the call was cancelled.  If a cancellation
  /// does occur, the closure will be scheduled with the cancellation error;
  /// otherwise, it will be scheduled with GRPC_ERROR_NONE.
  ///
  /// The closure will be scheduled in the following cases:
  /// - If Cancel() was called prior to registering the closure, it will be
  ///   scheduled immediately with the cancelation error.
  /// - If Cancel() is called after registering the closure, the closure will
  ///   be scheduled with the cancellation error.
  /// - If SetNotifyOnCancel() is called again to register a new cancellation
  ///   closure, the previous cancellation closure will be scheduled with
  ///   GRPC_ERROR_NONE.
  ///
  /// If \a closure is NULL, then no closure will be invoked on
  /// cancellation; this effectively unregisters the previously set closure.
  /// However, most filters will not need to explicitly unregister their
  /// callbacks, as this is done automatically when the call is destroyed.
  void SetNotifyOnCancel(grpc_closure* closure);

  /// Indicates that the call has been cancelled.
  void Cancel(grpc_error_handle error);

 private:
  // Internal helper that schedules \a closure with \a error; defined in
  // the corresponding .cc file.
  void ScheduleClosure(grpc_closure* closure, grpc_error_handle error);
#ifdef GRPC_TSAN_ENABLED
  static void TsanClosure(void* arg, grpc_error_handle error);
#endif

  gpr_atm size_ = 0;  // size_t, num closures in queue or currently executing
  MultiProducerSingleConsumerQueue queue_;
  // Either 0 (if not cancelled and no cancellation closure set),
  // a grpc_closure* (if the lowest bit is 0),
  // or a grpc_error_handle (if the lowest bit is 1).
  gpr_atm cancel_state_ = 0;
#ifdef GRPC_TSAN_ENABLED
  // A fake ref-counted lock that is kept alive after the destruction of
  // grpc_call_combiner, when we are running the original closure.
  //
  // Ideally we want to lock and unlock the call combiner as a pointer, when the
  // callback is called. However, original_closure is free to trigger
  // anything on the call combiner (including destruction of grpc_call).
  // Thus, we need a ref-counted structure that can outlive the call combiner.
  struct TsanLock : public RefCounted<TsanLock, NonPolymorphicRefCount> {
    TsanLock() { TSAN_ANNOTATE_RWLOCK_CREATE(&taken); }
    ~TsanLock() { TSAN_ANNOTATE_RWLOCK_DESTROY(&taken); }
    // To avoid double-locking by the same thread, we should acquire/release
    // the lock only when taken is false. On each acquire taken must be set to
    // true.
    std::atomic<bool> taken{false};
  };
  RefCountedPtr<TsanLock> tsan_lock_ = MakeRefCounted<TsanLock>();
  grpc_closure tsan_closure_;
  grpc_closure* original_closure_;
#endif
};

// Helper for running a list of closures in a call combiner.
//
// Each callback running in the call combiner will eventually be
// returned to the surface, at which point the surface will yield the
// call combiner.  So when we are running in the call combiner and have
// more than one callback to return to the surface, we need to re-enter
// the call combiner for all but one of those callbacks.
class CallCombinerClosureList {
 public:
  CallCombinerClosureList() {}

  // Adds a closure to the list.  The closure must eventually result in
  // the call combiner being yielded.
  void Add(grpc_closure* closure, grpc_error_handle error, const char* reason) {
    closures_.emplace_back(closure, error, reason);
  }

  // Runs all closures in the call combiner and yields the call combiner.
  //
  // All but one of the closures in the list will be scheduled via
  // GRPC_CALL_COMBINER_START(), and the remaining closure will be
  // scheduled via ExecCtx::Run(), which will eventually result
  // in yielding the call combiner.  If the list is empty, then the call
  // combiner will be yielded immediately.
RunClosures(CallCombiner * call_combiner)161 void RunClosures(CallCombiner* call_combiner) { 162 if (closures_.empty()) { 163 GRPC_CALL_COMBINER_STOP(call_combiner, "no closures to schedule"); 164 return; 165 } 166 for (size_t i = 1; i < closures_.size(); ++i) { 167 auto& closure = closures_[i]; 168 GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error, 169 closure.reason); 170 } 171 if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) { 172 gpr_log(GPR_INFO, 173 "CallCombinerClosureList executing closure while already " 174 "holding call_combiner %p: closure=%p error=%s reason=%s", 175 call_combiner, closures_[0].closure, 176 grpc_error_std_string(closures_[0].error).c_str(), 177 closures_[0].reason); 178 } 179 // This will release the call combiner. 180 ExecCtx::Run(DEBUG_LOCATION, closures_[0].closure, closures_[0].error); 181 closures_.clear(); 182 } 183 184 // Runs all closures in the call combiner, but does NOT yield the call 185 // combiner. All closures will be scheduled via GRPC_CALL_COMBINER_START(). RunClosuresWithoutYielding(CallCombiner * call_combiner)186 void RunClosuresWithoutYielding(CallCombiner* call_combiner) { 187 for (size_t i = 0; i < closures_.size(); ++i) { 188 auto& closure = closures_[i]; 189 GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error, 190 closure.reason); 191 } 192 closures_.clear(); 193 } 194 size()195 size_t size() const { return closures_.size(); } 196 197 private: 198 struct CallCombinerClosure { 199 grpc_closure* closure; 200 grpc_error_handle error; 201 const char* reason; 202 CallCombinerClosureCallCombinerClosure203 CallCombinerClosure(grpc_closure* closure, grpc_error_handle error, 204 const char* reason) 205 : closure(closure), error(error), reason(reason) {} 206 }; 207 208 // There are generally a maximum of 6 closures to run in the call 209 // combiner, one for each pending op. 
  // Inline capacity of 6 keeps the common case free of heap allocation.
  absl::InlinedVector<CallCombinerClosure, 6> closures_;
};

}  // namespace grpc_core

#endif /* GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H */