1 /* 2 * Copyright 2014 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.okhttp; 18 19 import com.google.common.base.Preconditions; 20 import io.grpc.internal.SerializingExecutor; 21 import io.grpc.okhttp.internal.framed.ErrorCode; 22 import io.grpc.okhttp.internal.framed.FrameWriter; 23 import io.grpc.okhttp.internal.framed.Header; 24 import io.grpc.okhttp.internal.framed.Settings; 25 import java.io.IOException; 26 import java.net.Socket; 27 import java.util.List; 28 import java.util.concurrent.atomic.AtomicLong; 29 import java.util.logging.Level; 30 import java.util.logging.Logger; 31 import okio.Buffer; 32 33 class AsyncFrameWriter implements FrameWriter { 34 private static final Logger log = Logger.getLogger(OkHttpClientTransport.class.getName()); 35 private FrameWriter frameWriter; 36 private Socket socket; 37 // Although writes are thread-safe, we serialize them to prevent consuming many Threads that are 38 // just waiting on each other. 39 private final SerializingExecutor executor; 40 private final TransportExceptionHandler transportExceptionHandler; 41 private final AtomicLong flushCounter = new AtomicLong(); 42 AsyncFrameWriter( TransportExceptionHandler transportExceptionHandler, SerializingExecutor executor)43 public AsyncFrameWriter( 44 TransportExceptionHandler transportExceptionHandler, SerializingExecutor executor) { 45 this.transportExceptionHandler = transportExceptionHandler; 46 this.executor = executor; 47 } 48 49 /** 50 * Set the real frameWriter and the corresponding underlying socket, the socket is needed for 51 * closing. 52 * 53 * <p>should only be called by thread of executor. 54 */ becomeConnected(FrameWriter frameWriter, Socket socket)55 void becomeConnected(FrameWriter frameWriter, Socket socket) { 56 Preconditions.checkState(this.frameWriter == null, 57 "AsyncFrameWriter's setFrameWriter() should only be called once."); 58 this.frameWriter = Preconditions.checkNotNull(frameWriter, "frameWriter"); 59 this.socket = Preconditions.checkNotNull(socket, "socket"); 60 } 61 62 @Override connectionPreface()63 public void connectionPreface() { 64 executor.execute(new WriteRunnable() { 65 @Override 66 public void doRun() throws IOException { 67 frameWriter.connectionPreface(); 68 } 69 }); 70 } 71 72 @Override ackSettings(final Settings peerSettings)73 public void ackSettings(final Settings peerSettings) { 74 executor.execute(new WriteRunnable() { 75 @Override 76 public void doRun() throws IOException { 77 frameWriter.ackSettings(peerSettings); 78 } 79 }); 80 } 81 82 @Override pushPromise(final int streamId, final int promisedStreamId, final List<Header> requestHeaders)83 public void pushPromise(final int streamId, final int promisedStreamId, 84 final List<Header> requestHeaders) { 85 executor.execute(new WriteRunnable() { 86 @Override 87 public void doRun() throws IOException { 88 frameWriter.pushPromise(streamId, promisedStreamId, requestHeaders); 89 } 90 }); 91 } 92 93 @Override flush()94 public void flush() { 95 // keep track of version of flushes to skip flush if another flush task is queued. 96 final long flushCount = flushCounter.incrementAndGet(); 97 98 executor.execute(new WriteRunnable() { 99 @Override 100 public void doRun() throws IOException { 101 // There can be a flush starvation if there are continuous flood of flush is queued, this 102 // is not an issue with OkHttp since it flushes if the buffer is full. 103 if (flushCounter.get() == flushCount) { 104 frameWriter.flush(); 105 } 106 } 107 }); 108 } 109 110 @Override synStream(final boolean outFinished, final boolean inFinished, final int streamId, final int associatedStreamId, final List<Header> headerBlock)111 public void synStream(final boolean outFinished, final boolean inFinished, final int streamId, 112 final int associatedStreamId, final List<Header> headerBlock) { 113 executor.execute(new WriteRunnable() { 114 @Override 115 public void doRun() throws IOException { 116 frameWriter.synStream(outFinished, inFinished, streamId, associatedStreamId, headerBlock); 117 } 118 }); 119 } 120 121 @Override synReply(final boolean outFinished, final int streamId, final List<Header> headerBlock)122 public void synReply(final boolean outFinished, final int streamId, 123 final List<Header> headerBlock) { 124 executor.execute(new WriteRunnable() { 125 @Override 126 public void doRun() throws IOException { 127 frameWriter.synReply(outFinished, streamId, headerBlock); 128 } 129 }); 130 } 131 132 @Override headers(final int streamId, final List<Header> headerBlock)133 public void headers(final int streamId, final List<Header> headerBlock) { 134 executor.execute(new WriteRunnable() { 135 @Override 136 public void doRun() throws IOException { 137 frameWriter.headers(streamId, headerBlock); 138 } 139 }); 140 } 141 142 @Override rstStream(final int streamId, final ErrorCode errorCode)143 public void rstStream(final int streamId, final ErrorCode errorCode) { 144 executor.execute(new WriteRunnable() { 145 @Override 146 public void doRun() throws IOException { 147 frameWriter.rstStream(streamId, errorCode); 148 } 149 }); 150 } 151 152 @Override data(final boolean outFinished, final int streamId, final Buffer source, final int byteCount)153 public void data(final boolean outFinished, final int streamId, final Buffer source, 154 final int byteCount) { 155 executor.execute(new WriteRunnable() { 156 @Override 157 public void doRun() throws IOException { 158 frameWriter.data(outFinished, streamId, source, byteCount); 159 } 160 }); 161 } 162 163 @Override settings(final Settings okHttpSettings)164 public void settings(final Settings okHttpSettings) { 165 executor.execute(new WriteRunnable() { 166 @Override 167 public void doRun() throws IOException { 168 frameWriter.settings(okHttpSettings); 169 } 170 }); 171 } 172 173 @Override ping(final boolean ack, final int payload1, final int payload2)174 public void ping(final boolean ack, final int payload1, final int payload2) { 175 executor.execute(new WriteRunnable() { 176 @Override 177 public void doRun() throws IOException { 178 frameWriter.ping(ack, payload1, payload2); 179 } 180 }); 181 } 182 183 @Override goAway(final int lastGoodStreamId, final ErrorCode errorCode, final byte[] debugData)184 public void goAway(final int lastGoodStreamId, final ErrorCode errorCode, 185 final byte[] debugData) { 186 executor.execute(new WriteRunnable() { 187 @Override 188 public void doRun() throws IOException { 189 frameWriter.goAway(lastGoodStreamId, errorCode, debugData); 190 // Flush it since after goAway, we are likely to close this writer. 191 frameWriter.flush(); 192 } 193 }); 194 } 195 196 @Override windowUpdate(final int streamId, final long windowSizeIncrement)197 public void windowUpdate(final int streamId, final long windowSizeIncrement) { 198 executor.execute(new WriteRunnable() { 199 @Override 200 public void doRun() throws IOException { 201 frameWriter.windowUpdate(streamId, windowSizeIncrement); 202 } 203 }); 204 } 205 206 @Override close()207 public void close() { 208 executor.execute(new Runnable() { 209 @Override 210 public void run() { 211 if (frameWriter != null) { 212 try { 213 frameWriter.close(); 214 socket.close(); 215 } catch (IOException e) { 216 log.log(Level.WARNING, "Failed closing connection", e); 217 } 218 } 219 } 220 }); 221 } 222 223 private abstract class WriteRunnable implements Runnable { 224 @Override run()225 public final void run() { 226 try { 227 if (frameWriter == null) { 228 throw new IOException("Unable to perform write due to unavailable frameWriter."); 229 } 230 doRun(); 231 } catch (RuntimeException e) { 232 transportExceptionHandler.onException(e); 233 } catch (Exception e) { 234 transportExceptionHandler.onException(e); 235 } 236 } 237 doRun()238 public abstract void doRun() throws IOException; 239 } 240 241 @Override maxDataLength()242 public int maxDataLength() { 243 return frameWriter == null ? 0x4000 /* 16384, the minimum required by the HTTP/2 spec */ 244 : frameWriter.maxDataLength(); 245 } 246 247 /** A class that handles transport exception. */ 248 interface TransportExceptionHandler { 249 250 /** Handles exception. */ onException(Throwable throwable)251 void onException(Throwable throwable); 252 } 253 } 254