1 /* 2 * Copyright (C) 2011 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.squareup.okhttp.internal.framed; 18 19 import java.io.EOFException; 20 import java.io.IOException; 21 import java.io.InterruptedIOException; 22 import java.net.SocketTimeoutException; 23 import java.util.ArrayList; 24 import java.util.List; 25 import okio.AsyncTimeout; 26 import okio.Buffer; 27 import okio.BufferedSource; 28 import okio.Sink; 29 import okio.Source; 30 import okio.Timeout; 31 32 import static com.squareup.okhttp.internal.framed.Settings.DEFAULT_INITIAL_WINDOW_SIZE; 33 34 /** A logical bidirectional stream. */ 35 public final class FramedStream { 36 // Internal state is guarded by this. No long-running or potentially 37 // blocking operations are performed while the lock is held. 38 39 /** 40 * The total number of bytes consumed by the application (with {@link 41 * FramedDataSource#read}), but not yet acknowledged by sending a {@code 42 * WINDOW_UPDATE} frame on this stream. 43 */ 44 // Visible for testing 45 long unacknowledgedBytesRead = 0; 46 47 /** 48 * Count of bytes that can be written on the stream before receiving a 49 * window update. Even if this is positive, writes will block until there 50 * available bytes in {@code connection.bytesLeftInWriteWindow}. 51 */ 52 // guarded by this 53 long bytesLeftInWriteWindow; 54 55 private final int id; 56 private final FramedConnection connection; 57 58 /** Headers sent by the stream initiator. Immutable and non null. */ 59 private final List<Header> requestHeaders; 60 61 /** Headers sent in the stream reply. Null if reply is either not sent or not sent yet. */ 62 private List<Header> responseHeaders; 63 64 private final FramedDataSource source; 65 final FramedDataSink sink; 66 private final StreamTimeout readTimeout = new StreamTimeout(); 67 private final StreamTimeout writeTimeout = new StreamTimeout(); 68 69 /** 70 * The reason why this stream was abnormally closed. If there are multiple 71 * reasons to abnormally close this stream (such as both peers closing it 72 * near-simultaneously) then this is the first reason known to this peer. 73 */ 74 private ErrorCode errorCode = null; 75 FramedStream(int id, FramedConnection connection, boolean outFinished, boolean inFinished, List<Header> requestHeaders)76 FramedStream(int id, FramedConnection connection, boolean outFinished, boolean inFinished, 77 List<Header> requestHeaders) { 78 if (connection == null) throw new NullPointerException("connection == null"); 79 if (requestHeaders == null) throw new NullPointerException("requestHeaders == null"); 80 this.id = id; 81 this.connection = connection; 82 this.bytesLeftInWriteWindow = 83 connection.peerSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE); 84 this.source = new FramedDataSource( 85 connection.okHttpSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE)); 86 this.sink = new FramedDataSink(); 87 this.source.finished = inFinished; 88 this.sink.finished = outFinished; 89 this.requestHeaders = requestHeaders; 90 } 91 getId()92 public int getId() { 93 return id; 94 } 95 96 /** 97 * Returns true if this stream is open. A stream is open until either: 98 * <ul> 99 * <li>A {@code SYN_RESET} frame abnormally terminates the stream. 100 * <li>Both input and output streams have transmitted all data and 101 * headers. 102 * </ul> 103 * Note that the input stream may continue to yield data even after a stream 104 * reports itself as not open. This is because input data is buffered. 105 */ isOpen()106 public synchronized boolean isOpen() { 107 if (errorCode != null) { 108 return false; 109 } 110 if ((source.finished || source.closed) 111 && (sink.finished || sink.closed) 112 && responseHeaders != null) { 113 return false; 114 } 115 return true; 116 } 117 118 /** Returns true if this stream was created by this peer. */ isLocallyInitiated()119 public boolean isLocallyInitiated() { 120 boolean streamIsClient = ((id & 1) == 1); 121 return connection.client == streamIsClient; 122 } 123 getConnection()124 public FramedConnection getConnection() { 125 return connection; 126 } 127 getRequestHeaders()128 public List<Header> getRequestHeaders() { 129 return requestHeaders; 130 } 131 132 /** 133 * Returns the stream's response headers, blocking if necessary if they 134 * have not been received yet. 135 */ getResponseHeaders()136 public synchronized List<Header> getResponseHeaders() throws IOException { 137 readTimeout.enter(); 138 try { 139 while (responseHeaders == null && errorCode == null) { 140 waitForIo(); 141 } 142 } finally { 143 readTimeout.exitAndThrowIfTimedOut(); 144 } 145 if (responseHeaders != null) return responseHeaders; 146 throw new IOException("stream was reset: " + errorCode); 147 } 148 149 /** 150 * Returns the reason why this stream was closed, or null if it closed 151 * normally or has not yet been closed. 152 */ getErrorCode()153 public synchronized ErrorCode getErrorCode() { 154 return errorCode; 155 } 156 157 /** 158 * Sends a reply to an incoming stream. 159 * 160 * @param out true to create an output stream that we can use to send data 161 * to the remote peer. Corresponds to {@code FLAG_FIN}. 162 */ reply(List<Header> responseHeaders, boolean out)163 public void reply(List<Header> responseHeaders, boolean out) throws IOException { 164 assert (!Thread.holdsLock(FramedStream.this)); 165 boolean outFinished = false; 166 synchronized (this) { 167 if (responseHeaders == null) { 168 throw new NullPointerException("responseHeaders == null"); 169 } 170 if (this.responseHeaders != null) { 171 throw new IllegalStateException("reply already sent"); 172 } 173 this.responseHeaders = responseHeaders; 174 if (!out) { 175 this.sink.finished = true; 176 outFinished = true; 177 } 178 } 179 connection.writeSynReply(id, outFinished, responseHeaders); 180 181 if (outFinished) { 182 connection.flush(); 183 } 184 } 185 readTimeout()186 public Timeout readTimeout() { 187 return readTimeout; 188 } 189 writeTimeout()190 public Timeout writeTimeout() { 191 return writeTimeout; 192 } 193 194 /** Returns a source that reads data from the peer. */ getSource()195 public Source getSource() { 196 return source; 197 } 198 199 /** 200 * Returns a sink that can be used to write data to the peer. 201 * 202 * @throws IllegalStateException if this stream was initiated by the peer 203 * and a {@link #reply} has not yet been sent. 204 */ getSink()205 public Sink getSink() { 206 synchronized (this) { 207 if (responseHeaders == null && !isLocallyInitiated()) { 208 throw new IllegalStateException("reply before requesting the sink"); 209 } 210 } 211 return sink; 212 } 213 214 /** 215 * Abnormally terminate this stream. This blocks until the {@code RST_STREAM} 216 * frame has been transmitted. 217 */ close(ErrorCode rstStatusCode)218 public void close(ErrorCode rstStatusCode) throws IOException { 219 if (!closeInternal(rstStatusCode)) { 220 return; // Already closed. 221 } 222 connection.writeSynReset(id, rstStatusCode); 223 } 224 225 /** 226 * Abnormally terminate this stream. This enqueues a {@code RST_STREAM} 227 * frame and returns immediately. 228 */ closeLater(ErrorCode errorCode)229 public void closeLater(ErrorCode errorCode) { 230 if (!closeInternal(errorCode)) { 231 return; // Already closed. 232 } 233 connection.writeSynResetLater(id, errorCode); 234 } 235 236 /** Returns true if this stream was closed. */ closeInternal(ErrorCode errorCode)237 private boolean closeInternal(ErrorCode errorCode) { 238 assert (!Thread.holdsLock(this)); 239 synchronized (this) { 240 if (this.errorCode != null) { 241 return false; 242 } 243 if (source.finished && sink.finished) { 244 return false; 245 } 246 this.errorCode = errorCode; 247 notifyAll(); 248 } 249 connection.removeStream(id); 250 return true; 251 } 252 receiveHeaders(List<Header> headers, HeadersMode headersMode)253 void receiveHeaders(List<Header> headers, HeadersMode headersMode) { 254 assert (!Thread.holdsLock(FramedStream.this)); 255 ErrorCode errorCode = null; 256 boolean open = true; 257 synchronized (this) { 258 if (responseHeaders == null) { 259 if (headersMode.failIfHeadersAbsent()) { 260 errorCode = ErrorCode.PROTOCOL_ERROR; 261 } else { 262 responseHeaders = headers; 263 open = isOpen(); 264 notifyAll(); 265 } 266 } else { 267 if (headersMode.failIfHeadersPresent()) { 268 errorCode = ErrorCode.STREAM_IN_USE; 269 } else { 270 List<Header> newHeaders = new ArrayList<>(); 271 newHeaders.addAll(responseHeaders); 272 newHeaders.addAll(headers); 273 this.responseHeaders = newHeaders; 274 } 275 } 276 } 277 if (errorCode != null) { 278 closeLater(errorCode); 279 } else if (!open) { 280 connection.removeStream(id); 281 } 282 } 283 receiveData(BufferedSource in, int length)284 void receiveData(BufferedSource in, int length) throws IOException { 285 assert (!Thread.holdsLock(FramedStream.this)); 286 this.source.receive(in, length); 287 } 288 receiveFin()289 void receiveFin() { 290 assert (!Thread.holdsLock(FramedStream.this)); 291 boolean open; 292 synchronized (this) { 293 this.source.finished = true; 294 open = isOpen(); 295 notifyAll(); 296 } 297 if (!open) { 298 connection.removeStream(id); 299 } 300 } 301 receiveRstStream(ErrorCode errorCode)302 synchronized void receiveRstStream(ErrorCode errorCode) { 303 if (this.errorCode == null) { 304 this.errorCode = errorCode; 305 notifyAll(); 306 } 307 } 308 309 /** 310 * A source that reads the incoming data frames of a stream. Although this 311 * class uses synchronization to safely receive incoming data frames, it is 312 * not intended for use by multiple readers. 313 */ 314 private final class FramedDataSource implements Source { 315 /** Buffer to receive data from the network into. Only accessed by the reader thread. */ 316 private final Buffer receiveBuffer = new Buffer(); 317 318 /** Buffer with readable data. Guarded by FramedStream.this. */ 319 private final Buffer readBuffer = new Buffer(); 320 321 /** Maximum number of bytes to buffer before reporting a flow control error. */ 322 private final long maxByteCount; 323 324 /** True if the caller has closed this stream. */ 325 private boolean closed; 326 327 /** 328 * True if either side has cleanly shut down this stream. We will 329 * receive no more bytes beyond those already in the buffer. 330 */ 331 private boolean finished; 332 FramedDataSource(long maxByteCount)333 private FramedDataSource(long maxByteCount) { 334 this.maxByteCount = maxByteCount; 335 } 336 read(Buffer sink, long byteCount)337 @Override public long read(Buffer sink, long byteCount) 338 throws IOException { 339 if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount); 340 341 long read; 342 synchronized (FramedStream.this) { 343 waitUntilReadable(); 344 checkNotClosed(); 345 if (readBuffer.size() == 0) return -1; // This source is exhausted. 346 347 // Move bytes from the read buffer into the caller's buffer. 348 read = readBuffer.read(sink, Math.min(byteCount, readBuffer.size())); 349 350 // Flow control: notify the peer that we're ready for more data! 351 unacknowledgedBytesRead += read; 352 if (unacknowledgedBytesRead 353 >= connection.okHttpSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE) / 2) { 354 connection.writeWindowUpdateLater(id, unacknowledgedBytesRead); 355 unacknowledgedBytesRead = 0; 356 } 357 } 358 359 // Update connection.unacknowledgedBytesRead outside the stream lock. 360 synchronized (connection) { // Multiple application threads may hit this section. 361 connection.unacknowledgedBytesRead += read; 362 if (connection.unacknowledgedBytesRead 363 >= connection.okHttpSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE) / 2) { 364 connection.writeWindowUpdateLater(0, connection.unacknowledgedBytesRead); 365 connection.unacknowledgedBytesRead = 0; 366 } 367 } 368 369 return read; 370 } 371 372 /** Returns once the source is either readable or finished. */ waitUntilReadable()373 private void waitUntilReadable() throws IOException { 374 readTimeout.enter(); 375 try { 376 while (readBuffer.size() == 0 && !finished && !closed && errorCode == null) { 377 waitForIo(); 378 } 379 } finally { 380 readTimeout.exitAndThrowIfTimedOut(); 381 } 382 } 383 receive(BufferedSource in, long byteCount)384 void receive(BufferedSource in, long byteCount) throws IOException { 385 assert (!Thread.holdsLock(FramedStream.this)); 386 387 while (byteCount > 0) { 388 boolean finished; 389 boolean flowControlError; 390 synchronized (FramedStream.this) { 391 finished = this.finished; 392 flowControlError = byteCount + readBuffer.size() > maxByteCount; 393 } 394 395 // If the peer sends more data than we can handle, discard it and close the connection. 396 if (flowControlError) { 397 in.skip(byteCount); 398 closeLater(ErrorCode.FLOW_CONTROL_ERROR); 399 return; 400 } 401 402 // Discard data received after the stream is finished. It's probably a benign race. 403 if (finished) { 404 in.skip(byteCount); 405 return; 406 } 407 408 // Fill the receive buffer without holding any locks. 409 long read = in.read(receiveBuffer, byteCount); 410 if (read == -1) throw new EOFException(); 411 byteCount -= read; 412 413 // Move the received data to the read buffer to the reader can read it. 414 synchronized (FramedStream.this) { 415 boolean wasEmpty = readBuffer.size() == 0; 416 readBuffer.writeAll(receiveBuffer); 417 if (wasEmpty) { 418 FramedStream.this.notifyAll(); 419 } 420 } 421 } 422 } 423 timeout()424 @Override public Timeout timeout() { 425 return readTimeout; 426 } 427 close()428 @Override public void close() throws IOException { 429 synchronized (FramedStream.this) { 430 closed = true; 431 readBuffer.clear(); 432 FramedStream.this.notifyAll(); 433 } 434 cancelStreamIfNecessary(); 435 } 436 checkNotClosed()437 private void checkNotClosed() throws IOException { 438 if (closed) { 439 throw new IOException("stream closed"); 440 } 441 if (errorCode != null) { 442 throw new IOException("stream was reset: " + errorCode); 443 } 444 } 445 } 446 cancelStreamIfNecessary()447 private void cancelStreamIfNecessary() throws IOException { 448 assert (!Thread.holdsLock(FramedStream.this)); 449 boolean open; 450 boolean cancel; 451 synchronized (this) { 452 cancel = !source.finished && source.closed && (sink.finished || sink.closed); 453 open = isOpen(); 454 } 455 if (cancel) { 456 // RST this stream to prevent additional data from being sent. This 457 // is safe because the input stream is closed (we won't use any 458 // further bytes) and the output stream is either finished or closed 459 // (so RSTing both streams doesn't cause harm). 460 FramedStream.this.close(ErrorCode.CANCEL); 461 } else if (!open) { 462 connection.removeStream(id); 463 } 464 } 465 466 /** 467 * A sink that writes outgoing data frames of a stream. This class is not 468 * thread safe. 469 */ 470 final class FramedDataSink implements Sink { 471 private static final long EMIT_BUFFER_SIZE = 16384; 472 473 /** 474 * Buffer of outgoing data. This batches writes of small writes into this sink as larges 475 * frames written to the outgoing connection. Batching saves the (small) framing overhead. 476 */ 477 private final Buffer sendBuffer = new Buffer(); 478 479 private boolean closed; 480 481 /** 482 * True if either side has cleanly shut down this stream. We shall send 483 * no more bytes. 484 */ 485 private boolean finished; 486 write(Buffer source, long byteCount)487 @Override public void write(Buffer source, long byteCount) throws IOException { 488 assert (!Thread.holdsLock(FramedStream.this)); 489 sendBuffer.write(source, byteCount); 490 while (sendBuffer.size() >= EMIT_BUFFER_SIZE) { 491 emitDataFrame(false); 492 } 493 } 494 495 /** 496 * Emit a single data frame to the connection. The frame's size be limited by this stream's 497 * write window. This method will block until the write window is nonempty. 498 */ emitDataFrame(boolean outFinished)499 private void emitDataFrame(boolean outFinished) throws IOException { 500 long toWrite; 501 synchronized (FramedStream.this) { 502 writeTimeout.enter(); 503 try { 504 while (bytesLeftInWriteWindow <= 0 && !finished && !closed && errorCode == null) { 505 waitForIo(); // Wait until we receive a WINDOW_UPDATE for this stream. 506 } 507 } finally { 508 writeTimeout.exitAndThrowIfTimedOut(); 509 } 510 511 checkOutNotClosed(); // Kick out if the stream was reset or closed while waiting. 512 toWrite = Math.min(bytesLeftInWriteWindow, sendBuffer.size()); 513 bytesLeftInWriteWindow -= toWrite; 514 } 515 516 writeTimeout.enter(); 517 try { 518 connection.writeData(id, outFinished && toWrite == sendBuffer.size(), sendBuffer, toWrite); 519 } finally { 520 writeTimeout.exitAndThrowIfTimedOut(); 521 } 522 } 523 flush()524 @Override public void flush() throws IOException { 525 assert (!Thread.holdsLock(FramedStream.this)); 526 synchronized (FramedStream.this) { 527 checkOutNotClosed(); 528 } 529 while (sendBuffer.size() > 0) { 530 emitDataFrame(false); 531 connection.flush(); 532 } 533 } 534 timeout()535 @Override public Timeout timeout() { 536 return writeTimeout; 537 } 538 close()539 @Override public void close() throws IOException { 540 assert (!Thread.holdsLock(FramedStream.this)); 541 synchronized (FramedStream.this) { 542 if (closed) return; 543 } 544 if (!sink.finished) { 545 // Emit the remaining data, setting the END_STREAM flag on the last frame. 546 if (sendBuffer.size() > 0) { 547 while (sendBuffer.size() > 0) { 548 emitDataFrame(true); 549 } 550 } else { 551 // Send an empty frame just so we can set the END_STREAM flag. 552 connection.writeData(id, true, null, 0); 553 } 554 } 555 synchronized (FramedStream.this) { 556 closed = true; 557 } 558 connection.flush(); 559 cancelStreamIfNecessary(); 560 } 561 } 562 563 /** 564 * {@code delta} will be negative if a settings frame initial window is 565 * smaller than the last. 566 */ addBytesToWriteWindow(long delta)567 void addBytesToWriteWindow(long delta) { 568 bytesLeftInWriteWindow += delta; 569 if (delta > 0) FramedStream.this.notifyAll(); 570 } 571 checkOutNotClosed()572 private void checkOutNotClosed() throws IOException { 573 if (sink.closed) { 574 throw new IOException("stream closed"); 575 } else if (sink.finished) { 576 throw new IOException("stream finished"); 577 } else if (errorCode != null) { 578 throw new IOException("stream was reset: " + errorCode); 579 } 580 } 581 582 /** 583 * Like {@link #wait}, but throws an {@code InterruptedIOException} when 584 * interrupted instead of the more awkward {@link InterruptedException}. 585 */ waitForIo()586 private void waitForIo() throws InterruptedIOException { 587 try { 588 wait(); 589 } catch (InterruptedException e) { 590 throw new InterruptedIOException(); 591 } 592 } 593 594 /** 595 * The Okio timeout watchdog will call {@link #timedOut} if the timeout is 596 * reached. In that case we close the stream (asynchronously) which will 597 * notify the waiting thread. 598 */ 599 class StreamTimeout extends AsyncTimeout { timedOut()600 @Override protected void timedOut() { 601 closeLater(ErrorCode.CANCEL); 602 } 603 newTimeoutException(IOException cause)604 @Override protected IOException newTimeoutException(IOException cause) { 605 SocketTimeoutException socketTimeoutException = new SocketTimeoutException("timeout"); 606 if (cause != null) { 607 socketTimeoutException.initCause(cause); 608 } 609 return socketTimeoutException; 610 } 611 exitAndThrowIfTimedOut()612 public void exitAndThrowIfTimedOut() throws IOException { 613 if (exit()) throw newTimeoutException(null /* cause */); 614 } 615 } 616 } 617