/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */

#include "uv.h"
#include "internal.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <errno.h>

#include <sys/types.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <sys/un.h>
#include <unistd.h>
#include <limits.h> /* IOV_MAX */

#if defined(__APPLE__)
# include <sys/event.h>
# include <sys/time.h>
# include <sys/select.h>

/* Forward declaration */
typedef struct uv__stream_select_s uv__stream_select_t;

struct uv__stream_select_s {
  uv_stream_t* stream;
  uv_thread_t thread;
  uv_sem_t close_sem;
  uv_sem_t async_sem;
  uv_async_t async;
  int events;
  int fake_fd;
  int int_fd;
  int fd;
  fd_set* sread;
  size_t sread_sz;
  fd_set* swrite;
  size_t swrite_sz;
};
#endif /* defined(__APPLE__) */

union uv__cmsg {
  struct cmsghdr hdr;
  /* This cannot be larger because of the IBMi PASE limitation that
   * the total size of control messages cannot exceed 256 bytes.
   */
  char pad[256];
};

STATIC_ASSERT(256 == sizeof(union uv__cmsg));

static void uv__stream_connect(uv_stream_t*);
static void uv__write(uv_stream_t* stream);
static void uv__read(uv_stream_t* stream);
static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events);
static void uv__write_callbacks(uv_stream_t* stream);
static size_t uv__write_req_size(uv_write_t* req);
static void uv__drain(uv_stream_t* stream);


void uv__stream_init(uv_loop_t* loop,
                     uv_stream_t* stream,
                     uv_handle_type type) {
  int err;

  uv__handle_init(loop, (uv_handle_t*)stream, type);
  stream->read_cb = NULL;
  stream->alloc_cb = NULL;
  stream->close_cb = NULL;
  stream->connection_cb = NULL;
  stream->connect_req = NULL;
  stream->shutdown_req = NULL;
  stream->accepted_fd = -1;
  stream->queued_fds = NULL;
  stream->delayed_error = 0;
  uv__queue_init(&stream->write_queue);
  uv__queue_init(&stream->write_completed_queue);
  stream->write_queue_size = 0;

  if (loop->emfile_fd == -1) {
    err = uv__open_cloexec("/dev/null", O_RDONLY);
    if (err < 0)
      /* In the rare case that "/dev/null" isn't mounted, open "/"
       * instead.
       */
      err = uv__open_cloexec("/", O_RDONLY);
    if (err >= 0)
      loop->emfile_fd = err;
  }

#if defined(__APPLE__)
  stream->select = NULL;
#endif /* defined(__APPLE__) */

  uv__io_init(&stream->io_watcher, uv__stream_io, -1);
}


static void uv__stream_osx_interrupt_select(uv_stream_t* stream) {
#if defined(__APPLE__)
  /* Notify select() thread about state change */
  uv__stream_select_t* s;
  int r;

  s = stream->select;
  if (s == NULL)
    return;

  /* Interrupt the select() loop.
   * NOTE: fake_fd and int_fd are a socketpair(), so writing to one end
   * triggers a read event on the other.
   */
  do
    r = write(s->fake_fd, "x", 1);
  while (r == -1 && errno == EINTR);

  assert(r == 1);
#else /* !defined(__APPLE__) */
  /* No-op on any other platform */
#endif /* !defined(__APPLE__) */
}


#if defined(__APPLE__)
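/* A dedicated thread runs select(2) on the real fd and forwards readiness to
 * the event loop: results are published through s->events and s->async, the
 * async_sem handshake keeps the two threads in lockstep, and the
 * fake_fd/int_fd socketpair is used to interrupt a pending select().
 */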
static void uv__stream_osx_select(void* arg) {
  uv_stream_t* stream;
  uv__stream_select_t* s;
  char buf[1024];
  int events;
  int fd;
  int r;
  int max_fd;

  stream = arg;
  s = stream->select;
  fd = s->fd;

  if (fd > s->int_fd)
    max_fd = fd;
  else
    max_fd = s->int_fd;

  for (;;) {
    /* Terminate on semaphore */
    if (uv_sem_trywait(&s->close_sem) == 0)
      break;

    /* Watch fd using select(2) */
    memset(s->sread, 0, s->sread_sz);
    memset(s->swrite, 0, s->swrite_sz);

    if (uv__io_active(&stream->io_watcher, POLLIN))
      FD_SET(fd, s->sread);
    if (uv__io_active(&stream->io_watcher, POLLOUT))
      FD_SET(fd, s->swrite);
    FD_SET(s->int_fd, s->sread);

    /* Wait indefinitely for fd events */
    r = select(max_fd + 1, s->sread, s->swrite, NULL, NULL);
    if (r == -1) {
      if (errno == EINTR)
        continue;

      /* XXX: Possible?! */
      abort();
    }

    /* Ignore timeouts */
    if (r == 0)
      continue;

    /* Empty socketpair's buffer in case of interruption */
    if (FD_ISSET(s->int_fd, s->sread))
      for (;;) {
        r = read(s->int_fd, buf, sizeof(buf));

        if (r == sizeof(buf))
          continue;

        if (r != -1)
          break;

        if (errno == EAGAIN || errno == EWOULDBLOCK)
          break;

        if (errno == EINTR)
          continue;

        abort();
      }

    /* Handle events */
    events = 0;
    if (FD_ISSET(fd, s->sread))
      events |= POLLIN;
    if (FD_ISSET(fd, s->swrite))
      events |= POLLOUT;

    assert(events != 0 || FD_ISSET(s->int_fd, s->sread));
    if (events != 0) {
      ACCESS_ONCE(int, s->events) = events;

      uv_async_send(&s->async);
      uv_sem_wait(&s->async_sem);

      /* Should be processed at this stage */
      assert((s->events == 0) || (stream->flags & UV_HANDLE_CLOSING));
    }
  }
}


static void uv__stream_osx_select_cb(uv_async_t* handle) {
  uv__stream_select_t* s;
  uv_stream_t* stream;
  int events;

  s = container_of(handle, uv__stream_select_t, async);
  stream = s->stream;

  /* Get and reset stream's events */
  events = s->events;
  ACCESS_ONCE(int, s->events) = 0;

  assert(events != 0);
  assert(events == (events & (POLLIN | POLLOUT)));

  /* Invoke callback on event-loop */
  if ((events & POLLIN) && uv__io_active(&stream->io_watcher, POLLIN))
    uv__stream_io(stream->loop, &stream->io_watcher, POLLIN);

  if ((events & POLLOUT) && uv__io_active(&stream->io_watcher, POLLOUT))
    uv__stream_io(stream->loop, &stream->io_watcher, POLLOUT);

  if (stream->flags & UV_HANDLE_CLOSING)
    return;

  /* NOTE: it is important to post the semaphore here, otherwise `select()`
   * might be called before the actual `uv__read()`, leading to a blocking
   * syscall.
   */
  uv_sem_post(&s->async_sem);
}


static void uv__stream_osx_cb_close(uv_handle_t* async) {
  uv__stream_select_t* s;

  s = container_of(async, uv__stream_select_t, async);
  uv__free(s);
}


int uv__stream_try_select(uv_stream_t* stream, int* fd) {
  /*
   * kqueue doesn't work with some files from the /dev mount on macOS;
   * run select(2) in a separate thread for those fds.
   */

  struct kevent filter[1];
  struct kevent events[1];
  struct timespec timeout;
  uv__stream_select_t* s;
  int fds[2];
  int err;
  int ret;
  int kq;
  int old_fd;
  int max_fd;
  size_t sread_sz;
  size_t swrite_sz;

  kq = kqueue();
  if (kq == -1) {
    perror("(libuv) kqueue()");
    return UV__ERR(errno);
  }

  EV_SET(&filter[0], *fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0);

  /* Use a small timeout, because we only want to capture EINVALs */
  timeout.tv_sec = 0;
  timeout.tv_nsec = 1;

  do
    ret = kevent(kq, filter, 1, events, 1, &timeout);
  while (ret == -1 && errno == EINTR);

  uv__close(kq);

  if (ret == -1)
    return UV__ERR(errno);

  if (ret == 0 || (events[0].flags & EV_ERROR) == 0 || events[0].data != EINVAL)
    return 0;

  /* At this point we definitely know that this fd won't work with kqueue */

  /*
   * Create fds for the io watcher and to interrupt the select() loop.
   * NOTE: do this ahead of the malloc below so we know how much space
   * to allocate for the fd_sets.
   */
  if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds))
    return UV__ERR(errno);

  max_fd = *fd;
  if (fds[1] > max_fd)
    max_fd = fds[1];

  sread_sz = ROUND_UP(max_fd + 1, sizeof(uint32_t) * NBBY) / NBBY;
  swrite_sz = sread_sz;
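  /* For example, with NBBY == 8 and max_fd == 40, the size rounds up to
   * 64 bits, i.e. 8-byte fd_sets. */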

  s = uv__malloc(sizeof(*s) + sread_sz + swrite_sz);
  if (s == NULL) {
    err = UV_ENOMEM;
    goto failed_malloc;
  }

  s->events = 0;
  s->fd = *fd;
  s->sread = (fd_set*) ((char*) s + sizeof(*s));
  s->sread_sz = sread_sz;
  s->swrite = (fd_set*) ((char*) s->sread + sread_sz);
  s->swrite_sz = swrite_sz;

  err = uv_async_init(stream->loop, &s->async, uv__stream_osx_select_cb);
  if (err)
    goto failed_async_init;

  s->async.flags |= UV_HANDLE_INTERNAL;
  uv__handle_unref(&s->async);

  err = uv_sem_init(&s->close_sem, 0);
  if (err != 0)
    goto failed_close_sem_init;

  err = uv_sem_init(&s->async_sem, 0);
  if (err != 0)
    goto failed_async_sem_init;

  s->fake_fd = fds[0];
  s->int_fd = fds[1];

  old_fd = *fd;
  s->stream = stream;
  stream->select = s;
  *fd = s->fake_fd;

  err = uv_thread_create(&s->thread, uv__stream_osx_select, stream);
  if (err != 0)
    goto failed_thread_create;

  return 0;

failed_thread_create:
  s->stream = NULL;
  stream->select = NULL;
  *fd = old_fd;

  uv_sem_destroy(&s->async_sem);

failed_async_sem_init:
  uv_sem_destroy(&s->close_sem);

failed_close_sem_init:
  uv__close(fds[0]);
  uv__close(fds[1]);
  uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);
  return err;

failed_async_init:
  uv__free(s);

failed_malloc:
  uv__close(fds[0]);
  uv__close(fds[1]);

  return err;
}
#endif /* defined(__APPLE__) */


int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
#if defined(__APPLE__)
  int enable;
#endif

  if (!(stream->io_watcher.fd == -1 || stream->io_watcher.fd == fd))
    return UV_EBUSY;

  assert(fd >= 0);
  stream->flags |= flags;

  if (stream->type == UV_TCP) {
    if ((stream->flags & UV_HANDLE_TCP_NODELAY) && uv__tcp_nodelay(fd, 1))
      return UV__ERR(errno);

    /* TODO: use the delay the user passed in. */
    if ((stream->flags & UV_HANDLE_TCP_KEEPALIVE) &&
        uv__tcp_keepalive(fd, 1, 60)) {
      return UV__ERR(errno);
    }
  }

#if defined(__APPLE__)
  enable = 1;
  if (setsockopt(fd, SOL_SOCKET, SO_OOBINLINE, &enable, sizeof(enable)) &&
      errno != ENOTSOCK &&
      errno != EINVAL) {
    return UV__ERR(errno);
  }
#endif

  stream->io_watcher.fd = fd;

  return 0;
}


void uv__stream_flush_write_queue(uv_stream_t* stream, int error) {
  uv_write_t* req;
  struct uv__queue* q;
  while (!uv__queue_empty(&stream->write_queue)) {
    q = uv__queue_head(&stream->write_queue);
    uv__queue_remove(q);

    req = uv__queue_data(q, uv_write_t, queue);
    req->error = error;

    uv__queue_insert_tail(&stream->write_completed_queue, &req->queue);
  }
}


void uv__stream_destroy(uv_stream_t* stream) {
  assert(!uv__io_active(&stream->io_watcher, POLLIN | POLLOUT));
  assert(stream->flags & UV_HANDLE_CLOSED);

  if (stream->connect_req) {
    uv__req_unregister(stream->loop, stream->connect_req);
    stream->connect_req->cb(stream->connect_req, UV_ECANCELED);
    stream->connect_req = NULL;
  }

  uv__stream_flush_write_queue(stream, UV_ECANCELED);
  uv__write_callbacks(stream);
  uv__drain(stream);

  assert(stream->write_queue_size == 0);
}


/* Implements a best effort approach to mitigating accept() EMFILE errors.
 * We have a spare file descriptor stashed away that we close to get below
 * the EMFILE limit. Next, we accept all pending connections and close them
 * immediately to signal the clients that we're overloaded - and we are, but
 * we still keep on trucking.
 *
 * There is one caveat: it's not reliable in a multi-threaded environment.
 * The file descriptor limit is per process. Our party trick fails if another
 * thread opens a file or creates a socket in the time window between us
 * calling close() and accept().
 */
static int uv__emfile_trick(uv_loop_t* loop, int accept_fd) {
  int err;
  int emfile_fd;

  if (loop->emfile_fd == -1)
    return UV_EMFILE;

  uv__close(loop->emfile_fd);
  loop->emfile_fd = -1;

  do {
    err = uv__accept(accept_fd);
    if (err >= 0)
      uv__close(err);
  } while (err >= 0 || err == UV_EINTR);

  emfile_fd = uv__open_cloexec("/", O_RDONLY);
  if (emfile_fd >= 0)
    loop->emfile_fd = emfile_fd;

  return err;
}


void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  uv_stream_t* stream;
  int err;
  int fd;

  stream = container_of(w, uv_stream_t, io_watcher);
  assert(events & POLLIN);
  assert(stream->accepted_fd == -1);
  assert(!(stream->flags & UV_HANDLE_CLOSING));

  fd = uv__stream_fd(stream);
  err = uv__accept(fd);

  if (err == UV_EMFILE || err == UV_ENFILE)
    err = uv__emfile_trick(loop, fd); /* Shed load. */

  if (err < 0)
    return;

  stream->accepted_fd = err;
  stream->connection_cb(stream, 0);

  if (stream->accepted_fd != -1)
    /* The user hasn't yet called uv_accept(). */
    uv__io_stop(loop, &stream->io_watcher, POLLIN);
}


int uv_accept(uv_stream_t* server, uv_stream_t* client) {
  int err;

  assert(server->loop == client->loop);

  if (server->accepted_fd == -1)
    return UV_EAGAIN;

  switch (client->type) {
    case UV_NAMED_PIPE:
    case UV_TCP:
      err = uv__stream_open(client,
                            server->accepted_fd,
                            UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
      if (err) {
        /* TODO handle error */
        uv__close(server->accepted_fd);
        goto done;
      }
      break;

    case UV_UDP:
      err = uv_udp_open((uv_udp_t*) client, server->accepted_fd);
      if (err) {
        uv__close(server->accepted_fd);
        goto done;
      }
      break;

    default:
      return UV_EINVAL;
  }

  client->flags |= UV_HANDLE_BOUND;

done:
  /* Process queued fds */
  if (server->queued_fds != NULL) {
    uv__stream_queued_fds_t* queued_fds;

    queued_fds = server->queued_fds;

    /* Read first */
    server->accepted_fd = queued_fds->fds[0];

    /* All read, free */
    assert(queued_fds->offset > 0);
    if (--queued_fds->offset == 0) {
      uv__free(queued_fds);
      server->queued_fds = NULL;
    } else {
      /* Shift rest */
      memmove(queued_fds->fds,
              queued_fds->fds + 1,
              queued_fds->offset * sizeof(*queued_fds->fds));
    }
  } else {
    server->accepted_fd = -1;
    if (err == 0)
      uv__io_start(server->loop, &server->io_watcher, POLLIN);
  }
  return err;
}
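
/* Usage sketch (illustrative, not part of this file): a typical connection
 * callback pairs uv_accept() with a freshly initialized client handle.
 * `on_connection`, `alloc_cb` and `read_cb` are caller-provided names and
 * error handling is elided.
 *
 *   static void on_connection(uv_stream_t* server, int status) {
 *     uv_tcp_t* client = malloc(sizeof(*client));
 *     uv_tcp_init(server->loop, client);
 *     if (status == 0 && uv_accept(server, (uv_stream_t*) client) == 0)
 *       uv_read_start((uv_stream_t*) client, alloc_cb, read_cb);
 *     else
 *       uv_close((uv_handle_t*) client, NULL);
 *   }
 */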


int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
  int err;
  if (uv__is_closing(stream)) {
    return UV_EINVAL;
  }
  switch (stream->type) {
  case UV_TCP:
    err = uv__tcp_listen((uv_tcp_t*)stream, backlog, cb);
    break;

  case UV_NAMED_PIPE:
    err = uv__pipe_listen((uv_pipe_t*)stream, backlog, cb);
    break;

  default:
    err = UV_EINVAL;
  }

  if (err == 0)
    uv__handle_start(stream);

  return err;
}
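
/* Usage sketch (illustrative): binding and listening on a TCP server handle;
 * the names and port are arbitrary and error handling is elided.
 *
 *   uv_tcp_t server;
 *   struct sockaddr_in addr;
 *   uv_tcp_init(loop, &server);
 *   uv_ip4_addr("0.0.0.0", 7000, &addr);
 *   uv_tcp_bind(&server, (const struct sockaddr*) &addr, 0);
 *   uv_listen((uv_stream_t*) &server, SOMAXCONN, on_connection);
 */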


static void uv__drain(uv_stream_t* stream) {
  uv_shutdown_t* req;
  int err;

  assert(uv__queue_empty(&stream->write_queue));
  if (!(stream->flags & UV_HANDLE_CLOSING)) {
    uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
    uv__stream_osx_interrupt_select(stream);
  }

  if (!uv__is_stream_shutting(stream))
    return;

  req = stream->shutdown_req;
  assert(req);

  if ((stream->flags & UV_HANDLE_CLOSING) ||
      !(stream->flags & UV_HANDLE_SHUT)) {
    stream->shutdown_req = NULL;
    uv__req_unregister(stream->loop, req);

    err = 0;
    if (stream->flags & UV_HANDLE_CLOSING)
      /* The user destroyed the stream before we got to do the shutdown. */
      err = UV_ECANCELED;
    else if (shutdown(uv__stream_fd(stream), SHUT_WR))
      err = UV__ERR(errno);
    else /* Success. */
      stream->flags |= UV_HANDLE_SHUT;

    if (req->cb != NULL)
      req->cb(req, err);
  }
}


static ssize_t uv__writev(int fd, struct iovec* vec, size_t n) {
  if (n == 1)
    return write(fd, vec->iov_base, vec->iov_len);
  else
    return writev(fd, vec, n);
}


static size_t uv__write_req_size(uv_write_t* req) {
  size_t size;

  assert(req->bufs != NULL);
  size = uv__count_bufs(req->bufs + req->write_index,
                        req->nbufs - req->write_index);
  assert(req->handle->write_queue_size >= size);

  return size;
}


/* Returns 1 if all write request data has been written, or 0 if there is still
 * more data to write.
 *
 * Note: the return value only says something about the *current* request.
 * There may still be other write requests sitting in the queue.
 */
static int uv__write_req_update(uv_stream_t* stream,
                                uv_write_t* req,
                                size_t n) {
  uv_buf_t* buf;
  size_t len;

  assert(n <= stream->write_queue_size);
  stream->write_queue_size -= n;

  buf = req->bufs + req->write_index;

  do {
    len = n < buf->len ? n : buf->len;
    buf->base += len;
    buf->len -= len;
    buf += (buf->len == 0); /* Advance to next buffer if this one is empty. */
    n -= len;
  } while (n > 0);

  req->write_index = buf - req->bufs;

  return req->write_index == req->nbufs;
}
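
/* Worked example: with two buffers of 10 and 20 bytes and n == 15, the loop
 * consumes all of bufs[0] plus 5 bytes of bufs[1]; write_index becomes 1 and
 * the next uv__write() retries from bufs[1] with its base advanced by 5.
 */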


static void uv__write_req_finish(uv_write_t* req) {
  uv_stream_t* stream = req->handle;

  /* Pop the req off tcp->write_queue. */
  uv__queue_remove(&req->queue);

  /* Only free when there was no error. On error, we touch up write_queue_size
   * right before making the callback. The reason we don't do that right away
   * is that a write_queue_size > 0 is our only way to signal to the user that
   * they should stop writing - which they should if we got an error. Something
   * to revisit in future revisions of the libuv API.
   */
  if (req->error == 0) {
    if (req->bufs != req->bufsml)
      uv__free(req->bufs);
    req->bufs = NULL;
  }

  /* Add it to the write_completed_queue where it will have its
   * callback called in the near future.
   */
  uv__queue_insert_tail(&stream->write_completed_queue, &req->queue);
  uv__io_feed(stream->loop, &stream->io_watcher);
}


static int uv__handle_fd(uv_handle_t* handle) {
  switch (handle->type) {
    case UV_NAMED_PIPE:
    case UV_TCP:
      return ((uv_stream_t*) handle)->io_watcher.fd;

    case UV_UDP:
      return ((uv_udp_t*) handle)->io_watcher.fd;

    default:
      return -1;
  }
}

static int uv__try_write(uv_stream_t* stream,
                         const uv_buf_t bufs[],
                         unsigned int nbufs,
                         uv_stream_t* send_handle) {
  struct iovec* iov;
  int iovmax;
  int iovcnt;
  ssize_t n;

  /*
   * Cast to iovec. We had to have our own uv_buf_t instead of iovec
   * because Windows's WSABUF is not an iovec.
   */
  iov = (struct iovec*) bufs;
  iovcnt = nbufs;

  iovmax = uv__getiovmax();

  /* Limit iov count to avoid EINVALs from writev() */
  if (iovcnt > iovmax)
    iovcnt = iovmax;

  /*
   * Now do the actual writev. Note that we've been updating the pointers
   * inside the iov each time we write. So there is no need to offset it.
   */
  if (send_handle != NULL) {
    int fd_to_send;
    struct msghdr msg;
    union uv__cmsg cmsg;

    if (uv__is_closing(send_handle))
      return UV_EBADF;

    fd_to_send = uv__handle_fd((uv_handle_t*) send_handle);

    memset(&cmsg, 0, sizeof(cmsg));

    assert(fd_to_send >= 0);

    msg.msg_name = NULL;
    msg.msg_namelen = 0;
    msg.msg_iov = iov;
    msg.msg_iovlen = iovcnt;
    msg.msg_flags = 0;

    msg.msg_control = &cmsg.hdr;
    msg.msg_controllen = CMSG_SPACE(sizeof(fd_to_send));

    cmsg.hdr.cmsg_level = SOL_SOCKET;
    cmsg.hdr.cmsg_type = SCM_RIGHTS;
    cmsg.hdr.cmsg_len = CMSG_LEN(sizeof(fd_to_send));
    memcpy(CMSG_DATA(&cmsg.hdr), &fd_to_send, sizeof(fd_to_send));

    do
      n = sendmsg(uv__stream_fd(stream), &msg, 0);
    while (n == -1 && errno == EINTR);
  } else {
    do
      n = uv__writev(uv__stream_fd(stream), iov, iovcnt);
    while (n == -1 && errno == EINTR);
  }

  if (n >= 0)
    return n;

  if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
    return UV_EAGAIN;

#ifdef __APPLE__
  /* macOS versions 10.10 and 10.15 - and presumably 10.11 to 10.14, too -
   * have a bug where a race condition causes the kernel to return EPROTOTYPE
   * because the socket isn't fully constructed. It's probably the result of
   * the peer closing the connection and that is why libuv translates it to
   * ECONNRESET. Previously, libuv retried until the EPROTOTYPE error went
   * away but some VPN software causes the same behavior except the error is
   * permanent, not transient, turning the retry mechanism into an infinite
   * loop. See https://github.com/libuv/libuv/pull/482.
   */
  if (errno == EPROTOTYPE)
    return UV_ECONNRESET;
#endif /* __APPLE__ */

  return UV__ERR(errno);
}


static void uv__write(uv_stream_t* stream) {
  struct uv__queue* q;
  uv_write_t* req;
  ssize_t n;
  int count;

  assert(uv__stream_fd(stream) >= 0);

  /* Prevent loop starvation when the consumer of this stream reads as fast
   * as (or faster than) we can write it. This `count` mechanism does not
   * need to change even if we switch to edge-triggered I/O.
   */
  count = 32;

  for (;;) {
    if (uv__queue_empty(&stream->write_queue))
      return;

    q = uv__queue_head(&stream->write_queue);
    req = uv__queue_data(q, uv_write_t, queue);
    assert(req->handle == stream);

    n = uv__try_write(stream,
                      &(req->bufs[req->write_index]),
                      req->nbufs - req->write_index,
                      req->send_handle);

    /* Ensure the handle isn't sent again in case this is a partial write. */
    if (n >= 0) {
      req->send_handle = NULL;
      if (uv__write_req_update(stream, req, n)) {
        uv__write_req_finish(req);
        if (count-- > 0)
          continue; /* Start trying to write the next request. */

        return;
      }
    } else if (n != UV_EAGAIN)
      goto error;

    /* If this is a blocking stream, try again. */
    if (stream->flags & UV_HANDLE_BLOCKING_WRITES)
      continue;

    /* We're not done. */
    uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);

    /* Notify select() thread about state change */
    uv__stream_osx_interrupt_select(stream);

    return;
  }

error:
  req->error = n;
  uv__write_req_finish(req);
  uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
  uv__stream_osx_interrupt_select(stream);
}


static void uv__write_callbacks(uv_stream_t* stream) {
  uv_write_t* req;
  struct uv__queue* q;
  struct uv__queue pq;

  if (uv__queue_empty(&stream->write_completed_queue))
    return;

  uv__queue_move(&stream->write_completed_queue, &pq);

  while (!uv__queue_empty(&pq)) {
    /* Pop a req off write_completed_queue. */
    q = uv__queue_head(&pq);
    req = uv__queue_data(q, uv_write_t, queue);
    uv__queue_remove(q);
    uv__req_unregister(stream->loop, req);

    if (req->bufs != NULL) {
      stream->write_queue_size -= uv__write_req_size(req);
      if (req->bufs != req->bufsml)
        uv__free(req->bufs);
      req->bufs = NULL;
    }

    /* NOTE: call callback AFTER freeing the request data. */
    if (req->cb)
      req->cb(req, req->error);
  }
}


static void uv__stream_eof(uv_stream_t* stream, const uv_buf_t* buf) {
  stream->flags |= UV_HANDLE_READ_EOF;
  stream->flags &= ~UV_HANDLE_READING;
  uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
  uv__handle_stop(stream);
  uv__stream_osx_interrupt_select(stream);
  stream->read_cb(stream, UV_EOF, buf);
}


static int uv__stream_queue_fd(uv_stream_t* stream, int fd) {
  uv__stream_queued_fds_t* queued_fds;
  unsigned int queue_size;

  queued_fds = stream->queued_fds;
  if (queued_fds == NULL) {
    queue_size = 8;
    queued_fds = uv__malloc((queue_size - 1) * sizeof(*queued_fds->fds) +
                            sizeof(*queued_fds));
    if (queued_fds == NULL)
      return UV_ENOMEM;
    queued_fds->size = queue_size;
    queued_fds->offset = 0;
    stream->queued_fds = queued_fds;

  /* Grow */
  } else if (queued_fds->size == queued_fds->offset) {
    queue_size = queued_fds->size + 8;
    queued_fds = uv__realloc(queued_fds,
                             (queue_size - 1) * sizeof(*queued_fds->fds) +
                             sizeof(*queued_fds));

    /*
     * Allocation failure; report it back.
     * NOTE: if it is fatal, the queued sockets will be closed in
     * uv__stream_close().
     */
    if (queued_fds == NULL)
      return UV_ENOMEM;
    queued_fds->size = queue_size;
    stream->queued_fds = queued_fds;
  }

  /* Put fd in a queue */
  queued_fds->fds[queued_fds->offset++] = fd;

  return 0;
}


static int uv__stream_recv_cmsg(uv_stream_t* stream, struct msghdr* msg) {
  struct cmsghdr* cmsg;
  int fd;
  int err;
  size_t i;
  size_t count;

  for (cmsg = CMSG_FIRSTHDR(msg); cmsg != NULL; cmsg = CMSG_NXTHDR(msg, cmsg)) {
    if (cmsg->cmsg_type != SCM_RIGHTS) {
      fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n",
              cmsg->cmsg_type);
      continue;
    }

    assert(cmsg->cmsg_len >= CMSG_LEN(0));
    count = cmsg->cmsg_len - CMSG_LEN(0);
    assert(count % sizeof(fd) == 0);
    count /= sizeof(fd);

    for (i = 0; i < count; i++) {
      memcpy(&fd, (char*) CMSG_DATA(cmsg) + i * sizeof(fd), sizeof(fd));
      /* Already have an accepted fd, queue this one */
      if (stream->accepted_fd != -1) {
        err = uv__stream_queue_fd(stream, fd);
        if (err != 0) {
          /* Close the remaining fds, re-reading each one so we don't close
           * the same descriptor repeatedly. */
          for (; i < count; i++) {
            memcpy(&fd, (char*) CMSG_DATA(cmsg) + i * sizeof(fd), sizeof(fd));
            uv__close(fd);
          }
          return err;
        }
      } else {
        stream->accepted_fd = fd;
      }
    }
  }

  return 0;
}
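
/* Receiving-side sketch (illustrative): fds queued here surface to the user
 * through uv_pipe_pending_count() and uv_accept() from the read callback.
 *
 *   static void read_cb(uv_stream_t* pipe, ssize_t nread, const uv_buf_t* buf) {
 *     while (uv_pipe_pending_count((uv_pipe_t*) pipe) > 0) {
 *       uv_tcp_t* client = malloc(sizeof(*client));
 *       uv_tcp_init(pipe->loop, client);
 *       uv_accept(pipe, (uv_stream_t*) client);
 *     }
 *   }
 */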


static void uv__read(uv_stream_t* stream) {
  uv_buf_t buf;
  ssize_t nread;
  struct msghdr msg;
  union uv__cmsg cmsg;
  int count;
  int err;
  int is_ipc;

  stream->flags &= ~UV_HANDLE_READ_PARTIAL;

  /* Prevent loop starvation when the data comes in as fast as (or faster than)
   * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O.
   */
  count = 32;

  is_ipc = stream->type == UV_NAMED_PIPE && ((uv_pipe_t*) stream)->ipc;

  /* XXX: Maybe instead of having UV_HANDLE_READING we just test if
   * tcp->read_cb is NULL or not?
   */
  while (stream->read_cb
      && (stream->flags & UV_HANDLE_READING)
      && (count-- > 0)) {
    assert(stream->alloc_cb != NULL);

    buf = uv_buf_init(NULL, 0);
    stream->alloc_cb((uv_handle_t*)stream, 64 * 1024, &buf);
    if (buf.base == NULL || buf.len == 0) {
      /* User indicates it can't or won't handle the read. */
      stream->read_cb(stream, UV_ENOBUFS, &buf);
      return;
    }

    assert(buf.base != NULL);
    assert(uv__stream_fd(stream) >= 0);

    if (!is_ipc) {
      do {
        nread = read(uv__stream_fd(stream), buf.base, buf.len);
      }
      while (nread < 0 && errno == EINTR);
    } else {
      /* ipc uses recvmsg */
      msg.msg_flags = 0;
      msg.msg_iov = (struct iovec*) &buf;
      msg.msg_iovlen = 1;
      msg.msg_name = NULL;
      msg.msg_namelen = 0;
      /* Set up to receive a descriptor even if one isn't in the message */
      msg.msg_controllen = sizeof(cmsg);
      msg.msg_control = &cmsg.hdr;

      do {
        nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
      }
      while (nread < 0 && errno == EINTR);
    }

    if (nread < 0) {
      /* Error */
      if (errno == EAGAIN || errno == EWOULDBLOCK) {
        /* Wait for the next one. */
        if (stream->flags & UV_HANDLE_READING) {
          uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
          uv__stream_osx_interrupt_select(stream);
        }
        stream->read_cb(stream, 0, &buf);
#if defined(__CYGWIN__) || defined(__MSYS__)
      } else if (errno == ECONNRESET && stream->type == UV_NAMED_PIPE) {
        uv__stream_eof(stream, &buf);
        return;
#endif
      } else {
        /* Error. User should call uv_close(). */
        stream->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
        stream->read_cb(stream, UV__ERR(errno), &buf);
        if (stream->flags & UV_HANDLE_READING) {
          stream->flags &= ~UV_HANDLE_READING;
          uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
          uv__handle_stop(stream);
          uv__stream_osx_interrupt_select(stream);
        }
      }
      return;
    } else if (nread == 0) {
      uv__stream_eof(stream, &buf);
      return;
    } else {
      /* Successful read */
      ssize_t buflen = buf.len;

      if (is_ipc) {
        err = uv__stream_recv_cmsg(stream, &msg);
        if (err != 0) {
          stream->read_cb(stream, err, &buf);
          return;
        }
      }

#if defined(__MVS__)
      if (is_ipc && msg.msg_controllen > 0) {
        uv_buf_t blankbuf;
        int nread;
        struct iovec *old;

        blankbuf.base = 0;
        blankbuf.len = 0;
        old = msg.msg_iov;
        msg.msg_iov = (struct iovec*) &blankbuf;
        nread = 0;
        do {
          nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
          err = uv__stream_recv_cmsg(stream, &msg);
          if (err != 0) {
            stream->read_cb(stream, err, &buf);
            msg.msg_iov = old;
            return;
          }
        } while (nread == 0 && msg.msg_controllen > 0);
        msg.msg_iov = old;
      }
#endif
      stream->read_cb(stream, nread, &buf);
      /* Return if we didn't fill the buffer; there is no more data to read. */
      if (nread < buflen) {
        stream->flags |= UV_HANDLE_READ_PARTIAL;
        return;
      }
    }
  }
}


int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
  assert(stream->type == UV_TCP ||
         stream->type == UV_TTY ||
         stream->type == UV_NAMED_PIPE);

  if (!(stream->flags & UV_HANDLE_WRITABLE) ||
      stream->flags & UV_HANDLE_SHUT ||
      uv__is_stream_shutting(stream) ||
      uv__is_closing(stream)) {
    return UV_ENOTCONN;
  }

  assert(uv__stream_fd(stream) >= 0);

  /* Initialize request. The `shutdown(2)` call will always be deferred until
   * `uv__drain`, just before the callback is run. */
  uv__req_init(stream->loop, req, UV_SHUTDOWN);
  req->handle = stream;
  req->cb = cb;
  stream->shutdown_req = req;
  stream->flags &= ~UV_HANDLE_WRITABLE;

  if (uv__queue_empty(&stream->write_queue))
    uv__io_feed(stream->loop, &stream->io_watcher);

  return 0;
}
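
/* Usage sketch (illustrative): a graceful close once all pending writes have
 * drained; `on_shutdown` is a caller-provided callback.
 *
 *   static void on_shutdown(uv_shutdown_t* req, int status) {
 *     uv_close((uv_handle_t*) req->handle, NULL);
 *     free(req);
 *   }
 *
 *   uv_shutdown_t* req = malloc(sizeof(*req));
 *   uv_shutdown(req, stream, on_shutdown);
 */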


static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  uv_stream_t* stream;

  stream = container_of(w, uv_stream_t, io_watcher);

  assert(stream->type == UV_TCP ||
         stream->type == UV_NAMED_PIPE ||
         stream->type == UV_TTY);
  assert(!(stream->flags & UV_HANDLE_CLOSING));

  if (stream->connect_req) {
    uv__stream_connect(stream);
    return;
  }

  assert(uv__stream_fd(stream) >= 0);

  /* Ignore POLLHUP here. Even if it's set, there may still be data to read. */
  if (events & (POLLIN | POLLERR | POLLHUP))
    uv__read(stream);

  if (uv__stream_fd(stream) == -1)
    return; /* read_cb closed stream. */

  /* Short-circuit iff POLLHUP is set, the user is still interested in read
   * events and uv__read() reported a partial read but not EOF. If the EOF
   * flag is set, uv__read() called read_cb with err=UV_EOF and we don't
   * have to do anything. If the partial read flag is not set, we can't
   * report the EOF yet because there is still data to read.
   */
  if ((events & POLLHUP) &&
      (stream->flags & UV_HANDLE_READING) &&
      (stream->flags & UV_HANDLE_READ_PARTIAL) &&
      !(stream->flags & UV_HANDLE_READ_EOF)) {
    uv_buf_t buf = { NULL, 0 };
    uv__stream_eof(stream, &buf);
  }

  if (uv__stream_fd(stream) == -1)
    return; /* read_cb closed stream. */

  if (events & (POLLOUT | POLLERR | POLLHUP)) {
    uv__write(stream);
    uv__write_callbacks(stream);

    /* Write queue drained. */
    if (uv__queue_empty(&stream->write_queue))
      uv__drain(stream);
  }
}

/**
 * We get called here directly following a call to connect(2).
 * In order to determine whether we've errored out or succeeded we must
 * call getsockopt().
 */
static void uv__stream_connect(uv_stream_t* stream) {
  int error;
  uv_connect_t* req = stream->connect_req;
  socklen_t errorsize = sizeof(int);

  assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE);
  assert(req);

  if (stream->delayed_error) {
    /* To smooth over differences between Unixes, errors that were reported
     * synchronously on the first connect can be delayed until the next
     * tick, which is now.
     */
    error = stream->delayed_error;
    stream->delayed_error = 0;
  } else {
    /* Normal situation: we need to get the socket error from the kernel. */
    assert(uv__stream_fd(stream) >= 0);
    getsockopt(uv__stream_fd(stream),
               SOL_SOCKET,
               SO_ERROR,
               &error,
               &errorsize);
    error = UV__ERR(error);
  }

  if (error == UV__ERR(EINPROGRESS))
    return;

  stream->connect_req = NULL;
  uv__req_unregister(stream->loop, req);

  if (error < 0 || uv__queue_empty(&stream->write_queue)) {
    uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
  }

  if (req->cb)
    req->cb(req, error);

  if (uv__stream_fd(stream) == -1)
    return;

  if (error < 0) {
    uv__stream_flush_write_queue(stream, UV_ECANCELED);
    uv__write_callbacks(stream);
  }
}


static int uv__check_before_write(uv_stream_t* stream,
                                  unsigned int nbufs,
                                  uv_stream_t* send_handle) {
  assert(nbufs > 0);
  assert((stream->type == UV_TCP ||
          stream->type == UV_NAMED_PIPE ||
          stream->type == UV_TTY) &&
         "uv_write (unix) does not yet support other types of streams");

  if (uv__stream_fd(stream) < 0)
    return UV_EBADF;

  if (!(stream->flags & UV_HANDLE_WRITABLE))
    return UV_EPIPE;

  if (send_handle != NULL) {
    if (stream->type != UV_NAMED_PIPE || !((uv_pipe_t*)stream)->ipc)
      return UV_EINVAL;

    /* XXX We abuse uv_write2() to send over UDP handles to child processes.
     * Don't call uv__stream_fd() on those handles, it's a macro that on OS X
     * evaluates to a function that operates on a uv_stream_t with a couple of
     * OS X specific fields. On other Unices it does (handle)->io_watcher.fd,
     * which works but only by accident.
     */
    if (uv__handle_fd((uv_handle_t*) send_handle) < 0)
      return UV_EBADF;

#if defined(__CYGWIN__) || defined(__MSYS__)
    /* Cygwin recvmsg always sets msg_controllen to zero, so we cannot send it.
       See https://github.com/mirror/newlib-cygwin/blob/86fc4bf0/winsup/cygwin/fhandler_socket.cc#L1736-L1743 */
    return UV_ENOSYS;
#endif
  }

  return 0;
}

int uv_write2(uv_write_t* req,
              uv_stream_t* stream,
              const uv_buf_t bufs[],
              unsigned int nbufs,
              uv_stream_t* send_handle,
              uv_write_cb cb) {
  int empty_queue;
  int err;

  err = uv__check_before_write(stream, nbufs, send_handle);
  if (err < 0)
    return err;

  /* It's legal for write_queue_size > 0 even when the write_queue is empty;
   * it means there are error-state requests in the write_completed_queue that
   * will touch up write_queue_size later, see also uv__write_req_finish().
   * We could check that write_queue is empty instead but that implies making
   * a write() syscall when we know that the handle is in error mode.
   */
  empty_queue = (stream->write_queue_size == 0);

  /* Initialize the req */
  uv__req_init(stream->loop, req, UV_WRITE);
  req->cb = cb;
  req->handle = stream;
  req->error = 0;
  req->send_handle = send_handle;
  uv__queue_init(&req->queue);

  req->bufs = req->bufsml;
  if (nbufs > ARRAY_SIZE(req->bufsml))
    req->bufs = uv__malloc(nbufs * sizeof(bufs[0]));

  if (req->bufs == NULL)
    return UV_ENOMEM;

  memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
  req->nbufs = nbufs;
  req->write_index = 0;
  stream->write_queue_size += uv__count_bufs(bufs, nbufs);

  /* Append the request to write_queue. */
  uv__queue_insert_tail(&stream->write_queue, &req->queue);

  /* If the queue was empty when this function began, we should attempt to
   * do the write immediately. Otherwise start the write_watcher and wait
   * for the fd to become writable.
   */
  if (stream->connect_req) {
    /* Still connecting, do nothing. */
  }
  else if (empty_queue) {
    uv__write(stream);
  }
  else {
1384 * blocking streams should never have anything in the queue.
1385 * if this assert fires then somehow the blocking stream isn't being
1386 * sufficiently flushed in uv__write.
1387 */
    assert(!(stream->flags & UV_HANDLE_BLOCKING_WRITES));
    uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
    uv__stream_osx_interrupt_select(stream);
  }

  return 0;
}
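
/* Usage sketch (illustrative): passing a TCP handle to a peer process over an
 * IPC pipe. The handle names and callback are placeholders; note that at
 * least one byte of regular data must accompany the handle (nbufs > 0 is
 * asserted above).
 *
 *   uv_buf_t buf = uv_buf_init(".", 1);
 *   uv_write2(&req, (uv_stream_t*) &ipc_pipe, &buf, 1,
 *             (uv_stream_t*) &tcp_handle, on_write);
 */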


/* The buffers to be written must remain valid until the callback is called.
 * This is not required for the uv_buf_t array.
 */
int uv_write(uv_write_t* req,
             uv_stream_t* handle,
             const uv_buf_t bufs[],
             unsigned int nbufs,
             uv_write_cb cb) {
  return uv_write2(req, handle, bufs, nbufs, NULL, cb);
}
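
/* Usage sketch (illustrative) of the buffer-lifetime rule above: keep the
 * storage alive until the write callback runs; `on_write` and the use of
 * req->data for bookkeeping are placeholders.
 *
 *   char* data = strdup("hello");
 *   uv_buf_t buf = uv_buf_init(data, 5);
 *   req->data = data;
 *   uv_write(req, stream, &buf, 1, on_write);  // free(req->data) in on_write
 */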


int uv_try_write(uv_stream_t* stream,
                 const uv_buf_t bufs[],
                 unsigned int nbufs) {
  return uv_try_write2(stream, bufs, nbufs, NULL);
}


int uv_try_write2(uv_stream_t* stream,
                  const uv_buf_t bufs[],
                  unsigned int nbufs,
                  uv_stream_t* send_handle) {
  int err;

  /* Connecting or already writing some data */
  if (stream->connect_req != NULL || stream->write_queue_size != 0)
    return UV_EAGAIN;

  err = uv__check_before_write(stream, nbufs, NULL);
  if (err < 0)
    return err;

  return uv__try_write(stream, bufs, nbufs, send_handle);
}
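
/* Usage sketch (illustrative): an opportunistic write that falls back to a
 * queued uv_write() for whatever didn't fit; names are placeholders.
 *
 *   int n = uv_try_write(stream, &buf, 1);
 *   if (n == UV_EAGAIN)
 *     n = 0;  // nothing was written; queue the whole buffer
 *   if (n >= 0 && (size_t) n < buf.len) {
 *     uv_buf_t rest = uv_buf_init(buf.base + n, buf.len - n);
 *     uv_write(&req, stream, &rest, 1, on_write);
 *   }
 */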


int uv__read_start(uv_stream_t* stream,
                   uv_alloc_cb alloc_cb,
                   uv_read_cb read_cb) {
  assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
         stream->type == UV_TTY);

  /* The UV_HANDLE_READING flag is independent of the actual state of the
   * stream; it just expresses the user's desired state. */
  stream->flags |= UV_HANDLE_READING;
  stream->flags &= ~UV_HANDLE_READ_EOF;

  /* TODO: try to do the read inline? */
  assert(uv__stream_fd(stream) >= 0);
  assert(alloc_cb);

  stream->read_cb = read_cb;
  stream->alloc_cb = alloc_cb;

  uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
  uv__handle_start(stream);
  uv__stream_osx_interrupt_select(stream);

  return 0;
}
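
/* Usage sketch (illustrative): minimal alloc/read callbacks as passed to the
 * public uv_read_start() wrapper; the malloc-per-read scheme and consume()
 * are placeholders.
 *
 *   static void alloc_cb(uv_handle_t* h, size_t hint, uv_buf_t* buf) {
 *     *buf = uv_buf_init(malloc(hint), hint);
 *   }
 *
 *   static void read_cb(uv_stream_t* s, ssize_t nread, const uv_buf_t* buf) {
 *     if (nread > 0)
 *       consume(buf->base, nread);   // hypothetical consumer
 *     else if (nread < 0)            // UV_EOF or an error
 *       uv_close((uv_handle_t*) s, NULL);
 *     free(buf->base);
 *   }
 */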


int uv_read_stop(uv_stream_t* stream) {
  if (!(stream->flags & UV_HANDLE_READING))
    return 0;

  stream->flags &= ~UV_HANDLE_READING;
  uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
  uv__handle_stop(stream);
  uv__stream_osx_interrupt_select(stream);

  stream->read_cb = NULL;
  stream->alloc_cb = NULL;
  return 0;
}


int uv_is_readable(const uv_stream_t* stream) {
  return !!(stream->flags & UV_HANDLE_READABLE);
}


int uv_is_writable(const uv_stream_t* stream) {
  return !!(stream->flags & UV_HANDLE_WRITABLE);
}


#if defined(__APPLE__)
int uv___stream_fd(const uv_stream_t* handle) {
  const uv__stream_select_t* s;

  assert(handle->type == UV_TCP ||
         handle->type == UV_TTY ||
         handle->type == UV_NAMED_PIPE);

  s = handle->select;
  if (s != NULL)
    return s->fd;

  return handle->io_watcher.fd;
}
#endif /* defined(__APPLE__) */


void uv__stream_close(uv_stream_t* handle) {
  unsigned int i;
  uv__stream_queued_fds_t* queued_fds;

#if defined(__APPLE__)
  /* Terminate select loop first */
  if (handle->select != NULL) {
    uv__stream_select_t* s;

    s = handle->select;

    uv_sem_post(&s->close_sem);
    uv_sem_post(&s->async_sem);
    uv__stream_osx_interrupt_select(handle);
    uv_thread_join(&s->thread);
    uv_sem_destroy(&s->close_sem);
    uv_sem_destroy(&s->async_sem);
    uv__close(s->fake_fd);
    uv__close(s->int_fd);
    uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);

    handle->select = NULL;
  }
#endif /* defined(__APPLE__) */

  uv__io_close(handle->loop, &handle->io_watcher);
  uv_read_stop(handle);
  uv__handle_stop(handle);
  handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);

  if (handle->io_watcher.fd != -1) {
    /* Don't close stdio file descriptors. Nothing good comes from it. */
    if (handle->io_watcher.fd > STDERR_FILENO)
      uv__close(handle->io_watcher.fd);
    handle->io_watcher.fd = -1;
  }

  if (handle->accepted_fd != -1) {
    uv__close(handle->accepted_fd);
    handle->accepted_fd = -1;
  }

  /* Close all queued fds */
  if (handle->queued_fds != NULL) {
    queued_fds = handle->queued_fds;
    for (i = 0; i < queued_fds->offset; i++)
      uv__close(queued_fds->fds[i]);
    uv__free(handle->queued_fds);
    handle->queued_fds = NULL;
  }

  assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT));
}


int uv_stream_set_blocking(uv_stream_t* handle, int blocking) {
  /* We don't need to check the file descriptor; uv__nonblock()
   * will fail with EBADF if it's not valid.
   */
  return uv__nonblock(uv__stream_fd(handle), !blocking);
}