• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2 
3 use std::borrow::Cow;
4 use std::collections::hash_map::Entry;
5 use std::collections::HashMap;
6 use std::ffi::{CStr, CString};
7 use std::future::Future;
8 use std::sync::Arc;
9 use std::time::Duration;
10 use std::{cmp, i32, ptr};
11 
12 use crate::{
13     grpc_sys::{self, gpr_timespec, grpc_arg_pointer_vtable, grpc_channel, grpc_channel_args},
14     Deadline,
15 };
16 use libc::{self, c_char, c_int};
17 
18 use crate::call::{Call, Method};
19 use crate::cq::CompletionQueue;
20 use crate::env::Environment;
21 use crate::error::Result;
22 use crate::task::CallTag;
23 use crate::task::Kicker;
24 use crate::CallOption;
25 use crate::ResourceQuota;
26 
27 pub use crate::grpc_sys::{
28     grpc_compression_algorithm as CompressionAlgorithms,
29     grpc_compression_level as CompressionLevel, grpc_connectivity_state as ConnectivityState,
30 };
31 
/// Builds the user-agent value for a channel.
///
/// `agent` is trimmed of surrounding whitespace and placed in front of the
/// `grpc-rust/<version>` token; when nothing is left after trimming, only the
/// library token is emitted.
///
/// Ref: http://www.grpc.io/docs/guides/wire.html#user-agents
///
/// # Panics
///
/// Panics if `agent` contains an interior NUL byte.
fn format_user_agent_string(agent: &str) -> CString {
    const VERSION: &str = "0.9.1";
    let user_agent = match agent.trim() {
        "" => format!("grpc-rust/{}", VERSION),
        prefix => format!("{} grpc-rust/{}", prefix, VERSION),
    };
    CString::new(user_agent).unwrap()
}
43 
/// Converts `dur` to whole milliseconds, saturating at `i32::MAX`.
///
/// The millisecond count is computed in `u128` through `Duration::as_millis`, so
/// arbitrarily large durations cannot overflow before they are clamped. Any
/// sub-millisecond remainder is truncated.
fn dur_to_ms(dur: Duration) -> i32 {
    // Clamp first: after that the value is known to fit, so the cast is lossless.
    cmp::min(dur.as_millis(), i32::MAX as u128) as i32
}
48 
49 enum Options {
50     Integer(i32),
51     String(CString),
52     Pointer(ResourceQuota, *const grpc_arg_pointer_vtable),
53 }
54 
/// The optimization target for a [`Channel`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum OptTarget {
    /// Minimize latency at the cost of throughput.
    Latency,
    /// Balance latency and throughput.
    Blend,
    /// Maximize throughput at the expense of latency.
    Throughput,
}
65 
/// The load-balancing policy for a [`Channel`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum LbPolicy {
    /// Selects the `pick_first` policy.
    PickFirst,
    /// Selects the `round_robin` policy.
    RoundRobin,
}
71 
72 /// [`Channel`] factory in order to configure the properties.
73 pub struct ChannelBuilder {
74     env: Arc<Environment>,
75     options: HashMap<Cow<'static, [u8]>, Options>,
76 }
77 
78 impl ChannelBuilder {
79     /// Initialize a new [`ChannelBuilder`].
new(env: Arc<Environment>) -> ChannelBuilder80     pub fn new(env: Arc<Environment>) -> ChannelBuilder {
81         ChannelBuilder {
82             env,
83             options: HashMap::new(),
84         }
85     }
86 
87     /// Set default authority to pass if none specified on call construction.
default_authority<S: Into<Vec<u8>>>(mut self, authority: S) -> ChannelBuilder88     pub fn default_authority<S: Into<Vec<u8>>>(mut self, authority: S) -> ChannelBuilder {
89         let authority = CString::new(authority).unwrap();
90         self.options.insert(
91             Cow::Borrowed(grpcio_sys::GRPC_ARG_DEFAULT_AUTHORITY),
92             Options::String(authority),
93         );
94         self
95     }
96 
97     /// Set resource quota by consuming a ResourceQuota
set_resource_quota(mut self, quota: ResourceQuota) -> ChannelBuilder98     pub fn set_resource_quota(mut self, quota: ResourceQuota) -> ChannelBuilder {
99         unsafe {
100             self.options.insert(
101                 Cow::Borrowed(grpcio_sys::GRPC_ARG_RESOURCE_QUOTA),
102                 Options::Pointer(quota, grpc_sys::grpc_resource_quota_arg_vtable()),
103             );
104         }
105         self
106     }
107 
108     /// Set maximum number of concurrent incoming streams to allow on a HTTP/2 connection.
max_concurrent_stream(mut self, num: i32) -> ChannelBuilder109     pub fn max_concurrent_stream(mut self, num: i32) -> ChannelBuilder {
110         self.options.insert(
111             Cow::Borrowed(grpcio_sys::GRPC_ARG_MAX_CONCURRENT_STREAMS),
112             Options::Integer(num),
113         );
114         self
115     }
116 
117     /// Set maximum message length that the channel can receive. `-1` means unlimited.
max_receive_message_len(mut self, len: i32) -> ChannelBuilder118     pub fn max_receive_message_len(mut self, len: i32) -> ChannelBuilder {
119         self.options.insert(
120             Cow::Borrowed(grpcio_sys::GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH),
121             Options::Integer(len),
122         );
123         self
124     }
125 
126     /// Set maximum message length that the channel can send. `-1` means unlimited.
max_send_message_len(mut self, len: i32) -> ChannelBuilder127     pub fn max_send_message_len(mut self, len: i32) -> ChannelBuilder {
128         self.options.insert(
129             Cow::Borrowed(grpcio_sys::GRPC_ARG_MAX_SEND_MESSAGE_LENGTH),
130             Options::Integer(len),
131         );
132         self
133     }
134 
135     /// Set maximum time between subsequent connection attempts.
max_reconnect_backoff(mut self, backoff: Duration) -> ChannelBuilder136     pub fn max_reconnect_backoff(mut self, backoff: Duration) -> ChannelBuilder {
137         self.options.insert(
138             Cow::Borrowed(grpcio_sys::GRPC_ARG_MAX_RECONNECT_BACKOFF_MS),
139             Options::Integer(dur_to_ms(backoff)),
140         );
141         self
142     }
143 
144     /// Set time between the first and second connection attempts.
initial_reconnect_backoff(mut self, backoff: Duration) -> ChannelBuilder145     pub fn initial_reconnect_backoff(mut self, backoff: Duration) -> ChannelBuilder {
146         self.options.insert(
147             Cow::Borrowed(grpcio_sys::GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS),
148             Options::Integer(dur_to_ms(backoff)),
149         );
150         self
151     }
152 
153     /// Set initial sequence number for HTTP/2 transports.
https_initial_seq_number(mut self, number: i32) -> ChannelBuilder154     pub fn https_initial_seq_number(mut self, number: i32) -> ChannelBuilder {
155         self.options.insert(
156             Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER),
157             Options::Integer(number),
158         );
159         self
160     }
161 
162     /// Set amount to read ahead on individual streams. Defaults to 64KB. Larger
163     /// values help throughput on high-latency connections.
stream_initial_window_size(mut self, window_size: i32) -> ChannelBuilder164     pub fn stream_initial_window_size(mut self, window_size: i32) -> ChannelBuilder {
165         self.options.insert(
166             Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES),
167             Options::Integer(window_size),
168         );
169         self
170     }
171 
172     /// Set primary user agent, which goes at the start of the user-agent metadata sent on
173     /// each request.
primary_user_agent(mut self, agent: &str) -> ChannelBuilder174     pub fn primary_user_agent(mut self, agent: &str) -> ChannelBuilder {
175         let agent_string = format_user_agent_string(agent);
176         self.options.insert(
177             Cow::Borrowed(grpcio_sys::GRPC_ARG_PRIMARY_USER_AGENT_STRING),
178             Options::String(agent_string),
179         );
180         self
181     }
182 
183     /// Set whether to allow the use of `SO_REUSEPORT` if available. Defaults to `true`.
reuse_port(mut self, reuse: bool) -> ChannelBuilder184     pub fn reuse_port(mut self, reuse: bool) -> ChannelBuilder {
185         let opt = if reuse { 1 } else { 0 };
186         self.options.insert(
187             Cow::Borrowed(grpcio_sys::GRPC_ARG_ALLOW_REUSEPORT),
188             Options::Integer(opt),
189         );
190         self
191     }
192 
193     /// Set the size of slice to try and read from the wire each time.
tcp_read_chunk_size(mut self, bytes: i32) -> ChannelBuilder194     pub fn tcp_read_chunk_size(mut self, bytes: i32) -> ChannelBuilder {
195         self.options.insert(
196             Cow::Borrowed(grpcio_sys::GRPC_ARG_TCP_READ_CHUNK_SIZE),
197             Options::Integer(bytes),
198         );
199         self
200     }
201 
202     /// Set the minimum size of slice to try and read from the wire each time.
tcp_min_read_chunk_size(mut self, bytes: i32) -> ChannelBuilder203     pub fn tcp_min_read_chunk_size(mut self, bytes: i32) -> ChannelBuilder {
204         self.options.insert(
205             Cow::Borrowed(grpcio_sys::GRPC_ARG_TCP_MIN_READ_CHUNK_SIZE),
206             Options::Integer(bytes),
207         );
208         self
209     }
210 
211     /// Set the maximum size of slice to try and read from the wire each time.
tcp_max_read_chunk_size(mut self, bytes: i32) -> ChannelBuilder212     pub fn tcp_max_read_chunk_size(mut self, bytes: i32) -> ChannelBuilder {
213         self.options.insert(
214             Cow::Borrowed(grpcio_sys::GRPC_ARG_TCP_MAX_READ_CHUNK_SIZE),
215             Options::Integer(bytes),
216         );
217         self
218     }
219 
220     /// How much data are we willing to queue up per stream if
221     /// write_buffer_hint is set. This is an upper bound.
http2_write_buffer_size(mut self, size: i32) -> ChannelBuilder222     pub fn http2_write_buffer_size(mut self, size: i32) -> ChannelBuilder {
223         self.options.insert(
224             Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE),
225             Options::Integer(size),
226         );
227         self
228     }
229 
230     /// How big a frame are we willing to receive via HTTP/2.
231     /// Min 16384, max 16777215.
232     /// Larger values give lower CPU usage for large messages, but more head of line
233     /// blocking for small messages.
http2_max_frame_size(mut self, size: i32) -> ChannelBuilder234     pub fn http2_max_frame_size(mut self, size: i32) -> ChannelBuilder {
235         self.options.insert(
236             Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_MAX_FRAME_SIZE),
237             Options::Integer(size),
238         );
239         self
240     }
241 
242     /// Set whether to enable BDP probing.
http2_bdp_probe(mut self, enable: bool) -> ChannelBuilder243     pub fn http2_bdp_probe(mut self, enable: bool) -> ChannelBuilder {
244         self.options.insert(
245             Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_BDP_PROBE),
246             Options::Integer(enable as i32),
247         );
248         self
249     }
250 
251     /// Minimum time between sending successive ping frames without receiving any
252     /// data frame.
http2_min_sent_ping_interval_without_data( mut self, interval: Duration, ) -> ChannelBuilder253     pub fn http2_min_sent_ping_interval_without_data(
254         mut self,
255         interval: Duration,
256     ) -> ChannelBuilder {
257         self.options.insert(
258             Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS),
259             Options::Integer(dur_to_ms(interval)),
260         );
261         self
262     }
263 
264     /// Minimum allowed time between receiving successive ping frames without
265     /// sending any data frame.
http2_min_recv_ping_interval_without_data( mut self, interval: Duration, ) -> ChannelBuilder266     pub fn http2_min_recv_ping_interval_without_data(
267         mut self,
268         interval: Duration,
269     ) -> ChannelBuilder {
270         self.options.insert(
271             Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS),
272             Options::Integer(dur_to_ms(interval)),
273         );
274         self
275     }
276 
277     /// How many pings can we send before needing to send a data frame or header
278     /// frame? (0 indicates that an infinite number of pings can be sent without
279     /// sending a data frame or header frame)
http2_max_pings_without_data(mut self, num: i32) -> ChannelBuilder280     pub fn http2_max_pings_without_data(mut self, num: i32) -> ChannelBuilder {
281         self.options.insert(
282             Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA),
283             Options::Integer(num),
284         );
285         self
286     }
287 
288     /// How many misbehaving pings the server can bear before sending goaway and
289     /// closing the transport? (0 indicates that the server can bear an infinite
290     /// number of misbehaving pings)
http2_max_ping_strikes(mut self, num: i32) -> ChannelBuilder291     pub fn http2_max_ping_strikes(mut self, num: i32) -> ChannelBuilder {
292         self.options.insert(
293             Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_MAX_PING_STRIKES),
294             Options::Integer(num),
295         );
296         self
297     }
298 
299     /// Set default compression algorithm for the channel.
default_compression_algorithm(mut self, algo: CompressionAlgorithms) -> ChannelBuilder300     pub fn default_compression_algorithm(mut self, algo: CompressionAlgorithms) -> ChannelBuilder {
301         self.options.insert(
302             Cow::Borrowed(grpcio_sys::GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM),
303             Options::Integer(algo as i32),
304         );
305         self
306     }
307 
308     /// Set default compression level for the channel.
default_compression_level(mut self, level: CompressionLevel) -> ChannelBuilder309     pub fn default_compression_level(mut self, level: CompressionLevel) -> ChannelBuilder {
310         self.options.insert(
311             Cow::Borrowed(grpcio_sys::GRPC_COMPRESSION_CHANNEL_DEFAULT_LEVEL),
312             Options::Integer(level as i32),
313         );
314         self
315     }
316 
317     /// After a duration of this time the client/server pings its peer to see
318     /// if the transport is still alive.
keepalive_time(mut self, timeout: Duration) -> ChannelBuilder319     pub fn keepalive_time(mut self, timeout: Duration) -> ChannelBuilder {
320         self.options.insert(
321             Cow::Borrowed(grpcio_sys::GRPC_ARG_KEEPALIVE_TIME_MS),
322             Options::Integer(dur_to_ms(timeout)),
323         );
324         self
325     }
326 
327     /// After waiting for a duration of this time, if the keepalive ping sender does
328     /// not receive the ping ack, it will close the transport.
keepalive_timeout(mut self, timeout: Duration) -> ChannelBuilder329     pub fn keepalive_timeout(mut self, timeout: Duration) -> ChannelBuilder {
330         self.options.insert(
331             Cow::Borrowed(grpcio_sys::GRPC_ARG_KEEPALIVE_TIMEOUT_MS),
332             Options::Integer(dur_to_ms(timeout)),
333         );
334         self
335     }
336 
337     /// Is it permissible to send keepalive pings without any outstanding streams.
keepalive_permit_without_calls(mut self, allow: bool) -> ChannelBuilder338     pub fn keepalive_permit_without_calls(mut self, allow: bool) -> ChannelBuilder {
339         self.options.insert(
340             Cow::Borrowed(grpcio_sys::GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS),
341             Options::Integer(allow as i32),
342         );
343         self
344     }
345 
346     /// Set optimization target for the channel. See [`OptTarget`] for all available
347     /// optimization targets. Defaults to `OptTarget::Blend`.
optimize_for(mut self, target: OptTarget) -> ChannelBuilder348     pub fn optimize_for(mut self, target: OptTarget) -> ChannelBuilder {
349         let val = match target {
350             OptTarget::Latency => CString::new("latency"),
351             OptTarget::Blend => CString::new("blend"),
352             OptTarget::Throughput => CString::new("throughput"),
353         };
354         self.options.insert(
355             Cow::Borrowed(grpcio_sys::GRPC_ARG_OPTIMIZATION_TARGET),
356             Options::String(val.unwrap()),
357         );
358         self
359     }
360 
361     /// Set LbPolicy for channel
362     ///
363     /// This method allows one to set the load-balancing policy for a given channel.
load_balancing_policy(mut self, lb_policy: LbPolicy) -> ChannelBuilder364     pub fn load_balancing_policy(mut self, lb_policy: LbPolicy) -> ChannelBuilder {
365         let val = match lb_policy {
366             LbPolicy::PickFirst => CString::new("pick_first"),
367             LbPolicy::RoundRobin => CString::new("round_robin"),
368         };
369         self.options.insert(
370             Cow::Borrowed(grpcio_sys::GRPC_ARG_LB_POLICY_NAME),
371             Options::String(val.unwrap()),
372         );
373         self
374     }
375 
376     /// Set a raw integer configuration.
377     ///
378     /// This method is only for bench usage, users should use the encapsulated API instead.
379     #[doc(hidden)]
raw_cfg_int(mut self, key: CString, val: i32) -> ChannelBuilder380     pub fn raw_cfg_int(mut self, key: CString, val: i32) -> ChannelBuilder {
381         self.options
382             .insert(Cow::Owned(key.into_bytes_with_nul()), Options::Integer(val));
383         self
384     }
385 
386     /// Set a raw string configuration.
387     ///
388     /// This method is only for bench usage, users should use the encapsulated API instead.
389     #[doc(hidden)]
raw_cfg_string(mut self, key: CString, val: CString) -> ChannelBuilder390     pub fn raw_cfg_string(mut self, key: CString, val: CString) -> ChannelBuilder {
391         self.options
392             .insert(Cow::Owned(key.into_bytes_with_nul()), Options::String(val));
393         self
394     }
395 
396     /// Build `ChannelArgs` from the current configuration.
397     #[allow(clippy::useless_conversion)]
398     #[allow(clippy::cmp_owned)]
build_args(&self) -> ChannelArgs399     pub fn build_args(&self) -> ChannelArgs {
400         let args = unsafe { grpc_sys::grpcwrap_channel_args_create(self.options.len()) };
401         for (i, (k, v)) in self.options.iter().enumerate() {
402             let key = k.as_ptr() as *const c_char;
403             match *v {
404                 Options::Integer(val) => unsafe {
405                     // On most modern compiler and architect, c_int is the same as i32,
406                     // panic directly to simplify signature.
407                     assert!(
408                         val <= i32::from(libc::INT_MAX) && val >= i32::from(libc::INT_MIN),
409                         "{} is out of range for {:?}",
410                         val,
411                         CStr::from_bytes_with_nul(k).unwrap()
412                     );
413                     grpc_sys::grpcwrap_channel_args_set_integer(args, i, key, val as c_int)
414                 },
415                 Options::String(ref val) => unsafe {
416                     grpc_sys::grpcwrap_channel_args_set_string(args, i, key, val.as_ptr())
417                 },
418                 Options::Pointer(ref quota, vtable) => unsafe {
419                     grpc_sys::grpcwrap_channel_args_set_pointer_vtable(
420                         args,
421                         i,
422                         key,
423                         quota.get_ptr() as _,
424                         vtable,
425                     )
426                 },
427             }
428         }
429         ChannelArgs { args }
430     }
431 
prepare_connect_args(&mut self) -> ChannelArgs432     fn prepare_connect_args(&mut self) -> ChannelArgs {
433         if let Entry::Vacant(e) = self.options.entry(Cow::Borrowed(
434             grpcio_sys::GRPC_ARG_PRIMARY_USER_AGENT_STRING,
435         )) {
436             e.insert(Options::String(format_user_agent_string("")));
437         }
438         self.build_args()
439     }
440 
441     /// Build an insecure [`Channel`] that connects to a specific address.
connect(mut self, addr: &str) -> Channel442     pub fn connect(mut self, addr: &str) -> Channel {
443         let args = self.prepare_connect_args();
444         let addr = CString::new(addr).unwrap();
445         let addr_ptr = addr.as_ptr();
446         let channel =
447             unsafe { grpc_sys::grpc_insecure_channel_create(addr_ptr, args.args, ptr::null_mut()) };
448 
449         unsafe { Channel::new(self.env.pick_cq(), self.env, channel) }
450     }
451 
452     /// Build an insecure [`Channel`] taking over an established connection from
453     /// a file descriptor. The target string given is purely informative to
454     /// describe the endpoint of the connection. Takes ownership of the given
455     /// file descriptor and will close it when the connection is closed.
456     ///
457     /// This function is available on posix systems only.
458     ///
459     /// # Safety
460     ///
461     /// The file descriptor must correspond to a connected stream socket. After
462     /// this call, the socket must not be accessed (read / written / closed)
463     /// by other code.
464     #[cfg(unix)]
connect_from_fd(mut self, target: &str, fd: ::std::os::raw::c_int) -> Channel465     pub unsafe fn connect_from_fd(mut self, target: &str, fd: ::std::os::raw::c_int) -> Channel {
466         let args = self.prepare_connect_args();
467         let target = CString::new(target).unwrap();
468         let target_ptr = target.as_ptr();
469         let channel = grpc_sys::grpc_insecure_channel_create_from_fd(target_ptr, fd, args.args);
470 
471         Channel::new(self.env.pick_cq(), self.env, channel)
472     }
473 }
474 
#[cfg(feature = "secure")]
mod secure_channel {
    use std::borrow::Cow;
    use std::ffi::CString;
    use std::ptr;

    use crate::grpc_sys;

    use crate::ChannelCredentials;

    use super::{Channel, ChannelBuilder, Options};

    /// NUL-terminated name of the channel argument overriding the SSL target name.
    const OPT_SSL_TARGET_NAME_OVERRIDE: &[u8] = b"grpc.ssl_target_name_override\0";

    impl ChannelBuilder {
        /// The caller of the secure_channel_create functions may override the target name used
        /// for SSL host name checking using this channel argument.
        ///
        /// This *should* be used for testing only.
        ///
        /// # Panics
        ///
        /// Panics if `target` contains an interior NUL byte.
        #[doc(hidden)]
        pub fn override_ssl_target<S: Into<Vec<u8>>>(mut self, target: S) -> ChannelBuilder {
            let name = CString::new(target).unwrap();
            let key = Cow::Borrowed(OPT_SSL_TARGET_NAME_OVERRIDE);
            self.options.insert(key, Options::String(name));
            self
        }

        /// Build a secure [`Channel`] that connects to a specific address.
        ///
        /// # Panics
        ///
        /// Panics if `addr` contains an interior NUL byte.
        pub fn secure_connect(mut self, addr: &str, mut creds: ChannelCredentials) -> Channel {
            let args = self.prepare_connect_args();
            let addr = CString::new(addr).unwrap();
            let channel = unsafe {
                grpc_sys::grpc_secure_channel_create(
                    creds.as_mut_ptr(),
                    addr.as_ptr(),
                    args.args,
                    ptr::null_mut(),
                )
            };

            let cq = self.env.pick_cq();
            unsafe { Channel::new(cq, self.env, channel) }
        }
    }
}
522 
523 pub struct ChannelArgs {
524     args: *mut grpc_channel_args,
525 }
526 
527 impl ChannelArgs {
as_ptr(&self) -> *const grpc_channel_args528     pub fn as_ptr(&self) -> *const grpc_channel_args {
529         self.args
530     }
531 }
532 
533 impl Drop for ChannelArgs {
drop(&mut self)534     fn drop(&mut self) {
535         unsafe { grpc_sys::grpcwrap_channel_args_destroy(self.args) }
536     }
537 }
538 
539 struct ChannelInner {
540     _env: Arc<Environment>,
541     channel: *mut grpc_channel,
542 }
543 
544 impl ChannelInner {
545     // If try_to_connect is true, the channel will try to establish a connection, potentially
546     // changing the state.
check_connectivity_state(&self, try_to_connect: bool) -> ConnectivityState547     fn check_connectivity_state(&self, try_to_connect: bool) -> ConnectivityState {
548         let should_try = if try_to_connect { 1 } else { 0 };
549         unsafe { grpc_sys::grpc_channel_check_connectivity_state(self.channel, should_try) }
550     }
551 }
552 
553 impl Drop for ChannelInner {
drop(&mut self)554     fn drop(&mut self) {
555         unsafe {
556             grpc_sys::grpc_channel_destroy(self.channel);
557         }
558     }
559 }
560 
561 /// A gRPC channel.
562 ///
563 /// Channels are an abstraction of long-lived connections to remote servers. More client objects
564 /// can reuse the same channel.
565 ///
566 /// Use [`ChannelBuilder`] to build a [`Channel`].
567 #[derive(Clone)]
568 pub struct Channel {
569     inner: Arc<ChannelInner>,
570     cq: CompletionQueue,
571 }
572 
573 unsafe impl Send for Channel {}
574 unsafe impl Sync for Channel {}
575 
576 impl Channel {
577     /// Create a new channel. Avoid using this directly and use
578     /// [`ChannelBuilder`] to build a [`Channel`] instead.
579     ///
580     /// # Safety
581     ///
582     /// The given grpc_channel must correspond to an instantiated grpc core
583     /// channel. Takes exclusive ownership of the channel and will close it after
584     /// use.
new( cq: CompletionQueue, env: Arc<Environment>, channel: *mut grpc_channel, ) -> Channel585     pub unsafe fn new(
586         cq: CompletionQueue,
587         env: Arc<Environment>,
588         channel: *mut grpc_channel,
589     ) -> Channel {
590         Channel {
591             inner: Arc::new(ChannelInner { _env: env, channel }),
592             cq,
593         }
594     }
595 
596     /// If try_to_connect is true, the channel will try to establish a connection, potentially
597     /// changing the state.
check_connectivity_state(&self, try_to_connect: bool) -> ConnectivityState598     pub fn check_connectivity_state(&self, try_to_connect: bool) -> ConnectivityState {
599         self.inner.check_connectivity_state(try_to_connect)
600     }
601 
602     /// Blocking wait for channel state change or deadline expiration.
603     ///
604     /// `check_connectivity_state` needs to be called to get the current state. Returns false
605     /// means deadline excceeds before observing any state changes.
wait_for_state_change( &self, last_observed: ConnectivityState, deadline: impl Into<Deadline>, ) -> impl Future<Output = bool>606     pub fn wait_for_state_change(
607         &self,
608         last_observed: ConnectivityState,
609         deadline: impl Into<Deadline>,
610     ) -> impl Future<Output = bool> {
611         let (cq_f, prom) = CallTag::action_pair();
612         let prom_box = Box::new(prom);
613         let tag = Box::into_raw(prom_box);
614         let should_wait = if let Ok(cq_ref) = self.cq.borrow() {
615             unsafe {
616                 grpcio_sys::grpc_channel_watch_connectivity_state(
617                     self.inner.channel,
618                     last_observed,
619                     deadline.into().spec(),
620                     cq_ref.as_ptr(),
621                     tag as *mut _,
622                 )
623             }
624             true
625         } else {
626             // It's already shutdown.
627             false
628         };
629         async move { should_wait && cq_f.await.unwrap() }
630     }
631 
632     /// Wait for this channel to be connected.
633     ///
634     /// Returns false means deadline excceeds before connection is connected.
wait_for_connected(&self, deadline: impl Into<Deadline>) -> bool635     pub async fn wait_for_connected(&self, deadline: impl Into<Deadline>) -> bool {
636         // Fast path, it's probably connected.
637         let mut state = self.check_connectivity_state(true);
638         if ConnectivityState::GRPC_CHANNEL_READY == state {
639             return true;
640         }
641         let deadline = deadline.into();
642         loop {
643             if self.wait_for_state_change(state, deadline).await {
644                 state = self.check_connectivity_state(true);
645                 match state {
646                     ConnectivityState::GRPC_CHANNEL_READY => return true,
647                     ConnectivityState::GRPC_CHANNEL_SHUTDOWN => return false,
648                     _ => (),
649                 }
650                 continue;
651             }
652             return false;
653         }
654     }
655 
656     /// Create a Kicker.
create_kicker(&self) -> Result<Kicker>657     pub(crate) fn create_kicker(&self) -> Result<Kicker> {
658         let cq_ref = self.cq.borrow()?;
659         let raw_call = unsafe {
660             let ch = self.inner.channel;
661             let cq = cq_ref.as_ptr();
662             // Do not timeout.
663             let timeout = gpr_timespec::inf_future();
664             grpc_sys::grpcwrap_channel_create_call(
665                 ch,
666                 ptr::null_mut(),
667                 0,
668                 cq,
669                 ptr::null(),
670                 0,
671                 ptr::null(),
672                 0,
673                 timeout,
674             )
675         };
676         let call = unsafe { Call::from_raw(raw_call, self.cq.clone()) };
677         Ok(Kicker::from_call(call))
678     }
679 
680     /// Create a call using the method and option.
create_call<Req, Resp>( &self, method: &Method<Req, Resp>, opt: &CallOption, ) -> Result<Call>681     pub(crate) fn create_call<Req, Resp>(
682         &self,
683         method: &Method<Req, Resp>,
684         opt: &CallOption,
685     ) -> Result<Call> {
686         let cq_ref = self.cq.borrow()?;
687         let raw_call = unsafe {
688             let ch = self.inner.channel;
689             let cq = cq_ref.as_ptr();
690             let method_ptr = method.name.as_ptr();
691             let method_len = method.name.len();
692             let timeout = opt
693                 .get_timeout()
694                 .map_or_else(gpr_timespec::inf_future, gpr_timespec::from);
695             grpc_sys::grpcwrap_channel_create_call(
696                 ch,
697                 ptr::null_mut(),
698                 0,
699                 cq,
700                 method_ptr as *const _,
701                 method_len,
702                 ptr::null(),
703                 0,
704                 timeout,
705             )
706         };
707 
708         unsafe { Ok(Call::from_raw(raw_call, self.cq.clone())) }
709     }
710 
cq(&self) -> &CompletionQueue711     pub(crate) fn cq(&self) -> &CompletionQueue {
712         &self.cq
713     }
714 }
715