• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22	"bytes"
23	"errors"
24	"fmt"
25	"io"
26	"math"
27	"net"
28	"net/http"
29	"reflect"
30	"runtime"
31	"strings"
32	"sync"
33	"time"
34
35	"io/ioutil"
36
37	"golang.org/x/net/context"
38	"golang.org/x/net/http2"
39	"golang.org/x/net/trace"
40
41	"google.golang.org/grpc/codes"
42	"google.golang.org/grpc/credentials"
43	"google.golang.org/grpc/encoding"
44	"google.golang.org/grpc/encoding/proto"
45	"google.golang.org/grpc/grpclog"
46	"google.golang.org/grpc/internal"
47	"google.golang.org/grpc/internal/channelz"
48	"google.golang.org/grpc/keepalive"
49	"google.golang.org/grpc/metadata"
50	"google.golang.org/grpc/stats"
51	"google.golang.org/grpc/status"
52	"google.golang.org/grpc/tap"
53	"google.golang.org/grpc/transport"
54)
55
const (
	// defaultServerMaxReceiveMessageSize is the receive limit installed in
	// defaultServerOptions: 4 MiB.
	defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
	// defaultServerMaxSendMessageSize is the send limit installed in
	// defaultServerOptions: math.MaxInt32 bytes.
	defaultServerMaxSendMessageSize    = math.MaxInt32
)
60
// methodHandler is the signature of a unary method handler stored in a
// MethodDesc. It is invoked with the registered service implementation, the
// call context, a function that decodes the request into a message, and the
// server's unary interceptor (which may be nil).
type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
62
// MethodDesc represents an RPC service's method specification.
type MethodDesc struct {
	// MethodName is the key under which the method is registered.
	MethodName string
	// Handler is the function called to process a unary call to this method.
	Handler    methodHandler
}
68
// ServiceDesc represents an RPC service's specification.
type ServiceDesc struct {
	// ServiceName is the key under which the service is registered.
	ServiceName string
	// The pointer to the service interface. Used to check whether the user
	// provided implementation satisfies the interface requirements.
	HandlerType interface{}
	// Methods lists the unary methods of the service.
	Methods     []MethodDesc
	// Streams lists the streaming methods of the service.
	Streams     []StreamDesc
	// Metadata is copied verbatim into the registered service and reported
	// back through GetServiceInfo.
	Metadata    interface{}
}
79
// service consists of the information of the server serving this service and
// the methods in this service.
type service struct {
	server interface{} // the server for service methods
	md     map[string]*MethodDesc // unary methods, keyed by method name
	sd     map[string]*StreamDesc // streaming methods, keyed by stream name
	mdata  interface{}            // ServiceDesc.Metadata supplied at registration
}
88
// Server is a gRPC server to serve RPC requests.
type Server struct {
	opts options // configuration assembled by NewServer from the ServerOptions

	mu     sync.Mutex // guards following
	lis    map[net.Listener]bool // listeners currently being served; nil means the server was stopped
	conns  map[io.Closer]bool    // tracked connections/transports; nil means no more may be added
	serve  bool                  // set once Serve has been called; registration is refused afterwards
	drain  bool                  // when set, transports added via addConn are drained immediately
	cv     *sync.Cond          // signaled when connections close for GracefulStop
	m      map[string]*service // service name -> service info
	events trace.EventLog      // event log used by printf/errorf; nil when tracing is off

	quit               chan struct{} // checked by Serve to detect that Stop or GracefulStop was called
	done               chan struct{} // Serve blocks on this after quit fires before returning
	quitOnce           sync.Once
	doneOnce           sync.Once
	channelzRemoveOnce sync.Once
	serveWG            sync.WaitGroup // counts active Serve goroutines for GracefulStop

	channelzID          int64 // channelz unique identification number
	czmu                sync.RWMutex // guards the call counters below
	callsStarted        int64
	callsFailed         int64
	callsSucceeded      int64
	lastCallStartedTime time.Time
}
116
// options holds the server configuration. Each field is populated by the
// ServerOption of the matching name; unset fields keep the values from
// defaultServerOptions (or the zero value).
type options struct {
	creds                 credentials.TransportCredentials // performs the server handshake on new connections when non-nil
	codec                 baseCodec                        // set by CustomCodec
	cp                    Compressor                       // set by RPCCompressor
	dc                    Decompressor                     // set by RPCDecompressor
	unaryInt              UnaryServerInterceptor           // set by UnaryInterceptor
	streamInt             StreamServerInterceptor          // set by StreamInterceptor
	inTapHandle           tap.ServerInHandle               // set by InTapHandle
	statsHandler          stats.Handler                    // set by StatsHandler
	maxConcurrentStreams  uint32                           // set by MaxConcurrentStreams
	maxReceiveMessageSize int                              // set by MaxRecvMsgSize / MaxMsgSize
	maxSendMessageSize    int                              // set by MaxSendMsgSize
	useHandlerImpl        bool // use http.Handler-based server
	unknownStreamDesc     *StreamDesc                      // set by UnknownServiceHandler
	keepaliveParams       keepalive.ServerParameters       // set by KeepaliveParams
	keepalivePolicy       keepalive.EnforcementPolicy      // set by KeepaliveEnforcementPolicy
	initialWindowSize     int32                            // set by InitialWindowSize
	initialConnWindowSize int32                            // set by InitialConnWindowSize
	writeBufferSize       int                              // set by WriteBufferSize
	readBufferSize        int                              // set by ReadBufferSize
	connectionTimeout     time.Duration                    // set by ConnectionTimeout
}
139
// defaultServerOptions is the starting configuration that NewServer copies
// before applying caller-supplied ServerOptions.
var defaultServerOptions = options{
	maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
	maxSendMessageSize:    defaultServerMaxSendMessageSize,
	connectionTimeout:     120 * time.Second,
}
145
// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
// Each option is a function that mutates the options value being assembled
// by NewServer.
type ServerOption func(*options)
148
149// WriteBufferSize lets you set the size of write buffer, this determines how much data can be batched
150// before doing a write on the wire.
151func WriteBufferSize(s int) ServerOption {
152	return func(o *options) {
153		o.writeBufferSize = s
154	}
155}
156
157// ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
158// for one read syscall.
159func ReadBufferSize(s int) ServerOption {
160	return func(o *options) {
161		o.readBufferSize = s
162	}
163}
164
165// InitialWindowSize returns a ServerOption that sets window size for stream.
166// The lower bound for window size is 64K and any value smaller than that will be ignored.
167func InitialWindowSize(s int32) ServerOption {
168	return func(o *options) {
169		o.initialWindowSize = s
170	}
171}
172
173// InitialConnWindowSize returns a ServerOption that sets window size for a connection.
174// The lower bound for window size is 64K and any value smaller than that will be ignored.
175func InitialConnWindowSize(s int32) ServerOption {
176	return func(o *options) {
177		o.initialConnWindowSize = s
178	}
179}
180
181// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
182func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
183	return func(o *options) {
184		o.keepaliveParams = kp
185	}
186}
187
188// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
189func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
190	return func(o *options) {
191		o.keepalivePolicy = kep
192	}
193}
194
195// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
196//
197// This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
198func CustomCodec(codec Codec) ServerOption {
199	return func(o *options) {
200		o.codec = codec
201	}
202}
203
204// RPCCompressor returns a ServerOption that sets a compressor for outbound
205// messages.  For backward compatibility, all outbound messages will be sent
206// using this compressor, regardless of incoming message compression.  By
207// default, server messages will be sent using the same compressor with which
208// request messages were sent.
209//
210// Deprecated: use encoding.RegisterCompressor instead.
211func RPCCompressor(cp Compressor) ServerOption {
212	return func(o *options) {
213		o.cp = cp
214	}
215}
216
217// RPCDecompressor returns a ServerOption that sets a decompressor for inbound
218// messages.  It has higher priority than decompressors registered via
219// encoding.RegisterCompressor.
220//
221// Deprecated: use encoding.RegisterCompressor instead.
222func RPCDecompressor(dc Decompressor) ServerOption {
223	return func(o *options) {
224		o.dc = dc
225	}
226}
227
228// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
229// If this is not set, gRPC uses the default limit.
230//
231// Deprecated: use MaxRecvMsgSize instead.
232func MaxMsgSize(m int) ServerOption {
233	return MaxRecvMsgSize(m)
234}
235
236// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
237// If this is not set, gRPC uses the default 4MB.
238func MaxRecvMsgSize(m int) ServerOption {
239	return func(o *options) {
240		o.maxReceiveMessageSize = m
241	}
242}
243
244// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
245// If this is not set, gRPC uses the default 4MB.
246func MaxSendMsgSize(m int) ServerOption {
247	return func(o *options) {
248		o.maxSendMessageSize = m
249	}
250}
251
252// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
253// of concurrent streams to each ServerTransport.
254func MaxConcurrentStreams(n uint32) ServerOption {
255	return func(o *options) {
256		o.maxConcurrentStreams = n
257	}
258}
259
260// Creds returns a ServerOption that sets credentials for server connections.
261func Creds(c credentials.TransportCredentials) ServerOption {
262	return func(o *options) {
263		o.creds = c
264	}
265}
266
267// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
268// server. Only one unary interceptor can be installed. The construction of multiple
269// interceptors (e.g., chaining) can be implemented at the caller.
270func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
271	return func(o *options) {
272		if o.unaryInt != nil {
273			panic("The unary server interceptor was already set and may not be reset.")
274		}
275		o.unaryInt = i
276	}
277}
278
279// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
280// server. Only one stream interceptor can be installed.
281func StreamInterceptor(i StreamServerInterceptor) ServerOption {
282	return func(o *options) {
283		if o.streamInt != nil {
284			panic("The stream server interceptor was already set and may not be reset.")
285		}
286		o.streamInt = i
287	}
288}
289
290// InTapHandle returns a ServerOption that sets the tap handle for all the server
291// transport to be created. Only one can be installed.
292func InTapHandle(h tap.ServerInHandle) ServerOption {
293	return func(o *options) {
294		if o.inTapHandle != nil {
295			panic("The tap handle was already set and may not be reset.")
296		}
297		o.inTapHandle = h
298	}
299}
300
301// StatsHandler returns a ServerOption that sets the stats handler for the server.
302func StatsHandler(h stats.Handler) ServerOption {
303	return func(o *options) {
304		o.statsHandler = h
305	}
306}
307
308// UnknownServiceHandler returns a ServerOption that allows for adding a custom
309// unknown service handler. The provided method is a bidi-streaming RPC service
310// handler that will be invoked instead of returning the "unimplemented" gRPC
311// error whenever a request is received for an unregistered service or method.
312// The handling function has full access to the Context of the request and the
313// stream, and the invocation bypasses interceptors.
314func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
315	return func(o *options) {
316		o.unknownStreamDesc = &StreamDesc{
317			StreamName: "unknown_service_handler",
318			Handler:    streamHandler,
319			// We need to assume that the users of the streamHandler will want to use both.
320			ClientStreams: true,
321			ServerStreams: true,
322		}
323	}
324}
325
326// ConnectionTimeout returns a ServerOption that sets the timeout for
327// connection establishment (up to and including HTTP/2 handshaking) for all
328// new connections.  If this is not set, the default is 120 seconds.  A zero or
329// negative value will result in an immediate timeout.
330//
331// This API is EXPERIMENTAL.
332func ConnectionTimeout(d time.Duration) ServerOption {
333	return func(o *options) {
334		o.connectionTimeout = d
335	}
336}
337
338// NewServer creates a gRPC server which has no service registered and has not
339// started to accept requests yet.
340func NewServer(opt ...ServerOption) *Server {
341	opts := defaultServerOptions
342	for _, o := range opt {
343		o(&opts)
344	}
345	s := &Server{
346		lis:   make(map[net.Listener]bool),
347		opts:  opts,
348		conns: make(map[io.Closer]bool),
349		m:     make(map[string]*service),
350		quit:  make(chan struct{}),
351		done:  make(chan struct{}),
352	}
353	s.cv = sync.NewCond(&s.mu)
354	if EnableTracing {
355		_, file, line, _ := runtime.Caller(1)
356		s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
357	}
358
359	if channelz.IsOn() {
360		s.channelzID = channelz.RegisterServer(s, "")
361	}
362	return s
363}
364
365// printf records an event in s's event log, unless s has been stopped.
366// REQUIRES s.mu is held.
367func (s *Server) printf(format string, a ...interface{}) {
368	if s.events != nil {
369		s.events.Printf(format, a...)
370	}
371}
372
373// errorf records an error in s's event log, unless s has been stopped.
374// REQUIRES s.mu is held.
375func (s *Server) errorf(format string, a ...interface{}) {
376	if s.events != nil {
377		s.events.Errorf(format, a...)
378	}
379}
380
381// RegisterService registers a service and its implementation to the gRPC
382// server. It is called from the IDL generated code. This must be called before
383// invoking Serve.
384func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
385	ht := reflect.TypeOf(sd.HandlerType).Elem()
386	st := reflect.TypeOf(ss)
387	if !st.Implements(ht) {
388		grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
389	}
390	s.register(sd, ss)
391}
392
393func (s *Server) register(sd *ServiceDesc, ss interface{}) {
394	s.mu.Lock()
395	defer s.mu.Unlock()
396	s.printf("RegisterService(%q)", sd.ServiceName)
397	if s.serve {
398		grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
399	}
400	if _, ok := s.m[sd.ServiceName]; ok {
401		grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
402	}
403	srv := &service{
404		server: ss,
405		md:     make(map[string]*MethodDesc),
406		sd:     make(map[string]*StreamDesc),
407		mdata:  sd.Metadata,
408	}
409	for i := range sd.Methods {
410		d := &sd.Methods[i]
411		srv.md[d.MethodName] = d
412	}
413	for i := range sd.Streams {
414		d := &sd.Streams[i]
415		srv.sd[d.StreamName] = d
416	}
417	s.m[sd.ServiceName] = srv
418}
419
// MethodInfo contains the information of an RPC including its method name and type.
// GetServiceInfo reports unary methods with both stream flags set to false.
type MethodInfo struct {
	// Name is the method name only, without the service name or package name.
	Name string
	// IsClientStream indicates whether the RPC is a client streaming RPC.
	IsClientStream bool
	// IsServerStream indicates whether the RPC is a server streaming RPC.
	IsServerStream bool
}
429
// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
type ServiceInfo struct {
	// Methods lists both the unary and the streaming methods of the service.
	Methods []MethodInfo
	// Metadata is the metadata specified in ServiceDesc when registering service.
	Metadata interface{}
}
436
437// GetServiceInfo returns a map from service names to ServiceInfo.
438// Service names include the package names, in the form of <package>.<service>.
439func (s *Server) GetServiceInfo() map[string]ServiceInfo {
440	ret := make(map[string]ServiceInfo)
441	for n, srv := range s.m {
442		methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd))
443		for m := range srv.md {
444			methods = append(methods, MethodInfo{
445				Name:           m,
446				IsClientStream: false,
447				IsServerStream: false,
448			})
449		}
450		for m, d := range srv.sd {
451			methods = append(methods, MethodInfo{
452				Name:           m,
453				IsClientStream: d.ClientStreams,
454				IsServerStream: d.ServerStreams,
455			})
456		}
457
458		ret[n] = ServiceInfo{
459			Methods:  methods,
460			Metadata: srv.mdata,
461		}
462	}
463	return ret
464}
465
// ErrServerStopped indicates that the operation is now illegal because of
// the server being stopped. Serve returns it when called on a server whose
// listener set has already been cleared.
var ErrServerStopped = errors.New("grpc: the server has been stopped")
469
470func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
471	if s.opts.creds == nil {
472		return rawConn, nil, nil
473	}
474	return s.opts.creds.ServerHandshake(rawConn)
475}
476
// listenSocket wraps a net.Listener together with the channelz ID it was
// registered under, so the channelz entry can be removed on Close.
type listenSocket struct {
	net.Listener
	channelzID int64
}
481
482func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
483	return &channelz.SocketInternalMetric{
484		SocketOptions: channelz.GetSocketOption(l.Listener),
485		LocalAddr:     l.Listener.Addr(),
486	}
487}
488
489func (l *listenSocket) Close() error {
490	err := l.Listener.Close()
491	if channelz.IsOn() {
492		channelz.RemoveEntry(l.channelzID)
493	}
494	return err
495}
496
// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors.  lis will be closed when
// this method returns.
// Serve will return a non-nil error unless Stop or GracefulStop is called.
//
// Accept errors that report themselves as temporary are retried with a delay
// that starts at 5ms, doubles on each consecutive failure and is capped at 1s.
// If s.lis is already nil when Serve is called, lis is closed and
// ErrServerStopped is returned.
func (s *Server) Serve(lis net.Listener) error {
	s.mu.Lock()
	s.printf("serving")
	s.serve = true
	if s.lis == nil {
		// Serve called after Stop or GracefulStop.
		s.mu.Unlock()
		lis.Close()
		return ErrServerStopped
	}

	// Registered first, so it runs last: after the listener cleanup below.
	s.serveWG.Add(1)
	defer func() {
		s.serveWG.Done()
		select {
		// Stop or GracefulStop called; block until done and return nil.
		case <-s.quit:
			<-s.done
		default:
		}
	}()

	// Track the listener (wrapped so its channelz entry is removed on Close).
	ls := &listenSocket{Listener: lis}
	s.lis[ls] = true

	if channelz.IsOn() {
		ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, "")
	}
	s.mu.Unlock()

	// On return, close and forget the listener unless it has already been
	// removed from s.lis (or s.lis has been set to nil).
	defer func() {
		s.mu.Lock()
		if s.lis != nil && s.lis[ls] {
			ls.Close()
			delete(s.lis, ls)
		}
		s.mu.Unlock()
	}()

	var tempDelay time.Duration // how long to sleep on accept failure

	for {
		rawConn, err := lis.Accept()
		if err != nil {
			if ne, ok := err.(interface {
				Temporary() bool
			}); ok && ne.Temporary() {
				// Temporary failure: back off (5ms, doubling, max 1s) and retry.
				if tempDelay == 0 {
					tempDelay = 5 * time.Millisecond
				} else {
					tempDelay *= 2
				}
				if max := 1 * time.Second; tempDelay > max {
					tempDelay = max
				}
				s.mu.Lock()
				s.printf("Accept error: %v; retrying in %v", err, tempDelay)
				s.mu.Unlock()
				// Wait out the delay, but give up right away if quit fires.
				timer := time.NewTimer(tempDelay)
				select {
				case <-timer.C:
				case <-s.quit:
					timer.Stop()
					return nil
				}
				continue
			}
			s.mu.Lock()
			s.printf("done serving; Accept = %v", err)
			s.mu.Unlock()

			// An Accept error seen after quit fired is reported as a clean
			// shutdown (nil) rather than as a failure.
			select {
			case <-s.quit:
				return nil
			default:
			}
			return err
		}
		tempDelay = 0
		// Start a new goroutine to deal with rawConn so we don't stall this Accept
		// loop goroutine.
		//
		// Make sure we account for the goroutine so GracefulStop doesn't nil out
		// s.conns before this conn can be added.
		s.serveWG.Add(1)
		go func() {
			s.handleRawConn(rawConn)
			s.serveWG.Done()
		}()
	}
}
594
// handleRawConn forks a goroutine to handle a just-accepted connection that
// has not had any I/O performed on it yet.
//
// It applies the connection timeout as a deadline on rawConn, runs the
// transport handshake, builds either the HTTP/2 transport or the
// http.Handler-based server for the connection, clears the deadline,
// registers the connection with the server and then serves it in a new
// goroutine, unregistering it when serving finishes.
func (s *Server) handleRawConn(rawConn net.Conn) {
	// The deadline covers the security handshake and the HTTP/2 handshake.
	rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
	conn, authInfo, err := s.useTransportAuthenticator(rawConn)
	if err != nil {
		s.mu.Lock()
		s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
		s.mu.Unlock()
		grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
		// If serverHandshake returns ErrConnDispatched, keep rawConn open.
		if err != credentials.ErrConnDispatched {
			rawConn.Close()
		}
		// Clear the deadline; this only matters for a dispatched connection,
		// which is the one case where rawConn is left open.
		rawConn.SetDeadline(time.Time{})
		return
	}

	// s.conns == nil means connections can no longer be added; drop this one.
	s.mu.Lock()
	if s.conns == nil {
		s.mu.Unlock()
		conn.Close()
		return
	}
	s.mu.Unlock()

	var serve func()
	// c is what gets tracked in s.conns: the raw conn for the handler-based
	// path, or the HTTP/2 server transport otherwise.
	c := conn.(io.Closer)
	if s.opts.useHandlerImpl {
		serve = func() { s.serveUsingHandler(conn) }
	} else {
		// Finish handshaking (HTTP2)
		st := s.newHTTP2Transport(conn, authInfo)
		if st == nil {
			// newHTTP2Transport has already closed conn and logged the error.
			return
		}
		c = st
		serve = func() { s.serveStreams(st) }
	}

	// Handshaking is done; remove the connection-establishment deadline.
	rawConn.SetDeadline(time.Time{})
	if !s.addConn(c) {
		// addConn closed c because s.conns was nil.
		return
	}
	go func() {
		serve()
		s.removeConn(c)
	}()
}
644
645// newHTTP2Transport sets up a http/2 transport (using the
646// gRPC http2 server transport in transport/http2_server.go).
647func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
648	config := &transport.ServerConfig{
649		MaxStreams:            s.opts.maxConcurrentStreams,
650		AuthInfo:              authInfo,
651		InTapHandle:           s.opts.inTapHandle,
652		StatsHandler:          s.opts.statsHandler,
653		KeepaliveParams:       s.opts.keepaliveParams,
654		KeepalivePolicy:       s.opts.keepalivePolicy,
655		InitialWindowSize:     s.opts.initialWindowSize,
656		InitialConnWindowSize: s.opts.initialConnWindowSize,
657		WriteBufferSize:       s.opts.writeBufferSize,
658		ReadBufferSize:        s.opts.readBufferSize,
659		ChannelzParentID:      s.channelzID,
660	}
661	st, err := transport.NewServerTransport("http2", c, config)
662	if err != nil {
663		s.mu.Lock()
664		s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
665		s.mu.Unlock()
666		c.Close()
667		grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err)
668		return nil
669	}
670
671	return st
672}
673
674func (s *Server) serveStreams(st transport.ServerTransport) {
675	defer st.Close()
676	var wg sync.WaitGroup
677	st.HandleStreams(func(stream *transport.Stream) {
678		wg.Add(1)
679		go func() {
680			defer wg.Done()
681			s.handleStream(st, stream, s.traceInfo(st, stream))
682		}()
683	}, func(ctx context.Context, method string) context.Context {
684		if !EnableTracing {
685			return ctx
686		}
687		tr := trace.New("grpc.Recv."+methodFamily(method), method)
688		return trace.NewContext(ctx, tr)
689	})
690	wg.Wait()
691}
692
// Compile-time check that *Server implements http.Handler.
var _ http.Handler = (*Server)(nil)
694
695// serveUsingHandler is called from handleRawConn when s is configured
696// to handle requests via the http.Handler interface. It sets up a
697// net/http.Server to handle the just-accepted conn. The http.Server
698// is configured to route all incoming requests (all HTTP/2 streams)
699// to ServeHTTP, which creates a new ServerTransport for each stream.
700// serveUsingHandler blocks until conn closes.
701//
702// This codepath is only used when Server.TestingUseHandlerImpl has
703// been configured. This lets the end2end tests exercise the ServeHTTP
704// method as one of the environment types.
705//
706// conn is the *tls.Conn that's already been authenticated.
707func (s *Server) serveUsingHandler(conn net.Conn) {
708	h2s := &http2.Server{
709		MaxConcurrentStreams: s.opts.maxConcurrentStreams,
710	}
711	h2s.ServeConn(conn, &http2.ServeConnOpts{
712		Handler: s,
713	})
714}
715
716// ServeHTTP implements the Go standard library's http.Handler
717// interface by responding to the gRPC request r, by looking up
718// the requested gRPC method in the gRPC server s.
719//
720// The provided HTTP request must have arrived on an HTTP/2
721// connection. When using the Go standard library's server,
722// practically this means that the Request must also have arrived
723// over TLS.
724//
725// To share one port (such as 443 for https) between gRPC and an
726// existing http.Handler, use a root http.Handler such as:
727//
728//   if r.ProtoMajor == 2 && strings.HasPrefix(
729//   	r.Header.Get("Content-Type"), "application/grpc") {
730//   	grpcServer.ServeHTTP(w, r)
731//   } else {
732//   	yourMux.ServeHTTP(w, r)
733//   }
734//
735// Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
736// separate from grpc-go's HTTP/2 server. Performance and features may vary
737// between the two paths. ServeHTTP does not support some gRPC features
738// available through grpc-go's HTTP/2 server, and it is currently EXPERIMENTAL
739// and subject to change.
740func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
741	st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler)
742	if err != nil {
743		http.Error(w, err.Error(), http.StatusInternalServerError)
744		return
745	}
746	if !s.addConn(st) {
747		return
748	}
749	defer s.removeConn(st)
750	s.serveStreams(st)
751}
752
753// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
754// If tracing is not enabled, it returns nil.
755func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
756	tr, ok := trace.FromContext(stream.Context())
757	if !ok {
758		return nil
759	}
760
761	trInfo = &traceInfo{
762		tr: tr,
763	}
764	trInfo.firstLine.client = false
765	trInfo.firstLine.remoteAddr = st.RemoteAddr()
766
767	if dl, ok := stream.Context().Deadline(); ok {
768		trInfo.firstLine.deadline = dl.Sub(time.Now())
769	}
770	return trInfo
771}
772
773func (s *Server) addConn(c io.Closer) bool {
774	s.mu.Lock()
775	defer s.mu.Unlock()
776	if s.conns == nil {
777		c.Close()
778		return false
779	}
780	if s.drain {
781		// Transport added after we drained our existing conns: drain it
782		// immediately.
783		c.(transport.ServerTransport).Drain()
784	}
785	s.conns[c] = true
786	return true
787}
788
789func (s *Server) removeConn(c io.Closer) {
790	s.mu.Lock()
791	defer s.mu.Unlock()
792	if s.conns != nil {
793		delete(s.conns, c)
794		s.cv.Broadcast()
795	}
796}
797
798// ChannelzMetric returns ServerInternalMetric of current server.
799// This is an EXPERIMENTAL API.
800func (s *Server) ChannelzMetric() *channelz.ServerInternalMetric {
801	s.czmu.RLock()
802	defer s.czmu.RUnlock()
803	return &channelz.ServerInternalMetric{
804		CallsStarted:             s.callsStarted,
805		CallsSucceeded:           s.callsSucceeded,
806		CallsFailed:              s.callsFailed,
807		LastCallStartedTimestamp: s.lastCallStartedTime,
808	}
809}
810
811func (s *Server) incrCallsStarted() {
812	s.czmu.Lock()
813	s.callsStarted++
814	s.lastCallStartedTime = time.Now()
815	s.czmu.Unlock()
816}
817
818func (s *Server) incrCallsSucceeded() {
819	s.czmu.Lock()
820	s.callsSucceeded++
821	s.czmu.Unlock()
822}
823
824func (s *Server) incrCallsFailed() {
825	s.czmu.Lock()
826	s.callsFailed++
827	s.czmu.Unlock()
828}
829
830func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
831	data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
832	if err != nil {
833		grpclog.Errorln("grpc: server failed to encode response: ", err)
834		return err
835	}
836	compData, err := compress(data, cp, comp)
837	if err != nil {
838		grpclog.Errorln("grpc: server failed to compress response: ", err)
839		return err
840	}
841	hdr, payload := msgHeader(data, compData)
842	// TODO(dfawley): should we be checking len(data) instead?
843	if len(payload) > s.opts.maxSendMessageSize {
844		return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
845	}
846	err = t.Write(stream, hdr, payload, opts)
847	if err == nil && s.opts.statsHandler != nil {
848		s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
849	}
850	return err
851}
852
853func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
854	if channelz.IsOn() {
855		s.incrCallsStarted()
856		defer func() {
857			if err != nil && err != io.EOF {
858				s.incrCallsFailed()
859			} else {
860				s.incrCallsSucceeded()
861			}
862		}()
863	}
864	sh := s.opts.statsHandler
865	if sh != nil {
866		beginTime := time.Now()
867		begin := &stats.Begin{
868			BeginTime: beginTime,
869		}
870		sh.HandleRPC(stream.Context(), begin)
871		defer func() {
872			end := &stats.End{
873				BeginTime: beginTime,
874				EndTime:   time.Now(),
875			}
876			if err != nil && err != io.EOF {
877				end.Error = toRPCErr(err)
878			}
879			sh.HandleRPC(stream.Context(), end)
880		}()
881	}
882	if trInfo != nil {
883		defer trInfo.tr.Finish()
884		trInfo.firstLine.client = false
885		trInfo.tr.LazyLog(&trInfo.firstLine, false)
886		defer func() {
887			if err != nil && err != io.EOF {
888				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
889				trInfo.tr.SetError()
890			}
891		}()
892	}
893
894	// comp and cp are used for compression.  decomp and dc are used for
895	// decompression.  If comp and decomp are both set, they are the same;
896	// however they are kept separate to ensure that at most one of the
897	// compressor/decompressor variable pairs are set for use later.
898	var comp, decomp encoding.Compressor
899	var cp Compressor
900	var dc Decompressor
901
902	// If dc is set and matches the stream's compression, use it.  Otherwise, try
903	// to find a matching registered compressor for decomp.
904	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
905		dc = s.opts.dc
906	} else if rc != "" && rc != encoding.Identity {
907		decomp = encoding.GetCompressor(rc)
908		if decomp == nil {
909			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
910			t.WriteStatus(stream, st)
911			return st.Err()
912		}
913	}
914
915	// If cp is set, use it.  Otherwise, attempt to compress the response using
916	// the incoming message compression method.
917	//
918	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
919	if s.opts.cp != nil {
920		cp = s.opts.cp
921		stream.SetSendCompress(cp.Type())
922	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
923		// Legacy compressor not specified; attempt to respond with same encoding.
924		comp = encoding.GetCompressor(rc)
925		if comp != nil {
926			stream.SetSendCompress(rc)
927		}
928	}
929
930	p := &parser{r: stream}
931	pf, req, err := p.recvMsg(s.opts.maxReceiveMessageSize)
932	if err == io.EOF {
933		// The entire stream is done (for unary RPC only).
934		return err
935	}
936	if err == io.ErrUnexpectedEOF {
937		err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
938	}
939	if err != nil {
940		if st, ok := status.FromError(err); ok {
941			if e := t.WriteStatus(stream, st); e != nil {
942				grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
943			}
944		} else {
945			switch st := err.(type) {
946			case transport.ConnectionError:
947				// Nothing to do here.
948			case transport.StreamError:
949				if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil {
950					grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
951				}
952			default:
953				panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", st, st))
954			}
955		}
956		return err
957	}
958	if channelz.IsOn() {
959		t.IncrMsgRecv()
960	}
961	if st := checkRecvPayload(pf, stream.RecvCompress(), dc != nil || decomp != nil); st != nil {
962		if e := t.WriteStatus(stream, st); e != nil {
963			grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
964		}
965		return st.Err()
966	}
967	var inPayload *stats.InPayload
968	if sh != nil {
969		inPayload = &stats.InPayload{
970			RecvTime: time.Now(),
971		}
972	}
973	df := func(v interface{}) error {
974		if inPayload != nil {
975			inPayload.WireLength = len(req)
976		}
977		if pf == compressionMade {
978			var err error
979			if dc != nil {
980				req, err = dc.Do(bytes.NewReader(req))
981				if err != nil {
982					return status.Errorf(codes.Internal, err.Error())
983				}
984			} else {
985				tmp, _ := decomp.Decompress(bytes.NewReader(req))
986				req, err = ioutil.ReadAll(tmp)
987				if err != nil {
988					return status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
989				}
990			}
991		}
992		if len(req) > s.opts.maxReceiveMessageSize {
993			// TODO: Revisit the error code. Currently keep it consistent with
994			// java implementation.
995			return status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", len(req), s.opts.maxReceiveMessageSize)
996		}
997		if err := s.getCodec(stream.ContentSubtype()).Unmarshal(req, v); err != nil {
998			return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
999		}
1000		if inPayload != nil {
1001			inPayload.Payload = v
1002			inPayload.Data = req
1003			inPayload.Length = len(req)
1004			sh.HandleRPC(stream.Context(), inPayload)
1005		}
1006		if trInfo != nil {
1007			trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
1008		}
1009		return nil
1010	}
1011	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
1012	reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt)
1013	if appErr != nil {
1014		appStatus, ok := status.FromError(appErr)
1015		if !ok {
1016			// Convert appErr if it is not a grpc status error.
1017			appErr = status.Error(codes.Unknown, appErr.Error())
1018			appStatus, _ = status.FromError(appErr)
1019		}
1020		if trInfo != nil {
1021			trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
1022			trInfo.tr.SetError()
1023		}
1024		if e := t.WriteStatus(stream, appStatus); e != nil {
1025			grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
1026		}
1027		return appErr
1028	}
1029	if trInfo != nil {
1030		trInfo.tr.LazyLog(stringer("OK"), false)
1031	}
1032	opts := &transport.Options{
1033		Last:  true,
1034		Delay: false,
1035	}
1036
1037	if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
1038		if err == io.EOF {
1039			// The entire stream is done (for unary RPC only).
1040			return err
1041		}
1042		if s, ok := status.FromError(err); ok {
1043			if e := t.WriteStatus(stream, s); e != nil {
1044				grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
1045			}
1046		} else {
1047			switch st := err.(type) {
1048			case transport.ConnectionError:
1049				// Nothing to do here.
1050			case transport.StreamError:
1051				if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil {
1052					grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
1053				}
1054			default:
1055				panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
1056			}
1057		}
1058		return err
1059	}
1060	if channelz.IsOn() {
1061		t.IncrMsgSent()
1062	}
1063	if trInfo != nil {
1064		trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
1065	}
1066	// TODO: Should we be logging if writing status failed here, like above?
1067	// Should the logging be in WriteStatus?  Should we ignore the WriteStatus
1068	// error or allow the stats handler to see it?
1069	return t.WriteStatus(stream, status.New(codes.OK, ""))
1070}
1071
1072func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
1073	if channelz.IsOn() {
1074		s.incrCallsStarted()
1075		defer func() {
1076			if err != nil && err != io.EOF {
1077				s.incrCallsFailed()
1078			} else {
1079				s.incrCallsSucceeded()
1080			}
1081		}()
1082	}
1083	sh := s.opts.statsHandler
1084	if sh != nil {
1085		beginTime := time.Now()
1086		begin := &stats.Begin{
1087			BeginTime: beginTime,
1088		}
1089		sh.HandleRPC(stream.Context(), begin)
1090		defer func() {
1091			end := &stats.End{
1092				BeginTime: beginTime,
1093				EndTime:   time.Now(),
1094			}
1095			if err != nil && err != io.EOF {
1096				end.Error = toRPCErr(err)
1097			}
1098			sh.HandleRPC(stream.Context(), end)
1099		}()
1100	}
1101	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
1102	ss := &serverStream{
1103		ctx:   ctx,
1104		t:     t,
1105		s:     stream,
1106		p:     &parser{r: stream},
1107		codec: s.getCodec(stream.ContentSubtype()),
1108		maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
1109		maxSendMessageSize:    s.opts.maxSendMessageSize,
1110		trInfo:                trInfo,
1111		statsHandler:          sh,
1112	}
1113
1114	// If dc is set and matches the stream's compression, use it.  Otherwise, try
1115	// to find a matching registered compressor for decomp.
1116	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
1117		ss.dc = s.opts.dc
1118	} else if rc != "" && rc != encoding.Identity {
1119		ss.decomp = encoding.GetCompressor(rc)
1120		if ss.decomp == nil {
1121			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
1122			t.WriteStatus(ss.s, st)
1123			return st.Err()
1124		}
1125	}
1126
1127	// If cp is set, use it.  Otherwise, attempt to compress the response using
1128	// the incoming message compression method.
1129	//
1130	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
1131	if s.opts.cp != nil {
1132		ss.cp = s.opts.cp
1133		stream.SetSendCompress(s.opts.cp.Type())
1134	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
1135		// Legacy compressor not specified; attempt to respond with same encoding.
1136		ss.comp = encoding.GetCompressor(rc)
1137		if ss.comp != nil {
1138			stream.SetSendCompress(rc)
1139		}
1140	}
1141
1142	if trInfo != nil {
1143		trInfo.tr.LazyLog(&trInfo.firstLine, false)
1144		defer func() {
1145			ss.mu.Lock()
1146			if err != nil && err != io.EOF {
1147				ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1148				ss.trInfo.tr.SetError()
1149			}
1150			ss.trInfo.tr.Finish()
1151			ss.trInfo.tr = nil
1152			ss.mu.Unlock()
1153		}()
1154	}
1155	var appErr error
1156	var server interface{}
1157	if srv != nil {
1158		server = srv.server
1159	}
1160	if s.opts.streamInt == nil {
1161		appErr = sd.Handler(server, ss)
1162	} else {
1163		info := &StreamServerInfo{
1164			FullMethod:     stream.Method(),
1165			IsClientStream: sd.ClientStreams,
1166			IsServerStream: sd.ServerStreams,
1167		}
1168		appErr = s.opts.streamInt(server, ss, info, sd.Handler)
1169	}
1170	if appErr != nil {
1171		appStatus, ok := status.FromError(appErr)
1172		if !ok {
1173			switch err := appErr.(type) {
1174			case transport.StreamError:
1175				appStatus = status.New(err.Code, err.Desc)
1176			default:
1177				appStatus = status.New(codes.Unknown, appErr.Error())
1178			}
1179			appErr = appStatus.Err()
1180		}
1181		if trInfo != nil {
1182			ss.mu.Lock()
1183			ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
1184			ss.trInfo.tr.SetError()
1185			ss.mu.Unlock()
1186		}
1187		t.WriteStatus(ss.s, appStatus)
1188		// TODO: Should we log an error from WriteStatus here and below?
1189		return appErr
1190	}
1191	if trInfo != nil {
1192		ss.mu.Lock()
1193		ss.trInfo.tr.LazyLog(stringer("OK"), false)
1194		ss.mu.Unlock()
1195	}
1196	return t.WriteStatus(ss.s, status.New(codes.OK, ""))
1197}
1198
1199func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
1200	sm := stream.Method()
1201	if sm != "" && sm[0] == '/' {
1202		sm = sm[1:]
1203	}
1204	pos := strings.LastIndex(sm, "/")
1205	if pos == -1 {
1206		if trInfo != nil {
1207			trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
1208			trInfo.tr.SetError()
1209		}
1210		errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
1211		if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
1212			if trInfo != nil {
1213				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1214				trInfo.tr.SetError()
1215			}
1216			grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
1217		}
1218		if trInfo != nil {
1219			trInfo.tr.Finish()
1220		}
1221		return
1222	}
1223	service := sm[:pos]
1224	method := sm[pos+1:]
1225	srv, ok := s.m[service]
1226	if !ok {
1227		if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
1228			s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
1229			return
1230		}
1231		if trInfo != nil {
1232			trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true)
1233			trInfo.tr.SetError()
1234		}
1235		errDesc := fmt.Sprintf("unknown service %v", service)
1236		if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
1237			if trInfo != nil {
1238				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1239				trInfo.tr.SetError()
1240			}
1241			grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
1242		}
1243		if trInfo != nil {
1244			trInfo.tr.Finish()
1245		}
1246		return
1247	}
1248	// Unary RPC or Streaming RPC?
1249	if md, ok := srv.md[method]; ok {
1250		s.processUnaryRPC(t, stream, srv, md, trInfo)
1251		return
1252	}
1253	if sd, ok := srv.sd[method]; ok {
1254		s.processStreamingRPC(t, stream, srv, sd, trInfo)
1255		return
1256	}
1257	if trInfo != nil {
1258		trInfo.tr.LazyLog(&fmtStringer{"Unknown method %v", []interface{}{method}}, true)
1259		trInfo.tr.SetError()
1260	}
1261	if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
1262		s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
1263		return
1264	}
1265	errDesc := fmt.Sprintf("unknown method %v", method)
1266	if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
1267		if trInfo != nil {
1268			trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1269			trInfo.tr.SetError()
1270		}
1271		grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
1272	}
1273	if trInfo != nil {
1274		trInfo.tr.Finish()
1275	}
1276}
1277
// streamKey is the context key under which the ServerTransportStream of an
// RPC is stored.
type streamKey struct{}
1280
1281// NewContextWithServerTransportStream creates a new context from ctx and
1282// attaches stream to it.
1283//
1284// This API is EXPERIMENTAL.
1285func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
1286	return context.WithValue(ctx, streamKey{}, stream)
1287}
1288
1289// ServerTransportStream is a minimal interface that a transport stream must
1290// implement. This can be used to mock an actual transport stream for tests of
1291// handler code that use, for example, grpc.SetHeader (which requires some
1292// stream to be in context).
1293//
1294// See also NewContextWithServerTransportStream.
1295//
1296// This API is EXPERIMENTAL.
1297type ServerTransportStream interface {
1298	Method() string
1299	SetHeader(md metadata.MD) error
1300	SendHeader(md metadata.MD) error
1301	SetTrailer(md metadata.MD) error
1302}
1303
1304// ServerTransportStreamFromContext returns the ServerTransportStream saved in
1305// ctx. Returns nil if the given context has no stream associated with it
1306// (which implies it is not an RPC invocation context).
1307//
1308// This API is EXPERIMENTAL.
1309func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
1310	s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
1311	return s
1312}
1313
1314// Stop stops the gRPC server. It immediately closes all open
1315// connections and listeners.
1316// It cancels all active RPCs on the server side and the corresponding
1317// pending RPCs on the client side will get notified by connection
1318// errors.
1319func (s *Server) Stop() {
1320	s.quitOnce.Do(func() {
1321		close(s.quit)
1322	})
1323
1324	defer func() {
1325		s.serveWG.Wait()
1326		s.doneOnce.Do(func() {
1327			close(s.done)
1328		})
1329	}()
1330
1331	s.channelzRemoveOnce.Do(func() {
1332		if channelz.IsOn() {
1333			channelz.RemoveEntry(s.channelzID)
1334		}
1335	})
1336
1337	s.mu.Lock()
1338	listeners := s.lis
1339	s.lis = nil
1340	st := s.conns
1341	s.conns = nil
1342	// interrupt GracefulStop if Stop and GracefulStop are called concurrently.
1343	s.cv.Broadcast()
1344	s.mu.Unlock()
1345
1346	for lis := range listeners {
1347		lis.Close()
1348	}
1349	for c := range st {
1350		c.Close()
1351	}
1352
1353	s.mu.Lock()
1354	if s.events != nil {
1355		s.events.Finish()
1356		s.events = nil
1357	}
1358	s.mu.Unlock()
1359}
1360
1361// GracefulStop stops the gRPC server gracefully. It stops the server from
1362// accepting new connections and RPCs and blocks until all the pending RPCs are
1363// finished.
1364func (s *Server) GracefulStop() {
1365	s.quitOnce.Do(func() {
1366		close(s.quit)
1367	})
1368
1369	defer func() {
1370		s.doneOnce.Do(func() {
1371			close(s.done)
1372		})
1373	}()
1374
1375	s.channelzRemoveOnce.Do(func() {
1376		if channelz.IsOn() {
1377			channelz.RemoveEntry(s.channelzID)
1378		}
1379	})
1380	s.mu.Lock()
1381	if s.conns == nil {
1382		s.mu.Unlock()
1383		return
1384	}
1385
1386	for lis := range s.lis {
1387		lis.Close()
1388	}
1389	s.lis = nil
1390	if !s.drain {
1391		for c := range s.conns {
1392			c.(transport.ServerTransport).Drain()
1393		}
1394		s.drain = true
1395	}
1396
1397	// Wait for serving threads to be ready to exit.  Only then can we be sure no
1398	// new conns will be created.
1399	s.mu.Unlock()
1400	s.serveWG.Wait()
1401	s.mu.Lock()
1402
1403	for len(s.conns) != 0 {
1404		s.cv.Wait()
1405	}
1406	s.conns = nil
1407	if s.events != nil {
1408		s.events.Finish()
1409		s.events = nil
1410	}
1411	s.mu.Unlock()
1412}
1413
1414func init() {
1415	internal.TestingUseHandlerImpl = func(arg interface{}) {
1416		arg.(*Server).opts.useHandlerImpl = true
1417	}
1418}
1419
1420// contentSubtype must be lowercase
1421// cannot return nil
1422func (s *Server) getCodec(contentSubtype string) baseCodec {
1423	if s.opts.codec != nil {
1424		return s.opts.codec
1425	}
1426	if contentSubtype == "" {
1427		return encoding.GetCodec(proto.Name)
1428	}
1429	codec := encoding.GetCodec(contentSubtype)
1430	if codec == nil {
1431		return encoding.GetCodec(proto.Name)
1432	}
1433	return codec
1434}
1435
1436// SetHeader sets the header metadata.
1437// When called multiple times, all the provided metadata will be merged.
1438// All the metadata will be sent out when one of the following happens:
1439//  - grpc.SendHeader() is called;
1440//  - The first response is sent out;
1441//  - An RPC status is sent out (error or success).
1442func SetHeader(ctx context.Context, md metadata.MD) error {
1443	if md.Len() == 0 {
1444		return nil
1445	}
1446	stream := ServerTransportStreamFromContext(ctx)
1447	if stream == nil {
1448		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1449	}
1450	return stream.SetHeader(md)
1451}
1452
1453// SendHeader sends header metadata. It may be called at most once.
1454// The provided md and headers set by SetHeader() will be sent.
1455func SendHeader(ctx context.Context, md metadata.MD) error {
1456	stream := ServerTransportStreamFromContext(ctx)
1457	if stream == nil {
1458		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1459	}
1460	if err := stream.SendHeader(md); err != nil {
1461		return toRPCErr(err)
1462	}
1463	return nil
1464}
1465
1466// SetTrailer sets the trailer metadata that will be sent when an RPC returns.
1467// When called more than once, all the provided metadata will be merged.
1468func SetTrailer(ctx context.Context, md metadata.MD) error {
1469	if md.Len() == 0 {
1470		return nil
1471	}
1472	stream := ServerTransportStreamFromContext(ctx)
1473	if stream == nil {
1474		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1475	}
1476	return stream.SetTrailer(md)
1477}
1478
1479// Method returns the method string for the server context.  The returned
1480// string is in the format of "/service/method".
1481func Method(ctx context.Context) (string, bool) {
1482	s := ServerTransportStreamFromContext(ctx)
1483	if s == nil {
1484		return "", false
1485	}
1486	return s.Method(), true
1487}
1488