// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use std::ffi::CStr;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use std::{result, slice};

use crate::grpc_sys::{
    self, gpr_clock_type, gpr_timespec, grpc_call_error, grpcwrap_request_call_context,
};
use futures::future::Future;
use futures::ready;
use futures::sink::Sink;
use futures::stream::Stream;
use futures::task::{Context, Poll};
use parking_lot::Mutex;

use super::{RpcStatus, ShareCall, ShareCallHolder, WriteFlags};
use crate::auth_context::AuthContext;
use crate::buf::GrpcSlice;
use crate::call::{
    BatchContext, Call, MessageReader, MethodType, RpcStatusCode, SinkBase, StreamingBase,
};
use crate::codec::{DeserializeFn, SerializeFn};
use crate::cq::CompletionQueue;
use crate::error::{Error, Result};
use crate::metadata::Metadata;
use crate::server::ServerChecker;
use crate::server::{BoxHandler, RequestCallContext};
use crate::task::{BatchFuture, CallTag, Executor, Kicker};
use crate::CheckResult;

/// A point in time before which an RPC or operation should finish.
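///
/// # Example
///
/// A minimal sketch (not compiled as a doctest); it assumes `Deadline` is in
/// scope at the call site.
///
/// ```ignore
/// use std::time::Duration;
///
/// // A deadline five seconds from now; `exceeded` compares the stored time
/// // point against the realtime clock.
/// let deadline = Deadline::from(Duration::from_secs(5));
/// assert!(!deadline.exceeded());
/// ```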
#[derive(Clone, Copy)]
pub struct Deadline {
    pub(crate) spec: gpr_timespec,
}

impl Deadline {
    fn new(spec: gpr_timespec) -> Deadline {
        let realtime_spec =
            unsafe { grpc_sys::gpr_convert_clock_type(spec, gpr_clock_type::GPR_CLOCK_REALTIME) };

        Deadline {
            spec: realtime_spec,
        }
    }

    /// Checks if the deadline is exceeded.
    pub fn exceeded(self) -> bool {
        unsafe {
            let now = grpc_sys::gpr_now(gpr_clock_type::GPR_CLOCK_REALTIME);
            grpc_sys::gpr_time_cmp(now, self.spec) >= 0
        }
    }

    pub(crate) fn spec(self) -> gpr_timespec {
        self.spec
    }
}

impl From<Duration> for Deadline {
    /// Builds a deadline from the given duration.
    ///
    /// The deadline will be `now + duration`.
    #[inline]
    fn from(dur: Duration) -> Deadline {
        Deadline::new(dur.into())
    }
}

/// Context for accepting a request.
pub struct RequestContext {
    ctx: *mut grpcwrap_request_call_context,
    request_call: Option<RequestCallContext>,
}

impl RequestContext {
    pub fn new(rc: RequestCallContext) -> RequestContext {
        let ctx = unsafe { grpc_sys::grpcwrap_request_call_context_create() };

        RequestContext {
            ctx,
            request_call: Some(rc),
        }
    }

    /// Tries to accept a client-side streaming request.
    ///
    /// Returns an error if the request is a client-side unary request, i.e.
    /// the method type is `Unary` or `ServerStreaming`.
    pub fn handle_stream_req(
        self,
        cq: &CompletionQueue,
        rc: &mut RequestCallContext,
    ) -> result::Result<(), Self> {
        let checker = rc.get_checker();
        let handler = unsafe { rc.get_handler(self.method()) };
        match handler {
            Some(handler) => match handler.method_type() {
                MethodType::Unary | MethodType::ServerStreaming => Err(self),
                _ => {
                    execute(self, cq, None, handler, checker);
                    Ok(())
                }
            },
            None => {
                execute_unimplemented(self, cq.clone());
                Ok(())
            }
        }
    }

    /// Accept a client-side unary request.
    ///
    /// This method should be called after `handle_stream_req`. When handling
    /// a client-side unary request, the handler will only be called after the
    /// unary request message has been received.
    pub fn handle_unary_req(self, rc: RequestCallContext, _: &CompletionQueue) {
        // Fetch the message before calling the callback.
        let tag = Box::new(CallTag::unary_request(self, rc));
        let batch_ctx = tag.batch_ctx().unwrap().as_ptr();
        let request_ctx = tag.request_ctx().unwrap().as_ptr();
        let tag_ptr = Box::into_raw(tag);
        unsafe {
            let call = grpc_sys::grpcwrap_request_call_context_get_call(request_ctx);
            let code = grpc_sys::grpcwrap_call_recv_message(call, batch_ctx, tag_ptr as _);
            if code != grpc_call_error::GRPC_CALL_OK {
                Box::from_raw(tag_ptr);
                // It should not fail.
                panic!("try to receive message fail: {:?}", code);
            }
        }
    }

    pub fn take_request_call_context(&mut self) -> Option<RequestCallContext> {
        self.request_call.take()
    }

    pub fn as_ptr(&self) -> *mut grpcwrap_request_call_context {
        self.ctx
    }

    fn call(&self, cq: CompletionQueue) -> Call {
        unsafe {
            // It is okay to use a mutable pointer on an immutable reference, `self`,
            // because grpcwrap_request_call_context_ref_call is thread-safe.
            let call = grpc_sys::grpcwrap_request_call_context_ref_call(self.ctx);
            assert!(!call.is_null());
            Call::from_raw(call, cq)
        }
    }

    pub fn method(&self) -> &[u8] {
        let mut len = 0;
        let method = unsafe { grpc_sys::grpcwrap_request_call_context_method(self.ctx, &mut len) };

        unsafe { slice::from_raw_parts(method as _, len) }
    }

    fn host(&self) -> &[u8] {
        let mut len = 0;
        let host = unsafe { grpc_sys::grpcwrap_request_call_context_host(self.ctx, &mut len) };

        unsafe { slice::from_raw_parts(host as _, len) }
    }

    fn deadline(&self) -> Deadline {
        let t = unsafe { grpc_sys::grpcwrap_request_call_context_deadline(self.ctx) };

        Deadline::new(t)
    }

    fn metadata(&self) -> &Metadata {
        unsafe {
            let ptr = grpc_sys::grpcwrap_request_call_context_metadata_array(self.ctx);
            let arr_ptr: *const Metadata = ptr as _;
            &*arr_ptr
        }
    }

    fn peer(&self) -> String {
        unsafe {
            // RequestContext always holds a reference to the call.
            let call = grpc_sys::grpcwrap_request_call_context_get_call(self.ctx);
            let p = grpc_sys::grpc_call_get_peer(call);
            let peer = CStr::from_ptr(p)
                .to_str()
                .expect("valid UTF-8 data")
                .to_owned();
            grpc_sys::gpr_free(p as _);
            peer
        }
    }

    /// If the server binds in non-secure mode, this will return `None`.
    fn auth_context(&self) -> Option<AuthContext> {
        unsafe {
            let call = grpc_sys::grpcwrap_request_call_context_get_call(self.ctx);
            AuthContext::from_call_ptr(call)
        }
    }
}

impl Drop for RequestContext {
    fn drop(&mut self) {
        unsafe { grpc_sys::grpcwrap_request_call_context_destroy(self.ctx) }
    }
}

/// A context for handling a client-side unary request.
pub struct UnaryRequestContext {
    request: RequestContext,
    request_call: Option<RequestCallContext>,
    batch: BatchContext,
}

impl UnaryRequestContext {
    pub fn new(ctx: RequestContext, rc: RequestCallContext) -> UnaryRequestContext {
        UnaryRequestContext {
            request: ctx,
            request_call: Some(rc),
            batch: BatchContext::new(),
        }
    }

    pub fn batch_ctx(&self) -> &BatchContext {
        &self.batch
    }

    pub fn batch_ctx_mut(&mut self) -> &mut BatchContext {
        &mut self.batch
    }

    pub fn request_ctx(&self) -> &RequestContext {
        &self.request
    }

    pub fn take_request_call_context(&mut self) -> Option<RequestCallContext> {
        self.request_call.take()
    }

    pub fn handle(
        self,
        rc: &mut RequestCallContext,
        cq: &CompletionQueue,
        reader: Option<MessageReader>,
    ) {
        let checker = rc.get_checker();
        let handler = unsafe { rc.get_handler(self.request.method()).unwrap() };
        if reader.is_some() {
            return execute(self.request, cq, reader, handler, checker);
        }

        let status = RpcStatus::with_message(RpcStatusCode::INTERNAL, "No payload".to_owned());
        self.request.call(cq.clone()).abort(&status)
    }
}

/// A request stream for a client streaming call or a duplex streaming call.
///
/// The corresponding RPC will be canceled if the stream does not
/// finish before it is dropped.
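///
/// # Example
///
/// A minimal sketch of consuming the stream inside a client streaming
/// handler (not compiled as a doctest); `Point`, `RouteSummary` and the
/// free-standing handler shape are hypothetical stand-ins for generated
/// service code.
///
/// ```ignore
/// use futures::prelude::*;
///
/// fn record_route(
///     ctx: RpcContext<'_>,
///     mut stream: RequestStream<Point>,
///     sink: ClientStreamingSink<RouteSummary>,
/// ) {
///     ctx.spawn(async move {
///         let mut count = 0;
///         // Pull messages until the client half-closes the stream.
///         while let Some(_point) = stream.try_next().await.unwrap() {
///             count += 1;
///         }
///         let mut summary = RouteSummary::default();
///         summary.point_count = count;
///         // Completing the sink sends the response and finishes the RPC.
///         sink.success(summary).await.unwrap();
///     });
/// }
/// ```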
#[must_use = "if unused the RequestStream may immediately cancel the RPC"]
pub struct RequestStream<T> {
    call: Arc<Mutex<ShareCall>>,
    base: StreamingBase,
    de: DeserializeFn<T>,
}

impl<T> RequestStream<T> {
    fn new(call: Arc<Mutex<ShareCall>>, de: DeserializeFn<T>) -> RequestStream<T> {
        RequestStream {
            call,
            base: StreamingBase::new(None),
            de,
        }
    }
}

impl<T> Stream for RequestStream<T> {
    type Item = Result<T>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<T>>> {
        {
            let mut call = self.call.lock();
            call.check_alive()?;
        }

        let t = &mut *self;
        match ready!(t.base.poll(cx, &mut t.call, false)?) {
            None => Poll::Ready(None),
            Some(data) => Poll::Ready(Some((t.de)(data))),
        }
    }
}

impl<T> Drop for RequestStream<T> {
    /// The corresponding RPC will be canceled if the stream did not
    /// finish before dropping.
    fn drop(&mut self) {
        self.base.on_drop(&mut self.call);
    }
}

/// A helper macro used to implement server-side unary sinks.
/// Generics are not used here because we don't need to expose
/// `CallHolder` or `Call` to the caller.
// TODO: Use type alias to be friendly for documentation.
macro_rules! impl_unary_sink {
    ($(#[$attr:meta])* $t:ident, $rt:ident, $holder:ty) => {
        pub struct $rt {
            call: $holder,
            cq_f: Option<BatchFuture>,
            err: Option<Error>,
        }

        impl Future for $rt {
            type Output = Result<()>;

            fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
                if let Some(e) = self.err.take() {
                    return Poll::Ready(Err(e));
                }

                if self.cq_f.is_some() {
                    ready!(Pin::new(self.cq_f.as_mut().unwrap()).poll(cx)?);
                    self.cq_f.take();
                }

                ready!(self.call.call(|c| c.poll_finish(cx))?);
                Poll::Ready(Ok(()))
            }
        }

        $(#[$attr])*
        pub struct $t<T> {
            call: Option<$holder>,
            write_flags: u32,
            ser: SerializeFn<T>,
        }

        impl<T> $t<T> {
            fn new(call: $holder, ser: SerializeFn<T>) -> $t<T> {
                $t {
                    call: Some(call),
                    write_flags: 0,
                    ser,
                }
            }

            pub fn success(self, t: T) -> $rt {
                self.complete(RpcStatus::ok(), Some(t))
            }

            pub fn fail(self, status: RpcStatus) -> $rt {
                self.complete(status, None)
            }

            fn complete(mut self, status: RpcStatus, t: Option<T>) -> $rt {
                let mut data = match t {
                    Some(t) => {
                        let mut buf = GrpcSlice::default();
                        if let Err(e) = (self.ser)(&t, &mut buf) {
                            return $rt {
                                call: self.call.take().unwrap(),
                                cq_f: None,
                                err: Some(e),
                            };
                        }
                        Some(buf)
                    }
                    None => None,
                };

                let write_flags = self.write_flags;
                let res = self.call.as_mut().unwrap().call(|c| {
                    c.call
                        .start_send_status_from_server(&status, true, &mut data, write_flags)
                });

                let (cq_f, err) = match res {
                    Ok(f) => (Some(f), None),
                    Err(e) => (None, Some(e)),
                };

                $rt {
                    call: self.call.take().unwrap(),
                    cq_f,
                    err,
                }
            }
        }

        impl<T> Drop for $t<T> {
            /// The corresponding RPC will be canceled if the sink did not
            /// send a response before dropping.
            fn drop(&mut self) {
                self.call
                    .as_mut()
                    .map(|call| call.call(|c| c.call.cancel()));
            }
        }
    };
}

impl_unary_sink!(
    /// A sink for a unary call.
    ///
    /// To close the sink properly, you should call [`success`] or [`fail`] before dropping.
    ///
    /// [`success`]: #method.success
    /// [`fail`]: #method.fail
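    ///
    /// # Example
    ///
    /// A minimal sketch of a unary handler replying through the sink (not
    /// compiled as a doctest); `EchoRequest` and `EchoResponse` are
    /// hypothetical generated message types.
    ///
    /// ```ignore
    /// fn echo(ctx: RpcContext<'_>, req: EchoRequest, sink: UnarySink<EchoResponse>) {
    ///     let mut resp = EchoResponse::default();
    ///     resp.message = req.message;
    ///     // `success` returns a future that resolves once the status has been
    ///     // sent; spawn it on the gRPC poll thread.
    ///     ctx.spawn(async move {
    ///         if let Err(e) = sink.success(resp).await {
    ///             eprintln!("failed to reply: {:?}", e);
    ///         }
    ///     });
    /// }
    /// ```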
    #[must_use = "if unused the sink may immediately cancel the RPC"]
    UnarySink,
    UnarySinkResult,
    ShareCall
);
impl_unary_sink!(
    /// A sink for a client streaming call.
    ///
    /// To close the sink properly, you should call [`success`] or [`fail`] before dropping.
    ///
    /// [`success`]: #method.success
    /// [`fail`]: #method.fail
    #[must_use = "if unused the sink may immediately cancel the RPC"]
    ClientStreamingSink,
    ClientStreamingSinkResult,
    Arc<Mutex<ShareCall>>
);

// A helper macro to implement server-side streaming sinks.
macro_rules! impl_stream_sink {
    ($(#[$attr:meta])* $t:ident, $ft:ident, $holder:ty) => {
        $(#[$attr])*
        pub struct $t<T> {
            call: Option<$holder>,
            base: SinkBase,
            flush_f: Option<BatchFuture>,
            status: RpcStatus,
            flushed: bool,
            closed: bool,
            ser: SerializeFn<T>,
        }

        impl<T> $t<T> {
            fn new(call: $holder, ser: SerializeFn<T>) -> $t<T> {
                $t {
                    call: Some(call),
                    base: SinkBase::new(true),
                    flush_f: None,
                    status: RpcStatus::ok(),
                    flushed: false,
                    closed: false,
                    ser,
                }
            }

            /// By default, messages are always sent with their configured buffer hint. When
            /// `enhance_batch` is enabled, messages are batched together as much as possible.
            /// The rules are as follows:
            /// - All messages except the last one are sent with `buffer_hint` set to true.
            /// - The last message is also sent with `buffer_hint` set to true unless any message
            ///   is offered with its buffer hint set to false.
            ///
            /// Whether `enhance_batch` is true or false, it's recommended to follow the `Sink`
            /// contract and call `poll_flush` to ensure messages are handled by the gRPC C core.
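            ///
            /// A minimal sketch (not compiled as a doctest); `sink` stands for a
            /// server streaming or duplex sink created by the library:
            ///
            /// ```ignore
            /// // Ask gRPC to coalesce as many queued messages as possible before
            /// // handing them to the transport; a final flush still delivers them.
            /// sink.enhance_batch(true);
            /// ```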
            pub fn enhance_batch(&mut self, flag: bool) {
                self.base.enhance_buffer_strategy = flag;
            }

            pub fn set_status(&mut self, status: RpcStatus) {
                assert!(self.flush_f.is_none());
                self.status = status;
            }

            pub fn fail(mut self, status: RpcStatus) -> $ft {
                assert!(self.flush_f.is_none());
                let send_metadata = self.base.send_metadata;
                let res = self.call.as_mut().unwrap().call(|c| {
                    c.call
                        .start_send_status_from_server(&status, send_metadata, &mut None, 0)
                });

                let (fail_f, err) = match res {
                    Ok(f) => (Some(f), None),
                    Err(e) => (None, Some(e)),
                };

                $ft {
                    call: self.call.take().unwrap(),
                    fail_f,
                    err,
                }
            }
        }

        impl<T> Drop for $t<T> {
            /// The corresponding RPC will be canceled if the sink did not call
            /// [`close`] or [`fail`] before dropping.
            ///
            /// [`close`]: #method.close
            /// [`fail`]: #method.fail
            fn drop(&mut self) {
                // The sink was not closed explicitly and was not consumed by `fail`.
                if !self.closed && self.call.is_some() {
                    let mut call = self.call.take().unwrap();
                    call.call(|c| c.call.cancel());
                }
            }
        }

        impl<T> Sink<(T, WriteFlags)> for $t<T> {
            type Error = Error;

            #[inline]
            fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
                if let Poll::Ready(_) = self.call.as_mut().unwrap().call(|c| c.poll_finish(cx))? {
                    return Poll::Ready(Err(Error::RemoteStopped));
                }
                Pin::new(&mut self.base).poll_ready(cx)
            }

            #[inline]
            fn start_send(mut self: Pin<&mut Self>, (msg, flags): (T, WriteFlags)) -> Result<()> {
                let t = &mut *self;
                t.base.start_send(t.call.as_mut().unwrap(), &msg, flags, t.ser)
            }

            #[inline]
            fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
                if let Poll::Ready(_) = self.call.as_mut().unwrap().call(|c| c.poll_finish(cx))? {
                    return Poll::Ready(Err(Error::RemoteStopped));
                }
                let t = &mut *self;
                Pin::new(&mut t.base).poll_flush(cx, t.call.as_mut().unwrap())
            }

            fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
                if self.flush_f.is_none() {
                    ready!(Pin::new(&mut self.base).poll_ready(cx)?);

                    let send_metadata = self.base.send_metadata;
                    let t = &mut *self;
                    let status = &t.status;
                    let flush_f = t.call.as_mut().unwrap().call(|c| {
                        c.call
                            .start_send_status_from_server(status, send_metadata, &mut None, 0)
                    })?;
                    t.flush_f = Some(flush_f);
                }

                if !self.flushed {
                    ready!(Pin::new(self.flush_f.as_mut().unwrap()).poll(cx)?);
                    self.flushed = true;
                }

                ready!(self.call.as_mut().unwrap().call(|c| c.poll_finish(cx))?);
                self.closed = true;
                Poll::Ready(Ok(()))
            }
        }

        #[must_use = "if unused the sink failure may immediately cancel the RPC"]
        pub struct $ft {
            call: $holder,
            fail_f: Option<BatchFuture>,
            err: Option<Error>,
        }

        impl Future for $ft {
            type Output = Result<()>;

            fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
                if let Some(e) = self.err.take() {
                    return Poll::Ready(Err(e));
                }

                let readiness = self.call.call(|c| {
                    if c.finished {
                        return Poll::Ready(Ok(()));
                    }

                    c.poll_finish(cx).map(|r| r.map(|_| ()))
                })?;

                if let Some(ref mut f) = self.fail_f {
                    ready!(Pin::new(f).poll(cx)?);
                }

                self.fail_f.take();
                readiness.map(Ok)
            }
        }
    };
}

impl_stream_sink!(
    /// A sink for a server streaming call.
    ///
    /// To close the sink properly, you should call [`close`] or [`fail`] before dropping.
    ///
    /// [`close`]: #method.close
    /// [`fail`]: #method.fail
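    ///
    /// # Example
    ///
    /// A minimal sketch of a server streaming handler (not compiled as a
    /// doctest); `ListRequest`, `Item` and `collect_items` are hypothetical.
    ///
    /// ```ignore
    /// use futures::prelude::*;
    ///
    /// fn list(ctx: RpcContext<'_>, _req: ListRequest, mut sink: ServerStreamingSink<Item>) {
    ///     ctx.spawn(async move {
    ///         for item in collect_items() {
    ///             // Each message is paired with per-message write flags.
    ///             sink.send((item, WriteFlags::default())).await.unwrap();
    ///         }
    ///         // `close` sends the final status and finishes the RPC.
    ///         sink.close().await.unwrap();
    ///     });
    /// }
    /// ```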
    #[must_use = "if unused the sink may immediately cancel the RPC"]
    ServerStreamingSink,
    ServerStreamingSinkFailure,
    ShareCall
);
impl_stream_sink!(
    /// A sink for a duplex streaming call.
    ///
    /// To close the sink properly, you should call [`close`] or [`fail`] before dropping.
    ///
    /// [`close`]: #method.close
    /// [`fail`]: #method.fail
    #[must_use = "if unused the sink may immediately cancel the RPC"]
    DuplexSink,
    DuplexSinkFailure,
    Arc<Mutex<ShareCall>>
);

/// A context for RPC handling.
pub struct RpcContext<'a> {
    ctx: RequestContext,
    executor: Executor<'a>,
    deadline: Deadline,
}

impl<'a> RpcContext<'a> {
    fn new(ctx: RequestContext, cq: &CompletionQueue) -> RpcContext<'_> {
        RpcContext {
            deadline: ctx.deadline(),
            ctx,
            executor: Executor::new(cq),
        }
    }

    fn kicker(&self) -> Kicker {
        let call = self.call();
        Kicker::from_call(call)
    }

    pub(crate) fn call(&self) -> Call {
        self.ctx.call(self.executor.cq().clone())
    }

    pub fn method(&self) -> &[u8] {
        self.ctx.method()
    }

    pub fn host(&self) -> &[u8] {
        self.ctx.host()
    }

    pub fn deadline(&self) -> Deadline {
        self.deadline
    }

    /// Get the initial metadata sent by the client.
    pub fn request_headers(&self) -> &Metadata {
        self.ctx.metadata()
    }

    pub fn peer(&self) -> String {
        self.ctx.peer()
    }

    /// Wrapper around the gRPC Core `AuthContext`.
    ///
    /// If the server binds in non-secure mode, this will return `None`.
    pub fn auth_context(&self) -> Option<AuthContext> {
        self.ctx.auth_context()
    }

    /// Spawn the future into the current gRPC poll thread.
    ///
    /// This can reduce a lot of context switching, but please make
    /// sure there is no heavy work in the future.
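    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest); `sink` and `resp` stand
    /// for a response sink and message already available in a handler.
    ///
    /// ```ignore
    /// // Run the reply future on the gRPC poll thread instead of a separate
    /// // executor; keep the work inside it lightweight.
    /// ctx.spawn(async move {
    ///     let _ = sink.success(resp).await;
    /// });
    /// ```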
    pub fn spawn<F>(&self, f: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        self.executor.spawn(f, self.kicker())
    }
}

// The following four helper functions are used to create callback closures.

macro_rules! accept_call {
    ($call:expr) => {
        match $call.start_server_side() {
            Err(Error::QueueShutdown) => return,
            Err(e) => panic!("unexpected error when trying to accept request: {:?}", e),
            Ok(f) => f,
        }
    };
}

// Helper function to call a unary handler.
pub fn execute_unary<P, Q, F>(
    ctx: RpcContext<'_>,
    ser: SerializeFn<Q>,
    de: DeserializeFn<P>,
    payload: MessageReader,
    f: &mut F,
) where
    F: FnMut(RpcContext<'_>, P, UnarySink<Q>),
{
    let mut call = ctx.call();
    let close_f = accept_call!(call);
    let request = match de(payload) {
        Ok(f) => f,
        Err(e) => {
            let status = RpcStatus::with_message(
                RpcStatusCode::INTERNAL,
                format!("Failed to deserialize request message: {:?}", e),
            );
            call.abort(&status);
            return;
        }
    };
    let sink = UnarySink::new(ShareCall::new(call, close_f), ser);
    f(ctx, request, sink)
}

// Helper function to call a client streaming handler.
pub fn execute_client_streaming<P, Q, F>(
    ctx: RpcContext<'_>,
    ser: SerializeFn<Q>,
    de: DeserializeFn<P>,
    f: &mut F,
) where
    F: FnMut(RpcContext<'_>, RequestStream<P>, ClientStreamingSink<Q>),
{
    let mut call = ctx.call();
    let close_f = accept_call!(call);
    let call = Arc::new(Mutex::new(ShareCall::new(call, close_f)));

    let req_s = RequestStream::new(call.clone(), de);
    let sink = ClientStreamingSink::new(call, ser);
    f(ctx, req_s, sink)
}

// Helper function to call a server streaming handler.
pub fn execute_server_streaming<P, Q, F>(
    ctx: RpcContext<'_>,
    ser: SerializeFn<Q>,
    de: DeserializeFn<P>,
    payload: MessageReader,
    f: &mut F,
) where
    F: FnMut(RpcContext<'_>, P, ServerStreamingSink<Q>),
{
    let mut call = ctx.call();
    let close_f = accept_call!(call);

    let request = match de(payload) {
        Ok(t) => t,
        Err(e) => {
            let status = RpcStatus::with_message(
                RpcStatusCode::INTERNAL,
                format!("Failed to deserialize request message: {:?}", e),
            );
            call.abort(&status);
            return;
        }
    };

    let sink = ServerStreamingSink::new(ShareCall::new(call, close_f), ser);
    f(ctx, request, sink)
}

// Helper function to call a duplex streaming handler.
pub fn execute_duplex_streaming<P, Q, F>(
    ctx: RpcContext<'_>,
    ser: SerializeFn<Q>,
    de: DeserializeFn<P>,
    f: &mut F,
) where
    F: FnMut(RpcContext<'_>, RequestStream<P>, DuplexSink<Q>),
{
    let mut call = ctx.call();
    let close_f = accept_call!(call);
    let call = Arc::new(Mutex::new(ShareCall::new(call, close_f)));

    let req_s = RequestStream::new(call.clone(), de);
    let sink = DuplexSink::new(call, ser);
    f(ctx, req_s, sink)
}

// A helper function used to handle all undefined RPC calls.
pub fn execute_unimplemented(ctx: RequestContext, cq: CompletionQueue) {
    // Suppress needless-pass-by-value.
    let ctx = ctx;
    let mut call = ctx.call(cq);
    accept_call!(call);
    call.abort(&RpcStatus::new(RpcStatusCode::UNIMPLEMENTED))
}

// Helper function to call a handler.
//
// Invoked after a request is ready to be handled.
fn execute(
    ctx: RequestContext,
    cq: &CompletionQueue,
    payload: Option<MessageReader>,
    f: &mut BoxHandler,
    mut checkers: Vec<Box<dyn ServerChecker>>,
) {
    let rpc_ctx = RpcContext::new(ctx, cq);

    for handler in checkers.iter_mut() {
        match handler.check(&rpc_ctx) {
            CheckResult::Continue => {}
            CheckResult::Abort(status) => {
                rpc_ctx.call().abort(&status);
                return;
            }
        }
    }

    f.handle(rpc_ctx, payload)
}