1 /*
2  *
3  * Copyright 2015-2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 #include <grpc/support/port_platform.h>
19 
20 #include "src/core/lib/surface/completion_queue.h"
21 
22 #include <inttypes.h>
23 #include <stdio.h>
24 #include <string.h>
25 
26 #include <vector>
27 
28 #include "absl/strings/str_format.h"
29 #include "absl/strings/str_join.h"
30 
31 #include <grpc/support/alloc.h>
32 #include <grpc/support/atm.h>
33 #include <grpc/support/log.h>
34 #include <grpc/support/string_util.h>
35 #include <grpc/support/time.h>
36 
37 #include "src/core/lib/debug/stats.h"
38 #include "src/core/lib/gpr/spinlock.h"
39 #include "src/core/lib/gpr/string.h"
40 #include "src/core/lib/gpr/tls.h"
41 #include "src/core/lib/gprpp/atomic.h"
42 #include "src/core/lib/iomgr/closure.h"
43 #include "src/core/lib/iomgr/executor.h"
44 #include "src/core/lib/iomgr/pollset.h"
45 #include "src/core/lib/iomgr/timer.h"
46 #include "src/core/lib/profiling/timers.h"
47 #include "src/core/lib/surface/api_trace.h"
48 #include "src/core/lib/surface/call.h"
49 #include "src/core/lib/surface/event_string.h"
50 
51 grpc_core::TraceFlag grpc_trace_operation_failures(false, "op_failure");
52 grpc_core::DebugOnlyTraceFlag grpc_trace_pending_tags(false, "pending_tags");
53 grpc_core::DebugOnlyTraceFlag grpc_trace_cq_refcount(false, "cq_refcount");
54 
55 namespace {
56 
57 // Specifies a cq thread local cache.
58 // The first event that occurs on a thread
59 // with a cq cache will go into that cache, and
60 // will only be returned on the thread that initialized the cache.
61 // NOTE: Only one event will ever be cached.
62 GPR_TLS_DECL(g_cached_event);
63 GPR_TLS_DECL(g_cached_cq);
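// A minimal usage sketch of this cache (assumes a GRPC_CQ_NEXT cq that the
// calling thread polls; both functions are defined later in this file):
//
//   grpc_completion_queue_thread_local_cache_init(cq);
//   /* ... start an operation whose completion may land on this thread ... */
//   void* tag;
//   int ok;
//   if (grpc_completion_queue_thread_local_cache_flush(cq, &tag, &ok)) {
//     /* The single cached completion was consumed here without touching the
//        cq's shared queue; `ok` mirrors the operation's success bit. */
//   }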
64 
65 struct plucker {
66   grpc_pollset_worker** worker;
67   void* tag;
68 };
69 struct cq_poller_vtable {
70   bool can_get_pollset;
71   bool can_listen;
72   size_t (*size)(void);
73   void (*init)(grpc_pollset* pollset, gpr_mu** mu);
74   grpc_error* (*kick)(grpc_pollset* pollset,
75                       grpc_pollset_worker* specific_worker);
76   grpc_error* (*work)(grpc_pollset* pollset, grpc_pollset_worker** worker,
77                       grpc_millis deadline);
78   void (*shutdown)(grpc_pollset* pollset, grpc_closure* closure);
79   void (*destroy)(grpc_pollset* pollset);
80 };
81 typedef struct non_polling_worker {
82   gpr_cv cv;
83   bool kicked;
84   struct non_polling_worker* next;
85   struct non_polling_worker* prev;
86 } non_polling_worker;
87 
88 struct non_polling_poller {
89   gpr_mu mu;
90   bool kicked_without_poller;
91   non_polling_worker* root;
92   grpc_closure* shutdown;
93 };
94 size_t non_polling_poller_size(void) { return sizeof(non_polling_poller); }
95 
96 void non_polling_poller_init(grpc_pollset* pollset, gpr_mu** mu) {
97   non_polling_poller* npp = reinterpret_cast<non_polling_poller*>(pollset);
98   gpr_mu_init(&npp->mu);
99   *mu = &npp->mu;
100 }
101 
102 void non_polling_poller_destroy(grpc_pollset* pollset) {
103   non_polling_poller* npp = reinterpret_cast<non_polling_poller*>(pollset);
104   gpr_mu_destroy(&npp->mu);
105 }
106 
107 grpc_error* non_polling_poller_work(grpc_pollset* pollset,
108                                     grpc_pollset_worker** worker,
109                                     grpc_millis deadline) {
110   non_polling_poller* npp = reinterpret_cast<non_polling_poller*>(pollset);
111   if (npp->shutdown) return GRPC_ERROR_NONE;
112   if (npp->kicked_without_poller) {
113     npp->kicked_without_poller = false;
114     return GRPC_ERROR_NONE;
115   }
116   non_polling_worker w;
117   gpr_cv_init(&w.cv);
118   if (worker != nullptr) *worker = reinterpret_cast<grpc_pollset_worker*>(&w);
119   if (npp->root == nullptr) {
120     npp->root = w.next = w.prev = &w;
121   } else {
122     w.next = npp->root;
123     w.prev = w.next->prev;
124     w.next->prev = w.prev->next = &w;
125   }
126   w.kicked = false;
127   gpr_timespec deadline_ts =
128       grpc_millis_to_timespec(deadline, GPR_CLOCK_MONOTONIC);
129   while (!npp->shutdown && !w.kicked &&
130          !gpr_cv_wait(&w.cv, &npp->mu, deadline_ts))
131     ;
132   grpc_core::ExecCtx::Get()->InvalidateNow();
133   if (&w == npp->root) {
134     npp->root = w.next;
135     if (&w == npp->root) {
136       if (npp->shutdown) {
137         grpc_core::ExecCtx::Run(DEBUG_LOCATION, npp->shutdown, GRPC_ERROR_NONE);
138       }
139       npp->root = nullptr;
140     }
141   }
142   w.next->prev = w.prev;
143   w.prev->next = w.next;
144   gpr_cv_destroy(&w.cv);
145   if (worker != nullptr) *worker = nullptr;
146   return GRPC_ERROR_NONE;
147 }
148 
149 grpc_error* non_polling_poller_kick(grpc_pollset* pollset,
150                                     grpc_pollset_worker* specific_worker) {
151   non_polling_poller* p = reinterpret_cast<non_polling_poller*>(pollset);
152   if (specific_worker == nullptr)
153     specific_worker = reinterpret_cast<grpc_pollset_worker*>(p->root);
154   if (specific_worker != nullptr) {
155     non_polling_worker* w =
156         reinterpret_cast<non_polling_worker*>(specific_worker);
157     if (!w->kicked) {
158       w->kicked = true;
159       gpr_cv_signal(&w->cv);
160     }
161   } else {
162     p->kicked_without_poller = true;
163   }
164   return GRPC_ERROR_NONE;
165 }
166 
167 void non_polling_poller_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
168   non_polling_poller* p = reinterpret_cast<non_polling_poller*>(pollset);
169   GPR_ASSERT(closure != nullptr);
170   p->shutdown = closure;
171   if (p->root == nullptr) {
172     grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE);
173   } else {
174     non_polling_worker* w = p->root;
175     do {
176       gpr_cv_signal(&w->cv);
177       w = w->next;
178     } while (w != p->root);
179   }
180 }
181 
182 const cq_poller_vtable g_poller_vtable_by_poller_type[] = {
183     /* GRPC_CQ_DEFAULT_POLLING */
184     {true, true, grpc_pollset_size, grpc_pollset_init, grpc_pollset_kick,
185      grpc_pollset_work, grpc_pollset_shutdown, grpc_pollset_destroy},
186     /* GRPC_CQ_NON_LISTENING */
187     {true, false, grpc_pollset_size, grpc_pollset_init, grpc_pollset_kick,
188      grpc_pollset_work, grpc_pollset_shutdown, grpc_pollset_destroy},
189     /* GRPC_CQ_NON_POLLING */
190     {false, false, non_polling_poller_size, non_polling_poller_init,
191      non_polling_poller_kick, non_polling_poller_work,
192      non_polling_poller_shutdown, non_polling_poller_destroy},
193 };
194 
195 }  // namespace
196 
197 struct cq_vtable {
198   grpc_cq_completion_type cq_completion_type;
199   size_t data_size;
200   void (*init)(void* data,
201                grpc_experimental_completion_queue_functor* shutdown_callback);
202   void (*shutdown)(grpc_completion_queue* cq);
203   void (*destroy)(void* data);
204   bool (*begin_op)(grpc_completion_queue* cq, void* tag);
205   void (*end_op)(grpc_completion_queue* cq, void* tag, grpc_error* error,
206                  void (*done)(void* done_arg, grpc_cq_completion* storage),
207                  void* done_arg, grpc_cq_completion* storage, bool internal);
208   grpc_event (*next)(grpc_completion_queue* cq, gpr_timespec deadline,
209                      void* reserved);
210   grpc_event (*pluck)(grpc_completion_queue* cq, void* tag,
211                       gpr_timespec deadline, void* reserved);
212   // TODO(vjpai): Remove proxy_pollset once callback_alternative is no
213   // longer needed.
214   grpc_pollset* (*proxy_pollset)(grpc_completion_queue* cq);
215 };
216 
217 namespace {
218 
219 /* Queue that holds the cq_completion_events. Internally uses
220  * MultiProducerSingleConsumerQueue (a lockfree multiproducer single consumer
221  * queue). It uses a queue_lock to support multiple consumers.
222  * Only used in completion queues whose completion_type is GRPC_CQ_NEXT */
223 class CqEventQueue {
224  public:
225   CqEventQueue() = default;
226   ~CqEventQueue() = default;
227 
228   /* Note: The counter is not incremented/decremented atomically with push/pop.
229    * The count is only eventually consistent */
230   intptr_t num_items() const {
231     return num_queue_items_.Load(grpc_core::MemoryOrder::RELAXED);
232   }
233 
234   bool Push(grpc_cq_completion* c);
235   grpc_cq_completion* Pop();
236 
237  private:
238   /* Spinlock to serialize consumers, i.e., pop() operations */
239   gpr_spinlock queue_lock_ = GPR_SPINLOCK_INITIALIZER;
240 
241   grpc_core::MultiProducerSingleConsumerQueue queue_;
242 
243   /* A lazy counter of number of items in the queue. This is NOT atomically
244      incremented/decremented along with push/pop operations and hence is only
245      eventually consistent */
246   grpc_core::Atomic<intptr_t> num_queue_items_{0};
247 };
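// A rough sketch of the producer/consumer contract implied above (it mirrors
// how cq_end_op_for_next and cq_next use this class later in this file):
//
//   CqEventQueue q;
//   bool is_first = q.Push(completion);  // true => queue was empty before,
//                                        // so the producer should kick a poller
//   ...
//   grpc_cq_completion* c = q.Pop();     // may transiently be nullptr even
//                                        // when the queue is not empty
//   if (c == nullptr && q.num_items() > 0) {
//     /* retry (e.g. after a 0-timeout poll) rather than treating it as empty */
//   }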
248 
249 struct cq_next_data {
250   ~cq_next_data() {
251     GPR_ASSERT(queue.num_items() == 0);
252 #ifndef NDEBUG
253     if (pending_events.Load(grpc_core::MemoryOrder::ACQUIRE) != 0) {
254       gpr_log(GPR_ERROR, "Destroying CQ without draining it fully.");
255     }
256 #endif
257   }
258 
259   /** Completed events for completion-queues of type GRPC_CQ_NEXT */
260   CqEventQueue queue;
261 
262   /** Counter of how many things have ever been queued on this completion
263       queue; useful for avoiding locks to check the queue */
264   grpc_core::Atomic<intptr_t> things_queued_ever{0};
265 
266   /** Number of outstanding events (+1 if not shut down)
267       Initial count is dropped by grpc_completion_queue_shutdown */
268   grpc_core::Atomic<intptr_t> pending_events{1};
269 
270   /** 0 initially. 1 once we initiated shutdown */
271   bool shutdown_called = false;
272 };
273 
274 struct cq_pluck_data {
275   cq_pluck_data() {
276     completed_tail = &completed_head;
277     completed_head.next = reinterpret_cast<uintptr_t>(completed_tail);
278   }
279 
280   ~cq_pluck_data() {
281     GPR_ASSERT(completed_head.next ==
282                reinterpret_cast<uintptr_t>(&completed_head));
283 #ifndef NDEBUG
284     if (pending_events.Load(grpc_core::MemoryOrder::ACQUIRE) != 0) {
285       gpr_log(GPR_ERROR, "Destroying CQ without draining it fully.");
286     }
287 #endif
288   }
289 
290   /** Completed events for completion-queues of type GRPC_CQ_PLUCK */
291   grpc_cq_completion completed_head;
292   grpc_cq_completion* completed_tail;
293 
294   /** Number of pending events (+1 if we're not shutdown).
295       Initial count is dropped by grpc_completion_queue_shutdown. */
296   grpc_core::Atomic<intptr_t> pending_events{1};
297 
298   /** Counter of how many things have ever been queued on this completion
299       queue; useful for avoiding locks to check the queue */
300   grpc_core::Atomic<intptr_t> things_queued_ever{0};
301 
302   /** 0 initially. 1 once we have completed shutting down */
303   /* TODO: (sreek) This is not needed since (shutdown == 1) if and only if
304    * (pending_events == 0). So consider removing this in the future and
305    * using pending_events instead */
306   grpc_core::Atomic<bool> shutdown{false};
307 
308   /** 0 initially. 1 once we initiated shutdown */
309   bool shutdown_called = false;
310 
311   int num_pluckers = 0;
312   plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
313 };
314 
315 struct cq_callback_data {
316   explicit cq_callback_data(
317       grpc_experimental_completion_queue_functor* shutdown_callback)
318       : shutdown_callback(shutdown_callback) {}
319 
320   ~cq_callback_data() {
321 #ifndef NDEBUG
322     if (pending_events.Load(grpc_core::MemoryOrder::ACQUIRE) != 0) {
323       gpr_log(GPR_ERROR, "Destroying CQ without draining it fully.");
324     }
325 #endif
326   }
327 
328   /** No actual completed events queue, unlike other types */
329 
330   /** Number of pending events (+1 if we're not shutdown).
331       Initial count is dropped by grpc_completion_queue_shutdown. */
332   grpc_core::Atomic<intptr_t> pending_events{1};
333 
334   /** 0 initially. 1 once we initiated shutdown */
335   bool shutdown_called = false;
336 
337   /** A callback that gets invoked when the CQ completes shutdown */
338   grpc_experimental_completion_queue_functor* shutdown_callback;
339 };
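// A rough sketch of where shutdown_callback comes from. The factory named
// below is the experimental one declared in grpc/grpc.h, shown only for
// orientation; per the field comment above, the functor fires once the CQ
// completes shutdown:
//
//   grpc_experimental_completion_queue_functor on_shutdown = /* see the
//       functor sketch further below */;
//   grpc_completion_queue* cq =
//       grpc_completion_queue_create_for_callback(&on_shutdown, nullptr);
//   /* on_shutdown.functor_run(&on_shutdown, 1) runs once shutdown completes */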
340 
341 // TODO(vjpai): Remove all callback_alternative variants when event manager is
342 // the only supported poller.
343 struct cq_callback_alternative_data {
344   explicit cq_callback_alternative_data(
345       grpc_experimental_completion_queue_functor* shutdown_callback)
346       : implementation(SharedNextableCQ()),
347         shutdown_callback(shutdown_callback) {}
348 
349   /* This just points to a single shared nextable CQ */
350   grpc_completion_queue* const implementation;
351 
352   /** Number of outstanding events (+1 if not shut down)
353       Initial count is dropped by grpc_completion_queue_shutdown */
354   grpc_core::Atomic<intptr_t> pending_events{1};
355 
356   /** 0 initially. 1 once we initiated shutdown */
357   bool shutdown_called = false;
358 
359   /** A callback that gets invoked when the CQ completes shutdown */
360   grpc_experimental_completion_queue_functor* shutdown_callback;
361 
362   static grpc_completion_queue* SharedNextableCQ() {
363     grpc_core::MutexLock lock(&*shared_cq_next_mu);
364 
365     if (shared_cq_next == nullptr) {
366       shared_cq_next = grpc_completion_queue_create_for_next(nullptr);
367       int num_nexting_threads = GPR_CLAMP(gpr_cpu_num_cores(), 1, 32);
368       threads_remaining.Store(num_nexting_threads,
369                               grpc_core::MemoryOrder::RELEASE);
370       for (int i = 0; i < num_nexting_threads; i++) {
371         grpc_core::Executor::Run(
372             GRPC_CLOSURE_CREATE(
373                 [](void* arg, grpc_error* /*error*/) {
374                   grpc_completion_queue* cq =
375                       static_cast<grpc_completion_queue*>(arg);
376                   while (true) {
377                     grpc_event event = grpc_completion_queue_next(
378                         cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
379                     if (event.type == GRPC_QUEUE_SHUTDOWN) {
380                       break;
381                     }
382                     GPR_DEBUG_ASSERT(event.type == GRPC_OP_COMPLETE);
383                     // We can always execute the callback inline rather than
384                     // pushing it to another Executor thread because this
385                     // thread is definitely running on an executor, does not
386                     // hold any application locks before executing the callback,
387                     // and cannot be entered recursively.
388                     auto* functor = static_cast<
389                         grpc_experimental_completion_queue_functor*>(event.tag);
390                     functor->functor_run(functor, event.success);
391                   }
392                   if (threads_remaining.FetchSub(
393                           1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
394                     grpc_completion_queue_destroy(cq);
395                   }
396                 },
397                 shared_cq_next, nullptr),
398             GRPC_ERROR_NONE, grpc_core::ExecutorType::DEFAULT,
399             grpc_core::ExecutorJobType::LONG);
400       }
401     }
402     return shared_cq_next;
403   }
404   // Use manually-constructed Mutex to avoid static construction issues
405   static grpc_core::ManualConstructor<grpc_core::Mutex> shared_cq_next_mu;
406   static grpc_completion_queue*
407       shared_cq_next;  // GUARDED_BY(shared_cq_next_mu)
408   static grpc_core::Atomic<int> threads_remaining;
409 };
410 
411 grpc_core::ManualConstructor<grpc_core::Mutex>
412     cq_callback_alternative_data::shared_cq_next_mu;
413 grpc_completion_queue* cq_callback_alternative_data::shared_cq_next = nullptr;
414 grpc_core::Atomic<int> cq_callback_alternative_data::threads_remaining{0};
415 
416 }  // namespace
417 
418 /* Completion queue structure */
419 struct grpc_completion_queue {
420   /** Once owning_refs drops to zero, we will destroy the cq */
421   grpc_core::RefCount owning_refs;
422 
423   gpr_mu* mu;
424 
425   const cq_vtable* vtable;
426   const cq_poller_vtable* poller_vtable;
427 
428   // The pollset entry exists to enable proxy CQs like the
429   // callback_alternative.
430   // TODO(vjpai): Consider removing pollset and reverting to previous
431   // calculation of pollset once callback_alternative is no longer needed.
432   grpc_pollset* pollset;
433 
434 #ifndef NDEBUG
435   void** outstanding_tags;
436   size_t outstanding_tag_count;
437   size_t outstanding_tag_capacity;
438 #endif
439 
440   grpc_closure pollset_shutdown_done;
441   int num_polls;
442 };
443 
444 /* Forward declarations */
445 static void cq_finish_shutdown_next(grpc_completion_queue* cq);
446 static void cq_finish_shutdown_pluck(grpc_completion_queue* cq);
447 static void cq_finish_shutdown_callback(grpc_completion_queue* cq);
448 static void cq_finish_shutdown_callback_alternative(grpc_completion_queue* cq);
449 static void cq_shutdown_next(grpc_completion_queue* cq);
450 static void cq_shutdown_pluck(grpc_completion_queue* cq);
451 static void cq_shutdown_callback(grpc_completion_queue* cq);
452 static void cq_shutdown_callback_alternative(grpc_completion_queue* cq);
453 
454 static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag);
455 static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag);
456 static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag);
457 static bool cq_begin_op_for_callback_alternative(grpc_completion_queue* cq,
458                                                  void* tag);
459 
460 // A cq_end_op function is called when an operation on a given CQ with
461 // a given tag has completed. The storage argument is a reference to the
462 // space reserved for this completion as it is placed into the corresponding
463 // queue. The done argument is a callback that will be invoked when it is
464 // safe to free up that storage. The storage MUST NOT be freed until the
465 // done callback is invoked.
466 static void cq_end_op_for_next(
467     grpc_completion_queue* cq, void* tag, grpc_error* error,
468     void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
469     grpc_cq_completion* storage, bool internal);
470 
471 static void cq_end_op_for_pluck(
472     grpc_completion_queue* cq, void* tag, grpc_error* error,
473     void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
474     grpc_cq_completion* storage, bool internal);
475 
476 static void cq_end_op_for_callback(
477     grpc_completion_queue* cq, void* tag, grpc_error* error,
478     void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
479     grpc_cq_completion* storage, bool internal);
480 
481 static void cq_end_op_for_callback_alternative(
482     grpc_completion_queue* cq, void* tag, grpc_error* error,
483     void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
484     grpc_cq_completion* storage, bool internal);
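// A minimal sketch of the begin_op/end_op contract described above. The
// grpc_cq_* entry points are the ones defined later in this file; `on_done`
// and `done_arg` are hypothetical caller-provided names:
//
//   grpc_cq_completion* storage = /* space owned by the caller */;
//   if (grpc_cq_begin_op(cq, tag)) {        /* fails once shutdown started */
//     /* ... perform the operation ... */
//     grpc_cq_end_op(cq, tag, GRPC_ERROR_NONE, on_done, done_arg, storage,
//                    /*internal=*/false);
//   }
//   /* `storage` must not be freed until on_done(done_arg, storage) runs. */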
485 
486 static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
487                           void* reserved);
488 
489 static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
490                            gpr_timespec deadline, void* reserved);
491 
492 static grpc_pollset* cq_proxy_pollset_for_callback_alternative(
493     grpc_completion_queue* cq);
494 
495 // Note that cq_init_next and cq_init_pluck do not use the shutdown_callback
496 static void cq_init_next(
497     void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
498 static void cq_init_pluck(
499     void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
500 static void cq_init_callback(
501     void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
502 // TODO(vjpai): Remove this once the event manager poller becomes the only option.
503 static void cq_init_callback_alternative(
504     void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
505 static void cq_destroy_next(void* data);
506 static void cq_destroy_pluck(void* data);
507 static void cq_destroy_callback(void* data);
508 static void cq_destroy_callback_alternative(void* data);
509 
510 /* Completion queue vtables based on the completion-type */
511 // TODO(vjpai): Make this const again once we stop needing callback_alternative
512 static cq_vtable g_polling_cq_vtable[] = {
513     /* GRPC_CQ_NEXT */
514     {GRPC_CQ_NEXT, sizeof(cq_next_data), cq_init_next, cq_shutdown_next,
515      cq_destroy_next, cq_begin_op_for_next, cq_end_op_for_next, cq_next,
516      nullptr, nullptr},
517     /* GRPC_CQ_PLUCK */
518     {GRPC_CQ_PLUCK, sizeof(cq_pluck_data), cq_init_pluck, cq_shutdown_pluck,
519      cq_destroy_pluck, cq_begin_op_for_pluck, cq_end_op_for_pluck, nullptr,
520      cq_pluck, nullptr},
521     /* GRPC_CQ_CALLBACK */
522     {GRPC_CQ_CALLBACK, sizeof(cq_callback_data), cq_init_callback,
523      cq_shutdown_callback, cq_destroy_callback, cq_begin_op_for_callback,
524      cq_end_op_for_callback, nullptr, nullptr, nullptr},
525 };
526 
527 // Separate vtable for non-polling cqs, assigned at init
528 static cq_vtable g_nonpolling_cq_vtable[sizeof(g_polling_cq_vtable) /
529                                         sizeof(g_polling_cq_vtable[0])];
530 
531 #define DATA_FROM_CQ(cq) ((void*)(cq + 1))
532 #define INLINE_POLLSET_FROM_CQ(cq) \
533   ((grpc_pollset*)(cq->vtable->data_size + (char*)DATA_FROM_CQ(cq)))
534 #define POLLSET_FROM_CQ(cq) (cq->pollset)
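// Memory layout assumed by the macros above: grpc_completion_queue_create_internal
// (further below) performs a single gpr_zalloc of
//   sizeof(grpc_completion_queue) + vtable->data_size + poller_vtable->size()
// so the regions line up as
//
//   [ grpc_completion_queue ][ per-type data ][ inline pollset ]
//     ^ cq                    ^ DATA_FROM_CQ    ^ INLINE_POLLSET_FROM_CQ
//
// POLLSET_FROM_CQ(cq) usually resolves to the inline pollset; for proxy CQs
// (callback_alternative) cq->pollset points at the proxy's pollset instead.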
535 
536 grpc_core::TraceFlag grpc_cq_pluck_trace(false, "queue_pluck");
537 
538 #define GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, event)     \
539   do {                                                   \
540     if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) &&       \
541         (GRPC_TRACE_FLAG_ENABLED(grpc_cq_pluck_trace) || \
542          (event)->type != GRPC_QUEUE_TIMEOUT)) {         \
543       gpr_log(GPR_INFO, "RETURN_EVENT[%p]: %s", cq,      \
544               grpc_event_string(event).c_str());         \
545     }                                                    \
546   } while (0)
547 
548 static void on_pollset_shutdown_done(void* cq, grpc_error* error);
549 
550 void grpc_cq_global_init() {
551   gpr_tls_init(&g_cached_event);
552   gpr_tls_init(&g_cached_cq);
553   g_nonpolling_cq_vtable[GRPC_CQ_NEXT] = g_polling_cq_vtable[GRPC_CQ_NEXT];
554   g_nonpolling_cq_vtable[GRPC_CQ_PLUCK] = g_polling_cq_vtable[GRPC_CQ_PLUCK];
555   g_nonpolling_cq_vtable[GRPC_CQ_CALLBACK] =
556       g_polling_cq_vtable[GRPC_CQ_CALLBACK];
557 }
558 
559 // TODO(vjpai): Remove when callback_alternative is no longer needed
560 void grpc_cq_init() {
561   // If the iomgr runs in the background, we can use the preferred callback CQ.
562   // If the iomgr is non-polling, we cannot use the alternative callback CQ.
563   if (!grpc_iomgr_run_in_background() && !grpc_iomgr_non_polling()) {
564     cq_callback_alternative_data::shared_cq_next_mu.Init();
565     g_polling_cq_vtable[GRPC_CQ_CALLBACK] = {
566         GRPC_CQ_CALLBACK,
567         sizeof(cq_callback_alternative_data),
568         cq_init_callback_alternative,
569         cq_shutdown_callback_alternative,
570         cq_destroy_callback_alternative,
571         cq_begin_op_for_callback_alternative,
572         cq_end_op_for_callback_alternative,
573         nullptr,
574         nullptr,
575         cq_proxy_pollset_for_callback_alternative};
576   }
577 }
578 
579 // TODO(vjpai): Remove when callback_alternative is no longer needed
580 void grpc_cq_shutdown() {
581   if (!grpc_iomgr_run_in_background() && !grpc_iomgr_non_polling()) {
582     {
583       grpc_core::MutexLock lock(
584           &*cq_callback_alternative_data::shared_cq_next_mu);
585       if (cq_callback_alternative_data::shared_cq_next != nullptr) {
586         grpc_completion_queue_shutdown(
587             cq_callback_alternative_data::shared_cq_next);
588       }
589       cq_callback_alternative_data::shared_cq_next = nullptr;
590     }
591     cq_callback_alternative_data::shared_cq_next_mu.Destroy();
592   }
593 }
594 
595 void grpc_completion_queue_thread_local_cache_init(grpc_completion_queue* cq) {
596   if ((grpc_completion_queue*)gpr_tls_get(&g_cached_cq) == nullptr) {
597     gpr_tls_set(&g_cached_event, (intptr_t)0);
598     gpr_tls_set(&g_cached_cq, (intptr_t)cq);
599   }
600 }
601 
602 int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue* cq,
603                                                    void** tag, int* ok) {
604   grpc_cq_completion* storage =
605       (grpc_cq_completion*)gpr_tls_get(&g_cached_event);
606   int ret = 0;
607   if (storage != nullptr &&
608       (grpc_completion_queue*)gpr_tls_get(&g_cached_cq) == cq) {
609     *tag = storage->tag;
610     grpc_core::ExecCtx exec_ctx;
611     *ok = (storage->next & static_cast<uintptr_t>(1)) == 1;
612     storage->done(storage->done_arg, storage);
613     ret = 1;
614     cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
615     if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
616       GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
617       gpr_mu_lock(cq->mu);
618       cq_finish_shutdown_next(cq);
619       gpr_mu_unlock(cq->mu);
620       GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
621     }
622   }
623   gpr_tls_set(&g_cached_event, (intptr_t)0);
624   gpr_tls_set(&g_cached_cq, (intptr_t)0);
625 
626   return ret;
627 }
628 
629 bool CqEventQueue::Push(grpc_cq_completion* c) {
630   queue_.Push(
631       reinterpret_cast<grpc_core::MultiProducerSingleConsumerQueue::Node*>(c));
632   return num_queue_items_.FetchAdd(1, grpc_core::MemoryOrder::RELAXED) == 0;
633 }
634 
635 grpc_cq_completion* CqEventQueue::Pop() {
636   grpc_cq_completion* c = nullptr;
637 
638   if (gpr_spinlock_trylock(&queue_lock_)) {
639     GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES();
640 
641     bool is_empty = false;
642     c = reinterpret_cast<grpc_cq_completion*>(queue_.PopAndCheckEnd(&is_empty));
643     gpr_spinlock_unlock(&queue_lock_);
644 
645     if (c == nullptr && !is_empty) {
646       GRPC_STATS_INC_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES();
647     }
648   } else {
649     GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_FAILURES();
650   }
651 
652   if (c) {
653     num_queue_items_.FetchSub(1, grpc_core::MemoryOrder::RELAXED);
654   }
655 
656   return c;
657 }
658 
659 grpc_completion_queue* grpc_completion_queue_create_internal(
660     grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type,
661     grpc_experimental_completion_queue_functor* shutdown_callback) {
662   GPR_TIMER_SCOPE("grpc_completion_queue_create_internal", 0);
663 
664   grpc_completion_queue* cq;
665 
666   GRPC_API_TRACE(
667       "grpc_completion_queue_create_internal(completion_type=%d, "
668       "polling_type=%d)",
669       2, (completion_type, polling_type));
670 
671   const cq_vtable* vtable = (polling_type == GRPC_CQ_NON_POLLING)
672                                 ? &g_nonpolling_cq_vtable[completion_type]
673                                 : &g_polling_cq_vtable[completion_type];
674   const cq_poller_vtable* poller_vtable =
675       &g_poller_vtable_by_poller_type[polling_type];
676 
677   grpc_core::ExecCtx exec_ctx;
678   GRPC_STATS_INC_CQS_CREATED();
679 
680   cq = static_cast<grpc_completion_queue*>(
681       gpr_zalloc(sizeof(grpc_completion_queue) + vtable->data_size +
682                  poller_vtable->size()));
683 
684   cq->vtable = vtable;
685   cq->poller_vtable = poller_vtable;
686 
687   /* One for destroy(), one for pollset_shutdown */
688   new (&cq->owning_refs) grpc_core::RefCount(2);
689 
690   vtable->init(DATA_FROM_CQ(cq), shutdown_callback);
691 
692   // TODO(vjpai): When callback_alternative is no longer needed, cq->pollset can
693   // be removed and the nullptr proxy_pollset value below can be the definition
694   // of POLLSET_FROM_CQ.
695   cq->pollset = cq->vtable->proxy_pollset == nullptr
696                     ? INLINE_POLLSET_FROM_CQ(cq)
697                     : cq->vtable->proxy_pollset(cq);
698   // Init the inline pollset. If a proxy CQ is used, the proxy pollset will be
699   // init'ed in its CQ init.
700   cq->poller_vtable->init(INLINE_POLLSET_FROM_CQ(cq), &cq->mu);
701 
702   GRPC_CLOSURE_INIT(&cq->pollset_shutdown_done, on_pollset_shutdown_done, cq,
703                     grpc_schedule_on_exec_ctx);
704   return cq;
705 }
706 
707 static void cq_init_next(
708     void* data,
709     grpc_experimental_completion_queue_functor* /*shutdown_callback*/) {
710   new (data) cq_next_data();
711 }
712 
713 static void cq_destroy_next(void* data) {
714   cq_next_data* cqd = static_cast<cq_next_data*>(data);
715   cqd->~cq_next_data();
716 }
717 
718 static void cq_init_pluck(
719     void* data,
720     grpc_experimental_completion_queue_functor* /*shutdown_callback*/) {
721   new (data) cq_pluck_data();
722 }
723 
724 static void cq_destroy_pluck(void* data) {
725   cq_pluck_data* cqd = static_cast<cq_pluck_data*>(data);
726   cqd->~cq_pluck_data();
727 }
728 
729 static void cq_init_callback(
730     void* data, grpc_experimental_completion_queue_functor* shutdown_callback) {
731   new (data) cq_callback_data(shutdown_callback);
732 }
733 
734 static void cq_destroy_callback(void* data) {
735   cq_callback_data* cqd = static_cast<cq_callback_data*>(data);
736   cqd->~cq_callback_data();
737 }
738 
739 static void cq_init_callback_alternative(
740     void* data, grpc_experimental_completion_queue_functor* shutdown_callback) {
741   new (data) cq_callback_alternative_data(shutdown_callback);
742 }
743 
744 static void cq_destroy_callback_alternative(void* data) {
745   cq_callback_alternative_data* cqd =
746       static_cast<cq_callback_alternative_data*>(data);
747   cqd->~cq_callback_alternative_data();
748 }
749 
750 grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue* cq) {
751   return cq->vtable->cq_completion_type;
752 }
753 
754 int grpc_get_cq_poll_num(grpc_completion_queue* cq) {
755   int cur_num_polls;
756   gpr_mu_lock(cq->mu);
757   cur_num_polls = cq->num_polls;
758   gpr_mu_unlock(cq->mu);
759   return cur_num_polls;
760 }
761 
762 #ifndef NDEBUG
763 void grpc_cq_internal_ref(grpc_completion_queue* cq, const char* reason,
764                           const char* file, int line) {
765   grpc_core::DebugLocation debug_location(file, line);
766 #else
767 void grpc_cq_internal_ref(grpc_completion_queue* cq) {
768   grpc_core::DebugLocation debug_location;
769   const char* reason = nullptr;
770 #endif
771   cq->owning_refs.Ref(debug_location, reason);
772 }
773 
774 static void on_pollset_shutdown_done(void* arg, grpc_error* /*error*/) {
775   grpc_completion_queue* cq = static_cast<grpc_completion_queue*>(arg);
776   GRPC_CQ_INTERNAL_UNREF(cq, "pollset_destroy");
777 }
778 
779 #ifndef NDEBUG
780 void grpc_cq_internal_unref(grpc_completion_queue* cq, const char* reason,
781                             const char* file, int line) {
782   grpc_core::DebugLocation debug_location(file, line);
783 #else
784 void grpc_cq_internal_unref(grpc_completion_queue* cq) {
785   grpc_core::DebugLocation debug_location;
786   const char* reason = nullptr;
787 #endif
788   if (GPR_UNLIKELY(cq->owning_refs.Unref(debug_location, reason))) {
789     cq->vtable->destroy(DATA_FROM_CQ(cq));
790     // Only destroy the inlined pollset. If a proxy CQ is used, the proxy
791     // pollset will be destroyed by the proxy CQ.
792     cq->poller_vtable->destroy(INLINE_POLLSET_FROM_CQ(cq));
793 #ifndef NDEBUG
794     gpr_free(cq->outstanding_tags);
795 #endif
796     gpr_free(cq);
797   }
798 }
799 
800 #ifndef NDEBUG
801 static void cq_check_tag(grpc_completion_queue* cq, void* tag, bool lock_cq) {
802   int found = 0;
803   if (lock_cq) {
804     gpr_mu_lock(cq->mu);
805   }
806 
807   for (int i = 0; i < static_cast<int>(cq->outstanding_tag_count); i++) {
808     if (cq->outstanding_tags[i] == tag) {
809       cq->outstanding_tag_count--;
810       GPR_SWAP(void*, cq->outstanding_tags[i],
811                cq->outstanding_tags[cq->outstanding_tag_count]);
812       found = 1;
813       break;
814     }
815   }
816 
817   if (lock_cq) {
818     gpr_mu_unlock(cq->mu);
819   }
820 
821   GPR_ASSERT(found);
822 }
823 #else
824 static void cq_check_tag(grpc_completion_queue* /*cq*/, void* /*tag*/,
825                          bool /*lock_cq*/) {}
826 #endif
827 
828 static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* /*tag*/) {
829   cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
830   return cqd->pending_events.IncrementIfNonzero();
831 }
832 
833 static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* /*tag*/) {
834   cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
835   return cqd->pending_events.IncrementIfNonzero();
836 }
837 
838 static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* /*tag*/) {
839   cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
840   return cqd->pending_events.IncrementIfNonzero();
841 }
842 
843 static bool cq_begin_op_for_callback_alternative(grpc_completion_queue* cq,
844                                                  void* tag) {
845   cq_callback_alternative_data* cqd =
846       static_cast<cq_callback_alternative_data*> DATA_FROM_CQ(cq);
847   return grpc_cq_begin_op(cqd->implementation, tag) &&
848          cqd->pending_events.IncrementIfNonzero();
849 }
850 
851 bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) {
852 #ifndef NDEBUG
853   gpr_mu_lock(cq->mu);
854   if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) {
855     cq->outstanding_tag_capacity = GPR_MAX(4, 2 * cq->outstanding_tag_capacity);
856     cq->outstanding_tags = static_cast<void**>(gpr_realloc(
857         cq->outstanding_tags,
858         sizeof(*cq->outstanding_tags) * cq->outstanding_tag_capacity));
859   }
860   cq->outstanding_tags[cq->outstanding_tag_count++] = tag;
861   gpr_mu_unlock(cq->mu);
862 #endif
863   return cq->vtable->begin_op(cq, tag);
864 }
865 
866 /* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
867  * completion
868  * type of GRPC_CQ_NEXT) */
869 static void cq_end_op_for_next(
870     grpc_completion_queue* cq, void* tag, grpc_error* error,
871     void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
872     grpc_cq_completion* storage, bool /*internal*/) {
873   GPR_TIMER_SCOPE("cq_end_op_for_next", 0);
874 
875   if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) ||
876       (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
877        error != GRPC_ERROR_NONE)) {
878     const char* errmsg = grpc_error_string(error);
879     GRPC_API_TRACE(
880         "cq_end_op_for_next(cq=%p, tag=%p, error=%s, "
881         "done=%p, done_arg=%p, storage=%p)",
882         6, (cq, tag, errmsg, done, done_arg, storage));
883     if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
884         error != GRPC_ERROR_NONE) {
885       gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
886     }
887   }
888   cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
889   int is_success = (error == GRPC_ERROR_NONE);
890 
891   storage->tag = tag;
892   storage->done = done;
893   storage->done_arg = done_arg;
894   storage->next = static_cast<uintptr_t>(is_success);
895 
896   cq_check_tag(cq, tag, true); /* Used in debug builds only */
897 
898   if ((grpc_completion_queue*)gpr_tls_get(&g_cached_cq) == cq &&
899       (grpc_cq_completion*)gpr_tls_get(&g_cached_event) == nullptr) {
900     gpr_tls_set(&g_cached_event, (intptr_t)storage);
901   } else {
902     /* Add the completion to the queue */
903     bool is_first = cqd->queue.Push(storage);
904     cqd->things_queued_ever.FetchAdd(1, grpc_core::MemoryOrder::RELAXED);
905     /* Since we do not hold the cq lock here, it is important to do an 'acquire'
906        load here (instead of a 'no_barrier' load) to match with the release
907        store
908        (done via pending_events.FetchSub(1, ACQ_REL)) in cq_shutdown_next
909        */
910     if (cqd->pending_events.Load(grpc_core::MemoryOrder::ACQUIRE) != 1) {
911       /* Only kick if this is the first item queued */
912       if (is_first) {
913         gpr_mu_lock(cq->mu);
914         grpc_error* kick_error =
915             cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), nullptr);
916         gpr_mu_unlock(cq->mu);
917 
918         if (kick_error != GRPC_ERROR_NONE) {
919           const char* msg = grpc_error_string(kick_error);
920           gpr_log(GPR_ERROR, "Kick failed: %s", msg);
921           GRPC_ERROR_UNREF(kick_error);
922         }
923       }
924       if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) ==
925           1) {
926         GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
927         gpr_mu_lock(cq->mu);
928         cq_finish_shutdown_next(cq);
929         gpr_mu_unlock(cq->mu);
930         GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
931       }
932     } else {
933       GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
934       cqd->pending_events.Store(0, grpc_core::MemoryOrder::RELEASE);
935       gpr_mu_lock(cq->mu);
936       cq_finish_shutdown_next(cq);
937       gpr_mu_unlock(cq->mu);
938       GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
939     }
940   }
941 
942   GRPC_ERROR_UNREF(error);
943 }
944 
945 /* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
946  * completion
947  * type of GRPC_CQ_PLUCK) */
948 static void cq_end_op_for_pluck(
949     grpc_completion_queue* cq, void* tag, grpc_error* error,
950     void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
951     grpc_cq_completion* storage, bool /*internal*/) {
952   GPR_TIMER_SCOPE("cq_end_op_for_pluck", 0);
953 
954   cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
955   int is_success = (error == GRPC_ERROR_NONE);
956 
957   if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) ||
958       (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
959        error != GRPC_ERROR_NONE)) {
960     const char* errmsg = grpc_error_string(error);
961     GRPC_API_TRACE(
962         "cq_end_op_for_pluck(cq=%p, tag=%p, error=%s, "
963         "done=%p, done_arg=%p, storage=%p)",
964         6, (cq, tag, errmsg, done, done_arg, storage));
965     if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
966         error != GRPC_ERROR_NONE) {
967       gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
968     }
969   }
970 
971   storage->tag = tag;
972   storage->done = done;
973   storage->done_arg = done_arg;
974   storage->next =
975       ((uintptr_t)&cqd->completed_head) | (static_cast<uintptr_t>(is_success));
976 
977   gpr_mu_lock(cq->mu);
978   cq_check_tag(cq, tag, false); /* Used in debug builds only */
979 
980   /* Add to the list of completions */
981   cqd->things_queued_ever.FetchAdd(1, grpc_core::MemoryOrder::RELAXED);
982   cqd->completed_tail->next =
983       ((uintptr_t)storage) | (1u & cqd->completed_tail->next);
984   cqd->completed_tail = storage;
985 
986   if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
987     cq_finish_shutdown_pluck(cq);
988     gpr_mu_unlock(cq->mu);
989   } else {
990     grpc_pollset_worker* pluck_worker = nullptr;
991     for (int i = 0; i < cqd->num_pluckers; i++) {
992       if (cqd->pluckers[i].tag == tag) {
993         pluck_worker = *cqd->pluckers[i].worker;
994         break;
995       }
996     }
997 
998     grpc_error* kick_error =
999         cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), pluck_worker);
1000 
1001     gpr_mu_unlock(cq->mu);
1002 
1003     if (kick_error != GRPC_ERROR_NONE) {
1004       const char* msg = grpc_error_string(kick_error);
1005       gpr_log(GPR_ERROR, "Kick failed: %s", msg);
1006 
1007       GRPC_ERROR_UNREF(kick_error);
1008     }
1009   }
1010 
1011   GRPC_ERROR_UNREF(error);
1012 }
1013 
1014 void functor_callback(void* arg, grpc_error* error) {
1015   auto* functor = static_cast<grpc_experimental_completion_queue_functor*>(arg);
1016   functor->functor_run(functor, error == GRPC_ERROR_NONE);
1017 }
1018 
1019 /* Complete an event on a completion queue of type GRPC_CQ_CALLBACK */
1020 static void cq_end_op_for_callback(
1021     grpc_completion_queue* cq, void* tag, grpc_error* error,
1022     void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
1023     grpc_cq_completion* storage, bool internal) {
1024   GPR_TIMER_SCOPE("cq_end_op_for_callback", 0);
1025 
1026   cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
1027 
1028   if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) ||
1029       (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
1030        error != GRPC_ERROR_NONE)) {
1031     const char* errmsg = grpc_error_string(error);
1032     GRPC_API_TRACE(
1033         "cq_end_op_for_callback(cq=%p, tag=%p, error=%s, "
1034         "done=%p, done_arg=%p, storage=%p)",
1035         6, (cq, tag, errmsg, done, done_arg, storage));
1036     if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
1037         error != GRPC_ERROR_NONE) {
1038       gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
1039     }
1040   }
1041 
1042   // The callback-based CQ isn't really a queue at all and thus has no need
1043   // for reserved storage. Invoke the done callback right away to release it.
1044   done(done_arg, storage);
1045 
1046   cq_check_tag(cq, tag, true); /* Used in debug builds only */
1047 
1048   if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
1049     cq_finish_shutdown_callback(cq);
1050   }
1051 
1052   // If possible, schedule the callback onto an existing thread-local
1053   // ApplicationCallbackExecCtx, which is a work queue. This is possible for:
1054   // 1. The callback is internally-generated and there is an ACEC available
1055   // 2. The callback is marked inlineable and there is an ACEC available
1056   // 3. We are already running in a background poller thread (which always has
1057   //    an ACEC available at the base of the stack).
1058   auto* functor = static_cast<grpc_experimental_completion_queue_functor*>(tag);
1059   if (((internal || functor->inlineable) &&
1060        grpc_core::ApplicationCallbackExecCtx::Available()) ||
1061       grpc_iomgr_is_any_background_poller_thread()) {
1062     grpc_core::ApplicationCallbackExecCtx::Enqueue(functor,
1063                                                    (error == GRPC_ERROR_NONE));
1064     GRPC_ERROR_UNREF(error);
1065     return;
1066   }
1067 
1068   // Schedule the callback on a closure if not internal or triggered
1069   // from a background poller thread.
1070   grpc_core::Executor::Run(
1071       GRPC_CLOSURE_CREATE(functor_callback, functor, nullptr), error);
1072 }
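// A rough sketch of a functor tag consumed by the callback CQ above. Only the
// fields this file actually touches (functor_run, inlineable) are set; the
// other members of grpc_experimental_completion_queue_functor are left alone,
// and the MyCallbackTag name is hypothetical:
//
//   struct MyCallbackTag : grpc_experimental_completion_queue_functor {
//     MyCallbackTag() {
//       functor_run = [](grpc_experimental_completion_queue_functor* f, int ok) {
//         static_cast<MyCallbackTag*>(f)->Run(ok != 0);
//       };
//       inlineable = false;  // prefer dispatch via the Executor path above
//     }
//     void Run(bool ok) { /* application logic */ }
//   };
//
// A MyCallbackTag* would then be passed as the `tag` of an operation on a
// GRPC_CQ_CALLBACK completion queue.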
1073 
1074 static void cq_end_op_for_callback_alternative(
1075     grpc_completion_queue* cq, void* tag, grpc_error* error,
1076     void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
1077     grpc_cq_completion* storage, bool internal) {
1078   GPR_TIMER_SCOPE("cq_end_op_for_callback_alternative", 0);
1079 
1080   cq_callback_alternative_data* cqd =
1081       static_cast<cq_callback_alternative_data*> DATA_FROM_CQ(cq);
1082 
1083   if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) ||
1084       (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
1085        error != GRPC_ERROR_NONE)) {
1086     const char* errmsg = grpc_error_string(error);
1087     GRPC_API_TRACE(
1088         "cq_end_op_for_callback_alternative(cq=%p, tag=%p, error=%s, "
1089         "done=%p, done_arg=%p, storage=%p)",
1090         6, (cq, tag, errmsg, done, done_arg, storage));
1091     if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
1092         error != GRPC_ERROR_NONE) {
1093       gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
1094     }
1095   }
1096 
1097   // Pass through the actual work to the internal nextable CQ
1098   grpc_cq_end_op(cqd->implementation, tag, error, done, done_arg, storage,
1099                  internal);
1100 
1101   cq_check_tag(cq, tag, true); /* Used in debug builds only */
1102 
1103   if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
1104     cq_finish_shutdown_callback_alternative(cq);
1105   }
1106 }
1107 
1108 void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error,
1109                     void (*done)(void* done_arg, grpc_cq_completion* storage),
1110                     void* done_arg, grpc_cq_completion* storage,
1111                     bool internal) {
1112   cq->vtable->end_op(cq, tag, error, done, done_arg, storage, internal);
1113 }
1114 
1115 static grpc_pollset* cq_proxy_pollset_for_callback_alternative(
1116     grpc_completion_queue* cq) {
1117   cq_callback_alternative_data* cqd =
1118       static_cast<cq_callback_alternative_data*>(DATA_FROM_CQ(cq));
1119   return POLLSET_FROM_CQ(cqd->implementation);
1120 }
1121 
1122 struct cq_is_finished_arg {
1123   gpr_atm last_seen_things_queued_ever;
1124   grpc_completion_queue* cq;
1125   grpc_millis deadline;
1126   grpc_cq_completion* stolen_completion;
1127   void* tag; /* for pluck */
1128   bool first_loop;
1129 };
1130 class ExecCtxNext : public grpc_core::ExecCtx {
1131  public:
1132   ExecCtxNext(void* arg) : ExecCtx(0), check_ready_to_finish_arg_(arg) {}
1133 
1134   bool CheckReadyToFinish() override {
1135     cq_is_finished_arg* a =
1136         static_cast<cq_is_finished_arg*>(check_ready_to_finish_arg_);
1137     grpc_completion_queue* cq = a->cq;
1138     cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
1139     GPR_ASSERT(a->stolen_completion == nullptr);
1140 
1141     intptr_t current_last_seen_things_queued_ever =
1142         cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED);
1143 
1144     if (current_last_seen_things_queued_ever !=
1145         a->last_seen_things_queued_ever) {
1146       a->last_seen_things_queued_ever =
1147           cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED);
1148 
1149       /* Pop a cq_completion from the queue. Returns NULL if the queue is empty.
1150        * It might also return NULL in some cases even if the queue is not empty;
1151        * that
1152        * is ok and doesn't affect correctness (it might affect the tail latencies
1153        * a bit) */
1154       a->stolen_completion = cqd->queue.Pop();
1155       if (a->stolen_completion != nullptr) {
1156         return true;
1157       }
1158     }
1159     return !a->first_loop && a->deadline < grpc_core::ExecCtx::Get()->Now();
1160   }
1161 
1162  private:
1163   void* check_ready_to_finish_arg_;
1164 };
1165 
1166 #ifndef NDEBUG
1167 static void dump_pending_tags(grpc_completion_queue* cq) {
1168   if (!GRPC_TRACE_FLAG_ENABLED(grpc_trace_pending_tags)) return;
1169   std::vector<std::string> parts;
1170   parts.push_back("PENDING TAGS:");
1171   gpr_mu_lock(cq->mu);
1172   for (size_t i = 0; i < cq->outstanding_tag_count; i++) {
1173     parts.push_back(absl::StrFormat(" %p", cq->outstanding_tags[i]));
1174   }
1175   gpr_mu_unlock(cq->mu);
1176   gpr_log(GPR_DEBUG, "%s", absl::StrJoin(parts, "").c_str());
1177 }
1178 #else
1179 static void dump_pending_tags(grpc_completion_queue* /*cq*/) {}
1180 #endif
1181 
1182 static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
1183                           void* reserved) {
1184   GPR_TIMER_SCOPE("grpc_completion_queue_next", 0);
1185 
1186   grpc_event ret;
1187   cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
1188 
1189   GRPC_API_TRACE(
1190       "grpc_completion_queue_next("
1191       "cq=%p, "
1192       "deadline=gpr_timespec { tv_sec: %" PRId64
1193       ", tv_nsec: %d, clock_type: %d }, "
1194       "reserved=%p)",
1195       5,
1196       (cq, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
1197        reserved));
1198   GPR_ASSERT(!reserved);
1199 
1200   dump_pending_tags(cq);
1201 
1202   GRPC_CQ_INTERNAL_REF(cq, "next");
1203 
1204   grpc_millis deadline_millis = grpc_timespec_to_millis_round_up(deadline);
1205   cq_is_finished_arg is_finished_arg = {
1206       cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED),
1207       cq,
1208       deadline_millis,
1209       nullptr,
1210       nullptr,
1211       true};
1212   ExecCtxNext exec_ctx(&is_finished_arg);
1213   for (;;) {
1214     grpc_millis iteration_deadline = deadline_millis;
1215 
1216     if (is_finished_arg.stolen_completion != nullptr) {
1217       grpc_cq_completion* c = is_finished_arg.stolen_completion;
1218       is_finished_arg.stolen_completion = nullptr;
1219       ret.type = GRPC_OP_COMPLETE;
1220       ret.success = c->next & 1u;
1221       ret.tag = c->tag;
1222       c->done(c->done_arg, c);
1223       break;
1224     }
1225 
1226     grpc_cq_completion* c = cqd->queue.Pop();
1227 
1228     if (c != nullptr) {
1229       ret.type = GRPC_OP_COMPLETE;
1230       ret.success = c->next & 1u;
1231       ret.tag = c->tag;
1232       c->done(c->done_arg, c);
1233       break;
1234     } else {
1235       /* If c == NULL it means either the queue is empty OR in a transient
1236          inconsistent state. If it is the latter, we should do a 0-timeout poll
1237          so that the thread comes back quickly from poll to make a second
1238          attempt at popping. Not doing this can potentially deadlock this
1239          thread forever (if the deadline is infinity) */
1240       if (cqd->queue.num_items() > 0) {
1241         iteration_deadline = 0;
1242       }
1243     }
1244 
1245     if (cqd->pending_events.Load(grpc_core::MemoryOrder::ACQUIRE) == 0) {
1246       /* Before returning, check if the queue has any items left over (since
1247          MultiProducerSingleConsumerQueue::Pop() can sometimes return NULL
1248          even if the queue is not empty). If so, keep retrying but do not
1249          return GRPC_QUEUE_SHUTDOWN */
1250       if (cqd->queue.num_items() > 0) {
1251         /* Go to the beginning of the loop. No point doing a poll because
1252            (cq->shutdown == true) is only possible when there is no pending
1253            work (i.e. cq->pending_events == 0) and any outstanding completion
1254            events should have already been queued on this cq */
1255         continue;
1256       }
1257 
1258       ret.type = GRPC_QUEUE_SHUTDOWN;
1259       ret.success = 0;
1260       break;
1261     }
1262 
1263     if (!is_finished_arg.first_loop &&
1264         grpc_core::ExecCtx::Get()->Now() >= deadline_millis) {
1265       ret.type = GRPC_QUEUE_TIMEOUT;
1266       ret.success = 0;
1267       dump_pending_tags(cq);
1268       break;
1269     }
1270 
1271     /* The main polling work happens in grpc_pollset_work */
1272     gpr_mu_lock(cq->mu);
1273     cq->num_polls++;
1274     grpc_error* err = cq->poller_vtable->work(POLLSET_FROM_CQ(cq), nullptr,
1275                                               iteration_deadline);
1276     gpr_mu_unlock(cq->mu);
1277 
1278     if (err != GRPC_ERROR_NONE) {
1279       const char* msg = grpc_error_string(err);
1280       gpr_log(GPR_ERROR, "Completion queue next failed: %s", msg);
1281 
1282       GRPC_ERROR_UNREF(err);
1283       ret.type = GRPC_QUEUE_TIMEOUT;
1284       ret.success = 0;
1285       dump_pending_tags(cq);
1286       break;
1287     }
1288     is_finished_arg.first_loop = false;
1289   }
1290 
1291   if (cqd->queue.num_items() > 0 &&
1292       cqd->pending_events.Load(grpc_core::MemoryOrder::ACQUIRE) > 0) {
1293     gpr_mu_lock(cq->mu);
1294     cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), nullptr);
1295     gpr_mu_unlock(cq->mu);
1296   }
1297 
1298   GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
1299   GRPC_CQ_INTERNAL_UNREF(cq, "next");
1300 
1301   GPR_ASSERT(is_finished_arg.stolen_completion == nullptr);
1302 
1303   return ret;
1304 }
1305 
1306 /* Finishes the completion queue shutdown. This means that there are no more
1307    completion events / tags expected from the completion queue
1308    - Must be called under completion queue lock
1309    - Must be called only once in completion queue's lifetime
1310    - grpc_completion_queue_shutdown() MUST have been called before calling
1311    this function */
1312 static void cq_finish_shutdown_next(grpc_completion_queue* cq) {
1313   cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
1314 
1315   GPR_ASSERT(cqd->shutdown_called);
1316   GPR_ASSERT(cqd->pending_events.Load(grpc_core::MemoryOrder::RELAXED) == 0);
1317 
1318   cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
1319 }
1320 
1321 static void cq_shutdown_next(grpc_completion_queue* cq) {
1322   cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
1323 
1324   /* Need an extra ref for cq here because:
1325    * We call cq_finish_shutdown_next() below, that would call pollset shutdown.
1326    * Pollset shutdown decrements the cq ref count which can potentially destroy
1327    * the cq (if that happens to be the last ref).
1328    * Creating an extra ref here prevents the cq from getting destroyed while
1329    * this function is still active */
1330   GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
1331   gpr_mu_lock(cq->mu);
1332   if (cqd->shutdown_called) {
1333     gpr_mu_unlock(cq->mu);
1334     GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
1335     return;
1336   }
1337   cqd->shutdown_called = true;
1338   /* Doing acq/release FetchSub here to match with
1339    * cq_begin_op_for_next and cq_end_op_for_next functions which read/write
1340    * on this counter without necessarily holding a lock on cq */
1341   if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
1342     cq_finish_shutdown_next(cq);
1343   }
1344   gpr_mu_unlock(cq->mu);
1345   GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
1346 }
1347 
1348 grpc_event grpc_completion_queue_next(grpc_completion_queue* cq,
1349                                       gpr_timespec deadline, void* reserved) {
1350   return cq->vtable->next(cq, deadline, reserved);
1351 }
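// A minimal drain loop over the public API wrapped above (the standard
// shutdown sequence for a GRPC_CQ_NEXT queue; shown only for orientation):
//
//   grpc_completion_queue_shutdown(cq);
//   for (;;) {
//     grpc_event ev = grpc_completion_queue_next(
//         cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
//     if (ev.type == GRPC_QUEUE_SHUTDOWN) break;
//     /* handle GRPC_OP_COMPLETE events via ev.tag and ev.success */
//   }
//   grpc_completion_queue_destroy(cq);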

/* Registers a plucker (tag + pollset worker) that is waiting for 'tag' on this
   completion queue. Returns 0 if the fixed-size plucker table
   (GRPC_MAX_COMPLETION_QUEUE_PLUCKERS entries) is already full. */
static int add_plucker(grpc_completion_queue* cq, void* tag,
                       grpc_pollset_worker** worker) {
  cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
  if (cqd->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) {
    return 0;
  }
  cqd->pluckers[cqd->num_pluckers].tag = tag;
  cqd->pluckers[cqd->num_pluckers].worker = worker;
  cqd->num_pluckers++;
  return 1;
}

/* Removes a plucker previously registered with add_plucker(); the last entry
   is swapped into the freed slot to keep the table dense. */
static void del_plucker(grpc_completion_queue* cq, void* tag,
                        grpc_pollset_worker** worker) {
  cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
  for (int i = 0; i < cqd->num_pluckers; i++) {
    if (cqd->pluckers[i].tag == tag && cqd->pluckers[i].worker == worker) {
      cqd->num_pluckers--;
      GPR_SWAP(plucker, cqd->pluckers[i], cqd->pluckers[cqd->num_pluckers]);
      return;
    }
  }
  GPR_UNREACHABLE_CODE(return);
}

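/* Completions on a pluck cq's 'completed' list are chained through their
 * 'next' field, with the low bit of that pointer doubling as the success flag
 * (hence the '& 1u' and '~static_cast<uintptr_t>(1)' masks below).
 * ExecCtxPluck lets the exec ctx finish early by "stealing" a completion that
 * matches the plucked tag off that list. */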
class ExecCtxPluck : public grpc_core::ExecCtx {
 public:
  ExecCtxPluck(void* arg) : ExecCtx(0), check_ready_to_finish_arg_(arg) {}

  bool CheckReadyToFinish() override {
    cq_is_finished_arg* a =
        static_cast<cq_is_finished_arg*>(check_ready_to_finish_arg_);
    grpc_completion_queue* cq = a->cq;
    cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);

    GPR_ASSERT(a->stolen_completion == nullptr);
    gpr_atm current_last_seen_things_queued_ever =
        cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED);
    if (current_last_seen_things_queued_ever !=
        a->last_seen_things_queued_ever) {
      gpr_mu_lock(cq->mu);
      a->last_seen_things_queued_ever =
          cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED);
      grpc_cq_completion* c;
      grpc_cq_completion* prev = &cqd->completed_head;
      while ((c = (grpc_cq_completion*)(prev->next &
                                        ~static_cast<uintptr_t>(1))) !=
             &cqd->completed_head) {
        if (c->tag == a->tag) {
          prev->next = (prev->next & static_cast<uintptr_t>(1)) |
                       (c->next & ~static_cast<uintptr_t>(1));
          if (c == cqd->completed_tail) {
            cqd->completed_tail = prev;
          }
          gpr_mu_unlock(cq->mu);
          a->stolen_completion = c;
          return true;
        }
        prev = c;
      }
      gpr_mu_unlock(cq->mu);
    }
    return !a->first_loop && a->deadline < grpc_core::ExecCtx::Get()->Now();
  }

 private:
  void* check_ready_to_finish_arg_;
};

static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
                           gpr_timespec deadline, void* reserved) {
  GPR_TIMER_SCOPE("grpc_completion_queue_pluck", 0);

  grpc_event ret;
  grpc_cq_completion* c;
  grpc_cq_completion* prev;
  grpc_pollset_worker* worker = nullptr;
  cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);

  if (GRPC_TRACE_FLAG_ENABLED(grpc_cq_pluck_trace)) {
    GRPC_API_TRACE(
        "grpc_completion_queue_pluck("
        "cq=%p, tag=%p, "
        "deadline=gpr_timespec { tv_sec: %" PRId64
        ", tv_nsec: %d, clock_type: %d }, "
        "reserved=%p)",
        6,
        (cq, tag, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
         reserved));
  }
  GPR_ASSERT(!reserved);

  dump_pending_tags(cq);

  GRPC_CQ_INTERNAL_REF(cq, "pluck");
  gpr_mu_lock(cq->mu);
  grpc_millis deadline_millis = grpc_timespec_to_millis_round_up(deadline);
  cq_is_finished_arg is_finished_arg = {
      cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED),
      cq,
      deadline_millis,
      nullptr,
      tag,
      true};
  ExecCtxPluck exec_ctx(&is_finished_arg);
  for (;;) {
    if (is_finished_arg.stolen_completion != nullptr) {
      gpr_mu_unlock(cq->mu);
      c = is_finished_arg.stolen_completion;
      is_finished_arg.stolen_completion = nullptr;
      ret.type = GRPC_OP_COMPLETE;
      ret.success = c->next & 1u;
      ret.tag = c->tag;
      c->done(c->done_arg, c);
      break;
    }
    prev = &cqd->completed_head;
    while (
        (c = (grpc_cq_completion*)(prev->next & ~static_cast<uintptr_t>(1))) !=
        &cqd->completed_head) {
      if (c->tag == tag) {
        prev->next = (prev->next & static_cast<uintptr_t>(1)) |
                     (c->next & ~static_cast<uintptr_t>(1));
        if (c == cqd->completed_tail) {
          cqd->completed_tail = prev;
        }
        gpr_mu_unlock(cq->mu);
        ret.type = GRPC_OP_COMPLETE;
        ret.success = c->next & 1u;
        ret.tag = c->tag;
        c->done(c->done_arg, c);
        goto done;
      }
      prev = c;
    }
    if (cqd->shutdown.Load(grpc_core::MemoryOrder::RELAXED)) {
      gpr_mu_unlock(cq->mu);
      ret.type = GRPC_QUEUE_SHUTDOWN;
      ret.success = 0;
      break;
    }
    if (!add_plucker(cq, tag, &worker)) {
      gpr_log(GPR_DEBUG,
              "Too many outstanding grpc_completion_queue_pluck calls: maximum "
              "is %d",
              GRPC_MAX_COMPLETION_QUEUE_PLUCKERS);
      gpr_mu_unlock(cq->mu);
      /* TODO(ctiller): should we use a different result here */
      ret.type = GRPC_QUEUE_TIMEOUT;
      ret.success = 0;
      dump_pending_tags(cq);
      break;
    }
    if (!is_finished_arg.first_loop &&
        grpc_core::ExecCtx::Get()->Now() >= deadline_millis) {
      del_plucker(cq, tag, &worker);
      gpr_mu_unlock(cq->mu);
      ret.type = GRPC_QUEUE_TIMEOUT;
      ret.success = 0;
      dump_pending_tags(cq);
      break;
    }
    cq->num_polls++;
    grpc_error* err =
        cq->poller_vtable->work(POLLSET_FROM_CQ(cq), &worker, deadline_millis);
    if (err != GRPC_ERROR_NONE) {
      del_plucker(cq, tag, &worker);
      gpr_mu_unlock(cq->mu);
      const char* msg = grpc_error_string(err);
      gpr_log(GPR_ERROR, "Completion queue pluck failed: %s", msg);

      GRPC_ERROR_UNREF(err);
      ret.type = GRPC_QUEUE_TIMEOUT;
      ret.success = 0;
      dump_pending_tags(cq);
      break;
    }
    is_finished_arg.first_loop = false;
    del_plucker(cq, tag, &worker);
  }
done:
  GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
  GRPC_CQ_INTERNAL_UNREF(cq, "pluck");

  GPR_ASSERT(is_finished_arg.stolen_completion == nullptr);

  return ret;
}

grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag,
                                       gpr_timespec deadline, void* reserved) {
  return cq->vtable->pluck(cq, tag, deadline, reserved);
}
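
/* Illustrative usage sketch (not part of this file): a pluck-style cq waits
 * for one specific tag rather than the next available event. start_batch_with()
 * and my_call_state are hypothetical stand-ins for whatever queued work
 * against 'tag'; grpc_completion_queue_pluck and gpr_inf_future are the real
 * C-core calls. At most GRPC_MAX_COMPLETION_QUEUE_PLUCKERS threads may pluck
 * the same cq concurrently (see add_plucker above).
 *
 *   void* tag = &my_call_state;                 // hypothetical tag
 *   start_batch_with(tag);                      // hypothetical producer
 *   grpc_event ev = grpc_completion_queue_pluck(
 *       cq, tag, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
 *   GPR_ASSERT(ev.type == GRPC_OP_COMPLETE && ev.tag == tag);
 */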

static void cq_finish_shutdown_pluck(grpc_completion_queue* cq) {
  cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);

  GPR_ASSERT(cqd->shutdown_called);
  GPR_ASSERT(!cqd->shutdown.Load(grpc_core::MemoryOrder::RELAXED));
  cqd->shutdown.Store(1, grpc_core::MemoryOrder::RELAXED);

  cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
}

/* NOTE: This function is nearly identical to cq_shutdown_next(), but merging
 * the two is tricky and probably not worth it */
static void cq_shutdown_pluck(grpc_completion_queue* cq) {
  cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);

  /* Need an extra ref for cq here because:
   * We call cq_finish_shutdown_pluck() below, which calls pollset shutdown.
   * Pollset shutdown decrements the cq ref count which can potentially destroy
   * the cq (if that happens to be the last ref).
   * Creating an extra ref here prevents the cq from getting destroyed while
   * this function is still active */
  GRPC_CQ_INTERNAL_REF(cq, "shutting_down (pluck cq)");
  gpr_mu_lock(cq->mu);
  if (cqd->shutdown_called) {
    gpr_mu_unlock(cq->mu);
    GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (pluck cq)");
    return;
  }
  cqd->shutdown_called = true;
  if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
    cq_finish_shutdown_pluck(cq);
  }
  gpr_mu_unlock(cq->mu);
  GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (pluck cq)");
}

static void cq_finish_shutdown_callback(grpc_completion_queue* cq) {
  cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
  auto* callback = cqd->shutdown_callback;

  GPR_ASSERT(cqd->shutdown_called);

  cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
  if (grpc_iomgr_is_any_background_poller_thread()) {
    grpc_core::ApplicationCallbackExecCtx::Enqueue(callback, true);
    return;
  }

  // Schedule the callback on a closure if not internal or triggered
  // from a background poller thread.
  grpc_core::Executor::Run(
      GRPC_CLOSURE_CREATE(functor_callback, callback, nullptr),
      GRPC_ERROR_NONE);
}

static void cq_finish_shutdown_callback_alternative(grpc_completion_queue* cq) {
  cq_callback_alternative_data* cqd =
      static_cast<cq_callback_alternative_data*> DATA_FROM_CQ(cq);
  auto* callback = cqd->shutdown_callback;

  GPR_ASSERT(cqd->shutdown_called);

  // Shutdown the non-proxy pollset
  cq->poller_vtable->shutdown(INLINE_POLLSET_FROM_CQ(cq),
                              &cq->pollset_shutdown_done);
  grpc_core::Executor::Run(
      GRPC_CLOSURE_CREATE(functor_callback, callback, nullptr),
      GRPC_ERROR_NONE);
}

static void cq_shutdown_callback(grpc_completion_queue* cq) {
  cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);

  /* Need an extra ref for cq here because:
   * We call cq_finish_shutdown_callback() below, which calls pollset shutdown.
   * Pollset shutdown decrements the cq ref count which can potentially destroy
   * the cq (if that happens to be the last ref).
   * Creating an extra ref here prevents the cq from getting destroyed while
   * this function is still active */
  GRPC_CQ_INTERNAL_REF(cq, "shutting_down (callback cq)");
  gpr_mu_lock(cq->mu);
  if (cqd->shutdown_called) {
    gpr_mu_unlock(cq->mu);
    GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)");
    return;
  }
  cqd->shutdown_called = true;
  if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
    gpr_mu_unlock(cq->mu);
    cq_finish_shutdown_callback(cq);
  } else {
    gpr_mu_unlock(cq->mu);
  }
  GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)");
}

static void cq_shutdown_callback_alternative(grpc_completion_queue* cq) {
  cq_callback_alternative_data* cqd =
      static_cast<cq_callback_alternative_data*> DATA_FROM_CQ(cq);

  /* Need an extra ref for cq here because:
   * We call cq_finish_shutdown_callback_alternative() below, which calls
   * pollset shutdown. Pollset shutdown decrements the cq ref count which can
   * potentially destroy the cq (if that happens to be the last ref).
   * Creating an extra ref here prevents the cq from getting destroyed while
   * this function is still active */
  GRPC_CQ_INTERNAL_REF(cq, "shutting_down (callback cq)");
  gpr_mu_lock(cq->mu);
  if (cqd->shutdown_called) {
    gpr_mu_unlock(cq->mu);
    GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)");
    return;
  }
  cqd->shutdown_called = true;
  if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
    gpr_mu_unlock(cq->mu);
    cq_finish_shutdown_callback_alternative(cq);
  } else {
    gpr_mu_unlock(cq->mu);
  }
  GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)");
}

/* Shutdown simply drops a ref that we reserved at creation time; if we drop
   to zero here, then enter shutdown mode and wake up any waiters */
void grpc_completion_queue_shutdown(grpc_completion_queue* cq) {
  GPR_TIMER_SCOPE("grpc_completion_queue_shutdown", 0);
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;
  GRPC_API_TRACE("grpc_completion_queue_shutdown(cq=%p)", 1, (cq));
  cq->vtable->shutdown(cq);
}

void grpc_completion_queue_destroy(grpc_completion_queue* cq) {
  GPR_TIMER_SCOPE("grpc_completion_queue_destroy", 0);
  GRPC_API_TRACE("grpc_completion_queue_destroy(cq=%p)", 1, (cq));
  grpc_completion_queue_shutdown(cq);

  grpc_core::ExecCtx exec_ctx;
  GRPC_CQ_INTERNAL_UNREF(cq, "destroy");
}
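
/* Illustrative teardown sketch (not part of this file), reflecting the usual
 * C-core contract for next-style queues: shut the queue down, drain it until
 * GRPC_QUEUE_SHUTDOWN is returned, then destroy it. The drain deadline and
 * error handling are application choices.
 *
 *   grpc_completion_queue_shutdown(cq);
 *   grpc_event ev;
 *   do {
 *     ev = grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME),
 *                                     nullptr);
 *   } while (ev.type != GRPC_QUEUE_SHUTDOWN);
 *   grpc_completion_queue_destroy(cq);
 */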

grpc_pollset* grpc_cq_pollset(grpc_completion_queue* cq) {
  return cq->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cq) : nullptr;
}

bool grpc_cq_can_listen(grpc_completion_queue* cq) {
  return cq->poller_vtable->can_listen;
}