//
// Copyright 2019 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#include "src/core/util/work_serializer.h"

#include <grpc/event_engine/event_engine.h>
#include <grpc/support/port_platform.h>
#include <stdint.h>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <functional>
#include <memory>
#include <thread>
#include <utility>

#include "absl/container/inlined_vector.h"
#include "absl/log/check.h"
#include "absl/log/log.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/telemetry/stats.h"
#include "src/core/telemetry/stats_data.h"
#include "src/core/util/debug_location.h"
#include "src/core/util/latent_see.h"
#include "src/core/util/mpscq.h"
#include "src/core/util/orphanable.h"
#include "src/core/util/sync.h"

namespace grpc_core {

//
// WorkSerializer::WorkSerializerImpl
//

class WorkSerializer::WorkSerializerImpl : public Orphanable {
 public:
  virtual void Run(std::function<void()> callback,
                   const DebugLocation& location) = 0;
  virtual void Schedule(std::function<void()> callback,
                        const DebugLocation& location) = 0;
  virtual void DrainQueue() = 0;

#ifndef NDEBUG
  virtual bool RunningInWorkSerializer() const = 0;
#endif
};

//
// WorkSerializer::LegacyWorkSerializer
//

class WorkSerializer::LegacyWorkSerializer final : public WorkSerializerImpl {
 public:
  void Run(std::function<void()> callback,
           const DebugLocation& location) override;
  void Schedule(std::function<void()> callback,
                const DebugLocation& location) override;
  void DrainQueue() override;
  void Orphan() override;

#ifndef NDEBUG
  bool RunningInWorkSerializer() const override {
    return std::this_thread::get_id() == current_thread_;
  }
#endif

 private:
  struct CallbackWrapper {
    CallbackWrapper(std::function<void()> cb, const DebugLocation& loc)
        : callback(std::move(cb)), location(loc) {}

    MultiProducerSingleConsumerQueue::Node mpscq_node;
    const std::function<void()> callback;
    const DebugLocation location;
  };

  // Callers of DrainQueueOwned() should make sure to grab the lock on the
  // work serializer with
  //
  //   prev_ref_pair =
  //     refs_.fetch_add(MakeRefPair(1, 1), std::memory_order_acq_rel);
  //
  // and only invoke DrainQueueOwned() if there was previously no owner. Note
  // that the queue size is also incremented as part of the fetch_add to allow
  // callers to add a callback to the queue if another thread already holds
  // the lock on the work serializer.
  void DrainQueueOwned();

  // The first 16 bits indicate ownership of the WorkSerializer; the
  // remaining 48 bits are the queue size (i.e., refs).
  static uint64_t MakeRefPair(uint16_t owners, uint64_t size) {
    CHECK_EQ(size >> 48, 0u);
    return (static_cast<uint64_t>(owners) << 48) + static_cast<int64_t>(size);
  }
  static uint32_t GetOwners(uint64_t ref_pair) {
    return static_cast<uint32_t>(ref_pair >> 48);
  }
  static uint64_t GetSize(uint64_t ref_pair) {
    return static_cast<uint64_t>(ref_pair & 0xffffffffffffu);
  }

#ifndef NDEBUG
  void SetCurrentThread() { current_thread_ = std::this_thread::get_id(); }
  void ClearCurrentThread() { current_thread_ = std::thread::id(); }
#else
  void SetCurrentThread() {}
  void ClearCurrentThread() {}
#endif

  // An initial size of 1 keeps track of whether the work serializer has been
  // orphaned.
  std::atomic<uint64_t> refs_{MakeRefPair(0, 1)};
  MultiProducerSingleConsumerQueue queue_;
#ifndef NDEBUG
  std::thread::id current_thread_;
#endif
};

void WorkSerializer::LegacyWorkSerializer::Run(std::function<void()> callback,
                                               const DebugLocation& location) {
  GRPC_TRACE_LOG(work_serializer, INFO)
      << "WorkSerializer::Run() " << this << " Scheduling callback ["
      << location.file() << ":" << location.line() << "]";
  // Increment the queue size for the new callback and the owner count to
  // attempt to take ownership of the WorkSerializer.
  const uint64_t prev_ref_pair =
      refs_.fetch_add(MakeRefPair(1, 1), std::memory_order_acq_rel);
  // The work serializer should not have been orphaned.
  DCHECK_GT(GetSize(prev_ref_pair), 0u);
  if (GetOwners(prev_ref_pair) == 0) {
    // We took ownership of the WorkSerializer. Invoke the callback and drain
    // the queue.
    SetCurrentThread();
    GRPC_TRACE_LOG(work_serializer, INFO) << "  Executing immediately";
    callback();
    // Delete the callback while still holding the WorkSerializer, so
    // that any refs being held by the callback via lambda captures will
    // be destroyed inside the WorkSerializer.
    callback = nullptr;
    DrainQueueOwned();
  } else {
    // Another thread is holding the WorkSerializer, so decrement the
    // ownership count we just added and queue the callback.
    refs_.fetch_sub(MakeRefPair(1, 0), std::memory_order_acq_rel);
    CallbackWrapper* cb_wrapper =
        new CallbackWrapper(std::move(callback), location);
    GRPC_TRACE_LOG(work_serializer, INFO)
        << "  Scheduling on queue : item " << cb_wrapper;
    queue_.Push(&cb_wrapper->mpscq_node);
  }
}

void WorkSerializer::LegacyWorkSerializer::Schedule(
    std::function<void()> callback, const DebugLocation& location) {
  CallbackWrapper* cb_wrapper =
      new CallbackWrapper(std::move(callback), location);
  GRPC_TRACE_LOG(work_serializer, INFO)
      << "WorkSerializer::Schedule() " << this << " Scheduling callback "
      << cb_wrapper << " [" << location.file() << ":" << location.line() << "]";
  refs_.fetch_add(MakeRefPair(0, 1), std::memory_order_acq_rel);
  queue_.Push(&cb_wrapper->mpscq_node);
}

void WorkSerializer::LegacyWorkSerializer::Orphan() {
  GRPC_TRACE_LOG(work_serializer, INFO) << "WorkSerializer::Orphan() " << this;
  const uint64_t prev_ref_pair =
      refs_.fetch_sub(MakeRefPair(0, 1), std::memory_order_acq_rel);
  if (GetOwners(prev_ref_pair) == 0 && GetSize(prev_ref_pair) == 1) {
    GRPC_TRACE_LOG(work_serializer, INFO) << "  Destroying";
    delete this;
  }
}

// The thread that calls this loans itself to the work serializer so as to
// execute all the scheduled callbacks.
void WorkSerializer::LegacyWorkSerializer::DrainQueue() {
  GRPC_TRACE_LOG(work_serializer, INFO)
      << "WorkSerializer::DrainQueue() " << this;
  // Attempt to take ownership of the WorkSerializer. Also increment the queue
  // size, as required by `DrainQueueOwned()`.
  const uint64_t prev_ref_pair =
      refs_.fetch_add(MakeRefPair(1, 1), std::memory_order_acq_rel);
  if (GetOwners(prev_ref_pair) == 0) {
    SetCurrentThread();
    // We took ownership of the WorkSerializer. Drain the queue.
    DrainQueueOwned();
  } else {
    // Another thread is holding the WorkSerializer, so decrement the
    // ownership count we just added and queue a no-op callback.
    refs_.fetch_sub(MakeRefPair(1, 0), std::memory_order_acq_rel);
    CallbackWrapper* cb_wrapper = new CallbackWrapper([]() {}, DEBUG_LOCATION);
    queue_.Push(&cb_wrapper->mpscq_node);
  }
}

void WorkSerializer::LegacyWorkSerializer::DrainQueueOwned() {
  GRPC_TRACE_LOG(work_serializer, INFO)
      << "WorkSerializer::DrainQueueOwned() " << this;
  while (true) {
    auto prev_ref_pair = refs_.fetch_sub(MakeRefPair(0, 1));
    // It is possible that while draining the queue, the last callback ended
    // up orphaning the work serializer. In that case, delete the object.
    if (GetSize(prev_ref_pair) == 1) {
      GRPC_TRACE_LOG(work_serializer, INFO) << "  Queue Drained. Destroying";
      delete this;
      return;
    }
    if (GetSize(prev_ref_pair) == 2) {
      // Queue drained. Give up ownership, but only if the queue remains
      // empty. Reset current_thread_ before giving up ownership to avoid a
      // TSAN race. If we don't wind up giving up ownership, we'll set this
      // again below before we pull the next callback out of the queue.
      ClearCurrentThread();
      uint64_t expected = MakeRefPair(1, 1);
      if (refs_.compare_exchange_strong(expected, MakeRefPair(0, 1),
                                        std::memory_order_acq_rel)) {
        // Queue is drained.
        return;
      }
      if (GetSize(expected) == 0) {
        // The WorkSerializer got orphaned while this was running.
        GRPC_TRACE_LOG(work_serializer, INFO) << "  Queue Drained. Destroying";
        delete this;
        return;
      }
      // Didn't wind up giving up ownership, so set current_thread_ again.
      SetCurrentThread();
    }
    // There is at least one callback on the queue. Pop the callback from the
    // queue and execute it.
    CallbackWrapper* cb_wrapper = nullptr;
    bool empty_unused;
    while ((cb_wrapper = reinterpret_cast<CallbackWrapper*>(
                queue_.PopAndCheckEnd(&empty_unused))) == nullptr) {
      // This can happen due to a race condition within the mpscq
      // implementation or because of a race with Run()/Schedule().
      GRPC_TRACE_LOG(work_serializer, INFO)
          << "  Queue returned nullptr, trying again";
    }
    GRPC_TRACE_LOG(work_serializer, INFO)
        << "  Running item " << cb_wrapper << " : callback scheduled at ["
        << cb_wrapper->location.file() << ":" << cb_wrapper->location.line()
        << "]";
    cb_wrapper->callback();
    delete cb_wrapper;
  }
}
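
// Illustrative walk-through of the refs_ transitions (owners, size) for a
// Run() call on an idle serializer: the initial state is (0, 1); Run()'s
// fetch_add moves it to (1, 2) and, seeing no previous owner, runs the
// callback inline; DrainQueueOwned()'s fetch_sub then moves it to (1, 1),
// and because the previous size was 2, the compare-exchange releases
// ownership, restoring (0, 1).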

//
// WorkSerializer::DispatchingWorkSerializer
//

// DispatchingWorkSerializer: executes callbacks one at a time on EventEngine.
// Running one at a time guarantees that fixed-size thread pools in
// EventEngine implementations are not starved of threads by long-running
// work serializers. We implement EventEngine::Closure directly to avoid an
// extra allocation per queued callback when scheduling.
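//
// Illustratively, if three callbacks are queued, the serializer re-dispatches
// itself to EventEngine once per callback rather than draining all three in
// one dispatch, yielding the pool's thread between items.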
class WorkSerializer::DispatchingWorkSerializer final
    : public WorkSerializerImpl,
      public grpc_event_engine::experimental::EventEngine::Closure {
 public:
  explicit DispatchingWorkSerializer(
      std::shared_ptr<grpc_event_engine::experimental::EventEngine>
          event_engine)
      : event_engine_(std::move(event_engine)) {}
  void Run(std::function<void()> callback,
           const DebugLocation& location) override;
  void Schedule(std::function<void()> callback,
                const DebugLocation& location) override {
    // We always dispatch to the event engine, so Schedule() and Run() share
    // semantics.
    Run(std::move(callback), location);
  }
  void DrainQueue() override {}
  void Orphan() override;

  // Overrides EventEngine::Closure.
  void Run() override;

#ifndef NDEBUG
  bool RunningInWorkSerializer() const override {
    return running_work_serializer_ == this;
  }
#endif

 private:
  // Wrapper to capture the DebugLocation along with the callback.
  struct CallbackWrapper {
    CallbackWrapper(std::function<void()> cb, const DebugLocation& loc)
        : callback(std::move(cb)), location(loc) {}
    std::function<void()> callback;
    // GPR_NO_UNIQUE_ADDRESS means this is zero-sized in release builds.
    GPR_NO_UNIQUE_ADDRESS DebugLocation location;
  };
  using CallbackVector = absl::InlinedVector<CallbackWrapper, 1>;

  // Refills processing_ from incoming_.
  // If processing_ remains empty, also updates running_ and returns false.
  // If additionally orphaned, also deletes this (therefore, it's not safe
  // to touch any member variables if Refill() returns false).
  bool Refill();

  // Performs the parts of Refill() that need to acquire mu_.
  // Returns a tri-state indicating whether we were refilled successfully (=>
  // keep running), or finished, and in the latter case whether we were also
  // orphaned.
  enum class RefillResult { kRefilled, kFinished, kFinishedAndOrphaned };
  RefillResult RefillInner();

#ifndef NDEBUG
  void SetCurrentThread() { running_work_serializer_ = this; }
  void ClearCurrentThread() { running_work_serializer_ = nullptr; }
#else
  void SetCurrentThread() {}
  void ClearCurrentThread() {}
#endif

  // Member variables are roughly sorted to keep processing cache lines
  // separated from incoming cache lines.

  // Callbacks that are currently being processed.
  // Only accessed by: a Run() call going from not-running to running, or a
  // work item being executed in EventEngine -- i.e., this does not need a
  // mutex because all access is serialized. Stored in reverse execution
  // order so that callbacks can be `pop_back()`'d on completion to free up
  // any resources they hold.
  CallbackVector processing_;
  // EventEngine instance upon which we'll do our work.
  const std::shared_ptr<grpc_event_engine::experimental::EventEngine>
      event_engine_;
  std::chrono::steady_clock::time_point running_start_time_
      ABSL_GUARDED_BY(mu_);
  std::chrono::steady_clock::duration time_running_items_;
  uint64_t items_processed_during_run_;
  // Flags containing run state:
  // - running_ goes from false->true whenever the first callback is scheduled
  //   on an idle WorkSerializer, and transitions back to false after the last
  //   scheduled callback has completed and the WorkSerializer is again idle.
  // - orphaned_ transitions to true once, when Orphan() is called.
  // When orphaned_ is true and running_ is false, the
  // DispatchingWorkSerializer instance is deleted.
  bool running_ ABSL_GUARDED_BY(mu_) = false;
  bool orphaned_ ABSL_GUARDED_BY(mu_) = false;
  Mutex mu_;
  // Queued callbacks. New work items land here, and when processing_ is
  // drained we move this entire queue into processing_ and work on draining
  // it again. In low-traffic scenarios this gives two mutex acquisitions per
  // work item, but as load increases we get some natural batching and the
  // rate of mutex acquisitions per work item tends towards 1.
  CallbackVector incoming_ ABSL_GUARDED_BY(mu_);
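  // (Illustratively: N items enqueued during one run cost N lock
  // acquisitions in Run() plus a single swap in RefillInner(), i.e.
  // (N + 1) / N acquisitions per item, which approaches 1 as N grows.)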

  GPR_NO_UNIQUE_ADDRESS latent_see::Flow flow_;

#ifndef NDEBUG
  static thread_local DispatchingWorkSerializer* running_work_serializer_;
#endif
};

#ifndef NDEBUG
thread_local WorkSerializer::DispatchingWorkSerializer*
    WorkSerializer::DispatchingWorkSerializer::running_work_serializer_ =
        nullptr;
#endif

void WorkSerializer::DispatchingWorkSerializer::Orphan() {
  ReleasableMutexLock lock(&mu_);
  // If we're not running, then we can delete immediately.
  if (!running_) {
    lock.Release();
    delete this;
    return;
  }
  // Otherwise store a flag to delete when we're done.
  orphaned_ = true;
}

// Implementation of WorkSerializerImpl::Run
void WorkSerializer::DispatchingWorkSerializer::Run(
    std::function<void()> callback, const DebugLocation& location) {
  GRPC_TRACE_LOG(work_serializer, INFO)
      << "WorkSerializer[" << this << "] Scheduling callback ["
      << location.file() << ":" << location.line() << "]";
  global_stats().IncrementWorkSerializerItemsEnqueued();
  MutexLock lock(&mu_);
  if (!running_) {
    // If we were previously idle, insert this callback directly into the
    // empty processing_ list and start running.
    running_ = true;
    running_start_time_ = std::chrono::steady_clock::now();
    items_processed_during_run_ = 0;
    time_running_items_ = std::chrono::steady_clock::duration();
    CHECK(processing_.empty());
    processing_.emplace_back(std::move(callback), location);
    event_engine_->Run(this);
  } else {
    // We are already running, so add this callback to the incoming_ list.
    // The work loop will eventually get to it.
    incoming_.emplace_back(std::move(callback), location);
  }
}

// Implementation of EventEngine::Closure::Run - our actual work loop
void WorkSerializer::DispatchingWorkSerializer::Run() {
  GRPC_LATENT_SEE_PARENT_SCOPE("WorkSerializer::Run");
  flow_.End();
  // TODO(ctiller): remove these when we can deprecate ExecCtx
  ApplicationCallbackExecCtx app_exec_ctx;
  ExecCtx exec_ctx;
  // Grab the last element of processing_, which is the next item in our
  // queue since processing_ is stored in reverse order.
  auto& cb = processing_.back();
  GRPC_TRACE_LOG(work_serializer, INFO)
      << "WorkSerializer[" << this << "] Executing callback ["
      << cb.location.file() << ":" << cb.location.line() << "]";
  // Run the work item.
  const auto start = std::chrono::steady_clock::now();
  SetCurrentThread();
  cb.callback();
  // pop_back() here destroys the callback, freeing any resources it might
  // hold. We do so before clearing the current thread in case the callback's
  // destructor wants to check that it's in the WorkSerializer too.
  processing_.pop_back();
  ClearCurrentThread();
  global_stats().IncrementWorkSerializerItemsDequeued();
  const auto work_time = std::chrono::steady_clock::now() - start;
  global_stats().IncrementWorkSerializerWorkTimePerItemMs(
      std::chrono::duration_cast<std::chrono::milliseconds>(work_time).count());
  time_running_items_ += work_time;
  ++items_processed_during_run_;
  // Check if we've drained the queue, and if so refill it.
  if (processing_.empty() && !Refill()) return;
  // There's still work in processing_, so schedule ourselves again on
  // EventEngine.
  flow_.Begin(GRPC_LATENT_SEE_METADATA("WorkSerializer::Link"));
  event_engine_->Run(this);
}

WorkSerializer::DispatchingWorkSerializer::RefillResult
WorkSerializer::DispatchingWorkSerializer::RefillInner() {
  // Recover any memory held by processing_, so that we don't grow forever.
  // Do so before acquiring the lock so we don't cause inadvertent contention.
  processing_.shrink_to_fit();
  MutexLock lock(&mu_);
  // Swap incoming_ into processing_ - this effectively lets us release memory
  // (outside the lock) once per iteration for the storage vectors.
  processing_.swap(incoming_);
  // If there were no items, then we've finished running.
  if (processing_.empty()) {
    running_ = false;
    global_stats().IncrementWorkSerializerRunTimeMs(
        std::chrono::duration_cast<std::chrono::milliseconds>(
            std::chrono::steady_clock::now() - running_start_time_)
            .count());
    global_stats().IncrementWorkSerializerWorkTimeMs(
        std::chrono::duration_cast<std::chrono::milliseconds>(
            time_running_items_)
            .count());
    global_stats().IncrementWorkSerializerItemsPerRun(
        items_processed_during_run_);
    // And if we're also orphaned, then it's time to delete this object.
    if (orphaned_) {
      return RefillResult::kFinishedAndOrphaned;
    } else {
      return RefillResult::kFinished;
    }
  }
  return RefillResult::kRefilled;
}

bool WorkSerializer::DispatchingWorkSerializer::Refill() {
  const auto result = RefillInner();
  switch (result) {
    case RefillResult::kRefilled:
      // Reverse processing_ so that we can pop_back() items in the correct
      // order. (Note that this is mostly pointer swaps inside the
      // std::function objects, so it should be relatively cheap even for
      // longer lists.) Do so here so that we're outside of the RefillInner()
      // lock.
      std::reverse(processing_.begin(), processing_.end());
      return true;
    case RefillResult::kFinished:
      return false;
    case RefillResult::kFinishedAndOrphaned:
      // Orphaned and finished - finally delete this object.
      // Done here so that the mutex lock in RefillInner() has been released.
      delete this;
      return false;
  }
  GPR_UNREACHABLE_CODE(return false);
}

//
// WorkSerializer
//

WorkSerializer::WorkSerializer(
    std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine)
    : impl_(IsWorkSerializerDispatchEnabled()
                ? OrphanablePtr<WorkSerializerImpl>(
                      MakeOrphanable<DispatchingWorkSerializer>(
                          std::move(event_engine)))
                : OrphanablePtr<WorkSerializerImpl>(
                      MakeOrphanable<LegacyWorkSerializer>())) {}

WorkSerializer::~WorkSerializer() = default;

void WorkSerializer::Run(std::function<void()> callback,
                         const DebugLocation& location) {
  impl_->Run(std::move(callback), location);
}

void WorkSerializer::Schedule(std::function<void()> callback,
                              const DebugLocation& location) {
  impl_->Schedule(std::move(callback), location);
}

void WorkSerializer::DrainQueue() { impl_->DrainQueue(); }

#ifndef NDEBUG
bool WorkSerializer::RunningInWorkSerializer() const {
  return impl_->RunningInWorkSerializer();
}
#endif
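
// Example usage (an illustrative sketch, not code from this file; `engine`
// is assumed to be a std::shared_ptr<EventEngine> obtained elsewhere):
//
//   auto serializer = std::make_shared<WorkSerializer>(engine);
//   serializer->Run([] { /* runs exclusive of all other callbacks */ },
//                   DEBUG_LOCATION);
//   serializer->Schedule([] { /* queued; run on a later drain or Run() */ },
//                        DEBUG_LOCATION);
//   serializer->DrainQueue();  // Legacy impl: caller's thread drains here.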

}  // namespace grpc_core