• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2  *
3  * Permission is hereby granted, free of charge, to any person obtaining a copy
4  * of this software and associated documentation files (the "Software"), to
5  * deal in the Software without restriction, including without limitation the
6  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7  * sell copies of the Software, and to permit persons to whom the Software is
8  * furnished to do so, subject to the following conditions:
9  *
10  * The above copyright notice and this permission notice shall be included in
11  * all copies or substantial portions of the Software.
12  *
13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19  * IN THE SOFTWARE.
20  */
21 
#include <assert.h>
#include <errno.h>
#include <io.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
27 
28 #include "handle-inl.h"
29 #include "internal.h"
30 #include "req-inl.h"
31 #include "stream-inl.h"
32 #include "uv-common.h"
33 #include "uv.h"
34 
35 #include <aclapi.h>
36 #include <accctrl.h>
37 
/* A zero-size buffer for use by uv_pipe_read */
static char uv_zero_[] = "";

/* Null uv_buf_t */
static const uv_buf_t uv_null_buf_ = { 0, NULL };

/* The timeout that the pipe will wait for the remote end to write data when
 * the local end wants to shut it down. */
static const int64_t eof_timeout = 50; /* ms */

/* Number of accept requests uv_pipe_bind() allocates when the handle has not
 * already been flagged as a pipe server (uv_pipe_pending_instances() sets
 * that flag together with an explicit count). */
static const int default_pending_pipe_instances = 4;

/* Pipe prefix */
static char pipe_prefix[] = "\\\\?\\pipe";
/* Length of pipe_prefix, not counting the terminating NUL. */
static const int pipe_prefix_len = sizeof(pipe_prefix) - 1;
53 
/* IPC incoming xfer queue item. */
typedef struct {
  uv__ipc_socket_xfer_type_t xfer_type;
  /* Carries the socket_info that uv__pipe_endgame() hands to WSASocketW()
   * when it drains items still queued at close time. */
  uv__ipc_socket_xfer_info_t xfer_info;
  /* Link that chains the item into a pipe's pipe.conn.ipc_xfer_queue. */
  QUEUE member;
} uv__ipc_xfer_queue_item_t;
60 
/* IPC frame header flags. */
/* clang-format off */
enum {
  UV__IPC_FRAME_HAS_DATA                = 0x01,
  UV__IPC_FRAME_HAS_SOCKET_XFER         = 0x02,
  UV__IPC_FRAME_XFER_IS_TCP_CONNECTION  = 0x04,
  /* These are combinations of the flags above. */
  /* HAS_SOCKET_XFER | XFER_IS_TCP_CONNECTION. */
  UV__IPC_FRAME_XFER_FLAGS              = 0x06,
  /* All three individual flags OR'ed together. */
  UV__IPC_FRAME_VALID_FLAGS             = 0x07
};
/* clang-format on */
72 
/* IPC frame header: four 32-bit fields. Do not add, remove or reorder
 * members; the size of this structure is part of the IPC protocol. */
typedef struct {
  uint32_t flags;
  uint32_t reserved1;   /* Ignored. */
  uint32_t data_length; /* Must be zero if there is no data. */
  uint32_t reserved2;   /* Must be zero. */
} uv__ipc_frame_header_t;
80 
81 /* To implement the IPC protocol correctly, these structures must have exactly
82  * the right size. */
83 STATIC_ASSERT(sizeof(uv__ipc_frame_header_t) == 16);
84 STATIC_ASSERT(sizeof(uv__ipc_socket_xfer_info_t) == 632);
85 
/* Coalesced write request: pairs an internal, heap-allocated uv_write_t with
 * the uv_write_t originally supplied by the user. */
typedef struct {
  uv_write_t req;       /* Internal heap-allocated write request. */
  uv_write_t* user_req; /* Pointer to user-specified uv_write_t. */
} uv__coalesced_write_t;
91 
92 
/* Forward declarations of the file-local EOF timer helpers. They are static,
 * so their definitions must appear in this translation unit. */
static void eof_timer_init(uv_pipe_t* pipe);
static void eof_timer_start(uv_pipe_t* pipe);
static void eof_timer_stop(uv_pipe_t* pipe);
static void eof_timer_cb(uv_timer_t* timer);
static void eof_timer_destroy(uv_pipe_t* pipe);
static void eof_timer_close_cb(uv_handle_t* handle);
99 
100 
uv__unique_pipe_name(char * ptr,char * name,size_t size)101 static void uv__unique_pipe_name(char* ptr, char* name, size_t size) {
102   snprintf(name, size, "\\\\?\\pipe\\uv\\%p-%lu", ptr, GetCurrentProcessId());
103 }
104 
105 
/* Initializes `handle` as a UV_NAMED_PIPE stream on `loop`. The handle starts
 * without an OS pipe (INVALID_HANDLE_VALUE), without a name, and with empty
 * IPC bookkeeping. `ipc` is stored on the handle as given. Always returns 0. */
int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
  uv__stream_init(loop, (uv_stream_t*)handle, UV_NAMED_PIPE);

  /* Handle-level state: nothing attached, nothing in flight. */
  handle->handle = INVALID_HANDLE_VALUE;
  handle->name = NULL;
  handle->reqs_pending = 0;
  handle->ipc = ipc;

  /* Connection-side state: no remote peer, no partial frame, empty xfer
   * queue, no queued non-overlapped writes. */
  handle->pipe.conn.ipc_remote_pid = 0;
  handle->pipe.conn.ipc_data_frame.payload_remaining = 0;
  handle->pipe.conn.ipc_xfer_queue_length = 0;
  QUEUE_INIT(&handle->pipe.conn.ipc_xfer_queue);
  handle->pipe.conn.non_overlapped_writes_tail = NULL;

  return 0;
}
121 
122 
/* Prepares `handle` for use as a connection (never a pipe server): runs the
 * generic stream connection setup, points the read request back at the
 * handle, and clears the EOF timer. Handles flagged as non-overlapped also
 * get their read-thread handle cleared and their read-thread lock
 * initialized. */
static void uv__pipe_connection_init(uv_pipe_t* handle) {
  uv__connection_init((uv_stream_t*) handle);
  handle->read_req.data = handle;
  handle->pipe.conn.eof_timer = NULL;
  assert(!(handle->flags & UV_HANDLE_PIPESERVER));

  if (!(handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE))
    return;

  handle->pipe.conn.readfile_thread_handle = NULL;
  InitializeCriticalSection(&handle->pipe.conn.readfile_thread_lock);
}
133 
134 
open_named_pipe(const WCHAR * name,DWORD * duplex_flags)135 static HANDLE open_named_pipe(const WCHAR* name, DWORD* duplex_flags) {
136   HANDLE pipeHandle;
137 
138   /*
139    * Assume that we have a duplex pipe first, so attempt to
140    * connect with GENERIC_READ | GENERIC_WRITE.
141    */
142   pipeHandle = CreateFileW(name,
143                            GENERIC_READ | GENERIC_WRITE,
144                            0,
145                            NULL,
146                            OPEN_EXISTING,
147                            FILE_FLAG_OVERLAPPED,
148                            NULL);
149   if (pipeHandle != INVALID_HANDLE_VALUE) {
150     *duplex_flags = UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
151     return pipeHandle;
152   }
153 
154   /*
155    * If the pipe is not duplex CreateFileW fails with
156    * ERROR_ACCESS_DENIED.  In that case try to connect
157    * as a read-only or write-only.
158    */
159   if (GetLastError() == ERROR_ACCESS_DENIED) {
160     pipeHandle = CreateFileW(name,
161                              GENERIC_READ | FILE_WRITE_ATTRIBUTES,
162                              0,
163                              NULL,
164                              OPEN_EXISTING,
165                              FILE_FLAG_OVERLAPPED,
166                              NULL);
167 
168     if (pipeHandle != INVALID_HANDLE_VALUE) {
169       *duplex_flags = UV_HANDLE_READABLE;
170       return pipeHandle;
171     }
172   }
173 
174   if (GetLastError() == ERROR_ACCESS_DENIED) {
175     pipeHandle = CreateFileW(name,
176                              GENERIC_WRITE | FILE_READ_ATTRIBUTES,
177                              0,
178                              NULL,
179                              OPEN_EXISTING,
180                              FILE_FLAG_OVERLAPPED,
181                              NULL);
182 
183     if (pipeHandle != INVALID_HANDLE_VALUE) {
184       *duplex_flags = UV_HANDLE_WRITABLE;
185       return pipeHandle;
186     }
187   }
188 
189   return INVALID_HANDLE_VALUE;
190 }
191 
192 
/* Releases the OS resource behind `pipe`. When a CRT descriptor is attached
 * (u.fd != -1) it is closed with close(); otherwise the raw handle is closed
 * with CloseHandle(). Both fields are reset afterwards. Descriptors 0..2 are
 * rejected by the assertion. */
static void close_pipe(uv_pipe_t* pipe) {
  int fd;

  fd = pipe->u.fd;
  assert(fd == -1 || fd > 2);

  if (fd != -1)
    close(fd);
  else
    CloseHandle(pipe->handle);

  pipe->handle = INVALID_HANDLE_VALUE;
  pipe->u.fd = -1;
}
203 
204 
/* Creates the server end of a uniquely named, single-instance byte-mode pipe.
 *
 * A candidate name is generated from `random` and written to `name` (capacity
 * `nameSize`). If creation fails with ERROR_PIPE_BUSY or ERROR_ACCESS_DENIED
 * the name is treated as taken, `random` is advanced and the attempt is
 * repeated. `access` is passed to CreateNamedPipeA together with
 * FILE_FLAG_FIRST_PIPE_INSTANCE.
 *
 * Returns 0 and stores the handle in `*pipeHandle_ptr` on success; otherwise
 * returns the GetLastError() value and leaves `*pipeHandle_ptr` untouched. */
static int uv__pipe_server(
    HANDLE* pipeHandle_ptr, DWORD access,
    char* name, size_t nameSize, char* random) {
  HANDLE candidate;
  int last_error;

  /* Each iteration tries one name; a collision moves on to the next seed. */
  for (;; random++) {
    uv__unique_pipe_name(random, name, nameSize);

    candidate = CreateNamedPipeA(name,
      access | FILE_FLAG_FIRST_PIPE_INSTANCE,
      PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, 1, 65536, 65536, 0,
      NULL);

    if (candidate != INVALID_HANDLE_VALUE) {
      /* No name collisions.  We're done. */
      *pipeHandle_ptr = candidate;
      return 0;
    }

    last_error = GetLastError();
    if (last_error != ERROR_PIPE_BUSY && last_error != ERROR_ACCESS_DENIED)
      return last_error;
  }
}
243 
244 
/* Creates a connected pair of pipe handles.
 *
 * `server_flags` / `client_flags` accept UV_READABLE_PIPE, UV_WRITABLE_PIPE
 * and UV_NONBLOCK_PIPE. Both ends additionally request WRITE_DAC. A client
 * direction that is not requested still gets the matching FILE_*_ATTRIBUTES
 * right. `inherit_client` controls whether the client handle is inheritable;
 * `random` seeds the unique pipe name.
 *
 * Returns 0 and fills `*server_pipe_ptr` / `*client_pipe_ptr` on success. On
 * failure returns a GetLastError() value, closes whatever was opened and
 * leaves both output pointers untouched. */
static int uv__create_pipe_pair(
    HANDLE* server_pipe_ptr, HANDLE* client_pipe_ptr,
    unsigned int server_flags, unsigned int client_flags,
    int inherit_client, char* random) {
  /* allowed flags are: UV_READABLE_PIPE | UV_WRITABLE_PIPE | UV_NONBLOCK_PIPE */
  char pipe_name[64];
  SECURITY_ATTRIBUTES sa;
  DWORD server_access = WRITE_DAC;
  DWORD client_access = WRITE_DAC;
  DWORD client_attrs = 0;
  HANDLE server_pipe = INVALID_HANDLE_VALUE;
  HANDLE client_pipe = INVALID_HANDLE_VALUE;
  int err;

  /* Translate the libuv flags into Win32 access rights. */
  if (server_flags & UV_READABLE_PIPE)
    server_access |= PIPE_ACCESS_INBOUND;
  if (server_flags & UV_WRITABLE_PIPE)
    server_access |= PIPE_ACCESS_OUTBOUND;
  if (server_flags & UV_NONBLOCK_PIPE)
    server_access |= FILE_FLAG_OVERLAPPED;

  client_access |= (client_flags & UV_READABLE_PIPE) ? GENERIC_READ
                                                     : FILE_READ_ATTRIBUTES;
  client_access |= (client_flags & UV_WRITABLE_PIPE) ? GENERIC_WRITE
                                                     : FILE_WRITE_ATTRIBUTES;
  if (client_flags & UV_NONBLOCK_PIPE)
    client_attrs = FILE_FLAG_OVERLAPPED;

  /* Create server pipe handle. */
  err = uv__pipe_server(&server_pipe,
                        server_access,
                        pipe_name,
                        sizeof(pipe_name),
                        random);
  if (err)
    goto fail;

  /* Create client pipe handle. */
  sa.nLength = sizeof sa;
  sa.lpSecurityDescriptor = NULL;
  sa.bInheritHandle = inherit_client;

  client_pipe = CreateFileA(pipe_name,
                            client_access,
                            0,
                            &sa,
                            OPEN_EXISTING,
                            client_attrs,
                            NULL);
  if (client_pipe == INVALID_HANDLE_VALUE) {
    err = GetLastError();
    goto fail;
  }

#ifndef NDEBUG
  /* Validate that the pipe was opened in the right mode. */
  {
    DWORD mode;
    BOOL ok;
    ok = GetNamedPipeHandleState(client_pipe, &mode, NULL, NULL, NULL, NULL, 0);
    if (ok == TRUE) {
      assert(mode == (PIPE_READMODE_BYTE | PIPE_WAIT));
    } else {
      fprintf(stderr, "libuv assertion failure: GetNamedPipeHandleState failed\n");
    }
  }
#endif

  /* Do a blocking ConnectNamedPipe.  This should not block because we have
   * both ends of the pipe created. ERROR_PIPE_CONNECTED means the client was
   * already connected and counts as success. */
  if (!ConnectNamedPipe(server_pipe, NULL)) {
    err = GetLastError();
    if (err != ERROR_PIPE_CONNECTED)
      goto fail;
  }

  *client_pipe_ptr = client_pipe;
  *server_pipe_ptr = server_pipe;
  return 0;

 fail:
  if (server_pipe != INVALID_HANDLE_VALUE)
    CloseHandle(server_pipe);

  if (client_pipe != INVALID_HANDLE_VALUE)
    CloseHandle(client_pipe);

  return err;
}
343 
344 
/* Creates a connected pipe and returns its two ends as CRT file descriptors:
 * fds[0] is the read end, fds[1] the write end. `read_flags` / `write_flags`
 * are forwarded to uv__create_pipe_pair() for the respective end (with the
 * readable / writable bit forced on).
 *
 * Returns 0 on success. On failure returns a negative UV_* error code, closes
 * everything that was opened and leaves `fds` untouched. */
int uv_pipe(uv_file fds[2], int read_flags, int write_flags) {
  uv_file temp[2];
  int err;
  HANDLE readh;
  HANDLE writeh;

  /* Make the server side the inbound (read) end, */
  /* so that both ends will have FILE_READ_ATTRIBUTES permission. */
  /* TODO: better source of local randomness than &fds? */
  read_flags |= UV_READABLE_PIPE;
  write_flags |= UV_WRITABLE_PIPE;
  err = uv__create_pipe_pair(&readh, &writeh, read_flags, write_flags, 0, (char*) &fds[0]);
  if (err != 0) {
    /* uv__create_pipe_pair() reports a Win32 error code; callers of this
     * public function expect a UV_* code. */
    return uv_translate_sys_error(err);
  }

  temp[0] = _open_osfhandle((intptr_t) readh, 0);
  if (temp[0] == -1) {
    /* errno holds a CRT error number (EMFILE), not a UV_* constant. */
    err = (errno == EMFILE) ? UV_EMFILE : UV_UNKNOWN;
    CloseHandle(readh);
    CloseHandle(writeh);
    return err;
  }

  temp[1] = _open_osfhandle((intptr_t) writeh, 0);
  if (temp[1] == -1) {
    err = (errno == EMFILE) ? UV_EMFILE : UV_UNKNOWN;
    /* readh now belongs to the descriptor temp[0]; closing the descriptor
     * releases it. writeh never got a descriptor, so close it directly. */
    _close(temp[0]);
    CloseHandle(writeh);
    return err;
  }

  fds[0] = temp[0];
  fds[1] = temp[1];
  return 0;
}
383 
384 
/* Creates the pipe pair used for one stdio slot of a child process.
 *
 * The parent_pipe is always the server_pipe and kept by libuv.
 * The child_pipe is always the client_pipe and is passed to the child.
 * The flags are specified with respect to their usage in the child.
 *
 * The server end is always overlapped and is associated with the loop's
 * completion port. The client end is overlapped only when UV_NONBLOCK_PIPE is
 * requested or `parent_pipe` is an IPC pipe, and is created inheritable.
 *
 * Returns 0 on success, storing the client handle in `*child_pipe_ptr` and
 * attaching the server handle to `parent_pipe`. On failure returns a
 * GetLastError() value and closes both handles. */
int uv__create_stdio_pipe_pair(uv_loop_t* loop,
    uv_pipe_t* parent_pipe, HANDLE* child_pipe_ptr, unsigned int flags) {
  HANDLE server_pipe;
  HANDLE client_pipe;
  unsigned int server_flags;
  unsigned int client_flags;
  int err;

  server_pipe = INVALID_HANDLE_VALUE;
  client_pipe = INVALID_HANDLE_VALUE;

  server_flags = 0;
  client_flags = 0;
  if (flags & UV_READABLE_PIPE) {
    /* The server needs inbound (read) access too, otherwise CreateNamedPipe()
     * won't give us the FILE_READ_ATTRIBUTES permission. We need that to probe
     * the state of the write buffer when we're trying to shutdown the pipe. */
    server_flags |= UV_READABLE_PIPE | UV_WRITABLE_PIPE;
    client_flags |= UV_READABLE_PIPE;
  }
  if (flags & UV_WRITABLE_PIPE) {
    server_flags |= UV_READABLE_PIPE;
    client_flags |= UV_WRITABLE_PIPE;
  }
  server_flags |= UV_NONBLOCK_PIPE;
  if (flags & UV_NONBLOCK_PIPE || parent_pipe->ipc) {
    client_flags |= UV_NONBLOCK_PIPE;
  }

  /* Seed the unique pipe name with the address of the parent handle. The
   * local `server_pipe` is still INVALID_HANDLE_VALUE here, so using it as
   * the seed would make every stdio pipe in the process start from the same
   * name and collide with each earlier pipe that is still open. */
  err = uv__create_pipe_pair(&server_pipe, &client_pipe,
          server_flags, client_flags, 1, (char*) parent_pipe);
  if (err)
    goto error;

  if (CreateIoCompletionPort(server_pipe,
                             loop->iocp,
                             (ULONG_PTR) parent_pipe,
                             0) == NULL) {
    err = GetLastError();
    goto error;
  }

  uv__pipe_connection_init(parent_pipe);
  parent_pipe->handle = server_pipe;
  *child_pipe_ptr = client_pipe;

  /* The server end is now readable and/or writable. The directions are
   * mirrored: what the child reads, the parent writes, and vice versa. */
  if (flags & UV_READABLE_PIPE)
    parent_pipe->flags |= UV_HANDLE_WRITABLE;
  if (flags & UV_WRITABLE_PIPE)
    parent_pipe->flags |= UV_HANDLE_READABLE;

  return 0;

 error:
  if (server_pipe != INVALID_HANDLE_VALUE)
    CloseHandle(server_pipe);

  if (client_pipe != INVALID_HANDLE_VALUE)
    CloseHandle(client_pipe);

  return err;
}
451 
452 
/* Attaches the OS pipe `pipeHandle` (and the CRT descriptor `fd`, or -1) to
 * `handle` and ORs `duplex_flags` into the handle flags.
 *
 * The pipe is switched to byte-read / blocking-wait mode; if that is denied
 * the current mode is accepted as long as it is not PIPE_NOWAIT. The file
 * mode is then queried: synchronous handles are flagged
 * UV_HANDLE_NON_OVERLAPPED_PIPE, overlapped ones are associated with the
 * loop's completion port, falling back to UV_HANDLE_EMULATE_IOCP when the
 * association fails.
 *
 * Returns 0 on success. Returns UV_EINVAL if `handle` is a pipe server and
 * UV_EBUSY if it already has a pipe handle. Every other failure returns -1
 * with the cause available through GetLastError(). `handle` is only modified
 * once no further failure is possible. */
static int uv__set_pipe_handle(uv_loop_t* loop,
                               uv_pipe_t* handle,
                               HANDLE pipeHandle,
                               int fd,
                               DWORD duplex_flags) {
  NTSTATUS nt_status;
  IO_STATUS_BLOCK io_status;
  FILE_MODE_INFORMATION mode_info;
  DWORD mode = PIPE_READMODE_BYTE | PIPE_WAIT;
  DWORD current_mode = 0;
  DWORD err = 0;

  if (handle->flags & UV_HANDLE_PIPESERVER)
    return UV_EINVAL;
  if (handle->handle != INVALID_HANDLE_VALUE)
    return UV_EBUSY;

  if (!SetNamedPipeHandleState(pipeHandle, &mode, NULL, NULL)) {
    err = GetLastError();
    if (err == ERROR_ACCESS_DENIED) {
      /*
       * SetNamedPipeHandleState can fail if the handle doesn't have either
       * GENERIC_WRITE  or FILE_WRITE_ATTRIBUTES.
       * But if the handle already has the desired wait and blocking modes
       * we can continue.
       */
      if (!GetNamedPipeHandleState(pipeHandle, &current_mode, NULL, NULL,
                                   NULL, NULL, 0)) {
        return -1;
      } else if (current_mode & PIPE_NOWAIT) {
        SetLastError(ERROR_ACCESS_DENIED);
        return -1;
      }
    } else {
      /* If this returns ERROR_INVALID_PARAMETER we probably opened
       * something that is not a pipe. */
      if (err == ERROR_INVALID_PARAMETER) {
        SetLastError(WSAENOTSOCK);
      }
      return -1;
    }
  }

  /* Check if the pipe was created with FILE_FLAG_OVERLAPPED. */
  nt_status = pNtQueryInformationFile(pipeHandle,
                                      &io_status,
                                      &mode_info,
                                      sizeof(mode_info),
                                      FileModeInformation);
  if (nt_status != STATUS_SUCCESS) {
    /* The native call reports failure through its NTSTATUS result, not the
     * thread's last-error value. Publish it so that callers, which read
     * GetLastError() after a -1 return, do not pick up a stale code. */
    SetLastError(pRtlNtStatusToDosError(nt_status));
    return -1;
  }

  if (mode_info.Mode & FILE_SYNCHRONOUS_IO_ALERT ||
      mode_info.Mode & FILE_SYNCHRONOUS_IO_NONALERT) {
    /* Non-overlapped pipe. */
    handle->flags |= UV_HANDLE_NON_OVERLAPPED_PIPE;
  } else {
    /* Overlapped pipe.  Try to associate with IOCP. */
    if (CreateIoCompletionPort(pipeHandle,
                               loop->iocp,
                               (ULONG_PTR) handle,
                               0) == NULL) {
      handle->flags |= UV_HANDLE_EMULATE_IOCP;
    }
  }

  handle->handle = pipeHandle;
  handle->u.fd = fd;
  handle->flags |= duplex_flags;

  return 0;
}
526 
527 
/* Creates one more instance of the server pipe named `handle->name` for the
 * accept request `req`, which must not own a pipe yet.
 *
 * `firstInstance` adds FILE_FLAG_FIRST_PIPE_INSTANCE to the open mode. The
 * new instance is associated with the loop's completion port (a failure
 * there is fatal) and is also stored in `handle->handle`.
 *
 * Returns 1 on success. Returns 0 if CreateNamedPipeW fails, in which case
 * `req->pipeHandle` stays INVALID_HANDLE_VALUE and the cause is available
 * through GetLastError(). */
static int pipe_alloc_accept(uv_loop_t* loop, uv_pipe_t* handle,
                             uv_pipe_accept_t* req, BOOL firstInstance) {
  DWORD open_mode;
  HANDLE instance;

  assert(req->pipeHandle == INVALID_HANDLE_VALUE);

  open_mode = PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED | WRITE_DAC;
  if (firstInstance)
    open_mode |= FILE_FLAG_FIRST_PIPE_INSTANCE;

  instance = CreateNamedPipeW(handle->name,
                              open_mode,
                              PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
                              PIPE_UNLIMITED_INSTANCES, 65536, 65536, 0, NULL);
  req->pipeHandle = instance;
  if (instance == INVALID_HANDLE_VALUE)
    return 0;

  /* Associate it with IOCP so we can get events. */
  if (CreateIoCompletionPort(instance,
                             loop->iocp,
                             (ULONG_PTR) handle,
                             0) == NULL) {
    uv_fatal_error(GetLastError(), "CreateIoCompletionPort");
  }

  /* Stash a handle in the server object for use from places such as
   * getsockname and chmod. As we transfer ownership of these to client
   * objects, we'll allocate new ones here. */
  handle->handle = instance;

  return 1;
}
558 
559 
pipe_shutdown_thread_proc(void * parameter)560 static DWORD WINAPI pipe_shutdown_thread_proc(void* parameter) {
561   uv_loop_t* loop;
562   uv_pipe_t* handle;
563   uv_shutdown_t* req;
564 
565   req = (uv_shutdown_t*) parameter;
566   assert(req);
567   handle = (uv_pipe_t*) req->handle;
568   assert(handle);
569   loop = handle->loop;
570   assert(loop);
571 
572   FlushFileBuffers(handle->handle);
573 
574   /* Post completed */
575   POST_COMPLETION_FOR_REQ(loop, req);
576 
577   return 0;
578 }
579 
580 
/* Performs the deferred shutdown and close work for a pipe handle.
 *
 * 1. If the handle is a connection with a pending shutdown request and no
 *    writes outstanding, the shutdown is processed. Every path through this
 *    part returns, so the close teardown below does not run in the same
 *    call.
 * 2. Otherwise, if the handle is closing and has no pending requests, the
 *    per-handle resources are released and uv__handle_close() is called. */
void uv__pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
  int err;
  DWORD result;
  uv_shutdown_t* req;
  NTSTATUS nt_status;
  IO_STATUS_BLOCK io_status;
  FILE_PIPE_LOCAL_INFORMATION pipe_info;
  uv__ipc_xfer_queue_item_t* xfer_queue_item;

  if ((handle->flags & UV_HANDLE_CONNECTION) &&
      handle->stream.conn.shutdown_req != NULL &&
      handle->stream.conn.write_reqs_pending == 0) {
    req = handle->stream.conn.shutdown_req;

    /* Clear the shutdown_req field so we don't go here again. */
    handle->stream.conn.shutdown_req = NULL;

    if (handle->flags & UV_HANDLE_CLOSING) {
      UNREGISTER_HANDLE_REQ(loop, handle, req);

      /* Already closing. Cancel the shutdown. */
      if (req->cb) {
        req->cb(req, UV_ECANCELED);
      }

      DECREASE_PENDING_REQ_COUNT(handle);
      return;
    }

    /* Try to avoid flushing the pipe buffer in the thread pool. */
    nt_status = pNtQueryInformationFile(handle->handle,
                                        &io_status,
                                        &pipe_info,
                                        sizeof pipe_info,
                                        FilePipeLocalInformation);

    if (nt_status != STATUS_SUCCESS) {
      /* Failure: report the translated NT status to the shutdown callback. */
      UNREGISTER_HANDLE_REQ(loop, handle, req);

      handle->flags |= UV_HANDLE_WRITABLE; /* Questionable */
      if (req->cb) {
        err = pRtlNtStatusToDosError(nt_status);
        req->cb(req, uv_translate_sys_error(err));
      }

      DECREASE_PENDING_REQ_COUNT(handle);
      return;
    }

    /* The whole outbound quota being available means nothing is waiting in
     * the write buffer. */
    if (pipe_info.OutboundQuota == pipe_info.WriteQuotaAvailable) {
      /* Short-circuit, no need to call FlushFileBuffers. */
      uv__insert_pending_req(loop, (uv_req_t*) req);
      return;
    }

    /* Run FlushFileBuffers in the thread pool. */
    result = QueueUserWorkItem(pipe_shutdown_thread_proc,
                               req,
                               WT_EXECUTELONGFUNCTION);
    if (result) {
      /* Queued; pipe_shutdown_thread_proc posts the completion. */
      return;

    } else {
      /* Failure. */
      UNREGISTER_HANDLE_REQ(loop, handle, req);

      handle->flags |= UV_HANDLE_WRITABLE; /* Questionable */
      if (req->cb) {
        err = GetLastError();
        req->cb(req, uv_translate_sys_error(err));
      }

      DECREASE_PENDING_REQ_COUNT(handle);
      return;
    }
  }

  if (handle->flags & UV_HANDLE_CLOSING &&
      handle->reqs_pending == 0) {
    assert(!(handle->flags & UV_HANDLE_CLOSED));

    if (handle->flags & UV_HANDLE_CONNECTION) {
      /* Free pending sockets */
      while (!QUEUE_EMPTY(&handle->pipe.conn.ipc_xfer_queue)) {
        QUEUE* q;
        SOCKET socket;

        q = QUEUE_HEAD(&handle->pipe.conn.ipc_xfer_queue);
        QUEUE_REMOVE(q);
        xfer_queue_item = QUEUE_DATA(q, uv__ipc_xfer_queue_item_t, member);

        /* Materialize socket and close it */
        socket = WSASocketW(FROM_PROTOCOL_INFO,
                            FROM_PROTOCOL_INFO,
                            FROM_PROTOCOL_INFO,
                            &xfer_queue_item->xfer_info.socket_info,
                            0,
                            WSA_FLAG_OVERLAPPED);
        uv__free(xfer_queue_item);

        if (socket != INVALID_SOCKET)
          closesocket(socket);
      }
      handle->pipe.conn.ipc_xfer_queue_length = 0;

      /* Release the wait registration and event of the read request when
       * the handle is flagged as emulating IOCP. */
      if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
        if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) {
          UnregisterWait(handle->read_req.wait_handle);
          handle->read_req.wait_handle = INVALID_HANDLE_VALUE;
        }
        if (handle->read_req.event_handle != NULL) {
          CloseHandle(handle->read_req.event_handle);
          handle->read_req.event_handle = NULL;
        }
      }

      /* Counterpart of the InitializeCriticalSection() done for
       * non-overlapped pipes in uv__pipe_connection_init(). */
      if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE)
        DeleteCriticalSection(&handle->pipe.conn.readfile_thread_lock);
    }

    if (handle->flags & UV_HANDLE_PIPESERVER) {
      /* Accept requests allocated by uv_pipe_bind(). */
      assert(handle->pipe.serv.accept_reqs);
      uv__free(handle->pipe.serv.accept_reqs);
      handle->pipe.serv.accept_reqs = NULL;
    }

    uv__handle_close(handle);
  }
}
711 
712 
/* Records `count` as the number of pending pipe instances for `handle` and
 * flags the handle as a pipe server. Has no effect once the handle is
 * bound. */
void uv_pipe_pending_instances(uv_pipe_t* handle, int count) {
  if (!(handle->flags & UV_HANDLE_BOUND)) {
    handle->flags |= UV_HANDLE_PIPESERVER;
    handle->pipe.serv.pending_instances = count;
  }
}
719 
720 
721 /* Creates a pipe server. */
/* Binds `handle` to the pipe `name` (UTF-8) and creates the first server
 * instance of that pipe.
 *
 * Returns 0 on success and flags the handle as a bound pipe server. Returns
 * UV_EINVAL if the handle is already bound or `name` is NULL. Other failures
 * return the translated system error; a first-instance creation failure of
 * ERROR_ACCESS_DENIED is mapped to WSAEADDRINUSE, and ERROR_PATH_NOT_FOUND /
 * ERROR_INVALID_NAME to WSAEACCES, before translation. Allocation failures
 * are fatal. */
int uv_pipe_bind(uv_pipe_t* handle, const char* name) {
  uv_loop_t* loop = handle->loop;
  int i, err, nameSize;
  uv_pipe_accept_t* req;

  if (handle->flags & UV_HANDLE_BOUND) {
    return UV_EINVAL;
  }

  if (!name) {
    return UV_EINVAL;
  }

  /* Keep a count set through uv_pipe_pending_instances(); otherwise use the
   * default. */
  if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
    handle->pipe.serv.pending_instances = default_pending_pipe_instances;
  }

  handle->pipe.serv.accept_reqs = (uv_pipe_accept_t*)
    uv__malloc(sizeof(uv_pipe_accept_t) * handle->pipe.serv.pending_instances);
  if (!handle->pipe.serv.accept_reqs) {
    uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
  }

  /* Every accept request starts without a pipe instance. */
  for (i = 0; i < handle->pipe.serv.pending_instances; i++) {
    req = &handle->pipe.serv.accept_reqs[i];
    UV_REQ_INIT(req, UV_ACCEPT);
    req->data = handle;
    req->pipeHandle = INVALID_HANDLE_VALUE;
    req->next_pending = NULL;
  }

  /* Convert name to UTF16. The first call only measures the required size
   * (in WCHARs, terminator included, since the input length is -1). */
  nameSize = MultiByteToWideChar(CP_UTF8, 0, name, -1, NULL, 0) * sizeof(WCHAR);
  handle->name = uv__malloc(nameSize);
  if (!handle->name) {
    uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
  }

  if (!MultiByteToWideChar(CP_UTF8,
                           0,
                           name,
                           -1,
                           handle->name,
                           nameSize / sizeof(WCHAR))) {
    err = GetLastError();
    goto error;
  }

  /*
   * Attempt to create the first pipe with FILE_FLAG_FIRST_PIPE_INSTANCE.
   * If this fails then there's already a pipe server for the given pipe name.
   */
  if (!pipe_alloc_accept(loop,
                         handle,
                         &handle->pipe.serv.accept_reqs[0],
                         TRUE)) {
    err = GetLastError();
    if (err == ERROR_ACCESS_DENIED) {
      err = WSAEADDRINUSE;  /* Translates to UV_EADDRINUSE. */
    } else if (err == ERROR_PATH_NOT_FOUND || err == ERROR_INVALID_NAME) {
      err = WSAEACCES;  /* Translates to UV_EACCES. */
    }
    goto error;
  }

  handle->pipe.serv.pending_accepts = NULL;
  handle->flags |= UV_HANDLE_PIPESERVER;
  handle->flags |= UV_HANDLE_BOUND;

  return 0;

error:
  /* NOTE(review): only the name is released here. accept_reqs stays
   * allocated, and uv__pipe_endgame() frees it only for handles flagged
   * UV_HANDLE_PIPESERVER, so a failed bind on an unflagged handle appears to
   * leak the array - confirm and fix together with the endgame assertion. */
  if (handle->name) {
    uv__free(handle->name);
    handle->name = NULL;
  }

  return uv_translate_sys_error(err);
}
801 
802 
pipe_connect_thread_proc(void * parameter)803 static DWORD WINAPI pipe_connect_thread_proc(void* parameter) {
804   uv_loop_t* loop;
805   uv_pipe_t* handle;
806   uv_connect_t* req;
807   HANDLE pipeHandle = INVALID_HANDLE_VALUE;
808   DWORD duplex_flags;
809 
810   req = (uv_connect_t*) parameter;
811   assert(req);
812   handle = (uv_pipe_t*) req->handle;
813   assert(handle);
814   loop = handle->loop;
815   assert(loop);
816 
817   /* We're here because CreateFile on a pipe returned ERROR_PIPE_BUSY. We wait
818    * for the pipe to become available with WaitNamedPipe. */
819   while (WaitNamedPipeW(handle->name, 30000)) {
820     /* The pipe is now available, try to connect. */
821     pipeHandle = open_named_pipe(handle->name, &duplex_flags);
822     if (pipeHandle != INVALID_HANDLE_VALUE)
823       break;
824 
825     SwitchToThread();
826   }
827 
828   if (pipeHandle != INVALID_HANDLE_VALUE &&
829       !uv__set_pipe_handle(loop, handle, pipeHandle, -1, duplex_flags)) {
830     SET_REQ_SUCCESS(req);
831   } else {
832     SET_REQ_ERROR(req, GetLastError());
833   }
834 
835   /* Post completed */
836   POST_COMPLETION_FOR_REQ(loop, req);
837 
838   return 0;
839 }
840 
841 
/* Starts connecting `handle` to the pipe `name` (UTF-8). The result is always
 * delivered through the request `req` with callback `cb`; the function itself
 * reports nothing to the caller.
 *
 * - If the pipe opens immediately, it is attached to the handle and the
 *   request is queued as successful.
 * - If the open fails with ERROR_PIPE_BUSY, pipe_connect_thread_proc is
 *   queued to wait for a free instance.
 * - Any other failure queues the request with the error, after releasing the
 *   converted name and any opened pipe handle. */
void uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle,
    const char* name, uv_connect_cb cb) {
  uv_loop_t* loop = handle->loop;
  int err, nameSize;
  HANDLE pipeHandle = INVALID_HANDLE_VALUE;
  DWORD duplex_flags;

  UV_REQ_INIT(req, UV_CONNECT);
  req->handle = (uv_stream_t*) handle;
  req->cb = cb;

  /* Convert name to UTF16. The first call only measures the required size. */
  nameSize = MultiByteToWideChar(CP_UTF8, 0, name, -1, NULL, 0) * sizeof(WCHAR);
  handle->name = uv__malloc(nameSize);
  if (!handle->name) {
    uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
  }

  if (!MultiByteToWideChar(CP_UTF8,
                           0,
                           name,
                           -1,
                           handle->name,
                           nameSize / sizeof(WCHAR))) {
    err = GetLastError();
    goto error;
  }

  pipeHandle = open_named_pipe(handle->name, &duplex_flags);
  if (pipeHandle == INVALID_HANDLE_VALUE) {
    if (GetLastError() == ERROR_PIPE_BUSY) {
      /* Wait for the server to make a pipe instance available. */
      if (!QueueUserWorkItem(&pipe_connect_thread_proc,
                             req,
                             WT_EXECUTELONGFUNCTION)) {
        err = GetLastError();
        goto error;
      }

      /* The work item posts the completion for this request. */
      REGISTER_HANDLE_REQ(loop, handle, req);
      handle->reqs_pending++;

      return;
    }

    err = GetLastError();
    goto error;
  }

  assert(pipeHandle != INVALID_HANDLE_VALUE);

  if (uv__set_pipe_handle(loop,
                          (uv_pipe_t*) req->handle,
                          pipeHandle,
                          -1,
                          duplex_flags)) {
    err = GetLastError();
    goto error;
  }

  SET_REQ_SUCCESS(req);
  uv__insert_pending_req(loop, (uv_req_t*) req);
  handle->reqs_pending++;
  REGISTER_HANDLE_REQ(loop, handle, req);
  return;

error:
  if (handle->name) {
    uv__free(handle->name);
    handle->name = NULL;
  }

  /* Still INVALID_HANDLE_VALUE unless the open succeeded and attaching the
   * pipe to the handle failed afterwards. */
  if (pipeHandle != INVALID_HANDLE_VALUE)
    CloseHandle(pipeHandle);

  /* Make this req pending reporting an error. */
  SET_REQ_ERROR(req, err);
  uv__insert_pending_req(loop, (uv_req_t*) req);
  handle->reqs_pending++;
  REGISTER_HANDLE_REQ(loop, handle, req);
  return;
}
924 
925 
/* Requests cancellation of the read currently pending on `handle`.
 *
 * Does nothing when no read is pending, a cancellation was already requested,
 * or the pipe handle is gone. Overlapped pipes cancel the read request's
 * OVERLAPPED operation with CancelIoEx(). Non-overlapped pipes coordinate
 * with the reading thread through readfile_thread_handle, under
 * readfile_thread_lock. On the way out the handle is flagged
 * UV_HANDLE_CANCELLATION_PENDING. */
void uv__pipe_interrupt_read(uv_pipe_t* handle) {
  BOOL r;

  if (!(handle->flags & UV_HANDLE_READ_PENDING))
    return; /* No pending reads. */
  if (handle->flags & UV_HANDLE_CANCELLATION_PENDING)
    return; /* Already cancelled. */
  if (handle->handle == INVALID_HANDLE_VALUE)
    return; /* Pipe handle closed. */

  if (!(handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE)) {
    /* Cancel asynchronous read. ERROR_NOT_FOUND is tolerated: there was
     * nothing left to cancel. */
    r = CancelIoEx(handle->handle, &handle->read_req.u.io.overlapped);
    assert(r || GetLastError() == ERROR_NOT_FOUND);

  } else {
    /* Cancel synchronous read (which is happening in the thread pool). */
    HANDLE thread;
    /* Read through a volatile pointer so each loop iteration below reloads
     * the value. */
    volatile HANDLE* thread_ptr = &handle->pipe.conn.readfile_thread_handle;

    EnterCriticalSection(&handle->pipe.conn.readfile_thread_lock);

    thread = *thread_ptr;
    if (thread == NULL) {
      /* The thread pool thread has not yet reached the point of blocking, we
       * can pre-empt it by setting thread_handle to INVALID_HANDLE_VALUE. */
      *thread_ptr = INVALID_HANDLE_VALUE;

    } else {
      /* Spin until the thread has acknowledged (by setting the thread to
       * INVALID_HANDLE_VALUE) that it is past the point of blocking. */
      while (thread != INVALID_HANDLE_VALUE) {
        r = CancelSynchronousIo(thread);
        assert(r || GetLastError() == ERROR_NOT_FOUND);
        SwitchToThread(); /* Yield thread. */
        thread = *thread_ptr;
      }
    }

    LeaveCriticalSection(&handle->pipe.conn.readfile_thread_lock);
  }

  /* Set flag to indicate that read has been cancelled. */
  handle->flags |= UV_HANDLE_CANCELLATION_PENDING;
}
971 
972 
/* Stops reading from `handle`: clears UV_HANDLE_READING, drops one active
 * count and interrupts a read request that may still be pending. The
 * function does not check whether the handle was reading beforehand. */
void uv__pipe_read_stop(uv_pipe_t* handle) {
  handle->flags &= ~UV_HANDLE_READING;
  DECREASE_ACTIVE_COUNT(handle->loop, handle);

  /* A read request may still be in flight; ask for it to be cancelled. */
  uv__pipe_interrupt_read(handle);
}
979 
980 
981 /* Cleans up uv_pipe_t (server or connection) and all resources associated with
982  * it. */
uv__pipe_cleanup(uv_loop_t * loop,uv_pipe_t * handle)983 void uv__pipe_cleanup(uv_loop_t* loop, uv_pipe_t* handle) {
984   int i;
985   HANDLE pipeHandle;
986 
987   uv__pipe_interrupt_read(handle);
988 
989   if (handle->name) {
990     uv__free(handle->name);
991     handle->name = NULL;
992   }
993 
994   if (handle->flags & UV_HANDLE_PIPESERVER) {
995     for (i = 0; i < handle->pipe.serv.pending_instances; i++) {
996       pipeHandle = handle->pipe.serv.accept_reqs[i].pipeHandle;
997       if (pipeHandle != INVALID_HANDLE_VALUE) {
998         CloseHandle(pipeHandle);
999         handle->pipe.serv.accept_reqs[i].pipeHandle = INVALID_HANDLE_VALUE;
1000       }
1001     }
1002     handle->handle = INVALID_HANDLE_VALUE;
1003   }
1004 
1005   if (handle->flags & UV_HANDLE_CONNECTION) {
1006     handle->flags &= ~UV_HANDLE_WRITABLE;
1007     eof_timer_destroy(handle);
1008   }
1009 
1010   if ((handle->flags & UV_HANDLE_CONNECTION)
1011       && handle->handle != INVALID_HANDLE_VALUE)
1012     close_pipe(handle);
1013 }
1014 
1015 
/* Begins closing `handle`: drops the active counts held for reading and
 * listening, releases the pipe's resources, requests the endgame right away
 * when no requests are pending, and finally marks the handle as closing. */
void uv__pipe_close(uv_loop_t* loop, uv_pipe_t* handle) {
  /* Reading and listening each account for one active count. */
  if (handle->flags & UV_HANDLE_READING) {
    handle->flags &= ~UV_HANDLE_READING;
    DECREASE_ACTIVE_COUNT(loop, handle);
  }
  if (handle->flags & UV_HANDLE_LISTENING) {
    handle->flags &= ~UV_HANDLE_LISTENING;
    DECREASE_ACTIVE_COUNT(loop, handle);
  }

  uv__pipe_cleanup(loop, handle);

  /* With nothing outstanding the endgame can be requested immediately. */
  if (handle->reqs_pending == 0)
    uv__want_endgame(loop, (uv_handle_t*) handle);

  handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
  uv__handle_closing(handle);
}
1036 
1037 
/* Arms the accept request `req` on the listening pipe `handle`.
 *
 * Unless `firstInstance` is set, a new pipe instance is first allocated for
 * the request through pipe_alloc_accept(). ConnectNamedPipe() is then issued
 * in overlapped mode. When the outcome is known immediately (a client is
 * already connected, or the call failed) the request is inserted into the
 * loop's pending queue with the matching status; otherwise completion is
 * delivered through the IOCP. In every case `handle->reqs_pending` is
 * incremented exactly once. */
static void uv__pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle,
    uv_pipe_accept_t* req, BOOL firstInstance) {
  DWORD err;

  assert(handle->flags & UV_HANDLE_LISTENING);

  if (!firstInstance && !pipe_alloc_accept(loop, handle, req, FALSE)) {
    SET_REQ_ERROR(req, GetLastError());
    uv__insert_pending_req(loop, (uv_req_t*) req);
    handle->reqs_pending++;
    return;
  }

  assert(req->pipeHandle != INVALID_HANDLE_VALUE);

  /* Prepare the overlapped structure. */
  memset(&(req->u.io.overlapped), 0, sizeof(req->u.io.overlapped));

  if (!ConnectNamedPipe(req->pipeHandle, &req->u.io.overlapped)) {
    /* Read the error code once, before any other API call can replace it. */
    err = GetLastError();

    if (err != ERROR_IO_PENDING) {
      if (err == ERROR_PIPE_CONNECTED) {
        /* A client connected before ConnectNamedPipe() was called. */
        SET_REQ_SUCCESS(req);
      } else {
        CloseHandle(req->pipeHandle);
        req->pipeHandle = INVALID_HANDLE_VALUE;
        /* Make this req pending reporting the ConnectNamedPipe() error, not
         * whatever CloseHandle() may have left in the last-error slot. */
        SET_REQ_ERROR(req, err);
      }
      uv__insert_pending_req(loop, (uv_req_t*) req);
      handle->reqs_pending++;
      return;
    }
  }

  /* Wait for completion via IOCP */
  handle->reqs_pending++;
}
1072 
1073 
/* Accepts a pending connection on `server` into `client`.
 *
 * For an IPC pipe the oldest queued socket transfer is imported into
 * `client`, which is treated as a uv_tcp_t. For a regular pipe server the
 * first pending accept request hands its pipe instance over to `client`,
 * which is treated as a uv_pipe_t, and the request is re-armed unless the
 * server is closing.
 *
 * Returns 0 on success, WSAEWOULDBLOCK when nothing is pending, or the error
 * reported by uv__tcp_xfer_import(). */
int uv__pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
  uv_loop_t* loop = server->loop;
  uv_pipe_t* pipe_client;
  uv_pipe_accept_t* req;
  QUEUE* q;
  uv__ipc_xfer_queue_item_t* item;
  int err;

  if (server->ipc) {
    if (QUEUE_EMPTY(&server->pipe.conn.ipc_xfer_queue)) {
      /* No valid pending sockets. */
      return WSAEWOULDBLOCK;
    }

    q = QUEUE_HEAD(&server->pipe.conn.ipc_xfer_queue);
    QUEUE_REMOVE(q);
    server->pipe.conn.ipc_xfer_queue_length--;
    item = QUEUE_DATA(q, uv__ipc_xfer_queue_item_t, member);

    err = uv__tcp_xfer_import(
        (uv_tcp_t*) client, item->xfer_type, &item->xfer_info);

    /* The item has already been unlinked from the queue, so nothing else
     * refers to it. Release it whether or not the import succeeded. */
    uv__free(item);

    if (err != 0)
      return err;

  } else {
    pipe_client = (uv_pipe_t*) client;

    /* Find a connection instance that has been connected, but not yet
     * accepted. */
    req = server->pipe.serv.pending_accepts;

    if (!req) {
      /* No valid connections found, so we error out. */
      return WSAEWOULDBLOCK;
    }

    /* Initialize the client handle and copy the pipeHandle to the client */
    uv__pipe_connection_init(pipe_client);
    pipe_client->handle = req->pipeHandle;
    pipe_client->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;

    /* Prepare the req to pick up a new connection */
    server->pipe.serv.pending_accepts = req->next_pending;
    req->next_pending = NULL;
    req->pipeHandle = INVALID_HANDLE_VALUE;

    server->handle = INVALID_HANDLE_VALUE;
    if (!(server->flags & UV_HANDLE_CLOSING)) {
      uv__pipe_queue_accept(loop, server, req, FALSE);
    }
  }

  return 0;
}
1130 
1131 
1132 /* Starts listening for connections for the given pipe. */
uv__pipe_listen(uv_pipe_t * handle,int backlog,uv_connection_cb cb)1133 int uv__pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) {
1134   uv_loop_t* loop = handle->loop;
1135   int i;
1136 
1137   if (handle->flags & UV_HANDLE_LISTENING) {
1138     handle->stream.serv.connection_cb = cb;
1139   }
1140 
1141   if (!(handle->flags & UV_HANDLE_BOUND)) {
1142     return WSAEINVAL;
1143   }
1144 
1145   if (handle->flags & UV_HANDLE_READING) {
1146     return WSAEISCONN;
1147   }
1148 
1149   if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
1150     return ERROR_NOT_SUPPORTED;
1151   }
1152 
1153   if (handle->ipc) {
1154     return WSAEINVAL;
1155   }
1156 
1157   handle->flags |= UV_HANDLE_LISTENING;
1158   INCREASE_ACTIVE_COUNT(loop, handle);
1159   handle->stream.serv.connection_cb = cb;
1160 
1161   /* First pipe handle should have already been created in uv_pipe_bind */
1162   assert(handle->pipe.serv.accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE);
1163 
1164   for (i = 0; i < handle->pipe.serv.pending_instances; i++) {
1165     uv__pipe_queue_accept(loop, handle, &handle->pipe.serv.accept_reqs[i], i == 0);
1166   }
1167 
1168   return 0;
1169 }
1170 
1171 
uv_pipe_zero_readfile_thread_proc(void * arg)1172 static DWORD WINAPI uv_pipe_zero_readfile_thread_proc(void* arg) {
1173   uv_read_t* req = (uv_read_t*) arg;
1174   uv_pipe_t* handle = (uv_pipe_t*) req->data;
1175   uv_loop_t* loop = handle->loop;
1176   volatile HANDLE* thread_ptr = &handle->pipe.conn.readfile_thread_handle;
1177   CRITICAL_SECTION* lock = &handle->pipe.conn.readfile_thread_lock;
1178   HANDLE thread;
1179   DWORD bytes;
1180   DWORD err;
1181 
1182   assert(req->type == UV_READ);
1183   assert(handle->type == UV_NAMED_PIPE);
1184 
1185   err = 0;
1186 
1187   /* Create a handle to the current thread. */
1188   if (!DuplicateHandle(GetCurrentProcess(),
1189                        GetCurrentThread(),
1190                        GetCurrentProcess(),
1191                        &thread,
1192                        0,
1193                        FALSE,
1194                        DUPLICATE_SAME_ACCESS)) {
1195     err = GetLastError();
1196     goto out1;
1197   }
1198 
1199   /* The lock needs to be held when thread handle is modified. */
1200   EnterCriticalSection(lock);
1201   if (*thread_ptr == INVALID_HANDLE_VALUE) {
1202     /* uv__pipe_interrupt_read() cancelled reading before we got here. */
1203     err = ERROR_OPERATION_ABORTED;
1204   } else {
1205     /* Let main thread know which worker thread is doing the blocking read. */
1206     assert(*thread_ptr == NULL);
1207     *thread_ptr = thread;
1208   }
1209   LeaveCriticalSection(lock);
1210 
1211   if (err)
1212     goto out2;
1213 
1214   /* Block the thread until data is available on the pipe, or the read is
1215    * cancelled. */
1216   if (!ReadFile(handle->handle, &uv_zero_, 0, &bytes, NULL))
1217     err = GetLastError();
1218 
1219   /* Let the main thread know the worker is past the point of blocking. */
1220   assert(thread == *thread_ptr);
1221   *thread_ptr = INVALID_HANDLE_VALUE;
1222 
1223   /* Briefly acquire the mutex. Since the main thread holds the lock while it
1224    * is spinning trying to cancel this thread's I/O, we will block here until
1225    * it stops doing that. */
1226   EnterCriticalSection(lock);
1227   LeaveCriticalSection(lock);
1228 
1229 out2:
1230   /* Close the handle to the current thread. */
1231   CloseHandle(thread);
1232 
1233 out1:
1234   /* Set request status and post a completion record to the IOCP. */
1235   if (err)
1236     SET_REQ_ERROR(req, err);
1237   else
1238     SET_REQ_SUCCESS(req);
1239   POST_COMPLETION_FOR_REQ(loop, req);
1240 
1241   return 0;
1242 }
1243 
1244 
uv_pipe_writefile_thread_proc(void * parameter)1245 static DWORD WINAPI uv_pipe_writefile_thread_proc(void* parameter) {
1246   int result;
1247   DWORD bytes;
1248   uv_write_t* req = (uv_write_t*) parameter;
1249   uv_pipe_t* handle = (uv_pipe_t*) req->handle;
1250   uv_loop_t* loop = handle->loop;
1251 
1252   assert(req != NULL);
1253   assert(req->type == UV_WRITE);
1254   assert(handle->type == UV_NAMED_PIPE);
1255 
1256   result = WriteFile(handle->handle,
1257                      req->write_buffer.base,
1258                      req->write_buffer.len,
1259                      &bytes,
1260                      NULL);
1261 
1262   if (!result) {
1263     SET_REQ_ERROR(req, GetLastError());
1264   }
1265 
1266   POST_COMPLETION_FOR_REQ(loop, req);
1267   return 0;
1268 }
1269 
1270 
post_completion_read_wait(void * context,BOOLEAN timed_out)1271 static void CALLBACK post_completion_read_wait(void* context, BOOLEAN timed_out) {
1272   uv_read_t* req;
1273   uv_tcp_t* handle;
1274 
1275   req = (uv_read_t*) context;
1276   assert(req != NULL);
1277   handle = (uv_tcp_t*)req->data;
1278   assert(handle != NULL);
1279   assert(!timed_out);
1280 
1281   if (!PostQueuedCompletionStatus(handle->loop->iocp,
1282                                   req->u.io.overlapped.InternalHigh,
1283                                   0,
1284                                   &req->u.io.overlapped)) {
1285     uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
1286   }
1287 }
1288 
1289 
post_completion_write_wait(void * context,BOOLEAN timed_out)1290 static void CALLBACK post_completion_write_wait(void* context, BOOLEAN timed_out) {
1291   uv_write_t* req;
1292   uv_tcp_t* handle;
1293 
1294   req = (uv_write_t*) context;
1295   assert(req != NULL);
1296   handle = (uv_tcp_t*)req->handle;
1297   assert(handle != NULL);
1298   assert(!timed_out);
1299 
1300   if (!PostQueuedCompletionStatus(handle->loop->iocp,
1301                                   req->u.io.overlapped.InternalHigh,
1302                                   0,
1303                                   &req->u.io.overlapped)) {
1304     uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
1305   }
1306 }
1307 
1308 
/* Issues a zero-byte read on `handle` using its embedded read request.
 *
 * Non-overlapped pipes hand the read to a thread pool work item. Other pipes
 * issue an overlapped ReadFile() and, when IOCP is emulated, register a wait
 * on the request's event. If any step fails, the error is stored on the
 * request and the request is inserted into the loop's pending queue. On
 * every path the handle ends up flagged UV_HANDLE_READ_PENDING with
 * `reqs_pending` incremented. */
static void uv__pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) {
  uv_read_t* req = &handle->read_req;
  int started;

  assert(handle->flags & UV_HANDLE_READING);
  assert(!(handle->flags & UV_HANDLE_READ_PENDING));
  assert(handle->handle != INVALID_HANDLE_VALUE);

  if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
    /* Clear a cancellation marker left by an earlier read. */
    handle->pipe.conn.readfile_thread_handle = NULL;
    if (!QueueUserWorkItem(&uv_pipe_zero_readfile_thread_proc,
                           req,
                           WT_EXECUTELONGFUNCTION)) {
      SET_REQ_ERROR(req, GetLastError());
      goto failed;
    }
  } else {
    memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));
    if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
      assert(req->event_handle != NULL);
      req->u.io.overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1);
    }

    /* Zero-byte read. */
    started = ReadFile(handle->handle,
                       &uv_zero_,
                       0,
                       NULL,
                       &req->u.io.overlapped);

    if (!started && GetLastError() != ERROR_IO_PENDING) {
      SET_REQ_ERROR(req, GetLastError());
      goto failed;
    }

    /* With emulated IOCP, register the wait on the event once. */
    if ((handle->flags & UV_HANDLE_EMULATE_IOCP) &&
        req->wait_handle == INVALID_HANDLE_VALUE &&
        !RegisterWaitForSingleObject(&req->wait_handle,
            req->event_handle, post_completion_read_wait, (void*) req,
            INFINITE, WT_EXECUTEINWAITTHREAD)) {
      SET_REQ_ERROR(req, GetLastError());
      goto failed;
    }
  }

  /* Start the eof timer if there is one */
  eof_timer_start(handle);
  handle->flags |= UV_HANDLE_READ_PENDING;
  handle->reqs_pending++;
  return;

failed:
  /* Report the stored error through the pending request queue. */
  uv__insert_pending_req(loop, (uv_req_t*) req);
  handle->flags |= UV_HANDLE_READ_PENDING;
  handle->reqs_pending++;
}
1372 
1373 
/* Starts reading from `handle`: sets UV_HANDLE_READING, takes one active
 * count, stores the callbacks and queues a read request unless one is still
 * pending from before. Always returns 0. */
int uv__pipe_read_start(uv_pipe_t* handle,
                        uv_alloc_cb alloc_cb,
                        uv_read_cb read_cb) {
  uv_loop_t* loop = handle->loop;

  handle->flags |= UV_HANDLE_READING;
  INCREASE_ACTIVE_COUNT(loop, handle);
  handle->read_cb = read_cb;
  handle->alloc_cb = alloc_cb;

  /* A read request can still be outstanding when reading was stopped and is
   * now started again; in that case no new one is queued. */
  if (handle->flags & UV_HANDLE_READ_PENDING)
    return 0;

  /* With emulated IOCP, create the request's event on first use. */
  if ((handle->flags & UV_HANDLE_EMULATE_IOCP) &&
      handle->read_req.event_handle == NULL) {
    handle->read_req.event_handle = CreateEvent(NULL, 0, 0, NULL);
    if (handle->read_req.event_handle == NULL)
      uv_fatal_error(GetLastError(), "CreateEvent");
  }

  uv__pipe_queue_read(loop, handle);
  return 0;
}
1399 
1400 
/* Appends `req` to the handle's circular list of non-overlapped writes.
 * `non_overlapped_writes_tail` points at the newest entry and the tail's
 * `next_req` points at the oldest one. */
static void uv__insert_non_overlapped_write_req(uv_pipe_t* handle,
    uv_write_t* req) {
  uv_write_t* tail = handle->pipe.conn.non_overlapped_writes_tail;

  if (tail == NULL) {
    /* Sole entry: it links to itself. */
    req->next_req = (uv_req_t*) req;
  } else {
    /* Splice in between the old tail and the head. */
    req->next_req = tail->next_req;
    tail->next_req = (uv_req_t*) req;
  }

  handle->pipe.conn.non_overlapped_writes_tail = req;
}
1414 
1415 
uv_remove_non_overlapped_write_req(uv_pipe_t * handle)1416 static uv_write_t* uv_remove_non_overlapped_write_req(uv_pipe_t* handle) {
1417   uv_write_t* req;
1418 
1419   if (handle->pipe.conn.non_overlapped_writes_tail) {
1420     req = (uv_write_t*)handle->pipe.conn.non_overlapped_writes_tail->next_req;
1421 
1422     if (req == handle->pipe.conn.non_overlapped_writes_tail) {
1423       handle->pipe.conn.non_overlapped_writes_tail = NULL;
1424     } else {
1425       handle->pipe.conn.non_overlapped_writes_tail->next_req =
1426         req->next_req;
1427     }
1428 
1429     return req;
1430   } else {
1431     /* queue empty */
1432     return NULL;
1433   }
1434 }
1435 
1436 
/* Takes the oldest queued non-overlapped write off the handle's list, if
 * there is one, and submits it to the thread pool. A failure to queue the
 * work item is fatal. */
static void uv__queue_non_overlapped_write(uv_pipe_t* handle) {
  uv_write_t* next_write = uv_remove_non_overlapped_write_req(handle);

  if (next_write == NULL)
    return;

  if (!QueueUserWorkItem(&uv_pipe_writefile_thread_proc,
                         next_write,
                         WT_EXECUTELONGFUNCTION)) {
    uv_fatal_error(GetLastError(), "QueueUserWorkItem");
  }
}
1447 
1448 
/* Copies `user_req` and the contents of all `bufs` into one heap allocation.
 *
 * On success `*req_out` points at the heap copy of the write request (with
 * its `coalesced` field set to 1) and `*write_buf_out` describes the
 * contiguous copy of the data, which starts directly after the
 * uv__coalesced_write_t header. The out arguments are only written on
 * success.
 *
 * Returns 0 on success, WSAENOBUFS when the combined data does not fit in 32
 * bits or in a size_t, and ERROR_NOT_ENOUGH_MEMORY when allocation fails. */
static int uv__build_coalesced_write_req(uv_write_t* user_req,
                                         const uv_buf_t bufs[],
                                         size_t nbufs,
                                         uv_write_t** req_out,
                                         uv_buf_t* write_buf_out) {
  /* Pack into a single heap-allocated buffer:
   *   (a) a uv_write_t structure where libuv stores the actual state.
   *   (b) a pointer to the original uv_write_t.
   *   (c) data from all `bufs` entries.
   */
  char* heap_buffer;
  size_t heap_buffer_length, heap_buffer_offset;
  uv__coalesced_write_t* coalesced_write_req; /* (a) + (b) */
  char* data_start;                           /* (c) */
  size_t data_length;
  size_t i;

  /* Compute combined size of all combined buffers from `bufs`. The total
   * should not exceed UINT32_MAX, because WriteFile() won't accept buffers
   * larger than that. Checking before each addition keeps the running sum
   * from wrapping around when size_t is only 32 bits wide. */
  data_length = 0;
  for (i = 0; i < nbufs; i++) {
    if (bufs[i].len > UINT32_MAX - data_length)
      return WSAENOBUFS; /* Maps to UV_ENOBUFS. */
    data_length += bufs[i].len;
  }

  /* Adding the header must not wrap around either. */
  if (data_length > SIZE_MAX - sizeof *coalesced_write_req)
    return WSAENOBUFS; /* Maps to UV_ENOBUFS. */

  /* Compute heap buffer size. */
  heap_buffer_length = sizeof *coalesced_write_req + /* (a) + (b) */
                       data_length;                  /* (c) */

  /* Allocate buffer. */
  heap_buffer = uv__malloc(heap_buffer_length);
  if (heap_buffer == NULL)
    return ERROR_NOT_ENOUGH_MEMORY; /* Maps to UV_ENOMEM. */

  /* Copy uv_write_t information to the buffer. */
  coalesced_write_req = (uv__coalesced_write_t*) heap_buffer;
  coalesced_write_req->req = *user_req; /* copy (a) */
  coalesced_write_req->req.coalesced = 1;
  coalesced_write_req->user_req = user_req;         /* copy (b) */
  heap_buffer_offset = sizeof *coalesced_write_req; /* offset (a) + (b) */

  /* Copy data buffers to the heap buffer. */
  data_start = &heap_buffer[heap_buffer_offset];
  for (i = 0; i < nbufs; i++) {
    /* Skip empty buffers; their base pointer may be NULL, which memcpy()
     * must not be given. */
    if (bufs[i].len == 0)
      continue;
    memcpy(&heap_buffer[heap_buffer_offset],
           bufs[i].base,
           bufs[i].len);               /* copy (c) */
    heap_buffer_offset += bufs[i].len; /* offset (c) */
  }
  assert(heap_buffer_offset == heap_buffer_length);

  /* Set out arguments and return. */
  *req_out = &coalesced_write_req->req;
  *write_buf_out = uv_buf_init(data_start, (unsigned int) data_length);
  return 0;
}
1507 
1508 
/* Writes `bufs` to the pipe `handle` on behalf of the write request `req`.
 *
 * The request is initialised here. With more than one buffer, or when
 * `copy_always` is non-zero, the data and the request are first copied into
 * one heap allocation by uv__build_coalesced_write_req(), and the heap copy
 * of the request is the one that is registered. The write strategy depends
 * on the handle flags:
 *   - BLOCKING_WRITES + NON_OVERLAPPED_PIPE: synchronous WriteFile(), then a
 *     completion is posted;
 *   - NON_OVERLAPPED_PIPE: the request is queued for the thread pool;
 *   - BLOCKING_WRITES: overlapped WriteFile() followed by a wait on the event;
 *   - otherwise: overlapped WriteFile(), with a registered wait when IOCP is
 *     emulated.
 *
 * Returns 0 when the request has been registered with the handle, or a
 * Windows error code. When the write could not be started at all, the event
 * handle and the coalesced copy created here are released before returning. */
static int uv__pipe_write_data(uv_loop_t* loop,
                               uv_write_t* req,
                               uv_pipe_t* handle,
                               const uv_buf_t bufs[],
                               size_t nbufs,
                               uv_write_cb cb,
                               int copy_always) {
  uv_write_t* user_req = req; /* `req` may be replaced by a heap copy. */
  int err;
  int result;
  uv_buf_t write_buf;

  assert(handle->handle != INVALID_HANDLE_VALUE);

  UV_REQ_INIT(req, UV_WRITE);
  req->handle = (uv_stream_t*) handle;
  req->send_handle = NULL;
  req->cb = cb;
  /* Private fields. */
  req->coalesced = 0;
  req->event_handle = NULL;
  req->wait_handle = INVALID_HANDLE_VALUE;

  /* Prepare the overlapped structure. */
  memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));
  if (handle->flags & (UV_HANDLE_EMULATE_IOCP | UV_HANDLE_BLOCKING_WRITES)) {
    req->event_handle = CreateEvent(NULL, 0, 0, NULL);
    if (req->event_handle == NULL) {
      uv_fatal_error(GetLastError(), "CreateEvent");
    }
    req->u.io.overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1);
  }
  req->write_buffer = uv_null_buf_;

  if (nbufs == 0) {
    /* Write empty buffer. */
    write_buf = uv_null_buf_;
  } else if (nbufs == 1 && !copy_always) {
    /* Write directly from bufs[0]. */
    write_buf = bufs[0];
  } else {
    /* Coalesce all `bufs` into one big buffer. This also creates a new
     * write-request structure that replaces the old one. `req` is left
     * untouched when this fails. */
    err = uv__build_coalesced_write_req(req, bufs, nbufs, &req, &write_buf);
    if (err != 0)
      goto error;
  }

  if ((handle->flags &
      (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) ==
      (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) {
    DWORD bytes;
    result =
        WriteFile(handle->handle, write_buf.base, write_buf.len, &bytes, NULL);

    if (!result) {
      err = GetLastError();
      goto error;
    }

    /* Request completed immediately. */
    req->u.io.queued_bytes = 0;

    REGISTER_HANDLE_REQ(loop, handle, req);
    handle->reqs_pending++;
    handle->stream.conn.write_reqs_pending++;
    POST_COMPLETION_FOR_REQ(loop, req);
    return 0;
  } else if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
    req->write_buffer = write_buf;
    uv__insert_non_overlapped_write_req(handle, req);
    if (handle->stream.conn.write_reqs_pending == 0) {
      uv__queue_non_overlapped_write(handle);
    }

    /* Request queued by the kernel. */
    req->u.io.queued_bytes = write_buf.len;
    handle->write_queue_size += req->u.io.queued_bytes;
  } else if (handle->flags & UV_HANDLE_BLOCKING_WRITES) {
    /* Using overlapped IO, but wait for completion before returning */
    result = WriteFile(handle->handle,
                       write_buf.base,
                       write_buf.len,
                       NULL,
                       &req->u.io.overlapped);

    if (!result) {
      err = GetLastError();
      if (err != ERROR_IO_PENDING)
        goto error;
    }

    if (result) {
      /* Request completed immediately. */
      req->u.io.queued_bytes = 0;
    } else {
      /* Request queued by the kernel. */
      req->u.io.queued_bytes = write_buf.len;
      handle->write_queue_size += req->u.io.queued_bytes;
      if (WaitForSingleObject(req->event_handle, INFINITE) !=
          WAIT_OBJECT_0) {
        /* The write was queued and may still be in flight, so the request
         * and its buffer are deliberately not released here. */
        err = GetLastError();
        CloseHandle(req->event_handle);
        req->event_handle = NULL;
        return err;
      }
    }
    CloseHandle(req->event_handle);
    req->event_handle = NULL;

    REGISTER_HANDLE_REQ(loop, handle, req);
    handle->reqs_pending++;
    handle->stream.conn.write_reqs_pending++;
    return 0;
  } else {
    result = WriteFile(handle->handle,
                       write_buf.base,
                       write_buf.len,
                       NULL,
                       &req->u.io.overlapped);

    if (!result) {
      err = GetLastError();
      if (err != ERROR_IO_PENDING)
        goto error;
    }

    if (result) {
      /* Request completed immediately. */
      req->u.io.queued_bytes = 0;
    } else {
      /* Request queued by the kernel. */
      req->u.io.queued_bytes = write_buf.len;
      handle->write_queue_size += req->u.io.queued_bytes;
    }

    if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
      if (!RegisterWaitForSingleObject(&req->wait_handle,
          req->event_handle, post_completion_write_wait, (void*) req,
          INFINITE, WT_EXECUTEINWAITTHREAD)) {
        /* The write has already been issued, so nothing is released here. */
        return GetLastError();
      }
    }
  }

  REGISTER_HANDLE_REQ(loop, handle, req);
  handle->reqs_pending++;
  handle->stream.conn.write_reqs_pending++;

  return 0;

error:
  /* Only reached when no write was started, so nothing can still refer to
   * the event or to the coalesced copy. `err` was captured before this
   * cleanup, which may overwrite the thread's last-error value. */
  if (req->event_handle != NULL)
    CloseHandle(req->event_handle);
  user_req->event_handle = NULL;

  if (req != user_req) {
    /* `req` lives in the allocation made by uv__build_coalesced_write_req().
     * That function places the data directly after the
     * uv__coalesced_write_t header, so the start of the allocation is
     * `write_buf.base` minus the header size. */
    uv__free(write_buf.base - sizeof(uv__coalesced_write_t));
  }

  return err;
}
1658 
1659 
uv__pipe_get_ipc_remote_pid(uv_pipe_t * handle)1660 static DWORD uv__pipe_get_ipc_remote_pid(uv_pipe_t* handle) {
1661   DWORD* pid = &handle->pipe.conn.ipc_remote_pid;
1662 
1663   /* If the both ends of the IPC pipe are owned by the same process,
1664    * the remote end pid may not yet be set. If so, do it here.
1665    * TODO: this is weird; it'd probably better to use a handshake. */
1666   if (*pid == 0)
1667     *pid = GetCurrentProcessId();
1668 
1669   return *pid;
1670 }
1671 
1672 
/* Writes one IPC frame to `handle`: a frame header, optionally the socket
 * transfer information for `send_handle`, and optionally the data from
 * `data_bufs`.
 *
 * `send_handle`, when not NULL, must be a TCP handle. The frame is always
 * copied by uv__pipe_write_data(), so the pieces that live on this
 * function's stack do not have to outlive the call.
 *
 * Returns 0 on success, WSAENOBUFS when the data exceeds UINT32_MAX bytes,
 * ERROR_NOT_SUPPORTED for a non-TCP `send_handle`, ERROR_NOT_ENOUGH_MEMORY
 * when the buffer array cannot be allocated, or the error reported by
 * uv__tcp_xfer_export() or uv__pipe_write_data(). */
int uv__pipe_write_ipc(uv_loop_t* loop,
                       uv_write_t* req,
                       uv_pipe_t* handle,
                       const uv_buf_t data_bufs[],
                       size_t data_buf_count,
                       uv_stream_t* send_handle,
                       uv_write_cb cb) {
  uv_buf_t stack_bufs[6];
  uv_buf_t* bufs;
  size_t buf_count, buf_index;
  uv__ipc_frame_header_t frame_header;
  uv__ipc_socket_xfer_type_t xfer_type = UV__IPC_SOCKET_XFER_NONE;
  uv__ipc_socket_xfer_info_t xfer_info;
  uint64_t data_length;
  size_t i;
  int err;

  /* Compute the combined size of data buffers. */
  data_length = 0;
  for (i = 0; i < data_buf_count; i++)
    data_length += data_bufs[i].len;
  if (data_length > UINT32_MAX)
    return WSAENOBUFS; /* Maps to UV_ENOBUFS. */

  /* Prepare the frame's socket xfer payload. */
  if (send_handle != NULL) {
    uv_tcp_t* send_tcp_handle = (uv_tcp_t*) send_handle;

    /* Verify that `send_handle` it is indeed a tcp handle. */
    if (send_tcp_handle->type != UV_TCP)
      return ERROR_NOT_SUPPORTED;

    /* Export the tcp handle. */
    err = uv__tcp_xfer_export(send_tcp_handle,
                              uv__pipe_get_ipc_remote_pid(handle),
                              &xfer_type,
                              &xfer_info);
    if (err != 0)
      return err;
  }

  /* Compute the number of uv_buf_t's that may be required. */
  buf_count = 1 + data_buf_count; /* Frame header and data buffers. */
  if (send_handle != NULL)
    buf_count += 1; /* One extra for the socket xfer information. */

  /* Use the on-stack buffer array if it is big enough; otherwise allocate
   * space for it on the heap. */
  if (buf_count < ARRAY_SIZE(stack_bufs)) {
    /* Use on-stack buffer array. */
    bufs = stack_bufs;
  } else {
    /* Use heap-allocated buffer array. */
    bufs = uv__calloc(buf_count, sizeof(uv_buf_t));
    if (bufs == NULL)
      return ERROR_NOT_ENOUGH_MEMORY; /* Maps to UV_ENOMEM. */
  }
  buf_index = 0;

  /* Initialize frame header and add it to the buffers list. */
  memset(&frame_header, 0, sizeof frame_header);
  bufs[buf_index++] = uv_buf_init((char*) &frame_header, sizeof frame_header);

  if (send_handle != NULL) {
    /* Add frame header flags. */
    switch (xfer_type) {
      case UV__IPC_SOCKET_XFER_TCP_CONNECTION:
        frame_header.flags |= UV__IPC_FRAME_HAS_SOCKET_XFER |
                              UV__IPC_FRAME_XFER_IS_TCP_CONNECTION;
        break;
      case UV__IPC_SOCKET_XFER_TCP_SERVER:
        frame_header.flags |= UV__IPC_FRAME_HAS_SOCKET_XFER;
        break;
      default:
        assert(0);  /* Unreachable. */
    }
    /* Add xfer info buffer. */
    bufs[buf_index++] = uv_buf_init((char*) &xfer_info, sizeof xfer_info);
  }

  if (data_length > 0) {
    /* Update frame header. */
    frame_header.flags |= UV__IPC_FRAME_HAS_DATA;
    frame_header.data_length = (uint32_t) data_length;
    /* Add data buffers to buffers list. */
    for (i = 0; i < data_buf_count; i++)
      bufs[buf_index++] = data_bufs[i];
  }

  /* Write buffers. We set the `always_copy` flag, so it is not a problem that
   * some of the written data lives on the stack. Pass `buf_index`, the number
   * of entries actually filled in: when every data buffer is empty the data
   * entries are skipped above, and `buf_count` would then cover array slots
   * that were never set. */
  err = uv__pipe_write_data(loop, req, handle, bufs, buf_index, cb, 1);

  /* If we had to heap-allocate the bufs array, free it now. */
  if (bufs != stack_bufs) {
    uv__free(bufs);
  }

  return err;
}
1773 
1774 
/* Entry point for writes on a pipe. IPC pipes go through the framing
 * protocol; all other pipes put the data on the wire as is and must not be
 * given a `send_handle`. Returns whatever the selected writer returns. */
int uv__pipe_write(uv_loop_t* loop,
                   uv_write_t* req,
                   uv_pipe_t* handle,
                   const uv_buf_t bufs[],
                   size_t nbufs,
                   uv_stream_t* send_handle,
                   uv_write_cb cb) {
  /* IPC pipe write: use framing protocol. */
  if (handle->ipc)
    return uv__pipe_write_ipc(loop, req, handle, bufs, nbufs, send_handle, cb);

  /* Non-IPC pipe write: put data on the wire directly. */
  assert(send_handle == NULL);
  return uv__pipe_write_data(loop, req, handle, bufs, nbufs, cb, 0);
}
1791 
1792 
/* Reports end-of-file to the user: destroys the eof timer, stops reading and
 * invokes the read callback with UV_EOF and `buf`. */
static void uv__pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle,
    uv_buf_t buf) {
  uv_stream_t* stream = (uv_stream_t*) handle;

  /* An eof timer that may be running has served its purpose. */
  eof_timer_destroy(handle);

  uv_read_stop(stream);

  handle->read_cb(stream, UV_EOF, &buf);
}
1803 
1804 
uv__pipe_read_error(uv_loop_t * loop,uv_pipe_t * handle,int error,uv_buf_t buf)1805 static void uv__pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error,
1806     uv_buf_t buf) {
1807   /* If there is an eof timer running, we don't need it any more, so discard
1808    * it. */
1809   eof_timer_destroy(handle);
1810 
1811   uv_read_stop((uv_stream_t*) handle);
1812 
1813   handle->read_cb((uv_stream_t*)handle, uv_translate_sys_error(error), &buf);
1814 }
1815 
1816 
/* Route a failed read to the right reporter: ERROR_BROKEN_PIPE is reported as
 * end-of-file, every other Windows error code as a read error. */
static void uv__pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle,
    int error, uv_buf_t buf) {
  if (error != ERROR_BROKEN_PIPE) {
    uv__pipe_read_error(loop, handle, error, buf);
    return;
  }

  uv__pipe_read_eof(loop, handle, buf);
}
1825 
1826 
/* Append a copy of `*xfer_info`, tagged with `xfer_type`, to the tail of the
 * pipe's ipc_xfer_queue and increment ipc_xfer_queue_length. An allocation
 * failure is reported through uv_fatal_error(). */
static void uv__pipe_queue_ipc_xfer_info(
    uv_pipe_t* handle,
    uv__ipc_socket_xfer_type_t xfer_type,
    uv__ipc_socket_xfer_info_t* xfer_info) {
  uv__ipc_xfer_queue_item_t* entry = uv__malloc(sizeof *entry);

  if (entry == NULL)
    uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");

  /* The queue owns its own copy of the transfer info. */
  entry->xfer_info = *xfer_info;
  entry->xfer_type = xfer_type;

  handle->pipe.conn.ipc_xfer_queue_length++;
  QUEUE_INSERT_TAIL(&handle->pipe.conn.ipc_xfer_queue, &entry->member);
}
1843 
1844 
1845 /* Read an exact number of bytes from a pipe. If an error or end-of-file is
1846  * encountered before the requested number of bytes are read, an error is
1847  * returned. */
uv__pipe_read_exactly(HANDLE h,void * buffer,DWORD count)1848 static int uv__pipe_read_exactly(HANDLE h, void* buffer, DWORD count) {
1849   DWORD bytes_read, bytes_read_now;
1850 
1851   bytes_read = 0;
1852   while (bytes_read < count) {
1853     if (!ReadFile(h,
1854                   (char*) buffer + bytes_read,
1855                   count - bytes_read,
1856                   &bytes_read_now,
1857                   NULL)) {
1858       return GetLastError();
1859     }
1860 
1861     bytes_read += bytes_read_now;
1862   }
1863 
1864   assert(bytes_read == count);
1865   return 0;
1866 }
1867 
1868 
/* Read one chunk of raw data from the pipe into a user-supplied buffer and
 * deliver it through the read callback.
 *
 * `suggested_bytes` is the size hint passed to the alloc callback; at most
 * `max_bytes` bytes (or the buffer length, if smaller) are read.
 *
 * Returns the number of bytes read. Returns 0 after reporting UV_ENOBUFS (no
 * usable buffer) or a read error/EOF; callers treat 0 as "leave the read
 * loop". */
static DWORD uv__pipe_read_data(uv_loop_t* loop,
                                uv_pipe_t* handle,
                                DWORD suggested_bytes,
                                DWORD max_bytes) {
  uv_stream_t* stream = (uv_stream_t*) handle;
  uv_buf_t user_buf;
  DWORD limit;
  DWORD nread;

  /* Let the user provide the destination buffer. */
  user_buf = uv_buf_init(NULL, 0);
  handle->alloc_cb((uv_handle_t*) handle, suggested_bytes, &user_buf);
  if (user_buf.base == NULL || user_buf.len == 0) {
    handle->read_cb(stream, UV_ENOBUFS, &user_buf);
    return 0; /* Break out of read loop. */
  }

  /* Never read more than the user buffer holds, nor more than the caller
   * allows. */
  limit = max_bytes > user_buf.len ? user_buf.len : max_bytes;

  if (!ReadFile(handle->handle, user_buf.base, limit, &nread, NULL)) {
    uv__pipe_read_error_or_eof(loop, handle, GetLastError(), user_buf);
    return 0; /* Break out of read loop. */
  }

  /* Hand the data to the user. */
  handle->read_cb(stream, nread, &user_buf);

  return nread;
}
1902 
1903 
uv__pipe_read_ipc(uv_loop_t * loop,uv_pipe_t * handle)1904 static DWORD uv__pipe_read_ipc(uv_loop_t* loop, uv_pipe_t* handle) {
1905   uint32_t* data_remaining = &handle->pipe.conn.ipc_data_frame.payload_remaining;
1906   int err;
1907 
1908   if (*data_remaining > 0) {
1909     /* Read frame data payload. */
1910     DWORD bytes_read =
1911         uv__pipe_read_data(loop, handle, *data_remaining, *data_remaining);
1912     *data_remaining -= bytes_read;
1913     return bytes_read;
1914 
1915   } else {
1916     /* Start of a new IPC frame. */
1917     uv__ipc_frame_header_t frame_header;
1918     uint32_t xfer_flags;
1919     uv__ipc_socket_xfer_type_t xfer_type;
1920     uv__ipc_socket_xfer_info_t xfer_info;
1921 
1922     /* Read the IPC frame header. */
1923     err = uv__pipe_read_exactly(
1924         handle->handle, &frame_header, sizeof frame_header);
1925     if (err)
1926       goto error;
1927 
1928     /* Validate that flags are valid. */
1929     if ((frame_header.flags & ~UV__IPC_FRAME_VALID_FLAGS) != 0)
1930       goto invalid;
1931     /* Validate that reserved2 is zero. */
1932     if (frame_header.reserved2 != 0)
1933       goto invalid;
1934 
1935     /* Parse xfer flags. */
1936     xfer_flags = frame_header.flags & UV__IPC_FRAME_XFER_FLAGS;
1937     if (xfer_flags & UV__IPC_FRAME_HAS_SOCKET_XFER) {
1938       /* Socket coming -- determine the type. */
1939       xfer_type = xfer_flags & UV__IPC_FRAME_XFER_IS_TCP_CONNECTION
1940                       ? UV__IPC_SOCKET_XFER_TCP_CONNECTION
1941                       : UV__IPC_SOCKET_XFER_TCP_SERVER;
1942     } else if (xfer_flags == 0) {
1943       /* No socket. */
1944       xfer_type = UV__IPC_SOCKET_XFER_NONE;
1945     } else {
1946       /* Invalid flags. */
1947       goto invalid;
1948     }
1949 
1950     /* Parse data frame information. */
1951     if (frame_header.flags & UV__IPC_FRAME_HAS_DATA) {
1952       *data_remaining = frame_header.data_length;
1953     } else if (frame_header.data_length != 0) {
1954       /* Data length greater than zero but data flag not set -- invalid. */
1955       goto invalid;
1956     }
1957 
1958     /* If no socket xfer info follows, return here. Data will be read in a
1959      * subsequent invocation of uv__pipe_read_ipc(). */
1960     if (xfer_type == UV__IPC_SOCKET_XFER_NONE)
1961       return sizeof frame_header; /* Number of bytes read. */
1962 
1963     /* Read transferred socket information. */
1964     err = uv__pipe_read_exactly(handle->handle, &xfer_info, sizeof xfer_info);
1965     if (err)
1966       goto error;
1967 
1968     /* Store the pending socket info. */
1969     uv__pipe_queue_ipc_xfer_info(handle, xfer_type, &xfer_info);
1970 
1971     /* Return number of bytes read. */
1972     return sizeof frame_header + sizeof xfer_info;
1973   }
1974 
1975 invalid:
1976   /* Invalid frame. */
1977   err = WSAECONNABORTED; /* Maps to UV_ECONNABORTED. */
1978 
1979 error:
1980   uv__pipe_read_error_or_eof(loop, handle, err, uv_null_buf_);
1981   return 0; /* Break out of read loop. */
1982 }
1983 
1984 
/* Handle completion of the zero-read request posted for a pipe.
 *
 * Clears the pending/cancellation flags, stops the eof timer, and -- if the
 * user is still reading -- either reports the failure or drains the data that
 * PeekNamedPipe() says is available. Finally queues another zero-read when
 * the handle is still reading and no read is pending. */
void uv__process_pipe_read_req(uv_loop_t* loop,
                               uv_pipe_t* handle,
                               uv_req_t* req) {
  DWORD pending_bytes;
  DWORD consumed;
  DWORD sys_err;

  assert(handle->type == UV_NAMED_PIPE);

  handle->flags &= ~(UV_HANDLE_READ_PENDING | UV_HANDLE_CANCELLATION_PENDING);
  DECREASE_PENDING_REQ_COUNT(handle);
  eof_timer_stop(handle);

  /* Bookkeeping is done. If the user stopped reading in the meantime there is
   * no callback to deliver anything to. */
  if (!(handle->flags & UV_HANDLE_READING))
    return;

  if (REQ_SUCCESS(req)) {
    /* The zero-read completed without error, indicating there is data
     * available in the kernel buffer. Find out how much. */
    pending_bytes = 0;
    if (!PeekNamedPipe(handle->handle, NULL, 0, NULL, &pending_bytes, NULL))
      uv__pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_);

    /* Drain until everything reported as available has been read or the
     * 'reading' flag is cleared. */
    while (pending_bytes > 0 && (handle->flags & UV_HANDLE_READING)) {
      /* IPC pipes are read frame by frame, other pipes as raw data. */
      if (handle->ipc)
        consumed = uv__pipe_read_ipc(loop, handle);
      else
        consumed = uv__pipe_read_data(loop, handle, pending_bytes, (DWORD) -1);

      /* Zero bytes signals that an error was reported. Reading more than was
       * believed available would underflow the counter. Stop in both
       * cases. */
      if (consumed == 0 || consumed > pending_bytes)
        break;

      pending_bytes -= consumed;
    }

  } else {
    /* The zero-read itself failed. */
    sys_err = GET_REQ_ERROR(req);

    /* ERROR_OPERATION_ABORTED may result from uv__pipe_interrupt_read()
     * cancelling the read. It is not relevant to the user; a new zero-read is
     * started below. */
    if (sys_err != ERROR_OPERATION_ABORTED)
      uv__pipe_read_error_or_eof(loop, handle, sys_err, uv_null_buf_);
  }

  /* Start another zero-read request if necessary. */
  if ((handle->flags & UV_HANDLE_READING) &&
      !(handle->flags & UV_HANDLE_READ_PENDING)) {
    uv__pipe_queue_read(loop, handle);
  }
}
2050 
2051 
/* Handle completion of a pipe write request.
 *
 * Updates the write queue size, releases the wait/event handles used when
 * IOCP is emulated, invokes the user's write callback with the translated
 * result, and then continues with queued non-overlapped writes and/or a
 * pending shutdown. */
void uv__process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,
    uv_write_t* req) {
  uv__coalesced_write_t* coalesced;
  int sys_err;

  assert(handle->type == UV_NAMED_PIPE);

  assert(handle->write_queue_size >= req->u.io.queued_bytes);
  handle->write_queue_size -= req->u.io.queued_bytes;

  UNREGISTER_HANDLE_REQ(loop, handle, req);

  if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
    if (req->wait_handle != INVALID_HANDLE_VALUE) {
      UnregisterWait(req->wait_handle);
      req->wait_handle = INVALID_HANDLE_VALUE;
    }
    if (req->event_handle != NULL) {
      CloseHandle(req->event_handle);
      req->event_handle = NULL;
    }
  }

  /* Fetch the result before `req` is possibly swapped out below. */
  sys_err = GET_REQ_ERROR(req);

  /* A coalesced write wraps the user's request in a heap-allocated one. Hand
   * the callback the pointer the user expects and free the wrapper. */
  if (req->coalesced) {
    coalesced = container_of(req, uv__coalesced_write_t, req);
    req = coalesced->user_req;
    uv__free(coalesced);
  }

  if (req->cb != NULL)
    req->cb(req, uv_translate_sys_error(sys_err));

  handle->stream.conn.write_reqs_pending--;

  /* Kick off the next queued non-overlapped write, if there is one. */
  if ((handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) &&
      handle->pipe.conn.non_overlapped_writes_tail) {
    assert(handle->stream.conn.write_reqs_pending > 0);
    uv__queue_non_overlapped_write(handle);
  }

  /* A shutdown request was waiting for the last write to finish. */
  if (handle->stream.conn.write_reqs_pending == 0 &&
      handle->stream.conn.shutdown_req != NULL) {
    uv__want_endgame(loop, (uv_handle_t*) handle);
  }

  DECREASE_PENDING_REQ_COUNT(handle);
}
2104 
2105 
/* Handle completion of a pipe accept request.
 *
 * On success the request is pushed onto the server's pending_accepts list and
 * the connection callback (if set) is invoked with status 0. On failure the
 * request's pipe handle is closed and the accept is queued again. Nothing but
 * bookkeeping happens when the handle is closing. */
void uv__process_pipe_accept_req(uv_loop_t* loop, uv_pipe_t* handle,
    uv_req_t* raw_req) {
  uv_pipe_accept_t* accept_req = (uv_pipe_accept_t*) raw_req;

  assert(handle->type == UV_NAMED_PIPE);

  if (handle->flags & UV_HANDLE_CLOSING) {
    /* The req->pipeHandle should be freed already in uv__pipe_cleanup(). */
    assert(accept_req->pipeHandle == INVALID_HANDLE_VALUE);
    DECREASE_PENDING_REQ_COUNT(handle);
    return;
  }

  if (!REQ_SUCCESS(accept_req)) {
    /* The accept failed: drop the pipe instance and try again. */
    if (accept_req->pipeHandle != INVALID_HANDLE_VALUE) {
      CloseHandle(accept_req->pipeHandle);
      accept_req->pipeHandle = INVALID_HANDLE_VALUE;
    }
    if (!(handle->flags & UV_HANDLE_CLOSING)) {
      uv__pipe_queue_accept(loop, handle, accept_req, FALSE);
    }
  } else {
    /* A client connected: make the request available to uv_accept(). */
    assert(accept_req->pipeHandle != INVALID_HANDLE_VALUE);
    accept_req->next_pending = handle->pipe.serv.pending_accepts;
    handle->pipe.serv.pending_accepts = accept_req;

    if (handle->stream.serv.connection_cb) {
      handle->stream.serv.connection_cb((uv_stream_t*) handle, 0);
    }
  }

  DECREASE_PENDING_REQ_COUNT(handle);
}
2139 
2140 
/* Handle completion of a pipe connect request.
 *
 * On success the handle is initialized as a connection; on failure the
 * request's Windows error code is picked up. The user's connect callback, if
 * one was supplied, then receives the translated result.
 *
 * The connection is initialized whether or not a callback was supplied, so
 * that a successful connect with a NULL callback does not leave the handle
 * without its connection state. */
void uv__process_pipe_connect_req(uv_loop_t* loop, uv_pipe_t* handle,
    uv_connect_t* req) {
  int err;

  assert(handle->type == UV_NAMED_PIPE);

  UNREGISTER_HANDLE_REQ(loop, handle, req);

  err = 0;
  if (REQ_SUCCESS(req)) {
    uv__pipe_connection_init(handle);
  } else {
    err = GET_REQ_ERROR(req);
  }

  if (req->cb) {
    req->cb(req, uv_translate_sys_error(err));
  }

  DECREASE_PENDING_REQ_COUNT(handle);
}
2161 
2162 
/* Handle a pipe shutdown request.
 *
 * A pipe that is not readable is simply closed. A readable pipe gets an eof
 * timer, which is started right away when a read is pending. The user's
 * shutdown callback, if set, is then invoked with status 0. */
void uv__process_pipe_shutdown_req(uv_loop_t* loop, uv_pipe_t* handle,
    uv_shutdown_t* req) {
  assert(handle->type == UV_NAMED_PIPE);

  UNREGISTER_HANDLE_REQ(loop, handle, req);

  if (!(handle->flags & UV_HANDLE_READABLE)) {
    /* This pipe is not readable. We can just close it to let the other end
     * know that we're done writing. */
    close_pipe(handle);

  } else {
    /* Initialize and optionally start the eof timer. Only do this if the pipe
     * is readable and we haven't seen EOF come in ourselves. */
    eof_timer_init(handle);

    /* With a read pending the timer starts now. Otherwise
     * uv__pipe_queue_read will start it. */
    if (handle->flags & UV_HANDLE_READ_PENDING)
      eof_timer_start(handle);
  }

  if (req->cb != NULL)
    req->cb(req, 0);

  DECREASE_PENDING_REQ_COUNT(handle);
}
2192 
2193 
/* Allocate and initialize the eof timer of a connected pipe.
 *
 * The timer must not exist yet. It is created on the pipe's loop, gets the
 * pipe as its `data` pointer, and is unref'ed. An allocation failure is
 * reported through uv_fatal_error(), matching the other allocations in this
 * file, instead of passing a NULL timer on to uv_timer_init(). */
static void eof_timer_init(uv_pipe_t* pipe) {
  uv_timer_t* timer;
  int r;

  assert(pipe->pipe.conn.eof_timer == NULL);
  assert(pipe->flags & UV_HANDLE_CONNECTION);

  timer = (uv_timer_t*) uv__malloc(sizeof *timer);
  if (timer == NULL)
    uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");

  pipe->pipe.conn.eof_timer = timer;

  r = uv_timer_init(pipe->loop, timer);
  assert(r == 0); /* timers can't fail */
  (void) r; /* Only inspected by the assert above. */
  timer->data = pipe;
  uv_unref((uv_handle_t*) timer);
}
2207 
2208 
/* Arm the pipe's eof timer, if it has one, to fire eof_timer_cb once after
 * `eof_timeout` milliseconds. Does nothing when no timer exists. */
static void eof_timer_start(uv_pipe_t* pipe) {
  uv_timer_t* timer;

  assert(pipe->flags & UV_HANDLE_CONNECTION);

  timer = pipe->pipe.conn.eof_timer;
  if (timer == NULL)
    return;

  uv_timer_start(timer, eof_timer_cb, eof_timeout, 0);
}
2216 
2217 
/* Stop the pipe's eof timer, if it has one. Does nothing when no timer
 * exists. */
static void eof_timer_stop(uv_pipe_t* pipe) {
  uv_timer_t* timer;

  assert(pipe->flags & UV_HANDLE_CONNECTION);

  timer = pipe->pipe.conn.eof_timer;
  if (timer == NULL)
    return;

  uv_timer_stop(timer);
}
2225 
2226 
/* Timer callback that fires when the eof timeout expires.
 *
 * Unless the pending read has already completed (and is merely waiting to be
 * processed), the pipe is closed, reading is stopped and EOF is reported to
 * the user. The pipe is taken from the timer's `data` pointer. */
static void eof_timer_cb(uv_timer_t* timer) {
  uv_pipe_t* conn = (uv_pipe_t*) timer->data;
  uv_loop_t* loop = timer->loop;

  assert(conn->type == UV_NAMED_PIPE);

  /* This should always be true, since we start the timer only in
   * uv__pipe_queue_read after successfully calling ReadFile, or in
   * uv__process_pipe_shutdown_req if a read is pending, and we always
   * immediately stop the timer in uv__process_pipe_read_req. */
  assert(conn->flags & UV_HANDLE_READ_PENDING);

  /* If there are many packets coming off the iocp then the timer callback may
   * be called before the read request is coming off the queue. In that case
   * the read request has completed and will be processed later, so leave the
   * pipe alone. */
  if (conn->flags & UV_HANDLE_READ_PENDING) {
    if (HasOverlappedIoCompleted(&conn->read_req.u.io.overlapped))
      return;
  }

  /* Force both ends off the pipe. */
  close_pipe(conn);

  /* Stop reading, so the pending read that is going to fail will not be
   * reported to the user. */
  uv_read_stop((uv_stream_t*) conn);

  /* Report the eof and update flags. This will get reported even if the user
   * stopped reading in the meantime. TODO: is that okay? */
  uv__pipe_read_eof(loop, conn, uv_null_buf_);
}
2259 
2260 
/* Close the pipe's eof timer, if it has one, and clear the pipe's pointer to
 * it. The timer is handed to uv_close() with eof_timer_close_cb as its close
 * callback. */
static void eof_timer_destroy(uv_pipe_t* pipe) {
  uv_timer_t* timer;

  assert(pipe->flags & UV_HANDLE_CONNECTION);

  timer = pipe->pipe.conn.eof_timer;
  if (!timer)
    return;

  uv_close((uv_handle_t*) timer, eof_timer_close_cb);
  pipe->pipe.conn.eof_timer = NULL;
}
2269 
2270 
/* Close callback for an eof timer: releases the timer's memory with
 * uv__free(). `handle` must be a UV_TIMER handle. */
static void eof_timer_close_cb(uv_handle_t* handle) {
  uv_timer_t* timer = (uv_timer_t*) handle;

  assert(handle->type == UV_TIMER);
  uv__free(timer);
}
2275 
2276 
/* Open an existing file descriptor as a pipe handle.
 *
 * The access rights of the underlying OS handle determine whether the
 * resulting stream is readable and/or writable; an IPC pipe must be both.
 *
 * Returns 0 on success, UV_EBADF when `file` has no valid OS handle, the
 * translated Windows error when duplicating a stdio handle fails, and
 * UV_EINVAL when the handle cannot be queried, lacks the access required for
 * IPC, or cannot be attached to the pipe.
 *
 * For file descriptors 0-2 a duplicate of the OS handle is used. That
 * duplicate belongs to this function until it has been attached to `pipe`,
 * so it is closed again on every failure path. */
int uv_pipe_open(uv_pipe_t* pipe, uv_file file) {
  HANDLE os_handle = uv__get_osfhandle(file);
  NTSTATUS nt_status;
  IO_STATUS_BLOCK io_status;
  FILE_ACCESS_INFORMATION access;
  DWORD duplex_flags = 0;
  int owns_handle = 0;
  int err;

  if (os_handle == INVALID_HANDLE_VALUE)
    return UV_EBADF;

  uv__once_init();
  /* In order to avoid closing a stdio file descriptor 0-2, duplicate the
   * underlying OS handle and forget about the original fd.
   * We could also opt to use the original OS handle and just never close it,
   * but then there would be no reliable way to cancel pending read operations
   * upon close.
   */
  if (file <= 2) {
    if (!DuplicateHandle(INVALID_HANDLE_VALUE,
                         os_handle,
                         INVALID_HANDLE_VALUE,
                         &os_handle,
                         0,
                         FALSE,
                         DUPLICATE_SAME_ACCESS))
      return uv_translate_sys_error(GetLastError());
    file = -1;
    owns_handle = 1;
  }

  /* Determine what kind of permissions we have on this handle.
   * Cygwin opens the pipe in message mode, but we can support it,
   * just query the access flags and set the stream flags accordingly.
   */
  nt_status = pNtQueryInformationFile(os_handle,
                                      &io_status,
                                      &access,
                                      sizeof(access),
                                      FileAccessInformation);
  if (nt_status != STATUS_SUCCESS) {
    err = UV_EINVAL;
    goto error;
  }

  /* An IPC pipe has to be usable in both directions. */
  if (pipe->ipc) {
    if (!(access.AccessFlags & FILE_WRITE_DATA) ||
        !(access.AccessFlags & FILE_READ_DATA)) {
      err = UV_EINVAL;
      goto error;
    }
  }

  if (access.AccessFlags & FILE_WRITE_DATA)
    duplex_flags |= UV_HANDLE_WRITABLE;
  if (access.AccessFlags & FILE_READ_DATA)
    duplex_flags |= UV_HANDLE_READABLE;

  if (os_handle == INVALID_HANDLE_VALUE ||
      uv__set_pipe_handle(pipe->loop,
                          pipe,
                          os_handle,
                          file,
                          duplex_flags) == -1) {
    err = UV_EINVAL;
    goto error;
  }

  uv__pipe_connection_init(pipe);

  if (pipe->ipc) {
    assert(!(pipe->flags & UV_HANDLE_NON_OVERLAPPED_PIPE));
    pipe->pipe.conn.ipc_remote_pid = uv_os_getppid();
    assert(pipe->pipe.conn.ipc_remote_pid != (DWORD)(uv_pid_t) -1);
  }
  return 0;

error:
  /* Only a handle duplicated above is ours to close. A handle that belongs
   * to the caller's fd is left untouched.
   * NOTE(review): assumes uv__set_pipe_handle() does not keep the handle when
   * it fails -- confirm against its definition. */
  if (owns_handle && os_handle != INVALID_HANDLE_VALUE)
    CloseHandle(os_handle);
  return err;
}
2348 
2349 
uv__pipe_getname(const uv_pipe_t * handle,char * buffer,size_t * size)2350 static int uv__pipe_getname(const uv_pipe_t* handle, char* buffer, size_t* size) {
2351   NTSTATUS nt_status;
2352   IO_STATUS_BLOCK io_status;
2353   FILE_NAME_INFORMATION tmp_name_info;
2354   FILE_NAME_INFORMATION* name_info;
2355   WCHAR* name_buf;
2356   unsigned int addrlen;
2357   unsigned int name_size;
2358   unsigned int name_len;
2359   int err;
2360 
2361   uv__once_init();
2362   name_info = NULL;
2363 
2364   if (handle->handle == INVALID_HANDLE_VALUE) {
2365     *size = 0;
2366     return UV_EINVAL;
2367   }
2368 
2369   /* NtQueryInformationFile will block if another thread is performing a
2370    * blocking operation on the queried handle. If the pipe handle is
2371    * synchronous, there may be a worker thread currently calling ReadFile() on
2372    * the pipe handle, which could cause a deadlock. To avoid this, interrupt
2373    * the read. */
2374   if (handle->flags & UV_HANDLE_CONNECTION &&
2375       handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
2376     uv__pipe_interrupt_read((uv_pipe_t*) handle); /* cast away const warning */
2377   }
2378 
2379   nt_status = pNtQueryInformationFile(handle->handle,
2380                                       &io_status,
2381                                       &tmp_name_info,
2382                                       sizeof tmp_name_info,
2383                                       FileNameInformation);
2384   if (nt_status == STATUS_BUFFER_OVERFLOW) {
2385     name_size = sizeof(*name_info) + tmp_name_info.FileNameLength;
2386     name_info = uv__malloc(name_size);
2387     if (!name_info) {
2388       *size = 0;
2389       err = UV_ENOMEM;
2390       goto cleanup;
2391     }
2392 
2393     nt_status = pNtQueryInformationFile(handle->handle,
2394                                         &io_status,
2395                                         name_info,
2396                                         name_size,
2397                                         FileNameInformation);
2398   }
2399 
2400   if (nt_status != STATUS_SUCCESS) {
2401     *size = 0;
2402     err = uv_translate_sys_error(pRtlNtStatusToDosError(nt_status));
2403     goto error;
2404   }
2405 
2406   if (!name_info) {
2407     /* the struct on stack was used */
2408     name_buf = tmp_name_info.FileName;
2409     name_len = tmp_name_info.FileNameLength;
2410   } else {
2411     name_buf = name_info->FileName;
2412     name_len = name_info->FileNameLength;
2413   }
2414 
2415   if (name_len == 0) {
2416     *size = 0;
2417     err = 0;
2418     goto error;
2419   }
2420 
2421   name_len /= sizeof(WCHAR);
2422 
2423   /* check how much space we need */
2424   addrlen = WideCharToMultiByte(CP_UTF8,
2425                                 0,
2426                                 name_buf,
2427                                 name_len,
2428                                 NULL,
2429                                 0,
2430                                 NULL,
2431                                 NULL);
2432   if (!addrlen) {
2433     *size = 0;
2434     err = uv_translate_sys_error(GetLastError());
2435     goto error;
2436   } else if (pipe_prefix_len + addrlen >= *size) {
2437     /* "\\\\.\\pipe" + name */
2438     *size = pipe_prefix_len + addrlen + 1;
2439     err = UV_ENOBUFS;
2440     goto error;
2441   }
2442 
2443   memcpy(buffer, pipe_prefix, pipe_prefix_len);
2444   addrlen = WideCharToMultiByte(CP_UTF8,
2445                                 0,
2446                                 name_buf,
2447                                 name_len,
2448                                 buffer+pipe_prefix_len,
2449                                 *size-pipe_prefix_len,
2450                                 NULL,
2451                                 NULL);
2452   if (!addrlen) {
2453     *size = 0;
2454     err = uv_translate_sys_error(GetLastError());
2455     goto error;
2456   }
2457 
2458   addrlen += pipe_prefix_len;
2459   *size = addrlen;
2460   buffer[addrlen] = '\0';
2461 
2462   err = 0;
2463 
2464 error:
2465   uv__free(name_info);
2466 
2467 cleanup:
2468   return err;
2469 }
2470 
2471 
/* Return the length of the pipe's IPC transfer queue, or 0 for a pipe that is
 * not an IPC pipe. */
int uv_pipe_pending_count(uv_pipe_t* handle) {
  if (handle->ipc)
    return handle->pipe.conn.ipc_xfer_queue_length;

  return 0;
}
2477 
2478 
uv_pipe_getsockname(const uv_pipe_t * handle,char * buffer,size_t * size)2479 int uv_pipe_getsockname(const uv_pipe_t* handle, char* buffer, size_t* size) {
2480   if (handle->flags & UV_HANDLE_BOUND)
2481     return uv__pipe_getname(handle, buffer, size);
2482 
2483   if (handle->flags & UV_HANDLE_CONNECTION ||
2484       handle->handle != INVALID_HANDLE_VALUE) {
2485     *size = 0;
2486     return 0;
2487   }
2488 
2489   return UV_EBADF;
2490 }
2491 
2492 
uv_pipe_getpeername(const uv_pipe_t * handle,char * buffer,size_t * size)2493 int uv_pipe_getpeername(const uv_pipe_t* handle, char* buffer, size_t* size) {
2494   /* emulate unix behaviour */
2495   if (handle->flags & UV_HANDLE_BOUND)
2496     return UV_ENOTCONN;
2497 
2498   if (handle->handle != INVALID_HANDLE_VALUE)
2499     return uv__pipe_getname(handle, buffer, size);
2500 
2501   return UV_EBADF;
2502 }
2503 
2504 
uv_pipe_pending_type(uv_pipe_t * handle)2505 uv_handle_type uv_pipe_pending_type(uv_pipe_t* handle) {
2506   if (!handle->ipc)
2507     return UV_UNKNOWN_HANDLE;
2508   if (handle->pipe.conn.ipc_xfer_queue_length == 0)
2509     return UV_UNKNOWN_HANDLE;
2510   else
2511     return UV_TCP;
2512 }
2513 
/* Set access rights for the Everyone group on the pipe according to `mode`
 * (UV_READABLE, UV_WRITABLE, or both), by merging a SET_ACCESS entry into the
 * pipe's current DACL.
 *
 * Returns 0 on success, UV_EBADF when `handle` is NULL or has no OS handle,
 * UV_EINVAL for any other `mode`, or the translated Windows error code when
 * one of the security calls fails.
 *
 * GetSecurityInfo(), SetEntriesInAcl() and SetSecurityInfo() report failure
 * through their return value (ERROR_SUCCESS on success), so that value is
 * used directly rather than GetLastError(). */
int uv_pipe_chmod(uv_pipe_t* handle, int mode) {
  SID_IDENTIFIER_AUTHORITY sid_world = { SECURITY_WORLD_SID_AUTHORITY };
  PACL old_dacl, new_dacl;
  PSECURITY_DESCRIPTOR sd;
  EXPLICIT_ACCESS ea;
  PSID everyone;
  int error;

  if (handle == NULL || handle->handle == INVALID_HANDLE_VALUE)
    return UV_EBADF;

  if (mode != UV_READABLE &&
      mode != UV_WRITABLE &&
      mode != (UV_WRITABLE | UV_READABLE))
    return UV_EINVAL;

  /* Build the SID of the Everyone (world) group. */
  if (!AllocateAndInitializeSid(&sid_world,
                                1,
                                SECURITY_WORLD_RID,
                                0, 0, 0, 0, 0, 0, 0,
                                &everyone)) {
    error = GetLastError();
    goto done;
  }

  /* Fetch the pipe's current DACL. `sd` owns the memory `old_dacl` points
   * into. */
  error = (int) GetSecurityInfo(handle->handle,
                                SE_KERNEL_OBJECT,
                                DACL_SECURITY_INFORMATION,
                                NULL,
                                NULL,
                                &old_dacl,
                                NULL,
                                &sd);
  if (error != ERROR_SUCCESS)
    goto clean_sid;

  /* Describe the access entry for Everyone. */
  memset(&ea, 0, sizeof(EXPLICIT_ACCESS));
  if (mode & UV_READABLE)
    ea.grfAccessPermissions |= GENERIC_READ | FILE_WRITE_ATTRIBUTES;
  if (mode & UV_WRITABLE)
    ea.grfAccessPermissions |= GENERIC_WRITE | FILE_READ_ATTRIBUTES;
  ea.grfAccessPermissions |= SYNCHRONIZE;
  ea.grfAccessMode = SET_ACCESS;
  ea.grfInheritance = NO_INHERITANCE;
  ea.Trustee.TrusteeForm = TRUSTEE_IS_SID;
  ea.Trustee.TrusteeType = TRUSTEE_IS_WELL_KNOWN_GROUP;
  ea.Trustee.ptstrName = (LPTSTR)everyone;

  /* Merge the entry into a new DACL and install it on the pipe. */
  error = (int) SetEntriesInAcl(1, &ea, old_dacl, &new_dacl);
  if (error != ERROR_SUCCESS)
    goto clean_sd;

  error = (int) SetSecurityInfo(handle->handle,
                                SE_KERNEL_OBJECT,
                                DACL_SECURITY_INFORMATION,
                                NULL,
                                NULL,
                                new_dacl,
                                NULL);
  /* `error` now holds the final result; fall through to the cleanup. */

  LocalFree((HLOCAL) new_dacl);
clean_sd:
  LocalFree((HLOCAL) sd);
clean_sid:
  FreeSid(everyone);
done:
  return uv_translate_sys_error(error);
}
2590