1/* 2 * 3 * Copyright 2014 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package grpc 20 21import ( 22 "bytes" 23 "errors" 24 "fmt" 25 "io" 26 "math" 27 "net" 28 "net/http" 29 "reflect" 30 "runtime" 31 "strings" 32 "sync" 33 "time" 34 35 "io/ioutil" 36 37 "golang.org/x/net/context" 38 "golang.org/x/net/http2" 39 "golang.org/x/net/trace" 40 41 "google.golang.org/grpc/codes" 42 "google.golang.org/grpc/credentials" 43 "google.golang.org/grpc/encoding" 44 "google.golang.org/grpc/encoding/proto" 45 "google.golang.org/grpc/grpclog" 46 "google.golang.org/grpc/internal" 47 "google.golang.org/grpc/internal/channelz" 48 "google.golang.org/grpc/keepalive" 49 "google.golang.org/grpc/metadata" 50 "google.golang.org/grpc/stats" 51 "google.golang.org/grpc/status" 52 "google.golang.org/grpc/tap" 53 "google.golang.org/grpc/transport" 54) 55 56const ( 57 defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4 58 defaultServerMaxSendMessageSize = math.MaxInt32 59) 60 61type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error) 62 63// MethodDesc represents an RPC service's method specification. 64type MethodDesc struct { 65 MethodName string 66 Handler methodHandler 67} 68 69// ServiceDesc represents an RPC service's specification. 70type ServiceDesc struct { 71 ServiceName string 72 // The pointer to the service interface. Used to check whether the user 73 // provided implementation satisfies the interface requirements. 74 HandlerType interface{} 75 Methods []MethodDesc 76 Streams []StreamDesc 77 Metadata interface{} 78} 79 80// service consists of the information of the server serving this service and 81// the methods in this service. 82type service struct { 83 server interface{} // the server for service methods 84 md map[string]*MethodDesc 85 sd map[string]*StreamDesc 86 mdata interface{} 87} 88 89// Server is a gRPC server to serve RPC requests. 90type Server struct { 91 opts options 92 93 mu sync.Mutex // guards following 94 lis map[net.Listener]bool 95 conns map[io.Closer]bool 96 serve bool 97 drain bool 98 cv *sync.Cond // signaled when connections close for GracefulStop 99 m map[string]*service // service name -> service info 100 events trace.EventLog 101 102 quit chan struct{} 103 done chan struct{} 104 quitOnce sync.Once 105 doneOnce sync.Once 106 channelzRemoveOnce sync.Once 107 serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop 108 109 channelzID int64 // channelz unique identification number 110 czmu sync.RWMutex 111 callsStarted int64 112 callsFailed int64 113 callsSucceeded int64 114 lastCallStartedTime time.Time 115} 116 117type options struct { 118 creds credentials.TransportCredentials 119 codec baseCodec 120 cp Compressor 121 dc Decompressor 122 unaryInt UnaryServerInterceptor 123 streamInt StreamServerInterceptor 124 inTapHandle tap.ServerInHandle 125 statsHandler stats.Handler 126 maxConcurrentStreams uint32 127 maxReceiveMessageSize int 128 maxSendMessageSize int 129 useHandlerImpl bool // use http.Handler-based server 130 unknownStreamDesc *StreamDesc 131 keepaliveParams keepalive.ServerParameters 132 keepalivePolicy keepalive.EnforcementPolicy 133 initialWindowSize int32 134 initialConnWindowSize int32 135 writeBufferSize int 136 readBufferSize int 137 connectionTimeout time.Duration 138} 139 140var defaultServerOptions = options{ 141 maxReceiveMessageSize: defaultServerMaxReceiveMessageSize, 142 maxSendMessageSize: defaultServerMaxSendMessageSize, 143 connectionTimeout: 120 * time.Second, 144} 145 146// A ServerOption sets options such as credentials, codec and keepalive parameters, etc. 147type ServerOption func(*options) 148 149// WriteBufferSize lets you set the size of write buffer, this determines how much data can be batched 150// before doing a write on the wire. 151func WriteBufferSize(s int) ServerOption { 152 return func(o *options) { 153 o.writeBufferSize = s 154 } 155} 156 157// ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most 158// for one read syscall. 159func ReadBufferSize(s int) ServerOption { 160 return func(o *options) { 161 o.readBufferSize = s 162 } 163} 164 165// InitialWindowSize returns a ServerOption that sets window size for stream. 166// The lower bound for window size is 64K and any value smaller than that will be ignored. 167func InitialWindowSize(s int32) ServerOption { 168 return func(o *options) { 169 o.initialWindowSize = s 170 } 171} 172 173// InitialConnWindowSize returns a ServerOption that sets window size for a connection. 174// The lower bound for window size is 64K and any value smaller than that will be ignored. 175func InitialConnWindowSize(s int32) ServerOption { 176 return func(o *options) { 177 o.initialConnWindowSize = s 178 } 179} 180 181// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server. 182func KeepaliveParams(kp keepalive.ServerParameters) ServerOption { 183 return func(o *options) { 184 o.keepaliveParams = kp 185 } 186} 187 188// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server. 189func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption { 190 return func(o *options) { 191 o.keepalivePolicy = kep 192 } 193} 194 195// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling. 196// 197// This will override any lookups by content-subtype for Codecs registered with RegisterCodec. 198func CustomCodec(codec Codec) ServerOption { 199 return func(o *options) { 200 o.codec = codec 201 } 202} 203 204// RPCCompressor returns a ServerOption that sets a compressor for outbound 205// messages. For backward compatibility, all outbound messages will be sent 206// using this compressor, regardless of incoming message compression. By 207// default, server messages will be sent using the same compressor with which 208// request messages were sent. 209// 210// Deprecated: use encoding.RegisterCompressor instead. 211func RPCCompressor(cp Compressor) ServerOption { 212 return func(o *options) { 213 o.cp = cp 214 } 215} 216 217// RPCDecompressor returns a ServerOption that sets a decompressor for inbound 218// messages. It has higher priority than decompressors registered via 219// encoding.RegisterCompressor. 220// 221// Deprecated: use encoding.RegisterCompressor instead. 222func RPCDecompressor(dc Decompressor) ServerOption { 223 return func(o *options) { 224 o.dc = dc 225 } 226} 227 228// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive. 229// If this is not set, gRPC uses the default limit. 230// 231// Deprecated: use MaxRecvMsgSize instead. 232func MaxMsgSize(m int) ServerOption { 233 return MaxRecvMsgSize(m) 234} 235 236// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive. 237// If this is not set, gRPC uses the default 4MB. 238func MaxRecvMsgSize(m int) ServerOption { 239 return func(o *options) { 240 o.maxReceiveMessageSize = m 241 } 242} 243 244// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send. 245// If this is not set, gRPC uses the default 4MB. 246func MaxSendMsgSize(m int) ServerOption { 247 return func(o *options) { 248 o.maxSendMessageSize = m 249 } 250} 251 252// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number 253// of concurrent streams to each ServerTransport. 254func MaxConcurrentStreams(n uint32) ServerOption { 255 return func(o *options) { 256 o.maxConcurrentStreams = n 257 } 258} 259 260// Creds returns a ServerOption that sets credentials for server connections. 261func Creds(c credentials.TransportCredentials) ServerOption { 262 return func(o *options) { 263 o.creds = c 264 } 265} 266 267// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the 268// server. Only one unary interceptor can be installed. The construction of multiple 269// interceptors (e.g., chaining) can be implemented at the caller. 270func UnaryInterceptor(i UnaryServerInterceptor) ServerOption { 271 return func(o *options) { 272 if o.unaryInt != nil { 273 panic("The unary server interceptor was already set and may not be reset.") 274 } 275 o.unaryInt = i 276 } 277} 278 279// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the 280// server. Only one stream interceptor can be installed. 281func StreamInterceptor(i StreamServerInterceptor) ServerOption { 282 return func(o *options) { 283 if o.streamInt != nil { 284 panic("The stream server interceptor was already set and may not be reset.") 285 } 286 o.streamInt = i 287 } 288} 289 290// InTapHandle returns a ServerOption that sets the tap handle for all the server 291// transport to be created. Only one can be installed. 292func InTapHandle(h tap.ServerInHandle) ServerOption { 293 return func(o *options) { 294 if o.inTapHandle != nil { 295 panic("The tap handle was already set and may not be reset.") 296 } 297 o.inTapHandle = h 298 } 299} 300 301// StatsHandler returns a ServerOption that sets the stats handler for the server. 302func StatsHandler(h stats.Handler) ServerOption { 303 return func(o *options) { 304 o.statsHandler = h 305 } 306} 307 308// UnknownServiceHandler returns a ServerOption that allows for adding a custom 309// unknown service handler. The provided method is a bidi-streaming RPC service 310// handler that will be invoked instead of returning the "unimplemented" gRPC 311// error whenever a request is received for an unregistered service or method. 312// The handling function has full access to the Context of the request and the 313// stream, and the invocation bypasses interceptors. 314func UnknownServiceHandler(streamHandler StreamHandler) ServerOption { 315 return func(o *options) { 316 o.unknownStreamDesc = &StreamDesc{ 317 StreamName: "unknown_service_handler", 318 Handler: streamHandler, 319 // We need to assume that the users of the streamHandler will want to use both. 320 ClientStreams: true, 321 ServerStreams: true, 322 } 323 } 324} 325 326// ConnectionTimeout returns a ServerOption that sets the timeout for 327// connection establishment (up to and including HTTP/2 handshaking) for all 328// new connections. If this is not set, the default is 120 seconds. A zero or 329// negative value will result in an immediate timeout. 330// 331// This API is EXPERIMENTAL. 332func ConnectionTimeout(d time.Duration) ServerOption { 333 return func(o *options) { 334 o.connectionTimeout = d 335 } 336} 337 338// NewServer creates a gRPC server which has no service registered and has not 339// started to accept requests yet. 340func NewServer(opt ...ServerOption) *Server { 341 opts := defaultServerOptions 342 for _, o := range opt { 343 o(&opts) 344 } 345 s := &Server{ 346 lis: make(map[net.Listener]bool), 347 opts: opts, 348 conns: make(map[io.Closer]bool), 349 m: make(map[string]*service), 350 quit: make(chan struct{}), 351 done: make(chan struct{}), 352 } 353 s.cv = sync.NewCond(&s.mu) 354 if EnableTracing { 355 _, file, line, _ := runtime.Caller(1) 356 s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line)) 357 } 358 359 if channelz.IsOn() { 360 s.channelzID = channelz.RegisterServer(s, "") 361 } 362 return s 363} 364 365// printf records an event in s's event log, unless s has been stopped. 366// REQUIRES s.mu is held. 367func (s *Server) printf(format string, a ...interface{}) { 368 if s.events != nil { 369 s.events.Printf(format, a...) 370 } 371} 372 373// errorf records an error in s's event log, unless s has been stopped. 374// REQUIRES s.mu is held. 375func (s *Server) errorf(format string, a ...interface{}) { 376 if s.events != nil { 377 s.events.Errorf(format, a...) 378 } 379} 380 381// RegisterService registers a service and its implementation to the gRPC 382// server. It is called from the IDL generated code. This must be called before 383// invoking Serve. 384func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) { 385 ht := reflect.TypeOf(sd.HandlerType).Elem() 386 st := reflect.TypeOf(ss) 387 if !st.Implements(ht) { 388 grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht) 389 } 390 s.register(sd, ss) 391} 392 393func (s *Server) register(sd *ServiceDesc, ss interface{}) { 394 s.mu.Lock() 395 defer s.mu.Unlock() 396 s.printf("RegisterService(%q)", sd.ServiceName) 397 if s.serve { 398 grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName) 399 } 400 if _, ok := s.m[sd.ServiceName]; ok { 401 grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName) 402 } 403 srv := &service{ 404 server: ss, 405 md: make(map[string]*MethodDesc), 406 sd: make(map[string]*StreamDesc), 407 mdata: sd.Metadata, 408 } 409 for i := range sd.Methods { 410 d := &sd.Methods[i] 411 srv.md[d.MethodName] = d 412 } 413 for i := range sd.Streams { 414 d := &sd.Streams[i] 415 srv.sd[d.StreamName] = d 416 } 417 s.m[sd.ServiceName] = srv 418} 419 420// MethodInfo contains the information of an RPC including its method name and type. 421type MethodInfo struct { 422 // Name is the method name only, without the service name or package name. 423 Name string 424 // IsClientStream indicates whether the RPC is a client streaming RPC. 425 IsClientStream bool 426 // IsServerStream indicates whether the RPC is a server streaming RPC. 427 IsServerStream bool 428} 429 430// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service. 431type ServiceInfo struct { 432 Methods []MethodInfo 433 // Metadata is the metadata specified in ServiceDesc when registering service. 434 Metadata interface{} 435} 436 437// GetServiceInfo returns a map from service names to ServiceInfo. 438// Service names include the package names, in the form of <package>.<service>. 439func (s *Server) GetServiceInfo() map[string]ServiceInfo { 440 ret := make(map[string]ServiceInfo) 441 for n, srv := range s.m { 442 methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd)) 443 for m := range srv.md { 444 methods = append(methods, MethodInfo{ 445 Name: m, 446 IsClientStream: false, 447 IsServerStream: false, 448 }) 449 } 450 for m, d := range srv.sd { 451 methods = append(methods, MethodInfo{ 452 Name: m, 453 IsClientStream: d.ClientStreams, 454 IsServerStream: d.ServerStreams, 455 }) 456 } 457 458 ret[n] = ServiceInfo{ 459 Methods: methods, 460 Metadata: srv.mdata, 461 } 462 } 463 return ret 464} 465 466// ErrServerStopped indicates that the operation is now illegal because of 467// the server being stopped. 468var ErrServerStopped = errors.New("grpc: the server has been stopped") 469 470func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { 471 if s.opts.creds == nil { 472 return rawConn, nil, nil 473 } 474 return s.opts.creds.ServerHandshake(rawConn) 475} 476 477type listenSocket struct { 478 net.Listener 479 channelzID int64 480} 481 482func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric { 483 return &channelz.SocketInternalMetric{ 484 SocketOptions: channelz.GetSocketOption(l.Listener), 485 LocalAddr: l.Listener.Addr(), 486 } 487} 488 489func (l *listenSocket) Close() error { 490 err := l.Listener.Close() 491 if channelz.IsOn() { 492 channelz.RemoveEntry(l.channelzID) 493 } 494 return err 495} 496 497// Serve accepts incoming connections on the listener lis, creating a new 498// ServerTransport and service goroutine for each. The service goroutines 499// read gRPC requests and then call the registered handlers to reply to them. 500// Serve returns when lis.Accept fails with fatal errors. lis will be closed when 501// this method returns. 502// Serve will return a non-nil error unless Stop or GracefulStop is called. 503func (s *Server) Serve(lis net.Listener) error { 504 s.mu.Lock() 505 s.printf("serving") 506 s.serve = true 507 if s.lis == nil { 508 // Serve called after Stop or GracefulStop. 509 s.mu.Unlock() 510 lis.Close() 511 return ErrServerStopped 512 } 513 514 s.serveWG.Add(1) 515 defer func() { 516 s.serveWG.Done() 517 select { 518 // Stop or GracefulStop called; block until done and return nil. 519 case <-s.quit: 520 <-s.done 521 default: 522 } 523 }() 524 525 ls := &listenSocket{Listener: lis} 526 s.lis[ls] = true 527 528 if channelz.IsOn() { 529 ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, "") 530 } 531 s.mu.Unlock() 532 533 defer func() { 534 s.mu.Lock() 535 if s.lis != nil && s.lis[ls] { 536 ls.Close() 537 delete(s.lis, ls) 538 } 539 s.mu.Unlock() 540 }() 541 542 var tempDelay time.Duration // how long to sleep on accept failure 543 544 for { 545 rawConn, err := lis.Accept() 546 if err != nil { 547 if ne, ok := err.(interface { 548 Temporary() bool 549 }); ok && ne.Temporary() { 550 if tempDelay == 0 { 551 tempDelay = 5 * time.Millisecond 552 } else { 553 tempDelay *= 2 554 } 555 if max := 1 * time.Second; tempDelay > max { 556 tempDelay = max 557 } 558 s.mu.Lock() 559 s.printf("Accept error: %v; retrying in %v", err, tempDelay) 560 s.mu.Unlock() 561 timer := time.NewTimer(tempDelay) 562 select { 563 case <-timer.C: 564 case <-s.quit: 565 timer.Stop() 566 return nil 567 } 568 continue 569 } 570 s.mu.Lock() 571 s.printf("done serving; Accept = %v", err) 572 s.mu.Unlock() 573 574 select { 575 case <-s.quit: 576 return nil 577 default: 578 } 579 return err 580 } 581 tempDelay = 0 582 // Start a new goroutine to deal with rawConn so we don't stall this Accept 583 // loop goroutine. 584 // 585 // Make sure we account for the goroutine so GracefulStop doesn't nil out 586 // s.conns before this conn can be added. 587 s.serveWG.Add(1) 588 go func() { 589 s.handleRawConn(rawConn) 590 s.serveWG.Done() 591 }() 592 } 593} 594 595// handleRawConn forks a goroutine to handle a just-accepted connection that 596// has not had any I/O performed on it yet. 597func (s *Server) handleRawConn(rawConn net.Conn) { 598 rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout)) 599 conn, authInfo, err := s.useTransportAuthenticator(rawConn) 600 if err != nil { 601 s.mu.Lock() 602 s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err) 603 s.mu.Unlock() 604 grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err) 605 // If serverHandshake returns ErrConnDispatched, keep rawConn open. 606 if err != credentials.ErrConnDispatched { 607 rawConn.Close() 608 } 609 rawConn.SetDeadline(time.Time{}) 610 return 611 } 612 613 s.mu.Lock() 614 if s.conns == nil { 615 s.mu.Unlock() 616 conn.Close() 617 return 618 } 619 s.mu.Unlock() 620 621 var serve func() 622 c := conn.(io.Closer) 623 if s.opts.useHandlerImpl { 624 serve = func() { s.serveUsingHandler(conn) } 625 } else { 626 // Finish handshaking (HTTP2) 627 st := s.newHTTP2Transport(conn, authInfo) 628 if st == nil { 629 return 630 } 631 c = st 632 serve = func() { s.serveStreams(st) } 633 } 634 635 rawConn.SetDeadline(time.Time{}) 636 if !s.addConn(c) { 637 return 638 } 639 go func() { 640 serve() 641 s.removeConn(c) 642 }() 643} 644 645// newHTTP2Transport sets up a http/2 transport (using the 646// gRPC http2 server transport in transport/http2_server.go). 647func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport { 648 config := &transport.ServerConfig{ 649 MaxStreams: s.opts.maxConcurrentStreams, 650 AuthInfo: authInfo, 651 InTapHandle: s.opts.inTapHandle, 652 StatsHandler: s.opts.statsHandler, 653 KeepaliveParams: s.opts.keepaliveParams, 654 KeepalivePolicy: s.opts.keepalivePolicy, 655 InitialWindowSize: s.opts.initialWindowSize, 656 InitialConnWindowSize: s.opts.initialConnWindowSize, 657 WriteBufferSize: s.opts.writeBufferSize, 658 ReadBufferSize: s.opts.readBufferSize, 659 ChannelzParentID: s.channelzID, 660 } 661 st, err := transport.NewServerTransport("http2", c, config) 662 if err != nil { 663 s.mu.Lock() 664 s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err) 665 s.mu.Unlock() 666 c.Close() 667 grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err) 668 return nil 669 } 670 671 return st 672} 673 674func (s *Server) serveStreams(st transport.ServerTransport) { 675 defer st.Close() 676 var wg sync.WaitGroup 677 st.HandleStreams(func(stream *transport.Stream) { 678 wg.Add(1) 679 go func() { 680 defer wg.Done() 681 s.handleStream(st, stream, s.traceInfo(st, stream)) 682 }() 683 }, func(ctx context.Context, method string) context.Context { 684 if !EnableTracing { 685 return ctx 686 } 687 tr := trace.New("grpc.Recv."+methodFamily(method), method) 688 return trace.NewContext(ctx, tr) 689 }) 690 wg.Wait() 691} 692 693var _ http.Handler = (*Server)(nil) 694 695// serveUsingHandler is called from handleRawConn when s is configured 696// to handle requests via the http.Handler interface. It sets up a 697// net/http.Server to handle the just-accepted conn. The http.Server 698// is configured to route all incoming requests (all HTTP/2 streams) 699// to ServeHTTP, which creates a new ServerTransport for each stream. 700// serveUsingHandler blocks until conn closes. 701// 702// This codepath is only used when Server.TestingUseHandlerImpl has 703// been configured. This lets the end2end tests exercise the ServeHTTP 704// method as one of the environment types. 705// 706// conn is the *tls.Conn that's already been authenticated. 707func (s *Server) serveUsingHandler(conn net.Conn) { 708 h2s := &http2.Server{ 709 MaxConcurrentStreams: s.opts.maxConcurrentStreams, 710 } 711 h2s.ServeConn(conn, &http2.ServeConnOpts{ 712 Handler: s, 713 }) 714} 715 716// ServeHTTP implements the Go standard library's http.Handler 717// interface by responding to the gRPC request r, by looking up 718// the requested gRPC method in the gRPC server s. 719// 720// The provided HTTP request must have arrived on an HTTP/2 721// connection. When using the Go standard library's server, 722// practically this means that the Request must also have arrived 723// over TLS. 724// 725// To share one port (such as 443 for https) between gRPC and an 726// existing http.Handler, use a root http.Handler such as: 727// 728// if r.ProtoMajor == 2 && strings.HasPrefix( 729// r.Header.Get("Content-Type"), "application/grpc") { 730// grpcServer.ServeHTTP(w, r) 731// } else { 732// yourMux.ServeHTTP(w, r) 733// } 734// 735// Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally 736// separate from grpc-go's HTTP/2 server. Performance and features may vary 737// between the two paths. ServeHTTP does not support some gRPC features 738// available through grpc-go's HTTP/2 server, and it is currently EXPERIMENTAL 739// and subject to change. 740func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { 741 st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler) 742 if err != nil { 743 http.Error(w, err.Error(), http.StatusInternalServerError) 744 return 745 } 746 if !s.addConn(st) { 747 return 748 } 749 defer s.removeConn(st) 750 s.serveStreams(st) 751} 752 753// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled. 754// If tracing is not enabled, it returns nil. 755func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) { 756 tr, ok := trace.FromContext(stream.Context()) 757 if !ok { 758 return nil 759 } 760 761 trInfo = &traceInfo{ 762 tr: tr, 763 } 764 trInfo.firstLine.client = false 765 trInfo.firstLine.remoteAddr = st.RemoteAddr() 766 767 if dl, ok := stream.Context().Deadline(); ok { 768 trInfo.firstLine.deadline = dl.Sub(time.Now()) 769 } 770 return trInfo 771} 772 773func (s *Server) addConn(c io.Closer) bool { 774 s.mu.Lock() 775 defer s.mu.Unlock() 776 if s.conns == nil { 777 c.Close() 778 return false 779 } 780 if s.drain { 781 // Transport added after we drained our existing conns: drain it 782 // immediately. 783 c.(transport.ServerTransport).Drain() 784 } 785 s.conns[c] = true 786 return true 787} 788 789func (s *Server) removeConn(c io.Closer) { 790 s.mu.Lock() 791 defer s.mu.Unlock() 792 if s.conns != nil { 793 delete(s.conns, c) 794 s.cv.Broadcast() 795 } 796} 797 798// ChannelzMetric returns ServerInternalMetric of current server. 799// This is an EXPERIMENTAL API. 800func (s *Server) ChannelzMetric() *channelz.ServerInternalMetric { 801 s.czmu.RLock() 802 defer s.czmu.RUnlock() 803 return &channelz.ServerInternalMetric{ 804 CallsStarted: s.callsStarted, 805 CallsSucceeded: s.callsSucceeded, 806 CallsFailed: s.callsFailed, 807 LastCallStartedTimestamp: s.lastCallStartedTime, 808 } 809} 810 811func (s *Server) incrCallsStarted() { 812 s.czmu.Lock() 813 s.callsStarted++ 814 s.lastCallStartedTime = time.Now() 815 s.czmu.Unlock() 816} 817 818func (s *Server) incrCallsSucceeded() { 819 s.czmu.Lock() 820 s.callsSucceeded++ 821 s.czmu.Unlock() 822} 823 824func (s *Server) incrCallsFailed() { 825 s.czmu.Lock() 826 s.callsFailed++ 827 s.czmu.Unlock() 828} 829 830func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error { 831 data, err := encode(s.getCodec(stream.ContentSubtype()), msg) 832 if err != nil { 833 grpclog.Errorln("grpc: server failed to encode response: ", err) 834 return err 835 } 836 compData, err := compress(data, cp, comp) 837 if err != nil { 838 grpclog.Errorln("grpc: server failed to compress response: ", err) 839 return err 840 } 841 hdr, payload := msgHeader(data, compData) 842 // TODO(dfawley): should we be checking len(data) instead? 843 if len(payload) > s.opts.maxSendMessageSize { 844 return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize) 845 } 846 err = t.Write(stream, hdr, payload, opts) 847 if err == nil && s.opts.statsHandler != nil { 848 s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now())) 849 } 850 return err 851} 852 853func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) { 854 if channelz.IsOn() { 855 s.incrCallsStarted() 856 defer func() { 857 if err != nil && err != io.EOF { 858 s.incrCallsFailed() 859 } else { 860 s.incrCallsSucceeded() 861 } 862 }() 863 } 864 sh := s.opts.statsHandler 865 if sh != nil { 866 beginTime := time.Now() 867 begin := &stats.Begin{ 868 BeginTime: beginTime, 869 } 870 sh.HandleRPC(stream.Context(), begin) 871 defer func() { 872 end := &stats.End{ 873 BeginTime: beginTime, 874 EndTime: time.Now(), 875 } 876 if err != nil && err != io.EOF { 877 end.Error = toRPCErr(err) 878 } 879 sh.HandleRPC(stream.Context(), end) 880 }() 881 } 882 if trInfo != nil { 883 defer trInfo.tr.Finish() 884 trInfo.firstLine.client = false 885 trInfo.tr.LazyLog(&trInfo.firstLine, false) 886 defer func() { 887 if err != nil && err != io.EOF { 888 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) 889 trInfo.tr.SetError() 890 } 891 }() 892 } 893 894 // comp and cp are used for compression. decomp and dc are used for 895 // decompression. If comp and decomp are both set, they are the same; 896 // however they are kept separate to ensure that at most one of the 897 // compressor/decompressor variable pairs are set for use later. 898 var comp, decomp encoding.Compressor 899 var cp Compressor 900 var dc Decompressor 901 902 // If dc is set and matches the stream's compression, use it. Otherwise, try 903 // to find a matching registered compressor for decomp. 904 if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc { 905 dc = s.opts.dc 906 } else if rc != "" && rc != encoding.Identity { 907 decomp = encoding.GetCompressor(rc) 908 if decomp == nil { 909 st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc) 910 t.WriteStatus(stream, st) 911 return st.Err() 912 } 913 } 914 915 // If cp is set, use it. Otherwise, attempt to compress the response using 916 // the incoming message compression method. 917 // 918 // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686. 919 if s.opts.cp != nil { 920 cp = s.opts.cp 921 stream.SetSendCompress(cp.Type()) 922 } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity { 923 // Legacy compressor not specified; attempt to respond with same encoding. 924 comp = encoding.GetCompressor(rc) 925 if comp != nil { 926 stream.SetSendCompress(rc) 927 } 928 } 929 930 p := &parser{r: stream} 931 pf, req, err := p.recvMsg(s.opts.maxReceiveMessageSize) 932 if err == io.EOF { 933 // The entire stream is done (for unary RPC only). 934 return err 935 } 936 if err == io.ErrUnexpectedEOF { 937 err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error()) 938 } 939 if err != nil { 940 if st, ok := status.FromError(err); ok { 941 if e := t.WriteStatus(stream, st); e != nil { 942 grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e) 943 } 944 } else { 945 switch st := err.(type) { 946 case transport.ConnectionError: 947 // Nothing to do here. 948 case transport.StreamError: 949 if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil { 950 grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e) 951 } 952 default: 953 panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", st, st)) 954 } 955 } 956 return err 957 } 958 if channelz.IsOn() { 959 t.IncrMsgRecv() 960 } 961 if st := checkRecvPayload(pf, stream.RecvCompress(), dc != nil || decomp != nil); st != nil { 962 if e := t.WriteStatus(stream, st); e != nil { 963 grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e) 964 } 965 return st.Err() 966 } 967 var inPayload *stats.InPayload 968 if sh != nil { 969 inPayload = &stats.InPayload{ 970 RecvTime: time.Now(), 971 } 972 } 973 df := func(v interface{}) error { 974 if inPayload != nil { 975 inPayload.WireLength = len(req) 976 } 977 if pf == compressionMade { 978 var err error 979 if dc != nil { 980 req, err = dc.Do(bytes.NewReader(req)) 981 if err != nil { 982 return status.Errorf(codes.Internal, err.Error()) 983 } 984 } else { 985 tmp, _ := decomp.Decompress(bytes.NewReader(req)) 986 req, err = ioutil.ReadAll(tmp) 987 if err != nil { 988 return status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err) 989 } 990 } 991 } 992 if len(req) > s.opts.maxReceiveMessageSize { 993 // TODO: Revisit the error code. Currently keep it consistent with 994 // java implementation. 995 return status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", len(req), s.opts.maxReceiveMessageSize) 996 } 997 if err := s.getCodec(stream.ContentSubtype()).Unmarshal(req, v); err != nil { 998 return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err) 999 } 1000 if inPayload != nil { 1001 inPayload.Payload = v 1002 inPayload.Data = req 1003 inPayload.Length = len(req) 1004 sh.HandleRPC(stream.Context(), inPayload) 1005 } 1006 if trInfo != nil { 1007 trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true) 1008 } 1009 return nil 1010 } 1011 ctx := NewContextWithServerTransportStream(stream.Context(), stream) 1012 reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt) 1013 if appErr != nil { 1014 appStatus, ok := status.FromError(appErr) 1015 if !ok { 1016 // Convert appErr if it is not a grpc status error. 1017 appErr = status.Error(codes.Unknown, appErr.Error()) 1018 appStatus, _ = status.FromError(appErr) 1019 } 1020 if trInfo != nil { 1021 trInfo.tr.LazyLog(stringer(appStatus.Message()), true) 1022 trInfo.tr.SetError() 1023 } 1024 if e := t.WriteStatus(stream, appStatus); e != nil { 1025 grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e) 1026 } 1027 return appErr 1028 } 1029 if trInfo != nil { 1030 trInfo.tr.LazyLog(stringer("OK"), false) 1031 } 1032 opts := &transport.Options{ 1033 Last: true, 1034 Delay: false, 1035 } 1036 1037 if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil { 1038 if err == io.EOF { 1039 // The entire stream is done (for unary RPC only). 1040 return err 1041 } 1042 if s, ok := status.FromError(err); ok { 1043 if e := t.WriteStatus(stream, s); e != nil { 1044 grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e) 1045 } 1046 } else { 1047 switch st := err.(type) { 1048 case transport.ConnectionError: 1049 // Nothing to do here. 1050 case transport.StreamError: 1051 if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil { 1052 grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e) 1053 } 1054 default: 1055 panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st)) 1056 } 1057 } 1058 return err 1059 } 1060 if channelz.IsOn() { 1061 t.IncrMsgSent() 1062 } 1063 if trInfo != nil { 1064 trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true) 1065 } 1066 // TODO: Should we be logging if writing status failed here, like above? 1067 // Should the logging be in WriteStatus? Should we ignore the WriteStatus 1068 // error or allow the stats handler to see it? 1069 return t.WriteStatus(stream, status.New(codes.OK, "")) 1070} 1071 1072func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) { 1073 if channelz.IsOn() { 1074 s.incrCallsStarted() 1075 defer func() { 1076 if err != nil && err != io.EOF { 1077 s.incrCallsFailed() 1078 } else { 1079 s.incrCallsSucceeded() 1080 } 1081 }() 1082 } 1083 sh := s.opts.statsHandler 1084 if sh != nil { 1085 beginTime := time.Now() 1086 begin := &stats.Begin{ 1087 BeginTime: beginTime, 1088 } 1089 sh.HandleRPC(stream.Context(), begin) 1090 defer func() { 1091 end := &stats.End{ 1092 BeginTime: beginTime, 1093 EndTime: time.Now(), 1094 } 1095 if err != nil && err != io.EOF { 1096 end.Error = toRPCErr(err) 1097 } 1098 sh.HandleRPC(stream.Context(), end) 1099 }() 1100 } 1101 ctx := NewContextWithServerTransportStream(stream.Context(), stream) 1102 ss := &serverStream{ 1103 ctx: ctx, 1104 t: t, 1105 s: stream, 1106 p: &parser{r: stream}, 1107 codec: s.getCodec(stream.ContentSubtype()), 1108 maxReceiveMessageSize: s.opts.maxReceiveMessageSize, 1109 maxSendMessageSize: s.opts.maxSendMessageSize, 1110 trInfo: trInfo, 1111 statsHandler: sh, 1112 } 1113 1114 // If dc is set and matches the stream's compression, use it. Otherwise, try 1115 // to find a matching registered compressor for decomp. 1116 if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc { 1117 ss.dc = s.opts.dc 1118 } else if rc != "" && rc != encoding.Identity { 1119 ss.decomp = encoding.GetCompressor(rc) 1120 if ss.decomp == nil { 1121 st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc) 1122 t.WriteStatus(ss.s, st) 1123 return st.Err() 1124 } 1125 } 1126 1127 // If cp is set, use it. Otherwise, attempt to compress the response using 1128 // the incoming message compression method. 1129 // 1130 // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686. 1131 if s.opts.cp != nil { 1132 ss.cp = s.opts.cp 1133 stream.SetSendCompress(s.opts.cp.Type()) 1134 } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity { 1135 // Legacy compressor not specified; attempt to respond with same encoding. 1136 ss.comp = encoding.GetCompressor(rc) 1137 if ss.comp != nil { 1138 stream.SetSendCompress(rc) 1139 } 1140 } 1141 1142 if trInfo != nil { 1143 trInfo.tr.LazyLog(&trInfo.firstLine, false) 1144 defer func() { 1145 ss.mu.Lock() 1146 if err != nil && err != io.EOF { 1147 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) 1148 ss.trInfo.tr.SetError() 1149 } 1150 ss.trInfo.tr.Finish() 1151 ss.trInfo.tr = nil 1152 ss.mu.Unlock() 1153 }() 1154 } 1155 var appErr error 1156 var server interface{} 1157 if srv != nil { 1158 server = srv.server 1159 } 1160 if s.opts.streamInt == nil { 1161 appErr = sd.Handler(server, ss) 1162 } else { 1163 info := &StreamServerInfo{ 1164 FullMethod: stream.Method(), 1165 IsClientStream: sd.ClientStreams, 1166 IsServerStream: sd.ServerStreams, 1167 } 1168 appErr = s.opts.streamInt(server, ss, info, sd.Handler) 1169 } 1170 if appErr != nil { 1171 appStatus, ok := status.FromError(appErr) 1172 if !ok { 1173 switch err := appErr.(type) { 1174 case transport.StreamError: 1175 appStatus = status.New(err.Code, err.Desc) 1176 default: 1177 appStatus = status.New(codes.Unknown, appErr.Error()) 1178 } 1179 appErr = appStatus.Err() 1180 } 1181 if trInfo != nil { 1182 ss.mu.Lock() 1183 ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true) 1184 ss.trInfo.tr.SetError() 1185 ss.mu.Unlock() 1186 } 1187 t.WriteStatus(ss.s, appStatus) 1188 // TODO: Should we log an error from WriteStatus here and below? 1189 return appErr 1190 } 1191 if trInfo != nil { 1192 ss.mu.Lock() 1193 ss.trInfo.tr.LazyLog(stringer("OK"), false) 1194 ss.mu.Unlock() 1195 } 1196 return t.WriteStatus(ss.s, status.New(codes.OK, "")) 1197} 1198 1199func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) { 1200 sm := stream.Method() 1201 if sm != "" && sm[0] == '/' { 1202 sm = sm[1:] 1203 } 1204 pos := strings.LastIndex(sm, "/") 1205 if pos == -1 { 1206 if trInfo != nil { 1207 trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true) 1208 trInfo.tr.SetError() 1209 } 1210 errDesc := fmt.Sprintf("malformed method name: %q", stream.Method()) 1211 if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil { 1212 if trInfo != nil { 1213 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) 1214 trInfo.tr.SetError() 1215 } 1216 grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err) 1217 } 1218 if trInfo != nil { 1219 trInfo.tr.Finish() 1220 } 1221 return 1222 } 1223 service := sm[:pos] 1224 method := sm[pos+1:] 1225 srv, ok := s.m[service] 1226 if !ok { 1227 if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil { 1228 s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo) 1229 return 1230 } 1231 if trInfo != nil { 1232 trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true) 1233 trInfo.tr.SetError() 1234 } 1235 errDesc := fmt.Sprintf("unknown service %v", service) 1236 if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil { 1237 if trInfo != nil { 1238 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) 1239 trInfo.tr.SetError() 1240 } 1241 grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err) 1242 } 1243 if trInfo != nil { 1244 trInfo.tr.Finish() 1245 } 1246 return 1247 } 1248 // Unary RPC or Streaming RPC? 1249 if md, ok := srv.md[method]; ok { 1250 s.processUnaryRPC(t, stream, srv, md, trInfo) 1251 return 1252 } 1253 if sd, ok := srv.sd[method]; ok { 1254 s.processStreamingRPC(t, stream, srv, sd, trInfo) 1255 return 1256 } 1257 if trInfo != nil { 1258 trInfo.tr.LazyLog(&fmtStringer{"Unknown method %v", []interface{}{method}}, true) 1259 trInfo.tr.SetError() 1260 } 1261 if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil { 1262 s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo) 1263 return 1264 } 1265 errDesc := fmt.Sprintf("unknown method %v", method) 1266 if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil { 1267 if trInfo != nil { 1268 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) 1269 trInfo.tr.SetError() 1270 } 1271 grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err) 1272 } 1273 if trInfo != nil { 1274 trInfo.tr.Finish() 1275 } 1276} 1277 1278// The key to save ServerTransportStream in the context. 1279type streamKey struct{} 1280 1281// NewContextWithServerTransportStream creates a new context from ctx and 1282// attaches stream to it. 1283// 1284// This API is EXPERIMENTAL. 1285func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context { 1286 return context.WithValue(ctx, streamKey{}, stream) 1287} 1288 1289// ServerTransportStream is a minimal interface that a transport stream must 1290// implement. This can be used to mock an actual transport stream for tests of 1291// handler code that use, for example, grpc.SetHeader (which requires some 1292// stream to be in context). 1293// 1294// See also NewContextWithServerTransportStream. 1295// 1296// This API is EXPERIMENTAL. 1297type ServerTransportStream interface { 1298 Method() string 1299 SetHeader(md metadata.MD) error 1300 SendHeader(md metadata.MD) error 1301 SetTrailer(md metadata.MD) error 1302} 1303 1304// ServerTransportStreamFromContext returns the ServerTransportStream saved in 1305// ctx. Returns nil if the given context has no stream associated with it 1306// (which implies it is not an RPC invocation context). 1307// 1308// This API is EXPERIMENTAL. 1309func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream { 1310 s, _ := ctx.Value(streamKey{}).(ServerTransportStream) 1311 return s 1312} 1313 1314// Stop stops the gRPC server. It immediately closes all open 1315// connections and listeners. 1316// It cancels all active RPCs on the server side and the corresponding 1317// pending RPCs on the client side will get notified by connection 1318// errors. 1319func (s *Server) Stop() { 1320 s.quitOnce.Do(func() { 1321 close(s.quit) 1322 }) 1323 1324 defer func() { 1325 s.serveWG.Wait() 1326 s.doneOnce.Do(func() { 1327 close(s.done) 1328 }) 1329 }() 1330 1331 s.channelzRemoveOnce.Do(func() { 1332 if channelz.IsOn() { 1333 channelz.RemoveEntry(s.channelzID) 1334 } 1335 }) 1336 1337 s.mu.Lock() 1338 listeners := s.lis 1339 s.lis = nil 1340 st := s.conns 1341 s.conns = nil 1342 // interrupt GracefulStop if Stop and GracefulStop are called concurrently. 1343 s.cv.Broadcast() 1344 s.mu.Unlock() 1345 1346 for lis := range listeners { 1347 lis.Close() 1348 } 1349 for c := range st { 1350 c.Close() 1351 } 1352 1353 s.mu.Lock() 1354 if s.events != nil { 1355 s.events.Finish() 1356 s.events = nil 1357 } 1358 s.mu.Unlock() 1359} 1360 1361// GracefulStop stops the gRPC server gracefully. It stops the server from 1362// accepting new connections and RPCs and blocks until all the pending RPCs are 1363// finished. 1364func (s *Server) GracefulStop() { 1365 s.quitOnce.Do(func() { 1366 close(s.quit) 1367 }) 1368 1369 defer func() { 1370 s.doneOnce.Do(func() { 1371 close(s.done) 1372 }) 1373 }() 1374 1375 s.channelzRemoveOnce.Do(func() { 1376 if channelz.IsOn() { 1377 channelz.RemoveEntry(s.channelzID) 1378 } 1379 }) 1380 s.mu.Lock() 1381 if s.conns == nil { 1382 s.mu.Unlock() 1383 return 1384 } 1385 1386 for lis := range s.lis { 1387 lis.Close() 1388 } 1389 s.lis = nil 1390 if !s.drain { 1391 for c := range s.conns { 1392 c.(transport.ServerTransport).Drain() 1393 } 1394 s.drain = true 1395 } 1396 1397 // Wait for serving threads to be ready to exit. Only then can we be sure no 1398 // new conns will be created. 1399 s.mu.Unlock() 1400 s.serveWG.Wait() 1401 s.mu.Lock() 1402 1403 for len(s.conns) != 0 { 1404 s.cv.Wait() 1405 } 1406 s.conns = nil 1407 if s.events != nil { 1408 s.events.Finish() 1409 s.events = nil 1410 } 1411 s.mu.Unlock() 1412} 1413 1414func init() { 1415 internal.TestingUseHandlerImpl = func(arg interface{}) { 1416 arg.(*Server).opts.useHandlerImpl = true 1417 } 1418} 1419 1420// contentSubtype must be lowercase 1421// cannot return nil 1422func (s *Server) getCodec(contentSubtype string) baseCodec { 1423 if s.opts.codec != nil { 1424 return s.opts.codec 1425 } 1426 if contentSubtype == "" { 1427 return encoding.GetCodec(proto.Name) 1428 } 1429 codec := encoding.GetCodec(contentSubtype) 1430 if codec == nil { 1431 return encoding.GetCodec(proto.Name) 1432 } 1433 return codec 1434} 1435 1436// SetHeader sets the header metadata. 1437// When called multiple times, all the provided metadata will be merged. 1438// All the metadata will be sent out when one of the following happens: 1439// - grpc.SendHeader() is called; 1440// - The first response is sent out; 1441// - An RPC status is sent out (error or success). 1442func SetHeader(ctx context.Context, md metadata.MD) error { 1443 if md.Len() == 0 { 1444 return nil 1445 } 1446 stream := ServerTransportStreamFromContext(ctx) 1447 if stream == nil { 1448 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) 1449 } 1450 return stream.SetHeader(md) 1451} 1452 1453// SendHeader sends header metadata. It may be called at most once. 1454// The provided md and headers set by SetHeader() will be sent. 1455func SendHeader(ctx context.Context, md metadata.MD) error { 1456 stream := ServerTransportStreamFromContext(ctx) 1457 if stream == nil { 1458 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) 1459 } 1460 if err := stream.SendHeader(md); err != nil { 1461 return toRPCErr(err) 1462 } 1463 return nil 1464} 1465 1466// SetTrailer sets the trailer metadata that will be sent when an RPC returns. 1467// When called more than once, all the provided metadata will be merged. 1468func SetTrailer(ctx context.Context, md metadata.MD) error { 1469 if md.Len() == 0 { 1470 return nil 1471 } 1472 stream := ServerTransportStreamFromContext(ctx) 1473 if stream == nil { 1474 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) 1475 } 1476 return stream.SetTrailer(md) 1477} 1478 1479// Method returns the method string for the server context. The returned 1480// string is in the format of "/service/method". 1481func Method(ctx context.Context) (string, bool) { 1482 s := ServerTransportStreamFromContext(ctx) 1483 if s == nil { 1484 return "", false 1485 } 1486 return s.Method(), true 1487} 1488