/*
 * Copyright 2015 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

17 package io.grpc.netty;
18 
19 import static com.google.common.base.Preconditions.checkArgument;
20 import static com.google.common.base.Preconditions.checkNotNull;
21 import static com.google.common.base.Preconditions.checkState;
22 import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
23 
24 import com.google.common.base.Preconditions;
25 import com.google.common.io.BaseEncoding;
26 import io.grpc.Attributes;
27 import io.grpc.CallOptions;
28 import io.grpc.InternalKnownTransport;
29 import io.grpc.InternalMethodDescriptor;
30 import io.grpc.Metadata;
31 import io.grpc.MethodDescriptor;
32 import io.grpc.Status;
33 import io.grpc.internal.AbstractClientStream;
34 import io.grpc.internal.ClientStreamListener.RpcProgress;
35 import io.grpc.internal.Http2ClientStreamTransportState;
36 import io.grpc.internal.StatsTraceContext;
37 import io.grpc.internal.TransportTracer;
38 import io.grpc.internal.WritableBuffer;
39 import io.netty.buffer.ByteBuf;
40 import io.netty.channel.Channel;
41 import io.netty.channel.ChannelFuture;
42 import io.netty.channel.ChannelFutureListener;
43 import io.netty.channel.EventLoop;
44 import io.netty.handler.codec.http2.Http2Headers;
45 import io.netty.handler.codec.http2.Http2Stream;
46 import io.netty.util.AsciiString;
47 import io.perfmark.PerfMark;
48 import io.perfmark.Tag;
49 import io.perfmark.TaskCloseable;
50 import javax.annotation.Nullable;
51 
52 /**
53  * Client stream for a Netty transport. Must only be called from the sending application
54  * thread.
55  */
56 class NettyClientStream extends AbstractClientStream {
57   private static final InternalMethodDescriptor methodDescriptorAccessor =
58       new InternalMethodDescriptor(
59           NettyClientTransport.class.getName().contains("grpc.netty.shaded")
60               ? InternalKnownTransport.NETTY_SHADED : InternalKnownTransport.NETTY);
61 
62   private final Sink sink = new Sink();
63   private final TransportState state;
64   private final WriteQueue writeQueue;
65   private final MethodDescriptor<?, ?> method;
66   private AsciiString authority;
67   private final AsciiString scheme;
68   private final AsciiString userAgent;
69 
NettyClientStream( TransportState state, MethodDescriptor<?, ?> method, Metadata headers, Channel channel, AsciiString authority, AsciiString scheme, AsciiString userAgent, StatsTraceContext statsTraceCtx, TransportTracer transportTracer, CallOptions callOptions, boolean useGetForSafeMethods)70   NettyClientStream(
71       TransportState state,
72       MethodDescriptor<?, ?> method,
73       Metadata headers,
74       Channel channel,
75       AsciiString authority,
76       AsciiString scheme,
77       AsciiString userAgent,
78       StatsTraceContext statsTraceCtx,
79       TransportTracer transportTracer,
80       CallOptions callOptions,
81       boolean useGetForSafeMethods) {
82     super(
83         new NettyWritableBufferAllocator(channel.alloc()),
84         statsTraceCtx,
85         transportTracer,
86         headers,
87         callOptions,
88         useGetForSafeMethods && method.isSafe());
89     this.state = checkNotNull(state, "transportState");
90     this.writeQueue = state.handler.getWriteQueue();
91     this.method = checkNotNull(method, "method");
92     this.authority = checkNotNull(authority, "authority");
93     this.scheme = checkNotNull(scheme, "scheme");
94     this.userAgent = userAgent;
95   }
96 
97   @Override
transportState()98   protected TransportState transportState() {
99     return state;
100   }
101 
102   @Override
abstractClientStreamSink()103   protected Sink abstractClientStreamSink() {
104     return sink;
105   }
106 
107   @Override
setAuthority(String authority)108   public void setAuthority(String authority) {
109     this.authority = AsciiString.of(checkNotNull(authority, "authority"));
110   }
111 
112   @Override
getAttributes()113   public Attributes getAttributes() {
114     return state.handler.getAttributes();
115   }
116 
117   private class Sink implements AbstractClientStream.Sink {
118 
119     @Override
writeHeaders(Metadata headers, byte[] requestPayload)120     public void writeHeaders(Metadata headers, byte[] requestPayload) {
121       try (TaskCloseable ignore =
122                PerfMark.traceTask("NettyClientStream$Sink.writeHeaders")) {
123         writeHeadersInternal(headers, requestPayload);
124       }
125     }
126 
writeHeadersInternal(Metadata headers, byte[] requestPayload)127     private void writeHeadersInternal(Metadata headers, byte[] requestPayload) {
128       // Convert the headers into Netty HTTP/2 headers.
129       AsciiString defaultPath = (AsciiString) methodDescriptorAccessor.geRawMethodName(method);
130       if (defaultPath == null) {
131         defaultPath = new AsciiString("/" + method.getFullMethodName());
132         methodDescriptorAccessor.setRawMethodName(method, defaultPath);
133       }
134       boolean get = (requestPayload != null);
135       AsciiString httpMethod;
136       if (get) {
137         // Forge the query string
138         // TODO(ericgribkoff) Add the key back to the query string
139         defaultPath =
140             new AsciiString(defaultPath + "?" + BaseEncoding.base64().encode(requestPayload));
141         httpMethod = Utils.HTTP_GET_METHOD;
142       } else {
143         httpMethod = Utils.HTTP_METHOD;
144       }
145       Http2Headers http2Headers = Utils.convertClientHeaders(headers, scheme, defaultPath,
146           authority, httpMethod, userAgent);
147 
148       ChannelFutureListener failureListener = new ChannelFutureListener() {
149         @Override
150         public void operationComplete(ChannelFuture future) throws Exception {
151           if (!future.isSuccess()) {
152             // Stream creation failed. Close the stream if not already closed.
153             // When the channel is shutdown, the lifecycle manager has a better view of the failure,
154             // especially before negotiation completes (because the negotiator commonly doesn't
155             // receive the exceptionCaught because NettyClientHandler does not propagate it).
156             Status s = transportState().handler.getLifecycleManager().getShutdownStatus();
157             if (s == null) {
158               s = transportState().statusFromFailedFuture(future);
159             }
160             if (transportState().isNonExistent()) {
161               transportState().transportReportStatus(
162                   s, RpcProgress.MISCARRIED, true, new Metadata());
163             } else {
164               transportState().transportReportStatus(
165                   s, RpcProgress.PROCESSED, true, new Metadata());
166             }
167           }
168         }
169       };
170       // Write the command requesting the creation of the stream.
171       writeQueue.enqueue(
172           new CreateStreamCommand(http2Headers, transportState(), shouldBeCountedForInUse(), get),
173           !method.getType().clientSendsOneMessage() || get).addListener(failureListener);
174     }
175 
writeFrameInternal( WritableBuffer frame, boolean endOfStream, boolean flush, final int numMessages)176     private void writeFrameInternal(
177         WritableBuffer frame, boolean endOfStream, boolean flush, final int numMessages) {
178       Preconditions.checkArgument(numMessages >= 0);
179       ByteBuf bytebuf =
180           frame == null ? EMPTY_BUFFER : ((NettyWritableBuffer) frame).bytebuf().touch();
181       final int numBytes = bytebuf.readableBytes();
182       if (numBytes > 0) {
183         // Add the bytes to outbound flow control.
184         onSendingBytes(numBytes);
185         writeQueue.enqueue(new SendGrpcFrameCommand(transportState(), bytebuf, endOfStream), flush)
186             .addListener(new ChannelFutureListener() {
187               @Override
188               public void operationComplete(ChannelFuture future) throws Exception {
189                 // If the future succeeds when http2stream is null, the stream has been cancelled
190                 // before it began and Netty is purging pending writes from the flow-controller.
191                 if (future.isSuccess() && transportState().http2Stream() != null) {
192                   // Remove the bytes from outbound flow control, optionally notifying
193                   // the client that they can send more bytes.
194                   transportState().onSentBytes(numBytes);
195                   NettyClientStream.this.getTransportTracer().reportMessageSent(numMessages);
196                 }
197               }
198             });
199       } else {
200         // The frame is empty and will not impact outbound flow control. Just send it.
201         writeQueue.enqueue(
202             new SendGrpcFrameCommand(transportState(), bytebuf, endOfStream), flush);
203       }
204     }
205 
206     @Override
writeFrame( WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages)207     public void writeFrame(
208         WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) {
209       try (TaskCloseable ignore = PerfMark.traceTask("NettyClientStream$Sink.writeFrame")) {
210         writeFrameInternal(frame, endOfStream, flush, numMessages);
211       }
212     }
213 
214     @Override
cancel(Status status)215     public void cancel(Status status) {
216       try (TaskCloseable ignore = PerfMark.traceTask("NettyClientStream$Sink.cancel")) {
217         writeQueue.enqueue(new CancelClientStreamCommand(transportState(), status), true);
218       }
219     }
220   }
221 
222   /** This should only be called from the transport thread. */
223   public abstract static class TransportState extends Http2ClientStreamTransportState
224       implements StreamIdHolder {
225     private static final int NON_EXISTENT_ID = -1;
226 
227     private final String methodName;
228     private final NettyClientHandler handler;
229     private final EventLoop eventLoop;
230     private int id;
231     private Http2Stream http2Stream;
232     private Tag tag;
233 
TransportState( NettyClientHandler handler, EventLoop eventLoop, int maxMessageSize, StatsTraceContext statsTraceCtx, TransportTracer transportTracer, String methodName)234     protected TransportState(
235         NettyClientHandler handler,
236         EventLoop eventLoop,
237         int maxMessageSize,
238         StatsTraceContext statsTraceCtx,
239         TransportTracer transportTracer,
240         String methodName) {
241       super(maxMessageSize, statsTraceCtx, transportTracer);
242       this.methodName = checkNotNull(methodName, "methodName");
243       this.handler = checkNotNull(handler, "handler");
244       this.eventLoop = checkNotNull(eventLoop, "eventLoop");
245       tag = PerfMark.createTag(methodName);
246     }
247 
248     @Override
id()249     public int id() {
250       // id should be positive
251       return id;
252     }
253 
setId(int id)254     public void setId(int id) {
255       checkArgument(id > 0, "id must be positive %s", id);
256       checkState(this.id == 0, "id has been previously set: %s", this.id);
257       this.id = id;
258       this.tag = PerfMark.createTag(methodName, id);
259     }
260 
261     /**
262      * Marks the stream state as if it had never existed.  This can happen if the stream is
263      * cancelled after it is created, but before it has been started.
264      */
setNonExistent()265     void setNonExistent() {
266       checkState(this.id == 0, "Id has been previously set: %s", this.id);
267       this.id = NON_EXISTENT_ID;
268     }
269 
isNonExistent()270     boolean isNonExistent() {
271       return this.id == NON_EXISTENT_ID || this.id == 0;
272     }
273 
274     /**
275      * Sets the underlying Netty {@link Http2Stream} for this stream. This must be called in the
276      * context of the transport thread.
277      */
setHttp2Stream(Http2Stream http2Stream)278     public void setHttp2Stream(Http2Stream http2Stream) {
279       checkNotNull(http2Stream, "http2Stream");
280       checkState(this.http2Stream == null, "Can only set http2Stream once");
281       this.http2Stream = http2Stream;
282 
283       // Now that the stream has actually been initialized, call the listener's onReady callback if
284       // appropriate.
285       onStreamAllocated();
286       getTransportTracer().reportLocalStreamStarted();
287     }
288 
289     /**
290      * Gets the underlying Netty {@link Http2Stream} for this stream.
291      */
292     @Nullable
http2Stream()293     public Http2Stream http2Stream() {
294       return http2Stream;
295     }
296 
297     /**
298      * Intended to be overridden by NettyClientTransport, which has more information about failures.
299      * May only be called from event loop.
300      */
statusFromFailedFuture(ChannelFuture f)301     protected abstract Status statusFromFailedFuture(ChannelFuture f);
302 
303     @Override
http2ProcessingFailed(Status status, boolean stopDelivery, Metadata trailers)304     protected void http2ProcessingFailed(Status status, boolean stopDelivery, Metadata trailers) {
305       transportReportStatus(status, stopDelivery, trailers);
306       handler.getWriteQueue().enqueue(new CancelClientStreamCommand(this, status), true);
307     }
308 
309     @Override
runOnTransportThread(final Runnable r)310     public void runOnTransportThread(final Runnable r) {
311       if (eventLoop.inEventLoop()) {
312         r.run();
313       } else {
314         eventLoop.execute(r);
315       }
316     }
317 
318     @Override
bytesRead(int processedBytes)319     public void bytesRead(int processedBytes) {
320       handler.returnProcessedBytes(http2Stream, processedBytes);
321       handler.getWriteQueue().scheduleFlush();
322     }
323 
324     @Override
deframeFailed(Throwable cause)325     public void deframeFailed(Throwable cause) {
326       http2ProcessingFailed(Status.fromThrowable(cause), true, new Metadata());
327     }
328 
transportHeadersReceived(Http2Headers headers, boolean endOfStream)329     void transportHeadersReceived(Http2Headers headers, boolean endOfStream) {
330       if (endOfStream) {
331         if (!isOutboundClosed()) {
332           handler.getWriteQueue().enqueue(new CancelClientStreamCommand(this, null), true);
333         }
334         transportTrailersReceived(Utils.convertTrailers(headers));
335       } else {
336         transportHeadersReceived(Utils.convertHeaders(headers));
337       }
338     }
339 
transportDataReceived(ByteBuf frame, boolean endOfStream)340     void transportDataReceived(ByteBuf frame, boolean endOfStream) {
341       transportDataReceived(new NettyReadableBuffer(frame.retain()), endOfStream);
342     }
343 
344     @Override
tag()345     public final Tag tag() {
346       return tag;
347     }
348   }
349 }
350