1 /* 2 * Copyright 2015 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.netty; 18 19 import com.google.common.annotations.VisibleForTesting; 20 import com.google.common.base.Preconditions; 21 import com.google.errorprone.annotations.CanIgnoreReturnValue; 22 import io.netty.channel.Channel; 23 import io.netty.channel.ChannelFuture; 24 import io.netty.channel.ChannelPromise; 25 import io.perfmark.Link; 26 import io.perfmark.PerfMark; 27 import io.perfmark.TaskCloseable; 28 import java.util.Queue; 29 import java.util.concurrent.ConcurrentLinkedQueue; 30 import java.util.concurrent.atomic.AtomicBoolean; 31 32 /** 33 * A queue of pending writes to a {@link Channel} that is flushed as a single unit. 34 */ 35 class WriteQueue { 36 37 // Dequeue in chunks, so we don't have to acquire the queue's log too often. 38 @VisibleForTesting 39 static final int DEQUE_CHUNK_SIZE = 128; 40 41 /** 42 * {@link Runnable} used to schedule work onto the tail of the event loop. 43 */ 44 private final Runnable later = new Runnable() { 45 @Override 46 public void run() { 47 flush(); 48 } 49 }; 50 51 private final Channel channel; 52 private final Queue<QueuedCommand> queue; 53 private final AtomicBoolean scheduled = new AtomicBoolean(); 54 WriteQueue(Channel channel)55 public WriteQueue(Channel channel) { 56 this.channel = Preconditions.checkNotNull(channel, "channel"); 57 queue = new ConcurrentLinkedQueue<>(); 58 } 59 60 /** 61 * Schedule a flush on the channel. 62 */ scheduleFlush()63 void scheduleFlush() { 64 if (scheduled.compareAndSet(false, true)) { 65 // Add the queue to the tail of the event loop so writes will be executed immediately 66 // inside the event loop. Note DO NOT do channel.write outside the event loop as 67 // it will not wake up immediately without a flush. 68 channel.eventLoop().execute(later); 69 } 70 } 71 72 /** 73 * Enqueue a write command on the channel. 74 * 75 * @param command a write to be executed on the channel. 76 * @param flush true if a flush of the write should be schedule, false if a later call to 77 * enqueue will schedule the flush. 78 */ 79 @CanIgnoreReturnValue enqueue(QueuedCommand command, boolean flush)80 ChannelFuture enqueue(QueuedCommand command, boolean flush) { 81 // Detect erroneous code that tries to reuse command objects. 82 Preconditions.checkArgument(command.promise() == null, "promise must not be set on command"); 83 84 ChannelPromise promise = channel.newPromise(); 85 command.promise(promise); 86 queue.add(command); 87 if (flush) { 88 scheduleFlush(); 89 } 90 return promise; 91 } 92 93 /** 94 * Enqueue the runnable. It is not safe for another thread to queue an Runnable directly to the 95 * event loop, because it will be out-of-order with writes. This method allows the Runnable to be 96 * processed in-order with writes. 97 */ enqueue(Runnable runnable, boolean flush)98 void enqueue(Runnable runnable, boolean flush) { 99 queue.add(new RunnableCommand(runnable)); 100 if (flush) { 101 scheduleFlush(); 102 } 103 } 104 105 /** 106 * Executes enqueued work directly on the current thread. This can be used to trigger writes 107 * before performing additional reads. Must be called from the event loop. This method makes no 108 * guarantee that the work queue is empty when it returns. 109 */ drainNow()110 void drainNow() { 111 Preconditions.checkState(channel.eventLoop().inEventLoop(), "must be on the event loop"); 112 if (queue.peek() == null) { 113 return; 114 } 115 flush(); 116 } 117 118 /** 119 * Process the queue of commands and dispatch them to the stream. This method is only 120 * called in the event loop 121 */ flush()122 private void flush() { 123 try (TaskCloseable ignore = PerfMark.traceTask("WriteQueue.periodicFlush")) { 124 QueuedCommand cmd; 125 int i = 0; 126 boolean flushedOnce = false; 127 while ((cmd = queue.poll()) != null) { 128 cmd.run(channel); 129 if (++i == DEQUE_CHUNK_SIZE) { 130 i = 0; 131 // Flush each chunk so we are releasing buffers periodically. In theory this loop 132 // might never end as new events are continuously added to the queue, if we never 133 // flushed in that case we would be guaranteed to OOM. 134 try (TaskCloseable ignore2 = PerfMark.traceTask("WriteQueue.flush0")) { 135 channel.flush(); 136 } 137 flushedOnce = true; 138 } 139 } 140 // Must flush at least once, even if there were no writes. 141 if (i != 0 || !flushedOnce) { 142 try (TaskCloseable ignore2 = PerfMark.traceTask("WriteQueue.flush1")) { 143 channel.flush(); 144 } 145 } 146 } finally { 147 // Mark the write as done, if the queue is non-empty after marking trigger a new write. 148 scheduled.set(false); 149 if (!queue.isEmpty()) { 150 scheduleFlush(); 151 } 152 } 153 } 154 155 private static class RunnableCommand implements QueuedCommand { 156 private final Runnable runnable; 157 private final Link link; 158 RunnableCommand(Runnable runnable)159 public RunnableCommand(Runnable runnable) { 160 this.link = PerfMark.linkOut(); 161 this.runnable = runnable; 162 } 163 164 @Override promise(ChannelPromise promise)165 public final void promise(ChannelPromise promise) { 166 throw new UnsupportedOperationException(); 167 } 168 169 @Override promise()170 public final ChannelPromise promise() { 171 throw new UnsupportedOperationException(); 172 } 173 174 @Override run(Channel channel)175 public final void run(Channel channel) { 176 runnable.run(); 177 } 178 179 @Override getLink()180 public Link getLink() { 181 return link; 182 } 183 } 184 185 abstract static class AbstractQueuedCommand implements QueuedCommand { 186 187 private ChannelPromise promise; 188 private final Link link; 189 AbstractQueuedCommand()190 AbstractQueuedCommand() { 191 this.link = PerfMark.linkOut(); 192 } 193 194 @Override promise(ChannelPromise promise)195 public final void promise(ChannelPromise promise) { 196 this.promise = promise; 197 } 198 199 @Override promise()200 public final ChannelPromise promise() { 201 return promise; 202 } 203 204 @Override run(Channel channel)205 public final void run(Channel channel) { 206 channel.write(this, promise); 207 } 208 209 @Override getLink()210 public Link getLink() { 211 return link; 212 } 213 } 214 215 /** 216 * Simple wrapper type around a command and its optional completion listener. 217 */ 218 interface QueuedCommand { 219 /** 220 * Returns the promise beeing notified of the success/failure of the write. 221 */ promise()222 ChannelPromise promise(); 223 224 /** 225 * Sets the promise. 226 */ promise(ChannelPromise promise)227 void promise(ChannelPromise promise); 228 run(Channel channel)229 void run(Channel channel); 230 getLink()231 Link getLink(); 232 } 233 } 234