1 //
2 //
3 // Copyright 2015-2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 #include <grpc/support/port_platform.h>
19
20 #include "src/core/lib/surface/completion_queue.h"
21
22 #include <inttypes.h>
23 #include <stdio.h>
24
25 #include <algorithm>
26 #include <atomic>
27 #include <new>
28 #include <string>
29 #include <utility>
30 #include <vector>
31
32 #include "absl/status/status.h"
33 #include "absl/strings/str_format.h"
34 #include "absl/strings/str_join.h"
35
36 #include <grpc/grpc.h>
37 #include <grpc/support/alloc.h>
38 #include <grpc/support/atm.h>
39 #include <grpc/support/log.h>
40 #include <grpc/support/sync.h>
41 #include <grpc/support/time.h>
42
43 #include "src/core/lib/debug/stats.h"
44 #include "src/core/lib/debug/stats_data.h"
45 #include "src/core/lib/gpr/spinlock.h"
46 #include "src/core/lib/gprpp/atomic_utils.h"
47 #include "src/core/lib/gprpp/debug_location.h"
48 #include "src/core/lib/gprpp/ref_counted.h"
49 #include "src/core/lib/gprpp/status_helper.h"
50 #include "src/core/lib/gprpp/time.h"
51 #include "src/core/lib/iomgr/closure.h"
52 #include "src/core/lib/iomgr/exec_ctx.h"
53 #include "src/core/lib/iomgr/executor.h"
54 #include "src/core/lib/iomgr/iomgr.h"
55 #include "src/core/lib/iomgr/pollset.h"
56 #include "src/core/lib/surface/api_trace.h"
57 #include "src/core/lib/surface/event_string.h"
58
59 #ifdef GPR_WINDOWS
60 #include "src/core/lib/experiments/experiments.h"
61 #endif
62
63 grpc_core::TraceFlag grpc_trace_operation_failures(false, "op_failure");
64 grpc_core::DebugOnlyTraceFlag grpc_trace_pending_tags(false, "pending_tags");
65 grpc_core::DebugOnlyTraceFlag grpc_trace_cq_refcount(false, "cq_refcount");
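// NOTE: As with other gRPC tracers, these can generally be enabled at runtime
// via the GRPC_TRACE environment variable (e.g. GRPC_TRACE=op_failure); the
// DebugOnlyTraceFlag instances above are no-ops in release (NDEBUG) builds.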
66
67 namespace {
68
69 // Specifies a cq thread local cache.
70 // The first event that occurs on a thread
71 // with a cq cache will go into that cache, and
72 // will only be returned on the thread that initialized the cache.
73 // NOTE: Only one event will ever be cached.
74 thread_local grpc_cq_completion* g_cached_event;
75 thread_local grpc_completion_queue* g_cached_cq;
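// Illustrative sketch (not part of this file's control flow): a caller that
// wants the single-cached-event fast path uses the public API roughly like
// this, polling the same cq from the thread that initialized the cache:
//
//   grpc_completion_queue_thread_local_cache_init(cq);
//   // ... start an operation whose completion may land in the cache ...
//   void* tag;
//   int ok;
//   if (grpc_completion_queue_thread_local_cache_flush(cq, &tag, &ok)) {
//     // The cached completion was consumed here instead of via cq_next().
//   }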
76
77 struct plucker {
78 grpc_pollset_worker** worker;
79 void* tag;
80 };
81 struct cq_poller_vtable {
82 bool can_get_pollset;
83 bool can_listen;
84 size_t (*size)(void);
85 void (*init)(grpc_pollset* pollset, gpr_mu** mu);
86 grpc_error_handle (*kick)(grpc_pollset* pollset,
87 grpc_pollset_worker* specific_worker);
88 grpc_error_handle (*work)(grpc_pollset* pollset, grpc_pollset_worker** worker,
89 grpc_core::Timestamp deadline);
90 void (*shutdown)(grpc_pollset* pollset, grpc_closure* closure);
91 void (*destroy)(grpc_pollset* pollset);
92 };
93 typedef struct non_polling_worker {
94 gpr_cv cv;
95 bool kicked;
96 struct non_polling_worker* next;
97 struct non_polling_worker* prev;
98 } non_polling_worker;
99
100 struct non_polling_poller {
101 gpr_mu mu;
102 bool kicked_without_poller;
103 non_polling_worker* root;
104 grpc_closure* shutdown;
105 };
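// How the non-polling poller below works: each thread that calls work() parks
// on its own condition variable, and the waiters are linked into a circular
// doubly-linked list rooted at 'root'. A kick signals one specific waiter (or
// the root waiter); if nobody is waiting, the kick is recorded in
// 'kicked_without_poller' so the next call to work() returns immediately.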
106 size_t non_polling_poller_size(void) { return sizeof(non_polling_poller); }
107
108 void non_polling_poller_init(grpc_pollset* pollset, gpr_mu** mu) {
109 non_polling_poller* npp = reinterpret_cast<non_polling_poller*>(pollset);
110 gpr_mu_init(&npp->mu);
111 *mu = &npp->mu;
112 }
113
114 void non_polling_poller_destroy(grpc_pollset* pollset) {
115 non_polling_poller* npp = reinterpret_cast<non_polling_poller*>(pollset);
116 gpr_mu_destroy(&npp->mu);
117 }
118
119 grpc_error_handle non_polling_poller_work(grpc_pollset* pollset,
120 grpc_pollset_worker** worker,
121 grpc_core::Timestamp deadline) {
122 non_polling_poller* npp = reinterpret_cast<non_polling_poller*>(pollset);
123 if (npp->shutdown) return absl::OkStatus();
124 if (npp->kicked_without_poller) {
125 npp->kicked_without_poller = false;
126 return absl::OkStatus();
127 }
128 non_polling_worker w;
129 gpr_cv_init(&w.cv);
130 if (worker != nullptr) *worker = reinterpret_cast<grpc_pollset_worker*>(&w);
131 if (npp->root == nullptr) {
132 npp->root = w.next = w.prev = &w;
133 } else {
134 w.next = npp->root;
135 w.prev = w.next->prev;
136 w.next->prev = w.prev->next = &w;
137 }
138 w.kicked = false;
139 gpr_timespec deadline_ts = deadline.as_timespec(GPR_CLOCK_MONOTONIC);
140 while (!npp->shutdown && !w.kicked &&
141 !gpr_cv_wait(&w.cv, &npp->mu, deadline_ts)) {
142 }
143 grpc_core::ExecCtx::Get()->InvalidateNow();
144 if (&w == npp->root) {
145 npp->root = w.next;
146 if (&w == npp->root) {
147 if (npp->shutdown) {
148 grpc_core::ExecCtx::Run(DEBUG_LOCATION, npp->shutdown,
149 absl::OkStatus());
150 }
151 npp->root = nullptr;
152 }
153 }
154 w.next->prev = w.prev;
155 w.prev->next = w.next;
156 gpr_cv_destroy(&w.cv);
157 if (worker != nullptr) *worker = nullptr;
158 return absl::OkStatus();
159 }
160
161 grpc_error_handle non_polling_poller_kick(
162 grpc_pollset* pollset, grpc_pollset_worker* specific_worker) {
163 non_polling_poller* p = reinterpret_cast<non_polling_poller*>(pollset);
164 if (specific_worker == nullptr) {
165 specific_worker = reinterpret_cast<grpc_pollset_worker*>(p->root);
166 }
167 if (specific_worker != nullptr) {
168 non_polling_worker* w =
169 reinterpret_cast<non_polling_worker*>(specific_worker);
170 if (!w->kicked) {
171 w->kicked = true;
172 gpr_cv_signal(&w->cv);
173 }
174 } else {
175 p->kicked_without_poller = true;
176 }
177 return absl::OkStatus();
178 }
179
180 void non_polling_poller_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
181 non_polling_poller* p = reinterpret_cast<non_polling_poller*>(pollset);
182 GPR_ASSERT(closure != nullptr);
183 p->shutdown = closure;
184 if (p->root == nullptr) {
185 grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, absl::OkStatus());
186 } else {
187 non_polling_worker* w = p->root;
188 do {
189 gpr_cv_signal(&w->cv);
190 w = w->next;
191 } while (w != p->root);
192 }
193 }
194
195 const cq_poller_vtable g_poller_vtable_by_poller_type[] = {
196 // GRPC_CQ_DEFAULT_POLLING
197 {true, true, grpc_pollset_size, grpc_pollset_init, grpc_pollset_kick,
198 grpc_pollset_work, grpc_pollset_shutdown, grpc_pollset_destroy},
199 // GRPC_CQ_NON_LISTENING
200 {true, false, grpc_pollset_size, grpc_pollset_init, grpc_pollset_kick,
201 grpc_pollset_work, grpc_pollset_shutdown, grpc_pollset_destroy},
202 // GRPC_CQ_NON_POLLING
203 {false, false, non_polling_poller_size, non_polling_poller_init,
204 non_polling_poller_kick, non_polling_poller_work,
205 non_polling_poller_shutdown, non_polling_poller_destroy},
206 };
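// NOTE: this array is indexed by grpc_cq_polling_type, so the initializer
// order above must match that enum's order.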
207
208 } // namespace
209
210 struct cq_vtable {
211 grpc_cq_completion_type cq_completion_type;
212 size_t data_size;
213 void (*init)(void* data, grpc_completion_queue_functor* shutdown_callback);
214 void (*shutdown)(grpc_completion_queue* cq);
215 void (*destroy)(void* data);
216 bool (*begin_op)(grpc_completion_queue* cq, void* tag);
217 void (*end_op)(grpc_completion_queue* cq, void* tag, grpc_error_handle error,
218 void (*done)(void* done_arg, grpc_cq_completion* storage),
219 void* done_arg, grpc_cq_completion* storage, bool internal);
220 grpc_event (*next)(grpc_completion_queue* cq, gpr_timespec deadline,
221 void* reserved);
222 grpc_event (*pluck)(grpc_completion_queue* cq, void* tag,
223 gpr_timespec deadline, void* reserved);
224 };
225
226 namespace {
227
228 // Queue that holds the cq_completion_events. Internally uses
229 // MultiProducerSingleConsumerQueue (a lockfree multiproducer single consumer
230 // queue). It uses a queue_lock to support multiple consumers.
231 // Only used in completion queues whose completion_type is GRPC_CQ_NEXT
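// Push() returns true if the (lazily tracked) item count was zero before the
// push; callers use this to decide whether a poller needs to be kicked.
// Pop() may return nullptr even when items remain, e.g. if another consumer
// currently holds the spinlock.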
232 class CqEventQueue {
233 public:
234 CqEventQueue() = default;
235 ~CqEventQueue() = default;
236
237 // Note: The counter is not incremented/decremented atomically with push/pop.
238 // The count is only eventually consistent
239 intptr_t num_items() const {
240 return num_queue_items_.load(std::memory_order_relaxed);
241 }
242
243 bool Push(grpc_cq_completion* c);
244 grpc_cq_completion* Pop();
245
246 private:
247 // Spinlock to serialize consumers, i.e. Pop() operations
248 gpr_spinlock queue_lock_ = GPR_SPINLOCK_INITIALIZER;
249
250 grpc_core::MultiProducerSingleConsumerQueue queue_;
251
252 // A lazy counter of the number of items in the queue. This is NOT atomically
253 // incremented/decremented along with push/pop operations and hence is only
254 // eventually consistent.
255 std::atomic<intptr_t> num_queue_items_{0};
256 };
257
258 struct cq_next_data {
259 ~cq_next_data() {
260 GPR_ASSERT(queue.num_items() == 0);
261 #ifndef NDEBUG
262 if (pending_events.load(std::memory_order_acquire) != 0) {
263 gpr_log(GPR_ERROR, "Destroying CQ without draining it fully.");
264 }
265 #endif
266 }
267
268 /// Completed events for completion-queues of type GRPC_CQ_NEXT
269 CqEventQueue queue;
270
271 /// Counter of how many things have ever been queued on this completion
272 /// queue; useful for avoiding locks when checking the queue
273 std::atomic<intptr_t> things_queued_ever{0};
274
275 /// Number of outstanding events (+1 if not shut down)
276 /// Initial count is dropped by grpc_completion_queue_shutdown
277 std::atomic<intptr_t> pending_events{1};
278
279 /// 0 initially. 1 once we initiated shutdown
280 bool shutdown_called = false;
281 };
282
283 struct cq_pluck_data {
284 cq_pluck_data() {
285 completed_tail = &completed_head;
286 completed_head.next = reinterpret_cast<uintptr_t>(completed_tail);
287 }
288
289 ~cq_pluck_data() {
290 GPR_ASSERT(completed_head.next ==
291 reinterpret_cast<uintptr_t>(&completed_head));
292 #ifndef NDEBUG
293 if (pending_events.load(std::memory_order_acquire) != 0) {
294 gpr_log(GPR_ERROR, "Destroying CQ without draining it fully.");
295 }
296 #endif
297 }
298
299 /// Completed events for completion-queues of type GRPC_CQ_PLUCK
300 grpc_cq_completion completed_head;
301 grpc_cq_completion* completed_tail;
302
303 /// Number of pending events (+1 if we're not shutdown).
304 /// Initial count is dropped by grpc_completion_queue_shutdown.
305 std::atomic<intptr_t> pending_events{1};
306
307 /// Counter of how many things have ever been queued on this completion
308 /// queue; useful for avoiding locks when checking the queue
309 std::atomic<intptr_t> things_queued_ever{0};
310
311 /// false initially; true once we have completed shutting down
312 // TODO(sreek): This is not needed since (shutdown == 1) if and only if
313 // (pending_events == 0). So consider removing this in future and use
314 // pending_events
315 std::atomic<bool> shutdown{false};
316
317 /// 0 initially. 1 once we initiated shutdown
318 bool shutdown_called = false;
319
320 int num_pluckers = 0;
321 plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
322 };
323
324 struct cq_callback_data {
325 explicit cq_callback_data(grpc_completion_queue_functor* shutdown_callback)
326 : shutdown_callback(shutdown_callback) {}
327
328 ~cq_callback_data() {
329 #ifndef NDEBUG
330 if (pending_events.load(std::memory_order_acquire) != 0) {
331 gpr_log(GPR_ERROR, "Destroying CQ without draining it fully.");
332 }
333 #endif
334 }
335
336 /// No actual completed events queue, unlike other types
337
338 /// Number of pending events (+1 if we're not shutdown).
339 /// Initial count is dropped by grpc_completion_queue_shutdown.
340 std::atomic<intptr_t> pending_events{1};
341
342 /// 0 initially. 1 once we initiated shutdown
343 bool shutdown_called = false;
344
345 /// A callback that gets invoked when the CQ completes shutdown
346 grpc_completion_queue_functor* shutdown_callback;
347 };
348
349 } // namespace
350
351 // Completion queue structure
352 struct grpc_completion_queue {
353 /// Once owning_refs drops to zero, we will destroy the cq
354 grpc_core::RefCount owning_refs;
355 /// Padding to avoid false sharing between adjacent fields
356 char padding_1[GPR_CACHELINE_SIZE];
357 gpr_mu* mu;
358
359 char padding_2[GPR_CACHELINE_SIZE];
360 const cq_vtable* vtable;
361
362 char padding_3[GPR_CACHELINE_SIZE];
363 const cq_poller_vtable* poller_vtable;
364
365 #ifndef NDEBUG
366 void** outstanding_tags;
367 size_t outstanding_tag_count;
368 size_t outstanding_tag_capacity;
369 #endif
370
371 grpc_closure pollset_shutdown_done;
372 int num_polls;
373 };
374
375 // Forward declarations
376 static void cq_finish_shutdown_next(grpc_completion_queue* cq);
377 static void cq_finish_shutdown_pluck(grpc_completion_queue* cq);
378 static void cq_finish_shutdown_callback(grpc_completion_queue* cq);
379 static void cq_shutdown_next(grpc_completion_queue* cq);
380 static void cq_shutdown_pluck(grpc_completion_queue* cq);
381 static void cq_shutdown_callback(grpc_completion_queue* cq);
382
383 static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag);
384 static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag);
385 static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag);
386
387 // A cq_end_op function is called when an operation on a given CQ with
388 // a given tag has completed. The storage argument is a reference to the
389 // space reserved for this completion as it is placed into the corresponding
390 // queue. The done argument is a callback that will be invoked when it is
391 // safe to free up that storage. The storage MUST NOT be freed until the
392 // done callback is invoked.
393 static void cq_end_op_for_next(
394 grpc_completion_queue* cq, void* tag, grpc_error_handle error,
395 void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
396 grpc_cq_completion* storage, bool internal);
397
398 static void cq_end_op_for_pluck(
399 grpc_completion_queue* cq, void* tag, grpc_error_handle error,
400 void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
401 grpc_cq_completion* storage, bool internal);
402
403 static void cq_end_op_for_callback(
404 grpc_completion_queue* cq, void* tag, grpc_error_handle error,
405 void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
406 grpc_cq_completion* storage, bool internal);
407
408 static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
409 void* reserved);
410
411 static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
412 gpr_timespec deadline, void* reserved);
413
414 // Note that cq_init_next and cq_init_pluck do not use the shutdown_callback
415 static void cq_init_next(void* data,
416 grpc_completion_queue_functor* shutdown_callback);
417 static void cq_init_pluck(void* data,
418 grpc_completion_queue_functor* shutdown_callback);
419 static void cq_init_callback(void* data,
420 grpc_completion_queue_functor* shutdown_callback);
421 static void cq_destroy_next(void* data);
422 static void cq_destroy_pluck(void* data);
423 static void cq_destroy_callback(void* data);
424
425 // Completion queue vtables based on the completion-type
426 static const cq_vtable g_cq_vtable[] = {
427 // GRPC_CQ_NEXT
428 {GRPC_CQ_NEXT, sizeof(cq_next_data), cq_init_next, cq_shutdown_next,
429 cq_destroy_next, cq_begin_op_for_next, cq_end_op_for_next, cq_next,
430 nullptr},
431 // GRPC_CQ_PLUCK
432 {GRPC_CQ_PLUCK, sizeof(cq_pluck_data), cq_init_pluck, cq_shutdown_pluck,
433 cq_destroy_pluck, cq_begin_op_for_pluck, cq_end_op_for_pluck, nullptr,
434 cq_pluck},
435 // GRPC_CQ_CALLBACK
436 {GRPC_CQ_CALLBACK, sizeof(cq_callback_data), cq_init_callback,
437 cq_shutdown_callback, cq_destroy_callback, cq_begin_op_for_callback,
438 cq_end_op_for_callback, nullptr, nullptr},
439 };
440
441 #define DATA_FROM_CQ(cq) ((void*)((cq) + 1))
442 #define POLLSET_FROM_CQ(cq) \
443 ((grpc_pollset*)((cq)->vtable->data_size + (char*)DATA_FROM_CQ(cq)))
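// Memory layout backing the two macros above. grpc_completion_queue_create_internal
// makes a single gpr_zalloc allocation shaped roughly as:
//
//   [grpc_completion_queue][per-type data (vtable->data_size)][pollset]
//    ^ cq                   ^ DATA_FROM_CQ(cq)                ^ POLLSET_FROM_CQ(cq)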
444
445 grpc_core::TraceFlag grpc_cq_pluck_trace(false, "queue_pluck");
446
447 #define GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, event) \
448 do { \
449 if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) && \
450 (GRPC_TRACE_FLAG_ENABLED(grpc_cq_pluck_trace) || \
451 (event)->type != GRPC_QUEUE_TIMEOUT)) { \
452 gpr_log(GPR_INFO, "RETURN_EVENT[%p]: %s", cq, \
453 grpc_event_string(event).c_str()); \
454 } \
455 } while (0)
456
457 static void on_pollset_shutdown_done(void* arg, grpc_error_handle error);
458
459 void grpc_completion_queue_thread_local_cache_init(grpc_completion_queue* cq) {
460 if (g_cached_cq == nullptr) {
461 g_cached_event = nullptr;
462 g_cached_cq = cq;
463 }
464 }
465
466 int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue* cq,
467 void** tag, int* ok) {
468 grpc_cq_completion* storage = g_cached_event;
469 int ret = 0;
470 if (storage != nullptr && g_cached_cq == cq) {
471 *tag = storage->tag;
472 grpc_core::ExecCtx exec_ctx;
473 *ok = (storage->next & uintptr_t{1}) == 1;
474 storage->done(storage->done_arg, storage);
475 ret = 1;
476 cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
477 if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
478 GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
479 gpr_mu_lock(cq->mu);
480 cq_finish_shutdown_next(cq);
481 gpr_mu_unlock(cq->mu);
482 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
483 }
484 }
485 g_cached_event = nullptr;
486 g_cached_cq = nullptr;
487
488 return ret;
489 }
490
491 bool CqEventQueue::Push(grpc_cq_completion* c) {
492 queue_.Push(
493 reinterpret_cast<grpc_core::MultiProducerSingleConsumerQueue::Node*>(c));
494 return num_queue_items_.fetch_add(1, std::memory_order_relaxed) == 0;
495 }
496
497 grpc_cq_completion* CqEventQueue::Pop() {
498 grpc_cq_completion* c = nullptr;
499
500 if (gpr_spinlock_trylock(&queue_lock_)) {
501 bool is_empty = false;
502 c = reinterpret_cast<grpc_cq_completion*>(queue_.PopAndCheckEnd(&is_empty));
503 gpr_spinlock_unlock(&queue_lock_);
504 }
505
506 if (c) {
507 num_queue_items_.fetch_sub(1, std::memory_order_relaxed);
508 }
509
510 return c;
511 }
512
513 grpc_completion_queue* grpc_completion_queue_create_internal(
514 grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type,
515 grpc_completion_queue_functor* shutdown_callback) {
516 grpc_completion_queue* cq;
517
518 GRPC_API_TRACE(
519 "grpc_completion_queue_create_internal(completion_type=%d, "
520 "polling_type=%d)",
521 2, (completion_type, polling_type));
522
523 switch (completion_type) {
524 case GRPC_CQ_NEXT:
525 grpc_core::global_stats().IncrementCqNextCreates();
526 break;
527 case GRPC_CQ_PLUCK:
528 grpc_core::global_stats().IncrementCqPluckCreates();
529 break;
530 case GRPC_CQ_CALLBACK:
531 grpc_core::global_stats().IncrementCqCallbackCreates();
532 break;
533 }
534
535 const cq_vtable* vtable = &g_cq_vtable[completion_type];
536 const cq_poller_vtable* poller_vtable =
537 &g_poller_vtable_by_poller_type[polling_type];
538
539 grpc_core::ExecCtx exec_ctx;
540
541 cq = static_cast<grpc_completion_queue*>(
542 gpr_zalloc(sizeof(grpc_completion_queue) + vtable->data_size +
543 poller_vtable->size()));
544
545 cq->vtable = vtable;
546 cq->poller_vtable = poller_vtable;
547
548 // One for destroy(), one for pollset_shutdown
549 new (&cq->owning_refs) grpc_core::RefCount(
550 2, grpc_trace_cq_refcount.enabled() ? "completion_queue" : nullptr);
551
552 poller_vtable->init(POLLSET_FROM_CQ(cq), &cq->mu);
553 vtable->init(DATA_FROM_CQ(cq), shutdown_callback);
554
555 GRPC_CLOSURE_INIT(&cq->pollset_shutdown_done, on_pollset_shutdown_done, cq,
556 grpc_schedule_on_exec_ctx);
557 return cq;
558 }
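// For reference (the public factories live elsewhere): wrappers such as
// grpc_completion_queue_create_for_next / _for_pluck / _for_callback end up in
// the function above with the corresponding completion type. A minimal
// caller-side sketch:
//
//   grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
//   // ... drive it with grpc_completion_queue_next(...) ...
//   grpc_completion_queue_shutdown(cq);
//   grpc_completion_queue_destroy(cq);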
559
560 static void cq_init_next(void* data,
561 grpc_completion_queue_functor* /*shutdown_callback*/) {
562 new (data) cq_next_data();
563 }
564
565 static void cq_destroy_next(void* data) {
566 cq_next_data* cqd = static_cast<cq_next_data*>(data);
567 cqd->~cq_next_data();
568 }
569
570 static void cq_init_pluck(
571 void* data, grpc_completion_queue_functor* /*shutdown_callback*/) {
572 new (data) cq_pluck_data();
573 }
574
575 static void cq_destroy_pluck(void* data) {
576 cq_pluck_data* cqd = static_cast<cq_pluck_data*>(data);
577 cqd->~cq_pluck_data();
578 }
579
580 static void cq_init_callback(void* data,
581 grpc_completion_queue_functor* shutdown_callback) {
582 new (data) cq_callback_data(shutdown_callback);
583 }
584
585 static void cq_destroy_callback(void* data) {
586 cq_callback_data* cqd = static_cast<cq_callback_data*>(data);
587 cqd->~cq_callback_data();
588 }
589
590 grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue* cq) {
591 return cq->vtable->cq_completion_type;
592 }
593
594 int grpc_get_cq_poll_num(grpc_completion_queue* cq) {
595 int cur_num_polls;
596 gpr_mu_lock(cq->mu);
597 cur_num_polls = cq->num_polls;
598 gpr_mu_unlock(cq->mu);
599 return cur_num_polls;
600 }
601
602 #ifndef NDEBUG
603 void grpc_cq_internal_ref(grpc_completion_queue* cq, const char* reason,
604 const char* file, int line) {
605 grpc_core::DebugLocation debug_location(file, line);
606 #else
607 void grpc_cq_internal_ref(grpc_completion_queue* cq) {
608 grpc_core::DebugLocation debug_location;
609 const char* reason = nullptr;
610 #endif
611 cq->owning_refs.Ref(debug_location, reason);
612 }
613
614 static void on_pollset_shutdown_done(void* arg, grpc_error_handle /*error*/) {
615 grpc_completion_queue* cq = static_cast<grpc_completion_queue*>(arg);
616 GRPC_CQ_INTERNAL_UNREF(cq, "pollset_destroy");
617 }
618
619 #ifndef NDEBUG
620 void grpc_cq_internal_unref(grpc_completion_queue* cq, const char* reason,
621 const char* file, int line) {
622 grpc_core::DebugLocation debug_location(file, line);
623 #else
624 void grpc_cq_internal_unref(grpc_completion_queue* cq) {
625 grpc_core::DebugLocation debug_location;
626 const char* reason = nullptr;
627 #endif
628 if (GPR_UNLIKELY(cq->owning_refs.Unref(debug_location, reason))) {
629 cq->vtable->destroy(DATA_FROM_CQ(cq));
630 cq->poller_vtable->destroy(POLLSET_FROM_CQ(cq));
631 #ifndef NDEBUG
632 gpr_free(cq->outstanding_tags);
633 #endif
634 gpr_free(cq);
635 }
636 }
637
638 #ifndef NDEBUG
639 static void cq_check_tag(grpc_completion_queue* cq, void* tag, bool lock_cq) {
640 int found = 0;
641 if (lock_cq) {
642 gpr_mu_lock(cq->mu);
643 }
644
645 for (int i = 0; i < static_cast<int>(cq->outstanding_tag_count); i++) {
646 if (cq->outstanding_tags[i] == tag) {
647 cq->outstanding_tag_count--;
648 std::swap(cq->outstanding_tags[i],
649 cq->outstanding_tags[cq->outstanding_tag_count]);
650 found = 1;
651 break;
652 }
653 }
654
655 if (lock_cq) {
656 gpr_mu_unlock(cq->mu);
657 }
658
659 GPR_ASSERT(found);
660 }
661 #else
662 static void cq_check_tag(grpc_completion_queue* /*cq*/, void* /*tag*/,
663 bool /*lock_cq*/) {}
664 #endif
665
666 static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* /*tag*/) {
667 cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
668 return grpc_core::IncrementIfNonzero(&cqd->pending_events);
669 }
670
671 static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* /*tag*/) {
672 cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
673 return grpc_core::IncrementIfNonzero(&cqd->pending_events);
674 }
675
676 static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* /*tag*/) {
677 cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
678 return grpc_core::IncrementIfNonzero(&cqd->pending_events);
679 }
680
681 bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) {
682 #ifndef NDEBUG
683 gpr_mu_lock(cq->mu);
684 if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) {
685 cq->outstanding_tag_capacity =
686 std::max(size_t(4), 2 * cq->outstanding_tag_capacity);
687 cq->outstanding_tags = static_cast<void**>(gpr_realloc(
688 cq->outstanding_tags,
689 sizeof(*cq->outstanding_tags) * cq->outstanding_tag_capacity));
690 }
691 cq->outstanding_tags[cq->outstanding_tag_count++] = tag;
692 gpr_mu_unlock(cq->mu);
693 #endif
694 return cq->vtable->begin_op(cq, tag);
695 }
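// Sketch of the expected pairing (the callers are elsewhere in core): every
// grpc_cq_begin_op() that returns true must eventually be matched by a
// grpc_cq_end_op() for the same tag, e.g. (done_fn / done_arg / storage are
// placeholders):
//
//   if (grpc_cq_begin_op(cq, tag)) {
//     // ... perform the operation ...
//     grpc_cq_end_op(cq, tag, absl::OkStatus(), done_fn, done_arg, storage,
//                    /*internal=*/false);
//   }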
696
697 // Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
698 // completion type of GRPC_CQ_NEXT)
700 static void cq_end_op_for_next(
701 grpc_completion_queue* cq, void* tag, grpc_error_handle error,
702 void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
703 grpc_cq_completion* storage, bool /*internal*/) {
704 if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) ||
705 (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) && !error.ok())) {
706 std::string errmsg = grpc_core::StatusToString(error);
707 GRPC_API_TRACE(
708 "cq_end_op_for_next(cq=%p, tag=%p, error=%s, "
709 "done=%p, done_arg=%p, storage=%p)",
710 6, (cq, tag, errmsg.c_str(), done, done_arg, storage));
711 if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) && !error.ok()) {
712 gpr_log(GPR_INFO, "Operation failed: tag=%p, error=%s", tag,
713 errmsg.c_str());
714 }
715 }
716 cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
717 int is_success = (error.ok());
718
719 storage->tag = tag;
720 storage->done = done;
721 storage->done_arg = done_arg;
722 storage->next = static_cast<uintptr_t>(is_success);
723
724 cq_check_tag(cq, tag, true); // Used in debug builds only
725
726 if (g_cached_cq == cq && g_cached_event == nullptr) {
727 g_cached_event = storage;
728 } else {
729 // Add the completion to the queue
730 bool is_first = cqd->queue.Push(storage);
731 cqd->things_queued_ever.fetch_add(1, std::memory_order_relaxed);
732 // Since we do not hold the cq lock here, it is important to do an 'acquire'
733 // load here (instead of a 'no_barrier' load) to match with the release
734 // store (done via pending_events.fetch_sub(1, ACQ_REL)) in
735 // cq_shutdown_next.
736 //
737 if (cqd->pending_events.load(std::memory_order_acquire) != 1) {
738 // Only kick if this is the first item queued
739 if (is_first) {
740 gpr_mu_lock(cq->mu);
741 grpc_error_handle kick_error =
742 cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), nullptr);
743 gpr_mu_unlock(cq->mu);
744
745 if (!kick_error.ok()) {
746 gpr_log(GPR_ERROR, "Kick failed: %s",
747 grpc_core::StatusToString(kick_error).c_str());
748 }
749 }
750 if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
751 GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
752 gpr_mu_lock(cq->mu);
753 cq_finish_shutdown_next(cq);
754 gpr_mu_unlock(cq->mu);
755 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
756 }
757 } else {
758 GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
759 cqd->pending_events.store(0, std::memory_order_release);
760 gpr_mu_lock(cq->mu);
761 cq_finish_shutdown_next(cq);
762 gpr_mu_unlock(cq->mu);
763 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
764 }
765 }
766 }
767
768 // Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
769 // completion type of GRPC_CQ_PLUCK)
771 static void cq_end_op_for_pluck(
772 grpc_completion_queue* cq, void* tag, grpc_error_handle error,
773 void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
774 grpc_cq_completion* storage, bool /*internal*/) {
775 cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
776 int is_success = (error.ok());
777
778 if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) ||
779 (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) && !error.ok())) {
780 std::string errmsg = grpc_core::StatusToString(error);
781 GRPC_API_TRACE(
782 "cq_end_op_for_pluck(cq=%p, tag=%p, error=%s, "
783 "done=%p, done_arg=%p, storage=%p)",
784 6, (cq, tag, errmsg.c_str(), done, done_arg, storage));
785 if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) && !error.ok()) {
786 gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag,
787 errmsg.c_str());
788 }
789 }
790
791 storage->tag = tag;
792 storage->done = done;
793 storage->done_arg = done_arg;
794 storage->next = reinterpret_cast<uintptr_t>(&cqd->completed_head) |
795 static_cast<uintptr_t>(is_success);
796
797 gpr_mu_lock(cq->mu);
798 cq_check_tag(cq, tag, false); // Used in debug builds only
799
800 // Add to the list of completions
801 cqd->things_queued_ever.fetch_add(1, std::memory_order_relaxed);
802 cqd->completed_tail->next =
803 reinterpret_cast<uintptr_t>(storage) | (1u & cqd->completed_tail->next);
804 cqd->completed_tail = storage;
805
806 if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
807 cq_finish_shutdown_pluck(cq);
808 gpr_mu_unlock(cq->mu);
809 } else {
810 grpc_pollset_worker* pluck_worker = nullptr;
811 for (int i = 0; i < cqd->num_pluckers; i++) {
812 if (cqd->pluckers[i].tag == tag) {
813 pluck_worker = *cqd->pluckers[i].worker;
814 break;
815 }
816 }
817
818 grpc_error_handle kick_error =
819 cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), pluck_worker);
820 gpr_mu_unlock(cq->mu);
821 if (!kick_error.ok()) {
822 gpr_log(GPR_ERROR, "Kick failed: %s",
823 grpc_core::StatusToString(kick_error).c_str());
824 }
825 }
826 }
827
828 static void functor_callback(void* arg, grpc_error_handle error) {
829 auto* functor = static_cast<grpc_completion_queue_functor*>(arg);
830 functor->functor_run(functor, error.ok());
831 }
832
833 // Complete an event on a completion queue of type GRPC_CQ_CALLBACK
834 static void cq_end_op_for_callback(
835 grpc_completion_queue* cq, void* tag, grpc_error_handle error,
836 void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
837 grpc_cq_completion* storage, bool internal) {
838 cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
839
840 if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) ||
841 (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) && !error.ok())) {
842 std::string errmsg = grpc_core::StatusToString(error);
843 GRPC_API_TRACE(
844 "cq_end_op_for_callback(cq=%p, tag=%p, error=%s, "
845 "done=%p, done_arg=%p, storage=%p)",
846 6, (cq, tag, errmsg.c_str(), done, done_arg, storage));
847 if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) && !error.ok()) {
848 gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag,
849 errmsg.c_str());
850 }
851 }
852
853 // The callback-based CQ isn't really a queue at all and thus has no need
854 // for reserved storage. Invoke the done callback right away to release it.
855 done(done_arg, storage);
856
857 cq_check_tag(cq, tag, true); // Used in debug builds only
858
859 if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
860 cq_finish_shutdown_callback(cq);
861 }
862
863 // If possible, schedule the callback onto an existing thread-local
864 // ApplicationCallbackExecCtx, which is a work queue. This is possible for:
865 // 1. The callback is internally-generated and there is an ACEC available
866 // 2. The callback is marked inlineable and there is an ACEC available
867 // 3. We are already running in a background poller thread (which always has
868 // an ACEC available at the base of the stack).
869 auto* functor = static_cast<grpc_completion_queue_functor*>(tag);
870 if (((internal || functor->inlineable) &&
871 grpc_core::ApplicationCallbackExecCtx::Available()) ||
872 grpc_iomgr_is_any_background_poller_thread()) {
873 grpc_core::ApplicationCallbackExecCtx::Enqueue(functor, (error.ok()));
874 return;
875 }
876
877 // Schedule the callback on a closure if not internal or triggered
878 // from a background poller thread.
879 grpc_core::Executor::Run(
880 GRPC_CLOSURE_CREATE(functor_callback, functor, nullptr), error);
881 }
882
883 void grpc_cq_end_op(grpc_completion_queue* cq, void* tag,
884 grpc_error_handle error,
885 void (*done)(void* done_arg, grpc_cq_completion* storage),
886 void* done_arg, grpc_cq_completion* storage,
887 bool internal) {
888 cq->vtable->end_op(cq, tag, error, done, done_arg, storage, internal);
889 }
890
891 struct cq_is_finished_arg {
892 gpr_atm last_seen_things_queued_ever;
893 grpc_completion_queue* cq;
894 grpc_core::Timestamp deadline;
895 grpc_cq_completion* stolen_completion;
896 void* tag; // for pluck
897 bool first_loop;
898 };
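// ExecCtxNext overrides CheckReadyToFinish so that a thread blocked in the
// poller on behalf of grpc_completion_queue_next can notice newly queued
// completions, "steal" one into stolen_completion, and unwind out of the poll
// early instead of waiting for the full deadline.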
899 class ExecCtxNext : public grpc_core::ExecCtx {
900 public:
901 explicit ExecCtxNext(void* arg)
902 : ExecCtx(0), check_ready_to_finish_arg_(arg) {}
903
904 bool CheckReadyToFinish() override {
905 cq_is_finished_arg* a =
906 static_cast<cq_is_finished_arg*>(check_ready_to_finish_arg_);
907 grpc_completion_queue* cq = a->cq;
908 cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
909 GPR_ASSERT(a->stolen_completion == nullptr);
910
911 intptr_t current_last_seen_things_queued_ever =
912 cqd->things_queued_ever.load(std::memory_order_relaxed);
913
914 if (current_last_seen_things_queued_ever !=
915 a->last_seen_things_queued_ever) {
916 a->last_seen_things_queued_ever =
917 cqd->things_queued_ever.load(std::memory_order_relaxed);
918
919 // Pop a cq_completion from the queue. Returns NULL if the queue is empty
920 // (and might return NULL in some cases even if the queue is not empty; that
921 // is ok and doesn't affect correctness, though it might affect tail
922 // latencies a bit).
924 a->stolen_completion = cqd->queue.Pop();
925 if (a->stolen_completion != nullptr) {
926 return true;
927 }
928 }
929 return !a->first_loop && a->deadline < grpc_core::Timestamp::Now();
930 }
931
932 private:
933 void* check_ready_to_finish_arg_;
934 };
935
936 #ifndef NDEBUG
937 static void dump_pending_tags(grpc_completion_queue* cq) {
938 if (!GRPC_TRACE_FLAG_ENABLED(grpc_trace_pending_tags)) return;
939 std::vector<std::string> parts;
940 parts.push_back("PENDING TAGS:");
941 gpr_mu_lock(cq->mu);
942 for (size_t i = 0; i < cq->outstanding_tag_count; i++) {
943 parts.push_back(absl::StrFormat(" %p", cq->outstanding_tags[i]));
944 }
945 gpr_mu_unlock(cq->mu);
946 gpr_log(GPR_DEBUG, "%s", absl::StrJoin(parts, "").c_str());
947 }
948 #else
949 static void dump_pending_tags(grpc_completion_queue* /*cq*/) {}
950 #endif
951
952 static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
953 void* reserved) {
954 grpc_event ret;
955 cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
956
957 GRPC_API_TRACE(
958 "grpc_completion_queue_next("
959 "cq=%p, "
960 "deadline=gpr_timespec { tv_sec: %" PRId64
961 ", tv_nsec: %d, clock_type: %d }, "
962 "reserved=%p)",
963 5,
964 (cq, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
965 reserved));
966 GPR_ASSERT(!reserved);
967
968 dump_pending_tags(cq);
969
970 GRPC_CQ_INTERNAL_REF(cq, "next");
971
972 grpc_core::Timestamp deadline_millis =
973 grpc_core::Timestamp::FromTimespecRoundUp(deadline);
974 cq_is_finished_arg is_finished_arg = {
975 cqd->things_queued_ever.load(std::memory_order_relaxed),
976 cq,
977 deadline_millis,
978 nullptr,
979 nullptr,
980 true};
981 ExecCtxNext exec_ctx(&is_finished_arg);
982 for (;;) {
983 grpc_core::Timestamp iteration_deadline = deadline_millis;
984
985 if (is_finished_arg.stolen_completion != nullptr) {
986 grpc_cq_completion* c = is_finished_arg.stolen_completion;
987 is_finished_arg.stolen_completion = nullptr;
988 ret.type = GRPC_OP_COMPLETE;
989 ret.success = c->next & 1u;
990 ret.tag = c->tag;
991 c->done(c->done_arg, c);
992 break;
993 }
994
995 grpc_cq_completion* c = cqd->queue.Pop();
996
997 if (c != nullptr) {
998 ret.type = GRPC_OP_COMPLETE;
999 ret.success = c->next & 1u;
1000 ret.tag = c->tag;
1001 c->done(c->done_arg, c);
1002 break;
1003 } else {
1004 // If c == NULL it means either the queue is empty OR is in a transient
1005 // inconsistent state. If it is the latter, we should do a 0-timeout poll
1006 // so that the thread comes back quickly from poll to make a second
1007 // attempt at popping. Not doing this can potentially deadlock this
1008 // thread forever (if the deadline is infinity).
1009 if (cqd->queue.num_items() > 0) {
1010 iteration_deadline = grpc_core::Timestamp::ProcessEpoch();
1011 }
1012 }
1013
1014 if (cqd->pending_events.load(std::memory_order_acquire) == 0) {
1015 // Before returning, check if the queue has any items left over (since
1016 // MultiProducerSingleConsumerQueue::Pop() can sometimes return NULL
1017 // even if the queue is not empty). If so, keep retrying but do not
1018 // return GRPC_QUEUE_SHUTDOWN.
1019 if (cqd->queue.num_items() > 0) {
1020 // Go to the beginning of the loop. No point doing a poll because
1021 // (cq->shutdown == true) is only possible when there is no pending
1022 // work (i.e. cq->pending_events == 0) and any outstanding completion
1023 // events should have already been queued on this cq
1024 continue;
1025 }
1026
1027 ret.type = GRPC_QUEUE_SHUTDOWN;
1028 ret.success = 0;
1029 break;
1030 }
1031
1032 if (!is_finished_arg.first_loop &&
1033 grpc_core::Timestamp::Now() >= deadline_millis) {
1034 ret.type = GRPC_QUEUE_TIMEOUT;
1035 ret.success = 0;
1036 dump_pending_tags(cq);
1037 break;
1038 }
1039
1040 // The main polling work happens in grpc_pollset_work
1041 gpr_mu_lock(cq->mu);
1042 cq->num_polls++;
1043 grpc_error_handle err = cq->poller_vtable->work(
1044 POLLSET_FROM_CQ(cq), nullptr, iteration_deadline);
1045 gpr_mu_unlock(cq->mu);
1046
1047 if (!err.ok()) {
1048 gpr_log(GPR_ERROR, "Completion queue next failed: %s",
1049 grpc_core::StatusToString(err).c_str());
1050 if (err == absl::CancelledError()) {
1051 ret.type = GRPC_QUEUE_SHUTDOWN;
1052 } else {
1053 ret.type = GRPC_QUEUE_TIMEOUT;
1054 }
1055 ret.success = 0;
1056 dump_pending_tags(cq);
1057 break;
1058 }
1059 is_finished_arg.first_loop = false;
1060 }
1061
1062 if (cqd->queue.num_items() > 0 &&
1063 cqd->pending_events.load(std::memory_order_acquire) > 0) {
1064 gpr_mu_lock(cq->mu);
1065 (void)cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), nullptr);
1066 gpr_mu_unlock(cq->mu);
1067 }
1068
1069 GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
1070 GRPC_CQ_INTERNAL_UNREF(cq, "next");
1071
1072 GPR_ASSERT(is_finished_arg.stolen_completion == nullptr);
1073
1074 return ret;
1075 }
1076
1077 // Finishes the completion queue shutdown. This means that there are no more
1078 // completion events / tags expected from the completion queue
1079 // - Must be called under completion queue lock
1080 // - Must be called only once in completion queue's lifetime
1081 // - grpc_completion_queue_shutdown() MUST have been called before calling
1082 // this function
1083 static void cq_finish_shutdown_next(grpc_completion_queue* cq) {
1084 cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
1085
1086 GPR_ASSERT(cqd->shutdown_called);
1087 GPR_ASSERT(cqd->pending_events.load(std::memory_order_relaxed) == 0);
1088
1089 cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
1090 }
1091
1092 static void cq_shutdown_next(grpc_completion_queue* cq) {
1093 cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
1094
1095 // Need an extra ref for cq here because:
1096 // We call cq_finish_shutdown_next() below, which would call pollset shutdown.
1097 // Pollset shutdown decrements the cq ref count which can potentially destroy
1098 // the cq (if that happens to be the last ref).
1099 // Creating an extra ref here prevents the cq from getting destroyed while
1100 // this function is still active
1101 GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
1102 gpr_mu_lock(cq->mu);
1103 if (cqd->shutdown_called) {
1104 gpr_mu_unlock(cq->mu);
1105 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
1106 return;
1107 }
1108 cqd->shutdown_called = true;
1109 // Doing acq/release fetch_sub here to match with
1110 // cq_begin_op_for_next and cq_end_op_for_next functions which read/write
1111 // on this counter without necessarily holding a lock on cq
1112 if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
1113 cq_finish_shutdown_next(cq);
1114 }
1115 gpr_mu_unlock(cq->mu);
1116 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
1117 }
1118
1119 grpc_event grpc_completion_queue_next(grpc_completion_queue* cq,
1120 gpr_timespec deadline, void* reserved) {
1121 return cq->vtable->next(cq, deadline, reserved);
1122 }
1123
1124 static int add_plucker(grpc_completion_queue* cq, void* tag,
1125 grpc_pollset_worker** worker) {
1126 cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
1127 if (cqd->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) {
1128 return 0;
1129 }
1130 cqd->pluckers[cqd->num_pluckers].tag = tag;
1131 cqd->pluckers[cqd->num_pluckers].worker = worker;
1132 cqd->num_pluckers++;
1133 return 1;
1134 }
1135
1136 static void del_plucker(grpc_completion_queue* cq, void* tag,
1137 grpc_pollset_worker** worker) {
1138 cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
1139 for (int i = 0; i < cqd->num_pluckers; i++) {
1140 if (cqd->pluckers[i].tag == tag && cqd->pluckers[i].worker == worker) {
1141 cqd->num_pluckers--;
1142 std::swap(cqd->pluckers[i], cqd->pluckers[cqd->num_pluckers]);
1143 return;
1144 }
1145 }
1146 GPR_UNREACHABLE_CODE(return);
1147 }
1148
1149 class ExecCtxPluck : public grpc_core::ExecCtx {
1150 public:
1151 explicit ExecCtxPluck(void* arg)
1152 : ExecCtx(0), check_ready_to_finish_arg_(arg) {}
1153
1154 bool CheckReadyToFinish() override {
1155 cq_is_finished_arg* a =
1156 static_cast<cq_is_finished_arg*>(check_ready_to_finish_arg_);
1157 grpc_completion_queue* cq = a->cq;
1158 cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
1159
1160 GPR_ASSERT(a->stolen_completion == nullptr);
1161 gpr_atm current_last_seen_things_queued_ever =
1162 cqd->things_queued_ever.load(std::memory_order_relaxed);
1163 if (current_last_seen_things_queued_ever !=
1164 a->last_seen_things_queued_ever) {
1165 gpr_mu_lock(cq->mu);
1166 a->last_seen_things_queued_ever =
1167 cqd->things_queued_ever.load(std::memory_order_relaxed);
1168 grpc_cq_completion* c;
1169 grpc_cq_completion* prev = &cqd->completed_head;
1170 while ((c = reinterpret_cast<grpc_cq_completion*>(
1171 prev->next & ~uintptr_t{1})) != &cqd->completed_head) {
1172 if (c->tag == a->tag) {
1173 prev->next = (prev->next & uintptr_t{1}) | (c->next & ~uintptr_t{1});
1174 if (c == cqd->completed_tail) {
1175 cqd->completed_tail = prev;
1176 }
1177 gpr_mu_unlock(cq->mu);
1178 a->stolen_completion = c;
1179 return true;
1180 }
1181 prev = c;
1182 }
1183 gpr_mu_unlock(cq->mu);
1184 }
1185 return !a->first_loop && a->deadline < grpc_core::Timestamp::Now();
1186 }
1187
1188 private:
1189 void* check_ready_to_finish_arg_;
1190 };
1191
1192 static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
1193 gpr_timespec deadline, void* reserved) {
1194 grpc_event ret;
1195 grpc_cq_completion* c;
1196 grpc_cq_completion* prev;
1197 grpc_pollset_worker* worker = nullptr;
1198 cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
1199
1200 if (GRPC_TRACE_FLAG_ENABLED(grpc_cq_pluck_trace)) {
1201 GRPC_API_TRACE(
1202 "grpc_completion_queue_pluck("
1203 "cq=%p, tag=%p, "
1204 "deadline=gpr_timespec { tv_sec: %" PRId64
1205 ", tv_nsec: %d, clock_type: %d }, "
1206 "reserved=%p)",
1207 6,
1208 (cq, tag, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
1209 reserved));
1210 }
1211 GPR_ASSERT(!reserved);
1212
1213 dump_pending_tags(cq);
1214
1215 GRPC_CQ_INTERNAL_REF(cq, "pluck");
1216 gpr_mu_lock(cq->mu);
1217 grpc_core::Timestamp deadline_millis =
1218 grpc_core::Timestamp::FromTimespecRoundUp(deadline);
1219 cq_is_finished_arg is_finished_arg = {
1220 cqd->things_queued_ever.load(std::memory_order_relaxed),
1221 cq,
1222 deadline_millis,
1223 nullptr,
1224 tag,
1225 true};
1226 ExecCtxPluck exec_ctx(&is_finished_arg);
1227 for (;;) {
1228 if (is_finished_arg.stolen_completion != nullptr) {
1229 gpr_mu_unlock(cq->mu);
1230 c = is_finished_arg.stolen_completion;
1231 is_finished_arg.stolen_completion = nullptr;
1232 ret.type = GRPC_OP_COMPLETE;
1233 ret.success = c->next & 1u;
1234 ret.tag = c->tag;
1235 c->done(c->done_arg, c);
1236 break;
1237 }
1238 prev = &cqd->completed_head;
1239 while ((c = reinterpret_cast<grpc_cq_completion*>(
1240 prev->next & ~uintptr_t{1})) != &cqd->completed_head) {
1241 if (GPR_LIKELY(c->tag == tag)) {
1242 prev->next = (prev->next & uintptr_t{1}) | (c->next & ~uintptr_t{1});
1243 if (c == cqd->completed_tail) {
1244 cqd->completed_tail = prev;
1245 }
1246 gpr_mu_unlock(cq->mu);
1247 ret.type = GRPC_OP_COMPLETE;
1248 ret.success = c->next & 1u;
1249 ret.tag = c->tag;
1250 c->done(c->done_arg, c);
1251 goto done;
1252 }
1253 prev = c;
1254 }
1255 if (cqd->shutdown.load(std::memory_order_relaxed)) {
1256 gpr_mu_unlock(cq->mu);
1257 ret.type = GRPC_QUEUE_SHUTDOWN;
1258 ret.success = 0;
1259 break;
1260 }
1261 if (!add_plucker(cq, tag, &worker)) {
1262 gpr_log(GPR_DEBUG,
1263 "Too many outstanding grpc_completion_queue_pluck calls: maximum "
1264 "is %d",
1265 GRPC_MAX_COMPLETION_QUEUE_PLUCKERS);
1266 gpr_mu_unlock(cq->mu);
1267 // TODO(ctiller): should we use a different result here
1268 ret.type = GRPC_QUEUE_TIMEOUT;
1269 ret.success = 0;
1270 dump_pending_tags(cq);
1271 break;
1272 }
1273 if (!is_finished_arg.first_loop &&
1274 grpc_core::Timestamp::Now() >= deadline_millis) {
1275 del_plucker(cq, tag, &worker);
1276 gpr_mu_unlock(cq->mu);
1277 ret.type = GRPC_QUEUE_TIMEOUT;
1278 ret.success = 0;
1279 dump_pending_tags(cq);
1280 break;
1281 }
1282 cq->num_polls++;
1283 grpc_error_handle err =
1284 cq->poller_vtable->work(POLLSET_FROM_CQ(cq), &worker, deadline_millis);
1285 if (!err.ok()) {
1286 del_plucker(cq, tag, &worker);
1287 gpr_mu_unlock(cq->mu);
1288 gpr_log(GPR_ERROR, "Completion queue pluck failed: %s",
1289 grpc_core::StatusToString(err).c_str());
1290 ret.type = GRPC_QUEUE_TIMEOUT;
1291 ret.success = 0;
1292 dump_pending_tags(cq);
1293 break;
1294 }
1295 is_finished_arg.first_loop = false;
1296 del_plucker(cq, tag, &worker);
1297 }
1298 done:
1299 GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
1300 GRPC_CQ_INTERNAL_UNREF(cq, "pluck");
1301
1302 GPR_ASSERT(is_finished_arg.stolen_completion == nullptr);
1303
1304 return ret;
1305 }
1306
1307 grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag,
1308 gpr_timespec deadline, void* reserved) {
1309 return cq->vtable->pluck(cq, tag, deadline, reserved);
1310 }
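// Illustrative caller-side sketch of pluck semantics (waits for one specific
// tag rather than the next available event):
//
//   grpc_event ev = grpc_completion_queue_pluck(
//       cq, tag, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
//   GPR_ASSERT(ev.type == GRPC_OP_COMPLETE && ev.tag == tag);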
1311
1312 static void cq_finish_shutdown_pluck(grpc_completion_queue* cq) {
1313 cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
1314
1315 GPR_ASSERT(cqd->shutdown_called);
1316 GPR_ASSERT(!cqd->shutdown.load(std::memory_order_relaxed));
1317 cqd->shutdown.store(true, std::memory_order_relaxed);
1318
1319 cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
1320 }
1321
1322 // NOTE: This function is almost exactly identical to cq_shutdown_next() but
1323 // merging them is a bit tricky and probably not worth it
1324 static void cq_shutdown_pluck(grpc_completion_queue* cq) {
1325 cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
1326
1327 // Need an extra ref for cq here because:
1328 // We call cq_finish_shutdown_pluck() below, which would call pollset shutdown.
1329 // Pollset shutdown decrements the cq ref count which can potentially destroy
1330 // the cq (if that happens to be the last ref).
1331 // Creating an extra ref here prevents the cq from getting destroyed while
1332 // this function is still active
1333 GRPC_CQ_INTERNAL_REF(cq, "shutting_down (pluck cq)");
1334 gpr_mu_lock(cq->mu);
1335 if (cqd->shutdown_called) {
1336 gpr_mu_unlock(cq->mu);
1337 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (pluck cq)");
1338 return;
1339 }
1340 cqd->shutdown_called = true;
1341 if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
1342 cq_finish_shutdown_pluck(cq);
1343 }
1344 gpr_mu_unlock(cq->mu);
1345 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (pluck cq)");
1346 }
1347
1348 static void cq_finish_shutdown_callback(grpc_completion_queue* cq) {
1349 cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
1350 auto* callback = cqd->shutdown_callback;
1351
1352 GPR_ASSERT(cqd->shutdown_called);
1353
1354 cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
1355 if (grpc_iomgr_is_any_background_poller_thread()) {
1356 grpc_core::ApplicationCallbackExecCtx::Enqueue(callback, true);
1357 return;
1358 }
1359
1360 // Schedule the shutdown callback on a closure if not triggered
1361 // from a background poller thread.
1362 grpc_core::Executor::Run(
1363 GRPC_CLOSURE_CREATE(functor_callback, callback, nullptr),
1364 absl::OkStatus());
1365 }
1366
1367 static void cq_shutdown_callback(grpc_completion_queue* cq) {
1368 cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
1369
1370 // Need an extra ref for cq here because:
1371 // We call cq_finish_shutdown_callback() below, which calls pollset shutdown.
1372 // Pollset shutdown decrements the cq ref count which can potentially destroy
1373 // the cq (if that happens to be the last ref).
1374 // Creating an extra ref here prevents the cq from getting destroyed while
1375 // this function is still active
1376 GRPC_CQ_INTERNAL_REF(cq, "shutting_down (callback cq)");
1377 gpr_mu_lock(cq->mu);
1378 if (cqd->shutdown_called) {
1379 gpr_mu_unlock(cq->mu);
1380 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)");
1381 return;
1382 }
1383 cqd->shutdown_called = true;
1384 if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
1385 gpr_mu_unlock(cq->mu);
1386 cq_finish_shutdown_callback(cq);
1387 } else {
1388 gpr_mu_unlock(cq->mu);
1389 }
1390 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)");
1391 }
1392
1393 // Shutdown simply drops a ref that we reserved at creation time; if we drop
1394 // to zero here, then enter shutdown mode and wake up any waiters
1395 void grpc_completion_queue_shutdown(grpc_completion_queue* cq) {
1396 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
1397 grpc_core::ExecCtx exec_ctx;
1398 GRPC_API_TRACE("grpc_completion_queue_shutdown(cq=%p)", 1, (cq));
1399 cq->vtable->shutdown(cq);
1400 }
1401
1402 void grpc_completion_queue_destroy(grpc_completion_queue* cq) {
1403 GRPC_API_TRACE("grpc_completion_queue_destroy(cq=%p)", 1, (cq));
1404 grpc_completion_queue_shutdown(cq);
1405
1406 grpc_core::ExecCtx exec_ctx;
1407 GRPC_CQ_INTERNAL_UNREF(cq, "destroy");
1408 }
1409
1410 grpc_pollset* grpc_cq_pollset(grpc_completion_queue* cq) {
1411 return cq->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cq) : nullptr;
1412 }
1413
1414 bool grpc_cq_can_listen(grpc_completion_queue* cq) {
1415 return cq->poller_vtable->can_listen;
1416 }
1417