1 /* 2 * Copyright 2014 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.okhttp; 18 19 import static com.google.common.base.Preconditions.checkNotNull; 20 import static com.google.common.base.Preconditions.checkState; 21 import static io.grpc.internal.ClientStreamListener.RpcProgress.PROCESSED; 22 23 import com.google.common.io.BaseEncoding; 24 import io.grpc.Attributes; 25 import io.grpc.CallOptions; 26 import io.grpc.Metadata; 27 import io.grpc.MethodDescriptor; 28 import io.grpc.Status; 29 import io.grpc.internal.AbstractClientStream; 30 import io.grpc.internal.Http2ClientStreamTransportState; 31 import io.grpc.internal.StatsTraceContext; 32 import io.grpc.internal.TransportTracer; 33 import io.grpc.internal.WritableBuffer; 34 import io.grpc.okhttp.internal.framed.ErrorCode; 35 import io.grpc.okhttp.internal.framed.Header; 36 import io.perfmark.PerfMark; 37 import io.perfmark.Tag; 38 import io.perfmark.TaskCloseable; 39 import java.util.List; 40 import javax.annotation.concurrent.GuardedBy; 41 import okio.Buffer; 42 43 /** 44 * Client stream for the okhttp transport. 45 */ 46 class OkHttpClientStream extends AbstractClientStream { 47 48 private static final Buffer EMPTY_BUFFER = new Buffer(); 49 50 public static final int ABSENT_ID = -1; 51 52 private final MethodDescriptor<?, ?> method; 53 54 private final String userAgent; 55 private final StatsTraceContext statsTraceCtx; 56 private String authority; 57 private final TransportState state; 58 private final Sink sink = new Sink(); 59 private final Attributes attributes; 60 61 private boolean useGet = false; 62 OkHttpClientStream( MethodDescriptor<?, ?> method, Metadata headers, ExceptionHandlingFrameWriter frameWriter, OkHttpClientTransport transport, OutboundFlowController outboundFlow, Object lock, int maxMessageSize, int initialWindowSize, String authority, String userAgent, StatsTraceContext statsTraceCtx, TransportTracer transportTracer, CallOptions callOptions, boolean useGetForSafeMethods)63 OkHttpClientStream( 64 MethodDescriptor<?, ?> method, 65 Metadata headers, 66 ExceptionHandlingFrameWriter frameWriter, 67 OkHttpClientTransport transport, 68 OutboundFlowController outboundFlow, 69 Object lock, 70 int maxMessageSize, 71 int initialWindowSize, 72 String authority, 73 String userAgent, 74 StatsTraceContext statsTraceCtx, 75 TransportTracer transportTracer, 76 CallOptions callOptions, 77 boolean useGetForSafeMethods) { 78 super( 79 new OkHttpWritableBufferAllocator(), 80 statsTraceCtx, 81 transportTracer, 82 headers, 83 callOptions, 84 useGetForSafeMethods && method.isSafe()); 85 this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx"); 86 this.method = method; 87 this.authority = authority; 88 this.userAgent = userAgent; 89 // OkHttpClientStream is only created after the transport has finished connecting, 90 // so it is safe to read the transport attributes. 91 // We make a copy here for convenience, even though we can ask the transport. 92 this.attributes = transport.getAttributes(); 93 this.state = 94 new TransportState( 95 maxMessageSize, 96 statsTraceCtx, 97 lock, 98 frameWriter, 99 outboundFlow, 100 transport, 101 initialWindowSize, 102 method.getFullMethodName()); 103 } 104 105 @Override transportState()106 protected TransportState transportState() { 107 return state; 108 } 109 110 @Override abstractClientStreamSink()111 protected Sink abstractClientStreamSink() { 112 return sink; 113 } 114 115 /** 116 * Returns the type of this stream. 117 */ getType()118 public MethodDescriptor.MethodType getType() { 119 return method.getType(); 120 } 121 122 /** 123 * Returns whether the stream uses GET. This is not known until after {@link Sink#writeHeaders} is 124 * invoked. 125 */ useGet()126 boolean useGet() { 127 return useGet; 128 } 129 130 @Override setAuthority(String authority)131 public void setAuthority(String authority) { 132 this.authority = checkNotNull(authority, "authority"); 133 } 134 135 @Override getAttributes()136 public Attributes getAttributes() { 137 return attributes; 138 } 139 140 class Sink implements AbstractClientStream.Sink { 141 @Override writeHeaders(Metadata metadata, byte[] payload)142 public void writeHeaders(Metadata metadata, byte[] payload) { 143 try (TaskCloseable ignore = PerfMark.traceTask("OkHttpClientStream$Sink.writeHeaders")) { 144 String defaultPath = "/" + method.getFullMethodName(); 145 if (payload != null) { 146 useGet = true; 147 defaultPath += "?" + BaseEncoding.base64().encode(payload); 148 } 149 synchronized (state.lock) { 150 state.streamReady(metadata, defaultPath); 151 } 152 } 153 } 154 155 @Override writeFrame( WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages)156 public void writeFrame( 157 WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) { 158 try (TaskCloseable ignore = PerfMark.traceTask("OkHttpClientStream$Sink.writeFrame")) { 159 Buffer buffer; 160 if (frame == null) { 161 buffer = EMPTY_BUFFER; 162 } else { 163 buffer = ((OkHttpWritableBuffer) frame).buffer(); 164 int size = (int) buffer.size(); 165 if (size > 0) { 166 onSendingBytes(size); 167 } 168 } 169 170 synchronized (state.lock) { 171 state.sendBuffer(buffer, endOfStream, flush); 172 getTransportTracer().reportMessageSent(numMessages); 173 } 174 } 175 } 176 177 @Override cancel(Status reason)178 public void cancel(Status reason) { 179 try (TaskCloseable ignore = PerfMark.traceTask("OkHttpClientStream$Sink.cancel")) { 180 synchronized (state.lock) { 181 state.cancel(reason, true, null); 182 } 183 } 184 } 185 } 186 187 class TransportState extends Http2ClientStreamTransportState 188 implements OutboundFlowController.Stream { 189 private final int initialWindowSize; 190 private final Object lock; 191 @GuardedBy("lock") 192 private List<Header> requestHeaders; 193 @GuardedBy("lock") 194 private Buffer pendingData = new Buffer(); 195 private boolean pendingDataHasEndOfStream = false; 196 private boolean flushPendingData = false; 197 @GuardedBy("lock") 198 private boolean cancelSent = false; 199 @GuardedBy("lock") 200 private int window; 201 @GuardedBy("lock") 202 private int processedWindow; 203 @GuardedBy("lock") 204 private final ExceptionHandlingFrameWriter frameWriter; 205 @GuardedBy("lock") 206 private final OutboundFlowController outboundFlow; 207 @GuardedBy("lock") 208 private final OkHttpClientTransport transport; 209 /** True iff neither {@link #cancel} nor {@link #start(int)} have been called. */ 210 @GuardedBy("lock") 211 private boolean canStart = true; 212 private final Tag tag; 213 @GuardedBy("lock") 214 private OutboundFlowController.StreamState outboundFlowState; 215 private int id = ABSENT_ID; 216 TransportState( int maxMessageSize, StatsTraceContext statsTraceCtx, Object lock, ExceptionHandlingFrameWriter frameWriter, OutboundFlowController outboundFlow, OkHttpClientTransport transport, int initialWindowSize, String methodName)217 public TransportState( 218 int maxMessageSize, 219 StatsTraceContext statsTraceCtx, 220 Object lock, 221 ExceptionHandlingFrameWriter frameWriter, 222 OutboundFlowController outboundFlow, 223 OkHttpClientTransport transport, 224 int initialWindowSize, 225 String methodName) { 226 super(maxMessageSize, statsTraceCtx, OkHttpClientStream.this.getTransportTracer()); 227 this.lock = checkNotNull(lock, "lock"); 228 this.frameWriter = frameWriter; 229 this.outboundFlow = outboundFlow; 230 this.transport = transport; 231 this.window = initialWindowSize; 232 this.processedWindow = initialWindowSize; 233 this.initialWindowSize = initialWindowSize; 234 tag = PerfMark.createTag(methodName); 235 } 236 237 @SuppressWarnings("GuardedBy") 238 @GuardedBy("lock") start(int streamId)239 public void start(int streamId) { 240 checkState(id == ABSENT_ID, "the stream has been started with id %s", streamId); 241 id = streamId; 242 outboundFlowState = outboundFlow.createState(this, streamId); 243 // TODO(b/145386688): This access should be guarded by 'OkHttpClientStream.this.state.lock'; 244 // instead found: 'this.lock' 245 state.onStreamAllocated(); 246 247 if (canStart) { 248 // Only happens when the stream has neither been started nor cancelled. 249 frameWriter.synStream(useGet, false, id, 0, requestHeaders); 250 statsTraceCtx.clientOutboundHeaders(); 251 requestHeaders = null; 252 253 if (pendingData.size() > 0) { 254 outboundFlow.data( 255 pendingDataHasEndOfStream, outboundFlowState, pendingData, flushPendingData); 256 257 } 258 canStart = false; 259 } 260 } 261 262 @GuardedBy("lock") 263 @Override onStreamAllocated()264 protected void onStreamAllocated() { 265 super.onStreamAllocated(); 266 getTransportTracer().reportLocalStreamStarted(); 267 } 268 269 @GuardedBy("lock") 270 @Override http2ProcessingFailed(Status status, boolean stopDelivery, Metadata trailers)271 protected void http2ProcessingFailed(Status status, boolean stopDelivery, Metadata trailers) { 272 cancel(status, stopDelivery, trailers); 273 } 274 275 @Override 276 @GuardedBy("lock") deframeFailed(Throwable cause)277 public void deframeFailed(Throwable cause) { 278 http2ProcessingFailed(Status.fromThrowable(cause), true, new Metadata()); 279 } 280 281 @Override 282 @GuardedBy("lock") bytesRead(int processedBytes)283 public void bytesRead(int processedBytes) { 284 processedWindow -= processedBytes; 285 if (processedWindow <= initialWindowSize * Utils.DEFAULT_WINDOW_UPDATE_RATIO) { 286 int delta = initialWindowSize - processedWindow; 287 window += delta; 288 processedWindow += delta; 289 frameWriter.windowUpdate(id(), delta); 290 } 291 } 292 293 @Override 294 @GuardedBy("lock") deframerClosed(boolean hasPartialMessage)295 public void deframerClosed(boolean hasPartialMessage) { 296 onEndOfStream(); 297 super.deframerClosed(hasPartialMessage); 298 } 299 300 @Override 301 @GuardedBy("lock") runOnTransportThread(final Runnable r)302 public void runOnTransportThread(final Runnable r) { 303 synchronized (lock) { 304 r.run(); 305 } 306 } 307 308 /** 309 * Must be called with holding the transport lock. 310 */ 311 @GuardedBy("lock") transportHeadersReceived(List<Header> headers, boolean endOfStream)312 public void transportHeadersReceived(List<Header> headers, boolean endOfStream) { 313 if (endOfStream) { 314 transportTrailersReceived(Utils.convertTrailers(headers)); 315 } else { 316 transportHeadersReceived(Utils.convertHeaders(headers)); 317 } 318 } 319 320 /** 321 * Must be called with holding the transport lock. 322 */ 323 @GuardedBy("lock") transportDataReceived(okio.Buffer frame, boolean endOfStream)324 public void transportDataReceived(okio.Buffer frame, boolean endOfStream) { 325 // We only support 16 KiB frames, and the max permitted in HTTP/2 is 16 MiB. This is verified 326 // in OkHttp's Http2 deframer. In addition, this code is after the data has been read. 327 int length = (int) frame.size(); 328 window -= length; 329 if (window < 0) { 330 frameWriter.rstStream(id(), ErrorCode.FLOW_CONTROL_ERROR); 331 transport.finishStream( 332 id(), 333 Status.INTERNAL.withDescription( 334 "Received data size exceeded our receiving window size"), 335 PROCESSED, false, null, null); 336 return; 337 } 338 super.transportDataReceived(new OkHttpReadableBuffer(frame), endOfStream); 339 } 340 341 @GuardedBy("lock") onEndOfStream()342 private void onEndOfStream() { 343 if (!isOutboundClosed()) { 344 // If server's end-of-stream is received before client sends end-of-stream, we just send a 345 // reset to server to fully close the server side stream. 346 transport.finishStream(id(),null, PROCESSED, false, ErrorCode.CANCEL, null); 347 } else { 348 transport.finishStream(id(), null, PROCESSED, false, null, null); 349 } 350 } 351 352 @SuppressWarnings("GuardedBy") 353 @GuardedBy("lock") cancel(Status reason, boolean stopDelivery, Metadata trailers)354 private void cancel(Status reason, boolean stopDelivery, Metadata trailers) { 355 if (cancelSent) { 356 return; 357 } 358 cancelSent = true; 359 if (canStart) { 360 // stream is pending. 361 // TODO(b/145386688): This access should be guarded by 'this.transport.lock'; instead found: 362 // 'this.lock' 363 transport.removePendingStream(OkHttpClientStream.this); 364 // release holding data, so they can be GCed or returned to pool earlier. 365 requestHeaders = null; 366 pendingData.clear(); 367 canStart = false; 368 transportReportStatus(reason, true, trailers != null ? trailers : new Metadata()); 369 } else { 370 // If pendingData is null, start must have already been called, which means synStream has 371 // been called as well. 372 transport.finishStream( 373 id(), reason, PROCESSED, stopDelivery, ErrorCode.CANCEL, trailers); 374 } 375 } 376 377 @GuardedBy("lock") sendBuffer(Buffer buffer, boolean endOfStream, boolean flush)378 private void sendBuffer(Buffer buffer, boolean endOfStream, boolean flush) { 379 if (cancelSent) { 380 return; 381 } 382 if (canStart) { 383 // Stream is pending start, queue the data. 384 int dataSize = (int) buffer.size(); 385 pendingData.write(buffer, dataSize); 386 pendingDataHasEndOfStream |= endOfStream; 387 flushPendingData |= flush; 388 } else { 389 checkState(id() != ABSENT_ID, "streamId should be set"); 390 // If buffer > frameWriter.maxDataLength() the flow-controller will ensure that it is 391 // properly chunked. 392 outboundFlow.data(endOfStream, outboundFlowState, buffer, flush); 393 } 394 } 395 396 @SuppressWarnings("GuardedBy") 397 @GuardedBy("lock") streamReady(Metadata metadata, String path)398 private void streamReady(Metadata metadata, String path) { 399 requestHeaders = 400 Headers.createRequestHeaders( 401 metadata, 402 path, 403 authority, 404 userAgent, 405 useGet, 406 transport.isUsingPlaintext()); 407 // TODO(b/145386688): This access should be guarded by 'this.transport.lock'; instead found: 408 // 'this.lock' 409 transport.streamReadyToStart(OkHttpClientStream.this); 410 } 411 tag()412 Tag tag() { 413 return tag; 414 } 415 id()416 int id() { 417 return id; 418 } 419 getOutboundFlowState()420 OutboundFlowController.StreamState getOutboundFlowState() { 421 synchronized (lock) { 422 return outboundFlowState; 423 } 424 } 425 } 426 } 427