1 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2 
3 use std::cell::UnsafeCell;
4 use std::collections::HashMap;
5 use std::fmt::{self, Debug, Formatter};
6 use std::net::{IpAddr, SocketAddr};
7 use std::pin::Pin;
8 use std::ptr;
9 use std::sync::atomic::{AtomicBool, Ordering};
10 use std::sync::Arc;
11 
12 use crate::grpc_sys::{self, grpc_call_error, grpc_server};
13 use futures::future::Future;
14 use futures::ready;
15 use futures::task::{Context, Poll};
16 
17 use crate::call::server::*;
18 use crate::call::{MessageReader, Method, MethodType};
19 use crate::channel::ChannelArgs;
20 use crate::cq::CompletionQueue;
21 use crate::env::Environment;
22 use crate::error::{Error, Result};
23 use crate::task::{CallTag, CqFuture};
24 use crate::RpcContext;
25 use crate::RpcStatus;
26 
27 const DEFAULT_REQUEST_SLOTS_PER_CQ: usize = 1024;
28 
29 /// An RPC call holder.
30 #[derive(Clone)]
31 pub struct Handler<F> {
32     method_type: MethodType,
33     cb: F,
34 }
35 
36 impl<F> Handler<F> {
new(method_type: MethodType, cb: F) -> Handler<F>37     pub fn new(method_type: MethodType, cb: F) -> Handler<F> {
38         Handler { method_type, cb }
39     }
40 }
41 
42 pub trait CloneableHandler: Send {
handle(&mut self, ctx: RpcContext<'_>, reqs: Option<MessageReader>)43     fn handle(&mut self, ctx: RpcContext<'_>, reqs: Option<MessageReader>);
box_clone(&self) -> Box<dyn CloneableHandler>44     fn box_clone(&self) -> Box<dyn CloneableHandler>;
method_type(&self) -> MethodType45     fn method_type(&self) -> MethodType;
46 }
47 
48 impl<F: 'static> CloneableHandler for Handler<F>
49 where
50     F: FnMut(RpcContext<'_>, Option<MessageReader>) + Send + Clone,
51 {
52     #[inline]
handle(&mut self, ctx: RpcContext<'_>, reqs: Option<MessageReader>)53     fn handle(&mut self, ctx: RpcContext<'_>, reqs: Option<MessageReader>) {
54         (self.cb)(ctx, reqs)
55     }
56 
57     #[inline]
box_clone(&self) -> Box<dyn CloneableHandler>58     fn box_clone(&self) -> Box<dyn CloneableHandler> {
59         Box::new(self.clone())
60     }
61 
62     #[inline]
method_type(&self) -> MethodType63     fn method_type(&self) -> MethodType {
64         self.method_type
65     }
66 }
67 
68 /// Given a host and port, creates a string of the form "host:port" or
69 /// "[host]:port", depending on whether the host is an IPv6 literal.
join_host_port(host: &str, port: u16) -> String70 fn join_host_port(host: &str, port: u16) -> String {
71     if host.starts_with("unix:") | host.starts_with("unix-abstract:") {
72         format!("{}\0", host)
73     } else if let Ok(ip) = host.parse::<IpAddr>() {
74         format!("{}\0", SocketAddr::new(ip, port))
75     } else {
76         format!("{}:{}\0", host, port)
77     }
78 }
79 
80 #[cfg(feature = "secure")]
81 mod imp {
82     use super::join_host_port;
83     use crate::grpc_sys::{self, grpc_server};
84     use crate::security::ServerCredentialsFetcher;
85     use crate::ServerCredentials;
86 
87     pub struct Binder {
88         pub host: String,
89         pub port: u16,
90         cred: Option<ServerCredentials>,
91         // Double allocation to get around C call.
92         #[allow(clippy::redundant_allocation)]
93         _fetcher: Option<Box<Box<dyn ServerCredentialsFetcher + Send + Sync>>>,
94     }
95 
96     impl Binder {
new(host: String, port: u16) -> Binder97         pub fn new(host: String, port: u16) -> Binder {
98             let cred = None;
99             Binder {
100                 host,
101                 port,
102                 cred,
103                 _fetcher: None,
104             }
105         }
106 
107         #[allow(clippy::redundant_allocation)]
with_cred( host: String, port: u16, cred: ServerCredentials, _fetcher: Option<Box<Box<dyn ServerCredentialsFetcher + Send + Sync>>>, ) -> Binder108         pub fn with_cred(
109             host: String,
110             port: u16,
111             cred: ServerCredentials,
112             _fetcher: Option<Box<Box<dyn ServerCredentialsFetcher + Send + Sync>>>,
113         ) -> Binder {
114             let cred = Some(cred);
115             Binder {
116                 host,
117                 port,
118                 cred,
119                 _fetcher,
120             }
121         }
122 
bind(&mut self, server: *mut grpc_server) -> u16123         pub unsafe fn bind(&mut self, server: *mut grpc_server) -> u16 {
124             let addr = join_host_port(&self.host, self.port);
125             let port = match self.cred.take() {
126                 None => grpc_sys::grpc_server_add_insecure_http2_port(server, addr.as_ptr() as _),
127                 Some(mut cert) => grpc_sys::grpc_server_add_secure_http2_port(
128                     server,
129                     addr.as_ptr() as _,
130                     cert.as_mut_ptr(),
131                 ),
132             };
133             port as u16
134         }
135     }
136 }
137 
138 #[cfg(not(feature = "secure"))]
139 mod imp {
140     use super::join_host_port;
141     use crate::grpc_sys::{self, grpc_server};
142 
143     pub struct Binder {
144         pub host: String,
145         pub port: u16,
146     }
147 
148     impl Binder {
new(host: String, port: u16) -> Binder149         pub fn new(host: String, port: u16) -> Binder {
150             Binder { host, port }
151         }
152 
bind(&mut self, server: *mut grpc_server) -> u16153         pub unsafe fn bind(&mut self, server: *mut grpc_server) -> u16 {
154             let addr = join_host_port(&self.host, self.port);
155             grpc_sys::grpc_server_add_insecure_http2_port(server, addr.as_ptr() as _) as u16
156         }
157     }
158 }
159 
160 use self::imp::Binder;
161 
162 impl Debug for Binder {
fmt(&self, f: &mut Formatter<'_>) -> fmt::Result163     fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
164         write!(f, "Binder {{ host: {}, port: {} }}", self.host, self.port)
165     }
166 }
167 
168 /// [`Service`] factory in order to configure the properties.
169 ///
170 /// Use it to build a service which can be registered to a server.
171 pub struct ServiceBuilder {
172     handlers: HashMap<&'static [u8], BoxHandler>,
173 }
174 
175 impl ServiceBuilder {
176     /// Initialize a new [`ServiceBuilder`].
new() -> ServiceBuilder177     pub fn new() -> ServiceBuilder {
178         ServiceBuilder {
179             handlers: HashMap::new(),
180         }
181     }
182 
183     /// Add a unary RPC call handler.
add_unary_handler<Req, Resp, F>( mut self, method: &Method<Req, Resp>, mut handler: F, ) -> ServiceBuilder where Req: 'static, Resp: 'static, F: FnMut(RpcContext<'_>, Req, UnarySink<Resp>) + Send + Clone + 'static,184     pub fn add_unary_handler<Req, Resp, F>(
185         mut self,
186         method: &Method<Req, Resp>,
187         mut handler: F,
188     ) -> ServiceBuilder
189     where
190         Req: 'static,
191         Resp: 'static,
192         F: FnMut(RpcContext<'_>, Req, UnarySink<Resp>) + Send + Clone + 'static,
193     {
194         let (ser, de) = (method.resp_ser(), method.req_de());
195         let h = move |ctx: RpcContext<'_>, payload: Option<MessageReader>| {
196             execute_unary(ctx, ser, de, payload.unwrap(), &mut handler)
197         };
198         let ch = Box::new(Handler::new(MethodType::Unary, h));
199         self.handlers.insert(method.name.as_bytes(), ch);
200         self
201     }
202 
203     /// Add a client streaming RPC call handler.
add_client_streaming_handler<Req, Resp, F>( mut self, method: &Method<Req, Resp>, mut handler: F, ) -> ServiceBuilder where Req: 'static, Resp: 'static, F: FnMut(RpcContext<'_>, RequestStream<Req>, ClientStreamingSink<Resp>) + Send + Clone + 'static,204     pub fn add_client_streaming_handler<Req, Resp, F>(
205         mut self,
206         method: &Method<Req, Resp>,
207         mut handler: F,
208     ) -> ServiceBuilder
209     where
210         Req: 'static,
211         Resp: 'static,
212         F: FnMut(RpcContext<'_>, RequestStream<Req>, ClientStreamingSink<Resp>)
213             + Send
214             + Clone
215             + 'static,
216     {
217         let (ser, de) = (method.resp_ser(), method.req_de());
218         let h = move |ctx: RpcContext<'_>, _: Option<MessageReader>| {
219             execute_client_streaming(ctx, ser, de, &mut handler)
220         };
221         let ch = Box::new(Handler::new(MethodType::ClientStreaming, h));
222         self.handlers.insert(method.name.as_bytes(), ch);
223         self
224     }
225 
226     /// Add a server streaming RPC call handler.
add_server_streaming_handler<Req, Resp, F>( mut self, method: &Method<Req, Resp>, mut handler: F, ) -> ServiceBuilder where Req: 'static, Resp: 'static, F: FnMut(RpcContext<'_>, Req, ServerStreamingSink<Resp>) + Send + Clone + 'static,227     pub fn add_server_streaming_handler<Req, Resp, F>(
228         mut self,
229         method: &Method<Req, Resp>,
230         mut handler: F,
231     ) -> ServiceBuilder
232     where
233         Req: 'static,
234         Resp: 'static,
235         F: FnMut(RpcContext<'_>, Req, ServerStreamingSink<Resp>) + Send + Clone + 'static,
236     {
237         let (ser, de) = (method.resp_ser(), method.req_de());
238         let h = move |ctx: RpcContext<'_>, payload: Option<MessageReader>| {
239             execute_server_streaming(ctx, ser, de, payload.unwrap(), &mut handler)
240         };
241         let ch = Box::new(Handler::new(MethodType::ServerStreaming, h));
242         self.handlers.insert(method.name.as_bytes(), ch);
243         self
244     }
245 
246     /// Add a duplex streaming RPC call handler.
add_duplex_streaming_handler<Req, Resp, F>( mut self, method: &Method<Req, Resp>, mut handler: F, ) -> ServiceBuilder where Req: 'static, Resp: 'static, F: FnMut(RpcContext<'_>, RequestStream<Req>, DuplexSink<Resp>) + Send + Clone + 'static,247     pub fn add_duplex_streaming_handler<Req, Resp, F>(
248         mut self,
249         method: &Method<Req, Resp>,
250         mut handler: F,
251     ) -> ServiceBuilder
252     where
253         Req: 'static,
254         Resp: 'static,
255         F: FnMut(RpcContext<'_>, RequestStream<Req>, DuplexSink<Resp>) + Send + Clone + 'static,
256     {
257         let (ser, de) = (method.resp_ser(), method.req_de());
258         let h = move |ctx: RpcContext<'_>, _: Option<MessageReader>| {
259             execute_duplex_streaming(ctx, ser, de, &mut handler)
260         };
261         let ch = Box::new(Handler::new(MethodType::Duplex, h));
262         self.handlers.insert(method.name.as_bytes(), ch);
263         self
264     }
265 
266     /// Finalize the [`ServiceBuilder`] and build the [`Service`].
build(self) -> Service267     pub fn build(self) -> Service {
268         Service {
269             handlers: self.handlers,
270         }
271     }
272 }
273 
274 /// Used to indicate the result of the check. If it returns `Abort`,
275 /// skip the subsequent checkers and abort the grpc call.
276 pub enum CheckResult {
277     Continue,
278     Abort(RpcStatus),
279 }
280 
281 pub trait ServerChecker: Send {
check(&mut self, ctx: &RpcContext) -> CheckResult282     fn check(&mut self, ctx: &RpcContext) -> CheckResult;
box_clone(&self) -> Box<dyn ServerChecker>283     fn box_clone(&self) -> Box<dyn ServerChecker>;
284 }
285 
286 impl Clone for Box<dyn ServerChecker> {
clone(&self) -> Self287     fn clone(&self) -> Self {
288         self.box_clone()
289     }
290 }
291 
292 /// A gRPC service.
293 ///
294 /// Use [`ServiceBuilder`] to build a [`Service`].
295 pub struct Service {
296     handlers: HashMap<&'static [u8], BoxHandler>,
297 }
298 
299 /// [`Server`] factory in order to configure the properties.
300 pub struct ServerBuilder {
301     env: Arc<Environment>,
302     binders: Vec<Binder>,
303     args: Option<ChannelArgs>,
304     slots_per_cq: usize,
305     handlers: HashMap<&'static [u8], BoxHandler>,
306     checkers: Vec<Box<dyn ServerChecker>>,
307 }
308 
309 impl ServerBuilder {
310     /// Initialize a new [`ServerBuilder`].
new(env: Arc<Environment>) -> ServerBuilder311     pub fn new(env: Arc<Environment>) -> ServerBuilder {
312         ServerBuilder {
313             env,
314             binders: Vec::new(),
315             args: None,
316             slots_per_cq: DEFAULT_REQUEST_SLOTS_PER_CQ,
317             handlers: HashMap::new(),
318             checkers: Vec::new(),
319         }
320     }
321 
322     /// Bind to an address.
323     ///
324     /// This function can be called multiple times to bind to multiple ports.
bind<S: Into<String>>(mut self, host: S, port: u16) -> ServerBuilder325     pub fn bind<S: Into<String>>(mut self, host: S, port: u16) -> ServerBuilder {
326         self.binders.push(Binder::new(host.into(), port));
327         self
328     }
329 
330     /// Add additional configuration for each incoming channel.
channel_args(mut self, args: ChannelArgs) -> ServerBuilder331     pub fn channel_args(mut self, args: ChannelArgs) -> ServerBuilder {
332         self.args = Some(args);
333         self
334     }
335 
336     /// Set how many requests a completion queue can handle.
requests_slot_per_cq(mut self, slots: usize) -> ServerBuilder337     pub fn requests_slot_per_cq(mut self, slots: usize) -> ServerBuilder {
338         self.slots_per_cq = slots;
339         self
340     }
341 
342     /// Register a service.
register_service(mut self, service: Service) -> ServerBuilder343     pub fn register_service(mut self, service: Service) -> ServerBuilder {
344         self.handlers.extend(service.handlers);
345         self
346     }
347 
348     /// Add a custom checker to handle some tasks before the grpc call handler starts.
349     /// This allows users to operate grpc call based on the context. Users can add
350     /// multiple checkers and they will be executed in the order added.
351     ///
352     /// TODO: Extend this interface to intercepte each payload like grpc-c++.
add_checker<C: ServerChecker + 'static>(mut self, checker: C) -> ServerBuilder353     pub fn add_checker<C: ServerChecker + 'static>(mut self, checker: C) -> ServerBuilder {
354         self.checkers.push(Box::new(checker));
355         self
356     }
357 
358     /// Finalize the [`ServerBuilder`] and build the [`Server`].
build(mut self) -> Result<Server>359     pub fn build(mut self) -> Result<Server> {
360         let args = self
361             .args
362             .as_ref()
363             .map_or_else(ptr::null, ChannelArgs::as_ptr);
364         unsafe {
365             let server = grpc_sys::grpc_server_create(args, ptr::null_mut());
366             for binder in self.binders.iter_mut() {
367                 let bind_port = binder.bind(server);
368                 if bind_port == 0 {
369                     grpc_sys::grpc_server_destroy(server);
370                     return Err(Error::BindFail(binder.host.clone(), binder.port));
371                 }
372                 binder.port = bind_port;
373             }
374 
375             for cq in self.env.completion_queues() {
376                 let cq_ref = cq.borrow()?;
377                 grpc_sys::grpc_server_register_completion_queue(
378                     server,
379                     cq_ref.as_ptr(),
380                     ptr::null_mut(),
381                 );
382             }
383 
384             Ok(Server {
385                 env: self.env,
386                 core: Arc::new(ServerCore {
387                     server,
388                     shutdown: AtomicBool::new(false),
389                     binders: self.binders,
390                     slots_per_cq: self.slots_per_cq,
391                 }),
392                 handlers: self.handlers,
393                 checkers: self.checkers,
394             })
395         }
396     }
397 }
398 
399 #[cfg(feature = "secure")]
400 mod secure_server {
401     use super::{Binder, ServerBuilder};
402     use crate::grpc_sys;
403     use crate::security::{
404         server_cert_fetcher_wrapper, CertificateRequestType, ServerCredentials,
405         ServerCredentialsFetcher,
406     };
407 
408     impl ServerBuilder {
409         /// Bind to an address with credentials for secure connection.
410         ///
411         /// This function can be called multiple times to bind to multiple ports.
bind_with_cred<S: Into<String>>( mut self, host: S, port: u16, c: ServerCredentials, ) -> ServerBuilder412         pub fn bind_with_cred<S: Into<String>>(
413             mut self,
414             host: S,
415             port: u16,
416             c: ServerCredentials,
417         ) -> ServerBuilder {
418             self.binders
419                 .push(Binder::with_cred(host.into(), port, c, None));
420             self
421         }
422 
423         /// Bind to an address for secure connection.
424         ///
425         /// The required credentials will be fetched using provided `fetcher`. This
426         /// function can be called multiple times to bind to multiple ports.
bind_with_fetcher<S: Into<String>>( mut self, host: S, port: u16, fetcher: Box<dyn ServerCredentialsFetcher + Send + Sync>, cer_request_type: CertificateRequestType, ) -> ServerBuilder427         pub fn bind_with_fetcher<S: Into<String>>(
428             mut self,
429             host: S,
430             port: u16,
431             fetcher: Box<dyn ServerCredentialsFetcher + Send + Sync>,
432             cer_request_type: CertificateRequestType,
433         ) -> ServerBuilder {
434             let fetcher_wrap = Box::new(fetcher);
435             let fetcher_wrap_ptr = Box::into_raw(fetcher_wrap);
436             let (sc, fb) = unsafe {
437                 let opt = grpc_sys::grpc_ssl_server_credentials_create_options_using_config_fetcher(
438                     cer_request_type.to_native(),
439                     Some(server_cert_fetcher_wrapper),
440                     fetcher_wrap_ptr as _,
441                 );
442                 (
443                     ServerCredentials::frow_raw(
444                         grpcio_sys::grpc_ssl_server_credentials_create_with_options(opt),
445                     ),
446                     Box::from_raw(fetcher_wrap_ptr),
447                 )
448             };
449             self.binders
450                 .push(Binder::with_cred(host.into(), port, sc, Some(fb)));
451             self
452         }
453     }
454 }
455 
456 struct ServerCore {
457     server: *mut grpc_server,
458     binders: Vec<Binder>,
459     slots_per_cq: usize,
460     shutdown: AtomicBool,
461 }
462 
463 impl Drop for ServerCore {
drop(&mut self)464     fn drop(&mut self) {
465         unsafe { grpc_sys::grpc_server_destroy(self.server) }
466     }
467 }
468 
469 unsafe impl Send for ServerCore {}
470 unsafe impl Sync for ServerCore {}
471 
472 pub type BoxHandler = Box<dyn CloneableHandler>;
473 
474 #[derive(Clone)]
475 pub struct RequestCallContext {
476     server: Arc<ServerCore>,
477     registry: Arc<UnsafeCell<HashMap<&'static [u8], BoxHandler>>>,
478     checkers: Vec<Box<dyn ServerChecker>>,
479 }
480 
481 impl RequestCallContext {
482     /// Users should guarantee the method is always called from the same thread.
483     /// TODO: Is there a better way?
484     #[inline]
get_handler(&mut self, path: &[u8]) -> Option<&mut BoxHandler>485     pub unsafe fn get_handler(&mut self, path: &[u8]) -> Option<&mut BoxHandler> {
486         let registry = &mut *self.registry.get();
487         registry.get_mut(path)
488     }
489 
get_checker(&self) -> Vec<Box<dyn ServerChecker>>490     pub(crate) fn get_checker(&self) -> Vec<Box<dyn ServerChecker>> {
491         self.checkers.clone()
492     }
493 }
494 
495 // Apparently, its life time is guaranteed by the ref count, hence is safe to be sent
496 // to other thread. However it's not `Sync`, as `BoxHandler` is unnecessarily `Sync`.
497 unsafe impl Send for RequestCallContext {}
498 
499 /// Request notification of a new call.
request_call(ctx: RequestCallContext, cq: &CompletionQueue)500 pub fn request_call(ctx: RequestCallContext, cq: &CompletionQueue) {
501     if ctx.server.shutdown.load(Ordering::Relaxed) {
502         return;
503     }
504     let cq_ref = match cq.borrow() {
505         // Shutting down, skip.
506         Err(_) => return,
507         Ok(c) => c,
508     };
509     let server_ptr = ctx.server.server;
510     let prom = CallTag::request(ctx);
511     let request_ptr = prom.request_ctx().unwrap().as_ptr();
512     let prom_box = Box::new(prom);
513     let tag = Box::into_raw(prom_box);
514     let code = unsafe {
515         grpc_sys::grpcwrap_server_request_call(
516             server_ptr,
517             cq_ref.as_ptr(),
518             request_ptr,
519             tag as *mut _,
520         )
521     };
522     if code != grpc_call_error::GRPC_CALL_OK {
523         Box::from(tag);
524         panic!("failed to request call: {:?}", code);
525     }
526 }
527 
528 /// A `Future` that will resolve when shutdown completes.
529 pub struct ShutdownFuture {
530     /// `true` means the future finishes successfully.
531     cq_f: CqFuture<bool>,
532 }
533 
534 impl Future for ShutdownFuture {
535     type Output = Result<()>;
536 
poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>537     fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
538         match ready!(Pin::new(&mut self.cq_f).poll(cx)) {
539             Ok(true) => Poll::Ready(Ok(())),
540             Ok(false) => Poll::Ready(Err(Error::ShutdownFailed)),
541             Err(e) => unreachable!("action future should never resolve to error: {}", e),
542         }
543     }
544 }
545 
546 /// A gRPC server.
547 ///
548 /// A single server can serve arbitrary number of services and can listen on more than one port.
549 ///
550 /// Use [`ServerBuilder`] to build a [`Server`].
551 pub struct Server {
552     env: Arc<Environment>,
553     core: Arc<ServerCore>,
554     handlers: HashMap<&'static [u8], BoxHandler>,
555     checkers: Vec<Box<dyn ServerChecker>>,
556 }
557 
558 impl Server {
559     /// Shutdown the server asynchronously.
shutdown(&mut self) -> ShutdownFuture560     pub fn shutdown(&mut self) -> ShutdownFuture {
561         let (cq_f, prom) = CallTag::action_pair();
562         let prom_box = Box::new(prom);
563         let tag = Box::into_raw(prom_box);
564         unsafe {
565             // Since env still exists, no way can cq been shutdown.
566             let cq_ref = self.env.completion_queues()[0].borrow().unwrap();
567             grpc_sys::grpc_server_shutdown_and_notify(
568                 self.core.server,
569                 cq_ref.as_ptr(),
570                 tag as *mut _,
571             )
572         }
573         self.core.shutdown.store(true, Ordering::SeqCst);
574         ShutdownFuture { cq_f }
575     }
576 
577     /// Cancel all in-progress calls.
578     ///
579     /// Only usable after shutdown.
cancel_all_calls(&mut self)580     pub fn cancel_all_calls(&mut self) {
581         unsafe { grpc_sys::grpc_server_cancel_all_calls(self.core.server) }
582     }
583 
584     /// Start the server.
start(&mut self)585     pub fn start(&mut self) {
586         unsafe {
587             grpc_sys::grpc_server_start(self.core.server);
588             for cq in self.env.completion_queues() {
589                 // Handlers are Send and Clone, but not Sync. So we need to
590                 // provide a replica for each completion queue.
591                 let registry = self
592                     .handlers
593                     .iter()
594                     .map(|(k, v)| (k.to_owned(), v.box_clone()))
595                     .collect();
596                 let rc = RequestCallContext {
597                     server: self.core.clone(),
598                     registry: Arc::new(UnsafeCell::new(registry)),
599                     checkers: self.checkers.clone(),
600                 };
601                 for _ in 0..self.core.slots_per_cq {
602                     request_call(rc.clone(), cq);
603                 }
604             }
605         }
606     }
607 
608     /// Get binded addresses pairs.
bind_addrs(&self) -> impl ExactSizeIterator<Item = (&String, u16)>609     pub fn bind_addrs(&self) -> impl ExactSizeIterator<Item = (&String, u16)> {
610         self.core.binders.iter().map(|b| (&b.host, b.port))
611     }
612 
613     /// Add an rpc channel for an established connection represented as a file
614     /// descriptor. Takes ownership of the file descriptor, closing it when
615     /// channel is closed.
616     ///
617     /// # Safety
618     ///
619     /// The file descriptor must correspond to a connected stream socket. After
620     /// this call, the socket must not be accessed (read / written / closed)
621     /// by other code.
622     #[cfg(unix)]
add_insecure_channel_from_fd(&self, fd: ::std::os::raw::c_int)623     pub unsafe fn add_insecure_channel_from_fd(&self, fd: ::std::os::raw::c_int) {
624         grpc_sys::grpc_server_add_insecure_channel_from_fd(self.core.server, ptr::null_mut(), fd)
625     }
626 }
627 
628 impl Drop for Server {
drop(&mut self)629     fn drop(&mut self) {
630         // if the server is not shutdown completely, destroy a server will core.
631         // TODO: don't wait here
632         let f = if !self.core.shutdown.load(Ordering::SeqCst) {
633             Some(self.shutdown())
634         } else {
635             None
636         };
637         self.cancel_all_calls();
638         let _ = f.map(futures::executor::block_on);
639     }
640 }
641 
642 impl Debug for Server {
fmt(&self, f: &mut Formatter<'_>) -> fmt::Result643     fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
644         write!(f, "Server {:?}", self.core.binders)
645     }
646 }
647 
648 #[cfg(test)]
649 mod tests {
650     use super::join_host_port;
651 
652     #[test]
test_join_host_port()653     fn test_join_host_port() {
654         let tbl = vec![
655             ("localhost", 0u16, "localhost:0\0"),
656             ("127.0.0.1", 100u16, "127.0.0.1:100\0"),
657             ("::1", 0u16, "[::1]:0\0"),
658             (
659                 "fe80::7376:45d5:fb08:61e3",
660                 10028u16,
661                 "[fe80::7376:45d5:fb08:61e3]:10028\0",
662             ),
663         ];
664 
665         for (h, p, e) in &tbl {
666             assert_eq!(join_host_port(h, *p), e.to_owned());
667         }
668     }
669 }
670