• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2 
3 use std::ffi::CStr;
4 use std::pin::Pin;
5 use std::sync::Arc;
6 use std::time::Duration;
7 use std::{result, slice};
8 
9 use crate::grpc_sys::{
10     self, gpr_clock_type, gpr_timespec, grpc_call_error, grpcwrap_request_call_context,
11 };
12 use futures::future::Future;
13 use futures::ready;
14 use futures::sink::Sink;
15 use futures::stream::Stream;
16 use futures::task::{Context, Poll};
17 use parking_lot::Mutex;
18 
19 use super::{RpcStatus, ShareCall, ShareCallHolder, WriteFlags};
20 use crate::auth_context::AuthContext;
21 use crate::buf::GrpcSlice;
22 use crate::call::{
23     BatchContext, Call, MessageReader, MethodType, RpcStatusCode, SinkBase, StreamingBase,
24 };
25 use crate::codec::{DeserializeFn, SerializeFn};
26 use crate::cq::CompletionQueue;
27 use crate::error::{Error, Result};
28 use crate::metadata::Metadata;
29 use crate::server::ServerChecker;
30 use crate::server::{BoxHandler, RequestCallContext};
31 use crate::task::{BatchFuture, CallTag, Executor, Kicker};
32 use crate::CheckResult;
33 
34 /// A time point that an rpc or operation should finished before it.
35 #[derive(Clone, Copy)]
36 pub struct Deadline {
37     pub(crate) spec: gpr_timespec,
38 }
39 
40 impl Deadline {
new(spec: gpr_timespec) -> Deadline41     fn new(spec: gpr_timespec) -> Deadline {
42         let realtime_spec =
43             unsafe { grpc_sys::gpr_convert_clock_type(spec, gpr_clock_type::GPR_CLOCK_REALTIME) };
44 
45         Deadline {
46             spec: realtime_spec,
47         }
48     }
49 
50     /// Checks if the deadline is exceeded.
exceeded(self) -> bool51     pub fn exceeded(self) -> bool {
52         unsafe {
53             let now = grpc_sys::gpr_now(gpr_clock_type::GPR_CLOCK_REALTIME);
54             grpc_sys::gpr_time_cmp(now, self.spec) >= 0
55         }
56     }
57 
spec(self) -> gpr_timespec58     pub(crate) fn spec(self) -> gpr_timespec {
59         self.spec
60     }
61 }
62 
63 impl From<Duration> for Deadline {
64     /// Build a deadline from given duration.
65     ///
66     /// The deadline will be `now + duration`.
67     #[inline]
from(dur: Duration) -> Deadline68     fn from(dur: Duration) -> Deadline {
69         Deadline::new(dur.into())
70     }
71 }
72 
73 /// Context for accepting a request.
74 pub struct RequestContext {
75     ctx: *mut grpcwrap_request_call_context,
76     request_call: Option<RequestCallContext>,
77 }
78 
79 impl RequestContext {
new(rc: RequestCallContext) -> RequestContext80     pub fn new(rc: RequestCallContext) -> RequestContext {
81         let ctx = unsafe { grpc_sys::grpcwrap_request_call_context_create() };
82 
83         RequestContext {
84             ctx,
85             request_call: Some(rc),
86         }
87     }
88 
89     /// Try to accept a client side streaming request.
90     ///
91     /// Return error if the request is a client side unary request.
handle_stream_req( self, cq: &CompletionQueue, rc: &mut RequestCallContext, ) -> result::Result<(), Self>92     pub fn handle_stream_req(
93         self,
94         cq: &CompletionQueue,
95         rc: &mut RequestCallContext,
96     ) -> result::Result<(), Self> {
97         let checker = rc.get_checker();
98         let handler = unsafe { rc.get_handler(self.method()) };
99         match handler {
100             Some(handler) => match handler.method_type() {
101                 MethodType::Unary | MethodType::ServerStreaming => Err(self),
102                 _ => {
103                     execute(self, cq, None, handler, checker);
104                     Ok(())
105                 }
106             },
107             None => {
108                 execute_unimplemented(self, cq.clone());
109                 Ok(())
110             }
111         }
112     }
113 
114     /// Accept a client side unary request.
115     ///
116     /// This method should be called after `handle_stream_req`. When handling
117     /// client side unary request, handler will only be called after the unary
118     /// request is received.
handle_unary_req(self, rc: RequestCallContext, _: &CompletionQueue)119     pub fn handle_unary_req(self, rc: RequestCallContext, _: &CompletionQueue) {
120         // fetch message before calling callback.
121         let tag = Box::new(CallTag::unary_request(self, rc));
122         let batch_ctx = tag.batch_ctx().unwrap().as_ptr();
123         let request_ctx = tag.request_ctx().unwrap().as_ptr();
124         let tag_ptr = Box::into_raw(tag);
125         unsafe {
126             let call = grpc_sys::grpcwrap_request_call_context_get_call(request_ctx);
127             let code = grpc_sys::grpcwrap_call_recv_message(call, batch_ctx, tag_ptr as _);
128             if code != grpc_call_error::GRPC_CALL_OK {
129                 Box::from_raw(tag_ptr);
130                 // it should not failed.
131                 panic!("try to receive message fail: {:?}", code);
132             }
133         }
134     }
135 
take_request_call_context(&mut self) -> Option<RequestCallContext>136     pub fn take_request_call_context(&mut self) -> Option<RequestCallContext> {
137         self.request_call.take()
138     }
139 
as_ptr(&self) -> *mut grpcwrap_request_call_context140     pub fn as_ptr(&self) -> *mut grpcwrap_request_call_context {
141         self.ctx
142     }
143 
call(&self, cq: CompletionQueue) -> Call144     fn call(&self, cq: CompletionQueue) -> Call {
145         unsafe {
146             // It is okay to use a mutable pointer on a immutable reference, `self`,
147             // because grpcwrap_request_call_context_ref_call is thread-safe.
148             let call = grpc_sys::grpcwrap_request_call_context_ref_call(self.ctx);
149             assert!(!call.is_null());
150             Call::from_raw(call, cq)
151         }
152     }
153 
method(&self) -> &[u8]154     pub fn method(&self) -> &[u8] {
155         let mut len = 0;
156         let method = unsafe { grpc_sys::grpcwrap_request_call_context_method(self.ctx, &mut len) };
157 
158         unsafe { slice::from_raw_parts(method as _, len) }
159     }
160 
host(&self) -> &[u8]161     fn host(&self) -> &[u8] {
162         let mut len = 0;
163         let host = unsafe { grpc_sys::grpcwrap_request_call_context_host(self.ctx, &mut len) };
164 
165         unsafe { slice::from_raw_parts(host as _, len) }
166     }
167 
deadline(&self) -> Deadline168     fn deadline(&self) -> Deadline {
169         let t = unsafe { grpc_sys::grpcwrap_request_call_context_deadline(self.ctx) };
170 
171         Deadline::new(t)
172     }
173 
metadata(&self) -> &Metadata174     fn metadata(&self) -> &Metadata {
175         unsafe {
176             let ptr = grpc_sys::grpcwrap_request_call_context_metadata_array(self.ctx);
177             let arr_ptr: *const Metadata = ptr as _;
178             &*arr_ptr
179         }
180     }
181 
peer(&self) -> String182     fn peer(&self) -> String {
183         unsafe {
184             // RequestContext always holds a reference of the call.
185             let call = grpc_sys::grpcwrap_request_call_context_get_call(self.ctx);
186             let p = grpc_sys::grpc_call_get_peer(call);
187             let peer = CStr::from_ptr(p)
188                 .to_str()
189                 .expect("valid UTF-8 data")
190                 .to_owned();
191             grpc_sys::gpr_free(p as _);
192             peer
193         }
194     }
195 
196     /// If the server binds in non-secure mode, this will return None
auth_context(&self) -> Option<AuthContext>197     fn auth_context(&self) -> Option<AuthContext> {
198         unsafe {
199             let call = grpc_sys::grpcwrap_request_call_context_get_call(self.ctx);
200             AuthContext::from_call_ptr(call)
201         }
202     }
203 }
204 
205 impl Drop for RequestContext {
drop(&mut self)206     fn drop(&mut self) {
207         unsafe { grpc_sys::grpcwrap_request_call_context_destroy(self.ctx) }
208     }
209 }
210 
211 /// A context for handling client side unary request.
212 pub struct UnaryRequestContext {
213     request: RequestContext,
214     request_call: Option<RequestCallContext>,
215     batch: BatchContext,
216 }
217 
218 impl UnaryRequestContext {
new(ctx: RequestContext, rc: RequestCallContext) -> UnaryRequestContext219     pub fn new(ctx: RequestContext, rc: RequestCallContext) -> UnaryRequestContext {
220         UnaryRequestContext {
221             request: ctx,
222             request_call: Some(rc),
223             batch: BatchContext::new(),
224         }
225     }
226 
batch_ctx(&self) -> &BatchContext227     pub fn batch_ctx(&self) -> &BatchContext {
228         &self.batch
229     }
230 
batch_ctx_mut(&mut self) -> &mut BatchContext231     pub fn batch_ctx_mut(&mut self) -> &mut BatchContext {
232         &mut self.batch
233     }
234 
request_ctx(&self) -> &RequestContext235     pub fn request_ctx(&self) -> &RequestContext {
236         &self.request
237     }
238 
take_request_call_context(&mut self) -> Option<RequestCallContext>239     pub fn take_request_call_context(&mut self) -> Option<RequestCallContext> {
240         self.request_call.take()
241     }
242 
handle( self, rc: &mut RequestCallContext, cq: &CompletionQueue, reader: Option<MessageReader>, )243     pub fn handle(
244         self,
245         rc: &mut RequestCallContext,
246         cq: &CompletionQueue,
247         reader: Option<MessageReader>,
248     ) {
249         let checker = rc.get_checker();
250         let handler = unsafe { rc.get_handler(self.request.method()).unwrap() };
251         if reader.is_some() {
252             return execute(self.request, cq, reader, handler, checker);
253         }
254 
255         let status = RpcStatus::with_message(RpcStatusCode::INTERNAL, "No payload".to_owned());
256         self.request.call(cq.clone()).abort(&status)
257     }
258 }
259 
260 /// A stream for client a streaming call and a duplex streaming call.
261 ///
262 /// The corresponding RPC will be canceled if the stream did not
263 /// finish before dropping.
264 #[must_use = "if unused the RequestStream may immediately cancel the RPC"]
265 pub struct RequestStream<T> {
266     call: Arc<Mutex<ShareCall>>,
267     base: StreamingBase,
268     de: DeserializeFn<T>,
269 }
270 
271 impl<T> RequestStream<T> {
new(call: Arc<Mutex<ShareCall>>, de: DeserializeFn<T>) -> RequestStream<T>272     fn new(call: Arc<Mutex<ShareCall>>, de: DeserializeFn<T>) -> RequestStream<T> {
273         RequestStream {
274             call,
275             base: StreamingBase::new(None),
276             de,
277         }
278     }
279 }
280 
281 impl<T> Stream for RequestStream<T> {
282     type Item = Result<T>;
283 
poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<T>>>284     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<T>>> {
285         {
286             let mut call = self.call.lock();
287             call.check_alive()?;
288         }
289 
290         let t = &mut *self;
291         match ready!(t.base.poll(cx, &mut t.call, false)?) {
292             None => Poll::Ready(None),
293             Some(data) => Poll::Ready(Some((t.de)(data))),
294         }
295     }
296 }
297 
298 impl<T> Drop for RequestStream<T> {
299     /// The corresponding RPC will be canceled if the stream did not
300     /// finish before dropping.
drop(&mut self)301     fn drop(&mut self) {
302         self.base.on_drop(&mut self.call);
303     }
304 }
305 
306 /// A helper macro used to implement server side unary sink.
307 /// Not using generic here because we don't need to expose
308 /// `CallHolder` or `Call` to caller.
309 // TODO: Use type alias to be friendly for documentation.
310 macro_rules! impl_unary_sink {
311     ($(#[$attr:meta])* $t:ident, $rt:ident, $holder:ty) => {
312         pub struct $rt {
313             call: $holder,
314             cq_f: Option<BatchFuture>,
315             err: Option<Error>,
316         }
317 
318         impl Future for $rt {
319             type Output = Result<()>;
320 
321             fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
322                 if let Some(e) = self.err.take() {
323                     return Poll::Ready(Err(e));
324                 }
325 
326                 if self.cq_f.is_some() {
327                     ready!(Pin::new(self.cq_f.as_mut().unwrap()).poll(cx)?);
328                     self.cq_f.take();
329                 }
330 
331                 ready!(self.call.call(|c| c.poll_finish(cx))?);
332                 Poll::Ready(Ok(()))
333             }
334         }
335 
336         $(#[$attr])*
337         pub struct $t<T> {
338             call: Option<$holder>,
339             write_flags: u32,
340             ser: SerializeFn<T>,
341         }
342 
343         impl<T> $t<T> {
344             fn new(call: $holder, ser: SerializeFn<T>) -> $t<T> {
345                 $t {
346                     call: Some(call),
347                     write_flags: 0,
348                     ser,
349                 }
350             }
351 
352             pub fn success(self, t: T) -> $rt {
353                 self.complete(RpcStatus::ok(), Some(t))
354             }
355 
356             pub fn fail(self, status: RpcStatus) -> $rt {
357                 self.complete(status, None)
358             }
359 
360             fn complete(mut self, status: RpcStatus, t: Option<T>) -> $rt {
361                 let mut data = match t {
362                     Some(t) => {
363                         let mut buf = GrpcSlice::default();
364                         if let Err(e) = (self.ser)(&t, &mut buf) {
365                             return $rt {
366                                 call: self.call.take().unwrap(),
367                                 cq_f: None,
368                                 err: Some(e),
369                             };
370                         }
371                         Some(buf)
372                     }
373                     None => None,
374                 };
375 
376                 let write_flags = self.write_flags;
377                 let res = self.call.as_mut().unwrap().call(|c| {
378                     c.call
379                         .start_send_status_from_server(&status, true, &mut data, write_flags)
380                 });
381 
382                 let (cq_f, err) = match res {
383                     Ok(f) => (Some(f), None),
384                     Err(e) => (None, Some(e)),
385                 };
386 
387                 $rt {
388                     call: self.call.take().unwrap(),
389                     cq_f,
390                     err,
391                 }
392             }
393         }
394 
395         impl<T> Drop for $t<T> {
396             /// The corresponding RPC will be canceled if the sink did not
397             /// send a response before dropping.
398             fn drop(&mut self) {
399                 self.call
400                     .as_mut()
401                     .map(|call| call.call(|c| c.call.cancel()));
402             }
403         }
404     };
405 }
406 
407 impl_unary_sink!(
408     /// A sink for unary call.
409     ///
410     /// To close the sink properly, you should call [`success`] or [`fail`] before dropping.
411     ///
412     /// [`success`]: #method.success
413     /// [`fail`]: #method.fail
414     #[must_use = "if unused the sink may immediately cancel the RPC"]
415     UnarySink,
416     UnarySinkResult,
417     ShareCall
418 );
419 impl_unary_sink!(
420     /// A sink for client streaming call.
421     ///
422     /// To close the sink properly, you should call [`success`] or [`fail`] before dropping.
423     ///
424     /// [`success`]: #method.success
425     /// [`fail`]: #method.fail
426     #[must_use = "if unused the sink may immediately cancel the RPC"]
427     ClientStreamingSink,
428     ClientStreamingSinkResult,
429     Arc<Mutex<ShareCall>>
430 );
431 
432 // A macro helper to implement server side streaming sink.
433 macro_rules! impl_stream_sink {
434     ($(#[$attr:meta])* $t:ident, $ft:ident, $holder:ty) => {
435         $(#[$attr])*
436         pub struct $t<T> {
437             call: Option<$holder>,
438             base: SinkBase,
439             flush_f: Option<BatchFuture>,
440             status: RpcStatus,
441             flushed: bool,
442             closed: bool,
443             ser: SerializeFn<T>,
444         }
445 
446         impl<T> $t<T> {
447             fn new(call: $holder, ser: SerializeFn<T>) -> $t<T> {
448                 $t {
449                     call: Some(call),
450                     base: SinkBase::new(true),
451                     flush_f: None,
452                     status: RpcStatus::ok(),
453                     flushed: false,
454                     closed: false,
455                     ser,
456                 }
457             }
458 
459             /// By default it always sends messages with their configured buffer hint. But when the
460             /// `enhance_batch` is enabled, messages will be batched together as many as possible.
461             /// The rules are listed as below:
462             /// - All messages except the last one will be sent with `buffer_hint` set to true.
463             /// - The last message will also be sent with `buffer_hint` set to true unless any message is
464             ///    offered with buffer hint set to false.
465             ///
466             /// No matter `enhance_batch` is true or false, it's recommended to follow the contract of
467             /// Sink and call `poll_flush` to ensure messages are handled by gRPC C Core.
468             pub fn enhance_batch(&mut self, flag: bool) {
469                 self.base.enhance_buffer_strategy = flag;
470             }
471 
472             pub fn set_status(&mut self, status: RpcStatus) {
473                 assert!(self.flush_f.is_none());
474                 self.status = status;
475             }
476 
477             pub fn fail(mut self, status: RpcStatus) -> $ft {
478                 assert!(self.flush_f.is_none());
479                 let send_metadata = self.base.send_metadata;
480                 let res = self.call.as_mut().unwrap().call(|c| {
481                     c.call
482                         .start_send_status_from_server(&status, send_metadata, &mut None, 0)
483                 });
484 
485                 let (fail_f, err) = match res {
486                     Ok(f) => (Some(f), None),
487                     Err(e) => (None, Some(e)),
488                 };
489 
490                 $ft {
491                     call: self.call.take().unwrap(),
492                     fail_f,
493                     err,
494                 }
495             }
496         }
497 
498         impl<T> Drop for $t<T> {
499             /// The corresponding RPC will be canceled if the sink did not call
500             /// [`close`] or [`fail`] before dropping.
501             ///
502             /// [`close`]: #method.close
503             /// [`fail`]: #method.fail
504             fn drop(&mut self) {
505                 // We did not close it explicitly and it was not dropped in the `fail`.
506                 if !self.closed && self.call.is_some() {
507                     let mut call = self.call.take().unwrap();
508                     call.call(|c| c.call.cancel());
509                 }
510             }
511         }
512 
513         impl<T> Sink<(T, WriteFlags)> for $t<T> {
514             type Error = Error;
515 
516             #[inline]
517             fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
518                 if let Poll::Ready(_) = self.call.as_mut().unwrap().call(|c| c.poll_finish(cx))? {
519                     return Poll::Ready(Err(Error::RemoteStopped));
520                 }
521                 Pin::new(&mut self.base).poll_ready(cx)
522             }
523 
524             #[inline]
525             fn start_send(mut self: Pin<&mut Self>, (msg, flags): (T, WriteFlags)) -> Result<()> {
526                 let t = &mut *self;
527                 t.base.start_send(t.call.as_mut().unwrap(), &msg, flags, t.ser)
528             }
529 
530             #[inline]
531             fn poll_flush(mut self: Pin<&mut Self>,  cx: &mut Context) -> Poll<Result<()>> {
532                 if let Poll::Ready(_) = self.call.as_mut().unwrap().call(|c| c.poll_finish(cx))? {
533                     return Poll::Ready(Err(Error::RemoteStopped));
534                 }
535                 let t = &mut *self;
536                 Pin::new(&mut t.base).poll_flush(cx, t.call.as_mut().unwrap())
537             }
538 
539             fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
540                 if self.flush_f.is_none() {
541                     ready!(Pin::new(&mut self.base).poll_ready(cx)?);
542 
543                     let send_metadata = self.base.send_metadata;
544                     let t = &mut *self;
545                     let status = &t.status;
546                     let flush_f = t.call.as_mut().unwrap().call(|c| {
547                         c.call
548                             .start_send_status_from_server(status, send_metadata, &mut None, 0)
549                     })?;
550                     t.flush_f = Some(flush_f);
551                 }
552 
553                 if !self.flushed {
554                     ready!(Pin::new(self.flush_f.as_mut().unwrap()).poll(cx)?);
555                     self.flushed = true;
556                 }
557 
558                 ready!(self.call.as_mut().unwrap().call(|c| c.poll_finish(cx))?);
559                 self.closed = true;
560                 Poll::Ready(Ok(()))
561             }
562         }
563 
564         #[must_use = "if unused the sink failure may immediately cancel the RPC"]
565         pub struct $ft {
566             call: $holder,
567             fail_f: Option<BatchFuture>,
568             err: Option<Error>,
569         }
570 
571         impl Future for $ft {
572             type Output = Result<()>;
573 
574             fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
575                 if let Some(e) = self.err.take() {
576                     return Poll::Ready(Err(e));
577                 }
578 
579                 let readiness = self.call.call(|c| {
580                     if c.finished {
581                         return Poll::Ready(Ok(()));
582                     }
583 
584                     c.poll_finish(cx).map(|r| r.map(|_| ()))
585                 })?;
586 
587                 if let Some(ref mut f) = self.fail_f {
588                     ready!(Pin::new(f).poll(cx)?);
589                 }
590 
591                 self.fail_f.take();
592                 readiness.map(Ok)
593             }
594         }
595     };
596 }
597 
598 impl_stream_sink!(
599     /// A sink for server streaming call.
600     ///
601     /// To close the sink properly, you should call [`close`] or [`fail`] before dropping.
602     ///
603     /// [`close`]: #method.close
604     /// [`fail`]: #method.fail
605     #[must_use = "if unused the sink may immediately cancel the RPC"]
606     ServerStreamingSink,
607     ServerStreamingSinkFailure,
608     ShareCall
609 );
610 impl_stream_sink!(
611     /// A sink for duplex streaming call.
612     ///
613     /// To close the sink properly, you should call [`close`] or [`fail`] before dropping.
614     ///
615     /// [`close`]: #method.close
616     /// [`fail`]: #method.fail
617     #[must_use = "if unused the sink may immediately cancel the RPC"]
618     DuplexSink,
619     DuplexSinkFailure,
620     Arc<Mutex<ShareCall>>
621 );
622 
623 /// A context for rpc handling.
624 pub struct RpcContext<'a> {
625     ctx: RequestContext,
626     executor: Executor<'a>,
627     deadline: Deadline,
628 }
629 
630 impl<'a> RpcContext<'a> {
new(ctx: RequestContext, cq: &CompletionQueue) -> RpcContext<'_>631     fn new(ctx: RequestContext, cq: &CompletionQueue) -> RpcContext<'_> {
632         RpcContext {
633             deadline: ctx.deadline(),
634             ctx,
635             executor: Executor::new(cq),
636         }
637     }
638 
kicker(&self) -> Kicker639     fn kicker(&self) -> Kicker {
640         let call = self.call();
641         Kicker::from_call(call)
642     }
643 
call(&self) -> Call644     pub(crate) fn call(&self) -> Call {
645         self.ctx.call(self.executor.cq().clone())
646     }
647 
method(&self) -> &[u8]648     pub fn method(&self) -> &[u8] {
649         self.ctx.method()
650     }
651 
host(&self) -> &[u8]652     pub fn host(&self) -> &[u8] {
653         self.ctx.host()
654     }
655 
deadline(&self) -> Deadline656     pub fn deadline(&self) -> Deadline {
657         self.deadline
658     }
659 
660     /// Get the initial metadata sent by client.
request_headers(&self) -> &Metadata661     pub fn request_headers(&self) -> &Metadata {
662         self.ctx.metadata()
663     }
664 
peer(&self) -> String665     pub fn peer(&self) -> String {
666         self.ctx.peer()
667     }
668 
669     /// Wrapper around the gRPC Core AuthContext
670     ///
671     /// If the server binds in non-secure mode, this will return None
auth_context(&self) -> Option<AuthContext>672     pub fn auth_context(&self) -> Option<AuthContext> {
673         self.ctx.auth_context()
674     }
675 
676     /// Spawn the future into current gRPC poll thread.
677     ///
678     /// This can reduce a lot of context switching, but please make
679     /// sure there is no heavy work in the future.
spawn<F>(&self, f: F) where F: Future<Output = ()> + Send + 'static,680     pub fn spawn<F>(&self, f: F)
681     where
682         F: Future<Output = ()> + Send + 'static,
683     {
684         self.executor.spawn(f, self.kicker())
685     }
686 }
687 
// The following four helper functions are used to create a callback closure.
689 
// Starts the server side of `$call` and evaluates to the returned future.
// Returns from the enclosing function when the queue has been shut down and
// panics on any other error.
macro_rules! accept_call {
    ($call:expr) => {
        match $call.start_server_side() {
            Ok(close_f) => close_f,
            Err(Error::QueueShutdown) => return,
            Err(e) => panic!("unexpected error when trying to accept request: {:?}", e),
        }
    };
}
699 
700 // Helper function to call a unary handler.
execute_unary<P, Q, F>( ctx: RpcContext<'_>, ser: SerializeFn<Q>, de: DeserializeFn<P>, payload: MessageReader, f: &mut F, ) where F: FnMut(RpcContext<'_>, P, UnarySink<Q>),701 pub fn execute_unary<P, Q, F>(
702     ctx: RpcContext<'_>,
703     ser: SerializeFn<Q>,
704     de: DeserializeFn<P>,
705     payload: MessageReader,
706     f: &mut F,
707 ) where
708     F: FnMut(RpcContext<'_>, P, UnarySink<Q>),
709 {
710     let mut call = ctx.call();
711     let close_f = accept_call!(call);
712     let request = match de(payload) {
713         Ok(f) => f,
714         Err(e) => {
715             let status = RpcStatus::with_message(
716                 RpcStatusCode::INTERNAL,
717                 format!("Failed to deserialize response message: {:?}", e),
718             );
719             call.abort(&status);
720             return;
721         }
722     };
723     let sink = UnarySink::new(ShareCall::new(call, close_f), ser);
724     f(ctx, request, sink)
725 }
726 
727 // Helper function to call client streaming handler.
execute_client_streaming<P, Q, F>( ctx: RpcContext<'_>, ser: SerializeFn<Q>, de: DeserializeFn<P>, f: &mut F, ) where F: FnMut(RpcContext<'_>, RequestStream<P>, ClientStreamingSink<Q>),728 pub fn execute_client_streaming<P, Q, F>(
729     ctx: RpcContext<'_>,
730     ser: SerializeFn<Q>,
731     de: DeserializeFn<P>,
732     f: &mut F,
733 ) where
734     F: FnMut(RpcContext<'_>, RequestStream<P>, ClientStreamingSink<Q>),
735 {
736     let mut call = ctx.call();
737     let close_f = accept_call!(call);
738     let call = Arc::new(Mutex::new(ShareCall::new(call, close_f)));
739 
740     let req_s = RequestStream::new(call.clone(), de);
741     let sink = ClientStreamingSink::new(call, ser);
742     f(ctx, req_s, sink)
743 }
744 
745 // Helper function to call server streaming handler.
execute_server_streaming<P, Q, F>( ctx: RpcContext<'_>, ser: SerializeFn<Q>, de: DeserializeFn<P>, payload: MessageReader, f: &mut F, ) where F: FnMut(RpcContext<'_>, P, ServerStreamingSink<Q>),746 pub fn execute_server_streaming<P, Q, F>(
747     ctx: RpcContext<'_>,
748     ser: SerializeFn<Q>,
749     de: DeserializeFn<P>,
750     payload: MessageReader,
751     f: &mut F,
752 ) where
753     F: FnMut(RpcContext<'_>, P, ServerStreamingSink<Q>),
754 {
755     let mut call = ctx.call();
756     let close_f = accept_call!(call);
757 
758     let request = match de(payload) {
759         Ok(t) => t,
760         Err(e) => {
761             let status = RpcStatus::with_message(
762                 RpcStatusCode::INTERNAL,
763                 format!("Failed to deserialize response message: {:?}", e),
764             );
765             call.abort(&status);
766             return;
767         }
768     };
769 
770     let sink = ServerStreamingSink::new(ShareCall::new(call, close_f), ser);
771     f(ctx, request, sink)
772 }
773 
774 // Helper function to call duplex streaming handler.
execute_duplex_streaming<P, Q, F>( ctx: RpcContext<'_>, ser: SerializeFn<Q>, de: DeserializeFn<P>, f: &mut F, ) where F: FnMut(RpcContext<'_>, RequestStream<P>, DuplexSink<Q>),775 pub fn execute_duplex_streaming<P, Q, F>(
776     ctx: RpcContext<'_>,
777     ser: SerializeFn<Q>,
778     de: DeserializeFn<P>,
779     f: &mut F,
780 ) where
781     F: FnMut(RpcContext<'_>, RequestStream<P>, DuplexSink<Q>),
782 {
783     let mut call = ctx.call();
784     let close_f = accept_call!(call);
785     let call = Arc::new(Mutex::new(ShareCall::new(call, close_f)));
786 
787     let req_s = RequestStream::new(call.clone(), de);
788     let sink = DuplexSink::new(call, ser);
789     f(ctx, req_s, sink)
790 }
791 
792 // A helper function used to handle all undefined rpc calls.
execute_unimplemented(ctx: RequestContext, cq: CompletionQueue)793 pub fn execute_unimplemented(ctx: RequestContext, cq: CompletionQueue) {
794     // Suppress needless-pass-by-value.
795     let ctx = ctx;
796     let mut call = ctx.call(cq);
797     accept_call!(call);
798     call.abort(&RpcStatus::new(RpcStatusCode::UNIMPLEMENTED))
799 }
800 
801 // Helper function to call handler.
802 //
803 // Invoked after a request is ready to be handled.
execute( ctx: RequestContext, cq: &CompletionQueue, payload: Option<MessageReader>, f: &mut BoxHandler, mut checkers: Vec<Box<dyn ServerChecker>>, )804 fn execute(
805     ctx: RequestContext,
806     cq: &CompletionQueue,
807     payload: Option<MessageReader>,
808     f: &mut BoxHandler,
809     mut checkers: Vec<Box<dyn ServerChecker>>,
810 ) {
811     let rpc_ctx = RpcContext::new(ctx, cq);
812 
813     for handler in checkers.iter_mut() {
814         match handler.check(&rpc_ctx) {
815             CheckResult::Continue => {}
816             CheckResult::Abort(status) => {
817                 rpc_ctx.call().abort(&status);
818                 return;
819             }
820         }
821     }
822 
823     f.handle(rpc_ctx, payload)
824 }
825