1 /* 2 * Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package sun.nio.ch; 27 28 import java.nio.channels.spi.AsynchronousChannelProvider; 29 import java.io.IOException; 30 import java.util.concurrent.ArrayBlockingQueue; 31 import java.util.concurrent.RejectedExecutionException; 32 import java.util.concurrent.atomic.AtomicInteger; 33 import static sun.nio.ch.EPoll.*; 34 35 /** 36 * AsynchronousChannelGroup implementation based on the Linux epoll facility. 37 */ 38 39 final class EPollPort 40 extends Port 41 { 42 // maximum number of events to poll at a time 43 private static final int MAX_EPOLL_EVENTS = 512; 44 45 // errors 46 private static final int ENOENT = 2; 47 48 // epoll file descriptor 49 private final int epfd; 50 51 // true if epoll closed 52 private boolean closed; 53 54 // socket pair used for wakeup 55 private final int sp[]; 56 57 // number of wakeups pending 58 private final AtomicInteger wakeupCount = new AtomicInteger(); 59 60 // address of the poll array passed to epoll_wait 61 private final long address; 62 63 // encapsulates an event for a channel 64 static class Event { 65 final PollableChannel channel; 66 final int events; 67 Event(PollableChannel channel, int events)68 Event(PollableChannel channel, int events) { 69 this.channel = channel; 70 this.events = events; 71 } 72 channel()73 PollableChannel channel() { return channel; } events()74 int events() { return events; } 75 } 76 77 // queue of events for cases that a polling thread dequeues more than one 78 // event 79 private final ArrayBlockingQueue<Event> queue; 80 private final Event NEED_TO_POLL = new Event(null, 0); 81 private final Event EXECUTE_TASK_OR_SHUTDOWN = new Event(null, 0); 82 EPollPort(AsynchronousChannelProvider provider, ThreadPool pool)83 EPollPort(AsynchronousChannelProvider provider, ThreadPool pool) 84 throws IOException 85 { 86 super(provider, pool); 87 88 // open epoll 89 this.epfd = epollCreate(); 90 91 // create socket pair for wakeup mechanism 92 int[] sv = new int[2]; 93 try { 94 socketpair(sv); 95 // register one end with epoll 96 epollCtl(epfd, EPOLL_CTL_ADD, sv[0], Net.POLLIN); 97 } catch (IOException x) { 98 close0(epfd); 99 throw x; 100 } 101 this.sp = sv; 102 103 // allocate the poll array 104 this.address = allocatePollArray(MAX_EPOLL_EVENTS); 105 106 // create the queue and offer the special event to ensure that the first 107 // threads polls 108 this.queue = new ArrayBlockingQueue<Event>(MAX_EPOLL_EVENTS); 109 this.queue.offer(NEED_TO_POLL); 110 } 111 start()112 EPollPort start() { 113 startThreads(new EventHandlerTask()); 114 return this; 115 } 116 117 /** 118 * Release all resources 119 */ implClose()120 private void implClose() { 121 synchronized (this) { 122 if (closed) 123 return; 124 closed = true; 125 } 126 freePollArray(address); 127 close0(sp[0]); 128 close0(sp[1]); 129 close0(epfd); 130 } 131 wakeup()132 private void wakeup() { 133 if (wakeupCount.incrementAndGet() == 1) { 134 // write byte to socketpair to force wakeup 135 try { 136 interrupt(sp[1]); 137 } catch (IOException x) { 138 throw new AssertionError(x); 139 } 140 } 141 } 142 143 @Override executeOnHandlerTask(Runnable task)144 void executeOnHandlerTask(Runnable task) { 145 synchronized (this) { 146 if (closed) 147 throw new RejectedExecutionException(); 148 offerTask(task); 149 wakeup(); 150 } 151 } 152 153 @Override shutdownHandlerTasks()154 void shutdownHandlerTasks() { 155 /* 156 * If no tasks are running then just release resources; otherwise 157 * write to the one end of the socketpair to wakeup any polling threads. 158 */ 159 int nThreads = threadCount(); 160 if (nThreads == 0) { 161 implClose(); 162 } else { 163 // send interrupt to each thread 164 while (nThreads-- > 0) { 165 wakeup(); 166 } 167 } 168 } 169 170 // invoke by clients to register a file descriptor 171 @Override startPoll(int fd, int events)172 void startPoll(int fd, int events) { 173 // update events (or add to epoll on first usage) 174 int err = epollCtl(epfd, EPOLL_CTL_MOD, fd, (events | EPOLLONESHOT)); 175 if (err == ENOENT) 176 err = epollCtl(epfd, EPOLL_CTL_ADD, fd, (events | EPOLLONESHOT)); 177 if (err != 0) 178 throw new AssertionError(); // should not happen 179 } 180 181 /* 182 * Task to process events from epoll and dispatch to the channel's 183 * onEvent handler. 184 * 185 * Events are retreived from epoll in batch and offered to a BlockingQueue 186 * where they are consumed by handler threads. A special "NEED_TO_POLL" 187 * event is used to signal one consumer to re-poll when all events have 188 * been consumed. 189 */ 190 private class EventHandlerTask implements Runnable { poll()191 private Event poll() throws IOException { 192 try { 193 for (;;) { 194 int n = epollWait(epfd, address, MAX_EPOLL_EVENTS); 195 /* 196 * 'n' events have been read. Here we map them to their 197 * corresponding channel in batch and queue n-1 so that 198 * they can be handled by other handler threads. The last 199 * event is handled by this thread (and so is not queued). 200 */ 201 fdToChannelLock.readLock().lock(); 202 try { 203 while (n-- > 0) { 204 long eventAddress = getEvent(address, n); 205 int fd = getDescriptor(eventAddress); 206 207 // wakeup 208 if (fd == sp[0]) { 209 if (wakeupCount.decrementAndGet() == 0) { 210 // no more wakeups so drain pipe 211 drain1(sp[0]); 212 } 213 214 // queue special event if there are more events 215 // to handle. 216 if (n > 0) { 217 queue.offer(EXECUTE_TASK_OR_SHUTDOWN); 218 continue; 219 } 220 return EXECUTE_TASK_OR_SHUTDOWN; 221 } 222 223 PollableChannel channel = fdToChannel.get(fd); 224 if (channel != null) { 225 int events = getEvents(eventAddress); 226 Event ev = new Event(channel, events); 227 228 // n-1 events are queued; This thread handles 229 // the last one except for the wakeup 230 if (n > 0) { 231 queue.offer(ev); 232 } else { 233 return ev; 234 } 235 } 236 } 237 } finally { 238 fdToChannelLock.readLock().unlock(); 239 } 240 } 241 } finally { 242 // to ensure that some thread will poll when all events have 243 // been consumed 244 queue.offer(NEED_TO_POLL); 245 } 246 } 247 run()248 public void run() { 249 Invoker.GroupAndInvokeCount myGroupAndInvokeCount = 250 Invoker.getGroupAndInvokeCount(); 251 final boolean isPooledThread = (myGroupAndInvokeCount != null); 252 boolean replaceMe = false; 253 Event ev; 254 try { 255 for (;;) { 256 // reset invoke count 257 if (isPooledThread) 258 myGroupAndInvokeCount.resetInvokeCount(); 259 260 try { 261 replaceMe = false; 262 ev = queue.take(); 263 264 // no events and this thread has been "selected" to 265 // poll for more. 266 if (ev == NEED_TO_POLL) { 267 try { 268 ev = poll(); 269 } catch (IOException x) { 270 x.printStackTrace(); 271 return; 272 } 273 } 274 } catch (InterruptedException x) { 275 continue; 276 } 277 278 // handle wakeup to execute task or shutdown 279 if (ev == EXECUTE_TASK_OR_SHUTDOWN) { 280 Runnable task = pollTask(); 281 if (task == null) { 282 // shutdown request 283 return; 284 } 285 // run task (may throw error/exception) 286 replaceMe = true; 287 task.run(); 288 continue; 289 } 290 291 // process event 292 try { 293 ev.channel().onEvent(ev.events(), isPooledThread); 294 } catch (Error x) { 295 replaceMe = true; throw x; 296 } catch (RuntimeException x) { 297 replaceMe = true; throw x; 298 } 299 } 300 } finally { 301 // last handler to exit when shutdown releases resources 302 int remaining = threadExit(this, replaceMe); 303 if (remaining == 0 && isShutdown()) { 304 implClose(); 305 } 306 } 307 } 308 } 309 310 // -- Native methods -- 311 socketpair(int[] sv)312 private static native void socketpair(int[] sv) throws IOException; 313 interrupt(int fd)314 private static native void interrupt(int fd) throws IOException; 315 drain1(int fd)316 private static native void drain1(int fd) throws IOException; 317 close0(int fd)318 private static native void close0(int fd); 319 } 320