1 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2
3 pub mod client;
4 pub mod server;
5
6 use std::fmt::{self, Debug, Display};
7 use std::pin::Pin;
8 use std::sync::Arc;
9 use std::{ptr, slice};
10
11 use crate::cq::CompletionQueue;
12 use crate::grpc_sys::{self, grpc_call, grpc_call_error, grpcwrap_batch_context};
13 use futures::future::Future;
14 use futures::ready;
15 use futures::task::{Context, Poll};
16 use libc::c_void;
17 use parking_lot::Mutex;
18
19 use crate::buf::{GrpcByteBuffer, GrpcByteBufferReader, GrpcSlice};
20 use crate::codec::{DeserializeFn, Marshaller, SerializeFn};
21 use crate::error::{Error, Result};
22 use crate::grpc_sys::grpc_status_code::*;
23 use crate::task::{self, BatchFuture, BatchType, CallTag};
24
/// A gRPC status code structure.
///
/// This type wraps the raw integer status code and contains constants for all
/// gRPC status codes.
#[derive(PartialEq, Eq, Clone, Copy)]
pub struct RpcStatusCode(i32);
29
30 impl From<i32> for RpcStatusCode {
from(code: i32) -> RpcStatusCode31 fn from(code: i32) -> RpcStatusCode {
32 RpcStatusCode(code)
33 }
34 }
35
36 impl From<RpcStatusCode> for i32 {
from(code: RpcStatusCode) -> i3237 fn from(code: RpcStatusCode) -> i32 {
38 code.0
39 }
40 }
41
42 impl Display for RpcStatusCode {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result43 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
44 Debug::fmt(self, f)
45 }
46 }
47
// Generates, from a list of `(raw constant, NAME);` pairs:
//   * one associated constant `RpcStatusCode::NAME` per pair, and
//   * a `Debug` impl that prints `<raw>-<NAME>`, falling back to
//     `<raw>-INVALID_STATUS_CODE` for values that match no listed constant.
macro_rules! status_codes {
    ($(($num:path, $konst:ident);)+) => {
        impl RpcStatusCode {
            $(
                pub const $konst: RpcStatusCode = RpcStatusCode($num);
            )+
        }

        impl Debug for RpcStatusCode {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                // Resolve the symbolic name first, then emit both parts at once.
                let name = match *self {
                    $(RpcStatusCode($num) => stringify!($konst),)+
                    RpcStatusCode(_) => "INVALID_STATUS_CODE",
                };
                write!(f, "{}-{}", self.0, name)
            }
        }
    };
}
75
// Each pair maps a raw `grpc_status_code` constant to the name of the
// `RpcStatusCode` associated constant generated for it; the same name is the
// text printed after the number by the generated `Debug` impl.
status_codes! {
    (GRPC_STATUS_OK, OK);
    (GRPC_STATUS_CANCELLED, CANCELLED);
    (GRPC_STATUS_UNKNOWN, UNKNOWN);
    (GRPC_STATUS_INVALID_ARGUMENT, INVALID_ARGUMENT);
    (GRPC_STATUS_DEADLINE_EXCEEDED, DEADLINE_EXCEEDED);
    (GRPC_STATUS_NOT_FOUND, NOT_FOUND);
    (GRPC_STATUS_ALREADY_EXISTS, ALREADY_EXISTS);
    (GRPC_STATUS_PERMISSION_DENIED, PERMISSION_DENIED);
    (GRPC_STATUS_RESOURCE_EXHAUSTED, RESOURCE_EXHAUSTED);
    (GRPC_STATUS_FAILED_PRECONDITION, FAILED_PRECONDITION);
    (GRPC_STATUS_ABORTED, ABORTED);
    (GRPC_STATUS_OUT_OF_RANGE, OUT_OF_RANGE);
    (GRPC_STATUS_UNIMPLEMENTED, UNIMPLEMENTED);
    (GRPC_STATUS_INTERNAL, INTERNAL);
    (GRPC_STATUS_UNAVAILABLE, UNAVAILABLE);
    (GRPC_STATUS_DATA_LOSS, DATA_LOSS);
    (GRPC_STATUS_UNAUTHENTICATED, UNAUTHENTICATED);
    (GRPC_STATUS__DO_NOT_USE, DO_NOT_USE);
}
96
/// Method types supported by gRPC.
///
/// The type is a plain, field-less enum, so it is cheap to copy and can be
/// compared and printed for diagnostics.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum MethodType {
    /// Single request sent from client, single response received from server.
    Unary,

    /// Stream of requests sent from client, single response received from server.
    ClientStreaming,

    /// Single request sent from client, stream of responses received from server.
    ServerStreaming,

    /// Both server and client can stream arbitrary number of requests and responses simultaneously.
    Duplex,
}
112
/// A description of a remote method.
///
/// `Req` is the request message type and `Resp` the response message type;
/// each has its own marshaller providing a serializer and a deserializer.
// TODO: add serializer and deserializer.
pub struct Method<Req, Resp> {
    /// Type of method.
    pub ty: MethodType,

    /// Fully qualified name of the method.
    pub name: &'static str,

    /// The marshaller used for request messages.
    pub req_mar: Marshaller<Req>,

    /// The marshaller used for response messages.
    pub resp_mar: Marshaller<Resp>,
}
128
129 impl<Req, Resp> Method<Req, Resp> {
130 /// Get the request serializer.
131 #[inline]
req_ser(&self) -> SerializeFn<Req>132 pub fn req_ser(&self) -> SerializeFn<Req> {
133 self.req_mar.ser
134 }
135
136 /// Get the request deserializer.
137 #[inline]
req_de(&self) -> DeserializeFn<Req>138 pub fn req_de(&self) -> DeserializeFn<Req> {
139 self.req_mar.de
140 }
141
142 /// Get the response serializer.
143 #[inline]
resp_ser(&self) -> SerializeFn<Resp>144 pub fn resp_ser(&self) -> SerializeFn<Resp> {
145 self.resp_mar.ser
146 }
147
148 /// Get the response deserializer.
149 #[inline]
resp_de(&self) -> DeserializeFn<Resp>150 pub fn resp_de(&self) -> DeserializeFn<Resp> {
151 self.resp_mar.de
152 }
153 }
154
/// RPC result returned from the server.
#[derive(Debug, Clone)]
pub struct RpcStatus {
    /// gRPC status code. [`RpcStatusCode::OK`] indicates success, all other
    /// values indicate an error.
    pub status: RpcStatusCode,

    /// Optional detail string.
    pub details: Option<String>,
}
164
165 impl Display for RpcStatus {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result166 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
167 Debug::fmt(self, fmt)
168 }
169 }
170
171 impl RpcStatus {
172 /// Create a new [`RpcStatus`].
new<T: Into<RpcStatusCode>>(code: T, details: Option<String>) -> RpcStatus173 pub fn new<T: Into<RpcStatusCode>>(code: T, details: Option<String>) -> RpcStatus {
174 RpcStatus {
175 status: code.into(),
176 details,
177 }
178 }
179
180 /// Create a new [`RpcStatus`] that status code is Ok.
ok() -> RpcStatus181 pub fn ok() -> RpcStatus {
182 RpcStatus::new(RpcStatusCode::OK, None)
183 }
184 }
185
/// Reader type handed out for received messages; an alias for
/// [`GrpcByteBufferReader`].
pub type MessageReader = GrpcByteBufferReader;
187
/// Context for batch request.
///
/// Owns a raw `grpcwrap_batch_context` that is created in `new` and destroyed
/// when the value is dropped.
pub struct BatchContext {
    // Raw handle obtained from `grpcwrap_batch_context_create`.
    ctx: *mut grpcwrap_batch_context,
}
192
193 impl BatchContext {
new() -> BatchContext194 pub fn new() -> BatchContext {
195 BatchContext {
196 ctx: unsafe { grpc_sys::grpcwrap_batch_context_create() },
197 }
198 }
199
as_ptr(&self) -> *mut grpcwrap_batch_context200 pub fn as_ptr(&self) -> *mut grpcwrap_batch_context {
201 self.ctx
202 }
203
take_recv_message(&self) -> Option<GrpcByteBuffer>204 pub fn take_recv_message(&self) -> Option<GrpcByteBuffer> {
205 let ptr = unsafe { grpc_sys::grpcwrap_batch_context_take_recv_message(self.ctx) };
206 if ptr.is_null() {
207 None
208 } else {
209 Some(unsafe { GrpcByteBuffer::from_raw(ptr) })
210 }
211 }
212
213 /// Get the status of the rpc call.
rpc_status(&self) -> RpcStatus214 pub fn rpc_status(&self) -> RpcStatus {
215 let status = RpcStatusCode(unsafe {
216 grpc_sys::grpcwrap_batch_context_recv_status_on_client_status(self.ctx)
217 });
218
219 let details = if status == RpcStatusCode::OK {
220 None
221 } else {
222 unsafe {
223 let mut details_len = 0;
224 let details_ptr = grpc_sys::grpcwrap_batch_context_recv_status_on_client_details(
225 self.ctx,
226 &mut details_len,
227 );
228 let details_slice = slice::from_raw_parts(details_ptr as *const _, details_len);
229 Some(String::from_utf8_lossy(details_slice).into_owned())
230 }
231 };
232
233 RpcStatus::new(status, details)
234 }
235
236 /// Fetch the response bytes of the rpc call.
recv_message(&mut self) -> Option<MessageReader>237 pub fn recv_message(&mut self) -> Option<MessageReader> {
238 let buf = self.take_recv_message()?;
239 Some(GrpcByteBufferReader::new(buf))
240 }
241 }
242
243 impl Drop for BatchContext {
drop(&mut self)244 fn drop(&mut self) {
245 unsafe { grpc_sys::grpcwrap_batch_context_destroy(self.ctx) }
246 }
247 }
248
249 #[inline]
box_batch_tag(tag: CallTag) -> (*mut grpcwrap_batch_context, *mut c_void)250 fn box_batch_tag(tag: CallTag) -> (*mut grpcwrap_batch_context, *mut c_void) {
251 let tag_box = Box::new(tag);
252 (
253 tag_box.batch_ctx().unwrap().as_ptr(),
254 Box::into_raw(tag_box) as _,
255 )
256 }
257
258 /// A helper function that runs the batch call and checks the result.
check_run<F>(bt: BatchType, f: F) -> BatchFuture where F: FnOnce(*mut grpcwrap_batch_context, *mut c_void) -> grpc_call_error,259 fn check_run<F>(bt: BatchType, f: F) -> BatchFuture
260 where
261 F: FnOnce(*mut grpcwrap_batch_context, *mut c_void) -> grpc_call_error,
262 {
263 let (cq_f, tag) = CallTag::batch_pair(bt);
264 let (batch_ptr, tag_ptr) = box_batch_tag(tag);
265 let code = f(batch_ptr, tag_ptr);
266 if code != grpc_call_error::GRPC_CALL_OK {
267 unsafe {
268 Box::from_raw(tag_ptr);
269 }
270 panic!("create call fail: {:?}", code);
271 }
272 cq_f
273 }
274
/// A Call represents an RPC.
///
/// When created, it is in a configuration state allowing properties to be
/// set until it is invoked. After invoke, the Call can have messages
/// written to it and read from it.
pub struct Call {
    /// Raw gRPC call handle; unreferenced when the `Call` is dropped.
    pub call: *mut grpc_call,
    /// Completion queue borrowed before each batch operation is started.
    pub cq: CompletionQueue,
}

// NOTE(review): asserts that the raw `grpc_call` handle may be moved across
// threads — the justification is not visible in this file; confirm against
// the gRPC core threading guarantees.
unsafe impl Send for Call {}
286
287 impl Call {
from_raw(call: *mut grpc_sys::grpc_call, cq: CompletionQueue) -> Call288 pub unsafe fn from_raw(call: *mut grpc_sys::grpc_call, cq: CompletionQueue) -> Call {
289 assert!(!call.is_null());
290 Call { call, cq }
291 }
292
293 /// Send a message asynchronously.
start_send_message( &mut self, msg: &mut GrpcSlice, write_flags: u32, initial_meta: bool, ) -> Result<BatchFuture>294 pub fn start_send_message(
295 &mut self,
296 msg: &mut GrpcSlice,
297 write_flags: u32,
298 initial_meta: bool,
299 ) -> Result<BatchFuture> {
300 let _cq_ref = self.cq.borrow()?;
301 let i = if initial_meta { 1 } else { 0 };
302 let f = check_run(BatchType::Finish, |ctx, tag| unsafe {
303 grpc_sys::grpcwrap_call_send_message(
304 self.call,
305 ctx,
306 msg.as_mut_ptr(),
307 write_flags,
308 i,
309 tag,
310 )
311 });
312 Ok(f)
313 }
314
315 /// Finish the rpc call from client.
start_send_close_client(&mut self) -> Result<BatchFuture>316 pub fn start_send_close_client(&mut self) -> Result<BatchFuture> {
317 let _cq_ref = self.cq.borrow()?;
318 let f = check_run(BatchType::Finish, |_, tag| unsafe {
319 grpc_sys::grpcwrap_call_send_close_from_client(self.call, tag)
320 });
321 Ok(f)
322 }
323
324 /// Receive a message asynchronously.
start_recv_message(&mut self) -> Result<BatchFuture>325 pub fn start_recv_message(&mut self) -> Result<BatchFuture> {
326 let _cq_ref = self.cq.borrow()?;
327 let f = check_run(BatchType::Read, |ctx, tag| unsafe {
328 grpc_sys::grpcwrap_call_recv_message(self.call, ctx, tag)
329 });
330 Ok(f)
331 }
332
333 /// Start handling from server side.
334 ///
335 /// Future will finish once close is received by the server.
start_server_side(&mut self) -> Result<BatchFuture>336 pub fn start_server_side(&mut self) -> Result<BatchFuture> {
337 let _cq_ref = self.cq.borrow()?;
338 let f = check_run(BatchType::Finish, |ctx, tag| unsafe {
339 grpc_sys::grpcwrap_call_start_serverside(self.call, ctx, tag)
340 });
341 Ok(f)
342 }
343
344 /// Send a status from server.
start_send_status_from_server( &mut self, status: &RpcStatus, send_empty_metadata: bool, payload: &mut Option<GrpcSlice>, write_flags: u32, ) -> Result<BatchFuture>345 pub fn start_send_status_from_server(
346 &mut self,
347 status: &RpcStatus,
348 send_empty_metadata: bool,
349 payload: &mut Option<GrpcSlice>,
350 write_flags: u32,
351 ) -> Result<BatchFuture> {
352 let _cq_ref = self.cq.borrow()?;
353 let send_empty_metadata = if send_empty_metadata { 1 } else { 0 };
354 let f = check_run(BatchType::Finish, |ctx, tag| unsafe {
355 let details_ptr = status
356 .details
357 .as_ref()
358 .map_or_else(ptr::null, |s| s.as_ptr() as _);
359 let details_len = status.details.as_ref().map_or(0, String::len);
360 let payload_p = match payload {
361 Some(p) => p.as_mut_ptr(),
362 None => ptr::null_mut(),
363 };
364 grpc_sys::grpcwrap_call_send_status_from_server(
365 self.call,
366 ctx,
367 status.status.into(),
368 details_ptr,
369 details_len,
370 ptr::null_mut(),
371 send_empty_metadata,
372 payload_p,
373 write_flags,
374 tag,
375 )
376 });
377 Ok(f)
378 }
379
380 /// Abort an rpc call before handler is called.
abort(self, status: &RpcStatus)381 pub fn abort(self, status: &RpcStatus) {
382 match self.cq.borrow() {
383 // Queue is shutdown, ignore.
384 Err(Error::QueueShutdown) => return,
385 Err(e) => panic!("unexpected error when aborting call: {:?}", e),
386 _ => {}
387 }
388 let call_ptr = self.call;
389 let tag = CallTag::abort(self);
390 let (batch_ptr, tag_ptr) = box_batch_tag(tag);
391
392 let code = unsafe {
393 let details_ptr = status
394 .details
395 .as_ref()
396 .map_or_else(ptr::null, |s| s.as_ptr() as _);
397 let details_len = status.details.as_ref().map_or(0, String::len);
398 grpc_sys::grpcwrap_call_send_status_from_server(
399 call_ptr,
400 batch_ptr,
401 status.status.into(),
402 details_ptr,
403 details_len,
404 ptr::null_mut(),
405 1,
406 ptr::null_mut(),
407 0,
408 tag_ptr as *mut c_void,
409 )
410 };
411 if code != grpc_call_error::GRPC_CALL_OK {
412 unsafe {
413 Box::from_raw(tag_ptr);
414 }
415 panic!("create call fail: {:?}", code);
416 }
417 }
418
419 /// Cancel the rpc call by client.
cancel(&self)420 fn cancel(&self) {
421 match self.cq.borrow() {
422 // Queue is shutdown, ignore.
423 Err(Error::QueueShutdown) => return,
424 Err(e) => panic!("unexpected error when canceling call: {:?}", e),
425 _ => {}
426 }
427 unsafe {
428 grpc_sys::grpc_call_cancel(self.call, ptr::null_mut());
429 }
430 }
431 }
432
433 impl Drop for Call {
drop(&mut self)434 fn drop(&mut self) {
435 unsafe { grpc_sys::grpc_call_unref(self.call) }
436 }
437 }
438
/// A shared object for client streaming and duplex streaming calls.
///
/// In both cases, the receiver and the sender can be polled at the same time,
/// hence the call needs to be shared by both sides and the sink aborted
/// once the call is canceled or finished early.
struct ShareCall {
    // The underlying call.
    call: Call,
    // Future polled by `poll_finish` to detect completion of the call.
    close_f: BatchFuture,
    // Set once `close_f` has resolved, successfully or not.
    finished: bool,
    // Final status recorded by `poll_finish`; stays `None` for errors other
    // than `Error::RpcFailure`.
    status: Option<RpcStatus>,
}
450
451 impl ShareCall {
new(call: Call, close_f: BatchFuture) -> ShareCall452 fn new(call: Call, close_f: BatchFuture) -> ShareCall {
453 ShareCall {
454 call,
455 close_f,
456 finished: false,
457 status: None,
458 }
459 }
460
461 /// Poll if the call is still alive.
462 ///
463 /// If the call is still running, will register a notification for its completion.
poll_finish(&mut self, cx: &mut Context) -> Poll<Result<Option<MessageReader>>>464 fn poll_finish(&mut self, cx: &mut Context) -> Poll<Result<Option<MessageReader>>> {
465 let res = match Pin::new(&mut self.close_f).poll(cx) {
466 Poll::Ready(Ok(reader)) => {
467 self.status = Some(RpcStatus::ok());
468 Poll::Ready(Ok(reader))
469 }
470 Poll::Pending => return Poll::Pending,
471 Poll::Ready(Err(Error::RpcFailure(status))) => {
472 self.status = Some(status.clone());
473 Poll::Ready(Err(Error::RpcFailure(status)))
474 }
475 res => res,
476 };
477
478 self.finished = true;
479 res
480 }
481
482 /// Check if the call is finished.
check_alive(&mut self) -> Result<()>483 fn check_alive(&mut self) -> Result<()> {
484 if self.finished {
485 // maybe can just take here.
486 return Err(Error::RpcFinished(self.status.clone()));
487 }
488
489 task::check_alive(&self.close_f)
490 }
491 }
492
493 /// A helper trait that allows executing function on the internal `ShareCall` struct.
494 trait ShareCallHolder {
call<R, F: FnOnce(&mut ShareCall) -> R>(&mut self, f: F) -> R495 fn call<R, F: FnOnce(&mut ShareCall) -> R>(&mut self, f: F) -> R;
496 }
497
498 impl ShareCallHolder for ShareCall {
call<R, F: FnOnce(&mut ShareCall) -> R>(&mut self, f: F) -> R499 fn call<R, F: FnOnce(&mut ShareCall) -> R>(&mut self, f: F) -> R {
500 f(self)
501 }
502 }
503
504 impl ShareCallHolder for Arc<Mutex<ShareCall>> {
call<R, F: FnOnce(&mut ShareCall) -> R>(&mut self, f: F) -> R505 fn call<R, F: FnOnce(&mut ShareCall) -> R>(&mut self, f: F) -> R {
506 let mut call = self.lock();
507 f(&mut call)
508 }
509 }
510
/// A helper struct for constructing Stream object for batch requests.
struct StreamingBase {
    // Future checked by `poll` unless the finish check is skipped; cleared
    // once it resolves.
    close_f: Option<BatchFuture>,
    // Receive operation currently in flight, if any.
    msg_f: Option<BatchFuture>,
    // Set once a receive resolves without a message.
    read_done: bool,
}
517
518 impl StreamingBase {
new(close_f: Option<BatchFuture>) -> StreamingBase519 fn new(close_f: Option<BatchFuture>) -> StreamingBase {
520 StreamingBase {
521 close_f,
522 msg_f: None,
523 read_done: false,
524 }
525 }
526
poll<C: ShareCallHolder>( &mut self, cx: &mut Context, call: &mut C, skip_finish_check: bool, ) -> Poll<Option<Result<MessageReader>>>527 fn poll<C: ShareCallHolder>(
528 &mut self,
529 cx: &mut Context,
530 call: &mut C,
531 skip_finish_check: bool,
532 ) -> Poll<Option<Result<MessageReader>>> {
533 if !skip_finish_check {
534 let mut finished = false;
535 if let Some(close_f) = &mut self.close_f {
536 if Pin::new(close_f).poll(cx)?.is_ready() {
537 // Don't return immediately, there may be pending data.
538 finished = true;
539 }
540 }
541 if finished {
542 self.close_f.take();
543 }
544 }
545
546 let mut bytes = None;
547 if !self.read_done {
548 if let Some(msg_f) = &mut self.msg_f {
549 bytes = ready!(Pin::new(msg_f).poll(cx)?);
550 if bytes.is_none() {
551 self.read_done = true;
552 }
553 }
554 }
555
556 if self.read_done {
557 if self.close_f.is_none() {
558 return Poll::Ready(None);
559 }
560 return Poll::Pending;
561 }
562
563 // so msg_f must be either stale or not initialized yet.
564 self.msg_f.take();
565 let msg_f = call.call(|c| c.call.start_recv_message())?;
566 self.msg_f = Some(msg_f);
567 if bytes.is_none() {
568 self.poll(cx, call, true)
569 } else {
570 Poll::Ready(bytes.map(Ok))
571 }
572 }
573
574 // Cancel the call if we still have some messages or did not
575 // receive status code.
on_drop<C: ShareCallHolder>(&self, call: &mut C)576 fn on_drop<C: ShareCallHolder>(&self, call: &mut C) {
577 if !self.read_done || self.close_f.is_some() {
578 call.call(|c| c.call.cancel());
579 }
580 }
581 }
582
/// Flags for write operations.
///
/// The default value has no flag set.
#[derive(Default, Clone, Copy, Debug, PartialEq, Eq)]
pub struct WriteFlags {
    // Raw bit set passed to the write operations.
    flags: u32,
}
588
589 impl WriteFlags {
590 /// Hint that the write may be buffered and need not go out on the wire immediately.
591 ///
592 /// gRPC is free to buffer the message until the next non-buffered write, or until write stream
593 /// completion, but it need not buffer completely or at all.
buffer_hint(mut self, need_buffered: bool) -> WriteFlags594 pub fn buffer_hint(mut self, need_buffered: bool) -> WriteFlags {
595 client::change_flag(
596 &mut self.flags,
597 grpc_sys::GRPC_WRITE_BUFFER_HINT,
598 need_buffered,
599 );
600 self
601 }
602
603 /// Force compression to be disabled.
force_no_compress(mut self, no_compress: bool) -> WriteFlags604 pub fn force_no_compress(mut self, no_compress: bool) -> WriteFlags {
605 client::change_flag(
606 &mut self.flags,
607 grpc_sys::GRPC_WRITE_NO_COMPRESS,
608 no_compress,
609 );
610 self
611 }
612
613 /// Get whether buffer hint is enabled.
get_buffer_hint(self) -> bool614 pub fn get_buffer_hint(self) -> bool {
615 (self.flags & grpc_sys::GRPC_WRITE_BUFFER_HINT) != 0
616 }
617
618 /// Get whether compression is disabled.
get_force_no_compress(self) -> bool619 pub fn get_force_no_compress(self) -> bool {
620 (self.flags & grpc_sys::GRPC_WRITE_NO_COMPRESS) != 0
621 }
622 }
623
/// A helper struct for constructing Sink object for batch requests.
struct SinkBase {
    // Batch job to be polled to completion in `poll_ready`.
    batch_f: Option<BatchFuture>,
    // Whether the next send should also send metadata; cleared after the
    // first message has been sent.
    send_metadata: bool,
    // Whether the enhanced batch strategy is enabled. When it is, messages are
    // held in `buffer` and `buffer_hint` is adjusted to batch messages as much
    // as possible.
    enhance_buffer_strategy: bool,
    // Buffer holding the serialized data to be sent; it keeps the last message
    // of the current round of `start_send`.
    buffer: GrpcSlice,
    // Write flags for the data currently held in `buffer`; `None` when nothing
    // is waiting to be sent.
    buf_flags: Option<WriteFlags>,
    // Records whether every message so far had `buffer_hint` set; becomes
    // false as soon as one message has it unset.
    // Note: only used in enhanced buffer strategy.
    last_buf_hint: bool,
}
640
641 impl SinkBase {
new(send_metadata: bool) -> SinkBase642 fn new(send_metadata: bool) -> SinkBase {
643 SinkBase {
644 batch_f: None,
645 buffer: GrpcSlice::default(),
646 buf_flags: None,
647 last_buf_hint: true,
648 send_metadata,
649 enhance_buffer_strategy: false,
650 }
651 }
652
start_send<T, C: ShareCallHolder>( &mut self, call: &mut C, t: &T, flags: WriteFlags, ser: SerializeFn<T>, ) -> Result<()>653 fn start_send<T, C: ShareCallHolder>(
654 &mut self,
655 call: &mut C,
656 t: &T,
657 flags: WriteFlags,
658 ser: SerializeFn<T>,
659 ) -> Result<()> {
660 // temporary fix: buffer hint with send meta will not send out any metadata.
661 // note: only the first message can enter this code block.
662 if self.send_metadata {
663 ser(t, &mut self.buffer);
664 self.buf_flags = Some(flags);
665 self.start_send_buffer_message(false, call)?;
666 self.send_metadata = false;
667 return Ok(());
668 }
669
670 // If there is already a buffered message waiting to be sent, set `buffer_hint` to true to indicate
671 // that this is not the last message.
672 if self.buf_flags.is_some() {
673 self.start_send_buffer_message(true, call)?;
674 }
675
676 ser(t, &mut self.buffer);
677 let hint = flags.get_buffer_hint();
678 self.last_buf_hint &= hint;
679 self.buf_flags = Some(flags);
680
681 // If sink disable batch, start sending the message in buffer immediately.
682 if !self.enhance_buffer_strategy {
683 self.start_send_buffer_message(hint, call)?;
684 }
685
686 Ok(())
687 }
688
689 #[inline]
poll_ready(&mut self, cx: &mut Context) -> Poll<Result<()>>690 fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<()>> {
691 match &mut self.batch_f {
692 None => return Poll::Ready(Ok(())),
693 Some(f) => {
694 ready!(Pin::new(f).poll(cx)?);
695 }
696 }
697 self.batch_f.take();
698 Poll::Ready(Ok(()))
699 }
700
701 #[inline]
poll_flush<C: ShareCallHolder>( &mut self, cx: &mut Context, call: &mut C, ) -> Poll<Result<()>>702 fn poll_flush<C: ShareCallHolder>(
703 &mut self,
704 cx: &mut Context,
705 call: &mut C,
706 ) -> Poll<Result<()>> {
707 if self.batch_f.is_some() {
708 ready!(self.poll_ready(cx)?);
709 }
710 if self.buf_flags.is_some() {
711 self.start_send_buffer_message(self.last_buf_hint, call)?;
712 ready!(self.poll_ready(cx)?);
713 }
714 self.last_buf_hint = true;
715 Poll::Ready(Ok(()))
716 }
717
718 #[inline]
start_send_buffer_message<C: ShareCallHolder>( &mut self, buffer_hint: bool, call: &mut C, ) -> Result<()>719 fn start_send_buffer_message<C: ShareCallHolder>(
720 &mut self,
721 buffer_hint: bool,
722 call: &mut C,
723 ) -> Result<()> {
724 // `start_send` is supposed to be called after `poll_ready` returns ready.
725 assert!(self.batch_f.is_none());
726
727 let mut flags = self.buf_flags.clone().unwrap();
728 flags = flags.buffer_hint(buffer_hint);
729 let write_f = call.call(|c| {
730 c.call
731 .start_send_message(&mut self.buffer, flags.flags, self.send_metadata)
732 })?;
733 self.batch_f = Some(write_f);
734 if !self.buffer.is_inline() {
735 self.buffer = GrpcSlice::default();
736 }
737 self.buf_flags.take();
738 Ok(())
739 }
740 }
741