• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2015 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 package io.grpc.netty;
18 
19 import com.google.common.annotations.VisibleForTesting;
20 import com.google.common.base.Preconditions;
21 import com.google.errorprone.annotations.CanIgnoreReturnValue;
22 import io.netty.channel.Channel;
23 import io.netty.channel.ChannelFuture;
24 import io.netty.channel.ChannelPromise;
25 import java.util.Queue;
26 import java.util.concurrent.ConcurrentLinkedQueue;
27 import java.util.concurrent.atomic.AtomicBoolean;
28 
29 /**
30  * A queue of pending writes to a {@link Channel} that is flushed as a single unit.
31  */
32 class WriteQueue {
33 
34   // Dequeue in chunks, so we don't have to acquire the queue's log too often.
35   @VisibleForTesting
36   static final int DEQUE_CHUNK_SIZE = 128;
37 
38   /**
39    * {@link Runnable} used to schedule work onto the tail of the event loop.
40    */
41   private final Runnable later = new Runnable() {
42     @Override
43     public void run() {
44       flush();
45     }
46   };
47 
48   private final Channel channel;
49   private final Queue<QueuedCommand> queue;
50   private final AtomicBoolean scheduled = new AtomicBoolean();
51 
WriteQueue(Channel channel)52   public WriteQueue(Channel channel) {
53     this.channel = Preconditions.checkNotNull(channel, "channel");
54     queue = new ConcurrentLinkedQueue<QueuedCommand>();
55   }
56 
57   /**
58    * Schedule a flush on the channel.
59    */
scheduleFlush()60   void scheduleFlush() {
61     if (scheduled.compareAndSet(false, true)) {
62       // Add the queue to the tail of the event loop so writes will be executed immediately
63       // inside the event loop. Note DO NOT do channel.write outside the event loop as
64       // it will not wake up immediately without a flush.
65       channel.eventLoop().execute(later);
66     }
67   }
68 
69   /**
70    * Enqueue a write command on the channel.
71    *
72    * @param command a write to be executed on the channel.
73    * @param flush true if a flush of the write should be schedule, false if a later call to
74    *              enqueue will schedule the flush.
75    */
76   @CanIgnoreReturnValue
enqueue(QueuedCommand command, boolean flush)77   ChannelFuture enqueue(QueuedCommand command, boolean flush) {
78     // Detect erroneous code that tries to reuse command objects.
79     Preconditions.checkArgument(command.promise() == null, "promise must not be set on command");
80 
81     ChannelPromise promise = channel.newPromise();
82     command.promise(promise);
83     queue.add(command);
84     if (flush) {
85       scheduleFlush();
86     }
87     return promise;
88   }
89 
90   /**
91    * Enqueue the runnable. It is not safe for another thread to queue an Runnable directly to the
92    * event loop, because it will be out-of-order with writes. This method allows the Runnable to be
93    * processed in-order with writes.
94    */
enqueue(Runnable runnable, boolean flush)95   void enqueue(Runnable runnable, boolean flush) {
96     queue.add(new RunnableCommand(runnable));
97     if (flush) {
98       scheduleFlush();
99     }
100   }
101 
102   /**
103    * Process the queue of commands and dispatch them to the stream. This method is only
104    * called in the event loop
105    */
flush()106   private void flush() {
107     try {
108       QueuedCommand cmd;
109       int i = 0;
110       boolean flushedOnce = false;
111       while ((cmd = queue.poll()) != null) {
112         cmd.run(channel);
113         if (++i == DEQUE_CHUNK_SIZE) {
114           i = 0;
115           // Flush each chunk so we are releasing buffers periodically. In theory this loop
116           // might never end as new events are continuously added to the queue, if we never
117           // flushed in that case we would be guaranteed to OOM.
118           channel.flush();
119           flushedOnce = true;
120         }
121       }
122       // Must flush at least once, even if there were no writes.
123       if (i != 0 || !flushedOnce) {
124         channel.flush();
125       }
126     } finally {
127       // Mark the write as done, if the queue is non-empty after marking trigger a new write.
128       scheduled.set(false);
129       if (!queue.isEmpty()) {
130         scheduleFlush();
131       }
132     }
133   }
134 
135   private static class RunnableCommand implements QueuedCommand {
136     private final Runnable runnable;
137 
RunnableCommand(Runnable runnable)138     public RunnableCommand(Runnable runnable) {
139       this.runnable = runnable;
140     }
141 
142     @Override
promise(ChannelPromise promise)143     public final void promise(ChannelPromise promise) {
144       throw new UnsupportedOperationException();
145     }
146 
147     @Override
promise()148     public final ChannelPromise promise() {
149       throw new UnsupportedOperationException();
150     }
151 
152     @Override
run(Channel channel)153     public final void run(Channel channel) {
154       runnable.run();
155     }
156   }
157 
158   abstract static class AbstractQueuedCommand implements QueuedCommand {
159 
160     private ChannelPromise promise;
161 
162     @Override
promise(ChannelPromise promise)163     public final void promise(ChannelPromise promise) {
164       this.promise = promise;
165     }
166 
167     @Override
promise()168     public final ChannelPromise promise() {
169       return promise;
170     }
171 
172     @Override
run(Channel channel)173     public final void run(Channel channel) {
174       channel.write(this, promise);
175     }
176   }
177 
178   /**
179    * Simple wrapper type around a command and its optional completion listener.
180    */
181   interface QueuedCommand {
182     /**
183      * Returns the promise beeing notified of the success/failure of the write.
184      */
promise()185     ChannelPromise promise();
186 
187     /**
188      * Sets the promise.
189      */
promise(ChannelPromise promise)190     void promise(ChannelPromise promise);
191 
run(Channel channel)192     void run(Channel channel);
193   }
194 }
195