1 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2
3 use std::cell::UnsafeCell;
4 use std::collections::HashMap;
5 use std::fmt::{self, Debug, Formatter};
6 use std::net::{IpAddr, SocketAddr};
7 use std::pin::Pin;
8 use std::ptr;
9 use std::sync::atomic::{AtomicBool, Ordering};
10 use std::sync::Arc;
11
12 use crate::grpc_sys::{self, grpc_call_error, grpc_server};
13 use futures::future::Future;
14 use futures::ready;
15 use futures::task::{Context, Poll};
16
17 use crate::call::server::*;
18 use crate::call::{MessageReader, Method, MethodType};
19 use crate::channel::ChannelArgs;
20 use crate::cq::CompletionQueue;
21 use crate::env::Environment;
22 use crate::error::{Error, Result};
23 use crate::task::{CallTag, CqFuture};
24 use crate::RpcContext;
25 use crate::RpcStatus;
26
27 const DEFAULT_REQUEST_SLOTS_PER_CQ: usize = 1024;
28
29 /// An RPC call holder.
30 #[derive(Clone)]
31 pub struct Handler<F> {
32 method_type: MethodType,
33 cb: F,
34 }
35
36 impl<F> Handler<F> {
new(method_type: MethodType, cb: F) -> Handler<F>37 pub fn new(method_type: MethodType, cb: F) -> Handler<F> {
38 Handler { method_type, cb }
39 }
40 }
41
42 pub trait CloneableHandler: Send {
handle(&mut self, ctx: RpcContext<'_>, reqs: Option<MessageReader>)43 fn handle(&mut self, ctx: RpcContext<'_>, reqs: Option<MessageReader>);
box_clone(&self) -> Box<dyn CloneableHandler>44 fn box_clone(&self) -> Box<dyn CloneableHandler>;
method_type(&self) -> MethodType45 fn method_type(&self) -> MethodType;
46 }
47
48 impl<F: 'static> CloneableHandler for Handler<F>
49 where
50 F: FnMut(RpcContext<'_>, Option<MessageReader>) + Send + Clone,
51 {
52 #[inline]
handle(&mut self, ctx: RpcContext<'_>, reqs: Option<MessageReader>)53 fn handle(&mut self, ctx: RpcContext<'_>, reqs: Option<MessageReader>) {
54 (self.cb)(ctx, reqs)
55 }
56
57 #[inline]
box_clone(&self) -> Box<dyn CloneableHandler>58 fn box_clone(&self) -> Box<dyn CloneableHandler> {
59 Box::new(self.clone())
60 }
61
62 #[inline]
method_type(&self) -> MethodType63 fn method_type(&self) -> MethodType {
64 self.method_type
65 }
66 }
67
68 /// Given a host and port, creates a string of the form "host:port" or
69 /// "[host]:port", depending on whether the host is an IPv6 literal.
join_host_port(host: &str, port: u16) -> String70 fn join_host_port(host: &str, port: u16) -> String {
71 if host.starts_with("unix:") | host.starts_with("unix-abstract:") {
72 format!("{}\0", host)
73 } else if let Ok(ip) = host.parse::<IpAddr>() {
74 format!("{}\0", SocketAddr::new(ip, port))
75 } else {
76 format!("{}:{}\0", host, port)
77 }
78 }
79
80 #[cfg(feature = "secure")]
81 mod imp {
82 use super::join_host_port;
83 use crate::grpc_sys::{self, grpc_server};
84 use crate::security::ServerCredentialsFetcher;
85 use crate::ServerCredentials;
86
87 pub struct Binder {
88 pub host: String,
89 pub port: u16,
90 cred: Option<ServerCredentials>,
91 // Double allocation to get around C call.
92 #[allow(clippy::redundant_allocation)]
93 _fetcher: Option<Box<Box<dyn ServerCredentialsFetcher + Send + Sync>>>,
94 }
95
96 impl Binder {
new(host: String, port: u16) -> Binder97 pub fn new(host: String, port: u16) -> Binder {
98 let cred = None;
99 Binder {
100 host,
101 port,
102 cred,
103 _fetcher: None,
104 }
105 }
106
107 #[allow(clippy::redundant_allocation)]
with_cred( host: String, port: u16, cred: ServerCredentials, _fetcher: Option<Box<Box<dyn ServerCredentialsFetcher + Send + Sync>>>, ) -> Binder108 pub fn with_cred(
109 host: String,
110 port: u16,
111 cred: ServerCredentials,
112 _fetcher: Option<Box<Box<dyn ServerCredentialsFetcher + Send + Sync>>>,
113 ) -> Binder {
114 let cred = Some(cred);
115 Binder {
116 host,
117 port,
118 cred,
119 _fetcher,
120 }
121 }
122
bind(&mut self, server: *mut grpc_server) -> u16123 pub unsafe fn bind(&mut self, server: *mut grpc_server) -> u16 {
124 let addr = join_host_port(&self.host, self.port);
125 let port = match self.cred.take() {
126 None => grpc_sys::grpc_server_add_insecure_http2_port(server, addr.as_ptr() as _),
127 Some(mut cert) => grpc_sys::grpc_server_add_secure_http2_port(
128 server,
129 addr.as_ptr() as _,
130 cert.as_mut_ptr(),
131 ),
132 };
133 port as u16
134 }
135 }
136 }
137
138 #[cfg(not(feature = "secure"))]
139 mod imp {
140 use super::join_host_port;
141 use crate::grpc_sys::{self, grpc_server};
142
143 pub struct Binder {
144 pub host: String,
145 pub port: u16,
146 }
147
148 impl Binder {
new(host: String, port: u16) -> Binder149 pub fn new(host: String, port: u16) -> Binder {
150 Binder { host, port }
151 }
152
bind(&mut self, server: *mut grpc_server) -> u16153 pub unsafe fn bind(&mut self, server: *mut grpc_server) -> u16 {
154 let addr = join_host_port(&self.host, self.port);
155 grpc_sys::grpc_server_add_insecure_http2_port(server, addr.as_ptr() as _) as u16
156 }
157 }
158 }
159
160 use self::imp::Binder;
161
162 impl Debug for Binder {
fmt(&self, f: &mut Formatter<'_>) -> fmt::Result163 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
164 write!(f, "Binder {{ host: {}, port: {} }}", self.host, self.port)
165 }
166 }
167
168 /// [`Service`] factory in order to configure the properties.
169 ///
170 /// Use it to build a service which can be registered to a server.
171 pub struct ServiceBuilder {
172 handlers: HashMap<&'static [u8], BoxHandler>,
173 }
174
175 impl ServiceBuilder {
176 /// Initialize a new [`ServiceBuilder`].
new() -> ServiceBuilder177 pub fn new() -> ServiceBuilder {
178 ServiceBuilder {
179 handlers: HashMap::new(),
180 }
181 }
182
183 /// Add a unary RPC call handler.
add_unary_handler<Req, Resp, F>( mut self, method: &Method<Req, Resp>, mut handler: F, ) -> ServiceBuilder where Req: 'static, Resp: 'static, F: FnMut(RpcContext<'_>, Req, UnarySink<Resp>) + Send + Clone + 'static,184 pub fn add_unary_handler<Req, Resp, F>(
185 mut self,
186 method: &Method<Req, Resp>,
187 mut handler: F,
188 ) -> ServiceBuilder
189 where
190 Req: 'static,
191 Resp: 'static,
192 F: FnMut(RpcContext<'_>, Req, UnarySink<Resp>) + Send + Clone + 'static,
193 {
194 let (ser, de) = (method.resp_ser(), method.req_de());
195 let h = move |ctx: RpcContext<'_>, payload: Option<MessageReader>| {
196 execute_unary(ctx, ser, de, payload.unwrap(), &mut handler)
197 };
198 let ch = Box::new(Handler::new(MethodType::Unary, h));
199 self.handlers.insert(method.name.as_bytes(), ch);
200 self
201 }
202
203 /// Add a client streaming RPC call handler.
add_client_streaming_handler<Req, Resp, F>( mut self, method: &Method<Req, Resp>, mut handler: F, ) -> ServiceBuilder where Req: 'static, Resp: 'static, F: FnMut(RpcContext<'_>, RequestStream<Req>, ClientStreamingSink<Resp>) + Send + Clone + 'static,204 pub fn add_client_streaming_handler<Req, Resp, F>(
205 mut self,
206 method: &Method<Req, Resp>,
207 mut handler: F,
208 ) -> ServiceBuilder
209 where
210 Req: 'static,
211 Resp: 'static,
212 F: FnMut(RpcContext<'_>, RequestStream<Req>, ClientStreamingSink<Resp>)
213 + Send
214 + Clone
215 + 'static,
216 {
217 let (ser, de) = (method.resp_ser(), method.req_de());
218 let h = move |ctx: RpcContext<'_>, _: Option<MessageReader>| {
219 execute_client_streaming(ctx, ser, de, &mut handler)
220 };
221 let ch = Box::new(Handler::new(MethodType::ClientStreaming, h));
222 self.handlers.insert(method.name.as_bytes(), ch);
223 self
224 }
225
226 /// Add a server streaming RPC call handler.
add_server_streaming_handler<Req, Resp, F>( mut self, method: &Method<Req, Resp>, mut handler: F, ) -> ServiceBuilder where Req: 'static, Resp: 'static, F: FnMut(RpcContext<'_>, Req, ServerStreamingSink<Resp>) + Send + Clone + 'static,227 pub fn add_server_streaming_handler<Req, Resp, F>(
228 mut self,
229 method: &Method<Req, Resp>,
230 mut handler: F,
231 ) -> ServiceBuilder
232 where
233 Req: 'static,
234 Resp: 'static,
235 F: FnMut(RpcContext<'_>, Req, ServerStreamingSink<Resp>) + Send + Clone + 'static,
236 {
237 let (ser, de) = (method.resp_ser(), method.req_de());
238 let h = move |ctx: RpcContext<'_>, payload: Option<MessageReader>| {
239 execute_server_streaming(ctx, ser, de, payload.unwrap(), &mut handler)
240 };
241 let ch = Box::new(Handler::new(MethodType::ServerStreaming, h));
242 self.handlers.insert(method.name.as_bytes(), ch);
243 self
244 }
245
246 /// Add a duplex streaming RPC call handler.
add_duplex_streaming_handler<Req, Resp, F>( mut self, method: &Method<Req, Resp>, mut handler: F, ) -> ServiceBuilder where Req: 'static, Resp: 'static, F: FnMut(RpcContext<'_>, RequestStream<Req>, DuplexSink<Resp>) + Send + Clone + 'static,247 pub fn add_duplex_streaming_handler<Req, Resp, F>(
248 mut self,
249 method: &Method<Req, Resp>,
250 mut handler: F,
251 ) -> ServiceBuilder
252 where
253 Req: 'static,
254 Resp: 'static,
255 F: FnMut(RpcContext<'_>, RequestStream<Req>, DuplexSink<Resp>) + Send + Clone + 'static,
256 {
257 let (ser, de) = (method.resp_ser(), method.req_de());
258 let h = move |ctx: RpcContext<'_>, _: Option<MessageReader>| {
259 execute_duplex_streaming(ctx, ser, de, &mut handler)
260 };
261 let ch = Box::new(Handler::new(MethodType::Duplex, h));
262 self.handlers.insert(method.name.as_bytes(), ch);
263 self
264 }
265
266 /// Finalize the [`ServiceBuilder`] and build the [`Service`].
build(self) -> Service267 pub fn build(self) -> Service {
268 Service {
269 handlers: self.handlers,
270 }
271 }
272 }
273
274 /// Used to indicate the result of the check. If it returns `Abort`,
275 /// skip the subsequent checkers and abort the grpc call.
276 pub enum CheckResult {
277 Continue,
278 Abort(RpcStatus),
279 }
280
281 pub trait ServerChecker: Send {
check(&mut self, ctx: &RpcContext) -> CheckResult282 fn check(&mut self, ctx: &RpcContext) -> CheckResult;
box_clone(&self) -> Box<dyn ServerChecker>283 fn box_clone(&self) -> Box<dyn ServerChecker>;
284 }
285
286 impl Clone for Box<dyn ServerChecker> {
clone(&self) -> Self287 fn clone(&self) -> Self {
288 self.box_clone()
289 }
290 }
291
292 /// A gRPC service.
293 ///
294 /// Use [`ServiceBuilder`] to build a [`Service`].
295 pub struct Service {
296 handlers: HashMap<&'static [u8], BoxHandler>,
297 }
298
299 /// [`Server`] factory in order to configure the properties.
300 pub struct ServerBuilder {
301 env: Arc<Environment>,
302 binders: Vec<Binder>,
303 args: Option<ChannelArgs>,
304 slots_per_cq: usize,
305 handlers: HashMap<&'static [u8], BoxHandler>,
306 checkers: Vec<Box<dyn ServerChecker>>,
307 }
308
309 impl ServerBuilder {
310 /// Initialize a new [`ServerBuilder`].
new(env: Arc<Environment>) -> ServerBuilder311 pub fn new(env: Arc<Environment>) -> ServerBuilder {
312 ServerBuilder {
313 env,
314 binders: Vec::new(),
315 args: None,
316 slots_per_cq: DEFAULT_REQUEST_SLOTS_PER_CQ,
317 handlers: HashMap::new(),
318 checkers: Vec::new(),
319 }
320 }
321
322 /// Bind to an address.
323 ///
324 /// This function can be called multiple times to bind to multiple ports.
bind<S: Into<String>>(mut self, host: S, port: u16) -> ServerBuilder325 pub fn bind<S: Into<String>>(mut self, host: S, port: u16) -> ServerBuilder {
326 self.binders.push(Binder::new(host.into(), port));
327 self
328 }
329
330 /// Add additional configuration for each incoming channel.
channel_args(mut self, args: ChannelArgs) -> ServerBuilder331 pub fn channel_args(mut self, args: ChannelArgs) -> ServerBuilder {
332 self.args = Some(args);
333 self
334 }
335
336 /// Set how many requests a completion queue can handle.
requests_slot_per_cq(mut self, slots: usize) -> ServerBuilder337 pub fn requests_slot_per_cq(mut self, slots: usize) -> ServerBuilder {
338 self.slots_per_cq = slots;
339 self
340 }
341
342 /// Register a service.
register_service(mut self, service: Service) -> ServerBuilder343 pub fn register_service(mut self, service: Service) -> ServerBuilder {
344 self.handlers.extend(service.handlers);
345 self
346 }
347
348 /// Add a custom checker to handle some tasks before the grpc call handler starts.
349 /// This allows users to operate grpc call based on the context. Users can add
350 /// multiple checkers and they will be executed in the order added.
351 ///
352 /// TODO: Extend this interface to intercepte each payload like grpc-c++.
add_checker<C: ServerChecker + 'static>(mut self, checker: C) -> ServerBuilder353 pub fn add_checker<C: ServerChecker + 'static>(mut self, checker: C) -> ServerBuilder {
354 self.checkers.push(Box::new(checker));
355 self
356 }
357
358 /// Finalize the [`ServerBuilder`] and build the [`Server`].
build(mut self) -> Result<Server>359 pub fn build(mut self) -> Result<Server> {
360 let args = self
361 .args
362 .as_ref()
363 .map_or_else(ptr::null, ChannelArgs::as_ptr);
364 unsafe {
365 let server = grpc_sys::grpc_server_create(args, ptr::null_mut());
366 for binder in self.binders.iter_mut() {
367 let bind_port = binder.bind(server);
368 if bind_port == 0 {
369 grpc_sys::grpc_server_destroy(server);
370 return Err(Error::BindFail(binder.host.clone(), binder.port));
371 }
372 binder.port = bind_port;
373 }
374
375 for cq in self.env.completion_queues() {
376 let cq_ref = cq.borrow()?;
377 grpc_sys::grpc_server_register_completion_queue(
378 server,
379 cq_ref.as_ptr(),
380 ptr::null_mut(),
381 );
382 }
383
384 Ok(Server {
385 env: self.env,
386 core: Arc::new(ServerCore {
387 server,
388 shutdown: AtomicBool::new(false),
389 binders: self.binders,
390 slots_per_cq: self.slots_per_cq,
391 }),
392 handlers: self.handlers,
393 checkers: self.checkers,
394 })
395 }
396 }
397 }
398
399 #[cfg(feature = "secure")]
400 mod secure_server {
401 use super::{Binder, ServerBuilder};
402 use crate::grpc_sys;
403 use crate::security::{
404 server_cert_fetcher_wrapper, CertificateRequestType, ServerCredentials,
405 ServerCredentialsFetcher,
406 };
407
408 impl ServerBuilder {
409 /// Bind to an address with credentials for secure connection.
410 ///
411 /// This function can be called multiple times to bind to multiple ports.
bind_with_cred<S: Into<String>>( mut self, host: S, port: u16, c: ServerCredentials, ) -> ServerBuilder412 pub fn bind_with_cred<S: Into<String>>(
413 mut self,
414 host: S,
415 port: u16,
416 c: ServerCredentials,
417 ) -> ServerBuilder {
418 self.binders
419 .push(Binder::with_cred(host.into(), port, c, None));
420 self
421 }
422
423 /// Bind to an address for secure connection.
424 ///
425 /// The required credentials will be fetched using provided `fetcher`. This
426 /// function can be called multiple times to bind to multiple ports.
bind_with_fetcher<S: Into<String>>( mut self, host: S, port: u16, fetcher: Box<dyn ServerCredentialsFetcher + Send + Sync>, cer_request_type: CertificateRequestType, ) -> ServerBuilder427 pub fn bind_with_fetcher<S: Into<String>>(
428 mut self,
429 host: S,
430 port: u16,
431 fetcher: Box<dyn ServerCredentialsFetcher + Send + Sync>,
432 cer_request_type: CertificateRequestType,
433 ) -> ServerBuilder {
434 let fetcher_wrap = Box::new(fetcher);
435 let fetcher_wrap_ptr = Box::into_raw(fetcher_wrap);
436 let (sc, fb) = unsafe {
437 let opt = grpc_sys::grpc_ssl_server_credentials_create_options_using_config_fetcher(
438 cer_request_type.to_native(),
439 Some(server_cert_fetcher_wrapper),
440 fetcher_wrap_ptr as _,
441 );
442 (
443 ServerCredentials::frow_raw(
444 grpcio_sys::grpc_ssl_server_credentials_create_with_options(opt),
445 ),
446 Box::from_raw(fetcher_wrap_ptr),
447 )
448 };
449 self.binders
450 .push(Binder::with_cred(host.into(), port, sc, Some(fb)));
451 self
452 }
453 }
454 }
455
456 struct ServerCore {
457 server: *mut grpc_server,
458 binders: Vec<Binder>,
459 slots_per_cq: usize,
460 shutdown: AtomicBool,
461 }
462
463 impl Drop for ServerCore {
drop(&mut self)464 fn drop(&mut self) {
465 unsafe { grpc_sys::grpc_server_destroy(self.server) }
466 }
467 }
468
469 unsafe impl Send for ServerCore {}
470 unsafe impl Sync for ServerCore {}
471
472 pub type BoxHandler = Box<dyn CloneableHandler>;
473
474 #[derive(Clone)]
475 pub struct RequestCallContext {
476 server: Arc<ServerCore>,
477 registry: Arc<UnsafeCell<HashMap<&'static [u8], BoxHandler>>>,
478 checkers: Vec<Box<dyn ServerChecker>>,
479 }
480
481 impl RequestCallContext {
482 /// Users should guarantee the method is always called from the same thread.
483 /// TODO: Is there a better way?
484 #[inline]
get_handler(&mut self, path: &[u8]) -> Option<&mut BoxHandler>485 pub unsafe fn get_handler(&mut self, path: &[u8]) -> Option<&mut BoxHandler> {
486 let registry = &mut *self.registry.get();
487 registry.get_mut(path)
488 }
489
get_checker(&self) -> Vec<Box<dyn ServerChecker>>490 pub(crate) fn get_checker(&self) -> Vec<Box<dyn ServerChecker>> {
491 self.checkers.clone()
492 }
493 }
494
495 // Apparently, its life time is guaranteed by the ref count, hence is safe to be sent
496 // to other thread. However it's not `Sync`, as `BoxHandler` is unnecessarily `Sync`.
497 unsafe impl Send for RequestCallContext {}
498
499 /// Request notification of a new call.
request_call(ctx: RequestCallContext, cq: &CompletionQueue)500 pub fn request_call(ctx: RequestCallContext, cq: &CompletionQueue) {
501 if ctx.server.shutdown.load(Ordering::Relaxed) {
502 return;
503 }
504 let cq_ref = match cq.borrow() {
505 // Shutting down, skip.
506 Err(_) => return,
507 Ok(c) => c,
508 };
509 let server_ptr = ctx.server.server;
510 let prom = CallTag::request(ctx);
511 let request_ptr = prom.request_ctx().unwrap().as_ptr();
512 let prom_box = Box::new(prom);
513 let tag = Box::into_raw(prom_box);
514 let code = unsafe {
515 grpc_sys::grpcwrap_server_request_call(
516 server_ptr,
517 cq_ref.as_ptr(),
518 request_ptr,
519 tag as *mut _,
520 )
521 };
522 if code != grpc_call_error::GRPC_CALL_OK {
523 Box::from(tag);
524 panic!("failed to request call: {:?}", code);
525 }
526 }
527
528 /// A `Future` that will resolve when shutdown completes.
529 pub struct ShutdownFuture {
530 /// `true` means the future finishes successfully.
531 cq_f: CqFuture<bool>,
532 }
533
534 impl Future for ShutdownFuture {
535 type Output = Result<()>;
536
poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>537 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
538 match ready!(Pin::new(&mut self.cq_f).poll(cx)) {
539 Ok(true) => Poll::Ready(Ok(())),
540 Ok(false) => Poll::Ready(Err(Error::ShutdownFailed)),
541 Err(e) => unreachable!("action future should never resolve to error: {}", e),
542 }
543 }
544 }
545
546 /// A gRPC server.
547 ///
548 /// A single server can serve arbitrary number of services and can listen on more than one port.
549 ///
550 /// Use [`ServerBuilder`] to build a [`Server`].
551 pub struct Server {
552 env: Arc<Environment>,
553 core: Arc<ServerCore>,
554 handlers: HashMap<&'static [u8], BoxHandler>,
555 checkers: Vec<Box<dyn ServerChecker>>,
556 }
557
558 impl Server {
559 /// Shutdown the server asynchronously.
shutdown(&mut self) -> ShutdownFuture560 pub fn shutdown(&mut self) -> ShutdownFuture {
561 let (cq_f, prom) = CallTag::action_pair();
562 let prom_box = Box::new(prom);
563 let tag = Box::into_raw(prom_box);
564 unsafe {
565 // Since env still exists, no way can cq been shutdown.
566 let cq_ref = self.env.completion_queues()[0].borrow().unwrap();
567 grpc_sys::grpc_server_shutdown_and_notify(
568 self.core.server,
569 cq_ref.as_ptr(),
570 tag as *mut _,
571 )
572 }
573 self.core.shutdown.store(true, Ordering::SeqCst);
574 ShutdownFuture { cq_f }
575 }
576
577 /// Cancel all in-progress calls.
578 ///
579 /// Only usable after shutdown.
cancel_all_calls(&mut self)580 pub fn cancel_all_calls(&mut self) {
581 unsafe { grpc_sys::grpc_server_cancel_all_calls(self.core.server) }
582 }
583
584 /// Start the server.
start(&mut self)585 pub fn start(&mut self) {
586 unsafe {
587 grpc_sys::grpc_server_start(self.core.server);
588 for cq in self.env.completion_queues() {
589 // Handlers are Send and Clone, but not Sync. So we need to
590 // provide a replica for each completion queue.
591 let registry = self
592 .handlers
593 .iter()
594 .map(|(k, v)| (k.to_owned(), v.box_clone()))
595 .collect();
596 let rc = RequestCallContext {
597 server: self.core.clone(),
598 registry: Arc::new(UnsafeCell::new(registry)),
599 checkers: self.checkers.clone(),
600 };
601 for _ in 0..self.core.slots_per_cq {
602 request_call(rc.clone(), cq);
603 }
604 }
605 }
606 }
607
608 /// Get binded addresses pairs.
bind_addrs(&self) -> impl ExactSizeIterator<Item = (&String, u16)>609 pub fn bind_addrs(&self) -> impl ExactSizeIterator<Item = (&String, u16)> {
610 self.core.binders.iter().map(|b| (&b.host, b.port))
611 }
612
613 /// Add an rpc channel for an established connection represented as a file
614 /// descriptor. Takes ownership of the file descriptor, closing it when
615 /// channel is closed.
616 ///
617 /// # Safety
618 ///
619 /// The file descriptor must correspond to a connected stream socket. After
620 /// this call, the socket must not be accessed (read / written / closed)
621 /// by other code.
622 #[cfg(unix)]
add_insecure_channel_from_fd(&self, fd: ::std::os::raw::c_int)623 pub unsafe fn add_insecure_channel_from_fd(&self, fd: ::std::os::raw::c_int) {
624 grpc_sys::grpc_server_add_insecure_channel_from_fd(self.core.server, ptr::null_mut(), fd)
625 }
626 }
627
628 impl Drop for Server {
drop(&mut self)629 fn drop(&mut self) {
630 // if the server is not shutdown completely, destroy a server will core.
631 // TODO: don't wait here
632 let f = if !self.core.shutdown.load(Ordering::SeqCst) {
633 Some(self.shutdown())
634 } else {
635 None
636 };
637 self.cancel_all_calls();
638 let _ = f.map(futures::executor::block_on);
639 }
640 }
641
642 impl Debug for Server {
fmt(&self, f: &mut Formatter<'_>) -> fmt::Result643 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
644 write!(f, "Server {:?}", self.core.binders)
645 }
646 }
647
648 #[cfg(test)]
649 mod tests {
650 use super::join_host_port;
651
652 #[test]
test_join_host_port()653 fn test_join_host_port() {
654 let tbl = vec![
655 ("localhost", 0u16, "localhost:0\0"),
656 ("127.0.0.1", 100u16, "127.0.0.1:100\0"),
657 ("::1", 0u16, "[::1]:0\0"),
658 (
659 "fe80::7376:45d5:fb08:61e3",
660 10028u16,
661 "[fe80::7376:45d5:fb08:61e3]:10028\0",
662 ),
663 ];
664
665 for (h, p, e) in &tbl {
666 assert_eq!(join_host_port(h, *p), e.to_owned());
667 }
668 }
669 }
670