1 /* 2 * Copyright 2014 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.internal; 18 19 import com.google.common.base.Charsets; 20 import com.google.common.base.Preconditions; 21 import io.grpc.InternalMetadata; 22 import io.grpc.InternalStatus; 23 import io.grpc.Metadata; 24 import io.grpc.Status; 25 import java.nio.charset.Charset; 26 import javax.annotation.Nullable; 27 28 /** 29 * Base implementation for client streams using HTTP2 as the transport. 30 */ 31 public abstract class Http2ClientStreamTransportState extends AbstractClientStream.TransportState { 32 33 /** 34 * Metadata marshaller for HTTP status lines. 35 */ 36 private static final InternalMetadata.TrustedAsciiMarshaller<Integer> HTTP_STATUS_MARSHALLER = 37 new InternalMetadata.TrustedAsciiMarshaller<Integer>() { 38 @Override 39 public byte[] toAsciiString(Integer value) { 40 throw new UnsupportedOperationException(); 41 } 42 43 /** 44 * RFC 7231 says status codes are 3 digits long. 45 * 46 * @see <a href="https://tools.ietf.org/html/rfc7231#section-6">RFC 7231</a> 47 */ 48 @Override 49 public Integer parseAsciiString(byte[] serialized) { 50 if (serialized.length >= 3) { 51 return (serialized[0] - '0') * 100 + (serialized[1] - '0') * 10 + (serialized[2] - '0'); 52 } 53 throw new NumberFormatException( 54 "Malformed status code " + new String(serialized, InternalMetadata.US_ASCII)); 55 } 56 }; 57 58 private static final Metadata.Key<Integer> HTTP2_STATUS = InternalMetadata.keyOf(":status", 59 HTTP_STATUS_MARSHALLER); 60 61 /** When non-{@code null}, {@link #transportErrorMetadata} must also be non-{@code null}. */ 62 private Status transportError; 63 private Metadata transportErrorMetadata; 64 private Charset errorCharset = Charsets.UTF_8; 65 private boolean headersReceived; 66 Http2ClientStreamTransportState( int maxMessageSize, StatsTraceContext statsTraceCtx, TransportTracer transportTracer)67 protected Http2ClientStreamTransportState( 68 int maxMessageSize, 69 StatsTraceContext statsTraceCtx, 70 TransportTracer transportTracer) { 71 super(maxMessageSize, statsTraceCtx, transportTracer); 72 } 73 74 /** 75 * Called to process a failure in HTTP/2 processing. It should notify the transport to cancel the 76 * stream and call {@code transportReportStatus()}. 77 */ http2ProcessingFailed( Status status, boolean stopDelivery, Metadata trailers)78 protected abstract void http2ProcessingFailed( 79 Status status, boolean stopDelivery, Metadata trailers); 80 81 /** 82 * Called by subclasses whenever {@code Headers} are received from the transport. 83 * 84 * @param headers the received headers 85 */ transportHeadersReceived(Metadata headers)86 protected void transportHeadersReceived(Metadata headers) { 87 Preconditions.checkNotNull(headers, "headers"); 88 if (transportError != null) { 89 // Already received a transport error so just augment it. Something is really, really strange. 90 transportError = transportError.augmentDescription("headers: " + headers); 91 return; 92 } 93 try { 94 if (headersReceived) { 95 transportError = Status.INTERNAL.withDescription("Received headers twice"); 96 return; 97 } 98 Integer httpStatus = headers.get(HTTP2_STATUS); 99 if (httpStatus != null && httpStatus >= 100 && httpStatus < 200) { 100 // Ignore the headers. See RFC 7540 §8.1 101 return; 102 } 103 headersReceived = true; 104 105 transportError = validateInitialMetadata(headers); 106 if (transportError != null) { 107 return; 108 } 109 110 stripTransportDetails(headers); 111 inboundHeadersReceived(headers); 112 } finally { 113 if (transportError != null) { 114 // Note we don't immediately report the transport error, instead we wait for more data on 115 // the stream so we can accumulate more detail into the error before reporting it. 116 transportError = transportError.augmentDescription("headers: " + headers); 117 transportErrorMetadata = headers; 118 errorCharset = extractCharset(headers); 119 } 120 } 121 } 122 123 /** 124 * Called by subclasses whenever a data frame is received from the transport. 125 * 126 * @param frame the received data frame 127 * @param endOfStream {@code true} if there will be no more data received for this stream 128 */ transportDataReceived(ReadableBuffer frame, boolean endOfStream)129 protected void transportDataReceived(ReadableBuffer frame, boolean endOfStream) { 130 if (transportError != null) { 131 // We've already detected a transport error and now we're just accumulating more detail 132 // for it. 133 transportError = transportError.augmentDescription("DATA-----------------------------\n" 134 + ReadableBuffers.readAsString(frame, errorCharset)); 135 frame.close(); 136 if (transportError.getDescription().length() > 1000 || endOfStream) { 137 http2ProcessingFailed(transportError, false, transportErrorMetadata); 138 } 139 } else { 140 if (!headersReceived) { 141 http2ProcessingFailed( 142 Status.INTERNAL.withDescription("headers not received before payload"), 143 false, 144 new Metadata()); 145 return; 146 } 147 int frameSize = frame.readableBytes(); 148 inboundDataReceived(frame); 149 if (endOfStream) { 150 // This is a protocol violation as we expect to receive trailers. 151 if (frameSize > 0) { 152 transportError = Status.INTERNAL 153 .withDescription("Received unexpected EOS on non-empty DATA frame from server"); 154 } else { 155 transportError = Status.INTERNAL 156 .withDescription("Received unexpected EOS on empty DATA frame from server"); 157 } 158 transportErrorMetadata = new Metadata(); 159 transportReportStatus(transportError, false, transportErrorMetadata); 160 } 161 } 162 } 163 164 /** 165 * Called by subclasses for the terminal trailer metadata on a stream. 166 * 167 * @param trailers the received terminal trailer metadata 168 */ transportTrailersReceived(Metadata trailers)169 protected void transportTrailersReceived(Metadata trailers) { 170 Preconditions.checkNotNull(trailers, "trailers"); 171 if (transportError == null && !headersReceived) { 172 transportError = validateInitialMetadata(trailers); 173 if (transportError != null) { 174 transportErrorMetadata = trailers; 175 } 176 } 177 if (transportError != null) { 178 transportError = transportError.augmentDescription("trailers: " + trailers); 179 http2ProcessingFailed(transportError, false, transportErrorMetadata); 180 } else { 181 Status status = statusFromTrailers(trailers); 182 stripTransportDetails(trailers); 183 inboundTrailersReceived(trailers, status); 184 } 185 } 186 187 /** 188 * Extract the response status from trailers. 189 */ statusFromTrailers(Metadata trailers)190 private Status statusFromTrailers(Metadata trailers) { 191 Status status = trailers.get(InternalStatus.CODE_KEY); 192 if (status != null) { 193 return status.withDescription(trailers.get(InternalStatus.MESSAGE_KEY)); 194 } 195 // No status; something is broken. Try to provide a resonanable error. 196 if (headersReceived) { 197 return Status.UNKNOWN.withDescription("missing GRPC status in response"); 198 } 199 Integer httpStatus = trailers.get(HTTP2_STATUS); 200 if (httpStatus != null) { 201 status = GrpcUtil.httpStatusToGrpcStatus(httpStatus); 202 } else { 203 status = Status.INTERNAL.withDescription("missing HTTP status code"); 204 } 205 return status.augmentDescription( 206 "missing GRPC status, inferred error from HTTP status code"); 207 } 208 209 /** 210 * Inspect initial headers to make sure they conform to HTTP and gRPC, returning a {@code Status} 211 * on failure. 212 * 213 * @return status with description of failure, or {@code null} when valid 214 */ 215 @Nullable validateInitialMetadata(Metadata headers)216 private Status validateInitialMetadata(Metadata headers) { 217 Integer httpStatus = headers.get(HTTP2_STATUS); 218 if (httpStatus == null) { 219 return Status.INTERNAL.withDescription("Missing HTTP status code"); 220 } 221 String contentType = headers.get(GrpcUtil.CONTENT_TYPE_KEY); 222 if (!GrpcUtil.isGrpcContentType(contentType)) { 223 return GrpcUtil.httpStatusToGrpcStatus(httpStatus) 224 .augmentDescription("invalid content-type: " + contentType); 225 } 226 return null; 227 } 228 229 /** 230 * Inspect the raw metadata and figure out what charset is being used. 231 */ extractCharset(Metadata headers)232 private static Charset extractCharset(Metadata headers) { 233 String contentType = headers.get(GrpcUtil.CONTENT_TYPE_KEY); 234 if (contentType != null) { 235 String[] split = contentType.split("charset=", 2); 236 try { 237 return Charset.forName(split[split.length - 1].trim()); 238 } catch (Exception t) { 239 // Ignore and assume UTF-8 240 } 241 } 242 return Charsets.UTF_8; 243 } 244 245 /** 246 * Strip HTTP transport implementation details so they don't leak via metadata into 247 * the application layer. 248 */ stripTransportDetails(Metadata metadata)249 private static void stripTransportDetails(Metadata metadata) { 250 metadata.discardAll(HTTP2_STATUS); 251 metadata.discardAll(InternalStatus.CODE_KEY); 252 metadata.discardAll(InternalStatus.MESSAGE_KEY); 253 } 254 } 255