• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2014 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 package io.grpc.okhttp;
18 
19 import static io.grpc.okhttp.Utils.CONNECTION_STREAM_ID;
20 import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_SIZE;
21 import static java.lang.Math.ceil;
22 import static java.lang.Math.max;
23 import static java.lang.Math.min;
24 
25 import com.google.common.base.Preconditions;
26 import io.grpc.okhttp.internal.framed.FrameWriter;
27 import java.io.IOException;
28 import java.util.Arrays;
29 import java.util.Collections;
30 import javax.annotation.Nullable;
31 import okio.Buffer;
32 
33 /**
34  * Simple outbound flow controller that evenly splits the connection window across all existing
35  * streams.
36  */
37 class OutboundFlowController {
38   private final Transport transport;
39   private final FrameWriter frameWriter;
40   private int initialWindowSize;
41   private final StreamState connectionState;
42 
OutboundFlowController(Transport transport, FrameWriter frameWriter)43   public OutboundFlowController(Transport transport, FrameWriter frameWriter) {
44     this.transport = Preconditions.checkNotNull(transport, "transport");
45     this.frameWriter = Preconditions.checkNotNull(frameWriter, "frameWriter");
46     this.initialWindowSize = DEFAULT_WINDOW_SIZE;
47     connectionState = new StreamState(CONNECTION_STREAM_ID, DEFAULT_WINDOW_SIZE, null);
48   }
49 
50   /**
51    * Adjusts outbound window size requested by peer. When window size is increased, it does not send
52    * any pending frames. If this method returns {@code true}, the caller should call {@link
53    * #writeStreams()} after settings ack.
54    *
55    * <p>Must be called with holding transport lock.
56    *
57    * @return true, if new window size is increased, false otherwise.
58    */
initialOutboundWindowSize(int newWindowSize)59   public boolean initialOutboundWindowSize(int newWindowSize) {
60     if (newWindowSize < 0) {
61       throw new IllegalArgumentException("Invalid initial window size: " + newWindowSize);
62     }
63 
64     int delta = newWindowSize - initialWindowSize;
65     initialWindowSize = newWindowSize;
66     for (StreamState state : transport.getActiveStreams()) {
67       state.incrementStreamWindow(delta);
68     }
69 
70     return delta > 0;
71   }
72 
73   /**
74    * Update the outbound window for given stream, or for the connection if stream is null. Returns
75    * the new value of the window size.
76    *
77    * <p>Must be called with holding transport lock.
78    */
windowUpdate(@ullable StreamState state, int delta)79   public int windowUpdate(@Nullable StreamState state, int delta) {
80     final int updatedWindow;
81     if (state == null) {
82       // Update the connection window and write any pending frames for all streams.
83       updatedWindow = connectionState.incrementStreamWindow(delta);
84       writeStreams();
85     } else {
86       // Update the stream window and write any pending frames for the stream.
87       updatedWindow = state.incrementStreamWindow(delta);
88 
89       WriteStatus writeStatus = new WriteStatus();
90       state.writeBytes(state.writableWindow(), writeStatus);
91       if (writeStatus.hasWritten()) {
92         flush();
93       }
94     }
95     return updatedWindow;
96   }
97 
98   /**
99    * Must be called with holding transport lock.
100    */
data(boolean outFinished, StreamState state, Buffer source, boolean flush)101   public void data(boolean outFinished, StreamState state, Buffer source, boolean flush) {
102     Preconditions.checkNotNull(source, "source");
103 
104     int window = state.writableWindow();
105     boolean framesAlreadyQueued = state.hasPendingData();
106     int size = (int) source.size();
107 
108     if (!framesAlreadyQueued && window >= size) {
109       // Window size is large enough to send entire data frame
110       state.write(source, size, outFinished);
111     } else {
112       // send partial data
113       if (!framesAlreadyQueued && window > 0) {
114         state.write(source, window, false);
115       }
116       // Queue remaining data in the buffer
117       state.enqueueData(source, (int) source.size(), outFinished);
118     }
119 
120     if (flush) {
121       flush();
122     }
123   }
124 
125   /**
126    * Transport lock must be held when calling.
127    */
notifyWhenNoPendingData(StreamState state, Runnable noPendingDataRunnable)128   public void notifyWhenNoPendingData(StreamState state, Runnable noPendingDataRunnable) {
129     Preconditions.checkNotNull(noPendingDataRunnable, "noPendingDataRunnable");
130     if (state.hasPendingData()) {
131       state.notifyWhenNoPendingData(noPendingDataRunnable);
132     } else {
133       noPendingDataRunnable.run();
134     }
135   }
136 
flush()137   public void flush() {
138     try {
139       frameWriter.flush();
140     } catch (IOException e) {
141       throw new RuntimeException(e);
142     }
143   }
144 
createState(Stream stream, int streamId)145   public StreamState createState(Stream stream, int streamId) {
146     return new StreamState(
147         streamId, initialWindowSize, Preconditions.checkNotNull(stream, "stream"));
148   }
149 
150   /**
151    * Writes as much data for all the streams as possible given the current flow control windows.
152    *
153    * <p>Must be called with holding transport lock.
154    */
writeStreams()155   public void writeStreams() {
156     StreamState[] states = transport.getActiveStreams();
157     Collections.shuffle(Arrays.asList(states));
158     int connectionWindow = connectionState.window();
159     for (int numStreams = states.length; numStreams > 0 && connectionWindow > 0;) {
160       int nextNumStreams = 0;
161       int windowSlice = (int) ceil(connectionWindow / (float) numStreams);
162       for (int index = 0; index < numStreams && connectionWindow > 0; ++index) {
163         StreamState state = states[index];
164 
165         int bytesForStream = min(connectionWindow, min(state.unallocatedBytes(), windowSlice));
166         if (bytesForStream > 0) {
167           state.allocateBytes(bytesForStream);
168           connectionWindow -= bytesForStream;
169         }
170 
171         if (state.unallocatedBytes() > 0) {
172           // There is more data to process for this stream. Add it to the next
173           // pass.
174           states[nextNumStreams++] = state;
175         }
176       }
177       numStreams = nextNumStreams;
178     }
179 
180     // Now take one last pass through all of the streams and write any allocated bytes.
181     WriteStatus writeStatus = new WriteStatus();
182     for (StreamState state : transport.getActiveStreams()) {
183       state.writeBytes(state.allocatedBytes(), writeStatus);
184       state.clearAllocatedBytes();
185     }
186 
187     if (writeStatus.hasWritten()) {
188       flush();
189     }
190   }
191 
192   /**
193    * Simple status that keeps track of the number of writes performed.
194    */
195   private static final class WriteStatus {
196     int numWrites;
197 
incrementNumWrites()198     void incrementNumWrites() {
199       numWrites++;
200     }
201 
hasWritten()202     boolean hasWritten() {
203       return numWrites > 0;
204     }
205   }
206 
207   public interface Transport {
getActiveStreams()208     StreamState[] getActiveStreams();
209   }
210 
211   public interface Stream {
onSentBytes(int frameBytes)212     void onSentBytes(int frameBytes);
213   }
214 
215   /**
216    * The outbound flow control state for a single stream.
217    */
218   public final class StreamState {
219     private final Buffer pendingWriteBuffer = new Buffer();
220     private Runnable noPendingDataRunnable;
221     private final int streamId;
222     private int window;
223     private int allocatedBytes;
224     private final Stream stream;
225     private boolean pendingBufferHasEndOfStream = false;
226 
StreamState(int streamId, int initialWindowSize, Stream stream)227     StreamState(int streamId, int initialWindowSize, Stream stream) {
228       this.streamId = streamId;
229       window = initialWindowSize;
230       this.stream = stream;
231     }
232 
window()233     int window() {
234       return window;
235     }
236 
allocateBytes(int bytes)237     void allocateBytes(int bytes) {
238       allocatedBytes += bytes;
239     }
240 
allocatedBytes()241     int allocatedBytes() {
242       return allocatedBytes;
243     }
244 
unallocatedBytes()245     int unallocatedBytes() {
246       return streamableBytes() - allocatedBytes;
247     }
248 
clearAllocatedBytes()249     void clearAllocatedBytes() {
250       allocatedBytes = 0;
251     }
252 
253     /**
254      * Increments the flow control window for this stream by the given delta and returns the new
255      * value.
256      */
incrementStreamWindow(int delta)257     int incrementStreamWindow(int delta) {
258       if (delta > 0 && Integer.MAX_VALUE - delta < window) {
259         throw new IllegalArgumentException("Window size overflow for stream: " + streamId);
260       }
261       window += delta;
262 
263       return window;
264     }
265 
266     /**
267      * Returns the maximum writable window (minimum of the stream and connection windows).
268      */
writableWindow()269     int writableWindow() {
270       return min(window, connectionState.window());
271     }
272 
streamableBytes()273     int streamableBytes() {
274       return max(0, min(window, (int) pendingWriteBuffer.size()));
275     }
276 
277     /**
278      * Indicates whether or not there are frames in the pending queue.
279      */
hasPendingData()280     boolean hasPendingData() {
281       return pendingWriteBuffer.size() > 0;
282     }
283 
284     /**
285      * Writes up to the number of bytes from the pending queue.
286      */
writeBytes(int bytes, WriteStatus writeStatus)287     int writeBytes(int bytes, WriteStatus writeStatus) {
288       int bytesAttempted = 0;
289       int maxBytes = min(bytes, writableWindow());
290       while (hasPendingData() && maxBytes > 0) {
291         if (maxBytes >= pendingWriteBuffer.size()) {
292           // Window size is large enough to send entire data frame
293           bytesAttempted += (int) pendingWriteBuffer.size();
294           write(pendingWriteBuffer, (int) pendingWriteBuffer.size(), pendingBufferHasEndOfStream);
295         } else {
296           bytesAttempted += maxBytes;
297           write(pendingWriteBuffer, maxBytes, false);
298         }
299         writeStatus.incrementNumWrites();
300         // Update the threshold.
301         maxBytes = min(bytes - bytesAttempted, writableWindow());
302       }
303       if (!hasPendingData() && noPendingDataRunnable != null) {
304         noPendingDataRunnable.run();
305         noPendingDataRunnable = null;
306       }
307       return bytesAttempted;
308     }
309 
310     /**
311      * Writes the frame and decrements the stream and connection window sizes. If the frame is in
312      * the pending queue, the written bytes are removed from this branch of the priority tree. If
313      * the window size is smaller than the frame, it sends partial frame.
314      */
write(Buffer buffer, int bytesToSend, boolean endOfStream)315     void write(Buffer buffer, int bytesToSend, boolean endOfStream) {
316       int bytesToWrite = bytesToSend;
317       // Using a do/while loop because if the buffer is empty we still need to call
318       // the writer once to send the empty frame.
319       do {
320         int frameBytes = min(bytesToWrite, frameWriter.maxDataLength());
321         connectionState.incrementStreamWindow(-frameBytes);
322         incrementStreamWindow(-frameBytes);
323         try {
324           // endOfStream is set for the last chunk of data marked as endOfStream
325           boolean isEndOfStream = buffer.size() == frameBytes && endOfStream;
326           frameWriter.data(isEndOfStream, streamId, buffer, frameBytes);
327         } catch (IOException e) {
328           throw new RuntimeException(e);
329         }
330         stream.onSentBytes(frameBytes);
331         bytesToWrite -= frameBytes;
332       } while (bytesToWrite > 0);
333     }
334 
enqueueData(Buffer buffer, int size, boolean endOfStream)335     void enqueueData(Buffer buffer, int size, boolean endOfStream) {
336       this.pendingWriteBuffer.write(buffer, size);
337       this.pendingBufferHasEndOfStream |= endOfStream;
338     }
339 
notifyWhenNoPendingData(Runnable noPendingDataRunnable)340     void notifyWhenNoPendingData(Runnable noPendingDataRunnable) {
341       Preconditions.checkState(
342           this.noPendingDataRunnable == null, "pending data notification already requested");
343       this.noPendingDataRunnable = noPendingDataRunnable;
344     }
345   }
346 }
347