1 // 2 // 3 // Copyright 2017 gRPC authors. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 // 17 // 18 19 #ifndef GRPC_SRC_CORE_LIB_IOMGR_CALL_COMBINER_H 20 #define GRPC_SRC_CORE_LIB_IOMGR_CALL_COMBINER_H 21 22 #include <grpc/support/atm.h> 23 #include <grpc/support/port_platform.h> 24 #include <stddef.h> 25 26 #include "absl/container/inlined_vector.h" 27 #include "absl/log/log.h" 28 #include "src/core/lib/iomgr/closure.h" 29 #include "src/core/lib/iomgr/dynamic_annotations.h" 30 #include "src/core/lib/iomgr/exec_ctx.h" 31 #include "src/core/util/mpscq.h" 32 #include "src/core/util/ref_counted.h" 33 #include "src/core/util/ref_counted_ptr.h" 34 35 // A simple, lock-free mechanism for serializing activity related to a 36 // single call. This is similar to a combiner but is more lightweight. 37 // 38 // It requires the callback (or, in the common case where the callback 39 // actually kicks off a chain of callbacks, the last callback in that 40 // chain) to explicitly indicate (by calling GRPC_CALL_COMBINER_STOP()) 41 // when it is done with the action that was kicked off by the original 42 // callback. 43 44 namespace grpc_core { 45 46 class CallCombiner { 47 public: 48 CallCombiner(); 49 ~CallCombiner(); 50 51 #ifndef NDEBUG 52 #define GRPC_CALL_COMBINER_START(call_combiner, closure, error, reason) \ 53 (call_combiner)->Start((closure), (error), __FILE__, __LINE__, (reason)) 54 #define GRPC_CALL_COMBINER_STOP(call_combiner, reason) \ 55 (call_combiner)->Stop(__FILE__, __LINE__, (reason)) 56 /// Starts processing \a closure. 57 void Start(grpc_closure* closure, grpc_error_handle error, const char* file, 58 int line, const char* reason); 59 /// Yields the call combiner to the next closure in the queue, if any. 60 void Stop(const char* file, int line, const char* reason); 61 #else 62 #define GRPC_CALL_COMBINER_START(call_combiner, closure, error, reason) \ 63 (call_combiner)->Start((closure), (error), (reason)) 64 #define GRPC_CALL_COMBINER_STOP(call_combiner, reason) \ 65 (call_combiner)->Stop((reason)) 66 /// Starts processing \a closure. 67 void Start(grpc_closure* closure, grpc_error_handle error, 68 const char* reason); 69 /// Yields the call combiner to the next closure in the queue, if any. 70 void Stop(const char* reason); 71 #endif 72 73 /// Registers \a closure to be invoked when Cancel() is called. 74 /// 75 /// Once a closure is registered, it will always be scheduled exactly 76 /// once; this allows the closure to hold references that will be freed 77 /// regardless of whether or not the call was cancelled. If a cancellation 78 /// does occur, the closure will be scheduled with the cancellation error; 79 /// otherwise, it will be scheduled with absl::OkStatus(). 80 /// 81 /// The closure will be scheduled in the following cases: 82 /// - If Cancel() was called prior to registering the closure, it will be 83 /// scheduled immediately with the cancellation error. 84 /// - If Cancel() is called after registering the closure, the closure will 85 /// be scheduled with the cancellation error. 86 /// - If SetNotifyOnCancel() is called again to register a new cancellation 87 /// closure, the previous cancellation closure will be scheduled with 88 /// absl::OkStatus(). 89 /// 90 /// If \a closure is NULL, then no closure will be invoked on 91 /// cancellation; this effectively unregisters the previously set closure. 92 /// However, most filters will not need to explicitly unregister their 93 /// callbacks, as this is done automatically when the call is destroyed. 94 void SetNotifyOnCancel(grpc_closure* closure); 95 96 /// Indicates that the call has been cancelled. 97 void Cancel(grpc_error_handle error); 98 99 private: 100 void ScheduleClosure(grpc_closure* closure, grpc_error_handle error); 101 #ifdef GRPC_TSAN_ENABLED 102 static void TsanClosure(void* arg, grpc_error_handle error); 103 #endif 104 105 gpr_atm size_ = 0; // size_t, num closures in queue or currently executing 106 MultiProducerSingleConsumerQueue queue_; 107 // Either 0 (if not cancelled and no cancellation closure set), 108 // a grpc_closure* (if the lowest bit is 0), 109 // or a grpc_error_handle (if the lowest bit is 1). 110 gpr_atm cancel_state_ = 0; 111 #ifdef GRPC_TSAN_ENABLED 112 // A fake ref-counted lock that is kept alive after the destruction of 113 // grpc_call_combiner, when we are running the original closure. 114 // 115 // Ideally we want to lock and unlock the call combiner as a pointer, when the 116 // callback is called. However, original_closure is free to trigger 117 // anything on the call combiner (including destruction of grpc_call). 118 // Thus, we need a ref-counted structure that can outlive the call combiner. 119 struct TsanLock : public RefCounted<TsanLock, NonPolymorphicRefCount> { TsanLockTsanLock120 TsanLock() { TSAN_ANNOTATE_RWLOCK_CREATE(&taken); } ~TsanLockTsanLock121 ~TsanLock() { TSAN_ANNOTATE_RWLOCK_DESTROY(&taken); } 122 // To avoid double-locking by the same thread, we should acquire/release 123 // the lock only when taken is false. On each acquire taken must be set to 124 // true. 125 std::atomic<bool> taken{false}; 126 }; 127 RefCountedPtr<TsanLock> tsan_lock_ = MakeRefCounted<TsanLock>(); 128 grpc_closure tsan_closure_; 129 grpc_closure* original_closure_; 130 #endif 131 }; 132 133 // Helper for running a list of closures in a call combiner. 134 // 135 // Each callback running in the call combiner will eventually be 136 // returned to the surface, at which point the surface will yield the 137 // call combiner. So when we are running in the call combiner and have 138 // more than one callback to return to the surface, we need to re-enter 139 // the call combiner for all but one of those callbacks. 140 class CallCombinerClosureList { 141 public: CallCombinerClosureList()142 CallCombinerClosureList() {} 143 144 // Adds a closure to the list. The closure must eventually result in 145 // the call combiner being yielded. Add(grpc_closure * closure,grpc_error_handle error,const char * reason)146 void Add(grpc_closure* closure, grpc_error_handle error, const char* reason) { 147 closures_.emplace_back(closure, error, reason); 148 } 149 150 // Runs all closures in the call combiner and yields the call combiner. 151 // 152 // All but one of the closures in the list will be scheduled via 153 // GRPC_CALL_COMBINER_START(), and the remaining closure will be 154 // scheduled via ExecCtx::Run(), which will eventually result 155 // in yielding the call combiner. If the list is empty, then the call 156 // combiner will be yielded immediately. RunClosures(CallCombiner * call_combiner)157 void RunClosures(CallCombiner* call_combiner) { 158 if (closures_.empty()) { 159 GRPC_CALL_COMBINER_STOP(call_combiner, "no closures to schedule"); 160 return; 161 } 162 for (size_t i = 1; i < closures_.size(); ++i) { 163 auto& closure = closures_[i]; 164 GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error, 165 closure.reason); 166 } 167 GRPC_TRACE_LOG(call_combiner, INFO) 168 << "CallCombinerClosureList executing closure while already " 169 "holding call_combiner " 170 << call_combiner << ": closure=" << closures_[0].closure->DebugString() 171 << " error=" << StatusToString(closures_[0].error) 172 << " reason=" << closures_[0].reason; 173 // This will release the call combiner. 174 ExecCtx::Run(DEBUG_LOCATION, closures_[0].closure, closures_[0].error); 175 closures_.clear(); 176 } 177 178 // Runs all closures in the call combiner, but does NOT yield the call 179 // combiner. All closures will be scheduled via GRPC_CALL_COMBINER_START(). RunClosuresWithoutYielding(CallCombiner * call_combiner)180 void RunClosuresWithoutYielding(CallCombiner* call_combiner) { 181 for (size_t i = 0; i < closures_.size(); ++i) { 182 auto& closure = closures_[i]; 183 GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error, 184 closure.reason); 185 } 186 closures_.clear(); 187 } 188 size()189 size_t size() const { return closures_.size(); } 190 191 private: 192 struct CallCombinerClosure { 193 grpc_closure* closure; 194 grpc_error_handle error; 195 const char* reason; 196 CallCombinerClosureCallCombinerClosure197 CallCombinerClosure(grpc_closure* closure, grpc_error_handle error, 198 const char* reason) 199 : closure(closure), error(error), reason(reason) {} 200 }; 201 202 // There are generally a maximum of 6 closures to run in the call 203 // combiner, one for each pending op. 204 absl::InlinedVector<CallCombinerClosure, 6> closures_; 205 }; 206 207 } // namespace grpc_core 208 209 #endif // GRPC_SRC_CORE_LIB_IOMGR_CALL_COMBINER_H 210