/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/support/port_platform.h>

#include "src/core/lib/iomgr/call_combiner.h"

#include <inttypes.h>

#include <grpc/support/log.h>
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/profiling/timers.h"

grpc_core::TraceFlag grpc_call_combiner_trace(false, "call_combiner");

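// The cancel_state field packs one of three values into a single gpr_atm:
//   0:                    no cancellation and no notify_on_cancel closure set
//   grpc_closure* (even): a notify_on_cancel closure waiting to be invoked
//                         (closure pointers are aligned, so the low bit is 0)
//   grpc_error* | 1:      cancellation has occurred; the low bit tags the
//                         value as an error
// The helpers below encode and decode the error case.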
static grpc_error* decode_cancel_state_error(gpr_atm cancel_state) {
  if (cancel_state & 1) {
    return (grpc_error*)(cancel_state & ~static_cast<gpr_atm>(1));
  }
  return GRPC_ERROR_NONE;
}

static gpr_atm encode_cancel_state_error(grpc_error* error) {
  return static_cast<gpr_atm>(1) | (gpr_atm)error;
}

void grpc_call_combiner_init(grpc_call_combiner* call_combiner) {
  gpr_mpscq_init(&call_combiner->queue);
}

void grpc_call_combiner_destroy(grpc_call_combiner* call_combiner) {
  gpr_mpscq_destroy(&call_combiner->queue);
  GRPC_ERROR_UNREF(decode_cancel_state_error(call_combiner->cancel_state));
}

#ifndef NDEBUG
#define DEBUG_ARGS , const char *file, int line
#define DEBUG_FMT_STR "%s:%d: "
#define DEBUG_FMT_ARGS , file, line
#else
#define DEBUG_ARGS
#define DEBUG_FMT_STR
#define DEBUG_FMT_ARGS
#endif

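// Schedules a closure under the call combiner.  The size counter tracks how
// many closures are in flight: if it was zero, the caller has effectively
// acquired the combiner and the closure is scheduled immediately; otherwise
// the closure (with its error stashed in error_data) is pushed onto the
// MPSC queue and will be scheduled by a later grpc_call_combiner_stop().
//
// A rough usage sketch (illustrative only; real call sites use the
// GRPC_CALL_COMBINER_START/STOP macros from call_combiner.h, which supply
// the file/line debug arguments in debug builds; calld and my_closure are
// hypothetical names):
//
//   GRPC_CALL_COMBINER_START(calld->call_combiner, &calld->my_closure,
//                            GRPC_ERROR_NONE, "starting my operation");
//   // ... my_closure runs under the combiner; once its work is done:
//   GRPC_CALL_COMBINER_STOP(calld->call_combiner, "my operation done");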
void grpc_call_combiner_start(grpc_call_combiner* call_combiner,
                              grpc_closure* closure,
                              grpc_error* error DEBUG_ARGS,
                              const char* reason) {
  GPR_TIMER_SCOPE("call_combiner_start", 0);
  if (grpc_call_combiner_trace.enabled()) {
    gpr_log(GPR_INFO,
            "==> grpc_call_combiner_start() [%p] closure=%p [" DEBUG_FMT_STR
            "%s] error=%s",
            call_combiner, closure DEBUG_FMT_ARGS, reason,
            grpc_error_string(error));
  }
  size_t prev_size = static_cast<size_t>(
      gpr_atm_full_fetch_add(&call_combiner->size, (gpr_atm)1));
  if (grpc_call_combiner_trace.enabled()) {
    gpr_log(GPR_INFO, "  size: %" PRIdPTR " -> %" PRIdPTR, prev_size,
            prev_size + 1);
  }
  GRPC_STATS_INC_CALL_COMBINER_LOCKS_SCHEDULED_ITEMS();
  if (prev_size == 0) {
    GRPC_STATS_INC_CALL_COMBINER_LOCKS_INITIATED();

    GPR_TIMER_MARK("call_combiner_initiate", 0);
    if (grpc_call_combiner_trace.enabled()) {
      gpr_log(GPR_INFO, "  EXECUTING IMMEDIATELY");
    }
    // Queue was empty, so execute this closure immediately.
    GRPC_CLOSURE_SCHED(closure, error);
  } else {
    if (grpc_call_combiner_trace.enabled()) {
      gpr_log(GPR_INFO, "  QUEUING");
    }
    // Queue was not empty, so add closure to queue.
    closure->error_data.error = error;
    gpr_mpscq_push(&call_combiner->queue,
                   reinterpret_cast<gpr_mpscq_node*>(closure));
  }
}

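// Releases the combiner after a closure scheduled via
// grpc_call_combiner_start() has finished.  Decrements the size counter; if
// other closures are still pending, pops the next one off the MPSC queue and
// schedules it, handing the combiner to it.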
void grpc_call_combiner_stop(grpc_call_combiner* call_combiner DEBUG_ARGS,
                             const char* reason) {
  GPR_TIMER_SCOPE("call_combiner_stop", 0);
  if (grpc_call_combiner_trace.enabled()) {
    gpr_log(GPR_INFO,
            "==> grpc_call_combiner_stop() [%p] [" DEBUG_FMT_STR "%s]",
            call_combiner DEBUG_FMT_ARGS, reason);
  }
  size_t prev_size = static_cast<size_t>(
      gpr_atm_full_fetch_add(&call_combiner->size, (gpr_atm)-1));
  if (grpc_call_combiner_trace.enabled()) {
    gpr_log(GPR_INFO, "  size: %" PRIdPTR " -> %" PRIdPTR, prev_size,
            prev_size - 1);
  }
  GPR_ASSERT(prev_size >= 1);
  if (prev_size > 1) {
    while (true) {
      if (grpc_call_combiner_trace.enabled()) {
        gpr_log(GPR_INFO, "  checking queue");
      }
      bool empty;
      grpc_closure* closure = reinterpret_cast<grpc_closure*>(
          gpr_mpscq_pop_and_check_end(&call_combiner->queue, &empty));
      if (closure == nullptr) {
        // This can happen either due to a race condition within the mpscq
        // code or because of a race with grpc_call_combiner_start().
        if (grpc_call_combiner_trace.enabled()) {
          gpr_log(GPR_INFO, "  queue returned no result; checking again");
        }
        continue;
      }
      if (grpc_call_combiner_trace.enabled()) {
        gpr_log(GPR_INFO, "  EXECUTING FROM QUEUE: closure=%p error=%s",
                closure, grpc_error_string(closure->error_data.error));
      }
      GRPC_CLOSURE_SCHED(closure, closure->error_data.error);
      break;
    }
  } else if (grpc_call_combiner_trace.enabled()) {
    gpr_log(GPR_INFO, "  queue empty");
  }
}

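// Registers (or replaces) the closure to be invoked when the call is
// cancelled.  If cancellation has already happened, the closure is scheduled
// immediately with a ref to the cancellation error.  If an earlier
// notify_on_cancel closure is being replaced, the old closure is scheduled
// with GRPC_ERROR_NONE so its owner can release any associated resources.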
void grpc_call_combiner_set_notify_on_cancel(grpc_call_combiner* call_combiner,
                                             grpc_closure* closure) {
  GRPC_STATS_INC_CALL_COMBINER_SET_NOTIFY_ON_CANCEL();
  while (true) {
    // Decode original state.
    gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state);
    grpc_error* original_error = decode_cancel_state_error(original_state);
    // If error is set, invoke the cancellation closure immediately.
    // Otherwise, store the new closure.
    if (original_error != GRPC_ERROR_NONE) {
      if (grpc_call_combiner_trace.enabled()) {
        gpr_log(GPR_INFO,
                "call_combiner=%p: scheduling notify_on_cancel callback=%p "
                "for pre-existing cancellation",
                call_combiner, closure);
      }
      GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_REF(original_error));
      break;
    } else {
      if (gpr_atm_full_cas(&call_combiner->cancel_state, original_state,
                           (gpr_atm)closure)) {
        if (grpc_call_combiner_trace.enabled()) {
          gpr_log(GPR_INFO, "call_combiner=%p: setting notify_on_cancel=%p",
                  call_combiner, closure);
        }
        // If we replaced an earlier closure, invoke the original
        // closure with GRPC_ERROR_NONE.  This allows callers to clean
        // up any resources they may be holding for the callback.
        if (original_state != 0) {
          closure = (grpc_closure*)original_state;
          if (grpc_call_combiner_trace.enabled()) {
            gpr_log(GPR_INFO,
                    "call_combiner=%p: scheduling old cancel callback=%p",
                    call_combiner, closure);
          }
          GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
        }
        break;
      }
    }
    // cas failed, try again.
  }
}

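// Records a cancellation error for the call.  The first error wins: if a
// cancellation error is already stored, the new one is unreffed and dropped.
// Otherwise the error is stored in cancel_state and any pending
// notify_on_cancel closure is scheduled with a ref to it.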
void grpc_call_combiner_cancel(grpc_call_combiner* call_combiner,
                               grpc_error* error) {
  GRPC_STATS_INC_CALL_COMBINER_CANCELLED();
  while (true) {
    gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state);
    grpc_error* original_error = decode_cancel_state_error(original_state);
    if (original_error != GRPC_ERROR_NONE) {
      GRPC_ERROR_UNREF(error);
      break;
    }
    if (gpr_atm_full_cas(&call_combiner->cancel_state, original_state,
                         encode_cancel_state_error(error))) {
      if (original_state != 0) {
        grpc_closure* notify_on_cancel = (grpc_closure*)original_state;
        if (grpc_call_combiner_trace.enabled()) {
          gpr_log(GPR_INFO,
                  "call_combiner=%p: scheduling notify_on_cancel callback=%p",
                  call_combiner, notify_on_cancel);
        }
        GRPC_CLOSURE_SCHED(notify_on_cancel, GRPC_ERROR_REF(error));
      }
      break;
    }
    // cas failed, try again.
  }
}