• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2014 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 package io.grpc.okhttp;
18 
19 import static io.grpc.okhttp.Utils.CONNECTION_STREAM_ID;
20 import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_SIZE;
21 import static java.lang.Math.ceil;
22 import static java.lang.Math.max;
23 import static java.lang.Math.min;
24 
25 import com.google.common.base.Preconditions;
26 import io.grpc.okhttp.internal.framed.FrameWriter;
27 import java.io.IOException;
28 import java.util.ArrayDeque;
29 import java.util.Queue;
30 import javax.annotation.Nullable;
31 import okio.Buffer;
32 
33 /**
34  * Simple outbound flow controller that evenly splits the connection window across all existing
35  * streams.
36  */
37 class OutboundFlowController {
38   private final OkHttpClientTransport transport;
39   private final FrameWriter frameWriter;
40   private int initialWindowSize = DEFAULT_WINDOW_SIZE;
41   private final OutboundFlowState connectionState = new OutboundFlowState(CONNECTION_STREAM_ID);
42 
OutboundFlowController(OkHttpClientTransport transport, FrameWriter frameWriter)43   OutboundFlowController(OkHttpClientTransport transport, FrameWriter frameWriter) {
44     this.transport = Preconditions.checkNotNull(transport, "transport");
45     this.frameWriter = Preconditions.checkNotNull(frameWriter, "frameWriter");
46   }
47 
48   /**
49    * Adjusts outbound window size requested by peer. When window size is increased, it does not send
50    * any pending frames. If this method returns {@code true}, the caller should call {@link
51    * #writeStreams()} after settings ack.
52    *
53    * <p>Must be called with holding transport lock.
54    *
55    * @return true, if new window size is increased, false otherwise.
56    */
initialOutboundWindowSize(int newWindowSize)57   boolean initialOutboundWindowSize(int newWindowSize) {
58     if (newWindowSize < 0) {
59       throw new IllegalArgumentException("Invalid initial window size: " + newWindowSize);
60     }
61 
62     int delta = newWindowSize - initialWindowSize;
63     initialWindowSize = newWindowSize;
64     for (OkHttpClientStream stream : transport.getActiveStreams()) {
65       OutboundFlowState state = (OutboundFlowState) stream.getOutboundFlowState();
66       if (state == null) {
67         // Create the OutboundFlowState with the new window size.
68         state = new OutboundFlowState(stream);
69         stream.setOutboundFlowState(state);
70       } else {
71         state.incrementStreamWindow(delta);
72       }
73     }
74 
75     return delta > 0;
76   }
77 
78   /**
79    * Update the outbound window for given stream, or for the connection if stream is null. Returns
80    * the new value of the window size.
81    *
82    * <p>Must be called with holding transport lock.
83    */
windowUpdate(@ullable OkHttpClientStream stream, int delta)84   int windowUpdate(@Nullable OkHttpClientStream stream, int delta) {
85     final int updatedWindow;
86     if (stream == null) {
87       // Update the connection window and write any pending frames for all streams.
88       updatedWindow = connectionState.incrementStreamWindow(delta);
89       writeStreams();
90     } else {
91       // Update the stream window and write any pending frames for the stream.
92       OutboundFlowState state = state(stream);
93       updatedWindow = state.incrementStreamWindow(delta);
94 
95       WriteStatus writeStatus = new WriteStatus();
96       state.writeBytes(state.writableWindow(), writeStatus);
97       if (writeStatus.hasWritten()) {
98         flush();
99       }
100     }
101     return updatedWindow;
102   }
103 
104   /**
105    * Must be called with holding transport lock.
106    */
data(boolean outFinished, int streamId, Buffer source, boolean flush)107   void data(boolean outFinished, int streamId, Buffer source, boolean flush) {
108     Preconditions.checkNotNull(source, "source");
109 
110     OkHttpClientStream stream = transport.getStream(streamId);
111     if (stream == null) {
112       // This is possible for a stream that has received end-of-stream from server (but hasn't sent
113       // end-of-stream), and was removed from the transport stream map.
114       // In such case, we just throw away the data.
115       return;
116     }
117 
118     OutboundFlowState state = state(stream);
119     int window = state.writableWindow();
120     boolean framesAlreadyQueued = state.hasFrame();
121 
122     OutboundFlowState.Frame frame = state.newFrame(source, outFinished);
123     if (!framesAlreadyQueued && window >= frame.size()) {
124       // Window size is large enough to send entire data frame
125       frame.write();
126       if (flush) {
127         flush();
128       }
129       return;
130     }
131 
132     // Enqueue the frame to be written when the window size permits.
133     frame.enqueue();
134 
135     if (framesAlreadyQueued || window <= 0) {
136       // Stream already has frames pending or is stalled, don't send anything now.
137       if (flush) {
138         flush();
139       }
140       return;
141     }
142 
143     // Create and send a partial frame up to the window size.
144     frame.split(window).write();
145     if (flush) {
146       flush();
147     }
148   }
149 
flush()150   void flush() {
151     try {
152       frameWriter.flush();
153     } catch (IOException e) {
154       throw new RuntimeException(e);
155     }
156   }
157 
state(OkHttpClientStream stream)158   private OutboundFlowState state(OkHttpClientStream stream) {
159     OutboundFlowState state = (OutboundFlowState) stream.getOutboundFlowState();
160     if (state == null) {
161       state = new OutboundFlowState(stream);
162       stream.setOutboundFlowState(state);
163     }
164     return state;
165   }
166 
167   /**
168    * Writes as much data for all the streams as possible given the current flow control windows.
169    *
170    * <p>Must be called with holding transport lock.
171    */
writeStreams()172   void writeStreams() {
173     OkHttpClientStream[] streams = transport.getActiveStreams();
174     int connectionWindow = connectionState.window();
175     for (int numStreams = streams.length; numStreams > 0 && connectionWindow > 0;) {
176       int nextNumStreams = 0;
177       int windowSlice = (int) ceil(connectionWindow / (float) numStreams);
178       for (int index = 0; index < numStreams && connectionWindow > 0; ++index) {
179         OkHttpClientStream stream = streams[index];
180         OutboundFlowState state = state(stream);
181 
182         int bytesForStream = min(connectionWindow, min(state.unallocatedBytes(), windowSlice));
183         if (bytesForStream > 0) {
184           state.allocateBytes(bytesForStream);
185           connectionWindow -= bytesForStream;
186         }
187 
188         if (state.unallocatedBytes() > 0) {
189           // There is more data to process for this stream. Add it to the next
190           // pass.
191           streams[nextNumStreams++] = stream;
192         }
193       }
194       numStreams = nextNumStreams;
195     }
196 
197     // Now take one last pass through all of the streams and write any allocated bytes.
198     WriteStatus writeStatus = new WriteStatus();
199     for (OkHttpClientStream stream : transport.getActiveStreams()) {
200       OutboundFlowState state = state(stream);
201       state.writeBytes(state.allocatedBytes(), writeStatus);
202       state.clearAllocatedBytes();
203     }
204 
205     if (writeStatus.hasWritten()) {
206       flush();
207     }
208   }
209 
210   /**
211    * Simple status that keeps track of the number of writes performed.
212    */
213   private static final class WriteStatus {
214     int numWrites;
215 
incrementNumWrites()216     void incrementNumWrites() {
217       numWrites++;
218     }
219 
hasWritten()220     boolean hasWritten() {
221       return numWrites > 0;
222     }
223   }
224 
225   /**
226    * The outbound flow control state for a single stream.
227    */
228   private final class OutboundFlowState {
229     final Queue<Frame> pendingWriteQueue;
230     final int streamId;
231     int queuedBytes;
232     int window = initialWindowSize;
233     int allocatedBytes;
234     OkHttpClientStream stream;
235 
OutboundFlowState(int streamId)236     OutboundFlowState(int streamId) {
237       this.streamId = streamId;
238       pendingWriteQueue = new ArrayDeque<Frame>(2);
239     }
240 
OutboundFlowState(OkHttpClientStream stream)241     OutboundFlowState(OkHttpClientStream stream) {
242       this(stream.id());
243       this.stream = stream;
244     }
245 
window()246     int window() {
247       return window;
248     }
249 
allocateBytes(int bytes)250     void allocateBytes(int bytes) {
251       allocatedBytes += bytes;
252     }
253 
allocatedBytes()254     int allocatedBytes() {
255       return allocatedBytes;
256     }
257 
unallocatedBytes()258     int unallocatedBytes() {
259       return streamableBytes() - allocatedBytes;
260     }
261 
clearAllocatedBytes()262     void clearAllocatedBytes() {
263       allocatedBytes = 0;
264     }
265 
266     /**
267      * Increments the flow control window for this stream by the given delta and returns the new
268      * value.
269      */
incrementStreamWindow(int delta)270     int incrementStreamWindow(int delta) {
271       if (delta > 0 && Integer.MAX_VALUE - delta < window) {
272         throw new IllegalArgumentException("Window size overflow for stream: " + streamId);
273       }
274       window += delta;
275 
276       return window;
277     }
278 
279     /**
280      * Returns the maximum writable window (minimum of the stream and connection windows).
281      */
writableWindow()282     int writableWindow() {
283       return min(window, connectionState.window());
284     }
285 
streamableBytes()286     int streamableBytes() {
287       return max(0, min(window, queuedBytes));
288     }
289 
290     /**
291      * Creates a new frame with the given values but does not add it to the pending queue.
292      */
newFrame(Buffer data, boolean endStream)293     Frame newFrame(Buffer data, boolean endStream) {
294       return new Frame(data, endStream);
295     }
296 
297     /**
298      * Indicates whether or not there are frames in the pending queue.
299      */
hasFrame()300     boolean hasFrame() {
301       return !pendingWriteQueue.isEmpty();
302     }
303 
304     /**
305      * Returns the head of the pending queue, or {@code null} if empty.
306      */
peek()307     private Frame peek() {
308       return pendingWriteQueue.peek();
309     }
310 
311     /**
312      * Writes up to the number of bytes from the pending queue.
313      */
writeBytes(int bytes, WriteStatus writeStatus)314     int writeBytes(int bytes, WriteStatus writeStatus) {
315       int bytesAttempted = 0;
316       int maxBytes = min(bytes, writableWindow());
317       while (hasFrame()) {
318         Frame pendingWrite = peek();
319         if (maxBytes >= pendingWrite.size()) {
320           // Window size is large enough to send entire data frame
321           writeStatus.incrementNumWrites();
322           bytesAttempted += pendingWrite.size();
323           pendingWrite.write();
324         } else if (maxBytes <= 0) {
325           // No data from the current frame can be written - we're done.
326           // We purposely check this after first testing the size of the
327           // pending frame to properly handle zero-length frame.
328           break;
329         } else {
330           // We can send a partial frame
331           Frame partialFrame = pendingWrite.split(maxBytes);
332           writeStatus.incrementNumWrites();
333           bytesAttempted += partialFrame.size();
334           partialFrame.write();
335         }
336 
337         // Update the threshold.
338         maxBytes = min(bytes - bytesAttempted, writableWindow());
339       }
340       return bytesAttempted;
341     }
342 
343     /**
344      * A wrapper class around the content of a data frame.
345      */
346     private final class Frame {
347       final Buffer data;
348       final boolean endStream;
349       boolean enqueued;
350 
Frame(Buffer data, boolean endStream)351       Frame(Buffer data, boolean endStream) {
352         this.data = data;
353         this.endStream = endStream;
354       }
355 
356       /**
357        * Gets the total size (in bytes) of this frame including the data and padding.
358        */
size()359       int size() {
360         return (int) data.size();
361       }
362 
enqueue()363       void enqueue() {
364         if (!enqueued) {
365           enqueued = true;
366           pendingWriteQueue.offer(this);
367 
368           // Increment the number of pending bytes for this stream.
369           queuedBytes += size();
370         }
371       }
372 
373       /**
374        * Writes the frame and decrements the stream and connection window sizes. If the frame is in
375        * the pending queue, the written bytes are removed from this branch of the priority tree.
376        */
write()377       void write() {
378         // Using a do/while loop because if the buffer is empty we still need to call
379         // the writer once to send the empty frame.
380         do {
381           int bytesToWrite = size();
382           int frameBytes = min(bytesToWrite, frameWriter.maxDataLength());
383           if (frameBytes == bytesToWrite) {
384             // All the bytes fit into a single HTTP/2 frame, just send it all.
385             connectionState.incrementStreamWindow(-bytesToWrite);
386             incrementStreamWindow(-bytesToWrite);
387             try {
388               frameWriter.data(endStream, streamId, data, bytesToWrite);
389             } catch (IOException e) {
390               throw new RuntimeException(e);
391             }
392             stream.transportState().onSentBytes(bytesToWrite);
393 
394             if (enqueued) {
395               // It's enqueued - remove it from the head of the pending write queue.
396               queuedBytes -= bytesToWrite;
397               pendingWriteQueue.remove(this);
398             }
399             return;
400           }
401 
402           // Split a chunk that will fit into a single HTTP/2 frame and write it.
403           Frame frame = split(frameBytes);
404           frame.write();
405         } while (size() > 0);
406       }
407 
408       /**
409        * Creates a new frame that is a view of this frame's data. The {@code maxBytes} are first
410        * split from the data buffer. If not all the requested bytes are available, the remaining
411        * bytes are then split from the padding (if available).
412        *
413        * @param maxBytes the maximum number of bytes that is allowed in the created frame.
414        * @return the partial frame.
415        */
split(int maxBytes)416       Frame split(int maxBytes) {
417         // The requested maxBytes should always be less than the size of this frame.
418         assert maxBytes < size() : "Attempting to split a frame for the full size.";
419 
420         // Get the portion of the data buffer to be split. Limit to the readable bytes.
421         int dataSplit = min(maxBytes, (int) data.size());
422 
423         Buffer splitSlice = new Buffer();
424         splitSlice.write(data, dataSplit);
425 
426         Frame frame = new Frame(splitSlice, false);
427 
428         if (enqueued) {
429           queuedBytes -= dataSplit;
430         }
431         return frame;
432       }
433     }
434   }
435 }
436