1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2  *
3  * Permission is hereby granted, free of charge, to any person obtaining a copy
4  * of this software and associated documentation files (the "Software"), to
5  * deal in the Software without restriction, including without limitation the
6  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7  * sell copies of the Software, and to permit persons to whom the Software is
8  * furnished to do so, subject to the following conditions:
9  *
10  * The above copyright notice and this permission notice shall be included in
11  * all copies or substantial portions of the Software.
12  *
13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19  * IN THE SOFTWARE.
20  */
21 
22 #include "uv.h"
23 #include "internal.h"
24 
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <assert.h>
29 #include <errno.h>
30 
31 #include <sys/types.h>
32 #include <sys/socket.h>
33 #include <sys/uio.h>
34 #include <sys/un.h>
35 #include <unistd.h>
36 #include <limits.h> /* IOV_MAX */
37 
38 #if defined(__APPLE__)
39 # include <sys/event.h>
40 # include <sys/time.h>
41 # include <sys/select.h>
42 
43 /* Forward declaration */
44 typedef struct uv__stream_select_s uv__stream_select_t;
45 
46 struct uv__stream_select_s {
47   uv_stream_t* stream;
48   uv_thread_t thread;
49   uv_sem_t close_sem;
50   uv_sem_t async_sem;
51   uv_async_t async;
52   int events;
53   int fake_fd;
54   int int_fd;
55   int fd;
56   fd_set* sread;
57   size_t sread_sz;
58   fd_set* swrite;
59   size_t swrite_sz;
60 };
61 #endif /* defined(__APPLE__) */
62 
63 union uv__cmsg {
64   struct cmsghdr hdr;
65   /* This cannot be larger because of the IBMi PASE limitation that
66    * the total size of control messages cannot exceed 256 bytes.
67    */
68   char pad[256];
69 };
70 
71 STATIC_ASSERT(256 == sizeof(union uv__cmsg));
72 
73 static void uv__stream_connect(uv_stream_t*);
74 static void uv__write(uv_stream_t* stream);
75 static void uv__read(uv_stream_t* stream);
76 static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events);
77 static void uv__write_callbacks(uv_stream_t* stream);
78 static size_t uv__write_req_size(uv_write_t* req);
79 static void uv__drain(uv_stream_t* stream);
80 
81 
82 void uv__stream_init(uv_loop_t* loop,
83                      uv_stream_t* stream,
84                      uv_handle_type type) {
85   int err;
86 
87   uv__handle_init(loop, (uv_handle_t*)stream, type);
88   stream->read_cb = NULL;
89   stream->alloc_cb = NULL;
90   stream->close_cb = NULL;
91   stream->connection_cb = NULL;
92   stream->connect_req = NULL;
93   stream->shutdown_req = NULL;
94   stream->accepted_fd = -1;
95   stream->queued_fds = NULL;
96   stream->delayed_error = 0;
97   uv__queue_init(&stream->write_queue);
98   uv__queue_init(&stream->write_completed_queue);
99   stream->write_queue_size = 0;
100 
101   if (loop->emfile_fd == -1) {
102     err = uv__open_cloexec("/dev/null", O_RDONLY);
103     if (err < 0)
104         /* In the rare case that "/dev/null" isn't mounted, open "/"
105          * instead.
106          */
107         err = uv__open_cloexec("/", O_RDONLY);
108     if (err >= 0)
109       loop->emfile_fd = err;
110   }
111 
112 #if defined(__APPLE__)
113   stream->select = NULL;
114 #endif /* defined(__APPLE__) */
115 
116   uv__io_init(&stream->io_watcher, uv__stream_io, -1);
117 }
118 
119 
120 static void uv__stream_osx_interrupt_select(uv_stream_t* stream) {
121 #if defined(__APPLE__)
122   /* Notify select() thread about state change */
123   uv__stream_select_t* s;
124   int r;
125 
126   s = stream->select;
127   if (s == NULL)
128     return;
129 
130   /* Interrupt the select() loop.
131    * NOTE: fake_fd and int_fd are a socketpair(), so writing to one end
132    * raises a read event on the other side.
133    */
134   do
135     r = write(s->fake_fd, "x", 1);
136   while (r == -1 && errno == EINTR);
137 
138   assert(r == 1);
139 #else  /* !defined(__APPLE__) */
140   /* No-op on any other platform */
141 #endif  /* !defined(__APPLE__) */
142 }
143 
144 
145 #if defined(__APPLE__)
146 static void uv__stream_osx_select(void* arg) {
147   uv_stream_t* stream;
148   uv__stream_select_t* s;
149   char buf[1024];
150   int events;
151   int fd;
152   int r;
153   int max_fd;
154 
155   stream = arg;
156   s = stream->select;
157   fd = s->fd;
158 
159   if (fd > s->int_fd)
160     max_fd = fd;
161   else
162     max_fd = s->int_fd;
163 
164   for (;;) {
165     /* Terminate on semaphore */
166     if (uv_sem_trywait(&s->close_sem) == 0)
167       break;
168 
169     /* Watch fd using select(2) */
170     memset(s->sread, 0, s->sread_sz);
171     memset(s->swrite, 0, s->swrite_sz);
172 
173     if (uv__io_active(&stream->io_watcher, POLLIN))
174       FD_SET(fd, s->sread);
175     if (uv__io_active(&stream->io_watcher, POLLOUT))
176       FD_SET(fd, s->swrite);
177     FD_SET(s->int_fd, s->sread);
178 
179     /* Wait indefinitely for fd events */
180     r = select(max_fd + 1, s->sread, s->swrite, NULL, NULL);
181     if (r == -1) {
182       if (errno == EINTR)
183         continue;
184 
185       /* XXX: Possible?! */
186       abort();
187     }
188 
189     /* Ignore timeouts */
190     if (r == 0)
191       continue;
192 
193     /* Empty socketpair's buffer in case of interruption */
194     if (FD_ISSET(s->int_fd, s->sread))
195       for (;;) {
196         r = read(s->int_fd, buf, sizeof(buf));
197 
198         if (r == sizeof(buf))
199           continue;
200 
201         if (r != -1)
202           break;
203 
204         if (errno == EAGAIN || errno == EWOULDBLOCK)
205           break;
206 
207         if (errno == EINTR)
208           continue;
209 
210         abort();
211       }
212 
213     /* Handle events */
214     events = 0;
215     if (FD_ISSET(fd, s->sread))
216       events |= POLLIN;
217     if (FD_ISSET(fd, s->swrite))
218       events |= POLLOUT;
219 
220     assert(events != 0 || FD_ISSET(s->int_fd, s->sread));
221     if (events != 0) {
222       ACCESS_ONCE(int, s->events) = events;
223 
224       uv_async_send(&s->async);
225       uv_sem_wait(&s->async_sem);
226 
227       /* Should be processed at this stage */
228       assert((s->events == 0) || (stream->flags & UV_HANDLE_CLOSING));
229     }
230   }
231 }
232 
233 
234 static void uv__stream_osx_select_cb(uv_async_t* handle) {
235   uv__stream_select_t* s;
236   uv_stream_t* stream;
237   int events;
238 
239   s = container_of(handle, uv__stream_select_t, async);
240   stream = s->stream;
241 
242   /* Get and reset stream's events */
243   events = s->events;
244   ACCESS_ONCE(int, s->events) = 0;
245 
246   assert(events != 0);
247   assert(events == (events & (POLLIN | POLLOUT)));
248 
249   /* Invoke callback on event-loop */
250   if ((events & POLLIN) && uv__io_active(&stream->io_watcher, POLLIN))
251     uv__stream_io(stream->loop, &stream->io_watcher, POLLIN);
252 
253   if ((events & POLLOUT) && uv__io_active(&stream->io_watcher, POLLOUT))
254     uv__stream_io(stream->loop, &stream->io_watcher, POLLOUT);
255 
256   if (stream->flags & UV_HANDLE_CLOSING)
257     return;
258 
259   /* NOTE: It is important to do this here, otherwise `select()` might be
260    * called before the actual `uv__read()`, leading to a blocking syscall.
261    */
262   uv_sem_post(&s->async_sem);
263 }
264 
265 
266 static void uv__stream_osx_cb_close(uv_handle_t* async) {
267   uv__stream_select_t* s;
268 
269   s = container_of(async, uv__stream_select_t, async);
270   uv__free(s);
271 }
272 
273 
274 int uv__stream_try_select(uv_stream_t* stream, int* fd) {
275   /*
276    * kqueue doesn't work with some files from the /dev mount on OS X.
277    * Run select(2) in a separate thread for those fds.
278    */
279 
280   struct kevent filter[1];
281   struct kevent events[1];
282   struct timespec timeout;
283   uv__stream_select_t* s;
284   int fds[2];
285   int err;
286   int ret;
287   int kq;
288   int old_fd;
289   int max_fd;
290   size_t sread_sz;
291   size_t swrite_sz;
292 
293   kq = kqueue();
294   if (kq == -1) {
295     perror("(libuv) kqueue()");
296     return UV__ERR(errno);
297   }
298 
299   EV_SET(&filter[0], *fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0);
300 
301   /* Use small timeout, because we only want to capture EINVALs */
302   timeout.tv_sec = 0;
303   timeout.tv_nsec = 1;
304 
305   do
306     ret = kevent(kq, filter, 1, events, 1, &timeout);
307   while (ret == -1 && errno == EINTR);
308 
309   uv__close(kq);
310 
311   if (ret == -1)
312     return UV__ERR(errno);
313 
314   if (ret == 0 || (events[0].flags & EV_ERROR) == 0 || events[0].data != EINVAL)
315     return 0;
316 
317   /* At this point we definitely know that this fd won't work with kqueue */
318 
319   /*
320    * Create fds for io watcher and to interrupt the select() loop.
321    * NOTE: do it ahead of malloc below to allocate enough space for fd_sets
322    */
323   if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds))
324     return UV__ERR(errno);
325 
326   max_fd = *fd;
327   if (fds[1] > max_fd)
328     max_fd = fds[1];
329 
330   sread_sz = ROUND_UP(max_fd + 1, sizeof(uint32_t) * NBBY) / NBBY;
331   swrite_sz = sread_sz;
332 
333   s = uv__malloc(sizeof(*s) + sread_sz + swrite_sz);
334   if (s == NULL) {
335     err = UV_ENOMEM;
336     goto failed_malloc;
337   }
338 
339   s->events = 0;
340   s->fd = *fd;
341   s->sread = (fd_set*) ((char*) s + sizeof(*s));
342   s->sread_sz = sread_sz;
343   s->swrite = (fd_set*) ((char*) s->sread + sread_sz);
344   s->swrite_sz = swrite_sz;
345 
346   err = uv_async_init(stream->loop, &s->async, uv__stream_osx_select_cb);
347   if (err)
348     goto failed_async_init;
349 
350   s->async.flags |= UV_HANDLE_INTERNAL;
351   uv__handle_unref(&s->async);
352 
353   err = uv_sem_init(&s->close_sem, 0);
354   if (err != 0)
355     goto failed_close_sem_init;
356 
357   err = uv_sem_init(&s->async_sem, 0);
358   if (err != 0)
359     goto failed_async_sem_init;
360 
361   s->fake_fd = fds[0];
362   s->int_fd = fds[1];
363 
364   old_fd = *fd;
365   s->stream = stream;
366   stream->select = s;
367   *fd = s->fake_fd;
368 
369   err = uv_thread_create(&s->thread, uv__stream_osx_select, stream);
370   if (err != 0)
371     goto failed_thread_create;
372 
373   return 0;
374 
375 failed_thread_create:
376   s->stream = NULL;
377   stream->select = NULL;
378   *fd = old_fd;
379 
380   uv_sem_destroy(&s->async_sem);
381 
382 failed_async_sem_init:
383   uv_sem_destroy(&s->close_sem);
384 
385 failed_close_sem_init:
386   uv__close(fds[0]);
387   uv__close(fds[1]);
388   uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);
389   return err;
390 
391 failed_async_init:
392   uv__free(s);
393 
394 failed_malloc:
395   uv__close(fds[0]);
396   uv__close(fds[1]);
397 
398   return err;
399 }
400 #endif /* defined(__APPLE__) */
401 
402 
403 int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
404 #if defined(__APPLE__)
405   int enable;
406 #endif
407 
408   if (!(stream->io_watcher.fd == -1 || stream->io_watcher.fd == fd))
409     return UV_EBUSY;
410 
411   assert(fd >= 0);
412   stream->flags |= flags;
413 
414   if (stream->type == UV_TCP) {
415     if ((stream->flags & UV_HANDLE_TCP_NODELAY) && uv__tcp_nodelay(fd, 1))
416       return UV__ERR(errno);
417 
418     /* TODO: use the delay the user passed in. */
419     if ((stream->flags & UV_HANDLE_TCP_KEEPALIVE) &&
420         uv__tcp_keepalive(fd, 1, 60)) {
421       return UV__ERR(errno);
422     }
423   }
424 
425 #if defined(__APPLE__)
426   enable = 1;
427   if (setsockopt(fd, SOL_SOCKET, SO_OOBINLINE, &enable, sizeof(enable)) &&
428       errno != ENOTSOCK &&
429       errno != EINVAL) {
430     return UV__ERR(errno);
431   }
432 #endif
433 
434   stream->io_watcher.fd = fd;
435 
436   return 0;
437 }
438 
439 
440 void uv__stream_flush_write_queue(uv_stream_t* stream, int error) {
441   uv_write_t* req;
442   struct uv__queue* q;
443   while (!uv__queue_empty(&stream->write_queue)) {
444     q = uv__queue_head(&stream->write_queue);
445     uv__queue_remove(q);
446 
447     req = uv__queue_data(q, uv_write_t, queue);
448     req->error = error;
449 
450     uv__queue_insert_tail(&stream->write_completed_queue, &req->queue);
451   }
452 }
453 
454 
455 void uv__stream_destroy(uv_stream_t* stream) {
456   assert(!uv__io_active(&stream->io_watcher, POLLIN | POLLOUT));
457   assert(stream->flags & UV_HANDLE_CLOSED);
458 
459   if (stream->connect_req) {
460     uv__req_unregister(stream->loop, stream->connect_req);
461     stream->connect_req->cb(stream->connect_req, UV_ECANCELED);
462     stream->connect_req = NULL;
463   }
464 
465   uv__stream_flush_write_queue(stream, UV_ECANCELED);
466   uv__write_callbacks(stream);
467   uv__drain(stream);
468 
469   assert(stream->write_queue_size == 0);
470 }
471 
472 
473 /* Implements a best effort approach to mitigating accept() EMFILE errors.
474  * We have a spare file descriptor stashed away that we close to get below
475  * the EMFILE limit. Next, we accept all pending connections and close them
476  * immediately to signal the clients that we're overloaded - and we are, but
477  * we still keep on trucking.
478  *
479  * There is one caveat: it's not reliable in a multi-threaded environment.
480  * The file descriptor limit is per process. Our party trick fails if another
481  * thread opens a file or creates a socket in the time window between us
482  * calling close() and accept().
483  */
484 static int uv__emfile_trick(uv_loop_t* loop, int accept_fd) {
485   int err;
486   int emfile_fd;
487 
488   if (loop->emfile_fd == -1)
489     return UV_EMFILE;
490 
491   uv__close(loop->emfile_fd);
492   loop->emfile_fd = -1;
493 
494   do {
495     err = uv__accept(accept_fd);
496     if (err >= 0)
497       uv__close(err);
498   } while (err >= 0 || err == UV_EINTR);
499 
500   emfile_fd = uv__open_cloexec("/", O_RDONLY);
501   if (emfile_fd >= 0)
502     loop->emfile_fd = emfile_fd;
503 
504   return err;
505 }
506 
507 
508 void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
509   uv_stream_t* stream;
510   int err;
511   int fd;
512 
513   stream = container_of(w, uv_stream_t, io_watcher);
514   assert(events & POLLIN);
515   assert(stream->accepted_fd == -1);
516   assert(!(stream->flags & UV_HANDLE_CLOSING));
517 
518   fd = uv__stream_fd(stream);
519   err = uv__accept(fd);
520 
521   if (err == UV_EMFILE || err == UV_ENFILE)
522     err = uv__emfile_trick(loop, fd);  /* Shed load. */
523 
524   if (err < 0)
525     return;
526 
527   stream->accepted_fd = err;
528   stream->connection_cb(stream, 0);
529 
530   if (stream->accepted_fd != -1)
531     /* The user hasn't yet called uv_accept(). */
532     uv__io_stop(loop, &stream->io_watcher, POLLIN);
533 }
534 
535 
536 int uv_accept(uv_stream_t* server, uv_stream_t* client) {
537   int err;
538 
539   assert(server->loop == client->loop);
540 
541   if (server->accepted_fd == -1)
542     return UV_EAGAIN;
543 
544   switch (client->type) {
545     case UV_NAMED_PIPE:
546     case UV_TCP:
547       err = uv__stream_open(client,
548                             server->accepted_fd,
549                             UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
550       if (err) {
551         /* TODO handle error */
552         uv__close(server->accepted_fd);
553         goto done;
554       }
555       break;
556 
557     case UV_UDP:
558       err = uv_udp_open((uv_udp_t*) client, server->accepted_fd);
559       if (err) {
560         uv__close(server->accepted_fd);
561         goto done;
562       }
563       break;
564 
565     default:
566       return UV_EINVAL;
567   }
568 
569   client->flags |= UV_HANDLE_BOUND;
570 
571 done:
572   /* Process queued fds */
573   if (server->queued_fds != NULL) {
574     uv__stream_queued_fds_t* queued_fds;
575 
576     queued_fds = server->queued_fds;
577 
578     /* Read first */
579     server->accepted_fd = queued_fds->fds[0];
580 
581     /* All read, free */
582     assert(queued_fds->offset > 0);
583     if (--queued_fds->offset == 0) {
584       uv__free(queued_fds);
585       server->queued_fds = NULL;
586     } else {
587       /* Shift rest */
588       memmove(queued_fds->fds,
589               queued_fds->fds + 1,
590               queued_fds->offset * sizeof(*queued_fds->fds));
591     }
592   } else {
593     server->accepted_fd = -1;
594     if (err == 0)
595       uv__io_start(server->loop, &server->io_watcher, POLLIN);
596   }
597   return err;
598 }
599 
600 
601 int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
602   int err;
603   if (uv__is_closing(stream)) {
604     return UV_EINVAL;
605   }
606   switch (stream->type) {
607   case UV_TCP:
608     err = uv__tcp_listen((uv_tcp_t*)stream, backlog, cb);
609     break;
610 
611   case UV_NAMED_PIPE:
612     err = uv__pipe_listen((uv_pipe_t*)stream, backlog, cb);
613     break;
614 
615   default:
616     err = UV_EINVAL;
617   }
618 
619   if (err == 0)
620     uv__handle_start(stream);
621 
622   return err;
623 }
624 
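/* Illustrative sketch (not part of libuv): a typical server wires uv_listen()
 * and uv_accept() together from its connection callback; the names
 * `server_tcp`, `on_connection`, `on_alloc`, `on_read` and `on_close` below
 * are hypothetical.
 *
 *   static void on_connection(uv_stream_t* server, int status) {
 *     uv_tcp_t* client;
 *
 *     if (status < 0)
 *       return;  // e.g. UV_EMFILE surfaced by the accept-and-close trick above
 *
 *     client = malloc(sizeof(*client));
 *     uv_tcp_init(server->loop, client);
 *
 *     if (uv_accept(server, (uv_stream_t*) client) == 0)
 *       uv_read_start((uv_stream_t*) client, on_alloc, on_read);
 *     else
 *       uv_close((uv_handle_t*) client, on_close);
 *   }
 *
 *   uv_listen((uv_stream_t*) &server_tcp, SOMAXCONN, on_connection);
 */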
625 
626 static void uv__drain(uv_stream_t* stream) {
627   uv_shutdown_t* req;
628   int err;
629 
630   assert(uv__queue_empty(&stream->write_queue));
631   if (!(stream->flags & UV_HANDLE_CLOSING)) {
632     uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
633     uv__stream_osx_interrupt_select(stream);
634   }
635 
636   if (!uv__is_stream_shutting(stream))
637     return;
638 
639   req = stream->shutdown_req;
640   assert(req);
641 
642   if ((stream->flags & UV_HANDLE_CLOSING) ||
643       !(stream->flags & UV_HANDLE_SHUT)) {
644     stream->shutdown_req = NULL;
645     uv__req_unregister(stream->loop, req);
646 
647     err = 0;
648     if (stream->flags & UV_HANDLE_CLOSING)
649       /* The user destroyed the stream before we got to do the shutdown. */
650       err = UV_ECANCELED;
651     else if (shutdown(uv__stream_fd(stream), SHUT_WR))
652       err = UV__ERR(errno);
653     else /* Success. */
654       stream->flags |= UV_HANDLE_SHUT;
655 
656     if (req->cb != NULL)
657       req->cb(req, err);
658   }
659 }
660 
661 
662 static ssize_t uv__writev(int fd, struct iovec* vec, size_t n) {
663   if (n == 1)
664     return write(fd, vec->iov_base, vec->iov_len);
665   else
666     return writev(fd, vec, n);
667 }
668 
669 
670 static size_t uv__write_req_size(uv_write_t* req) {
671   size_t size;
672 
673   assert(req->bufs != NULL);
674   size = uv__count_bufs(req->bufs + req->write_index,
675                         req->nbufs - req->write_index);
676   assert(req->handle->write_queue_size >= size);
677 
678   return size;
679 }
680 
681 
682 /* Returns 1 if all write request data has been written, or 0 if there is still
683  * more data to write.
684  *
685  * Note: the return value only says something about the *current* request.
686  * There may still be other write requests sitting in the queue.
687  */
688 static int uv__write_req_update(uv_stream_t* stream,
689                                 uv_write_t* req,
690                                 size_t n) {
691   uv_buf_t* buf;
692   size_t len;
693 
694   assert(n <= stream->write_queue_size);
695   stream->write_queue_size -= n;
696 
697   buf = req->bufs + req->write_index;
698 
699   do {
700     len = n < buf->len ? n : buf->len;
701     buf->base += len;
702     buf->len -= len;
703     buf += (buf->len == 0);  /* Advance to next buffer if this one is empty. */
704     n -= len;
705   } while (n > 0);
706 
707   req->write_index = buf - req->bufs;
708 
709   return req->write_index == req->nbufs;
710 }
711 
712 
713 static void uv__write_req_finish(uv_write_t* req) {
714   uv_stream_t* stream = req->handle;
715 
716   /* Pop the req off tcp->write_queue. */
717   uv__queue_remove(&req->queue);
718 
719   /* Only free when there was no error. On error, we touch up write_queue_size
720    * right before making the callback. The reason we don't do that right away
721    * is that a write_queue_size > 0 is our only way to signal to the user that
722    * they should stop writing - which they should if we got an error. Something
723    * to revisit in future revisions of the libuv API.
724    */
725   if (req->error == 0) {
726     if (req->bufs != req->bufsml)
727       uv__free(req->bufs);
728     req->bufs = NULL;
729   }
730 
731   /* Add it to the write_completed_queue where it will have its
732    * callback called in the near future.
733    */
734   uv__queue_insert_tail(&stream->write_completed_queue, &req->queue);
735   uv__io_feed(stream->loop, &stream->io_watcher);
736 }
737 
738 
739 static int uv__handle_fd(uv_handle_t* handle) {
740   switch (handle->type) {
741     case UV_NAMED_PIPE:
742     case UV_TCP:
743       return ((uv_stream_t*) handle)->io_watcher.fd;
744 
745     case UV_UDP:
746       return ((uv_udp_t*) handle)->io_watcher.fd;
747 
748     default:
749       return -1;
750   }
751 }
752 
753 static int uv__try_write(uv_stream_t* stream,
754                          const uv_buf_t bufs[],
755                          unsigned int nbufs,
756                          uv_stream_t* send_handle) {
757   struct iovec* iov;
758   int iovmax;
759   int iovcnt;
760   ssize_t n;
761 
762   /*
763    * Cast to iovec. We had to have our own uv_buf_t instead of iovec
764    * because Windows's WSABUF is not an iovec.
765    */
766   iov = (struct iovec*) bufs;
767   iovcnt = nbufs;
768 
769   iovmax = uv__getiovmax();
770 
771   /* Limit iov count to avoid EINVALs from writev() */
772   if (iovcnt > iovmax)
773     iovcnt = iovmax;
774 
775   /*
776    * Now do the actual writev. Note that we've been updating the pointers
777    * inside the iov each time we write. So there is no need to offset it.
778    */
779   if (send_handle != NULL) {
780     int fd_to_send;
781     struct msghdr msg;
782     union uv__cmsg cmsg;
783 
784     if (uv__is_closing(send_handle))
785       return UV_EBADF;
786 
787     fd_to_send = uv__handle_fd((uv_handle_t*) send_handle);
788 
789     memset(&cmsg, 0, sizeof(cmsg));
790 
791     assert(fd_to_send >= 0);
792 
793     msg.msg_name = NULL;
794     msg.msg_namelen = 0;
795     msg.msg_iov = iov;
796     msg.msg_iovlen = iovcnt;
797     msg.msg_flags = 0;
798 
799     msg.msg_control = &cmsg.hdr;
800     msg.msg_controllen = CMSG_SPACE(sizeof(fd_to_send));
801 
802     cmsg.hdr.cmsg_level = SOL_SOCKET;
803     cmsg.hdr.cmsg_type = SCM_RIGHTS;
804     cmsg.hdr.cmsg_len = CMSG_LEN(sizeof(fd_to_send));
805     memcpy(CMSG_DATA(&cmsg.hdr), &fd_to_send, sizeof(fd_to_send));
806 
807     do
808       n = sendmsg(uv__stream_fd(stream), &msg, 0);
809     while (n == -1 && errno == EINTR);
810   } else {
811     do
812       n = uv__writev(uv__stream_fd(stream), iov, iovcnt);
813     while (n == -1 && errno == EINTR);
814   }
815 
816   if (n >= 0)
817     return n;
818 
819   if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
820     return UV_EAGAIN;
821 
822 #ifdef __APPLE__
823   /* macOS versions 10.10 and 10.15 - and presumably 10.11 to 10.14, too -
824    * have a bug where a race condition causes the kernel to return EPROTOTYPE
825    * because the socket isn't fully constructed. It's probably the result of
826    * the peer closing the connection and that is why libuv translates it to
827    * ECONNRESET. Previously, libuv retried until the EPROTOTYPE error went
828    * away but some VPN software causes the same behavior except the error is
829    * permanent, not transient, turning the retry mechanism into an infinite
830    * loop. See https://github.com/libuv/libuv/pull/482.
831    */
832   if (errno == EPROTOTYPE)
833     return UV_ECONNRESET;
834 #endif  /* __APPLE__ */
835 
836   return UV__ERR(errno);
837 }
838 
839 static void uv__write(uv_stream_t* stream) {
840   struct uv__queue* q;
841   uv_write_t* req;
842   ssize_t n;
843   int count;
844 
845   assert(uv__stream_fd(stream) >= 0);
846 
847   /* Prevent loop starvation when the consumer of this stream reads as fast as
848    * (or faster than) we can write it. This `count` mechanism does not need to
849    * change even if we switch to edge-triggered I/O.
850    */
851   count = 32;
852 
853   for (;;) {
854     if (uv__queue_empty(&stream->write_queue))
855       return;
856 
857     q = uv__queue_head(&stream->write_queue);
858     req = uv__queue_data(q, uv_write_t, queue);
859     assert(req->handle == stream);
860 
861     n = uv__try_write(stream,
862                       &(req->bufs[req->write_index]),
863                       req->nbufs - req->write_index,
864                       req->send_handle);
865 
866     /* Ensure the handle isn't sent again in case this is a partial write. */
867     if (n >= 0) {
868       req->send_handle = NULL;
869       if (uv__write_req_update(stream, req, n)) {
870         uv__write_req_finish(req);
871         if (count-- > 0)
872           continue; /* Start trying to write the next request. */
873 
874         return;
875       }
876     } else if (n != UV_EAGAIN)
877       goto error;
878 
879     /* If this is a blocking stream, try again. */
880     if (stream->flags & UV_HANDLE_BLOCKING_WRITES)
881       continue;
882 
883     /* We're not done. */
884     uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
885 
886     /* Notify select() thread about state change */
887     uv__stream_osx_interrupt_select(stream);
888 
889     return;
890   }
891 
892 error:
893   req->error = n;
894   uv__write_req_finish(req);
895   uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
896   uv__stream_osx_interrupt_select(stream);
897 }
898 
899 
900 static void uv__write_callbacks(uv_stream_t* stream) {
901   uv_write_t* req;
902   struct uv__queue* q;
903   struct uv__queue pq;
904 
905   if (uv__queue_empty(&stream->write_completed_queue))
906     return;
907 
908   uv__queue_move(&stream->write_completed_queue, &pq);
909 
910   while (!uv__queue_empty(&pq)) {
911     /* Pop a req off write_completed_queue. */
912     q = uv__queue_head(&pq);
913     req = uv__queue_data(q, uv_write_t, queue);
914     uv__queue_remove(q);
915     uv__req_unregister(stream->loop, req);
916 
917     if (req->bufs != NULL) {
918       stream->write_queue_size -= uv__write_req_size(req);
919       if (req->bufs != req->bufsml)
920         uv__free(req->bufs);
921       req->bufs = NULL;
922     }
923 
924     /* NOTE: call callback AFTER freeing the request data. */
925     if (req->cb)
926       req->cb(req, req->error);
927   }
928 }
929 
930 
931 static void uv__stream_eof(uv_stream_t* stream, const uv_buf_t* buf) {
932   stream->flags |= UV_HANDLE_READ_EOF;
933   stream->flags &= ~UV_HANDLE_READING;
934   uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
935   uv__handle_stop(stream);
936   uv__stream_osx_interrupt_select(stream);
937   stream->read_cb(stream, UV_EOF, buf);
938 }
939 
940 
941 static int uv__stream_queue_fd(uv_stream_t* stream, int fd) {
942   uv__stream_queued_fds_t* queued_fds;
943   unsigned int queue_size;
944 
945   queued_fds = stream->queued_fds;
946   if (queued_fds == NULL) {
947     queue_size = 8;
948     queued_fds = uv__malloc((queue_size - 1) * sizeof(*queued_fds->fds) +
949                             sizeof(*queued_fds));
950     if (queued_fds == NULL)
951       return UV_ENOMEM;
952     queued_fds->size = queue_size;
953     queued_fds->offset = 0;
954     stream->queued_fds = queued_fds;
955 
956     /* Grow */
957   } else if (queued_fds->size == queued_fds->offset) {
958     queue_size = queued_fds->size + 8;
959     queued_fds = uv__realloc(queued_fds,
960                              (queue_size - 1) * sizeof(*queued_fds->fds) +
961                               sizeof(*queued_fds));
962 
963     /*
964      * Allocation failure, report back.
965      * NOTE: if it is fatal - sockets will be closed in uv__stream_close
966      */
967     if (queued_fds == NULL)
968       return UV_ENOMEM;
969     queued_fds->size = queue_size;
970     stream->queued_fds = queued_fds;
971   }
972 
973   /* Put fd in a queue */
974   queued_fds->fds[queued_fds->offset++] = fd;
975 
976   return 0;
977 }
978 
979 
980 static int uv__stream_recv_cmsg(uv_stream_t* stream, struct msghdr* msg) {
981   struct cmsghdr* cmsg;
982   int fd;
983   int err;
984   size_t i;
985   size_t count;
986 
987   for (cmsg = CMSG_FIRSTHDR(msg); cmsg != NULL; cmsg = CMSG_NXTHDR(msg, cmsg)) {
988     if (cmsg->cmsg_type != SCM_RIGHTS) {
989       fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n",
990           cmsg->cmsg_type);
991       continue;
992     }
993 
994     assert(cmsg->cmsg_len >= CMSG_LEN(0));
995     count = cmsg->cmsg_len - CMSG_LEN(0);
996     assert(count % sizeof(fd) == 0);
997     count /= sizeof(fd);
998 
999     for (i = 0; i < count; i++) {
1000       memcpy(&fd, (char*) CMSG_DATA(cmsg) + i * sizeof(fd), sizeof(fd));
1001       /* Already has accepted fd, queue now */
1002       if (stream->accepted_fd != -1) {
1003         err = uv__stream_queue_fd(stream, fd);
1004         if (err != 0) {
1005           /* Close rest */
1006           for (; i < count; i++)
1007             uv__close(fd);
1008           return err;
1009         }
1010       } else {
1011         stream->accepted_fd = fd;
1012       }
1013     }
1014   }
1015 
1016   return 0;
1017 }
1018 
1019 
1020 static void uv__read(uv_stream_t* stream) {
1021   uv_buf_t buf;
1022   ssize_t nread;
1023   struct msghdr msg;
1024   union uv__cmsg cmsg;
1025   int count;
1026   int err;
1027   int is_ipc;
1028 
1029   stream->flags &= ~UV_HANDLE_READ_PARTIAL;
1030 
1031   /* Prevent loop starvation when the data comes in as fast as (or faster than)
1032    * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O.
1033    */
1034   count = 32;
1035 
1036   is_ipc = stream->type == UV_NAMED_PIPE && ((uv_pipe_t*) stream)->ipc;
1037 
1038   /* XXX: Maybe instead of having UV_HANDLE_READING we just test if
1039    * tcp->read_cb is NULL or not?
1040    */
1041   while (stream->read_cb
1042       && (stream->flags & UV_HANDLE_READING)
1043       && (count-- > 0)) {
1044     assert(stream->alloc_cb != NULL);
1045 
1046     buf = uv_buf_init(NULL, 0);
1047     stream->alloc_cb((uv_handle_t*)stream, 64 * 1024, &buf);
1048     if (buf.base == NULL || buf.len == 0) {
1049       /* User indicates it can't or won't handle the read. */
1050       stream->read_cb(stream, UV_ENOBUFS, &buf);
1051       return;
1052     }
1053 
1054     assert(buf.base != NULL);
1055     assert(uv__stream_fd(stream) >= 0);
1056 
1057     if (!is_ipc) {
1058       do {
1059         nread = read(uv__stream_fd(stream), buf.base, buf.len);
1060       }
1061       while (nread < 0 && errno == EINTR);
1062     } else {
1063       /* ipc uses recvmsg */
1064       msg.msg_flags = 0;
1065       msg.msg_iov = (struct iovec*) &buf;
1066       msg.msg_iovlen = 1;
1067       msg.msg_name = NULL;
1068       msg.msg_namelen = 0;
1069       /* Set up to receive a descriptor even if one isn't in the message */
1070       msg.msg_controllen = sizeof(cmsg);
1071       msg.msg_control = &cmsg.hdr;
1072 
1073       do {
1074         nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
1075       }
1076       while (nread < 0 && errno == EINTR);
1077     }
1078 
1079     if (nread < 0) {
1080       /* Error */
1081       if (errno == EAGAIN || errno == EWOULDBLOCK) {
1082         /* Wait for the next one. */
1083         if (stream->flags & UV_HANDLE_READING) {
1084           uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
1085           uv__stream_osx_interrupt_select(stream);
1086         }
1087         stream->read_cb(stream, 0, &buf);
1088 #if defined(__CYGWIN__) || defined(__MSYS__)
1089       } else if (errno == ECONNRESET && stream->type == UV_NAMED_PIPE) {
1090         uv__stream_eof(stream, &buf);
1091         return;
1092 #endif
1093       } else {
1094         /* Error. User should call uv_close(). */
1095         stream->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
1096         stream->read_cb(stream, UV__ERR(errno), &buf);
1097         if (stream->flags & UV_HANDLE_READING) {
1098           stream->flags &= ~UV_HANDLE_READING;
1099           uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
1100           uv__handle_stop(stream);
1101           uv__stream_osx_interrupt_select(stream);
1102         }
1103       }
1104       return;
1105     } else if (nread == 0) {
1106       uv__stream_eof(stream, &buf);
1107       return;
1108     } else {
1109       /* Successful read */
1110       ssize_t buflen = buf.len;
1111 
1112       if (is_ipc) {
1113         err = uv__stream_recv_cmsg(stream, &msg);
1114         if (err != 0) {
1115           stream->read_cb(stream, err, &buf);
1116           return;
1117         }
1118       }
1119 
1120 #if defined(__MVS__)
1121       if (is_ipc && msg.msg_controllen > 0) {
1122         uv_buf_t blankbuf;
1123         int nread;
1124         struct iovec *old;
1125 
1126         blankbuf.base = 0;
1127         blankbuf.len = 0;
1128         old = msg.msg_iov;
1129         msg.msg_iov = (struct iovec*) &blankbuf;
1130         nread = 0;
1131         do {
1132           nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
1133           err = uv__stream_recv_cmsg(stream, &msg);
1134           if (err != 0) {
1135             stream->read_cb(stream, err, &buf);
1136             msg.msg_iov = old;
1137             return;
1138           }
1139         } while (nread == 0 && msg.msg_controllen > 0);
1140         msg.msg_iov = old;
1141       }
1142 #endif
1143       stream->read_cb(stream, nread, &buf);
1144 
1145       /* Return if we didn't fill the buffer; there is no more data to read. */
1146       if (nread < buflen) {
1147         stream->flags |= UV_HANDLE_READ_PARTIAL;
1148         return;
1149       }
1150     }
1151   }
1152 }
1153 
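/* Illustrative sketch (not part of libuv): the alloc/read callback pair that
 * the read loop above expects. Returning a NULL base or a zero length from
 * the alloc callback makes uv__read() report UV_ENOBUFS; a negative nread in
 * the read callback signals UV_EOF or an error. `on_alloc` and `on_read` are
 * hypothetical names.
 *
 *   static void on_alloc(uv_handle_t* handle, size_t suggested, uv_buf_t* buf) {
 *     buf->base = malloc(suggested);
 *     buf->len = buf->base == NULL ? 0 : suggested;
 *   }
 *
 *   static void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
 *     if (nread < 0)                  // UV_EOF or an error: stop using the stream
 *       uv_close((uv_handle_t*) stream, NULL);
 *     else if (nread > 0)
 *       consume(buf->base, nread);    // hypothetical consumer
 *     free(buf->base);
 *   }
 */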
1154 
1155 int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
1156   assert(stream->type == UV_TCP ||
1157          stream->type == UV_TTY ||
1158          stream->type == UV_NAMED_PIPE);
1159 
1160   if (!(stream->flags & UV_HANDLE_WRITABLE) ||
1161       stream->flags & UV_HANDLE_SHUT ||
1162       uv__is_stream_shutting(stream) ||
1163       uv__is_closing(stream)) {
1164     return UV_ENOTCONN;
1165   }
1166 
1167   assert(uv__stream_fd(stream) >= 0);
1168 
1169   /* Initialize request. The `shutdown(2)` call will always be deferred until
1170    * `uv__drain`, just before the callback is run. */
1171   uv__req_init(stream->loop, req, UV_SHUTDOWN);
1172   req->handle = stream;
1173   req->cb = cb;
1174   stream->shutdown_req = req;
1175   stream->flags &= ~UV_HANDLE_WRITABLE;
1176 
1177   if (uv__queue_empty(&stream->write_queue))
1178     uv__io_feed(stream->loop, &stream->io_watcher);
1179 
1180   return 0;
1181 }
1182 
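/* Illustrative sketch (not part of libuv): because the shutdown(2) call is
 * deferred to uv__drain(), a caller can queue a final write and then shut
 * down the write side in one go; the callback fires only after the write
 * queue has drained (or been cancelled). `handle`, `req` and `on_shutdown`
 * are hypothetical names.
 *
 *   static void on_shutdown(uv_shutdown_t* req, int status) {
 *     // status is 0, UV_ECANCELED, or the translated shutdown(2) errno.
 *     free(req);
 *   }
 *
 *   uv_shutdown_t* req = malloc(sizeof(*req));
 *   uv_shutdown(req, (uv_stream_t*) &handle, on_shutdown);
 */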
1183 
1184 static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
1185   uv_stream_t* stream;
1186 
1187   stream = container_of(w, uv_stream_t, io_watcher);
1188 
1189   assert(stream->type == UV_TCP ||
1190          stream->type == UV_NAMED_PIPE ||
1191          stream->type == UV_TTY);
1192   assert(!(stream->flags & UV_HANDLE_CLOSING));
1193 
1194   if (stream->connect_req) {
1195     uv__stream_connect(stream);
1196     return;
1197   }
1198 
1199   assert(uv__stream_fd(stream) >= 0);
1200 
1201   /* Ignore POLLHUP here. Even if it's set, there may still be data to read. */
1202   if (events & (POLLIN | POLLERR | POLLHUP))
1203     uv__read(stream);
1204 
1205   if (uv__stream_fd(stream) == -1)
1206     return;  /* read_cb closed stream. */
1207 
1208   /* Short-circuit iff POLLHUP is set, the user is still interested in read
1209    * events and uv__read() reported a partial read but not EOF. If the EOF
1210    * flag is set, uv__read() called read_cb with err=UV_EOF and we don't
1211    * have to do anything. If the partial read flag is not set, we can't
1212    * report the EOF yet because there is still data to read.
1213    */
1214   if ((events & POLLHUP) &&
1215       (stream->flags & UV_HANDLE_READING) &&
1216       (stream->flags & UV_HANDLE_READ_PARTIAL) &&
1217       !(stream->flags & UV_HANDLE_READ_EOF)) {
1218     uv_buf_t buf = { NULL, 0 };
1219     uv__stream_eof(stream, &buf);
1220   }
1221 
1222   if (uv__stream_fd(stream) == -1)
1223     return;  /* read_cb closed stream. */
1224 
1225   if (events & (POLLOUT | POLLERR | POLLHUP)) {
1226     uv__write(stream);
1227     uv__write_callbacks(stream);
1228 
1229     /* Write queue drained. */
1230     if (uv__queue_empty(&stream->write_queue))
1231       uv__drain(stream);
1232   }
1233 }
1234 
1235 
1236 /**
1237  * We get called here directly following a call to connect(2).
1238  * To determine whether we've errored out or succeeded we must call
1239  * getsockopt.
1240  */
1241 static void uv__stream_connect(uv_stream_t* stream) {
1242   int error;
1243   uv_connect_t* req = stream->connect_req;
1244   socklen_t errorsize = sizeof(int);
1245 
1246   assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE);
1247   assert(req);
1248 
1249   if (stream->delayed_error) {
1250     /* To smooth over the differences between unixes, errors that
1251      * were reported synchronously on the first connect can be delayed
1252      * until the next tick, which is now.
1253      */
1254     error = stream->delayed_error;
1255     stream->delayed_error = 0;
1256   } else {
1257     /* Normal situation: we need to get the socket error from the kernel. */
1258     assert(uv__stream_fd(stream) >= 0);
1259     getsockopt(uv__stream_fd(stream),
1260                SOL_SOCKET,
1261                SO_ERROR,
1262                &error,
1263                &errorsize);
1264     error = UV__ERR(error);
1265   }
1266 
1267   if (error == UV__ERR(EINPROGRESS))
1268     return;
1269 
1270   stream->connect_req = NULL;
1271   uv__req_unregister(stream->loop, req);
1272 
1273   if (error < 0 || uv__queue_empty(&stream->write_queue)) {
1274     uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
1275   }
1276 
1277   if (req->cb)
1278     req->cb(req, error);
1279 
1280   if (uv__stream_fd(stream) == -1)
1281     return;
1282 
1283   if (error < 0) {
1284     uv__stream_flush_write_queue(stream, UV_ECANCELED);
1285     uv__write_callbacks(stream);
1286   }
1287 }
1288 
1289 
1290 static int uv__check_before_write(uv_stream_t* stream,
1291                                   unsigned int nbufs,
1292                                   uv_stream_t* send_handle) {
1293   assert(nbufs > 0);
1294   assert((stream->type == UV_TCP ||
1295           stream->type == UV_NAMED_PIPE ||
1296           stream->type == UV_TTY) &&
1297          "uv_write (unix) does not yet support other types of streams");
1298 
1299   if (uv__stream_fd(stream) < 0)
1300     return UV_EBADF;
1301 
1302   if (!(stream->flags & UV_HANDLE_WRITABLE))
1303     return UV_EPIPE;
1304 
1305   if (send_handle != NULL) {
1306     if (stream->type != UV_NAMED_PIPE || !((uv_pipe_t*)stream)->ipc)
1307       return UV_EINVAL;
1308 
1309     /* XXX We abuse uv_write2() to send over UDP handles to child processes.
1310      * Don't call uv__stream_fd() on those handles, it's a macro that on OS X
1311      * evaluates to a function that operates on a uv_stream_t with a couple of
1312      * OS X specific fields. On other Unices it does (handle)->io_watcher.fd,
1313      * which works but only by accident.
1314      */
1315     if (uv__handle_fd((uv_handle_t*) send_handle) < 0)
1316       return UV_EBADF;
1317 
1318 #if defined(__CYGWIN__) || defined(__MSYS__)
1319     /* Cygwin recvmsg always sets msg_controllen to zero, so we cannot send it.
1320        See https://github.com/mirror/newlib-cygwin/blob/86fc4bf0/winsup/cygwin/fhandler_socket.cc#L1736-L1743 */
1321     return UV_ENOSYS;
1322 #endif
1323   }
1324 
1325   return 0;
1326 }
1327 
1328 int uv_write2(uv_write_t* req,
1329               uv_stream_t* stream,
1330               const uv_buf_t bufs[],
1331               unsigned int nbufs,
1332               uv_stream_t* send_handle,
1333               uv_write_cb cb) {
1334   int empty_queue;
1335   int err;
1336 
1337   err = uv__check_before_write(stream, nbufs, send_handle);
1338   if (err < 0)
1339     return err;
1340 
1341   /* It's legal for write_queue_size > 0 even when the write_queue is empty;
1342    * it means there are error-state requests in the write_completed_queue that
1343    * will touch up write_queue_size later, see also uv__write_req_finish().
1344    * We could check that write_queue is empty instead but that implies making
1345    * a write() syscall when we know that the handle is in error mode.
1346    */
1347   empty_queue = (stream->write_queue_size == 0);
1348 
1349   /* Initialize the req */
1350   uv__req_init(stream->loop, req, UV_WRITE);
1351   req->cb = cb;
1352   req->handle = stream;
1353   req->error = 0;
1354   req->send_handle = send_handle;
1355   uv__queue_init(&req->queue);
1356 
1357   req->bufs = req->bufsml;
1358   if (nbufs > ARRAY_SIZE(req->bufsml))
1359     req->bufs = uv__malloc(nbufs * sizeof(bufs[0]));
1360 
1361   if (req->bufs == NULL)
1362     return UV_ENOMEM;
1363 
1364   memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
1365   req->nbufs = nbufs;
1366   req->write_index = 0;
1367   stream->write_queue_size += uv__count_bufs(bufs, nbufs);
1368 
1369   /* Append the request to write_queue. */
1370   uv__queue_insert_tail(&stream->write_queue, &req->queue);
1371 
1372   /* If the queue was empty when this function began, we should attempt to
1373    * do the write immediately. Otherwise start the write_watcher and wait
1374    * for the fd to become writable.
1375    */
1376   if (stream->connect_req) {
1377     /* Still connecting, do nothing. */
1378   }
1379   else if (empty_queue) {
1380     uv__write(stream);
1381   }
1382   else {
1383     /*
1384      * blocking streams should never have anything in the queue.
1385      * if this assert fires then somehow the blocking stream isn't being
1386      * sufficiently flushed in uv__write.
1387      */
1388     assert(!(stream->flags & UV_HANDLE_BLOCKING_WRITES));
1389     uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
1390     uv__stream_osx_interrupt_select(stream);
1391   }
1392 
1393   return 0;
1394 }
1395 
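/* Illustrative sketch (not part of libuv): uv_write2() is how a handle gets
 * passed to another process over an IPC pipe (a uv_pipe_t opened with ipc=1);
 * the descriptor travels in the SCM_RIGHTS control message built by
 * uv__try_write(). At least one byte of regular data must accompany it.
 * `ipc_pipe`, `tcp_handle`, `req` and `on_write` are hypothetical names.
 *
 *   uv_buf_t buf = uv_buf_init(".", 1);
 *   uv_write2(req, (uv_stream_t*) &ipc_pipe, &buf, 1,
 *             (uv_stream_t*) &tcp_handle, on_write);
 */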
1396 
1397 /* The buffers to be written must remain valid until the callback is called.
1398  * This is not required for the uv_buf_t array.
1399  */
1400 int uv_write(uv_write_t* req,
1401              uv_stream_t* handle,
1402              const uv_buf_t bufs[],
1403              unsigned int nbufs,
1404              uv_write_cb cb) {
1405   return uv_write2(req, handle, bufs, nbufs, NULL, cb);
1406 }
1407 
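/* Illustrative sketch (not part of libuv): the bytes behind each uv_buf_t
 * must stay valid until the write callback runs, while the uv_buf_t array
 * itself may live on the stack because uv_write() copies it. `client`,
 * `on_write` and `payload` are hypothetical names.
 *
 *   static void on_write(uv_write_t* req, int status) {
 *     free(req->data);  // the payload allocated below
 *     free(req);
 *   }
 *
 *   char* payload = strdup("hello");
 *   uv_buf_t buf = uv_buf_init(payload, 5);
 *   uv_write_t* req = malloc(sizeof(*req));
 *   req->data = payload;
 *   uv_write(req, (uv_stream_t*) &client, &buf, 1, on_write);
 */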
1408 
1409 int uv_try_write(uv_stream_t* stream,
1410                  const uv_buf_t bufs[],
1411                  unsigned int nbufs) {
1412   return uv_try_write2(stream, bufs, nbufs, NULL);
1413 }
1414 
1415 
1416 int uv_try_write2(uv_stream_t* stream,
1417                   const uv_buf_t bufs[],
1418                   unsigned int nbufs,
1419                   uv_stream_t* send_handle) {
1420   int err;
1421 
1422   /* Connecting or already writing some data */
1423   if (stream->connect_req != NULL || stream->write_queue_size != 0)
1424     return UV_EAGAIN;
1425 
1426   err = uv__check_before_write(stream, nbufs, NULL);
1427   if (err < 0)
1428     return err;
1429 
1430   return uv__try_write(stream, bufs, nbufs, send_handle);
1431 }
1432 
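/* Illustrative sketch (not part of libuv): uv_try_write() either writes some
 * bytes synchronously or fails with UV_EAGAIN; callers typically fall back to
 * uv_write() for whatever did not fit. `stream`, `buf`, `req` and `on_write`
 * are hypothetical names.
 *
 *   int n = uv_try_write((uv_stream_t*) &stream, &buf, 1);
 *   if (n == UV_EAGAIN)
 *     n = 0;
 *   if (n >= 0 && (size_t) n < buf.len) {
 *     uv_buf_t rest = uv_buf_init(buf.base + n, buf.len - n);
 *     uv_write(req, (uv_stream_t*) &stream, &rest, 1, on_write);
 *   }
 */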
1433 
1434 int uv__read_start(uv_stream_t* stream,
1435                    uv_alloc_cb alloc_cb,
1436                    uv_read_cb read_cb) {
1437   assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
1438       stream->type == UV_TTY);
1439 
1440   /* The UV_HANDLE_READING flag is independent of the state of the stream; it
1441    * just expresses the desired state of the user. */
1442   stream->flags |= UV_HANDLE_READING;
1443   stream->flags &= ~UV_HANDLE_READ_EOF;
1444 
1445   /* TODO: try to do the read inline? */
1446   assert(uv__stream_fd(stream) >= 0);
1447   assert(alloc_cb);
1448 
1449   stream->read_cb = read_cb;
1450   stream->alloc_cb = alloc_cb;
1451 
1452   uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
1453   uv__handle_start(stream);
1454   uv__stream_osx_interrupt_select(stream);
1455 
1456   return 0;
1457 }
1458 
1459 
1460 int uv_read_stop(uv_stream_t* stream) {
1461   if (!(stream->flags & UV_HANDLE_READING))
1462     return 0;
1463 
1464   stream->flags &= ~UV_HANDLE_READING;
1465   uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
1466   uv__handle_stop(stream);
1467   uv__stream_osx_interrupt_select(stream);
1468 
1469   stream->read_cb = NULL;
1470   stream->alloc_cb = NULL;
1471   return 0;
1472 }
1473 
1474 
1475 int uv_is_readable(const uv_stream_t* stream) {
1476   return !!(stream->flags & UV_HANDLE_READABLE);
1477 }
1478 
1479 
1480 int uv_is_writable(const uv_stream_t* stream) {
1481   return !!(stream->flags & UV_HANDLE_WRITABLE);
1482 }
1483 
1484 
1485 #if defined(__APPLE__)
1486 int uv___stream_fd(const uv_stream_t* handle) {
1487   const uv__stream_select_t* s;
1488 
1489   assert(handle->type == UV_TCP ||
1490          handle->type == UV_TTY ||
1491          handle->type == UV_NAMED_PIPE);
1492 
1493   s = handle->select;
1494   if (s != NULL)
1495     return s->fd;
1496 
1497   return handle->io_watcher.fd;
1498 }
1499 #endif /* defined(__APPLE__) */
1500 
1501 
1502 void uv__stream_close(uv_stream_t* handle) {
1503   unsigned int i;
1504   uv__stream_queued_fds_t* queued_fds;
1505 
1506 #if defined(__APPLE__)
1507   /* Terminate select loop first */
1508   if (handle->select != NULL) {
1509     uv__stream_select_t* s;
1510 
1511     s = handle->select;
1512 
1513     uv_sem_post(&s->close_sem);
1514     uv_sem_post(&s->async_sem);
1515     uv__stream_osx_interrupt_select(handle);
1516     uv_thread_join(&s->thread);
1517     uv_sem_destroy(&s->close_sem);
1518     uv_sem_destroy(&s->async_sem);
1519     uv__close(s->fake_fd);
1520     uv__close(s->int_fd);
1521     uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);
1522 
1523     handle->select = NULL;
1524   }
1525 #endif /* defined(__APPLE__) */
1526 
1527   uv__io_close(handle->loop, &handle->io_watcher);
1528   uv_read_stop(handle);
1529   uv__handle_stop(handle);
1530   handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
1531 
1532   if (handle->io_watcher.fd != -1) {
1533     /* Don't close stdio file descriptors.  Nothing good comes from it. */
1534     if (handle->io_watcher.fd > STDERR_FILENO)
1535       uv__close(handle->io_watcher.fd);
1536     handle->io_watcher.fd = -1;
1537   }
1538 
1539   if (handle->accepted_fd != -1) {
1540     uv__close(handle->accepted_fd);
1541     handle->accepted_fd = -1;
1542   }
1543 
1544   /* Close all queued fds */
1545   if (handle->queued_fds != NULL) {
1546     queued_fds = handle->queued_fds;
1547     for (i = 0; i < queued_fds->offset; i++)
1548       uv__close(queued_fds->fds[i]);
1549     uv__free(handle->queued_fds);
1550     handle->queued_fds = NULL;
1551   }
1552 
1553   assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT));
1554 }
1555 
1556 
1557 int uv_stream_set_blocking(uv_stream_t* handle, int blocking) {
1558   /* We don't need to check the file descriptor; uv__nonblock()
1559    * will fail with EBADF if it's not valid.
1560    */
1561   return uv__nonblock(uv__stream_fd(handle), !blocking);
1562 }
1563