1 /* 2 * Copyright 2014 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.okhttp; 18 19 import static io.grpc.okhttp.Utils.CONNECTION_STREAM_ID; 20 import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_SIZE; 21 import static java.lang.Math.ceil; 22 import static java.lang.Math.max; 23 import static java.lang.Math.min; 24 25 import com.google.common.base.Preconditions; 26 import io.grpc.okhttp.internal.framed.FrameWriter; 27 import java.io.IOException; 28 import java.util.ArrayDeque; 29 import java.util.Queue; 30 import javax.annotation.Nullable; 31 import okio.Buffer; 32 33 /** 34 * Simple outbound flow controller that evenly splits the connection window across all existing 35 * streams. 36 */ 37 class OutboundFlowController { 38 private final OkHttpClientTransport transport; 39 private final FrameWriter frameWriter; 40 private int initialWindowSize = DEFAULT_WINDOW_SIZE; 41 private final OutboundFlowState connectionState = new OutboundFlowState(CONNECTION_STREAM_ID); 42 OutboundFlowController(OkHttpClientTransport transport, FrameWriter frameWriter)43 OutboundFlowController(OkHttpClientTransport transport, FrameWriter frameWriter) { 44 this.transport = Preconditions.checkNotNull(transport, "transport"); 45 this.frameWriter = Preconditions.checkNotNull(frameWriter, "frameWriter"); 46 } 47 48 /** 49 * Adjusts outbound window size requested by peer. When window size is increased, it does not send 50 * any pending frames. If this method returns {@code true}, the caller should call {@link 51 * #writeStreams()} after settings ack. 52 * 53 * <p>Must be called with holding transport lock. 54 * 55 * @return true, if new window size is increased, false otherwise. 56 */ initialOutboundWindowSize(int newWindowSize)57 boolean initialOutboundWindowSize(int newWindowSize) { 58 if (newWindowSize < 0) { 59 throw new IllegalArgumentException("Invalid initial window size: " + newWindowSize); 60 } 61 62 int delta = newWindowSize - initialWindowSize; 63 initialWindowSize = newWindowSize; 64 for (OkHttpClientStream stream : transport.getActiveStreams()) { 65 OutboundFlowState state = (OutboundFlowState) stream.getOutboundFlowState(); 66 if (state == null) { 67 // Create the OutboundFlowState with the new window size. 68 state = new OutboundFlowState(stream); 69 stream.setOutboundFlowState(state); 70 } else { 71 state.incrementStreamWindow(delta); 72 } 73 } 74 75 return delta > 0; 76 } 77 78 /** 79 * Update the outbound window for given stream, or for the connection if stream is null. Returns 80 * the new value of the window size. 81 * 82 * <p>Must be called with holding transport lock. 83 */ windowUpdate(@ullable OkHttpClientStream stream, int delta)84 int windowUpdate(@Nullable OkHttpClientStream stream, int delta) { 85 final int updatedWindow; 86 if (stream == null) { 87 // Update the connection window and write any pending frames for all streams. 88 updatedWindow = connectionState.incrementStreamWindow(delta); 89 writeStreams(); 90 } else { 91 // Update the stream window and write any pending frames for the stream. 92 OutboundFlowState state = state(stream); 93 updatedWindow = state.incrementStreamWindow(delta); 94 95 WriteStatus writeStatus = new WriteStatus(); 96 state.writeBytes(state.writableWindow(), writeStatus); 97 if (writeStatus.hasWritten()) { 98 flush(); 99 } 100 } 101 return updatedWindow; 102 } 103 104 /** 105 * Must be called with holding transport lock. 106 */ data(boolean outFinished, int streamId, Buffer source, boolean flush)107 void data(boolean outFinished, int streamId, Buffer source, boolean flush) { 108 Preconditions.checkNotNull(source, "source"); 109 110 OkHttpClientStream stream = transport.getStream(streamId); 111 if (stream == null) { 112 // This is possible for a stream that has received end-of-stream from server (but hasn't sent 113 // end-of-stream), and was removed from the transport stream map. 114 // In such case, we just throw away the data. 115 return; 116 } 117 118 OutboundFlowState state = state(stream); 119 int window = state.writableWindow(); 120 boolean framesAlreadyQueued = state.hasFrame(); 121 122 OutboundFlowState.Frame frame = state.newFrame(source, outFinished); 123 if (!framesAlreadyQueued && window >= frame.size()) { 124 // Window size is large enough to send entire data frame 125 frame.write(); 126 if (flush) { 127 flush(); 128 } 129 return; 130 } 131 132 // Enqueue the frame to be written when the window size permits. 133 frame.enqueue(); 134 135 if (framesAlreadyQueued || window <= 0) { 136 // Stream already has frames pending or is stalled, don't send anything now. 137 if (flush) { 138 flush(); 139 } 140 return; 141 } 142 143 // Create and send a partial frame up to the window size. 144 frame.split(window).write(); 145 if (flush) { 146 flush(); 147 } 148 } 149 flush()150 void flush() { 151 try { 152 frameWriter.flush(); 153 } catch (IOException e) { 154 throw new RuntimeException(e); 155 } 156 } 157 state(OkHttpClientStream stream)158 private OutboundFlowState state(OkHttpClientStream stream) { 159 OutboundFlowState state = (OutboundFlowState) stream.getOutboundFlowState(); 160 if (state == null) { 161 state = new OutboundFlowState(stream); 162 stream.setOutboundFlowState(state); 163 } 164 return state; 165 } 166 167 /** 168 * Writes as much data for all the streams as possible given the current flow control windows. 169 * 170 * <p>Must be called with holding transport lock. 171 */ writeStreams()172 void writeStreams() { 173 OkHttpClientStream[] streams = transport.getActiveStreams(); 174 int connectionWindow = connectionState.window(); 175 for (int numStreams = streams.length; numStreams > 0 && connectionWindow > 0;) { 176 int nextNumStreams = 0; 177 int windowSlice = (int) ceil(connectionWindow / (float) numStreams); 178 for (int index = 0; index < numStreams && connectionWindow > 0; ++index) { 179 OkHttpClientStream stream = streams[index]; 180 OutboundFlowState state = state(stream); 181 182 int bytesForStream = min(connectionWindow, min(state.unallocatedBytes(), windowSlice)); 183 if (bytesForStream > 0) { 184 state.allocateBytes(bytesForStream); 185 connectionWindow -= bytesForStream; 186 } 187 188 if (state.unallocatedBytes() > 0) { 189 // There is more data to process for this stream. Add it to the next 190 // pass. 191 streams[nextNumStreams++] = stream; 192 } 193 } 194 numStreams = nextNumStreams; 195 } 196 197 // Now take one last pass through all of the streams and write any allocated bytes. 198 WriteStatus writeStatus = new WriteStatus(); 199 for (OkHttpClientStream stream : transport.getActiveStreams()) { 200 OutboundFlowState state = state(stream); 201 state.writeBytes(state.allocatedBytes(), writeStatus); 202 state.clearAllocatedBytes(); 203 } 204 205 if (writeStatus.hasWritten()) { 206 flush(); 207 } 208 } 209 210 /** 211 * Simple status that keeps track of the number of writes performed. 212 */ 213 private static final class WriteStatus { 214 int numWrites; 215 incrementNumWrites()216 void incrementNumWrites() { 217 numWrites++; 218 } 219 hasWritten()220 boolean hasWritten() { 221 return numWrites > 0; 222 } 223 } 224 225 /** 226 * The outbound flow control state for a single stream. 227 */ 228 private final class OutboundFlowState { 229 final Queue<Frame> pendingWriteQueue; 230 final int streamId; 231 int queuedBytes; 232 int window = initialWindowSize; 233 int allocatedBytes; 234 OkHttpClientStream stream; 235 OutboundFlowState(int streamId)236 OutboundFlowState(int streamId) { 237 this.streamId = streamId; 238 pendingWriteQueue = new ArrayDeque<Frame>(2); 239 } 240 OutboundFlowState(OkHttpClientStream stream)241 OutboundFlowState(OkHttpClientStream stream) { 242 this(stream.id()); 243 this.stream = stream; 244 } 245 window()246 int window() { 247 return window; 248 } 249 allocateBytes(int bytes)250 void allocateBytes(int bytes) { 251 allocatedBytes += bytes; 252 } 253 allocatedBytes()254 int allocatedBytes() { 255 return allocatedBytes; 256 } 257 unallocatedBytes()258 int unallocatedBytes() { 259 return streamableBytes() - allocatedBytes; 260 } 261 clearAllocatedBytes()262 void clearAllocatedBytes() { 263 allocatedBytes = 0; 264 } 265 266 /** 267 * Increments the flow control window for this stream by the given delta and returns the new 268 * value. 269 */ incrementStreamWindow(int delta)270 int incrementStreamWindow(int delta) { 271 if (delta > 0 && Integer.MAX_VALUE - delta < window) { 272 throw new IllegalArgumentException("Window size overflow for stream: " + streamId); 273 } 274 window += delta; 275 276 return window; 277 } 278 279 /** 280 * Returns the maximum writable window (minimum of the stream and connection windows). 281 */ writableWindow()282 int writableWindow() { 283 return min(window, connectionState.window()); 284 } 285 streamableBytes()286 int streamableBytes() { 287 return max(0, min(window, queuedBytes)); 288 } 289 290 /** 291 * Creates a new frame with the given values but does not add it to the pending queue. 292 */ newFrame(Buffer data, boolean endStream)293 Frame newFrame(Buffer data, boolean endStream) { 294 return new Frame(data, endStream); 295 } 296 297 /** 298 * Indicates whether or not there are frames in the pending queue. 299 */ hasFrame()300 boolean hasFrame() { 301 return !pendingWriteQueue.isEmpty(); 302 } 303 304 /** 305 * Returns the head of the pending queue, or {@code null} if empty. 306 */ peek()307 private Frame peek() { 308 return pendingWriteQueue.peek(); 309 } 310 311 /** 312 * Writes up to the number of bytes from the pending queue. 313 */ writeBytes(int bytes, WriteStatus writeStatus)314 int writeBytes(int bytes, WriteStatus writeStatus) { 315 int bytesAttempted = 0; 316 int maxBytes = min(bytes, writableWindow()); 317 while (hasFrame()) { 318 Frame pendingWrite = peek(); 319 if (maxBytes >= pendingWrite.size()) { 320 // Window size is large enough to send entire data frame 321 writeStatus.incrementNumWrites(); 322 bytesAttempted += pendingWrite.size(); 323 pendingWrite.write(); 324 } else if (maxBytes <= 0) { 325 // No data from the current frame can be written - we're done. 326 // We purposely check this after first testing the size of the 327 // pending frame to properly handle zero-length frame. 328 break; 329 } else { 330 // We can send a partial frame 331 Frame partialFrame = pendingWrite.split(maxBytes); 332 writeStatus.incrementNumWrites(); 333 bytesAttempted += partialFrame.size(); 334 partialFrame.write(); 335 } 336 337 // Update the threshold. 338 maxBytes = min(bytes - bytesAttempted, writableWindow()); 339 } 340 return bytesAttempted; 341 } 342 343 /** 344 * A wrapper class around the content of a data frame. 345 */ 346 private final class Frame { 347 final Buffer data; 348 final boolean endStream; 349 boolean enqueued; 350 Frame(Buffer data, boolean endStream)351 Frame(Buffer data, boolean endStream) { 352 this.data = data; 353 this.endStream = endStream; 354 } 355 356 /** 357 * Gets the total size (in bytes) of this frame including the data and padding. 358 */ size()359 int size() { 360 return (int) data.size(); 361 } 362 enqueue()363 void enqueue() { 364 if (!enqueued) { 365 enqueued = true; 366 pendingWriteQueue.offer(this); 367 368 // Increment the number of pending bytes for this stream. 369 queuedBytes += size(); 370 } 371 } 372 373 /** 374 * Writes the frame and decrements the stream and connection window sizes. If the frame is in 375 * the pending queue, the written bytes are removed from this branch of the priority tree. 376 */ write()377 void write() { 378 // Using a do/while loop because if the buffer is empty we still need to call 379 // the writer once to send the empty frame. 380 do { 381 int bytesToWrite = size(); 382 int frameBytes = min(bytesToWrite, frameWriter.maxDataLength()); 383 if (frameBytes == bytesToWrite) { 384 // All the bytes fit into a single HTTP/2 frame, just send it all. 385 connectionState.incrementStreamWindow(-bytesToWrite); 386 incrementStreamWindow(-bytesToWrite); 387 try { 388 frameWriter.data(endStream, streamId, data, bytesToWrite); 389 } catch (IOException e) { 390 throw new RuntimeException(e); 391 } 392 stream.transportState().onSentBytes(bytesToWrite); 393 394 if (enqueued) { 395 // It's enqueued - remove it from the head of the pending write queue. 396 queuedBytes -= bytesToWrite; 397 pendingWriteQueue.remove(this); 398 } 399 return; 400 } 401 402 // Split a chunk that will fit into a single HTTP/2 frame and write it. 403 Frame frame = split(frameBytes); 404 frame.write(); 405 } while (size() > 0); 406 } 407 408 /** 409 * Creates a new frame that is a view of this frame's data. The {@code maxBytes} are first 410 * split from the data buffer. If not all the requested bytes are available, the remaining 411 * bytes are then split from the padding (if available). 412 * 413 * @param maxBytes the maximum number of bytes that is allowed in the created frame. 414 * @return the partial frame. 415 */ split(int maxBytes)416 Frame split(int maxBytes) { 417 // The requested maxBytes should always be less than the size of this frame. 418 assert maxBytes < size() : "Attempting to split a frame for the full size."; 419 420 // Get the portion of the data buffer to be split. Limit to the readable bytes. 421 int dataSplit = min(maxBytes, (int) data.size()); 422 423 Buffer splitSlice = new Buffer(); 424 splitSlice.write(data, dataSplit); 425 426 Frame frame = new Frame(splitSlice, false); 427 428 if (enqueued) { 429 queuedBytes -= dataSplit; 430 } 431 return frame; 432 } 433 } 434 } 435 } 436