1 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. 2 3 use crate::buf::GrpcSlice; 4 use crate::call::MessageReader; 5 use crate::error::Result; 6 7 pub type DeserializeFn<T> = fn(MessageReader) -> Result<T>; 8 pub type SerializeFn<T> = fn(&T, &mut GrpcSlice) -> Result<()>; 9 10 /// According to https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md, grpc uses 11 /// a four bytes to describe the length of a message, so it should not exceed u32::MAX. 12 pub const MAX_MESSAGE_SIZE: usize = std::u32::MAX as usize; 13 14 /// Defines how to serialize and deserialize between the specialized type and byte slice. 15 pub struct Marshaller<T> { 16 // Use function pointer here to simplify the signature. 17 // Compiler will probably inline the function so performance 18 // impact can be omitted. 19 // 20 // Using trait will require a trait object or generic, which will 21 // either have performance impact or make signature complicated. 22 // 23 // const function is not stable yet (rust-lang/rust#24111), hence 24 // make all fields public. 25 /// The serialize function. 26 pub ser: SerializeFn<T>, 27 28 /// The deserialize function. 29 pub de: DeserializeFn<T>, 30 } 31 32 #[cfg(feature = "protobuf-codec")] 33 pub mod pb_codec { 34 use protobuf::{CodedInputStream, CodedOutputStream, Message}; 35 36 use super::{MessageReader, MAX_MESSAGE_SIZE}; 37 use crate::buf::GrpcSlice; 38 use crate::error::{Error, Result}; 39 40 #[inline] ser<T: Message>(t: &T, buf: &mut GrpcSlice) -> Result<()>41 pub fn ser<T: Message>(t: &T, buf: &mut GrpcSlice) -> Result<()> { 42 let cap = t.compute_size() as usize; 43 // FIXME: This is not a practical fix until stepancheg/rust-protobuf#530 is fixed. 44 if cap <= MAX_MESSAGE_SIZE { 45 unsafe { 46 let bytes = buf.realloc(cap); 47 let raw_bytes = &mut *(bytes as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]); 48 let mut s = CodedOutputStream::bytes(raw_bytes); 49 t.write_to_with_cached_sizes(&mut s).map_err(Into::into) 50 } 51 } else { 52 Err(Error::Codec( 53 format!("message is too large: {} > {}", cap, MAX_MESSAGE_SIZE).into(), 54 )) 55 } 56 } 57 58 #[inline] de<T: Message>(mut reader: MessageReader) -> Result<T>59 pub fn de<T: Message>(mut reader: MessageReader) -> Result<T> { 60 let mut s = CodedInputStream::from_buffered_reader(&mut reader); 61 let mut m = T::new(); 62 m.merge_from(&mut s)?; 63 Ok(m) 64 } 65 } 66 67 #[cfg(feature = "prost-codec")] 68 pub mod pr_codec { 69 use prost::Message; 70 71 use super::{MessageReader, MAX_MESSAGE_SIZE}; 72 use crate::buf::GrpcSlice; 73 use crate::error::{Error, Result}; 74 75 #[inline] ser<M: Message>(msg: &M, buf: &mut GrpcSlice) -> Result<()>76 pub fn ser<M: Message>(msg: &M, buf: &mut GrpcSlice) -> Result<()> { 77 let size = msg.encoded_len(); 78 if size <= MAX_MESSAGE_SIZE { 79 unsafe { 80 let bytes = buf.realloc(size); 81 let mut b = &mut *(bytes as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]); 82 msg.encode(&mut b)?; 83 debug_assert!(b.is_empty()); 84 } 85 Ok(()) 86 } else { 87 Err(Error::Codec( 88 format!("message is too large: {} > {}", size, MAX_MESSAGE_SIZE).into(), 89 )) 90 } 91 } 92 93 #[inline] de<M: Message + Default>(mut reader: MessageReader) -> Result<M>94 pub fn de<M: Message + Default>(mut reader: MessageReader) -> Result<M> { 95 use bytes::buf::Buf; 96 reader.advance(0); 97 M::decode(reader).map_err(Into::into) 98 } 99 } 100