• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
// Package transport defines and implements a message-oriented communication
// channel used to complete various transactions (e.g., an RPC). It is meant
// for grpc-internal usage and is not intended to be imported directly by users.
22package transport // externally used as import "google.golang.org/grpc/transport"
23
24import (
25	"errors"
26	"fmt"
27	"io"
28	"net"
29	"sync"
30	"sync/atomic"
31
32	"golang.org/x/net/context"
33	"google.golang.org/grpc/codes"
34	"google.golang.org/grpc/credentials"
35	"google.golang.org/grpc/keepalive"
36	"google.golang.org/grpc/metadata"
37	"google.golang.org/grpc/stats"
38	"google.golang.org/grpc/status"
39	"google.golang.org/grpc/tap"
40)
41
// recvMsg is one unit handed from the transport to a stream reader. All
// transport-protocol-specific information has already been removed.
type recvMsg struct {
	// data is the received payload; it is nil whenever err is non-nil.
	data []byte
	// err classifies the message:
	//   nil:    some data was received.
	//   io.EOF: the stream is completed.
	//   other:  the transport failed.
	err error
}

// recvBuffer is an unbounded channel of recvMsg structs.
//
// It differs from controlBuffer only in that it carries concrete recvMsg
// values rather than objects implementing the "item" interface. recvBuffer is
// written to much more often than controlBuffer, and sticking to the concrete
// struct helps avoid allocation in put.
type recvBuffer struct {
	// c is the channel readers receive from; it holds at most one message.
	c chan recvMsg
	// mu guards backlog and err.
	mu sync.Mutex
	// backlog holds, oldest first, the messages that did not fit into c.
	backlog []recvMsg
	// err is the error carried by the last accepted message. Once it is
	// non-nil, put discards everything it is given.
	err error
}

// newRecvBuffer returns an empty recvBuffer whose channel has capacity one.
func newRecvBuffer() *recvBuffer {
	return &recvBuffer{c: make(chan recvMsg, 1)}
}

// put enqueues r. After a message carrying a non-nil error has been accepted,
// any later message (data or error) is silently dropped.
func (b *recvBuffer) put(r recvMsg) {
	b.mu.Lock()
	if b.err == nil {
		b.err = r.err
		// To preserve ordering, r may go straight onto the channel only
		// when nothing is already waiting in the backlog.
		queued := len(b.backlog) > 0
		if !queued {
			select {
			case b.c <- r:
			default:
				queued = true
			}
		}
		if queued {
			b.backlog = append(b.backlog, r)
		}
	}
	b.mu.Unlock()
}

// load moves the oldest backlog entry onto the channel if the channel has
// room; otherwise it leaves the buffer unchanged.
func (b *recvBuffer) load() {
	b.mu.Lock()
	if len(b.backlog) == 0 {
		b.mu.Unlock()
		return
	}
	select {
	case b.c <- b.backlog[0]:
		// Clear the slot so the moved message's data is not kept alive by
		// the backing array.
		b.backlog[0] = recvMsg{}
		b.backlog = b.backlog[1:]
	default:
	}
	b.mu.Unlock()
}

// get returns the channel on which buffered recvMsg values are delivered.
//
// After receiving a recvMsg from it, the caller should call load so that the
// next pending recvMsg, if any, is sent onto the channel.
func (b *recvBuffer) get() <-chan recvMsg {
	return b.c
}
112
113//
114// recvBufferReader implements io.Reader interface to read the data from
115// recvBuffer.
116type recvBufferReader struct {
117	ctx     context.Context
118	ctxDone <-chan struct{} // cache of ctx.Done() (for performance).
119	recv    *recvBuffer
120	last    []byte // Stores the remaining data in the previous calls.
121	err     error
122}
123
124// Read reads the next len(p) bytes from last. If last is drained, it tries to
125// read additional data from recv. It blocks if there no additional data available
126// in recv. If Read returns any non-nil error, it will continue to return that error.
127func (r *recvBufferReader) Read(p []byte) (n int, err error) {
128	if r.err != nil {
129		return 0, r.err
130	}
131	n, r.err = r.read(p)
132	return n, r.err
133}
134
135func (r *recvBufferReader) read(p []byte) (n int, err error) {
136	if r.last != nil && len(r.last) > 0 {
137		// Read remaining data left in last call.
138		copied := copy(p, r.last)
139		r.last = r.last[copied:]
140		return copied, nil
141	}
142	select {
143	case <-r.ctxDone:
144		return 0, ContextErr(r.ctx.Err())
145	case m := <-r.recv.get():
146		r.recv.load()
147		if m.err != nil {
148			return 0, m.err
149		}
150		copied := copy(p, m.data)
151		r.last = m.data[copied:]
152		return copied, nil
153	}
154}
155
// streamState enumerates the states a stream can be in.
type streamState uint32

const (
	// streamActive is the zero value of streamState.
	streamActive streamState = 0
	// streamWriteDone means EndStream has been sent.
	streamWriteDone streamState = 1
	// streamReadDone means EndStream has been received.
	streamReadDone streamState = 2
	// streamDone means the entire stream is finished.
	streamDone streamState = 3
)
164
165// Stream represents an RPC in the transport layer.
166type Stream struct {
167	id           uint32
168	st           ServerTransport    // nil for client side Stream
169	ctx          context.Context    // the associated context of the stream
170	cancel       context.CancelFunc // always nil for client side Stream
171	done         chan struct{}      // closed at the end of stream to unblock writers. On the client side.
172	ctxDone      <-chan struct{}    // same as done chan but for server side. Cache of ctx.Done() (for performance)
173	method       string             // the associated RPC method of the stream
174	recvCompress string
175	sendCompress string
176	buf          *recvBuffer
177	trReader     io.Reader
178	fc           *inFlow
179	recvQuota    uint32
180	wq           *writeQuota
181
182	// Callback to state application's intentions to read data. This
183	// is used to adjust flow control, if needed.
184	requestRead func(int)
185
186	headerChan chan struct{} // closed to indicate the end of header metadata.
187	headerDone uint32        // set when headerChan is closed. Used to avoid closing headerChan multiple times.
188
189	// hdrMu protects header and trailer metadata on the server-side.
190	hdrMu   sync.Mutex
191	header  metadata.MD // the received header metadata.
192	trailer metadata.MD // the key-value map of trailer metadata.
193
194	// On the server-side, headerSent is atomically set to 1 when the headers are sent out.
195	headerSent uint32
196
197	state streamState
198
199	// On client-side it is the status error received from the server.
200	// On server-side it is unused.
201	status *status.Status
202
203	bytesReceived uint32 // indicates whether any bytes have been received on this stream
204	unprocessed   uint32 // set if the server sends a refused stream or GOAWAY including this stream
205
206	// contentSubtype is the content-subtype for requests.
207	// this must be lowercase or the behavior is undefined.
208	contentSubtype string
209}
210
211// isHeaderSent is only valid on the server-side.
212func (s *Stream) isHeaderSent() bool {
213	return atomic.LoadUint32(&s.headerSent) == 1
214}
215
216// updateHeaderSent updates headerSent and returns true
217// if it was alreay set. It is valid only on server-side.
218func (s *Stream) updateHeaderSent() bool {
219	return atomic.SwapUint32(&s.headerSent, 1) == 1
220}
221
222func (s *Stream) swapState(st streamState) streamState {
223	return streamState(atomic.SwapUint32((*uint32)(&s.state), uint32(st)))
224}
225
226func (s *Stream) compareAndSwapState(oldState, newState streamState) bool {
227	return atomic.CompareAndSwapUint32((*uint32)(&s.state), uint32(oldState), uint32(newState))
228}
229
230func (s *Stream) getState() streamState {
231	return streamState(atomic.LoadUint32((*uint32)(&s.state)))
232}
233
234func (s *Stream) waitOnHeader() error {
235	if s.headerChan == nil {
236		// On the server headerChan is always nil since a stream originates
237		// only after having received headers.
238		return nil
239	}
240	select {
241	case <-s.ctx.Done():
242		return ContextErr(s.ctx.Err())
243	case <-s.headerChan:
244		return nil
245	}
246}
247
248// RecvCompress returns the compression algorithm applied to the inbound
249// message. It is empty string if there is no compression applied.
250func (s *Stream) RecvCompress() string {
251	if err := s.waitOnHeader(); err != nil {
252		return ""
253	}
254	return s.recvCompress
255}
256
257// SetSendCompress sets the compression algorithm to the stream.
258func (s *Stream) SetSendCompress(str string) {
259	s.sendCompress = str
260}
261
262// Done returns a chanel which is closed when it receives the final status
263// from the server.
264func (s *Stream) Done() <-chan struct{} {
265	return s.done
266}
267
268// Header acquires the key-value pairs of header metadata once it
269// is available. It blocks until i) the metadata is ready or ii) there is no
270// header metadata or iii) the stream is canceled/expired.
271func (s *Stream) Header() (metadata.MD, error) {
272	err := s.waitOnHeader()
273	// Even if the stream is closed, header is returned if available.
274	select {
275	case <-s.headerChan:
276		if s.header == nil {
277			return nil, nil
278		}
279		return s.header.Copy(), nil
280	default:
281	}
282	return nil, err
283}
284
285// Trailer returns the cached trailer metedata. Note that if it is not called
286// after the entire stream is done, it could return an empty MD. Client
287// side only.
288// It can be safely read only after stream has ended that is either read
289// or write have returned io.EOF.
290func (s *Stream) Trailer() metadata.MD {
291	c := s.trailer.Copy()
292	return c
293}
294
295// ServerTransport returns the underlying ServerTransport for the stream.
296// The client side stream always returns nil.
297func (s *Stream) ServerTransport() ServerTransport {
298	return s.st
299}
300
301// ContentSubtype returns the content-subtype for a request. For example, a
302// content-subtype of "proto" will result in a content-type of
303// "application/grpc+proto". This will always be lowercase.  See
304// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
305// more details.
306func (s *Stream) ContentSubtype() string {
307	return s.contentSubtype
308}
309
310// Context returns the context of the stream.
311func (s *Stream) Context() context.Context {
312	return s.ctx
313}
314
315// Method returns the method for the stream.
316func (s *Stream) Method() string {
317	return s.method
318}
319
320// Status returns the status received from the server.
321// Status can be read safely only after the stream has ended,
322// that is, read or write has returned io.EOF.
323func (s *Stream) Status() *status.Status {
324	return s.status
325}
326
327// SetHeader sets the header metadata. This can be called multiple times.
328// Server side only.
329// This should not be called in parallel to other data writes.
330func (s *Stream) SetHeader(md metadata.MD) error {
331	if md.Len() == 0 {
332		return nil
333	}
334	if s.isHeaderSent() || s.getState() == streamDone {
335		return ErrIllegalHeaderWrite
336	}
337	s.hdrMu.Lock()
338	s.header = metadata.Join(s.header, md)
339	s.hdrMu.Unlock()
340	return nil
341}
342
343// SendHeader sends the given header metadata. The given metadata is
344// combined with any metadata set by previous calls to SetHeader and
345// then written to the transport stream.
346func (s *Stream) SendHeader(md metadata.MD) error {
347	t := s.ServerTransport()
348	return t.WriteHeader(s, md)
349}
350
351// SetTrailer sets the trailer metadata which will be sent with the RPC status
352// by the server. This can be called multiple times. Server side only.
353// This should not be called parallel to other data writes.
354func (s *Stream) SetTrailer(md metadata.MD) error {
355	if md.Len() == 0 {
356		return nil
357	}
358	if s.getState() == streamDone {
359		return ErrIllegalHeaderWrite
360	}
361	s.hdrMu.Lock()
362	s.trailer = metadata.Join(s.trailer, md)
363	s.hdrMu.Unlock()
364	return nil
365}
366
367func (s *Stream) write(m recvMsg) {
368	s.buf.put(m)
369}
370
371// Read reads all p bytes from the wire for this stream.
372func (s *Stream) Read(p []byte) (n int, err error) {
373	// Don't request a read if there was an error earlier
374	if er := s.trReader.(*transportReader).er; er != nil {
375		return 0, er
376	}
377	s.requestRead(len(p))
378	return io.ReadFull(s.trReader, p)
379}
380
// transportReader reads all the data available for this Stream from the
// transport and passes it into the decoder, which converts it into a gRPC
// message stream. The error is io.EOF when the stream is done, or another
// non-nil error if the stream broke.
type transportReader struct {
	reader io.Reader
	// windowHandler controls the window update procedure for both this
	// particular stream and the associated transport.
	windowHandler func(int)
	// er remembers the most recent error returned by reader.
	er error
}

// Read reads from the underlying reader. A successful read reports its byte
// count to windowHandler; a failed read stores the error in er and does not
// call windowHandler.
func (t *transportReader) Read(p []byte) (n int, err error) {
	n, err = t.reader.Read(p)
	if err == nil {
		t.windowHandler(n)
		return n, nil
	}
	t.er = err
	return n, err
}
402
403// BytesReceived indicates whether any bytes have been received on this stream.
404func (s *Stream) BytesReceived() bool {
405	return atomic.LoadUint32(&s.bytesReceived) == 1
406}
407
408// Unprocessed indicates whether the server did not process this stream --
409// i.e. it sent a refused stream or GOAWAY including this stream ID.
410func (s *Stream) Unprocessed() bool {
411	return atomic.LoadUint32(&s.unprocessed) == 1
412}
413
414// GoString is implemented by Stream so context.String() won't
415// race when printing %#v.
416func (s *Stream) GoString() string {
417	return fmt.Sprintf("<stream: %p, %v>", s, s.method)
418}
419
// transportState is the state of a transport.
type transportState int

// The possible transport states; reachable is the zero value.
const (
	reachable transportState = 0
	closing   transportState = 1
	draining  transportState = 2
)
428
429// ServerConfig consists of all the configurations to establish a server transport.
430type ServerConfig struct {
431	MaxStreams            uint32
432	AuthInfo              credentials.AuthInfo
433	InTapHandle           tap.ServerInHandle
434	StatsHandler          stats.Handler
435	KeepaliveParams       keepalive.ServerParameters
436	KeepalivePolicy       keepalive.EnforcementPolicy
437	InitialWindowSize     int32
438	InitialConnWindowSize int32
439	WriteBufferSize       int
440	ReadBufferSize        int
441	ChannelzParentID      int64
442}
443
444// NewServerTransport creates a ServerTransport with conn or non-nil error
445// if it fails.
446func NewServerTransport(protocol string, conn net.Conn, config *ServerConfig) (ServerTransport, error) {
447	return newHTTP2Server(conn, config)
448}
449
450// ConnectOptions covers all relevant options for communicating with the server.
451type ConnectOptions struct {
452	// UserAgent is the application user agent.
453	UserAgent string
454	// Authority is the :authority pseudo-header to use. This field has no effect if
455	// TransportCredentials is set.
456	Authority string
457	// Dialer specifies how to dial a network address.
458	Dialer func(context.Context, string) (net.Conn, error)
459	// FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors.
460	FailOnNonTempDialError bool
461	// PerRPCCredentials stores the PerRPCCredentials required to issue RPCs.
462	PerRPCCredentials []credentials.PerRPCCredentials
463	// TransportCredentials stores the Authenticator required to setup a client connection.
464	TransportCredentials credentials.TransportCredentials
465	// KeepaliveParams stores the keepalive parameters.
466	KeepaliveParams keepalive.ClientParameters
467	// StatsHandler stores the handler for stats.
468	StatsHandler stats.Handler
469	// InitialWindowSize sets the initial window size for a stream.
470	InitialWindowSize int32
471	// InitialConnWindowSize sets the initial window size for a connection.
472	InitialConnWindowSize int32
473	// WriteBufferSize sets the size of write buffer which in turn determines how much data can be batched before it's written on the wire.
474	WriteBufferSize int
475	// ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall.
476	ReadBufferSize int
477	// ChannelzParentID sets the addrConn id which initiate the creation of this client transport.
478	ChannelzParentID int64
479}
480
// TargetInfo describes a target: its network address, its metadata and its
// authority.
type TargetInfo struct {
	Addr      string
	Metadata  interface{}
	Authority string
}
487
488// NewClientTransport establishes the transport with the required ConnectOptions
489// and returns it to the caller.
490func NewClientTransport(connectCtx, ctx context.Context, target TargetInfo, opts ConnectOptions, onSuccess func()) (ClientTransport, error) {
491	return newHTTP2Client(connectCtx, ctx, target, opts, onSuccess)
492}
493
// Options carries additional hints and information for the transmission of a
// message.
type Options struct {
	// Last indicates whether this write is the last piece for this stream.
	Last bool

	// Delay hints to the transport implementation that the data could be
	// buffered for a batching write. The transport implementation may
	// ignore the hint.
	Delay bool
}
506
507// CallHdr carries the information of a particular RPC.
508type CallHdr struct {
509	// Host specifies the peer's host.
510	Host string
511
512	// Method specifies the operation to perform.
513	Method string
514
515	// SendCompress specifies the compression algorithm applied on
516	// outbound message.
517	SendCompress string
518
519	// Creds specifies credentials.PerRPCCredentials for a call.
520	Creds credentials.PerRPCCredentials
521
522	// Flush indicates whether a new stream command should be sent
523	// to the peer without waiting for the first data. This is
524	// only a hint.
525	// If it's true, the transport may modify the flush decision
526	// for performance purposes.
527	// If it's false, new stream will never be flushed.
528	Flush bool
529
530	// ContentSubtype specifies the content-subtype for a request. For example, a
531	// content-subtype of "proto" will result in a content-type of
532	// "application/grpc+proto". The value of ContentSubtype must be all
533	// lowercase, otherwise the behavior is undefined. See
534	// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
535	// for more details.
536	ContentSubtype string
537}
538
539// ClientTransport is the common interface for all gRPC client-side transport
540// implementations.
541type ClientTransport interface {
542	// Close tears down this transport. Once it returns, the transport
543	// should not be accessed any more. The caller must make sure this
544	// is called only once.
545	Close() error
546
547	// GracefulClose starts to tear down the transport. It stops accepting
548	// new RPCs and wait the completion of the pending RPCs.
549	GracefulClose() error
550
551	// Write sends the data for the given stream. A nil stream indicates
552	// the write is to be performed on the transport as a whole.
553	Write(s *Stream, hdr []byte, data []byte, opts *Options) error
554
555	// NewStream creates a Stream for an RPC.
556	NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error)
557
558	// CloseStream clears the footprint of a stream when the stream is
559	// not needed any more. The err indicates the error incurred when
560	// CloseStream is called. Must be called when a stream is finished
561	// unless the associated transport is closing.
562	CloseStream(stream *Stream, err error)
563
564	// Error returns a channel that is closed when some I/O error
565	// happens. Typically the caller should have a goroutine to monitor
566	// this in order to take action (e.g., close the current transport
567	// and create a new one) in error case. It should not return nil
568	// once the transport is initiated.
569	Error() <-chan struct{}
570
571	// GoAway returns a channel that is closed when ClientTransport
572	// receives the draining signal from the server (e.g., GOAWAY frame in
573	// HTTP/2).
574	GoAway() <-chan struct{}
575
576	// GetGoAwayReason returns the reason why GoAway frame was received.
577	GetGoAwayReason() GoAwayReason
578
579	// IncrMsgSent increments the number of message sent through this transport.
580	IncrMsgSent()
581
582	// IncrMsgRecv increments the number of message received through this transport.
583	IncrMsgRecv()
584}
585
586// ServerTransport is the common interface for all gRPC server-side transport
587// implementations.
588//
589// Methods may be called concurrently from multiple goroutines, but
590// Write methods for a given Stream will be called serially.
591type ServerTransport interface {
592	// HandleStreams receives incoming streams using the given handler.
593	HandleStreams(func(*Stream), func(context.Context, string) context.Context)
594
595	// WriteHeader sends the header metadata for the given stream.
596	// WriteHeader may not be called on all streams.
597	WriteHeader(s *Stream, md metadata.MD) error
598
599	// Write sends the data for the given stream.
600	// Write may not be called on all streams.
601	Write(s *Stream, hdr []byte, data []byte, opts *Options) error
602
603	// WriteStatus sends the status of a stream to the client.  WriteStatus is
604	// the final call made on a stream and always occurs.
605	WriteStatus(s *Stream, st *status.Status) error
606
607	// Close tears down the transport. Once it is called, the transport
608	// should not be accessed any more. All the pending streams and their
609	// handlers will be terminated asynchronously.
610	Close() error
611
612	// RemoteAddr returns the remote network address.
613	RemoteAddr() net.Addr
614
615	// Drain notifies the client this ServerTransport stops accepting new RPCs.
616	Drain()
617
618	// IncrMsgSent increments the number of message sent through this transport.
619	IncrMsgSent()
620
621	// IncrMsgRecv increments the number of message received through this transport.
622	IncrMsgRecv()
623}
624
625// streamErrorf creates an StreamError with the specified error code and description.
626func streamErrorf(c codes.Code, format string, a ...interface{}) StreamError {
627	return StreamError{
628		Code: c,
629		Desc: fmt.Sprintf(format, a...),
630	}
631}
632
// connectionErrorf creates a ConnectionError whose description is built from
// format and a, fmt.Sprintf-style. temp is what Temporary will report, and e
// is the original error handed back by Origin (it may be nil).
func connectionErrorf(temp bool, e error, format string, a ...interface{}) ConnectionError {
	desc := fmt.Sprintf(format, a...)
	return ConnectionError{Desc: desc, temp: temp, err: e}
}

// ConnectionError is an error that results in the termination of the entire
// connection and the retry of all the active streams.
type ConnectionError struct {
	// Desc is the human-readable description of the error.
	Desc string
	// temp is the value reported by Temporary.
	temp bool
	// err is the original error, if any; see Origin.
	err error
}

// Error implements the error interface; the description is included quoted.
func (e ConnectionError) Error() string {
	return fmt.Sprintf("connection error: desc = %q", e.Desc)
}

// Temporary indicates whether this connection error is temporary or fatal.
func (e ConnectionError) Temporary() bool {
	return e.temp
}

// Origin returns the original error behind this connection error. It never
// returns nil: when there is no original error, the ConnectionError itself is
// returned.
func (e ConnectionError) Origin() error {
	if e.err != nil {
		return e.err
	}
	return e
}
668
669var (
670	// ErrConnClosing indicates that the transport is closing.
671	ErrConnClosing = connectionErrorf(true, nil, "transport is closing")
672	// errStreamDrain indicates that the stream is rejected because the
673	// connection is draining. This could be caused by goaway or balancer
674	// removing the address.
675	errStreamDrain = streamErrorf(codes.Unavailable, "the connection is draining")
676	// errStreamDone is returned from write at the client side to indiacte application
677	// layer of an error.
678	errStreamDone = errors.New("the stream is done")
679	// StatusGoAway indicates that the server sent a GOAWAY that included this
680	// stream's ID in unprocessed RPCs.
681	statusGoAway = status.New(codes.Unavailable, "the stream is rejected because server is draining the connection")
682)
683
684// TODO: See if we can replace StreamError with status package errors.
685
686// StreamError is an error that only affects one stream within a connection.
687type StreamError struct {
688	Code codes.Code
689	Desc string
690}
691
692func (e StreamError) Error() string {
693	return fmt.Sprintf("stream error: code = %s desc = %q", e.Code, e.Desc)
694}
695
// GoAwayReason contains the reason for the GoAway frame received.
type GoAwayReason uint8

const (
	// GoAwayInvalid (0) indicates that no GoAway frame is received.
	GoAwayInvalid GoAwayReason = iota
	// GoAwayNoReason (1) is the default value when a GoAway frame is
	// received.
	GoAwayNoReason
	// GoAwayTooManyPings (2) indicates that a GoAway frame with
	// ErrCodeEnhanceYourCalm was received and that the debug data said
	// "too_many_pings".
	GoAwayTooManyPings
)
709