// NOTE: code-viewer navigation chrome (Home / Line# / Scopes / Navigate /
// Raw / Download) removed from this extracted copy of the source.
1 //
2 // Copyright 2016 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include "src/core/ext/filters/deadline/deadline_filter.h"
20 
21 #include <stdbool.h>
22 #include <string.h>
23 
24 #include <grpc/support/alloc.h>
25 #include <grpc/support/log.h>
26 #include <grpc/support/sync.h>
27 #include <grpc/support/time.h>
28 
29 #include "src/core/lib/channel/channel_stack_builder.h"
30 #include "src/core/lib/iomgr/timer.h"
31 #include "src/core/lib/slice/slice_internal.h"
32 #include "src/core/lib/surface/channel_init.h"
33 
34 //
35 // grpc_deadline_state
36 //
37 
38 // The on_complete callback used when sending a cancel_error batch down the
39 // filter stack.  Yields the call combiner when the batch returns.
yield_call_combiner(void * arg,grpc_error * ignored)40 static void yield_call_combiner(void* arg, grpc_error* ignored) {
41   grpc_deadline_state* deadline_state = static_cast<grpc_deadline_state*>(arg);
42   GRPC_CALL_COMBINER_STOP(deadline_state->call_combiner,
43                           "got on_complete from cancel_stream batch");
44   GRPC_CALL_STACK_UNREF(deadline_state->call_stack, "deadline_timer");
45 }
46 
47 // This is called via the call combiner, so access to deadline_state is
48 // synchronized.
send_cancel_op_in_call_combiner(void * arg,grpc_error * error)49 static void send_cancel_op_in_call_combiner(void* arg, grpc_error* error) {
50   grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
51   grpc_deadline_state* deadline_state =
52       static_cast<grpc_deadline_state*>(elem->call_data);
53   grpc_transport_stream_op_batch* batch = grpc_make_transport_stream_op(
54       GRPC_CLOSURE_INIT(&deadline_state->timer_callback, yield_call_combiner,
55                         deadline_state, grpc_schedule_on_exec_ctx));
56   batch->cancel_stream = true;
57   batch->payload->cancel_stream.cancel_error = GRPC_ERROR_REF(error);
58   elem->filter->start_transport_stream_op_batch(elem, batch);
59 }
60 
61 // Timer callback.
timer_callback(void * arg,grpc_error * error)62 static void timer_callback(void* arg, grpc_error* error) {
63   grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
64   grpc_deadline_state* deadline_state =
65       static_cast<grpc_deadline_state*>(elem->call_data);
66   if (error != GRPC_ERROR_CANCELLED) {
67     error = grpc_error_set_int(
68         GRPC_ERROR_CREATE_FROM_STATIC_STRING("Deadline Exceeded"),
69         GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_DEADLINE_EXCEEDED);
70     grpc_call_combiner_cancel(deadline_state->call_combiner,
71                               GRPC_ERROR_REF(error));
72     GRPC_CLOSURE_INIT(&deadline_state->timer_callback,
73                       send_cancel_op_in_call_combiner, elem,
74                       grpc_schedule_on_exec_ctx);
75     GRPC_CALL_COMBINER_START(deadline_state->call_combiner,
76                              &deadline_state->timer_callback, error,
77                              "deadline exceeded -- sending cancel_stream op");
78   } else {
79     GRPC_CALL_STACK_UNREF(deadline_state->call_stack, "deadline_timer");
80   }
81 }
82 
83 // Starts the deadline timer.
84 // This is called via the call combiner, so access to deadline_state is
85 // synchronized.
start_timer_if_needed(grpc_call_element * elem,grpc_millis deadline)86 static void start_timer_if_needed(grpc_call_element* elem,
87                                   grpc_millis deadline) {
88   if (deadline == GRPC_MILLIS_INF_FUTURE) {
89     return;
90   }
91   grpc_deadline_state* deadline_state =
92       static_cast<grpc_deadline_state*>(elem->call_data);
93   grpc_closure* closure = nullptr;
94   switch (deadline_state->timer_state) {
95     case GRPC_DEADLINE_STATE_PENDING:
96       // Note: We do not start the timer if there is already a timer
97       return;
98     case GRPC_DEADLINE_STATE_FINISHED:
99       deadline_state->timer_state = GRPC_DEADLINE_STATE_PENDING;
100       // If we've already created and destroyed a timer, we always create a
101       // new closure: we have no other guarantee that the inlined closure is
102       // not in use (it may hold a pending call to timer_callback)
103       closure =
104           GRPC_CLOSURE_CREATE(timer_callback, elem, grpc_schedule_on_exec_ctx);
105       break;
106     case GRPC_DEADLINE_STATE_INITIAL:
107       deadline_state->timer_state = GRPC_DEADLINE_STATE_PENDING;
108       closure =
109           GRPC_CLOSURE_INIT(&deadline_state->timer_callback, timer_callback,
110                             elem, grpc_schedule_on_exec_ctx);
111       break;
112   }
113   GPR_ASSERT(closure != nullptr);
114   GRPC_CALL_STACK_REF(deadline_state->call_stack, "deadline_timer");
115   grpc_timer_init(&deadline_state->timer, deadline, closure);
116 }
117 
118 // Cancels the deadline timer.
119 // This is called via the call combiner, so access to deadline_state is
120 // synchronized.
cancel_timer_if_needed(grpc_deadline_state * deadline_state)121 static void cancel_timer_if_needed(grpc_deadline_state* deadline_state) {
122   if (deadline_state->timer_state == GRPC_DEADLINE_STATE_PENDING) {
123     deadline_state->timer_state = GRPC_DEADLINE_STATE_FINISHED;
124     grpc_timer_cancel(&deadline_state->timer);
125   } else {
126     // timer was either in STATE_INITAL (nothing to cancel)
127     // OR in STATE_FINISHED (again nothing to cancel)
128   }
129 }
130 
131 // Callback run when we receive trailing metadata.
recv_trailing_metadata_ready(void * arg,grpc_error * error)132 static void recv_trailing_metadata_ready(void* arg, grpc_error* error) {
133   grpc_deadline_state* deadline_state = static_cast<grpc_deadline_state*>(arg);
134   cancel_timer_if_needed(deadline_state);
135   // Invoke the original callback.
136   GRPC_CLOSURE_RUN(deadline_state->original_recv_trailing_metadata_ready,
137                    GRPC_ERROR_REF(error));
138 }
139 
140 // Inject our own recv_trailing_metadata_ready callback into op.
inject_recv_trailing_metadata_ready(grpc_deadline_state * deadline_state,grpc_transport_stream_op_batch * op)141 static void inject_recv_trailing_metadata_ready(
142     grpc_deadline_state* deadline_state, grpc_transport_stream_op_batch* op) {
143   deadline_state->original_recv_trailing_metadata_ready =
144       op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
145   GRPC_CLOSURE_INIT(&deadline_state->recv_trailing_metadata_ready,
146                     recv_trailing_metadata_ready, deadline_state,
147                     grpc_schedule_on_exec_ctx);
148   op->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
149       &deadline_state->recv_trailing_metadata_ready;
150 }
151 
// Callback and associated state for starting the timer after call stack
// initialization has been completed.
struct start_timer_after_init_state {
  // False on the first invocation (outside the call combiner); set true
  // before the closure is re-run from inside the combiner.
  bool in_call_combiner;
  // Call element whose deadline timer should be started.
  grpc_call_element* elem;
  // Deadline to arm the timer with.
  grpc_millis deadline;
  // Closure running start_timer_after_init(); scheduled once, then
  // bounced into the call combiner.
  grpc_closure closure;
};
start_timer_after_init(void * arg,grpc_error * error)160 static void start_timer_after_init(void* arg, grpc_error* error) {
161   struct start_timer_after_init_state* state =
162       static_cast<struct start_timer_after_init_state*>(arg);
163   grpc_deadline_state* deadline_state =
164       static_cast<grpc_deadline_state*>(state->elem->call_data);
165   if (!state->in_call_combiner) {
166     // We are initially called without holding the call combiner, so we
167     // need to bounce ourselves into it.
168     state->in_call_combiner = true;
169     GRPC_CALL_COMBINER_START(deadline_state->call_combiner, &state->closure,
170                              GRPC_ERROR_REF(error),
171                              "scheduling deadline timer");
172     return;
173   }
174   start_timer_if_needed(state->elem, state->deadline);
175   gpr_free(state);
176   GRPC_CALL_COMBINER_STOP(deadline_state->call_combiner,
177                           "done scheduling deadline timer");
178 }
179 
grpc_deadline_state_init(grpc_call_element * elem,grpc_call_stack * call_stack,grpc_call_combiner * call_combiner,grpc_millis deadline)180 void grpc_deadline_state_init(grpc_call_element* elem,
181                               grpc_call_stack* call_stack,
182                               grpc_call_combiner* call_combiner,
183                               grpc_millis deadline) {
184   grpc_deadline_state* deadline_state =
185       static_cast<grpc_deadline_state*>(elem->call_data);
186   deadline_state->call_stack = call_stack;
187   deadline_state->call_combiner = call_combiner;
188   // Deadline will always be infinite on servers, so the timer will only be
189   // set on clients with a finite deadline.
190   if (deadline != GRPC_MILLIS_INF_FUTURE) {
191     // When the deadline passes, we indicate the failure by sending down
192     // an op with cancel_error set.  However, we can't send down any ops
193     // until after the call stack is fully initialized.  If we start the
194     // timer here, we have no guarantee that the timer won't pop before
195     // call stack initialization is finished.  To avoid that problem, we
196     // create a closure to start the timer, and we schedule that closure
197     // to be run after call stack initialization is done.
198     struct start_timer_after_init_state* state =
199         static_cast<struct start_timer_after_init_state*>(
200             gpr_zalloc(sizeof(*state)));
201     state->elem = elem;
202     state->deadline = deadline;
203     GRPC_CLOSURE_INIT(&state->closure, start_timer_after_init, state,
204                       grpc_schedule_on_exec_ctx);
205     GRPC_CLOSURE_SCHED(&state->closure, GRPC_ERROR_NONE);
206   }
207 }
208 
grpc_deadline_state_destroy(grpc_call_element * elem)209 void grpc_deadline_state_destroy(grpc_call_element* elem) {
210   grpc_deadline_state* deadline_state =
211       static_cast<grpc_deadline_state*>(elem->call_data);
212   cancel_timer_if_needed(deadline_state);
213 }
214 
grpc_deadline_state_reset(grpc_call_element * elem,grpc_millis new_deadline)215 void grpc_deadline_state_reset(grpc_call_element* elem,
216                                grpc_millis new_deadline) {
217   grpc_deadline_state* deadline_state =
218       static_cast<grpc_deadline_state*>(elem->call_data);
219   cancel_timer_if_needed(deadline_state);
220   start_timer_if_needed(elem, new_deadline);
221 }
222 
grpc_deadline_state_client_start_transport_stream_op_batch(grpc_call_element * elem,grpc_transport_stream_op_batch * op)223 void grpc_deadline_state_client_start_transport_stream_op_batch(
224     grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
225   grpc_deadline_state* deadline_state =
226       static_cast<grpc_deadline_state*>(elem->call_data);
227   if (op->cancel_stream) {
228     cancel_timer_if_needed(deadline_state);
229   } else {
230     // Make sure we know when the call is complete, so that we can cancel
231     // the timer.
232     if (op->recv_trailing_metadata) {
233       inject_recv_trailing_metadata_ready(deadline_state, op);
234     }
235   }
236 }
237 
238 //
239 // filter code
240 //
241 
// Constructor for channel_data.  Used for both client and server filters.
// There is no per-channel state to set up (channel_data size is 0); this
// only asserts that the deadline filter never terminates the stack.
static grpc_error* init_channel_elem(grpc_channel_element* elem,
                                     grpc_channel_element_args* args) {
  GPR_ASSERT(!args->is_last);
  return GRPC_ERROR_NONE;
}
248 
// Destructor for channel_data.  Used for both client and server filters.
// Nothing to release: the filter keeps no per-channel state.
static void destroy_channel_elem(grpc_channel_element* elem) {}
251 
// Call data used for both client and server filter.
typedef struct base_call_data {
  // Must be the first field, so that elem->call_data can be cast directly
  // to grpc_deadline_state* (the grpc_deadline_state_* functions rely on
  // this layout).
  grpc_deadline_state deadline_state;
} base_call_data;
256 
// Additional call data used only for the server filter.
typedef struct server_call_data {
  base_call_data base;  // Must be first.
  // The closure for receiving initial metadata.
  grpc_closure recv_initial_metadata_ready;
  // Received initial metadata batch; carries the call's deadline, which is
  // read in recv_initial_metadata_ready() to arm the timer.
  grpc_metadata_batch* recv_initial_metadata;
  // The original recv_initial_metadata_ready closure, which we chain to
  // after our own closure is invoked.
  grpc_closure* next_recv_initial_metadata_ready;
} server_call_data;
268 
// Constructor for call_data.  Used for both client and server filters.
// Delegates entirely to grpc_deadline_state_init(); cannot fail.
static grpc_error* init_call_elem(grpc_call_element* elem,
                                  const grpc_call_element_args* args) {
  grpc_deadline_state_init(elem, args->call_stack, args->call_combiner,
                           args->deadline);
  return GRPC_ERROR_NONE;
}
276 
// Destructor for call_data.  Used for both client and server filters.
// Cancels any still-pending deadline timer.
static void destroy_call_elem(grpc_call_element* elem,
                              const grpc_call_final_info* final_info,
                              grpc_closure* ignored) {
  grpc_deadline_state_destroy(elem);
}
283 
// Method for starting a call op for client filter.
static void client_start_transport_stream_op_batch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
  // Deadline bookkeeping: cancel the timer on cancel_stream, or hook
  // recv_trailing_metadata so the timer is cancelled at call completion.
  grpc_deadline_state_client_start_transport_stream_op_batch(elem, op);
  // Chain to next filter.
  grpc_call_next_op(elem, op);
}
291 
292 // Callback for receiving initial metadata on the server.
recv_initial_metadata_ready(void * arg,grpc_error * error)293 static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
294   grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
295   server_call_data* calld = static_cast<server_call_data*>(elem->call_data);
296   start_timer_if_needed(elem, calld->recv_initial_metadata->deadline);
297   // Invoke the next callback.
298   GRPC_CLOSURE_RUN(calld->next_recv_initial_metadata_ready,
299                    GRPC_ERROR_REF(error));
300 }
301 
302 // Method for starting a call op for server filter.
server_start_transport_stream_op_batch(grpc_call_element * elem,grpc_transport_stream_op_batch * op)303 static void server_start_transport_stream_op_batch(
304     grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
305   server_call_data* calld = static_cast<server_call_data*>(elem->call_data);
306   if (op->cancel_stream) {
307     cancel_timer_if_needed(&calld->base.deadline_state);
308   } else {
309     // If we're receiving initial metadata, we need to get the deadline
310     // from the recv_initial_metadata_ready callback.  So we inject our
311     // own callback into that hook.
312     if (op->recv_initial_metadata) {
313       calld->next_recv_initial_metadata_ready =
314           op->payload->recv_initial_metadata.recv_initial_metadata_ready;
315       calld->recv_initial_metadata =
316           op->payload->recv_initial_metadata.recv_initial_metadata;
317       GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
318                         recv_initial_metadata_ready, elem,
319                         grpc_schedule_on_exec_ctx);
320       op->payload->recv_initial_metadata.recv_initial_metadata_ready =
321           &calld->recv_initial_metadata_ready;
322     }
323     // Make sure we know when the call is complete, so that we can cancel
324     // the timer.
325     // Note that we trigger this on recv_trailing_metadata, even though
326     // the client never sends trailing metadata, because this is the
327     // hook that tells us when the call is complete on the server side.
328     if (op->recv_trailing_metadata) {
329       inject_recv_trailing_metadata_ready(&calld->base.deadline_state, op);
330     }
331   }
332   // Chain to next filter.
333   grpc_call_next_op(elem, op);
334 }
335 
// Vtable for the client-side deadline filter.  Per-call state is just the
// embedded grpc_deadline_state (base_call_data); no channel state.
const grpc_channel_filter grpc_client_deadline_filter = {
    client_start_transport_stream_op_batch,
    grpc_channel_next_op,
    sizeof(base_call_data),
    init_call_elem,
    grpc_call_stack_ignore_set_pollset_or_pollset_set,
    destroy_call_elem,
    0,  // sizeof(channel_data)
    init_channel_elem,
    destroy_channel_elem,
    grpc_channel_next_get_info,
    "deadline",
};
349 
// Vtable for the server-side deadline filter.  Uses the larger
// server_call_data so the recv_initial_metadata hooks have storage.
const grpc_channel_filter grpc_server_deadline_filter = {
    server_start_transport_stream_op_batch,
    grpc_channel_next_op,
    sizeof(server_call_data),
    init_call_elem,
    grpc_call_stack_ignore_set_pollset_or_pollset_set,
    destroy_call_elem,
    0,  // sizeof(channel_data)
    init_channel_elem,
    destroy_channel_elem,
    grpc_channel_next_get_info,
    "deadline",
};
363 
grpc_deadline_checking_enabled(const grpc_channel_args * channel_args)364 bool grpc_deadline_checking_enabled(const grpc_channel_args* channel_args) {
365   return grpc_channel_arg_get_bool(
366       grpc_channel_args_find(channel_args, GRPC_ARG_ENABLE_DEADLINE_CHECKS),
367       !grpc_channel_args_want_minimal_stack(channel_args));
368 }
369 
maybe_add_deadline_filter(grpc_channel_stack_builder * builder,void * arg)370 static bool maybe_add_deadline_filter(grpc_channel_stack_builder* builder,
371                                       void* arg) {
372   return grpc_deadline_checking_enabled(
373              grpc_channel_stack_builder_get_channel_arguments(builder))
374              ? grpc_channel_stack_builder_prepend_filter(
375                    builder, static_cast<const grpc_channel_filter*>(arg),
376                    nullptr, nullptr)
377              : true;
378 }
379 
grpc_deadline_filter_init(void)380 void grpc_deadline_filter_init(void) {
381   grpc_channel_init_register_stage(
382       GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
383       maybe_add_deadline_filter, (void*)&grpc_client_deadline_filter);
384   grpc_channel_init_register_stage(
385       GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
386       maybe_add_deadline_filter, (void*)&grpc_server_deadline_filter);
387 }
388 
grpc_deadline_filter_shutdown(void)389 void grpc_deadline_filter_shutdown(void) {}
390