• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2018 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 package io.grpc.okhttp;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 import static com.google.common.base.Preconditions.checkState;
21 
22 import io.grpc.internal.SerializingExecutor;
23 import io.grpc.okhttp.ExceptionHandlingFrameWriter.TransportExceptionHandler;
24 import io.grpc.okhttp.internal.framed.ErrorCode;
25 import io.grpc.okhttp.internal.framed.FrameWriter;
26 import io.grpc.okhttp.internal.framed.Settings;
27 import io.perfmark.Link;
28 import io.perfmark.PerfMark;
29 import io.perfmark.TaskCloseable;
30 import java.io.IOException;
31 import java.net.Socket;
32 import javax.annotation.Nullable;
33 import javax.annotation.concurrent.GuardedBy;
34 import okio.Buffer;
35 import okio.Sink;
36 import okio.Timeout;
37 
38 /**
39  * A sink that asynchronously write / flushes a buffer internally. AsyncSink provides flush
40  * coalescing to minimize network packing transmit. Because I/O is handled asynchronously, most I/O
41  * exceptions will be delivered via a callback.
42  */
43 final class AsyncSink implements Sink {
44 
45   private final Object lock = new Object();
46   private final Buffer buffer = new Buffer();
47   private final SerializingExecutor serializingExecutor;
48   private final TransportExceptionHandler transportExceptionHandler;
49   private final int maxQueuedControlFrames;
50 
51   @GuardedBy("lock")
52   private boolean writeEnqueued = false;
53   @GuardedBy("lock")
54   private boolean flushEnqueued = false;
55   private boolean closed = false;
56   @Nullable
57   private Sink sink;
58   @Nullable
59   private Socket socket;
60   private boolean controlFramesExceeded;
61   private int controlFramesInWrite;
62   @GuardedBy("lock")
63   private int queuedControlFrames;
64 
AsyncSink(SerializingExecutor executor, TransportExceptionHandler exceptionHandler, int maxQueuedControlFrames)65   private AsyncSink(SerializingExecutor executor, TransportExceptionHandler exceptionHandler,
66       int maxQueuedControlFrames) {
67     this.serializingExecutor = checkNotNull(executor, "executor");
68     this.transportExceptionHandler = checkNotNull(exceptionHandler, "exceptionHandler");
69     this.maxQueuedControlFrames = maxQueuedControlFrames;
70   }
71 
72   /**
73    * {@code maxQueuedControlFrames} is only effective for frames written with
74    * {@link #limitControlFramesWriter(FrameWriter)}.
75    */
sink( SerializingExecutor executor, TransportExceptionHandler exceptionHandler, int maxQueuedControlFrames)76   static AsyncSink sink(
77       SerializingExecutor executor, TransportExceptionHandler exceptionHandler,
78       int maxQueuedControlFrames) {
79     return new AsyncSink(executor, exceptionHandler, maxQueuedControlFrames);
80   }
81 
82   /**
83    * Sets the actual sink. It is allowed to call write / flush operations on the sink iff calling
84    * this method is scheduled in the executor. The socket is needed for closing.
85    *
86    * <p>should only be called once by thread of executor.
87    */
becomeConnected(Sink sink, Socket socket)88   void becomeConnected(Sink sink, Socket socket) {
89     checkState(this.sink == null, "AsyncSink's becomeConnected should only be called once.");
90     this.sink = checkNotNull(sink, "sink");
91     this.socket = checkNotNull(socket, "socket");
92   }
93 
limitControlFramesWriter(FrameWriter delegate)94   FrameWriter limitControlFramesWriter(FrameWriter delegate) {
95     return new LimitControlFramesWriter(delegate);
96   }
97 
98   @Override
write(Buffer source, long byteCount)99   public void write(Buffer source, long byteCount) throws IOException {
100     checkNotNull(source, "source");
101     if (closed) {
102       throw new IOException("closed");
103     }
104     try (TaskCloseable ignore = PerfMark.traceTask("AsyncSink.write")) {
105       boolean closeSocket = false;
106       synchronized (lock) {
107         buffer.write(source, byteCount);
108 
109         queuedControlFrames += controlFramesInWrite;
110         controlFramesInWrite = 0;
111         if (!controlFramesExceeded && queuedControlFrames > maxQueuedControlFrames) {
112           controlFramesExceeded = true;
113           closeSocket = true;
114         } else {
115           if (writeEnqueued || flushEnqueued || buffer.completeSegmentByteCount() <= 0) {
116             return;
117           }
118           writeEnqueued = true;
119         }
120       }
121       if (closeSocket) {
122         try {
123           socket.close();
124         } catch (IOException e) {
125           transportExceptionHandler.onException(e);
126         }
127         return;
128       }
129       serializingExecutor.execute(new WriteRunnable() {
130         final Link link = PerfMark.linkOut();
131         @Override
132         public void doRun() throws IOException {
133           Buffer buf = new Buffer();
134           try (TaskCloseable ignore = PerfMark.traceTask("WriteRunnable.runWrite")) {
135             PerfMark.linkIn(link);
136             int writingControlFrames;
137             synchronized (lock) {
138               buf.write(buffer, buffer.completeSegmentByteCount());
139               writeEnqueued = false;
140               // Imprecise because we only tranfer complete segments, but not by much and error
141               // won't accumulate over time
142               writingControlFrames = queuedControlFrames;
143             }
144             sink.write(buf, buf.size());
145             synchronized (lock) {
146               queuedControlFrames -= writingControlFrames;
147             }
148           }
149         }
150       });
151     }
152   }
153 
154   @Override
flush()155   public void flush() throws IOException {
156     if (closed) {
157       throw new IOException("closed");
158     }
159     try (TaskCloseable ignore = PerfMark.traceTask("AsyncSink.flush")) {
160       synchronized (lock) {
161         if (flushEnqueued) {
162           return;
163         }
164         flushEnqueued = true;
165       }
166       serializingExecutor.execute(new WriteRunnable() {
167         final Link link = PerfMark.linkOut();
168         @Override
169         public void doRun() throws IOException {
170           Buffer buf = new Buffer();
171           try (TaskCloseable ignore = PerfMark.traceTask("WriteRunnable.runFlush")) {
172             PerfMark.linkIn(link);
173             synchronized (lock) {
174               buf.write(buffer, buffer.size());
175               flushEnqueued = false;
176             }
177             sink.write(buf, buf.size());
178             sink.flush();
179           }
180         }
181       });
182     }
183   }
184 
185   @Override
timeout()186   public Timeout timeout() {
187     return Timeout.NONE;
188   }
189 
190   @Override
close()191   public void close() {
192     if (closed) {
193       return;
194     }
195     closed = true;
196     serializingExecutor.execute(new Runnable() {
197       @Override
198       public void run() {
199         try {
200           if (sink != null && buffer.size() > 0) {
201             sink.write(buffer, buffer.size());
202           }
203         } catch (IOException e) {
204           transportExceptionHandler.onException(e);
205         }
206         buffer.close();
207         try {
208           if (sink != null) {
209             sink.close();
210           }
211         } catch (IOException e) {
212           transportExceptionHandler.onException(e);
213         }
214         try {
215           if (socket != null) {
216             socket.close();
217           }
218         } catch (IOException e) {
219           transportExceptionHandler.onException(e);
220         }
221       }
222     });
223   }
224 
225   private abstract class WriteRunnable implements Runnable {
226     @Override
run()227     public final void run() {
228       try {
229         if (sink == null) {
230           throw new IOException("Unable to perform write due to unavailable sink.");
231         }
232         doRun();
233       } catch (Exception e) {
234         transportExceptionHandler.onException(e);
235       }
236     }
237 
doRun()238     public abstract void doRun() throws IOException;
239   }
240 
241   private class LimitControlFramesWriter extends ForwardingFrameWriter {
LimitControlFramesWriter(FrameWriter delegate)242     public LimitControlFramesWriter(FrameWriter delegate) {
243       super(delegate);
244     }
245 
246     @Override
ackSettings(Settings peerSettings)247     public void ackSettings(Settings peerSettings) throws IOException {
248       controlFramesInWrite++;
249       super.ackSettings(peerSettings);
250     }
251 
252     @Override
rstStream(int streamId, ErrorCode errorCode)253     public void rstStream(int streamId, ErrorCode errorCode) throws IOException {
254       controlFramesInWrite++;
255       super.rstStream(streamId, errorCode);
256     }
257 
258     @Override
ping(boolean ack, int payload1, int payload2)259     public void ping(boolean ack, int payload1, int payload2) throws IOException {
260       if (ack) {
261         controlFramesInWrite++;
262       }
263       super.ping(ack, payload1, payload2);
264     }
265   }
266 }
267