1// Copyright Joyent, Inc. and other Node contributors. 2// 3// Permission is hereby granted, free of charge, to any person obtaining a 4// copy of this software and associated documentation files (the 5// "Software"), to deal in the Software without restriction, including 6// without limitation the rights to use, copy, modify, merge, publish, 7// distribute, sublicense, and/or sell copies of the Software, and to permit 8// persons to whom the Software is furnished to do so, subject to the 9// following conditions: 10// 11// The above copyright notice and this permission notice shall be included 12// in all copies or substantial portions of the Software. 13// 14// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS 15// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 16// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN 17// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, 18// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR 19// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE 20// USE OR OTHER DEALINGS IN THE SOFTWARE. 21 22'use strict'; 23 24const { 25 ArrayIsArray, 26 Boolean, 27 Error, 28 Number, 29 NumberIsNaN, 30 ObjectDefineProperty, 31 ObjectSetPrototypeOf, 32 Symbol, 33} = primordials; 34 35const EventEmitter = require('events'); 36const stream = require('stream'); 37let debug = require('internal/util/debuglog').debuglog('net', (fn) => { 38 debug = fn; 39}); 40const { deprecate } = require('internal/util'); 41const { 42 isIP, 43 isIPv4, 44 isIPv6, 45 normalizedArgsSymbol, 46 makeSyncWrite 47} = require('internal/net'); 48const assert = require('internal/assert'); 49const { 50 UV_EADDRINUSE, 51 UV_EINVAL, 52 UV_ENOTCONN 53} = internalBinding('uv'); 54 55const { Buffer } = require('buffer'); 56const { guessHandleType } = internalBinding('util'); 57const { ShutdownWrap } = internalBinding('stream_wrap'); 58const { 59 TCP, 60 TCPConnectWrap, 61 constants: TCPConstants 62} = internalBinding('tcp_wrap'); 63const { 64 Pipe, 65 PipeConnectWrap, 66 constants: PipeConstants 67} = internalBinding('pipe_wrap'); 68const { 69 newAsyncId, 70 defaultTriggerAsyncIdScope, 71 symbols: { async_id_symbol, owner_symbol } 72} = require('internal/async_hooks'); 73const { 74 writevGeneric, 75 writeGeneric, 76 onStreamRead, 77 kAfterAsyncWrite, 78 kHandle, 79 kUpdateTimer, 80 setStreamTimeout, 81 kBuffer, 82 kBufferCb, 83 kBufferGen 84} = require('internal/stream_base_commons'); 85const { 86 codes: { 87 ERR_INVALID_ADDRESS_FAMILY, 88 ERR_INVALID_ARG_TYPE, 89 ERR_INVALID_ARG_VALUE, 90 ERR_INVALID_FD_TYPE, 91 ERR_INVALID_IP_ADDRESS, 92 ERR_INVALID_OPT_VALUE, 93 ERR_SERVER_ALREADY_LISTEN, 94 ERR_SERVER_NOT_RUNNING, 95 ERR_SOCKET_CLOSED 96 }, 97 errnoException, 98 exceptionWithHostPort, 99 uvExceptionWithHostPort 100} = require('internal/errors'); 101const { isUint8Array } = require('internal/util/types'); 102const { 103 validateInt32, 104 validatePort, 105 validateString 106} = require('internal/validators'); 107const kLastWriteQueueSize = Symbol('lastWriteQueueSize'); 108const { 109 DTRACE_NET_SERVER_CONNECTION, 110 DTRACE_NET_STREAM_END 111} = require('internal/dtrace'); 112 113// Lazy loaded to improve startup performance. 114let cluster; 115let dns; 116 117const { clearTimeout } = require('timers'); 118const { kTimeout } = require('internal/timers'); 119 120const DEFAULT_IPV4_ADDR = '0.0.0.0'; 121const DEFAULT_IPV6_ADDR = '::'; 122 123const isWindows = process.platform === 'win32'; 124 125function noop() {} 126 127function getFlags(ipv6Only) { 128 return ipv6Only === true ? TCPConstants.UV_TCP_IPV6ONLY : 0; 129} 130 131function createHandle(fd, is_server) { 132 validateInt32(fd, 'fd', 0); 133 const type = guessHandleType(fd); 134 if (type === 'PIPE') { 135 return new Pipe( 136 is_server ? PipeConstants.SERVER : PipeConstants.SOCKET 137 ); 138 } 139 140 if (type === 'TCP') { 141 return new TCP( 142 is_server ? TCPConstants.SERVER : TCPConstants.SOCKET 143 ); 144 } 145 146 throw new ERR_INVALID_FD_TYPE(type); 147} 148 149 150function getNewAsyncId(handle) { 151 return (!handle || typeof handle.getAsyncId !== 'function') ? 152 newAsyncId() : handle.getAsyncId(); 153} 154 155 156function isPipeName(s) { 157 return typeof s === 'string' && toNumber(s) === false; 158} 159 160function createServer(options, connectionListener) { 161 return new Server(options, connectionListener); 162} 163 164 165// Target API: 166// 167// let s = net.connect({port: 80, host: 'google.com'}, function() { 168// ... 169// }); 170// 171// There are various forms: 172// 173// connect(options, [cb]) 174// connect(port, [host], [cb]) 175// connect(path, [cb]); 176// 177function connect(...args) { 178 const normalized = normalizeArgs(args); 179 const options = normalized[0]; 180 debug('createConnection', normalized); 181 const socket = new Socket(options); 182 183 if (options.timeout) { 184 socket.setTimeout(options.timeout); 185 } 186 187 return socket.connect(normalized); 188} 189 190 191// Returns an array [options, cb], where options is an object, 192// cb is either a function or null. 193// Used to normalize arguments of Socket.prototype.connect() and 194// Server.prototype.listen(). Possible combinations of parameters: 195// (options[...][, cb]) 196// (path[...][, cb]) 197// ([port][, host][...][, cb]) 198// For Socket.prototype.connect(), the [...] part is ignored 199// For Server.prototype.listen(), the [...] part is [, backlog] 200// but will not be handled here (handled in listen()) 201function normalizeArgs(args) { 202 let arr; 203 204 if (args.length === 0) { 205 arr = [{}, null]; 206 arr[normalizedArgsSymbol] = true; 207 return arr; 208 } 209 210 const arg0 = args[0]; 211 let options = {}; 212 if (typeof arg0 === 'object' && arg0 !== null) { 213 // (options[...][, cb]) 214 options = arg0; 215 } else if (isPipeName(arg0)) { 216 // (path[...][, cb]) 217 options.path = arg0; 218 } else { 219 // ([port][, host][...][, cb]) 220 options.port = arg0; 221 if (args.length > 1 && typeof args[1] === 'string') { 222 options.host = args[1]; 223 } 224 } 225 226 const cb = args[args.length - 1]; 227 if (typeof cb !== 'function') 228 arr = [options, null]; 229 else 230 arr = [options, cb]; 231 232 arr[normalizedArgsSymbol] = true; 233 return arr; 234} 235 236 237// Called when creating new Socket, or when re-using a closed Socket 238function initSocketHandle(self) { 239 self._undestroy(); 240 self._sockname = null; 241 242 // Handle creation may be deferred to bind() or connect() time. 243 if (self._handle) { 244 self._handle[owner_symbol] = self; 245 self._handle.onread = onStreamRead; 246 self[async_id_symbol] = getNewAsyncId(self._handle); 247 248 let userBuf = self[kBuffer]; 249 if (userBuf) { 250 const bufGen = self[kBufferGen]; 251 if (bufGen !== null) { 252 userBuf = bufGen(); 253 if (!isUint8Array(userBuf)) 254 return; 255 self[kBuffer] = userBuf; 256 } 257 self._handle.useUserBuffer(userBuf); 258 } 259 } 260} 261 262 263const kBytesRead = Symbol('kBytesRead'); 264const kBytesWritten = Symbol('kBytesWritten'); 265const kSetNoDelay = Symbol('kSetNoDelay'); 266 267function Socket(options) { 268 if (!(this instanceof Socket)) return new Socket(options); 269 270 this.connecting = false; 271 // Problem with this is that users can supply their own handle, that may not 272 // have _handle.getAsyncId(). In this case an[async_id_symbol] should 273 // probably be supplied by async_hooks. 274 this[async_id_symbol] = -1; 275 this._hadError = false; 276 this[kHandle] = null; 277 this._parent = null; 278 this._host = null; 279 this[kSetNoDelay] = false; 280 this[kLastWriteQueueSize] = 0; 281 this[kTimeout] = null; 282 this[kBuffer] = null; 283 this[kBufferCb] = null; 284 this[kBufferGen] = null; 285 286 if (typeof options === 'number') 287 options = { fd: options }; // Legacy interface. 288 else 289 options = { ...options }; 290 291 options.readable = options.readable || false; 292 options.writable = options.writable || false; 293 const { allowHalfOpen } = options; 294 295 // Prevent the "no-half-open enforcer" from being inherited from `Duplex`. 296 options.allowHalfOpen = true; 297 // For backwards compat do not emit close on destroy. 298 options.emitClose = false; 299 // Handle strings directly. 300 options.decodeStrings = false; 301 stream.Duplex.call(this, options); 302 303 // Default to *not* allowing half open sockets. 304 this.allowHalfOpen = Boolean(allowHalfOpen); 305 306 if (options.handle) { 307 this._handle = options.handle; // private 308 this[async_id_symbol] = getNewAsyncId(this._handle); 309 } else { 310 const onread = options.onread; 311 if (onread !== null && typeof onread === 'object' && 312 (isUint8Array(onread.buffer) || typeof onread.buffer === 'function') && 313 typeof onread.callback === 'function') { 314 if (typeof onread.buffer === 'function') { 315 this[kBuffer] = true; 316 this[kBufferGen] = onread.buffer; 317 } else { 318 this[kBuffer] = onread.buffer; 319 } 320 this[kBufferCb] = onread.callback; 321 } 322 if (options.fd !== undefined) { 323 const { fd } = options; 324 let err; 325 326 // createHandle will throw ERR_INVALID_FD_TYPE if `fd` is not 327 // a valid `PIPE` or `TCP` descriptor 328 this._handle = createHandle(fd, false); 329 330 err = this._handle.open(fd); 331 332 // While difficult to fabricate, in some architectures 333 // `open` may return an error code for valid file descriptors 334 // which cannot be opened. This is difficult to test as most 335 // un-openable fds will throw on `createHandle` 336 if (err) 337 throw errnoException(err, 'open'); 338 339 this[async_id_symbol] = this._handle.getAsyncId(); 340 341 if ((fd === 1 || fd === 2) && 342 (this._handle instanceof Pipe) && isWindows) { 343 // Make stdout and stderr blocking on Windows 344 err = this._handle.setBlocking(true); 345 if (err) 346 throw errnoException(err, 'setBlocking'); 347 348 this._writev = null; 349 this._write = makeSyncWrite(fd); 350 // makeSyncWrite adjusts this value like the original handle would, so 351 // we need to let it do that by turning it into a writable, own 352 // property. 353 ObjectDefineProperty(this._handle, 'bytesWritten', { 354 value: 0, writable: true 355 }); 356 } 357 } 358 } 359 360 // Shut down the socket when we're finished with it. 361 this.on('end', onReadableStreamEnd); 362 363 initSocketHandle(this); 364 365 this._pendingData = null; 366 this._pendingEncoding = ''; 367 368 // If we have a handle, then start the flow of data into the 369 // buffer. if not, then this will happen when we connect 370 if (this._handle && options.readable !== false) { 371 if (options.pauseOnCreate) { 372 // Stop the handle from reading and pause the stream 373 this._handle.reading = false; 374 this._handle.readStop(); 375 this.readableFlowing = false; 376 } else if (!options.manualStart) { 377 this.read(0); 378 } 379 } 380 381 // Reserve properties 382 this.server = null; 383 this._server = null; 384 385 // Used after `.destroy()` 386 this[kBytesRead] = 0; 387 this[kBytesWritten] = 0; 388} 389ObjectSetPrototypeOf(Socket.prototype, stream.Duplex.prototype); 390ObjectSetPrototypeOf(Socket, stream.Duplex); 391 392// Refresh existing timeouts. 393Socket.prototype._unrefTimer = function _unrefTimer() { 394 for (let s = this; s !== null; s = s._parent) { 395 if (s[kTimeout]) 396 s[kTimeout].refresh(); 397 } 398}; 399 400 401// The user has called .end(), and all the bytes have been 402// sent out to the other side. 403Socket.prototype._final = function(cb) { 404 // If still connecting - defer handling `_final` until 'connect' will happen 405 if (this.pending) { 406 debug('_final: not yet connected'); 407 return this.once('connect', () => this._final(cb)); 408 } 409 410 if (!this._handle) 411 return cb(); 412 413 debug('_final: not ended, call shutdown()'); 414 415 const req = new ShutdownWrap(); 416 req.oncomplete = afterShutdown; 417 req.handle = this._handle; 418 req.callback = cb; 419 const err = this._handle.shutdown(req); 420 421 if (err === 1 || err === UV_ENOTCONN) // synchronous finish 422 return afterShutdown.call(req, 0); 423 else if (err !== 0) 424 return this.destroy(errnoException(err, 'shutdown')); 425}; 426 427 428function afterShutdown(status) { 429 const self = this.handle[owner_symbol]; 430 431 debug('afterShutdown destroyed=%j', self.destroyed, 432 self._readableState); 433 434 this.callback(); 435 436 // Callback may come after call to destroy. 437 if (self.destroyed) 438 return; 439 440 if (!self.readable || self.readableEnded) { 441 debug('readableState ended, destroying'); 442 self.destroy(); 443 } 444} 445 446// Provide a better error message when we call end() as a result 447// of the other side sending a FIN. The standard 'write after end' 448// is overly vague, and makes it seem like the user's code is to blame. 449function writeAfterFIN(chunk, encoding, cb) { 450 if (typeof encoding === 'function') { 451 cb = encoding; 452 encoding = null; 453 } 454 455 // eslint-disable-next-line no-restricted-syntax 456 const er = new Error('This socket has been ended by the other party'); 457 er.code = 'EPIPE'; 458 process.nextTick(emitErrorNT, this, er); 459 if (typeof cb === 'function') { 460 defaultTriggerAsyncIdScope(this[async_id_symbol], process.nextTick, cb, er); 461 } 462 463 return false; 464} 465 466Socket.prototype.setTimeout = setStreamTimeout; 467 468 469Socket.prototype._onTimeout = function() { 470 const handle = this._handle; 471 const lastWriteQueueSize = this[kLastWriteQueueSize]; 472 if (lastWriteQueueSize > 0 && handle) { 473 // `lastWriteQueueSize !== writeQueueSize` means there is 474 // an active write in progress, so we suppress the timeout. 475 const { writeQueueSize } = handle; 476 if (lastWriteQueueSize !== writeQueueSize) { 477 this[kLastWriteQueueSize] = writeQueueSize; 478 this._unrefTimer(); 479 return; 480 } 481 } 482 debug('_onTimeout'); 483 this.emit('timeout'); 484}; 485 486 487Socket.prototype.setNoDelay = function(enable) { 488 if (!this._handle) { 489 this.once('connect', 490 enable ? this.setNoDelay : () => this.setNoDelay(enable)); 491 return this; 492 } 493 494 // Backwards compatibility: assume true when `enable` is omitted 495 const newValue = enable === undefined ? true : !!enable; 496 if (this._handle.setNoDelay && newValue !== this[kSetNoDelay]) { 497 this[kSetNoDelay] = newValue; 498 this._handle.setNoDelay(newValue); 499 } 500 501 return this; 502}; 503 504 505Socket.prototype.setKeepAlive = function(setting, msecs) { 506 if (!this._handle) { 507 this.once('connect', () => this.setKeepAlive(setting, msecs)); 508 return this; 509 } 510 511 if (this._handle.setKeepAlive) 512 this._handle.setKeepAlive(setting, ~~(msecs / 1000)); 513 514 return this; 515}; 516 517 518Socket.prototype.address = function() { 519 return this._getsockname(); 520}; 521 522 523ObjectDefineProperty(Socket.prototype, '_connecting', { 524 get: function() { 525 return this.connecting; 526 } 527}); 528 529ObjectDefineProperty(Socket.prototype, 'pending', { 530 get() { 531 return !this._handle || this.connecting; 532 }, 533 configurable: true 534}); 535 536 537ObjectDefineProperty(Socket.prototype, 'readyState', { 538 get: function() { 539 if (this.connecting) { 540 return 'opening'; 541 } else if (this.readable && this.writable) { 542 return 'open'; 543 } else if (this.readable && !this.writable) { 544 return 'readOnly'; 545 } else if (!this.readable && this.writable) { 546 return 'writeOnly'; 547 } 548 return 'closed'; 549 } 550}); 551 552 553ObjectDefineProperty(Socket.prototype, 'bufferSize', { 554 get: function() { 555 if (this._handle) { 556 return this.writableLength; 557 } 558 } 559}); 560 561ObjectDefineProperty(Socket.prototype, kUpdateTimer, { 562 get: function() { 563 return this._unrefTimer; 564 } 565}); 566 567 568function tryReadStart(socket) { 569 // Not already reading, start the flow 570 debug('Socket._handle.readStart'); 571 socket._handle.reading = true; 572 const err = socket._handle.readStart(); 573 if (err) 574 socket.destroy(errnoException(err, 'read')); 575} 576 577// Just call handle.readStart until we have enough in the buffer 578Socket.prototype._read = function(n) { 579 debug('_read'); 580 581 if (this.connecting || !this._handle) { 582 debug('_read wait for connection'); 583 this.once('connect', () => this._read(n)); 584 } else if (!this._handle.reading) { 585 tryReadStart(this); 586 } 587}; 588 589 590Socket.prototype.end = function(data, encoding, callback) { 591 stream.Duplex.prototype.end.call(this, data, encoding, callback); 592 DTRACE_NET_STREAM_END(this); 593 return this; 594}; 595 596 597Socket.prototype.pause = function() { 598 if (this[kBuffer] && !this.connecting && this._handle && 599 this._handle.reading) { 600 this._handle.reading = false; 601 if (!this.destroyed) { 602 const err = this._handle.readStop(); 603 if (err) 604 this.destroy(errnoException(err, 'read')); 605 } 606 } 607 return stream.Duplex.prototype.pause.call(this); 608}; 609 610 611Socket.prototype.resume = function() { 612 if (this[kBuffer] && !this.connecting && this._handle && 613 !this._handle.reading) { 614 tryReadStart(this); 615 } 616 return stream.Duplex.prototype.resume.call(this); 617}; 618 619 620Socket.prototype.read = function(n) { 621 if (this[kBuffer] && !this.connecting && this._handle && 622 !this._handle.reading) { 623 tryReadStart(this); 624 } 625 return stream.Duplex.prototype.read.call(this, n); 626}; 627 628 629// Called when the 'end' event is emitted. 630function onReadableStreamEnd() { 631 if (!this.allowHalfOpen) { 632 this.write = writeAfterFIN; 633 if (this.writable) 634 this.end(); 635 else if (!this.writableLength) 636 this.destroy(); 637 } else if (!this.destroyed && !this.writable && !this.writableLength) 638 this.destroy(); 639} 640 641 642Socket.prototype.destroySoon = function() { 643 if (this.writable) 644 this.end(); 645 646 if (this.writableFinished) 647 this.destroy(); 648 else 649 this.once('finish', this.destroy); 650}; 651 652 653Socket.prototype._destroy = function(exception, cb) { 654 debug('destroy'); 655 656 this.connecting = false; 657 658 this.readable = this.writable = false; 659 660 for (let s = this; s !== null; s = s._parent) { 661 clearTimeout(s[kTimeout]); 662 } 663 664 debug('close'); 665 if (this._handle) { 666 if (this !== process.stderr) 667 debug('close handle'); 668 const isException = exception ? true : false; 669 // `bytesRead` and `kBytesWritten` should be accessible after `.destroy()` 670 this[kBytesRead] = this._handle.bytesRead; 671 this[kBytesWritten] = this._handle.bytesWritten; 672 673 this._handle.close(() => { 674 debug('emit close'); 675 this.emit('close', isException); 676 }); 677 this._handle.onread = noop; 678 this._handle = null; 679 this._sockname = null; 680 cb(exception); 681 } else { 682 cb(exception); 683 process.nextTick(emitCloseNT, this); 684 } 685 686 if (this._server) { 687 debug('has server'); 688 this._server._connections--; 689 if (this._server._emitCloseIfDrained) { 690 this._server._emitCloseIfDrained(); 691 } 692 } 693}; 694 695Socket.prototype._getpeername = function() { 696 if (!this._peername) { 697 if (!this._handle || !this._handle.getpeername) { 698 return {}; 699 } 700 const out = {}; 701 const err = this._handle.getpeername(out); 702 if (err) return {}; // FIXME(bnoordhuis) Throw? 703 this._peername = out; 704 } 705 return this._peername; 706}; 707 708function protoGetter(name, callback) { 709 ObjectDefineProperty(Socket.prototype, name, { 710 configurable: false, 711 enumerable: true, 712 get: callback 713 }); 714} 715 716protoGetter('bytesRead', function bytesRead() { 717 return this._handle ? this._handle.bytesRead : this[kBytesRead]; 718}); 719 720protoGetter('remoteAddress', function remoteAddress() { 721 return this._getpeername().address; 722}); 723 724protoGetter('remoteFamily', function remoteFamily() { 725 return this._getpeername().family; 726}); 727 728protoGetter('remotePort', function remotePort() { 729 return this._getpeername().port; 730}); 731 732 733Socket.prototype._getsockname = function() { 734 if (!this._handle || !this._handle.getsockname) { 735 return {}; 736 } 737 if (!this._sockname) { 738 const out = {}; 739 const err = this._handle.getsockname(out); 740 if (err) return {}; // FIXME(bnoordhuis) Throw? 741 this._sockname = out; 742 } 743 return this._sockname; 744}; 745 746 747protoGetter('localAddress', function localAddress() { 748 return this._getsockname().address; 749}); 750 751 752protoGetter('localPort', function localPort() { 753 return this._getsockname().port; 754}); 755 756 757Socket.prototype[kAfterAsyncWrite] = function() { 758 this[kLastWriteQueueSize] = 0; 759}; 760 761Socket.prototype._writeGeneric = function(writev, data, encoding, cb) { 762 // If we are still connecting, then buffer this for later. 763 // The Writable logic will buffer up any more writes while 764 // waiting for this one to be done. 765 if (this.connecting) { 766 this._pendingData = data; 767 this._pendingEncoding = encoding; 768 this.once('connect', function connect() { 769 this._writeGeneric(writev, data, encoding, cb); 770 }); 771 return; 772 } 773 this._pendingData = null; 774 this._pendingEncoding = ''; 775 776 if (!this._handle) { 777 cb(new ERR_SOCKET_CLOSED()); 778 return false; 779 } 780 781 this._unrefTimer(); 782 783 let req; 784 if (writev) 785 req = writevGeneric(this, data, cb); 786 else 787 req = writeGeneric(this, data, encoding, cb); 788 if (req.async) 789 this[kLastWriteQueueSize] = req.bytes; 790}; 791 792 793Socket.prototype._writev = function(chunks, cb) { 794 this._writeGeneric(true, chunks, '', cb); 795}; 796 797 798Socket.prototype._write = function(data, encoding, cb) { 799 this._writeGeneric(false, data, encoding, cb); 800}; 801 802 803// Legacy alias. Having this is probably being overly cautious, but it doesn't 804// really hurt anyone either. This can probably be removed safely if desired. 805protoGetter('_bytesDispatched', function _bytesDispatched() { 806 return this._handle ? this._handle.bytesWritten : this[kBytesWritten]; 807}); 808 809protoGetter('bytesWritten', function bytesWritten() { 810 let bytes = this._bytesDispatched; 811 const data = this._pendingData; 812 const encoding = this._pendingEncoding; 813 const writableBuffer = this.writableBuffer; 814 815 if (!writableBuffer) 816 return undefined; 817 818 for (const el of writableBuffer) { 819 bytes += el.chunk instanceof Buffer ? 820 el.chunk.length : 821 Buffer.byteLength(el.chunk, el.encoding); 822 } 823 824 if (ArrayIsArray(data)) { 825 // Was a writev, iterate over chunks to get total length 826 for (let i = 0; i < data.length; i++) { 827 const chunk = data[i]; 828 829 if (data.allBuffers || chunk instanceof Buffer) 830 bytes += chunk.length; 831 else 832 bytes += Buffer.byteLength(chunk.chunk, chunk.encoding); 833 } 834 } else if (data) { 835 // Writes are either a string or a Buffer. 836 if (typeof data !== 'string') 837 bytes += data.length; 838 else 839 bytes += Buffer.byteLength(data, encoding); 840 } 841 842 return bytes; 843}); 844 845 846function checkBindError(err, port, handle) { 847 // EADDRINUSE may not be reported until we call listen() or connect(). 848 // To complicate matters, a failed bind() followed by listen() or connect() 849 // will implicitly bind to a random port. Ergo, check that the socket is 850 // bound to the expected port before calling listen() or connect(). 851 // 852 // FIXME(bnoordhuis) Doesn't work for pipe handles, they don't have a 853 // getsockname() method. Non-issue for now, the cluster module doesn't 854 // really support pipes anyway. 855 if (err === 0 && port > 0 && handle.getsockname) { 856 const out = {}; 857 err = handle.getsockname(out); 858 if (err === 0 && port !== out.port) { 859 debug(`checkBindError, bound to ${out.port} instead of ${port}`); 860 err = UV_EADDRINUSE; 861 } 862 } 863 return err; 864} 865 866 867function internalConnect( 868 self, address, port, addressType, localAddress, localPort, flags) { 869 // TODO return promise from Socket.prototype.connect which 870 // wraps _connectReq. 871 872 assert(self.connecting); 873 874 let err; 875 876 if (localAddress || localPort) { 877 if (addressType === 4) { 878 localAddress = localAddress || DEFAULT_IPV4_ADDR; 879 err = self._handle.bind(localAddress, localPort); 880 } else { // addressType === 6 881 localAddress = localAddress || DEFAULT_IPV6_ADDR; 882 err = self._handle.bind6(localAddress, localPort, flags); 883 } 884 debug('binding to localAddress: %s and localPort: %d (addressType: %d)', 885 localAddress, localPort, addressType); 886 887 err = checkBindError(err, localPort, self._handle); 888 if (err) { 889 const ex = exceptionWithHostPort(err, 'bind', localAddress, localPort); 890 self.destroy(ex); 891 return; 892 } 893 } 894 895 if (addressType === 6 || addressType === 4) { 896 const req = new TCPConnectWrap(); 897 req.oncomplete = afterConnect; 898 req.address = address; 899 req.port = port; 900 req.localAddress = localAddress; 901 req.localPort = localPort; 902 903 if (addressType === 4) 904 err = self._handle.connect(req, address, port); 905 else 906 err = self._handle.connect6(req, address, port); 907 } else { 908 const req = new PipeConnectWrap(); 909 req.address = address; 910 req.oncomplete = afterConnect; 911 912 err = self._handle.connect(req, address, afterConnect); 913 } 914 915 if (err) { 916 const sockname = self._getsockname(); 917 let details; 918 919 if (sockname) { 920 details = sockname.address + ':' + sockname.port; 921 } 922 923 const ex = exceptionWithHostPort(err, 'connect', address, port, details); 924 self.destroy(ex); 925 } 926} 927 928 929Socket.prototype.connect = function(...args) { 930 let normalized; 931 // If passed an array, it's treated as an array of arguments that have 932 // already been normalized (so we don't normalize more than once). This has 933 // been solved before in https://github.com/nodejs/node/pull/12342, but was 934 // reverted as it had unintended side effects. 935 if (ArrayIsArray(args[0]) && args[0][normalizedArgsSymbol]) { 936 normalized = args[0]; 937 } else { 938 normalized = normalizeArgs(args); 939 } 940 const options = normalized[0]; 941 const cb = normalized[1]; 942 943 if (this.write !== Socket.prototype.write) 944 this.write = Socket.prototype.write; 945 946 if (this.destroyed) { 947 this._handle = null; 948 this._peername = null; 949 this._sockname = null; 950 } 951 952 const { path } = options; 953 const pipe = !!path; 954 debug('pipe', pipe, path); 955 956 if (!this._handle) { 957 this._handle = pipe ? 958 new Pipe(PipeConstants.SOCKET) : 959 new TCP(TCPConstants.SOCKET); 960 initSocketHandle(this); 961 } 962 963 if (cb !== null) { 964 this.once('connect', cb); 965 } 966 967 this._unrefTimer(); 968 969 this.connecting = true; 970 this.writable = true; 971 972 if (pipe) { 973 validateString(path, 'options.path'); 974 defaultTriggerAsyncIdScope( 975 this[async_id_symbol], internalConnect, this, path 976 ); 977 } else { 978 lookupAndConnect(this, options); 979 } 980 return this; 981}; 982 983 984function lookupAndConnect(self, options) { 985 const { localAddress, localPort } = options; 986 const host = options.host || 'localhost'; 987 let { port } = options; 988 989 if (localAddress && !isIP(localAddress)) { 990 throw new ERR_INVALID_IP_ADDRESS(localAddress); 991 } 992 993 if (localPort && typeof localPort !== 'number') { 994 throw new ERR_INVALID_ARG_TYPE('options.localPort', 'number', localPort); 995 } 996 997 if (typeof port !== 'undefined') { 998 if (typeof port !== 'number' && typeof port !== 'string') { 999 throw new ERR_INVALID_ARG_TYPE('options.port', 1000 ['number', 'string'], port); 1001 } 1002 validatePort(port); 1003 } 1004 port |= 0; 1005 1006 // If host is an IP, skip performing a lookup 1007 const addressType = isIP(host); 1008 if (addressType) { 1009 defaultTriggerAsyncIdScope(self[async_id_symbol], process.nextTick, () => { 1010 if (self.connecting) 1011 defaultTriggerAsyncIdScope( 1012 self[async_id_symbol], 1013 internalConnect, 1014 self, host, port, addressType, localAddress, localPort 1015 ); 1016 }); 1017 return; 1018 } 1019 1020 if (options.lookup && typeof options.lookup !== 'function') 1021 throw new ERR_INVALID_ARG_TYPE('options.lookup', 1022 'Function', options.lookup); 1023 1024 1025 if (dns === undefined) dns = require('dns'); 1026 const dnsopts = { 1027 family: options.family, 1028 hints: options.hints || 0 1029 }; 1030 1031 if (!isWindows && 1032 dnsopts.family !== 4 && 1033 dnsopts.family !== 6 && 1034 dnsopts.hints === 0) { 1035 dnsopts.hints = dns.ADDRCONFIG; 1036 } 1037 1038 debug('connect: find host', host); 1039 debug('connect: dns options', dnsopts); 1040 self._host = host; 1041 const lookup = options.lookup || dns.lookup; 1042 defaultTriggerAsyncIdScope(self[async_id_symbol], function() { 1043 lookup(host, dnsopts, function emitLookup(err, ip, addressType) { 1044 self.emit('lookup', err, ip, addressType, host); 1045 1046 // It's possible we were destroyed while looking this up. 1047 // XXX it would be great if we could cancel the promise returned by 1048 // the look up. 1049 if (!self.connecting) return; 1050 1051 if (err) { 1052 // net.createConnection() creates a net.Socket object and immediately 1053 // calls net.Socket.connect() on it (that's us). There are no event 1054 // listeners registered yet so defer the error event to the next tick. 1055 process.nextTick(connectErrorNT, self, err); 1056 } else if (!isIP(ip)) { 1057 err = new ERR_INVALID_IP_ADDRESS(ip); 1058 process.nextTick(connectErrorNT, self, err); 1059 } else if (addressType !== 4 && addressType !== 6) { 1060 err = new ERR_INVALID_ADDRESS_FAMILY(addressType, 1061 options.host, 1062 options.port); 1063 process.nextTick(connectErrorNT, self, err); 1064 } else { 1065 self._unrefTimer(); 1066 defaultTriggerAsyncIdScope( 1067 self[async_id_symbol], 1068 internalConnect, 1069 self, ip, port, addressType, localAddress, localPort 1070 ); 1071 } 1072 }); 1073 }); 1074} 1075 1076 1077function connectErrorNT(self, err) { 1078 self.destroy(err); 1079} 1080 1081 1082Socket.prototype.ref = function() { 1083 if (!this._handle) { 1084 this.once('connect', this.ref); 1085 return this; 1086 } 1087 1088 if (typeof this._handle.ref === 'function') { 1089 this._handle.ref(); 1090 } 1091 1092 return this; 1093}; 1094 1095 1096Socket.prototype.unref = function() { 1097 if (!this._handle) { 1098 this.once('connect', this.unref); 1099 return this; 1100 } 1101 1102 if (typeof this._handle.unref === 'function') { 1103 this._handle.unref(); 1104 } 1105 1106 return this; 1107}; 1108 1109 1110function afterConnect(status, handle, req, readable, writable) { 1111 const self = handle[owner_symbol]; 1112 1113 // Callback may come after call to destroy 1114 if (self.destroyed) { 1115 return; 1116 } 1117 1118 debug('afterConnect'); 1119 1120 assert(self.connecting); 1121 self.connecting = false; 1122 self._sockname = null; 1123 1124 if (status === 0) { 1125 self.readable = readable; 1126 if (!self._writableState.ended) 1127 self.writable = writable; 1128 self._unrefTimer(); 1129 1130 self.emit('connect'); 1131 self.emit('ready'); 1132 1133 // Start the first read, or get an immediate EOF. 1134 // this doesn't actually consume any bytes, because len=0. 1135 if (readable && !self.isPaused()) 1136 self.read(0); 1137 1138 } else { 1139 self.connecting = false; 1140 let details; 1141 if (req.localAddress && req.localPort) { 1142 details = req.localAddress + ':' + req.localPort; 1143 } 1144 const ex = exceptionWithHostPort(status, 1145 'connect', 1146 req.address, 1147 req.port, 1148 details); 1149 if (details) { 1150 ex.localAddress = req.localAddress; 1151 ex.localPort = req.localPort; 1152 } 1153 self.destroy(ex); 1154 } 1155} 1156 1157 1158function Server(options, connectionListener) { 1159 if (!(this instanceof Server)) 1160 return new Server(options, connectionListener); 1161 1162 EventEmitter.call(this); 1163 1164 if (typeof options === 'function') { 1165 connectionListener = options; 1166 options = {}; 1167 this.on('connection', connectionListener); 1168 } else if (options == null || typeof options === 'object') { 1169 options = { ...options }; 1170 1171 if (typeof connectionListener === 'function') { 1172 this.on('connection', connectionListener); 1173 } 1174 } else { 1175 throw new ERR_INVALID_ARG_TYPE('options', 'Object', options); 1176 } 1177 1178 this._connections = 0; 1179 1180 ObjectDefineProperty(this, 'connections', { 1181 get: deprecate(() => { 1182 1183 if (this._usingWorkers) { 1184 return null; 1185 } 1186 return this._connections; 1187 }, 'Server.connections property is deprecated. ' + 1188 'Use Server.getConnections method instead.', 'DEP0020'), 1189 set: deprecate((val) => (this._connections = val), 1190 'Server.connections property is deprecated.', 1191 'DEP0020'), 1192 configurable: true, enumerable: false 1193 }); 1194 1195 this[async_id_symbol] = -1; 1196 this._handle = null; 1197 this._usingWorkers = false; 1198 this._workers = []; 1199 this._unref = false; 1200 1201 this.allowHalfOpen = options.allowHalfOpen || false; 1202 this.pauseOnConnect = !!options.pauseOnConnect; 1203} 1204ObjectSetPrototypeOf(Server.prototype, EventEmitter.prototype); 1205ObjectSetPrototypeOf(Server, EventEmitter); 1206 1207 1208function toNumber(x) { return (x = Number(x)) >= 0 ? x : false; } 1209 1210// Returns handle if it can be created, or error code if it can't 1211function createServerHandle(address, port, addressType, fd, flags) { 1212 let err = 0; 1213 // Assign handle in listen, and clean up if bind or listen fails 1214 let handle; 1215 1216 let isTCP = false; 1217 if (typeof fd === 'number' && fd >= 0) { 1218 try { 1219 handle = createHandle(fd, true); 1220 } catch (e) { 1221 // Not a fd we can listen on. This will trigger an error. 1222 debug('listen invalid fd=%d:', fd, e.message); 1223 return UV_EINVAL; 1224 } 1225 1226 err = handle.open(fd); 1227 if (err) 1228 return err; 1229 1230 assert(!address && !port); 1231 } else if (port === -1 && addressType === -1) { 1232 handle = new Pipe(PipeConstants.SERVER); 1233 if (isWindows) { 1234 const instances = parseInt(process.env.NODE_PENDING_PIPE_INSTANCES); 1235 if (!NumberIsNaN(instances)) { 1236 handle.setPendingInstances(instances); 1237 } 1238 } 1239 } else { 1240 handle = new TCP(TCPConstants.SERVER); 1241 isTCP = true; 1242 } 1243 1244 if (address || port || isTCP) { 1245 debug('bind to', address || 'any'); 1246 if (!address) { 1247 // Try binding to ipv6 first 1248 err = handle.bind6(DEFAULT_IPV6_ADDR, port, flags); 1249 if (err) { 1250 handle.close(); 1251 // Fallback to ipv4 1252 return createServerHandle(DEFAULT_IPV4_ADDR, port); 1253 } 1254 } else if (addressType === 6) { 1255 err = handle.bind6(address, port, flags); 1256 } else { 1257 err = handle.bind(address, port); 1258 } 1259 } 1260 1261 if (err) { 1262 handle.close(); 1263 return err; 1264 } 1265 1266 return handle; 1267} 1268 1269function setupListenHandle(address, port, addressType, backlog, fd, flags) { 1270 debug('setupListenHandle', address, port, addressType, backlog, fd); 1271 1272 // If there is not yet a handle, we need to create one and bind. 1273 // In the case of a server sent via IPC, we don't need to do this. 1274 if (this._handle) { 1275 debug('setupListenHandle: have a handle already'); 1276 } else { 1277 debug('setupListenHandle: create a handle'); 1278 1279 let rval = null; 1280 1281 // Try to bind to the unspecified IPv6 address, see if IPv6 is available 1282 if (!address && typeof fd !== 'number') { 1283 rval = createServerHandle(DEFAULT_IPV6_ADDR, port, 6, fd, flags); 1284 1285 if (typeof rval === 'number') { 1286 rval = null; 1287 address = DEFAULT_IPV4_ADDR; 1288 addressType = 4; 1289 } else { 1290 address = DEFAULT_IPV6_ADDR; 1291 addressType = 6; 1292 } 1293 } 1294 1295 if (rval === null) 1296 rval = createServerHandle(address, port, addressType, fd, flags); 1297 1298 if (typeof rval === 'number') { 1299 const error = uvExceptionWithHostPort(rval, 'listen', address, port); 1300 process.nextTick(emitErrorNT, this, error); 1301 return; 1302 } 1303 this._handle = rval; 1304 } 1305 1306 this[async_id_symbol] = getNewAsyncId(this._handle); 1307 this._handle.onconnection = onconnection; 1308 this._handle[owner_symbol] = this; 1309 1310 // Use a backlog of 512 entries. We pass 511 to the listen() call because 1311 // the kernel does: backlogsize = roundup_pow_of_two(backlogsize + 1); 1312 // which will thus give us a backlog of 512 entries. 1313 const err = this._handle.listen(backlog || 511); 1314 1315 if (err) { 1316 const ex = uvExceptionWithHostPort(err, 'listen', address, port); 1317 this._handle.close(); 1318 this._handle = null; 1319 defaultTriggerAsyncIdScope(this[async_id_symbol], 1320 process.nextTick, 1321 emitErrorNT, 1322 this, 1323 ex); 1324 return; 1325 } 1326 1327 // Generate connection key, this should be unique to the connection 1328 this._connectionKey = addressType + ':' + address + ':' + port; 1329 1330 // Unref the handle if the server was unref'ed prior to listening 1331 if (this._unref) 1332 this.unref(); 1333 1334 defaultTriggerAsyncIdScope(this[async_id_symbol], 1335 process.nextTick, 1336 emitListeningNT, 1337 this); 1338} 1339 1340Server.prototype._listen2 = setupListenHandle; // legacy alias 1341 1342function emitErrorNT(self, err) { 1343 self.emit('error', err); 1344} 1345 1346 1347function emitListeningNT(self) { 1348 // Ensure handle hasn't closed 1349 if (self._handle) 1350 self.emit('listening'); 1351} 1352 1353 1354function listenInCluster(server, address, port, addressType, 1355 backlog, fd, exclusive, flags) { 1356 exclusive = !!exclusive; 1357 1358 if (cluster === undefined) cluster = require('cluster'); 1359 1360 if (cluster.isMaster || exclusive) { 1361 // Will create a new handle 1362 // _listen2 sets up the listened handle, it is still named like this 1363 // to avoid breaking code that wraps this method 1364 server._listen2(address, port, addressType, backlog, fd, flags); 1365 return; 1366 } 1367 1368 const serverQuery = { 1369 address: address, 1370 port: port, 1371 addressType: addressType, 1372 fd: fd, 1373 flags, 1374 }; 1375 1376 // Get the master's server handle, and listen on it 1377 cluster._getServer(server, serverQuery, listenOnMasterHandle); 1378 1379 function listenOnMasterHandle(err, handle) { 1380 err = checkBindError(err, port, handle); 1381 1382 if (err) { 1383 const ex = exceptionWithHostPort(err, 'bind', address, port); 1384 return server.emit('error', ex); 1385 } 1386 1387 // Reuse master's server handle 1388 server._handle = handle; 1389 // _listen2 sets up the listened handle, it is still named like this 1390 // to avoid breaking code that wraps this method 1391 server._listen2(address, port, addressType, backlog, fd, flags); 1392 } 1393} 1394 1395 1396Server.prototype.listen = function(...args) { 1397 const normalized = normalizeArgs(args); 1398 let options = normalized[0]; 1399 const cb = normalized[1]; 1400 1401 if (this._handle) { 1402 throw new ERR_SERVER_ALREADY_LISTEN(); 1403 } 1404 1405 if (cb !== null) { 1406 this.once('listening', cb); 1407 } 1408 const backlogFromArgs = 1409 // (handle, backlog) or (path, backlog) or (port, backlog) 1410 toNumber(args.length > 1 && args[1]) || 1411 toNumber(args.length > 2 && args[2]); // (port, host, backlog) 1412 1413 options = options._handle || options.handle || options; 1414 const flags = getFlags(options.ipv6Only); 1415 // (handle[, backlog][, cb]) where handle is an object with a handle 1416 if (options instanceof TCP) { 1417 this._handle = options; 1418 this[async_id_symbol] = this._handle.getAsyncId(); 1419 listenInCluster(this, null, -1, -1, backlogFromArgs); 1420 return this; 1421 } 1422 // (handle[, backlog][, cb]) where handle is an object with a fd 1423 if (typeof options.fd === 'number' && options.fd >= 0) { 1424 listenInCluster(this, null, null, null, backlogFromArgs, options.fd); 1425 return this; 1426 } 1427 1428 // ([port][, host][, backlog][, cb]) where port is omitted, 1429 // that is, listen(), listen(null), listen(cb), or listen(null, cb) 1430 // or (options[, cb]) where options.port is explicitly set as undefined or 1431 // null, bind to an arbitrary unused port 1432 if (args.length === 0 || typeof args[0] === 'function' || 1433 (typeof options.port === 'undefined' && 'port' in options) || 1434 options.port === null) { 1435 options.port = 0; 1436 } 1437 // ([port][, host][, backlog][, cb]) where port is specified 1438 // or (options[, cb]) where options.port is specified 1439 // or if options.port is normalized as 0 before 1440 let backlog; 1441 if (typeof options.port === 'number' || typeof options.port === 'string') { 1442 validatePort(options.port, 'options.port'); 1443 backlog = options.backlog || backlogFromArgs; 1444 // start TCP server listening on host:port 1445 if (options.host) { 1446 lookupAndListen(this, options.port | 0, options.host, backlog, 1447 options.exclusive, flags); 1448 } else { // Undefined host, listens on unspecified address 1449 // Default addressType 4 will be used to search for master server 1450 listenInCluster(this, null, options.port | 0, 4, 1451 backlog, undefined, options.exclusive); 1452 } 1453 return this; 1454 } 1455 1456 // (path[, backlog][, cb]) or (options[, cb]) 1457 // where path or options.path is a UNIX domain socket or Windows pipe 1458 if (options.path && isPipeName(options.path)) { 1459 const pipeName = this._pipeName = options.path; 1460 backlog = options.backlog || backlogFromArgs; 1461 listenInCluster(this, pipeName, -1, -1, 1462 backlog, undefined, options.exclusive); 1463 1464 if (!this._handle) { 1465 // Failed and an error shall be emitted in the next tick. 1466 // Therefore, we directly return. 1467 return this; 1468 } 1469 1470 let mode = 0; 1471 if (options.readableAll === true) 1472 mode |= PipeConstants.UV_READABLE; 1473 if (options.writableAll === true) 1474 mode |= PipeConstants.UV_WRITABLE; 1475 if (mode !== 0) { 1476 const err = this._handle.fchmod(mode); 1477 if (err) { 1478 this._handle.close(); 1479 this._handle = null; 1480 throw errnoException(err, 'uv_pipe_chmod'); 1481 } 1482 } 1483 return this; 1484 } 1485 1486 if (!(('port' in options) || ('path' in options))) { 1487 throw new ERR_INVALID_ARG_VALUE('options', options, 1488 'must have the property "port" or "path"'); 1489 } 1490 1491 throw new ERR_INVALID_OPT_VALUE('options', options); 1492}; 1493 1494function lookupAndListen(self, port, address, backlog, exclusive, flags) { 1495 if (dns === undefined) dns = require('dns'); 1496 dns.lookup(address, function doListen(err, ip, addressType) { 1497 if (err) { 1498 self.emit('error', err); 1499 } else { 1500 addressType = ip ? addressType : 4; 1501 listenInCluster(self, ip, port, addressType, 1502 backlog, undefined, exclusive, flags); 1503 } 1504 }); 1505} 1506 1507ObjectDefineProperty(Server.prototype, 'listening', { 1508 get: function() { 1509 return !!this._handle; 1510 }, 1511 configurable: true, 1512 enumerable: true 1513}); 1514 1515Server.prototype.address = function() { 1516 if (this._handle && this._handle.getsockname) { 1517 const out = {}; 1518 const err = this._handle.getsockname(out); 1519 if (err) { 1520 throw errnoException(err, 'address'); 1521 } 1522 return out; 1523 } else if (this._pipeName) { 1524 return this._pipeName; 1525 } 1526 return null; 1527}; 1528 1529function onconnection(err, clientHandle) { 1530 const handle = this; 1531 const self = handle[owner_symbol]; 1532 1533 debug('onconnection'); 1534 1535 if (err) { 1536 self.emit('error', errnoException(err, 'accept')); 1537 return; 1538 } 1539 1540 if (self.maxConnections && self._connections >= self.maxConnections) { 1541 clientHandle.close(); 1542 return; 1543 } 1544 1545 const socket = new Socket({ 1546 handle: clientHandle, 1547 allowHalfOpen: self.allowHalfOpen, 1548 pauseOnCreate: self.pauseOnConnect, 1549 readable: true, 1550 writable: true 1551 }); 1552 1553 self._connections++; 1554 socket.server = self; 1555 socket._server = self; 1556 1557 DTRACE_NET_SERVER_CONNECTION(socket); 1558 self.emit('connection', socket); 1559} 1560 1561 1562Server.prototype.getConnections = function(cb) { 1563 const self = this; 1564 1565 function end(err, connections) { 1566 defaultTriggerAsyncIdScope(self[async_id_symbol], 1567 process.nextTick, 1568 cb, 1569 err, 1570 connections); 1571 } 1572 1573 if (!this._usingWorkers) { 1574 end(null, this._connections); 1575 return this; 1576 } 1577 1578 // Poll workers 1579 let left = this._workers.length; 1580 let total = this._connections; 1581 1582 function oncount(err, count) { 1583 if (err) { 1584 left = -1; 1585 return end(err); 1586 } 1587 1588 total += count; 1589 if (--left === 0) return end(null, total); 1590 } 1591 1592 for (let n = 0; n < this._workers.length; n++) { 1593 this._workers[n].getConnections(oncount); 1594 } 1595 1596 return this; 1597}; 1598 1599 1600Server.prototype.close = function(cb) { 1601 if (typeof cb === 'function') { 1602 if (!this._handle) { 1603 this.once('close', function close() { 1604 cb(new ERR_SERVER_NOT_RUNNING()); 1605 }); 1606 } else { 1607 this.once('close', cb); 1608 } 1609 } 1610 1611 if (this._handle) { 1612 this._handle.close(); 1613 this._handle = null; 1614 } 1615 1616 if (this._usingWorkers) { 1617 let left = this._workers.length; 1618 const onWorkerClose = () => { 1619 if (--left !== 0) return; 1620 1621 this._connections = 0; 1622 this._emitCloseIfDrained(); 1623 }; 1624 1625 // Increment connections to be sure that, even if all sockets will be closed 1626 // during polling of workers, `close` event will be emitted only once. 1627 this._connections++; 1628 1629 // Poll workers 1630 for (let n = 0; n < this._workers.length; n++) 1631 this._workers[n].close(onWorkerClose); 1632 } else { 1633 this._emitCloseIfDrained(); 1634 } 1635 1636 return this; 1637}; 1638 1639Server.prototype._emitCloseIfDrained = function() { 1640 debug('SERVER _emitCloseIfDrained'); 1641 1642 if (this._handle || this._connections) { 1643 debug('SERVER handle? %j connections? %d', 1644 !!this._handle, this._connections); 1645 return; 1646 } 1647 1648 defaultTriggerAsyncIdScope(this[async_id_symbol], 1649 process.nextTick, 1650 emitCloseNT, 1651 this); 1652}; 1653 1654 1655function emitCloseNT(self) { 1656 debug('SERVER: emit close'); 1657 self.emit('close'); 1658} 1659 1660 1661Server.prototype[EventEmitter.captureRejectionSymbol] = function( 1662 err, event, sock) { 1663 1664 switch (event) { 1665 case 'connection': 1666 sock.destroy(err); 1667 break; 1668 default: 1669 this.emit('error', err); 1670 } 1671}; 1672 1673 1674// Legacy alias on the C++ wrapper object. This is not public API, so we may 1675// want to runtime-deprecate it at some point. There's no hurry, though. 1676ObjectDefineProperty(TCP.prototype, 'owner', { 1677 get() { return this[owner_symbol]; }, 1678 set(v) { return this[owner_symbol] = v; } 1679}); 1680 1681ObjectDefineProperty(Socket.prototype, '_handle', { 1682 get() { return this[kHandle]; }, 1683 set(v) { return this[kHandle] = v; } 1684}); 1685 1686Server.prototype._setupWorker = function(socketList) { 1687 this._usingWorkers = true; 1688 this._workers.push(socketList); 1689 socketList.once('exit', (socketList) => { 1690 const index = this._workers.indexOf(socketList); 1691 this._workers.splice(index, 1); 1692 }); 1693}; 1694 1695Server.prototype.ref = function() { 1696 this._unref = false; 1697 1698 if (this._handle) 1699 this._handle.ref(); 1700 1701 return this; 1702}; 1703 1704Server.prototype.unref = function() { 1705 this._unref = true; 1706 1707 if (this._handle) 1708 this._handle.unref(); 1709 1710 return this; 1711}; 1712 1713let _setSimultaneousAccepts; 1714let warnSimultaneousAccepts = true; 1715 1716if (isWindows) { 1717 let simultaneousAccepts; 1718 1719 _setSimultaneousAccepts = function(handle) { 1720 if (warnSimultaneousAccepts) { 1721 process.emitWarning( 1722 'net._setSimultaneousAccepts() is deprecated and will be removed.', 1723 'DeprecationWarning', 'DEP0121'); 1724 warnSimultaneousAccepts = false; 1725 } 1726 if (handle === undefined) { 1727 return; 1728 } 1729 1730 if (simultaneousAccepts === undefined) { 1731 simultaneousAccepts = (process.env.NODE_MANY_ACCEPTS && 1732 process.env.NODE_MANY_ACCEPTS !== '0'); 1733 } 1734 1735 if (handle._simultaneousAccepts !== simultaneousAccepts) { 1736 handle.setSimultaneousAccepts(!!simultaneousAccepts); 1737 handle._simultaneousAccepts = simultaneousAccepts; 1738 } 1739 }; 1740} else { 1741 _setSimultaneousAccepts = function() { 1742 if (warnSimultaneousAccepts) { 1743 process.emitWarning( 1744 'net._setSimultaneousAccepts() is deprecated and will be removed.', 1745 'DeprecationWarning', 'DEP0121'); 1746 warnSimultaneousAccepts = false; 1747 } 1748 }; 1749} 1750 1751module.exports = { 1752 _createServerHandle: createServerHandle, 1753 _normalizeArgs: normalizeArgs, 1754 _setSimultaneousAccepts, 1755 connect, 1756 createConnection: connect, 1757 createServer, 1758 isIP: isIP, 1759 isIPv4: isIPv4, 1760 isIPv6: isIPv6, 1761 Server, 1762 Socket, 1763 Stream: Socket, // Legacy naming 1764}; 1765