1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved. 2 * Permission is hereby granted, free of charge, to any person obtaining a copy 3 * of this software and associated documentation files (the "Software"), to 4 * deal in the Software without restriction, including without limitation the 5 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 6 * sell copies of the Software, and to permit persons to whom the Software is 7 * furnished to do so, subject to the following conditions: 8 * 9 * The above copyright notice and this permission notice shall be included in 10 * all copies or substantial portions of the Software. 11 * 12 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 13 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 14 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 15 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 16 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 17 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 18 * IN THE SOFTWARE. 19 */ 20 21 /* This file contains both the uv__async internal infrastructure and the 22 * user-facing uv_async_t functions. 23 */ 24 25 #include "uv.h" 26 #include "internal.h" 27 #include "atomic-ops.h" 28 29 #include <errno.h> 30 #include <stdio.h> /* snprintf() */ 31 #include <assert.h> 32 #include <stdlib.h> 33 #include <string.h> 34 #include <unistd.h> 35 #include <sched.h> /* sched_yield() */ 36 37 #ifdef __linux__ 38 #include <sys/eventfd.h> 39 #endif 40 41 static void uv__async_send(uv_loop_t* loop); 42 static int uv__async_start(uv_loop_t* loop); 43 44 uv_async_init(uv_loop_t * loop,uv_async_t * handle,uv_async_cb async_cb)45 int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) { 46 int err; 47 48 err = uv__async_start(loop); 49 if (err) 50 return err; 51 52 uv__handle_init(loop, (uv_handle_t*)handle, UV_ASYNC); 53 handle->async_cb = async_cb; 54 handle->pending = 0; 55 56 QUEUE_INSERT_TAIL(&loop->async_handles, &handle->queue); 57 uv__handle_start(handle); 58 59 return 0; 60 } 61 62 uv_async_send(uv_async_t * handle)63 int uv_async_send(uv_async_t* handle) { 64 /* Do a cheap read first. */ 65 if (ACCESS_ONCE(int, handle->pending) != 0) 66 return 0; 67 68 /* Tell the other thread we're busy with the handle. */ 69 if (cmpxchgi(&handle->pending, 0, 1) != 0) 70 return 0; 71 72 /* Wake up the other thread's event loop. */ 73 uv__async_send(handle->loop); 74 75 /* Tell the other thread we're done. */ 76 if (cmpxchgi(&handle->pending, 1, 2) != 1) 77 abort(); 78 79 return 0; 80 } 81 82 83 /* Only call this from the event loop thread. */ uv__async_spin(uv_async_t * handle)84 static int uv__async_spin(uv_async_t* handle) { 85 int i; 86 int rc; 87 88 for (;;) { 89 /* 997 is not completely chosen at random. It's a prime number, acyclical 90 * by nature, and should therefore hopefully dampen sympathetic resonance. 91 */ 92 for (i = 0; i < 997; i++) { 93 /* rc=0 -- handle is not pending. 94 * rc=1 -- handle is pending, other thread is still working with it. 95 * rc=2 -- handle is pending, other thread is done. 96 */ 97 rc = cmpxchgi(&handle->pending, 2, 0); 98 99 if (rc != 1) 100 return rc; 101 102 /* Other thread is busy with this handle, spin until it's done. */ 103 cpu_relax(); 104 } 105 106 /* Yield the CPU. We may have preempted the other thread while it's 107 * inside the critical section and if it's running on the same CPU 108 * as us, we'll just burn CPU cycles until the end of our time slice. 109 */ 110 sched_yield(); 111 } 112 } 113 114 uv__async_close(uv_async_t * handle)115 void uv__async_close(uv_async_t* handle) { 116 uv__async_spin(handle); 117 QUEUE_REMOVE(&handle->queue); 118 uv__handle_stop(handle); 119 } 120 121 uv__async_io(uv_loop_t * loop,uv__io_t * w,unsigned int events)122 static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) { 123 char buf[1024]; 124 ssize_t r; 125 QUEUE queue; 126 QUEUE* q; 127 uv_async_t* h; 128 129 assert(w == &loop->async_io_watcher); 130 131 for (;;) { 132 r = read(w->fd, buf, sizeof(buf)); 133 134 if (r == sizeof(buf)) 135 continue; 136 137 if (r != -1) 138 break; 139 140 if (errno == EAGAIN || errno == EWOULDBLOCK) 141 break; 142 143 if (errno == EINTR) 144 continue; 145 146 abort(); 147 } 148 149 QUEUE_MOVE(&loop->async_handles, &queue); 150 while (!QUEUE_EMPTY(&queue)) { 151 q = QUEUE_HEAD(&queue); 152 h = QUEUE_DATA(q, uv_async_t, queue); 153 154 QUEUE_REMOVE(q); 155 QUEUE_INSERT_TAIL(&loop->async_handles, q); 156 157 if (0 == uv__async_spin(h)) 158 continue; /* Not pending. */ 159 160 if (h->async_cb == NULL) 161 continue; 162 163 h->async_cb(h); 164 } 165 } 166 167 uv__async_send(uv_loop_t * loop)168 static void uv__async_send(uv_loop_t* loop) { 169 const void* buf; 170 ssize_t len; 171 int fd; 172 int r; 173 174 buf = ""; 175 len = 1; 176 fd = loop->async_wfd; 177 178 #if defined(__linux__) 179 if (fd == -1) { 180 static const uint64_t val = 1; 181 buf = &val; 182 len = sizeof(val); 183 fd = loop->async_io_watcher.fd; /* eventfd */ 184 } 185 #endif 186 187 do 188 r = write(fd, buf, len); 189 while (r == -1 && errno == EINTR); 190 191 if (r == len) 192 return; 193 194 if (r == -1) 195 if (errno == EAGAIN || errno == EWOULDBLOCK) 196 return; 197 198 abort(); 199 } 200 201 uv__async_start(uv_loop_t * loop)202 static int uv__async_start(uv_loop_t* loop) { 203 int pipefd[2]; 204 int err; 205 206 if (loop->async_io_watcher.fd != -1) 207 return 0; 208 209 #ifdef __linux__ 210 err = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); 211 if (err < 0) 212 return UV__ERR(errno); 213 214 pipefd[0] = err; 215 pipefd[1] = -1; 216 #else 217 err = uv__make_pipe(pipefd, UV_NONBLOCK_PIPE); 218 if (err < 0) 219 return err; 220 #endif 221 222 uv__io_init(&loop->async_io_watcher, uv__async_io, pipefd[0]); 223 uv__io_start(loop, &loop->async_io_watcher, POLLIN); 224 loop->async_wfd = pipefd[1]; 225 226 return 0; 227 } 228 229 uv__async_fork(uv_loop_t * loop)230 int uv__async_fork(uv_loop_t* loop) { 231 if (loop->async_io_watcher.fd == -1) /* never started */ 232 return 0; 233 234 uv__async_stop(loop); 235 236 return uv__async_start(loop); 237 } 238 239 uv__async_stop(uv_loop_t * loop)240 void uv__async_stop(uv_loop_t* loop) { 241 if (loop->async_io_watcher.fd == -1) 242 return; 243 244 if (loop->async_wfd != -1) { 245 if (loop->async_wfd != loop->async_io_watcher.fd) 246 uv__close(loop->async_wfd); 247 loop->async_wfd = -1; 248 } 249 250 uv__io_stop(loop, &loop->async_io_watcher, POLLIN); 251 uv__close(loop->async_io_watcher.fd); 252 loop->async_io_watcher.fd = -1; 253 } 254