1 /* 2 * Copyright 2015 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.netty; 18 19 import static com.google.common.base.Preconditions.checkArgument; 20 import static com.google.common.base.Preconditions.checkNotNull; 21 import static com.google.common.base.Preconditions.checkState; 22 import static io.netty.buffer.Unpooled.EMPTY_BUFFER; 23 24 import com.google.common.base.Preconditions; 25 import com.google.common.io.BaseEncoding; 26 import io.grpc.Attributes; 27 import io.grpc.CallOptions; 28 import io.grpc.InternalKnownTransport; 29 import io.grpc.InternalMethodDescriptor; 30 import io.grpc.Metadata; 31 import io.grpc.MethodDescriptor; 32 import io.grpc.Status; 33 import io.grpc.internal.AbstractClientStream; 34 import io.grpc.internal.ClientStreamListener.RpcProgress; 35 import io.grpc.internal.Http2ClientStreamTransportState; 36 import io.grpc.internal.StatsTraceContext; 37 import io.grpc.internal.TransportTracer; 38 import io.grpc.internal.WritableBuffer; 39 import io.netty.buffer.ByteBuf; 40 import io.netty.channel.Channel; 41 import io.netty.channel.ChannelFuture; 42 import io.netty.channel.ChannelFutureListener; 43 import io.netty.channel.EventLoop; 44 import io.netty.handler.codec.http2.Http2Headers; 45 import io.netty.handler.codec.http2.Http2Stream; 46 import io.netty.util.AsciiString; 47 import io.perfmark.PerfMark; 48 import io.perfmark.Tag; 49 import io.perfmark.TaskCloseable; 50 import javax.annotation.Nullable; 51 52 /** 53 * Client stream for a Netty transport. Must only be called from the sending application 54 * thread. 55 */ 56 class NettyClientStream extends AbstractClientStream { 57 private static final InternalMethodDescriptor methodDescriptorAccessor = 58 new InternalMethodDescriptor( 59 NettyClientTransport.class.getName().contains("grpc.netty.shaded") 60 ? InternalKnownTransport.NETTY_SHADED : InternalKnownTransport.NETTY); 61 62 private final Sink sink = new Sink(); 63 private final TransportState state; 64 private final WriteQueue writeQueue; 65 private final MethodDescriptor<?, ?> method; 66 private AsciiString authority; 67 private final AsciiString scheme; 68 private final AsciiString userAgent; 69 NettyClientStream( TransportState state, MethodDescriptor<?, ?> method, Metadata headers, Channel channel, AsciiString authority, AsciiString scheme, AsciiString userAgent, StatsTraceContext statsTraceCtx, TransportTracer transportTracer, CallOptions callOptions, boolean useGetForSafeMethods)70 NettyClientStream( 71 TransportState state, 72 MethodDescriptor<?, ?> method, 73 Metadata headers, 74 Channel channel, 75 AsciiString authority, 76 AsciiString scheme, 77 AsciiString userAgent, 78 StatsTraceContext statsTraceCtx, 79 TransportTracer transportTracer, 80 CallOptions callOptions, 81 boolean useGetForSafeMethods) { 82 super( 83 new NettyWritableBufferAllocator(channel.alloc()), 84 statsTraceCtx, 85 transportTracer, 86 headers, 87 callOptions, 88 useGetForSafeMethods && method.isSafe()); 89 this.state = checkNotNull(state, "transportState"); 90 this.writeQueue = state.handler.getWriteQueue(); 91 this.method = checkNotNull(method, "method"); 92 this.authority = checkNotNull(authority, "authority"); 93 this.scheme = checkNotNull(scheme, "scheme"); 94 this.userAgent = userAgent; 95 } 96 97 @Override transportState()98 protected TransportState transportState() { 99 return state; 100 } 101 102 @Override abstractClientStreamSink()103 protected Sink abstractClientStreamSink() { 104 return sink; 105 } 106 107 @Override setAuthority(String authority)108 public void setAuthority(String authority) { 109 this.authority = AsciiString.of(checkNotNull(authority, "authority")); 110 } 111 112 @Override getAttributes()113 public Attributes getAttributes() { 114 return state.handler.getAttributes(); 115 } 116 117 private class Sink implements AbstractClientStream.Sink { 118 119 @Override writeHeaders(Metadata headers, byte[] requestPayload)120 public void writeHeaders(Metadata headers, byte[] requestPayload) { 121 try (TaskCloseable ignore = 122 PerfMark.traceTask("NettyClientStream$Sink.writeHeaders")) { 123 writeHeadersInternal(headers, requestPayload); 124 } 125 } 126 writeHeadersInternal(Metadata headers, byte[] requestPayload)127 private void writeHeadersInternal(Metadata headers, byte[] requestPayload) { 128 // Convert the headers into Netty HTTP/2 headers. 129 AsciiString defaultPath = (AsciiString) methodDescriptorAccessor.geRawMethodName(method); 130 if (defaultPath == null) { 131 defaultPath = new AsciiString("/" + method.getFullMethodName()); 132 methodDescriptorAccessor.setRawMethodName(method, defaultPath); 133 } 134 boolean get = (requestPayload != null); 135 AsciiString httpMethod; 136 if (get) { 137 // Forge the query string 138 // TODO(ericgribkoff) Add the key back to the query string 139 defaultPath = 140 new AsciiString(defaultPath + "?" + BaseEncoding.base64().encode(requestPayload)); 141 httpMethod = Utils.HTTP_GET_METHOD; 142 } else { 143 httpMethod = Utils.HTTP_METHOD; 144 } 145 Http2Headers http2Headers = Utils.convertClientHeaders(headers, scheme, defaultPath, 146 authority, httpMethod, userAgent); 147 148 ChannelFutureListener failureListener = new ChannelFutureListener() { 149 @Override 150 public void operationComplete(ChannelFuture future) throws Exception { 151 if (!future.isSuccess()) { 152 // Stream creation failed. Close the stream if not already closed. 153 // When the channel is shutdown, the lifecycle manager has a better view of the failure, 154 // especially before negotiation completes (because the negotiator commonly doesn't 155 // receive the exceptionCaught because NettyClientHandler does not propagate it). 156 Status s = transportState().handler.getLifecycleManager().getShutdownStatus(); 157 if (s == null) { 158 s = transportState().statusFromFailedFuture(future); 159 } 160 if (transportState().isNonExistent()) { 161 transportState().transportReportStatus( 162 s, RpcProgress.MISCARRIED, true, new Metadata()); 163 } else { 164 transportState().transportReportStatus( 165 s, RpcProgress.PROCESSED, true, new Metadata()); 166 } 167 } 168 } 169 }; 170 // Write the command requesting the creation of the stream. 171 writeQueue.enqueue( 172 new CreateStreamCommand(http2Headers, transportState(), shouldBeCountedForInUse(), get), 173 !method.getType().clientSendsOneMessage() || get).addListener(failureListener); 174 } 175 writeFrameInternal( WritableBuffer frame, boolean endOfStream, boolean flush, final int numMessages)176 private void writeFrameInternal( 177 WritableBuffer frame, boolean endOfStream, boolean flush, final int numMessages) { 178 Preconditions.checkArgument(numMessages >= 0); 179 ByteBuf bytebuf = 180 frame == null ? EMPTY_BUFFER : ((NettyWritableBuffer) frame).bytebuf().touch(); 181 final int numBytes = bytebuf.readableBytes(); 182 if (numBytes > 0) { 183 // Add the bytes to outbound flow control. 184 onSendingBytes(numBytes); 185 writeQueue.enqueue(new SendGrpcFrameCommand(transportState(), bytebuf, endOfStream), flush) 186 .addListener(new ChannelFutureListener() { 187 @Override 188 public void operationComplete(ChannelFuture future) throws Exception { 189 // If the future succeeds when http2stream is null, the stream has been cancelled 190 // before it began and Netty is purging pending writes from the flow-controller. 191 if (future.isSuccess() && transportState().http2Stream() != null) { 192 // Remove the bytes from outbound flow control, optionally notifying 193 // the client that they can send more bytes. 194 transportState().onSentBytes(numBytes); 195 NettyClientStream.this.getTransportTracer().reportMessageSent(numMessages); 196 } 197 } 198 }); 199 } else { 200 // The frame is empty and will not impact outbound flow control. Just send it. 201 writeQueue.enqueue( 202 new SendGrpcFrameCommand(transportState(), bytebuf, endOfStream), flush); 203 } 204 } 205 206 @Override writeFrame( WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages)207 public void writeFrame( 208 WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) { 209 try (TaskCloseable ignore = PerfMark.traceTask("NettyClientStream$Sink.writeFrame")) { 210 writeFrameInternal(frame, endOfStream, flush, numMessages); 211 } 212 } 213 214 @Override cancel(Status status)215 public void cancel(Status status) { 216 try (TaskCloseable ignore = PerfMark.traceTask("NettyClientStream$Sink.cancel")) { 217 writeQueue.enqueue(new CancelClientStreamCommand(transportState(), status), true); 218 } 219 } 220 } 221 222 /** This should only be called from the transport thread. */ 223 public abstract static class TransportState extends Http2ClientStreamTransportState 224 implements StreamIdHolder { 225 private static final int NON_EXISTENT_ID = -1; 226 227 private final String methodName; 228 private final NettyClientHandler handler; 229 private final EventLoop eventLoop; 230 private int id; 231 private Http2Stream http2Stream; 232 private Tag tag; 233 TransportState( NettyClientHandler handler, EventLoop eventLoop, int maxMessageSize, StatsTraceContext statsTraceCtx, TransportTracer transportTracer, String methodName)234 protected TransportState( 235 NettyClientHandler handler, 236 EventLoop eventLoop, 237 int maxMessageSize, 238 StatsTraceContext statsTraceCtx, 239 TransportTracer transportTracer, 240 String methodName) { 241 super(maxMessageSize, statsTraceCtx, transportTracer); 242 this.methodName = checkNotNull(methodName, "methodName"); 243 this.handler = checkNotNull(handler, "handler"); 244 this.eventLoop = checkNotNull(eventLoop, "eventLoop"); 245 tag = PerfMark.createTag(methodName); 246 } 247 248 @Override id()249 public int id() { 250 // id should be positive 251 return id; 252 } 253 setId(int id)254 public void setId(int id) { 255 checkArgument(id > 0, "id must be positive %s", id); 256 checkState(this.id == 0, "id has been previously set: %s", this.id); 257 this.id = id; 258 this.tag = PerfMark.createTag(methodName, id); 259 } 260 261 /** 262 * Marks the stream state as if it had never existed. This can happen if the stream is 263 * cancelled after it is created, but before it has been started. 264 */ setNonExistent()265 void setNonExistent() { 266 checkState(this.id == 0, "Id has been previously set: %s", this.id); 267 this.id = NON_EXISTENT_ID; 268 } 269 isNonExistent()270 boolean isNonExistent() { 271 return this.id == NON_EXISTENT_ID || this.id == 0; 272 } 273 274 /** 275 * Sets the underlying Netty {@link Http2Stream} for this stream. This must be called in the 276 * context of the transport thread. 277 */ setHttp2Stream(Http2Stream http2Stream)278 public void setHttp2Stream(Http2Stream http2Stream) { 279 checkNotNull(http2Stream, "http2Stream"); 280 checkState(this.http2Stream == null, "Can only set http2Stream once"); 281 this.http2Stream = http2Stream; 282 283 // Now that the stream has actually been initialized, call the listener's onReady callback if 284 // appropriate. 285 onStreamAllocated(); 286 getTransportTracer().reportLocalStreamStarted(); 287 } 288 289 /** 290 * Gets the underlying Netty {@link Http2Stream} for this stream. 291 */ 292 @Nullable http2Stream()293 public Http2Stream http2Stream() { 294 return http2Stream; 295 } 296 297 /** 298 * Intended to be overridden by NettyClientTransport, which has more information about failures. 299 * May only be called from event loop. 300 */ statusFromFailedFuture(ChannelFuture f)301 protected abstract Status statusFromFailedFuture(ChannelFuture f); 302 303 @Override http2ProcessingFailed(Status status, boolean stopDelivery, Metadata trailers)304 protected void http2ProcessingFailed(Status status, boolean stopDelivery, Metadata trailers) { 305 transportReportStatus(status, stopDelivery, trailers); 306 handler.getWriteQueue().enqueue(new CancelClientStreamCommand(this, status), true); 307 } 308 309 @Override runOnTransportThread(final Runnable r)310 public void runOnTransportThread(final Runnable r) { 311 if (eventLoop.inEventLoop()) { 312 r.run(); 313 } else { 314 eventLoop.execute(r); 315 } 316 } 317 318 @Override bytesRead(int processedBytes)319 public void bytesRead(int processedBytes) { 320 handler.returnProcessedBytes(http2Stream, processedBytes); 321 handler.getWriteQueue().scheduleFlush(); 322 } 323 324 @Override deframeFailed(Throwable cause)325 public void deframeFailed(Throwable cause) { 326 http2ProcessingFailed(Status.fromThrowable(cause), true, new Metadata()); 327 } 328 transportHeadersReceived(Http2Headers headers, boolean endOfStream)329 void transportHeadersReceived(Http2Headers headers, boolean endOfStream) { 330 if (endOfStream) { 331 if (!isOutboundClosed()) { 332 handler.getWriteQueue().enqueue(new CancelClientStreamCommand(this, null), true); 333 } 334 transportTrailersReceived(Utils.convertTrailers(headers)); 335 } else { 336 transportHeadersReceived(Utils.convertHeaders(headers)); 337 } 338 } 339 transportDataReceived(ByteBuf frame, boolean endOfStream)340 void transportDataReceived(ByteBuf frame, boolean endOfStream) { 341 transportDataReceived(new NettyReadableBuffer(frame.retain()), endOfStream); 342 } 343 344 @Override tag()345 public final Tag tag() { 346 return tag; 347 } 348 } 349 } 350