• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2014 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.internal;
18 
19 import com.google.common.base.Charsets;
20 import com.google.common.base.Preconditions;
21 import io.grpc.InternalMetadata;
22 import io.grpc.InternalStatus;
23 import io.grpc.Metadata;
24 import io.grpc.Status;
25 import java.nio.charset.Charset;
26 import javax.annotation.Nullable;
27 
28 /**
29  * Base implementation for client streams using HTTP2 as the transport.
30  */
31 public abstract class Http2ClientStreamTransportState extends AbstractClientStream.TransportState {
32 
33   /**
34    * Metadata marshaller for HTTP status lines.
35    */
36   private static final InternalMetadata.TrustedAsciiMarshaller<Integer> HTTP_STATUS_MARSHALLER =
37       new InternalMetadata.TrustedAsciiMarshaller<Integer>() {
38         @Override
39         public byte[] toAsciiString(Integer value) {
40           throw new UnsupportedOperationException();
41         }
42 
43         /**
44          * RFC 7231 says status codes are 3 digits long.
45          *
46          * @see <a href="https://tools.ietf.org/html/rfc7231#section-6">RFC 7231</a>
47          */
48         @Override
49         public Integer parseAsciiString(byte[] serialized) {
50           if (serialized.length >= 3) {
51             return (serialized[0] - '0') * 100 + (serialized[1] - '0') * 10 + (serialized[2] - '0');
52           }
53           throw new NumberFormatException(
54               "Malformed status code " + new String(serialized, InternalMetadata.US_ASCII));
55         }
56       };
57 
58   private static final Metadata.Key<Integer> HTTP2_STATUS = InternalMetadata.keyOf(":status",
59       HTTP_STATUS_MARSHALLER);
60 
61   /** When non-{@code null}, {@link #transportErrorMetadata} must also be non-{@code null}. */
62   private Status transportError;
63   private Metadata transportErrorMetadata;
64   private Charset errorCharset = Charsets.UTF_8;
65   private boolean headersReceived;
66 
Http2ClientStreamTransportState( int maxMessageSize, StatsTraceContext statsTraceCtx, TransportTracer transportTracer)67   protected Http2ClientStreamTransportState(
68       int maxMessageSize,
69       StatsTraceContext statsTraceCtx,
70       TransportTracer transportTracer) {
71     super(maxMessageSize, statsTraceCtx, transportTracer);
72   }
73 
74   /**
75    * Called to process a failure in HTTP/2 processing. It should notify the transport to cancel the
76    * stream and call {@code transportReportStatus()}.
77    */
http2ProcessingFailed( Status status, boolean stopDelivery, Metadata trailers)78   protected abstract void http2ProcessingFailed(
79       Status status, boolean stopDelivery, Metadata trailers);
80 
81   /**
82    * Called by subclasses whenever {@code Headers} are received from the transport.
83    *
84    * @param headers the received headers
85    */
transportHeadersReceived(Metadata headers)86   protected void transportHeadersReceived(Metadata headers) {
87     Preconditions.checkNotNull(headers, "headers");
88     if (transportError != null) {
89       // Already received a transport error so just augment it. Something is really, really strange.
90       transportError = transportError.augmentDescription("headers: " + headers);
91       return;
92     }
93     try {
94       if (headersReceived) {
95         transportError = Status.INTERNAL.withDescription("Received headers twice");
96         return;
97       }
98       Integer httpStatus = headers.get(HTTP2_STATUS);
99       if (httpStatus != null && httpStatus >= 100 && httpStatus < 200) {
100         // Ignore the headers. See RFC 7540 §8.1
101         return;
102       }
103       headersReceived = true;
104 
105       transportError = validateInitialMetadata(headers);
106       if (transportError != null) {
107         return;
108       }
109 
110       stripTransportDetails(headers);
111       inboundHeadersReceived(headers);
112     } finally {
113       if (transportError != null) {
114         // Note we don't immediately report the transport error, instead we wait for more data on
115         // the stream so we can accumulate more detail into the error before reporting it.
116         transportError = transportError.augmentDescription("headers: " + headers);
117         transportErrorMetadata = headers;
118         errorCharset = extractCharset(headers);
119       }
120     }
121   }
122 
123   /**
124    * Called by subclasses whenever a data frame is received from the transport.
125    *
126    * @param frame the received data frame
127    * @param endOfStream {@code true} if there will be no more data received for this stream
128    */
transportDataReceived(ReadableBuffer frame, boolean endOfStream)129   protected void transportDataReceived(ReadableBuffer frame, boolean endOfStream) {
130     if (transportError != null) {
131       // We've already detected a transport error and now we're just accumulating more detail
132       // for it.
133       transportError = transportError.augmentDescription("DATA-----------------------------\n"
134           + ReadableBuffers.readAsString(frame, errorCharset));
135       frame.close();
136       if (transportError.getDescription().length() > 1000 || endOfStream) {
137         http2ProcessingFailed(transportError, false, transportErrorMetadata);
138       }
139     } else {
140       if (!headersReceived) {
141         http2ProcessingFailed(
142             Status.INTERNAL.withDescription("headers not received before payload"),
143             false,
144             new Metadata());
145         return;
146       }
147       inboundDataReceived(frame);
148       if (endOfStream) {
149         // This is a protocol violation as we expect to receive trailers.
150         transportError =
151             Status.INTERNAL.withDescription("Received unexpected EOS on DATA frame from server.");
152         transportErrorMetadata = new Metadata();
153         transportReportStatus(transportError, false, transportErrorMetadata);
154       }
155     }
156   }
157 
158   /**
159    * Called by subclasses for the terminal trailer metadata on a stream.
160    *
161    * @param trailers the received terminal trailer metadata
162    */
transportTrailersReceived(Metadata trailers)163   protected void transportTrailersReceived(Metadata trailers) {
164     Preconditions.checkNotNull(trailers, "trailers");
165     if (transportError == null && !headersReceived) {
166       transportError = validateInitialMetadata(trailers);
167       if (transportError != null) {
168         transportErrorMetadata = trailers;
169       }
170     }
171     if (transportError != null) {
172       transportError = transportError.augmentDescription("trailers: " + trailers);
173       http2ProcessingFailed(transportError, false, transportErrorMetadata);
174     } else {
175       Status status = statusFromTrailers(trailers);
176       stripTransportDetails(trailers);
177       inboundTrailersReceived(trailers, status);
178     }
179   }
180 
181   /**
182    * Extract the response status from trailers.
183    */
statusFromTrailers(Metadata trailers)184   private Status statusFromTrailers(Metadata trailers) {
185     Status status = trailers.get(InternalStatus.CODE_KEY);
186     if (status != null) {
187       return status.withDescription(trailers.get(InternalStatus.MESSAGE_KEY));
188     }
189     // No status; something is broken. Try to provide a resonanable error.
190     if (headersReceived) {
191       return Status.UNKNOWN.withDescription("missing GRPC status in response");
192     }
193     Integer httpStatus = trailers.get(HTTP2_STATUS);
194     if (httpStatus != null) {
195       status = GrpcUtil.httpStatusToGrpcStatus(httpStatus);
196     } else {
197       status = Status.INTERNAL.withDescription("missing HTTP status code");
198     }
199     return status.augmentDescription(
200         "missing GRPC status, inferred error from HTTP status code");
201   }
202 
203   /**
204    * Inspect initial headers to make sure they conform to HTTP and gRPC, returning a {@code Status}
205    * on failure.
206    *
207    * @return status with description of failure, or {@code null} when valid
208    */
209   @Nullable
validateInitialMetadata(Metadata headers)210   private Status validateInitialMetadata(Metadata headers) {
211     Integer httpStatus = headers.get(HTTP2_STATUS);
212     if (httpStatus == null) {
213       return Status.INTERNAL.withDescription("Missing HTTP status code");
214     }
215     String contentType = headers.get(GrpcUtil.CONTENT_TYPE_KEY);
216     if (!GrpcUtil.isGrpcContentType(contentType)) {
217       return GrpcUtil.httpStatusToGrpcStatus(httpStatus)
218           .augmentDescription("invalid content-type: " + contentType);
219     }
220     return null;
221   }
222 
223   /**
224    * Inspect the raw metadata and figure out what charset is being used.
225    */
extractCharset(Metadata headers)226   private static Charset extractCharset(Metadata headers) {
227     String contentType = headers.get(GrpcUtil.CONTENT_TYPE_KEY);
228     if (contentType != null) {
229       String[] split = contentType.split("charset=", 2);
230       try {
231         return Charset.forName(split[split.length - 1].trim());
232       } catch (Exception t) {
233         // Ignore and assume UTF-8
234       }
235     }
236     return Charsets.UTF_8;
237   }
238 
239   /**
240    * Strip HTTP transport implementation details so they don't leak via metadata into
241    * the application layer.
242    */
stripTransportDetails(Metadata metadata)243   private static void stripTransportDetails(Metadata metadata) {
244     metadata.discardAll(HTTP2_STATUS);
245     metadata.discardAll(InternalStatus.CODE_KEY);
246     metadata.discardAll(InternalStatus.MESSAGE_KEY);
247   }
248 }
249