• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2 
3 use crate::buf::GrpcSlice;
4 use crate::call::MessageReader;
5 use crate::error::Result;
6 
/// Signature of a deserializer: a plain function pointer that takes ownership of a
/// [`MessageReader`] and returns the decoded `T`, or an error.
pub type DeserializeFn<T> = fn(MessageReader) -> Result<T>;
/// Signature of a serializer: a plain function pointer that encodes `&T` into the
/// supplied [`GrpcSlice`], returning an error on failure.
pub type SerializeFn<T> = fn(&T, &mut GrpcSlice) -> Result<()>;
9 
/// Upper bound on the size of a single message, in bytes.
///
/// According to https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md, gRPC uses
/// four bytes to describe the length of a message, so a message must not exceed `u32::MAX`.
pub const MAX_MESSAGE_SIZE: usize = u32::MAX as usize;
13 
/// Defines how to serialize and deserialize between the specialized type and byte slice.
///
/// Holds one function pointer per direction for the message type `T`.
pub struct Marshaller<T> {
    // Function pointers are used here to keep the signature simple.
    // The compiler will probably inline the functions, so the performance
    // impact can be ignored.
    //
    // Using a trait would require either a trait object or a generic parameter,
    // which would either have a performance impact or complicate the signature.
    //
    // const function is not stable yet (rust-lang/rust#24111), hence
    // all fields are public.
    /// The serialize function: encodes a `&T` into a `GrpcSlice`.
    pub ser: SerializeFn<T>,

    /// The deserialize function: decodes a `T` from a `MessageReader`.
    pub de: DeserializeFn<T>,
}
31 
/// Serializer/deserializer pair built on the `protobuf` crate.
///
/// Only compiled when the `protobuf-codec` feature is enabled.
#[cfg(feature = "protobuf-codec")]
pub mod pb_codec {
    use protobuf::{CodedInputStream, CodedOutputStream, Message};

    use super::{MessageReader, MAX_MESSAGE_SIZE};
    use crate::buf::GrpcSlice;
    use crate::error::{Error, Result};

    /// Encodes `t` into `buf`.
    ///
    /// `buf` is re-allocated to the size reported by `compute_size()` and the
    /// message is written directly into that storage.
    ///
    /// # Errors
    ///
    /// Returns `Error::Codec` when the computed size is larger than
    /// `MAX_MESSAGE_SIZE`; any error raised while writing the message is
    /// converted into the crate's `Error`.
    #[inline]
    pub fn ser<T: Message>(t: &T, buf: &mut GrpcSlice) -> Result<()> {
        let cap = t.compute_size() as usize;
        // FIXME: This is not a practical fix until stepancheg/rust-protobuf#530 is fixed.
        if cap > MAX_MESSAGE_SIZE {
            return Err(Error::Codec(
                format!("message is too large: {} > {}", cap, MAX_MESSAGE_SIZE).into(),
            ));
        }

        // SAFETY(review): the storage handed back by `realloc(cap)` is reinterpreted
        // from `MaybeUninit<u8>` to `u8` and then only used as the output stream's
        // write target. Presumably the slice is exactly `cap` bytes long - confirm
        // against `GrpcSlice::realloc`.
        unsafe {
            let uninit = buf.realloc(cap);
            let target = &mut *(uninit as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]);
            let mut out = CodedOutputStream::bytes(target);
            t.write_to_with_cached_sizes(&mut out)?;
        }
        Ok(())
    }

    /// Decodes a `T` by merging everything `reader` yields into a fresh message.
    ///
    /// # Errors
    ///
    /// Propagates (after conversion) any error reported by `merge_from`.
    #[inline]
    pub fn de<T: Message>(mut reader: MessageReader) -> Result<T> {
        let mut msg = T::new();
        let mut input = CodedInputStream::from_buffered_reader(&mut reader);
        msg.merge_from(&mut input)?;
        Ok(msg)
    }
}
66 
/// Serializer/deserializer pair built on the `prost` crate.
///
/// Only compiled when the `prost-codec` feature is enabled.
#[cfg(feature = "prost-codec")]
pub mod pr_codec {
    use prost::Message;

    use super::{MessageReader, MAX_MESSAGE_SIZE};
    use crate::buf::GrpcSlice;
    use crate::error::{Error, Result};

    /// Encodes `msg` into `buf`.
    ///
    /// `buf` is re-allocated to `msg.encoded_len()` bytes and the message is
    /// encoded straight into that storage.
    ///
    /// # Errors
    ///
    /// Returns `Error::Codec` when the encoded length is larger than
    /// `MAX_MESSAGE_SIZE`; an encoding failure is converted into the crate's
    /// `Error`.
    #[inline]
    pub fn ser<M: Message>(msg: &M, buf: &mut GrpcSlice) -> Result<()> {
        let size = msg.encoded_len();
        if size > MAX_MESSAGE_SIZE {
            return Err(Error::Codec(
                format!("message is too large: {} > {}", size, MAX_MESSAGE_SIZE).into(),
            ));
        }

        // SAFETY(review): the storage handed back by `realloc(size)` is reinterpreted
        // from `MaybeUninit<u8>` to `u8` and then only used as the encode target.
        // Presumably the slice is exactly `size` bytes long - confirm against
        // `GrpcSlice::realloc`.
        unsafe {
            let uninit = buf.realloc(size);
            let mut remaining = &mut *(uninit as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]);
            msg.encode(&mut remaining)?;
            // Encoding advances the slice; nothing should be left over when the
            // buffer was sized with `encoded_len()`.
            debug_assert!(remaining.is_empty());
        }
        Ok(())
    }

    /// Decodes an `M` from the bytes exposed by `reader`.
    ///
    /// # Errors
    ///
    /// A decode failure is converted into the crate's `Error`.
    #[inline]
    pub fn de<M: Message + Default>(mut reader: MessageReader) -> Result<M> {
        use bytes::buf::Buf;
        // NOTE(review): advancing by zero bytes looks like a no-op; its purpose is
        // not evident from this file - confirm before removing.
        reader.advance(0);
        let decoded = M::decode(reader)?;
        Ok(decoded)
    }
}
100