• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2 
3 pub mod client;
4 pub mod server;
5 
6 use std::fmt::{self, Debug, Display};
7 use std::pin::Pin;
8 use std::sync::Arc;
9 use std::{ptr, slice};
10 
11 use crate::grpc_sys::{self, grpc_call, grpc_call_error, grpcwrap_batch_context};
12 use crate::{cq::CompletionQueue, Metadata, MetadataBuilder};
13 use futures::future::Future;
14 use futures::ready;
15 use futures::task::{Context, Poll};
16 use libc::c_void;
17 use parking_lot::Mutex;
18 
19 use crate::buf::{GrpcByteBuffer, GrpcByteBufferReader, GrpcSlice};
20 use crate::codec::{DeserializeFn, Marshaller, SerializeFn};
21 use crate::error::{Error, Result};
22 use crate::grpc_sys::grpc_status_code::*;
23 use crate::task::{self, BatchFuture, BatchType, CallTag};
24 
25 /// An gRPC status code structure.
26 /// This type contains constants for all gRPC status codes.
27 #[derive(PartialEq, Eq, Clone, Copy)]
28 pub struct RpcStatusCode(i32);
29 
30 impl From<i32> for RpcStatusCode {
from(code: i32) -> RpcStatusCode31     fn from(code: i32) -> RpcStatusCode {
32         RpcStatusCode(code)
33     }
34 }
35 
36 impl From<RpcStatusCode> for i32 {
from(code: RpcStatusCode) -> i3237     fn from(code: RpcStatusCode) -> i32 {
38         code.0
39     }
40 }
41 
42 impl Display for RpcStatusCode {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result43     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
44         Debug::fmt(self, f)
45     }
46 }
47 
48 macro_rules! status_codes {
49     (
50         $(
51             ($num:path, $konst:ident);
52         )+
53     ) => {
54         impl RpcStatusCode {
55         $(
56             pub const $konst: RpcStatusCode = RpcStatusCode($num);
57         )+
58         }
59 
60         impl Debug for RpcStatusCode {
61             fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
62                 write!(
63                     f,
64                     "{}-{}",
65                     self.0,
66                     match self {
67                         $(RpcStatusCode($num) => stringify!($konst),)+
68                         RpcStatusCode(_) => "INVALID_STATUS_CODE",
69                     }
70                 )
71             }
72         }
73     }
74 }
75 
76 status_codes! {
77     (GRPC_STATUS_OK, OK);
78     (GRPC_STATUS_CANCELLED, CANCELLED);
79     (GRPC_STATUS_UNKNOWN, UNKNOWN);
80     (GRPC_STATUS_INVALID_ARGUMENT, INVALID_ARGUMENT);
81     (GRPC_STATUS_DEADLINE_EXCEEDED, DEADLINE_EXCEEDED);
82     (GRPC_STATUS_NOT_FOUND, NOT_FOUND);
83     (GRPC_STATUS_ALREADY_EXISTS, ALREADY_EXISTS);
84     (GRPC_STATUS_PERMISSION_DENIED, PERMISSION_DENIED);
85     (GRPC_STATUS_RESOURCE_EXHAUSTED, RESOURCE_EXHAUSTED);
86     (GRPC_STATUS_FAILED_PRECONDITION, FAILED_PRECONDITION);
87     (GRPC_STATUS_ABORTED, ABORTED);
88     (GRPC_STATUS_OUT_OF_RANGE, OUT_OF_RANGE);
89     (GRPC_STATUS_UNIMPLEMENTED, UNIMPLEMENTED);
90     (GRPC_STATUS_INTERNAL, INTERNAL);
91     (GRPC_STATUS_UNAVAILABLE, UNAVAILABLE);
92     (GRPC_STATUS_DATA_LOSS, DATA_LOSS);
93     (GRPC_STATUS_UNAUTHENTICATED, UNAUTHENTICATED);
94     (GRPC_STATUS__DO_NOT_USE, DO_NOT_USE);
95 }
96 
97 /// Method types supported by gRPC.
98 #[derive(Clone, Copy)]
99 pub enum MethodType {
100     /// Single request sent from client, single response received from server.
101     Unary,
102 
103     /// Stream of requests sent from client, single response received from server.
104     ClientStreaming,
105 
106     /// Single request sent from client, stream of responses received from server.
107     ServerStreaming,
108 
109     /// Both server and client can stream arbitrary number of requests and responses simultaneously.
110     Duplex,
111 }
112 
113 /// A description of a remote method.
114 // TODO: add serializer and deserializer.
115 pub struct Method<Req, Resp> {
116     /// Type of method.
117     pub ty: MethodType,
118 
119     /// Full qualified name of the method.
120     pub name: &'static str,
121 
122     /// The marshaller used for request messages.
123     pub req_mar: Marshaller<Req>,
124 
125     /// The marshaller used for response messages.
126     pub resp_mar: Marshaller<Resp>,
127 }
128 
129 impl<Req, Resp> Method<Req, Resp> {
130     /// Get the request serializer.
131     #[inline]
req_ser(&self) -> SerializeFn<Req>132     pub fn req_ser(&self) -> SerializeFn<Req> {
133         self.req_mar.ser
134     }
135 
136     /// Get the request deserializer.
137     #[inline]
req_de(&self) -> DeserializeFn<Req>138     pub fn req_de(&self) -> DeserializeFn<Req> {
139         self.req_mar.de
140     }
141 
142     /// Get the response serializer.
143     #[inline]
resp_ser(&self) -> SerializeFn<Resp>144     pub fn resp_ser(&self) -> SerializeFn<Resp> {
145         self.resp_mar.ser
146     }
147 
148     /// Get the response deserializer.
149     #[inline]
resp_de(&self) -> DeserializeFn<Resp>150     pub fn resp_de(&self) -> DeserializeFn<Resp> {
151         self.resp_mar.de
152     }
153 }
154 
155 /// RPC result returned from the server.
156 #[derive(Debug, Clone)]
157 pub struct RpcStatus {
158     /// gRPC status code. `Ok` indicates success, all other values indicate an error.
159     code: RpcStatusCode,
160 
161     /// error message.
162     message: String,
163 
164     /// Additional details for rich error model.
165     ///
166     /// See also https://grpc.io/docs/guides/error/#richer-error-model.
167     details: Vec<u8>,
168 }
169 
170 impl Display for RpcStatus {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result171     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
172         Debug::fmt(self, fmt)
173     }
174 }
175 
176 impl RpcStatus {
177     /// Create a new [`RpcStatus`].
new<T: Into<RpcStatusCode>>(code: T) -> RpcStatus178     pub fn new<T: Into<RpcStatusCode>>(code: T) -> RpcStatus {
179         RpcStatus::with_message(code, String::new())
180     }
181 
182     /// Create a new [`RpcStatus`] with given message.
with_message<T: Into<RpcStatusCode>>(code: T, message: String) -> RpcStatus183     pub fn with_message<T: Into<RpcStatusCode>>(code: T, message: String) -> RpcStatus {
184         RpcStatus::with_details(code, message, vec![])
185     }
186 
187     /// Create a new [`RpcStats`] with code, message and details.
188     ///
189     /// If using rich error model, `details` should be binary message that sets `code` and
190     /// `message` to the same value. Or you can use `into` method to do automatical
191     /// transformation if using `grpcio_proto::google::rpc::Status`.
with_details<T: Into<RpcStatusCode>>( code: T, message: String, details: Vec<u8>, ) -> RpcStatus192     pub fn with_details<T: Into<RpcStatusCode>>(
193         code: T,
194         message: String,
195         details: Vec<u8>,
196     ) -> RpcStatus {
197         RpcStatus {
198             code: code.into(),
199             message,
200             details,
201         }
202     }
203 
204     /// Create a new [`RpcStatus`] that status code is Ok.
ok() -> RpcStatus205     pub fn ok() -> RpcStatus {
206         RpcStatus::new(RpcStatusCode::OK)
207     }
208 
209     /// Return the instance's error code.
210     #[inline]
code(&self) -> RpcStatusCode211     pub fn code(&self) -> RpcStatusCode {
212         self.code
213     }
214 
215     /// Return the instance's error message.
216     #[inline]
message(&self) -> &str217     pub fn message(&self) -> &str {
218         &self.message
219     }
220 
221     /// Return the (binary) error details.
222     ///
223     /// Usually it contains a serialized `google.rpc.Status` proto.
details(&self) -> &[u8]224     pub fn details(&self) -> &[u8] {
225         &self.details
226     }
227 }
228 
229 pub type MessageReader = GrpcByteBufferReader;
230 
231 /// Context for batch request.
232 pub struct BatchContext {
233     ctx: *mut grpcwrap_batch_context,
234 }
235 
236 impl BatchContext {
new() -> BatchContext237     pub fn new() -> BatchContext {
238         BatchContext {
239             ctx: unsafe { grpc_sys::grpcwrap_batch_context_create() },
240         }
241     }
242 
as_ptr(&self) -> *mut grpcwrap_batch_context243     pub fn as_ptr(&self) -> *mut grpcwrap_batch_context {
244         self.ctx
245     }
246 
take_recv_message(&self) -> Option<GrpcByteBuffer>247     pub fn take_recv_message(&self) -> Option<GrpcByteBuffer> {
248         let ptr = unsafe { grpc_sys::grpcwrap_batch_context_take_recv_message(self.ctx) };
249         if ptr.is_null() {
250             None
251         } else {
252             Some(unsafe { GrpcByteBuffer::from_raw(ptr) })
253         }
254     }
255 
256     /// Get the status of the rpc call.
rpc_status(&self) -> RpcStatus257     pub fn rpc_status(&self) -> RpcStatus {
258         let status = RpcStatusCode(unsafe {
259             grpc_sys::grpcwrap_batch_context_recv_status_on_client_status(self.ctx)
260         });
261 
262         if status == RpcStatusCode::OK {
263             RpcStatus::ok()
264         } else {
265             unsafe {
266                 let mut msg_len = 0;
267                 let details_ptr = grpc_sys::grpcwrap_batch_context_recv_status_on_client_details(
268                     self.ctx,
269                     &mut msg_len,
270                 );
271                 let msg_slice = slice::from_raw_parts(details_ptr as *const _, msg_len);
272                 let message = String::from_utf8_lossy(msg_slice).into_owned();
273                 let m_ptr =
274                     grpc_sys::grpcwrap_batch_context_recv_status_on_client_trailing_metadata(
275                         self.ctx,
276                     );
277                 let metadata = &*(m_ptr as *const Metadata);
278                 let details = metadata.search_binary_error_details().to_vec();
279                 RpcStatus::with_details(status, message, details)
280             }
281         }
282     }
283 
284     /// Fetch the response bytes of the rpc call.
recv_message(&mut self) -> Option<MessageReader>285     pub fn recv_message(&mut self) -> Option<MessageReader> {
286         let buf = self.take_recv_message()?;
287         Some(GrpcByteBufferReader::new(buf))
288     }
289 }
290 
291 impl Drop for BatchContext {
drop(&mut self)292     fn drop(&mut self) {
293         unsafe { grpc_sys::grpcwrap_batch_context_destroy(self.ctx) }
294     }
295 }
296 
297 #[inline]
box_batch_tag(tag: CallTag) -> (*mut grpcwrap_batch_context, *mut c_void)298 fn box_batch_tag(tag: CallTag) -> (*mut grpcwrap_batch_context, *mut c_void) {
299     let tag_box = Box::new(tag);
300     (
301         tag_box.batch_ctx().unwrap().as_ptr(),
302         Box::into_raw(tag_box) as _,
303     )
304 }
305 
306 /// A helper function that runs the batch call and checks the result.
check_run<F>(bt: BatchType, f: F) -> BatchFuture where F: FnOnce(*mut grpcwrap_batch_context, *mut c_void) -> grpc_call_error,307 fn check_run<F>(bt: BatchType, f: F) -> BatchFuture
308 where
309     F: FnOnce(*mut grpcwrap_batch_context, *mut c_void) -> grpc_call_error,
310 {
311     let (cq_f, tag) = CallTag::batch_pair(bt);
312     let (batch_ptr, tag_ptr) = box_batch_tag(tag);
313     let code = f(batch_ptr, tag_ptr);
314     if code != grpc_call_error::GRPC_CALL_OK {
315         unsafe {
316             Box::from_raw(tag_ptr);
317         }
318         panic!("create call fail: {:?}", code);
319     }
320     cq_f
321 }
322 
323 /// A Call represents an RPC.
324 ///
325 /// When created, it is in a configuration state allowing properties to be
326 /// set until it is invoked. After invoke, the Call can have messages
327 /// written to it and read from it.
328 pub struct Call {
329     pub call: *mut grpc_call,
330     pub cq: CompletionQueue,
331 }
332 
333 unsafe impl Send for Call {}
334 
335 impl Call {
from_raw(call: *mut grpc_sys::grpc_call, cq: CompletionQueue) -> Call336     pub unsafe fn from_raw(call: *mut grpc_sys::grpc_call, cq: CompletionQueue) -> Call {
337         assert!(!call.is_null());
338         Call { call, cq }
339     }
340 
341     /// Send a message asynchronously.
start_send_message( &mut self, msg: &mut GrpcSlice, write_flags: u32, initial_meta: bool, ) -> Result<BatchFuture>342     pub fn start_send_message(
343         &mut self,
344         msg: &mut GrpcSlice,
345         write_flags: u32,
346         initial_meta: bool,
347     ) -> Result<BatchFuture> {
348         let _cq_ref = self.cq.borrow()?;
349         let i = if initial_meta { 1 } else { 0 };
350         let f = check_run(BatchType::Finish, |ctx, tag| unsafe {
351             grpc_sys::grpcwrap_call_send_message(
352                 self.call,
353                 ctx,
354                 msg.as_mut_ptr(),
355                 write_flags,
356                 i,
357                 tag,
358             )
359         });
360         Ok(f)
361     }
362 
363     /// Finish the rpc call from client.
start_send_close_client(&mut self) -> Result<BatchFuture>364     pub fn start_send_close_client(&mut self) -> Result<BatchFuture> {
365         let _cq_ref = self.cq.borrow()?;
366         let f = check_run(BatchType::Finish, |_, tag| unsafe {
367             grpc_sys::grpcwrap_call_send_close_from_client(self.call, tag)
368         });
369         Ok(f)
370     }
371 
372     /// Receive a message asynchronously.
start_recv_message(&mut self) -> Result<BatchFuture>373     pub fn start_recv_message(&mut self) -> Result<BatchFuture> {
374         let _cq_ref = self.cq.borrow()?;
375         let f = check_run(BatchType::Read, |ctx, tag| unsafe {
376             grpc_sys::grpcwrap_call_recv_message(self.call, ctx, tag)
377         });
378         Ok(f)
379     }
380 
381     /// Start handling from server side.
382     ///
383     /// Future will finish once close is received by the server.
start_server_side(&mut self) -> Result<BatchFuture>384     pub fn start_server_side(&mut self) -> Result<BatchFuture> {
385         let _cq_ref = self.cq.borrow()?;
386         let f = check_run(BatchType::Finish, |ctx, tag| unsafe {
387             grpc_sys::grpcwrap_call_start_serverside(self.call, ctx, tag)
388         });
389         Ok(f)
390     }
391 
392     /// Send a status from server.
start_send_status_from_server( &mut self, status: &RpcStatus, send_empty_metadata: bool, payload: &mut Option<GrpcSlice>, write_flags: u32, ) -> Result<BatchFuture>393     pub fn start_send_status_from_server(
394         &mut self,
395         status: &RpcStatus,
396         send_empty_metadata: bool,
397         payload: &mut Option<GrpcSlice>,
398         write_flags: u32,
399     ) -> Result<BatchFuture> {
400         let _cq_ref = self.cq.borrow()?;
401         let send_empty_metadata = if send_empty_metadata { 1 } else { 0 };
402         let f = check_run(BatchType::Finish, |ctx, tag| unsafe {
403             let (msg_ptr, msg_len) = if status.code() == RpcStatusCode::OK {
404                 (ptr::null(), 0)
405             } else {
406                 (status.message.as_ptr(), status.message.len())
407             };
408             let payload_p = match payload {
409                 Some(p) => p.as_mut_ptr(),
410                 None => ptr::null_mut(),
411             };
412             let mut trailing_metadata = if status.details.is_empty() {
413                 None
414             } else {
415                 let mut builder = MetadataBuilder::new();
416                 builder.set_binary_error_details(&status.details);
417                 Some(builder.build())
418             };
419             grpc_sys::grpcwrap_call_send_status_from_server(
420                 self.call,
421                 ctx,
422                 status.code().into(),
423                 msg_ptr as _,
424                 msg_len,
425                 trailing_metadata
426                     .as_mut()
427                     .map_or_else(ptr::null_mut, |m| m as *mut _ as _),
428                 send_empty_metadata,
429                 payload_p,
430                 write_flags,
431                 tag,
432             )
433         });
434         Ok(f)
435     }
436 
437     /// Abort an rpc call before handler is called.
abort(self, status: &RpcStatus)438     pub fn abort(self, status: &RpcStatus) {
439         match self.cq.borrow() {
440             // Queue is shutdown, ignore.
441             Err(Error::QueueShutdown) => return,
442             Err(e) => panic!("unexpected error when aborting call: {:?}", e),
443             _ => {}
444         }
445         let call_ptr = self.call;
446         let tag = CallTag::abort(self);
447         let (batch_ptr, tag_ptr) = box_batch_tag(tag);
448 
449         let code = unsafe {
450             let (msg_ptr, msg_len) = if status.code() == RpcStatusCode::OK {
451                 (ptr::null(), 0)
452             } else {
453                 (status.message.as_ptr(), status.message.len())
454             };
455             grpc_sys::grpcwrap_call_send_status_from_server(
456                 call_ptr,
457                 batch_ptr,
458                 status.code().into(),
459                 msg_ptr as _,
460                 msg_len,
461                 ptr::null_mut(),
462                 1,
463                 ptr::null_mut(),
464                 0,
465                 tag_ptr as *mut c_void,
466             )
467         };
468         if code != grpc_call_error::GRPC_CALL_OK {
469             unsafe {
470                 Box::from_raw(tag_ptr);
471             }
472             panic!("create call fail: {:?}", code);
473         }
474     }
475 
476     /// Cancel the rpc call by client.
cancel(&self)477     fn cancel(&self) {
478         match self.cq.borrow() {
479             // Queue is shutdown, ignore.
480             Err(Error::QueueShutdown) => return,
481             Err(e) => panic!("unexpected error when canceling call: {:?}", e),
482             _ => {}
483         }
484         unsafe {
485             grpc_sys::grpc_call_cancel(self.call, ptr::null_mut());
486         }
487     }
488 }
489 
490 impl Drop for Call {
drop(&mut self)491     fn drop(&mut self) {
492         unsafe { grpc_sys::grpc_call_unref(self.call) }
493     }
494 }
495 
496 /// A share object for client streaming and duplex streaming call.
497 ///
498 /// In both cases, receiver and sender can be polled in the same time,
499 /// hence we need to share the call in the both sides and abort the sink
500 /// once the call is canceled or finished early.
501 struct ShareCall {
502     call: Call,
503     close_f: BatchFuture,
504     finished: bool,
505     status: Option<RpcStatus>,
506 }
507 
508 impl ShareCall {
new(call: Call, close_f: BatchFuture) -> ShareCall509     fn new(call: Call, close_f: BatchFuture) -> ShareCall {
510         ShareCall {
511             call,
512             close_f,
513             finished: false,
514             status: None,
515         }
516     }
517 
518     /// Poll if the call is still alive.
519     ///
520     /// If the call is still running, will register a notification for its completion.
poll_finish(&mut self, cx: &mut Context) -> Poll<Result<Option<MessageReader>>>521     fn poll_finish(&mut self, cx: &mut Context) -> Poll<Result<Option<MessageReader>>> {
522         let res = match Pin::new(&mut self.close_f).poll(cx) {
523             Poll::Ready(Ok(reader)) => {
524                 self.status = Some(RpcStatus::ok());
525                 Poll::Ready(Ok(reader))
526             }
527             Poll::Pending => return Poll::Pending,
528             Poll::Ready(Err(Error::RpcFailure(status))) => {
529                 self.status = Some(status.clone());
530                 Poll::Ready(Err(Error::RpcFailure(status)))
531             }
532             res => res,
533         };
534 
535         self.finished = true;
536         res
537     }
538 
539     /// Check if the call is finished.
check_alive(&mut self) -> Result<()>540     fn check_alive(&mut self) -> Result<()> {
541         if self.finished {
542             // maybe can just take here.
543             return Err(Error::RpcFinished(self.status.clone()));
544         }
545 
546         task::check_alive(&self.close_f)
547     }
548 }
549 
550 /// A helper trait that allows executing function on the internal `ShareCall` struct.
551 trait ShareCallHolder {
call<R, F: FnOnce(&mut ShareCall) -> R>(&mut self, f: F) -> R552     fn call<R, F: FnOnce(&mut ShareCall) -> R>(&mut self, f: F) -> R;
553 }
554 
555 impl ShareCallHolder for ShareCall {
call<R, F: FnOnce(&mut ShareCall) -> R>(&mut self, f: F) -> R556     fn call<R, F: FnOnce(&mut ShareCall) -> R>(&mut self, f: F) -> R {
557         f(self)
558     }
559 }
560 
561 impl ShareCallHolder for Arc<Mutex<ShareCall>> {
call<R, F: FnOnce(&mut ShareCall) -> R>(&mut self, f: F) -> R562     fn call<R, F: FnOnce(&mut ShareCall) -> R>(&mut self, f: F) -> R {
563         let mut call = self.lock();
564         f(&mut call)
565     }
566 }
567 
568 /// A helper struct for constructing Stream object for batch requests.
569 struct StreamingBase {
570     close_f: Option<BatchFuture>,
571     msg_f: Option<BatchFuture>,
572     read_done: bool,
573 }
574 
575 impl StreamingBase {
new(close_f: Option<BatchFuture>) -> StreamingBase576     fn new(close_f: Option<BatchFuture>) -> StreamingBase {
577         StreamingBase {
578             close_f,
579             msg_f: None,
580             read_done: false,
581         }
582     }
583 
poll<C: ShareCallHolder>( &mut self, cx: &mut Context, call: &mut C, skip_finish_check: bool, ) -> Poll<Option<Result<MessageReader>>>584     fn poll<C: ShareCallHolder>(
585         &mut self,
586         cx: &mut Context,
587         call: &mut C,
588         skip_finish_check: bool,
589     ) -> Poll<Option<Result<MessageReader>>> {
590         if !skip_finish_check {
591             let mut finished = false;
592             if let Some(close_f) = &mut self.close_f {
593                 if Pin::new(close_f).poll(cx)?.is_ready() {
594                     // Don't return immediately, there may be pending data.
595                     finished = true;
596                 }
597             }
598             if finished {
599                 self.close_f.take();
600             }
601         }
602 
603         let mut bytes = None;
604         if !self.read_done {
605             if let Some(msg_f) = &mut self.msg_f {
606                 bytes = ready!(Pin::new(msg_f).poll(cx)?);
607                 if bytes.is_none() {
608                     self.read_done = true;
609                 }
610             }
611         }
612 
613         if self.read_done {
614             if self.close_f.is_none() {
615                 return Poll::Ready(None);
616             }
617             return Poll::Pending;
618         }
619 
620         // so msg_f must be either stale or not initialized yet.
621         self.msg_f.take();
622         let msg_f = call.call(|c| c.call.start_recv_message())?;
623         self.msg_f = Some(msg_f);
624         if bytes.is_none() {
625             self.poll(cx, call, true)
626         } else {
627             Poll::Ready(bytes.map(Ok))
628         }
629     }
630 
631     // Cancel the call if we still have some messages or did not
632     // receive status code.
on_drop<C: ShareCallHolder>(&self, call: &mut C)633     fn on_drop<C: ShareCallHolder>(&self, call: &mut C) {
634         if !self.read_done || self.close_f.is_some() {
635             call.call(|c| c.call.cancel());
636         }
637     }
638 }
639 
640 /// Flags for write operations.
641 #[derive(Default, Clone, Copy)]
642 pub struct WriteFlags {
643     flags: u32,
644 }
645 
646 impl WriteFlags {
647     /// Hint that the write may be buffered and need not go out on the wire immediately.
648     ///
649     /// gRPC is free to buffer the message until the next non-buffered write, or until write stream
650     /// completion, but it need not buffer completely or at all.
buffer_hint(mut self, need_buffered: bool) -> WriteFlags651     pub fn buffer_hint(mut self, need_buffered: bool) -> WriteFlags {
652         client::change_flag(
653             &mut self.flags,
654             grpc_sys::GRPC_WRITE_BUFFER_HINT,
655             need_buffered,
656         );
657         self
658     }
659 
660     /// Force compression to be disabled.
force_no_compress(mut self, no_compress: bool) -> WriteFlags661     pub fn force_no_compress(mut self, no_compress: bool) -> WriteFlags {
662         client::change_flag(
663             &mut self.flags,
664             grpc_sys::GRPC_WRITE_NO_COMPRESS,
665             no_compress,
666         );
667         self
668     }
669 
670     /// Get whether buffer hint is enabled.
get_buffer_hint(self) -> bool671     pub fn get_buffer_hint(self) -> bool {
672         (self.flags & grpc_sys::GRPC_WRITE_BUFFER_HINT) != 0
673     }
674 
675     /// Get whether compression is disabled.
get_force_no_compress(self) -> bool676     pub fn get_force_no_compress(self) -> bool {
677         (self.flags & grpc_sys::GRPC_WRITE_NO_COMPRESS) != 0
678     }
679 }
680 
681 /// A helper struct for constructing Sink object for batch requests.
682 struct SinkBase {
683     // Batch job to be executed in `poll_ready`.
684     batch_f: Option<BatchFuture>,
685     send_metadata: bool,
686     // Flag to indicate if enhance batch strategy. This behavior will modify the `buffer_hint` to batch
687     // messages as much as possible.
688     enhance_buffer_strategy: bool,
689     // Buffer used to store the data to be sent, send out the last data in this round of `start_send`.
690     buffer: GrpcSlice,
691     // Write flags used to control the data to be sent in `buffer`.
692     buf_flags: Option<WriteFlags>,
693     // Used to records whether a message in which `buffer_hint` is false exists.
694     // Note: only used in enhanced buffer strategy.
695     last_buf_hint: bool,
696 }
697 
698 impl SinkBase {
new(send_metadata: bool) -> SinkBase699     fn new(send_metadata: bool) -> SinkBase {
700         SinkBase {
701             batch_f: None,
702             buffer: GrpcSlice::default(),
703             buf_flags: None,
704             last_buf_hint: true,
705             send_metadata,
706             enhance_buffer_strategy: false,
707         }
708     }
709 
start_send<T, C: ShareCallHolder>( &mut self, call: &mut C, t: &T, flags: WriteFlags, ser: SerializeFn<T>, ) -> Result<()>710     fn start_send<T, C: ShareCallHolder>(
711         &mut self,
712         call: &mut C,
713         t: &T,
714         flags: WriteFlags,
715         ser: SerializeFn<T>,
716     ) -> Result<()> {
717         // temporary fix: buffer hint with send meta will not send out any metadata.
718         // note: only the first message can enter this code block.
719         if self.send_metadata {
720             ser(t, &mut self.buffer)?;
721             self.buf_flags = Some(flags);
722             self.start_send_buffer_message(false, call)?;
723             self.send_metadata = false;
724             return Ok(());
725         }
726 
727         // If there is already a buffered message waiting to be sent, set `buffer_hint` to true to indicate
728         // that this is not the last message.
729         if self.buf_flags.is_some() {
730             self.start_send_buffer_message(true, call)?;
731         }
732 
733         ser(t, &mut self.buffer)?;
734         let hint = flags.get_buffer_hint();
735         self.last_buf_hint &= hint;
736         self.buf_flags = Some(flags);
737 
738         // If sink disable batch, start sending the message in buffer immediately.
739         if !self.enhance_buffer_strategy {
740             self.start_send_buffer_message(hint, call)?;
741         }
742 
743         Ok(())
744     }
745 
746     #[inline]
poll_ready(&mut self, cx: &mut Context) -> Poll<Result<()>>747     fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<()>> {
748         match &mut self.batch_f {
749             None => return Poll::Ready(Ok(())),
750             Some(f) => {
751                 ready!(Pin::new(f).poll(cx)?);
752             }
753         }
754         self.batch_f.take();
755         Poll::Ready(Ok(()))
756     }
757 
758     #[inline]
poll_flush<C: ShareCallHolder>( &mut self, cx: &mut Context, call: &mut C, ) -> Poll<Result<()>>759     fn poll_flush<C: ShareCallHolder>(
760         &mut self,
761         cx: &mut Context,
762         call: &mut C,
763     ) -> Poll<Result<()>> {
764         if self.batch_f.is_some() {
765             ready!(self.poll_ready(cx)?);
766         }
767         if self.buf_flags.is_some() {
768             self.start_send_buffer_message(self.last_buf_hint, call)?;
769             ready!(self.poll_ready(cx)?);
770         }
771         self.last_buf_hint = true;
772         Poll::Ready(Ok(()))
773     }
774 
775     #[inline]
start_send_buffer_message<C: ShareCallHolder>( &mut self, buffer_hint: bool, call: &mut C, ) -> Result<()>776     fn start_send_buffer_message<C: ShareCallHolder>(
777         &mut self,
778         buffer_hint: bool,
779         call: &mut C,
780     ) -> Result<()> {
781         // `start_send` is supposed to be called after `poll_ready` returns ready.
782         assert!(self.batch_f.is_none());
783 
784         let mut flags = self.buf_flags.unwrap();
785         flags = flags.buffer_hint(buffer_hint);
786         let write_f = call.call(|c| {
787             c.call
788                 .start_send_message(&mut self.buffer, flags.flags, self.send_metadata)
789         })?;
790         self.batch_f = Some(write_f);
791         if !self.buffer.is_inline() {
792             self.buffer = GrpcSlice::default();
793         }
794         self.buf_flags.take();
795         Ok(())
796     }
797 }
798