1 /* 2 * Copyright 2018 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.okhttp; 18 19 import static com.google.common.base.Preconditions.checkNotNull; 20 import static com.google.common.base.Preconditions.checkState; 21 22 import io.grpc.internal.SerializingExecutor; 23 import io.grpc.okhttp.ExceptionHandlingFrameWriter.TransportExceptionHandler; 24 import io.grpc.okhttp.internal.framed.ErrorCode; 25 import io.grpc.okhttp.internal.framed.FrameWriter; 26 import io.grpc.okhttp.internal.framed.Settings; 27 import io.perfmark.Link; 28 import io.perfmark.PerfMark; 29 import io.perfmark.TaskCloseable; 30 import java.io.IOException; 31 import java.net.Socket; 32 import javax.annotation.Nullable; 33 import javax.annotation.concurrent.GuardedBy; 34 import okio.Buffer; 35 import okio.Sink; 36 import okio.Timeout; 37 38 /** 39 * A sink that asynchronously write / flushes a buffer internally. AsyncSink provides flush 40 * coalescing to minimize network packing transmit. Because I/O is handled asynchronously, most I/O 41 * exceptions will be delivered via a callback. 42 */ 43 final class AsyncSink implements Sink { 44 45 private final Object lock = new Object(); 46 private final Buffer buffer = new Buffer(); 47 private final SerializingExecutor serializingExecutor; 48 private final TransportExceptionHandler transportExceptionHandler; 49 private final int maxQueuedControlFrames; 50 51 @GuardedBy("lock") 52 private boolean writeEnqueued = false; 53 @GuardedBy("lock") 54 private boolean flushEnqueued = false; 55 private boolean closed = false; 56 @Nullable 57 private Sink sink; 58 @Nullable 59 private Socket socket; 60 private boolean controlFramesExceeded; 61 private int controlFramesInWrite; 62 @GuardedBy("lock") 63 private int queuedControlFrames; 64 AsyncSink(SerializingExecutor executor, TransportExceptionHandler exceptionHandler, int maxQueuedControlFrames)65 private AsyncSink(SerializingExecutor executor, TransportExceptionHandler exceptionHandler, 66 int maxQueuedControlFrames) { 67 this.serializingExecutor = checkNotNull(executor, "executor"); 68 this.transportExceptionHandler = checkNotNull(exceptionHandler, "exceptionHandler"); 69 this.maxQueuedControlFrames = maxQueuedControlFrames; 70 } 71 72 /** 73 * {@code maxQueuedControlFrames} is only effective for frames written with 74 * {@link #limitControlFramesWriter(FrameWriter)}. 75 */ sink( SerializingExecutor executor, TransportExceptionHandler exceptionHandler, int maxQueuedControlFrames)76 static AsyncSink sink( 77 SerializingExecutor executor, TransportExceptionHandler exceptionHandler, 78 int maxQueuedControlFrames) { 79 return new AsyncSink(executor, exceptionHandler, maxQueuedControlFrames); 80 } 81 82 /** 83 * Sets the actual sink. It is allowed to call write / flush operations on the sink iff calling 84 * this method is scheduled in the executor. The socket is needed for closing. 85 * 86 * <p>should only be called once by thread of executor. 87 */ becomeConnected(Sink sink, Socket socket)88 void becomeConnected(Sink sink, Socket socket) { 89 checkState(this.sink == null, "AsyncSink's becomeConnected should only be called once."); 90 this.sink = checkNotNull(sink, "sink"); 91 this.socket = checkNotNull(socket, "socket"); 92 } 93 limitControlFramesWriter(FrameWriter delegate)94 FrameWriter limitControlFramesWriter(FrameWriter delegate) { 95 return new LimitControlFramesWriter(delegate); 96 } 97 98 @Override write(Buffer source, long byteCount)99 public void write(Buffer source, long byteCount) throws IOException { 100 checkNotNull(source, "source"); 101 if (closed) { 102 throw new IOException("closed"); 103 } 104 try (TaskCloseable ignore = PerfMark.traceTask("AsyncSink.write")) { 105 boolean closeSocket = false; 106 synchronized (lock) { 107 buffer.write(source, byteCount); 108 109 queuedControlFrames += controlFramesInWrite; 110 controlFramesInWrite = 0; 111 if (!controlFramesExceeded && queuedControlFrames > maxQueuedControlFrames) { 112 controlFramesExceeded = true; 113 closeSocket = true; 114 } else { 115 if (writeEnqueued || flushEnqueued || buffer.completeSegmentByteCount() <= 0) { 116 return; 117 } 118 writeEnqueued = true; 119 } 120 } 121 if (closeSocket) { 122 try { 123 socket.close(); 124 } catch (IOException e) { 125 transportExceptionHandler.onException(e); 126 } 127 return; 128 } 129 serializingExecutor.execute(new WriteRunnable() { 130 final Link link = PerfMark.linkOut(); 131 @Override 132 public void doRun() throws IOException { 133 Buffer buf = new Buffer(); 134 try (TaskCloseable ignore = PerfMark.traceTask("WriteRunnable.runWrite")) { 135 PerfMark.linkIn(link); 136 int writingControlFrames; 137 synchronized (lock) { 138 buf.write(buffer, buffer.completeSegmentByteCount()); 139 writeEnqueued = false; 140 // Imprecise because we only tranfer complete segments, but not by much and error 141 // won't accumulate over time 142 writingControlFrames = queuedControlFrames; 143 } 144 sink.write(buf, buf.size()); 145 synchronized (lock) { 146 queuedControlFrames -= writingControlFrames; 147 } 148 } 149 } 150 }); 151 } 152 } 153 154 @Override flush()155 public void flush() throws IOException { 156 if (closed) { 157 throw new IOException("closed"); 158 } 159 try (TaskCloseable ignore = PerfMark.traceTask("AsyncSink.flush")) { 160 synchronized (lock) { 161 if (flushEnqueued) { 162 return; 163 } 164 flushEnqueued = true; 165 } 166 serializingExecutor.execute(new WriteRunnable() { 167 final Link link = PerfMark.linkOut(); 168 @Override 169 public void doRun() throws IOException { 170 Buffer buf = new Buffer(); 171 try (TaskCloseable ignore = PerfMark.traceTask("WriteRunnable.runFlush")) { 172 PerfMark.linkIn(link); 173 synchronized (lock) { 174 buf.write(buffer, buffer.size()); 175 flushEnqueued = false; 176 } 177 sink.write(buf, buf.size()); 178 sink.flush(); 179 } 180 } 181 }); 182 } 183 } 184 185 @Override timeout()186 public Timeout timeout() { 187 return Timeout.NONE; 188 } 189 190 @Override close()191 public void close() { 192 if (closed) { 193 return; 194 } 195 closed = true; 196 serializingExecutor.execute(new Runnable() { 197 @Override 198 public void run() { 199 try { 200 if (sink != null && buffer.size() > 0) { 201 sink.write(buffer, buffer.size()); 202 } 203 } catch (IOException e) { 204 transportExceptionHandler.onException(e); 205 } 206 buffer.close(); 207 try { 208 if (sink != null) { 209 sink.close(); 210 } 211 } catch (IOException e) { 212 transportExceptionHandler.onException(e); 213 } 214 try { 215 if (socket != null) { 216 socket.close(); 217 } 218 } catch (IOException e) { 219 transportExceptionHandler.onException(e); 220 } 221 } 222 }); 223 } 224 225 private abstract class WriteRunnable implements Runnable { 226 @Override run()227 public final void run() { 228 try { 229 if (sink == null) { 230 throw new IOException("Unable to perform write due to unavailable sink."); 231 } 232 doRun(); 233 } catch (Exception e) { 234 transportExceptionHandler.onException(e); 235 } 236 } 237 doRun()238 public abstract void doRun() throws IOException; 239 } 240 241 private class LimitControlFramesWriter extends ForwardingFrameWriter { LimitControlFramesWriter(FrameWriter delegate)242 public LimitControlFramesWriter(FrameWriter delegate) { 243 super(delegate); 244 } 245 246 @Override ackSettings(Settings peerSettings)247 public void ackSettings(Settings peerSettings) throws IOException { 248 controlFramesInWrite++; 249 super.ackSettings(peerSettings); 250 } 251 252 @Override rstStream(int streamId, ErrorCode errorCode)253 public void rstStream(int streamId, ErrorCode errorCode) throws IOException { 254 controlFramesInWrite++; 255 super.rstStream(streamId, errorCode); 256 } 257 258 @Override ping(boolean ack, int payload1, int payload2)259 public void ping(boolean ack, int payload1, int payload2) throws IOException { 260 if (ack) { 261 controlFramesInWrite++; 262 } 263 super.ping(ack, payload1, payload2); 264 } 265 } 266 } 267