1 /* 2 * Copyright 2014 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.okhttp; 18 19 import static io.grpc.okhttp.Utils.CONNECTION_STREAM_ID; 20 import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_SIZE; 21 import static java.lang.Math.ceil; 22 import static java.lang.Math.max; 23 import static java.lang.Math.min; 24 25 import com.google.common.base.Preconditions; 26 import io.grpc.okhttp.internal.framed.FrameWriter; 27 import java.io.IOException; 28 import java.util.Arrays; 29 import java.util.Collections; 30 import javax.annotation.Nullable; 31 import okio.Buffer; 32 33 /** 34 * Simple outbound flow controller that evenly splits the connection window across all existing 35 * streams. 36 */ 37 class OutboundFlowController { 38 private final Transport transport; 39 private final FrameWriter frameWriter; 40 private int initialWindowSize; 41 private final StreamState connectionState; 42 OutboundFlowController(Transport transport, FrameWriter frameWriter)43 public OutboundFlowController(Transport transport, FrameWriter frameWriter) { 44 this.transport = Preconditions.checkNotNull(transport, "transport"); 45 this.frameWriter = Preconditions.checkNotNull(frameWriter, "frameWriter"); 46 this.initialWindowSize = DEFAULT_WINDOW_SIZE; 47 connectionState = new StreamState(CONNECTION_STREAM_ID, DEFAULT_WINDOW_SIZE, null); 48 } 49 50 /** 51 * Adjusts outbound window size requested by peer. When window size is increased, it does not send 52 * any pending frames. If this method returns {@code true}, the caller should call {@link 53 * #writeStreams()} after settings ack. 54 * 55 * <p>Must be called with holding transport lock. 56 * 57 * @return true, if new window size is increased, false otherwise. 58 */ initialOutboundWindowSize(int newWindowSize)59 public boolean initialOutboundWindowSize(int newWindowSize) { 60 if (newWindowSize < 0) { 61 throw new IllegalArgumentException("Invalid initial window size: " + newWindowSize); 62 } 63 64 int delta = newWindowSize - initialWindowSize; 65 initialWindowSize = newWindowSize; 66 for (StreamState state : transport.getActiveStreams()) { 67 state.incrementStreamWindow(delta); 68 } 69 70 return delta > 0; 71 } 72 73 /** 74 * Update the outbound window for given stream, or for the connection if stream is null. Returns 75 * the new value of the window size. 76 * 77 * <p>Must be called with holding transport lock. 78 */ windowUpdate(@ullable StreamState state, int delta)79 public int windowUpdate(@Nullable StreamState state, int delta) { 80 final int updatedWindow; 81 if (state == null) { 82 // Update the connection window and write any pending frames for all streams. 83 updatedWindow = connectionState.incrementStreamWindow(delta); 84 writeStreams(); 85 } else { 86 // Update the stream window and write any pending frames for the stream. 87 updatedWindow = state.incrementStreamWindow(delta); 88 89 WriteStatus writeStatus = new WriteStatus(); 90 state.writeBytes(state.writableWindow(), writeStatus); 91 if (writeStatus.hasWritten()) { 92 flush(); 93 } 94 } 95 return updatedWindow; 96 } 97 98 /** 99 * Must be called with holding transport lock. 100 */ data(boolean outFinished, StreamState state, Buffer source, boolean flush)101 public void data(boolean outFinished, StreamState state, Buffer source, boolean flush) { 102 Preconditions.checkNotNull(source, "source"); 103 104 int window = state.writableWindow(); 105 boolean framesAlreadyQueued = state.hasPendingData(); 106 int size = (int) source.size(); 107 108 if (!framesAlreadyQueued && window >= size) { 109 // Window size is large enough to send entire data frame 110 state.write(source, size, outFinished); 111 } else { 112 // send partial data 113 if (!framesAlreadyQueued && window > 0) { 114 state.write(source, window, false); 115 } 116 // Queue remaining data in the buffer 117 state.enqueueData(source, (int) source.size(), outFinished); 118 } 119 120 if (flush) { 121 flush(); 122 } 123 } 124 125 /** 126 * Transport lock must be held when calling. 127 */ notifyWhenNoPendingData(StreamState state, Runnable noPendingDataRunnable)128 public void notifyWhenNoPendingData(StreamState state, Runnable noPendingDataRunnable) { 129 Preconditions.checkNotNull(noPendingDataRunnable, "noPendingDataRunnable"); 130 if (state.hasPendingData()) { 131 state.notifyWhenNoPendingData(noPendingDataRunnable); 132 } else { 133 noPendingDataRunnable.run(); 134 } 135 } 136 flush()137 public void flush() { 138 try { 139 frameWriter.flush(); 140 } catch (IOException e) { 141 throw new RuntimeException(e); 142 } 143 } 144 createState(Stream stream, int streamId)145 public StreamState createState(Stream stream, int streamId) { 146 return new StreamState( 147 streamId, initialWindowSize, Preconditions.checkNotNull(stream, "stream")); 148 } 149 150 /** 151 * Writes as much data for all the streams as possible given the current flow control windows. 152 * 153 * <p>Must be called with holding transport lock. 154 */ writeStreams()155 public void writeStreams() { 156 StreamState[] states = transport.getActiveStreams(); 157 Collections.shuffle(Arrays.asList(states)); 158 int connectionWindow = connectionState.window(); 159 for (int numStreams = states.length; numStreams > 0 && connectionWindow > 0;) { 160 int nextNumStreams = 0; 161 int windowSlice = (int) ceil(connectionWindow / (float) numStreams); 162 for (int index = 0; index < numStreams && connectionWindow > 0; ++index) { 163 StreamState state = states[index]; 164 165 int bytesForStream = min(connectionWindow, min(state.unallocatedBytes(), windowSlice)); 166 if (bytesForStream > 0) { 167 state.allocateBytes(bytesForStream); 168 connectionWindow -= bytesForStream; 169 } 170 171 if (state.unallocatedBytes() > 0) { 172 // There is more data to process for this stream. Add it to the next 173 // pass. 174 states[nextNumStreams++] = state; 175 } 176 } 177 numStreams = nextNumStreams; 178 } 179 180 // Now take one last pass through all of the streams and write any allocated bytes. 181 WriteStatus writeStatus = new WriteStatus(); 182 for (StreamState state : transport.getActiveStreams()) { 183 state.writeBytes(state.allocatedBytes(), writeStatus); 184 state.clearAllocatedBytes(); 185 } 186 187 if (writeStatus.hasWritten()) { 188 flush(); 189 } 190 } 191 192 /** 193 * Simple status that keeps track of the number of writes performed. 194 */ 195 private static final class WriteStatus { 196 int numWrites; 197 incrementNumWrites()198 void incrementNumWrites() { 199 numWrites++; 200 } 201 hasWritten()202 boolean hasWritten() { 203 return numWrites > 0; 204 } 205 } 206 207 public interface Transport { getActiveStreams()208 StreamState[] getActiveStreams(); 209 } 210 211 public interface Stream { onSentBytes(int frameBytes)212 void onSentBytes(int frameBytes); 213 } 214 215 /** 216 * The outbound flow control state for a single stream. 217 */ 218 public final class StreamState { 219 private final Buffer pendingWriteBuffer = new Buffer(); 220 private Runnable noPendingDataRunnable; 221 private final int streamId; 222 private int window; 223 private int allocatedBytes; 224 private final Stream stream; 225 private boolean pendingBufferHasEndOfStream = false; 226 StreamState(int streamId, int initialWindowSize, Stream stream)227 StreamState(int streamId, int initialWindowSize, Stream stream) { 228 this.streamId = streamId; 229 window = initialWindowSize; 230 this.stream = stream; 231 } 232 window()233 int window() { 234 return window; 235 } 236 allocateBytes(int bytes)237 void allocateBytes(int bytes) { 238 allocatedBytes += bytes; 239 } 240 allocatedBytes()241 int allocatedBytes() { 242 return allocatedBytes; 243 } 244 unallocatedBytes()245 int unallocatedBytes() { 246 return streamableBytes() - allocatedBytes; 247 } 248 clearAllocatedBytes()249 void clearAllocatedBytes() { 250 allocatedBytes = 0; 251 } 252 253 /** 254 * Increments the flow control window for this stream by the given delta and returns the new 255 * value. 256 */ incrementStreamWindow(int delta)257 int incrementStreamWindow(int delta) { 258 if (delta > 0 && Integer.MAX_VALUE - delta < window) { 259 throw new IllegalArgumentException("Window size overflow for stream: " + streamId); 260 } 261 window += delta; 262 263 return window; 264 } 265 266 /** 267 * Returns the maximum writable window (minimum of the stream and connection windows). 268 */ writableWindow()269 int writableWindow() { 270 return min(window, connectionState.window()); 271 } 272 streamableBytes()273 int streamableBytes() { 274 return max(0, min(window, (int) pendingWriteBuffer.size())); 275 } 276 277 /** 278 * Indicates whether or not there are frames in the pending queue. 279 */ hasPendingData()280 boolean hasPendingData() { 281 return pendingWriteBuffer.size() > 0; 282 } 283 284 /** 285 * Writes up to the number of bytes from the pending queue. 286 */ writeBytes(int bytes, WriteStatus writeStatus)287 int writeBytes(int bytes, WriteStatus writeStatus) { 288 int bytesAttempted = 0; 289 int maxBytes = min(bytes, writableWindow()); 290 while (hasPendingData() && maxBytes > 0) { 291 if (maxBytes >= pendingWriteBuffer.size()) { 292 // Window size is large enough to send entire data frame 293 bytesAttempted += (int) pendingWriteBuffer.size(); 294 write(pendingWriteBuffer, (int) pendingWriteBuffer.size(), pendingBufferHasEndOfStream); 295 } else { 296 bytesAttempted += maxBytes; 297 write(pendingWriteBuffer, maxBytes, false); 298 } 299 writeStatus.incrementNumWrites(); 300 // Update the threshold. 301 maxBytes = min(bytes - bytesAttempted, writableWindow()); 302 } 303 if (!hasPendingData() && noPendingDataRunnable != null) { 304 noPendingDataRunnable.run(); 305 noPendingDataRunnable = null; 306 } 307 return bytesAttempted; 308 } 309 310 /** 311 * Writes the frame and decrements the stream and connection window sizes. If the frame is in 312 * the pending queue, the written bytes are removed from this branch of the priority tree. If 313 * the window size is smaller than the frame, it sends partial frame. 314 */ write(Buffer buffer, int bytesToSend, boolean endOfStream)315 void write(Buffer buffer, int bytesToSend, boolean endOfStream) { 316 int bytesToWrite = bytesToSend; 317 // Using a do/while loop because if the buffer is empty we still need to call 318 // the writer once to send the empty frame. 319 do { 320 int frameBytes = min(bytesToWrite, frameWriter.maxDataLength()); 321 connectionState.incrementStreamWindow(-frameBytes); 322 incrementStreamWindow(-frameBytes); 323 try { 324 // endOfStream is set for the last chunk of data marked as endOfStream 325 boolean isEndOfStream = buffer.size() == frameBytes && endOfStream; 326 frameWriter.data(isEndOfStream, streamId, buffer, frameBytes); 327 } catch (IOException e) { 328 throw new RuntimeException(e); 329 } 330 stream.onSentBytes(frameBytes); 331 bytesToWrite -= frameBytes; 332 } while (bytesToWrite > 0); 333 } 334 enqueueData(Buffer buffer, int size, boolean endOfStream)335 void enqueueData(Buffer buffer, int size, boolean endOfStream) { 336 this.pendingWriteBuffer.write(buffer, size); 337 this.pendingBufferHasEndOfStream |= endOfStream; 338 } 339 notifyWhenNoPendingData(Runnable noPendingDataRunnable)340 void notifyWhenNoPendingData(Runnable noPendingDataRunnable) { 341 Preconditions.checkState( 342 this.noPendingDataRunnable == null, "pending data notification already requested"); 343 this.noPendingDataRunnable = noPendingDataRunnable; 344 } 345 } 346 } 347