1 //
2 //
3 // Copyright 2015-2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 #include "src/core/lib/surface/completion_queue.h"
19 
20 #include <grpc/grpc.h>
21 #include <grpc/support/alloc.h>
22 #include <grpc/support/atm.h>
23 #include <grpc/support/port_platform.h>
24 #include <grpc/support/sync.h>
25 #include <grpc/support/time.h>
26 #include <inttypes.h>
27 #include <stdio.h>
28 
29 #include <algorithm>
30 #include <atomic>
31 #include <new>
32 #include <string>
33 #include <utility>
34 #include <vector>
35 
36 #include "absl/log/check.h"
37 #include "absl/log/log.h"
38 #include "absl/status/status.h"
39 #include "absl/strings/str_format.h"
40 #include "absl/strings/str_join.h"
41 #include "src/core/lib/event_engine/default_event_engine.h"
42 #include "src/core/lib/iomgr/closure.h"
43 #include "src/core/lib/iomgr/exec_ctx.h"
44 #include "src/core/lib/iomgr/executor.h"
45 #include "src/core/lib/iomgr/iomgr.h"
46 #include "src/core/lib/iomgr/pollset.h"
47 #include "src/core/lib/surface/event_string.h"
48 #include "src/core/telemetry/stats.h"
49 #include "src/core/telemetry/stats_data.h"
50 #include "src/core/util/atomic_utils.h"
51 #include "src/core/util/debug_location.h"
52 #include "src/core/util/ref_counted.h"
53 #include "src/core/util/spinlock.h"
54 #include "src/core/util/status_helper.h"
55 #include "src/core/util/time.h"
56 
57 #ifdef GPR_WINDOWS
58 #include "src/core/lib/experiments/experiments.h"
59 #endif
60 
61 namespace {
62 
63 // Specifies a cq thread local cache.
64 // The first event that occurs on a thread
65 // with a cq cache will go into that cache, and
66 // will only be returned on the thread that initialized the cache.
67 // NOTE: Only one event will ever be cached.
68 thread_local grpc_cq_completion* g_cached_event;
69 thread_local grpc_completion_queue* g_cached_cq;
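// For example (illustrative sketch only; handle_event is a placeholder), a
// thread that drives a GRPC_CQ_NEXT queue could opt into the cache like so:
//
//   grpc_completion_queue_thread_local_cache_init(cq);
//   /* ... start an operation whose completion lands on this thread ... */
//   void* tag;
//   int ok;
//   if (grpc_completion_queue_thread_local_cache_flush(cq, &tag, &ok)) {
//     handle_event(tag, ok);  /* completion returned without a queue pass */
//   }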
70 
71 struct plucker {
72   grpc_pollset_worker** worker;
73   void* tag;
74 };
75 struct cq_poller_vtable {
76   bool can_get_pollset;
77   bool can_listen;
78   size_t (*size)(void);
79   void (*init)(grpc_pollset* pollset, gpr_mu** mu);
80   grpc_error_handle (*kick)(grpc_pollset* pollset,
81                             grpc_pollset_worker* specific_worker);
82   grpc_error_handle (*work)(grpc_pollset* pollset, grpc_pollset_worker** worker,
83                             grpc_core::Timestamp deadline);
84   void (*shutdown)(grpc_pollset* pollset, grpc_closure* closure);
85   void (*destroy)(grpc_pollset* pollset);
86 };
87 typedef struct non_polling_worker {
88   gpr_cv cv;
89   bool kicked;
90   struct non_polling_worker* next;
91   struct non_polling_worker* prev;
92 } non_polling_worker;
93 
94 struct non_polling_poller {
95   gpr_mu mu;
96   bool kicked_without_poller;
97   non_polling_worker* root;
98   grpc_closure* shutdown;
99 };
100 size_t non_polling_poller_size(void) { return sizeof(non_polling_poller); }
101 
102 void non_polling_poller_init(grpc_pollset* pollset, gpr_mu** mu) {
103   non_polling_poller* npp = reinterpret_cast<non_polling_poller*>(pollset);
104   gpr_mu_init(&npp->mu);
105   *mu = &npp->mu;
106 }
107 
108 void non_polling_poller_destroy(grpc_pollset* pollset) {
109   non_polling_poller* npp = reinterpret_cast<non_polling_poller*>(pollset);
110   gpr_mu_destroy(&npp->mu);
111 }
112 
113 grpc_error_handle non_polling_poller_work(grpc_pollset* pollset,
114                                           grpc_pollset_worker** worker,
115                                           grpc_core::Timestamp deadline) {
116   non_polling_poller* npp = reinterpret_cast<non_polling_poller*>(pollset);
117   if (npp->shutdown) return absl::OkStatus();
118   if (npp->kicked_without_poller) {
119     npp->kicked_without_poller = false;
120     return absl::OkStatus();
121   }
122   non_polling_worker w;
123   gpr_cv_init(&w.cv);
124   if (worker != nullptr) *worker = reinterpret_cast<grpc_pollset_worker*>(&w);
125   if (npp->root == nullptr) {
126     npp->root = w.next = w.prev = &w;
127   } else {
128     w.next = npp->root;
129     w.prev = w.next->prev;
130     w.next->prev = w.prev->next = &w;
131   }
132   w.kicked = false;
133   gpr_timespec deadline_ts = deadline.as_timespec(GPR_CLOCK_MONOTONIC);
134   while (!npp->shutdown && !w.kicked &&
135          !gpr_cv_wait(&w.cv, &npp->mu, deadline_ts)) {
136   }
137   grpc_core::ExecCtx::Get()->InvalidateNow();
138   if (&w == npp->root) {
139     npp->root = w.next;
140     if (&w == npp->root) {
141       if (npp->shutdown) {
142         grpc_core::ExecCtx::Run(DEBUG_LOCATION, npp->shutdown,
143                                 absl::OkStatus());
144       }
145       npp->root = nullptr;
146     }
147   }
148   w.next->prev = w.prev;
149   w.prev->next = w.next;
150   gpr_cv_destroy(&w.cv);
151   if (worker != nullptr) *worker = nullptr;
152   return absl::OkStatus();
153 }
154 
155 grpc_error_handle non_polling_poller_kick(
156     grpc_pollset* pollset, grpc_pollset_worker* specific_worker) {
157   non_polling_poller* p = reinterpret_cast<non_polling_poller*>(pollset);
158   if (specific_worker == nullptr) {
159     specific_worker = reinterpret_cast<grpc_pollset_worker*>(p->root);
160   }
161   if (specific_worker != nullptr) {
162     non_polling_worker* w =
163         reinterpret_cast<non_polling_worker*>(specific_worker);
164     if (!w->kicked) {
165       w->kicked = true;
166       gpr_cv_signal(&w->cv);
167     }
168   } else {
169     p->kicked_without_poller = true;
170   }
171   return absl::OkStatus();
172 }
173 
174 void non_polling_poller_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
175   non_polling_poller* p = reinterpret_cast<non_polling_poller*>(pollset);
176   CHECK_NE(closure, nullptr);
177   p->shutdown = closure;
178   if (p->root == nullptr) {
179     grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, absl::OkStatus());
180   } else {
181     non_polling_worker* w = p->root;
182     do {
183       gpr_cv_signal(&w->cv);
184       w = w->next;
185     } while (w != p->root);
186   }
187 }
188 
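// Poller vtables indexed by grpc_cq_polling_type: the first two entries wrap
// the real pollset implementation, while GRPC_CQ_NON_POLLING uses the
// condition-variable based non_polling_poller defined above.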
189 const cq_poller_vtable g_poller_vtable_by_poller_type[] = {
190     // GRPC_CQ_DEFAULT_POLLING
191     {true, true, grpc_pollset_size, grpc_pollset_init, grpc_pollset_kick,
192      grpc_pollset_work, grpc_pollset_shutdown, grpc_pollset_destroy},
193     // GRPC_CQ_NON_LISTENING
194     {true, false, grpc_pollset_size, grpc_pollset_init, grpc_pollset_kick,
195      grpc_pollset_work, grpc_pollset_shutdown, grpc_pollset_destroy},
196     // GRPC_CQ_NON_POLLING
197     {false, false, non_polling_poller_size, non_polling_poller_init,
198      non_polling_poller_kick, non_polling_poller_work,
199      non_polling_poller_shutdown, non_polling_poller_destroy},
200 };
201 
202 }  // namespace
203 
204 struct cq_vtable {
205   grpc_cq_completion_type cq_completion_type;
206   size_t data_size;
207   void (*init)(void* data, grpc_completion_queue_functor* shutdown_callback);
208   void (*shutdown)(grpc_completion_queue* cq);
209   void (*destroy)(void* data);
210   bool (*begin_op)(grpc_completion_queue* cq, void* tag);
211   void (*end_op)(grpc_completion_queue* cq, void* tag, grpc_error_handle error,
212                  void (*done)(void* done_arg, grpc_cq_completion* storage),
213                  void* done_arg, grpc_cq_completion* storage, bool internal);
214   grpc_event (*next)(grpc_completion_queue* cq, gpr_timespec deadline,
215                      void* reserved);
216   grpc_event (*pluck)(grpc_completion_queue* cq, void* tag,
217                       gpr_timespec deadline, void* reserved);
218 };
219 
220 namespace {
221 
222 // Queue that holds the cq_completion_events. Internally uses
223 // MultiProducerSingleConsumerQueue (a lockfree multiproducer single consumer
224 // queue). It uses a queue_lock to support multiple consumers.
225 // Only used in completion queues whose completion_type is GRPC_CQ_NEXT
226 class CqEventQueue {
227  public:
228   CqEventQueue() = default;
229   ~CqEventQueue() = default;
230 
231   // Note: The counter is not incremented/decremented atomically with push/pop.
232   // The count is only eventually consistent
233   intptr_t num_items() const {
234     return num_queue_items_.load(std::memory_order_relaxed);
235   }
236 
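  // Push() returns true if the queue was (approximately) empty before this
  // push; callers use that to decide whether a poller needs to be kicked.
  // Pop() may return nullptr if another consumer currently holds the
  // spinlock, even when the queue is not empty.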
237   bool Push(grpc_cq_completion* c);
238   grpc_cq_completion* Pop();
239 
240  private:
241   // Spinlock to serialize consumers, i.e. Pop() operations
242   gpr_spinlock queue_lock_ = GPR_SPINLOCK_INITIALIZER;
243 
244   grpc_core::MultiProducerSingleConsumerQueue queue_;
245 
246   // A lazy counter of number of items in the queue. This is NOT atomically
247   // incremented/decremented along with push/pop operations and hence is only
248   // eventually consistent
249   std::atomic<intptr_t> num_queue_items_{0};
250 };
251 
252 struct cq_next_data {
253   ~cq_next_data() {
254     CHECK_EQ(queue.num_items(), 0);
255 #ifndef NDEBUG
256     if (pending_events.load(std::memory_order_acquire) != 0) {
257       LOG(ERROR) << "Destroying CQ without draining it fully.";
258     }
259 #endif
260   }
261 
262   /// Completed events for completion-queues of type GRPC_CQ_NEXT
263   CqEventQueue queue;
264 
265   /// Counter of how many things have ever been queued on this completion
266   /// queue; useful for avoiding locks when checking the queue
267   std::atomic<intptr_t> things_queued_ever{0};
268 
269   /// Number of outstanding events (+1 if not shut down)
270   /// Initial count is dropped by grpc_completion_queue_shutdown
271   std::atomic<intptr_t> pending_events{1};
272 
273   /// 0 initially. 1 once we initiated shutdown
274   bool shutdown_called = false;
275 };
276 
277 struct cq_pluck_data {
278   cq_pluck_data() {
279     completed_tail = &completed_head;
280     completed_head.next = reinterpret_cast<uintptr_t>(completed_tail);
281   }
282 
283   ~cq_pluck_data() {
284     CHECK(completed_head.next == reinterpret_cast<uintptr_t>(&completed_head));
285 #ifndef NDEBUG
286     if (pending_events.load(std::memory_order_acquire) != 0) {
287       LOG(ERROR) << "Destroying CQ without draining it fully.";
288     }
289 #endif
290   }
291 
292   /// Completed events for completion-queues of type GRPC_CQ_PLUCK
293   grpc_cq_completion completed_head;
294   grpc_cq_completion* completed_tail;
295 
296   /// Number of pending events (+1 if we're not shutdown).
297   /// Initial count is dropped by grpc_completion_queue_shutdown.
298   std::atomic<intptr_t> pending_events{1};
299 
300   /// Counter of how many things have ever been queued on this completion
301   /// queue; useful for avoiding locks when checking the queue
302   std::atomic<intptr_t> things_queued_ever{0};
303 
304   /// 0 initially. 1 once we have completed shutting down.
305   // TODO(sreek): This is not needed since (shutdown == 1) if and only if
306   // (pending_events == 0). So consider removing this in future and use
307   // pending_events
308   std::atomic<bool> shutdown{false};
309 
310   /// 0 initially. 1 once we initiated shutdown
311   bool shutdown_called = false;
312 
313   int num_pluckers = 0;
314   plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
315 };
316 
317 struct cq_callback_data {
318   explicit cq_callback_data(grpc_completion_queue_functor* shutdown_callback)
319       : shutdown_callback(shutdown_callback),
320         event_engine(grpc_event_engine::experimental::GetDefaultEventEngine()) {
321   }
322 
323   ~cq_callback_data() {
324 #ifndef NDEBUG
325     if (pending_events.load(std::memory_order_acquire) != 0) {
326       LOG(ERROR) << "Destroying CQ without draining it fully.";
327     }
328 #endif
329   }
330 
331   /// No actual completed events queue, unlike other types
332 
333   /// Number of pending events (+1 if we're not shutdown).
334   /// Initial count is dropped by grpc_completion_queue_shutdown.
335   std::atomic<intptr_t> pending_events{1};
336 
337   /// 0 initially. 1 once we initiated shutdown
338   bool shutdown_called = false;
339 
340   /// A callback that gets invoked when the CQ completes shutdown
341   grpc_completion_queue_functor* shutdown_callback;
342 
343   std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine;
344 };
345 
346 }  // namespace
347 
348 // Completion queue structure
349 struct grpc_completion_queue {
350   /// Once owning_refs drops to zero, we will destroy the cq
351   grpc_core::RefCount owning_refs;
352   /// Add padding between hot fields to avoid false sharing
353   char padding_1[GPR_CACHELINE_SIZE];
354   gpr_mu* mu;
355 
356   char padding_2[GPR_CACHELINE_SIZE];
357   const cq_vtable* vtable;
358 
359   char padding_3[GPR_CACHELINE_SIZE];
360   const cq_poller_vtable* poller_vtable;
361 
362 #ifndef NDEBUG
363   void** outstanding_tags;
364   size_t outstanding_tag_count;
365   size_t outstanding_tag_capacity;
366 #endif
367 
368   grpc_closure pollset_shutdown_done;
369   int num_polls;
370 };
371 
372 // Forward declarations
373 static void cq_finish_shutdown_next(grpc_completion_queue* cq);
374 static void cq_finish_shutdown_pluck(grpc_completion_queue* cq);
375 static void cq_finish_shutdown_callback(grpc_completion_queue* cq);
376 static void cq_shutdown_next(grpc_completion_queue* cq);
377 static void cq_shutdown_pluck(grpc_completion_queue* cq);
378 static void cq_shutdown_callback(grpc_completion_queue* cq);
379 
380 static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag);
381 static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag);
382 static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag);
383 
384 // A cq_end_op function is called when an operation on a given CQ with
385 // a given tag has completed. The storage argument is a reference to the
386 // space reserved for this completion as it is placed into the corresponding
387 // queue. The done argument is a callback that will be invoked when it is
388 // safe to free up that storage. The storage MUST NOT be freed until the
389 // done callback is invoked.
390 static void cq_end_op_for_next(
391     grpc_completion_queue* cq, void* tag, grpc_error_handle error,
392     void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
393     grpc_cq_completion* storage, bool internal);
394 
395 static void cq_end_op_for_pluck(
396     grpc_completion_queue* cq, void* tag, grpc_error_handle error,
397     void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
398     grpc_cq_completion* storage, bool internal);
399 
400 static void cq_end_op_for_callback(
401     grpc_completion_queue* cq, void* tag, grpc_error_handle error,
402     void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
403     grpc_cq_completion* storage, bool internal);
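
// For example (illustrative only; the names below are hypothetical), a caller
// typically embeds the grpc_cq_completion inside a larger per-operation
// object and frees that object from the done callback:
//
//   struct pending_op {
//     grpc_cq_completion completion;
//     /* ... per-operation state ... */
//   };
//   static void finish_op(void* arg, grpc_cq_completion* /*storage*/) {
//     delete static_cast<pending_op*>(arg);
//   }
//   ...
//   grpc_cq_end_op(cq, tag, error, finish_op, op, &op->completion,
//                  /*internal=*/false);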
404 
405 static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
406                           void* reserved);
407 
408 static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
409                            gpr_timespec deadline, void* reserved);
410 
411 // Note that cq_init_next and cq_init_pluck do not use the shutdown_callback
412 static void cq_init_next(void* data,
413                          grpc_completion_queue_functor* shutdown_callback);
414 static void cq_init_pluck(void* data,
415                           grpc_completion_queue_functor* shutdown_callback);
416 static void cq_init_callback(void* data,
417                              grpc_completion_queue_functor* shutdown_callback);
418 static void cq_destroy_next(void* data);
419 static void cq_destroy_pluck(void* data);
420 static void cq_destroy_callback(void* data);
421 
422 // Completion queue vtables based on the completion-type
423 static const cq_vtable g_cq_vtable[] = {
424     // GRPC_CQ_NEXT
425     {GRPC_CQ_NEXT, sizeof(cq_next_data), cq_init_next, cq_shutdown_next,
426      cq_destroy_next, cq_begin_op_for_next, cq_end_op_for_next, cq_next,
427      nullptr},
428     // GRPC_CQ_PLUCK
429     {GRPC_CQ_PLUCK, sizeof(cq_pluck_data), cq_init_pluck, cq_shutdown_pluck,
430      cq_destroy_pluck, cq_begin_op_for_pluck, cq_end_op_for_pluck, nullptr,
431      cq_pluck},
432     // GRPC_CQ_CALLBACK
433     {GRPC_CQ_CALLBACK, sizeof(cq_callback_data), cq_init_callback,
434      cq_shutdown_callback, cq_destroy_callback, cq_begin_op_for_callback,
435      cq_end_op_for_callback, nullptr, nullptr},
436 };
437 
438 #define DATA_FROM_CQ(cq) ((void*)((cq) + 1))
439 #define POLLSET_FROM_CQ(cq) \
440   ((grpc_pollset*)((cq)->vtable->data_size + (char*)DATA_FROM_CQ(cq)))
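
// Both macros rely on the layout produced by
// grpc_completion_queue_create_internal() below: the cq is allocated as one
// contiguous block of
//   [grpc_completion_queue][completion-type specific data][pollset]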
441 
442 #define GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, event)  \
443   do {                                                \
444     if (GRPC_TRACE_FLAG_ENABLED(api) &&               \
445         (GRPC_TRACE_FLAG_ENABLED(queue_pluck) ||      \
446          (event)->type != GRPC_QUEUE_TIMEOUT)) {      \
447       LOG(INFO) << "RETURN_EVENT[" << (cq)            \
448                 << "]: " << grpc_event_string(event); \
449     }                                                 \
450   } while (0)
451 
452 static void on_pollset_shutdown_done(void* arg, grpc_error_handle error);
453 
454 void grpc_completion_queue_thread_local_cache_init(grpc_completion_queue* cq) {
455   if (g_cached_cq == nullptr) {
456     g_cached_event = nullptr;
457     g_cached_cq = cq;
458   }
459 }
460 
461 int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue* cq,
462                                                    void** tag, int* ok) {
463   grpc_cq_completion* storage = g_cached_event;
464   int ret = 0;
465   if (storage != nullptr && g_cached_cq == cq) {
466     *tag = storage->tag;
467     grpc_core::ExecCtx exec_ctx;
468     *ok = (storage->next & uintptr_t{1}) == 1;
469     storage->done(storage->done_arg, storage);
470     ret = 1;
471     cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
472     if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
473       GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
474       gpr_mu_lock(cq->mu);
475       cq_finish_shutdown_next(cq);
476       gpr_mu_unlock(cq->mu);
477       GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
478     }
479   }
480   g_cached_event = nullptr;
481   g_cached_cq = nullptr;
482 
483   return ret;
484 }
485 
486 bool CqEventQueue::Push(grpc_cq_completion* c) {
487   queue_.Push(
488       reinterpret_cast<grpc_core::MultiProducerSingleConsumerQueue::Node*>(c));
489   return num_queue_items_.fetch_add(1, std::memory_order_relaxed) == 0;
490 }
491 
492 grpc_cq_completion* CqEventQueue::Pop() {
493   grpc_cq_completion* c = nullptr;
494 
495   if (gpr_spinlock_trylock(&queue_lock_)) {
496     bool is_empty = false;
497     c = reinterpret_cast<grpc_cq_completion*>(queue_.PopAndCheckEnd(&is_empty));
498     gpr_spinlock_unlock(&queue_lock_);
499   }
500 
501   if (c) {
502     num_queue_items_.fetch_sub(1, std::memory_order_relaxed);
503   }
504 
505   return c;
506 }
507 
508 grpc_completion_queue* grpc_completion_queue_create_internal(
509     grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type,
510     grpc_completion_queue_functor* shutdown_callback) {
511   grpc_completion_queue* cq;
512 
513   GRPC_TRACE_LOG(api, INFO)
514       << "grpc_completion_queue_create_internal(completion_type="
515       << completion_type << ", polling_type=" << polling_type << ")";
516 
517   switch (completion_type) {
518     case GRPC_CQ_NEXT:
519       grpc_core::global_stats().IncrementCqNextCreates();
520       break;
521     case GRPC_CQ_PLUCK:
522       grpc_core::global_stats().IncrementCqPluckCreates();
523       break;
524     case GRPC_CQ_CALLBACK:
525       grpc_core::global_stats().IncrementCqCallbackCreates();
526       break;
527   }
528 
529   const cq_vtable* vtable = &g_cq_vtable[completion_type];
530   const cq_poller_vtable* poller_vtable =
531       &g_poller_vtable_by_poller_type[polling_type];
532 
533   grpc_core::ExecCtx exec_ctx;
534 
535   cq = static_cast<grpc_completion_queue*>(
536       gpr_zalloc(sizeof(grpc_completion_queue) + vtable->data_size +
537                  poller_vtable->size()));
538 
539   cq->vtable = vtable;
540   cq->poller_vtable = poller_vtable;
541 
542   // One for destroy(), one for pollset_shutdown
543   new (&cq->owning_refs) grpc_core::RefCount(
544       2, GRPC_TRACE_FLAG_ENABLED(cq_refcount) ? "completion_queue" : nullptr);
545 
546   poller_vtable->init(POLLSET_FROM_CQ(cq), &cq->mu);
547   vtable->init(DATA_FROM_CQ(cq), shutdown_callback);
548 
549   GRPC_CLOSURE_INIT(&cq->pollset_shutdown_done, on_pollset_shutdown_done, cq,
550                     grpc_schedule_on_exec_ctx);
551   return cq;
552 }
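
// (The public factories, e.g. grpc_completion_queue_create_for_next() and
// grpc_completion_queue_create_for_callback(), are defined elsewhere and
// ultimately funnel into the function above.)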
553 
554 static void cq_init_next(void* data,
555                          grpc_completion_queue_functor* /*shutdown_callback*/) {
556   new (data) cq_next_data();
557 }
558 
559 static void cq_destroy_next(void* data) {
560   cq_next_data* cqd = static_cast<cq_next_data*>(data);
561   cqd->~cq_next_data();
562 }
563 
564 static void cq_init_pluck(
565     void* data, grpc_completion_queue_functor* /*shutdown_callback*/) {
566   new (data) cq_pluck_data();
567 }
568 
569 static void cq_destroy_pluck(void* data) {
570   cq_pluck_data* cqd = static_cast<cq_pluck_data*>(data);
571   cqd->~cq_pluck_data();
572 }
573 
574 static void cq_init_callback(void* data,
575                              grpc_completion_queue_functor* shutdown_callback) {
576   new (data) cq_callback_data(shutdown_callback);
577 }
578 
579 static void cq_destroy_callback(void* data) {
580   cq_callback_data* cqd = static_cast<cq_callback_data*>(data);
581   cqd->~cq_callback_data();
582 }
583 
584 grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue* cq) {
585   return cq->vtable->cq_completion_type;
586 }
587 
588 int grpc_get_cq_poll_num(grpc_completion_queue* cq) {
589   int cur_num_polls;
590   gpr_mu_lock(cq->mu);
591   cur_num_polls = cq->num_polls;
592   gpr_mu_unlock(cq->mu);
593   return cur_num_polls;
594 }
595 
596 #ifndef NDEBUG
597 void grpc_cq_internal_ref(grpc_completion_queue* cq, const char* reason,
598                           const char* file, int line) {
599   grpc_core::DebugLocation debug_location(file, line);
600 #else
601 void grpc_cq_internal_ref(grpc_completion_queue* cq) {
602   grpc_core::DebugLocation debug_location;
603   const char* reason = nullptr;
604 #endif
605   cq->owning_refs.Ref(debug_location, reason);
606 }
607 
608 static void on_pollset_shutdown_done(void* arg, grpc_error_handle /*error*/) {
609   grpc_completion_queue* cq = static_cast<grpc_completion_queue*>(arg);
610   GRPC_CQ_INTERNAL_UNREF(cq, "pollset_destroy");
611 }
612 
613 #ifndef NDEBUG
614 void grpc_cq_internal_unref(grpc_completion_queue* cq, const char* reason,
615                             const char* file, int line) {
616   grpc_core::DebugLocation debug_location(file, line);
617 #else
618 void grpc_cq_internal_unref(grpc_completion_queue* cq) {
619   grpc_core::DebugLocation debug_location;
620   const char* reason = nullptr;
621 #endif
622   if (GPR_UNLIKELY(cq->owning_refs.Unref(debug_location, reason))) {
623     cq->vtable->destroy(DATA_FROM_CQ(cq));
624     cq->poller_vtable->destroy(POLLSET_FROM_CQ(cq));
625 #ifndef NDEBUG
626     gpr_free(cq->outstanding_tags);
627 #endif
628     gpr_free(cq);
629   }
630 }
631 
632 #ifndef NDEBUG
633 static void cq_check_tag(grpc_completion_queue* cq, void* tag, bool lock_cq) {
634   int found = 0;
635   if (lock_cq) {
636     gpr_mu_lock(cq->mu);
637   }
638 
639   for (int i = 0; i < static_cast<int>(cq->outstanding_tag_count); i++) {
640     if (cq->outstanding_tags[i] == tag) {
641       cq->outstanding_tag_count--;
642       std::swap(cq->outstanding_tags[i],
643                 cq->outstanding_tags[cq->outstanding_tag_count]);
644       found = 1;
645       break;
646     }
647   }
648 
649   if (lock_cq) {
650     gpr_mu_unlock(cq->mu);
651   }
652 
653   CHECK(found);
654 }
655 #else
656 static void cq_check_tag(grpc_completion_queue* /*cq*/, void* /*tag*/,
657                          bool /*lock_cq*/) {}
658 #endif
659 
660 static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* /*tag*/) {
661   cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
662   return grpc_core::IncrementIfNonzero(&cqd->pending_events);
663 }
664 
665 static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* /*tag*/) {
666   cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
667   return grpc_core::IncrementIfNonzero(&cqd->pending_events);
668 }
669 
670 static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* /*tag*/) {
671   cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
672   return grpc_core::IncrementIfNonzero(&cqd->pending_events);
673 }
674 
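// Every grpc_cq_begin_op() that returns true must eventually be matched by a
// grpc_cq_end_op() for the same tag: begin_op increments pending_events,
// end_op decrements it, and shutdown of the queue only completes once the
// count reaches zero.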
675 bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) {
676 #ifndef NDEBUG
677   gpr_mu_lock(cq->mu);
678   if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) {
679     cq->outstanding_tag_capacity =
680         std::max(size_t(4), 2 * cq->outstanding_tag_capacity);
681     cq->outstanding_tags = static_cast<void**>(gpr_realloc(
682         cq->outstanding_tags,
683         sizeof(*cq->outstanding_tags) * cq->outstanding_tag_capacity));
684   }
685   cq->outstanding_tags[cq->outstanding_tag_count++] = tag;
686   gpr_mu_unlock(cq->mu);
687 #endif
688   return cq->vtable->begin_op(cq, tag);
689 }
690 
691 // Queue a GRPC_OP_COMPLETED operation to a completion queue with a
692 // completion type of GRPC_CQ_NEXT.
693 //
694 static void cq_end_op_for_next(
695     grpc_completion_queue* cq, void* tag, grpc_error_handle error,
696     void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
697     grpc_cq_completion* storage, bool /*internal*/) {
698   if (GRPC_TRACE_FLAG_ENABLED(api) ||
699       (GRPC_TRACE_FLAG_ENABLED(op_failure) && !error.ok())) {
700     std::string errmsg = grpc_core::StatusToString(error);
701     GRPC_TRACE_LOG(api, INFO)
702         << "cq_end_op_for_next(cq=" << cq << ", tag=" << tag
703         << ", error=" << errmsg.c_str() << ", done=" << done
704         << ", done_arg=" << done_arg << ", storage=" << storage << ")";
705     if (GRPC_TRACE_FLAG_ENABLED(op_failure) && !error.ok()) {
706       LOG(INFO) << "Operation failed: tag=" << tag << ", error=" << errmsg;
707     }
708   }
709   cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
710   int is_success = (error.ok());
711 
712   storage->tag = tag;
713   storage->done = done;
714   storage->done_arg = done_arg;
715   storage->next = static_cast<uintptr_t>(is_success);
716 
717   cq_check_tag(cq, tag, true);  // Used in debug builds only
718 
719   if (g_cached_cq == cq && g_cached_event == nullptr) {
720     g_cached_event = storage;
721   } else {
722     // Add the completion to the queue
723     bool is_first = cqd->queue.Push(storage);
724     cqd->things_queued_ever.fetch_add(1, std::memory_order_relaxed);
725     // Since we do not hold the cq lock here, it is important to do an
726     // 'acquire' load (instead of a 'no_barrier' load) to match the release
727     // store (done via pending_events.fetch_sub(1, ACQ_REL)) in
728     // cq_shutdown_next.
729     //
730     if (cqd->pending_events.load(std::memory_order_acquire) != 1) {
731       // Only kick if this is the first item queued
732       if (is_first) {
733         gpr_mu_lock(cq->mu);
734         grpc_error_handle kick_error =
735             cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), nullptr);
736         gpr_mu_unlock(cq->mu);
737 
738         if (!kick_error.ok()) {
739           LOG(ERROR) << "Kick failed: "
740                      << grpc_core::StatusToString(kick_error);
741         }
742       }
743       if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
744         GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
745         gpr_mu_lock(cq->mu);
746         cq_finish_shutdown_next(cq);
747         gpr_mu_unlock(cq->mu);
748         GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
749       }
750     } else {
751       GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
752       cqd->pending_events.store(0, std::memory_order_release);
753       gpr_mu_lock(cq->mu);
754       cq_finish_shutdown_next(cq);
755       gpr_mu_unlock(cq->mu);
756       GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
757     }
758   }
759 }
760 
761 // Queue a GRPC_OP_COMPLETED operation to a completion queue with a
762 // completion type of GRPC_CQ_PLUCK.
763 //
764 static void cq_end_op_for_pluck(
765     grpc_completion_queue* cq, void* tag, grpc_error_handle error,
766     void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
767     grpc_cq_completion* storage, bool /*internal*/) {
768   cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
769   int is_success = (error.ok());
770 
771   if (GRPC_TRACE_FLAG_ENABLED(api) ||
772       (GRPC_TRACE_FLAG_ENABLED(op_failure) && !error.ok())) {
773     std::string errmsg = grpc_core::StatusToString(error);
774     GRPC_TRACE_LOG(api, INFO)
775         << "cq_end_op_for_pluck(cq=" << cq << ", tag=" << tag
776         << ", error=" << errmsg.c_str() << ", done=" << done
777         << ", done_arg=" << done_arg << ", storage=" << storage << ")";
778     if (GRPC_TRACE_FLAG_ENABLED(op_failure) && !error.ok()) {
779       LOG(ERROR) << "Operation failed: tag=" << tag << ", error=" << errmsg;
780     }
781   }
782 
783   storage->tag = tag;
784   storage->done = done;
785   storage->done_arg = done_arg;
786   storage->next = reinterpret_cast<uintptr_t>(&cqd->completed_head) |
787                   static_cast<uintptr_t>(is_success);
788 
789   gpr_mu_lock(cq->mu);
790   cq_check_tag(cq, tag, false);  // Used in debug builds only
791 
792   // Add to the list of completions
793   cqd->things_queued_ever.fetch_add(1, std::memory_order_relaxed);
794   cqd->completed_tail->next =
795       reinterpret_cast<uintptr_t>(storage) | (1u & cqd->completed_tail->next);
796   cqd->completed_tail = storage;
797 
798   if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
799     cq_finish_shutdown_pluck(cq);
800     gpr_mu_unlock(cq->mu);
801   } else {
802     grpc_pollset_worker* pluck_worker = nullptr;
803     for (int i = 0; i < cqd->num_pluckers; i++) {
804       if (cqd->pluckers[i].tag == tag) {
805         pluck_worker = *cqd->pluckers[i].worker;
806         break;
807       }
808     }
809 
810     grpc_error_handle kick_error =
811         cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), pluck_worker);
812     gpr_mu_unlock(cq->mu);
813     if (!kick_error.ok()) {
814       LOG(ERROR) << "Kick failed: " << kick_error;
815     }
816   }
817 }
818 
819 static void functor_callback(void* arg, grpc_error_handle error) {
820   auto* functor = static_cast<grpc_completion_queue_functor*>(arg);
821   functor->functor_run(functor, error.ok());
822 }
823 
824 // Complete an event on a completion queue of type GRPC_CQ_CALLBACK
825 static void cq_end_op_for_callback(
826     grpc_completion_queue* cq, void* tag, grpc_error_handle error,
827     void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
828     grpc_cq_completion* storage, bool internal) {
829   cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
830 
831   if (GRPC_TRACE_FLAG_ENABLED(api) ||
832       (GRPC_TRACE_FLAG_ENABLED(op_failure) && !error.ok())) {
833     std::string errmsg = grpc_core::StatusToString(error);
834     GRPC_TRACE_LOG(api, INFO)
835         << "cq_end_op_for_callback(cq=" << cq << ", tag=" << tag
836         << ", error=" << errmsg.c_str() << ", done=" << done
837         << ", done_arg=" << done_arg << ", storage=" << storage << ")";
838     if (GRPC_TRACE_FLAG_ENABLED(op_failure) && !error.ok()) {
839       LOG(ERROR) << "Operation failed: tag=" << tag << ", error=" << errmsg;
840     }
841   }
842 
843   // The callback-based CQ isn't really a queue at all and thus has no need
844   // for reserved storage. Invoke the done callback right away to release it.
845   done(done_arg, storage);
846 
847   cq_check_tag(cq, tag, true);  // Used in debug builds only
848 
849   if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
850     cq_finish_shutdown_callback(cq);
851   }
852 
853   auto* functor = static_cast<grpc_completion_queue_functor*>(tag);
854   if (grpc_core::IsEventEngineApplicationCallbacksEnabled()) {
855     // Run the callback on EventEngine threads.
856     cqd->event_engine->Run(
857         [engine = cqd->event_engine, functor, ok = error.ok()]() {
858           grpc_core::ExecCtx exec_ctx;
859           (*functor->functor_run)(functor, ok);
860         });
861     return;
862   }
863   // If possible, schedule the callback onto an existing thread-local
864   // ApplicationCallbackExecCtx, which is a work queue. This is possible for:
865   // 1. The callback is internally-generated and there is an ACEC available
866   // 2. The callback is marked inlineable and there is an ACEC available
867   // 3. We are already running in a background poller thread (which always has
868   //    an ACEC available at the base of the stack).
869   if (((internal || functor->inlineable) &&
870        grpc_core::ApplicationCallbackExecCtx::Available()) ||
871       grpc_iomgr_is_any_background_poller_thread()) {
872     grpc_core::ApplicationCallbackExecCtx::Enqueue(functor, (error.ok()));
873     return;
874   }
875 
876   // Schedule the callback on a closure if not internal or triggered
877   // from a background poller thread.
878   grpc_core::Executor::Run(
879       GRPC_CLOSURE_CREATE(functor_callback, functor, nullptr), error);
880 }
881 
882 void grpc_cq_end_op(grpc_completion_queue* cq, void* tag,
883                     grpc_error_handle error,
884                     void (*done)(void* done_arg, grpc_cq_completion* storage),
885                     void* done_arg, grpc_cq_completion* storage,
886                     bool internal) {
887   cq->vtable->end_op(cq, tag, error, done, done_arg, storage, internal);
888 }
889 
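// State shared between cq_next()/cq_pluck() and the CheckReadyToFinish()
// overrides below: it lets a poller "steal" a matching completion that
// arrived while it was polling, and records the deadline so polling can stop.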
890 struct cq_is_finished_arg {
891   gpr_atm last_seen_things_queued_ever;
892   grpc_completion_queue* cq;
893   grpc_core::Timestamp deadline;
894   grpc_cq_completion* stolen_completion;
895   void* tag;  // for pluck
896   bool first_loop;
897 };
898 class ExecCtxNext : public grpc_core::ExecCtx {
899  public:
900   explicit ExecCtxNext(void* arg)
901       : ExecCtx(0, GRPC_LATENT_SEE_METADATA("ExecCtx for CqNext")),
902         check_ready_to_finish_arg_(arg) {}
903 
904   bool CheckReadyToFinish() override {
905     cq_is_finished_arg* a =
906         static_cast<cq_is_finished_arg*>(check_ready_to_finish_arg_);
907     grpc_completion_queue* cq = a->cq;
908     cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
909     CHECK_EQ(a->stolen_completion, nullptr);
910 
911     intptr_t current_last_seen_things_queued_ever =
912         cqd->things_queued_ever.load(std::memory_order_relaxed);
913 
914     if (current_last_seen_things_queued_ever !=
915         a->last_seen_things_queued_ever) {
916       a->last_seen_things_queued_ever =
917           cqd->things_queued_ever.load(std::memory_order_relaxed);
918 
919       // Pop a cq_completion from the queue. Returns NULL if the queue is
920       // empty. It might also return NULL in some cases even if the queue is
921       // not empty; that is ok and doesn't affect correctness (though it
922       // might affect the tail latencies a bit).
923       //
924       a->stolen_completion = cqd->queue.Pop();
925       if (a->stolen_completion != nullptr) {
926         return true;
927       }
928     }
929     return !a->first_loop && a->deadline < grpc_core::Timestamp::Now();
930   }
931 
932  private:
933   void* check_ready_to_finish_arg_;
934 };
935 
936 #ifndef NDEBUG
937 static void dump_pending_tags(grpc_completion_queue* cq) {
938   if (!GRPC_TRACE_FLAG_ENABLED(pending_tags)) return;
939   std::vector<std::string> parts;
940   parts.push_back("PENDING TAGS:");
941   gpr_mu_lock(cq->mu);
942   for (size_t i = 0; i < cq->outstanding_tag_count; i++) {
943     parts.push_back(absl::StrFormat(" %p", cq->outstanding_tags[i]));
944   }
945   gpr_mu_unlock(cq->mu);
946   VLOG(2) << absl::StrJoin(parts, "");
947 }
948 #else
949 static void dump_pending_tags(grpc_completion_queue* /*cq*/) {}
950 #endif
951 
952 static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
953                           void* reserved) {
954   grpc_event ret;
955   cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
956 
957   GRPC_TRACE_LOG(api, INFO)
958       << "grpc_completion_queue_next(cq=" << cq
959       << ", deadline=gpr_timespec { tv_sec: " << deadline.tv_sec
960       << ", tv_nsec: " << deadline.tv_nsec
961       << ", clock_type: " << (int)deadline.clock_type
962       << " }, reserved=" << reserved << ")";
963   CHECK(!reserved);
964 
965   dump_pending_tags(cq);
966 
967   GRPC_CQ_INTERNAL_REF(cq, "next");
968 
969   grpc_core::Timestamp deadline_millis =
970       grpc_core::Timestamp::FromTimespecRoundUp(deadline);
971   cq_is_finished_arg is_finished_arg = {
972       cqd->things_queued_ever.load(std::memory_order_relaxed),
973       cq,
974       deadline_millis,
975       nullptr,
976       nullptr,
977       true};
978   ExecCtxNext exec_ctx(&is_finished_arg);
979   for (;;) {
980     grpc_core::Timestamp iteration_deadline = deadline_millis;
981 
982     if (is_finished_arg.stolen_completion != nullptr) {
983       grpc_cq_completion* c = is_finished_arg.stolen_completion;
984       is_finished_arg.stolen_completion = nullptr;
985       ret.type = GRPC_OP_COMPLETE;
986       ret.success = c->next & 1u;
987       ret.tag = c->tag;
988       c->done(c->done_arg, c);
989       break;
990     }
991 
992     grpc_cq_completion* c = cqd->queue.Pop();
993 
994     if (c != nullptr) {
995       ret.type = GRPC_OP_COMPLETE;
996       ret.success = c->next & 1u;
997       ret.tag = c->tag;
998       c->done(c->done_arg, c);
999       break;
1000     } else {
1001       // If c == NULL it means either the queue is empty OR in a transient
1002       // inconsistent state. If it is the latter, we should do a 0-timeout poll
1003       // so that the thread comes back quickly from poll to make a second
1004       // attempt at popping. Not doing this can potentially deadlock this
1005       // thread forever (if the deadline is infinity)
1006       if (cqd->queue.num_items() > 0) {
1007         iteration_deadline = grpc_core::Timestamp::ProcessEpoch();
1008       }
1009     }
1010 
1011     if (cqd->pending_events.load(std::memory_order_acquire) == 0) {
1012       // Before returning, check if the queue has any items left over (since
1013       // MultiProducerSingleConsumerQueue::Pop() can sometimes return NULL
1014       // even if the queue is not empty). If so, keep retrying but do not
1015       // return GRPC_QUEUE_SHUTDOWN.
1016       if (cqd->queue.num_items() > 0) {
1017         // Go to the beginning of the loop. No point doing a poll because
1018         // (cq->shutdown == true) is only possible when there is no pending
1019         // work (i.e cq->pending_events == 0) and any outstanding completion
1020         // events should have already been queued on this cq
1021         continue;
1022       }
1023 
1024       ret.type = GRPC_QUEUE_SHUTDOWN;
1025       ret.success = 0;
1026       break;
1027     }
1028 
1029     if (!is_finished_arg.first_loop &&
1030         grpc_core::Timestamp::Now() >= deadline_millis) {
1031       ret.type = GRPC_QUEUE_TIMEOUT;
1032       ret.success = 0;
1033       dump_pending_tags(cq);
1034       break;
1035     }
1036 
1037     // The main polling work happens in grpc_pollset_work
1038     gpr_mu_lock(cq->mu);
1039     cq->num_polls++;
1040     grpc_error_handle err = cq->poller_vtable->work(
1041         POLLSET_FROM_CQ(cq), nullptr, iteration_deadline);
1042     gpr_mu_unlock(cq->mu);
1043 
1044     if (!err.ok()) {
1045       LOG(ERROR) << "Completion queue next failed: "
1046                  << grpc_core::StatusToString(err);
1047       if (err == absl::CancelledError()) {
1048         ret.type = GRPC_QUEUE_SHUTDOWN;
1049       } else {
1050         ret.type = GRPC_QUEUE_TIMEOUT;
1051       }
1052       ret.success = 0;
1053       dump_pending_tags(cq);
1054       break;
1055     }
1056     is_finished_arg.first_loop = false;
1057   }
1058 
1059   if (cqd->queue.num_items() > 0 &&
1060       cqd->pending_events.load(std::memory_order_acquire) > 0) {
1061     gpr_mu_lock(cq->mu);
1062     (void)cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), nullptr);
1063     gpr_mu_unlock(cq->mu);
1064   }
1065 
1066   GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
1067   GRPC_CQ_INTERNAL_UNREF(cq, "next");
1068 
1069   CHECK_EQ(is_finished_arg.stolen_completion, nullptr);
1070 
1071   return ret;
1072 }
1073 
1074 // Finishes the completion queue shutdown. This means that there are no more
1075 // completion events / tags expected from the completion queue
1076 // - Must be called under completion queue lock
1077 // - Must be called only once in completion queue's lifetime
1078 // - grpc_completion_queue_shutdown() MUST have been called before calling
1079 // this function
1080 static void cq_finish_shutdown_next(grpc_completion_queue* cq) {
1081   cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
1082 
1083   CHECK(cqd->shutdown_called);
1084   CHECK_EQ(cqd->pending_events.load(std::memory_order_relaxed), 0);
1085 
1086   cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
1087 }
1088 
1089 static void cq_shutdown_next(grpc_completion_queue* cq) {
1090   cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
1091 
1092   // Need an extra ref for cq here because:
1093   // We call cq_finish_shutdown_next() below, that would call pollset shutdown.
1094   // Pollset shutdown decrements the cq ref count which can potentially destroy
1095   // the cq (if that happens to be the last ref).
1096   // Creating an extra ref here prevents the cq from getting destroyed while
1097   // this function is still active
1098   GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
1099   gpr_mu_lock(cq->mu);
1100   if (cqd->shutdown_called) {
1101     gpr_mu_unlock(cq->mu);
1102     GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
1103     return;
1104   }
1105   cqd->shutdown_called = true;
1106   // Doing acq/release fetch_sub here to match with
1107   // cq_begin_op_for_next and cq_end_op_for_next functions which read/write
1108   // on this counter without necessarily holding a lock on cq
1109   if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
1110     cq_finish_shutdown_next(cq);
1111   }
1112   gpr_mu_unlock(cq->mu);
1113   GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
1114 }
1115 
1116 grpc_event grpc_completion_queue_next(grpc_completion_queue* cq,
1117                                       gpr_timespec deadline, void* reserved) {
1118   return cq->vtable->next(cq, deadline, reserved);
1119 }
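
// A typical consumer loop for a GRPC_CQ_NEXT completion queue (illustrative
// sketch only; handle_tag is a placeholder) looks roughly like:
//
//   grpc_event ev;
//   do {
//     ev = grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME),
//                                     nullptr);
//     if (ev.type == GRPC_OP_COMPLETE) handle_tag(ev.tag, ev.success);
//   } while (ev.type != GRPC_QUEUE_SHUTDOWN);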
1120 
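// add_plucker()/del_plucker() register and unregister a pluck() caller so
// that cq_end_op_for_pluck() can kick the specific worker waiting for a given
// tag. At most GRPC_MAX_COMPLETION_QUEUE_PLUCKERS callers may wait at once.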
1121 static int add_plucker(grpc_completion_queue* cq, void* tag,
1122                        grpc_pollset_worker** worker) {
1123   cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
1124   if (cqd->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) {
1125     return 0;
1126   }
1127   cqd->pluckers[cqd->num_pluckers].tag = tag;
1128   cqd->pluckers[cqd->num_pluckers].worker = worker;
1129   cqd->num_pluckers++;
1130   return 1;
1131 }
1132 
1133 static void del_plucker(grpc_completion_queue* cq, void* tag,
1134                         grpc_pollset_worker** worker) {
1135   cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
1136   for (int i = 0; i < cqd->num_pluckers; i++) {
1137     if (cqd->pluckers[i].tag == tag && cqd->pluckers[i].worker == worker) {
1138       cqd->num_pluckers--;
1139       std::swap(cqd->pluckers[i], cqd->pluckers[cqd->num_pluckers]);
1140       return;
1141     }
1142   }
1143   GPR_UNREACHABLE_CODE(return);
1144 }
1145 
1146 class ExecCtxPluck : public grpc_core::ExecCtx {
1147  public:
1148   explicit ExecCtxPluck(void* arg)
1149       : ExecCtx(0, GRPC_LATENT_SEE_METADATA("ExecCtx for CqPluck")),
1150         check_ready_to_finish_arg_(arg) {}
1151 
1152   bool CheckReadyToFinish() override {
1153     cq_is_finished_arg* a =
1154         static_cast<cq_is_finished_arg*>(check_ready_to_finish_arg_);
1155     grpc_completion_queue* cq = a->cq;
1156     cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
1157 
1158     CHECK_EQ(a->stolen_completion, nullptr);
1159     gpr_atm current_last_seen_things_queued_ever =
1160         cqd->things_queued_ever.load(std::memory_order_relaxed);
1161     if (current_last_seen_things_queued_ever !=
1162         a->last_seen_things_queued_ever) {
1163       gpr_mu_lock(cq->mu);
1164       a->last_seen_things_queued_ever =
1165           cqd->things_queued_ever.load(std::memory_order_relaxed);
1166       grpc_cq_completion* c;
1167       grpc_cq_completion* prev = &cqd->completed_head;
1168       while ((c = reinterpret_cast<grpc_cq_completion*>(
1169                   prev->next & ~uintptr_t{1})) != &cqd->completed_head) {
1170         if (c->tag == a->tag) {
1171           prev->next = (prev->next & uintptr_t{1}) | (c->next & ~uintptr_t{1});
1172           if (c == cqd->completed_tail) {
1173             cqd->completed_tail = prev;
1174           }
1175           gpr_mu_unlock(cq->mu);
1176           a->stolen_completion = c;
1177           return true;
1178         }
1179         prev = c;
1180       }
1181       gpr_mu_unlock(cq->mu);
1182     }
1183     return !a->first_loop && a->deadline < grpc_core::Timestamp::Now();
1184   }
1185 
1186  private:
1187   void* check_ready_to_finish_arg_;
1188 };
1189 
1190 static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
1191                            gpr_timespec deadline, void* reserved) {
1192   grpc_event ret;
1193   grpc_cq_completion* c;
1194   grpc_cq_completion* prev;
1195   grpc_pollset_worker* worker = nullptr;
1196   cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
1197 
1198   if (GRPC_TRACE_FLAG_ENABLED(queue_pluck)) {
1199     GRPC_TRACE_LOG(api, INFO)
1200         << "grpc_completion_queue_pluck(cq=" << cq << ", tag=" << tag
1201         << ", deadline=gpr_timespec { tv_sec: " << deadline.tv_sec
1202         << ", tv_nsec: " << deadline.tv_nsec
1203         << ", clock_type: " << (int)deadline.clock_type
1204         << " }, reserved=" << reserved << ")";
1205   }
1206   CHECK(!reserved);
1207 
1208   dump_pending_tags(cq);
1209 
1210   GRPC_CQ_INTERNAL_REF(cq, "pluck");
1211   gpr_mu_lock(cq->mu);
1212   grpc_core::Timestamp deadline_millis =
1213       grpc_core::Timestamp::FromTimespecRoundUp(deadline);
1214   cq_is_finished_arg is_finished_arg = {
1215       cqd->things_queued_ever.load(std::memory_order_relaxed),
1216       cq,
1217       deadline_millis,
1218       nullptr,
1219       tag,
1220       true};
1221   ExecCtxPluck exec_ctx(&is_finished_arg);
1222   for (;;) {
1223     if (is_finished_arg.stolen_completion != nullptr) {
1224       gpr_mu_unlock(cq->mu);
1225       c = is_finished_arg.stolen_completion;
1226       is_finished_arg.stolen_completion = nullptr;
1227       ret.type = GRPC_OP_COMPLETE;
1228       ret.success = c->next & 1u;
1229       ret.tag = c->tag;
1230       c->done(c->done_arg, c);
1231       break;
1232     }
1233     prev = &cqd->completed_head;
1234     while ((c = reinterpret_cast<grpc_cq_completion*>(
1235                 prev->next & ~uintptr_t{1})) != &cqd->completed_head) {
1236       if (GPR_LIKELY(c->tag == tag)) {
1237         prev->next = (prev->next & uintptr_t{1}) | (c->next & ~uintptr_t{1});
1238         if (c == cqd->completed_tail) {
1239           cqd->completed_tail = prev;
1240         }
1241         gpr_mu_unlock(cq->mu);
1242         ret.type = GRPC_OP_COMPLETE;
1243         ret.success = c->next & 1u;
1244         ret.tag = c->tag;
1245         c->done(c->done_arg, c);
1246         goto done;
1247       }
1248       prev = c;
1249     }
1250     if (cqd->shutdown.load(std::memory_order_relaxed)) {
1251       gpr_mu_unlock(cq->mu);
1252       ret.type = GRPC_QUEUE_SHUTDOWN;
1253       ret.success = 0;
1254       break;
1255     }
1256     if (!add_plucker(cq, tag, &worker)) {
1257       VLOG(2) << "Too many outstanding grpc_completion_queue_pluck calls: "
1258                  "maximum is "
1259               << GRPC_MAX_COMPLETION_QUEUE_PLUCKERS;
1260       gpr_mu_unlock(cq->mu);
1261       // TODO(ctiller): should we use a different result here
1262       ret.type = GRPC_QUEUE_TIMEOUT;
1263       ret.success = 0;
1264       dump_pending_tags(cq);
1265       break;
1266     }
1267     if (!is_finished_arg.first_loop &&
1268         grpc_core::Timestamp::Now() >= deadline_millis) {
1269       del_plucker(cq, tag, &worker);
1270       gpr_mu_unlock(cq->mu);
1271       ret.type = GRPC_QUEUE_TIMEOUT;
1272       ret.success = 0;
1273       dump_pending_tags(cq);
1274       break;
1275     }
1276     cq->num_polls++;
1277     grpc_error_handle err =
1278         cq->poller_vtable->work(POLLSET_FROM_CQ(cq), &worker, deadline_millis);
1279     if (!err.ok()) {
1280       del_plucker(cq, tag, &worker);
1281       gpr_mu_unlock(cq->mu);
1282       LOG(ERROR) << "Completion queue pluck failed: "
1283                  << grpc_core::StatusToString(err);
1284       ret.type = GRPC_QUEUE_TIMEOUT;
1285       ret.success = 0;
1286       dump_pending_tags(cq);
1287       break;
1288     }
1289     is_finished_arg.first_loop = false;
1290     del_plucker(cq, tag, &worker);
1291   }
1292 done:
1293   GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
1294   GRPC_CQ_INTERNAL_UNREF(cq, "pluck");
1295 
1296   CHECK_EQ(is_finished_arg.stolen_completion, nullptr);
1297 
1298   return ret;
1299 }
1300 
1301 grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag,
1302                                        gpr_timespec deadline, void* reserved) {
1303   return cq->vtable->pluck(cq, tag, deadline, reserved);
1304 }
1305 
1306 static void cq_finish_shutdown_pluck(grpc_completion_queue* cq) {
1307   cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
1308 
1309   CHECK(cqd->shutdown_called);
1310   CHECK(!cqd->shutdown.load(std::memory_order_relaxed));
1311   cqd->shutdown.store(true, std::memory_order_relaxed);
1312 
1313   cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
1314 }
1315 
1316 // NOTE: This function is almost exactly identical to cq_shutdown_next() but
1317 // merging them is a bit tricky and probably not worth it
1318 static void cq_shutdown_pluck(grpc_completion_queue* cq) {
1319   cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
1320 
1321   // Need an extra ref for cq here because:
1322   // We call cq_finish_shutdown_pluck() below, that would call pollset shutdown.
1323   // Pollset shutdown decrements the cq ref count which can potentially destroy
1324   // the cq (if that happens to be the last ref).
1325   // Creating an extra ref here prevents the cq from getting destroyed while
1326   // this function is still active
1327   GRPC_CQ_INTERNAL_REF(cq, "shutting_down (pluck cq)");
1328   gpr_mu_lock(cq->mu);
1329   if (cqd->shutdown_called) {
1330     gpr_mu_unlock(cq->mu);
1331     GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (pluck cq)");
1332     return;
1333   }
1334   cqd->shutdown_called = true;
1335   if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
1336     cq_finish_shutdown_pluck(cq);
1337   }
1338   gpr_mu_unlock(cq->mu);
1339   GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (pluck cq)");
1340 }
1341 
1342 static void cq_finish_shutdown_callback(grpc_completion_queue* cq) {
1343   cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
1344   auto* callback = cqd->shutdown_callback;
1345 
1346   CHECK(cqd->shutdown_called);
1347 
1348   cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
1349 
1350   if (grpc_core::IsEventEngineApplicationCallbacksEnabled()) {
1351     cqd->event_engine->Run([engine = cqd->event_engine, callback]() {
1352       grpc_core::ExecCtx exec_ctx;
1353       callback->functor_run(callback, /*ok=*/1);
1354     });
1355     return;
1356   }
1357   if (grpc_iomgr_is_any_background_poller_thread()) {
1358     grpc_core::ApplicationCallbackExecCtx::Enqueue(callback, true);
1359     return;
1360   }
1361 
1362   // Schedule the callback on a closure if not internal or triggered
1363   // from a background poller thread.
1364   grpc_core::Executor::Run(
1365       GRPC_CLOSURE_CREATE(functor_callback, callback, nullptr),
1366       absl::OkStatus());
1367 }
1368 
1369 static void cq_shutdown_callback(grpc_completion_queue* cq) {
1370   cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
1371 
1372   // Need an extra ref for cq here because:
1373   // We call cq_finish_shutdown_callback() below, which calls pollset shutdown.
1374   // Pollset shutdown decrements the cq ref count which can potentially destroy
1375   // the cq (if that happens to be the last ref).
1376   // Creating an extra ref here prevents the cq from getting destroyed while
1377   // this function is still active
1378   GRPC_CQ_INTERNAL_REF(cq, "shutting_down (callback cq)");
1379   gpr_mu_lock(cq->mu);
1380   if (cqd->shutdown_called) {
1381     gpr_mu_unlock(cq->mu);
1382     GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)");
1383     return;
1384   }
1385   cqd->shutdown_called = true;
1386   if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
1387     gpr_mu_unlock(cq->mu);
1388     cq_finish_shutdown_callback(cq);
1389   } else {
1390     gpr_mu_unlock(cq->mu);
1391   }
1392   GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)");
1393 }
1394 
1395 // Shutdown simply drops a ref that we reserved at creation time; if we drop
1396 // to zero here, then enter shutdown mode and wake up any waiters
1397 void grpc_completion_queue_shutdown(grpc_completion_queue* cq) {
1398   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
1399   grpc_core::ExecCtx exec_ctx;
1400   GRPC_TRACE_LOG(api, INFO)
1401       << "grpc_completion_queue_shutdown(cq=" << cq << ")";
1402   cq->vtable->shutdown(cq);
1403 }
1404 
1405 void grpc_completion_queue_destroy(grpc_completion_queue* cq) {
1406   GRPC_TRACE_LOG(api, INFO) << "grpc_completion_queue_destroy(cq=" << cq << ")";
1407   grpc_completion_queue_shutdown(cq);
1408 
1409   grpc_core::ExecCtx exec_ctx;
1410   GRPC_CQ_INTERNAL_UNREF(cq, "destroy");
1411 }
1412 
1413 grpc_pollset* grpc_cq_pollset(grpc_completion_queue* cq) {
1414   return cq->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cq) : nullptr;
1415 }
1416 
1417 bool grpc_cq_can_listen(grpc_completion_queue* cq) {
1418   return cq->poller_vtable->can_listen;
1419 }
1420