1// Copyright Joyent, Inc. and other Node contributors. 2// 3// Permission is hereby granted, free of charge, to any person obtaining a 4// copy of this software and associated documentation files (the 5// "Software"), to deal in the Software without restriction, including 6// without limitation the rights to use, copy, modify, merge, publish, 7// distribute, sublicense, and/or sell copies of the Software, and to permit 8// persons to whom the Software is furnished to do so, subject to the 9// following conditions: 10// 11// The above copyright notice and this permission notice shall be included 12// in all copies or substantial portions of the Software. 13// 14// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS 15// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 16// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN 17// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, 18// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR 19// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE 20// USE OR OTHER DEALINGS IN THE SOFTWARE. 

'use strict';

// Captured built-ins supplied by the runtime's `primordials` object.
const {
  Array,
  ArrayIsArray,
  ArrayPrototypePush,
  FunctionPrototypeBind,
  FunctionPrototypeCall,
  ObjectDefineProperty,
  ObjectSetPrototypeOf,
  ReflectApply,
  SymbolAsyncDispose,
  SymbolDispose,
} = primordials;

const errors = require('internal/errors');
const {
  kStateSymbol,
  _createSocketHandle,
  newHandle,
} = require('internal/dgram');
const { guessHandleType } = internalBinding('util');
const {
  ERR_BUFFER_OUT_OF_BOUNDS,
  ERR_INVALID_ARG_TYPE,
  ERR_MISSING_ARGS,
  ERR_SOCKET_ALREADY_BOUND,
  ERR_SOCKET_BAD_BUFFER_SIZE,
  ERR_SOCKET_BUFFER_SIZE,
  ERR_SOCKET_DGRAM_IS_CONNECTED,
  ERR_SOCKET_DGRAM_NOT_CONNECTED,
  ERR_SOCKET_DGRAM_NOT_RUNNING,
  ERR_INVALID_FD_TYPE,
} = errors.codes;
const {
  isInt32,
  validateAbortSignal,
  validateString,
  validateNumber,
  validatePort,
} = require('internal/validators');
const { Buffer } = require('buffer');
const { deprecate, promisify } = require('internal/util');
const { isArrayBufferView } = require('internal/util/types');
const EventEmitter = require('events');
const {
  defaultTriggerAsyncIdScope,
  symbols: { async_id_symbol, owner_symbol },
} = require('internal/async_hooks');
const { UV_UDP_REUSEADDR } = internalBinding('constants').os;

const {
  constants: { UV_UDP_IPV6ONLY },
  UDP,
  SendWrap,
} = internalBinding('udp_wrap');

// Diagnostics channel on which each newly constructed Socket is published.
const dc = require('diagnostics_channel');
const udpSocketChannel = dc.channel('udp.socket');

// Values stored in the socket state's `bindState` field.
const BIND_STATE_UNBOUND = 0;
const BIND_STATE_BINDING = 1;
const BIND_STATE_BOUND = 2;

// Values stored in the socket state's `connectState` field.
const CONNECT_STATE_DISCONNECTED = 0;
const CONNECT_STATE_CONNECTING = 1;
const CONNECT_STATE_CONNECTED = 2;

// Second argument of handle.bufferSize(): selects which buffer (receive or
// send) the call refers to.
const RECV_BUFFER = true;
const SEND_BUFFER = false;

// Lazily loaded
let _cluster = null;
// Requires the 'cluster' module on first use and caches it in `_cluster`, so
// it is not loaded until a bind actually needs it.
function lazyLoadCluster() {
  if (!_cluster) _cluster = require('cluster');
  return _cluster;
}

const errnoException = errors.errnoException;
const exceptionWithHostPort = errors.exceptionWithHostPort;


/**
 * Datagram socket constructor. Instances inherit from EventEmitter.
 * @param {string|object} type Socket type handed to newHandle(), or an
 *   options object providing `type` plus the optional `lookup`,
 *   `recvBufferSize`, `sendBufferSize`, `reuseAddr`, `ipv6Only` and `signal`
 *   fields.
 * @param {Function} [listener] Added as a 'message' listener when it is a
 *   function.
 */
function Socket(type, listener) {
  FunctionPrototypeCall(EventEmitter, this);
  let lookup;
  let recvBufferSize;
  let sendBufferSize;

  let options;
  if (type !== null && typeof type === 'object') {
    options = type;
    type = options.type;
    lookup = options.lookup;
    recvBufferSize = options.recvBufferSize;
    sendBufferSize = options.sendBufferSize;
  }

  const handle = newHandle(type, lookup);
  handle[owner_symbol] = this;

  this[async_id_symbol] = handle.getAsyncId();
  this.type = type;

  if (typeof listener === 'function')
    this.on('message', listener);

  // Internal per-socket state; every method reads it through kStateSymbol.
  this[kStateSymbol] = {
    handle,
    receiving: false,
    bindState: BIND_STATE_UNBOUND,
    connectState: CONNECT_STATE_DISCONNECTED,
    queue: undefined,
    reuseAddr: options?.reuseAddr, // Use UV_UDP_REUSEADDR if true.
    ipv6Only: options?.ipv6Only,
    recvBufferSize,
    sendBufferSize,
  };

  if (options?.signal !== undefined) {
    const { signal } = options;
    validateAbortSignal(signal, 'options.signal');
    // Close the socket on abort, unless it has already been closed.
    const onAborted = () => {
      if (this[kStateSymbol].handle) this.close();
    };
    if (signal.aborted) {
      onAborted();
    } else {
      // Drop the abort listener again once the socket emits 'close'.
      const disposable = EventEmitter.addAbortListener(signal, onAborted);
      this.once('close', disposable[SymbolDispose]);
    }
  }
  if (udpSocketChannel.hasSubscribers) {
    udpSocketChannel.publish({
      socket: this,
    });
  }
}
ObjectSetPrototypeOf(Socket.prototype, EventEmitter.prototype);
ObjectSetPrototypeOf(Socket, EventEmitter);


/**
 * Factory equivalent to `new Socket(type, listener)`.
 * @param {string|object} type
 * @param {Function} [listener]
 * @returns {Socket}
 */
function createSocket(type, listener) {
  return new Socket(type, listener);
}


// Wires the message/error callbacks onto the current handle, starts
// receiving, marks the socket as bound, applies any buffer sizes requested at
// construction time and finally emits 'listening'.
function startListening(socket) {
  const state = socket[kStateSymbol];

  state.handle.onmessage = onMessage;
  state.handle.onerror = onError;
  state.handle.recvStart();
  state.receiving = true;
  state.bindState = BIND_STATE_BOUND;

  if (state.recvBufferSize)
    bufferSize(socket, state.recvBufferSize, RECV_BUFFER);

  if (state.sendBufferSize)
    bufferSize(socket, state.sendBufferSize, SEND_BUFFER);

  socket.emit('listening');
}

// Swaps the socket's current handle for `newHandle`, carrying over the
// unref'd status plus the lookup/bind/send functions, then closes the old
// handle.
function replaceHandle(self, newHandle) {
  const state = self[kStateSymbol];
  const oldHandle = state.handle;
  // Sync the old handle state to new handle
  if (!oldHandle.hasRef() && typeof newHandle.unref === 'function') {
    newHandle.unref();
  }
  // Set up the handle that we got from primary.
  newHandle.lookup = oldHandle.lookup;
  newHandle.bind = oldHandle.bind;
  newHandle.send = oldHandle.send;
  newHandle[owner_symbol] = self;

  // Replace the existing handle by the handle we got from primary.
  oldHandle.close();
  state.handle = newHandle;
}

/**
 * Forwards a buffer-size request to the socket's handle.
 * @param {Socket} self
 * @param {number} size Must be an unsigned 32-bit integer; the getters in
 *   this file pass 0.
 * @param {boolean} buffer RECV_BUFFER or SEND_BUFFER.
 * @returns {number} Whatever handle.bufferSize() returned.
 * @throws {ERR_SOCKET_BAD_BUFFER_SIZE} If `size` is not a uint32.
 * @throws {ERR_SOCKET_DGRAM_NOT_RUNNING} If the socket has been closed.
 * @throws {ERR_SOCKET_BUFFER_SIZE} If the handle returned undefined.
 */
function bufferSize(self, size, buffer) {
  if (size >>> 0 !== size)
    throw new ERR_SOCKET_BAD_BUFFER_SIZE();

  // A closed socket has a null handle; fail with the same error the other
  // methods use instead of a TypeError from dereferencing null.
  healthCheck(self);

  const ctx = {};
  const ret = self[kStateSymbol].handle.bufferSize(size, buffer, ctx);
  if (ret === undefined) {
    throw new ERR_SOCKET_BUFFER_SIZE(ctx);
  }
  return ret;
}

// Query primary process to get the server handle and utilize it.
// `errCb` receives the error reported by cluster._getServer(), if any.
function bindServerHandle(self, options, errCb) {
  const cluster = lazyLoadCluster();

  const state = self[kStateSymbol];
  cluster._getServer(self, options, (err, handle) => {
    if (err) {
      errCb(err);
      return;
    }

    if (!state.handle) {
      // Handle has been closed in the mean time.
      return handle.close();
    }

    replaceHandle(self, handle);
    startListening(self);
  });
}

/**
 * Binds the socket. `port_` may be:
 *  - an object exposing a `recvStart` function, which is adopted as the
 *    socket's handle;
 *  - an object with a positive int32 `fd` (and optional `exclusive`), which
 *    is opened instead of creating a new descriptor;
 *  - an options object with `port`, `address` and `exclusive`;
 *  - otherwise a port value, with `address_` as the optional address.
 * If the last argument is a function it is called once on 'listening'.
 * Lookup and bind failures are emitted as 'error' and reset the bind state
 * to unbound.
 * @returns {Socket} this
 * @throws {ERR_SOCKET_DGRAM_NOT_RUNNING} If the socket has been closed.
 * @throws {ERR_SOCKET_ALREADY_BOUND} If a bind is in progress or complete.
 * @throws {ERR_INVALID_FD_TYPE} If `fd` is not reported as a UDP handle.
 */
Socket.prototype.bind = function(port_, address_ /* , callback */) {
  let port = port_;

  healthCheck(this);
  const state = this[kStateSymbol];

  if (state.bindState !== BIND_STATE_UNBOUND)
    throw new ERR_SOCKET_ALREADY_BOUND();

  state.bindState = BIND_STATE_BINDING;

  const cb = arguments.length && arguments[arguments.length - 1];
  if (typeof cb === 'function') {
    // Whichever of 'error' / 'listening' fires first removes both listeners,
    // so `cb` runs at most once and only on success.
    function removeListeners() {
      this.removeListener('error', removeListeners);
      this.removeListener('listening', onListening);
    }

    function onListening() {
      FunctionPrototypeCall(removeListeners, this);
      FunctionPrototypeCall(cb, this);
    }

    this.on('error', removeListeners);
    this.on('listening', onListening);
  }

  // A handle-like object was passed in: adopt it directly.
  if (port !== null &&
      typeof port === 'object' &&
      typeof port.recvStart === 'function') {
    replaceHandle(this, port);
    startListening(this);
    return this;
  }

  // Open an existing fd instead of creating a new one.
  if (port !== null && typeof port === 'object' &&
      isInt32(port.fd) && port.fd > 0) {
    const fd = port.fd;
    const exclusive = !!port.exclusive;

    const cluster = lazyLoadCluster();

    if (cluster.isWorker && !exclusive) {
      bindServerHandle(this, {
        address: null,
        port: null,
        addressType: this.type,
        fd,
        flags: null,
      }, (err) => {
        // Callback to handle error.
        const ex = errnoException(err, 'open');
        state.bindState = BIND_STATE_UNBOUND;
        this.emit('error', ex);
      });
      return this;
    }

    const type = guessHandleType(fd);
    if (type !== 'UDP')
      throw new ERR_INVALID_FD_TYPE(type);
    const err = state.handle.open(fd);

    if (err)
      throw errnoException(err, 'open');

    startListening(this);
    return this;
  }

  let address;
  let exclusive;

  if (port !== null && typeof port === 'object') {
    address = port.address || '';
    exclusive = !!port.exclusive;
    port = port.port;
  } else {
    // bind(port, callback): the second argument is the callback, not an
    // address.
    address = typeof address_ === 'function' ? '' : address_;
    exclusive = false;
  }

  // Defaulting address for bind to all interfaces
  if (!address) {
    if (this.type === 'udp4')
      address = '0.0.0.0';
    else
      address = '::';
  }

  // Resolve address first
  state.handle.lookup(address, (err, ip) => {
    if (err) {
      state.bindState = BIND_STATE_UNBOUND;
      this.emit('error', err);
      return;
    }

    const cluster = lazyLoadCluster();

    let flags = 0;
    if (state.reuseAddr)
      flags |= UV_UDP_REUSEADDR;
    if (state.ipv6Only)
      flags |= UV_UDP_IPV6ONLY;

    if (cluster.isWorker && !exclusive) {
      bindServerHandle(this, {
        address: ip,
        port: port,
        addressType: this.type,
        fd: -1,
        flags: flags,
      }, (err) => {
        // Callback to handle error.
        const ex = exceptionWithHostPort(err, 'bind', ip, port);
        state.bindState = BIND_STATE_UNBOUND;
        this.emit('error', ex);
      });
    } else {
      if (!state.handle)
        return; // Handle has been closed in the mean time

      const err = state.handle.bind(ip, port || 0, flags);
      if (err) {
        const ex = exceptionWithHostPort(err, 'bind', ip, port);
        state.bindState = BIND_STATE_UNBOUND;
        this.emit('error', ex);
        // Todo: close?
        return;
      }

      startListening(this);
    }
  });

  return this;
};

/**
 * Associates the socket with a remote address and port. An unbound socket is
 * bound first (port 0, exclusive); while binding is pending the connect is
 * queued and runs after 'listening'.
 * @param {number} port Validated with validatePort().
 * @param {string} [address] Defaults to ''.
 * @param {Function} [callback] Registered once for 'connect'; on failure it
 *   is called with the error instead.
 * @throws {ERR_SOCKET_DGRAM_IS_CONNECTED} If already connecting/connected.
 * @throws {ERR_SOCKET_DGRAM_NOT_RUNNING} If the socket has been closed.
 */
Socket.prototype.connect = function(port, address, callback) {
  port = validatePort(port, 'Port', false);
  if (typeof address === 'function') {
    callback = address;
    address = '';
  } else if (address === undefined) {
    address = '';
  }

  validateString(address, 'address');

  const state = this[kStateSymbol];

  if (state.connectState !== CONNECT_STATE_DISCONNECTED)
    throw new ERR_SOCKET_DGRAM_IS_CONNECTED();

  // Reject closed sockets before touching any state: without this, a closed
  // socket would either dereference the null handle in _connect() or queue a
  // connect that can never run.
  healthCheck(this);

  state.connectState = CONNECT_STATE_CONNECTING;
  if (state.bindState === BIND_STATE_UNBOUND)
    this.bind({ port: 0, exclusive: true }, null);

  if (state.bindState !== BIND_STATE_BOUND) {
    enqueue(this, FunctionPrototypeBind(_connect, this,
                                        port, address, callback));
    return;
  }

  ReflectApply(_connect, this, [port, address, callback]);
};


// Called with `this` set to the socket. Registers `callback` for 'connect'
// and resolves `address` through the handle's lookup before doConnect().
function _connect(port, address, callback) {
  const state = this[kStateSymbol];
  if (callback)
    this.once('connect', callback);

  const afterDns = (ex, ip) => {
    defaultTriggerAsyncIdScope(
      this[async_id_symbol],
      doConnect,
      ex, this, ip, address, port, callback,
    );
  };

  state.handle.lookup(address, afterDns);
}


// Completes a connect after address lookup. `ex` is the lookup error, if any.
// Success emits 'connect' on the next tick; failure resets the connect state
// and reports the error to `callback` if given, else as an 'error' event.
function doConnect(ex, self, ip, address, port, callback) {
  const state = self[kStateSymbol];
  if (!state.handle)
    return;

  if (!ex) {
    const err = state.handle.connect(ip, port);
    if (err) {
      ex = exceptionWithHostPort(err, 'connect', address, port);
    }
  }

  if (ex) {
    state.connectState = CONNECT_STATE_DISCONNECTED;
    return process.nextTick(() => {
      if (callback) {
        // The callback was registered for 'connect' in _connect(); remove it
        // so it only ever sees this failure.
        self.removeListener('connect', callback);
        callback(ex);
      } else {
        self.emit('error', ex);
      }
    });
  }

  state.connectState = CONNECT_STATE_CONNECTED;
  process.nextTick(() => self.emit('connect'));
}


/**
 * Dissociates a connected socket from its remote address.
 * @throws {ERR_SOCKET_DGRAM_NOT_CONNECTED} If the socket is not connected.
 * @throws {ERR_SOCKET_DGRAM_NOT_RUNNING} If the socket has been closed.
 */
Socket.prototype.disconnect = function() {
  const state = this[kStateSymbol];
  if (state.connectState !== CONNECT_STATE_CONNECTED)
    throw new ERR_SOCKET_DGRAM_NOT_CONNECTED();

  // close() leaves connectState untouched, so a closed socket can get here
  // with a null handle.
  healthCheck(this);

  const err = state.handle.disconnect();
  // NOTE(review): the syscall label is 'connect', not 'disconnect' --
  // presumably mirroring how the native layer disconnects; confirm.
  if (err)
    throw errnoException(err, 'connect');
  else
    state.connectState = CONNECT_STATE_DISCONNECTED;
};


// Thin wrapper around `send`, here for compatibility with dgram_legacy.js
// Unlike send(), every positional argument except `callback` is validated
// here.
Socket.prototype.sendto = function(buffer,
                                   offset,
                                   length,
                                   port,
                                   address,
                                   callback) {
  validateNumber(offset, 'offset');
  validateNumber(length, 'length');
  validateNumber(port, 'port');
  validateString(address, 'address');

  this.send(buffer, offset, length, port, address, callback);
};


/**
 * Returns a Buffer over `length` bytes of `buffer` starting at `offset`.
 * Strings are converted with Buffer.from() first; array buffer views are
 * re-wrapped over the same underlying ArrayBuffer.
 * @throws {ERR_INVALID_ARG_TYPE} If `buffer` is neither a string nor a view.
 * @throws {ERR_BUFFER_OUT_OF_BOUNDS} If the range exceeds the buffer.
 */
function sliceBuffer(buffer, offset, length) {
  if (typeof buffer === 'string') {
    buffer = Buffer.from(buffer);
  } else if (!isArrayBufferView(buffer)) {
    throw new ERR_INVALID_ARG_TYPE('buffer',
                                   ['Buffer',
                                    'TypedArray',
                                    'DataView',
                                    'string'],
                                   buffer);
  }

  // Coerce both values to unsigned 32-bit integers.
  offset = offset >>> 0;
  length = length >>> 0;
  if (offset > buffer.byteLength) {
    throw new ERR_BUFFER_OUT_OF_BOUNDS('offset');
  }

  if (offset + length > buffer.byteLength) {
    throw new ERR_BUFFER_OUT_OF_BOUNDS('length');
  }

  return Buffer.from(buffer.buffer, buffer.byteOffset + offset, length);
}


// Converts every entry of `list` to a Buffer (strings via Buffer.from, views
// re-wrapped over their ArrayBuffer). Returns null as soon as an entry is
// neither a string nor an array buffer view.
function fixBufferList(list) {
  const newlist = new Array(list.length);

  for (let i = 0, l = list.length; i < l; i++) {
    const buf = list[i];
    if (typeof buf === 'string')
      newlist[i] = Buffer.from(buf);
    else if (!isArrayBufferView(buf))
      return null;
    else
      newlist[i] = Buffer.from(buf.buffer, buf.byteOffset, buf.byteLength);
  }

  return newlist;
}


// Appends `toEnqueue` (a function) to the socket's pending-work queue.
function enqueue(self, toEnqueue) {
  const state = self[kStateSymbol];

  // If the send queue hasn't been initialized yet, do it, and install an
  // event handler that flushes the send queue after binding is done.
  if (state.queue === undefined) {
    state.queue = [];
    self.once(EventEmitter.errorMonitor, onListenError);
    self.once('listening', onListenSuccess);
  }
  ArrayPrototypePush(state.queue, toEnqueue);
}


// 'listening' handler installed by enqueue(): runs the queued work.
function onListenSuccess() {
  this.removeListener(EventEmitter.errorMonitor, onListenError);
  FunctionPrototypeCall(clearQueue, this);
}


// Error-monitor handler installed by enqueue(): discards the queued work.
function onListenError(err) {
  this.removeListener('listening', onListenSuccess);
  this[kStateSymbol].queue = undefined;
}


// Called with `this` set to the socket. The queue is detached before running
// so that entries which enqueue again start a fresh queue.
function clearQueue() {
  const state = this[kStateSymbol];
  const queue = state.queue;
  state.queue = undefined;

  // Flush the send queue.
  for (const queueEntry of queue)
    queueEntry();
}

// valid combinations
// For connectionless sockets
// send(buffer, offset, length, port, address, callback)
// send(buffer, offset, length, port, address)
// send(buffer, offset, length, port, callback)
// send(buffer, offset, length, port)
// send(bufferOrList, port, address, callback)
// send(bufferOrList, port, address)
// send(bufferOrList, port, callback)
// send(bufferOrList, port)
// For connected sockets
// send(buffer, offset, length, callback)
// send(buffer, offset, length)
// send(bufferOrList, callback)
// send(bufferOrList)
Socket.prototype.send = function(buffer,
                                 offset,
                                 length,
                                 port,
                                 address,
                                 callback) {

  let list;
  const state = this[kStateSymbol];
  const connected = state.connectState === CONNECT_STATE_CONNECTED;
  if (!connected) {
    if (address || (port && typeof port !== 'function')) {
      // Long form: (buffer, offset, length, port, ...).
      buffer = sliceBuffer(buffer, offset, length);
    } else {
      // Short form: (bufferOrList, port, address|callback, callback), so the
      // positional arguments are shifted into their real slots.
      callback = port;
      port = offset;
      address = length;
    }
  } else {
    if (typeof length === 'number') {
      buffer = sliceBuffer(buffer, offset, length);
      if (typeof port === 'function') {
        callback = port;
        port = null;
      }
    } else {
      callback = offset;
    }

    // A connected socket must not be given a destination.
    if (port || address)
      throw new ERR_SOCKET_DGRAM_IS_CONNECTED();
  }

  if (!ArrayIsArray(buffer)) {
    if (typeof buffer === 'string') {
      list = [ Buffer.from(buffer) ];
    } else if (!isArrayBufferView(buffer)) {
      throw new ERR_INVALID_ARG_TYPE('buffer',
                                     ['Buffer',
                                      'TypedArray',
                                      'DataView',
                                      'string'],
                                     buffer);
    } else {
      list = [ buffer ];
    }
  } else if (!(list = fixBufferList(buffer))) {
    throw new ERR_INVALID_ARG_TYPE('buffer list arguments',
                                   ['Buffer',
                                    'TypedArray',
                                    'DataView',
                                    'string'],
                                   buffer);
  }

  if (!connected)
    port = validatePort(port, 'Port', false);

  // Normalize callback so it's either a function or undefined but not anything
  // else.
  if (typeof callback !== 'function')
    callback = undefined;

  if (typeof address === 'function') {
    callback = address;
    address = undefined;
  } else if (address != null) {
    validateString(address, 'address');
  }

  healthCheck(this);

  if (state.bindState === BIND_STATE_UNBOUND)
    this.bind({ port: 0, exclusive: true }, null);

  // An empty list is sent as a single zero-length buffer.
  if (list.length === 0)
    ArrayPrototypePush(list, Buffer.alloc(0));

  // If the socket hasn't been bound yet, push the outbound packet onto the
  // send queue and send after binding is complete.
  if (state.bindState !== BIND_STATE_BOUND) {
    enqueue(this, FunctionPrototypeBind(this.send, this,
                                        list, port, address, callback));
    return;
  }

  const afterDns = (ex, ip) => {
    defaultTriggerAsyncIdScope(
      this[async_id_symbol],
      doSend,
      ex, this, ip, list, address, port, callback,
    );
  };

  // Connected sockets skip the lookup and pass no destination to doSend().
  if (!connected) {
    state.handle.lookup(address, afterDns);
  } else {
    afterDns(null, null);
  }
};

// Performs the send once the destination has been resolved. `ex` is the
// lookup error, if any: it goes to `callback` when one was given, otherwise
// it is emitted as 'error'.
function doSend(ex, self, ip, list, address, port, callback) {
  const state = self[kStateSymbol];

  if (ex) {
    if (typeof callback === 'function') {
      process.nextTick(callback, ex);
      return;
    }

    process.nextTick(() => self.emit('error', ex));
    return;
  } else if (!state.handle) {
    // The socket was closed while the lookup was pending.
    return;
  }

  const req = new SendWrap();
  req.list = list;  // Keep reference alive.
  req.address = address;
  req.port = port;
  if (callback) {
    req.callback = callback;
    req.oncomplete = afterSend;
  }

  let err;
  if (port)
    err = state.handle.send(req, list, list.length, port, ip, !!callback);
  else
    err = state.handle.send(req, list, list.length, !!callback);

  if (err >= 1) {
    // Synchronous finish. The return code is msg_length + 1 so that we can
    // distinguish between synchronous success and asynchronous success.
    if (callback)
      process.nextTick(callback, null, err - 1);
    return;
  }

  if (err && callback) {
    // Don't emit as error, dgram_legacy.js compatibility
    const ex = exceptionWithHostPort(err, 'send', address, port);
    process.nextTick(callback, ex);
  }
}

// `oncomplete` hook of a SendWrap; `this` is the request object populated in
// doSend().
function afterSend(err, sent) {
  if (err) {
    err = exceptionWithHostPort(err, 'send', this.address, this.port);
  } else {
    err = null;
  }

  this.callback(err, sent);
}

/**
 * Closes the socket and emits 'close' on the next tick. While a pending-work
 * queue exists the close is appended to it and runs after the queued work.
 * @param {Function} [callback] Added as a 'close' listener.
 * @returns {Socket} this
 * @throws {ERR_SOCKET_DGRAM_NOT_RUNNING} If the socket is already closed.
 */
Socket.prototype.close = function(callback) {
  const state = this[kStateSymbol];
  const queue = state.queue;

  if (typeof callback === 'function')
    this.on('close', callback);

  if (queue !== undefined) {
    ArrayPrototypePush(queue, FunctionPrototypeBind(this.close, this));
    return this;
  }

  healthCheck(this);
  stopReceiving(this);
  state.handle.close();
  state.handle = null;
  defaultTriggerAsyncIdScope(this[async_id_symbol],
                             process.nextTick,
                             socketCloseNT,
                             this);

  return this;
};

// Async disposer: resolves immediately when the socket is already closed,
// otherwise resolves through a promisified close().
Socket.prototype[SymbolAsyncDispose] = async function() {
  if (!this[kStateSymbol].handle) {
    return;
  }
  return FunctionPrototypeCall(promisify(this.close), this);
};


function socketCloseNT(self) {
  self.emit('close');
}


/**
 * @returns {object} Object filled in by the handle's getsockname().
 * @throws {ERR_SOCKET_DGRAM_NOT_RUNNING} If the socket has been closed.
 */
Socket.prototype.address = function() {
  healthCheck(this);

  const out = {};
  const err = this[kStateSymbol].handle.getsockname(out);
  if (err) {
    throw errnoException(err, 'getsockname');
  }

  return out;
};

/**
 * @returns {object} Object filled in by the handle's getpeername().
 * @throws {ERR_SOCKET_DGRAM_NOT_RUNNING} If the socket has been closed.
 * @throws {ERR_SOCKET_DGRAM_NOT_CONNECTED} If the socket is not connected.
 */
Socket.prototype.remoteAddress = function() {
  healthCheck(this);

  const state = this[kStateSymbol];
  if (state.connectState !== CONNECT_STATE_CONNECTED)
    throw new ERR_SOCKET_DGRAM_NOT_CONNECTED();

  const out = {};
  const err = state.handle.getpeername(out);
  if (err)
    throw errnoException(err, 'getpeername');

  return out;
};


// `arg` is coerced to 1 or 0 before it reaches the handle.
Socket.prototype.setBroadcast = function(arg) {
  // Fail with ERR_SOCKET_DGRAM_NOT_RUNNING on a closed socket rather than a
  // TypeError from the null handle.
  healthCheck(this);

  const err = this[kStateSymbol].handle.setBroadcast(arg ? 1 : 0);
  if (err) {
    throw errnoException(err, 'setBroadcast');
  }
};


// Returns `ttl` unchanged on success.
Socket.prototype.setTTL = function(ttl) {
  validateNumber(ttl, 'ttl');
  // Checked after argument validation so argument errors keep precedence.
  healthCheck(this);

  const err = this[kStateSymbol].handle.setTTL(ttl);
  if (err) {
    throw errnoException(err, 'setTTL');
  }

  return ttl;
};


// Returns `ttl` unchanged on success.
Socket.prototype.setMulticastTTL = function(ttl) {
  validateNumber(ttl, 'ttl');
  // Checked after argument validation so argument errors keep precedence.
  healthCheck(this);

  const err = this[kStateSymbol].handle.setMulticastTTL(ttl);
  if (err) {
    throw errnoException(err, 'setMulticastTTL');
  }

  return ttl;
};


// `arg` is coerced to 1 or 0 before it reaches the handle.
Socket.prototype.setMulticastLoopback = function(arg) {
  // Fail with ERR_SOCKET_DGRAM_NOT_RUNNING on a closed socket rather than a
  // TypeError from the null handle.
  healthCheck(this);

  const err = this[kStateSymbol].handle.setMulticastLoopback(arg ? 1 : 0);
  if (err) {
    throw errnoException(err, 'setMulticastLoopback');
  }

  return arg; // 0.4 compatibility
};


Socket.prototype.setMulticastInterface = function(interfaceAddress) {
  healthCheck(this);
  validateString(interfaceAddress, 'interfaceAddress');

  const err = this[kStateSymbol].handle.setMulticastInterface(interfaceAddress);
  if (err) {
    throw errnoException(err, 'setMulticastInterface');
  }
};

// `multicastAddress` is required; `interfaceAddress` is passed through to the
// handle as given.
Socket.prototype.addMembership = function(multicastAddress,
                                          interfaceAddress) {
  healthCheck(this);

  if (!multicastAddress) {
    throw new ERR_MISSING_ARGS('multicastAddress');
  }

  const { handle } = this[kStateSymbol];
  const err = handle.addMembership(multicastAddress, interfaceAddress);
  if (err) {
    throw errnoException(err, 'addMembership');
  }
};


// `multicastAddress` is required; `interfaceAddress` is passed through to the
// handle as given.
Socket.prototype.dropMembership = function(multicastAddress,
                                           interfaceAddress) {
  healthCheck(this);

  if (!multicastAddress) {
    throw new ERR_MISSING_ARGS('multicastAddress');
  }

  const { handle } = this[kStateSymbol];
  const err = handle.dropMembership(multicastAddress, interfaceAddress);
  if (err) {
    throw errnoException(err, 'dropMembership');
  }
};

// `sourceAddress` and `groupAddress` must be strings; `interfaceAddress` is
// passed through to the handle as given.
Socket.prototype.addSourceSpecificMembership = function(sourceAddress,
                                                        groupAddress,
                                                        interfaceAddress) {
  healthCheck(this);

  validateString(sourceAddress, 'sourceAddress');
  validateString(groupAddress, 'groupAddress');

  const err =
    this[kStateSymbol].handle.addSourceSpecificMembership(sourceAddress,
                                                          groupAddress,
                                                          interfaceAddress);
  if (err) {
    throw errnoException(err, 'addSourceSpecificMembership');
  }
};


// `sourceAddress` and `groupAddress` must be strings; `interfaceAddress` is
// passed through to the handle as given.
Socket.prototype.dropSourceSpecificMembership = function(sourceAddress,
                                                         groupAddress,
                                                         interfaceAddress) {
  healthCheck(this);

  validateString(sourceAddress, 'sourceAddress');
  validateString(groupAddress, 'groupAddress');

  const err =
    this[kStateSymbol].handle.dropSourceSpecificMembership(sourceAddress,
                                                           groupAddress,
                                                           interfaceAddress);
  if (err) {
    throw errnoException(err, 'dropSourceSpecificMembership');
  }
};


// Throws ERR_SOCKET_DGRAM_NOT_RUNNING when the socket no longer has a handle
// (close() sets it to null).
function healthCheck(socket) {
  if (!socket[kStateSymbol].handle) {
    // Error message from dgram_legacy.js.
    throw new ERR_SOCKET_DGRAM_NOT_RUNNING();
  }
}


// Stops receiving on the handle if receiving is currently active.
function stopReceiving(socket) {
  const state = socket[kStateSymbol];

  if (!state.receiving)
    return;

  state.handle.recvStop();
  state.receiving = false;
}


// Handle `onmessage` hook: a negative `nread` is reported as a 'recvmsg'
// error, anything else is emitted as a 'message' event.
function onMessage(nread, handle, buf, rinfo) {
  const self = handle[owner_symbol];
  if (nread < 0) {
    return self.emit('error', errnoException(nread, 'recvmsg'));
  }
  rinfo.size = buf.length; // compatibility
  self.emit('message', buf, rinfo);
}


// Handle `onerror` hook: re-emits `error` on the owning socket.
function onError(nread, handle, error) {
  const self = handle[owner_symbol];
  return self.emit('error', error);
}


// No-op on a closed socket; always returns `this`.
Socket.prototype.ref = function() {
  const handle = this[kStateSymbol].handle;

  if (handle)
    handle.ref();

  return this;
};


// No-op on a closed socket; always returns `this`.
Socket.prototype.unref = function() {
  const handle = this[kStateSymbol].handle;

  if (handle)
    handle.unref();

  return this;
};


Socket.prototype.setRecvBufferSize = function(size) {
  bufferSize(this, size, RECV_BUFFER);
};


Socket.prototype.setSendBufferSize = function(size) {
  bufferSize(this, size, SEND_BUFFER);
};


// The getters call bufferSize() with a size of 0 and return its result.
Socket.prototype.getRecvBufferSize = function() {
  return bufferSize(this, 0, RECV_BUFFER);
};


Socket.prototype.getSendBufferSize = function() {
  return bufferSize(this, 0, SEND_BUFFER);
};

Socket.prototype.getSendQueueSize = function() {
  // Fail with ERR_SOCKET_DGRAM_NOT_RUNNING on a closed socket rather than a
  // TypeError from the null handle.
  healthCheck(this);
  return this[kStateSymbol].handle.getSendQueueSize();
};

Socket.prototype.getSendQueueCount = function() {
  // Fail with ERR_SOCKET_DGRAM_NOT_RUNNING on a closed socket rather than a
  // TypeError from the null handle.
  healthCheck(this);
  return this[kStateSymbol].handle.getSendQueueCount();
};

// Deprecated private APIs.
1000ObjectDefineProperty(Socket.prototype, '_handle', { 1001 __proto__: null, 1002 get: deprecate(function() { 1003 return this[kStateSymbol].handle; 1004 }, 'Socket.prototype._handle is deprecated', 'DEP0112'), 1005 set: deprecate(function(val) { 1006 this[kStateSymbol].handle = val; 1007 }, 'Socket.prototype._handle is deprecated', 'DEP0112'), 1008}); 1009 1010 1011ObjectDefineProperty(Socket.prototype, '_receiving', { 1012 __proto__: null, 1013 get: deprecate(function() { 1014 return this[kStateSymbol].receiving; 1015 }, 'Socket.prototype._receiving is deprecated', 'DEP0112'), 1016 set: deprecate(function(val) { 1017 this[kStateSymbol].receiving = val; 1018 }, 'Socket.prototype._receiving is deprecated', 'DEP0112'), 1019}); 1020 1021 1022ObjectDefineProperty(Socket.prototype, '_bindState', { 1023 __proto__: null, 1024 get: deprecate(function() { 1025 return this[kStateSymbol].bindState; 1026 }, 'Socket.prototype._bindState is deprecated', 'DEP0112'), 1027 set: deprecate(function(val) { 1028 this[kStateSymbol].bindState = val; 1029 }, 'Socket.prototype._bindState is deprecated', 'DEP0112'), 1030}); 1031 1032 1033ObjectDefineProperty(Socket.prototype, '_queue', { 1034 __proto__: null, 1035 get: deprecate(function() { 1036 return this[kStateSymbol].queue; 1037 }, 'Socket.prototype._queue is deprecated', 'DEP0112'), 1038 set: deprecate(function(val) { 1039 this[kStateSymbol].queue = val; 1040 }, 'Socket.prototype._queue is deprecated', 'DEP0112'), 1041}); 1042 1043 1044ObjectDefineProperty(Socket.prototype, '_reuseAddr', { 1045 __proto__: null, 1046 get: deprecate(function() { 1047 return this[kStateSymbol].reuseAddr; 1048 }, 'Socket.prototype._reuseAddr is deprecated', 'DEP0112'), 1049 set: deprecate(function(val) { 1050 this[kStateSymbol].reuseAddr = val; 1051 }, 'Socket.prototype._reuseAddr is deprecated', 'DEP0112'), 1052}); 1053 1054 1055Socket.prototype._healthCheck = deprecate(function() { 1056 healthCheck(this); 1057}, 'Socket.prototype._healthCheck() is 
deprecated', 'DEP0112'); 1058 1059 1060Socket.prototype._stopReceiving = deprecate(function() { 1061 stopReceiving(this); 1062}, 'Socket.prototype._stopReceiving() is deprecated', 'DEP0112'); 1063 1064 1065// Legacy alias on the C++ wrapper object. This is not public API, so we may 1066// want to runtime-deprecate it at some point. There's no hurry, though. 1067ObjectDefineProperty(UDP.prototype, 'owner', { 1068 __proto__: null, 1069 get() { return this[owner_symbol]; }, 1070 set(v) { return this[owner_symbol] = v; }, 1071}); 1072 1073 1074module.exports = { 1075 _createSocketHandle: deprecate( 1076 _createSocketHandle, 1077 'dgram._createSocketHandle() is deprecated', 1078 'DEP0112', 1079 ), 1080 createSocket, 1081 Socket, 1082}; 1083