• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2014 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.okhttp;
18 
19 import com.google.common.annotations.VisibleForTesting;
20 import com.google.common.base.Preconditions;
21 import io.grpc.internal.SerializingExecutor;
22 import io.grpc.okhttp.internal.framed.ErrorCode;
23 import io.grpc.okhttp.internal.framed.FrameWriter;
24 import io.grpc.okhttp.internal.framed.Header;
25 import io.grpc.okhttp.internal.framed.Settings;
26 import java.io.IOException;
27 import java.net.Socket;
28 import java.util.Arrays;
29 import java.util.Collections;
30 import java.util.HashSet;
31 import java.util.List;
32 import java.util.Set;
33 import java.util.concurrent.atomic.AtomicLong;
34 import java.util.logging.Level;
35 import java.util.logging.Logger;
36 import okio.Buffer;
37 
38 class AsyncFrameWriter implements FrameWriter {
39   private static final Logger log = Logger.getLogger(OkHttpClientTransport.class.getName());
40   private FrameWriter frameWriter;
41   private Socket socket;
42   // Although writes are thread-safe, we serialize them to prevent consuming many Threads that are
43   // just waiting on each other.
44   private final SerializingExecutor executor;
45   private final TransportExceptionHandler transportExceptionHandler;
46   private final AtomicLong flushCounter = new AtomicLong();
47   // Some exceptions are not very useful and add too much noise to the log
48   private static final Set<String> QUIET_ERRORS =
49       Collections.unmodifiableSet(new HashSet<>(Arrays.asList("Socket closed")));
50 
AsyncFrameWriter( TransportExceptionHandler transportExceptionHandler, SerializingExecutor executor)51   public AsyncFrameWriter(
52       TransportExceptionHandler transportExceptionHandler, SerializingExecutor executor) {
53     this.transportExceptionHandler = transportExceptionHandler;
54     this.executor = executor;
55   }
56 
57   /**
58    * Set the real frameWriter and the corresponding underlying socket, the socket is needed for
59    * closing.
60    *
61    * <p>should only be called by thread of executor.
62    */
becomeConnected(FrameWriter frameWriter, Socket socket)63   void becomeConnected(FrameWriter frameWriter, Socket socket) {
64     Preconditions.checkState(this.frameWriter == null,
65         "AsyncFrameWriter's setFrameWriter() should only be called once.");
66     this.frameWriter = Preconditions.checkNotNull(frameWriter, "frameWriter");
67     this.socket = Preconditions.checkNotNull(socket, "socket");
68   }
69 
70   @Override
connectionPreface()71   public void connectionPreface() {
72     executor.execute(new WriteRunnable() {
73       @Override
74       public void doRun() throws IOException {
75         frameWriter.connectionPreface();
76       }
77     });
78   }
79 
80   @Override
ackSettings(final Settings peerSettings)81   public void ackSettings(final Settings peerSettings) {
82     executor.execute(new WriteRunnable() {
83       @Override
84       public void doRun() throws IOException {
85         frameWriter.ackSettings(peerSettings);
86       }
87     });
88   }
89 
90   @Override
pushPromise(final int streamId, final int promisedStreamId, final List<Header> requestHeaders)91   public void pushPromise(final int streamId, final int promisedStreamId,
92       final List<Header> requestHeaders) {
93     executor.execute(new WriteRunnable() {
94       @Override
95       public void doRun() throws IOException {
96         frameWriter.pushPromise(streamId, promisedStreamId, requestHeaders);
97       }
98     });
99   }
100 
101   @Override
flush()102   public void flush() {
103     // keep track of version of flushes to skip flush if another flush task is queued.
104     final long flushCount = flushCounter.incrementAndGet();
105 
106     executor.execute(new WriteRunnable() {
107       @Override
108       public void doRun() throws IOException {
109         // There can be a flush starvation if there are continuous flood of flush is queued, this
110         // is not an issue with OkHttp since it flushes if the buffer is full.
111         if (flushCounter.get() == flushCount) {
112           frameWriter.flush();
113         }
114       }
115     });
116   }
117 
118   @Override
synStream(final boolean outFinished, final boolean inFinished, final int streamId, final int associatedStreamId, final List<Header> headerBlock)119   public void synStream(final boolean outFinished, final boolean inFinished, final int streamId,
120       final int associatedStreamId, final List<Header> headerBlock) {
121     executor.execute(new WriteRunnable() {
122       @Override
123       public void doRun() throws IOException {
124         frameWriter.synStream(outFinished, inFinished, streamId, associatedStreamId, headerBlock);
125       }
126     });
127   }
128 
129   @Override
synReply(final boolean outFinished, final int streamId, final List<Header> headerBlock)130   public void synReply(final boolean outFinished, final int streamId,
131       final List<Header> headerBlock) {
132     executor.execute(new WriteRunnable() {
133       @Override
134       public void doRun() throws IOException {
135         frameWriter.synReply(outFinished, streamId, headerBlock);
136       }
137     });
138   }
139 
140   @Override
headers(final int streamId, final List<Header> headerBlock)141   public void headers(final int streamId, final List<Header> headerBlock) {
142     executor.execute(new WriteRunnable() {
143       @Override
144       public void doRun() throws IOException {
145         frameWriter.headers(streamId, headerBlock);
146       }
147     });
148   }
149 
150   @Override
rstStream(final int streamId, final ErrorCode errorCode)151   public void rstStream(final int streamId, final ErrorCode errorCode) {
152     executor.execute(new WriteRunnable() {
153       @Override
154       public void doRun() throws IOException {
155         frameWriter.rstStream(streamId, errorCode);
156       }
157     });
158   }
159 
160   @Override
data(final boolean outFinished, final int streamId, final Buffer source, final int byteCount)161   public void data(final boolean outFinished, final int streamId, final Buffer source,
162       final int byteCount) {
163     executor.execute(new WriteRunnable() {
164       @Override
165       public void doRun() throws IOException {
166         frameWriter.data(outFinished, streamId, source, byteCount);
167       }
168     });
169   }
170 
171   @Override
settings(final Settings okHttpSettings)172   public void settings(final Settings okHttpSettings) {
173     executor.execute(new WriteRunnable() {
174       @Override
175       public void doRun() throws IOException {
176         frameWriter.settings(okHttpSettings);
177       }
178     });
179   }
180 
181   @Override
ping(final boolean ack, final int payload1, final int payload2)182   public void ping(final boolean ack, final int payload1, final int payload2) {
183     executor.execute(new WriteRunnable() {
184       @Override
185       public void doRun() throws IOException {
186         frameWriter.ping(ack, payload1, payload2);
187       }
188     });
189   }
190 
191   @Override
goAway(final int lastGoodStreamId, final ErrorCode errorCode, final byte[] debugData)192   public void goAway(final int lastGoodStreamId, final ErrorCode errorCode,
193       final byte[] debugData) {
194     executor.execute(new WriteRunnable() {
195       @Override
196       public void doRun() throws IOException {
197         frameWriter.goAway(lastGoodStreamId, errorCode, debugData);
198         // Flush it since after goAway, we are likely to close this writer.
199         frameWriter.flush();
200       }
201     });
202   }
203 
204   @Override
windowUpdate(final int streamId, final long windowSizeIncrement)205   public void windowUpdate(final int streamId, final long windowSizeIncrement) {
206     executor.execute(new WriteRunnable() {
207       @Override
208       public void doRun() throws IOException {
209         frameWriter.windowUpdate(streamId, windowSizeIncrement);
210       }
211     });
212   }
213 
214   @Override
close()215   public void close() {
216     executor.execute(new Runnable() {
217       @Override
218       public void run() {
219         if (frameWriter != null) {
220           try {
221             frameWriter.close();
222             socket.close();
223           } catch (IOException e) {
224             log.log(getLogLevel(e), "Failed closing connection", e);
225           }
226         }
227       }
228     });
229   }
230 
231   /**
232    * Accepts a throwable and returns the appropriate logging level. Uninteresting exceptions
233    * should not clutter the log.
234    */
235   @VisibleForTesting
getLogLevel(Throwable t)236   static Level getLogLevel(Throwable t) {
237     if (t instanceof IOException
238         && t.getMessage() != null
239         && QUIET_ERRORS.contains(t.getMessage())) {
240       return Level.FINE;
241 
242     }
243     return Level.INFO;
244   }
245 
246   private abstract class WriteRunnable implements Runnable {
247     @Override
run()248     public final void run() {
249       try {
250         if (frameWriter == null) {
251           throw new IOException("Unable to perform write due to unavailable frameWriter.");
252         }
253         doRun();
254       } catch (RuntimeException e) {
255         transportExceptionHandler.onException(e);
256       } catch (Exception e) {
257         transportExceptionHandler.onException(e);
258       }
259     }
260 
doRun()261     public abstract void doRun() throws IOException;
262   }
263 
264   @Override
maxDataLength()265   public int maxDataLength() {
266     return frameWriter == null ? 0x4000 /* 16384, the minimum required by the HTTP/2 spec */
267         : frameWriter.maxDataLength();
268   }
269 
270   /** A class that handles transport exception. */
271   interface TransportExceptionHandler {
272 
273     /** Handles exception. */
onException(Throwable throwable)274     void onException(Throwable throwable);
275   }
276 }
277