// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use std::ffi::CStr;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use std::{result, slice};

use crate::grpc_sys::{
    self, gpr_clock_type, gpr_timespec, grpc_call_error, grpcwrap_request_call_context,
};
use futures::future::Future;
use futures::ready;
use futures::sink::Sink;
use futures::stream::Stream;
use futures::task::{Context, Poll};
use parking_lot::Mutex;

use super::{RpcStatus, ShareCall, ShareCallHolder, WriteFlags};
use crate::auth_context::AuthContext;
use crate::buf::GrpcSlice;
use crate::call::{
    BatchContext, Call, MessageReader, MethodType, RpcStatusCode, SinkBase, StreamingBase,
};
use crate::codec::{DeserializeFn, SerializeFn};
use crate::cq::CompletionQueue;
use crate::error::{Error, Result};
use crate::metadata::Metadata;
use crate::server::ServerChecker;
use crate::server::{BoxHandler, RequestCallContext};
use crate::task::{BatchFuture, CallTag, Executor, Kicker};
use crate::CheckResult;

/// A time point before which an RPC or an operation should finish.
#[derive(Clone, Copy)]
pub struct Deadline {
    pub(crate) spec: gpr_timespec,
}

impl Deadline {
    fn new(spec: gpr_timespec) -> Deadline {
        let realtime_spec =
            unsafe { grpc_sys::gpr_convert_clock_type(spec, gpr_clock_type::GPR_CLOCK_REALTIME) };

        Deadline {
            spec: realtime_spec,
        }
    }

    /// Checks if the deadline is exceeded.
    pub fn exceeded(self) -> bool {
        unsafe {
            let now = grpc_sys::gpr_now(gpr_clock_type::GPR_CLOCK_REALTIME);
            grpc_sys::gpr_time_cmp(now, self.spec) >= 0
        }
    }

    pub(crate) fn spec(self) -> gpr_timespec {
        self.spec
    }
}

impl From<Duration> for Deadline {
    /// Build a deadline from the given duration.
    ///
    /// The deadline will be `now + duration`.
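    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest); it assumes `Deadline` is
    /// reachable from the crate root.
    ///
    /// ```ignore
    /// use std::time::Duration;
    ///
    /// // A deadline one second from now; it has not been exceeded yet.
    /// let deadline = Deadline::from(Duration::from_secs(1));
    /// assert!(!deadline.exceeded());
    /// ```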
    #[inline]
    fn from(dur: Duration) -> Deadline {
        Deadline::new(dur.into())
    }
}

/// Context for accepting a request.
pub struct RequestContext {
    ctx: *mut grpcwrap_request_call_context,
    request_call: Option<RequestCallContext>,
}

impl RequestContext {
    pub fn new(rc: RequestCallContext) -> RequestContext {
        let ctx = unsafe { grpc_sys::grpcwrap_request_call_context_create() };

        RequestContext {
            ctx,
            request_call: Some(rc),
        }
    }

    /// Try to accept a client side streaming request.
    ///
    /// Returns an error if the request is a client side unary request.
    pub fn handle_stream_req(
        self,
        cq: &CompletionQueue,
        rc: &mut RequestCallContext,
    ) -> result::Result<(), Self> {
        let checker = rc.get_checker();
        let handler = unsafe { rc.get_handler(self.method()) };
        match handler {
            Some(handler) => match handler.method_type() {
                MethodType::Unary | MethodType::ServerStreaming => Err(self),
                _ => {
                    execute(self, cq, None, handler, checker);
                    Ok(())
                }
            },
            None => {
                execute_unimplemented(self, cq.clone());
                Ok(())
            }
        }
    }

    /// Accept a client side unary request.
    ///
    /// This method should be called after `handle_stream_req`. When handling a
    /// client side unary request, the handler will only be called after the
    /// unary request is received.
    pub fn handle_unary_req(self, rc: RequestCallContext, _: &CompletionQueue) {
        // Fetch the message before calling the callback.
        let tag = Box::new(CallTag::unary_request(self, rc));
        let batch_ctx = tag.batch_ctx().unwrap().as_ptr();
        let request_ctx = tag.request_ctx().unwrap().as_ptr();
        let tag_ptr = Box::into_raw(tag);
        unsafe {
            let call = grpc_sys::grpcwrap_request_call_context_get_call(request_ctx);
            let code = grpc_sys::grpcwrap_call_recv_message(call, batch_ctx, tag_ptr as _);
            if code != grpc_call_error::GRPC_CALL_OK {
                drop(Box::from_raw(tag_ptr));
                // It should not fail.
                panic!("failed to receive message: {:?}", code);
            }
        }
    }

    pub fn take_request_call_context(&mut self) -> Option<RequestCallContext> {
        self.request_call.take()
    }

    pub fn as_ptr(&self) -> *mut grpcwrap_request_call_context {
        self.ctx
    }

    fn call(&self, cq: CompletionQueue) -> Call {
        unsafe {
            // It is okay to use a mutable pointer on an immutable reference, `self`,
            // because grpcwrap_request_call_context_ref_call is thread-safe.
            let call = grpc_sys::grpcwrap_request_call_context_ref_call(self.ctx);
            assert!(!call.is_null());
            Call::from_raw(call, cq)
        }
    }

    pub fn method(&self) -> &[u8] {
        let mut len = 0;
        let method = unsafe { grpc_sys::grpcwrap_request_call_context_method(self.ctx, &mut len) };

        unsafe { slice::from_raw_parts(method as _, len) }
    }

    fn host(&self) -> &[u8] {
        let mut len = 0;
        let host = unsafe { grpc_sys::grpcwrap_request_call_context_host(self.ctx, &mut len) };

        unsafe { slice::from_raw_parts(host as _, len) }
    }

    fn deadline(&self) -> Deadline {
        let t = unsafe { grpc_sys::grpcwrap_request_call_context_deadline(self.ctx) };

        Deadline::new(t)
    }

    fn metadata(&self) -> &Metadata {
        unsafe {
            let ptr = grpc_sys::grpcwrap_request_call_context_metadata_array(self.ctx);
            let arr_ptr: *const Metadata = ptr as _;
            &*arr_ptr
        }
    }

    fn peer(&self) -> String {
        unsafe {
            // RequestContext always holds a reference to the call.
            let call = grpc_sys::grpcwrap_request_call_context_get_call(self.ctx);
            let p = grpc_sys::grpc_call_get_peer(call);
            let peer = CStr::from_ptr(p)
                .to_str()
                .expect("valid UTF-8 data")
                .to_owned();
            grpc_sys::gpr_free(p as _);
            peer
        }
    }

    /// If the server binds in non-secure mode, this will return `None`.
    fn auth_context(&self) -> Option<AuthContext> {
        unsafe {
            let call = grpc_sys::grpcwrap_request_call_context_get_call(self.ctx);
            AuthContext::from_call_ptr(call)
        }
    }
}

impl Drop for RequestContext {
    fn drop(&mut self) {
        unsafe { grpc_sys::grpcwrap_request_call_context_destroy(self.ctx) }
    }
}

/// A context for handling a client side unary request.
pub struct UnaryRequestContext {
    request: RequestContext,
    request_call: Option<RequestCallContext>,
    batch: BatchContext,
}

impl UnaryRequestContext {
    pub fn new(ctx: RequestContext, rc: RequestCallContext) -> UnaryRequestContext {
        UnaryRequestContext {
            request: ctx,
            request_call: Some(rc),
            batch: BatchContext::new(),
        }
    }

    pub fn batch_ctx(&self) -> &BatchContext {
        &self.batch
    }

    pub fn batch_ctx_mut(&mut self) -> &mut BatchContext {
        &mut self.batch
    }

    pub fn request_ctx(&self) -> &RequestContext {
        &self.request
    }

    pub fn take_request_call_context(&mut self) -> Option<RequestCallContext> {
        self.request_call.take()
    }

    pub fn handle(
        self,
        rc: &mut RequestCallContext,
        cq: &CompletionQueue,
        reader: Option<MessageReader>,
    ) {
        let checker = rc.get_checker();
        let handler = unsafe { rc.get_handler(self.request.method()).unwrap() };
        if reader.is_some() {
            return execute(self.request, cq, reader, handler, checker);
        }

        let status = RpcStatus::new(RpcStatusCode::INTERNAL, Some("No payload".to_owned()));
        self.request.call(cq.clone()).abort(&status)
    }
}

/// A stream of requests for a client streaming call or a duplex streaming call.
///
/// The corresponding RPC will be canceled if the stream did not
/// finish before dropping.
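///
/// # Example
///
/// A sketch of a client streaming handler draining the stream. The
/// `Point`/`Summary` message types and the surrounding service trait are
/// hypothetical, and `futures::TryStreamExt` is assumed to be in scope.
///
/// ```ignore
/// fn record_route(
///     &mut self,
///     ctx: RpcContext<'_>,
///     mut stream: RequestStream<Point>,
///     sink: ClientStreamingSink<Summary>,
/// ) {
///     let handle = async move {
///         let mut summary = Summary::default();
///         // Pull each request off the stream as it arrives.
///         while let Some(_point) = stream.try_next().await? {
///             summary.point_count += 1;
///         }
///         sink.success(summary).await
///     };
///     ctx.spawn(async move {
///         if let Err(e) = handle.await {
///             eprintln!("record_route failed: {:?}", e);
///         }
///     });
/// }
/// ```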
#[must_use = "if unused the RequestStream may immediately cancel the RPC"]
pub struct RequestStream<T> {
    call: Arc<Mutex<ShareCall>>,
    base: StreamingBase,
    de: DeserializeFn<T>,
}

impl<T> RequestStream<T> {
    fn new(call: Arc<Mutex<ShareCall>>, de: DeserializeFn<T>) -> RequestStream<T> {
        RequestStream {
            call,
            base: StreamingBase::new(None),
            de,
        }
    }
}

impl<T> Stream for RequestStream<T> {
    type Item = Result<T>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<T>>> {
        {
            let mut call = self.call.lock();
            call.check_alive()?;
        }

        let t = &mut *self;
        match ready!(t.base.poll(cx, &mut t.call, false)?) {
            None => Poll::Ready(None),
            Some(data) => Poll::Ready(Some((t.de)(data))),
        }
    }
}

impl<T> Drop for RequestStream<T> {
    /// The corresponding RPC will be canceled if the stream did not
    /// finish before dropping.
    fn drop(&mut self) {
        self.base.on_drop(&mut self.call);
    }
}

/// A helper macro used to implement a server side unary sink.
/// Not using generics here because we don't need to expose
/// `CallHolder` or `Call` to the caller.
// TODO: Use a type alias to be friendly for documentation.
macro_rules! impl_unary_sink {
    ($(#[$attr:meta])* $t:ident, $rt:ident, $holder:ty) => {
        pub struct $rt {
            call: $holder,
            cq_f: Option<BatchFuture>,
            err: Option<Error>,
        }

        impl Future for $rt {
            type Output = Result<()>;

            fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
                if let Some(e) = self.err.take() {
                    return Poll::Ready(Err(e));
                }

                if self.cq_f.is_some() {
                    ready!(Pin::new(self.cq_f.as_mut().unwrap()).poll(cx)?);
                    self.cq_f.take();
                }

                ready!(self.call.call(|c| c.poll_finish(cx))?);
                Poll::Ready(Ok(()))
            }
        }

        $(#[$attr])*
        pub struct $t<T> {
            call: Option<$holder>,
            write_flags: u32,
            ser: SerializeFn<T>,
        }

        impl<T> $t<T> {
            fn new(call: $holder, ser: SerializeFn<T>) -> $t<T> {
                $t {
                    call: Some(call),
                    write_flags: 0,
                    ser,
                }
            }

            pub fn success(self, t: T) -> $rt {
                self.complete(RpcStatus::ok(), Some(t))
            }

            pub fn fail(self, status: RpcStatus) -> $rt {
                self.complete(status, None)
            }

            fn complete(mut self, status: RpcStatus, t: Option<T>) -> $rt {
                let mut data = t.as_ref().map(|t| {
                    let mut buf = GrpcSlice::default();
                    (self.ser)(t, &mut buf);
                    buf
                });

                let write_flags = self.write_flags;
                let res = self.call.as_mut().unwrap().call(|c| {
                    c.call
                        .start_send_status_from_server(&status, true, &mut data, write_flags)
                });

                let (cq_f, err) = match res {
                    Ok(f) => (Some(f), None),
                    Err(e) => (None, Some(e)),
                };

                $rt {
                    call: self.call.take().unwrap(),
                    cq_f,
                    err,
                }
            }
        }

        impl<T> Drop for $t<T> {
            /// The corresponding RPC will be canceled if the sink did not
            /// send a response before dropping.
            fn drop(&mut self) {
                if let Some(call) = self.call.as_mut() {
                    call.call(|c| c.call.cancel());
                }
            }
        }
    };
}

impl_unary_sink!(
    /// A sink for a unary call.
    ///
    /// To close the sink properly, you should call [`success`] or [`fail`] before dropping.
    ///
    /// [`success`]: #method.success
    /// [`fail`]: #method.fail
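    ///
    /// # Example
    ///
    /// A sketch of a unary handler. The `HelloRequest`/`HelloReply` message
    /// types and the surrounding generated service trait are hypothetical.
    ///
    /// ```ignore
    /// fn say_hello(
    ///     &mut self,
    ///     ctx: RpcContext<'_>,
    ///     req: HelloRequest,
    ///     sink: UnarySink<HelloReply>,
    /// ) {
    ///     let mut resp = HelloReply::default();
    ///     resp.message = format!("Hello {}", req.name);
    ///     // `success` consumes the sink and returns a future that resolves once
    ///     // the response and status have been sent.
    ///     ctx.spawn(async move {
    ///         if let Err(e) = sink.success(resp).await {
    ///             eprintln!("failed to reply: {:?}", e);
    ///         }
    ///     });
    /// }
    /// ```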
    #[must_use = "if unused the sink may immediately cancel the RPC"]
    UnarySink,
    UnarySinkResult,
    ShareCall
);
impl_unary_sink!(
    /// A sink for a client streaming call.
    ///
    /// To close the sink properly, you should call [`success`] or [`fail`] before dropping.
    ///
    /// [`success`]: #method.success
    /// [`fail`]: #method.fail
    #[must_use = "if unused the sink may immediately cancel the RPC"]
    ClientStreamingSink,
    ClientStreamingSinkResult,
    Arc<Mutex<ShareCall>>
);

// A helper macro to implement a server side streaming sink.
macro_rules! impl_stream_sink {
    ($(#[$attr:meta])* $t:ident, $ft:ident, $holder:ty) => {
        $(#[$attr])*
        pub struct $t<T> {
            call: Option<$holder>,
            base: SinkBase,
            flush_f: Option<BatchFuture>,
            status: RpcStatus,
            flushed: bool,
            closed: bool,
            ser: SerializeFn<T>,
        }

        impl<T> $t<T> {
            fn new(call: $holder, ser: SerializeFn<T>) -> $t<T> {
                $t {
                    call: Some(call),
                    base: SinkBase::new(true),
                    flush_f: None,
                    status: RpcStatus::ok(),
                    flushed: false,
                    closed: false,
                    ser,
                }
            }

            /// By default messages are sent with their configured buffer hint. When
            /// `enhance_batch` is enabled, messages are batched together as much as possible.
            /// The rules are as follows:
            /// - All messages except the last one are sent with `buffer_hint` set to true.
            /// - The last message is also sent with `buffer_hint` set to true unless any
            ///   message is offered with its buffer hint set to false.
            ///
            /// Whether `enhance_batch` is true or false, it's recommended to follow the
            /// `Sink` contract and call `poll_flush` to ensure messages are handled by
            /// gRPC C Core.
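            ///
            /// # Example
            ///
            /// A sketch, assuming `sink` is a server or duplex streaming sink and
            /// `futures::SinkExt` is in scope:
            ///
            /// ```ignore
            /// sink.enhance_batch(true);
            /// // Intermediate messages are offered with the buffer hint set, so they
            /// // can be coalesced into fewer writes.
            /// sink.send((msg1, WriteFlags::default())).await?;
            /// sink.send((msg2, WriteFlags::default())).await?;
            /// // Flushing drives `poll_flush`, which hands the batch to gRPC C Core.
            /// sink.flush().await?;
            /// ```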
            pub fn enhance_batch(&mut self, flag: bool) {
                self.base.enhance_buffer_strategy = flag;
            }

            pub fn set_status(&mut self, status: RpcStatus) {
                assert!(self.flush_f.is_none());
                self.status = status;
            }

            pub fn fail(mut self, status: RpcStatus) -> $ft {
                assert!(self.flush_f.is_none());
                let send_metadata = self.base.send_metadata;
                let res = self.call.as_mut().unwrap().call(|c| {
                    c.call
                        .start_send_status_from_server(&status, send_metadata, &mut None, 0)
                });

                let (fail_f, err) = match res {
                    Ok(f) => (Some(f), None),
                    Err(e) => (None, Some(e)),
                };

                $ft {
                    call: self.call.take().unwrap(),
                    fail_f,
                    err,
                }
            }
        }

        impl<T> Drop for $t<T> {
            /// The corresponding RPC will be canceled if the sink did not call
            /// [`close`] or [`fail`] before dropping.
            ///
            /// [`close`]: #method.close
            /// [`fail`]: #method.fail
            fn drop(&mut self) {
                // The sink was neither closed explicitly nor consumed by `fail`.
                if !self.closed && self.call.is_some() {
                    let mut call = self.call.take().unwrap();
                    call.call(|c| c.call.cancel());
                }
            }
        }

        impl<T> Sink<(T, WriteFlags)> for $t<T> {
            type Error = Error;

            #[inline]
            fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
                if let Poll::Ready(_) = self.call.as_mut().unwrap().call(|c| c.poll_finish(cx))? {
                    return Poll::Ready(Err(Error::RemoteStopped));
                }
                Pin::new(&mut self.base).poll_ready(cx)
            }

            #[inline]
            fn start_send(mut self: Pin<&mut Self>, (msg, flags): (T, WriteFlags)) -> Result<()> {
                let t = &mut *self;
                t.base.start_send(t.call.as_mut().unwrap(), &msg, flags, t.ser)
            }

            #[inline]
            fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
                if let Poll::Ready(_) = self.call.as_mut().unwrap().call(|c| c.poll_finish(cx))? {
                    return Poll::Ready(Err(Error::RemoteStopped));
                }
                let t = &mut *self;
                Pin::new(&mut t.base).poll_flush(cx, t.call.as_mut().unwrap())
            }

            fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
                if self.flush_f.is_none() {
                    ready!(Pin::new(&mut self.base).poll_ready(cx)?);

                    let send_metadata = self.base.send_metadata;
                    let t = &mut *self;
                    let status = &t.status;
                    let flush_f = t.call.as_mut().unwrap().call(|c| {
                        c.call
                            .start_send_status_from_server(status, send_metadata, &mut None, 0)
                    })?;
                    t.flush_f = Some(flush_f);
                }

                if !self.flushed {
                    ready!(Pin::new(self.flush_f.as_mut().unwrap()).poll(cx)?);
                    self.flushed = true;
                }

                ready!(self.call.as_mut().unwrap().call(|c| c.poll_finish(cx))?);
                self.closed = true;
                Poll::Ready(Ok(()))
            }
        }

        #[must_use = "if unused the sink failure may immediately cancel the RPC"]
        pub struct $ft {
            call: $holder,
            fail_f: Option<BatchFuture>,
            err: Option<Error>,
        }

        impl Future for $ft {
            type Output = Result<()>;

            fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
                if let Some(e) = self.err.take() {
                    return Poll::Ready(Err(e));
                }

                let readiness = self.call.call(|c| {
                    if c.finished {
                        return Poll::Ready(Ok(()));
                    }

                    c.poll_finish(cx).map(|r| r.map(|_| ()))
                })?;

                if let Some(ref mut f) = self.fail_f {
                    ready!(Pin::new(f).poll(cx)?);
                }

                self.fail_f.take();
                readiness.map(Ok)
            }
        }
    };
}

impl_stream_sink!(
    /// A sink for a server streaming call.
    ///
    /// To close the sink properly, you should call [`close`] or [`fail`] before dropping.
    ///
    /// [`close`]: #method.close
    /// [`fail`]: #method.fail
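    ///
    /// # Example
    ///
    /// A sketch of a server streaming handler. The `Rectangle`/`Feature` message
    /// types, the `load_features` helper and the surrounding service trait are
    /// hypothetical; `futures::SinkExt` is assumed to be in scope.
    ///
    /// ```ignore
    /// fn list_features(
    ///     &mut self,
    ///     ctx: RpcContext<'_>,
    ///     _req: Rectangle,
    ///     mut sink: ServerStreamingSink<Feature>,
    /// ) {
    ///     let features: Vec<Feature> = load_features();
    ///     ctx.spawn(async move {
    ///         for feature in features {
    ///             if let Err(e) = sink.send((feature, WriteFlags::default())).await {
    ///                 eprintln!("failed to send feature: {:?}", e);
    ///                 return;
    ///             }
    ///         }
    ///         // `close` sends the trailing status and completes the RPC.
    ///         if let Err(e) = sink.close().await {
    ///             eprintln!("failed to close sink: {:?}", e);
    ///         }
    ///     });
    /// }
    /// ```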
    #[must_use = "if unused the sink may immediately cancel the RPC"]
    ServerStreamingSink,
    ServerStreamingSinkFailure,
    ShareCall
);
impl_stream_sink!(
    /// A sink for a duplex streaming call.
    ///
    /// To close the sink properly, you should call [`close`] or [`fail`] before dropping.
    ///
    /// [`close`]: #method.close
    /// [`fail`]: #method.fail
    #[must_use = "if unused the sink may immediately cancel the RPC"]
    DuplexSink,
    DuplexSinkFailure,
    Arc<Mutex<ShareCall>>
);

/// A context for RPC handling.
pub struct RpcContext<'a> {
    ctx: RequestContext,
    executor: Executor<'a>,
    deadline: Deadline,
}

impl<'a> RpcContext<'a> {
    fn new(ctx: RequestContext, cq: &CompletionQueue) -> RpcContext<'_> {
        RpcContext {
            deadline: ctx.deadline(),
            ctx,
            executor: Executor::new(cq),
        }
    }

    fn kicker(&self) -> Kicker {
        let call = self.call();
        Kicker::from_call(call)
    }

    pub(crate) fn call(&self) -> Call {
        self.ctx.call(self.executor.cq().clone())
    }

    pub fn method(&self) -> &[u8] {
        self.ctx.method()
    }

    pub fn host(&self) -> &[u8] {
        self.ctx.host()
    }

    pub fn deadline(&self) -> Deadline {
        self.deadline
    }

    /// Get the initial metadata sent by the client.
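    ///
    /// # Example
    ///
    /// A sketch that logs the request metadata; it assumes the returned
    /// `Metadata` can be iterated as key/value pairs via `iter`.
    ///
    /// ```ignore
    /// for (key, value) in ctx.request_headers().iter() {
    ///     println!("header {}: {:?}", key, value);
    /// }
    /// ```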
    pub fn request_headers(&self) -> &Metadata {
        self.ctx.metadata()
    }

    pub fn peer(&self) -> String {
        self.ctx.peer()
    }

    /// Wrapper around the gRPC Core AuthContext.
    ///
    /// If the server binds in non-secure mode, this will return `None`.
    pub fn auth_context(&self) -> Option<AuthContext> {
        self.ctx.auth_context()
    }

    /// Spawn the future into the current gRPC poll thread.
    ///
    /// This can reduce a lot of context switching, but please make
    /// sure there is no heavy work in the future.
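    ///
    /// # Example
    ///
    /// A sketch that replies without leaving the gRPC poll thread; `sink` is
    /// assumed to be a `UnarySink<HelloReply>` taken from the handler arguments.
    ///
    /// ```ignore
    /// ctx.spawn(async move {
    ///     // Keep the spawned future lightweight: it runs on a completion
    ///     // queue poll thread.
    ///     let _ = sink.success(HelloReply::default()).await;
    /// });
    /// ```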
    pub fn spawn<F>(&self, f: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        self.executor.spawn(f, self.kicker())
    }
}

// The following helper functions are used to dispatch an accepted request to its handler.

macro_rules! accept_call {
    ($call:expr) => {
        match $call.start_server_side() {
            Err(Error::QueueShutdown) => return,
            Err(e) => panic!("unexpected error when trying to accept request: {:?}", e),
            Ok(f) => f,
        }
    };
}

// Helper function to call a unary handler.
pub fn execute_unary<P, Q, F>(
    ctx: RpcContext<'_>,
    ser: SerializeFn<Q>,
    de: DeserializeFn<P>,
    payload: MessageReader,
    f: &mut F,
) where
    F: FnMut(RpcContext<'_>, P, UnarySink<Q>),
{
    let mut call = ctx.call();
    let close_f = accept_call!(call);
    let request = match de(payload) {
        Ok(f) => f,
        Err(e) => {
            let status = RpcStatus::new(
                RpcStatusCode::INTERNAL,
                Some(format!("Failed to deserialize request message: {:?}", e)),
            );
            call.abort(&status);
            return;
        }
    };
    let sink = UnarySink::new(ShareCall::new(call, close_f), ser);
    f(ctx, request, sink)
}

// Helper function to call a client streaming handler.
pub fn execute_client_streaming<P, Q, F>(
    ctx: RpcContext<'_>,
    ser: SerializeFn<Q>,
    de: DeserializeFn<P>,
    f: &mut F,
) where
    F: FnMut(RpcContext<'_>, RequestStream<P>, ClientStreamingSink<Q>),
{
    let mut call = ctx.call();
    let close_f = accept_call!(call);
    let call = Arc::new(Mutex::new(ShareCall::new(call, close_f)));

    let req_s = RequestStream::new(call.clone(), de);
    let sink = ClientStreamingSink::new(call, ser);
    f(ctx, req_s, sink)
}

// Helper function to call a server streaming handler.
pub fn execute_server_streaming<P, Q, F>(
    ctx: RpcContext<'_>,
    ser: SerializeFn<Q>,
    de: DeserializeFn<P>,
    payload: MessageReader,
    f: &mut F,
) where
    F: FnMut(RpcContext<'_>, P, ServerStreamingSink<Q>),
{
    let mut call = ctx.call();
    let close_f = accept_call!(call);

    let request = match de(payload) {
        Ok(t) => t,
        Err(e) => {
            let status = RpcStatus::new(
                RpcStatusCode::INTERNAL,
                Some(format!("Failed to deserialize request message: {:?}", e)),
            );
            call.abort(&status);
            return;
        }
    };

    let sink = ServerStreamingSink::new(ShareCall::new(call, close_f), ser);
    f(ctx, request, sink)
}

// Helper function to call a duplex streaming handler.
pub fn execute_duplex_streaming<P, Q, F>(
    ctx: RpcContext<'_>,
    ser: SerializeFn<Q>,
    de: DeserializeFn<P>,
    f: &mut F,
) where
    F: FnMut(RpcContext<'_>, RequestStream<P>, DuplexSink<Q>),
{
    let mut call = ctx.call();
    let close_f = accept_call!(call);
    let call = Arc::new(Mutex::new(ShareCall::new(call, close_f)));

    let req_s = RequestStream::new(call.clone(), de);
    let sink = DuplexSink::new(call, ser);
    f(ctx, req_s, sink)
}

// A helper function used to handle all undefined RPC calls.
pub fn execute_unimplemented(ctx: RequestContext, cq: CompletionQueue) {
    // Suppress needless-pass-by-value.
    let ctx = ctx;
    let mut call = ctx.call(cq);
    accept_call!(call);
    call.abort(&RpcStatus::new(RpcStatusCode::UNIMPLEMENTED, None))
}

// Helper function to call a handler.
//
// Invoked after a request is ready to be handled.
fn execute(
    ctx: RequestContext,
    cq: &CompletionQueue,
    payload: Option<MessageReader>,
    f: &mut BoxHandler,
    mut checkers: Vec<Box<dyn ServerChecker>>,
) {
    let rpc_ctx = RpcContext::new(ctx, cq);

    for handler in checkers.iter_mut() {
        match handler.check(&rpc_ctx) {
            CheckResult::Continue => {}
            CheckResult::Abort(status) => {
                rpc_ctx.call().abort(&status);
                return;
            }
        }
    }

    f.handle(rpc_ctx, payload)
}