1 //
2 //
3 // Copyright 2015-2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 #include "src/core/lib/surface/completion_queue.h"
19
20 #include <grpc/grpc.h>
21 #include <grpc/support/alloc.h>
22 #include <grpc/support/atm.h>
23 #include <grpc/support/port_platform.h>
24 #include <grpc/support/sync.h>
25 #include <grpc/support/time.h>
26 #include <inttypes.h>
27 #include <stdio.h>
28
29 #include <algorithm>
30 #include <atomic>
31 #include <new>
32 #include <string>
33 #include <utility>
34 #include <vector>
35
36 #include "absl/log/check.h"
37 #include "absl/log/log.h"
38 #include "absl/status/status.h"
39 #include "absl/strings/str_format.h"
40 #include "absl/strings/str_join.h"
41 #include "src/core/lib/event_engine/default_event_engine.h"
42 #include "src/core/lib/iomgr/closure.h"
43 #include "src/core/lib/iomgr/exec_ctx.h"
44 #include "src/core/lib/iomgr/executor.h"
45 #include "src/core/lib/iomgr/iomgr.h"
46 #include "src/core/lib/iomgr/pollset.h"
47 #include "src/core/lib/surface/event_string.h"
48 #include "src/core/telemetry/stats.h"
49 #include "src/core/telemetry/stats_data.h"
50 #include "src/core/util/atomic_utils.h"
51 #include "src/core/util/debug_location.h"
52 #include "src/core/util/ref_counted.h"
53 #include "src/core/util/spinlock.h"
54 #include "src/core/util/status_helper.h"
55 #include "src/core/util/time.h"
56
57 #ifdef GPR_WINDOWS
58 #include "src/core/lib/experiments/experiments.h"
59 #endif
60
61 namespace {
62
63 // Specifies a cq thread local cache.
64 // The first event that occurs on a thread
65 // with a cq cache will go into that cache, and
66 // will only be returned on the thread that initialized the cache.
67 // NOTE: Only one event will ever be cached.
68 thread_local grpc_cq_completion* g_cached_event;
69 thread_local grpc_completion_queue* g_cached_cq;
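// A sketch of the intended pattern (not a prescription): a thread that will
// drive completions calls grpc_completion_queue_thread_local_cache_init(cq)
// up front, and later drains the single cached event (if any) with
// grpc_completion_queue_thread_local_cache_flush(cq, &tag, &ok).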
70
71 struct plucker {
72 grpc_pollset_worker** worker;
73 void* tag;
74 };
75 struct cq_poller_vtable {
76 bool can_get_pollset;
77 bool can_listen;
78 size_t (*size)(void);
79 void (*init)(grpc_pollset* pollset, gpr_mu** mu);
80 grpc_error_handle (*kick)(grpc_pollset* pollset,
81 grpc_pollset_worker* specific_worker);
82 grpc_error_handle (*work)(grpc_pollset* pollset, grpc_pollset_worker** worker,
83 grpc_core::Timestamp deadline);
84 void (*shutdown)(grpc_pollset* pollset, grpc_closure* closure);
85 void (*destroy)(grpc_pollset* pollset);
86 };
87 typedef struct non_polling_worker {
88 gpr_cv cv;
89 bool kicked;
90 struct non_polling_worker* next;
91 struct non_polling_worker* prev;
92 } non_polling_worker;
93
94 struct non_polling_poller {
95 gpr_mu mu;
96 bool kicked_without_poller;
97 non_polling_worker* root;
98 grpc_closure* shutdown;
99 };
100 size_t non_polling_poller_size(void) { return sizeof(non_polling_poller); }
101
102 void non_polling_poller_init(grpc_pollset* pollset, gpr_mu** mu) {
103 non_polling_poller* npp = reinterpret_cast<non_polling_poller*>(pollset);
104 gpr_mu_init(&npp->mu);
105 *mu = &npp->mu;
106 }
107
108 void non_polling_poller_destroy(grpc_pollset* pollset) {
109 non_polling_poller* npp = reinterpret_cast<non_polling_poller*>(pollset);
110 gpr_mu_destroy(&npp->mu);
111 }
112
113 grpc_error_handle non_polling_poller_work(grpc_pollset* pollset,
114 grpc_pollset_worker** worker,
115 grpc_core::Timestamp deadline) {
116 non_polling_poller* npp = reinterpret_cast<non_polling_poller*>(pollset);
117 if (npp->shutdown) return absl::OkStatus();
118 if (npp->kicked_without_poller) {
119 npp->kicked_without_poller = false;
120 return absl::OkStatus();
121 }
122 non_polling_worker w;
123 gpr_cv_init(&w.cv);
124 if (worker != nullptr) *worker = reinterpret_cast<grpc_pollset_worker*>(&w);
125 if (npp->root == nullptr) {
126 npp->root = w.next = w.prev = &w;
127 } else {
128 w.next = npp->root;
129 w.prev = w.next->prev;
130 w.next->prev = w.prev->next = &w;
131 }
132 w.kicked = false;
133 gpr_timespec deadline_ts = deadline.as_timespec(GPR_CLOCK_MONOTONIC);
134 while (!npp->shutdown && !w.kicked &&
135 !gpr_cv_wait(&w.cv, &npp->mu, deadline_ts)) {
136 }
137 grpc_core::ExecCtx::Get()->InvalidateNow();
138 if (&w == npp->root) {
139 npp->root = w.next;
140 if (&w == npp->root) {
141 if (npp->shutdown) {
142 grpc_core::ExecCtx::Run(DEBUG_LOCATION, npp->shutdown,
143 absl::OkStatus());
144 }
145 npp->root = nullptr;
146 }
147 }
148 w.next->prev = w.prev;
149 w.prev->next = w.next;
150 gpr_cv_destroy(&w.cv);
151 if (worker != nullptr) *worker = nullptr;
152 return absl::OkStatus();
153 }
154
155 grpc_error_handle non_polling_poller_kick(
156 grpc_pollset* pollset, grpc_pollset_worker* specific_worker) {
157 non_polling_poller* p = reinterpret_cast<non_polling_poller*>(pollset);
158 if (specific_worker == nullptr) {
159 specific_worker = reinterpret_cast<grpc_pollset_worker*>(p->root);
160 }
161 if (specific_worker != nullptr) {
162 non_polling_worker* w =
163 reinterpret_cast<non_polling_worker*>(specific_worker);
164 if (!w->kicked) {
165 w->kicked = true;
166 gpr_cv_signal(&w->cv);
167 }
168 } else {
169 p->kicked_without_poller = true;
170 }
171 return absl::OkStatus();
172 }
173
174 void non_polling_poller_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
175 non_polling_poller* p = reinterpret_cast<non_polling_poller*>(pollset);
176 CHECK_NE(closure, nullptr);
177 p->shutdown = closure;
178 if (p->root == nullptr) {
179 grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, absl::OkStatus());
180 } else {
181 non_polling_worker* w = p->root;
182 do {
183 gpr_cv_signal(&w->cv);
184 w = w->next;
185 } while (w != p->root);
186 }
187 }
188
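// Poller vtables indexed by grpc_cq_polling_type (see
// grpc_completion_queue_create_internal below).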
189 const cq_poller_vtable g_poller_vtable_by_poller_type[] = {
190 // GRPC_CQ_DEFAULT_POLLING
191 {true, true, grpc_pollset_size, grpc_pollset_init, grpc_pollset_kick,
192 grpc_pollset_work, grpc_pollset_shutdown, grpc_pollset_destroy},
193 // GRPC_CQ_NON_LISTENING
194 {true, false, grpc_pollset_size, grpc_pollset_init, grpc_pollset_kick,
195 grpc_pollset_work, grpc_pollset_shutdown, grpc_pollset_destroy},
196 // GRPC_CQ_NON_POLLING
197 {false, false, non_polling_poller_size, non_polling_poller_init,
198 non_polling_poller_kick, non_polling_poller_work,
199 non_polling_poller_shutdown, non_polling_poller_destroy},
200 };
201
202 } // namespace
203
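// Per-completion-type vtable: every completion queue dispatches its
// init/shutdown/destroy/begin_op/end_op/next/pluck operations through one of
// these (see g_cq_vtable below).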
204 struct cq_vtable {
205 grpc_cq_completion_type cq_completion_type;
206 size_t data_size;
207 void (*init)(void* data, grpc_completion_queue_functor* shutdown_callback);
208 void (*shutdown)(grpc_completion_queue* cq);
209 void (*destroy)(void* data);
210 bool (*begin_op)(grpc_completion_queue* cq, void* tag);
211 void (*end_op)(grpc_completion_queue* cq, void* tag, grpc_error_handle error,
212 void (*done)(void* done_arg, grpc_cq_completion* storage),
213 void* done_arg, grpc_cq_completion* storage, bool internal);
214 grpc_event (*next)(grpc_completion_queue* cq, gpr_timespec deadline,
215 void* reserved);
216 grpc_event (*pluck)(grpc_completion_queue* cq, void* tag,
217 gpr_timespec deadline, void* reserved);
218 };
219
220 namespace {
221
222 // Queue that holds the cq_completion_events. Internally uses
223 // MultiProducerSingleConsumerQueue (a lockfree multiproducer single consumer
224 // queue). It uses a queue_lock to support multiple consumers.
225 // Only used in completion queues whose completion_type is GRPC_CQ_NEXT
226 class CqEventQueue {
227 public:
228 CqEventQueue() = default;
229 ~CqEventQueue() = default;
230
231 // Note: The counter is not incremented/decremented atomically with push/pop.
232 // The count is only eventually consistent
233 intptr_t num_items() const {
234 return num_queue_items_.load(std::memory_order_relaxed);
235 }
236
237 bool Push(grpc_cq_completion* c);
238 grpc_cq_completion* Pop();
239
240 private:
241 // Spinlock to serialize consumers, i.e. Pop() operations
242 gpr_spinlock queue_lock_ = GPR_SPINLOCK_INITIALIZER;
243
244 grpc_core::MultiProducerSingleConsumerQueue queue_;
245
246 // A lazy counter of number of items in the queue. This is NOT atomically
247 // incremented/decremented along with push/pop operations and hence is only
248 // eventually consistent
249 std::atomic<intptr_t> num_queue_items_{0};
250 };
251
252 struct cq_next_data {
253 ~cq_next_data() {
254 CHECK_EQ(queue.num_items(), 0);
255 #ifndef NDEBUG
256 if (pending_events.load(std::memory_order_acquire) != 0) {
257 LOG(ERROR) << "Destroying CQ without draining it fully.";
258 }
259 #endif
260 }
261
262 /// Completed events for completion-queues of type GRPC_CQ_NEXT
263 CqEventQueue queue;
264
265 /// Counter of how many things have ever been queued on this completion queue;
266 /// useful for avoiding locks to check the queue
267 std::atomic<intptr_t> things_queued_ever{0};
268
269 /// Number of outstanding events (+1 if not shut down)
270 /// Initial count is dropped by grpc_completion_queue_shutdown
271 std::atomic<intptr_t> pending_events{1};
272
273 /// false initially; true once we have initiated shutdown
274 bool shutdown_called = false;
275 };
276
277 struct cq_pluck_data {
278 cq_pluck_data() {
279 completed_tail = &completed_head;
280 completed_head.next = reinterpret_cast<uintptr_t>(completed_tail);
281 }
282
283 ~cq_pluck_data() {
284 CHECK(completed_head.next == reinterpret_cast<uintptr_t>(&completed_head));
285 #ifndef NDEBUG
286 if (pending_events.load(std::memory_order_acquire) != 0) {
287 LOG(ERROR) << "Destroying CQ without draining it fully.";
288 }
289 #endif
290 }
291
292 /// Completed events for completion-queues of type GRPC_CQ_PLUCK
293 grpc_cq_completion completed_head;
294 grpc_cq_completion* completed_tail;
295
296 /// Number of pending events (+1 if we're not shut down).
297 /// Initial count is dropped by grpc_completion_queue_shutdown.
298 std::atomic<intptr_t> pending_events{1};
299
300 /// Counter of how many things have ever been queued on this completion queue;
301 /// useful for avoiding locks to check the queue
302 std::atomic<intptr_t> things_queued_ever{0};
303
304 /// false initially; true once we have completed shutting down
305 // TODO(sreek): This is not needed since (shutdown == true) if and only if
306 // (pending_events == 0). So consider removing this in the future and using
307 // pending_events instead
308 std::atomic<bool> shutdown{false};
309
310 /// false initially; true once we have initiated shutdown
311 bool shutdown_called = false;
312
313 int num_pluckers = 0;
314 plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
315 };
316
317 struct cq_callback_data {
318 explicit cq_callback_data(grpc_completion_queue_functor* shutdown_callback)
319 : shutdown_callback(shutdown_callback),
320 event_engine(grpc_event_engine::experimental::GetDefaultEventEngine()) {
321 }
322
323 ~cq_callback_data() {
324 #ifndef NDEBUG
325 if (pending_events.load(std::memory_order_acquire) != 0) {
326 LOG(ERROR) << "Destroying CQ without draining it fully.";
327 }
328 #endif
329 }
330
331 /// No actual completed events queue, unlike other types
332
333 /// Number of pending events (+1 if we're not shut down).
334 /// Initial count is dropped by grpc_completion_queue_shutdown.
335 std::atomic<intptr_t> pending_events{1};
336
337 /// false initially; true once we have initiated shutdown
338 bool shutdown_called = false;
339
340 /// A callback that gets invoked when the CQ completes shutdown
341 grpc_completion_queue_functor* shutdown_callback;
342
343 std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine;
344 };
345
346 } // namespace
347
348 // Completion queue structure
349 struct grpc_completion_queue {
350 /// Once owning_refs drops to zero, we will destroy the cq
351 grpc_core::RefCount owning_refs;
352 /// Padding to avoid false sharing
353 char padding_1[GPR_CACHELINE_SIZE];
354 gpr_mu* mu;
355
356 char padding_2[GPR_CACHELINE_SIZE];
357 const cq_vtable* vtable;
358
359 char padding_3[GPR_CACHELINE_SIZE];
360 const cq_poller_vtable* poller_vtable;
361
362 #ifndef NDEBUG
363 void** outstanding_tags;
364 size_t outstanding_tag_count;
365 size_t outstanding_tag_capacity;
366 #endif
367
368 grpc_closure pollset_shutdown_done;
369 int num_polls;
370 };
371
372 // Forward declarations
373 static void cq_finish_shutdown_next(grpc_completion_queue* cq);
374 static void cq_finish_shutdown_pluck(grpc_completion_queue* cq);
375 static void cq_finish_shutdown_callback(grpc_completion_queue* cq);
376 static void cq_shutdown_next(grpc_completion_queue* cq);
377 static void cq_shutdown_pluck(grpc_completion_queue* cq);
378 static void cq_shutdown_callback(grpc_completion_queue* cq);
379
380 static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag);
381 static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag);
382 static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag);
383
384 // A cq_end_op function is called when an operation on a given CQ with
385 // a given tag has completed. The storage argument is a reference to the
386 // space reserved for this completion as it is placed into the corresponding
387 // queue. The done argument is a callback that will be invoked when it is
388 // safe to free up that storage. The storage MUST NOT be freed until the
389 // done callback is invoked.
390 static void cq_end_op_for_next(
391 grpc_completion_queue* cq, void* tag, grpc_error_handle error,
392 void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
393 grpc_cq_completion* storage, bool internal);
394
395 static void cq_end_op_for_pluck(
396 grpc_completion_queue* cq, void* tag, grpc_error_handle error,
397 void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
398 grpc_cq_completion* storage, bool internal);
399
400 static void cq_end_op_for_callback(
401 grpc_completion_queue* cq, void* tag, grpc_error_handle error,
402 void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
403 grpc_cq_completion* storage, bool internal);
404
405 static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
406 void* reserved);
407
408 static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
409 gpr_timespec deadline, void* reserved);
410
411 // Note that cq_init_next and cq_init_pluck do not use the shutdown_callback
412 static void cq_init_next(void* data,
413 grpc_completion_queue_functor* shutdown_callback);
414 static void cq_init_pluck(void* data,
415 grpc_completion_queue_functor* shutdown_callback);
416 static void cq_init_callback(void* data,
417 grpc_completion_queue_functor* shutdown_callback);
418 static void cq_destroy_next(void* data);
419 static void cq_destroy_pluck(void* data);
420 static void cq_destroy_callback(void* data);
421
422 // Completion queue vtables based on the completion-type
423 static const cq_vtable g_cq_vtable[] = {
424 // GRPC_CQ_NEXT
425 {GRPC_CQ_NEXT, sizeof(cq_next_data), cq_init_next, cq_shutdown_next,
426 cq_destroy_next, cq_begin_op_for_next, cq_end_op_for_next, cq_next,
427 nullptr},
428 // GRPC_CQ_PLUCK
429 {GRPC_CQ_PLUCK, sizeof(cq_pluck_data), cq_init_pluck, cq_shutdown_pluck,
430 cq_destroy_pluck, cq_begin_op_for_pluck, cq_end_op_for_pluck, nullptr,
431 cq_pluck},
432 // GRPC_CQ_CALLBACK
433 {GRPC_CQ_CALLBACK, sizeof(cq_callback_data), cq_init_callback,
434 cq_shutdown_callback, cq_destroy_callback, cq_begin_op_for_callback,
435 cq_end_op_for_callback, nullptr, nullptr},
436 };
437
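// The type-specific data and the pollset are allocated contiguously right
// after the grpc_completion_queue struct itself (see the single gpr_zalloc in
// grpc_completion_queue_create_internal); these macros recover pointers into
// that block.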
438 #define DATA_FROM_CQ(cq) ((void*)((cq) + 1))
439 #define POLLSET_FROM_CQ(cq) \
440 ((grpc_pollset*)((cq)->vtable->data_size + (char*)DATA_FROM_CQ(cq)))
441
442 #define GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, event) \
443 do { \
444 if (GRPC_TRACE_FLAG_ENABLED(api) && \
445 (GRPC_TRACE_FLAG_ENABLED(queue_pluck) || \
446 (event)->type != GRPC_QUEUE_TIMEOUT)) { \
447 LOG(INFO) << "RETURN_EVENT[" << (cq) \
448 << "]: " << grpc_event_string(event); \
449 } \
450 } while (0)
451
452 static void on_pollset_shutdown_done(void* arg, grpc_error_handle error);
453
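// Binds the calling thread's one-event cache to cq. Only the first call on a
// given thread takes effect; the binding is cleared again by
// grpc_completion_queue_thread_local_cache_flush.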
454 void grpc_completion_queue_thread_local_cache_init(grpc_completion_queue* cq) {
455 if (g_cached_cq == nullptr) {
456 g_cached_event = nullptr;
457 g_cached_cq = cq;
458 }
459 }
460
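// If this thread's cache holds an event for cq, fills *tag and *ok, releases
// the completion's storage, and returns 1; otherwise returns 0. The
// thread-local cache is cleared in either case.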
461 int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue* cq,
462 void** tag, int* ok) {
463 grpc_cq_completion* storage = g_cached_event;
464 int ret = 0;
465 if (storage != nullptr && g_cached_cq == cq) {
466 *tag = storage->tag;
467 grpc_core::ExecCtx exec_ctx;
468 *ok = (storage->next & uintptr_t{1}) == 1;
469 storage->done(storage->done_arg, storage);
470 ret = 1;
471 cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
472 if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
473 GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
474 gpr_mu_lock(cq->mu);
475 cq_finish_shutdown_next(cq);
476 gpr_mu_unlock(cq->mu);
477 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
478 }
479 }
480 g_cached_event = nullptr;
481 g_cached_cq = nullptr;
482
483 return ret;
484 }
485
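// Returns true if this push (as seen through the lazy counter) took the queue
// from empty to non-empty; callers use that to decide whether to kick the
// poller.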
486 bool CqEventQueue::Push(grpc_cq_completion* c) {
487 queue_.Push(
488 reinterpret_cast<grpc_core::MultiProducerSingleConsumerQueue::Node*>(c));
489 return num_queue_items_.fetch_add(1, std::memory_order_relaxed) == 0;
490 }
491
492 grpc_cq_completion* CqEventQueue::Pop() {
493 grpc_cq_completion* c = nullptr;
494
495 if (gpr_spinlock_trylock(&queue_lock_)) {
496 bool is_empty = false;
497 c = reinterpret_cast<grpc_cq_completion*>(queue_.PopAndCheckEnd(&is_empty));
498 gpr_spinlock_unlock(&queue_lock_);
499 }
500
501 if (c) {
502 num_queue_items_.fetch_sub(1, std::memory_order_relaxed);
503 }
504
505 return c;
506 }
507
508 grpc_completion_queue* grpc_completion_queue_create_internal(
509 grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type,
510 grpc_completion_queue_functor* shutdown_callback) {
511 grpc_completion_queue* cq;
512
513 GRPC_TRACE_LOG(api, INFO)
514 << "grpc_completion_queue_create_internal(completion_type="
515 << completion_type << ", polling_type=" << polling_type << ")";
516
517 switch (completion_type) {
518 case GRPC_CQ_NEXT:
519 grpc_core::global_stats().IncrementCqNextCreates();
520 break;
521 case GRPC_CQ_PLUCK:
522 grpc_core::global_stats().IncrementCqPluckCreates();
523 break;
524 case GRPC_CQ_CALLBACK:
525 grpc_core::global_stats().IncrementCqCallbackCreates();
526 break;
527 }
528
529 const cq_vtable* vtable = &g_cq_vtable[completion_type];
530 const cq_poller_vtable* poller_vtable =
531 &g_poller_vtable_by_poller_type[polling_type];
532
533 grpc_core::ExecCtx exec_ctx;
534
535 cq = static_cast<grpc_completion_queue*>(
536 gpr_zalloc(sizeof(grpc_completion_queue) + vtable->data_size +
537 poller_vtable->size()));
538
539 cq->vtable = vtable;
540 cq->poller_vtable = poller_vtable;
541
542 // One for destroy(), one for pollset_shutdown
543 new (&cq->owning_refs) grpc_core::RefCount(
544 2, GRPC_TRACE_FLAG_ENABLED(cq_refcount) ? "completion_queue" : nullptr);
545
546 poller_vtable->init(POLLSET_FROM_CQ(cq), &cq->mu);
547 vtable->init(DATA_FROM_CQ(cq), shutdown_callback);
548
549 GRPC_CLOSURE_INIT(&cq->pollset_shutdown_done, on_pollset_shutdown_done, cq,
550 grpc_schedule_on_exec_ctx);
551 return cq;
552 }
553
554 static void cq_init_next(void* data,
555 grpc_completion_queue_functor* /*shutdown_callback*/) {
556 new (data) cq_next_data();
557 }
558
559 static void cq_destroy_next(void* data) {
560 cq_next_data* cqd = static_cast<cq_next_data*>(data);
561 cqd->~cq_next_data();
562 }
563
564 static void cq_init_pluck(
565 void* data, grpc_completion_queue_functor* /*shutdown_callback*/) {
566 new (data) cq_pluck_data();
567 }
568
569 static void cq_destroy_pluck(void* data) {
570 cq_pluck_data* cqd = static_cast<cq_pluck_data*>(data);
571 cqd->~cq_pluck_data();
572 }
573
574 static void cq_init_callback(void* data,
575 grpc_completion_queue_functor* shutdown_callback) {
576 new (data) cq_callback_data(shutdown_callback);
577 }
578
579 static void cq_destroy_callback(void* data) {
580 cq_callback_data* cqd = static_cast<cq_callback_data*>(data);
581 cqd->~cq_callback_data();
582 }
583
584 grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue* cq) {
585 return cq->vtable->cq_completion_type;
586 }
587
588 int grpc_get_cq_poll_num(grpc_completion_queue* cq) {
589 int cur_num_polls;
590 gpr_mu_lock(cq->mu);
591 cur_num_polls = cq->num_polls;
592 gpr_mu_unlock(cq->mu);
593 return cur_num_polls;
594 }
595
596 #ifndef NDEBUG
597 void grpc_cq_internal_ref(grpc_completion_queue* cq, const char* reason,
598 const char* file, int line) {
599 grpc_core::DebugLocation debug_location(file, line);
600 #else
601 void grpc_cq_internal_ref(grpc_completion_queue* cq) {
602 grpc_core::DebugLocation debug_location;
603 const char* reason = nullptr;
604 #endif
605 cq->owning_refs.Ref(debug_location, reason);
606 }
607
608 static void on_pollset_shutdown_done(void* arg, grpc_error_handle /*error*/) {
609 grpc_completion_queue* cq = static_cast<grpc_completion_queue*>(arg);
610 GRPC_CQ_INTERNAL_UNREF(cq, "pollset_destroy");
611 }
612
613 #ifndef NDEBUG
614 void grpc_cq_internal_unref(grpc_completion_queue* cq, const char* reason,
615 const char* file, int line) {
616 grpc_core::DebugLocation debug_location(file, line);
617 #else
618 void grpc_cq_internal_unref(grpc_completion_queue* cq) {
619 grpc_core::DebugLocation debug_location;
620 const char* reason = nullptr;
621 #endif
622 if (GPR_UNLIKELY(cq->owning_refs.Unref(debug_location, reason))) {
623 cq->vtable->destroy(DATA_FROM_CQ(cq));
624 cq->poller_vtable->destroy(POLLSET_FROM_CQ(cq));
625 #ifndef NDEBUG
626 gpr_free(cq->outstanding_tags);
627 #endif
628 gpr_free(cq);
629 }
630 }
631
632 #ifndef NDEBUG
633 static void cq_check_tag(grpc_completion_queue* cq, void* tag, bool lock_cq) {
634 int found = 0;
635 if (lock_cq) {
636 gpr_mu_lock(cq->mu);
637 }
638
639 for (int i = 0; i < static_cast<int>(cq->outstanding_tag_count); i++) {
640 if (cq->outstanding_tags[i] == tag) {
641 cq->outstanding_tag_count--;
642 std::swap(cq->outstanding_tags[i],
643 cq->outstanding_tags[cq->outstanding_tag_count]);
644 found = 1;
645 break;
646 }
647 }
648
649 if (lock_cq) {
650 gpr_mu_unlock(cq->mu);
651 }
652
653 CHECK(found);
654 }
655 #else
656 static void cq_check_tag(grpc_completion_queue* /*cq*/, void* /*tag*/,
657 bool /*lock_cq*/) {}
658 #endif
659
660 static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* /*tag*/) {
661 cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
662 return grpc_core::IncrementIfNonzero(&cqd->pending_events);
663 }
664
665 static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* /*tag*/) {
666 cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
667 return grpc_core::IncrementIfNonzero(&cqd->pending_events);
668 }
669
670 static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* /*tag*/) {
671 cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
672 return grpc_core::IncrementIfNonzero(&cqd->pending_events);
673 }
674
675 bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) {
676 #ifndef NDEBUG
677 gpr_mu_lock(cq->mu);
678 if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) {
679 cq->outstanding_tag_capacity =
680 std::max(size_t(4), 2 * cq->outstanding_tag_capacity);
681 cq->outstanding_tags = static_cast<void**>(gpr_realloc(
682 cq->outstanding_tags,
683 sizeof(*cq->outstanding_tags) * cq->outstanding_tag_capacity));
684 }
685 cq->outstanding_tags[cq->outstanding_tag_count++] = tag;
686 gpr_mu_unlock(cq->mu);
687 #endif
688 return cq->vtable->begin_op(cq, tag);
689 }
690
691 // Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
692 // completion type of GRPC_CQ_NEXT)
694 static void cq_end_op_for_next(
695 grpc_completion_queue* cq, void* tag, grpc_error_handle error,
696 void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
697 grpc_cq_completion* storage, bool /*internal*/) {
698 if (GRPC_TRACE_FLAG_ENABLED(api) ||
699 (GRPC_TRACE_FLAG_ENABLED(op_failure) && !error.ok())) {
700 std::string errmsg = grpc_core::StatusToString(error);
701 GRPC_TRACE_LOG(api, INFO)
702 << "cq_end_op_for_next(cq=" << cq << ", tag=" << tag
703 << ", error=" << errmsg.c_str() << ", done=" << done
704 << ", done_arg=" << done_arg << ", storage=" << storage << ")";
705 if (GRPC_TRACE_FLAG_ENABLED(op_failure) && !error.ok()) {
706 LOG(INFO) << "Operation failed: tag=" << tag << ", error=" << errmsg;
707 }
708 }
709 cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
710 int is_success = (error.ok());
711
712 storage->tag = tag;
713 storage->done = done;
714 storage->done_arg = done_arg;
715 storage->next = static_cast<uintptr_t>(is_success);
716
717 cq_check_tag(cq, tag, true); // Used in debug builds only
718
719 if (g_cached_cq == cq && g_cached_event == nullptr) {
720 g_cached_event = storage;
721 } else {
722 // Add the completion to the queue
723 bool is_first = cqd->queue.Push(storage);
724 cqd->things_queued_ever.fetch_add(1, std::memory_order_relaxed);
725 // Since we do not hold the cq lock here, it is important to do an
726 // 'acquire' load here (instead of a 'no_barrier' load) to match with the
727 // release store (done via pending_events.fetch_sub(1, ACQ_REL)) in
728 // cq_shutdown_next
730 if (cqd->pending_events.load(std::memory_order_acquire) != 1) {
731 // Only kick if this is the first item queued
732 if (is_first) {
733 gpr_mu_lock(cq->mu);
734 grpc_error_handle kick_error =
735 cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), nullptr);
736 gpr_mu_unlock(cq->mu);
737
738 if (!kick_error.ok()) {
739 LOG(ERROR) << "Kick failed: "
740 << grpc_core::StatusToString(kick_error);
741 }
742 }
743 if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
744 GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
745 gpr_mu_lock(cq->mu);
746 cq_finish_shutdown_next(cq);
747 gpr_mu_unlock(cq->mu);
748 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
749 }
750 } else {
751 GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
752 cqd->pending_events.store(0, std::memory_order_release);
753 gpr_mu_lock(cq->mu);
754 cq_finish_shutdown_next(cq);
755 gpr_mu_unlock(cq->mu);
756 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
757 }
758 }
759 }
760
761 // Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
762 // completion type of GRPC_CQ_PLUCK)
764 static void cq_end_op_for_pluck(
765 grpc_completion_queue* cq, void* tag, grpc_error_handle error,
766 void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
767 grpc_cq_completion* storage, bool /*internal*/) {
768 cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
769 int is_success = (error.ok());
770
771 if (GRPC_TRACE_FLAG_ENABLED(api) ||
772 (GRPC_TRACE_FLAG_ENABLED(op_failure) && !error.ok())) {
773 std::string errmsg = grpc_core::StatusToString(error);
774 GRPC_TRACE_LOG(api, INFO)
775 << "cq_end_op_for_pluck(cq=" << cq << ", tag=" << tag
776 << ", error=" << errmsg.c_str() << ", done=" << done
777 << ", done_arg=" << done_arg << ", storage=" << storage << ")";
778 if (GRPC_TRACE_FLAG_ENABLED(op_failure) && !error.ok()) {
779 LOG(ERROR) << "Operation failed: tag=" << tag << ", error=" << errmsg;
780 }
781 }
782
783 storage->tag = tag;
784 storage->done = done;
785 storage->done_arg = done_arg;
786 storage->next = reinterpret_cast<uintptr_t>(&cqd->completed_head) |
787 static_cast<uintptr_t>(is_success);
788
789 gpr_mu_lock(cq->mu);
790 cq_check_tag(cq, tag, false); // Used in debug builds only
791
792 // Add to the list of completions
793 cqd->things_queued_ever.fetch_add(1, std::memory_order_relaxed);
794 cqd->completed_tail->next =
795 reinterpret_cast<uintptr_t>(storage) | (1u & cqd->completed_tail->next);
796 cqd->completed_tail = storage;
797
798 if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
799 cq_finish_shutdown_pluck(cq);
800 gpr_mu_unlock(cq->mu);
801 } else {
802 grpc_pollset_worker* pluck_worker = nullptr;
803 for (int i = 0; i < cqd->num_pluckers; i++) {
804 if (cqd->pluckers[i].tag == tag) {
805 pluck_worker = *cqd->pluckers[i].worker;
806 break;
807 }
808 }
809
810 grpc_error_handle kick_error =
811 cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), pluck_worker);
812 gpr_mu_unlock(cq->mu);
813 if (!kick_error.ok()) {
814 LOG(ERROR) << "Kick failed: " << kick_error;
815 }
816 }
817 }
818
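// Closure callback that runs a grpc_completion_queue_functor, translating the
// error into the functor's success flag.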
819 static void functor_callback(void* arg, grpc_error_handle error) {
820 auto* functor = static_cast<grpc_completion_queue_functor*>(arg);
821 functor->functor_run(functor, error.ok());
822 }
823
824 // Complete an event on a completion queue of type GRPC_CQ_CALLBACK
825 static void cq_end_op_for_callback(
826 grpc_completion_queue* cq, void* tag, grpc_error_handle error,
827 void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
828 grpc_cq_completion* storage, bool internal) {
829 cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
830
831 if (GRPC_TRACE_FLAG_ENABLED(api) ||
832 (GRPC_TRACE_FLAG_ENABLED(op_failure) && !error.ok())) {
833 std::string errmsg = grpc_core::StatusToString(error);
834 GRPC_TRACE_LOG(api, INFO)
835 << "cq_end_op_for_callback(cq=" << cq << ", tag=" << tag
836 << ", error=" << errmsg.c_str() << ", done=" << done
837 << ", done_arg=" << done_arg << ", storage=" << storage << ")";
838 if (GRPC_TRACE_FLAG_ENABLED(op_failure) && !error.ok()) {
839 LOG(ERROR) << "Operation failed: tag=" << tag << ", error=" << errmsg;
840 }
841 }
842
843 // The callback-based CQ isn't really a queue at all and thus has no need
844 // for reserved storage. Invoke the done callback right away to release it.
845 done(done_arg, storage);
846
847 cq_check_tag(cq, tag, true); // Used in debug builds only
848
849 if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
850 cq_finish_shutdown_callback(cq);
851 }
852
853 auto* functor = static_cast<grpc_completion_queue_functor*>(tag);
854 if (grpc_core::IsEventEngineApplicationCallbacksEnabled()) {
855 // Run the callback on EventEngine threads.
856 cqd->event_engine->Run(
857 [engine = cqd->event_engine, functor, ok = error.ok()]() {
858 grpc_core::ExecCtx exec_ctx;
859 (*functor->functor_run)(functor, ok);
860 });
861 return;
862 }
863 // If possible, schedule the callback onto an existing thread-local
864 // ApplicationCallbackExecCtx, which is a work queue. This is possible for:
865 // 1. The callback is internally-generated and there is an ACEC available
866 // 2. The callback is marked inlineable and there is an ACEC available
867 // 3. We are already running in a background poller thread (which always has
868 // an ACEC available at the base of the stack).
869 if (((internal || functor->inlineable) &&
870 grpc_core::ApplicationCallbackExecCtx::Available()) ||
871 grpc_iomgr_is_any_background_poller_thread()) {
872 grpc_core::ApplicationCallbackExecCtx::Enqueue(functor, (error.ok()));
873 return;
874 }
875
876 // Schedule the callback on a closure if not internal or triggered
877 // from a background poller thread.
878 grpc_core::Executor::Run(
879 GRPC_CLOSURE_CREATE(functor_callback, functor, nullptr), error);
880 }
881
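// Public entry point: dispatches to the end_op implementation for this cq's
// completion type.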
882 void grpc_cq_end_op(grpc_completion_queue* cq, void* tag,
883 grpc_error_handle error,
884 void (*done)(void* done_arg, grpc_cq_completion* storage),
885 void* done_arg, grpc_cq_completion* storage,
886 bool internal) {
887 cq->vtable->end_op(cq, tag, error, done, done_arg, storage, internal);
888 }
889
890 struct cq_is_finished_arg {
891 gpr_atm last_seen_things_queued_ever;
892 grpc_completion_queue* cq;
893 grpc_core::Timestamp deadline;
894 grpc_cq_completion* stolen_completion;
895 void* tag; // for pluck
896 bool first_loop;
897 };
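// ExecCtx specialization used by cq_next: CheckReadyToFinish lets a polling
// thread stop early once a completion has been queued (stealing it when
// possible) or the deadline has passed.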
898 class ExecCtxNext : public grpc_core::ExecCtx {
899 public:
900 explicit ExecCtxNext(void* arg)
901 : ExecCtx(0, GRPC_LATENT_SEE_METADATA("ExecCtx for CqNext")),
902 check_ready_to_finish_arg_(arg) {}
903
904 bool CheckReadyToFinish() override {
905 cq_is_finished_arg* a =
906 static_cast<cq_is_finished_arg*>(check_ready_to_finish_arg_);
907 grpc_completion_queue* cq = a->cq;
908 cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
909 CHECK_EQ(a->stolen_completion, nullptr);
910
911 intptr_t current_last_seen_things_queued_ever =
912 cqd->things_queued_ever.load(std::memory_order_relaxed);
913
914 if (current_last_seen_things_queued_ever !=
915 a->last_seen_things_queued_ever) {
916 a->last_seen_things_queued_ever =
917 cqd->things_queued_ever.load(std::memory_order_relaxed);
918
919 // Pop a cq_completion from the queue. Returns NULL if the queue is empty.
920 // It might return NULL in some cases even if the queue is not empty, but
921 // that is ok and doesn't affect correctness (it might affect the tail
922 // latencies a bit).
924 a->stolen_completion = cqd->queue.Pop();
925 if (a->stolen_completion != nullptr) {
926 return true;
927 }
928 }
929 return !a->first_loop && a->deadline < grpc_core::Timestamp::Now();
930 }
931
932 private:
933 void* check_ready_to_finish_arg_;
934 };
935
936 #ifndef NDEBUG
937 static void dump_pending_tags(grpc_completion_queue* cq) {
938 if (!GRPC_TRACE_FLAG_ENABLED(pending_tags)) return;
939 std::vector<std::string> parts;
940 parts.push_back("PENDING TAGS:");
941 gpr_mu_lock(cq->mu);
942 for (size_t i = 0; i < cq->outstanding_tag_count; i++) {
943 parts.push_back(absl::StrFormat(" %p", cq->outstanding_tags[i]));
944 }
945 gpr_mu_unlock(cq->mu);
946 VLOG(2) << absl::StrJoin(parts, "");
947 }
948 #else
949 static void dump_pending_tags(grpc_completion_queue* /*cq*/) {}
950 #endif
951
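// Implementation of next() for GRPC_CQ_NEXT queues: repeatedly tries to pop a
// completed event, polling in between, until an event is available, the queue
// is shut down, or the deadline expires.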
952 static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
953 void* reserved) {
954 grpc_event ret;
955 cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
956
957 GRPC_TRACE_LOG(api, INFO)
958 << "grpc_completion_queue_next(cq=" << cq
959 << ", deadline=gpr_timespec { tv_sec: " << deadline.tv_sec
960 << ", tv_nsec: " << deadline.tv_nsec
961 << ", clock_type: " << (int)deadline.clock_type
962 << " }, reserved=" << reserved << ")";
963 CHECK(!reserved);
964
965 dump_pending_tags(cq);
966
967 GRPC_CQ_INTERNAL_REF(cq, "next");
968
969 grpc_core::Timestamp deadline_millis =
970 grpc_core::Timestamp::FromTimespecRoundUp(deadline);
971 cq_is_finished_arg is_finished_arg = {
972 cqd->things_queued_ever.load(std::memory_order_relaxed),
973 cq,
974 deadline_millis,
975 nullptr,
976 nullptr,
977 true};
978 ExecCtxNext exec_ctx(&is_finished_arg);
979 for (;;) {
980 grpc_core::Timestamp iteration_deadline = deadline_millis;
981
982 if (is_finished_arg.stolen_completion != nullptr) {
983 grpc_cq_completion* c = is_finished_arg.stolen_completion;
984 is_finished_arg.stolen_completion = nullptr;
985 ret.type = GRPC_OP_COMPLETE;
986 ret.success = c->next & 1u;
987 ret.tag = c->tag;
988 c->done(c->done_arg, c);
989 break;
990 }
991
992 grpc_cq_completion* c = cqd->queue.Pop();
993
994 if (c != nullptr) {
995 ret.type = GRPC_OP_COMPLETE;
996 ret.success = c->next & 1u;
997 ret.tag = c->tag;
998 c->done(c->done_arg, c);
999 break;
1000 } else {
1001 // If c == NULL it means either the queue is empty OR in a transient
1002 // inconsistent state. If it is the latter, we should do a 0-timeout poll
1003 // so that the thread comes back quickly from poll to make a second
1004 // attempt at popping. Not doing this can potentially deadlock this
1005 // thread forever (if the deadline is infinity)
1006 if (cqd->queue.num_items() > 0) {
1007 iteration_deadline = grpc_core::Timestamp::ProcessEpoch();
1008 }
1009 }
1010
1011 if (cqd->pending_events.load(std::memory_order_acquire) == 0) {
1012 // Before returning, check if the queue has any items left over (since
1013 // MultiProducerSingleConsumerQueue::Pop() can sometimes return NULL
1014 // even if the queue is not empty). If so, keep retrying but do not
1015 // return GRPC_QUEUE_SHUTDOWN.
1016 if (cqd->queue.num_items() > 0) {
1017 // Go to the beginning of the loop. No point doing a poll because
1018 // (cq->shutdown == true) is only possible when there is no pending
1019 // work (i.e. cq->pending_events == 0) and any outstanding completion
1020 // events should have already been queued on this cq
1021 continue;
1022 }
1023
1024 ret.type = GRPC_QUEUE_SHUTDOWN;
1025 ret.success = 0;
1026 break;
1027 }
1028
1029 if (!is_finished_arg.first_loop &&
1030 grpc_core::Timestamp::Now() >= deadline_millis) {
1031 ret.type = GRPC_QUEUE_TIMEOUT;
1032 ret.success = 0;
1033 dump_pending_tags(cq);
1034 break;
1035 }
1036
1037 // The main polling work happens in grpc_pollset_work
1038 gpr_mu_lock(cq->mu);
1039 cq->num_polls++;
1040 grpc_error_handle err = cq->poller_vtable->work(
1041 POLLSET_FROM_CQ(cq), nullptr, iteration_deadline);
1042 gpr_mu_unlock(cq->mu);
1043
1044 if (!err.ok()) {
1045 LOG(ERROR) << "Completion queue next failed: "
1046 << grpc_core::StatusToString(err);
1047 if (err == absl::CancelledError()) {
1048 ret.type = GRPC_QUEUE_SHUTDOWN;
1049 } else {
1050 ret.type = GRPC_QUEUE_TIMEOUT;
1051 }
1052 ret.success = 0;
1053 dump_pending_tags(cq);
1054 break;
1055 }
1056 is_finished_arg.first_loop = false;
1057 }
1058
1059 if (cqd->queue.num_items() > 0 &&
1060 cqd->pending_events.load(std::memory_order_acquire) > 0) {
1061 gpr_mu_lock(cq->mu);
1062 (void)cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), nullptr);
1063 gpr_mu_unlock(cq->mu);
1064 }
1065
1066 GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
1067 GRPC_CQ_INTERNAL_UNREF(cq, "next");
1068
1069 CHECK_EQ(is_finished_arg.stolen_completion, nullptr);
1070
1071 return ret;
1072 }
1073
1074 // Finishes the completion queue shutdown. This means that there are no more
1075 // completion events / tags expected from the completion queue
1076 // - Must be called under completion queue lock
1077 // - Must be called only once in completion queue's lifetime
1078 // - grpc_completion_queue_shutdown() MUST have been called before calling
1079 // this function
1080 static void cq_finish_shutdown_next(grpc_completion_queue* cq) {
1081 cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
1082
1083 CHECK(cqd->shutdown_called);
1084 CHECK_EQ(cqd->pending_events.load(std::memory_order_relaxed), 0);
1085
1086 cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
1087 }
1088
1089 static void cq_shutdown_next(grpc_completion_queue* cq) {
1090 cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
1091
1092 // Need an extra ref for cq here because:
1093 // We call cq_finish_shutdown_next() below, which will call pollset shutdown.
1094 // Pollset shutdown decrements the cq ref count which can potentially destroy
1095 // the cq (if that happens to be the last ref).
1096 // Creating an extra ref here prevents the cq from getting destroyed while
1097 // this function is still active
1098 GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
1099 gpr_mu_lock(cq->mu);
1100 if (cqd->shutdown_called) {
1101 gpr_mu_unlock(cq->mu);
1102 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
1103 return;
1104 }
1105 cqd->shutdown_called = true;
1106 // Doing acq/release fetch_sub here to match with
1107 // cq_begin_op_for_next and cq_end_op_for_next functions which read/write
1108 // on this counter without necessarily holding a lock on cq
1109 if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
1110 cq_finish_shutdown_next(cq);
1111 }
1112 gpr_mu_unlock(cq->mu);
1113 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
1114 }
1115
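// Public API: dispatches to the next() implementation for this cq's
// completion type.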
1116 grpc_event grpc_completion_queue_next(grpc_completion_queue* cq,
1117 gpr_timespec deadline, void* reserved) {
1118 return cq->vtable->next(cq, deadline, reserved);
1119 }
1120
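// add_plucker/del_plucker register and unregister a (tag, worker) pair so
// that cq_end_op_for_pluck can kick the specific worker waiting on that tag.
// add_plucker returns 0 once GRPC_MAX_COMPLETION_QUEUE_PLUCKERS is reached.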
1121 static int add_plucker(grpc_completion_queue* cq, void* tag,
1122 grpc_pollset_worker** worker) {
1123 cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
1124 if (cqd->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) {
1125 return 0;
1126 }
1127 cqd->pluckers[cqd->num_pluckers].tag = tag;
1128 cqd->pluckers[cqd->num_pluckers].worker = worker;
1129 cqd->num_pluckers++;
1130 return 1;
1131 }
1132
1133 static void del_plucker(grpc_completion_queue* cq, void* tag,
1134 grpc_pollset_worker** worker) {
1135 cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
1136 for (int i = 0; i < cqd->num_pluckers; i++) {
1137 if (cqd->pluckers[i].tag == tag && cqd->pluckers[i].worker == worker) {
1138 cqd->num_pluckers--;
1139 std::swap(cqd->pluckers[i], cqd->pluckers[cqd->num_pluckers]);
1140 return;
1141 }
1142 }
1143 GPR_UNREACHABLE_CODE(return);
1144 }
1145
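// ExecCtx specialization used by cq_pluck: CheckReadyToFinish scans the
// completed-events list for the plucked tag and steals the matching
// completion if one is found.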
1146 class ExecCtxPluck : public grpc_core::ExecCtx {
1147 public:
1148 explicit ExecCtxPluck(void* arg)
1149 : ExecCtx(0, GRPC_LATENT_SEE_METADATA("ExecCtx for CqPluck")),
1150 check_ready_to_finish_arg_(arg) {}
1151
1152 bool CheckReadyToFinish() override {
1153 cq_is_finished_arg* a =
1154 static_cast<cq_is_finished_arg*>(check_ready_to_finish_arg_);
1155 grpc_completion_queue* cq = a->cq;
1156 cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
1157
1158 CHECK_EQ(a->stolen_completion, nullptr);
1159 gpr_atm current_last_seen_things_queued_ever =
1160 cqd->things_queued_ever.load(std::memory_order_relaxed);
1161 if (current_last_seen_things_queued_ever !=
1162 a->last_seen_things_queued_ever) {
1163 gpr_mu_lock(cq->mu);
1164 a->last_seen_things_queued_ever =
1165 cqd->things_queued_ever.load(std::memory_order_relaxed);
1166 grpc_cq_completion* c;
1167 grpc_cq_completion* prev = &cqd->completed_head;
1168 while ((c = reinterpret_cast<grpc_cq_completion*>(
1169 prev->next & ~uintptr_t{1})) != &cqd->completed_head) {
1170 if (c->tag == a->tag) {
1171 prev->next = (prev->next & uintptr_t{1}) | (c->next & ~uintptr_t{1});
1172 if (c == cqd->completed_tail) {
1173 cqd->completed_tail = prev;
1174 }
1175 gpr_mu_unlock(cq->mu);
1176 a->stolen_completion = c;
1177 return true;
1178 }
1179 prev = c;
1180 }
1181 gpr_mu_unlock(cq->mu);
1182 }
1183 return !a->first_loop && a->deadline < grpc_core::Timestamp::Now();
1184 }
1185
1186 private:
1187 void* check_ready_to_finish_arg_;
1188 };
1189
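// Implementation of pluck() for GRPC_CQ_PLUCK queues: waits for the
// completion carrying the given tag, polling until it arrives, the queue is
// shut down, or the deadline expires.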
1190 static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
1191 gpr_timespec deadline, void* reserved) {
1192 grpc_event ret;
1193 grpc_cq_completion* c;
1194 grpc_cq_completion* prev;
1195 grpc_pollset_worker* worker = nullptr;
1196 cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
1197
1198 if (GRPC_TRACE_FLAG_ENABLED(queue_pluck)) {
1199 GRPC_TRACE_LOG(api, INFO)
1200 << "grpc_completion_queue_pluck(cq=" << cq << ", tag=" << tag
1201 << ", deadline=gpr_timespec { tv_sec: " << deadline.tv_sec
1202 << ", tv_nsec: " << deadline.tv_nsec
1203 << ", clock_type: " << (int)deadline.clock_type
1204 << " }, reserved=" << reserved << ")";
1205 }
1206 CHECK(!reserved);
1207
1208 dump_pending_tags(cq);
1209
1210 GRPC_CQ_INTERNAL_REF(cq, "pluck");
1211 gpr_mu_lock(cq->mu);
1212 grpc_core::Timestamp deadline_millis =
1213 grpc_core::Timestamp::FromTimespecRoundUp(deadline);
1214 cq_is_finished_arg is_finished_arg = {
1215 cqd->things_queued_ever.load(std::memory_order_relaxed),
1216 cq,
1217 deadline_millis,
1218 nullptr,
1219 tag,
1220 true};
1221 ExecCtxPluck exec_ctx(&is_finished_arg);
1222 for (;;) {
1223 if (is_finished_arg.stolen_completion != nullptr) {
1224 gpr_mu_unlock(cq->mu);
1225 c = is_finished_arg.stolen_completion;
1226 is_finished_arg.stolen_completion = nullptr;
1227 ret.type = GRPC_OP_COMPLETE;
1228 ret.success = c->next & 1u;
1229 ret.tag = c->tag;
1230 c->done(c->done_arg, c);
1231 break;
1232 }
1233 prev = &cqd->completed_head;
1234 while ((c = reinterpret_cast<grpc_cq_completion*>(
1235 prev->next & ~uintptr_t{1})) != &cqd->completed_head) {
1236 if (GPR_LIKELY(c->tag == tag)) {
1237 prev->next = (prev->next & uintptr_t{1}) | (c->next & ~uintptr_t{1});
1238 if (c == cqd->completed_tail) {
1239 cqd->completed_tail = prev;
1240 }
1241 gpr_mu_unlock(cq->mu);
1242 ret.type = GRPC_OP_COMPLETE;
1243 ret.success = c->next & 1u;
1244 ret.tag = c->tag;
1245 c->done(c->done_arg, c);
1246 goto done;
1247 }
1248 prev = c;
1249 }
1250 if (cqd->shutdown.load(std::memory_order_relaxed)) {
1251 gpr_mu_unlock(cq->mu);
1252 ret.type = GRPC_QUEUE_SHUTDOWN;
1253 ret.success = 0;
1254 break;
1255 }
1256 if (!add_plucker(cq, tag, &worker)) {
1257 VLOG(2) << "Too many outstanding grpc_completion_queue_pluck calls: "
1258 "maximum is "
1259 << GRPC_MAX_COMPLETION_QUEUE_PLUCKERS;
1260 gpr_mu_unlock(cq->mu);
1261 // TODO(ctiller): should we use a different result here
1262 ret.type = GRPC_QUEUE_TIMEOUT;
1263 ret.success = 0;
1264 dump_pending_tags(cq);
1265 break;
1266 }
1267 if (!is_finished_arg.first_loop &&
1268 grpc_core::Timestamp::Now() >= deadline_millis) {
1269 del_plucker(cq, tag, &worker);
1270 gpr_mu_unlock(cq->mu);
1271 ret.type = GRPC_QUEUE_TIMEOUT;
1272 ret.success = 0;
1273 dump_pending_tags(cq);
1274 break;
1275 }
1276 cq->num_polls++;
1277 grpc_error_handle err =
1278 cq->poller_vtable->work(POLLSET_FROM_CQ(cq), &worker, deadline_millis);
1279 if (!err.ok()) {
1280 del_plucker(cq, tag, &worker);
1281 gpr_mu_unlock(cq->mu);
1282 LOG(ERROR) << "Completion queue pluck failed: "
1283 << grpc_core::StatusToString(err);
1284 ret.type = GRPC_QUEUE_TIMEOUT;
1285 ret.success = 0;
1286 dump_pending_tags(cq);
1287 break;
1288 }
1289 is_finished_arg.first_loop = false;
1290 del_plucker(cq, tag, &worker);
1291 }
1292 done:
1293 GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
1294 GRPC_CQ_INTERNAL_UNREF(cq, "pluck");
1295
1296 CHECK_EQ(is_finished_arg.stolen_completion, nullptr);
1297
1298 return ret;
1299 }
1300
1301 grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag,
1302 gpr_timespec deadline, void* reserved) {
1303 return cq->vtable->pluck(cq, tag, deadline, reserved);
1304 }
1305
1306 static void cq_finish_shutdown_pluck(grpc_completion_queue* cq) {
1307 cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
1308
1309 CHECK(cqd->shutdown_called);
1310 CHECK(!cqd->shutdown.load(std::memory_order_relaxed));
1311 cqd->shutdown.store(true, std::memory_order_relaxed);
1312
1313 cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
1314 }
1315
1316 // NOTE: This function is almost exactly identical to cq_shutdown_next() but
1317 // merging them is a bit tricky and probably not worth it
1318 static void cq_shutdown_pluck(grpc_completion_queue* cq) {
1319 cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
1320
1321 // Need an extra ref for cq here because:
1322 // We call cq_finish_shutdown_pluck() below, which will call pollset shutdown.
1323 // Pollset shutdown decrements the cq ref count which can potentially destroy
1324 // the cq (if that happens to be the last ref).
1325 // Creating an extra ref here prevents the cq from getting destroyed while
1326 // this function is still active
1327 GRPC_CQ_INTERNAL_REF(cq, "shutting_down (pluck cq)");
1328 gpr_mu_lock(cq->mu);
1329 if (cqd->shutdown_called) {
1330 gpr_mu_unlock(cq->mu);
1331 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (pluck cq)");
1332 return;
1333 }
1334 cqd->shutdown_called = true;
1335 if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
1336 cq_finish_shutdown_pluck(cq);
1337 }
1338 gpr_mu_unlock(cq->mu);
1339 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (pluck cq)");
1340 }
1341
1342 static void cq_finish_shutdown_callback(grpc_completion_queue* cq) {
1343 cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
1344 auto* callback = cqd->shutdown_callback;
1345
1346 CHECK(cqd->shutdown_called);
1347
1348 cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
1349
1350 if (grpc_core::IsEventEngineApplicationCallbacksEnabled()) {
1351 cqd->event_engine->Run([engine = cqd->event_engine, callback]() {
1352 grpc_core::ExecCtx exec_ctx;
1353 callback->functor_run(callback, /*ok=*/1);
1354 });
1355 return;
1356 }
1357 if (grpc_iomgr_is_any_background_poller_thread()) {
1358 grpc_core::ApplicationCallbackExecCtx::Enqueue(callback, true);
1359 return;
1360 }
1361
1362 // Schedule the callback on a closure if not internal or triggered
1363 // from a background poller thread.
1364 grpc_core::Executor::Run(
1365 GRPC_CLOSURE_CREATE(functor_callback, callback, nullptr),
1366 absl::OkStatus());
1367 }
1368
1369 static void cq_shutdown_callback(grpc_completion_queue* cq) {
1370 cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
1371
1372 // Need an extra ref for cq here because:
1373 // We call cq_finish_shutdown_callback() below, which calls pollset shutdown.
1374 // Pollset shutdown decrements the cq ref count which can potentially destroy
1375 // the cq (if that happens to be the last ref).
1376 // Creating an extra ref here prevents the cq from getting destroyed while
1377 // this function is still active
1378 GRPC_CQ_INTERNAL_REF(cq, "shutting_down (callback cq)");
1379 gpr_mu_lock(cq->mu);
1380 if (cqd->shutdown_called) {
1381 gpr_mu_unlock(cq->mu);
1382 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)");
1383 return;
1384 }
1385 cqd->shutdown_called = true;
1386 if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
1387 gpr_mu_unlock(cq->mu);
1388 cq_finish_shutdown_callback(cq);
1389 } else {
1390 gpr_mu_unlock(cq->mu);
1391 }
1392 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)");
1393 }
1394
1395 // Shutdown simply drops a ref that we reserved at creation time; if we drop
1396 // to zero here, then enter shutdown mode and wake up any waiters
1397 void grpc_completion_queue_shutdown(grpc_completion_queue* cq) {
1398 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
1399 grpc_core::ExecCtx exec_ctx;
1400 GRPC_TRACE_LOG(api, INFO)
1401 << "grpc_completion_queue_shutdown(cq=" << cq << ")";
1402 cq->vtable->shutdown(cq);
1403 }
1404
1405 void grpc_completion_queue_destroy(grpc_completion_queue* cq) {
1406 GRPC_TRACE_LOG(api, INFO) << "grpc_completion_queue_destroy(cq=" << cq << ")";
1407 grpc_completion_queue_shutdown(cq);
1408
1409 grpc_core::ExecCtx exec_ctx;
1410 GRPC_CQ_INTERNAL_UNREF(cq, "destroy");
1411 }
1412
1413 grpc_pollset* grpc_cq_pollset(grpc_completion_queue* cq) {
1414 return cq->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cq) : nullptr;
1415 }
1416
1417 bool grpc_cq_can_listen(grpc_completion_queue* cq) {
1418 return cq->poller_vtable->can_listen;
1419 }
1420