1 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2
3 pub mod client;
4 pub mod server;
5
6 use std::fmt::{self, Debug, Display};
7 use std::pin::Pin;
8 use std::sync::Arc;
9 use std::{ptr, slice};
10
11 use crate::grpc_sys::{self, grpc_call, grpc_call_error, grpcwrap_batch_context};
12 use crate::{cq::CompletionQueue, Metadata, MetadataBuilder};
13 use futures::future::Future;
14 use futures::ready;
15 use futures::task::{Context, Poll};
16 use libc::c_void;
17 use parking_lot::Mutex;
18
19 use crate::buf::{GrpcByteBuffer, GrpcByteBufferReader, GrpcSlice};
20 use crate::codec::{DeserializeFn, Marshaller, SerializeFn};
21 use crate::error::{Error, Result};
22 use crate::grpc_sys::grpc_status_code::*;
23 use crate::task::{self, BatchFuture, BatchType, CallTag};
24
25 /// An gRPC status code structure.
26 /// This type contains constants for all gRPC status codes.
27 #[derive(PartialEq, Eq, Clone, Copy)]
28 pub struct RpcStatusCode(i32);
29
30 impl From<i32> for RpcStatusCode {
from(code: i32) -> RpcStatusCode31 fn from(code: i32) -> RpcStatusCode {
32 RpcStatusCode(code)
33 }
34 }
35
36 impl From<RpcStatusCode> for i32 {
from(code: RpcStatusCode) -> i3237 fn from(code: RpcStatusCode) -> i32 {
38 code.0
39 }
40 }
41
42 impl Display for RpcStatusCode {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result43 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
44 Debug::fmt(self, f)
45 }
46 }
47
// For every `(raw value, NAME)` pair this macro generates:
//   * an associated constant `RpcStatusCode::NAME` wrapping the raw value, and
//   * one arm of the `Debug` implementation, which prints `<number>-<NAME>`.
// Values that match none of the pairs are printed with the name
// `INVALID_STATUS_CODE`.
macro_rules! status_codes {
    (
        $(
            ($num:path, $konst:ident);
        )+
    ) => {
        impl RpcStatusCode {
            $(
                pub const $konst: RpcStatusCode = RpcStatusCode($num);
            )+
        }

        impl Debug for RpcStatusCode {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                let raw = self.0;
                // Map the raw value back to the constant name it was declared with.
                let name = match raw {
                    $($num => stringify!($konst),)+
                    _ => "INVALID_STATUS_CODE",
                };
                write!(f, "{}-{}", raw, name)
            }
        }
    }
}
75
// Instantiate the `RpcStatusCode` constants and their `Debug` names.
// Each entry pairs a raw `GRPC_STATUS_*` value with the name of the constant
// exposed on `RpcStatusCode`.
status_codes! {
    (GRPC_STATUS_OK, OK);
    (GRPC_STATUS_CANCELLED, CANCELLED);
    (GRPC_STATUS_UNKNOWN, UNKNOWN);
    (GRPC_STATUS_INVALID_ARGUMENT, INVALID_ARGUMENT);
    (GRPC_STATUS_DEADLINE_EXCEEDED, DEADLINE_EXCEEDED);
    (GRPC_STATUS_NOT_FOUND, NOT_FOUND);
    (GRPC_STATUS_ALREADY_EXISTS, ALREADY_EXISTS);
    (GRPC_STATUS_PERMISSION_DENIED, PERMISSION_DENIED);
    (GRPC_STATUS_RESOURCE_EXHAUSTED, RESOURCE_EXHAUSTED);
    (GRPC_STATUS_FAILED_PRECONDITION, FAILED_PRECONDITION);
    (GRPC_STATUS_ABORTED, ABORTED);
    (GRPC_STATUS_OUT_OF_RANGE, OUT_OF_RANGE);
    (GRPC_STATUS_UNIMPLEMENTED, UNIMPLEMENTED);
    (GRPC_STATUS_INTERNAL, INTERNAL);
    (GRPC_STATUS_UNAVAILABLE, UNAVAILABLE);
    (GRPC_STATUS_DATA_LOSS, DATA_LOSS);
    (GRPC_STATUS_UNAUTHENTICATED, UNAUTHENTICATED);
    (GRPC_STATUS__DO_NOT_USE, DO_NOT_USE);
}
96
/// Method types supported by gRPC.
///
/// The variants differ in whether the client, the server, or both send a
/// stream of messages instead of a single one.
#[derive(Clone, Copy)]
pub enum MethodType {
    /// Single request sent from client, single response received from server.
    Unary,

    /// Stream of requests sent from client, single response received from server.
    ClientStreaming,

    /// Single request sent from client, stream of responses received from server.
    ServerStreaming,

    /// Both server and client can stream an arbitrary number of requests and
    /// responses simultaneously.
    Duplex,
}
112
113 /// A description of a remote method.
114 // TODO: add serializer and deserializer.
115 pub struct Method<Req, Resp> {
116 /// Type of method.
117 pub ty: MethodType,
118
119 /// Full qualified name of the method.
120 pub name: &'static str,
121
122 /// The marshaller used for request messages.
123 pub req_mar: Marshaller<Req>,
124
125 /// The marshaller used for response messages.
126 pub resp_mar: Marshaller<Resp>,
127 }
128
129 impl<Req, Resp> Method<Req, Resp> {
130 /// Get the request serializer.
131 #[inline]
req_ser(&self) -> SerializeFn<Req>132 pub fn req_ser(&self) -> SerializeFn<Req> {
133 self.req_mar.ser
134 }
135
136 /// Get the request deserializer.
137 #[inline]
req_de(&self) -> DeserializeFn<Req>138 pub fn req_de(&self) -> DeserializeFn<Req> {
139 self.req_mar.de
140 }
141
142 /// Get the response serializer.
143 #[inline]
resp_ser(&self) -> SerializeFn<Resp>144 pub fn resp_ser(&self) -> SerializeFn<Resp> {
145 self.resp_mar.ser
146 }
147
148 /// Get the response deserializer.
149 #[inline]
resp_de(&self) -> DeserializeFn<Resp>150 pub fn resp_de(&self) -> DeserializeFn<Resp> {
151 self.resp_mar.de
152 }
153 }
154
155 /// RPC result returned from the server.
156 #[derive(Debug, Clone)]
157 pub struct RpcStatus {
158 /// gRPC status code. `Ok` indicates success, all other values indicate an error.
159 code: RpcStatusCode,
160
161 /// error message.
162 message: String,
163
164 /// Additional details for rich error model.
165 ///
166 /// See also https://grpc.io/docs/guides/error/#richer-error-model.
167 details: Vec<u8>,
168 }
169
170 impl Display for RpcStatus {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result171 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
172 Debug::fmt(self, fmt)
173 }
174 }
175
176 impl RpcStatus {
177 /// Create a new [`RpcStatus`].
new<T: Into<RpcStatusCode>>(code: T) -> RpcStatus178 pub fn new<T: Into<RpcStatusCode>>(code: T) -> RpcStatus {
179 RpcStatus::with_message(code, String::new())
180 }
181
182 /// Create a new [`RpcStatus`] with given message.
with_message<T: Into<RpcStatusCode>>(code: T, message: String) -> RpcStatus183 pub fn with_message<T: Into<RpcStatusCode>>(code: T, message: String) -> RpcStatus {
184 RpcStatus::with_details(code, message, vec![])
185 }
186
187 /// Create a new [`RpcStats`] with code, message and details.
188 ///
189 /// If using rich error model, `details` should be binary message that sets `code` and
190 /// `message` to the same value. Or you can use `into` method to do automatical
191 /// transformation if using `grpcio_proto::google::rpc::Status`.
with_details<T: Into<RpcStatusCode>>( code: T, message: String, details: Vec<u8>, ) -> RpcStatus192 pub fn with_details<T: Into<RpcStatusCode>>(
193 code: T,
194 message: String,
195 details: Vec<u8>,
196 ) -> RpcStatus {
197 RpcStatus {
198 code: code.into(),
199 message,
200 details,
201 }
202 }
203
204 /// Create a new [`RpcStatus`] that status code is Ok.
ok() -> RpcStatus205 pub fn ok() -> RpcStatus {
206 RpcStatus::new(RpcStatusCode::OK)
207 }
208
209 /// Return the instance's error code.
210 #[inline]
code(&self) -> RpcStatusCode211 pub fn code(&self) -> RpcStatusCode {
212 self.code
213 }
214
215 /// Return the instance's error message.
216 #[inline]
message(&self) -> &str217 pub fn message(&self) -> &str {
218 &self.message
219 }
220
221 /// Return the (binary) error details.
222 ///
223 /// Usually it contains a serialized `google.rpc.Status` proto.
details(&self) -> &[u8]224 pub fn details(&self) -> &[u8] {
225 &self.details
226 }
227 }
228
/// Reader type handed out for received messages; an alias of
/// `GrpcByteBufferReader`.
pub type MessageReader = GrpcByteBufferReader;
230
231 /// Context for batch request.
232 pub struct BatchContext {
233 ctx: *mut grpcwrap_batch_context,
234 }
235
236 impl BatchContext {
new() -> BatchContext237 pub fn new() -> BatchContext {
238 BatchContext {
239 ctx: unsafe { grpc_sys::grpcwrap_batch_context_create() },
240 }
241 }
242
as_ptr(&self) -> *mut grpcwrap_batch_context243 pub fn as_ptr(&self) -> *mut grpcwrap_batch_context {
244 self.ctx
245 }
246
take_recv_message(&self) -> Option<GrpcByteBuffer>247 pub fn take_recv_message(&self) -> Option<GrpcByteBuffer> {
248 let ptr = unsafe { grpc_sys::grpcwrap_batch_context_take_recv_message(self.ctx) };
249 if ptr.is_null() {
250 None
251 } else {
252 Some(unsafe { GrpcByteBuffer::from_raw(ptr) })
253 }
254 }
255
256 /// Get the status of the rpc call.
rpc_status(&self) -> RpcStatus257 pub fn rpc_status(&self) -> RpcStatus {
258 let status = RpcStatusCode(unsafe {
259 grpc_sys::grpcwrap_batch_context_recv_status_on_client_status(self.ctx)
260 });
261
262 if status == RpcStatusCode::OK {
263 RpcStatus::ok()
264 } else {
265 unsafe {
266 let mut msg_len = 0;
267 let details_ptr = grpc_sys::grpcwrap_batch_context_recv_status_on_client_details(
268 self.ctx,
269 &mut msg_len,
270 );
271 let msg_slice = slice::from_raw_parts(details_ptr as *const _, msg_len);
272 let message = String::from_utf8_lossy(msg_slice).into_owned();
273 let m_ptr =
274 grpc_sys::grpcwrap_batch_context_recv_status_on_client_trailing_metadata(
275 self.ctx,
276 );
277 let metadata = &*(m_ptr as *const Metadata);
278 let details = metadata.search_binary_error_details().to_vec();
279 RpcStatus::with_details(status, message, details)
280 }
281 }
282 }
283
284 /// Fetch the response bytes of the rpc call.
recv_message(&mut self) -> Option<MessageReader>285 pub fn recv_message(&mut self) -> Option<MessageReader> {
286 let buf = self.take_recv_message()?;
287 Some(GrpcByteBufferReader::new(buf))
288 }
289 }
290
291 impl Drop for BatchContext {
drop(&mut self)292 fn drop(&mut self) {
293 unsafe { grpc_sys::grpcwrap_batch_context_destroy(self.ctx) }
294 }
295 }
296
297 #[inline]
box_batch_tag(tag: CallTag) -> (*mut grpcwrap_batch_context, *mut c_void)298 fn box_batch_tag(tag: CallTag) -> (*mut grpcwrap_batch_context, *mut c_void) {
299 let tag_box = Box::new(tag);
300 (
301 tag_box.batch_ctx().unwrap().as_ptr(),
302 Box::into_raw(tag_box) as _,
303 )
304 }
305
306 /// A helper function that runs the batch call and checks the result.
check_run<F>(bt: BatchType, f: F) -> BatchFuture where F: FnOnce(*mut grpcwrap_batch_context, *mut c_void) -> grpc_call_error,307 fn check_run<F>(bt: BatchType, f: F) -> BatchFuture
308 where
309 F: FnOnce(*mut grpcwrap_batch_context, *mut c_void) -> grpc_call_error,
310 {
311 let (cq_f, tag) = CallTag::batch_pair(bt);
312 let (batch_ptr, tag_ptr) = box_batch_tag(tag);
313 let code = f(batch_ptr, tag_ptr);
314 if code != grpc_call_error::GRPC_CALL_OK {
315 unsafe {
316 Box::from_raw(tag_ptr);
317 }
318 panic!("create call fail: {:?}", code);
319 }
320 cq_f
321 }
322
/// A Call represents an RPC.
///
/// When created, it is in a configuration state allowing properties to be
/// set until it is invoked. After invoke, the Call can have messages
/// written to it and read from it.
pub struct Call {
    /// Raw pointer to the underlying native `grpc_call`.
    pub call: *mut grpc_call,
    /// Completion queue borrowed by the methods of this call before they
    /// start a native operation.
    pub cq: CompletionQueue,
}

// `Call` holds a raw pointer, so `Send` has to be asserted manually.
// NOTE(review): presumably sound because the native call object may be used
// from any thread — confirm against the gRPC core documentation.
unsafe impl Send for Call {}
334
335 impl Call {
from_raw(call: *mut grpc_sys::grpc_call, cq: CompletionQueue) -> Call336 pub unsafe fn from_raw(call: *mut grpc_sys::grpc_call, cq: CompletionQueue) -> Call {
337 assert!(!call.is_null());
338 Call { call, cq }
339 }
340
341 /// Send a message asynchronously.
start_send_message( &mut self, msg: &mut GrpcSlice, write_flags: u32, initial_meta: bool, ) -> Result<BatchFuture>342 pub fn start_send_message(
343 &mut self,
344 msg: &mut GrpcSlice,
345 write_flags: u32,
346 initial_meta: bool,
347 ) -> Result<BatchFuture> {
348 let _cq_ref = self.cq.borrow()?;
349 let i = if initial_meta { 1 } else { 0 };
350 let f = check_run(BatchType::Finish, |ctx, tag| unsafe {
351 grpc_sys::grpcwrap_call_send_message(
352 self.call,
353 ctx,
354 msg.as_mut_ptr(),
355 write_flags,
356 i,
357 tag,
358 )
359 });
360 Ok(f)
361 }
362
363 /// Finish the rpc call from client.
start_send_close_client(&mut self) -> Result<BatchFuture>364 pub fn start_send_close_client(&mut self) -> Result<BatchFuture> {
365 let _cq_ref = self.cq.borrow()?;
366 let f = check_run(BatchType::Finish, |_, tag| unsafe {
367 grpc_sys::grpcwrap_call_send_close_from_client(self.call, tag)
368 });
369 Ok(f)
370 }
371
372 /// Receive a message asynchronously.
start_recv_message(&mut self) -> Result<BatchFuture>373 pub fn start_recv_message(&mut self) -> Result<BatchFuture> {
374 let _cq_ref = self.cq.borrow()?;
375 let f = check_run(BatchType::Read, |ctx, tag| unsafe {
376 grpc_sys::grpcwrap_call_recv_message(self.call, ctx, tag)
377 });
378 Ok(f)
379 }
380
381 /// Start handling from server side.
382 ///
383 /// Future will finish once close is received by the server.
start_server_side(&mut self) -> Result<BatchFuture>384 pub fn start_server_side(&mut self) -> Result<BatchFuture> {
385 let _cq_ref = self.cq.borrow()?;
386 let f = check_run(BatchType::Finish, |ctx, tag| unsafe {
387 grpc_sys::grpcwrap_call_start_serverside(self.call, ctx, tag)
388 });
389 Ok(f)
390 }
391
392 /// Send a status from server.
start_send_status_from_server( &mut self, status: &RpcStatus, send_empty_metadata: bool, payload: &mut Option<GrpcSlice>, write_flags: u32, ) -> Result<BatchFuture>393 pub fn start_send_status_from_server(
394 &mut self,
395 status: &RpcStatus,
396 send_empty_metadata: bool,
397 payload: &mut Option<GrpcSlice>,
398 write_flags: u32,
399 ) -> Result<BatchFuture> {
400 let _cq_ref = self.cq.borrow()?;
401 let send_empty_metadata = if send_empty_metadata { 1 } else { 0 };
402 let f = check_run(BatchType::Finish, |ctx, tag| unsafe {
403 let (msg_ptr, msg_len) = if status.code() == RpcStatusCode::OK {
404 (ptr::null(), 0)
405 } else {
406 (status.message.as_ptr(), status.message.len())
407 };
408 let payload_p = match payload {
409 Some(p) => p.as_mut_ptr(),
410 None => ptr::null_mut(),
411 };
412 let mut trailing_metadata = if status.details.is_empty() {
413 None
414 } else {
415 let mut builder = MetadataBuilder::new();
416 builder.set_binary_error_details(&status.details);
417 Some(builder.build())
418 };
419 grpc_sys::grpcwrap_call_send_status_from_server(
420 self.call,
421 ctx,
422 status.code().into(),
423 msg_ptr as _,
424 msg_len,
425 trailing_metadata
426 .as_mut()
427 .map_or_else(ptr::null_mut, |m| m as *mut _ as _),
428 send_empty_metadata,
429 payload_p,
430 write_flags,
431 tag,
432 )
433 });
434 Ok(f)
435 }
436
437 /// Abort an rpc call before handler is called.
abort(self, status: &RpcStatus)438 pub fn abort(self, status: &RpcStatus) {
439 match self.cq.borrow() {
440 // Queue is shutdown, ignore.
441 Err(Error::QueueShutdown) => return,
442 Err(e) => panic!("unexpected error when aborting call: {:?}", e),
443 _ => {}
444 }
445 let call_ptr = self.call;
446 let tag = CallTag::abort(self);
447 let (batch_ptr, tag_ptr) = box_batch_tag(tag);
448
449 let code = unsafe {
450 let (msg_ptr, msg_len) = if status.code() == RpcStatusCode::OK {
451 (ptr::null(), 0)
452 } else {
453 (status.message.as_ptr(), status.message.len())
454 };
455 grpc_sys::grpcwrap_call_send_status_from_server(
456 call_ptr,
457 batch_ptr,
458 status.code().into(),
459 msg_ptr as _,
460 msg_len,
461 ptr::null_mut(),
462 1,
463 ptr::null_mut(),
464 0,
465 tag_ptr as *mut c_void,
466 )
467 };
468 if code != grpc_call_error::GRPC_CALL_OK {
469 unsafe {
470 Box::from_raw(tag_ptr);
471 }
472 panic!("create call fail: {:?}", code);
473 }
474 }
475
476 /// Cancel the rpc call by client.
cancel(&self)477 fn cancel(&self) {
478 match self.cq.borrow() {
479 // Queue is shutdown, ignore.
480 Err(Error::QueueShutdown) => return,
481 Err(e) => panic!("unexpected error when canceling call: {:?}", e),
482 _ => {}
483 }
484 unsafe {
485 grpc_sys::grpc_call_cancel(self.call, ptr::null_mut());
486 }
487 }
488 }
489
490 impl Drop for Call {
drop(&mut self)491 fn drop(&mut self) {
492 unsafe { grpc_sys::grpc_call_unref(self.call) }
493 }
494 }
495
496 /// A share object for client streaming and duplex streaming call.
497 ///
498 /// In both cases, receiver and sender can be polled in the same time,
499 /// hence we need to share the call in the both sides and abort the sink
500 /// once the call is canceled or finished early.
501 struct ShareCall {
502 call: Call,
503 close_f: BatchFuture,
504 finished: bool,
505 status: Option<RpcStatus>,
506 }
507
508 impl ShareCall {
new(call: Call, close_f: BatchFuture) -> ShareCall509 fn new(call: Call, close_f: BatchFuture) -> ShareCall {
510 ShareCall {
511 call,
512 close_f,
513 finished: false,
514 status: None,
515 }
516 }
517
518 /// Poll if the call is still alive.
519 ///
520 /// If the call is still running, will register a notification for its completion.
poll_finish(&mut self, cx: &mut Context) -> Poll<Result<Option<MessageReader>>>521 fn poll_finish(&mut self, cx: &mut Context) -> Poll<Result<Option<MessageReader>>> {
522 let res = match Pin::new(&mut self.close_f).poll(cx) {
523 Poll::Ready(Ok(reader)) => {
524 self.status = Some(RpcStatus::ok());
525 Poll::Ready(Ok(reader))
526 }
527 Poll::Pending => return Poll::Pending,
528 Poll::Ready(Err(Error::RpcFailure(status))) => {
529 self.status = Some(status.clone());
530 Poll::Ready(Err(Error::RpcFailure(status)))
531 }
532 res => res,
533 };
534
535 self.finished = true;
536 res
537 }
538
539 /// Check if the call is finished.
check_alive(&mut self) -> Result<()>540 fn check_alive(&mut self) -> Result<()> {
541 if self.finished {
542 // maybe can just take here.
543 return Err(Error::RpcFinished(self.status.clone()));
544 }
545
546 task::check_alive(&self.close_f)
547 }
548 }
549
550 /// A helper trait that allows executing function on the internal `ShareCall` struct.
551 trait ShareCallHolder {
call<R, F: FnOnce(&mut ShareCall) -> R>(&mut self, f: F) -> R552 fn call<R, F: FnOnce(&mut ShareCall) -> R>(&mut self, f: F) -> R;
553 }
554
555 impl ShareCallHolder for ShareCall {
call<R, F: FnOnce(&mut ShareCall) -> R>(&mut self, f: F) -> R556 fn call<R, F: FnOnce(&mut ShareCall) -> R>(&mut self, f: F) -> R {
557 f(self)
558 }
559 }
560
561 impl ShareCallHolder for Arc<Mutex<ShareCall>> {
call<R, F: FnOnce(&mut ShareCall) -> R>(&mut self, f: F) -> R562 fn call<R, F: FnOnce(&mut ShareCall) -> R>(&mut self, f: F) -> R {
563 let mut call = self.lock();
564 f(&mut call)
565 }
566 }
567
568 /// A helper struct for constructing Stream object for batch requests.
569 struct StreamingBase {
570 close_f: Option<BatchFuture>,
571 msg_f: Option<BatchFuture>,
572 read_done: bool,
573 }
574
575 impl StreamingBase {
new(close_f: Option<BatchFuture>) -> StreamingBase576 fn new(close_f: Option<BatchFuture>) -> StreamingBase {
577 StreamingBase {
578 close_f,
579 msg_f: None,
580 read_done: false,
581 }
582 }
583
poll<C: ShareCallHolder>( &mut self, cx: &mut Context, call: &mut C, skip_finish_check: bool, ) -> Poll<Option<Result<MessageReader>>>584 fn poll<C: ShareCallHolder>(
585 &mut self,
586 cx: &mut Context,
587 call: &mut C,
588 skip_finish_check: bool,
589 ) -> Poll<Option<Result<MessageReader>>> {
590 if !skip_finish_check {
591 let mut finished = false;
592 if let Some(close_f) = &mut self.close_f {
593 if Pin::new(close_f).poll(cx)?.is_ready() {
594 // Don't return immediately, there may be pending data.
595 finished = true;
596 }
597 }
598 if finished {
599 self.close_f.take();
600 }
601 }
602
603 let mut bytes = None;
604 if !self.read_done {
605 if let Some(msg_f) = &mut self.msg_f {
606 bytes = ready!(Pin::new(msg_f).poll(cx)?);
607 if bytes.is_none() {
608 self.read_done = true;
609 }
610 }
611 }
612
613 if self.read_done {
614 if self.close_f.is_none() {
615 return Poll::Ready(None);
616 }
617 return Poll::Pending;
618 }
619
620 // so msg_f must be either stale or not initialized yet.
621 self.msg_f.take();
622 let msg_f = call.call(|c| c.call.start_recv_message())?;
623 self.msg_f = Some(msg_f);
624 if bytes.is_none() {
625 self.poll(cx, call, true)
626 } else {
627 Poll::Ready(bytes.map(Ok))
628 }
629 }
630
631 // Cancel the call if we still have some messages or did not
632 // receive status code.
on_drop<C: ShareCallHolder>(&self, call: &mut C)633 fn on_drop<C: ShareCallHolder>(&self, call: &mut C) {
634 if !self.read_done || self.close_f.is_some() {
635 call.call(|c| c.call.cancel());
636 }
637 }
638 }
639
640 /// Flags for write operations.
641 #[derive(Default, Clone, Copy)]
642 pub struct WriteFlags {
643 flags: u32,
644 }
645
646 impl WriteFlags {
647 /// Hint that the write may be buffered and need not go out on the wire immediately.
648 ///
649 /// gRPC is free to buffer the message until the next non-buffered write, or until write stream
650 /// completion, but it need not buffer completely or at all.
buffer_hint(mut self, need_buffered: bool) -> WriteFlags651 pub fn buffer_hint(mut self, need_buffered: bool) -> WriteFlags {
652 client::change_flag(
653 &mut self.flags,
654 grpc_sys::GRPC_WRITE_BUFFER_HINT,
655 need_buffered,
656 );
657 self
658 }
659
660 /// Force compression to be disabled.
force_no_compress(mut self, no_compress: bool) -> WriteFlags661 pub fn force_no_compress(mut self, no_compress: bool) -> WriteFlags {
662 client::change_flag(
663 &mut self.flags,
664 grpc_sys::GRPC_WRITE_NO_COMPRESS,
665 no_compress,
666 );
667 self
668 }
669
670 /// Get whether buffer hint is enabled.
get_buffer_hint(self) -> bool671 pub fn get_buffer_hint(self) -> bool {
672 (self.flags & grpc_sys::GRPC_WRITE_BUFFER_HINT) != 0
673 }
674
675 /// Get whether compression is disabled.
get_force_no_compress(self) -> bool676 pub fn get_force_no_compress(self) -> bool {
677 (self.flags & grpc_sys::GRPC_WRITE_NO_COMPRESS) != 0
678 }
679 }
680
681 /// A helper struct for constructing Sink object for batch requests.
682 struct SinkBase {
683 // Batch job to be executed in `poll_ready`.
684 batch_f: Option<BatchFuture>,
685 send_metadata: bool,
686 // Flag to indicate if enhance batch strategy. This behavior will modify the `buffer_hint` to batch
687 // messages as much as possible.
688 enhance_buffer_strategy: bool,
689 // Buffer used to store the data to be sent, send out the last data in this round of `start_send`.
690 buffer: GrpcSlice,
691 // Write flags used to control the data to be sent in `buffer`.
692 buf_flags: Option<WriteFlags>,
693 // Used to records whether a message in which `buffer_hint` is false exists.
694 // Note: only used in enhanced buffer strategy.
695 last_buf_hint: bool,
696 }
697
698 impl SinkBase {
new(send_metadata: bool) -> SinkBase699 fn new(send_metadata: bool) -> SinkBase {
700 SinkBase {
701 batch_f: None,
702 buffer: GrpcSlice::default(),
703 buf_flags: None,
704 last_buf_hint: true,
705 send_metadata,
706 enhance_buffer_strategy: false,
707 }
708 }
709
start_send<T, C: ShareCallHolder>( &mut self, call: &mut C, t: &T, flags: WriteFlags, ser: SerializeFn<T>, ) -> Result<()>710 fn start_send<T, C: ShareCallHolder>(
711 &mut self,
712 call: &mut C,
713 t: &T,
714 flags: WriteFlags,
715 ser: SerializeFn<T>,
716 ) -> Result<()> {
717 // temporary fix: buffer hint with send meta will not send out any metadata.
718 // note: only the first message can enter this code block.
719 if self.send_metadata {
720 ser(t, &mut self.buffer)?;
721 self.buf_flags = Some(flags);
722 self.start_send_buffer_message(false, call)?;
723 self.send_metadata = false;
724 return Ok(());
725 }
726
727 // If there is already a buffered message waiting to be sent, set `buffer_hint` to true to indicate
728 // that this is not the last message.
729 if self.buf_flags.is_some() {
730 self.start_send_buffer_message(true, call)?;
731 }
732
733 ser(t, &mut self.buffer)?;
734 let hint = flags.get_buffer_hint();
735 self.last_buf_hint &= hint;
736 self.buf_flags = Some(flags);
737
738 // If sink disable batch, start sending the message in buffer immediately.
739 if !self.enhance_buffer_strategy {
740 self.start_send_buffer_message(hint, call)?;
741 }
742
743 Ok(())
744 }
745
746 #[inline]
poll_ready(&mut self, cx: &mut Context) -> Poll<Result<()>>747 fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<()>> {
748 match &mut self.batch_f {
749 None => return Poll::Ready(Ok(())),
750 Some(f) => {
751 ready!(Pin::new(f).poll(cx)?);
752 }
753 }
754 self.batch_f.take();
755 Poll::Ready(Ok(()))
756 }
757
758 #[inline]
poll_flush<C: ShareCallHolder>( &mut self, cx: &mut Context, call: &mut C, ) -> Poll<Result<()>>759 fn poll_flush<C: ShareCallHolder>(
760 &mut self,
761 cx: &mut Context,
762 call: &mut C,
763 ) -> Poll<Result<()>> {
764 if self.batch_f.is_some() {
765 ready!(self.poll_ready(cx)?);
766 }
767 if self.buf_flags.is_some() {
768 self.start_send_buffer_message(self.last_buf_hint, call)?;
769 ready!(self.poll_ready(cx)?);
770 }
771 self.last_buf_hint = true;
772 Poll::Ready(Ok(()))
773 }
774
775 #[inline]
start_send_buffer_message<C: ShareCallHolder>( &mut self, buffer_hint: bool, call: &mut C, ) -> Result<()>776 fn start_send_buffer_message<C: ShareCallHolder>(
777 &mut self,
778 buffer_hint: bool,
779 call: &mut C,
780 ) -> Result<()> {
781 // `start_send` is supposed to be called after `poll_ready` returns ready.
782 assert!(self.batch_f.is_none());
783
784 let mut flags = self.buf_flags.unwrap();
785 flags = flags.buffer_hint(buffer_hint);
786 let write_f = call.call(|c| {
787 c.call
788 .start_send_message(&mut self.buffer, flags.flags, self.send_metadata)
789 })?;
790 self.batch_f = Some(write_f);
791 if !self.buffer.is_inline() {
792 self.buffer = GrpcSlice::default();
793 }
794 self.buf_flags.take();
795 Ok(())
796 }
797 }
798