1 /*
2 *
3 * Copyright 2015-2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18 #include <grpc/support/port_platform.h>
19
20 #include "src/core/lib/surface/completion_queue.h"
21
22 #include <inttypes.h>
23 #include <stdio.h>
24 #include <string.h>
25
26 #include <vector>
27
28 #include "absl/strings/str_format.h"
29 #include "absl/strings/str_join.h"
30
31 #include <grpc/support/alloc.h>
32 #include <grpc/support/atm.h>
33 #include <grpc/support/log.h>
34 #include <grpc/support/string_util.h>
35 #include <grpc/support/time.h>
36
37 #include "src/core/lib/debug/stats.h"
38 #include "src/core/lib/gpr/spinlock.h"
39 #include "src/core/lib/gpr/string.h"
40 #include "src/core/lib/gpr/tls.h"
41 #include "src/core/lib/gprpp/atomic.h"
42 #include "src/core/lib/iomgr/closure.h"
43 #include "src/core/lib/iomgr/executor.h"
44 #include "src/core/lib/iomgr/pollset.h"
45 #include "src/core/lib/iomgr/timer.h"
46 #include "src/core/lib/profiling/timers.h"
47 #include "src/core/lib/surface/api_trace.h"
48 #include "src/core/lib/surface/call.h"
49 #include "src/core/lib/surface/event_string.h"
50
51 grpc_core::TraceFlag grpc_trace_operation_failures(false, "op_failure");
52 grpc_core::DebugOnlyTraceFlag grpc_trace_pending_tags(false, "pending_tags");
53 grpc_core::DebugOnlyTraceFlag grpc_trace_cq_refcount(false, "cq_refcount");
54
55 namespace {
56
57 // Specifies a cq thread local cache.
58 // The first event that occurs on a thread
59 // with a cq cache will go into that cache, and
60 // will only be returned on the thread that initialized the cache.
61 // NOTE: Only one event will ever be cached.
62 GPR_TLS_DECL(g_cached_event);
63 GPR_TLS_DECL(g_cached_cq);
64
65 struct plucker {
66 grpc_pollset_worker** worker;
67 void* tag;
68 };
69 struct cq_poller_vtable {
70 bool can_get_pollset;
71 bool can_listen;
72 size_t (*size)(void);
73 void (*init)(grpc_pollset* pollset, gpr_mu** mu);
74 grpc_error* (*kick)(grpc_pollset* pollset,
75 grpc_pollset_worker* specific_worker);
76 grpc_error* (*work)(grpc_pollset* pollset, grpc_pollset_worker** worker,
77 grpc_millis deadline);
78 void (*shutdown)(grpc_pollset* pollset, grpc_closure* closure);
79 void (*destroy)(grpc_pollset* pollset);
80 };
81 typedef struct non_polling_worker {
82 gpr_cv cv;
83 bool kicked;
84 struct non_polling_worker* next;
85 struct non_polling_worker* prev;
86 } non_polling_worker;
87
88 struct non_polling_poller {
89 gpr_mu mu;
90 bool kicked_without_poller;
91 non_polling_worker* root;
92 grpc_closure* shutdown;
93 };
non_polling_poller_size(void)94 size_t non_polling_poller_size(void) { return sizeof(non_polling_poller); }
95
non_polling_poller_init(grpc_pollset * pollset,gpr_mu ** mu)96 void non_polling_poller_init(grpc_pollset* pollset, gpr_mu** mu) {
97 non_polling_poller* npp = reinterpret_cast<non_polling_poller*>(pollset);
98 gpr_mu_init(&npp->mu);
99 *mu = &npp->mu;
100 }
101
non_polling_poller_destroy(grpc_pollset * pollset)102 void non_polling_poller_destroy(grpc_pollset* pollset) {
103 non_polling_poller* npp = reinterpret_cast<non_polling_poller*>(pollset);
104 gpr_mu_destroy(&npp->mu);
105 }
106
non_polling_poller_work(grpc_pollset * pollset,grpc_pollset_worker ** worker,grpc_millis deadline)107 grpc_error* non_polling_poller_work(grpc_pollset* pollset,
108 grpc_pollset_worker** worker,
109 grpc_millis deadline) {
110 non_polling_poller* npp = reinterpret_cast<non_polling_poller*>(pollset);
111 if (npp->shutdown) return GRPC_ERROR_NONE;
112 if (npp->kicked_without_poller) {
113 npp->kicked_without_poller = false;
114 return GRPC_ERROR_NONE;
115 }
116 non_polling_worker w;
117 gpr_cv_init(&w.cv);
118 if (worker != nullptr) *worker = reinterpret_cast<grpc_pollset_worker*>(&w);
119 if (npp->root == nullptr) {
120 npp->root = w.next = w.prev = &w;
121 } else {
122 w.next = npp->root;
123 w.prev = w.next->prev;
124 w.next->prev = w.prev->next = &w;
125 }
126 w.kicked = false;
127 gpr_timespec deadline_ts =
128 grpc_millis_to_timespec(deadline, GPR_CLOCK_MONOTONIC);
129 while (!npp->shutdown && !w.kicked &&
130 !gpr_cv_wait(&w.cv, &npp->mu, deadline_ts))
131 ;
132 grpc_core::ExecCtx::Get()->InvalidateNow();
133 if (&w == npp->root) {
134 npp->root = w.next;
135 if (&w == npp->root) {
136 if (npp->shutdown) {
137 grpc_core::ExecCtx::Run(DEBUG_LOCATION, npp->shutdown, GRPC_ERROR_NONE);
138 }
139 npp->root = nullptr;
140 }
141 }
142 w.next->prev = w.prev;
143 w.prev->next = w.next;
144 gpr_cv_destroy(&w.cv);
145 if (worker != nullptr) *worker = nullptr;
146 return GRPC_ERROR_NONE;
147 }
148
non_polling_poller_kick(grpc_pollset * pollset,grpc_pollset_worker * specific_worker)149 grpc_error* non_polling_poller_kick(grpc_pollset* pollset,
150 grpc_pollset_worker* specific_worker) {
151 non_polling_poller* p = reinterpret_cast<non_polling_poller*>(pollset);
152 if (specific_worker == nullptr)
153 specific_worker = reinterpret_cast<grpc_pollset_worker*>(p->root);
154 if (specific_worker != nullptr) {
155 non_polling_worker* w =
156 reinterpret_cast<non_polling_worker*>(specific_worker);
157 if (!w->kicked) {
158 w->kicked = true;
159 gpr_cv_signal(&w->cv);
160 }
161 } else {
162 p->kicked_without_poller = true;
163 }
164 return GRPC_ERROR_NONE;
165 }
166
non_polling_poller_shutdown(grpc_pollset * pollset,grpc_closure * closure)167 void non_polling_poller_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
168 non_polling_poller* p = reinterpret_cast<non_polling_poller*>(pollset);
169 GPR_ASSERT(closure != nullptr);
170 p->shutdown = closure;
171 if (p->root == nullptr) {
172 grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE);
173 } else {
174 non_polling_worker* w = p->root;
175 do {
176 gpr_cv_signal(&w->cv);
177 w = w->next;
178 } while (w != p->root);
179 }
180 }
181
182 const cq_poller_vtable g_poller_vtable_by_poller_type[] = {
183 /* GRPC_CQ_DEFAULT_POLLING */
184 {true, true, grpc_pollset_size, grpc_pollset_init, grpc_pollset_kick,
185 grpc_pollset_work, grpc_pollset_shutdown, grpc_pollset_destroy},
186 /* GRPC_CQ_NON_LISTENING */
187 {true, false, grpc_pollset_size, grpc_pollset_init, grpc_pollset_kick,
188 grpc_pollset_work, grpc_pollset_shutdown, grpc_pollset_destroy},
189 /* GRPC_CQ_NON_POLLING */
190 {false, false, non_polling_poller_size, non_polling_poller_init,
191 non_polling_poller_kick, non_polling_poller_work,
192 non_polling_poller_shutdown, non_polling_poller_destroy},
193 };
194
195 } // namespace
196
197 struct cq_vtable {
198 grpc_cq_completion_type cq_completion_type;
199 size_t data_size;
200 void (*init)(void* data,
201 grpc_experimental_completion_queue_functor* shutdown_callback);
202 void (*shutdown)(grpc_completion_queue* cq);
203 void (*destroy)(void* data);
204 bool (*begin_op)(grpc_completion_queue* cq, void* tag);
205 void (*end_op)(grpc_completion_queue* cq, void* tag, grpc_error* error,
206 void (*done)(void* done_arg, grpc_cq_completion* storage),
207 void* done_arg, grpc_cq_completion* storage, bool internal);
208 grpc_event (*next)(grpc_completion_queue* cq, gpr_timespec deadline,
209 void* reserved);
210 grpc_event (*pluck)(grpc_completion_queue* cq, void* tag,
211 gpr_timespec deadline, void* reserved);
212 // TODO(vjpai): Remove proxy_pollset once callback_alternative no longer
213 // needed.
214 grpc_pollset* (*proxy_pollset)(grpc_completion_queue* cq);
215 };
216
217 namespace {
218
219 /* Queue that holds the cq_completion_events. Internally uses
220 * MultiProducerSingleConsumerQueue (a lockfree multiproducer single consumer
221 * queue). It uses a queue_lock to support multiple consumers.
222 * Only used in completion queues whose completion_type is GRPC_CQ_NEXT */
223 class CqEventQueue {
224 public:
225 CqEventQueue() = default;
226 ~CqEventQueue() = default;
227
228 /* Note: The counter is not incremented/decremented atomically with push/pop.
229 * The count is only eventually consistent */
num_items() const230 intptr_t num_items() const {
231 return num_queue_items_.Load(grpc_core::MemoryOrder::RELAXED);
232 }
233
234 bool Push(grpc_cq_completion* c);
235 grpc_cq_completion* Pop();
236
237 private:
238 /* Spinlock to serialize consumers i.e pop() operations */
239 gpr_spinlock queue_lock_ = GPR_SPINLOCK_INITIALIZER;
240
241 grpc_core::MultiProducerSingleConsumerQueue queue_;
242
243 /* A lazy counter of number of items in the queue. This is NOT atomically
244 incremented/decremented along with push/pop operations and hence is only
245 eventually consistent */
246 grpc_core::Atomic<intptr_t> num_queue_items_{0};
247 };
248
249 struct cq_next_data {
~cq_next_data__anon2868c5d30211::cq_next_data250 ~cq_next_data() {
251 GPR_ASSERT(queue.num_items() == 0);
252 #ifndef NDEBUG
253 if (pending_events.Load(grpc_core::MemoryOrder::ACQUIRE) != 0) {
254 gpr_log(GPR_ERROR, "Destroying CQ without draining it fully.");
255 }
256 #endif
257 }
258
259 /** Completed events for completion-queues of type GRPC_CQ_NEXT */
260 CqEventQueue queue;
261
262 /** Counter of how many things have ever been queued on this completion queue
263 useful for avoiding locks to check the queue */
264 grpc_core::Atomic<intptr_t> things_queued_ever{0};
265
266 /** Number of outstanding events (+1 if not shut down)
267 Initial count is dropped by grpc_completion_queue_shutdown */
268 grpc_core::Atomic<intptr_t> pending_events{1};
269
270 /** 0 initially. 1 once we initiated shutdown */
271 bool shutdown_called = false;
272 };
273
274 struct cq_pluck_data {
cq_pluck_data__anon2868c5d30211::cq_pluck_data275 cq_pluck_data() {
276 completed_tail = &completed_head;
277 completed_head.next = reinterpret_cast<uintptr_t>(completed_tail);
278 }
279
~cq_pluck_data__anon2868c5d30211::cq_pluck_data280 ~cq_pluck_data() {
281 GPR_ASSERT(completed_head.next ==
282 reinterpret_cast<uintptr_t>(&completed_head));
283 #ifndef NDEBUG
284 if (pending_events.Load(grpc_core::MemoryOrder::ACQUIRE) != 0) {
285 gpr_log(GPR_ERROR, "Destroying CQ without draining it fully.");
286 }
287 #endif
288 }
289
290 /** Completed events for completion-queues of type GRPC_CQ_PLUCK */
291 grpc_cq_completion completed_head;
292 grpc_cq_completion* completed_tail;
293
294 /** Number of pending events (+1 if we're not shutdown).
295 Initial count is dropped by grpc_completion_queue_shutdown. */
296 grpc_core::Atomic<intptr_t> pending_events{1};
297
298 /** Counter of how many things have ever been queued on this completion queue
299 useful for avoiding locks to check the queue */
300 grpc_core::Atomic<intptr_t> things_queued_ever{0};
301
302 /** 0 initially. 1 once we completed shutting */
303 /* TODO: (sreek) This is not needed since (shutdown == 1) if and only if
304 * (pending_events == 0). So consider removing this in future and use
305 * pending_events */
306 grpc_core::Atomic<bool> shutdown{false};
307
308 /** 0 initially. 1 once we initiated shutdown */
309 bool shutdown_called = false;
310
311 int num_pluckers = 0;
312 plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
313 };
314
315 struct cq_callback_data {
cq_callback_data__anon2868c5d30211::cq_callback_data316 explicit cq_callback_data(
317 grpc_experimental_completion_queue_functor* shutdown_callback)
318 : shutdown_callback(shutdown_callback) {}
319
~cq_callback_data__anon2868c5d30211::cq_callback_data320 ~cq_callback_data() {
321 #ifndef NDEBUG
322 if (pending_events.Load(grpc_core::MemoryOrder::ACQUIRE) != 0) {
323 gpr_log(GPR_ERROR, "Destroying CQ without draining it fully.");
324 }
325 #endif
326 }
327
328 /** No actual completed events queue, unlike other types */
329
330 /** Number of pending events (+1 if we're not shutdown).
331 Initial count is dropped by grpc_completion_queue_shutdown. */
332 grpc_core::Atomic<intptr_t> pending_events{1};
333
334 /** 0 initially. 1 once we initiated shutdown */
335 bool shutdown_called = false;
336
337 /** A callback that gets invoked when the CQ completes shutdown */
338 grpc_experimental_completion_queue_functor* shutdown_callback;
339 };
340
341 // TODO(vjpai): Remove all callback_alternative variants when event manager is
342 // the only supported poller.
343 struct cq_callback_alternative_data {
cq_callback_alternative_data__anon2868c5d30211::cq_callback_alternative_data344 explicit cq_callback_alternative_data(
345 grpc_experimental_completion_queue_functor* shutdown_callback)
346 : implementation(SharedNextableCQ()),
347 shutdown_callback(shutdown_callback) {}
348
349 /* This just points to a single shared nextable CQ */
350 grpc_completion_queue* const implementation;
351
352 /** Number of outstanding events (+1 if not shut down)
353 Initial count is dropped by grpc_completion_queue_shutdown */
354 grpc_core::Atomic<intptr_t> pending_events{1};
355
356 /** 0 initially. 1 once we initiated shutdown */
357 bool shutdown_called = false;
358
359 /** A callback that gets invoked when the CQ completes shutdown */
360 grpc_experimental_completion_queue_functor* shutdown_callback;
361
SharedNextableCQ__anon2868c5d30211::cq_callback_alternative_data362 static grpc_completion_queue* SharedNextableCQ() {
363 grpc_core::MutexLock lock(&*shared_cq_next_mu);
364
365 if (shared_cq_next == nullptr) {
366 shared_cq_next = grpc_completion_queue_create_for_next(nullptr);
367 int num_nexting_threads = GPR_CLAMP(gpr_cpu_num_cores(), 1, 32);
368 threads_remaining.Store(num_nexting_threads,
369 grpc_core::MemoryOrder::RELEASE);
370 for (int i = 0; i < num_nexting_threads; i++) {
371 grpc_core::Executor::Run(
372 GRPC_CLOSURE_CREATE(
373 [](void* arg, grpc_error* /*error*/) {
374 grpc_completion_queue* cq =
375 static_cast<grpc_completion_queue*>(arg);
376 while (true) {
377 grpc_event event = grpc_completion_queue_next(
378 cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
379 if (event.type == GRPC_QUEUE_SHUTDOWN) {
380 break;
381 }
382 GPR_DEBUG_ASSERT(event.type == GRPC_OP_COMPLETE);
383 // We can always execute the callback inline rather than
384 // pushing it to another Executor thread because this
385 // thread is definitely running on an executor, does not
386 // hold any application locks before executing the callback,
387 // and cannot be entered recursively.
388 auto* functor = static_cast<
389 grpc_experimental_completion_queue_functor*>(event.tag);
390 functor->functor_run(functor, event.success);
391 }
392 if (threads_remaining.FetchSub(
393 1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
394 grpc_completion_queue_destroy(cq);
395 }
396 },
397 shared_cq_next, nullptr),
398 GRPC_ERROR_NONE, grpc_core::ExecutorType::DEFAULT,
399 grpc_core::ExecutorJobType::LONG);
400 }
401 }
402 return shared_cq_next;
403 }
404 // Use manually-constructed Mutex to avoid static construction issues
405 static grpc_core::ManualConstructor<grpc_core::Mutex> shared_cq_next_mu;
406 static grpc_completion_queue*
407 shared_cq_next; // GUARDED_BY(shared_cq_next_mu)
408 static grpc_core::Atomic<int> threads_remaining;
409 };
410
411 grpc_core::ManualConstructor<grpc_core::Mutex>
412 cq_callback_alternative_data::shared_cq_next_mu;
413 grpc_completion_queue* cq_callback_alternative_data::shared_cq_next = nullptr;
414 grpc_core::Atomic<int> cq_callback_alternative_data::threads_remaining{0};
415
416 } // namespace
417
418 /* Completion queue structure */
419 struct grpc_completion_queue {
420 /** Once owning_refs drops to zero, we will destroy the cq */
421 grpc_core::RefCount owning_refs;
422
423 gpr_mu* mu;
424
425 const cq_vtable* vtable;
426 const cq_poller_vtable* poller_vtable;
427
428 // The pollset entry is allowed to enable proxy CQs like the
429 // callback_alternative.
430 // TODO(vjpai): Consider removing pollset and reverting to previous
431 // calculation of pollset once callback_alternative is no longer needed.
432 grpc_pollset* pollset;
433
434 #ifndef NDEBUG
435 void** outstanding_tags;
436 size_t outstanding_tag_count;
437 size_t outstanding_tag_capacity;
438 #endif
439
440 grpc_closure pollset_shutdown_done;
441 int num_polls;
442 };
443
444 /* Forward declarations */
445 static void cq_finish_shutdown_next(grpc_completion_queue* cq);
446 static void cq_finish_shutdown_pluck(grpc_completion_queue* cq);
447 static void cq_finish_shutdown_callback(grpc_completion_queue* cq);
448 static void cq_finish_shutdown_callback_alternative(grpc_completion_queue* cq);
449 static void cq_shutdown_next(grpc_completion_queue* cq);
450 static void cq_shutdown_pluck(grpc_completion_queue* cq);
451 static void cq_shutdown_callback(grpc_completion_queue* cq);
452 static void cq_shutdown_callback_alternative(grpc_completion_queue* cq);
453
454 static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag);
455 static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag);
456 static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag);
457 static bool cq_begin_op_for_callback_alternative(grpc_completion_queue* cq,
458 void* tag);
459
460 // A cq_end_op function is called when an operation on a given CQ with
461 // a given tag has completed. The storage argument is a reference to the
462 // space reserved for this completion as it is placed into the corresponding
463 // queue. The done argument is a callback that will be invoked when it is
464 // safe to free up that storage. The storage MUST NOT be freed until the
465 // done callback is invoked.
466 static void cq_end_op_for_next(
467 grpc_completion_queue* cq, void* tag, grpc_error* error,
468 void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
469 grpc_cq_completion* storage, bool internal);
470
471 static void cq_end_op_for_pluck(
472 grpc_completion_queue* cq, void* tag, grpc_error* error,
473 void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
474 grpc_cq_completion* storage, bool internal);
475
476 static void cq_end_op_for_callback(
477 grpc_completion_queue* cq, void* tag, grpc_error* error,
478 void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
479 grpc_cq_completion* storage, bool internal);
480
481 static void cq_end_op_for_callback_alternative(
482 grpc_completion_queue* cq, void* tag, grpc_error* error,
483 void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
484 grpc_cq_completion* storage, bool internal);
485
486 static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
487 void* reserved);
488
489 static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
490 gpr_timespec deadline, void* reserved);
491
492 static grpc_pollset* cq_proxy_pollset_for_callback_alternative(
493 grpc_completion_queue* cq);
494
495 // Note that cq_init_next and cq_init_pluck do not use the shutdown_callback
496 static void cq_init_next(
497 void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
498 static void cq_init_pluck(
499 void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
500 static void cq_init_callback(
501 void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
502 // poller becomes only option.
503 static void cq_init_callback_alternative(
504 void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
505 static void cq_destroy_next(void* data);
506 static void cq_destroy_pluck(void* data);
507 static void cq_destroy_callback(void* data);
508 static void cq_destroy_callback_alternative(void* data);
509
510 /* Completion queue vtables based on the completion-type */
511 // TODO(vjpai): Make this const again once we stop needing callback_alternative
512 static cq_vtable g_polling_cq_vtable[] = {
513 /* GRPC_CQ_NEXT */
514 {GRPC_CQ_NEXT, sizeof(cq_next_data), cq_init_next, cq_shutdown_next,
515 cq_destroy_next, cq_begin_op_for_next, cq_end_op_for_next, cq_next,
516 nullptr, nullptr},
517 /* GRPC_CQ_PLUCK */
518 {GRPC_CQ_PLUCK, sizeof(cq_pluck_data), cq_init_pluck, cq_shutdown_pluck,
519 cq_destroy_pluck, cq_begin_op_for_pluck, cq_end_op_for_pluck, nullptr,
520 cq_pluck, nullptr},
521 /* GRPC_CQ_CALLBACK */
522 {GRPC_CQ_CALLBACK, sizeof(cq_callback_data), cq_init_callback,
523 cq_shutdown_callback, cq_destroy_callback, cq_begin_op_for_callback,
524 cq_end_op_for_callback, nullptr, nullptr, nullptr},
525 };
526
527 // Separate vtable for non-polling cqs, assign at init
528 static cq_vtable g_nonpolling_cq_vtable[sizeof(g_polling_cq_vtable) /
529 sizeof(g_polling_cq_vtable[0])];
530
531 #define DATA_FROM_CQ(cq) ((void*)(cq + 1))
532 #define INLINE_POLLSET_FROM_CQ(cq) \
533 ((grpc_pollset*)(cq->vtable->data_size + (char*)DATA_FROM_CQ(cq)))
534 #define POLLSET_FROM_CQ(cq) (cq->pollset)
535
536 grpc_core::TraceFlag grpc_cq_pluck_trace(false, "queue_pluck");
537
538 #define GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, event) \
539 do { \
540 if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) && \
541 (GRPC_TRACE_FLAG_ENABLED(grpc_cq_pluck_trace) || \
542 (event)->type != GRPC_QUEUE_TIMEOUT)) { \
543 gpr_log(GPR_INFO, "RETURN_EVENT[%p]: %s", cq, \
544 grpc_event_string(event).c_str()); \
545 } \
546 } while (0)
547
548 static void on_pollset_shutdown_done(void* cq, grpc_error* error);
549
grpc_cq_global_init()550 void grpc_cq_global_init() {
551 gpr_tls_init(&g_cached_event);
552 gpr_tls_init(&g_cached_cq);
553 g_nonpolling_cq_vtable[GRPC_CQ_NEXT] = g_polling_cq_vtable[GRPC_CQ_NEXT];
554 g_nonpolling_cq_vtable[GRPC_CQ_PLUCK] = g_polling_cq_vtable[GRPC_CQ_PLUCK];
555 g_nonpolling_cq_vtable[GRPC_CQ_CALLBACK] =
556 g_polling_cq_vtable[GRPC_CQ_CALLBACK];
557 }
558
559 // TODO(vjpai): Remove when callback_alternative is no longer needed
grpc_cq_init()560 void grpc_cq_init() {
561 // If the iomgr runs in the background, we can use the preferred callback CQ.
562 // If the iomgr is non-polling, we cannot use the alternative callback CQ.
563 if (!grpc_iomgr_run_in_background() && !grpc_iomgr_non_polling()) {
564 cq_callback_alternative_data::shared_cq_next_mu.Init();
565 g_polling_cq_vtable[GRPC_CQ_CALLBACK] = {
566 GRPC_CQ_CALLBACK,
567 sizeof(cq_callback_alternative_data),
568 cq_init_callback_alternative,
569 cq_shutdown_callback_alternative,
570 cq_destroy_callback_alternative,
571 cq_begin_op_for_callback_alternative,
572 cq_end_op_for_callback_alternative,
573 nullptr,
574 nullptr,
575 cq_proxy_pollset_for_callback_alternative};
576 }
577 }
578
579 // TODO(vjpai): Remove when callback_alternative is no longer needed
grpc_cq_shutdown()580 void grpc_cq_shutdown() {
581 if (!grpc_iomgr_run_in_background() && !grpc_iomgr_non_polling()) {
582 {
583 grpc_core::MutexLock lock(
584 &*cq_callback_alternative_data::shared_cq_next_mu);
585 if (cq_callback_alternative_data::shared_cq_next != nullptr) {
586 grpc_completion_queue_shutdown(
587 cq_callback_alternative_data::shared_cq_next);
588 }
589 cq_callback_alternative_data::shared_cq_next = nullptr;
590 }
591 cq_callback_alternative_data::shared_cq_next_mu.Destroy();
592 }
593 }
594
grpc_completion_queue_thread_local_cache_init(grpc_completion_queue * cq)595 void grpc_completion_queue_thread_local_cache_init(grpc_completion_queue* cq) {
596 if ((grpc_completion_queue*)gpr_tls_get(&g_cached_cq) == nullptr) {
597 gpr_tls_set(&g_cached_event, (intptr_t)0);
598 gpr_tls_set(&g_cached_cq, (intptr_t)cq);
599 }
600 }
601
grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue * cq,void ** tag,int * ok)602 int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue* cq,
603 void** tag, int* ok) {
604 grpc_cq_completion* storage =
605 (grpc_cq_completion*)gpr_tls_get(&g_cached_event);
606 int ret = 0;
607 if (storage != nullptr &&
608 (grpc_completion_queue*)gpr_tls_get(&g_cached_cq) == cq) {
609 *tag = storage->tag;
610 grpc_core::ExecCtx exec_ctx;
611 *ok = (storage->next & static_cast<uintptr_t>(1)) == 1;
612 storage->done(storage->done_arg, storage);
613 ret = 1;
614 cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
615 if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
616 GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
617 gpr_mu_lock(cq->mu);
618 cq_finish_shutdown_next(cq);
619 gpr_mu_unlock(cq->mu);
620 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
621 }
622 }
623 gpr_tls_set(&g_cached_event, (intptr_t)0);
624 gpr_tls_set(&g_cached_cq, (intptr_t)0);
625
626 return ret;
627 }
628
Push(grpc_cq_completion * c)629 bool CqEventQueue::Push(grpc_cq_completion* c) {
630 queue_.Push(
631 reinterpret_cast<grpc_core::MultiProducerSingleConsumerQueue::Node*>(c));
632 return num_queue_items_.FetchAdd(1, grpc_core::MemoryOrder::RELAXED) == 0;
633 }
634
Pop()635 grpc_cq_completion* CqEventQueue::Pop() {
636 grpc_cq_completion* c = nullptr;
637
638 if (gpr_spinlock_trylock(&queue_lock_)) {
639 GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES();
640
641 bool is_empty = false;
642 c = reinterpret_cast<grpc_cq_completion*>(queue_.PopAndCheckEnd(&is_empty));
643 gpr_spinlock_unlock(&queue_lock_);
644
645 if (c == nullptr && !is_empty) {
646 GRPC_STATS_INC_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES();
647 }
648 } else {
649 GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_FAILURES();
650 }
651
652 if (c) {
653 num_queue_items_.FetchSub(1, grpc_core::MemoryOrder::RELAXED);
654 }
655
656 return c;
657 }
658
grpc_completion_queue_create_internal(grpc_cq_completion_type completion_type,grpc_cq_polling_type polling_type,grpc_experimental_completion_queue_functor * shutdown_callback)659 grpc_completion_queue* grpc_completion_queue_create_internal(
660 grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type,
661 grpc_experimental_completion_queue_functor* shutdown_callback) {
662 GPR_TIMER_SCOPE("grpc_completion_queue_create_internal", 0);
663
664 grpc_completion_queue* cq;
665
666 GRPC_API_TRACE(
667 "grpc_completion_queue_create_internal(completion_type=%d, "
668 "polling_type=%d)",
669 2, (completion_type, polling_type));
670
671 const cq_vtable* vtable = (polling_type == GRPC_CQ_NON_POLLING)
672 ? &g_nonpolling_cq_vtable[completion_type]
673 : &g_polling_cq_vtable[completion_type];
674 const cq_poller_vtable* poller_vtable =
675 &g_poller_vtable_by_poller_type[polling_type];
676
677 grpc_core::ExecCtx exec_ctx;
678 GRPC_STATS_INC_CQS_CREATED();
679
680 cq = static_cast<grpc_completion_queue*>(
681 gpr_zalloc(sizeof(grpc_completion_queue) + vtable->data_size +
682 poller_vtable->size()));
683
684 cq->vtable = vtable;
685 cq->poller_vtable = poller_vtable;
686
687 /* One for destroy(), one for pollset_shutdown */
688 new (&cq->owning_refs) grpc_core::RefCount(2);
689
690 vtable->init(DATA_FROM_CQ(cq), shutdown_callback);
691
692 // TODO(vjpai): When callback_alternative is no longer needed, cq->pollset can
693 // be removed and the nullptr proxy_pollset value below can be the definition
694 // of POLLSET_FROM_CQ.
695 cq->pollset = cq->vtable->proxy_pollset == nullptr
696 ? INLINE_POLLSET_FROM_CQ(cq)
697 : cq->vtable->proxy_pollset(cq);
698 // Init the inline pollset. If a proxy CQ is used, the proxy pollset will be
699 // init'ed in its CQ init.
700 cq->poller_vtable->init(INLINE_POLLSET_FROM_CQ(cq), &cq->mu);
701
702 GRPC_CLOSURE_INIT(&cq->pollset_shutdown_done, on_pollset_shutdown_done, cq,
703 grpc_schedule_on_exec_ctx);
704 return cq;
705 }
706
cq_init_next(void * data,grpc_experimental_completion_queue_functor *)707 static void cq_init_next(
708 void* data,
709 grpc_experimental_completion_queue_functor* /*shutdown_callback*/) {
710 new (data) cq_next_data();
711 }
712
cq_destroy_next(void * data)713 static void cq_destroy_next(void* data) {
714 cq_next_data* cqd = static_cast<cq_next_data*>(data);
715 cqd->~cq_next_data();
716 }
717
cq_init_pluck(void * data,grpc_experimental_completion_queue_functor *)718 static void cq_init_pluck(
719 void* data,
720 grpc_experimental_completion_queue_functor* /*shutdown_callback*/) {
721 new (data) cq_pluck_data();
722 }
723
cq_destroy_pluck(void * data)724 static void cq_destroy_pluck(void* data) {
725 cq_pluck_data* cqd = static_cast<cq_pluck_data*>(data);
726 cqd->~cq_pluck_data();
727 }
728
cq_init_callback(void * data,grpc_experimental_completion_queue_functor * shutdown_callback)729 static void cq_init_callback(
730 void* data, grpc_experimental_completion_queue_functor* shutdown_callback) {
731 new (data) cq_callback_data(shutdown_callback);
732 }
733
cq_destroy_callback(void * data)734 static void cq_destroy_callback(void* data) {
735 cq_callback_data* cqd = static_cast<cq_callback_data*>(data);
736 cqd->~cq_callback_data();
737 }
738
cq_init_callback_alternative(void * data,grpc_experimental_completion_queue_functor * shutdown_callback)739 static void cq_init_callback_alternative(
740 void* data, grpc_experimental_completion_queue_functor* shutdown_callback) {
741 new (data) cq_callback_alternative_data(shutdown_callback);
742 }
743
cq_destroy_callback_alternative(void * data)744 static void cq_destroy_callback_alternative(void* data) {
745 cq_callback_alternative_data* cqd =
746 static_cast<cq_callback_alternative_data*>(data);
747 cqd->~cq_callback_alternative_data();
748 }
749
grpc_get_cq_completion_type(grpc_completion_queue * cq)750 grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue* cq) {
751 return cq->vtable->cq_completion_type;
752 }
753
grpc_get_cq_poll_num(grpc_completion_queue * cq)754 int grpc_get_cq_poll_num(grpc_completion_queue* cq) {
755 int cur_num_polls;
756 gpr_mu_lock(cq->mu);
757 cur_num_polls = cq->num_polls;
758 gpr_mu_unlock(cq->mu);
759 return cur_num_polls;
760 }
761
762 #ifndef NDEBUG
grpc_cq_internal_ref(grpc_completion_queue * cq,const char * reason,const char * file,int line)763 void grpc_cq_internal_ref(grpc_completion_queue* cq, const char* reason,
764 const char* file, int line) {
765 grpc_core::DebugLocation debug_location(file, line);
766 #else
767 void grpc_cq_internal_ref(grpc_completion_queue* cq) {
768 grpc_core::DebugLocation debug_location;
769 const char* reason = nullptr;
770 #endif
771 cq->owning_refs.Ref(debug_location, reason);
772 }
773
774 static void on_pollset_shutdown_done(void* arg, grpc_error* /*error*/) {
775 grpc_completion_queue* cq = static_cast<grpc_completion_queue*>(arg);
776 GRPC_CQ_INTERNAL_UNREF(cq, "pollset_destroy");
777 }
778
779 #ifndef NDEBUG
780 void grpc_cq_internal_unref(grpc_completion_queue* cq, const char* reason,
781 const char* file, int line) {
782 grpc_core::DebugLocation debug_location(file, line);
783 #else
784 void grpc_cq_internal_unref(grpc_completion_queue* cq) {
785 grpc_core::DebugLocation debug_location;
786 const char* reason = nullptr;
787 #endif
788 if (GPR_UNLIKELY(cq->owning_refs.Unref(debug_location, reason))) {
789 cq->vtable->destroy(DATA_FROM_CQ(cq));
790 // Only destroy the inlined pollset. If a proxy CQ is used, the proxy
791 // pollset will be destroyed by the proxy CQ.
792 cq->poller_vtable->destroy(INLINE_POLLSET_FROM_CQ(cq));
793 #ifndef NDEBUG
794 gpr_free(cq->outstanding_tags);
795 #endif
796 gpr_free(cq);
797 }
798 }
799
800 #ifndef NDEBUG
801 static void cq_check_tag(grpc_completion_queue* cq, void* tag, bool lock_cq) {
802 int found = 0;
803 if (lock_cq) {
804 gpr_mu_lock(cq->mu);
805 }
806
807 for (int i = 0; i < static_cast<int>(cq->outstanding_tag_count); i++) {
808 if (cq->outstanding_tags[i] == tag) {
809 cq->outstanding_tag_count--;
810 GPR_SWAP(void*, cq->outstanding_tags[i],
811 cq->outstanding_tags[cq->outstanding_tag_count]);
812 found = 1;
813 break;
814 }
815 }
816
817 if (lock_cq) {
818 gpr_mu_unlock(cq->mu);
819 }
820
821 GPR_ASSERT(found);
822 }
823 #else
824 static void cq_check_tag(grpc_completion_queue* /*cq*/, void* /*tag*/,
825 bool /*lock_cq*/) {}
826 #endif
827
828 static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* /*tag*/) {
829 cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
830 return cqd->pending_events.IncrementIfNonzero();
831 }
832
833 static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* /*tag*/) {
834 cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
835 return cqd->pending_events.IncrementIfNonzero();
836 }
837
838 static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* /*tag*/) {
839 cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
840 return cqd->pending_events.IncrementIfNonzero();
841 }
842
843 static bool cq_begin_op_for_callback_alternative(grpc_completion_queue* cq,
844 void* tag) {
845 cq_callback_alternative_data* cqd =
846 static_cast<cq_callback_alternative_data*> DATA_FROM_CQ(cq);
847 return grpc_cq_begin_op(cqd->implementation, tag) &&
848 cqd->pending_events.IncrementIfNonzero();
849 }
850
851 bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) {
852 #ifndef NDEBUG
853 gpr_mu_lock(cq->mu);
854 if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) {
855 cq->outstanding_tag_capacity = GPR_MAX(4, 2 * cq->outstanding_tag_capacity);
856 cq->outstanding_tags = static_cast<void**>(gpr_realloc(
857 cq->outstanding_tags,
858 sizeof(*cq->outstanding_tags) * cq->outstanding_tag_capacity));
859 }
860 cq->outstanding_tags[cq->outstanding_tag_count++] = tag;
861 gpr_mu_unlock(cq->mu);
862 #endif
863 return cq->vtable->begin_op(cq, tag);
864 }
865
866 /* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
867 * completion
868 * type of GRPC_CQ_NEXT) */
869 static void cq_end_op_for_next(
870 grpc_completion_queue* cq, void* tag, grpc_error* error,
871 void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
872 grpc_cq_completion* storage, bool /*internal*/) {
873 GPR_TIMER_SCOPE("cq_end_op_for_next", 0);
874
875 if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) ||
876 (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
877 error != GRPC_ERROR_NONE)) {
878 const char* errmsg = grpc_error_string(error);
879 GRPC_API_TRACE(
880 "cq_end_op_for_next(cq=%p, tag=%p, error=%s, "
881 "done=%p, done_arg=%p, storage=%p)",
882 6, (cq, tag, errmsg, done, done_arg, storage));
883 if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
884 error != GRPC_ERROR_NONE) {
885 gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
886 }
887 }
888 cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
889 int is_success = (error == GRPC_ERROR_NONE);
890
891 storage->tag = tag;
892 storage->done = done;
893 storage->done_arg = done_arg;
894 storage->next = static_cast<uintptr_t>(is_success);
895
896 cq_check_tag(cq, tag, true); /* Used in debug builds only */
897
898 if ((grpc_completion_queue*)gpr_tls_get(&g_cached_cq) == cq &&
899 (grpc_cq_completion*)gpr_tls_get(&g_cached_event) == nullptr) {
900 gpr_tls_set(&g_cached_event, (intptr_t)storage);
901 } else {
902 /* Add the completion to the queue */
903 bool is_first = cqd->queue.Push(storage);
904 cqd->things_queued_ever.FetchAdd(1, grpc_core::MemoryOrder::RELAXED);
905 /* Since we do not hold the cq lock here, it is important to do an 'acquire'
906 load here (instead of a 'no_barrier' load) to match with the release
907 store
908 (done via pending_events.FetchSub(1, ACQ_REL)) in cq_shutdown_next
909 */
910 if (cqd->pending_events.Load(grpc_core::MemoryOrder::ACQUIRE) != 1) {
911 /* Only kick if this is the first item queued */
912 if (is_first) {
913 gpr_mu_lock(cq->mu);
914 grpc_error* kick_error =
915 cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), nullptr);
916 gpr_mu_unlock(cq->mu);
917
918 if (kick_error != GRPC_ERROR_NONE) {
919 const char* msg = grpc_error_string(kick_error);
920 gpr_log(GPR_ERROR, "Kick failed: %s", msg);
921 GRPC_ERROR_UNREF(kick_error);
922 }
923 }
924 if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) ==
925 1) {
926 GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
927 gpr_mu_lock(cq->mu);
928 cq_finish_shutdown_next(cq);
929 gpr_mu_unlock(cq->mu);
930 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
931 }
932 } else {
933 GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
934 cqd->pending_events.Store(0, grpc_core::MemoryOrder::RELEASE);
935 gpr_mu_lock(cq->mu);
936 cq_finish_shutdown_next(cq);
937 gpr_mu_unlock(cq->mu);
938 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
939 }
940 }
941
942 GRPC_ERROR_UNREF(error);
943 }
944
945 /* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
946 * completion
947 * type of GRPC_CQ_PLUCK) */
948 static void cq_end_op_for_pluck(
949 grpc_completion_queue* cq, void* tag, grpc_error* error,
950 void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
951 grpc_cq_completion* storage, bool /*internal*/) {
952 GPR_TIMER_SCOPE("cq_end_op_for_pluck", 0);
953
954 cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
955 int is_success = (error == GRPC_ERROR_NONE);
956
957 if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) ||
958 (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
959 error != GRPC_ERROR_NONE)) {
960 const char* errmsg = grpc_error_string(error);
961 GRPC_API_TRACE(
962 "cq_end_op_for_pluck(cq=%p, tag=%p, error=%s, "
963 "done=%p, done_arg=%p, storage=%p)",
964 6, (cq, tag, errmsg, done, done_arg, storage));
965 if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
966 error != GRPC_ERROR_NONE) {
967 gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
968 }
969 }
970
971 storage->tag = tag;
972 storage->done = done;
973 storage->done_arg = done_arg;
974 storage->next =
975 ((uintptr_t)&cqd->completed_head) | (static_cast<uintptr_t>(is_success));
976
977 gpr_mu_lock(cq->mu);
978 cq_check_tag(cq, tag, false); /* Used in debug builds only */
979
980 /* Add to the list of completions */
981 cqd->things_queued_ever.FetchAdd(1, grpc_core::MemoryOrder::RELAXED);
982 cqd->completed_tail->next =
983 ((uintptr_t)storage) | (1u & cqd->completed_tail->next);
984 cqd->completed_tail = storage;
985
986 if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
987 cq_finish_shutdown_pluck(cq);
988 gpr_mu_unlock(cq->mu);
989 } else {
990 grpc_pollset_worker* pluck_worker = nullptr;
991 for (int i = 0; i < cqd->num_pluckers; i++) {
992 if (cqd->pluckers[i].tag == tag) {
993 pluck_worker = *cqd->pluckers[i].worker;
994 break;
995 }
996 }
997
998 grpc_error* kick_error =
999 cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), pluck_worker);
1000
1001 gpr_mu_unlock(cq->mu);
1002
1003 if (kick_error != GRPC_ERROR_NONE) {
1004 const char* msg = grpc_error_string(kick_error);
1005 gpr_log(GPR_ERROR, "Kick failed: %s", msg);
1006
1007 GRPC_ERROR_UNREF(kick_error);
1008 }
1009 }
1010
1011 GRPC_ERROR_UNREF(error);
1012 }
1013
1014 void functor_callback(void* arg, grpc_error* error) {
1015 auto* functor = static_cast<grpc_experimental_completion_queue_functor*>(arg);
1016 functor->functor_run(functor, error == GRPC_ERROR_NONE);
1017 }
1018
1019 /* Complete an event on a completion queue of type GRPC_CQ_CALLBACK */
1020 static void cq_end_op_for_callback(
1021 grpc_completion_queue* cq, void* tag, grpc_error* error,
1022 void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
1023 grpc_cq_completion* storage, bool internal) {
1024 GPR_TIMER_SCOPE("cq_end_op_for_callback", 0);
1025
1026 cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
1027
1028 if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) ||
1029 (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
1030 error != GRPC_ERROR_NONE)) {
1031 const char* errmsg = grpc_error_string(error);
1032 GRPC_API_TRACE(
1033 "cq_end_op_for_callback(cq=%p, tag=%p, error=%s, "
1034 "done=%p, done_arg=%p, storage=%p)",
1035 6, (cq, tag, errmsg, done, done_arg, storage));
1036 if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
1037 error != GRPC_ERROR_NONE) {
1038 gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
1039 }
1040 }
1041
1042 // The callback-based CQ isn't really a queue at all and thus has no need
1043 // for reserved storage. Invoke the done callback right away to release it.
1044 done(done_arg, storage);
1045
1046 cq_check_tag(cq, tag, true); /* Used in debug builds only */
1047
1048 if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
1049 cq_finish_shutdown_callback(cq);
1050 }
1051
1052 // If possible, schedule the callback onto an existing thread-local
1053 // ApplicationCallbackExecCtx, which is a work queue. This is possible for:
1054 // 1. The callback is internally-generated and there is an ACEC available
1055 // 2. The callback is marked inlineable and there is an ACEC available
1056 // 3. We are already running in a background poller thread (which always has
1057 // an ACEC available at the base of the stack).
1058 auto* functor = static_cast<grpc_experimental_completion_queue_functor*>(tag);
1059 if (((internal || functor->inlineable) &&
1060 grpc_core::ApplicationCallbackExecCtx::Available()) ||
1061 grpc_iomgr_is_any_background_poller_thread()) {
1062 grpc_core::ApplicationCallbackExecCtx::Enqueue(functor,
1063 (error == GRPC_ERROR_NONE));
1064 GRPC_ERROR_UNREF(error);
1065 return;
1066 }
1067
1068 // Schedule the callback on a closure if not internal or triggered
1069 // from a background poller thread.
1070 grpc_core::Executor::Run(
1071 GRPC_CLOSURE_CREATE(functor_callback, functor, nullptr), error);
1072 }
1073
1074 static void cq_end_op_for_callback_alternative(
1075 grpc_completion_queue* cq, void* tag, grpc_error* error,
1076 void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
1077 grpc_cq_completion* storage, bool internal) {
1078 GPR_TIMER_SCOPE("cq_end_op_for_callback_alternative", 0);
1079
1080 cq_callback_alternative_data* cqd =
1081 static_cast<cq_callback_alternative_data*> DATA_FROM_CQ(cq);
1082
1083 if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) ||
1084 (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
1085 error != GRPC_ERROR_NONE)) {
1086 const char* errmsg = grpc_error_string(error);
1087 GRPC_API_TRACE(
1088 "cq_end_op_for_callback_alternative(cq=%p, tag=%p, error=%s, "
1089 "done=%p, done_arg=%p, storage=%p)",
1090 6, (cq, tag, errmsg, done, done_arg, storage));
1091 if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
1092 error != GRPC_ERROR_NONE) {
1093 gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
1094 }
1095 }
1096
1097 // Pass through the actual work to the internal nextable CQ
1098 grpc_cq_end_op(cqd->implementation, tag, error, done, done_arg, storage,
1099 internal);
1100
1101 cq_check_tag(cq, tag, true); /* Used in debug builds only */
1102
1103 if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
1104 cq_finish_shutdown_callback_alternative(cq);
1105 }
1106 }
1107
1108 void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error,
1109 void (*done)(void* done_arg, grpc_cq_completion* storage),
1110 void* done_arg, grpc_cq_completion* storage,
1111 bool internal) {
1112 cq->vtable->end_op(cq, tag, error, done, done_arg, storage, internal);
1113 }
1114
1115 static grpc_pollset* cq_proxy_pollset_for_callback_alternative(
1116 grpc_completion_queue* cq) {
1117 cq_callback_alternative_data* cqd =
1118 static_cast<cq_callback_alternative_data*>(DATA_FROM_CQ(cq));
1119 return POLLSET_FROM_CQ(cqd->implementation);
1120 }
1121
1122 struct cq_is_finished_arg {
1123 gpr_atm last_seen_things_queued_ever;
1124 grpc_completion_queue* cq;
1125 grpc_millis deadline;
1126 grpc_cq_completion* stolen_completion;
1127 void* tag; /* for pluck */
1128 bool first_loop;
1129 };
1130 class ExecCtxNext : public grpc_core::ExecCtx {
1131 public:
1132 ExecCtxNext(void* arg) : ExecCtx(0), check_ready_to_finish_arg_(arg) {}
1133
1134 bool CheckReadyToFinish() override {
1135 cq_is_finished_arg* a =
1136 static_cast<cq_is_finished_arg*>(check_ready_to_finish_arg_);
1137 grpc_completion_queue* cq = a->cq;
1138 cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
1139 GPR_ASSERT(a->stolen_completion == nullptr);
1140
1141 intptr_t current_last_seen_things_queued_ever =
1142 cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED);
1143
1144 if (current_last_seen_things_queued_ever !=
1145 a->last_seen_things_queued_ever) {
1146 a->last_seen_things_queued_ever =
1147 cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED);
1148
1149 /* Pop a cq_completion from the queue. Returns NULL if the queue is empty
1150 * might return NULL in some cases even if the queue is not empty; but
1151 * that
1152 * is ok and doesn't affect correctness. Might effect the tail latencies a
1153 * bit) */
1154 a->stolen_completion = cqd->queue.Pop();
1155 if (a->stolen_completion != nullptr) {
1156 return true;
1157 }
1158 }
1159 return !a->first_loop && a->deadline < grpc_core::ExecCtx::Get()->Now();
1160 }
1161
1162 private:
1163 void* check_ready_to_finish_arg_;
1164 };
1165
1166 #ifndef NDEBUG
1167 static void dump_pending_tags(grpc_completion_queue* cq) {
1168 if (!GRPC_TRACE_FLAG_ENABLED(grpc_trace_pending_tags)) return;
1169 std::vector<std::string> parts;
1170 parts.push_back("PENDING TAGS:");
1171 gpr_mu_lock(cq->mu);
1172 for (size_t i = 0; i < cq->outstanding_tag_count; i++) {
1173 parts.push_back(absl::StrFormat(" %p", cq->outstanding_tags[i]));
1174 }
1175 gpr_mu_unlock(cq->mu);
1176 gpr_log(GPR_DEBUG, "%s", absl::StrJoin(parts, "").c_str());
1177 }
1178 #else
1179 static void dump_pending_tags(grpc_completion_queue* /*cq*/) {}
1180 #endif
1181
1182 static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
1183 void* reserved) {
1184 GPR_TIMER_SCOPE("grpc_completion_queue_next", 0);
1185
1186 grpc_event ret;
1187 cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
1188
1189 GRPC_API_TRACE(
1190 "grpc_completion_queue_next("
1191 "cq=%p, "
1192 "deadline=gpr_timespec { tv_sec: %" PRId64
1193 ", tv_nsec: %d, clock_type: %d }, "
1194 "reserved=%p)",
1195 5,
1196 (cq, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
1197 reserved));
1198 GPR_ASSERT(!reserved);
1199
1200 dump_pending_tags(cq);
1201
1202 GRPC_CQ_INTERNAL_REF(cq, "next");
1203
1204 grpc_millis deadline_millis = grpc_timespec_to_millis_round_up(deadline);
1205 cq_is_finished_arg is_finished_arg = {
1206 cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED),
1207 cq,
1208 deadline_millis,
1209 nullptr,
1210 nullptr,
1211 true};
1212 ExecCtxNext exec_ctx(&is_finished_arg);
1213 for (;;) {
1214 grpc_millis iteration_deadline = deadline_millis;
1215
1216 if (is_finished_arg.stolen_completion != nullptr) {
1217 grpc_cq_completion* c = is_finished_arg.stolen_completion;
1218 is_finished_arg.stolen_completion = nullptr;
1219 ret.type = GRPC_OP_COMPLETE;
1220 ret.success = c->next & 1u;
1221 ret.tag = c->tag;
1222 c->done(c->done_arg, c);
1223 break;
1224 }
1225
1226 grpc_cq_completion* c = cqd->queue.Pop();
1227
1228 if (c != nullptr) {
1229 ret.type = GRPC_OP_COMPLETE;
1230 ret.success = c->next & 1u;
1231 ret.tag = c->tag;
1232 c->done(c->done_arg, c);
1233 break;
1234 } else {
1235 /* If c == NULL it means either the queue is empty OR in an transient
1236 inconsistent state. If it is the latter, we shold do a 0-timeout poll
1237 so that the thread comes back quickly from poll to make a second
1238 attempt at popping. Not doing this can potentially deadlock this
1239 thread forever (if the deadline is infinity) */
1240 if (cqd->queue.num_items() > 0) {
1241 iteration_deadline = 0;
1242 }
1243 }
1244
1245 if (cqd->pending_events.Load(grpc_core::MemoryOrder::ACQUIRE) == 0) {
1246 /* Before returning, check if the queue has any items left over (since
1247 MultiProducerSingleConsumerQueue::Pop() can sometimes return NULL
1248 even if the queue is not empty. If so, keep retrying but do not
1249 return GRPC_QUEUE_SHUTDOWN */
1250 if (cqd->queue.num_items() > 0) {
1251 /* Go to the beginning of the loop. No point doing a poll because
1252 (cq->shutdown == true) is only possible when there is no pending
1253 work (i.e cq->pending_events == 0) and any outstanding completion
1254 events should have already been queued on this cq */
1255 continue;
1256 }
1257
1258 ret.type = GRPC_QUEUE_SHUTDOWN;
1259 ret.success = 0;
1260 break;
1261 }
1262
1263 if (!is_finished_arg.first_loop &&
1264 grpc_core::ExecCtx::Get()->Now() >= deadline_millis) {
1265 ret.type = GRPC_QUEUE_TIMEOUT;
1266 ret.success = 0;
1267 dump_pending_tags(cq);
1268 break;
1269 }
1270
1271 /* The main polling work happens in grpc_pollset_work */
1272 gpr_mu_lock(cq->mu);
1273 cq->num_polls++;
1274 grpc_error* err = cq->poller_vtable->work(POLLSET_FROM_CQ(cq), nullptr,
1275 iteration_deadline);
1276 gpr_mu_unlock(cq->mu);
1277
1278 if (err != GRPC_ERROR_NONE) {
1279 const char* msg = grpc_error_string(err);
1280 gpr_log(GPR_ERROR, "Completion queue next failed: %s", msg);
1281
1282 GRPC_ERROR_UNREF(err);
1283 ret.type = GRPC_QUEUE_TIMEOUT;
1284 ret.success = 0;
1285 dump_pending_tags(cq);
1286 break;
1287 }
1288 is_finished_arg.first_loop = false;
1289 }
1290
1291 if (cqd->queue.num_items() > 0 &&
1292 cqd->pending_events.Load(grpc_core::MemoryOrder::ACQUIRE) > 0) {
1293 gpr_mu_lock(cq->mu);
1294 cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), nullptr);
1295 gpr_mu_unlock(cq->mu);
1296 }
1297
1298 GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
1299 GRPC_CQ_INTERNAL_UNREF(cq, "next");
1300
1301 GPR_ASSERT(is_finished_arg.stolen_completion == nullptr);
1302
1303 return ret;
1304 }
1305
1306 /* Finishes the completion queue shutdown. This means that there are no more
1307 completion events / tags expected from the completion queue
1308 - Must be called under completion queue lock
1309 - Must be called only once in completion queue's lifetime
1310 - grpc_completion_queue_shutdown() MUST have been called before calling
1311 this function */
1312 static void cq_finish_shutdown_next(grpc_completion_queue* cq) {
1313 cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
1314
1315 GPR_ASSERT(cqd->shutdown_called);
1316 GPR_ASSERT(cqd->pending_events.Load(grpc_core::MemoryOrder::RELAXED) == 0);
1317
1318 cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
1319 }
1320
1321 static void cq_shutdown_next(grpc_completion_queue* cq) {
1322 cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
1323
1324 /* Need an extra ref for cq here because:
1325 * We call cq_finish_shutdown_next() below, that would call pollset shutdown.
1326 * Pollset shutdown decrements the cq ref count which can potentially destroy
1327 * the cq (if that happens to be the last ref).
1328 * Creating an extra ref here prevents the cq from getting destroyed while
1329 * this function is still active */
1330 GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
1331 gpr_mu_lock(cq->mu);
1332 if (cqd->shutdown_called) {
1333 gpr_mu_unlock(cq->mu);
1334 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
1335 return;
1336 }
1337 cqd->shutdown_called = true;
1338 /* Doing acq/release FetchSub here to match with
1339 * cq_begin_op_for_next and cq_end_op_for_next functions which read/write
1340 * on this counter without necessarily holding a lock on cq */
1341 if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
1342 cq_finish_shutdown_next(cq);
1343 }
1344 gpr_mu_unlock(cq->mu);
1345 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
1346 }
1347
1348 grpc_event grpc_completion_queue_next(grpc_completion_queue* cq,
1349 gpr_timespec deadline, void* reserved) {
1350 return cq->vtable->next(cq, deadline, reserved);
1351 }
1352
1353 static int add_plucker(grpc_completion_queue* cq, void* tag,
1354 grpc_pollset_worker** worker) {
1355 cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
1356 if (cqd->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) {
1357 return 0;
1358 }
1359 cqd->pluckers[cqd->num_pluckers].tag = tag;
1360 cqd->pluckers[cqd->num_pluckers].worker = worker;
1361 cqd->num_pluckers++;
1362 return 1;
1363 }
1364
1365 static void del_plucker(grpc_completion_queue* cq, void* tag,
1366 grpc_pollset_worker** worker) {
1367 cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
1368 for (int i = 0; i < cqd->num_pluckers; i++) {
1369 if (cqd->pluckers[i].tag == tag && cqd->pluckers[i].worker == worker) {
1370 cqd->num_pluckers--;
1371 GPR_SWAP(plucker, cqd->pluckers[i], cqd->pluckers[cqd->num_pluckers]);
1372 return;
1373 }
1374 }
1375 GPR_UNREACHABLE_CODE(return );
1376 }
1377
1378 class ExecCtxPluck : public grpc_core::ExecCtx {
1379 public:
1380 ExecCtxPluck(void* arg) : ExecCtx(0), check_ready_to_finish_arg_(arg) {}
1381
1382 bool CheckReadyToFinish() override {
1383 cq_is_finished_arg* a =
1384 static_cast<cq_is_finished_arg*>(check_ready_to_finish_arg_);
1385 grpc_completion_queue* cq = a->cq;
1386 cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
1387
1388 GPR_ASSERT(a->stolen_completion == nullptr);
1389 gpr_atm current_last_seen_things_queued_ever =
1390 cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED);
1391 if (current_last_seen_things_queued_ever !=
1392 a->last_seen_things_queued_ever) {
1393 gpr_mu_lock(cq->mu);
1394 a->last_seen_things_queued_ever =
1395 cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED);
1396 grpc_cq_completion* c;
1397 grpc_cq_completion* prev = &cqd->completed_head;
1398 while ((c = (grpc_cq_completion*)(prev->next &
1399 ~static_cast<uintptr_t>(1))) !=
1400 &cqd->completed_head) {
1401 if (c->tag == a->tag) {
1402 prev->next = (prev->next & static_cast<uintptr_t>(1)) |
1403 (c->next & ~static_cast<uintptr_t>(1));
1404 if (c == cqd->completed_tail) {
1405 cqd->completed_tail = prev;
1406 }
1407 gpr_mu_unlock(cq->mu);
1408 a->stolen_completion = c;
1409 return true;
1410 }
1411 prev = c;
1412 }
1413 gpr_mu_unlock(cq->mu);
1414 }
1415 return !a->first_loop && a->deadline < grpc_core::ExecCtx::Get()->Now();
1416 }
1417
1418 private:
1419 void* check_ready_to_finish_arg_;
1420 };
1421
1422 static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
1423 gpr_timespec deadline, void* reserved) {
1424 GPR_TIMER_SCOPE("grpc_completion_queue_pluck", 0);
1425
1426 grpc_event ret;
1427 grpc_cq_completion* c;
1428 grpc_cq_completion* prev;
1429 grpc_pollset_worker* worker = nullptr;
1430 cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
1431
1432 if (GRPC_TRACE_FLAG_ENABLED(grpc_cq_pluck_trace)) {
1433 GRPC_API_TRACE(
1434 "grpc_completion_queue_pluck("
1435 "cq=%p, tag=%p, "
1436 "deadline=gpr_timespec { tv_sec: %" PRId64
1437 ", tv_nsec: %d, clock_type: %d }, "
1438 "reserved=%p)",
1439 6,
1440 (cq, tag, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
1441 reserved));
1442 }
1443 GPR_ASSERT(!reserved);
1444
1445 dump_pending_tags(cq);
1446
1447 GRPC_CQ_INTERNAL_REF(cq, "pluck");
1448 gpr_mu_lock(cq->mu);
1449 grpc_millis deadline_millis = grpc_timespec_to_millis_round_up(deadline);
1450 cq_is_finished_arg is_finished_arg = {
1451 cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED),
1452 cq,
1453 deadline_millis,
1454 nullptr,
1455 tag,
1456 true};
1457 ExecCtxPluck exec_ctx(&is_finished_arg);
1458 for (;;) {
1459 if (is_finished_arg.stolen_completion != nullptr) {
1460 gpr_mu_unlock(cq->mu);
1461 c = is_finished_arg.stolen_completion;
1462 is_finished_arg.stolen_completion = nullptr;
1463 ret.type = GRPC_OP_COMPLETE;
1464 ret.success = c->next & 1u;
1465 ret.tag = c->tag;
1466 c->done(c->done_arg, c);
1467 break;
1468 }
1469 prev = &cqd->completed_head;
1470 while (
1471 (c = (grpc_cq_completion*)(prev->next & ~static_cast<uintptr_t>(1))) !=
1472 &cqd->completed_head) {
1473 if (c->tag == tag) {
1474 prev->next = (prev->next & static_cast<uintptr_t>(1)) |
1475 (c->next & ~static_cast<uintptr_t>(1));
1476 if (c == cqd->completed_tail) {
1477 cqd->completed_tail = prev;
1478 }
1479 gpr_mu_unlock(cq->mu);
1480 ret.type = GRPC_OP_COMPLETE;
1481 ret.success = c->next & 1u;
1482 ret.tag = c->tag;
1483 c->done(c->done_arg, c);
1484 goto done;
1485 }
1486 prev = c;
1487 }
1488 if (cqd->shutdown.Load(grpc_core::MemoryOrder::RELAXED)) {
1489 gpr_mu_unlock(cq->mu);
1490 ret.type = GRPC_QUEUE_SHUTDOWN;
1491 ret.success = 0;
1492 break;
1493 }
1494 if (!add_plucker(cq, tag, &worker)) {
1495 gpr_log(GPR_DEBUG,
1496 "Too many outstanding grpc_completion_queue_pluck calls: maximum "
1497 "is %d",
1498 GRPC_MAX_COMPLETION_QUEUE_PLUCKERS);
1499 gpr_mu_unlock(cq->mu);
1500 /* TODO(ctiller): should we use a different result here */
1501 ret.type = GRPC_QUEUE_TIMEOUT;
1502 ret.success = 0;
1503 dump_pending_tags(cq);
1504 break;
1505 }
1506 if (!is_finished_arg.first_loop &&
1507 grpc_core::ExecCtx::Get()->Now() >= deadline_millis) {
1508 del_plucker(cq, tag, &worker);
1509 gpr_mu_unlock(cq->mu);
1510 ret.type = GRPC_QUEUE_TIMEOUT;
1511 ret.success = 0;
1512 dump_pending_tags(cq);
1513 break;
1514 }
1515 cq->num_polls++;
1516 grpc_error* err =
1517 cq->poller_vtable->work(POLLSET_FROM_CQ(cq), &worker, deadline_millis);
1518 if (err != GRPC_ERROR_NONE) {
1519 del_plucker(cq, tag, &worker);
1520 gpr_mu_unlock(cq->mu);
1521 const char* msg = grpc_error_string(err);
1522 gpr_log(GPR_ERROR, "Completion queue pluck failed: %s", msg);
1523
1524 GRPC_ERROR_UNREF(err);
1525 ret.type = GRPC_QUEUE_TIMEOUT;
1526 ret.success = 0;
1527 dump_pending_tags(cq);
1528 break;
1529 }
1530 is_finished_arg.first_loop = false;
1531 del_plucker(cq, tag, &worker);
1532 }
1533 done:
1534 GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
1535 GRPC_CQ_INTERNAL_UNREF(cq, "pluck");
1536
1537 GPR_ASSERT(is_finished_arg.stolen_completion == nullptr);
1538
1539 return ret;
1540 }
1541
1542 grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag,
1543 gpr_timespec deadline, void* reserved) {
1544 return cq->vtable->pluck(cq, tag, deadline, reserved);
1545 }
1546
1547 static void cq_finish_shutdown_pluck(grpc_completion_queue* cq) {
1548 cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
1549
1550 GPR_ASSERT(cqd->shutdown_called);
1551 GPR_ASSERT(!cqd->shutdown.Load(grpc_core::MemoryOrder::RELAXED));
1552 cqd->shutdown.Store(1, grpc_core::MemoryOrder::RELAXED);
1553
1554 cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
1555 }
1556
1557 /* NOTE: This function is almost exactly identical to cq_shutdown_next() but
1558 * merging them is a bit tricky and probably not worth it */
1559 static void cq_shutdown_pluck(grpc_completion_queue* cq) {
1560 cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
1561
1562 /* Need an extra ref for cq here because:
1563 * We call cq_finish_shutdown_pluck() below, that would call pollset shutdown.
1564 * Pollset shutdown decrements the cq ref count which can potentially destroy
1565 * the cq (if that happens to be the last ref).
1566 * Creating an extra ref here prevents the cq from getting destroyed while
1567 * this function is still active */
1568 GRPC_CQ_INTERNAL_REF(cq, "shutting_down (pluck cq)");
1569 gpr_mu_lock(cq->mu);
1570 if (cqd->shutdown_called) {
1571 gpr_mu_unlock(cq->mu);
1572 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (pluck cq)");
1573 return;
1574 }
1575 cqd->shutdown_called = true;
1576 if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
1577 cq_finish_shutdown_pluck(cq);
1578 }
1579 gpr_mu_unlock(cq->mu);
1580 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (pluck cq)");
1581 }
1582
1583 static void cq_finish_shutdown_callback(grpc_completion_queue* cq) {
1584 cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
1585 auto* callback = cqd->shutdown_callback;
1586
1587 GPR_ASSERT(cqd->shutdown_called);
1588
1589 cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
1590 if (grpc_iomgr_is_any_background_poller_thread()) {
1591 grpc_core::ApplicationCallbackExecCtx::Enqueue(callback, true);
1592 return;
1593 }
1594
1595 // Schedule the callback on a closure if not internal or triggered
1596 // from a background poller thread.
1597 grpc_core::Executor::Run(
1598 GRPC_CLOSURE_CREATE(functor_callback, callback, nullptr),
1599 GRPC_ERROR_NONE);
1600 }
1601
1602 static void cq_finish_shutdown_callback_alternative(grpc_completion_queue* cq) {
1603 cq_callback_alternative_data* cqd =
1604 static_cast<cq_callback_alternative_data*> DATA_FROM_CQ(cq);
1605 auto* callback = cqd->shutdown_callback;
1606
1607 GPR_ASSERT(cqd->shutdown_called);
1608
1609 // Shutdown the non-proxy pollset
1610 cq->poller_vtable->shutdown(INLINE_POLLSET_FROM_CQ(cq),
1611 &cq->pollset_shutdown_done);
1612 grpc_core::Executor::Run(
1613 GRPC_CLOSURE_CREATE(functor_callback, callback, nullptr),
1614 GRPC_ERROR_NONE);
1615 }
1616
1617 static void cq_shutdown_callback(grpc_completion_queue* cq) {
1618 cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
1619
1620 /* Need an extra ref for cq here because:
1621 * We call cq_finish_shutdown_callback() below, which calls pollset shutdown.
1622 * Pollset shutdown decrements the cq ref count which can potentially destroy
1623 * the cq (if that happens to be the last ref).
1624 * Creating an extra ref here prevents the cq from getting destroyed while
1625 * this function is still active */
1626 GRPC_CQ_INTERNAL_REF(cq, "shutting_down (callback cq)");
1627 gpr_mu_lock(cq->mu);
1628 if (cqd->shutdown_called) {
1629 gpr_mu_unlock(cq->mu);
1630 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)");
1631 return;
1632 }
1633 cqd->shutdown_called = true;
1634 if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
1635 gpr_mu_unlock(cq->mu);
1636 cq_finish_shutdown_callback(cq);
1637 } else {
1638 gpr_mu_unlock(cq->mu);
1639 }
1640 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)");
1641 }
1642
1643 static void cq_shutdown_callback_alternative(grpc_completion_queue* cq) {
1644 cq_callback_alternative_data* cqd =
1645 static_cast<cq_callback_alternative_data*> DATA_FROM_CQ(cq);
1646
1647 /* Need an extra ref for cq here because:
1648 * We call cq_finish_shutdown_callback() below, which calls pollset shutdown.
1649 * Pollset shutdown decrements the cq ref count which can potentially destroy
1650 * the cq (if that happens to be the last ref).
1651 * Creating an extra ref here prevents the cq from getting destroyed while
1652 * this function is still active */
1653 GRPC_CQ_INTERNAL_REF(cq, "shutting_down (callback cq)");
1654 gpr_mu_lock(cq->mu);
1655 if (cqd->shutdown_called) {
1656 gpr_mu_unlock(cq->mu);
1657 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)");
1658 return;
1659 }
1660 cqd->shutdown_called = true;
1661 if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
1662 gpr_mu_unlock(cq->mu);
1663 cq_finish_shutdown_callback_alternative(cq);
1664 } else {
1665 gpr_mu_unlock(cq->mu);
1666 }
1667 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)");
1668 }
1669
1670 /* Shutdown simply drops a ref that we reserved at creation time; if we drop
1671 to zero here, then enter shutdown mode and wake up any waiters */
1672 void grpc_completion_queue_shutdown(grpc_completion_queue* cq) {
1673 GPR_TIMER_SCOPE("grpc_completion_queue_shutdown", 0);
1674 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
1675 grpc_core::ExecCtx exec_ctx;
1676 GRPC_API_TRACE("grpc_completion_queue_shutdown(cq=%p)", 1, (cq));
1677 cq->vtable->shutdown(cq);
1678 }
1679
1680 void grpc_completion_queue_destroy(grpc_completion_queue* cq) {
1681 GPR_TIMER_SCOPE("grpc_completion_queue_destroy", 0);
1682 GRPC_API_TRACE("grpc_completion_queue_destroy(cq=%p)", 1, (cq));
1683 grpc_completion_queue_shutdown(cq);
1684
1685 grpc_core::ExecCtx exec_ctx;
1686 GRPC_CQ_INTERNAL_UNREF(cq, "destroy");
1687 }
1688
1689 grpc_pollset* grpc_cq_pollset(grpc_completion_queue* cq) {
1690 return cq->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cq) : nullptr;
1691 }
1692
1693 bool grpc_cq_can_listen(grpc_completion_queue* cq) {
1694 return cq->poller_vtable->can_listen;
1695 }
1696