/*
 * Copyright 2014 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.grpc.okhttp;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.grpc.internal.SerializingExecutor;
import io.grpc.okhttp.internal.framed.ErrorCode;
import io.grpc.okhttp.internal.framed.FrameWriter;
import io.grpc.okhttp.internal.framed.Header;
import io.grpc.okhttp.internal.framed.Settings;
import java.io.IOException;
import java.net.Socket;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import okio.Buffer;

/**
 * A {@link FrameWriter} that never writes on the calling thread. Every call is wrapped in a task
 * and handed to the {@link SerializingExecutor} passed to the constructor; the task then forwards
 * the call to the real {@link FrameWriter} installed through {@link #becomeConnected}.
 *
 * <p>Write tasks that run before a delegate is installed, or whose delegate call throws, report
 * the failure to the {@link TransportExceptionHandler} instead of propagating it to the executor.
 */
class AsyncFrameWriter implements FrameWriter {
  // Intentionally logs under the OkHttpClientTransport logger name.
  private static final Logger log = Logger.getLogger(OkHttpClientTransport.class.getName());
  private FrameWriter frameWriter;
  private Socket socket;
  // Although writes are thread-safe, we serialize them to prevent consuming many Threads that are
  // just waiting on each other.
  private final SerializingExecutor executor;
  private final TransportExceptionHandler transportExceptionHandler;
  // Incremented by every flush() call so that a queued flush can tell whether a newer one exists.
  private final AtomicLong flushCounter = new AtomicLong();
  // Some exceptions are not very useful and add too much noise to the log
  private static final Set<String> QUIET_ERRORS =
      Collections.unmodifiableSet(new HashSet<>(Arrays.asList("Socket closed")));

  /**
   * Creates a writer with no delegate yet; {@link #becomeConnected} must be called before queued
   * writes can succeed.
   *
   * @param transportExceptionHandler receives every exception raised by a queued write
   * @param executor runs all queued tasks
   */
  public AsyncFrameWriter(
      TransportExceptionHandler transportExceptionHandler, SerializingExecutor executor) {
    this.transportExceptionHandler = transportExceptionHandler;
    this.executor = executor;
  }

  /**
   * Set the real frameWriter and the corresponding underlying socket, the socket is needed for
   * closing.
   *
   * <p>should only be called by thread of executor.
   *
   * @throws IllegalStateException if a frameWriter has already been set
   * @throws NullPointerException if either argument is null
   */
  void becomeConnected(FrameWriter frameWriter, Socket socket) {
    Preconditions.checkState(this.frameWriter == null,
        "AsyncFrameWriter's becomeConnected() should only be called once.");
    // Validate both arguments before touching any field so that a failed call can never leave a
    // writer installed without its socket.
    Preconditions.checkNotNull(frameWriter, "frameWriter");
    Preconditions.checkNotNull(socket, "socket");
    this.frameWriter = frameWriter;
    this.socket = socket;
  }

  /** Queues {@link FrameWriter#connectionPreface()} on the delegate. */
  @Override
  public void connectionPreface() {
    executor.execute(new WriteRunnable() {
      @Override
      public void doRun() throws IOException {
        frameWriter.connectionPreface();
      }
    });
  }

  /** Queues {@link FrameWriter#ackSettings} on the delegate. */
  @Override
  public void ackSettings(final Settings peerSettings) {
    executor.execute(new WriteRunnable() {
      @Override
      public void doRun() throws IOException {
        frameWriter.ackSettings(peerSettings);
      }
    });
  }

  /** Queues {@link FrameWriter#pushPromise} on the delegate. */
  @Override
  public void pushPromise(final int streamId, final int promisedStreamId,
      final List<Header> requestHeaders) {
    executor.execute(new WriteRunnable() {
      @Override
      public void doRun() throws IOException {
        frameWriter.pushPromise(streamId, promisedStreamId, requestHeaders);
      }
    });
  }

  /**
   * Queues a flush of the delegate. The queued task flushes only if no later {@code flush()} call
   * has been made by the time it runs; otherwise it leaves the work to that later task.
   */
  @Override
  public void flush() {
    // keep track of version of flushes to skip flush if another flush task is queued.
    final long flushCount = flushCounter.incrementAndGet();

    executor.execute(new WriteRunnable() {
      @Override
      public void doRun() throws IOException {
        // A continuous flood of queued flushes could starve the actual flush. This is not an
        // issue with OkHttp since it flushes if the buffer is full.
        if (flushCounter.get() == flushCount) {
          frameWriter.flush();
        }
      }
    });
  }

  /** Queues {@link FrameWriter#synStream} on the delegate. */
  @Override
  public void synStream(final boolean outFinished, final boolean inFinished, final int streamId,
      final int associatedStreamId, final List<Header> headerBlock) {
    executor.execute(new WriteRunnable() {
      @Override
      public void doRun() throws IOException {
        frameWriter.synStream(outFinished, inFinished, streamId, associatedStreamId, headerBlock);
      }
    });
  }

  /** Queues {@link FrameWriter#synReply} on the delegate. */
  @Override
  public void synReply(final boolean outFinished, final int streamId,
      final List<Header> headerBlock) {
    executor.execute(new WriteRunnable() {
      @Override
      public void doRun() throws IOException {
        frameWriter.synReply(outFinished, streamId, headerBlock);
      }
    });
  }

  /** Queues {@link FrameWriter#headers} on the delegate. */
  @Override
  public void headers(final int streamId, final List<Header> headerBlock) {
    executor.execute(new WriteRunnable() {
      @Override
      public void doRun() throws IOException {
        frameWriter.headers(streamId, headerBlock);
      }
    });
  }

  /** Queues {@link FrameWriter#rstStream} on the delegate. */
  @Override
  public void rstStream(final int streamId, final ErrorCode errorCode) {
    executor.execute(new WriteRunnable() {
      @Override
      public void doRun() throws IOException {
        frameWriter.rstStream(streamId, errorCode);
      }
    });
  }

  /** Queues {@link FrameWriter#data} on the delegate. */
  @Override
  public void data(final boolean outFinished, final int streamId, final Buffer source,
      final int byteCount) {
    executor.execute(new WriteRunnable() {
      @Override
      public void doRun() throws IOException {
        frameWriter.data(outFinished, streamId, source, byteCount);
      }
    });
  }

  /** Queues {@link FrameWriter#settings} on the delegate. */
  @Override
  public void settings(final Settings okHttpSettings) {
    executor.execute(new WriteRunnable() {
      @Override
      public void doRun() throws IOException {
        frameWriter.settings(okHttpSettings);
      }
    });
  }

  /** Queues {@link FrameWriter#ping} on the delegate. */
  @Override
  public void ping(final boolean ack, final int payload1, final int payload2) {
    executor.execute(new WriteRunnable() {
      @Override
      public void doRun() throws IOException {
        frameWriter.ping(ack, payload1, payload2);
      }
    });
  }

  /** Queues {@link FrameWriter#goAway} on the delegate, immediately followed by a flush. */
  @Override
  public void goAway(final int lastGoodStreamId, final ErrorCode errorCode,
      final byte[] debugData) {
    executor.execute(new WriteRunnable() {
      @Override
      public void doRun() throws IOException {
        frameWriter.goAway(lastGoodStreamId, errorCode, debugData);
        // Flush it since after goAway, we are likely to close this writer.
        frameWriter.flush();
      }
    });
  }

  /** Queues {@link FrameWriter#windowUpdate} on the delegate. */
  @Override
  public void windowUpdate(final int streamId, final long windowSizeIncrement) {
    executor.execute(new WriteRunnable() {
      @Override
      public void doRun() throws IOException {
        frameWriter.windowUpdate(streamId, windowSizeIncrement);
      }
    });
  }

  /**
   * Queues closing of the delegate and then of the socket. Does nothing if no delegate has been
   * installed by the time the task runs. An {@link IOException} from either close is logged at
   * the level chosen by {@link #getLogLevel} rather than sent to the exception handler.
   */
  @Override
  public void close() {
    executor.execute(new Runnable() {
      @Override
      public void run() {
        if (frameWriter == null) {
          return;
        }
        try {
          frameWriter.close();
        } catch (IOException e) {
          log.log(getLogLevel(e), "Failed closing connection", e);
        } finally {
          // Close the socket even when closing the writer failed, otherwise it would leak.
          closeSocket();
        }
      }
    });
  }

  /** Closes the socket, logging (not propagating) any {@link IOException}. */
  private void closeSocket() {
    try {
      socket.close();
    } catch (IOException e) {
      log.log(getLogLevel(e), "Failed closing connection", e);
    }
  }

  /**
   * Accepts a throwable and returns the appropriate logging level. Uninteresting exceptions
   * should not clutter the log.
   *
   * @return {@link Level#FINE} for an {@link IOException} whose message is listed in
   *     {@code QUIET_ERRORS}, {@link Level#INFO} otherwise
   */
  @VisibleForTesting
  static Level getLogLevel(Throwable t) {
    if (t instanceof IOException
        && t.getMessage() != null
        && QUIET_ERRORS.contains(t.getMessage())) {
      return Level.FINE;
    }
    return Level.INFO;
  }

  /**
   * Base class of all queued write tasks: fails with an {@link IOException} when no delegate is
   * installed, and routes every exception to the {@link TransportExceptionHandler}.
   */
  private abstract class WriteRunnable implements Runnable {
    @Override
    public final void run() {
      try {
        if (frameWriter == null) {
          throw new IOException("Unable to perform write due to unavailable frameWriter.");
        }
        doRun();
      } catch (Exception e) {
        // Covers both IOException and any RuntimeException thrown by the delegate.
        transportExceptionHandler.onException(e);
      }
    }

    /** Performs the actual write against the delegate. */
    public abstract void doRun() throws IOException;
  }

  /**
   * Returns the delegate's maximum data length, or 16384 while no delegate is installed. Unlike
   * the write methods this reads the delegate directly on the calling thread.
   */
  @Override
  public int maxDataLength() {
    return frameWriter == null ? 0x4000 /* 16384, the minimum required by the HTTP/2 spec */
        : frameWriter.maxDataLength();
  }

  /** A class that handles transport exception. */
  interface TransportExceptionHandler {

    /** Handles exception. */
    void onException(Throwable throwable);
  }
}