//
// Copyright 2016 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#include <grpc/support/port_platform.h>

#include "src/core/ext/filters/deadline/deadline_filter.h"

#include <stdbool.h>
#include <string.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>

#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/channel_init.h"

namespace grpc_core {

// A fire-and-forget class representing a pending deadline timer.
// Allocated on the call arena.
class TimerState {
 public:
  TimerState(grpc_call_element* elem, grpc_millis deadline) : elem_(elem) {
    grpc_deadline_state* deadline_state =
        static_cast<grpc_deadline_state*>(elem_->call_data);
    GRPC_CALL_STACK_REF(deadline_state->call_stack, "DeadlineTimerState");
    GRPC_CLOSURE_INIT(&closure_, TimerCallback, this, nullptr);
    grpc_timer_init(&timer_, deadline, &closure_);
  }

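  // NOTE: Cancel() only *requests* cancellation of the pending timer; the
  // timer callback still runs (with GRPC_ERROR_CANCELLED if cancellation won
  // the race), and that callback is where the call stack ref taken in the
  // constructor is released.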
  void Cancel() { grpc_timer_cancel(&timer_); }

 private:
  // The on_complete callback used when sending a cancel_error batch down the
  // filter stack.  Yields the call combiner when the batch returns.
  static void YieldCallCombiner(void* arg, grpc_error* /*ignored*/) {
    TimerState* self = static_cast<TimerState*>(arg);
    grpc_deadline_state* deadline_state =
        static_cast<grpc_deadline_state*>(self->elem_->call_data);
    GRPC_CALL_COMBINER_STOP(deadline_state->call_combiner,
                            "got on_complete from cancel_stream batch");
    GRPC_CALL_STACK_UNREF(deadline_state->call_stack, "DeadlineTimerState");
  }

  // This is called via the call combiner, so access to deadline_state is
  // synchronized.
  static void SendCancelOpInCallCombiner(void* arg, grpc_error* error) {
    TimerState* self = static_cast<TimerState*>(arg);
    grpc_transport_stream_op_batch* batch = grpc_make_transport_stream_op(
        GRPC_CLOSURE_INIT(&self->closure_, YieldCallCombiner, self, nullptr));
    batch->cancel_stream = true;
    batch->payload->cancel_stream.cancel_error = GRPC_ERROR_REF(error);
    self->elem_->filter->start_transport_stream_op_batch(self->elem_, batch);
  }

  // Timer callback.
  static void TimerCallback(void* arg, grpc_error* error) {
    TimerState* self = static_cast<TimerState*>(arg);
    grpc_deadline_state* deadline_state =
        static_cast<grpc_deadline_state*>(self->elem_->call_data);
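    // If error is GRPC_ERROR_CANCELLED, the timer was cancelled before it
    // fired, so all we need to do is drop the call stack ref taken in the
    // constructor.  Otherwise the deadline has passed: synthesize a
    // DEADLINE_EXCEEDED error and send a cancel_stream batch down the stack,
    // going through the call combiner since we do not already hold it here.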
    if (error != GRPC_ERROR_CANCELLED) {
      error = grpc_error_set_int(
          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Deadline Exceeded"),
          GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_DEADLINE_EXCEEDED);
      deadline_state->call_combiner->Cancel(GRPC_ERROR_REF(error));
      GRPC_CLOSURE_INIT(&self->closure_, SendCancelOpInCallCombiner, self,
                        nullptr);
      GRPC_CALL_COMBINER_START(deadline_state->call_combiner, &self->closure_,
                               error,
                               "deadline exceeded -- sending cancel_stream op");
    } else {
      GRPC_CALL_STACK_UNREF(deadline_state->call_stack, "DeadlineTimerState");
    }
  }

  // NOTE: This object's dtor is never called, so do not add any data
  // members that require destruction!
  // TODO(roth): We should ideally call this object's dtor somewhere,
  // but that would require adding more synchronization, because we'd
  // need to call the dtor only after both (a) the timer callback
  // finishes and (b) the filter sees the call completion and attempts
  // to cancel the timer.
  grpc_call_element* elem_;
  grpc_timer timer_;
  grpc_closure closure_;
};

}  // namespace grpc_core

//
// grpc_deadline_state
//

// Starts the deadline timer.
// This is called via the call combiner, so access to deadline_state is
// synchronized.
static void start_timer_if_needed(grpc_call_element* elem,
                                  grpc_millis deadline) {
  if (deadline == GRPC_MILLIS_INF_FUTURE) {
    return;
  }
  grpc_deadline_state* deadline_state =
      static_cast<grpc_deadline_state*>(elem->call_data);
  GPR_ASSERT(deadline_state->timer_state == nullptr);
  deadline_state->timer_state =
      deadline_state->arena->New<grpc_core::TimerState>(elem, deadline);
}

// Cancels the deadline timer.
// This is called via the call combiner, so access to deadline_state is
// synchronized.
static void cancel_timer_if_needed(grpc_deadline_state* deadline_state) {
  if (deadline_state->timer_state != nullptr) {
    deadline_state->timer_state->Cancel();
    deadline_state->timer_state = nullptr;
  }
}

// Callback run when we receive trailing metadata.
static void recv_trailing_metadata_ready(void* arg, grpc_error* error) {
  grpc_deadline_state* deadline_state = static_cast<grpc_deadline_state*>(arg);
  cancel_timer_if_needed(deadline_state);
  // Invoke the original callback.
  grpc_core::Closure::Run(
      DEBUG_LOCATION, deadline_state->original_recv_trailing_metadata_ready,
      GRPC_ERROR_REF(error));
}

// Inject our own recv_trailing_metadata_ready callback into op.
static void inject_recv_trailing_metadata_ready(
    grpc_deadline_state* deadline_state, grpc_transport_stream_op_batch* op) {
  deadline_state->original_recv_trailing_metadata_ready =
      op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
  GRPC_CLOSURE_INIT(&deadline_state->recv_trailing_metadata_ready,
                    recv_trailing_metadata_ready, deadline_state,
                    grpc_schedule_on_exec_ctx);
  op->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
      &deadline_state->recv_trailing_metadata_ready;
}

// Callback and associated state for starting the timer after call stack
// initialization has been completed.
struct start_timer_after_init_state {
  start_timer_after_init_state(grpc_call_element* elem, grpc_millis deadline)
      : elem(elem), deadline(deadline) {}
  ~start_timer_after_init_state() { start_timer_if_needed(elem, deadline); }

  bool in_call_combiner = false;
  grpc_call_element* elem;
  grpc_millis deadline;
  grpc_closure closure;
};
static void start_timer_after_init(void* arg, grpc_error* error) {
  struct start_timer_after_init_state* state =
      static_cast<struct start_timer_after_init_state*>(arg);
  grpc_deadline_state* deadline_state =
      static_cast<grpc_deadline_state*>(state->elem->call_data);
  if (!state->in_call_combiner) {
    // We are initially called without holding the call combiner, so we
    // need to bounce ourselves into it.
    state->in_call_combiner = true;
    GRPC_CALL_COMBINER_START(deadline_state->call_combiner, &state->closure,
                             GRPC_ERROR_REF(error),
                             "scheduling deadline timer");
    return;
  }
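  // Deleting the state object runs ~start_timer_after_init_state(), which is
  // what actually calls start_timer_if_needed() -- by this point we are
  // holding the call combiner, so it is safe to start the timer.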
  delete state;
  GRPC_CALL_COMBINER_STOP(deadline_state->call_combiner,
                          "done scheduling deadline timer");
}

grpc_deadline_state::grpc_deadline_state(grpc_call_element* elem,
                                         const grpc_call_element_args& args,
                                         grpc_millis deadline)
    : call_stack(args.call_stack),
      call_combiner(args.call_combiner),
      arena(args.arena) {
  // Deadline will always be infinite on servers, so the timer will only be
  // set on clients with a finite deadline.
  if (deadline != GRPC_MILLIS_INF_FUTURE) {
    // When the deadline passes, we indicate the failure by sending down
    // an op with cancel_error set.  However, we can't send down any ops
    // until after the call stack is fully initialized.  If we start the
    // timer here, we have no guarantee that the timer won't pop before
    // call stack initialization is finished.  To avoid that problem, we
    // create a closure to start the timer, and we schedule that closure
    // to be run after call stack initialization is done.
    struct start_timer_after_init_state* state =
        new start_timer_after_init_state(elem, deadline);
    GRPC_CLOSURE_INIT(&state->closure, start_timer_after_init, state,
                      grpc_schedule_on_exec_ctx);
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, &state->closure, GRPC_ERROR_NONE);
  }
}

grpc_deadline_state::~grpc_deadline_state() { cancel_timer_if_needed(this); }

void grpc_deadline_state_reset(grpc_call_element* elem,
                               grpc_millis new_deadline) {
  grpc_deadline_state* deadline_state =
      static_cast<grpc_deadline_state*>(elem->call_data);
  cancel_timer_if_needed(deadline_state);
  start_timer_if_needed(elem, new_deadline);
}

void grpc_deadline_state_client_start_transport_stream_op_batch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
  grpc_deadline_state* deadline_state =
      static_cast<grpc_deadline_state*>(elem->call_data);
  if (op->cancel_stream) {
    cancel_timer_if_needed(deadline_state);
  } else {
    // Make sure we know when the call is complete, so that we can cancel
    // the timer.
    if (op->recv_trailing_metadata) {
      inject_recv_trailing_metadata_ready(deadline_state, op);
    }
  }
}

//
// filter code
//

// Constructor for channel_data.  Used for both client and server filters.
static grpc_error* deadline_init_channel_elem(grpc_channel_element* /*elem*/,
                                              grpc_channel_element_args* args) {
  GPR_ASSERT(!args->is_last);
  return GRPC_ERROR_NONE;
}

// Destructor for channel_data.  Used for both client and server filters.
static void deadline_destroy_channel_elem(grpc_channel_element* /*elem*/) {}

// Call data used for both client and server filter.
typedef struct base_call_data {
  grpc_deadline_state deadline_state;
} base_call_data;

// Additional call data used only for the server filter.
typedef struct server_call_data {
  base_call_data base;  // Must be first.
  // The closure for receiving initial metadata.
  grpc_closure recv_initial_metadata_ready;
  // Received initial metadata batch.
  grpc_metadata_batch* recv_initial_metadata;
  // The original recv_initial_metadata_ready closure, which we chain to
  // after our own closure is invoked.
  grpc_closure* next_recv_initial_metadata_ready;
} server_call_data;

// Constructor for call_data.  Used for both client and server filters.
static grpc_error* deadline_init_call_elem(grpc_call_element* elem,
                                           const grpc_call_element_args* args) {
  new (elem->call_data) grpc_deadline_state(elem, *args, args->deadline);
  return GRPC_ERROR_NONE;
}

// Destructor for call_data.  Used for both client and server filters.
static void deadline_destroy_call_elem(
    grpc_call_element* elem, const grpc_call_final_info* /*final_info*/,
    grpc_closure* /*ignored*/) {
  grpc_deadline_state* deadline_state =
      static_cast<grpc_deadline_state*>(elem->call_data);
  deadline_state->~grpc_deadline_state();
}

// Method for starting a call op for client filter.
static void deadline_client_start_transport_stream_op_batch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
  grpc_deadline_state_client_start_transport_stream_op_batch(elem, op);
  // Chain to next filter.
  grpc_call_next_op(elem, op);
}

// Callback for receiving initial metadata on the server.
static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  server_call_data* calld = static_cast<server_call_data*>(elem->call_data);
  start_timer_if_needed(elem, calld->recv_initial_metadata->deadline);
  // Invoke the next callback.
  grpc_core::Closure::Run(DEBUG_LOCATION,
                          calld->next_recv_initial_metadata_ready,
                          GRPC_ERROR_REF(error));
}

// Method for starting a call op for server filter.
static void deadline_server_start_transport_stream_op_batch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
  server_call_data* calld = static_cast<server_call_data*>(elem->call_data);
  if (op->cancel_stream) {
    cancel_timer_if_needed(&calld->base.deadline_state);
  } else {
    // If we're receiving initial metadata, we need to get the deadline
    // from the recv_initial_metadata_ready callback.  So we inject our
    // own callback into that hook.
    if (op->recv_initial_metadata) {
      calld->next_recv_initial_metadata_ready =
          op->payload->recv_initial_metadata.recv_initial_metadata_ready;
      calld->recv_initial_metadata =
          op->payload->recv_initial_metadata.recv_initial_metadata;
      GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
                        recv_initial_metadata_ready, elem,
                        grpc_schedule_on_exec_ctx);
      op->payload->recv_initial_metadata.recv_initial_metadata_ready =
          &calld->recv_initial_metadata_ready;
    }
    // Make sure we know when the call is complete, so that we can cancel
    // the timer.
    // Note that we trigger this on recv_trailing_metadata, even though
    // the client never sends trailing metadata, because this is the
    // hook that tells us when the call is complete on the server side.
    if (op->recv_trailing_metadata) {
      inject_recv_trailing_metadata_ready(&calld->base.deadline_state, op);
    }
  }
  // Chain to next filter.
  grpc_call_next_op(elem, op);
}

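// Filter vtables for the client and server deadline filters.  The entries
// follow the field order of grpc_channel_filter; the two vtables differ only
// in the batch-start function and the size of their call data.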
const grpc_channel_filter grpc_client_deadline_filter = {
    deadline_client_start_transport_stream_op_batch,
    grpc_channel_next_op,
    sizeof(base_call_data),
    deadline_init_call_elem,
    grpc_call_stack_ignore_set_pollset_or_pollset_set,
    deadline_destroy_call_elem,
    0,  // sizeof(channel_data)
    deadline_init_channel_elem,
    deadline_destroy_channel_elem,
    grpc_channel_next_get_info,
    "deadline",
};

const grpc_channel_filter grpc_server_deadline_filter = {
    deadline_server_start_transport_stream_op_batch,
    grpc_channel_next_op,
    sizeof(server_call_data),
    deadline_init_call_elem,
    grpc_call_stack_ignore_set_pollset_or_pollset_set,
    deadline_destroy_call_elem,
    0,  // sizeof(channel_data)
    deadline_init_channel_elem,
    deadline_destroy_channel_elem,
    grpc_channel_next_get_info,
    "deadline",
};

bool grpc_deadline_checking_enabled(const grpc_channel_args* channel_args) {
  return grpc_channel_arg_get_bool(
      grpc_channel_args_find(channel_args, GRPC_ARG_ENABLE_DEADLINE_CHECKS),
      !grpc_channel_args_want_minimal_stack(channel_args));
}

static bool maybe_add_deadline_filter(grpc_channel_stack_builder* builder,
                                      void* arg) {
  return grpc_deadline_checking_enabled(
             grpc_channel_stack_builder_get_channel_arguments(builder))
             ? grpc_channel_stack_builder_prepend_filter(
                   builder, static_cast<const grpc_channel_filter*>(arg),
                   nullptr, nullptr)
             : true;
}

void grpc_deadline_filter_init(void) {
  grpc_channel_init_register_stage(
      GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
      maybe_add_deadline_filter,
      const_cast<grpc_channel_filter*>(&grpc_client_deadline_filter));
  grpc_channel_init_register_stage(
      GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
      maybe_add_deadline_filter,
      const_cast<grpc_channel_filter*>(&grpc_server_deadline_filter));
}

void grpc_deadline_filter_shutdown(void) {}