//
// Copyright 2019 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#include "src/core/util/work_serializer.h"

#include <grpc/event_engine/event_engine.h>
#include <grpc/support/port_platform.h>
#include <stdint.h>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <functional>
#include <memory>
#include <thread>
#include <utility>

#include "absl/container/inlined_vector.h"
#include "absl/log/check.h"
#include "absl/log/log.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/telemetry/stats.h"
#include "src/core/telemetry/stats_data.h"
#include "src/core/util/debug_location.h"
#include "src/core/util/latent_see.h"
#include "src/core/util/mpscq.h"
#include "src/core/util/orphanable.h"
#include "src/core/util/sync.h"

namespace grpc_core {

//
// WorkSerializer::WorkSerializerImpl
//

class WorkSerializer::WorkSerializerImpl : public Orphanable {
 public:
  virtual void Run(std::function<void()> callback,
                   const DebugLocation& location) = 0;
  virtual void Schedule(std::function<void()> callback,
                        const DebugLocation& location) = 0;
  virtual void DrainQueue() = 0;

#ifndef NDEBUG
  virtual bool RunningInWorkSerializer() const = 0;
#endif
};

//
// WorkSerializer::LegacyWorkSerializer
//

class WorkSerializer::LegacyWorkSerializer final : public WorkSerializerImpl {
 public:
  void Run(std::function<void()> callback,
           const DebugLocation& location) override;
  void Schedule(std::function<void()> callback,
                const DebugLocation& location) override;
  void DrainQueue() override;
  void Orphan() override;

#ifndef NDEBUG
  bool RunningInWorkSerializer() const override {
    return std::this_thread::get_id() == current_thread_;
  }
#endif

 private:
  struct CallbackWrapper {
    CallbackWrapper(std::function<void()> cb, const DebugLocation& loc)
        : callback(std::move(cb)), location(loc) {}

    MultiProducerSingleConsumerQueue::Node mpscq_node;
    const std::function<void()> callback;
    const DebugLocation location;
  };

  // Callers of DrainQueueOwned should make sure to grab the lock on the
  // work serializer with
  //
  //   prev_ref_pair =
  //       refs_.fetch_add(MakeRefPair(1, 1), std::memory_order_acq_rel);
  //
  // and only invoke DrainQueueOwned() if there was previously no owner. Note
  // that the queue size is also incremented as part of the fetch_add, to
  // allow the callers to add a callback to the queue if another thread
  // already holds the lock to the work serializer.
  void DrainQueueOwned();

  // The high 16 bits indicate ownership of the WorkSerializer; the low 48
  // bits are the queue size (i.e., refs).
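  // For example (a worked illustration; these values are not used anywhere
  // in the code): MakeRefPair(1, 2) == 0x0001'0000'0000'0002, from which
  // GetOwners() recovers 1 and GetSize() recovers 2.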
  static uint64_t MakeRefPair(uint16_t owners, uint64_t size) {
    CHECK_EQ(size >> 48, 0u);
    return (static_cast<uint64_t>(owners) << 48) + static_cast<uint64_t>(size);
  }
  static uint32_t GetOwners(uint64_t ref_pair) {
    return static_cast<uint32_t>(ref_pair >> 48);
  }
  static uint64_t GetSize(uint64_t ref_pair) {
    return static_cast<uint64_t>(ref_pair & 0xffffffffffffu);
  }

#ifndef NDEBUG
  void SetCurrentThread() { current_thread_ = std::this_thread::get_id(); }
  void ClearCurrentThread() { current_thread_ = std::thread::id(); }
#else
  void SetCurrentThread() {}
  void ClearCurrentThread() {}
#endif

  // An initial size of 1 keeps track of whether the work serializer has been
  // orphaned.
  std::atomic<uint64_t> refs_{MakeRefPair(0, 1)};
  MultiProducerSingleConsumerQueue queue_;
#ifndef NDEBUG
  std::thread::id current_thread_;
#endif
};

void WorkSerializer::LegacyWorkSerializer::Run(std::function<void()> callback,
                                               const DebugLocation& location) {
  GRPC_TRACE_LOG(work_serializer, INFO)
      << "WorkSerializer::Run() " << this << " Scheduling callback ["
      << location.file() << ":" << location.line() << "]";
  // Increment queue size for the new callback and owner count to attempt to
  // take ownership of the WorkSerializer.
  const uint64_t prev_ref_pair =
      refs_.fetch_add(MakeRefPair(1, 1), std::memory_order_acq_rel);
  // The work serializer should not have been orphaned.
  DCHECK_GT(GetSize(prev_ref_pair), 0u);
  if (GetOwners(prev_ref_pair) == 0) {
    // We took ownership of the WorkSerializer. Invoke callback and drain
    // queue.
    SetCurrentThread();
    GRPC_TRACE_LOG(work_serializer, INFO) << "  Executing immediately";
    callback();
    // Delete the callback while still holding the WorkSerializer, so
    // that any refs being held by the callback via lambda captures will
    // be destroyed inside the WorkSerializer.
    callback = nullptr;
    DrainQueueOwned();
  } else {
    // Another thread is holding the WorkSerializer, so decrement the
    // ownership count we just added and queue the callback.
    refs_.fetch_sub(MakeRefPair(1, 0), std::memory_order_acq_rel);
    CallbackWrapper* cb_wrapper =
        new CallbackWrapper(std::move(callback), location);
    GRPC_TRACE_LOG(work_serializer, INFO)
        << "  Scheduling on queue : item " << cb_wrapper;
    queue_.Push(&cb_wrapper->mpscq_node);
  }
}

void WorkSerializer::LegacyWorkSerializer::Schedule(
    std::function<void()> callback, const DebugLocation& location) {
  CallbackWrapper* cb_wrapper =
      new CallbackWrapper(std::move(callback), location);
  GRPC_TRACE_LOG(work_serializer, INFO)
      << "WorkSerializer::Schedule() " << this << " Scheduling callback "
      << cb_wrapper << " [" << location.file() << ":" << location.line()
      << "]";
  refs_.fetch_add(MakeRefPair(0, 1), std::memory_order_acq_rel);
  queue_.Push(&cb_wrapper->mpscq_node);
}

void WorkSerializer::LegacyWorkSerializer::Orphan() {
  GRPC_TRACE_LOG(work_serializer, INFO) << "WorkSerializer::Orphan() " << this;
  const uint64_t prev_ref_pair =
      refs_.fetch_sub(MakeRefPair(0, 1), std::memory_order_acq_rel);
  if (GetOwners(prev_ref_pair) == 0 && GetSize(prev_ref_pair) == 1) {
    GRPC_TRACE_LOG(work_serializer, INFO) << "  Destroying";
    delete this;
  }
}

// The thread that calls this loans itself to the work serializer so as to
// execute all the scheduled callbacks.
void WorkSerializer::LegacyWorkSerializer::DrainQueue() {
  GRPC_TRACE_LOG(work_serializer, INFO)
      << "WorkSerializer::DrainQueue() " << this;
  // Attempt to take ownership of the WorkSerializer. Also increment the queue
  // size, as required by `DrainQueueOwned()`.
  const uint64_t prev_ref_pair =
      refs_.fetch_add(MakeRefPair(1, 1), std::memory_order_acq_rel);
  if (GetOwners(prev_ref_pair) == 0) {
    SetCurrentThread();
    // We took ownership of the WorkSerializer. Drain the queue.
    DrainQueueOwned();
  } else {
    // Another thread is holding the WorkSerializer, so decrement the
    // ownership count we just added and queue a no-op callback.
    refs_.fetch_sub(MakeRefPair(1, 0), std::memory_order_acq_rel);
    CallbackWrapper* cb_wrapper = new CallbackWrapper([]() {}, DEBUG_LOCATION);
    queue_.Push(&cb_wrapper->mpscq_node);
  }
}

void WorkSerializer::LegacyWorkSerializer::DrainQueueOwned() {
  GRPC_TRACE_LOG(work_serializer, INFO)
      << "WorkSerializer::DrainQueueOwned() " << this;
  while (true) {
    auto prev_ref_pair = refs_.fetch_sub(MakeRefPair(0, 1));
    // It is possible that while draining the queue, the last callback ended
    // up orphaning the work serializer. In that case, delete the object.
    if (GetSize(prev_ref_pair) == 1) {
      GRPC_TRACE_LOG(work_serializer, INFO) << "  Queue Drained. Destroying";
      delete this;
      return;
    }
    if (GetSize(prev_ref_pair) == 2) {
      // Queue drained. Give up ownership, but only if the queue remains
      // empty. Reset current_thread_ before giving up ownership to avoid a
      // TSAN race. If we don't wind up giving up ownership, we'll set this
      // again below before we pull the next callback out of the queue.
      ClearCurrentThread();
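      // Attempt a CAS from "one owner (this thread), size 1 (just the
      // not-yet-orphaned ref)" to "no owner, size 1". If another thread
      // queued a callback or Orphan() ran in the meantime, the CAS fails
      // and `expected` is reloaded with the current value of refs_.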
      uint64_t expected = MakeRefPair(1, 1);
      if (refs_.compare_exchange_strong(expected, MakeRefPair(0, 1),
                                        std::memory_order_acq_rel)) {
        // Queue is drained.
        return;
      }
      if (GetSize(expected) == 0) {
        // The WorkSerializer got orphaned while this was running.
        GRPC_TRACE_LOG(work_serializer, INFO) << "  Queue Drained. Destroying";
        delete this;
        return;
      }
      // Didn't wind up giving up ownership, so set current_thread_ again.
      SetCurrentThread();
    }
    // There is at least one callback on the queue. Pop the callback from the
    // queue and execute it.
    CallbackWrapper* cb_wrapper = nullptr;
    bool empty_unused;
    while ((cb_wrapper = reinterpret_cast<CallbackWrapper*>(
                queue_.PopAndCheckEnd(&empty_unused))) == nullptr) {
      // This can happen due to a race condition within the mpscq
      // implementation or because of a race with Run()/Schedule().
      GRPC_TRACE_LOG(work_serializer, INFO)
          << "  Queue returned nullptr, trying again";
    }
    GRPC_TRACE_LOG(work_serializer, INFO)
        << "  Running item " << cb_wrapper << " : callback scheduled at ["
        << cb_wrapper->location.file() << ":" << cb_wrapper->location.line()
        << "]";
    cb_wrapper->callback();
    delete cb_wrapper;
  }
}

//
// WorkSerializer::DispatchingWorkSerializer
//

// DispatchingWorkSerializer: executes callbacks one at a time on EventEngine.
// One-at-a-time execution guarantees that fixed-size thread pools in
// EventEngine implementations are not starved of threads by long-running work
// serializers. We implement EventEngine::Closure directly to avoid allocating
// once per callback in the queue when scheduling.
class WorkSerializer::DispatchingWorkSerializer final
    : public WorkSerializerImpl,
      public grpc_event_engine::experimental::EventEngine::Closure {
 public:
  explicit DispatchingWorkSerializer(
      std::shared_ptr<grpc_event_engine::experimental::EventEngine>
          event_engine)
      : event_engine_(std::move(event_engine)) {}
  void Run(std::function<void()> callback,
           const DebugLocation& location) override;
  void Schedule(std::function<void()> callback,
                const DebugLocation& location) override {
    // We always dispatch to event engine, so Schedule and Run share
    // semantics.
    Run(std::move(callback), location);
  }
  void DrainQueue() override {}
  void Orphan() override;

  // Override EventEngine::Closure
  void Run() override;

#ifndef NDEBUG
  bool RunningInWorkSerializer() const override {
    return running_work_serializer_ == this;
  }
#endif

 private:
  // Wrapper to capture DebugLocation for the callback.
  struct CallbackWrapper {
    CallbackWrapper(std::function<void()> cb, const DebugLocation& loc)
        : callback(std::move(cb)), location(loc) {}
    std::function<void()> callback;
    // GPR_NO_UNIQUE_ADDRESS means this is zero-sized in release builds.
    GPR_NO_UNIQUE_ADDRESS DebugLocation location;
  };
  using CallbackVector = absl::InlinedVector<CallbackWrapper, 1>;

  // Refill processing_ from incoming_.
  // If processing_ is empty, also update running_ and return false.
  // If additionally orphaned, will also delete this (therefore, it's not safe
  // to touch any member variables if Refill returns false).
  bool Refill();

  // Perform the parts of Refill that need to acquire mu_.
  // Returns a tri-state indicating whether we were refilled successfully (=>
  // keep running) or finished, and if finished, whether we were also
  // orphaned.
  enum class RefillResult { kRefilled, kFinished, kFinishedAndOrphaned };
  RefillResult RefillInner();

#ifndef NDEBUG
  void SetCurrentThread() { running_work_serializer_ = this; }
  void ClearCurrentThread() { running_work_serializer_ = nullptr; }
#else
  void SetCurrentThread() {}
  void ClearCurrentThread() {}
#endif

  // Member variables are roughly sorted to keep processing cache lines
  // separated from incoming cache lines.

  // Callbacks that are currently being processed.
  // Only accessed by: a Run() call going from not-running to running, or a
  // work item being executed in EventEngine -- i.e., this does not need a
  // mutex because all access is serialized. Stored in reverse execution order
  // so that callbacks can be `pop_back()`'d on completion to free up any
  // resources they hold.
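  // For example, callbacks that arrived in the order [cb1, cb2, cb3] are
  // stored here as [cb3, cb2, cb1], so back() yields cb1, the oldest item.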
  CallbackVector processing_;
  // EventEngine instance upon which we'll do our work.
  const std::shared_ptr<grpc_event_engine::experimental::EventEngine>
      event_engine_;
  std::chrono::steady_clock::time_point running_start_time_
      ABSL_GUARDED_BY(mu_);
  std::chrono::steady_clock::duration time_running_items_;
  uint64_t items_processed_during_run_;
  // Flags containing run state:
  // - running_ goes from false->true whenever the first callback is scheduled
  //   on an idle WorkSerializer, and transitions back to false after the last
  //   scheduled callback completes and the WorkSerializer is again idle.
  // - orphaned_ transitions to true once, when Orphan is called.
  // When orphaned_ is true and running_ is false, the
  // DispatchingWorkSerializer instance is deleted.
  bool running_ ABSL_GUARDED_BY(mu_) = false;
  bool orphaned_ ABSL_GUARDED_BY(mu_) = false;
  Mutex mu_;
  // Queued callbacks. New work items land here, and when processing_ is
  // drained we move this entire queue into processing_ and work on draining
  // it again. In low-traffic scenarios this gives two mutex acquisitions per
  // work item, but as load increases we get some natural batching and the
  // rate of mutex acquisitions per work item tends towards 1.
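  // For example, if 10 callbacks land in incoming_ while one batch is being
  // processed, a single swap in RefillInner() moves all 10 into processing_:
  // 10 enqueue acquisitions plus 1 refill acquisition, i.e. ~1.1 mutex
  // acquisitions per item rather than 2.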
  CallbackVector incoming_ ABSL_GUARDED_BY(mu_);

  GPR_NO_UNIQUE_ADDRESS latent_see::Flow flow_;

#ifndef NDEBUG
  static thread_local DispatchingWorkSerializer* running_work_serializer_;
#endif
};

#ifndef NDEBUG
thread_local WorkSerializer::DispatchingWorkSerializer*
    WorkSerializer::DispatchingWorkSerializer::running_work_serializer_ =
        nullptr;
#endif

void WorkSerializer::DispatchingWorkSerializer::Orphan() {
  ReleasableMutexLock lock(&mu_);
  // If we're not running, then we can delete immediately.
  if (!running_) {
    lock.Release();
    delete this;
    return;
  }
  // Otherwise store a flag to delete when we're done.
  orphaned_ = true;
}

// Implementation of WorkSerializerImpl::Run
void WorkSerializer::DispatchingWorkSerializer::Run(
    std::function<void()> callback, const DebugLocation& location) {
  GRPC_TRACE_LOG(work_serializer, INFO)
      << "WorkSerializer[" << this << "] Scheduling callback ["
      << location.file() << ":" << location.line() << "]";
  global_stats().IncrementWorkSerializerItemsEnqueued();
  MutexLock lock(&mu_);
  if (!running_) {
    // If we were previously idle, insert this callback directly into the
    // empty processing_ list and start running.
    running_ = true;
    running_start_time_ = std::chrono::steady_clock::now();
    items_processed_during_run_ = 0;
    time_running_items_ = std::chrono::steady_clock::duration();
    CHECK(processing_.empty());
    processing_.emplace_back(std::move(callback), location);
    event_engine_->Run(this);
  } else {
    // We are already running, so add this callback to the incoming_ list.
    // The work loop will eventually get to it.
    incoming_.emplace_back(std::move(callback), location);
  }
}

// Implementation of EventEngine::Closure::Run - our actual work loop
void WorkSerializer::DispatchingWorkSerializer::Run() {
  GRPC_LATENT_SEE_PARENT_SCOPE("WorkSerializer::Run");
  flow_.End();
  // TODO(ctiller): remove these when we can deprecate ExecCtx
  ApplicationCallbackExecCtx app_exec_ctx;
  ExecCtx exec_ctx;
  // Grab the last element of processing_ - which is the next item in our
  // queue, since processing_ is stored in reverse order.
  auto& cb = processing_.back();
  GRPC_TRACE_LOG(work_serializer, INFO)
      << "WorkSerializer[" << this << "] Executing callback ["
      << cb.location.file() << ":" << cb.location.line() << "]";
  // Run the work item.
  const auto start = std::chrono::steady_clock::now();
  SetCurrentThread();
  cb.callback();
  // pop_back here destroys the callback - freeing any resources it might
  // hold. We do so before clearing the current thread, in case the callback's
  // destructor wants to check that it's in the WorkSerializer too.
  processing_.pop_back();
  ClearCurrentThread();
  global_stats().IncrementWorkSerializerItemsDequeued();
  const auto work_time = std::chrono::steady_clock::now() - start;
  global_stats().IncrementWorkSerializerWorkTimePerItemMs(
      std::chrono::duration_cast<std::chrono::milliseconds>(work_time)
          .count());
  time_running_items_ += work_time;
  ++items_processed_during_run_;
  // Check if we've drained the queue, and if so refill it.
  if (processing_.empty() && !Refill()) return;
  // There's still work in processing_, so schedule ourselves again on
  // EventEngine.
  flow_.Begin(GRPC_LATENT_SEE_METADATA("WorkSerializer::Link"));
  event_engine_->Run(this);
}

WorkSerializer::DispatchingWorkSerializer::RefillResult
WorkSerializer::DispatchingWorkSerializer::RefillInner() {
  // Recover any memory held by processing_, so that we don't grow forever.
  // Do so before acquiring the lock, so we don't cause inadvertent
  // contention.
  processing_.shrink_to_fit();
  MutexLock lock(&mu_);
  // Swap incoming_ into processing_ - this effectively lets us release
  // memory (outside the lock) once per iteration for the storage vectors.
  processing_.swap(incoming_);
  // If there were no items, then we've finished running.
  if (processing_.empty()) {
    running_ = false;
    global_stats().IncrementWorkSerializerRunTimeMs(
        std::chrono::duration_cast<std::chrono::milliseconds>(
            std::chrono::steady_clock::now() - running_start_time_)
            .count());
    global_stats().IncrementWorkSerializerWorkTimeMs(
        std::chrono::duration_cast<std::chrono::milliseconds>(
            time_running_items_)
            .count());
    global_stats().IncrementWorkSerializerItemsPerRun(
        items_processed_during_run_);
    // And if we're also orphaned, then it's time to delete this object.
    if (orphaned_) {
      return RefillResult::kFinishedAndOrphaned;
    } else {
      return RefillResult::kFinished;
    }
  }
  return RefillResult::kRefilled;
}

bool WorkSerializer::DispatchingWorkSerializer::Refill() {
  const auto result = RefillInner();
  switch (result) {
    case RefillResult::kRefilled:
      // Reverse processing_ so that we can pop_back() items in the correct
      // order. (Note that this is mostly pointer swaps inside the
      // std::functions, so it should be relatively cheap even for longer
      // lists.) Do so here so that we're outside of the RefillInner lock.
      std::reverse(processing_.begin(), processing_.end());
      return true;
    case RefillResult::kFinished:
      return false;
    case RefillResult::kFinishedAndOrphaned:
      // Orphaned and finished - finally delete this object.
      // Done here so that the mutex lock in RefillInner has been released.
      delete this;
      return false;
  }
  GPR_UNREACHABLE_CODE(return false);
}

//
// WorkSerializer
//
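// A minimal usage sketch (illustrative only; assumes an EventEngine is
// available, e.g. via
// grpc_event_engine::experimental::GetDefaultEventEngine()):
//
//   auto serializer = std::make_shared<WorkSerializer>(
//       grpc_event_engine::experimental::GetDefaultEventEngine());
//   serializer->Run([] { /* runs serialized with all other callbacks */ },
//                   DEBUG_LOCATION);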

WorkSerializer::WorkSerializer(
    std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine)
    : impl_(IsWorkSerializerDispatchEnabled()
                ? OrphanablePtr<WorkSerializerImpl>(
                      MakeOrphanable<DispatchingWorkSerializer>(
                          std::move(event_engine)))
                : OrphanablePtr<WorkSerializerImpl>(
                      MakeOrphanable<LegacyWorkSerializer>())) {}

WorkSerializer::~WorkSerializer() = default;

void WorkSerializer::Run(std::function<void()> callback,
                         const DebugLocation& location) {
  impl_->Run(std::move(callback), location);
}

void WorkSerializer::Schedule(std::function<void()> callback,
                              const DebugLocation& location) {
  impl_->Schedule(std::move(callback), location);
}

void WorkSerializer::DrainQueue() { impl_->DrainQueue(); }

#ifndef NDEBUG
bool WorkSerializer::RunningInWorkSerializer() const {
  return impl_->RunningInWorkSerializer();
}
#endif

}  // namespace grpc_core