1 /* 2 * Copyright 2015 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.netty; 18 19 import com.google.common.annotations.VisibleForTesting; 20 import com.google.common.base.Preconditions; 21 import com.google.errorprone.annotations.CanIgnoreReturnValue; 22 import io.netty.channel.Channel; 23 import io.netty.channel.ChannelFuture; 24 import io.netty.channel.ChannelPromise; 25 import java.util.Queue; 26 import java.util.concurrent.ConcurrentLinkedQueue; 27 import java.util.concurrent.atomic.AtomicBoolean; 28 29 /** 30 * A queue of pending writes to a {@link Channel} that is flushed as a single unit. 31 */ 32 class WriteQueue { 33 34 // Dequeue in chunks, so we don't have to acquire the queue's log too often. 35 @VisibleForTesting 36 static final int DEQUE_CHUNK_SIZE = 128; 37 38 /** 39 * {@link Runnable} used to schedule work onto the tail of the event loop. 40 */ 41 private final Runnable later = new Runnable() { 42 @Override 43 public void run() { 44 flush(); 45 } 46 }; 47 48 private final Channel channel; 49 private final Queue<QueuedCommand> queue; 50 private final AtomicBoolean scheduled = new AtomicBoolean(); 51 WriteQueue(Channel channel)52 public WriteQueue(Channel channel) { 53 this.channel = Preconditions.checkNotNull(channel, "channel"); 54 queue = new ConcurrentLinkedQueue<QueuedCommand>(); 55 } 56 57 /** 58 * Schedule a flush on the channel. 59 */ scheduleFlush()60 void scheduleFlush() { 61 if (scheduled.compareAndSet(false, true)) { 62 // Add the queue to the tail of the event loop so writes will be executed immediately 63 // inside the event loop. Note DO NOT do channel.write outside the event loop as 64 // it will not wake up immediately without a flush. 65 channel.eventLoop().execute(later); 66 } 67 } 68 69 /** 70 * Enqueue a write command on the channel. 71 * 72 * @param command a write to be executed on the channel. 73 * @param flush true if a flush of the write should be schedule, false if a later call to 74 * enqueue will schedule the flush. 75 */ 76 @CanIgnoreReturnValue enqueue(QueuedCommand command, boolean flush)77 ChannelFuture enqueue(QueuedCommand command, boolean flush) { 78 // Detect erroneous code that tries to reuse command objects. 79 Preconditions.checkArgument(command.promise() == null, "promise must not be set on command"); 80 81 ChannelPromise promise = channel.newPromise(); 82 command.promise(promise); 83 queue.add(command); 84 if (flush) { 85 scheduleFlush(); 86 } 87 return promise; 88 } 89 90 /** 91 * Enqueue the runnable. It is not safe for another thread to queue an Runnable directly to the 92 * event loop, because it will be out-of-order with writes. This method allows the Runnable to be 93 * processed in-order with writes. 94 */ enqueue(Runnable runnable, boolean flush)95 void enqueue(Runnable runnable, boolean flush) { 96 queue.add(new RunnableCommand(runnable)); 97 if (flush) { 98 scheduleFlush(); 99 } 100 } 101 102 /** 103 * Process the queue of commands and dispatch them to the stream. This method is only 104 * called in the event loop 105 */ flush()106 private void flush() { 107 try { 108 QueuedCommand cmd; 109 int i = 0; 110 boolean flushedOnce = false; 111 while ((cmd = queue.poll()) != null) { 112 cmd.run(channel); 113 if (++i == DEQUE_CHUNK_SIZE) { 114 i = 0; 115 // Flush each chunk so we are releasing buffers periodically. In theory this loop 116 // might never end as new events are continuously added to the queue, if we never 117 // flushed in that case we would be guaranteed to OOM. 118 channel.flush(); 119 flushedOnce = true; 120 } 121 } 122 // Must flush at least once, even if there were no writes. 123 if (i != 0 || !flushedOnce) { 124 channel.flush(); 125 } 126 } finally { 127 // Mark the write as done, if the queue is non-empty after marking trigger a new write. 128 scheduled.set(false); 129 if (!queue.isEmpty()) { 130 scheduleFlush(); 131 } 132 } 133 } 134 135 private static class RunnableCommand implements QueuedCommand { 136 private final Runnable runnable; 137 RunnableCommand(Runnable runnable)138 public RunnableCommand(Runnable runnable) { 139 this.runnable = runnable; 140 } 141 142 @Override promise(ChannelPromise promise)143 public final void promise(ChannelPromise promise) { 144 throw new UnsupportedOperationException(); 145 } 146 147 @Override promise()148 public final ChannelPromise promise() { 149 throw new UnsupportedOperationException(); 150 } 151 152 @Override run(Channel channel)153 public final void run(Channel channel) { 154 runnable.run(); 155 } 156 } 157 158 abstract static class AbstractQueuedCommand implements QueuedCommand { 159 160 private ChannelPromise promise; 161 162 @Override promise(ChannelPromise promise)163 public final void promise(ChannelPromise promise) { 164 this.promise = promise; 165 } 166 167 @Override promise()168 public final ChannelPromise promise() { 169 return promise; 170 } 171 172 @Override run(Channel channel)173 public final void run(Channel channel) { 174 channel.write(this, promise); 175 } 176 } 177 178 /** 179 * Simple wrapper type around a command and its optional completion listener. 180 */ 181 interface QueuedCommand { 182 /** 183 * Returns the promise beeing notified of the success/failure of the write. 184 */ promise()185 ChannelPromise promise(); 186 187 /** 188 * Sets the promise. 189 */ promise(ChannelPromise promise)190 void promise(ChannelPromise promise); 191 run(Channel channel)192 void run(Channel channel); 193 } 194 } 195