1 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2
3 use std::borrow::Cow;
4 use std::collections::hash_map::Entry;
5 use std::collections::HashMap;
6 use std::ffi::{CStr, CString};
7 use std::future::Future;
8 use std::sync::Arc;
9 use std::time::Duration;
10 use std::{cmp, i32, ptr};
11
12 use crate::{
13 grpc_sys::{self, gpr_timespec, grpc_arg_pointer_vtable, grpc_channel, grpc_channel_args},
14 Deadline,
15 };
16 use libc::{self, c_char, c_int};
17
18 use crate::call::{Call, Method};
19 use crate::cq::CompletionQueue;
20 use crate::env::Environment;
21 use crate::error::Result;
22 use crate::task::CallTag;
23 use crate::task::Kicker;
24 use crate::CallOption;
25 use crate::ResourceQuota;
26
27 pub use crate::grpc_sys::{
28 grpc_compression_algorithm as CompressionAlgorithms,
29 grpc_compression_level as CompressionLevel, grpc_connectivity_state as ConnectivityState,
30 };
31
/// Builds the user-agent value that is stored as a channel argument.
///
/// `agent` is trimmed first. If nothing is left, the result is just the
/// `grpc-rust/<version>` token; otherwise the trimmed agent is placed in front of that
/// token, separated by a single space.
///
/// Ref: http://www.grpc.io/docs/guides/wire.html#user-agents
///
/// # Panics
///
/// Panics if the resulting string contains an interior NUL byte.
fn format_user_agent_string(agent: &str) -> CString {
    const VERSION: &str = "0.9.1";
    let user_agent = match agent.trim() {
        "" => format!("grpc-rust/{}", VERSION),
        prefix => format!("{} grpc-rust/{}", prefix, VERSION),
    };
    CString::new(user_agent).unwrap()
}
43
/// Converts a [`Duration`] into whole milliseconds, saturating at `i32::MAX`.
///
/// Sub-millisecond precision is truncated. The conversion goes through
/// [`Duration::as_millis`], which yields a `u128`, so even the largest representable
/// duration cannot overflow before the value is clamped.
fn dur_to_ms(dur: Duration) -> i32 {
    // The clamp guarantees the value fits, so the narrowing cast is lossless.
    cmp::min(dur.as_millis(), i32::MAX as u128) as i32
}
48
/// Value of a single channel argument stored in a [`ChannelBuilder`].
enum Options {
    /// An integer-valued argument.
    Integer(i32),
    /// A string-valued argument, kept as a NUL-terminated C string.
    String(CString),
    /// A pointer-valued argument: the [`ResourceQuota`] whose raw pointer is handed to
    /// gRPC core, together with the vtable pointer passed alongside it.
    Pointer(ResourceQuota, *const grpc_arg_pointer_vtable),
}
54
/// The optimization target for a [`Channel`].
///
/// Passed to `ChannelBuilder::optimize_for`, which stores the matching string value as a
/// channel argument.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum OptTarget {
    /// Minimize latency at the cost of throughput.
    Latency,
    /// Balance latency and throughput.
    Blend,
    /// Maximize throughput at the expense of latency.
    Throughput,
}
65
/// The load-balancing policy for a [`Channel`].
///
/// Passed to `ChannelBuilder::load_balancing_policy`, which stores the matching policy
/// name as a channel argument.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum LbPolicy {
    /// Selects the `pick_first` policy.
    PickFirst,
    /// Selects the `round_robin` policy.
    RoundRobin,
}
71
/// [`Channel`] factory in order to configure the properties.
///
/// Every setter consumes the builder, records one channel argument and returns the
/// builder again, so calls can be chained. Setting the same argument twice keeps only
/// the last value because arguments are stored in a map keyed by argument name.
pub struct ChannelBuilder {
    /// Environment the built channel is attached to.
    env: Arc<Environment>,
    /// Configured channel arguments, keyed by the argument name bytes.
    options: HashMap<Cow<'static, [u8]>, Options>,
}
77
78 impl ChannelBuilder {
79 /// Initialize a new [`ChannelBuilder`].
new(env: Arc<Environment>) -> ChannelBuilder80 pub fn new(env: Arc<Environment>) -> ChannelBuilder {
81 ChannelBuilder {
82 env,
83 options: HashMap::new(),
84 }
85 }
86
87 /// Set default authority to pass if none specified on call construction.
default_authority<S: Into<Vec<u8>>>(mut self, authority: S) -> ChannelBuilder88 pub fn default_authority<S: Into<Vec<u8>>>(mut self, authority: S) -> ChannelBuilder {
89 let authority = CString::new(authority).unwrap();
90 self.options.insert(
91 Cow::Borrowed(grpcio_sys::GRPC_ARG_DEFAULT_AUTHORITY),
92 Options::String(authority),
93 );
94 self
95 }
96
97 /// Set resource quota by consuming a ResourceQuota
set_resource_quota(mut self, quota: ResourceQuota) -> ChannelBuilder98 pub fn set_resource_quota(mut self, quota: ResourceQuota) -> ChannelBuilder {
99 unsafe {
100 self.options.insert(
101 Cow::Borrowed(grpcio_sys::GRPC_ARG_RESOURCE_QUOTA),
102 Options::Pointer(quota, grpc_sys::grpc_resource_quota_arg_vtable()),
103 );
104 }
105 self
106 }
107
108 /// Set maximum number of concurrent incoming streams to allow on a HTTP/2 connection.
max_concurrent_stream(mut self, num: i32) -> ChannelBuilder109 pub fn max_concurrent_stream(mut self, num: i32) -> ChannelBuilder {
110 self.options.insert(
111 Cow::Borrowed(grpcio_sys::GRPC_ARG_MAX_CONCURRENT_STREAMS),
112 Options::Integer(num),
113 );
114 self
115 }
116
117 /// Set maximum message length that the channel can receive. `-1` means unlimited.
max_receive_message_len(mut self, len: i32) -> ChannelBuilder118 pub fn max_receive_message_len(mut self, len: i32) -> ChannelBuilder {
119 self.options.insert(
120 Cow::Borrowed(grpcio_sys::GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH),
121 Options::Integer(len),
122 );
123 self
124 }
125
126 /// Set maximum message length that the channel can send. `-1` means unlimited.
max_send_message_len(mut self, len: i32) -> ChannelBuilder127 pub fn max_send_message_len(mut self, len: i32) -> ChannelBuilder {
128 self.options.insert(
129 Cow::Borrowed(grpcio_sys::GRPC_ARG_MAX_SEND_MESSAGE_LENGTH),
130 Options::Integer(len),
131 );
132 self
133 }
134
135 /// Set maximum time between subsequent connection attempts.
max_reconnect_backoff(mut self, backoff: Duration) -> ChannelBuilder136 pub fn max_reconnect_backoff(mut self, backoff: Duration) -> ChannelBuilder {
137 self.options.insert(
138 Cow::Borrowed(grpcio_sys::GRPC_ARG_MAX_RECONNECT_BACKOFF_MS),
139 Options::Integer(dur_to_ms(backoff)),
140 );
141 self
142 }
143
144 /// Set time between the first and second connection attempts.
initial_reconnect_backoff(mut self, backoff: Duration) -> ChannelBuilder145 pub fn initial_reconnect_backoff(mut self, backoff: Duration) -> ChannelBuilder {
146 self.options.insert(
147 Cow::Borrowed(grpcio_sys::GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS),
148 Options::Integer(dur_to_ms(backoff)),
149 );
150 self
151 }
152
153 /// Set initial sequence number for HTTP/2 transports.
https_initial_seq_number(mut self, number: i32) -> ChannelBuilder154 pub fn https_initial_seq_number(mut self, number: i32) -> ChannelBuilder {
155 self.options.insert(
156 Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER),
157 Options::Integer(number),
158 );
159 self
160 }
161
162 /// Set amount to read ahead on individual streams. Defaults to 64KB. Larger
163 /// values help throughput on high-latency connections.
stream_initial_window_size(mut self, window_size: i32) -> ChannelBuilder164 pub fn stream_initial_window_size(mut self, window_size: i32) -> ChannelBuilder {
165 self.options.insert(
166 Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES),
167 Options::Integer(window_size),
168 );
169 self
170 }
171
172 /// Set primary user agent, which goes at the start of the user-agent metadata sent on
173 /// each request.
primary_user_agent(mut self, agent: &str) -> ChannelBuilder174 pub fn primary_user_agent(mut self, agent: &str) -> ChannelBuilder {
175 let agent_string = format_user_agent_string(agent);
176 self.options.insert(
177 Cow::Borrowed(grpcio_sys::GRPC_ARG_PRIMARY_USER_AGENT_STRING),
178 Options::String(agent_string),
179 );
180 self
181 }
182
183 /// Set whether to allow the use of `SO_REUSEPORT` if available. Defaults to `true`.
reuse_port(mut self, reuse: bool) -> ChannelBuilder184 pub fn reuse_port(mut self, reuse: bool) -> ChannelBuilder {
185 let opt = if reuse { 1 } else { 0 };
186 self.options.insert(
187 Cow::Borrowed(grpcio_sys::GRPC_ARG_ALLOW_REUSEPORT),
188 Options::Integer(opt),
189 );
190 self
191 }
192
193 /// Set the size of slice to try and read from the wire each time.
tcp_read_chunk_size(mut self, bytes: i32) -> ChannelBuilder194 pub fn tcp_read_chunk_size(mut self, bytes: i32) -> ChannelBuilder {
195 self.options.insert(
196 Cow::Borrowed(grpcio_sys::GRPC_ARG_TCP_READ_CHUNK_SIZE),
197 Options::Integer(bytes),
198 );
199 self
200 }
201
202 /// Set the minimum size of slice to try and read from the wire each time.
tcp_min_read_chunk_size(mut self, bytes: i32) -> ChannelBuilder203 pub fn tcp_min_read_chunk_size(mut self, bytes: i32) -> ChannelBuilder {
204 self.options.insert(
205 Cow::Borrowed(grpcio_sys::GRPC_ARG_TCP_MIN_READ_CHUNK_SIZE),
206 Options::Integer(bytes),
207 );
208 self
209 }
210
211 /// Set the maximum size of slice to try and read from the wire each time.
tcp_max_read_chunk_size(mut self, bytes: i32) -> ChannelBuilder212 pub fn tcp_max_read_chunk_size(mut self, bytes: i32) -> ChannelBuilder {
213 self.options.insert(
214 Cow::Borrowed(grpcio_sys::GRPC_ARG_TCP_MAX_READ_CHUNK_SIZE),
215 Options::Integer(bytes),
216 );
217 self
218 }
219
220 /// How much data are we willing to queue up per stream if
221 /// write_buffer_hint is set. This is an upper bound.
http2_write_buffer_size(mut self, size: i32) -> ChannelBuilder222 pub fn http2_write_buffer_size(mut self, size: i32) -> ChannelBuilder {
223 self.options.insert(
224 Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE),
225 Options::Integer(size),
226 );
227 self
228 }
229
230 /// How big a frame are we willing to receive via HTTP/2.
231 /// Min 16384, max 16777215.
232 /// Larger values give lower CPU usage for large messages, but more head of line
233 /// blocking for small messages.
http2_max_frame_size(mut self, size: i32) -> ChannelBuilder234 pub fn http2_max_frame_size(mut self, size: i32) -> ChannelBuilder {
235 self.options.insert(
236 Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_MAX_FRAME_SIZE),
237 Options::Integer(size),
238 );
239 self
240 }
241
242 /// Set whether to enable BDP probing.
http2_bdp_probe(mut self, enable: bool) -> ChannelBuilder243 pub fn http2_bdp_probe(mut self, enable: bool) -> ChannelBuilder {
244 self.options.insert(
245 Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_BDP_PROBE),
246 Options::Integer(enable as i32),
247 );
248 self
249 }
250
251 /// Minimum time between sending successive ping frames without receiving any
252 /// data frame.
http2_min_sent_ping_interval_without_data( mut self, interval: Duration, ) -> ChannelBuilder253 pub fn http2_min_sent_ping_interval_without_data(
254 mut self,
255 interval: Duration,
256 ) -> ChannelBuilder {
257 self.options.insert(
258 Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS),
259 Options::Integer(dur_to_ms(interval)),
260 );
261 self
262 }
263
264 /// Minimum allowed time between receiving successive ping frames without
265 /// sending any data frame.
http2_min_recv_ping_interval_without_data( mut self, interval: Duration, ) -> ChannelBuilder266 pub fn http2_min_recv_ping_interval_without_data(
267 mut self,
268 interval: Duration,
269 ) -> ChannelBuilder {
270 self.options.insert(
271 Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS),
272 Options::Integer(dur_to_ms(interval)),
273 );
274 self
275 }
276
277 /// How many pings can we send before needing to send a data frame or header
278 /// frame? (0 indicates that an infinite number of pings can be sent without
279 /// sending a data frame or header frame)
http2_max_pings_without_data(mut self, num: i32) -> ChannelBuilder280 pub fn http2_max_pings_without_data(mut self, num: i32) -> ChannelBuilder {
281 self.options.insert(
282 Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA),
283 Options::Integer(num),
284 );
285 self
286 }
287
288 /// How many misbehaving pings the server can bear before sending goaway and
289 /// closing the transport? (0 indicates that the server can bear an infinite
290 /// number of misbehaving pings)
http2_max_ping_strikes(mut self, num: i32) -> ChannelBuilder291 pub fn http2_max_ping_strikes(mut self, num: i32) -> ChannelBuilder {
292 self.options.insert(
293 Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_MAX_PING_STRIKES),
294 Options::Integer(num),
295 );
296 self
297 }
298
299 /// Set default compression algorithm for the channel.
default_compression_algorithm(mut self, algo: CompressionAlgorithms) -> ChannelBuilder300 pub fn default_compression_algorithm(mut self, algo: CompressionAlgorithms) -> ChannelBuilder {
301 self.options.insert(
302 Cow::Borrowed(grpcio_sys::GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM),
303 Options::Integer(algo as i32),
304 );
305 self
306 }
307
308 /// Set default compression level for the channel.
default_compression_level(mut self, level: CompressionLevel) -> ChannelBuilder309 pub fn default_compression_level(mut self, level: CompressionLevel) -> ChannelBuilder {
310 self.options.insert(
311 Cow::Borrowed(grpcio_sys::GRPC_COMPRESSION_CHANNEL_DEFAULT_LEVEL),
312 Options::Integer(level as i32),
313 );
314 self
315 }
316
317 /// After a duration of this time the client/server pings its peer to see
318 /// if the transport is still alive.
keepalive_time(mut self, timeout: Duration) -> ChannelBuilder319 pub fn keepalive_time(mut self, timeout: Duration) -> ChannelBuilder {
320 self.options.insert(
321 Cow::Borrowed(grpcio_sys::GRPC_ARG_KEEPALIVE_TIME_MS),
322 Options::Integer(dur_to_ms(timeout)),
323 );
324 self
325 }
326
327 /// After waiting for a duration of this time, if the keepalive ping sender does
328 /// not receive the ping ack, it will close the transport.
keepalive_timeout(mut self, timeout: Duration) -> ChannelBuilder329 pub fn keepalive_timeout(mut self, timeout: Duration) -> ChannelBuilder {
330 self.options.insert(
331 Cow::Borrowed(grpcio_sys::GRPC_ARG_KEEPALIVE_TIMEOUT_MS),
332 Options::Integer(dur_to_ms(timeout)),
333 );
334 self
335 }
336
337 /// Is it permissible to send keepalive pings without any outstanding streams.
keepalive_permit_without_calls(mut self, allow: bool) -> ChannelBuilder338 pub fn keepalive_permit_without_calls(mut self, allow: bool) -> ChannelBuilder {
339 self.options.insert(
340 Cow::Borrowed(grpcio_sys::GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS),
341 Options::Integer(allow as i32),
342 );
343 self
344 }
345
346 /// Set optimization target for the channel. See [`OptTarget`] for all available
347 /// optimization targets. Defaults to `OptTarget::Blend`.
optimize_for(mut self, target: OptTarget) -> ChannelBuilder348 pub fn optimize_for(mut self, target: OptTarget) -> ChannelBuilder {
349 let val = match target {
350 OptTarget::Latency => CString::new("latency"),
351 OptTarget::Blend => CString::new("blend"),
352 OptTarget::Throughput => CString::new("throughput"),
353 };
354 self.options.insert(
355 Cow::Borrowed(grpcio_sys::GRPC_ARG_OPTIMIZATION_TARGET),
356 Options::String(val.unwrap()),
357 );
358 self
359 }
360
361 /// Set LbPolicy for channel
362 ///
363 /// This method allows one to set the load-balancing policy for a given channel.
load_balancing_policy(mut self, lb_policy: LbPolicy) -> ChannelBuilder364 pub fn load_balancing_policy(mut self, lb_policy: LbPolicy) -> ChannelBuilder {
365 let val = match lb_policy {
366 LbPolicy::PickFirst => CString::new("pick_first"),
367 LbPolicy::RoundRobin => CString::new("round_robin"),
368 };
369 self.options.insert(
370 Cow::Borrowed(grpcio_sys::GRPC_ARG_LB_POLICY_NAME),
371 Options::String(val.unwrap()),
372 );
373 self
374 }
375
376 /// Set a raw integer configuration.
377 ///
378 /// This method is only for bench usage, users should use the encapsulated API instead.
379 #[doc(hidden)]
raw_cfg_int(mut self, key: CString, val: i32) -> ChannelBuilder380 pub fn raw_cfg_int(mut self, key: CString, val: i32) -> ChannelBuilder {
381 self.options
382 .insert(Cow::Owned(key.into_bytes_with_nul()), Options::Integer(val));
383 self
384 }
385
386 /// Set a raw string configuration.
387 ///
388 /// This method is only for bench usage, users should use the encapsulated API instead.
389 #[doc(hidden)]
raw_cfg_string(mut self, key: CString, val: CString) -> ChannelBuilder390 pub fn raw_cfg_string(mut self, key: CString, val: CString) -> ChannelBuilder {
391 self.options
392 .insert(Cow::Owned(key.into_bytes_with_nul()), Options::String(val));
393 self
394 }
395
396 /// Build `ChannelArgs` from the current configuration.
397 #[allow(clippy::useless_conversion)]
398 #[allow(clippy::cmp_owned)]
build_args(&self) -> ChannelArgs399 pub fn build_args(&self) -> ChannelArgs {
400 let args = unsafe { grpc_sys::grpcwrap_channel_args_create(self.options.len()) };
401 for (i, (k, v)) in self.options.iter().enumerate() {
402 let key = k.as_ptr() as *const c_char;
403 match *v {
404 Options::Integer(val) => unsafe {
405 // On most modern compiler and architect, c_int is the same as i32,
406 // panic directly to simplify signature.
407 assert!(
408 val <= i32::from(libc::INT_MAX) && val >= i32::from(libc::INT_MIN),
409 "{} is out of range for {:?}",
410 val,
411 CStr::from_bytes_with_nul(k).unwrap()
412 );
413 grpc_sys::grpcwrap_channel_args_set_integer(args, i, key, val as c_int)
414 },
415 Options::String(ref val) => unsafe {
416 grpc_sys::grpcwrap_channel_args_set_string(args, i, key, val.as_ptr())
417 },
418 Options::Pointer(ref quota, vtable) => unsafe {
419 grpc_sys::grpcwrap_channel_args_set_pointer_vtable(
420 args,
421 i,
422 key,
423 quota.get_ptr() as _,
424 vtable,
425 )
426 },
427 }
428 }
429 ChannelArgs { args }
430 }
431
prepare_connect_args(&mut self) -> ChannelArgs432 fn prepare_connect_args(&mut self) -> ChannelArgs {
433 if let Entry::Vacant(e) = self.options.entry(Cow::Borrowed(
434 grpcio_sys::GRPC_ARG_PRIMARY_USER_AGENT_STRING,
435 )) {
436 e.insert(Options::String(format_user_agent_string("")));
437 }
438 self.build_args()
439 }
440
441 /// Build an insecure [`Channel`] that connects to a specific address.
connect(mut self, addr: &str) -> Channel442 pub fn connect(mut self, addr: &str) -> Channel {
443 let args = self.prepare_connect_args();
444 let addr = CString::new(addr).unwrap();
445 let addr_ptr = addr.as_ptr();
446 let channel =
447 unsafe { grpc_sys::grpc_insecure_channel_create(addr_ptr, args.args, ptr::null_mut()) };
448
449 unsafe { Channel::new(self.env.pick_cq(), self.env, channel) }
450 }
451
452 /// Build an insecure [`Channel`] taking over an established connection from
453 /// a file descriptor. The target string given is purely informative to
454 /// describe the endpoint of the connection. Takes ownership of the given
455 /// file descriptor and will close it when the connection is closed.
456 ///
457 /// This function is available on posix systems only.
458 ///
459 /// # Safety
460 ///
461 /// The file descriptor must correspond to a connected stream socket. After
462 /// this call, the socket must not be accessed (read / written / closed)
463 /// by other code.
464 #[cfg(unix)]
connect_from_fd(mut self, target: &str, fd: ::std::os::raw::c_int) -> Channel465 pub unsafe fn connect_from_fd(mut self, target: &str, fd: ::std::os::raw::c_int) -> Channel {
466 let args = self.prepare_connect_args();
467 let target = CString::new(target).unwrap();
468 let target_ptr = target.as_ptr();
469 let channel = grpc_sys::grpc_insecure_channel_create_from_fd(target_ptr, fd, args.args);
470
471 Channel::new(self.env.pick_cq(), self.env, channel)
472 }
473 }
474
#[cfg(feature = "secure")]
mod secure_channel {
    use std::borrow::Cow;
    use std::ffi::CString;
    use std::ptr;

    use crate::grpc_sys;

    use crate::ChannelCredentials;

    use super::{Channel, ChannelBuilder, Options};

    /// Name of the channel argument that overrides the SSL target name.
    const OPT_SSL_TARGET_NAME_OVERRIDE: &[u8] = b"grpc.ssl_target_name_override\0";

    impl ChannelBuilder {
        /// Override the target name used for SSL host name checking when a secure
        /// channel is created.
        ///
        /// This *should* be used for testing only.
        ///
        /// # Panics
        ///
        /// Panics if `target` contains a NUL byte.
        #[doc(hidden)]
        pub fn override_ssl_target<S: Into<Vec<u8>>>(mut self, target: S) -> ChannelBuilder {
            let value = Options::String(CString::new(target).unwrap());
            self.options
                .insert(Cow::Borrowed(OPT_SSL_TARGET_NAME_OVERRIDE), value);
            self
        }

        /// Build a secure [`Channel`] that connects to a specific address using `creds`.
        ///
        /// # Panics
        ///
        /// Panics if `addr` contains a NUL byte.
        pub fn secure_connect(mut self, addr: &str, mut creds: ChannelCredentials) -> Channel {
            let args = self.prepare_connect_args();
            // `target`, `args` and `creds` stay alive until the end of the function, i.e.
            // for the whole duration of the create call below.
            let target = CString::new(addr).unwrap();
            let channel = unsafe {
                grpc_sys::grpc_secure_channel_create(
                    creds.as_mut_ptr(),
                    target.as_ptr(),
                    args.args,
                    ptr::null_mut(),
                )
            };

            unsafe {
                let cq = self.env.pick_cq();
                Channel::new(cq, self.env, channel)
            }
        }
    }
}
522
/// Owning wrapper around a native `grpc_channel_args` pointer.
///
/// The arguments are destroyed with `grpcwrap_channel_args_destroy` when the wrapper is
/// dropped.
pub struct ChannelArgs {
    /// Raw pointer to the native channel arguments.
    args: *mut grpc_channel_args,
}
526
impl ChannelArgs {
    /// Returns the raw pointer to the underlying native channel arguments.
    ///
    /// The arguments are destroyed when this `ChannelArgs` is dropped, so the pointer
    /// must not be used after that point.
    pub fn as_ptr(&self) -> *const grpc_channel_args {
        self.args
    }
}
532
impl Drop for ChannelArgs {
    /// Destroys the native channel arguments held by this wrapper.
    fn drop(&mut self) {
        unsafe { grpc_sys::grpcwrap_channel_args_destroy(self.args) }
    }
}
538
/// Owner of the native `grpc_channel`; the channel is destroyed when this is dropped.
struct ChannelInner {
    /// Never read; presumably held so the environment outlives the channel — TODO confirm.
    _env: Arc<Environment>,
    /// Raw pointer to the native channel.
    channel: *mut grpc_channel,
}
543
544 impl ChannelInner {
545 // If try_to_connect is true, the channel will try to establish a connection, potentially
546 // changing the state.
check_connectivity_state(&self, try_to_connect: bool) -> ConnectivityState547 fn check_connectivity_state(&self, try_to_connect: bool) -> ConnectivityState {
548 let should_try = if try_to_connect { 1 } else { 0 };
549 unsafe { grpc_sys::grpc_channel_check_connectivity_state(self.channel, should_try) }
550 }
551 }
552
impl Drop for ChannelInner {
    /// Destroys the native channel owned by this value.
    fn drop(&mut self) {
        unsafe {
            grpc_sys::grpc_channel_destroy(self.channel);
        }
    }
}
560
/// A gRPC channel.
///
/// Channels are an abstraction of long-lived connections to remote servers. More client objects
/// can reuse the same channel.
///
/// Cloning a `Channel` is cheap with respect to the native channel: the clones share it
/// through a reference-counted pointer.
///
/// Use [`ChannelBuilder`] to build a [`Channel`].
#[derive(Clone)]
pub struct Channel {
    /// Shared owner of the native channel.
    inner: Arc<ChannelInner>,
    /// Completion queue the calls created from this channel are bound to.
    cq: CompletionQueue,
}

// NOTE(review): these impls assume the native channel behind the raw pointer may be used
// and destroyed from any thread — confirm against the gRPC core documentation.
unsafe impl Send for Channel {}
unsafe impl Sync for Channel {}
575
576 impl Channel {
577 /// Create a new channel. Avoid using this directly and use
578 /// [`ChannelBuilder`] to build a [`Channel`] instead.
579 ///
580 /// # Safety
581 ///
582 /// The given grpc_channel must correspond to an instantiated grpc core
583 /// channel. Takes exclusive ownership of the channel and will close it after
584 /// use.
new( cq: CompletionQueue, env: Arc<Environment>, channel: *mut grpc_channel, ) -> Channel585 pub unsafe fn new(
586 cq: CompletionQueue,
587 env: Arc<Environment>,
588 channel: *mut grpc_channel,
589 ) -> Channel {
590 Channel {
591 inner: Arc::new(ChannelInner { _env: env, channel }),
592 cq,
593 }
594 }
595
    /// Returns the current connectivity state of the channel.
    ///
    /// If try_to_connect is true, the channel will try to establish a connection, potentially
    /// changing the state.
    pub fn check_connectivity_state(&self, try_to_connect: bool) -> ConnectivityState {
        self.inner.check_connectivity_state(try_to_connect)
    }
601
602 /// Blocking wait for channel state change or deadline expiration.
603 ///
604 /// `check_connectivity_state` needs to be called to get the current state. Returns false
605 /// means deadline excceeds before observing any state changes.
wait_for_state_change( &self, last_observed: ConnectivityState, deadline: impl Into<Deadline>, ) -> impl Future<Output = bool>606 pub fn wait_for_state_change(
607 &self,
608 last_observed: ConnectivityState,
609 deadline: impl Into<Deadline>,
610 ) -> impl Future<Output = bool> {
611 let (cq_f, prom) = CallTag::action_pair();
612 let prom_box = Box::new(prom);
613 let tag = Box::into_raw(prom_box);
614 let should_wait = if let Ok(cq_ref) = self.cq.borrow() {
615 unsafe {
616 grpcio_sys::grpc_channel_watch_connectivity_state(
617 self.inner.channel,
618 last_observed,
619 deadline.into().spec(),
620 cq_ref.as_ptr(),
621 tag as *mut _,
622 )
623 }
624 true
625 } else {
626 // It's already shutdown.
627 false
628 };
629 async move { should_wait && cq_f.await.unwrap() }
630 }
631
632 /// Wait for this channel to be connected.
633 ///
634 /// Returns false means deadline excceeds before connection is connected.
wait_for_connected(&self, deadline: impl Into<Deadline>) -> bool635 pub async fn wait_for_connected(&self, deadline: impl Into<Deadline>) -> bool {
636 // Fast path, it's probably connected.
637 let mut state = self.check_connectivity_state(true);
638 if ConnectivityState::GRPC_CHANNEL_READY == state {
639 return true;
640 }
641 let deadline = deadline.into();
642 loop {
643 if self.wait_for_state_change(state, deadline).await {
644 state = self.check_connectivity_state(true);
645 match state {
646 ConnectivityState::GRPC_CHANNEL_READY => return true,
647 ConnectivityState::GRPC_CHANNEL_SHUTDOWN => return false,
648 _ => (),
649 }
650 continue;
651 }
652 return false;
653 }
654 }
655
656 /// Create a Kicker.
create_kicker(&self) -> Result<Kicker>657 pub(crate) fn create_kicker(&self) -> Result<Kicker> {
658 let cq_ref = self.cq.borrow()?;
659 let raw_call = unsafe {
660 let ch = self.inner.channel;
661 let cq = cq_ref.as_ptr();
662 // Do not timeout.
663 let timeout = gpr_timespec::inf_future();
664 grpc_sys::grpcwrap_channel_create_call(
665 ch,
666 ptr::null_mut(),
667 0,
668 cq,
669 ptr::null(),
670 0,
671 ptr::null(),
672 0,
673 timeout,
674 )
675 };
676 let call = unsafe { Call::from_raw(raw_call, self.cq.clone()) };
677 Ok(Kicker::from_call(call))
678 }
679
680 /// Create a call using the method and option.
create_call<Req, Resp>( &self, method: &Method<Req, Resp>, opt: &CallOption, ) -> Result<Call>681 pub(crate) fn create_call<Req, Resp>(
682 &self,
683 method: &Method<Req, Resp>,
684 opt: &CallOption,
685 ) -> Result<Call> {
686 let cq_ref = self.cq.borrow()?;
687 let raw_call = unsafe {
688 let ch = self.inner.channel;
689 let cq = cq_ref.as_ptr();
690 let method_ptr = method.name.as_ptr();
691 let method_len = method.name.len();
692 let timeout = opt
693 .get_timeout()
694 .map_or_else(gpr_timespec::inf_future, gpr_timespec::from);
695 grpc_sys::grpcwrap_channel_create_call(
696 ch,
697 ptr::null_mut(),
698 0,
699 cq,
700 method_ptr as *const _,
701 method_len,
702 ptr::null(),
703 0,
704 timeout,
705 )
706 };
707
708 unsafe { Ok(Call::from_raw(raw_call, self.cq.clone())) }
709 }
710
    /// Returns the completion queue this channel is bound to.
    pub(crate) fn cq(&self) -> &CompletionQueue {
        &self.cq
    }
714 }
715