1 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. 2 3 use crate::call::client::{ 4 CallOption, ClientCStreamReceiver, ClientCStreamSender, ClientDuplexReceiver, 5 ClientDuplexSender, ClientSStreamReceiver, ClientUnaryReceiver, 6 }; 7 use crate::call::{Call, Method}; 8 use crate::channel::Channel; 9 use crate::error::Result; 10 use crate::task::Executor; 11 use crate::task::Kicker; 12 use futures::executor::block_on; 13 use futures::Future; 14 15 /// A generic client for making RPC calls. 16 #[derive(Clone)] 17 pub struct Client { 18 channel: Channel, 19 // Used to kick its completion queue. 20 kicker: Kicker, 21 } 22 23 impl Client { 24 /// Initialize a new [`Client`]. new(channel: Channel) -> Client25 pub fn new(channel: Channel) -> Client { 26 let kicker = channel.create_kicker().unwrap(); 27 Client { channel, kicker } 28 } 29 30 /// Create a synchronized unary RPC call. 31 /// 32 /// It uses futures::executor::block_on to wait for the futures. It's recommended to use 33 /// the asynchronous version. unary_call<Req, Resp>( &self, method: &Method<Req, Resp>, req: &Req, opt: CallOption, ) -> Result<Resp>34 pub fn unary_call<Req, Resp>( 35 &self, 36 method: &Method<Req, Resp>, 37 req: &Req, 38 opt: CallOption, 39 ) -> Result<Resp> { 40 block_on(self.unary_call_async(method, req, opt)?) 41 } 42 43 /// Create an asynchronized unary RPC call. unary_call_async<Req, Resp>( &self, method: &Method<Req, Resp>, req: &Req, opt: CallOption, ) -> Result<ClientUnaryReceiver<Resp>>44 pub fn unary_call_async<Req, Resp>( 45 &self, 46 method: &Method<Req, Resp>, 47 req: &Req, 48 opt: CallOption, 49 ) -> Result<ClientUnaryReceiver<Resp>> { 50 Call::unary_async(&self.channel, method, req, opt) 51 } 52 53 /// Create an asynchronized client streaming call. 54 /// 55 /// Client can send a stream of requests and server responds with a single response. client_streaming<Req, Resp>( &self, method: &Method<Req, Resp>, opt: CallOption, ) -> Result<(ClientCStreamSender<Req>, ClientCStreamReceiver<Resp>)>56 pub fn client_streaming<Req, Resp>( 57 &self, 58 method: &Method<Req, Resp>, 59 opt: CallOption, 60 ) -> Result<(ClientCStreamSender<Req>, ClientCStreamReceiver<Resp>)> { 61 Call::client_streaming(&self.channel, method, opt) 62 } 63 64 /// Create an asynchronized server streaming call. 65 /// 66 /// Client sends on request and server responds with a stream of responses. server_streaming<Req, Resp>( &self, method: &Method<Req, Resp>, req: &Req, opt: CallOption, ) -> Result<ClientSStreamReceiver<Resp>>67 pub fn server_streaming<Req, Resp>( 68 &self, 69 method: &Method<Req, Resp>, 70 req: &Req, 71 opt: CallOption, 72 ) -> Result<ClientSStreamReceiver<Resp>> { 73 Call::server_streaming(&self.channel, method, req, opt) 74 } 75 76 /// Create an asynchronized duplex streaming call. 77 /// 78 /// Client sends a stream of requests and server responds with a stream of responses. 79 /// The response stream is completely independent and both side can be sending messages 80 /// at the same time. duplex_streaming<Req, Resp>( &self, method: &Method<Req, Resp>, opt: CallOption, ) -> Result<(ClientDuplexSender<Req>, ClientDuplexReceiver<Resp>)>81 pub fn duplex_streaming<Req, Resp>( 82 &self, 83 method: &Method<Req, Resp>, 84 opt: CallOption, 85 ) -> Result<(ClientDuplexSender<Req>, ClientDuplexReceiver<Resp>)> { 86 Call::duplex_streaming(&self.channel, method, opt) 87 } 88 89 /// Spawn the future into current gRPC poll thread. 90 /// 91 /// This can reduce a lot of context switching, but please make 92 /// sure there is no heavy work in the future. spawn<F>(&self, f: F) where F: Future<Output = ()> + Send + 'static,93 pub fn spawn<F>(&self, f: F) 94 where 95 F: Future<Output = ()> + Send + 'static, 96 { 97 let kicker = self.kicker.clone(); 98 Executor::new(self.channel.cq()).spawn(f, kicker) 99 } 100 } 101