• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2016 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include "src/core/ext/filters/deadline/deadline_filter.h"
20 
21 #include <stdbool.h>
22 #include <string.h>
23 
24 #include <grpc/support/alloc.h>
25 #include <grpc/support/log.h>
26 #include <grpc/support/sync.h>
27 #include <grpc/support/time.h>
28 
29 #include "src/core/lib/channel/channel_stack_builder.h"
30 #include "src/core/lib/gprpp/memory.h"
31 #include "src/core/lib/iomgr/timer.h"
32 #include "src/core/lib/slice/slice_internal.h"
33 #include "src/core/lib/surface/channel_init.h"
34 
35 namespace grpc_core {
36 
37 // A fire-and-forget class representing a pending deadline timer.
38 // Allocated on the call arena.
39 class TimerState {
40  public:
TimerState(grpc_call_element * elem,grpc_millis deadline)41   TimerState(grpc_call_element* elem, grpc_millis deadline) : elem_(elem) {
42     grpc_deadline_state* deadline_state =
43         static_cast<grpc_deadline_state*>(elem_->call_data);
44     GRPC_CALL_STACK_REF(deadline_state->call_stack, "DeadlineTimerState");
45     GRPC_CLOSURE_INIT(&closure_, TimerCallback, this, nullptr);
46     grpc_timer_init(&timer_, deadline, &closure_);
47   }
48 
Cancel()49   void Cancel() { grpc_timer_cancel(&timer_); }
50 
51  private:
52   // The on_complete callback used when sending a cancel_error batch down the
53   // filter stack.  Yields the call combiner when the batch returns.
YieldCallCombiner(void * arg,grpc_error *)54   static void YieldCallCombiner(void* arg, grpc_error* /*ignored*/) {
55     TimerState* self = static_cast<TimerState*>(arg);
56     grpc_deadline_state* deadline_state =
57         static_cast<grpc_deadline_state*>(self->elem_->call_data);
58     GRPC_CALL_COMBINER_STOP(deadline_state->call_combiner,
59                             "got on_complete from cancel_stream batch");
60     GRPC_CALL_STACK_UNREF(deadline_state->call_stack, "DeadlineTimerState");
61   }
62 
63   // This is called via the call combiner, so access to deadline_state is
64   // synchronized.
SendCancelOpInCallCombiner(void * arg,grpc_error * error)65   static void SendCancelOpInCallCombiner(void* arg, grpc_error* error) {
66     TimerState* self = static_cast<TimerState*>(arg);
67     grpc_transport_stream_op_batch* batch = grpc_make_transport_stream_op(
68         GRPC_CLOSURE_INIT(&self->closure_, YieldCallCombiner, self, nullptr));
69     batch->cancel_stream = true;
70     batch->payload->cancel_stream.cancel_error = GRPC_ERROR_REF(error);
71     self->elem_->filter->start_transport_stream_op_batch(self->elem_, batch);
72   }
73 
74   // Timer callback.
TimerCallback(void * arg,grpc_error * error)75   static void TimerCallback(void* arg, grpc_error* error) {
76     TimerState* self = static_cast<TimerState*>(arg);
77     grpc_deadline_state* deadline_state =
78         static_cast<grpc_deadline_state*>(self->elem_->call_data);
79     if (error != GRPC_ERROR_CANCELLED) {
80       error = grpc_error_set_int(
81           GRPC_ERROR_CREATE_FROM_STATIC_STRING("Deadline Exceeded"),
82           GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_DEADLINE_EXCEEDED);
83       deadline_state->call_combiner->Cancel(GRPC_ERROR_REF(error));
84       GRPC_CLOSURE_INIT(&self->closure_, SendCancelOpInCallCombiner, self,
85                         nullptr);
86       GRPC_CALL_COMBINER_START(deadline_state->call_combiner, &self->closure_,
87                                error,
88                                "deadline exceeded -- sending cancel_stream op");
89     } else {
90       GRPC_CALL_STACK_UNREF(deadline_state->call_stack, "DeadlineTimerState");
91     }
92   }
93 
94   // NOTE: This object's dtor is never called, so do not add any data
95   // members that require destruction!
96   // TODO(roth): We should ideally call this object's dtor somewhere,
97   // but that would require adding more synchronization, because we'd
98   // need to call the dtor only after both (a) the timer callback
99   // finishes and (b) the filter sees the call completion and attempts
100   // to cancel the timer.
101   grpc_call_element* elem_;
102   grpc_timer timer_;
103   grpc_closure closure_;
104 };
105 
106 }  // namespace grpc_core
107 
108 //
109 // grpc_deadline_state
110 //
111 
112 // Starts the deadline timer.
113 // This is called via the call combiner, so access to deadline_state is
114 // synchronized.
start_timer_if_needed(grpc_call_element * elem,grpc_millis deadline)115 static void start_timer_if_needed(grpc_call_element* elem,
116                                   grpc_millis deadline) {
117   if (deadline == GRPC_MILLIS_INF_FUTURE) {
118     return;
119   }
120   grpc_deadline_state* deadline_state =
121       static_cast<grpc_deadline_state*>(elem->call_data);
122   GPR_ASSERT(deadline_state->timer_state == nullptr);
123   deadline_state->timer_state =
124       deadline_state->arena->New<grpc_core::TimerState>(elem, deadline);
125 }
126 
127 // Cancels the deadline timer.
128 // This is called via the call combiner, so access to deadline_state is
129 // synchronized.
cancel_timer_if_needed(grpc_deadline_state * deadline_state)130 static void cancel_timer_if_needed(grpc_deadline_state* deadline_state) {
131   if (deadline_state->timer_state != nullptr) {
132     deadline_state->timer_state->Cancel();
133     deadline_state->timer_state = nullptr;
134   }
135 }
136 
137 // Callback run when we receive trailing metadata.
recv_trailing_metadata_ready(void * arg,grpc_error * error)138 static void recv_trailing_metadata_ready(void* arg, grpc_error* error) {
139   grpc_deadline_state* deadline_state = static_cast<grpc_deadline_state*>(arg);
140   cancel_timer_if_needed(deadline_state);
141   // Invoke the original callback.
142   grpc_core::Closure::Run(DEBUG_LOCATION,
143                           deadline_state->original_recv_trailing_metadata_ready,
144                           GRPC_ERROR_REF(error));
145 }
146 
147 // Inject our own recv_trailing_metadata_ready callback into op.
inject_recv_trailing_metadata_ready(grpc_deadline_state * deadline_state,grpc_transport_stream_op_batch * op)148 static void inject_recv_trailing_metadata_ready(
149     grpc_deadline_state* deadline_state, grpc_transport_stream_op_batch* op) {
150   deadline_state->original_recv_trailing_metadata_ready =
151       op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
152   GRPC_CLOSURE_INIT(&deadline_state->recv_trailing_metadata_ready,
153                     recv_trailing_metadata_ready, deadline_state,
154                     grpc_schedule_on_exec_ctx);
155   op->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
156       &deadline_state->recv_trailing_metadata_ready;
157 }
158 
159 // Callback and associated state for starting the timer after call stack
160 // initialization has been completed.
161 struct start_timer_after_init_state {
start_timer_after_init_statestart_timer_after_init_state162   start_timer_after_init_state(grpc_call_element* elem, grpc_millis deadline)
163       : elem(elem), deadline(deadline) {}
~start_timer_after_init_statestart_timer_after_init_state164   ~start_timer_after_init_state() { start_timer_if_needed(elem, deadline); }
165 
166   bool in_call_combiner = false;
167   grpc_call_element* elem;
168   grpc_millis deadline;
169   grpc_closure closure;
170 };
start_timer_after_init(void * arg,grpc_error * error)171 static void start_timer_after_init(void* arg, grpc_error* error) {
172   struct start_timer_after_init_state* state =
173       static_cast<struct start_timer_after_init_state*>(arg);
174   grpc_deadline_state* deadline_state =
175       static_cast<grpc_deadline_state*>(state->elem->call_data);
176   if (!state->in_call_combiner) {
177     // We are initially called without holding the call combiner, so we
178     // need to bounce ourselves into it.
179     state->in_call_combiner = true;
180     GRPC_CALL_COMBINER_START(deadline_state->call_combiner, &state->closure,
181                              GRPC_ERROR_REF(error),
182                              "scheduling deadline timer");
183     return;
184   }
185   delete state;
186   GRPC_CALL_COMBINER_STOP(deadline_state->call_combiner,
187                           "done scheduling deadline timer");
188 }
189 
grpc_deadline_state(grpc_call_element * elem,const grpc_call_element_args & args,grpc_millis deadline)190 grpc_deadline_state::grpc_deadline_state(grpc_call_element* elem,
191                                          const grpc_call_element_args& args,
192                                          grpc_millis deadline)
193     : call_stack(args.call_stack),
194       call_combiner(args.call_combiner),
195       arena(args.arena) {
196   // Deadline will always be infinite on servers, so the timer will only be
197   // set on clients with a finite deadline.
198   if (deadline != GRPC_MILLIS_INF_FUTURE) {
199     // When the deadline passes, we indicate the failure by sending down
200     // an op with cancel_error set.  However, we can't send down any ops
201     // until after the call stack is fully initialized.  If we start the
202     // timer here, we have no guarantee that the timer won't pop before
203     // call stack initialization is finished.  To avoid that problem, we
204     // create a closure to start the timer, and we schedule that closure
205     // to be run after call stack initialization is done.
206     struct start_timer_after_init_state* state =
207         new start_timer_after_init_state(elem, deadline);
208     GRPC_CLOSURE_INIT(&state->closure, start_timer_after_init, state,
209                       grpc_schedule_on_exec_ctx);
210     grpc_core::ExecCtx::Run(DEBUG_LOCATION, &state->closure, GRPC_ERROR_NONE);
211   }
212 }
213 
~grpc_deadline_state()214 grpc_deadline_state::~grpc_deadline_state() { cancel_timer_if_needed(this); }
215 
grpc_deadline_state_reset(grpc_call_element * elem,grpc_millis new_deadline)216 void grpc_deadline_state_reset(grpc_call_element* elem,
217                                grpc_millis new_deadline) {
218   grpc_deadline_state* deadline_state =
219       static_cast<grpc_deadline_state*>(elem->call_data);
220   cancel_timer_if_needed(deadline_state);
221   start_timer_if_needed(elem, new_deadline);
222 }
223 
grpc_deadline_state_client_start_transport_stream_op_batch(grpc_call_element * elem,grpc_transport_stream_op_batch * op)224 void grpc_deadline_state_client_start_transport_stream_op_batch(
225     grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
226   grpc_deadline_state* deadline_state =
227       static_cast<grpc_deadline_state*>(elem->call_data);
228   if (op->cancel_stream) {
229     cancel_timer_if_needed(deadline_state);
230   } else {
231     // Make sure we know when the call is complete, so that we can cancel
232     // the timer.
233     if (op->recv_trailing_metadata) {
234       inject_recv_trailing_metadata_ready(deadline_state, op);
235     }
236   }
237 }
238 
239 //
240 // filter code
241 //
242 
243 // Constructor for channel_data.  Used for both client and server filters.
deadline_init_channel_elem(grpc_channel_element *,grpc_channel_element_args * args)244 static grpc_error* deadline_init_channel_elem(grpc_channel_element* /*elem*/,
245                                               grpc_channel_element_args* args) {
246   GPR_ASSERT(!args->is_last);
247   return GRPC_ERROR_NONE;
248 }
249 
250 // Destructor for channel_data.  Used for both client and server filters.
deadline_destroy_channel_elem(grpc_channel_element *)251 static void deadline_destroy_channel_elem(grpc_channel_element* /*elem*/) {}
252 
253 // Call data used for both client and server filter.
254 typedef struct base_call_data {
255   grpc_deadline_state deadline_state;
256 } base_call_data;
257 
258 // Additional call data used only for the server filter.
259 typedef struct server_call_data {
260   base_call_data base;  // Must be first.
261   // The closure for receiving initial metadata.
262   grpc_closure recv_initial_metadata_ready;
263   // Received initial metadata batch.
264   grpc_metadata_batch* recv_initial_metadata;
265   // The original recv_initial_metadata_ready closure, which we chain to
266   // after our own closure is invoked.
267   grpc_closure* next_recv_initial_metadata_ready;
268 } server_call_data;
269 
270 // Constructor for call_data.  Used for both client and server filters.
deadline_init_call_elem(grpc_call_element * elem,const grpc_call_element_args * args)271 static grpc_error* deadline_init_call_elem(grpc_call_element* elem,
272                                            const grpc_call_element_args* args) {
273   new (elem->call_data) grpc_deadline_state(elem, *args, args->deadline);
274   return GRPC_ERROR_NONE;
275 }
276 
277 // Destructor for call_data.  Used for both client and server filters.
deadline_destroy_call_elem(grpc_call_element * elem,const grpc_call_final_info *,grpc_closure *)278 static void deadline_destroy_call_elem(
279     grpc_call_element* elem, const grpc_call_final_info* /*final_info*/,
280     grpc_closure* /*ignored*/) {
281   grpc_deadline_state* deadline_state =
282       static_cast<grpc_deadline_state*>(elem->call_data);
283   deadline_state->~grpc_deadline_state();
284 }
285 
286 // Method for starting a call op for client filter.
deadline_client_start_transport_stream_op_batch(grpc_call_element * elem,grpc_transport_stream_op_batch * op)287 static void deadline_client_start_transport_stream_op_batch(
288     grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
289   grpc_deadline_state_client_start_transport_stream_op_batch(elem, op);
290   // Chain to next filter.
291   grpc_call_next_op(elem, op);
292 }
293 
294 // Callback for receiving initial metadata on the server.
recv_initial_metadata_ready(void * arg,grpc_error * error)295 static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
296   grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
297   server_call_data* calld = static_cast<server_call_data*>(elem->call_data);
298   start_timer_if_needed(elem, calld->recv_initial_metadata->deadline);
299   // Invoke the next callback.
300   grpc_core::Closure::Run(DEBUG_LOCATION,
301                           calld->next_recv_initial_metadata_ready,
302                           GRPC_ERROR_REF(error));
303 }
304 
305 // Method for starting a call op for server filter.
deadline_server_start_transport_stream_op_batch(grpc_call_element * elem,grpc_transport_stream_op_batch * op)306 static void deadline_server_start_transport_stream_op_batch(
307     grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
308   server_call_data* calld = static_cast<server_call_data*>(elem->call_data);
309   if (op->cancel_stream) {
310     cancel_timer_if_needed(&calld->base.deadline_state);
311   } else {
312     // If we're receiving initial metadata, we need to get the deadline
313     // from the recv_initial_metadata_ready callback.  So we inject our
314     // own callback into that hook.
315     if (op->recv_initial_metadata) {
316       calld->next_recv_initial_metadata_ready =
317           op->payload->recv_initial_metadata.recv_initial_metadata_ready;
318       calld->recv_initial_metadata =
319           op->payload->recv_initial_metadata.recv_initial_metadata;
320       GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
321                         recv_initial_metadata_ready, elem,
322                         grpc_schedule_on_exec_ctx);
323       op->payload->recv_initial_metadata.recv_initial_metadata_ready =
324           &calld->recv_initial_metadata_ready;
325     }
326     // Make sure we know when the call is complete, so that we can cancel
327     // the timer.
328     // Note that we trigger this on recv_trailing_metadata, even though
329     // the client never sends trailing metadata, because this is the
330     // hook that tells us when the call is complete on the server side.
331     if (op->recv_trailing_metadata) {
332       inject_recv_trailing_metadata_ready(&calld->base.deadline_state, op);
333     }
334   }
335   // Chain to next filter.
336   grpc_call_next_op(elem, op);
337 }
338 
339 const grpc_channel_filter grpc_client_deadline_filter = {
340     deadline_client_start_transport_stream_op_batch,
341     grpc_channel_next_op,
342     sizeof(base_call_data),
343     deadline_init_call_elem,
344     grpc_call_stack_ignore_set_pollset_or_pollset_set,
345     deadline_destroy_call_elem,
346     0,  // sizeof(channel_data)
347     deadline_init_channel_elem,
348     deadline_destroy_channel_elem,
349     grpc_channel_next_get_info,
350     "deadline",
351 };
352 
353 const grpc_channel_filter grpc_server_deadline_filter = {
354     deadline_server_start_transport_stream_op_batch,
355     grpc_channel_next_op,
356     sizeof(server_call_data),
357     deadline_init_call_elem,
358     grpc_call_stack_ignore_set_pollset_or_pollset_set,
359     deadline_destroy_call_elem,
360     0,  // sizeof(channel_data)
361     deadline_init_channel_elem,
362     deadline_destroy_channel_elem,
363     grpc_channel_next_get_info,
364     "deadline",
365 };
366 
grpc_deadline_checking_enabled(const grpc_channel_args * channel_args)367 bool grpc_deadline_checking_enabled(const grpc_channel_args* channel_args) {
368   return grpc_channel_arg_get_bool(
369       grpc_channel_args_find(channel_args, GRPC_ARG_ENABLE_DEADLINE_CHECKS),
370       !grpc_channel_args_want_minimal_stack(channel_args));
371 }
372 
maybe_add_deadline_filter(grpc_channel_stack_builder * builder,void * arg)373 static bool maybe_add_deadline_filter(grpc_channel_stack_builder* builder,
374                                       void* arg) {
375   return grpc_deadline_checking_enabled(
376              grpc_channel_stack_builder_get_channel_arguments(builder))
377              ? grpc_channel_stack_builder_prepend_filter(
378                    builder, static_cast<const grpc_channel_filter*>(arg),
379                    nullptr, nullptr)
380              : true;
381 }
382 
grpc_deadline_filter_init(void)383 void grpc_deadline_filter_init(void) {
384   grpc_channel_init_register_stage(
385       GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
386       maybe_add_deadline_filter,
387       const_cast<grpc_channel_filter*>(&grpc_client_deadline_filter));
388   grpc_channel_init_register_stage(
389       GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
390       maybe_add_deadline_filter,
391       const_cast<grpc_channel_filter*>(&grpc_server_deadline_filter));
392 }
393 
// Counterpart of grpc_deadline_filter_init().  Intentionally a no-op.
void grpc_deadline_filter_shutdown() {
  // Nothing to tear down.
}
395