1 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2
3 use std::cell::UnsafeCell;
4 use std::collections::HashMap;
5 use std::fmt::{self, Debug, Formatter};
6 use std::net::{IpAddr, SocketAddr};
7 use std::pin::Pin;
8 use std::ptr;
9 use std::sync::atomic::{AtomicBool, Ordering};
10 use std::sync::Arc;
11
12 use crate::grpc_sys::{self, grpc_call_error, grpc_server};
13 use futures::future::Future;
14 use futures::ready;
15 use futures::task::{Context, Poll};
16
17 use crate::call::server::*;
18 use crate::call::{MessageReader, Method, MethodType};
19 use crate::channel::ChannelArgs;
20 use crate::cq::CompletionQueue;
21 use crate::env::Environment;
22 use crate::error::{Error, Result};
23 use crate::task::{CallTag, CqFuture};
24 use crate::RpcContext;
25 use crate::RpcStatus;
26
27 const DEFAULT_REQUEST_SLOTS_PER_CQ: usize = 1024;
28
29 /// An RPC call holder.
30 #[derive(Clone)]
31 pub struct Handler<F> {
32 method_type: MethodType,
33 cb: F,
34 }
35
36 impl<F> Handler<F> {
new(method_type: MethodType, cb: F) -> Handler<F>37 pub fn new(method_type: MethodType, cb: F) -> Handler<F> {
38 Handler { method_type, cb }
39 }
40 }
41
42 pub trait CloneableHandler: Send {
handle(&mut self, ctx: RpcContext<'_>, reqs: Option<MessageReader>)43 fn handle(&mut self, ctx: RpcContext<'_>, reqs: Option<MessageReader>);
box_clone(&self) -> Box<dyn CloneableHandler>44 fn box_clone(&self) -> Box<dyn CloneableHandler>;
method_type(&self) -> MethodType45 fn method_type(&self) -> MethodType;
46 }
47
48 impl<F: 'static> CloneableHandler for Handler<F>
49 where
50 F: FnMut(RpcContext<'_>, Option<MessageReader>) + Send + Clone,
51 {
52 #[inline]
handle(&mut self, ctx: RpcContext<'_>, reqs: Option<MessageReader>)53 fn handle(&mut self, ctx: RpcContext<'_>, reqs: Option<MessageReader>) {
54 (self.cb)(ctx, reqs)
55 }
56
57 #[inline]
box_clone(&self) -> Box<dyn CloneableHandler>58 fn box_clone(&self) -> Box<dyn CloneableHandler> {
59 Box::new(self.clone())
60 }
61
62 #[inline]
method_type(&self) -> MethodType63 fn method_type(&self) -> MethodType {
64 self.method_type
65 }
66 }
67
68 /// Given a host and port, creates a string of the form "host:port" or
69 /// "[host]:port", depending on whether the host is an IPv6 literal.
join_host_port(host: &str, port: u16) -> String70 fn join_host_port(host: &str, port: u16) -> String {
71 if host.starts_with("unix:") {
72 format!("{}\0", host)
73 } else if let Ok(ip) = host.parse::<IpAddr>() {
74 format!("{}\0", SocketAddr::new(ip, port))
75 } else {
76 format!("{}:{}\0", host, port)
77 }
78 }
79
80 #[cfg(feature = "secure")]
81 mod imp {
82 use super::join_host_port;
83 use crate::grpc_sys::{self, grpc_server};
84 use crate::security::ServerCredentialsFetcher;
85 use crate::ServerCredentials;
86
87 pub struct Binder {
88 pub host: String,
89 pub port: u16,
90 cred: Option<ServerCredentials>,
91 _fetcher: Option<Box<Box<dyn ServerCredentialsFetcher + Send + Sync>>>,
92 }
93
94 impl Binder {
new(host: String, port: u16) -> Binder95 pub fn new(host: String, port: u16) -> Binder {
96 let cred = None;
97 Binder {
98 host,
99 port,
100 cred,
101 _fetcher: None,
102 }
103 }
104
with_cred( host: String, port: u16, cred: ServerCredentials, _fetcher: Option<Box<Box<dyn ServerCredentialsFetcher + Send + Sync>>>, ) -> Binder105 pub fn with_cred(
106 host: String,
107 port: u16,
108 cred: ServerCredentials,
109 _fetcher: Option<Box<Box<dyn ServerCredentialsFetcher + Send + Sync>>>,
110 ) -> Binder {
111 let cred = Some(cred);
112 Binder {
113 host,
114 port,
115 cred,
116 _fetcher,
117 }
118 }
119
bind(&mut self, server: *mut grpc_server) -> u16120 pub unsafe fn bind(&mut self, server: *mut grpc_server) -> u16 {
121 let addr = join_host_port(&self.host, self.port);
122 let port = match self.cred.take() {
123 None => grpc_sys::grpc_server_add_insecure_http2_port(server, addr.as_ptr() as _),
124 Some(mut cert) => grpc_sys::grpc_server_add_secure_http2_port(
125 server,
126 addr.as_ptr() as _,
127 cert.as_mut_ptr(),
128 ),
129 };
130 port as u16
131 }
132 }
133 }
134
135 #[cfg(not(feature = "secure"))]
136 mod imp {
137 use super::join_host_port;
138 use crate::grpc_sys::{self, grpc_server};
139
140 pub struct Binder {
141 pub host: String,
142 pub port: u16,
143 }
144
145 impl Binder {
new(host: String, port: u16) -> Binder146 pub fn new(host: String, port: u16) -> Binder {
147 Binder { host, port }
148 }
149
bind(&mut self, server: *mut grpc_server) -> u16150 pub unsafe fn bind(&mut self, server: *mut grpc_server) -> u16 {
151 let addr = join_host_port(&self.host, self.port);
152 grpc_sys::grpc_server_add_insecure_http2_port(server, addr.as_ptr() as _) as u16
153 }
154 }
155 }
156
157 use self::imp::Binder;
158
159 impl Debug for Binder {
fmt(&self, f: &mut Formatter<'_>) -> fmt::Result160 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
161 write!(f, "Binder {{ host: {}, port: {} }}", self.host, self.port)
162 }
163 }
164
165 /// [`Service`] factory in order to configure the properties.
166 ///
167 /// Use it to build a service which can be registered to a server.
168 pub struct ServiceBuilder {
169 handlers: HashMap<&'static [u8], BoxHandler>,
170 }
171
172 impl ServiceBuilder {
173 /// Initialize a new [`ServiceBuilder`].
new() -> ServiceBuilder174 pub fn new() -> ServiceBuilder {
175 ServiceBuilder {
176 handlers: HashMap::new(),
177 }
178 }
179
180 /// Add a unary RPC call handler.
add_unary_handler<Req, Resp, F>( mut self, method: &Method<Req, Resp>, mut handler: F, ) -> ServiceBuilder where Req: 'static, Resp: 'static, F: FnMut(RpcContext<'_>, Req, UnarySink<Resp>) + Send + Clone + 'static,181 pub fn add_unary_handler<Req, Resp, F>(
182 mut self,
183 method: &Method<Req, Resp>,
184 mut handler: F,
185 ) -> ServiceBuilder
186 where
187 Req: 'static,
188 Resp: 'static,
189 F: FnMut(RpcContext<'_>, Req, UnarySink<Resp>) + Send + Clone + 'static,
190 {
191 let (ser, de) = (method.resp_ser(), method.req_de());
192 let h = move |ctx: RpcContext<'_>, payload: Option<MessageReader>| {
193 execute_unary(ctx, ser, de, payload.unwrap(), &mut handler)
194 };
195 let ch = Box::new(Handler::new(MethodType::Unary, h));
196 self.handlers.insert(method.name.as_bytes(), ch);
197 self
198 }
199
200 /// Add a client streaming RPC call handler.
add_client_streaming_handler<Req, Resp, F>( mut self, method: &Method<Req, Resp>, mut handler: F, ) -> ServiceBuilder where Req: 'static, Resp: 'static, F: FnMut(RpcContext<'_>, RequestStream<Req>, ClientStreamingSink<Resp>) + Send + Clone + 'static,201 pub fn add_client_streaming_handler<Req, Resp, F>(
202 mut self,
203 method: &Method<Req, Resp>,
204 mut handler: F,
205 ) -> ServiceBuilder
206 where
207 Req: 'static,
208 Resp: 'static,
209 F: FnMut(RpcContext<'_>, RequestStream<Req>, ClientStreamingSink<Resp>)
210 + Send
211 + Clone
212 + 'static,
213 {
214 let (ser, de) = (method.resp_ser(), method.req_de());
215 let h = move |ctx: RpcContext<'_>, _: Option<MessageReader>| {
216 execute_client_streaming(ctx, ser, de, &mut handler)
217 };
218 let ch = Box::new(Handler::new(MethodType::ClientStreaming, h));
219 self.handlers.insert(method.name.as_bytes(), ch);
220 self
221 }
222
223 /// Add a server streaming RPC call handler.
add_server_streaming_handler<Req, Resp, F>( mut self, method: &Method<Req, Resp>, mut handler: F, ) -> ServiceBuilder where Req: 'static, Resp: 'static, F: FnMut(RpcContext<'_>, Req, ServerStreamingSink<Resp>) + Send + Clone + 'static,224 pub fn add_server_streaming_handler<Req, Resp, F>(
225 mut self,
226 method: &Method<Req, Resp>,
227 mut handler: F,
228 ) -> ServiceBuilder
229 where
230 Req: 'static,
231 Resp: 'static,
232 F: FnMut(RpcContext<'_>, Req, ServerStreamingSink<Resp>) + Send + Clone + 'static,
233 {
234 let (ser, de) = (method.resp_ser(), method.req_de());
235 let h = move |ctx: RpcContext<'_>, payload: Option<MessageReader>| {
236 execute_server_streaming(ctx, ser, de, payload.unwrap(), &mut handler)
237 };
238 let ch = Box::new(Handler::new(MethodType::ServerStreaming, h));
239 self.handlers.insert(method.name.as_bytes(), ch);
240 self
241 }
242
243 /// Add a duplex streaming RPC call handler.
add_duplex_streaming_handler<Req, Resp, F>( mut self, method: &Method<Req, Resp>, mut handler: F, ) -> ServiceBuilder where Req: 'static, Resp: 'static, F: FnMut(RpcContext<'_>, RequestStream<Req>, DuplexSink<Resp>) + Send + Clone + 'static,244 pub fn add_duplex_streaming_handler<Req, Resp, F>(
245 mut self,
246 method: &Method<Req, Resp>,
247 mut handler: F,
248 ) -> ServiceBuilder
249 where
250 Req: 'static,
251 Resp: 'static,
252 F: FnMut(RpcContext<'_>, RequestStream<Req>, DuplexSink<Resp>) + Send + Clone + 'static,
253 {
254 let (ser, de) = (method.resp_ser(), method.req_de());
255 let h = move |ctx: RpcContext<'_>, _: Option<MessageReader>| {
256 execute_duplex_streaming(ctx, ser, de, &mut handler)
257 };
258 let ch = Box::new(Handler::new(MethodType::Duplex, h));
259 self.handlers.insert(method.name.as_bytes(), ch);
260 self
261 }
262
263 /// Finalize the [`ServiceBuilder`] and build the [`Service`].
build(self) -> Service264 pub fn build(self) -> Service {
265 Service {
266 handlers: self.handlers,
267 }
268 }
269 }
270
271 /// Used to indicate the result of the check. If it returns `Abort`,
272 /// skip the subsequent checkers and abort the grpc call.
273 pub enum CheckResult {
274 Continue,
275 Abort(RpcStatus),
276 }
277
278 pub trait ServerChecker: Send {
check(&mut self, ctx: &RpcContext) -> CheckResult279 fn check(&mut self, ctx: &RpcContext) -> CheckResult;
box_clone(&self) -> Box<dyn ServerChecker>280 fn box_clone(&self) -> Box<dyn ServerChecker>;
281 }
282
283 impl Clone for Box<dyn ServerChecker> {
clone(&self) -> Self284 fn clone(&self) -> Self {
285 self.box_clone()
286 }
287 }
288
289 /// A gRPC service.
290 ///
291 /// Use [`ServiceBuilder`] to build a [`Service`].
292 pub struct Service {
293 handlers: HashMap<&'static [u8], BoxHandler>,
294 }
295
296 /// [`Server`] factory in order to configure the properties.
297 pub struct ServerBuilder {
298 env: Arc<Environment>,
299 binders: Vec<Binder>,
300 args: Option<ChannelArgs>,
301 slots_per_cq: usize,
302 handlers: HashMap<&'static [u8], BoxHandler>,
303 checkers: Vec<Box<dyn ServerChecker>>,
304 }
305
306 impl ServerBuilder {
307 /// Initialize a new [`ServerBuilder`].
new(env: Arc<Environment>) -> ServerBuilder308 pub fn new(env: Arc<Environment>) -> ServerBuilder {
309 ServerBuilder {
310 env,
311 binders: Vec::new(),
312 args: None,
313 slots_per_cq: DEFAULT_REQUEST_SLOTS_PER_CQ,
314 handlers: HashMap::new(),
315 checkers: Vec::new(),
316 }
317 }
318
319 /// Bind to an address.
320 ///
321 /// This function can be called multiple times to bind to multiple ports.
bind<S: Into<String>>(mut self, host: S, port: u16) -> ServerBuilder322 pub fn bind<S: Into<String>>(mut self, host: S, port: u16) -> ServerBuilder {
323 self.binders.push(Binder::new(host.into(), port));
324 self
325 }
326
327 /// Add additional configuration for each incoming channel.
channel_args(mut self, args: ChannelArgs) -> ServerBuilder328 pub fn channel_args(mut self, args: ChannelArgs) -> ServerBuilder {
329 self.args = Some(args);
330 self
331 }
332
333 /// Set how many requests a completion queue can handle.
requests_slot_per_cq(mut self, slots: usize) -> ServerBuilder334 pub fn requests_slot_per_cq(mut self, slots: usize) -> ServerBuilder {
335 self.slots_per_cq = slots;
336 self
337 }
338
339 /// Register a service.
register_service(mut self, service: Service) -> ServerBuilder340 pub fn register_service(mut self, service: Service) -> ServerBuilder {
341 self.handlers.extend(service.handlers);
342 self
343 }
344
345 /// Add a custom checker to handle some tasks before the grpc call handler starts.
346 /// This allows users to operate grpc call based on the context. Users can add
347 /// multiple checkers and they will be executed in the order added.
348 ///
349 /// TODO: Extend this interface to intercepte each payload like grpc-c++.
add_checker<C: ServerChecker + 'static>(mut self, checker: C) -> ServerBuilder350 pub fn add_checker<C: ServerChecker + 'static>(mut self, checker: C) -> ServerBuilder {
351 self.checkers.push(Box::new(checker));
352 self
353 }
354
355 /// Finalize the [`ServerBuilder`] and build the [`Server`].
build(mut self) -> Result<Server>356 pub fn build(mut self) -> Result<Server> {
357 let args = self
358 .args
359 .as_ref()
360 .map_or_else(ptr::null, ChannelArgs::as_ptr);
361 unsafe {
362 let server = grpc_sys::grpc_server_create(args, ptr::null_mut());
363 for binder in self.binders.iter_mut() {
364 let bind_port = binder.bind(server);
365 if bind_port == 0 {
366 grpc_sys::grpc_server_destroy(server);
367 return Err(Error::BindFail(binder.host.clone(), binder.port));
368 }
369 binder.port = bind_port;
370 }
371
372 for cq in self.env.completion_queues() {
373 let cq_ref = cq.borrow()?;
374 grpc_sys::grpc_server_register_completion_queue(
375 server,
376 cq_ref.as_ptr(),
377 ptr::null_mut(),
378 );
379 }
380
381 Ok(Server {
382 env: self.env,
383 core: Arc::new(ServerCore {
384 server,
385 shutdown: AtomicBool::new(false),
386 binders: self.binders,
387 slots_per_cq: self.slots_per_cq,
388 }),
389 handlers: self.handlers,
390 checkers: self.checkers,
391 })
392 }
393 }
394 }
395
396 #[cfg(feature = "secure")]
397 mod secure_server {
398 use super::{Binder, ServerBuilder};
399 use crate::grpc_sys;
400 use crate::security::{
401 server_cert_fetcher_wrapper, CertificateRequestType, ServerCredentials,
402 ServerCredentialsFetcher,
403 };
404
405 impl ServerBuilder {
406 /// Bind to an address with credentials for secure connection.
407 ///
408 /// This function can be called multiple times to bind to multiple ports.
bind_with_cred<S: Into<String>>( mut self, host: S, port: u16, c: ServerCredentials, ) -> ServerBuilder409 pub fn bind_with_cred<S: Into<String>>(
410 mut self,
411 host: S,
412 port: u16,
413 c: ServerCredentials,
414 ) -> ServerBuilder {
415 self.binders
416 .push(Binder::with_cred(host.into(), port, c, None));
417 self
418 }
419
420 /// Bind to an address for secure connection.
421 ///
422 /// The required credentials will be fetched using provided `fetcher`. This
423 /// function can be called multiple times to bind to multiple ports.
bind_with_fetcher<S: Into<String>>( mut self, host: S, port: u16, fetcher: Box<dyn ServerCredentialsFetcher + Send + Sync>, cer_request_type: CertificateRequestType, ) -> ServerBuilder424 pub fn bind_with_fetcher<S: Into<String>>(
425 mut self,
426 host: S,
427 port: u16,
428 fetcher: Box<dyn ServerCredentialsFetcher + Send + Sync>,
429 cer_request_type: CertificateRequestType,
430 ) -> ServerBuilder {
431 let fetcher_wrap = Box::new(fetcher);
432 let fetcher_wrap_ptr = Box::into_raw(fetcher_wrap);
433 let (sc, fb) = unsafe {
434 let opt = grpc_sys::grpc_ssl_server_credentials_create_options_using_config_fetcher(
435 cer_request_type.to_native(),
436 Some(server_cert_fetcher_wrapper),
437 fetcher_wrap_ptr as _,
438 );
439 (
440 ServerCredentials::frow_raw(
441 grpcio_sys::grpc_ssl_server_credentials_create_with_options(opt),
442 ),
443 Box::from_raw(fetcher_wrap_ptr),
444 )
445 };
446 self.binders
447 .push(Binder::with_cred(host.into(), port, sc, Some(fb)));
448 self
449 }
450 }
451 }
452
453 struct ServerCore {
454 server: *mut grpc_server,
455 binders: Vec<Binder>,
456 slots_per_cq: usize,
457 shutdown: AtomicBool,
458 }
459
460 impl Drop for ServerCore {
drop(&mut self)461 fn drop(&mut self) {
462 unsafe { grpc_sys::grpc_server_destroy(self.server) }
463 }
464 }
465
466 unsafe impl Send for ServerCore {}
467 unsafe impl Sync for ServerCore {}
468
469 pub type BoxHandler = Box<dyn CloneableHandler>;
470
471 #[derive(Clone)]
472 pub struct RequestCallContext {
473 server: Arc<ServerCore>,
474 registry: Arc<UnsafeCell<HashMap<&'static [u8], BoxHandler>>>,
475 checkers: Vec<Box<dyn ServerChecker>>,
476 }
477
478 impl RequestCallContext {
479 /// Users should guarantee the method is always called from the same thread.
480 /// TODO: Is there a better way?
481 #[inline]
get_handler(&mut self, path: &[u8]) -> Option<&mut BoxHandler>482 pub unsafe fn get_handler(&mut self, path: &[u8]) -> Option<&mut BoxHandler> {
483 let registry = &mut *self.registry.get();
484 registry.get_mut(path)
485 }
486
get_checker(&self) -> Vec<Box<dyn ServerChecker>>487 pub(crate) fn get_checker(&self) -> Vec<Box<dyn ServerChecker>> {
488 self.checkers.clone()
489 }
490 }
491
492 // Apparently, its life time is guaranteed by the ref count, hence is safe to be sent
493 // to other thread. However it's not `Sync`, as `BoxHandler` is unnecessarily `Sync`.
494 unsafe impl Send for RequestCallContext {}
495
496 /// Request notification of a new call.
request_call(ctx: RequestCallContext, cq: &CompletionQueue)497 pub fn request_call(ctx: RequestCallContext, cq: &CompletionQueue) {
498 if ctx.server.shutdown.load(Ordering::Relaxed) {
499 return;
500 }
501 let cq_ref = match cq.borrow() {
502 // Shutting down, skip.
503 Err(_) => return,
504 Ok(c) => c,
505 };
506 let server_ptr = ctx.server.server;
507 let prom = CallTag::request(ctx);
508 let request_ptr = prom.request_ctx().unwrap().as_ptr();
509 let prom_box = Box::new(prom);
510 let tag = Box::into_raw(prom_box);
511 let code = unsafe {
512 grpc_sys::grpcwrap_server_request_call(
513 server_ptr,
514 cq_ref.as_ptr(),
515 request_ptr,
516 tag as *mut _,
517 )
518 };
519 if code != grpc_call_error::GRPC_CALL_OK {
520 Box::from(tag);
521 panic!("failed to request call: {:?}", code);
522 }
523 }
524
525 /// A `Future` that will resolve when shutdown completes.
526 pub struct ShutdownFuture {
527 /// `true` means the future finishes successfully.
528 cq_f: CqFuture<bool>,
529 }
530
531 impl Future for ShutdownFuture {
532 type Output = Result<()>;
533
poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>534 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
535 match ready!(Pin::new(&mut self.cq_f).poll(cx)) {
536 Ok(true) => Poll::Ready(Ok(())),
537 Ok(false) => Poll::Ready(Err(Error::ShutdownFailed)),
538 Err(e) => unreachable!("action future should never resolve to error: {}", e),
539 }
540 }
541 }
542
543 /// A gRPC server.
544 ///
545 /// A single server can serve arbitrary number of services and can listen on more than one port.
546 ///
547 /// Use [`ServerBuilder`] to build a [`Server`].
548 pub struct Server {
549 env: Arc<Environment>,
550 core: Arc<ServerCore>,
551 handlers: HashMap<&'static [u8], BoxHandler>,
552 checkers: Vec<Box<dyn ServerChecker>>,
553 }
554
555 impl Server {
556 /// Shutdown the server asynchronously.
shutdown(&mut self) -> ShutdownFuture557 pub fn shutdown(&mut self) -> ShutdownFuture {
558 let (cq_f, prom) = CallTag::action_pair();
559 let prom_box = Box::new(prom);
560 let tag = Box::into_raw(prom_box);
561 unsafe {
562 // Since env still exists, no way can cq been shutdown.
563 let cq_ref = self.env.completion_queues()[0].borrow().unwrap();
564 grpc_sys::grpc_server_shutdown_and_notify(
565 self.core.server,
566 cq_ref.as_ptr(),
567 tag as *mut _,
568 )
569 }
570 self.core.shutdown.store(true, Ordering::SeqCst);
571 ShutdownFuture { cq_f }
572 }
573
574 /// Cancel all in-progress calls.
575 ///
576 /// Only usable after shutdown.
cancel_all_calls(&mut self)577 pub fn cancel_all_calls(&mut self) {
578 unsafe { grpc_sys::grpc_server_cancel_all_calls(self.core.server) }
579 }
580
581 /// Start the server.
start(&mut self)582 pub fn start(&mut self) {
583 unsafe {
584 grpc_sys::grpc_server_start(self.core.server);
585 for cq in self.env.completion_queues() {
586 // Handlers are Send and Clone, but not Sync. So we need to
587 // provide a replica for each completion queue.
588 let registry = self
589 .handlers
590 .iter()
591 .map(|(k, v)| (k.to_owned(), v.box_clone()))
592 .collect();
593 let rc = RequestCallContext {
594 server: self.core.clone(),
595 registry: Arc::new(UnsafeCell::new(registry)),
596 checkers: self.checkers.clone(),
597 };
598 for _ in 0..self.core.slots_per_cq {
599 request_call(rc.clone(), cq);
600 }
601 }
602 }
603 }
604
605 /// Get binded addresses pairs.
bind_addrs(&self) -> impl ExactSizeIterator<Item = (&String, u16)>606 pub fn bind_addrs(&self) -> impl ExactSizeIterator<Item = (&String, u16)> {
607 self.core.binders.iter().map(|b| (&b.host, b.port))
608 }
609
610 /// Add an rpc channel for an established connection represented as a file
611 /// descriptor. Takes ownership of the file descriptor, closing it when
612 /// channel is closed.
613 ///
614 /// # Safety
615 ///
616 /// The file descriptor must correspond to a connected stream socket. After
617 /// this call, the socket must not be accessed (read / written / closed)
618 /// by other code.
619 #[cfg(unix)]
add_insecure_channel_from_fd(&self, fd: ::std::os::raw::c_int)620 pub unsafe fn add_insecure_channel_from_fd(&self, fd: ::std::os::raw::c_int) {
621 grpc_sys::grpc_server_add_insecure_channel_from_fd(self.core.server, ptr::null_mut(), fd)
622 }
623 }
624
625 impl Drop for Server {
drop(&mut self)626 fn drop(&mut self) {
627 // if the server is not shutdown completely, destroy a server will core.
628 // TODO: don't wait here
629 let f = if !self.core.shutdown.load(Ordering::SeqCst) {
630 Some(self.shutdown())
631 } else {
632 None
633 };
634 self.cancel_all_calls();
635 let _ = f.map(futures::executor::block_on);
636 }
637 }
638
639 impl Debug for Server {
fmt(&self, f: &mut Formatter<'_>) -> fmt::Result640 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
641 write!(f, "Server {:?}", self.core.binders)
642 }
643 }
644
645 #[cfg(test)]
646 mod tests {
647 use super::join_host_port;
648
649 #[test]
test_join_host_port()650 fn test_join_host_port() {
651 let tbl = vec![
652 ("localhost", 0u16, "localhost:0\0"),
653 ("127.0.0.1", 100u16, "127.0.0.1:100\0"),
654 ("::1", 0u16, "[::1]:0\0"),
655 (
656 "fe80::7376:45d5:fb08:61e3",
657 10028u16,
658 "[fe80::7376:45d5:fb08:61e3]:10028\0",
659 ),
660 ];
661
662 for (h, p, e) in &tbl {
663 assert_eq!(join_host_port(h, *p), e.to_owned());
664 }
665 }
666 }
667