1 /*
2 *
3 * Copyright 2015-2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18 #include <grpc/support/port_platform.h>
19
20 #include "src/core/lib/surface/completion_queue.h"
21
22 #include <inttypes.h>
23 #include <stdio.h>
24 #include <string.h>
25
26 #include <grpc/support/alloc.h>
27 #include <grpc/support/atm.h>
28 #include <grpc/support/log.h>
29 #include <grpc/support/string_util.h>
30 #include <grpc/support/time.h>
31
32 #include "src/core/lib/debug/stats.h"
33 #include "src/core/lib/gpr/spinlock.h"
34 #include "src/core/lib/gpr/string.h"
35 #include "src/core/lib/gpr/tls.h"
36 #include "src/core/lib/iomgr/pollset.h"
37 #include "src/core/lib/iomgr/timer.h"
38 #include "src/core/lib/profiling/timers.h"
39 #include "src/core/lib/surface/api_trace.h"
40 #include "src/core/lib/surface/call.h"
41 #include "src/core/lib/surface/event_string.h"
42
43 grpc_core::TraceFlag grpc_trace_operation_failures(false, "op_failure");
44 grpc_core::DebugOnlyTraceFlag grpc_trace_pending_tags(false, "pending_tags");
45 grpc_core::DebugOnlyTraceFlag grpc_trace_cq_refcount(false, "cq_refcount");
46
47 // Specifies a cq thread local cache.
48 // The first event that occurs on a thread
49 // with a cq cache will go into that cache, and
50 // will only be returned on the thread that initialized the cache.
51 // NOTE: Only one event will ever be cached.
52 GPR_TLS_DECL(g_cached_event);
53 GPR_TLS_DECL(g_cached_cq);
54
55 typedef struct {
56 grpc_pollset_worker** worker;
57 void* tag;
58 } plucker;
59
60 typedef struct {
61 bool can_get_pollset;
62 bool can_listen;
63 size_t (*size)(void);
64 void (*init)(grpc_pollset* pollset, gpr_mu** mu);
65 grpc_error* (*kick)(grpc_pollset* pollset,
66 grpc_pollset_worker* specific_worker);
67 grpc_error* (*work)(grpc_pollset* pollset, grpc_pollset_worker** worker,
68 grpc_millis deadline);
69 void (*shutdown)(grpc_pollset* pollset, grpc_closure* closure);
70 void (*destroy)(grpc_pollset* pollset);
71 } cq_poller_vtable;
72
73 typedef struct non_polling_worker {
74 gpr_cv cv;
75 bool kicked;
76 struct non_polling_worker* next;
77 struct non_polling_worker* prev;
78 } non_polling_worker;
79
80 typedef struct {
81 gpr_mu mu;
82 non_polling_worker* root;
83 grpc_closure* shutdown;
84 } non_polling_poller;
85
non_polling_poller_size(void)86 static size_t non_polling_poller_size(void) {
87 return sizeof(non_polling_poller);
88 }
89
non_polling_poller_init(grpc_pollset * pollset,gpr_mu ** mu)90 static void non_polling_poller_init(grpc_pollset* pollset, gpr_mu** mu) {
91 non_polling_poller* npp = reinterpret_cast<non_polling_poller*>(pollset);
92 gpr_mu_init(&npp->mu);
93 *mu = &npp->mu;
94 }
95
non_polling_poller_destroy(grpc_pollset * pollset)96 static void non_polling_poller_destroy(grpc_pollset* pollset) {
97 non_polling_poller* npp = reinterpret_cast<non_polling_poller*>(pollset);
98 gpr_mu_destroy(&npp->mu);
99 }
100
non_polling_poller_work(grpc_pollset * pollset,grpc_pollset_worker ** worker,grpc_millis deadline)101 static grpc_error* non_polling_poller_work(grpc_pollset* pollset,
102 grpc_pollset_worker** worker,
103 grpc_millis deadline) {
104 non_polling_poller* npp = reinterpret_cast<non_polling_poller*>(pollset);
105 if (npp->shutdown) return GRPC_ERROR_NONE;
106 non_polling_worker w;
107 gpr_cv_init(&w.cv);
108 if (worker != nullptr) *worker = reinterpret_cast<grpc_pollset_worker*>(&w);
109 if (npp->root == nullptr) {
110 npp->root = w.next = w.prev = &w;
111 } else {
112 w.next = npp->root;
113 w.prev = w.next->prev;
114 w.next->prev = w.prev->next = &w;
115 }
116 w.kicked = false;
117 gpr_timespec deadline_ts =
118 grpc_millis_to_timespec(deadline, GPR_CLOCK_MONOTONIC);
119 while (!npp->shutdown && !w.kicked &&
120 !gpr_cv_wait(&w.cv, &npp->mu, deadline_ts))
121 ;
122 grpc_core::ExecCtx::Get()->InvalidateNow();
123 if (&w == npp->root) {
124 npp->root = w.next;
125 if (&w == npp->root) {
126 if (npp->shutdown) {
127 GRPC_CLOSURE_SCHED(npp->shutdown, GRPC_ERROR_NONE);
128 }
129 npp->root = nullptr;
130 }
131 }
132 w.next->prev = w.prev;
133 w.prev->next = w.next;
134 gpr_cv_destroy(&w.cv);
135 if (worker != nullptr) *worker = nullptr;
136 return GRPC_ERROR_NONE;
137 }
138
non_polling_poller_kick(grpc_pollset * pollset,grpc_pollset_worker * specific_worker)139 static grpc_error* non_polling_poller_kick(
140 grpc_pollset* pollset, grpc_pollset_worker* specific_worker) {
141 non_polling_poller* p = reinterpret_cast<non_polling_poller*>(pollset);
142 if (specific_worker == nullptr)
143 specific_worker = reinterpret_cast<grpc_pollset_worker*>(p->root);
144 if (specific_worker != nullptr) {
145 non_polling_worker* w =
146 reinterpret_cast<non_polling_worker*>(specific_worker);
147 if (!w->kicked) {
148 w->kicked = true;
149 gpr_cv_signal(&w->cv);
150 }
151 }
152 return GRPC_ERROR_NONE;
153 }
154
non_polling_poller_shutdown(grpc_pollset * pollset,grpc_closure * closure)155 static void non_polling_poller_shutdown(grpc_pollset* pollset,
156 grpc_closure* closure) {
157 non_polling_poller* p = reinterpret_cast<non_polling_poller*>(pollset);
158 GPR_ASSERT(closure != nullptr);
159 p->shutdown = closure;
160 if (p->root == nullptr) {
161 GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
162 } else {
163 non_polling_worker* w = p->root;
164 do {
165 gpr_cv_signal(&w->cv);
166 w = w->next;
167 } while (w != p->root);
168 }
169 }
170
171 static const cq_poller_vtable g_poller_vtable_by_poller_type[] = {
172 /* GRPC_CQ_DEFAULT_POLLING */
173 {true, true, grpc_pollset_size, grpc_pollset_init, grpc_pollset_kick,
174 grpc_pollset_work, grpc_pollset_shutdown, grpc_pollset_destroy},
175 /* GRPC_CQ_NON_LISTENING */
176 {true, false, grpc_pollset_size, grpc_pollset_init, grpc_pollset_kick,
177 grpc_pollset_work, grpc_pollset_shutdown, grpc_pollset_destroy},
178 /* GRPC_CQ_NON_POLLING */
179 {false, false, non_polling_poller_size, non_polling_poller_init,
180 non_polling_poller_kick, non_polling_poller_work,
181 non_polling_poller_shutdown, non_polling_poller_destroy},
182 };
183
184 typedef struct cq_vtable {
185 grpc_cq_completion_type cq_completion_type;
186 size_t data_size;
187 void (*init)(void* data,
188 grpc_experimental_completion_queue_functor* shutdown_callback);
189 void (*shutdown)(grpc_completion_queue* cq);
190 void (*destroy)(void* data);
191 bool (*begin_op)(grpc_completion_queue* cq, void* tag);
192 void (*end_op)(grpc_completion_queue* cq, void* tag, grpc_error* error,
193 void (*done)(void* done_arg, grpc_cq_completion* storage),
194 void* done_arg, grpc_cq_completion* storage);
195 grpc_event (*next)(grpc_completion_queue* cq, gpr_timespec deadline,
196 void* reserved);
197 grpc_event (*pluck)(grpc_completion_queue* cq, void* tag,
198 gpr_timespec deadline, void* reserved);
199 } cq_vtable;
200
201 /* Queue that holds the cq_completion_events. Internally uses gpr_mpscq queue
202 * (a lockfree multiproducer single consumer queue). It uses a queue_lock
203 * to support multiple consumers.
204 * Only used in completion queues whose completion_type is GRPC_CQ_NEXT */
205 typedef struct grpc_cq_event_queue {
206 /* Spinlock to serialize consumers i.e pop() operations */
207 gpr_spinlock queue_lock;
208
209 gpr_mpscq queue;
210
211 /* A lazy counter of number of items in the queue. This is NOT atomically
212 incremented/decremented along with push/pop operations and hence is only
213 eventually consistent */
214 gpr_atm num_queue_items;
215 } grpc_cq_event_queue;
216
217 typedef struct cq_next_data {
218 /** Completed events for completion-queues of type GRPC_CQ_NEXT */
219 grpc_cq_event_queue queue;
220
221 /** Counter of how many things have ever been queued on this completion queue
222 useful for avoiding locks to check the queue */
223 gpr_atm things_queued_ever;
224
225 /* Number of outstanding events (+1 if not shut down) */
226 gpr_atm pending_events;
227
228 /** 0 initially. 1 once we initiated shutdown */
229 bool shutdown_called;
230 } cq_next_data;
231
232 typedef struct cq_pluck_data {
233 /** Completed events for completion-queues of type GRPC_CQ_PLUCK */
234 grpc_cq_completion completed_head;
235 grpc_cq_completion* completed_tail;
236
237 /** Number of pending events (+1 if we're not shutdown) */
238 gpr_atm pending_events;
239
240 /** Counter of how many things have ever been queued on this completion queue
241 useful for avoiding locks to check the queue */
242 gpr_atm things_queued_ever;
243
244 /** 0 initially. 1 once we completed shutting */
245 /* TODO: (sreek) This is not needed since (shutdown == 1) if and only if
246 * (pending_events == 0). So consider removing this in future and use
247 * pending_events */
248 gpr_atm shutdown;
249
250 /** 0 initially. 1 once we initiated shutdown */
251 bool shutdown_called;
252
253 int num_pluckers;
254 plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
255 } cq_pluck_data;
256
257 typedef struct cq_callback_data {
258 /** No actual completed events queue, unlike other types */
259
260 /** Number of pending events (+1 if we're not shutdown) */
261 gpr_atm pending_events;
262
263 /** Counter of how many things have ever been queued on this completion queue
264 useful for avoiding locks to check the queue */
265 gpr_atm things_queued_ever;
266
267 /** 0 initially. 1 once we initiated shutdown */
268 bool shutdown_called;
269
270 /** A callback that gets invoked when the CQ completes shutdown */
271 grpc_experimental_completion_queue_functor* shutdown_callback;
272 } cq_callback_data;
273
274 /* Completion queue structure */
275 struct grpc_completion_queue {
276 /** Once owning_refs drops to zero, we will destroy the cq */
277 gpr_refcount owning_refs;
278
279 gpr_mu* mu;
280
281 const cq_vtable* vtable;
282 const cq_poller_vtable* poller_vtable;
283
284 #ifndef NDEBUG
285 void** outstanding_tags;
286 size_t outstanding_tag_count;
287 size_t outstanding_tag_capacity;
288 #endif
289
290 grpc_closure pollset_shutdown_done;
291 int num_polls;
292 };
293
294 /* Forward declarations */
295 static void cq_finish_shutdown_next(grpc_completion_queue* cq);
296 static void cq_finish_shutdown_pluck(grpc_completion_queue* cq);
297 static void cq_finish_shutdown_callback(grpc_completion_queue* cq);
298 static void cq_shutdown_next(grpc_completion_queue* cq);
299 static void cq_shutdown_pluck(grpc_completion_queue* cq);
300 static void cq_shutdown_callback(grpc_completion_queue* cq);
301
302 static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag);
303 static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag);
304 static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag);
305
306 // A cq_end_op function is called when an operation on a given CQ with
307 // a given tag has completed. The storage argument is a reference to the
308 // space reserved for this completion as it is placed into the corresponding
309 // queue. The done argument is a callback that will be invoked when it is
310 // safe to free up that storage. The storage MUST NOT be freed until the
311 // done callback is invoked.
312 static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag,
313 grpc_error* error,
314 void (*done)(void* done_arg,
315 grpc_cq_completion* storage),
316 void* done_arg, grpc_cq_completion* storage);
317
318 static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag,
319 grpc_error* error,
320 void (*done)(void* done_arg,
321 grpc_cq_completion* storage),
322 void* done_arg, grpc_cq_completion* storage);
323
324 static void cq_end_op_for_callback(grpc_completion_queue* cq, void* tag,
325 grpc_error* error,
326 void (*done)(void* done_arg,
327 grpc_cq_completion* storage),
328 void* done_arg, grpc_cq_completion* storage);
329
330 static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
331 void* reserved);
332
333 static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
334 gpr_timespec deadline, void* reserved);
335
336 // Note that cq_init_next and cq_init_pluck do not use the shutdown_callback
337 static void cq_init_next(
338 void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
339 static void cq_init_pluck(
340 void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
341 static void cq_init_callback(
342 void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
343 static void cq_destroy_next(void* data);
344 static void cq_destroy_pluck(void* data);
345 static void cq_destroy_callback(void* data);
346
347 /* Completion queue vtables based on the completion-type */
348 static const cq_vtable g_cq_vtable[] = {
349 /* GRPC_CQ_NEXT */
350 {GRPC_CQ_NEXT, sizeof(cq_next_data), cq_init_next, cq_shutdown_next,
351 cq_destroy_next, cq_begin_op_for_next, cq_end_op_for_next, cq_next,
352 nullptr},
353 /* GRPC_CQ_PLUCK */
354 {GRPC_CQ_PLUCK, sizeof(cq_pluck_data), cq_init_pluck, cq_shutdown_pluck,
355 cq_destroy_pluck, cq_begin_op_for_pluck, cq_end_op_for_pluck, nullptr,
356 cq_pluck},
357 /* GRPC_CQ_CALLBACK */
358 {GRPC_CQ_CALLBACK, sizeof(cq_callback_data), cq_init_callback,
359 cq_shutdown_callback, cq_destroy_callback, cq_begin_op_for_callback,
360 cq_end_op_for_callback, nullptr, nullptr},
361 };
362
/* Address of the type-specific data block, which is stored immediately after
 * the grpc_completion_queue header. The expansion is fully parenthesized
 * (callers write `static_cast<T*> DATA_FROM_CQ(cq)`), and the argument is
 * parenthesized so that any pointer expression may be passed. */
#define DATA_FROM_CQ(cq) ((void*)((cq) + 1))
/* Address of the poller storage, which follows the type-specific data block
 * (whose size is given by the cq's vtable). */
#define POLLSET_FROM_CQ(cq) \
  ((grpc_pollset*)((cq)->vtable->data_size + (char*)DATA_FROM_CQ(cq)))
366
367 grpc_core::TraceFlag grpc_cq_pluck_trace(false, "queue_pluck");
368
369 #define GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, event) \
370 if (grpc_api_trace.enabled() && (grpc_cq_pluck_trace.enabled() || \
371 (event)->type != GRPC_QUEUE_TIMEOUT)) { \
372 char* _ev = grpc_event_string(event); \
373 gpr_log(GPR_INFO, "RETURN_EVENT[%p]: %s", cq, _ev); \
374 gpr_free(_ev); \
375 }
376
377 static void on_pollset_shutdown_done(void* cq, grpc_error* error);
378
grpc_cq_global_init()379 void grpc_cq_global_init() {
380 gpr_tls_init(&g_cached_event);
381 gpr_tls_init(&g_cached_cq);
382 }
383
grpc_completion_queue_thread_local_cache_init(grpc_completion_queue * cq)384 void grpc_completion_queue_thread_local_cache_init(grpc_completion_queue* cq) {
385 if ((grpc_completion_queue*)gpr_tls_get(&g_cached_cq) == nullptr) {
386 gpr_tls_set(&g_cached_event, (intptr_t)0);
387 gpr_tls_set(&g_cached_cq, (intptr_t)cq);
388 }
389 }
390
grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue * cq,void ** tag,int * ok)391 int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue* cq,
392 void** tag, int* ok) {
393 grpc_cq_completion* storage =
394 (grpc_cq_completion*)gpr_tls_get(&g_cached_event);
395 int ret = 0;
396 if (storage != nullptr &&
397 (grpc_completion_queue*)gpr_tls_get(&g_cached_cq) == cq) {
398 *tag = storage->tag;
399 grpc_core::ExecCtx exec_ctx;
400 *ok = (storage->next & static_cast<uintptr_t>(1)) == 1;
401 storage->done(storage->done_arg, storage);
402 ret = 1;
403 cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
404 if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
405 GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
406 gpr_mu_lock(cq->mu);
407 cq_finish_shutdown_next(cq);
408 gpr_mu_unlock(cq->mu);
409 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
410 }
411 }
412 gpr_tls_set(&g_cached_event, (intptr_t)0);
413 gpr_tls_set(&g_cached_cq, (intptr_t)0);
414
415 return ret;
416 }
417
cq_event_queue_init(grpc_cq_event_queue * q)418 static void cq_event_queue_init(grpc_cq_event_queue* q) {
419 gpr_mpscq_init(&q->queue);
420 q->queue_lock = GPR_SPINLOCK_INITIALIZER;
421 gpr_atm_no_barrier_store(&q->num_queue_items, 0);
422 }
423
cq_event_queue_destroy(grpc_cq_event_queue * q)424 static void cq_event_queue_destroy(grpc_cq_event_queue* q) {
425 gpr_mpscq_destroy(&q->queue);
426 }
427
cq_event_queue_push(grpc_cq_event_queue * q,grpc_cq_completion * c)428 static bool cq_event_queue_push(grpc_cq_event_queue* q, grpc_cq_completion* c) {
429 gpr_mpscq_push(&q->queue, reinterpret_cast<gpr_mpscq_node*>(c));
430 return gpr_atm_no_barrier_fetch_add(&q->num_queue_items, 1) == 0;
431 }
432
cq_event_queue_pop(grpc_cq_event_queue * q)433 static grpc_cq_completion* cq_event_queue_pop(grpc_cq_event_queue* q) {
434 grpc_cq_completion* c = nullptr;
435
436 if (gpr_spinlock_trylock(&q->queue_lock)) {
437 GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES();
438
439 bool is_empty = false;
440 c = reinterpret_cast<grpc_cq_completion*>(
441 gpr_mpscq_pop_and_check_end(&q->queue, &is_empty));
442 gpr_spinlock_unlock(&q->queue_lock);
443
444 if (c == nullptr && !is_empty) {
445 GRPC_STATS_INC_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES();
446 }
447 } else {
448 GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_FAILURES();
449 }
450
451 if (c) {
452 gpr_atm_no_barrier_fetch_add(&q->num_queue_items, -1);
453 }
454
455 return c;
456 }
457
458 /* Note: The counter is not incremented/decremented atomically with push/pop.
459 * The count is only eventually consistent */
cq_event_queue_num_items(grpc_cq_event_queue * q)460 static long cq_event_queue_num_items(grpc_cq_event_queue* q) {
461 return static_cast<long>(gpr_atm_no_barrier_load(&q->num_queue_items));
462 }
463
grpc_completion_queue_create_internal(grpc_cq_completion_type completion_type,grpc_cq_polling_type polling_type,grpc_experimental_completion_queue_functor * shutdown_callback)464 grpc_completion_queue* grpc_completion_queue_create_internal(
465 grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type,
466 grpc_experimental_completion_queue_functor* shutdown_callback) {
467 GPR_TIMER_SCOPE("grpc_completion_queue_create_internal", 0);
468
469 grpc_completion_queue* cq;
470
471 GRPC_API_TRACE(
472 "grpc_completion_queue_create_internal(completion_type=%d, "
473 "polling_type=%d)",
474 2, (completion_type, polling_type));
475
476 const cq_vtable* vtable = &g_cq_vtable[completion_type];
477 const cq_poller_vtable* poller_vtable =
478 &g_poller_vtable_by_poller_type[polling_type];
479
480 grpc_core::ExecCtx exec_ctx;
481 GRPC_STATS_INC_CQS_CREATED();
482
483 cq = static_cast<grpc_completion_queue*>(
484 gpr_zalloc(sizeof(grpc_completion_queue) + vtable->data_size +
485 poller_vtable->size()));
486
487 cq->vtable = vtable;
488 cq->poller_vtable = poller_vtable;
489
490 /* One for destroy(), one for pollset_shutdown */
491 gpr_ref_init(&cq->owning_refs, 2);
492
493 poller_vtable->init(POLLSET_FROM_CQ(cq), &cq->mu);
494 vtable->init(DATA_FROM_CQ(cq), shutdown_callback);
495
496 GRPC_CLOSURE_INIT(&cq->pollset_shutdown_done, on_pollset_shutdown_done, cq,
497 grpc_schedule_on_exec_ctx);
498 return cq;
499 }
500
cq_init_next(void * data,grpc_experimental_completion_queue_functor * shutdown_callback)501 static void cq_init_next(
502 void* data, grpc_experimental_completion_queue_functor* shutdown_callback) {
503 cq_next_data* cqd = static_cast<cq_next_data*>(data);
504 /* Initial count is dropped by grpc_completion_queue_shutdown */
505 gpr_atm_no_barrier_store(&cqd->pending_events, 1);
506 cqd->shutdown_called = false;
507 gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0);
508 cq_event_queue_init(&cqd->queue);
509 }
510
cq_destroy_next(void * data)511 static void cq_destroy_next(void* data) {
512 cq_next_data* cqd = static_cast<cq_next_data*>(data);
513 GPR_ASSERT(cq_event_queue_num_items(&cqd->queue) == 0);
514 cq_event_queue_destroy(&cqd->queue);
515 }
516
cq_init_pluck(void * data,grpc_experimental_completion_queue_functor * shutdown_callback)517 static void cq_init_pluck(
518 void* data, grpc_experimental_completion_queue_functor* shutdown_callback) {
519 cq_pluck_data* cqd = static_cast<cq_pluck_data*>(data);
520 /* Initial count is dropped by grpc_completion_queue_shutdown */
521 gpr_atm_no_barrier_store(&cqd->pending_events, 1);
522 cqd->completed_tail = &cqd->completed_head;
523 cqd->completed_head.next = (uintptr_t)cqd->completed_tail;
524 gpr_atm_no_barrier_store(&cqd->shutdown, 0);
525 cqd->shutdown_called = false;
526 cqd->num_pluckers = 0;
527 gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0);
528 }
529
cq_destroy_pluck(void * data)530 static void cq_destroy_pluck(void* data) {
531 cq_pluck_data* cqd = static_cast<cq_pluck_data*>(data);
532 GPR_ASSERT(cqd->completed_head.next == (uintptr_t)&cqd->completed_head);
533 }
534
cq_init_callback(void * data,grpc_experimental_completion_queue_functor * shutdown_callback)535 static void cq_init_callback(
536 void* data, grpc_experimental_completion_queue_functor* shutdown_callback) {
537 cq_callback_data* cqd = static_cast<cq_callback_data*>(data);
538 /* Initial count is dropped by grpc_completion_queue_shutdown */
539 gpr_atm_no_barrier_store(&cqd->pending_events, 1);
540 cqd->shutdown_called = false;
541 gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0);
542 cqd->shutdown_callback = shutdown_callback;
543 }
544
// A GRPC_CQ_CALLBACK queue owns nothing that needs releasing.
static void cq_destroy_callback(void* /*data*/) {}
546
grpc_get_cq_completion_type(grpc_completion_queue * cq)547 grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue* cq) {
548 return cq->vtable->cq_completion_type;
549 }
550
grpc_get_cq_poll_num(grpc_completion_queue * cq)551 int grpc_get_cq_poll_num(grpc_completion_queue* cq) {
552 int cur_num_polls;
553 gpr_mu_lock(cq->mu);
554 cur_num_polls = cq->num_polls;
555 gpr_mu_unlock(cq->mu);
556 return cur_num_polls;
557 }
558
559 #ifndef NDEBUG
grpc_cq_internal_ref(grpc_completion_queue * cq,const char * reason,const char * file,int line)560 void grpc_cq_internal_ref(grpc_completion_queue* cq, const char* reason,
561 const char* file, int line) {
562 if (grpc_trace_cq_refcount.enabled()) {
563 gpr_atm val = gpr_atm_no_barrier_load(&cq->owning_refs.count);
564 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
565 "CQ:%p ref %" PRIdPTR " -> %" PRIdPTR " %s", cq, val, val + 1,
566 reason);
567 }
568 #else
569 void grpc_cq_internal_ref(grpc_completion_queue* cq) {
570 #endif
571 gpr_ref(&cq->owning_refs);
572 }
573
574 static void on_pollset_shutdown_done(void* arg, grpc_error* error) {
575 grpc_completion_queue* cq = static_cast<grpc_completion_queue*>(arg);
576 GRPC_CQ_INTERNAL_UNREF(cq, "pollset_destroy");
577 }
578
579 #ifndef NDEBUG
580 void grpc_cq_internal_unref(grpc_completion_queue* cq, const char* reason,
581 const char* file, int line) {
582 if (grpc_trace_cq_refcount.enabled()) {
583 gpr_atm val = gpr_atm_no_barrier_load(&cq->owning_refs.count);
584 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
585 "CQ:%p unref %" PRIdPTR " -> %" PRIdPTR " %s", cq, val, val - 1,
586 reason);
587 }
588 #else
589 void grpc_cq_internal_unref(grpc_completion_queue* cq) {
590 #endif
591 if (gpr_unref(&cq->owning_refs)) {
592 cq->vtable->destroy(DATA_FROM_CQ(cq));
593 cq->poller_vtable->destroy(POLLSET_FROM_CQ(cq));
594 #ifndef NDEBUG
595 gpr_free(cq->outstanding_tags);
596 #endif
597 gpr_free(cq);
598 }
599 }
600
601 #ifndef NDEBUG
602 static void cq_check_tag(grpc_completion_queue* cq, void* tag, bool lock_cq) {
603 int found = 0;
604 if (lock_cq) {
605 gpr_mu_lock(cq->mu);
606 }
607
608 for (int i = 0; i < static_cast<int>(cq->outstanding_tag_count); i++) {
609 if (cq->outstanding_tags[i] == tag) {
610 cq->outstanding_tag_count--;
611 GPR_SWAP(void*, cq->outstanding_tags[i],
612 cq->outstanding_tags[cq->outstanding_tag_count]);
613 found = 1;
614 break;
615 }
616 }
617
618 if (lock_cq) {
619 gpr_mu_unlock(cq->mu);
620 }
621
622 GPR_ASSERT(found);
623 }
624 #else
625 static void cq_check_tag(grpc_completion_queue* cq, void* tag, bool lock_cq) {}
626 #endif
627
628 /* Atomically increments a counter only if the counter is not zero. Returns
629 * true if the increment was successful; false if the counter is zero */
630 static bool atm_inc_if_nonzero(gpr_atm* counter) {
631 while (true) {
632 gpr_atm count = gpr_atm_acq_load(counter);
633 /* If zero, we are done. If not, we must to a CAS (instead of an atomic
634 * increment) to maintain the contract: do not increment the counter if it
635 * is zero. */
636 if (count == 0) {
637 return false;
638 } else if (gpr_atm_full_cas(counter, count, count + 1)) {
639 break;
640 }
641 }
642
643 return true;
644 }
645
646 static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag) {
647 cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
648 return atm_inc_if_nonzero(&cqd->pending_events);
649 }
650
651 static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag) {
652 cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
653 return atm_inc_if_nonzero(&cqd->pending_events);
654 }
655
656 static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag) {
657 cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
658 return atm_inc_if_nonzero(&cqd->pending_events);
659 }
660
661 bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) {
662 #ifndef NDEBUG
663 gpr_mu_lock(cq->mu);
664 if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) {
665 cq->outstanding_tag_capacity = GPR_MAX(4, 2 * cq->outstanding_tag_capacity);
666 cq->outstanding_tags = static_cast<void**>(gpr_realloc(
667 cq->outstanding_tags,
668 sizeof(*cq->outstanding_tags) * cq->outstanding_tag_capacity));
669 }
670 cq->outstanding_tags[cq->outstanding_tag_count++] = tag;
671 gpr_mu_unlock(cq->mu);
672 #endif
673 return cq->vtable->begin_op(cq, tag);
674 }
675
676 /* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
677 * completion
678 * type of GRPC_CQ_NEXT) */
679 static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag,
680 grpc_error* error,
681 void (*done)(void* done_arg,
682 grpc_cq_completion* storage),
683 void* done_arg, grpc_cq_completion* storage) {
684 GPR_TIMER_SCOPE("cq_end_op_for_next", 0);
685
686 if (grpc_api_trace.enabled() ||
687 (grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE)) {
688 const char* errmsg = grpc_error_string(error);
689 GRPC_API_TRACE(
690 "cq_end_op_for_next(cq=%p, tag=%p, error=%s, "
691 "done=%p, done_arg=%p, storage=%p)",
692 6, (cq, tag, errmsg, done, done_arg, storage));
693 if (grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE) {
694 gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
695 }
696 }
697 cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
698 int is_success = (error == GRPC_ERROR_NONE);
699
700 storage->tag = tag;
701 storage->done = done;
702 storage->done_arg = done_arg;
703 storage->next = static_cast<uintptr_t>(is_success);
704
705 cq_check_tag(cq, tag, true); /* Used in debug builds only */
706
707 if ((grpc_completion_queue*)gpr_tls_get(&g_cached_cq) == cq &&
708 (grpc_cq_completion*)gpr_tls_get(&g_cached_event) == nullptr) {
709 gpr_tls_set(&g_cached_event, (intptr_t)storage);
710 } else {
711 /* Add the completion to the queue */
712 bool is_first = cq_event_queue_push(&cqd->queue, storage);
713 gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1);
714
715 /* Since we do not hold the cq lock here, it is important to do an 'acquire'
716 load here (instead of a 'no_barrier' load) to match with the release
717 store
718 (done via gpr_atm_full_fetch_add(pending_events, -1)) in cq_shutdown_next
719 */
720 bool will_definitely_shutdown = gpr_atm_acq_load(&cqd->pending_events) == 1;
721
722 if (!will_definitely_shutdown) {
723 /* Only kick if this is the first item queued */
724 if (is_first) {
725 gpr_mu_lock(cq->mu);
726 grpc_error* kick_error =
727 cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), nullptr);
728 gpr_mu_unlock(cq->mu);
729
730 if (kick_error != GRPC_ERROR_NONE) {
731 const char* msg = grpc_error_string(kick_error);
732 gpr_log(GPR_ERROR, "Kick failed: %s", msg);
733 GRPC_ERROR_UNREF(kick_error);
734 }
735 }
736 if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
737 GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
738 gpr_mu_lock(cq->mu);
739 cq_finish_shutdown_next(cq);
740 gpr_mu_unlock(cq->mu);
741 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
742 }
743 } else {
744 GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
745 gpr_atm_rel_store(&cqd->pending_events, 0);
746 gpr_mu_lock(cq->mu);
747 cq_finish_shutdown_next(cq);
748 gpr_mu_unlock(cq->mu);
749 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
750 }
751 }
752
753 GRPC_ERROR_UNREF(error);
754 }
755
756 /* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
757 * completion
758 * type of GRPC_CQ_PLUCK) */
759 static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag,
760 grpc_error* error,
761 void (*done)(void* done_arg,
762 grpc_cq_completion* storage),
763 void* done_arg, grpc_cq_completion* storage) {
764 GPR_TIMER_SCOPE("cq_end_op_for_pluck", 0);
765
766 cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
767 int is_success = (error == GRPC_ERROR_NONE);
768
769 if (grpc_api_trace.enabled() ||
770 (grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE)) {
771 const char* errmsg = grpc_error_string(error);
772 GRPC_API_TRACE(
773 "cq_end_op_for_pluck(cq=%p, tag=%p, error=%s, "
774 "done=%p, done_arg=%p, storage=%p)",
775 6, (cq, tag, errmsg, done, done_arg, storage));
776 if (grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE) {
777 gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
778 }
779 }
780
781 storage->tag = tag;
782 storage->done = done;
783 storage->done_arg = done_arg;
784 storage->next =
785 ((uintptr_t)&cqd->completed_head) | (static_cast<uintptr_t>(is_success));
786
787 gpr_mu_lock(cq->mu);
788 cq_check_tag(cq, tag, false); /* Used in debug builds only */
789
790 /* Add to the list of completions */
791 gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1);
792 cqd->completed_tail->next =
793 ((uintptr_t)storage) | (1u & cqd->completed_tail->next);
794 cqd->completed_tail = storage;
795
796 if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
797 cq_finish_shutdown_pluck(cq);
798 gpr_mu_unlock(cq->mu);
799 } else {
800 grpc_pollset_worker* pluck_worker = nullptr;
801 for (int i = 0; i < cqd->num_pluckers; i++) {
802 if (cqd->pluckers[i].tag == tag) {
803 pluck_worker = *cqd->pluckers[i].worker;
804 break;
805 }
806 }
807
808 grpc_error* kick_error =
809 cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), pluck_worker);
810
811 gpr_mu_unlock(cq->mu);
812
813 if (kick_error != GRPC_ERROR_NONE) {
814 const char* msg = grpc_error_string(kick_error);
815 gpr_log(GPR_ERROR, "Kick failed: %s", msg);
816
817 GRPC_ERROR_UNREF(kick_error);
818 }
819 }
820
821 GRPC_ERROR_UNREF(error);
822 }
823
824 /* Complete an event on a completion queue of type GRPC_CQ_CALLBACK */
825 static void cq_end_op_for_callback(
826 grpc_completion_queue* cq, void* tag, grpc_error* error,
827 void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
828 grpc_cq_completion* storage) {
829 GPR_TIMER_SCOPE("cq_end_op_for_callback", 0);
830
831 cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
832 bool is_success = (error == GRPC_ERROR_NONE);
833
834 if (grpc_api_trace.enabled() ||
835 (grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE)) {
836 const char* errmsg = grpc_error_string(error);
837 GRPC_API_TRACE(
838 "cq_end_op_for_callback(cq=%p, tag=%p, error=%s, "
839 "done=%p, done_arg=%p, storage=%p)",
840 6, (cq, tag, errmsg, done, done_arg, storage));
841 if (grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE) {
842 gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
843 }
844 }
845
846 // The callback-based CQ isn't really a queue at all and thus has no need
847 // for reserved storage. Invoke the done callback right away to release it.
848 done(done_arg, storage);
849
850 gpr_mu_lock(cq->mu);
851 cq_check_tag(cq, tag, false); /* Used in debug builds only */
852
853 gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1);
854 if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
855 cq_finish_shutdown_callback(cq);
856 gpr_mu_unlock(cq->mu);
857 } else {
858 gpr_mu_unlock(cq->mu);
859 }
860
861 GRPC_ERROR_UNREF(error);
862
863 auto* functor = static_cast<grpc_experimental_completion_queue_functor*>(tag);
864 (*functor->functor_run)(functor, is_success);
865 }
866
867 void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error,
868 void (*done)(void* done_arg, grpc_cq_completion* storage),
869 void* done_arg, grpc_cq_completion* storage) {
870 cq->vtable->end_op(cq, tag, error, done, done_arg, storage);
871 }
872
873 typedef struct {
874 gpr_atm last_seen_things_queued_ever;
875 grpc_completion_queue* cq;
876 grpc_millis deadline;
877 grpc_cq_completion* stolen_completion;
878 void* tag; /* for pluck */
879 bool first_loop;
880 } cq_is_finished_arg;
881
882 class ExecCtxNext : public grpc_core::ExecCtx {
883 public:
884 ExecCtxNext(void* arg) : ExecCtx(0), check_ready_to_finish_arg_(arg) {}
885
886 bool CheckReadyToFinish() override {
887 cq_is_finished_arg* a =
888 static_cast<cq_is_finished_arg*>(check_ready_to_finish_arg_);
889 grpc_completion_queue* cq = a->cq;
890 cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
891 GPR_ASSERT(a->stolen_completion == nullptr);
892
893 gpr_atm current_last_seen_things_queued_ever =
894 gpr_atm_no_barrier_load(&cqd->things_queued_ever);
895
896 if (current_last_seen_things_queued_ever !=
897 a->last_seen_things_queued_ever) {
898 a->last_seen_things_queued_ever =
899 gpr_atm_no_barrier_load(&cqd->things_queued_ever);
900
901 /* Pop a cq_completion from the queue. Returns NULL if the queue is empty
902 * might return NULL in some cases even if the queue is not empty; but
903 * that
904 * is ok and doesn't affect correctness. Might effect the tail latencies a
905 * bit) */
906 a->stolen_completion = cq_event_queue_pop(&cqd->queue);
907 if (a->stolen_completion != nullptr) {
908 return true;
909 }
910 }
911 return !a->first_loop && a->deadline < grpc_core::ExecCtx::Get()->Now();
912 }
913
914 private:
915 void* check_ready_to_finish_arg_;
916 };
917
918 #ifndef NDEBUG
919 static void dump_pending_tags(grpc_completion_queue* cq) {
920 if (!grpc_trace_pending_tags.enabled()) return;
921
922 gpr_strvec v;
923 gpr_strvec_init(&v);
924 gpr_strvec_add(&v, gpr_strdup("PENDING TAGS:"));
925 gpr_mu_lock(cq->mu);
926 for (size_t i = 0; i < cq->outstanding_tag_count; i++) {
927 char* s;
928 gpr_asprintf(&s, " %p", cq->outstanding_tags[i]);
929 gpr_strvec_add(&v, s);
930 }
931 gpr_mu_unlock(cq->mu);
932 char* out = gpr_strvec_flatten(&v, nullptr);
933 gpr_strvec_destroy(&v);
934 gpr_log(GPR_DEBUG, "%s", out);
935 gpr_free(out);
936 }
937 #else
938 static void dump_pending_tags(grpc_completion_queue* cq) {}
939 #endif
940
941 static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
942 void* reserved) {
943 GPR_TIMER_SCOPE("grpc_completion_queue_next", 0);
944
945 grpc_event ret;
946 cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
947
948 GRPC_API_TRACE(
949 "grpc_completion_queue_next("
950 "cq=%p, "
951 "deadline=gpr_timespec { tv_sec: %" PRId64
952 ", tv_nsec: %d, clock_type: %d }, "
953 "reserved=%p)",
954 5,
955 (cq, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
956 reserved));
957 GPR_ASSERT(!reserved);
958
959 dump_pending_tags(cq);
960
961 GRPC_CQ_INTERNAL_REF(cq, "next");
962
963 grpc_millis deadline_millis = grpc_timespec_to_millis_round_up(deadline);
964 cq_is_finished_arg is_finished_arg = {
965 gpr_atm_no_barrier_load(&cqd->things_queued_ever),
966 cq,
967 deadline_millis,
968 nullptr,
969 nullptr,
970 true};
971 ExecCtxNext exec_ctx(&is_finished_arg);
972 for (;;) {
973 grpc_millis iteration_deadline = deadline_millis;
974
975 if (is_finished_arg.stolen_completion != nullptr) {
976 grpc_cq_completion* c = is_finished_arg.stolen_completion;
977 is_finished_arg.stolen_completion = nullptr;
978 ret.type = GRPC_OP_COMPLETE;
979 ret.success = c->next & 1u;
980 ret.tag = c->tag;
981 c->done(c->done_arg, c);
982 break;
983 }
984
985 grpc_cq_completion* c = cq_event_queue_pop(&cqd->queue);
986
987 if (c != nullptr) {
988 ret.type = GRPC_OP_COMPLETE;
989 ret.success = c->next & 1u;
990 ret.tag = c->tag;
991 c->done(c->done_arg, c);
992 break;
993 } else {
994 /* If c == NULL it means either the queue is empty OR in an transient
995 inconsistent state. If it is the latter, we shold do a 0-timeout poll
996 so that the thread comes back quickly from poll to make a second
997 attempt at popping. Not doing this can potentially deadlock this
998 thread forever (if the deadline is infinity) */
999 if (cq_event_queue_num_items(&cqd->queue) > 0) {
1000 iteration_deadline = 0;
1001 }
1002 }
1003
1004 if (gpr_atm_acq_load(&cqd->pending_events) == 0) {
1005 /* Before returning, check if the queue has any items left over (since
1006 gpr_mpscq_pop() can sometimes return NULL even if the queue is not
1007 empty. If so, keep retrying but do not return GRPC_QUEUE_SHUTDOWN */
1008 if (cq_event_queue_num_items(&cqd->queue) > 0) {
1009 /* Go to the beginning of the loop. No point doing a poll because
1010 (cq->shutdown == true) is only possible when there is no pending
1011 work (i.e cq->pending_events == 0) and any outstanding completion
1012 events should have already been queued on this cq */
1013 continue;
1014 }
1015
1016 memset(&ret, 0, sizeof(ret));
1017 ret.type = GRPC_QUEUE_SHUTDOWN;
1018 break;
1019 }
1020
1021 if (!is_finished_arg.first_loop &&
1022 grpc_core::ExecCtx::Get()->Now() >= deadline_millis) {
1023 memset(&ret, 0, sizeof(ret));
1024 ret.type = GRPC_QUEUE_TIMEOUT;
1025 dump_pending_tags(cq);
1026 break;
1027 }
1028
1029 /* The main polling work happens in grpc_pollset_work */
1030 gpr_mu_lock(cq->mu);
1031 cq->num_polls++;
1032 grpc_error* err = cq->poller_vtable->work(POLLSET_FROM_CQ(cq), nullptr,
1033 iteration_deadline);
1034 gpr_mu_unlock(cq->mu);
1035
1036 if (err != GRPC_ERROR_NONE) {
1037 const char* msg = grpc_error_string(err);
1038 gpr_log(GPR_ERROR, "Completion queue next failed: %s", msg);
1039
1040 GRPC_ERROR_UNREF(err);
1041 memset(&ret, 0, sizeof(ret));
1042 ret.type = GRPC_QUEUE_TIMEOUT;
1043 dump_pending_tags(cq);
1044 break;
1045 }
1046 is_finished_arg.first_loop = false;
1047 }
1048
1049 if (cq_event_queue_num_items(&cqd->queue) > 0 &&
1050 gpr_atm_acq_load(&cqd->pending_events) > 0) {
1051 gpr_mu_lock(cq->mu);
1052 cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), nullptr);
1053 gpr_mu_unlock(cq->mu);
1054 }
1055
1056 GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
1057 GRPC_CQ_INTERNAL_UNREF(cq, "next");
1058
1059 GPR_ASSERT(is_finished_arg.stolen_completion == nullptr);
1060
1061 return ret;
1062 }
1063
1064 /* Finishes the completion queue shutdown. This means that there are no more
1065 completion events / tags expected from the completion queue
1066 - Must be called under completion queue lock
1067 - Must be called only once in completion queue's lifetime
1068 - grpc_completion_queue_shutdown() MUST have been called before calling
1069 this function */
1070 static void cq_finish_shutdown_next(grpc_completion_queue* cq) {
1071 cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
1072
1073 GPR_ASSERT(cqd->shutdown_called);
1074 GPR_ASSERT(gpr_atm_no_barrier_load(&cqd->pending_events) == 0);
1075
1076 cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
1077 }
1078
1079 static void cq_shutdown_next(grpc_completion_queue* cq) {
1080 cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
1081
1082 /* Need an extra ref for cq here because:
1083 * We call cq_finish_shutdown_next() below, that would call pollset shutdown.
1084 * Pollset shutdown decrements the cq ref count which can potentially destroy
1085 * the cq (if that happens to be the last ref).
1086 * Creating an extra ref here prevents the cq from getting destroyed while
1087 * this function is still active */
1088 GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
1089 gpr_mu_lock(cq->mu);
1090 if (cqd->shutdown_called) {
1091 gpr_mu_unlock(cq->mu);
1092 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
1093 return;
1094 }
1095 cqd->shutdown_called = true;
1096 /* Doing a full_fetch_add (i.e acq/release) here to match with
1097 * cq_begin_op_for_next and and cq_end_op_for_next functions which read/write
1098 * on this counter without necessarily holding a lock on cq */
1099 if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
1100 cq_finish_shutdown_next(cq);
1101 }
1102 gpr_mu_unlock(cq->mu);
1103 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
1104 }
1105
1106 grpc_event grpc_completion_queue_next(grpc_completion_queue* cq,
1107 gpr_timespec deadline, void* reserved) {
1108 return cq->vtable->next(cq, deadline, reserved);
1109 }
1110
1111 static int add_plucker(grpc_completion_queue* cq, void* tag,
1112 grpc_pollset_worker** worker) {
1113 cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
1114 if (cqd->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) {
1115 return 0;
1116 }
1117 cqd->pluckers[cqd->num_pluckers].tag = tag;
1118 cqd->pluckers[cqd->num_pluckers].worker = worker;
1119 cqd->num_pluckers++;
1120 return 1;
1121 }
1122
1123 static void del_plucker(grpc_completion_queue* cq, void* tag,
1124 grpc_pollset_worker** worker) {
1125 cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
1126 for (int i = 0; i < cqd->num_pluckers; i++) {
1127 if (cqd->pluckers[i].tag == tag && cqd->pluckers[i].worker == worker) {
1128 cqd->num_pluckers--;
1129 GPR_SWAP(plucker, cqd->pluckers[i], cqd->pluckers[cqd->num_pluckers]);
1130 return;
1131 }
1132 }
1133 GPR_UNREACHABLE_CODE(return );
1134 }
1135
1136 class ExecCtxPluck : public grpc_core::ExecCtx {
1137 public:
1138 ExecCtxPluck(void* arg) : ExecCtx(0), check_ready_to_finish_arg_(arg) {}
1139
1140 bool CheckReadyToFinish() override {
1141 cq_is_finished_arg* a =
1142 static_cast<cq_is_finished_arg*>(check_ready_to_finish_arg_);
1143 grpc_completion_queue* cq = a->cq;
1144 cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
1145
1146 GPR_ASSERT(a->stolen_completion == nullptr);
1147 gpr_atm current_last_seen_things_queued_ever =
1148 gpr_atm_no_barrier_load(&cqd->things_queued_ever);
1149 if (current_last_seen_things_queued_ever !=
1150 a->last_seen_things_queued_ever) {
1151 gpr_mu_lock(cq->mu);
1152 a->last_seen_things_queued_ever =
1153 gpr_atm_no_barrier_load(&cqd->things_queued_ever);
1154 grpc_cq_completion* c;
1155 grpc_cq_completion* prev = &cqd->completed_head;
1156 while ((c = (grpc_cq_completion*)(prev->next &
1157 ~static_cast<uintptr_t>(1))) !=
1158 &cqd->completed_head) {
1159 if (c->tag == a->tag) {
1160 prev->next = (prev->next & static_cast<uintptr_t>(1)) |
1161 (c->next & ~static_cast<uintptr_t>(1));
1162 if (c == cqd->completed_tail) {
1163 cqd->completed_tail = prev;
1164 }
1165 gpr_mu_unlock(cq->mu);
1166 a->stolen_completion = c;
1167 return true;
1168 }
1169 prev = c;
1170 }
1171 gpr_mu_unlock(cq->mu);
1172 }
1173 return !a->first_loop && a->deadline < grpc_core::ExecCtx::Get()->Now();
1174 }
1175
1176 private:
1177 void* check_ready_to_finish_arg_;
1178 };
1179
1180 static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
1181 gpr_timespec deadline, void* reserved) {
1182 GPR_TIMER_SCOPE("grpc_completion_queue_pluck", 0);
1183
1184 grpc_event ret;
1185 grpc_cq_completion* c;
1186 grpc_cq_completion* prev;
1187 grpc_pollset_worker* worker = nullptr;
1188 cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
1189
1190 if (grpc_cq_pluck_trace.enabled()) {
1191 GRPC_API_TRACE(
1192 "grpc_completion_queue_pluck("
1193 "cq=%p, tag=%p, "
1194 "deadline=gpr_timespec { tv_sec: %" PRId64
1195 ", tv_nsec: %d, clock_type: %d }, "
1196 "reserved=%p)",
1197 6,
1198 (cq, tag, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
1199 reserved));
1200 }
1201 GPR_ASSERT(!reserved);
1202
1203 dump_pending_tags(cq);
1204
1205 GRPC_CQ_INTERNAL_REF(cq, "pluck");
1206 gpr_mu_lock(cq->mu);
1207 grpc_millis deadline_millis = grpc_timespec_to_millis_round_up(deadline);
1208 cq_is_finished_arg is_finished_arg = {
1209 gpr_atm_no_barrier_load(&cqd->things_queued_ever),
1210 cq,
1211 deadline_millis,
1212 nullptr,
1213 tag,
1214 true};
1215 ExecCtxPluck exec_ctx(&is_finished_arg);
1216 for (;;) {
1217 if (is_finished_arg.stolen_completion != nullptr) {
1218 gpr_mu_unlock(cq->mu);
1219 c = is_finished_arg.stolen_completion;
1220 is_finished_arg.stolen_completion = nullptr;
1221 ret.type = GRPC_OP_COMPLETE;
1222 ret.success = c->next & 1u;
1223 ret.tag = c->tag;
1224 c->done(c->done_arg, c);
1225 break;
1226 }
1227 prev = &cqd->completed_head;
1228 while (
1229 (c = (grpc_cq_completion*)(prev->next & ~static_cast<uintptr_t>(1))) !=
1230 &cqd->completed_head) {
1231 if (c->tag == tag) {
1232 prev->next = (prev->next & static_cast<uintptr_t>(1)) |
1233 (c->next & ~static_cast<uintptr_t>(1));
1234 if (c == cqd->completed_tail) {
1235 cqd->completed_tail = prev;
1236 }
1237 gpr_mu_unlock(cq->mu);
1238 ret.type = GRPC_OP_COMPLETE;
1239 ret.success = c->next & 1u;
1240 ret.tag = c->tag;
1241 c->done(c->done_arg, c);
1242 goto done;
1243 }
1244 prev = c;
1245 }
1246 if (gpr_atm_no_barrier_load(&cqd->shutdown)) {
1247 gpr_mu_unlock(cq->mu);
1248 memset(&ret, 0, sizeof(ret));
1249 ret.type = GRPC_QUEUE_SHUTDOWN;
1250 break;
1251 }
1252 if (!add_plucker(cq, tag, &worker)) {
1253 gpr_log(GPR_DEBUG,
1254 "Too many outstanding grpc_completion_queue_pluck calls: maximum "
1255 "is %d",
1256 GRPC_MAX_COMPLETION_QUEUE_PLUCKERS);
1257 gpr_mu_unlock(cq->mu);
1258 memset(&ret, 0, sizeof(ret));
1259 /* TODO(ctiller): should we use a different result here */
1260 ret.type = GRPC_QUEUE_TIMEOUT;
1261 dump_pending_tags(cq);
1262 break;
1263 }
1264 if (!is_finished_arg.first_loop &&
1265 grpc_core::ExecCtx::Get()->Now() >= deadline_millis) {
1266 del_plucker(cq, tag, &worker);
1267 gpr_mu_unlock(cq->mu);
1268 memset(&ret, 0, sizeof(ret));
1269 ret.type = GRPC_QUEUE_TIMEOUT;
1270 dump_pending_tags(cq);
1271 break;
1272 }
1273 cq->num_polls++;
1274 grpc_error* err =
1275 cq->poller_vtable->work(POLLSET_FROM_CQ(cq), &worker, deadline_millis);
1276 if (err != GRPC_ERROR_NONE) {
1277 del_plucker(cq, tag, &worker);
1278 gpr_mu_unlock(cq->mu);
1279 const char* msg = grpc_error_string(err);
1280 gpr_log(GPR_ERROR, "Completion queue pluck failed: %s", msg);
1281
1282 GRPC_ERROR_UNREF(err);
1283 memset(&ret, 0, sizeof(ret));
1284 ret.type = GRPC_QUEUE_TIMEOUT;
1285 dump_pending_tags(cq);
1286 break;
1287 }
1288 is_finished_arg.first_loop = false;
1289 del_plucker(cq, tag, &worker);
1290 }
1291 done:
1292 GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
1293 GRPC_CQ_INTERNAL_UNREF(cq, "pluck");
1294
1295 GPR_ASSERT(is_finished_arg.stolen_completion == nullptr);
1296
1297 return ret;
1298 }
1299
1300 grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag,
1301 gpr_timespec deadline, void* reserved) {
1302 return cq->vtable->pluck(cq, tag, deadline, reserved);
1303 }
1304
1305 static void cq_finish_shutdown_pluck(grpc_completion_queue* cq) {
1306 cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
1307
1308 GPR_ASSERT(cqd->shutdown_called);
1309 GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown));
1310 gpr_atm_no_barrier_store(&cqd->shutdown, 1);
1311
1312 cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
1313 }
1314
1315 /* NOTE: This function is almost exactly identical to cq_shutdown_next() but
1316 * merging them is a bit tricky and probably not worth it */
1317 static void cq_shutdown_pluck(grpc_completion_queue* cq) {
1318 cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
1319
1320 /* Need an extra ref for cq here because:
1321 * We call cq_finish_shutdown_pluck() below, that would call pollset shutdown.
1322 * Pollset shutdown decrements the cq ref count which can potentially destroy
1323 * the cq (if that happens to be the last ref).
1324 * Creating an extra ref here prevents the cq from getting destroyed while
1325 * this function is still active */
1326 GRPC_CQ_INTERNAL_REF(cq, "shutting_down (pluck cq)");
1327 gpr_mu_lock(cq->mu);
1328 if (cqd->shutdown_called) {
1329 gpr_mu_unlock(cq->mu);
1330 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (pluck cq)");
1331 return;
1332 }
1333 cqd->shutdown_called = true;
1334 if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
1335 cq_finish_shutdown_pluck(cq);
1336 }
1337 gpr_mu_unlock(cq->mu);
1338 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (pluck cq)");
1339 }
1340
1341 static void cq_finish_shutdown_callback(grpc_completion_queue* cq) {
1342 cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
1343 auto* callback = cqd->shutdown_callback;
1344
1345 GPR_ASSERT(cqd->shutdown_called);
1346
1347 cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
1348 (*callback->functor_run)(callback, true);
1349 }
1350
1351 static void cq_shutdown_callback(grpc_completion_queue* cq) {
1352 cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
1353
1354 /* Need an extra ref for cq here because:
1355 * We call cq_finish_shutdown_callback() below, which calls pollset shutdown.
1356 * Pollset shutdown decrements the cq ref count which can potentially destroy
1357 * the cq (if that happens to be the last ref).
1358 * Creating an extra ref here prevents the cq from getting destroyed while
1359 * this function is still active */
1360 GRPC_CQ_INTERNAL_REF(cq, "shutting_down (callback cq)");
1361 gpr_mu_lock(cq->mu);
1362 if (cqd->shutdown_called) {
1363 gpr_mu_unlock(cq->mu);
1364 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)");
1365 return;
1366 }
1367 cqd->shutdown_called = true;
1368 if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
1369 gpr_mu_unlock(cq->mu);
1370 cq_finish_shutdown_callback(cq);
1371 } else {
1372 gpr_mu_unlock(cq->mu);
1373 }
1374 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)");
1375 }
1376
1377 /* Shutdown simply drops a ref that we reserved at creation time; if we drop
1378 to zero here, then enter shutdown mode and wake up any waiters */
1379 void grpc_completion_queue_shutdown(grpc_completion_queue* cq) {
1380 GPR_TIMER_SCOPE("grpc_completion_queue_shutdown", 0);
1381 grpc_core::ExecCtx exec_ctx;
1382 GRPC_API_TRACE("grpc_completion_queue_shutdown(cq=%p)", 1, (cq));
1383 cq->vtable->shutdown(cq);
1384 }
1385
1386 void grpc_completion_queue_destroy(grpc_completion_queue* cq) {
1387 GPR_TIMER_SCOPE("grpc_completion_queue_destroy", 0);
1388 GRPC_API_TRACE("grpc_completion_queue_destroy(cq=%p)", 1, (cq));
1389 grpc_completion_queue_shutdown(cq);
1390
1391 grpc_core::ExecCtx exec_ctx;
1392 GRPC_CQ_INTERNAL_UNREF(cq, "destroy");
1393 }
1394
1395 grpc_pollset* grpc_cq_pollset(grpc_completion_queue* cq) {
1396 return cq->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cq) : nullptr;
1397 }
1398
1399 bool grpc_cq_can_listen(grpc_completion_queue* cq) {
1400 return cq->poller_vtable->can_listen;
1401 }
1402