• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2 
3 use crate::call::client::{
4     CallOption, ClientCStreamReceiver, ClientCStreamSender, ClientDuplexReceiver,
5     ClientDuplexSender, ClientSStreamReceiver, ClientUnaryReceiver,
6 };
7 use crate::call::{Call, Method};
8 use crate::channel::Channel;
9 use crate::error::Result;
10 use crate::task::Executor;
11 use crate::task::Kicker;
12 use futures::executor::block_on;
13 use futures::Future;
14 
15 /// A generic client for making RPC calls.
16 #[derive(Clone)]
17 pub struct Client {
18     channel: Channel,
19     // Used to kick its completion queue.
20     kicker: Kicker,
21 }
22 
23 impl Client {
24     /// Initialize a new [`Client`].
new(channel: Channel) -> Client25     pub fn new(channel: Channel) -> Client {
26         let kicker = channel.create_kicker().unwrap();
27         Client { channel, kicker }
28     }
29 
30     /// Create a synchronized unary RPC call.
31     ///
32     /// It uses futures::executor::block_on to wait for the futures. It's recommended to use
33     /// the asynchronous version.
unary_call<Req, Resp>( &self, method: &Method<Req, Resp>, req: &Req, opt: CallOption, ) -> Result<Resp>34     pub fn unary_call<Req, Resp>(
35         &self,
36         method: &Method<Req, Resp>,
37         req: &Req,
38         opt: CallOption,
39     ) -> Result<Resp> {
40         block_on(self.unary_call_async(method, req, opt)?)
41     }
42 
43     /// Create an asynchronized unary RPC call.
unary_call_async<Req, Resp>( &self, method: &Method<Req, Resp>, req: &Req, opt: CallOption, ) -> Result<ClientUnaryReceiver<Resp>>44     pub fn unary_call_async<Req, Resp>(
45         &self,
46         method: &Method<Req, Resp>,
47         req: &Req,
48         opt: CallOption,
49     ) -> Result<ClientUnaryReceiver<Resp>> {
50         Call::unary_async(&self.channel, method, req, opt)
51     }
52 
53     /// Create an asynchronized client streaming call.
54     ///
55     /// Client can send a stream of requests and server responds with a single response.
client_streaming<Req, Resp>( &self, method: &Method<Req, Resp>, opt: CallOption, ) -> Result<(ClientCStreamSender<Req>, ClientCStreamReceiver<Resp>)>56     pub fn client_streaming<Req, Resp>(
57         &self,
58         method: &Method<Req, Resp>,
59         opt: CallOption,
60     ) -> Result<(ClientCStreamSender<Req>, ClientCStreamReceiver<Resp>)> {
61         Call::client_streaming(&self.channel, method, opt)
62     }
63 
64     /// Create an asynchronized server streaming call.
65     ///
66     /// Client sends on request and server responds with a stream of responses.
server_streaming<Req, Resp>( &self, method: &Method<Req, Resp>, req: &Req, opt: CallOption, ) -> Result<ClientSStreamReceiver<Resp>>67     pub fn server_streaming<Req, Resp>(
68         &self,
69         method: &Method<Req, Resp>,
70         req: &Req,
71         opt: CallOption,
72     ) -> Result<ClientSStreamReceiver<Resp>> {
73         Call::server_streaming(&self.channel, method, req, opt)
74     }
75 
76     /// Create an asynchronized duplex streaming call.
77     ///
78     /// Client sends a stream of requests and server responds with a stream of responses.
79     /// The response stream is completely independent and both side can be sending messages
80     /// at the same time.
duplex_streaming<Req, Resp>( &self, method: &Method<Req, Resp>, opt: CallOption, ) -> Result<(ClientDuplexSender<Req>, ClientDuplexReceiver<Resp>)>81     pub fn duplex_streaming<Req, Resp>(
82         &self,
83         method: &Method<Req, Resp>,
84         opt: CallOption,
85     ) -> Result<(ClientDuplexSender<Req>, ClientDuplexReceiver<Resp>)> {
86         Call::duplex_streaming(&self.channel, method, opt)
87     }
88 
89     /// Spawn the future into current gRPC poll thread.
90     ///
91     /// This can reduce a lot of context switching, but please make
92     /// sure there is no heavy work in the future.
spawn<F>(&self, f: F) where F: Future<Output = ()> + Send + 'static,93     pub fn spawn<F>(&self, f: F)
94     where
95         F: Future<Output = ()> + Send + 'static,
96     {
97         let kicker = self.kicker.clone();
98         Executor::new(self.channel.cq()).spawn(f, kicker)
99     }
100 }
101