• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2015 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.netty;
18 
19 import com.google.common.annotations.VisibleForTesting;
20 import com.google.common.base.Preconditions;
21 import com.google.errorprone.annotations.CanIgnoreReturnValue;
22 import io.netty.channel.Channel;
23 import io.netty.channel.ChannelFuture;
24 import io.netty.channel.ChannelPromise;
25 import io.perfmark.Link;
26 import io.perfmark.PerfMark;
27 import io.perfmark.TaskCloseable;
28 import java.util.Queue;
29 import java.util.concurrent.ConcurrentLinkedQueue;
30 import java.util.concurrent.atomic.AtomicBoolean;
31 
32 /**
33  * A queue of pending writes to a {@link Channel} that is flushed as a single unit.
34  */
35 class WriteQueue {
36 
37   // Dequeue in chunks, so we don't have to acquire the queue's log too often.
38   @VisibleForTesting
39   static final int DEQUE_CHUNK_SIZE = 128;
40 
41   /**
42    * {@link Runnable} used to schedule work onto the tail of the event loop.
43    */
44   private final Runnable later = new Runnable() {
45     @Override
46     public void run() {
47       flush();
48     }
49   };
50 
51   private final Channel channel;
52   private final Queue<QueuedCommand> queue;
53   private final AtomicBoolean scheduled = new AtomicBoolean();
54 
WriteQueue(Channel channel)55   public WriteQueue(Channel channel) {
56     this.channel = Preconditions.checkNotNull(channel, "channel");
57     queue = new ConcurrentLinkedQueue<>();
58   }
59 
60   /**
61    * Schedule a flush on the channel.
62    */
scheduleFlush()63   void scheduleFlush() {
64     if (scheduled.compareAndSet(false, true)) {
65       // Add the queue to the tail of the event loop so writes will be executed immediately
66       // inside the event loop. Note DO NOT do channel.write outside the event loop as
67       // it will not wake up immediately without a flush.
68       channel.eventLoop().execute(later);
69     }
70   }
71 
72   /**
73    * Enqueue a write command on the channel.
74    *
75    * @param command a write to be executed on the channel.
76    * @param flush true if a flush of the write should be schedule, false if a later call to
77    *              enqueue will schedule the flush.
78    */
79   @CanIgnoreReturnValue
enqueue(QueuedCommand command, boolean flush)80   ChannelFuture enqueue(QueuedCommand command, boolean flush) {
81     // Detect erroneous code that tries to reuse command objects.
82     Preconditions.checkArgument(command.promise() == null, "promise must not be set on command");
83 
84     ChannelPromise promise = channel.newPromise();
85     command.promise(promise);
86     queue.add(command);
87     if (flush) {
88       scheduleFlush();
89     }
90     return promise;
91   }
92 
93   /**
94    * Enqueue the runnable. It is not safe for another thread to queue an Runnable directly to the
95    * event loop, because it will be out-of-order with writes. This method allows the Runnable to be
96    * processed in-order with writes.
97    */
enqueue(Runnable runnable, boolean flush)98   void enqueue(Runnable runnable, boolean flush) {
99     queue.add(new RunnableCommand(runnable));
100     if (flush) {
101       scheduleFlush();
102     }
103   }
104 
105   /**
106    * Executes enqueued work directly on the current thread. This can be used to trigger writes
107    * before performing additional reads. Must be called from the event loop. This method makes no
108    * guarantee that the work queue is empty when it returns.
109    */
drainNow()110   void drainNow() {
111     Preconditions.checkState(channel.eventLoop().inEventLoop(), "must be on the event loop");
112     if (queue.peek() == null) {
113       return;
114     }
115     flush();
116   }
117 
118   /**
119    * Process the queue of commands and dispatch them to the stream. This method is only
120    * called in the event loop
121    */
flush()122   private void flush() {
123     try (TaskCloseable ignore = PerfMark.traceTask("WriteQueue.periodicFlush")) {
124       QueuedCommand cmd;
125       int i = 0;
126       boolean flushedOnce = false;
127       while ((cmd = queue.poll()) != null) {
128         cmd.run(channel);
129         if (++i == DEQUE_CHUNK_SIZE) {
130           i = 0;
131           // Flush each chunk so we are releasing buffers periodically. In theory this loop
132           // might never end as new events are continuously added to the queue, if we never
133           // flushed in that case we would be guaranteed to OOM.
134           try (TaskCloseable ignore2 = PerfMark.traceTask("WriteQueue.flush0")) {
135             channel.flush();
136           }
137           flushedOnce = true;
138         }
139       }
140       // Must flush at least once, even if there were no writes.
141       if (i != 0 || !flushedOnce) {
142         try (TaskCloseable ignore2 = PerfMark.traceTask("WriteQueue.flush1")) {
143           channel.flush();
144         }
145       }
146     } finally {
147       // Mark the write as done, if the queue is non-empty after marking trigger a new write.
148       scheduled.set(false);
149       if (!queue.isEmpty()) {
150         scheduleFlush();
151       }
152     }
153   }
154 
155   private static class RunnableCommand implements QueuedCommand {
156     private final Runnable runnable;
157     private final Link link;
158 
RunnableCommand(Runnable runnable)159     public RunnableCommand(Runnable runnable) {
160       this.link = PerfMark.linkOut();
161       this.runnable = runnable;
162     }
163 
164     @Override
promise(ChannelPromise promise)165     public final void promise(ChannelPromise promise) {
166       throw new UnsupportedOperationException();
167     }
168 
169     @Override
promise()170     public final ChannelPromise promise() {
171       throw new UnsupportedOperationException();
172     }
173 
174     @Override
run(Channel channel)175     public final void run(Channel channel) {
176       runnable.run();
177     }
178 
179     @Override
getLink()180     public Link getLink() {
181       return link;
182     }
183   }
184 
185   abstract static class AbstractQueuedCommand implements QueuedCommand {
186 
187     private ChannelPromise promise;
188     private final Link link;
189 
AbstractQueuedCommand()190     AbstractQueuedCommand() {
191       this.link = PerfMark.linkOut();
192     }
193 
194     @Override
promise(ChannelPromise promise)195     public final void promise(ChannelPromise promise) {
196       this.promise = promise;
197     }
198 
199     @Override
promise()200     public final ChannelPromise promise() {
201       return promise;
202     }
203 
204     @Override
run(Channel channel)205     public final void run(Channel channel) {
206       channel.write(this, promise);
207     }
208 
209     @Override
getLink()210     public Link getLink() {
211       return link;
212     }
213   }
214 
215   /**
216    * Simple wrapper type around a command and its optional completion listener.
217    */
218   interface QueuedCommand {
219     /**
220      * Returns the promise beeing notified of the success/failure of the write.
221      */
promise()222     ChannelPromise promise();
223 
224     /**
225      * Sets the promise.
226      */
promise(ChannelPromise promise)227     void promise(ChannelPromise promise);
228 
run(Channel channel)229     void run(Channel channel);
230 
getLink()231     Link getLink();
232   }
233 }
234