• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015-2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 #include <grpc/support/port_platform.h>
19 
20 #include "src/core/lib/surface/completion_queue.h"
21 
22 #include <inttypes.h>
23 #include <stdio.h>
24 
25 #include <algorithm>
26 #include <atomic>
27 #include <new>
28 #include <string>
29 #include <utility>
30 #include <vector>
31 
32 #include "absl/status/status.h"
33 #include "absl/strings/str_format.h"
34 #include "absl/strings/str_join.h"
35 
36 #include <grpc/grpc.h>
37 #include <grpc/support/alloc.h>
38 #include <grpc/support/atm.h>
39 #include <grpc/support/log.h>
40 #include <grpc/support/sync.h>
41 #include <grpc/support/time.h>
42 
43 #include "src/core/lib/debug/stats.h"
44 #include "src/core/lib/debug/stats_data.h"
45 #include "src/core/lib/gpr/spinlock.h"
46 #include "src/core/lib/gprpp/atomic_utils.h"
47 #include "src/core/lib/gprpp/debug_location.h"
48 #include "src/core/lib/gprpp/ref_counted.h"
49 #include "src/core/lib/gprpp/status_helper.h"
50 #include "src/core/lib/gprpp/time.h"
51 #include "src/core/lib/iomgr/closure.h"
52 #include "src/core/lib/iomgr/exec_ctx.h"
53 #include "src/core/lib/iomgr/executor.h"
54 #include "src/core/lib/iomgr/iomgr.h"
55 #include "src/core/lib/iomgr/pollset.h"
56 #include "src/core/lib/surface/api_trace.h"
57 #include "src/core/lib/surface/event_string.h"
58 
59 #ifdef GPR_WINDOWS
60 #include "src/core/lib/experiments/experiments.h"
61 #endif
62 
// Trace flag that, when enabled, logs operations completing with a non-OK
// status (consulted in the cq_end_op_* functions below).
grpc_core::TraceFlag grpc_trace_operation_failures(false, "op_failure");
// Debug-only trace flags: pending-tag dumps and CQ refcount tracing.
grpc_core::DebugOnlyTraceFlag grpc_trace_pending_tags(false, "pending_tags");
grpc_core::DebugOnlyTraceFlag grpc_trace_cq_refcount(false, "cq_refcount");
66 
67 namespace {
68 
// Specifies a cq thread local cache.
// The first event that occurs on a thread
// with a cq cache will go into that cache, and
// will only be returned on the thread that initialized the cache.
// NOTE: Only one event will ever be cached.
thread_local grpc_cq_completion* g_cached_event;
thread_local grpc_completion_queue* g_cached_cq;

// A (worker, tag) pair registered by a thread blocked in pluck, so that a
// completion for `tag` can kick exactly that worker.
struct plucker {
  grpc_pollset_worker** worker;
  void* tag;
};

// Polling-strategy vtable; one instance per grpc_cq_polling_type (see
// g_poller_vtable_by_poller_type below).
struct cq_poller_vtable {
  bool can_get_pollset;  // whether the cq exposes a real pollset
  bool can_listen;       // whether listeners may be attached to the pollset
  size_t (*size)(void);
  void (*init)(grpc_pollset* pollset, gpr_mu** mu);
  grpc_error_handle (*kick)(grpc_pollset* pollset,
                            grpc_pollset_worker* specific_worker);
  grpc_error_handle (*work)(grpc_pollset* pollset, grpc_pollset_worker** worker,
                            grpc_core::Timestamp deadline);
  void (*shutdown)(grpc_pollset* pollset, grpc_closure* closure);
  void (*destroy)(grpc_pollset* pollset);
};

// One waiter in the non-polling poller: a stack-allocated node of a circular
// doubly-linked ring, parked on its own condition variable.
typedef struct non_polling_worker {
  gpr_cv cv;
  bool kicked;
  struct non_polling_worker* next;
  struct non_polling_worker* prev;
} non_polling_worker;

// State for the GRPC_CQ_NON_POLLING strategy: no I/O polling, just a mutex,
// a ring of cv-waiters rooted at `root`, and a pending shutdown closure.
struct non_polling_poller {
  gpr_mu mu;
  bool kicked_without_poller;  // a kick arrived while no worker was waiting
  non_polling_worker* root;
  grpc_closure* shutdown;
};
non_polling_poller_size(void)106 size_t non_polling_poller_size(void) { return sizeof(non_polling_poller); }
107 
non_polling_poller_init(grpc_pollset * pollset,gpr_mu ** mu)108 void non_polling_poller_init(grpc_pollset* pollset, gpr_mu** mu) {
109   non_polling_poller* npp = reinterpret_cast<non_polling_poller*>(pollset);
110   gpr_mu_init(&npp->mu);
111   *mu = &npp->mu;
112 }
113 
non_polling_poller_destroy(grpc_pollset * pollset)114 void non_polling_poller_destroy(grpc_pollset* pollset) {
115   non_polling_poller* npp = reinterpret_cast<non_polling_poller*>(pollset);
116   gpr_mu_destroy(&npp->mu);
117 }
118 
// Blocks the calling thread on a condition variable (no I/O polling) until
// the deadline passes, this worker is kicked, or the poller shuts down.
// Expects to be called with npp->mu held (gpr_cv_wait releases and
// reacquires it around the wait).
grpc_error_handle non_polling_poller_work(grpc_pollset* pollset,
                                          grpc_pollset_worker** worker,
                                          grpc_core::Timestamp deadline) {
  non_polling_poller* npp = reinterpret_cast<non_polling_poller*>(pollset);
  if (npp->shutdown) return absl::OkStatus();
  if (npp->kicked_without_poller) {
    // Consume a kick that arrived while no worker was waiting.
    npp->kicked_without_poller = false;
    return absl::OkStatus();
  }
  // Stack-allocated worker node, linked into the circular ring rooted at
  // npp->root for the duration of the wait.
  non_polling_worker w;
  gpr_cv_init(&w.cv);
  if (worker != nullptr) *worker = reinterpret_cast<grpc_pollset_worker*>(&w);
  if (npp->root == nullptr) {
    npp->root = w.next = w.prev = &w;
  } else {
    w.next = npp->root;
    w.prev = w.next->prev;
    w.next->prev = w.prev->next = &w;
  }
  w.kicked = false;
  gpr_timespec deadline_ts = deadline.as_timespec(GPR_CLOCK_MONOTONIC);
  // Loop until shutdown, a kick, or timeout (gpr_cv_wait returns non-zero on
  // timeout).
  while (!npp->shutdown && !w.kicked &&
         !gpr_cv_wait(&w.cv, &npp->mu, deadline_ts)) {
  }
  grpc_core::ExecCtx::Get()->InvalidateNow();
  if (&w == npp->root) {
    npp->root = w.next;
    if (&w == npp->root) {
      // We were the last worker in the ring; if a shutdown was requested,
      // run its closure now.
      if (npp->shutdown) {
        grpc_core::ExecCtx::Run(DEBUG_LOCATION, npp->shutdown,
                                absl::OkStatus());
      }
      npp->root = nullptr;
    }
  }
  // Unlink this worker from the ring before the stack frame goes away.
  w.next->prev = w.prev;
  w.prev->next = w.next;
  gpr_cv_destroy(&w.cv);
  if (worker != nullptr) *worker = nullptr;
  return absl::OkStatus();
}
160 
non_polling_poller_kick(grpc_pollset * pollset,grpc_pollset_worker * specific_worker)161 grpc_error_handle non_polling_poller_kick(
162     grpc_pollset* pollset, grpc_pollset_worker* specific_worker) {
163   non_polling_poller* p = reinterpret_cast<non_polling_poller*>(pollset);
164   if (specific_worker == nullptr) {
165     specific_worker = reinterpret_cast<grpc_pollset_worker*>(p->root);
166   }
167   if (specific_worker != nullptr) {
168     non_polling_worker* w =
169         reinterpret_cast<non_polling_worker*>(specific_worker);
170     if (!w->kicked) {
171       w->kicked = true;
172       gpr_cv_signal(&w->cv);
173     }
174   } else {
175     p->kicked_without_poller = true;
176   }
177   return absl::OkStatus();
178 }
179 
non_polling_poller_shutdown(grpc_pollset * pollset,grpc_closure * closure)180 void non_polling_poller_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
181   non_polling_poller* p = reinterpret_cast<non_polling_poller*>(pollset);
182   GPR_ASSERT(closure != nullptr);
183   p->shutdown = closure;
184   if (p->root == nullptr) {
185     grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, absl::OkStatus());
186   } else {
187     non_polling_worker* w = p->root;
188     do {
189       gpr_cv_signal(&w->cv);
190       w = w->next;
191     } while (w != p->root);
192   }
193 }
194 
// Poller vtables indexed by grpc_cq_polling_type. DEFAULT and NON_LISTENING
// use the real grpc_pollset implementation (differing only in can_listen);
// NON_POLLING uses the cv-based non_polling_poller above.
const cq_poller_vtable g_poller_vtable_by_poller_type[] = {
    // GRPC_CQ_DEFAULT_POLLING
    {true, true, grpc_pollset_size, grpc_pollset_init, grpc_pollset_kick,
     grpc_pollset_work, grpc_pollset_shutdown, grpc_pollset_destroy},
    // GRPC_CQ_NON_LISTENING
    {true, false, grpc_pollset_size, grpc_pollset_init, grpc_pollset_kick,
     grpc_pollset_work, grpc_pollset_shutdown, grpc_pollset_destroy},
    // GRPC_CQ_NON_POLLING
    {false, false, non_polling_poller_size, non_polling_poller_init,
     non_polling_poller_kick, non_polling_poller_work,
     non_polling_poller_shutdown, non_polling_poller_destroy},
};
207 
208 }  // namespace
209 
// Completion-type vtable; one instance per grpc_cq_completion_type (see
// g_cq_vtable below). `next` is only set for NEXT queues and `pluck` only
// for PLUCK queues.
struct cq_vtable {
  grpc_cq_completion_type cq_completion_type;
  // Size of the per-type data block allocated immediately after the
  // grpc_completion_queue struct (see DATA_FROM_CQ).
  size_t data_size;
  void (*init)(void* data, grpc_completion_queue_functor* shutdown_callback);
  void (*shutdown)(grpc_completion_queue* cq);
  void (*destroy)(void* data);
  bool (*begin_op)(grpc_completion_queue* cq, void* tag);
  void (*end_op)(grpc_completion_queue* cq, void* tag, grpc_error_handle error,
                 void (*done)(void* done_arg, grpc_cq_completion* storage),
                 void* done_arg, grpc_cq_completion* storage, bool internal);
  grpc_event (*next)(grpc_completion_queue* cq, gpr_timespec deadline,
                     void* reserved);
  grpc_event (*pluck)(grpc_completion_queue* cq, void* tag,
                      gpr_timespec deadline, void* reserved);
};
225 
226 namespace {
227 
// Queue that holds the cq_completion_events. Internally uses
// MultiProducerSingleConsumerQueue (a lockfree multiproducer single consumer
// queue). It uses a queue_lock to support multiple consumers.
// Only used in completion queues whose completion_type is GRPC_CQ_NEXT
class CqEventQueue {
 public:
  CqEventQueue() = default;
  ~CqEventQueue() = default;

  // Note: The counter is not incremented/decremented atomically with push/pop.
  // The count is only eventually consistent
  intptr_t num_items() const {
    return num_queue_items_.load(std::memory_order_relaxed);
  }

  // Push returns true if the queue was empty before the push (see definition
  // below); Pop returns nullptr if the queue is empty or another consumer
  // holds the spinlock.
  bool Push(grpc_cq_completion* c);
  grpc_cq_completion* Pop();

 private:
  // Spinlock to serialize consumers i.e pop() operations
  gpr_spinlock queue_lock_ = GPR_SPINLOCK_INITIALIZER;

  grpc_core::MultiProducerSingleConsumerQueue queue_;

  // A lazy counter of number of items in the queue. This is NOT atomically
  // incremented/decremented along with push/pop operations and hence is only
  // eventually consistent
  std::atomic<intptr_t> num_queue_items_{0};
};
257 
// Per-queue data for completion queues of type GRPC_CQ_NEXT.
struct cq_next_data {
  ~cq_next_data() {
    // The event queue must have been fully drained before destruction.
    GPR_ASSERT(queue.num_items() == 0);
#ifndef NDEBUG
    if (pending_events.load(std::memory_order_acquire) != 0) {
      gpr_log(GPR_ERROR, "Destroying CQ without draining it fully.");
    }
#endif
  }

  /// Completed events for completion-queues of type GRPC_CQ_NEXT
  CqEventQueue queue;

  /// Counter of how many things have ever been queued on this completion queue
  /// useful for avoiding locks to check the queue
  std::atomic<intptr_t> things_queued_ever{0};

  /// Number of outstanding events (+1 if not shut down)
  /// Initial count is dropped by grpc_completion_queue_shutdown
  std::atomic<intptr_t> pending_events{1};

  /// 0 initially. 1 once we initiated shutdown
  bool shutdown_called = false;
};
282 
// Per-queue data for completion queues of type GRPC_CQ_PLUCK. Completed
// events form an intrusive singly-linked list threaded through
// grpc_cq_completion::next (with the success bit stored in the low bit).
struct cq_pluck_data {
  cq_pluck_data() {
    // Start with an empty list: the sentinel head points at itself.
    completed_tail = &completed_head;
    completed_head.next = reinterpret_cast<uintptr_t>(completed_tail);
  }

  ~cq_pluck_data() {
    // The completed list must be empty (head pointing at itself) on destroy.
    GPR_ASSERT(completed_head.next ==
               reinterpret_cast<uintptr_t>(&completed_head));
#ifndef NDEBUG
    if (pending_events.load(std::memory_order_acquire) != 0) {
      gpr_log(GPR_ERROR, "Destroying CQ without draining it fully.");
    }
#endif
  }

  /// Completed events for completion-queues of type GRPC_CQ_PLUCK
  grpc_cq_completion completed_head;
  grpc_cq_completion* completed_tail;

  /// Number of pending events (+1 if we're not shutdown).
  /// Initial count is dropped by grpc_completion_queue_shutdown.
  std::atomic<intptr_t> pending_events{1};

  /// Counter of how many things have ever been queued on this completion queue
  /// useful for avoiding locks to check the queue
  std::atomic<intptr_t> things_queued_ever{0};

  /// 0 initially. 1 once we completed shutting
  // TODO(sreek): This is not needed since (shutdown == 1) if and only if
  // (pending_events == 0). So consider removing this in future and use
  // pending_events
  std::atomic<bool> shutdown{false};

  /// 0 initially. 1 once we initiated shutdown
  bool shutdown_called = false;

  // Threads currently blocked in pluck, with the tags they wait for.
  int num_pluckers = 0;
  plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
};
323 
// Per-queue data for completion queues of type GRPC_CQ_CALLBACK. Completions
// are delivered by invoking functors directly, so there is no event queue.
struct cq_callback_data {
  explicit cq_callback_data(grpc_completion_queue_functor* shutdown_callback)
      : shutdown_callback(shutdown_callback) {}

  ~cq_callback_data() {
#ifndef NDEBUG
    if (pending_events.load(std::memory_order_acquire) != 0) {
      gpr_log(GPR_ERROR, "Destroying CQ without draining it fully.");
    }
#endif
  }

  /// No actual completed events queue, unlike other types

  /// Number of pending events (+1 if we're not shutdown).
  /// Initial count is dropped by grpc_completion_queue_shutdown.
  std::atomic<intptr_t> pending_events{1};

  /// 0 initially. 1 once we initiated shutdown
  bool shutdown_called = false;

  /// A callback that gets invoked when the CQ completes shutdown
  grpc_completion_queue_functor* shutdown_callback;
};
348 
349 }  // namespace
350 
// Completion queue structure. The per-completion-type data block and the
// pollset are allocated immediately after this struct in one contiguous
// allocation (see DATA_FROM_CQ / POLLSET_FROM_CQ and
// grpc_completion_queue_create_internal).
struct grpc_completion_queue {
  /// Once owning_refs drops to zero, we will destroy the cq
  grpc_core::RefCount owning_refs;
  /// Add the paddings to fix the false sharing
  char padding_1[GPR_CACHELINE_SIZE];
  gpr_mu* mu;

  char padding_2[GPR_CACHELINE_SIZE];
  const cq_vtable* vtable;

  char padding_3[GPR_CACHELINE_SIZE];
  const cq_poller_vtable* poller_vtable;

#ifndef NDEBUG
  // Debug-only tracking of tags passed to grpc_cq_begin_op but not yet
  // completed (validated in cq_check_tag).
  void** outstanding_tags;
  size_t outstanding_tag_count;
  size_t outstanding_tag_capacity;
#endif

  grpc_closure pollset_shutdown_done;
  int num_polls;
};
374 
// Forward declarations for the per-completion-type implementations wired
// into g_cq_vtable below.
static void cq_finish_shutdown_next(grpc_completion_queue* cq);
static void cq_finish_shutdown_pluck(grpc_completion_queue* cq);
static void cq_finish_shutdown_callback(grpc_completion_queue* cq);
static void cq_shutdown_next(grpc_completion_queue* cq);
static void cq_shutdown_pluck(grpc_completion_queue* cq);
static void cq_shutdown_callback(grpc_completion_queue* cq);

static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag);
static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag);
static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag);

// A cq_end_op function is called when an operation on a given CQ with
// a given tag has completed. The storage argument is a reference to the
// space reserved for this completion as it is placed into the corresponding
// queue. The done argument is a callback that will be invoked when it is
// safe to free up that storage. The storage MUST NOT be freed until the
// done callback is invoked.
static void cq_end_op_for_next(
    grpc_completion_queue* cq, void* tag, grpc_error_handle error,
    void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
    grpc_cq_completion* storage, bool internal);

static void cq_end_op_for_pluck(
    grpc_completion_queue* cq, void* tag, grpc_error_handle error,
    void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
    grpc_cq_completion* storage, bool internal);

static void cq_end_op_for_callback(
    grpc_completion_queue* cq, void* tag, grpc_error_handle error,
    void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
    grpc_cq_completion* storage, bool internal);

static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
                          void* reserved);

static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
                           gpr_timespec deadline, void* reserved);

// Note that cq_init_next and cq_init_pluck do not use the shutdown_callback
static void cq_init_next(void* data,
                         grpc_completion_queue_functor* shutdown_callback);
static void cq_init_pluck(void* data,
                          grpc_completion_queue_functor* shutdown_callback);
static void cq_init_callback(void* data,
                             grpc_completion_queue_functor* shutdown_callback);
static void cq_destroy_next(void* data);
static void cq_destroy_pluck(void* data);
static void cq_destroy_callback(void* data);
424 
// Completion queue vtables based on the completion-type. Indexed by
// grpc_cq_completion_type; the `next`/`pluck` slots are nullptr for types
// that do not support that retrieval mode.
static const cq_vtable g_cq_vtable[] = {
    // GRPC_CQ_NEXT
    {GRPC_CQ_NEXT, sizeof(cq_next_data), cq_init_next, cq_shutdown_next,
     cq_destroy_next, cq_begin_op_for_next, cq_end_op_for_next, cq_next,
     nullptr},
    // GRPC_CQ_PLUCK
    {GRPC_CQ_PLUCK, sizeof(cq_pluck_data), cq_init_pluck, cq_shutdown_pluck,
     cq_destroy_pluck, cq_begin_op_for_pluck, cq_end_op_for_pluck, nullptr,
     cq_pluck},
    // GRPC_CQ_CALLBACK
    {GRPC_CQ_CALLBACK, sizeof(cq_callback_data), cq_init_callback,
     cq_shutdown_callback, cq_destroy_callback, cq_begin_op_for_callback,
     cq_end_op_for_callback, nullptr, nullptr},
};
440 
// The per-type data block lives immediately after the grpc_completion_queue
// struct, and the pollset immediately after that (both sized at creation).
#define DATA_FROM_CQ(cq) ((void*)((cq) + 1))
#define POLLSET_FROM_CQ(cq) \
  ((grpc_pollset*)((cq)->vtable->data_size + (char*)DATA_FROM_CQ(cq)))

grpc_core::TraceFlag grpc_cq_pluck_trace(false, "queue_pluck");

// Logs an event returned to the application when api tracing is on; timeout
// events are only logged if queue_pluck tracing is also enabled.
#define GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, event)     \
  do {                                                   \
    if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) &&       \
        (GRPC_TRACE_FLAG_ENABLED(grpc_cq_pluck_trace) || \
         (event)->type != GRPC_QUEUE_TIMEOUT)) {         \
      gpr_log(GPR_INFO, "RETURN_EVENT[%p]: %s", cq,      \
              grpc_event_string(event).c_str());         \
    }                                                    \
  } while (0)

static void on_pollset_shutdown_done(void* arg, grpc_error_handle error);
458 
grpc_completion_queue_thread_local_cache_init(grpc_completion_queue * cq)459 void grpc_completion_queue_thread_local_cache_init(grpc_completion_queue* cq) {
460   if (g_cached_cq == nullptr) {
461     g_cached_event = nullptr;
462     g_cached_cq = cq;
463   }
464 }
465 
// Flushes the calling thread's cached completion for `cq`. Returns 1 and
// fills *tag / *ok if an event for this cq was cached; returns 0 otherwise.
// Always clears the thread-local cache before returning.
int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue* cq,
                                                   void** tag, int* ok) {
  grpc_cq_completion* storage = g_cached_event;
  int ret = 0;
  if (storage != nullptr && g_cached_cq == cq) {
    *tag = storage->tag;
    grpc_core::ExecCtx exec_ctx;
    // The low bit of `next` carries the success flag for NEXT completions.
    *ok = (storage->next & uintptr_t{1}) == 1;
    storage->done(storage->done_arg, storage);
    ret = 1;
    cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
    if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
      // That was the last pending event: complete the shutdown sequence.
      GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
      gpr_mu_lock(cq->mu);
      cq_finish_shutdown_next(cq);
      gpr_mu_unlock(cq->mu);
      GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
    }
  }
  g_cached_event = nullptr;
  g_cached_cq = nullptr;

  return ret;
}
490 
Push(grpc_cq_completion * c)491 bool CqEventQueue::Push(grpc_cq_completion* c) {
492   queue_.Push(
493       reinterpret_cast<grpc_core::MultiProducerSingleConsumerQueue::Node*>(c));
494   return num_queue_items_.fetch_add(1, std::memory_order_relaxed) == 0;
495 }
496 
Pop()497 grpc_cq_completion* CqEventQueue::Pop() {
498   grpc_cq_completion* c = nullptr;
499 
500   if (gpr_spinlock_trylock(&queue_lock_)) {
501     bool is_empty = false;
502     c = reinterpret_cast<grpc_cq_completion*>(queue_.PopAndCheckEnd(&is_empty));
503     gpr_spinlock_unlock(&queue_lock_);
504   }
505 
506   if (c) {
507     num_queue_items_.fetch_sub(1, std::memory_order_relaxed);
508   }
509 
510   return c;
511 }
512 
// Creates a completion queue of the requested completion/polling type. The
// cq struct, its per-type data block, and its pollset are carved out of one
// zeroed allocation (see DATA_FROM_CQ / POLLSET_FROM_CQ).
// `shutdown_callback` is only used by GRPC_CQ_CALLBACK queues.
grpc_completion_queue* grpc_completion_queue_create_internal(
    grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type,
    grpc_completion_queue_functor* shutdown_callback) {
  grpc_completion_queue* cq;

  GRPC_API_TRACE(
      "grpc_completion_queue_create_internal(completion_type=%d, "
      "polling_type=%d)",
      2, (completion_type, polling_type));

  switch (completion_type) {
    case GRPC_CQ_NEXT:
      grpc_core::global_stats().IncrementCqNextCreates();
      break;
    case GRPC_CQ_PLUCK:
      grpc_core::global_stats().IncrementCqPluckCreates();
      break;
    case GRPC_CQ_CALLBACK:
      grpc_core::global_stats().IncrementCqCallbackCreates();
      break;
  }

  const cq_vtable* vtable = &g_cq_vtable[completion_type];
  const cq_poller_vtable* poller_vtable =
      &g_poller_vtable_by_poller_type[polling_type];

  grpc_core::ExecCtx exec_ctx;

  // Single zeroed allocation: cq header, then vtable data, then pollset.
  cq = static_cast<grpc_completion_queue*>(
      gpr_zalloc(sizeof(grpc_completion_queue) + vtable->data_size +
                 poller_vtable->size()));

  cq->vtable = vtable;
  cq->poller_vtable = poller_vtable;

  // One for destroy(), one for pollset_shutdown
  new (&cq->owning_refs) grpc_core::RefCount(
      2, grpc_trace_cq_refcount.enabled() ? "completion_queue" : nullptr);

  poller_vtable->init(POLLSET_FROM_CQ(cq), &cq->mu);
  vtable->init(DATA_FROM_CQ(cq), shutdown_callback);

  GRPC_CLOSURE_INIT(&cq->pollset_shutdown_done, on_pollset_shutdown_done, cq,
                    grpc_schedule_on_exec_ctx);
  return cq;
}
559 
// Placement-constructs the NEXT-type data block inside the cq allocation.
// The shutdown callback is unused for this completion type.
static void cq_init_next(void* data,
                         grpc_completion_queue_functor* /*shutdown_callback*/) {
  new (data) cq_next_data();
}
564 
cq_destroy_next(void * data)565 static void cq_destroy_next(void* data) {
566   cq_next_data* cqd = static_cast<cq_next_data*>(data);
567   cqd->~cq_next_data();
568 }
569 
// Placement-constructs the PLUCK-type data block inside the cq allocation.
// The shutdown callback is unused for this completion type.
static void cq_init_pluck(
    void* data, grpc_completion_queue_functor* /*shutdown_callback*/) {
  new (data) cq_pluck_data();
}
574 
cq_destroy_pluck(void * data)575 static void cq_destroy_pluck(void* data) {
576   cq_pluck_data* cqd = static_cast<cq_pluck_data*>(data);
577   cqd->~cq_pluck_data();
578 }
579 
// Placement-constructs the CALLBACK-type data block, capturing the functor
// that will be invoked once the cq finishes shutting down.
static void cq_init_callback(void* data,
                             grpc_completion_queue_functor* shutdown_callback) {
  new (data) cq_callback_data(shutdown_callback);
}
584 
cq_destroy_callback(void * data)585 static void cq_destroy_callback(void* data) {
586   cq_callback_data* cqd = static_cast<cq_callback_data*>(data);
587   cqd->~cq_callback_data();
588 }
589 
// Returns the completion type (NEXT / PLUCK / CALLBACK) this cq was created
// with, as recorded in its vtable.
grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue* cq) {
  return cq->vtable->cq_completion_type;
}
593 
grpc_get_cq_poll_num(grpc_completion_queue * cq)594 int grpc_get_cq_poll_num(grpc_completion_queue* cq) {
595   int cur_num_polls;
596   gpr_mu_lock(cq->mu);
597   cur_num_polls = cq->num_polls;
598   gpr_mu_unlock(cq->mu);
599   return cur_num_polls;
600 }
601 
#ifndef NDEBUG
// Debug build: takes the caller's reason/file/line so refcount tracing can
// attribute the ref (see grpc_trace_cq_refcount).
void grpc_cq_internal_ref(grpc_completion_queue* cq, const char* reason,
                          const char* file, int line) {
  grpc_core::DebugLocation debug_location(file, line);
#else
// Release build: no caller attribution is recorded.
void grpc_cq_internal_ref(grpc_completion_queue* cq) {
  grpc_core::DebugLocation debug_location;
  const char* reason = nullptr;
#endif
  cq->owning_refs.Ref(debug_location, reason);
}
613 
614 static void on_pollset_shutdown_done(void* arg, grpc_error_handle /*error*/) {
615   grpc_completion_queue* cq = static_cast<grpc_completion_queue*>(arg);
616   GRPC_CQ_INTERNAL_UNREF(cq, "pollset_destroy");
617 }
618 
#ifndef NDEBUG
// Debug build: takes the caller's reason/file/line for refcount tracing.
void grpc_cq_internal_unref(grpc_completion_queue* cq, const char* reason,
                            const char* file, int line) {
  grpc_core::DebugLocation debug_location(file, line);
#else
// Release build: no caller attribution is recorded.
void grpc_cq_internal_unref(grpc_completion_queue* cq) {
  grpc_core::DebugLocation debug_location;
  const char* reason = nullptr;
#endif
  // Unref returns true when this was the last reference: tear everything
  // down (per-type data, pollset, then the single backing allocation).
  if (GPR_UNLIKELY(cq->owning_refs.Unref(debug_location, reason))) {
    cq->vtable->destroy(DATA_FROM_CQ(cq));
    cq->poller_vtable->destroy(POLLSET_FROM_CQ(cq));
#ifndef NDEBUG
    gpr_free(cq->outstanding_tags);
#endif
    gpr_free(cq);
  }
}
637 
#ifndef NDEBUG
// Debug-only: verifies `tag` was registered via grpc_cq_begin_op and removes
// it from the outstanding-tag list (swap-with-last, order not preserved).
// Aborts if the tag was never registered. Takes cq->mu iff `lock_cq`.
static void cq_check_tag(grpc_completion_queue* cq, void* tag, bool lock_cq) {
  int found = 0;
  if (lock_cq) {
    gpr_mu_lock(cq->mu);
  }

  for (int i = 0; i < static_cast<int>(cq->outstanding_tag_count); i++) {
    if (cq->outstanding_tags[i] == tag) {
      cq->outstanding_tag_count--;
      std::swap(cq->outstanding_tags[i],
                cq->outstanding_tags[cq->outstanding_tag_count]);
      found = 1;
      break;
    }
  }

  if (lock_cq) {
    gpr_mu_unlock(cq->mu);
  }

  GPR_ASSERT(found);
}
#else
// Release build: outstanding-tag tracking is compiled out.
static void cq_check_tag(grpc_completion_queue* /*cq*/, void* /*tag*/,
                         bool /*lock_cq*/) {}
#endif
665 
// begin_op implementations: atomically bump pending_events unless it has
// already reached zero (i.e. shutdown completed). Returning false tells the
// caller the cq is shut down and the op must not be started.
static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* /*tag*/) {
  cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
  return grpc_core::IncrementIfNonzero(&cqd->pending_events);
}

static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* /*tag*/) {
  cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
  return grpc_core::IncrementIfNonzero(&cqd->pending_events);
}

static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* /*tag*/) {
  cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
  return grpc_core::IncrementIfNonzero(&cqd->pending_events);
}
680 
// Registers the start of an operation that will later complete on `cq` with
// `tag`. Returns false if the cq is already shut down. In debug builds the
// tag is also recorded for validation in cq_check_tag.
bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) {
#ifndef NDEBUG
  gpr_mu_lock(cq->mu);
  if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) {
    // Grow geometrically, starting from a capacity of 4.
    cq->outstanding_tag_capacity =
        std::max(size_t(4), 2 * cq->outstanding_tag_capacity);
    cq->outstanding_tags = static_cast<void**>(gpr_realloc(
        cq->outstanding_tags,
        sizeof(*cq->outstanding_tags) * cq->outstanding_tag_capacity));
  }
  cq->outstanding_tags[cq->outstanding_tag_count++] = tag;
  gpr_mu_unlock(cq->mu);
#endif
  return cq->vtable->begin_op(cq, tag);
}
696 
// Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
// completion
// type of GRPC_CQ_NEXT)
static void cq_end_op_for_next(
    grpc_completion_queue* cq, void* tag, grpc_error_handle error,
    void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
    grpc_cq_completion* storage, bool /*internal*/) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) ||
      (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) && !error.ok())) {
    std::string errmsg = grpc_core::StatusToString(error);
    GRPC_API_TRACE(
        "cq_end_op_for_next(cq=%p, tag=%p, error=%s, "
        "done=%p, done_arg=%p, storage=%p)",
        6, (cq, tag, errmsg.c_str(), done, done_arg, storage));
    if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) && !error.ok()) {
      gpr_log(GPR_INFO, "Operation failed: tag=%p, error=%s", tag,
              errmsg.c_str());
    }
  }
  cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
  int is_success = (error.ok());

  // The success bit rides in the low bit of `next` (see
  // grpc_completion_queue_thread_local_cache_flush).
  storage->tag = tag;
  storage->done = done;
  storage->done_arg = done_arg;
  storage->next = static_cast<uintptr_t>(is_success);

  cq_check_tag(cq, tag, true);  // Used in debug builds only

  // Prefer the thread-local single-event cache when this thread has claimed
  // it for this cq and it is empty.
  if (g_cached_cq == cq && g_cached_event == nullptr) {
    g_cached_event = storage;
  } else {
    // Add the completion to the queue
    bool is_first = cqd->queue.Push(storage);
    cqd->things_queued_ever.fetch_add(1, std::memory_order_relaxed);
    // Since we do not hold the cq lock here, it is important to do an 'acquire'
    // load here (instead of a 'no_barrier' load) to match with the release
    // store
    // (done via pending_events.fetch_sub(1, ACQ_REL)) in cq_shutdown_next
    //
    if (cqd->pending_events.load(std::memory_order_acquire) != 1) {
      // Only kick if this is the first item queued
      if (is_first) {
        gpr_mu_lock(cq->mu);
        grpc_error_handle kick_error =
            cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), nullptr);
        gpr_mu_unlock(cq->mu);

        if (!kick_error.ok()) {
          gpr_log(GPR_ERROR, "Kick failed: %s",
                  grpc_core::StatusToString(kick_error).c_str());
        }
      }
      // Dropping the last pending event completes the shutdown sequence.
      if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
        GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
        gpr_mu_lock(cq->mu);
        cq_finish_shutdown_next(cq);
        gpr_mu_unlock(cq->mu);
        GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
      }
    } else {
      // Only the shutdown count remains: finish shutting down now.
      GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
      cqd->pending_events.store(0, std::memory_order_release);
      gpr_mu_lock(cq->mu);
      cq_finish_shutdown_next(cq);
      gpr_mu_unlock(cq->mu);
      GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
    }
  }
}
767 
// Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
// completion
// type of GRPC_CQ_PLUCK)
static void cq_end_op_for_pluck(
    grpc_completion_queue* cq, void* tag, grpc_error_handle error,
    void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
    grpc_cq_completion* storage, bool /*internal*/) {
  cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
  int is_success = (error.ok());

  if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) ||
      (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) && !error.ok())) {
    std::string errmsg = grpc_core::StatusToString(error);
    GRPC_API_TRACE(
        "cq_end_op_for_pluck(cq=%p, tag=%p, error=%s, "
        "done=%p, done_arg=%p, storage=%p)",
        6, (cq, tag, errmsg.c_str(), done, done_arg, storage));
    if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) && !error.ok()) {
      gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag,
              errmsg.c_str());
    }
  }

  // `next` is a tagged pointer: link to the sentinel head with the success
  // flag in the low bit.
  storage->tag = tag;
  storage->done = done;
  storage->done_arg = done_arg;
  storage->next = reinterpret_cast<uintptr_t>(&cqd->completed_head) |
                  static_cast<uintptr_t>(is_success);

  gpr_mu_lock(cq->mu);
  cq_check_tag(cq, tag, false);  // Used in debug builds only

  // Add to the list of completions
  cqd->things_queued_ever.fetch_add(1, std::memory_order_relaxed);
  cqd->completed_tail->next =
      reinterpret_cast<uintptr_t>(storage) | (1u & cqd->completed_tail->next);
  cqd->completed_tail = storage;

  if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
    // Last pending event: finish the shutdown (mutex released inside/after).
    cq_finish_shutdown_pluck(cq);
    gpr_mu_unlock(cq->mu);
  } else {
    // Kick the plucker (if any) that is waiting for exactly this tag.
    grpc_pollset_worker* pluck_worker = nullptr;
    for (int i = 0; i < cqd->num_pluckers; i++) {
      if (cqd->pluckers[i].tag == tag) {
        pluck_worker = *cqd->pluckers[i].worker;
        break;
      }
    }

    grpc_error_handle kick_error =
        cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), pluck_worker);
    gpr_mu_unlock(cq->mu);
    if (!kick_error.ok()) {
      gpr_log(GPR_ERROR, "Kick failed: %s",
              grpc_core::StatusToString(kick_error).c_str());
    }
  }
}
827 
828 static void functor_callback(void* arg, grpc_error_handle error) {
829   auto* functor = static_cast<grpc_completion_queue_functor*>(arg);
830   functor->functor_run(functor, error.ok());
831 }
832 
// Complete an event on a completion queue of type GRPC_CQ_CALLBACK: rather
// than being stored for a later next/pluck, the completion is delivered by
// running the functor passed in as |tag|.
static void cq_end_op_for_callback(
    grpc_completion_queue* cq, void* tag, grpc_error_handle error,
    void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
    grpc_cq_completion* storage, bool internal) {
  cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);

  if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) ||
      (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) && !error.ok())) {
    std::string errmsg = grpc_core::StatusToString(error);
    GRPC_API_TRACE(
        "cq_end_op_for_callback(cq=%p, tag=%p, error=%s, "
        "done=%p, done_arg=%p, storage=%p)",
        6, (cq, tag, errmsg.c_str(), done, done_arg, storage));
    if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) && !error.ok()) {
      gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag,
              errmsg.c_str());
    }
  }

  // The callback-based CQ isn't really a queue at all and thus has no need
  // for reserved storage. Invoke the done callback right away to release it.
  done(done_arg, storage);

  cq_check_tag(cq, tag, true);  // Used in debug builds only

  // Decrement the pending-event count; it reaches zero only after shutdown
  // has been requested (cq_shutdown_callback drops the base count), at which
  // point the shutdown sequence must be completed here.
  if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
    cq_finish_shutdown_callback(cq);
  }

  // If possible, schedule the callback onto an existing thread-local
  // ApplicationCallbackExecCtx, which is a work queue. This is possible for:
  // 1. The callback is internally-generated and there is an ACEC available
  // 2. The callback is marked inlineable and there is an ACEC available
  // 3. We are already running in a background poller thread (which always has
  //    an ACEC available at the base of the stack).
  auto* functor = static_cast<grpc_completion_queue_functor*>(tag);
  if (((internal || functor->inlineable) &&
       grpc_core::ApplicationCallbackExecCtx::Available()) ||
      grpc_iomgr_is_any_background_poller_thread()) {
    grpc_core::ApplicationCallbackExecCtx::Enqueue(functor, (error.ok()));
    return;
  }

  // Otherwise, schedule the callback through the executor via a closure.
  grpc_core::Executor::Run(
      GRPC_CLOSURE_CREATE(functor_callback, functor, nullptr), error);
}
882 
883 void grpc_cq_end_op(grpc_completion_queue* cq, void* tag,
884                     grpc_error_handle error,
885                     void (*done)(void* done_arg, grpc_cq_completion* storage),
886                     void* done_arg, grpc_cq_completion* storage,
887                     bool internal) {
888   cq->vtable->end_op(cq, tag, error, done, done_arg, storage, internal);
889 }
890 
// State shared between a blocking next/pluck call and its ExecCtx's
// CheckReadyToFinish() hook (see ExecCtxNext / ExecCtxPluck below).
struct cq_is_finished_arg {
  gpr_atm last_seen_things_queued_ever;  // snapshot of cqd->things_queued_ever
  grpc_completion_queue* cq;
  grpc_core::Timestamp deadline;
  grpc_cq_completion* stolen_completion;  // completion claimed by the hook
  void* tag;  // for pluck
  bool first_loop;  // true until the first poll iteration has run
};
// ExecCtx specialization used by cq_next(): its CheckReadyToFinish() hook
// lets the poller return early once a completion has been queued (stealing
// it into the shared cq_is_finished_arg) or the deadline has passed.
class ExecCtxNext : public grpc_core::ExecCtx {
 public:
  explicit ExecCtxNext(void* arg)
      : ExecCtx(0), check_ready_to_finish_arg_(arg) {}

  // Returns true when the surrounding cq_next() should stop polling: either
  // a completion was popped (stored in a->stolen_completion) or the deadline
  // expired after the first loop iteration.
  bool CheckReadyToFinish() override {
    cq_is_finished_arg* a =
        static_cast<cq_is_finished_arg*>(check_ready_to_finish_arg_);
    grpc_completion_queue* cq = a->cq;
    cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
    GPR_ASSERT(a->stolen_completion == nullptr);

    intptr_t current_last_seen_things_queued_ever =
        cqd->things_queued_ever.load(std::memory_order_relaxed);

    // Only attempt a pop when something has been queued since we last
    // looked; otherwise the queue is known to hold nothing new for us.
    if (current_last_seen_things_queued_ever !=
        a->last_seen_things_queued_ever) {
      a->last_seen_things_queued_ever =
          cqd->things_queued_ever.load(std::memory_order_relaxed);

      // Pop a cq_completion from the queue. Returns NULL if the queue is
      // empty (and might return NULL in some cases even if the queue is not
      // empty; that is ok and doesn't affect correctness, though it might
      // affect tail latencies a bit).
      a->stolen_completion = cqd->queue.Pop();
      if (a->stolen_completion != nullptr) {
        return true;
      }
    }
    return !a->first_loop && a->deadline < grpc_core::Timestamp::Now();
  }

 private:
  void* check_ready_to_finish_arg_;  // points at a cq_is_finished_arg
};
935 
936 #ifndef NDEBUG
937 static void dump_pending_tags(grpc_completion_queue* cq) {
938   if (!GRPC_TRACE_FLAG_ENABLED(grpc_trace_pending_tags)) return;
939   std::vector<std::string> parts;
940   parts.push_back("PENDING TAGS:");
941   gpr_mu_lock(cq->mu);
942   for (size_t i = 0; i < cq->outstanding_tag_count; i++) {
943     parts.push_back(absl::StrFormat(" %p", cq->outstanding_tags[i]));
944   }
945   gpr_mu_unlock(cq->mu);
946   gpr_log(GPR_DEBUG, "%s", absl::StrJoin(parts, "").c_str());
947 }
948 #else
949 static void dump_pending_tags(grpc_completion_queue* /*cq*/) {}
950 #endif
951 
// Polls a completion queue of type GRPC_CQ_NEXT for the next completed
// event, blocking (via the pollset) until an event is available, the queue
// is shut down, or |deadline| expires.
static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
                          void* reserved) {
  grpc_event ret;
  cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);

  GRPC_API_TRACE(
      "grpc_completion_queue_next("
      "cq=%p, "
      "deadline=gpr_timespec { tv_sec: %" PRId64
      ", tv_nsec: %d, clock_type: %d }, "
      "reserved=%p)",
      5,
      (cq, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
       reserved));
  GPR_ASSERT(!reserved);

  dump_pending_tags(cq);

  GRPC_CQ_INTERNAL_REF(cq, "next");

  grpc_core::Timestamp deadline_millis =
      grpc_core::Timestamp::FromTimespecRoundUp(deadline);
  cq_is_finished_arg is_finished_arg = {
      cqd->things_queued_ever.load(std::memory_order_relaxed),
      cq,
      deadline_millis,
      nullptr,
      nullptr,
      true};
  ExecCtxNext exec_ctx(&is_finished_arg);
  for (;;) {
    grpc_core::Timestamp iteration_deadline = deadline_millis;

    // A completion may have been stolen from the queue by
    // ExecCtxNext::CheckReadyToFinish() while we were polling; return it.
    if (is_finished_arg.stolen_completion != nullptr) {
      grpc_cq_completion* c = is_finished_arg.stolen_completion;
      is_finished_arg.stolen_completion = nullptr;
      ret.type = GRPC_OP_COMPLETE;
      ret.success = c->next & 1u;  // low bit of 'next' is the success flag
      ret.tag = c->tag;
      c->done(c->done_arg, c);
      break;
    }

    grpc_cq_completion* c = cqd->queue.Pop();

    if (c != nullptr) {
      ret.type = GRPC_OP_COMPLETE;
      ret.success = c->next & 1u;
      ret.tag = c->tag;
      c->done(c->done_arg, c);
      break;
    } else {
      // If c == NULL it means either the queue is empty OR in a transient
      // inconsistent state. If it is the latter, we should do a 0-timeout
      // poll so that the thread comes back quickly from poll to make a
      // second attempt at popping. Not doing this can potentially deadlock
      // this thread forever (if the deadline is infinity).
      if (cqd->queue.num_items() > 0) {
        iteration_deadline = grpc_core::Timestamp::ProcessEpoch();
      }
    }

    if (cqd->pending_events.load(std::memory_order_acquire) == 0) {
      // Before returning, check if the queue has any items left over (since
      // MultiProducerSingleConsumerQueue::Pop() can sometimes return NULL
      // even if the queue is not empty). If so, keep retrying but do not
      // return GRPC_QUEUE_SHUTDOWN
      if (cqd->queue.num_items() > 0) {
        // Go to the beginning of the loop. No point doing a poll because
        // (cq->shutdown == true) is only possible when there is no pending
        // work (i.e cq->pending_events == 0) and any outstanding completion
        // events should have already been queued on this cq
        continue;
      }

      ret.type = GRPC_QUEUE_SHUTDOWN;
      ret.success = 0;
      break;
    }

    if (!is_finished_arg.first_loop &&
        grpc_core::Timestamp::Now() >= deadline_millis) {
      ret.type = GRPC_QUEUE_TIMEOUT;
      ret.success = 0;
      dump_pending_tags(cq);
      break;
    }

    // The main polling work happens in grpc_pollset_work
    gpr_mu_lock(cq->mu);
    cq->num_polls++;
    grpc_error_handle err = cq->poller_vtable->work(
        POLLSET_FROM_CQ(cq), nullptr, iteration_deadline);
    gpr_mu_unlock(cq->mu);

    if (!err.ok()) {
      gpr_log(GPR_ERROR, "Completion queue next failed: %s",
              grpc_core::StatusToString(err).c_str());
      if (err == absl::CancelledError()) {
        ret.type = GRPC_QUEUE_SHUTDOWN;
      } else {
        ret.type = GRPC_QUEUE_TIMEOUT;
      }
      ret.success = 0;
      dump_pending_tags(cq);
      break;
    }
    is_finished_arg.first_loop = false;
  }

  // If completions remain queued while work is still pending, kick the
  // poller so the remaining events get picked up promptly.
  if (cqd->queue.num_items() > 0 &&
      cqd->pending_events.load(std::memory_order_acquire) > 0) {
    gpr_mu_lock(cq->mu);
    (void)cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), nullptr);
    gpr_mu_unlock(cq->mu);
  }

  GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
  GRPC_CQ_INTERNAL_UNREF(cq, "next");

  GPR_ASSERT(is_finished_arg.stolen_completion == nullptr);

  return ret;
}
1076 
1077 // Finishes the completion queue shutdown. This means that there are no more
1078 // completion events / tags expected from the completion queue
1079 // - Must be called under completion queue lock
1080 // - Must be called only once in completion queue's lifetime
1081 // - grpc_completion_queue_shutdown() MUST have been called before calling
1082 // this function
1083 static void cq_finish_shutdown_next(grpc_completion_queue* cq) {
1084   cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
1085 
1086   GPR_ASSERT(cqd->shutdown_called);
1087   GPR_ASSERT(cqd->pending_events.load(std::memory_order_relaxed) == 0);
1088 
1089   cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
1090 }
1091 
// Begins shutdown of a GRPC_CQ_NEXT completion queue. Safe to call more
// than once: only the first call takes effect. Drops the pending-events
// count by one; if it reaches zero, shutdown is completed immediately.
static void cq_shutdown_next(grpc_completion_queue* cq) {
  cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);

  // Need an extra ref for cq here because:
  // We call cq_finish_shutdown_next() below, that would call pollset shutdown.
  // Pollset shutdown decrements the cq ref count which can potentially destroy
  // the cq (if that happens to be the last ref).
  // Creating an extra ref here prevents the cq from getting destroyed while
  // this function is still active
  GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
  gpr_mu_lock(cq->mu);
  if (cqd->shutdown_called) {
    // Shutdown already in progress; nothing more to do.
    gpr_mu_unlock(cq->mu);
    GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
    return;
  }
  cqd->shutdown_called = true;
  // Doing acq/release fetch_sub here to match with
  // cq_begin_op_for_next and cq_end_op_for_next functions which read/write
  // on this counter without necessarily holding a lock on cq
  if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
    cq_finish_shutdown_next(cq);
  }
  gpr_mu_unlock(cq->mu);
  GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
}
1118 
1119 grpc_event grpc_completion_queue_next(grpc_completion_queue* cq,
1120                                       gpr_timespec deadline, void* reserved) {
1121   return cq->vtable->next(cq, deadline, reserved);
1122 }
1123 
1124 static int add_plucker(grpc_completion_queue* cq, void* tag,
1125                        grpc_pollset_worker** worker) {
1126   cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
1127   if (cqd->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) {
1128     return 0;
1129   }
1130   cqd->pluckers[cqd->num_pluckers].tag = tag;
1131   cqd->pluckers[cqd->num_pluckers].worker = worker;
1132   cqd->num_pluckers++;
1133   return 1;
1134 }
1135 
1136 static void del_plucker(grpc_completion_queue* cq, void* tag,
1137                         grpc_pollset_worker** worker) {
1138   cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
1139   for (int i = 0; i < cqd->num_pluckers; i++) {
1140     if (cqd->pluckers[i].tag == tag && cqd->pluckers[i].worker == worker) {
1141       cqd->num_pluckers--;
1142       std::swap(cqd->pluckers[i], cqd->pluckers[cqd->num_pluckers]);
1143       return;
1144     }
1145   }
1146   GPR_UNREACHABLE_CODE(return);
1147 }
1148 
// ExecCtx specialization used by cq_pluck(): its CheckReadyToFinish() hook
// lets the poller return early once a completion matching the plucked tag
// has been queued (stealing it into the shared cq_is_finished_arg) or the
// deadline has passed.
class ExecCtxPluck : public grpc_core::ExecCtx {
 public:
  explicit ExecCtxPluck(void* arg)
      : ExecCtx(0), check_ready_to_finish_arg_(arg) {}

  // Returns true when the surrounding cq_pluck() should stop polling:
  // either a completion with the sought tag was unlinked from the completed
  // list (stored in a->stolen_completion) or the deadline expired after the
  // first loop iteration.
  bool CheckReadyToFinish() override {
    cq_is_finished_arg* a =
        static_cast<cq_is_finished_arg*>(check_ready_to_finish_arg_);
    grpc_completion_queue* cq = a->cq;
    cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);

    GPR_ASSERT(a->stolen_completion == nullptr);
    gpr_atm current_last_seen_things_queued_ever =
        cqd->things_queued_ever.load(std::memory_order_relaxed);
    // Only scan the list when something has been queued since we last
    // looked.
    if (current_last_seen_things_queued_ever !=
        a->last_seen_things_queued_ever) {
      gpr_mu_lock(cq->mu);
      a->last_seen_things_queued_ever =
          cqd->things_queued_ever.load(std::memory_order_relaxed);
      grpc_cq_completion* c;
      grpc_cq_completion* prev = &cqd->completed_head;
      // Walk the circular completed list; the low bit of each 'next' is the
      // success flag and must be masked off to recover the pointer.
      while ((c = reinterpret_cast<grpc_cq_completion*>(
                  prev->next & ~uintptr_t{1})) != &cqd->completed_head) {
        if (c->tag == a->tag) {
          // Unlink |c|, preserving prev's success bit.
          prev->next = (prev->next & uintptr_t{1}) | (c->next & ~uintptr_t{1});
          if (c == cqd->completed_tail) {
            cqd->completed_tail = prev;
          }
          gpr_mu_unlock(cq->mu);
          a->stolen_completion = c;
          return true;
        }
        prev = c;
      }
      gpr_mu_unlock(cq->mu);
    }
    return !a->first_loop && a->deadline < grpc_core::Timestamp::Now();
  }

 private:
  void* check_ready_to_finish_arg_;  // points at a cq_is_finished_arg
};
1191 
// Waits for (and consumes) the completion whose tag equals |tag| on a
// completion queue of type GRPC_CQ_PLUCK, blocking until |deadline| at the
// latest.
static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
                           gpr_timespec deadline, void* reserved) {
  grpc_event ret;
  grpc_cq_completion* c;
  grpc_cq_completion* prev;
  grpc_pollset_worker* worker = nullptr;
  cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);

  if (GRPC_TRACE_FLAG_ENABLED(grpc_cq_pluck_trace)) {
    GRPC_API_TRACE(
        "grpc_completion_queue_pluck("
        "cq=%p, tag=%p, "
        "deadline=gpr_timespec { tv_sec: %" PRId64
        ", tv_nsec: %d, clock_type: %d }, "
        "reserved=%p)",
        6,
        (cq, tag, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
         reserved));
  }
  GPR_ASSERT(!reserved);

  dump_pending_tags(cq);

  GRPC_CQ_INTERNAL_REF(cq, "pluck");
  gpr_mu_lock(cq->mu);
  grpc_core::Timestamp deadline_millis =
      grpc_core::Timestamp::FromTimespecRoundUp(deadline);
  cq_is_finished_arg is_finished_arg = {
      cqd->things_queued_ever.load(std::memory_order_relaxed),
      cq,
      deadline_millis,
      nullptr,
      tag,
      true};
  ExecCtxPluck exec_ctx(&is_finished_arg);
  // Loop invariant: cq->mu is held at the top of each iteration and released
  // on every exit path.
  for (;;) {
    // A matching completion may have been stolen from the list by
    // ExecCtxPluck::CheckReadyToFinish() while polling; return it directly.
    if (is_finished_arg.stolen_completion != nullptr) {
      gpr_mu_unlock(cq->mu);
      c = is_finished_arg.stolen_completion;
      is_finished_arg.stolen_completion = nullptr;
      ret.type = GRPC_OP_COMPLETE;
      ret.success = c->next & 1u;  // low bit of 'next' is the success flag
      ret.tag = c->tag;
      c->done(c->done_arg, c);
      break;
    }
    // Scan the circular completed list for an entry matching |tag|,
    // masking off the success bit packed into each 'next'.
    prev = &cqd->completed_head;
    while ((c = reinterpret_cast<grpc_cq_completion*>(
                prev->next & ~uintptr_t{1})) != &cqd->completed_head) {
      if (GPR_LIKELY(c->tag == tag)) {
        // Unlink |c|, preserving prev's success bit.
        prev->next = (prev->next & uintptr_t{1}) | (c->next & ~uintptr_t{1});
        if (c == cqd->completed_tail) {
          cqd->completed_tail = prev;
        }
        gpr_mu_unlock(cq->mu);
        ret.type = GRPC_OP_COMPLETE;
        ret.success = c->next & 1u;
        ret.tag = c->tag;
        c->done(c->done_arg, c);
        goto done;
      }
      prev = c;
    }
    if (cqd->shutdown.load(std::memory_order_relaxed)) {
      gpr_mu_unlock(cq->mu);
      ret.type = GRPC_QUEUE_SHUTDOWN;
      ret.success = 0;
      break;
    }
    // Register as a plucker so cq_end_op_for_pluck() can kick this worker
    // when a completion for |tag| arrives.
    if (!add_plucker(cq, tag, &worker)) {
      gpr_log(GPR_DEBUG,
              "Too many outstanding grpc_completion_queue_pluck calls: maximum "
              "is %d",
              GRPC_MAX_COMPLETION_QUEUE_PLUCKERS);
      gpr_mu_unlock(cq->mu);
      // TODO(ctiller): should we use a different result here
      ret.type = GRPC_QUEUE_TIMEOUT;
      ret.success = 0;
      dump_pending_tags(cq);
      break;
    }
    if (!is_finished_arg.first_loop &&
        grpc_core::Timestamp::Now() >= deadline_millis) {
      del_plucker(cq, tag, &worker);
      gpr_mu_unlock(cq->mu);
      ret.type = GRPC_QUEUE_TIMEOUT;
      ret.success = 0;
      dump_pending_tags(cq);
      break;
    }
    cq->num_polls++;
    grpc_error_handle err =
        cq->poller_vtable->work(POLLSET_FROM_CQ(cq), &worker, deadline_millis);
    if (!err.ok()) {
      del_plucker(cq, tag, &worker);
      gpr_mu_unlock(cq->mu);
      gpr_log(GPR_ERROR, "Completion queue pluck failed: %s",
              grpc_core::StatusToString(err).c_str());
      ret.type = GRPC_QUEUE_TIMEOUT;
      ret.success = 0;
      dump_pending_tags(cq);
      break;
    }
    is_finished_arg.first_loop = false;
    del_plucker(cq, tag, &worker);
  }
done:
  GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
  GRPC_CQ_INTERNAL_UNREF(cq, "pluck");

  GPR_ASSERT(is_finished_arg.stolen_completion == nullptr);

  return ret;
}
1306 
1307 grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag,
1308                                        gpr_timespec deadline, void* reserved) {
1309   return cq->vtable->pluck(cq, tag, deadline, reserved);
1310 }
1311 
1312 static void cq_finish_shutdown_pluck(grpc_completion_queue* cq) {
1313   cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
1314 
1315   GPR_ASSERT(cqd->shutdown_called);
1316   GPR_ASSERT(!cqd->shutdown.load(std::memory_order_relaxed));
1317   cqd->shutdown.store(true, std::memory_order_relaxed);
1318 
1319   cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
1320 }
1321 
// Begins shutdown of a GRPC_CQ_PLUCK completion queue. Safe to call more
// than once: only the first call takes effect.
// NOTE: This function is almost exactly identical to cq_shutdown_next() but
// merging them is a bit tricky and probably not worth it
static void cq_shutdown_pluck(grpc_completion_queue* cq) {
  cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);

  // Need an extra ref for cq here because:
  // We call cq_finish_shutdown_pluck() below, that would call pollset shutdown.
  // Pollset shutdown decrements the cq ref count which can potentially destroy
  // the cq (if that happens to be the last ref).
  // Creating an extra ref here prevents the cq from getting destroyed while
  // this function is still active
  GRPC_CQ_INTERNAL_REF(cq, "shutting_down (pluck cq)");
  gpr_mu_lock(cq->mu);
  if (cqd->shutdown_called) {
    // Shutdown already in progress; nothing more to do.
    gpr_mu_unlock(cq->mu);
    GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (pluck cq)");
    return;
  }
  cqd->shutdown_called = true;
  // Drop the pending-events count reserved at creation; reaching zero means
  // no operations are outstanding, so shutdown can complete now.
  if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
    cq_finish_shutdown_pluck(cq);
  }
  gpr_mu_unlock(cq->mu);
  GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (pluck cq)");
}
1347 
// Finishes shutdown of a GRPC_CQ_CALLBACK completion queue: shuts down the
// pollset and schedules the user-provided shutdown callback.
static void cq_finish_shutdown_callback(grpc_completion_queue* cq) {
  cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
  auto* callback = cqd->shutdown_callback;

  GPR_ASSERT(cqd->shutdown_called);

  cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
  // A background poller thread always has an ApplicationCallbackExecCtx at
  // the base of its stack, so the callback can be enqueued on it directly.
  if (grpc_iomgr_is_any_background_poller_thread()) {
    grpc_core::ApplicationCallbackExecCtx::Enqueue(callback, true);
    return;
  }

  // Otherwise, schedule the shutdown callback through the executor via a
  // closure (with a success status, since shutdown itself succeeded).
  grpc_core::Executor::Run(
      GRPC_CLOSURE_CREATE(functor_callback, callback, nullptr),
      absl::OkStatus());
}
1366 
// Begins shutdown of a GRPC_CQ_CALLBACK completion queue. Safe to call more
// than once: only the first call takes effect. Unlike the next/pluck
// variants, the finish step is invoked with the cq lock released.
static void cq_shutdown_callback(grpc_completion_queue* cq) {
  cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);

  // Need an extra ref for cq here because:
  // We call cq_finish_shutdown_callback() below, which calls pollset shutdown.
  // Pollset shutdown decrements the cq ref count which can potentially destroy
  // the cq (if that happens to be the last ref).
  // Creating an extra ref here prevents the cq from getting destroyed while
  // this function is still active
  GRPC_CQ_INTERNAL_REF(cq, "shutting_down (callback cq)");
  gpr_mu_lock(cq->mu);
  if (cqd->shutdown_called) {
    // Shutdown already in progress; nothing more to do.
    gpr_mu_unlock(cq->mu);
    GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)");
    return;
  }
  cqd->shutdown_called = true;
  // Drop the pending-events count reserved at creation; if it reaches zero,
  // complete shutdown now — but outside the lock, since the shutdown
  // callback is scheduled (and possibly run) from the finish step.
  if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
    gpr_mu_unlock(cq->mu);
    cq_finish_shutdown_callback(cq);
  } else {
    gpr_mu_unlock(cq->mu);
  }
  GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)");
}
1392 
1393 // Shutdown simply drops a ref that we reserved at creation time; if we drop
1394 // to zero here, then enter shutdown mode and wake up any waiters
1395 void grpc_completion_queue_shutdown(grpc_completion_queue* cq) {
1396   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
1397   grpc_core::ExecCtx exec_ctx;
1398   GRPC_API_TRACE("grpc_completion_queue_shutdown(cq=%p)", 1, (cq));
1399   cq->vtable->shutdown(cq);
1400 }
1401 
// Public API: destroys |cq| by requesting shutdown (a no-op if shutdown was
// already called) and then releasing the "destroy" ref taken at creation.
void grpc_completion_queue_destroy(grpc_completion_queue* cq) {
  GRPC_API_TRACE("grpc_completion_queue_destroy(cq=%p)", 1, (cq));
  grpc_completion_queue_shutdown(cq);

  grpc_core::ExecCtx exec_ctx;
  GRPC_CQ_INTERNAL_UNREF(cq, "destroy");
}
1409 
1410 grpc_pollset* grpc_cq_pollset(grpc_completion_queue* cq) {
1411   return cq->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cq) : nullptr;
1412 }
1413 
// Reports whether |cq|'s poller type supports listening (per the poller
// vtable's can_listen flag).
bool grpc_cq_can_listen(grpc_completion_queue* cq) {
  return cq->poller_vtable->can_listen;
}
1417