/*
 * Copyright 2014 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 package io.grpc.okhttp;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 import static com.google.common.base.Preconditions.checkState;
21 import static io.grpc.internal.ClientStreamListener.RpcProgress.PROCESSED;
22 
23 import com.google.common.io.BaseEncoding;
24 import io.grpc.Attributes;
25 import io.grpc.CallOptions;
26 import io.grpc.Metadata;
27 import io.grpc.MethodDescriptor;
28 import io.grpc.Status;
29 import io.grpc.internal.AbstractClientStream;
30 import io.grpc.internal.Http2ClientStreamTransportState;
31 import io.grpc.internal.StatsTraceContext;
32 import io.grpc.internal.TransportTracer;
33 import io.grpc.internal.WritableBuffer;
34 import io.grpc.okhttp.internal.framed.ErrorCode;
35 import io.grpc.okhttp.internal.framed.Header;
36 import io.perfmark.PerfMark;
37 import io.perfmark.Tag;
38 import io.perfmark.TaskCloseable;
39 import java.util.List;
40 import javax.annotation.concurrent.GuardedBy;
41 import okio.Buffer;
42 
/**
 * Client stream for the OkHttp transport.
 */
46 class OkHttpClientStream extends AbstractClientStream {
47 
48   private static final Buffer EMPTY_BUFFER = new Buffer();
49 
50   public static final int ABSENT_ID = -1;
51 
52   private final MethodDescriptor<?, ?> method;
53 
54   private final String userAgent;
55   private final StatsTraceContext statsTraceCtx;
56   private String authority;
57   private final TransportState state;
58   private final Sink sink = new Sink();
59   private final Attributes attributes;
60 
61   private boolean useGet = false;
62 
OkHttpClientStream( MethodDescriptor<?, ?> method, Metadata headers, ExceptionHandlingFrameWriter frameWriter, OkHttpClientTransport transport, OutboundFlowController outboundFlow, Object lock, int maxMessageSize, int initialWindowSize, String authority, String userAgent, StatsTraceContext statsTraceCtx, TransportTracer transportTracer, CallOptions callOptions, boolean useGetForSafeMethods)63   OkHttpClientStream(
64       MethodDescriptor<?, ?> method,
65       Metadata headers,
66       ExceptionHandlingFrameWriter frameWriter,
67       OkHttpClientTransport transport,
68       OutboundFlowController outboundFlow,
69       Object lock,
70       int maxMessageSize,
71       int initialWindowSize,
72       String authority,
73       String userAgent,
74       StatsTraceContext statsTraceCtx,
75       TransportTracer transportTracer,
76       CallOptions callOptions,
77       boolean useGetForSafeMethods) {
78     super(
79         new OkHttpWritableBufferAllocator(),
80         statsTraceCtx,
81         transportTracer,
82         headers,
83         callOptions,
84         useGetForSafeMethods && method.isSafe());
85     this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
86     this.method = method;
87     this.authority = authority;
88     this.userAgent = userAgent;
89     // OkHttpClientStream is only created after the transport has finished connecting,
90     // so it is safe to read the transport attributes.
91     // We make a copy here for convenience, even though we can ask the transport.
92     this.attributes = transport.getAttributes();
93     this.state =
94         new TransportState(
95             maxMessageSize,
96             statsTraceCtx,
97             lock,
98             frameWriter,
99             outboundFlow,
100             transport,
101             initialWindowSize,
102             method.getFullMethodName());
103   }
104 
105   @Override
transportState()106   protected TransportState transportState() {
107     return state;
108   }
109 
110   @Override
abstractClientStreamSink()111   protected Sink abstractClientStreamSink() {
112     return sink;
113   }
114 
115   /**
116    * Returns the type of this stream.
117    */
getType()118   public MethodDescriptor.MethodType getType() {
119     return method.getType();
120   }
121 
122   /**
123    * Returns whether the stream uses GET. This is not known until after {@link Sink#writeHeaders} is
124    * invoked.
125    */
useGet()126   boolean useGet() {
127     return useGet;
128   }
129 
130   @Override
setAuthority(String authority)131   public void setAuthority(String authority) {
132     this.authority = checkNotNull(authority, "authority");
133   }
134 
135   @Override
getAttributes()136   public Attributes getAttributes() {
137     return attributes;
138   }
139 
140   class Sink implements AbstractClientStream.Sink {
141     @Override
writeHeaders(Metadata metadata, byte[] payload)142     public void writeHeaders(Metadata metadata, byte[] payload) {
143       try (TaskCloseable ignore = PerfMark.traceTask("OkHttpClientStream$Sink.writeHeaders")) {
144         String defaultPath = "/" + method.getFullMethodName();
145         if (payload != null) {
146           useGet = true;
147           defaultPath += "?" + BaseEncoding.base64().encode(payload);
148         }
149         synchronized (state.lock) {
150           state.streamReady(metadata, defaultPath);
151         }
152       }
153     }
154 
155     @Override
writeFrame( WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages)156     public void writeFrame(
157         WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) {
158       try (TaskCloseable ignore = PerfMark.traceTask("OkHttpClientStream$Sink.writeFrame")) {
159         Buffer buffer;
160         if (frame == null) {
161           buffer = EMPTY_BUFFER;
162         } else {
163           buffer = ((OkHttpWritableBuffer) frame).buffer();
164           int size = (int) buffer.size();
165           if (size > 0) {
166             onSendingBytes(size);
167           }
168         }
169 
170         synchronized (state.lock) {
171           state.sendBuffer(buffer, endOfStream, flush);
172           getTransportTracer().reportMessageSent(numMessages);
173         }
174       }
175     }
176 
177     @Override
cancel(Status reason)178     public void cancel(Status reason) {
179       try (TaskCloseable ignore = PerfMark.traceTask("OkHttpClientStream$Sink.cancel")) {
180         synchronized (state.lock) {
181           state.cancel(reason, true, null);
182         }
183       }
184     }
185   }
186 
187   class TransportState extends Http2ClientStreamTransportState
188       implements OutboundFlowController.Stream {
189     private final int initialWindowSize;
190     private final Object lock;
191     @GuardedBy("lock")
192     private List<Header> requestHeaders;
193     @GuardedBy("lock")
194     private Buffer pendingData = new Buffer();
195     private boolean pendingDataHasEndOfStream = false;
196     private boolean flushPendingData = false;
197     @GuardedBy("lock")
198     private boolean cancelSent = false;
199     @GuardedBy("lock")
200     private int window;
201     @GuardedBy("lock")
202     private int processedWindow;
203     @GuardedBy("lock")
204     private final ExceptionHandlingFrameWriter frameWriter;
205     @GuardedBy("lock")
206     private final OutboundFlowController outboundFlow;
207     @GuardedBy("lock")
208     private final OkHttpClientTransport transport;
209     /** True iff neither {@link #cancel} nor {@link #start(int)} have been called. */
210     @GuardedBy("lock")
211     private boolean canStart = true;
212     private final Tag tag;
213     @GuardedBy("lock")
214     private OutboundFlowController.StreamState outboundFlowState;
215     private int id = ABSENT_ID;
216 
TransportState( int maxMessageSize, StatsTraceContext statsTraceCtx, Object lock, ExceptionHandlingFrameWriter frameWriter, OutboundFlowController outboundFlow, OkHttpClientTransport transport, int initialWindowSize, String methodName)217     public TransportState(
218         int maxMessageSize,
219         StatsTraceContext statsTraceCtx,
220         Object lock,
221         ExceptionHandlingFrameWriter frameWriter,
222         OutboundFlowController outboundFlow,
223         OkHttpClientTransport transport,
224         int initialWindowSize,
225         String methodName) {
226       super(maxMessageSize, statsTraceCtx, OkHttpClientStream.this.getTransportTracer());
227       this.lock = checkNotNull(lock, "lock");
228       this.frameWriter = frameWriter;
229       this.outboundFlow = outboundFlow;
230       this.transport = transport;
231       this.window = initialWindowSize;
232       this.processedWindow = initialWindowSize;
233       this.initialWindowSize = initialWindowSize;
234       tag = PerfMark.createTag(methodName);
235     }
236 
237     @SuppressWarnings("GuardedBy")
238     @GuardedBy("lock")
start(int streamId)239     public void start(int streamId) {
240       checkState(id == ABSENT_ID, "the stream has been started with id %s", streamId);
241       id = streamId;
242       outboundFlowState = outboundFlow.createState(this, streamId);
243       // TODO(b/145386688): This access should be guarded by 'OkHttpClientStream.this.state.lock';
244       // instead found: 'this.lock'
245       state.onStreamAllocated();
246 
247       if (canStart) {
248         // Only happens when the stream has neither been started nor cancelled.
249         frameWriter.synStream(useGet, false, id, 0, requestHeaders);
250         statsTraceCtx.clientOutboundHeaders();
251         requestHeaders = null;
252 
253         if (pendingData.size() > 0) {
254           outboundFlow.data(
255               pendingDataHasEndOfStream, outboundFlowState, pendingData, flushPendingData);
256 
257         }
258         canStart = false;
259       }
260     }
261 
262     @GuardedBy("lock")
263     @Override
onStreamAllocated()264     protected void onStreamAllocated() {
265       super.onStreamAllocated();
266       getTransportTracer().reportLocalStreamStarted();
267     }
268 
269     @GuardedBy("lock")
270     @Override
http2ProcessingFailed(Status status, boolean stopDelivery, Metadata trailers)271     protected void http2ProcessingFailed(Status status, boolean stopDelivery, Metadata trailers) {
272       cancel(status, stopDelivery, trailers);
273     }
274 
275     @Override
276     @GuardedBy("lock")
deframeFailed(Throwable cause)277     public void deframeFailed(Throwable cause) {
278       http2ProcessingFailed(Status.fromThrowable(cause), true, new Metadata());
279     }
280 
281     @Override
282     @GuardedBy("lock")
bytesRead(int processedBytes)283     public void bytesRead(int processedBytes) {
284       processedWindow -= processedBytes;
285       if (processedWindow <= initialWindowSize * Utils.DEFAULT_WINDOW_UPDATE_RATIO) {
286         int delta = initialWindowSize - processedWindow;
287         window += delta;
288         processedWindow += delta;
289         frameWriter.windowUpdate(id(), delta);
290       }
291     }
292 
293     @Override
294     @GuardedBy("lock")
deframerClosed(boolean hasPartialMessage)295     public void deframerClosed(boolean hasPartialMessage) {
296       onEndOfStream();
297       super.deframerClosed(hasPartialMessage);
298     }
299 
300     @Override
301     @GuardedBy("lock")
runOnTransportThread(final Runnable r)302     public void runOnTransportThread(final Runnable r) {
303       synchronized (lock) {
304         r.run();
305       }
306     }
307 
308     /**
309      * Must be called with holding the transport lock.
310      */
311     @GuardedBy("lock")
transportHeadersReceived(List<Header> headers, boolean endOfStream)312     public void transportHeadersReceived(List<Header> headers, boolean endOfStream) {
313       if (endOfStream) {
314         transportTrailersReceived(Utils.convertTrailers(headers));
315       } else {
316         transportHeadersReceived(Utils.convertHeaders(headers));
317       }
318     }
319 
320     /**
321      * Must be called with holding the transport lock.
322      */
323     @GuardedBy("lock")
transportDataReceived(okio.Buffer frame, boolean endOfStream)324     public void transportDataReceived(okio.Buffer frame, boolean endOfStream) {
325       // We only support 16 KiB frames, and the max permitted in HTTP/2 is 16 MiB. This is verified
326       // in OkHttp's Http2 deframer. In addition, this code is after the data has been read.
327       int length = (int) frame.size();
328       window -= length;
329       if (window < 0) {
330         frameWriter.rstStream(id(), ErrorCode.FLOW_CONTROL_ERROR);
331         transport.finishStream(
332             id(),
333             Status.INTERNAL.withDescription(
334                 "Received data size exceeded our receiving window size"),
335             PROCESSED, false, null, null);
336         return;
337       }
338       super.transportDataReceived(new OkHttpReadableBuffer(frame), endOfStream);
339     }
340 
341     @GuardedBy("lock")
onEndOfStream()342     private void onEndOfStream() {
343       if (!isOutboundClosed()) {
344         // If server's end-of-stream is received before client sends end-of-stream, we just send a
345         // reset to server to fully close the server side stream.
346         transport.finishStream(id(),null, PROCESSED, false, ErrorCode.CANCEL, null);
347       } else {
348         transport.finishStream(id(), null, PROCESSED, false, null, null);
349       }
350     }
351 
352     @SuppressWarnings("GuardedBy")
353     @GuardedBy("lock")
cancel(Status reason, boolean stopDelivery, Metadata trailers)354     private void cancel(Status reason, boolean stopDelivery, Metadata trailers) {
355       if (cancelSent) {
356         return;
357       }
358       cancelSent = true;
359       if (canStart) {
360         // stream is pending.
361         // TODO(b/145386688): This access should be guarded by 'this.transport.lock'; instead found:
362         // 'this.lock'
363         transport.removePendingStream(OkHttpClientStream.this);
364         // release holding data, so they can be GCed or returned to pool earlier.
365         requestHeaders = null;
366         pendingData.clear();
367         canStart = false;
368         transportReportStatus(reason, true, trailers != null ? trailers : new Metadata());
369       } else {
370         // If pendingData is null, start must have already been called, which means synStream has
371         // been called as well.
372         transport.finishStream(
373             id(), reason, PROCESSED, stopDelivery, ErrorCode.CANCEL, trailers);
374       }
375     }
376 
377     @GuardedBy("lock")
sendBuffer(Buffer buffer, boolean endOfStream, boolean flush)378     private void sendBuffer(Buffer buffer, boolean endOfStream, boolean flush) {
379       if (cancelSent) {
380         return;
381       }
382       if (canStart) {
383         // Stream is pending start, queue the data.
384         int dataSize = (int) buffer.size();
385         pendingData.write(buffer, dataSize);
386         pendingDataHasEndOfStream |= endOfStream;
387         flushPendingData |= flush;
388       } else {
389         checkState(id() != ABSENT_ID, "streamId should be set");
390         // If buffer > frameWriter.maxDataLength() the flow-controller will ensure that it is
391         // properly chunked.
392         outboundFlow.data(endOfStream, outboundFlowState, buffer, flush);
393       }
394     }
395 
396     @SuppressWarnings("GuardedBy")
397     @GuardedBy("lock")
streamReady(Metadata metadata, String path)398     private void streamReady(Metadata metadata, String path) {
399       requestHeaders =
400           Headers.createRequestHeaders(
401               metadata,
402               path,
403               authority,
404               userAgent,
405               useGet,
406               transport.isUsingPlaintext());
407       // TODO(b/145386688): This access should be guarded by 'this.transport.lock'; instead found:
408       // 'this.lock'
409       transport.streamReadyToStart(OkHttpClientStream.this);
410     }
411 
tag()412     Tag tag() {
413       return tag;
414     }
415 
id()416     int id() {
417       return id;
418     }
419 
getOutboundFlowState()420     OutboundFlowController.StreamState getOutboundFlowState() {
421       synchronized (lock) {
422         return outboundFlowState;
423       }
424     }
425   }
426 }
427