1 /*
2 *
3 * Copyright 2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/gpr/mpscq.h"
22
23 #include <grpc/support/log.h>
24
gpr_mpscq_init(gpr_mpscq * q)25 void gpr_mpscq_init(gpr_mpscq* q) {
26 gpr_atm_no_barrier_store(&q->head, (gpr_atm)&q->stub);
27 q->tail = &q->stub;
28 gpr_atm_no_barrier_store(&q->stub.next, (gpr_atm)NULL);
29 }
30
gpr_mpscq_destroy(gpr_mpscq * q)31 void gpr_mpscq_destroy(gpr_mpscq* q) {
32 GPR_ASSERT(gpr_atm_no_barrier_load(&q->head) == (gpr_atm)&q->stub);
33 GPR_ASSERT(q->tail == &q->stub);
34 }
35
gpr_mpscq_push(gpr_mpscq * q,gpr_mpscq_node * n)36 bool gpr_mpscq_push(gpr_mpscq* q, gpr_mpscq_node* n) {
37 gpr_atm_no_barrier_store(&n->next, (gpr_atm)NULL);
38 gpr_mpscq_node* prev =
39 (gpr_mpscq_node*)gpr_atm_full_xchg(&q->head, (gpr_atm)n);
40 gpr_atm_rel_store(&prev->next, (gpr_atm)n);
41 return prev == &q->stub;
42 }
43
gpr_mpscq_pop(gpr_mpscq * q)44 gpr_mpscq_node* gpr_mpscq_pop(gpr_mpscq* q) {
45 bool empty;
46 return gpr_mpscq_pop_and_check_end(q, &empty);
47 }
48
gpr_mpscq_pop_and_check_end(gpr_mpscq * q,bool * empty)49 gpr_mpscq_node* gpr_mpscq_pop_and_check_end(gpr_mpscq* q, bool* empty) {
50 gpr_mpscq_node* tail = q->tail;
51 gpr_mpscq_node* next = (gpr_mpscq_node*)gpr_atm_acq_load(&tail->next);
52 if (tail == &q->stub) {
53 // indicates the list is actually (ephemerally) empty
54 if (next == nullptr) {
55 *empty = true;
56 return nullptr;
57 }
58 q->tail = next;
59 tail = next;
60 next = (gpr_mpscq_node*)gpr_atm_acq_load(&tail->next);
61 }
62 if (next != nullptr) {
63 *empty = false;
64 q->tail = next;
65 return tail;
66 }
67 gpr_mpscq_node* head = (gpr_mpscq_node*)gpr_atm_acq_load(&q->head);
68 if (tail != head) {
69 *empty = false;
70 // indicates a retry is in order: we're still adding
71 return nullptr;
72 }
73 gpr_mpscq_push(q, &q->stub);
74 next = (gpr_mpscq_node*)gpr_atm_acq_load(&tail->next);
75 if (next != nullptr) {
76 *empty = false;
77 q->tail = next;
78 return tail;
79 }
80 // indicates a retry is in order: we're still adding
81 *empty = false;
82 return nullptr;
83 }
84
gpr_locked_mpscq_init(gpr_locked_mpscq * q)85 void gpr_locked_mpscq_init(gpr_locked_mpscq* q) {
86 gpr_mpscq_init(&q->queue);
87 gpr_mu_init(&q->mu);
88 }
89
gpr_locked_mpscq_destroy(gpr_locked_mpscq * q)90 void gpr_locked_mpscq_destroy(gpr_locked_mpscq* q) {
91 gpr_mpscq_destroy(&q->queue);
92 gpr_mu_destroy(&q->mu);
93 }
94
gpr_locked_mpscq_push(gpr_locked_mpscq * q,gpr_mpscq_node * n)95 bool gpr_locked_mpscq_push(gpr_locked_mpscq* q, gpr_mpscq_node* n) {
96 return gpr_mpscq_push(&q->queue, n);
97 }
98
gpr_locked_mpscq_try_pop(gpr_locked_mpscq * q)99 gpr_mpscq_node* gpr_locked_mpscq_try_pop(gpr_locked_mpscq* q) {
100 if (gpr_mu_trylock(&q->mu)) {
101 gpr_mpscq_node* n = gpr_mpscq_pop(&q->queue);
102 gpr_mu_unlock(&q->mu);
103 return n;
104 }
105 return nullptr;
106 }
107
gpr_locked_mpscq_pop(gpr_locked_mpscq * q)108 gpr_mpscq_node* gpr_locked_mpscq_pop(gpr_locked_mpscq* q) {
109 gpr_mu_lock(&q->mu);
110 bool empty = false;
111 gpr_mpscq_node* n;
112 do {
113 n = gpr_mpscq_pop_and_check_end(&q->queue, &empty);
114 } while (n == nullptr && !empty);
115 gpr_mu_unlock(&q->mu);
116 return n;
117 }
118