//
//
// Copyright 2017 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//

#ifndef GRPC_SRC_CORE_LIB_IOMGR_CALL_COMBINER_H
#define GRPC_SRC_CORE_LIB_IOMGR_CALL_COMBINER_H

#include <grpc/support/atm.h>
#include <grpc/support/port_platform.h>
#include <stddef.h>

#include <atomic>  // for std::atomic<bool> used under GRPC_TSAN_ENABLED

#include "absl/container/inlined_vector.h"
#include "absl/log/log.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/dynamic_annotations.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/util/mpscq.h"
#include "src/core/util/ref_counted.h"
#include "src/core/util/ref_counted_ptr.h"

// A simple, lock-free mechanism for serializing activity related to a
// single call.  This is similar to a combiner but is more lightweight.
//
// It requires the callback (or, in the common case where the callback
// actually kicks off a chain of callbacks, the last callback in that
// chain) to explicitly indicate (by calling GRPC_CALL_COMBINER_STOP())
// when it is done with the action that was kicked off by the original
// callback.
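//
// A minimal usage sketch (illustrative only; call_combiner, on_complete,
// and the reason strings below are hypothetical, not part of this API):
//
//   // Hand a closure to the combiner; it runs once the combiner is free.
//   GRPC_CALL_COMBINER_START(call_combiner, on_complete, absl::OkStatus(),
//                            "start send op");
//   ...
//   // Inside on_complete (or the last closure in the chain it kicks off),
//   // yield so that the next queued closure can run.
//   GRPC_CALL_COMBINER_STOP(call_combiner, "send op done");
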
namespace grpc_core {

class CallCombiner {
 public:
  CallCombiner();
  ~CallCombiner();

#ifndef NDEBUG
#define GRPC_CALL_COMBINER_START(call_combiner, closure, error, reason) \
  (call_combiner)->Start((closure), (error), __FILE__, __LINE__, (reason))
#define GRPC_CALL_COMBINER_STOP(call_combiner, reason) \
  (call_combiner)->Stop(__FILE__, __LINE__, (reason))
  /// Starts processing \a closure.
  void Start(grpc_closure* closure, grpc_error_handle error, const char* file,
             int line, const char* reason);
  /// Yields the call combiner to the next closure in the queue, if any.
  void Stop(const char* file, int line, const char* reason);
#else
#define GRPC_CALL_COMBINER_START(call_combiner, closure, error, reason) \
  (call_combiner)->Start((closure), (error), (reason))
#define GRPC_CALL_COMBINER_STOP(call_combiner, reason) \
  (call_combiner)->Stop((reason))
  /// Starts processing \a closure.
  void Start(grpc_closure* closure, grpc_error_handle error,
             const char* reason);
  /// Yields the call combiner to the next closure in the queue, if any.
  void Stop(const char* reason);
#endif

  /// Registers \a closure to be invoked when Cancel() is called.
  ///
  /// Once a closure is registered, it will always be scheduled exactly
  /// once; this allows the closure to hold references that will be freed
  /// regardless of whether or not the call was cancelled.  If a cancellation
  /// does occur, the closure will be scheduled with the cancellation error;
  /// otherwise, it will be scheduled with absl::OkStatus().
  ///
  /// The closure will be scheduled in the following cases:
  /// - If Cancel() was called prior to registering the closure, it will be
  ///   scheduled immediately with the cancellation error.
  /// - If Cancel() is called after registering the closure, the closure will
  ///   be scheduled with the cancellation error.
  /// - If SetNotifyOnCancel() is called again to register a new cancellation
  ///   closure, the previous cancellation closure will be scheduled with
  ///   absl::OkStatus().
  ///
  /// If \a closure is NULL, then no closure will be invoked on
  /// cancellation; this effectively unregisters the previously set closure.
  /// However, most filters will not need to explicitly unregister their
  /// callbacks, as this is done automatically when the call is destroyed.
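  ///
  /// A minimal registration sketch (illustrative only; on_cancel is a
  /// hypothetical closure owned by the caller, not part of this API):
  ///
  ///   // Register: on_cancel runs exactly once, with the cancellation
  ///   // error if the call is cancelled, or absl::OkStatus() otherwise.
  ///   call_combiner->SetNotifyOnCancel(on_cancel);
  ///   ...
  ///   // Unregister: the previously set closure is scheduled with
  ///   // absl::OkStatus().
  ///   call_combiner->SetNotifyOnCancel(nullptr);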
  void SetNotifyOnCancel(grpc_closure* closure);

  /// Indicates that the call has been cancelled.
  void Cancel(grpc_error_handle error);

 private:
  void ScheduleClosure(grpc_closure* closure, grpc_error_handle error);
#ifdef GRPC_TSAN_ENABLED
  static void TsanClosure(void* arg, grpc_error_handle error);
#endif

  gpr_atm size_ = 0;  // size_t, num closures in queue or currently executing
  MultiProducerSingleConsumerQueue queue_;
  // Either 0 (if not cancelled and no cancellation closure set),
  // a grpc_closure* (if the lowest bit is 0),
  // or a grpc_error_handle (if the lowest bit is 1).
  gpr_atm cancel_state_ = 0;
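  // A sketch of how such a tagged word can be read (illustrative only; the
  // actual decoding helpers live in the implementation file):
  //
  //   gpr_atm state = gpr_atm_acq_load(&cancel_state_);
  //   if (state == 0) { /* not cancelled, no closure registered */ }
  //   else if (state & 1) { /* cancelled: non-tag bits hold the error */ }
  //   else { /* a registered grpc_closure* awaiting cancellation */ }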
#ifdef GRPC_TSAN_ENABLED
  // A fake ref-counted lock that is kept alive after the destruction of
  // grpc_call_combiner, when we are running the original closure.
  //
  // Ideally we want to lock and unlock the call combiner as a pointer, when
  // the callback is called. However, original_closure is free to trigger
  // anything on the call combiner (including destruction of grpc_call).
  // Thus, we need a ref-counted structure that can outlive the call combiner.
  struct TsanLock : public RefCounted<TsanLock, NonPolymorphicRefCount> {
    TsanLock() { TSAN_ANNOTATE_RWLOCK_CREATE(&taken); }
    ~TsanLock() { TSAN_ANNOTATE_RWLOCK_DESTROY(&taken); }
    // To avoid double-locking by the same thread, we should acquire/release
    // the lock only when taken is false. On each acquire taken must be set to
    // true.
    std::atomic<bool> taken{false};
  };
  RefCountedPtr<TsanLock> tsan_lock_ = MakeRefCounted<TsanLock>();
  grpc_closure tsan_closure_;
  grpc_closure* original_closure_;
#endif
};

// Helper for running a list of closures in a call combiner.
//
// Each callback running in the call combiner will eventually be
// returned to the surface, at which point the surface will yield the
// call combiner.  So when we are running in the call combiner and have
// more than one callback to return to the surface, we need to re-enter
// the call combiner for all but one of those callbacks.
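//
// A minimal usage sketch (illustrative only; call_combiner and the closure
// names are hypothetical, not part of this API):
//
//   CallCombinerClosureList closures;
//   closures.Add(on_complete, absl::OkStatus(), "op complete");
//   closures.Add(recv_initial_metadata_ready, absl::OkStatus(),
//                "recv_initial_metadata_ready");
//   // Starts all but the first closure in the call combiner, then runs the
//   // first one directly, which eventually yields the call combiner.
//   closures.RunClosures(call_combiner);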
class CallCombinerClosureList {
 public:
  CallCombinerClosureList() {}

  // Adds a closure to the list.  The closure must eventually result in
  // the call combiner being yielded.
  void Add(grpc_closure* closure, grpc_error_handle error,
           const char* reason) {
    closures_.emplace_back(closure, error, reason);
  }

  // Runs all closures in the call combiner and yields the call combiner.
  //
  // All but one of the closures in the list will be scheduled via
  // GRPC_CALL_COMBINER_START(), and the remaining closure will be
  // scheduled via ExecCtx::Run(), which will eventually result
  // in yielding the call combiner.  If the list is empty, then the call
  // combiner will be yielded immediately.
  void RunClosures(CallCombiner* call_combiner) {
    if (closures_.empty()) {
      GRPC_CALL_COMBINER_STOP(call_combiner, "no closures to schedule");
      return;
    }
    for (size_t i = 1; i < closures_.size(); ++i) {
      auto& closure = closures_[i];
      GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error,
                               closure.reason);
    }
    GRPC_TRACE_LOG(call_combiner, INFO)
        << "CallCombinerClosureList executing closure while already "
           "holding call_combiner "
        << call_combiner << ": closure=" << closures_[0].closure->DebugString()
        << " error=" << StatusToString(closures_[0].error)
        << " reason=" << closures_[0].reason;
    // This will release the call combiner.
    ExecCtx::Run(DEBUG_LOCATION, closures_[0].closure, closures_[0].error);
    closures_.clear();
  }

  // Runs all closures in the call combiner, but does NOT yield the call
  // combiner.  All closures will be scheduled via GRPC_CALL_COMBINER_START().
  void RunClosuresWithoutYielding(CallCombiner* call_combiner) {
    for (size_t i = 0; i < closures_.size(); ++i) {
      auto& closure = closures_[i];
      GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error,
                               closure.reason);
    }
    closures_.clear();
  }

  size_t size() const { return closures_.size(); }

 private:
  struct CallCombinerClosure {
    grpc_closure* closure;
    grpc_error_handle error;
    const char* reason;

    CallCombinerClosure(grpc_closure* closure, grpc_error_handle error,
                        const char* reason)
        : closure(closure), error(error), reason(reason) {}
  };

  // There are generally a maximum of 6 closures to run in the call
  // combiner, one for each pending op.
  absl::InlinedVector<CallCombinerClosure, 6> closures_;
};

}  // namespace grpc_core

#endif  // GRPC_SRC_CORE_LIB_IOMGR_CALL_COMBINER_H