• Home
  • Raw
  • Download

Lines Matching refs:stream

47   uv_stream_t* stream;  member
64 static void uv__write(uv_stream_t* stream);
65 static void uv__read(uv_stream_t* stream);
67 static void uv__write_callbacks(uv_stream_t* stream);
72 uv_stream_t* stream, in uv__stream_init() argument
76 uv__handle_init(loop, (uv_handle_t*)stream, type); in uv__stream_init()
77 stream->read_cb = NULL; in uv__stream_init()
78 stream->alloc_cb = NULL; in uv__stream_init()
79 stream->close_cb = NULL; in uv__stream_init()
80 stream->connection_cb = NULL; in uv__stream_init()
81 stream->connect_req = NULL; in uv__stream_init()
82 stream->shutdown_req = NULL; in uv__stream_init()
83 stream->accepted_fd = -1; in uv__stream_init()
84 stream->queued_fds = NULL; in uv__stream_init()
85 stream->delayed_error = 0; in uv__stream_init()
86 QUEUE_INIT(&stream->write_queue); in uv__stream_init()
87 QUEUE_INIT(&stream->write_completed_queue); in uv__stream_init()
88 stream->write_queue_size = 0; in uv__stream_init()
102 stream->select = NULL; in uv__stream_init()
105 uv__io_init(&stream->io_watcher, uv__stream_io, -1); in uv__stream_init()
109 static void uv__stream_osx_interrupt_select(uv_stream_t* stream) { in uv__stream_osx_interrupt_select() argument
115 s = stream->select; in uv__stream_osx_interrupt_select()
136 uv_stream_t* stream; in uv__stream_osx_select() local
144 stream = arg; in uv__stream_osx_select()
145 s = stream->select; in uv__stream_osx_select()
162 if (uv__io_active(&stream->io_watcher, POLLIN)) in uv__stream_osx_select()
164 if (uv__io_active(&stream->io_watcher, POLLOUT)) in uv__stream_osx_select()
217 assert((s->events == 0) || (stream->flags & UV_HANDLE_CLOSING)); in uv__stream_osx_select()
225 uv_stream_t* stream; in uv__stream_osx_select_cb() local
229 stream = s->stream; in uv__stream_osx_select_cb()
239 if ((events & POLLIN) && uv__io_active(&stream->io_watcher, POLLIN)) in uv__stream_osx_select_cb()
240 uv__stream_io(stream->loop, &stream->io_watcher, POLLIN); in uv__stream_osx_select_cb()
242 if ((events & POLLOUT) && uv__io_active(&stream->io_watcher, POLLOUT)) in uv__stream_osx_select_cb()
243 uv__stream_io(stream->loop, &stream->io_watcher, POLLOUT); in uv__stream_osx_select_cb()
245 if (stream->flags & UV_HANDLE_CLOSING) in uv__stream_osx_select_cb()
263 int uv__stream_try_select(uv_stream_t* stream, int* fd) { in uv__stream_try_select() argument
335 err = uv_async_init(stream->loop, &s->async, uv__stream_osx_select_cb); in uv__stream_try_select()
354 s->stream = stream; in uv__stream_try_select()
355 stream->select = s; in uv__stream_try_select()
358 err = uv_thread_create(&s->thread, uv__stream_osx_select, stream); in uv__stream_try_select()
365 s->stream = NULL; in uv__stream_try_select()
366 stream->select = NULL; in uv__stream_try_select()
392 int uv__stream_open(uv_stream_t* stream, int fd, int flags) { in uv__stream_open() argument
397 if (!(stream->io_watcher.fd == -1 || stream->io_watcher.fd == fd)) in uv__stream_open()
401 stream->flags |= flags; in uv__stream_open()
403 if (stream->type == UV_TCP) { in uv__stream_open()
404 if ((stream->flags & UV_HANDLE_TCP_NODELAY) && uv__tcp_nodelay(fd, 1)) in uv__stream_open()
408 if ((stream->flags & UV_HANDLE_TCP_KEEPALIVE) && in uv__stream_open()
423 stream->io_watcher.fd = fd; in uv__stream_open()
429 void uv__stream_flush_write_queue(uv_stream_t* stream, int error) { in uv__stream_flush_write_queue() argument
432 while (!QUEUE_EMPTY(&stream->write_queue)) { in uv__stream_flush_write_queue()
433 q = QUEUE_HEAD(&stream->write_queue); in uv__stream_flush_write_queue()
439 QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue); in uv__stream_flush_write_queue()
444 void uv__stream_destroy(uv_stream_t* stream) { in uv__stream_destroy() argument
445 assert(!uv__io_active(&stream->io_watcher, POLLIN | POLLOUT)); in uv__stream_destroy()
446 assert(stream->flags & UV_HANDLE_CLOSED); in uv__stream_destroy()
448 if (stream->connect_req) { in uv__stream_destroy()
449 uv__req_unregister(stream->loop, stream->connect_req); in uv__stream_destroy()
450 stream->connect_req->cb(stream->connect_req, UV_ECANCELED); in uv__stream_destroy()
451 stream->connect_req = NULL; in uv__stream_destroy()
454 uv__stream_flush_write_queue(stream, UV_ECANCELED); in uv__stream_destroy()
455 uv__write_callbacks(stream); in uv__stream_destroy()
457 if (stream->shutdown_req) { in uv__stream_destroy()
463 uv__req_unregister(stream->loop, stream->shutdown_req); in uv__stream_destroy()
464 stream->shutdown_req->cb(stream->shutdown_req, UV_ECANCELED); in uv__stream_destroy()
465 stream->shutdown_req = NULL; in uv__stream_destroy()
468 assert(stream->write_queue_size == 0); in uv__stream_destroy()
515 uv_stream_t* stream; in uv__server_io() local
518 stream = container_of(w, uv_stream_t, io_watcher); in uv__server_io()
520 assert(stream->accepted_fd == -1); in uv__server_io()
521 assert(!(stream->flags & UV_HANDLE_CLOSING)); in uv__server_io()
523 uv__io_start(stream->loop, &stream->io_watcher, POLLIN); in uv__server_io()
528 while (uv__stream_fd(stream) != -1) { in uv__server_io()
529 assert(stream->accepted_fd == -1); in uv__server_io()
536 err = uv__accept(uv__stream_fd(stream)); in uv__server_io()
545 err = uv__emfile_trick(loop, uv__stream_fd(stream)); in uv__server_io()
550 stream->connection_cb(stream, err); in uv__server_io()
555 stream->accepted_fd = err; in uv__server_io()
556 stream->connection_cb(stream, 0); in uv__server_io()
558 if (stream->accepted_fd != -1) { in uv__server_io()
560 uv__io_stop(loop, &stream->io_watcher, POLLIN); in uv__server_io()
564 if (stream->type == UV_TCP && in uv__server_io()
565 (stream->flags & UV_HANDLE_TCP_SINGLE_ACCEPT)) { in uv__server_io()
642 int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) { in uv_listen() argument
645 switch (stream->type) { in uv_listen()
647 err = uv__tcp_listen((uv_tcp_t*)stream, backlog, cb); in uv_listen()
651 err = uv__pipe_listen((uv_pipe_t*)stream, backlog, cb); in uv_listen()
659 uv__handle_start(stream); in uv_listen()
665 static void uv__drain(uv_stream_t* stream) { in uv__drain() argument
669 assert(QUEUE_EMPTY(&stream->write_queue)); in uv__drain()
670 uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT); in uv__drain()
671 uv__stream_osx_interrupt_select(stream); in uv__drain()
674 if ((stream->flags & UV_HANDLE_SHUTTING) && in uv__drain()
675 !(stream->flags & UV_HANDLE_CLOSING) && in uv__drain()
676 !(stream->flags & UV_HANDLE_SHUT)) { in uv__drain()
677 assert(stream->shutdown_req); in uv__drain()
679 req = stream->shutdown_req; in uv__drain()
680 stream->shutdown_req = NULL; in uv__drain()
681 stream->flags &= ~UV_HANDLE_SHUTTING; in uv__drain()
682 uv__req_unregister(stream->loop, req); in uv__drain()
685 if (shutdown(uv__stream_fd(stream), SHUT_WR)) in uv__drain()
689 stream->flags |= UV_HANDLE_SHUT; in uv__drain()
723 static int uv__write_req_update(uv_stream_t* stream, in uv__write_req_update() argument
729 assert(n <= stream->write_queue_size); in uv__write_req_update()
730 stream->write_queue_size -= n; in uv__write_req_update()
749 uv_stream_t* stream = req->handle; in uv__write_req_finish() local
769 QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue); in uv__write_req_finish()
770 uv__io_feed(stream->loop, &stream->io_watcher); in uv__write_req_finish()
788 static int uv__try_write(uv_stream_t* stream, in uv__try_write() argument
854 n = sendmsg(uv__stream_fd(stream), &msg, 0); in uv__try_write()
858 n = uv__writev(uv__stream_fd(stream), iov, iovcnt); in uv__try_write()
885 static void uv__write(uv_stream_t* stream) { in uv__write() argument
890 assert(uv__stream_fd(stream) >= 0); in uv__write()
893 if (QUEUE_EMPTY(&stream->write_queue)) in uv__write()
896 q = QUEUE_HEAD(&stream->write_queue); in uv__write()
898 assert(req->handle == stream); in uv__write()
900 n = uv__try_write(stream, in uv__write()
908 if (uv__write_req_update(stream, req, n)) { in uv__write()
916 if (stream->flags & UV_HANDLE_BLOCKING_WRITES) in uv__write()
920 uv__io_start(stream->loop, &stream->io_watcher, POLLOUT); in uv__write()
923 uv__stream_osx_interrupt_select(stream); in uv__write()
931 uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT); in uv__write()
932 uv__stream_osx_interrupt_select(stream); in uv__write()
936 static void uv__write_callbacks(uv_stream_t* stream) { in uv__write_callbacks() argument
941 if (QUEUE_EMPTY(&stream->write_completed_queue)) in uv__write_callbacks()
944 QUEUE_MOVE(&stream->write_completed_queue, &pq); in uv__write_callbacks()
951 uv__req_unregister(stream->loop, req); in uv__write_callbacks()
954 stream->write_queue_size -= uv__write_req_size(req); in uv__write_callbacks()
1010 static void uv__stream_eof(uv_stream_t* stream, const uv_buf_t* buf) { in uv__stream_eof() argument
1011 stream->flags |= UV_HANDLE_READ_EOF; in uv__stream_eof()
1012 stream->flags &= ~UV_HANDLE_READING; in uv__stream_eof()
1013 uv__io_stop(stream->loop, &stream->io_watcher, POLLIN); in uv__stream_eof()
1014 uv__handle_stop(stream); in uv__stream_eof()
1015 uv__stream_osx_interrupt_select(stream); in uv__stream_eof()
1016 stream->read_cb(stream, UV_EOF, buf); in uv__stream_eof()
1020 static int uv__stream_queue_fd(uv_stream_t* stream, int fd) { in uv__stream_queue_fd() argument
1024 queued_fds = stream->queued_fds; in uv__stream_queue_fd()
1033 stream->queued_fds = queued_fds; in uv__stream_queue_fd()
1049 stream->queued_fds = queued_fds; in uv__stream_queue_fd()
1068 static int uv__stream_recv_cmsg(uv_stream_t* stream, struct msghdr* msg) { in uv__stream_recv_cmsg() argument
1100 if (stream->accepted_fd != -1) { in uv__stream_recv_cmsg()
1101 err = uv__stream_queue_fd(stream, pi[i]); in uv__stream_recv_cmsg()
1109 stream->accepted_fd = pi[i]; in uv__stream_recv_cmsg()
1124 static void uv__read(uv_stream_t* stream) { in uv__read() argument
1133 stream->flags &= ~UV_HANDLE_READ_PARTIAL; in uv__read()
1140 is_ipc = stream->type == UV_NAMED_PIPE && ((uv_pipe_t*) stream)->ipc; in uv__read()
1145 while (stream->read_cb in uv__read()
1146 && (stream->flags & UV_HANDLE_READING) in uv__read()
1148 assert(stream->alloc_cb != NULL); in uv__read()
1151 stream->alloc_cb((uv_handle_t*)stream, 64 * 1024, &buf); in uv__read()
1154 stream->read_cb(stream, UV_ENOBUFS, &buf); in uv__read()
1159 assert(uv__stream_fd(stream) >= 0); in uv__read()
1163 nread = read(uv__stream_fd(stream), buf.base, buf.len); in uv__read()
1178 nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0); in uv__read()
1187 if (stream->flags & UV_HANDLE_READING) { in uv__read()
1188 uv__io_start(stream->loop, &stream->io_watcher, POLLIN); in uv__read()
1189 uv__stream_osx_interrupt_select(stream); in uv__read()
1191 stream->read_cb(stream, 0, &buf); in uv__read()
1193 } else if (errno == ECONNRESET && stream->type == UV_NAMED_PIPE) { in uv__read()
1194 uv__stream_eof(stream, &buf); in uv__read()
1199 stream->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE); in uv__read()
1200 stream->read_cb(stream, UV__ERR(errno), &buf); in uv__read()
1201 if (stream->flags & UV_HANDLE_READING) { in uv__read()
1202 stream->flags &= ~UV_HANDLE_READING; in uv__read()
1203 uv__io_stop(stream->loop, &stream->io_watcher, POLLIN); in uv__read()
1204 uv__handle_stop(stream); in uv__read()
1205 uv__stream_osx_interrupt_select(stream); in uv__read()
1210 uv__stream_eof(stream, &buf); in uv__read()
1217 err = uv__stream_recv_cmsg(stream, &msg); in uv__read()
1219 stream->read_cb(stream, err, &buf); in uv__read()
1236 nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0); in uv__read()
1237 err = uv__stream_recv_cmsg(stream, &msg); in uv__read()
1239 stream->read_cb(stream, err, &buf); in uv__read()
1247 stream->read_cb(stream, nread, &buf); in uv__read()
1251 stream->flags |= UV_HANDLE_READ_PARTIAL; in uv__read()
1267 int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) { in uv_shutdown() argument
1268 assert(stream->type == UV_TCP || in uv_shutdown()
1269 stream->type == UV_TTY || in uv_shutdown()
1270 stream->type == UV_NAMED_PIPE); in uv_shutdown()
1272 if (!(stream->flags & UV_HANDLE_WRITABLE) || in uv_shutdown()
1273 stream->flags & UV_HANDLE_SHUT || in uv_shutdown()
1274 stream->flags & UV_HANDLE_SHUTTING || in uv_shutdown()
1275 uv__is_closing(stream)) { in uv_shutdown()
1279 assert(uv__stream_fd(stream) >= 0); in uv_shutdown()
1282 uv__req_init(stream->loop, req, UV_SHUTDOWN); in uv_shutdown()
1283 req->handle = stream; in uv_shutdown()
1285 stream->shutdown_req = req; in uv_shutdown()
1286 stream->flags |= UV_HANDLE_SHUTTING; in uv_shutdown()
1287 stream->flags &= ~UV_HANDLE_WRITABLE; in uv_shutdown()
1289 uv__io_start(stream->loop, &stream->io_watcher, POLLOUT); in uv_shutdown()
1290 uv__stream_osx_interrupt_select(stream); in uv_shutdown()
1297 uv_stream_t* stream; in uv__stream_io() local
1299 stream = container_of(w, uv_stream_t, io_watcher); in uv__stream_io()
1301 assert(stream->type == UV_TCP || in uv__stream_io()
1302 stream->type == UV_NAMED_PIPE || in uv__stream_io()
1303 stream->type == UV_TTY); in uv__stream_io()
1304 assert(!(stream->flags & UV_HANDLE_CLOSING)); in uv__stream_io()
1306 if (stream->connect_req) { in uv__stream_io()
1307 uv__stream_connect(stream); in uv__stream_io()
1311 assert(uv__stream_fd(stream) >= 0); in uv__stream_io()
1315 uv__read(stream); in uv__stream_io()
1317 if (uv__stream_fd(stream) == -1) in uv__stream_io()
1327 (stream->flags & UV_HANDLE_READING) && in uv__stream_io()
1328 (stream->flags & UV_HANDLE_READ_PARTIAL) && in uv__stream_io()
1329 !(stream->flags & UV_HANDLE_READ_EOF)) { in uv__stream_io()
1331 uv__stream_eof(stream, &buf); in uv__stream_io()
1334 if (uv__stream_fd(stream) == -1) in uv__stream_io()
1338 uv__write(stream); in uv__stream_io()
1339 uv__write_callbacks(stream); in uv__stream_io()
1342 if (QUEUE_EMPTY(&stream->write_queue)) in uv__stream_io()
1343 uv__drain(stream); in uv__stream_io()
1353 static void uv__stream_connect(uv_stream_t* stream) { in uv__stream_connect() argument
1355 uv_connect_t* req = stream->connect_req; in uv__stream_connect()
1358 assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE); in uv__stream_connect()
1361 if (stream->delayed_error) { in uv__stream_connect()
1366 error = stream->delayed_error; in uv__stream_connect()
1367 stream->delayed_error = 0; in uv__stream_connect()
1370 assert(uv__stream_fd(stream) >= 0); in uv__stream_connect()
1371 getsockopt(uv__stream_fd(stream), in uv__stream_connect()
1382 stream->connect_req = NULL; in uv__stream_connect()
1383 uv__req_unregister(stream->loop, req); in uv__stream_connect()
1385 if (error < 0 || QUEUE_EMPTY(&stream->write_queue)) { in uv__stream_connect()
1386 uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT); in uv__stream_connect()
1392 if (uv__stream_fd(stream) == -1) in uv__stream_connect()
1396 uv__stream_flush_write_queue(stream, UV_ECANCELED); in uv__stream_connect()
1397 uv__write_callbacks(stream); in uv__stream_connect()
1402 static int uv__check_before_write(uv_stream_t* stream, in uv__check_before_write() argument
1406 assert((stream->type == UV_TCP || in uv__check_before_write()
1407 stream->type == UV_NAMED_PIPE || in uv__check_before_write()
1408 stream->type == UV_TTY) && in uv__check_before_write()
1411 if (uv__stream_fd(stream) < 0) in uv__check_before_write()
1414 if (!(stream->flags & UV_HANDLE_WRITABLE)) in uv__check_before_write()
1418 if (stream->type != UV_NAMED_PIPE || !((uv_pipe_t*)stream)->ipc) in uv__check_before_write()
1441 uv_stream_t* stream, in uv_write2() argument
1449 err = uv__check_before_write(stream, nbufs, send_handle); in uv_write2()
1459 empty_queue = (stream->write_queue_size == 0); in uv_write2()
1462 uv__req_init(stream->loop, req, UV_WRITE); in uv_write2()
1464 req->handle = stream; in uv_write2()
1479 stream->write_queue_size += uv__count_bufs(bufs, nbufs); in uv_write2()
1482 QUEUE_INSERT_TAIL(&stream->write_queue, &req->queue); in uv_write2()
1488 if (stream->connect_req) { in uv_write2()
1492 uv__write(stream); in uv_write2()
1500 assert(!(stream->flags & UV_HANDLE_BLOCKING_WRITES)); in uv_write2()
1501 uv__io_start(stream->loop, &stream->io_watcher, POLLOUT); in uv_write2()
1502 uv__stream_osx_interrupt_select(stream); in uv_write2()
1521 int uv_try_write(uv_stream_t* stream, in uv_try_write() argument
1524 return uv_try_write2(stream, bufs, nbufs, NULL); in uv_try_write()
1528 int uv_try_write2(uv_stream_t* stream, in uv_try_write2() argument
1535 if (stream->connect_req != NULL || stream->write_queue_size != 0) in uv_try_write2()
1538 err = uv__check_before_write(stream, nbufs, NULL); in uv_try_write2()
1542 return uv__try_write(stream, bufs, nbufs, send_handle); in uv_try_write2()
1546 int uv__read_start(uv_stream_t* stream, in uv__read_start() argument
1549 assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE || in uv__read_start()
1550 stream->type == UV_TTY); in uv__read_start()
1554 stream->flags |= UV_HANDLE_READING; in uv__read_start()
1555 stream->flags &= ~UV_HANDLE_READ_EOF; in uv__read_start()
1558 assert(uv__stream_fd(stream) >= 0); in uv__read_start()
1561 stream->read_cb = read_cb; in uv__read_start()
1562 stream->alloc_cb = alloc_cb; in uv__read_start()
1564 uv__io_start(stream->loop, &stream->io_watcher, POLLIN); in uv__read_start()
1565 uv__handle_start(stream); in uv__read_start()
1566 uv__stream_osx_interrupt_select(stream); in uv__read_start()
1572 int uv_read_stop(uv_stream_t* stream) { in uv_read_stop() argument
1573 if (!(stream->flags & UV_HANDLE_READING)) in uv_read_stop()
1576 stream->flags &= ~UV_HANDLE_READING; in uv_read_stop()
1577 uv__io_stop(stream->loop, &stream->io_watcher, POLLIN); in uv_read_stop()
1578 uv__handle_stop(stream); in uv_read_stop()
1579 uv__stream_osx_interrupt_select(stream); in uv_read_stop()
1581 stream->read_cb = NULL; in uv_read_stop()
1582 stream->alloc_cb = NULL; in uv_read_stop()
1587 int uv_is_readable(const uv_stream_t* stream) { in uv_is_readable() argument
1588 return !!(stream->flags & UV_HANDLE_READABLE); in uv_is_readable()
1592 int uv_is_writable(const uv_stream_t* stream) { in uv_is_writable() argument
1593 return !!(stream->flags & UV_HANDLE_WRITABLE); in uv_is_writable()