1 /* 2 * Copyright 2014 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.internal; 18 19 import com.google.common.base.Charsets; 20 import com.google.common.base.Preconditions; 21 import io.grpc.InternalMetadata; 22 import io.grpc.InternalStatus; 23 import io.grpc.Metadata; 24 import io.grpc.Status; 25 import java.nio.charset.Charset; 26 import javax.annotation.Nullable; 27 28 /** 29 * Base implementation for client streams using HTTP2 as the transport. 30 */ 31 public abstract class Http2ClientStreamTransportState extends AbstractClientStream.TransportState { 32 33 /** 34 * Metadata marshaller for HTTP status lines. 35 */ 36 private static final InternalMetadata.TrustedAsciiMarshaller<Integer> HTTP_STATUS_MARSHALLER = 37 new InternalMetadata.TrustedAsciiMarshaller<Integer>() { 38 @Override 39 public byte[] toAsciiString(Integer value) { 40 throw new UnsupportedOperationException(); 41 } 42 43 /** 44 * RFC 7231 says status codes are 3 digits long. 45 * 46 * @see <a href="https://tools.ietf.org/html/rfc7231#section-6">RFC 7231</a> 47 */ 48 @Override 49 public Integer parseAsciiString(byte[] serialized) { 50 if (serialized.length >= 3) { 51 return (serialized[0] - '0') * 100 + (serialized[1] - '0') * 10 + (serialized[2] - '0'); 52 } 53 throw new NumberFormatException( 54 "Malformed status code " + new String(serialized, InternalMetadata.US_ASCII)); 55 } 56 }; 57 58 private static final Metadata.Key<Integer> HTTP2_STATUS = InternalMetadata.keyOf(":status", 59 HTTP_STATUS_MARSHALLER); 60 61 /** When non-{@code null}, {@link #transportErrorMetadata} must also be non-{@code null}. */ 62 private Status transportError; 63 private Metadata transportErrorMetadata; 64 private Charset errorCharset = Charsets.UTF_8; 65 private boolean headersReceived; 66 Http2ClientStreamTransportState( int maxMessageSize, StatsTraceContext statsTraceCtx, TransportTracer transportTracer)67 protected Http2ClientStreamTransportState( 68 int maxMessageSize, 69 StatsTraceContext statsTraceCtx, 70 TransportTracer transportTracer) { 71 super(maxMessageSize, statsTraceCtx, transportTracer); 72 } 73 74 /** 75 * Called to process a failure in HTTP/2 processing. It should notify the transport to cancel the 76 * stream and call {@code transportReportStatus()}. 77 */ http2ProcessingFailed( Status status, boolean stopDelivery, Metadata trailers)78 protected abstract void http2ProcessingFailed( 79 Status status, boolean stopDelivery, Metadata trailers); 80 81 /** 82 * Called by subclasses whenever {@code Headers} are received from the transport. 83 * 84 * @param headers the received headers 85 */ transportHeadersReceived(Metadata headers)86 protected void transportHeadersReceived(Metadata headers) { 87 Preconditions.checkNotNull(headers, "headers"); 88 if (transportError != null) { 89 // Already received a transport error so just augment it. Something is really, really strange. 90 transportError = transportError.augmentDescription("headers: " + headers); 91 return; 92 } 93 try { 94 if (headersReceived) { 95 transportError = Status.INTERNAL.withDescription("Received headers twice"); 96 return; 97 } 98 Integer httpStatus = headers.get(HTTP2_STATUS); 99 if (httpStatus != null && httpStatus >= 100 && httpStatus < 200) { 100 // Ignore the headers. See RFC 7540 §8.1 101 return; 102 } 103 headersReceived = true; 104 105 transportError = validateInitialMetadata(headers); 106 if (transportError != null) { 107 return; 108 } 109 110 stripTransportDetails(headers); 111 inboundHeadersReceived(headers); 112 } finally { 113 if (transportError != null) { 114 // Note we don't immediately report the transport error, instead we wait for more data on 115 // the stream so we can accumulate more detail into the error before reporting it. 116 transportError = transportError.augmentDescription("headers: " + headers); 117 transportErrorMetadata = headers; 118 errorCharset = extractCharset(headers); 119 } 120 } 121 } 122 123 /** 124 * Called by subclasses whenever a data frame is received from the transport. 125 * 126 * @param frame the received data frame 127 * @param endOfStream {@code true} if there will be no more data received for this stream 128 */ transportDataReceived(ReadableBuffer frame, boolean endOfStream)129 protected void transportDataReceived(ReadableBuffer frame, boolean endOfStream) { 130 if (transportError != null) { 131 // We've already detected a transport error and now we're just accumulating more detail 132 // for it. 133 transportError = transportError.augmentDescription("DATA-----------------------------\n" 134 + ReadableBuffers.readAsString(frame, errorCharset)); 135 frame.close(); 136 if (transportError.getDescription().length() > 1000 || endOfStream) { 137 http2ProcessingFailed(transportError, false, transportErrorMetadata); 138 } 139 } else { 140 if (!headersReceived) { 141 http2ProcessingFailed( 142 Status.INTERNAL.withDescription("headers not received before payload"), 143 false, 144 new Metadata()); 145 return; 146 } 147 inboundDataReceived(frame); 148 if (endOfStream) { 149 // This is a protocol violation as we expect to receive trailers. 150 transportError = 151 Status.INTERNAL.withDescription("Received unexpected EOS on DATA frame from server."); 152 transportErrorMetadata = new Metadata(); 153 transportReportStatus(transportError, false, transportErrorMetadata); 154 } 155 } 156 } 157 158 /** 159 * Called by subclasses for the terminal trailer metadata on a stream. 160 * 161 * @param trailers the received terminal trailer metadata 162 */ transportTrailersReceived(Metadata trailers)163 protected void transportTrailersReceived(Metadata trailers) { 164 Preconditions.checkNotNull(trailers, "trailers"); 165 if (transportError == null && !headersReceived) { 166 transportError = validateInitialMetadata(trailers); 167 if (transportError != null) { 168 transportErrorMetadata = trailers; 169 } 170 } 171 if (transportError != null) { 172 transportError = transportError.augmentDescription("trailers: " + trailers); 173 http2ProcessingFailed(transportError, false, transportErrorMetadata); 174 } else { 175 Status status = statusFromTrailers(trailers); 176 stripTransportDetails(trailers); 177 inboundTrailersReceived(trailers, status); 178 } 179 } 180 181 /** 182 * Extract the response status from trailers. 183 */ statusFromTrailers(Metadata trailers)184 private Status statusFromTrailers(Metadata trailers) { 185 Status status = trailers.get(InternalStatus.CODE_KEY); 186 if (status != null) { 187 return status.withDescription(trailers.get(InternalStatus.MESSAGE_KEY)); 188 } 189 // No status; something is broken. Try to provide a resonanable error. 190 if (headersReceived) { 191 return Status.UNKNOWN.withDescription("missing GRPC status in response"); 192 } 193 Integer httpStatus = trailers.get(HTTP2_STATUS); 194 if (httpStatus != null) { 195 status = GrpcUtil.httpStatusToGrpcStatus(httpStatus); 196 } else { 197 status = Status.INTERNAL.withDescription("missing HTTP status code"); 198 } 199 return status.augmentDescription( 200 "missing GRPC status, inferred error from HTTP status code"); 201 } 202 203 /** 204 * Inspect initial headers to make sure they conform to HTTP and gRPC, returning a {@code Status} 205 * on failure. 206 * 207 * @return status with description of failure, or {@code null} when valid 208 */ 209 @Nullable validateInitialMetadata(Metadata headers)210 private Status validateInitialMetadata(Metadata headers) { 211 Integer httpStatus = headers.get(HTTP2_STATUS); 212 if (httpStatus == null) { 213 return Status.INTERNAL.withDescription("Missing HTTP status code"); 214 } 215 String contentType = headers.get(GrpcUtil.CONTENT_TYPE_KEY); 216 if (!GrpcUtil.isGrpcContentType(contentType)) { 217 return GrpcUtil.httpStatusToGrpcStatus(httpStatus) 218 .augmentDescription("invalid content-type: " + contentType); 219 } 220 return null; 221 } 222 223 /** 224 * Inspect the raw metadata and figure out what charset is being used. 225 */ extractCharset(Metadata headers)226 private static Charset extractCharset(Metadata headers) { 227 String contentType = headers.get(GrpcUtil.CONTENT_TYPE_KEY); 228 if (contentType != null) { 229 String[] split = contentType.split("charset=", 2); 230 try { 231 return Charset.forName(split[split.length - 1].trim()); 232 } catch (Exception t) { 233 // Ignore and assume UTF-8 234 } 235 } 236 return Charsets.UTF_8; 237 } 238 239 /** 240 * Strip HTTP transport implementation details so they don't leak via metadata into 241 * the application layer. 242 */ stripTransportDetails(Metadata metadata)243 private static void stripTransportDetails(Metadata metadata) { 244 metadata.discardAll(HTTP2_STATUS); 245 metadata.discardAll(InternalStatus.CODE_KEY); 246 metadata.discardAll(InternalStatus.MESSAGE_KEY); 247 } 248 } 249