• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package transport
20
21import (
22	"bufio"
23	"bytes"
24	"encoding/base64"
25	"fmt"
26	"net"
27	"net/http"
28	"strconv"
29	"strings"
30	"time"
31	"unicode/utf8"
32
33	"github.com/golang/protobuf/proto"
34	"golang.org/x/net/http2"
35	"golang.org/x/net/http2/hpack"
36	spb "google.golang.org/genproto/googleapis/rpc/status"
37	"google.golang.org/grpc/codes"
38	"google.golang.org/grpc/status"
39)
40
41const (
42	// http2MaxFrameLen specifies the max length of a HTTP2 frame.
43	http2MaxFrameLen = 16384 // 16KB frame
44	// http://http2.github.io/http2-spec/#SettingValues
45	http2InitHeaderTableSize = 4096
46	// http2IOBufSize specifies the buffer size for sending frames.
47	defaultWriteBufSize = 32 * 1024
48	defaultReadBufSize  = 32 * 1024
49	// baseContentType is the base content-type for gRPC.  This is a valid
50	// content-type on it's own, but can also include a content-subtype such as
51	// "proto" as a suffix after "+" or ";".  See
52	// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
53	// for more details.
54	baseContentType = "application/grpc"
55)
56
57var (
58	clientPreface   = []byte(http2.ClientPreface)
59	http2ErrConvTab = map[http2.ErrCode]codes.Code{
60		http2.ErrCodeNo:                 codes.Internal,
61		http2.ErrCodeProtocol:           codes.Internal,
62		http2.ErrCodeInternal:           codes.Internal,
63		http2.ErrCodeFlowControl:        codes.ResourceExhausted,
64		http2.ErrCodeSettingsTimeout:    codes.Internal,
65		http2.ErrCodeStreamClosed:       codes.Internal,
66		http2.ErrCodeFrameSize:          codes.Internal,
67		http2.ErrCodeRefusedStream:      codes.Unavailable,
68		http2.ErrCodeCancel:             codes.Canceled,
69		http2.ErrCodeCompression:        codes.Internal,
70		http2.ErrCodeConnect:            codes.Internal,
71		http2.ErrCodeEnhanceYourCalm:    codes.ResourceExhausted,
72		http2.ErrCodeInadequateSecurity: codes.PermissionDenied,
73		http2.ErrCodeHTTP11Required:     codes.Internal,
74	}
75	statusCodeConvTab = map[codes.Code]http2.ErrCode{
76		codes.Internal:          http2.ErrCodeInternal,
77		codes.Canceled:          http2.ErrCodeCancel,
78		codes.Unavailable:       http2.ErrCodeRefusedStream,
79		codes.ResourceExhausted: http2.ErrCodeEnhanceYourCalm,
80		codes.PermissionDenied:  http2.ErrCodeInadequateSecurity,
81	}
82	httpStatusConvTab = map[int]codes.Code{
83		// 400 Bad Request - INTERNAL.
84		http.StatusBadRequest: codes.Internal,
85		// 401 Unauthorized  - UNAUTHENTICATED.
86		http.StatusUnauthorized: codes.Unauthenticated,
87		// 403 Forbidden - PERMISSION_DENIED.
88		http.StatusForbidden: codes.PermissionDenied,
89		// 404 Not Found - UNIMPLEMENTED.
90		http.StatusNotFound: codes.Unimplemented,
91		// 429 Too Many Requests - UNAVAILABLE.
92		http.StatusTooManyRequests: codes.Unavailable,
93		// 502 Bad Gateway - UNAVAILABLE.
94		http.StatusBadGateway: codes.Unavailable,
95		// 503 Service Unavailable - UNAVAILABLE.
96		http.StatusServiceUnavailable: codes.Unavailable,
97		// 504 Gateway timeout - UNAVAILABLE.
98		http.StatusGatewayTimeout: codes.Unavailable,
99	}
100)
101
102// Records the states during HPACK decoding. Must be reset once the
103// decoding of the entire headers are finished.
104type decodeState struct {
105	encoding string
106	// statusGen caches the stream status received from the trailer the server
107	// sent.  Client side only.  Do not access directly.  After all trailers are
108	// parsed, use the status method to retrieve the status.
109	statusGen *status.Status
110	// rawStatusCode and rawStatusMsg are set from the raw trailer fields and are not
111	// intended for direct access outside of parsing.
112	rawStatusCode *int
113	rawStatusMsg  string
114	httpStatus    *int
115	// Server side only fields.
116	timeoutSet bool
117	timeout    time.Duration
118	method     string
119	// key-value metadata map from the peer.
120	mdata          map[string][]string
121	statsTags      []byte
122	statsTrace     []byte
123	contentSubtype string
124}
125
126// isReservedHeader checks whether hdr belongs to HTTP2 headers
127// reserved by gRPC protocol. Any other headers are classified as the
128// user-specified metadata.
129func isReservedHeader(hdr string) bool {
130	if hdr != "" && hdr[0] == ':' {
131		return true
132	}
133	switch hdr {
134	case "content-type",
135		"user-agent",
136		"grpc-message-type",
137		"grpc-encoding",
138		"grpc-message",
139		"grpc-status",
140		"grpc-timeout",
141		"grpc-status-details-bin",
142		"te":
143		return true
144	default:
145		return false
146	}
147}
148
149// isWhitelistedHeader checks whether hdr should be propagated
150// into metadata visible to users.
151func isWhitelistedHeader(hdr string) bool {
152	switch hdr {
153	case ":authority", "user-agent":
154		return true
155	default:
156		return false
157	}
158}
159
160// contentSubtype returns the content-subtype for the given content-type.  The
161// given content-type must be a valid content-type that starts with
162// "application/grpc". A content-subtype will follow "application/grpc" after a
163// "+" or ";". See
164// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
165// more details.
166//
167// If contentType is not a valid content-type for gRPC, the boolean
168// will be false, otherwise true. If content-type == "application/grpc",
169// "application/grpc+", or "application/grpc;", the boolean will be true,
170// but no content-subtype will be returned.
171//
172// contentType is assumed to be lowercase already.
173func contentSubtype(contentType string) (string, bool) {
174	if contentType == baseContentType {
175		return "", true
176	}
177	if !strings.HasPrefix(contentType, baseContentType) {
178		return "", false
179	}
180	// guaranteed since != baseContentType and has baseContentType prefix
181	switch contentType[len(baseContentType)] {
182	case '+', ';':
183		// this will return true for "application/grpc+" or "application/grpc;"
184		// which the previous validContentType function tested to be valid, so we
185		// just say that no content-subtype is specified in this case
186		return contentType[len(baseContentType)+1:], true
187	default:
188		return "", false
189	}
190}
191
192// contentSubtype is assumed to be lowercase
193func contentType(contentSubtype string) string {
194	if contentSubtype == "" {
195		return baseContentType
196	}
197	return baseContentType + "+" + contentSubtype
198}
199
200func (d *decodeState) status() *status.Status {
201	if d.statusGen == nil {
202		// No status-details were provided; generate status using code/msg.
203		d.statusGen = status.New(codes.Code(int32(*(d.rawStatusCode))), d.rawStatusMsg)
204	}
205	return d.statusGen
206}
207
208const binHdrSuffix = "-bin"
209
210func encodeBinHeader(v []byte) string {
211	return base64.RawStdEncoding.EncodeToString(v)
212}
213
214func decodeBinHeader(v string) ([]byte, error) {
215	if len(v)%4 == 0 {
216		// Input was padded, or padding was not necessary.
217		return base64.StdEncoding.DecodeString(v)
218	}
219	return base64.RawStdEncoding.DecodeString(v)
220}
221
222func encodeMetadataHeader(k, v string) string {
223	if strings.HasSuffix(k, binHdrSuffix) {
224		return encodeBinHeader(([]byte)(v))
225	}
226	return v
227}
228
229func decodeMetadataHeader(k, v string) (string, error) {
230	if strings.HasSuffix(k, binHdrSuffix) {
231		b, err := decodeBinHeader(v)
232		return string(b), err
233	}
234	return v, nil
235}
236
237func (d *decodeState) decodeResponseHeader(frame *http2.MetaHeadersFrame) error {
238	for _, hf := range frame.Fields {
239		if err := d.processHeaderField(hf); err != nil {
240			return err
241		}
242	}
243
244	// If grpc status exists, no need to check further.
245	if d.rawStatusCode != nil || d.statusGen != nil {
246		return nil
247	}
248
249	// If grpc status doesn't exist and http status doesn't exist,
250	// then it's a malformed header.
251	if d.httpStatus == nil {
252		return streamErrorf(codes.Internal, "malformed header: doesn't contain status(gRPC or HTTP)")
253	}
254
255	if *(d.httpStatus) != http.StatusOK {
256		code, ok := httpStatusConvTab[*(d.httpStatus)]
257		if !ok {
258			code = codes.Unknown
259		}
260		return streamErrorf(code, http.StatusText(*(d.httpStatus)))
261	}
262
263	// gRPC status doesn't exist and http status is OK.
264	// Set rawStatusCode to be unknown and return nil error.
265	// So that, if the stream has ended this Unknown status
266	// will be propagated to the user.
267	// Otherwise, it will be ignored. In which case, status from
268	// a later trailer, that has StreamEnded flag set, is propagated.
269	code := int(codes.Unknown)
270	d.rawStatusCode = &code
271	return nil
272
273}
274
275func (d *decodeState) addMetadata(k, v string) {
276	if d.mdata == nil {
277		d.mdata = make(map[string][]string)
278	}
279	d.mdata[k] = append(d.mdata[k], v)
280}
281
282func (d *decodeState) processHeaderField(f hpack.HeaderField) error {
283	switch f.Name {
284	case "content-type":
285		contentSubtype, validContentType := contentSubtype(f.Value)
286		if !validContentType {
287			return streamErrorf(codes.Internal, "transport: received the unexpected content-type %q", f.Value)
288		}
289		d.contentSubtype = contentSubtype
290		// TODO: do we want to propagate the whole content-type in the metadata,
291		// or come up with a way to just propagate the content-subtype if it was set?
292		// ie {"content-type": "application/grpc+proto"} or {"content-subtype": "proto"}
293		// in the metadata?
294		d.addMetadata(f.Name, f.Value)
295	case "grpc-encoding":
296		d.encoding = f.Value
297	case "grpc-status":
298		code, err := strconv.Atoi(f.Value)
299		if err != nil {
300			return streamErrorf(codes.Internal, "transport: malformed grpc-status: %v", err)
301		}
302		d.rawStatusCode = &code
303	case "grpc-message":
304		d.rawStatusMsg = decodeGrpcMessage(f.Value)
305	case "grpc-status-details-bin":
306		v, err := decodeBinHeader(f.Value)
307		if err != nil {
308			return streamErrorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err)
309		}
310		s := &spb.Status{}
311		if err := proto.Unmarshal(v, s); err != nil {
312			return streamErrorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err)
313		}
314		d.statusGen = status.FromProto(s)
315	case "grpc-timeout":
316		d.timeoutSet = true
317		var err error
318		if d.timeout, err = decodeTimeout(f.Value); err != nil {
319			return streamErrorf(codes.Internal, "transport: malformed time-out: %v", err)
320		}
321	case ":path":
322		d.method = f.Value
323	case ":status":
324		code, err := strconv.Atoi(f.Value)
325		if err != nil {
326			return streamErrorf(codes.Internal, "transport: malformed http-status: %v", err)
327		}
328		d.httpStatus = &code
329	case "grpc-tags-bin":
330		v, err := decodeBinHeader(f.Value)
331		if err != nil {
332			return streamErrorf(codes.Internal, "transport: malformed grpc-tags-bin: %v", err)
333		}
334		d.statsTags = v
335		d.addMetadata(f.Name, string(v))
336	case "grpc-trace-bin":
337		v, err := decodeBinHeader(f.Value)
338		if err != nil {
339			return streamErrorf(codes.Internal, "transport: malformed grpc-trace-bin: %v", err)
340		}
341		d.statsTrace = v
342		d.addMetadata(f.Name, string(v))
343	default:
344		if isReservedHeader(f.Name) && !isWhitelistedHeader(f.Name) {
345			break
346		}
347		v, err := decodeMetadataHeader(f.Name, f.Value)
348		if err != nil {
349			errorf("Failed to decode metadata header (%q, %q): %v", f.Name, f.Value, err)
350			return nil
351		}
352		d.addMetadata(f.Name, v)
353	}
354	return nil
355}
356
357type timeoutUnit uint8
358
359const (
360	hour        timeoutUnit = 'H'
361	minute      timeoutUnit = 'M'
362	second      timeoutUnit = 'S'
363	millisecond timeoutUnit = 'm'
364	microsecond timeoutUnit = 'u'
365	nanosecond  timeoutUnit = 'n'
366)
367
368func timeoutUnitToDuration(u timeoutUnit) (d time.Duration, ok bool) {
369	switch u {
370	case hour:
371		return time.Hour, true
372	case minute:
373		return time.Minute, true
374	case second:
375		return time.Second, true
376	case millisecond:
377		return time.Millisecond, true
378	case microsecond:
379		return time.Microsecond, true
380	case nanosecond:
381		return time.Nanosecond, true
382	default:
383	}
384	return
385}
386
387const maxTimeoutValue int64 = 100000000 - 1
388
389// div does integer division and round-up the result. Note that this is
390// equivalent to (d+r-1)/r but has less chance to overflow.
391func div(d, r time.Duration) int64 {
392	if m := d % r; m > 0 {
393		return int64(d/r + 1)
394	}
395	return int64(d / r)
396}
397
398// TODO(zhaoq): It is the simplistic and not bandwidth efficient. Improve it.
399func encodeTimeout(t time.Duration) string {
400	if t <= 0 {
401		return "0n"
402	}
403	if d := div(t, time.Nanosecond); d <= maxTimeoutValue {
404		return strconv.FormatInt(d, 10) + "n"
405	}
406	if d := div(t, time.Microsecond); d <= maxTimeoutValue {
407		return strconv.FormatInt(d, 10) + "u"
408	}
409	if d := div(t, time.Millisecond); d <= maxTimeoutValue {
410		return strconv.FormatInt(d, 10) + "m"
411	}
412	if d := div(t, time.Second); d <= maxTimeoutValue {
413		return strconv.FormatInt(d, 10) + "S"
414	}
415	if d := div(t, time.Minute); d <= maxTimeoutValue {
416		return strconv.FormatInt(d, 10) + "M"
417	}
418	// Note that maxTimeoutValue * time.Hour > MaxInt64.
419	return strconv.FormatInt(div(t, time.Hour), 10) + "H"
420}
421
422func decodeTimeout(s string) (time.Duration, error) {
423	size := len(s)
424	if size < 2 {
425		return 0, fmt.Errorf("transport: timeout string is too short: %q", s)
426	}
427	unit := timeoutUnit(s[size-1])
428	d, ok := timeoutUnitToDuration(unit)
429	if !ok {
430		return 0, fmt.Errorf("transport: timeout unit is not recognized: %q", s)
431	}
432	t, err := strconv.ParseInt(s[:size-1], 10, 64)
433	if err != nil {
434		return 0, err
435	}
436	return d * time.Duration(t), nil
437}
438
439const (
440	spaceByte   = ' '
441	tildeByte   = '~'
442	percentByte = '%'
443)
444
445// encodeGrpcMessage is used to encode status code in header field
446// "grpc-message". It does percent encoding and also replaces invalid utf-8
447// characters with Unicode replacement character.
448//
449// It checks to see if each individual byte in msg is an allowable byte, and
450// then either percent encoding or passing it through. When percent encoding,
451// the byte is converted into hexadecimal notation with a '%' prepended.
452func encodeGrpcMessage(msg string) string {
453	if msg == "" {
454		return ""
455	}
456	lenMsg := len(msg)
457	for i := 0; i < lenMsg; i++ {
458		c := msg[i]
459		if !(c >= spaceByte && c <= tildeByte && c != percentByte) {
460			return encodeGrpcMessageUnchecked(msg)
461		}
462	}
463	return msg
464}
465
466func encodeGrpcMessageUnchecked(msg string) string {
467	var buf bytes.Buffer
468	for len(msg) > 0 {
469		r, size := utf8.DecodeRuneInString(msg)
470		for _, b := range []byte(string(r)) {
471			if size > 1 {
472				// If size > 1, r is not ascii. Always do percent encoding.
473				buf.WriteString(fmt.Sprintf("%%%02X", b))
474				continue
475			}
476
477			// The for loop is necessary even if size == 1. r could be
478			// utf8.RuneError.
479			//
480			// fmt.Sprintf("%%%02X", utf8.RuneError) gives "%FFFD".
481			if b >= spaceByte && b <= tildeByte && b != percentByte {
482				buf.WriteByte(b)
483			} else {
484				buf.WriteString(fmt.Sprintf("%%%02X", b))
485			}
486		}
487		msg = msg[size:]
488	}
489	return buf.String()
490}
491
492// decodeGrpcMessage decodes the msg encoded by encodeGrpcMessage.
493func decodeGrpcMessage(msg string) string {
494	if msg == "" {
495		return ""
496	}
497	lenMsg := len(msg)
498	for i := 0; i < lenMsg; i++ {
499		if msg[i] == percentByte && i+2 < lenMsg {
500			return decodeGrpcMessageUnchecked(msg)
501		}
502	}
503	return msg
504}
505
506func decodeGrpcMessageUnchecked(msg string) string {
507	var buf bytes.Buffer
508	lenMsg := len(msg)
509	for i := 0; i < lenMsg; i++ {
510		c := msg[i]
511		if c == percentByte && i+2 < lenMsg {
512			parsed, err := strconv.ParseUint(msg[i+1:i+3], 16, 8)
513			if err != nil {
514				buf.WriteByte(c)
515			} else {
516				buf.WriteByte(byte(parsed))
517				i += 2
518			}
519		} else {
520			buf.WriteByte(c)
521		}
522	}
523	return buf.String()
524}
525
526type bufWriter struct {
527	buf       []byte
528	offset    int
529	batchSize int
530	conn      net.Conn
531	err       error
532
533	onFlush func()
534}
535
536func newBufWriter(conn net.Conn, batchSize int) *bufWriter {
537	return &bufWriter{
538		buf:       make([]byte, batchSize*2),
539		batchSize: batchSize,
540		conn:      conn,
541	}
542}
543
544func (w *bufWriter) Write(b []byte) (n int, err error) {
545	if w.err != nil {
546		return 0, w.err
547	}
548	for len(b) > 0 {
549		nn := copy(w.buf[w.offset:], b)
550		b = b[nn:]
551		w.offset += nn
552		n += nn
553		if w.offset >= w.batchSize {
554			err = w.Flush()
555		}
556	}
557	return n, err
558}
559
560func (w *bufWriter) Flush() error {
561	if w.err != nil {
562		return w.err
563	}
564	if w.offset == 0 {
565		return nil
566	}
567	if w.onFlush != nil {
568		w.onFlush()
569	}
570	_, w.err = w.conn.Write(w.buf[:w.offset])
571	w.offset = 0
572	return w.err
573}
574
575type framer struct {
576	writer *bufWriter
577	fr     *http2.Framer
578}
579
580func newFramer(conn net.Conn, writeBufferSize, readBufferSize int) *framer {
581	r := bufio.NewReaderSize(conn, readBufferSize)
582	w := newBufWriter(conn, writeBufferSize)
583	f := &framer{
584		writer: w,
585		fr:     http2.NewFramer(w, r),
586	}
587	// Opt-in to Frame reuse API on framer to reduce garbage.
588	// Frames aren't safe to read from after a subsequent call to ReadFrame.
589	f.fr.SetReuseFrames()
590	f.fr.ReadMetaHeaders = hpack.NewDecoder(http2InitHeaderTableSize, nil)
591	return f
592}
593