/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

// Package transport defines and implements message oriented communication
// channel to complete various transactions (e.g., an RPC). It is meant for
// grpc-internal usage and is not intended to be imported directly by users.
package transport // externally used as import "google.golang.org/grpc/transport"

import (
	"errors"
	"fmt"
	"io"
	"net"
	"sync"
	"sync/atomic"

	"golang.org/x/net/context"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/stats"
	"google.golang.org/grpc/status"
	"google.golang.org/grpc/tap"
)

// recvMsg represents the received msg from the transport. All transport
// protocol specific info has been removed.
//
// Exactly one of the two fields carries information: either data holds
// payload bytes and err is nil, or err is non-nil and data is nil.
type recvMsg struct {
	// data is the received payload. It is nil whenever err is non-nil.
	data []byte
	// nil: received some data
	// io.EOF: stream is completed. data is nil.
	// other non-nil error: transport failure. data is nil.
	err error
}

// recvBuffer is an unbounded channel of recvMsg structs.
// Note recvBuffer differs from controlBuffer only in that recvBuffer
// holds a channel of only recvMsg structs instead of objects implementing "item" interface.
55// recvBuffer is written to much more often than 56// controlBuffer and using strict recvMsg structs helps avoid allocation in "recvBuffer.put" 57type recvBuffer struct { 58 c chan recvMsg 59 mu sync.Mutex 60 backlog []recvMsg 61 err error 62} 63 64func newRecvBuffer() *recvBuffer { 65 b := &recvBuffer{ 66 c: make(chan recvMsg, 1), 67 } 68 return b 69} 70 71func (b *recvBuffer) put(r recvMsg) { 72 b.mu.Lock() 73 if b.err != nil { 74 b.mu.Unlock() 75 // An error had occurred earlier, don't accept more 76 // data or errors. 77 return 78 } 79 b.err = r.err 80 if len(b.backlog) == 0 { 81 select { 82 case b.c <- r: 83 b.mu.Unlock() 84 return 85 default: 86 } 87 } 88 b.backlog = append(b.backlog, r) 89 b.mu.Unlock() 90} 91 92func (b *recvBuffer) load() { 93 b.mu.Lock() 94 if len(b.backlog) > 0 { 95 select { 96 case b.c <- b.backlog[0]: 97 b.backlog[0] = recvMsg{} 98 b.backlog = b.backlog[1:] 99 default: 100 } 101 } 102 b.mu.Unlock() 103} 104 105// get returns the channel that receives a recvMsg in the buffer. 106// 107// Upon receipt of a recvMsg, the caller should call load to send another 108// recvMsg onto the channel if there is any. 109func (b *recvBuffer) get() <-chan recvMsg { 110 return b.c 111} 112 113// 114// recvBufferReader implements io.Reader interface to read the data from 115// recvBuffer. 116type recvBufferReader struct { 117 ctx context.Context 118 ctxDone <-chan struct{} // cache of ctx.Done() (for performance). 119 recv *recvBuffer 120 last []byte // Stores the remaining data in the previous calls. 121 err error 122} 123 124// Read reads the next len(p) bytes from last. If last is drained, it tries to 125// read additional data from recv. It blocks if there no additional data available 126// in recv. If Read returns any non-nil error, it will continue to return that error. 
127func (r *recvBufferReader) Read(p []byte) (n int, err error) { 128 if r.err != nil { 129 return 0, r.err 130 } 131 n, r.err = r.read(p) 132 return n, r.err 133} 134 135func (r *recvBufferReader) read(p []byte) (n int, err error) { 136 if r.last != nil && len(r.last) > 0 { 137 // Read remaining data left in last call. 138 copied := copy(p, r.last) 139 r.last = r.last[copied:] 140 return copied, nil 141 } 142 select { 143 case <-r.ctxDone: 144 return 0, ContextErr(r.ctx.Err()) 145 case m := <-r.recv.get(): 146 r.recv.load() 147 if m.err != nil { 148 return 0, m.err 149 } 150 copied := copy(p, m.data) 151 r.last = m.data[copied:] 152 return copied, nil 153 } 154} 155 156type streamState uint32 157 158const ( 159 streamActive streamState = iota 160 streamWriteDone // EndStream sent 161 streamReadDone // EndStream received 162 streamDone // the entire stream is finished. 163) 164 165// Stream represents an RPC in the transport layer. 166type Stream struct { 167 id uint32 168 st ServerTransport // nil for client side Stream 169 ctx context.Context // the associated context of the stream 170 cancel context.CancelFunc // always nil for client side Stream 171 done chan struct{} // closed at the end of stream to unblock writers. On the client side. 172 ctxDone <-chan struct{} // same as done chan but for server side. Cache of ctx.Done() (for performance) 173 method string // the associated RPC method of the stream 174 recvCompress string 175 sendCompress string 176 buf *recvBuffer 177 trReader io.Reader 178 fc *inFlow 179 recvQuota uint32 180 wq *writeQuota 181 182 // Callback to state application's intentions to read data. This 183 // is used to adjust flow control, if needed. 184 requestRead func(int) 185 186 headerChan chan struct{} // closed to indicate the end of header metadata. 187 headerDone uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times. 188 189 // hdrMu protects header and trailer metadata on the server-side. 
190 hdrMu sync.Mutex 191 header metadata.MD // the received header metadata. 192 trailer metadata.MD // the key-value map of trailer metadata. 193 194 // On the server-side, headerSent is atomically set to 1 when the headers are sent out. 195 headerSent uint32 196 197 state streamState 198 199 // On client-side it is the status error received from the server. 200 // On server-side it is unused. 201 status *status.Status 202 203 bytesReceived uint32 // indicates whether any bytes have been received on this stream 204 unprocessed uint32 // set if the server sends a refused stream or GOAWAY including this stream 205 206 // contentSubtype is the content-subtype for requests. 207 // this must be lowercase or the behavior is undefined. 208 contentSubtype string 209} 210 211// isHeaderSent is only valid on the server-side. 212func (s *Stream) isHeaderSent() bool { 213 return atomic.LoadUint32(&s.headerSent) == 1 214} 215 216// updateHeaderSent updates headerSent and returns true 217// if it was alreay set. It is valid only on server-side. 218func (s *Stream) updateHeaderSent() bool { 219 return atomic.SwapUint32(&s.headerSent, 1) == 1 220} 221 222func (s *Stream) swapState(st streamState) streamState { 223 return streamState(atomic.SwapUint32((*uint32)(&s.state), uint32(st))) 224} 225 226func (s *Stream) compareAndSwapState(oldState, newState streamState) bool { 227 return atomic.CompareAndSwapUint32((*uint32)(&s.state), uint32(oldState), uint32(newState)) 228} 229 230func (s *Stream) getState() streamState { 231 return streamState(atomic.LoadUint32((*uint32)(&s.state))) 232} 233 234func (s *Stream) waitOnHeader() error { 235 if s.headerChan == nil { 236 // On the server headerChan is always nil since a stream originates 237 // only after having received headers. 
238 return nil 239 } 240 select { 241 case <-s.ctx.Done(): 242 return ContextErr(s.ctx.Err()) 243 case <-s.headerChan: 244 return nil 245 } 246} 247 248// RecvCompress returns the compression algorithm applied to the inbound 249// message. It is empty string if there is no compression applied. 250func (s *Stream) RecvCompress() string { 251 if err := s.waitOnHeader(); err != nil { 252 return "" 253 } 254 return s.recvCompress 255} 256 257// SetSendCompress sets the compression algorithm to the stream. 258func (s *Stream) SetSendCompress(str string) { 259 s.sendCompress = str 260} 261 262// Done returns a chanel which is closed when it receives the final status 263// from the server. 264func (s *Stream) Done() <-chan struct{} { 265 return s.done 266} 267 268// Header acquires the key-value pairs of header metadata once it 269// is available. It blocks until i) the metadata is ready or ii) there is no 270// header metadata or iii) the stream is canceled/expired. 271func (s *Stream) Header() (metadata.MD, error) { 272 err := s.waitOnHeader() 273 // Even if the stream is closed, header is returned if available. 274 select { 275 case <-s.headerChan: 276 if s.header == nil { 277 return nil, nil 278 } 279 return s.header.Copy(), nil 280 default: 281 } 282 return nil, err 283} 284 285// Trailer returns the cached trailer metedata. Note that if it is not called 286// after the entire stream is done, it could return an empty MD. Client 287// side only. 288// It can be safely read only after stream has ended that is either read 289// or write have returned io.EOF. 290func (s *Stream) Trailer() metadata.MD { 291 c := s.trailer.Copy() 292 return c 293} 294 295// ServerTransport returns the underlying ServerTransport for the stream. 296// The client side stream always returns nil. 297func (s *Stream) ServerTransport() ServerTransport { 298 return s.st 299} 300 301// ContentSubtype returns the content-subtype for a request. 
For example, a 302// content-subtype of "proto" will result in a content-type of 303// "application/grpc+proto". This will always be lowercase. See 304// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for 305// more details. 306func (s *Stream) ContentSubtype() string { 307 return s.contentSubtype 308} 309 310// Context returns the context of the stream. 311func (s *Stream) Context() context.Context { 312 return s.ctx 313} 314 315// Method returns the method for the stream. 316func (s *Stream) Method() string { 317 return s.method 318} 319 320// Status returns the status received from the server. 321// Status can be read safely only after the stream has ended, 322// that is, read or write has returned io.EOF. 323func (s *Stream) Status() *status.Status { 324 return s.status 325} 326 327// SetHeader sets the header metadata. This can be called multiple times. 328// Server side only. 329// This should not be called in parallel to other data writes. 330func (s *Stream) SetHeader(md metadata.MD) error { 331 if md.Len() == 0 { 332 return nil 333 } 334 if s.isHeaderSent() || s.getState() == streamDone { 335 return ErrIllegalHeaderWrite 336 } 337 s.hdrMu.Lock() 338 s.header = metadata.Join(s.header, md) 339 s.hdrMu.Unlock() 340 return nil 341} 342 343// SendHeader sends the given header metadata. The given metadata is 344// combined with any metadata set by previous calls to SetHeader and 345// then written to the transport stream. 346func (s *Stream) SendHeader(md metadata.MD) error { 347 t := s.ServerTransport() 348 return t.WriteHeader(s, md) 349} 350 351// SetTrailer sets the trailer metadata which will be sent with the RPC status 352// by the server. This can be called multiple times. Server side only. 353// This should not be called parallel to other data writes. 
354func (s *Stream) SetTrailer(md metadata.MD) error { 355 if md.Len() == 0 { 356 return nil 357 } 358 if s.getState() == streamDone { 359 return ErrIllegalHeaderWrite 360 } 361 s.hdrMu.Lock() 362 s.trailer = metadata.Join(s.trailer, md) 363 s.hdrMu.Unlock() 364 return nil 365} 366 367func (s *Stream) write(m recvMsg) { 368 s.buf.put(m) 369} 370 371// Read reads all p bytes from the wire for this stream. 372func (s *Stream) Read(p []byte) (n int, err error) { 373 // Don't request a read if there was an error earlier 374 if er := s.trReader.(*transportReader).er; er != nil { 375 return 0, er 376 } 377 s.requestRead(len(p)) 378 return io.ReadFull(s.trReader, p) 379} 380 381// tranportReader reads all the data available for this Stream from the transport and 382// passes them into the decoder, which converts them into a gRPC message stream. 383// The error is io.EOF when the stream is done or another non-nil error if 384// the stream broke. 385type transportReader struct { 386 reader io.Reader 387 // The handler to control the window update procedure for both this 388 // particular stream and the associated transport. 389 windowHandler func(int) 390 er error 391} 392 393func (t *transportReader) Read(p []byte) (n int, err error) { 394 n, err = t.reader.Read(p) 395 if err != nil { 396 t.er = err 397 return 398 } 399 t.windowHandler(n) 400 return 401} 402 403// BytesReceived indicates whether any bytes have been received on this stream. 404func (s *Stream) BytesReceived() bool { 405 return atomic.LoadUint32(&s.bytesReceived) == 1 406} 407 408// Unprocessed indicates whether the server did not process this stream -- 409// i.e. it sent a refused stream or GOAWAY including this stream ID. 410func (s *Stream) Unprocessed() bool { 411 return atomic.LoadUint32(&s.unprocessed) == 1 412} 413 414// GoString is implemented by Stream so context.String() won't 415// race when printing %#v. 
func (s *Stream) GoString() string {
	// Only the pointer value and the method name are printed.
	return fmt.Sprintf("<stream: %p, %v>", s, s.method)
}

// transportState is the state of a transport.
type transportState int

const (
	reachable transportState = iota
	closing
	draining
)

// ServerConfig consists of all the configurations to establish a server transport.
type ServerConfig struct {
	MaxStreams            uint32
	AuthInfo              credentials.AuthInfo
	InTapHandle           tap.ServerInHandle
	StatsHandler          stats.Handler
	KeepaliveParams       keepalive.ServerParameters
	KeepalivePolicy       keepalive.EnforcementPolicy
	InitialWindowSize     int32
	InitialConnWindowSize int32
	WriteBufferSize       int
	ReadBufferSize        int
	ChannelzParentID      int64
}

// NewServerTransport creates a ServerTransport with conn or non-nil error
// if it fails.
//
// The protocol argument is not used by this function: conn and config are
// passed straight to newHTTP2Server.
func NewServerTransport(protocol string, conn net.Conn, config *ServerConfig) (ServerTransport, error) {
	return newHTTP2Server(conn, config)
}

// ConnectOptions covers all relevant options for communicating with the server.
type ConnectOptions struct {
	// UserAgent is the application user agent.
	UserAgent string
	// Authority is the :authority pseudo-header to use. This field has no effect if
	// TransportCredentials is set.
	Authority string
	// Dialer specifies how to dial a network address.
	Dialer func(context.Context, string) (net.Conn, error)
	// FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors.
	FailOnNonTempDialError bool
	// PerRPCCredentials stores the PerRPCCredentials required to issue RPCs.
	PerRPCCredentials []credentials.PerRPCCredentials
	// TransportCredentials stores the Authenticator required to setup a client connection.
	TransportCredentials credentials.TransportCredentials
	// KeepaliveParams stores the keepalive parameters.
	KeepaliveParams keepalive.ClientParameters
	// StatsHandler stores the handler for stats.
	StatsHandler stats.Handler
	// InitialWindowSize sets the initial window size for a stream.
	InitialWindowSize int32
	// InitialConnWindowSize sets the initial window size for a connection.
	InitialConnWindowSize int32
	// WriteBufferSize sets the size of write buffer which in turn determines how much data can be batched before it's written on the wire.
	WriteBufferSize int
	// ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall.
	ReadBufferSize int
	// ChannelzParentID sets the addrConn id which initiated the creation of this client transport.
	ChannelzParentID int64
}

// TargetInfo contains the information of the target such as network address and metadata.
type TargetInfo struct {
	Addr      string
	Metadata  interface{}
	Authority string
}

// NewClientTransport establishes the transport with the required ConnectOptions
// and returns it to the caller.
//
// All arguments are forwarded unchanged to newHTTP2Client.
func NewClientTransport(connectCtx, ctx context.Context, target TargetInfo, opts ConnectOptions, onSuccess func()) (ClientTransport, error) {
	return newHTTP2Client(connectCtx, ctx, target, opts, onSuccess)
}

// Options provides additional hints and information for message
// transmission.
type Options struct {
	// Last indicates whether this write is the last piece for
	// this stream.
	Last bool

	// Delay is a hint to the transport implementation for whether
	// the data could be buffered for a batching write. The
	// transport implementation may ignore the hint.
	Delay bool
}

// CallHdr carries the information of a particular RPC.
type CallHdr struct {
	// Host specifies the peer's host.
	Host string

	// Method specifies the operation to perform.
	Method string

	// SendCompress specifies the compression algorithm applied on
	// outbound message.
	SendCompress string

	// Creds specifies credentials.PerRPCCredentials for a call.
	Creds credentials.PerRPCCredentials

	// Flush indicates whether a new stream command should be sent
	// to the peer without waiting for the first data. This is
	// only a hint.
	// If it's true, the transport may modify the flush decision
	// for performance purposes.
	// If it's false, new stream will never be flushed.
	Flush bool

	// ContentSubtype specifies the content-subtype for a request. For example, a
	// content-subtype of "proto" will result in a content-type of
	// "application/grpc+proto". The value of ContentSubtype must be all
	// lowercase, otherwise the behavior is undefined. See
	// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
	// for more details.
	ContentSubtype string
}

// ClientTransport is the common interface for all gRPC client-side transport
// implementations.
type ClientTransport interface {
	// Close tears down this transport. Once it returns, the transport
	// should not be accessed any more. The caller must make sure this
	// is called only once.
	Close() error

	// GracefulClose starts to tear down the transport. It stops accepting
	// new RPCs and waits for the completion of the pending RPCs.
	GracefulClose() error

	// Write sends the data for the given stream. A nil stream indicates
	// the write is to be performed on the transport as a whole.
	Write(s *Stream, hdr []byte, data []byte, opts *Options) error

	// NewStream creates a Stream for an RPC.
	NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error)

	// CloseStream clears the footprint of a stream when the stream is
	// not needed any more. The err indicates the error incurred when
	// CloseStream is called. Must be called when a stream is finished
	// unless the associated transport is closing.
	CloseStream(stream *Stream, err error)

	// Error returns a channel that is closed when some I/O error
	// happens. Typically the caller should have a goroutine to monitor
	// this in order to take action (e.g., close the current transport
	// and create a new one) in error case. It should not return nil
	// once the transport is initiated.
	Error() <-chan struct{}

	// GoAway returns a channel that is closed when ClientTransport
	// receives the draining signal from the server (e.g., GOAWAY frame in
	// HTTP/2).
	GoAway() <-chan struct{}

	// GetGoAwayReason returns the reason why GoAway frame was received.
	GetGoAwayReason() GoAwayReason

	// IncrMsgSent increments the number of messages sent through this transport.
	IncrMsgSent()

	// IncrMsgRecv increments the number of messages received through this transport.
	IncrMsgRecv()
}

// ServerTransport is the common interface for all gRPC server-side transport
// implementations.
//
// Methods may be called concurrently from multiple goroutines, but
// Write methods for a given Stream will be called serially.
type ServerTransport interface {
	// HandleStreams receives incoming streams using the given handler.
	HandleStreams(func(*Stream), func(context.Context, string) context.Context)

	// WriteHeader sends the header metadata for the given stream.
	// WriteHeader may not be called on all streams.
	WriteHeader(s *Stream, md metadata.MD) error

	// Write sends the data for the given stream.
	// Write may not be called on all streams.
	Write(s *Stream, hdr []byte, data []byte, opts *Options) error

	// WriteStatus sends the status of a stream to the client.  WriteStatus is
	// the final call made on a stream and always occurs.
	WriteStatus(s *Stream, st *status.Status) error

	// Close tears down the transport. Once it is called, the transport
	// should not be accessed any more. All the pending streams and their
	// handlers will be terminated asynchronously.
	Close() error

	// RemoteAddr returns the remote network address.
	RemoteAddr() net.Addr

	// Drain notifies the client this ServerTransport stops accepting new RPCs.
	Drain()

	// IncrMsgSent increments the number of messages sent through this transport.
	IncrMsgSent()

	// IncrMsgRecv increments the number of messages received through this transport.
	IncrMsgRecv()
}

// streamErrorf creates a StreamError with the specified error code and
// a description built from format and a with fmt.Sprintf.
func streamErrorf(c codes.Code, format string, a ...interface{}) StreamError {
	return StreamError{
		Code: c,
		Desc: fmt.Sprintf(format, a...),
	}
}

// connectionErrorf creates a ConnectionError with the specified error
// description. temp becomes the value reported by Temporary and e the
// underlying error reported by Origin.
func connectionErrorf(temp bool, e error, format string, a ...interface{}) ConnectionError {
	return ConnectionError{
		Desc: fmt.Sprintf(format, a...),
		temp: temp,
		err:  e,
	}
}

// ConnectionError is an error that results in the termination of the
// entire connection and the retry of all the active streams.
type ConnectionError struct {
	// Desc is the human-readable description included in Error().
	Desc string
	// temp is the value returned by Temporary.
	temp bool
	// err is the underlying error returned by Origin; it may be nil.
	err error
}

// Error implements the error interface; the description is quoted.
func (e ConnectionError) Error() string {
	return fmt.Sprintf("connection error: desc = %q", e.Desc)
}

// Temporary indicates if this connection error is temporary or fatal.
func (e ConnectionError) Temporary() bool {
	return e.temp
}

// Origin returns the original error of this connection error.
func (e ConnectionError) Origin() error {
	// Never return nil error here.
	// If the original error is nil, return itself.
	if e.err == nil {
		return e
	}
	return e.err
}

var (
	// ErrConnClosing indicates that the transport is closing.
	// It is created as a temporary ConnectionError with no underlying error.
	ErrConnClosing = connectionErrorf(true, nil, "transport is closing")
	// errStreamDrain indicates that the stream is rejected because the
	// connection is draining. This could be caused by goaway or balancer
	// removing the address.
	errStreamDrain = streamErrorf(codes.Unavailable, "the connection is draining")
	// errStreamDone is returned from write at the client side to indicate
	// an error to the application layer.
	errStreamDone = errors.New("the stream is done")
	// statusGoAway indicates that the server sent a GOAWAY that included this
	// stream's ID in unprocessed RPCs.
	statusGoAway = status.New(codes.Unavailable, "the stream is rejected because server is draining the connection")
)

// TODO: See if we can replace StreamError with status package errors.

// StreamError is an error that only affects one stream within a connection.
type StreamError struct {
	Code codes.Code
	Desc string
}

// Error implements the error interface, reporting both the code and the
// quoted description.
func (e StreamError) Error() string {
	return fmt.Sprintf("stream error: code = %s desc = %q", e.Code, e.Desc)
}

// GoAwayReason contains the reason for the GoAway frame received.
type GoAwayReason uint8

const (
	// GoAwayInvalid indicates that no GoAway frame is received.
	GoAwayInvalid GoAwayReason = 0
	// GoAwayNoReason is the default value when GoAway frame is received.
	GoAwayNoReason GoAwayReason = 1
	// GoAwayTooManyPings indicates that a GoAway frame with
	// ErrCodeEnhanceYourCalm was received and that the debug data said
	// "too_many_pings".
	GoAwayTooManyPings GoAwayReason = 2
)