1// Copyright Joyent, Inc. and other Node contributors. 2// 3// Permission is hereby granted, free of charge, to any person obtaining a 4// copy of this software and associated documentation files (the 5// "Software"), to deal in the Software without restriction, including 6// without limitation the rights to use, copy, modify, merge, publish, 7// distribute, sublicense, and/or sell copies of the Software, and to permit 8// persons to whom the Software is furnished to do so, subject to the 9// following conditions: 10// 11// The above copyright notice and this permission notice shall be included 12// in all copies or substantial portions of the Software. 13// 14// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS 15// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 16// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN 17// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, 18// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR 19// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE 20// USE OR OTHER DEALINGS IN THE SOFTWARE. 21 22'use strict'; 23 24const { 25 Array, 26 ArrayIsArray, 27 ArrayPrototypePush, 28 FunctionPrototypeBind, 29 FunctionPrototypeCall, 30 ObjectDefineProperty, 31 ObjectSetPrototypeOf, 32 ReflectApply, 33} = primordials; 34 35const errors = require('internal/errors'); 36const { 37 kStateSymbol, 38 _createSocketHandle, 39 newHandle, 40} = require('internal/dgram'); 41const { guessHandleType } = internalBinding('util'); 42const { 43 ERR_INVALID_ARG_TYPE, 44 ERR_MISSING_ARGS, 45 ERR_SOCKET_ALREADY_BOUND, 46 ERR_SOCKET_BAD_BUFFER_SIZE, 47 ERR_SOCKET_BUFFER_SIZE, 48 ERR_SOCKET_DGRAM_IS_CONNECTED, 49 ERR_SOCKET_DGRAM_NOT_CONNECTED, 50 ERR_SOCKET_DGRAM_NOT_RUNNING, 51 ERR_INVALID_FD_TYPE 52} = errors.codes; 53const { 54 isInt32, 55 validateAbortSignal, 56 validateString, 57 validateNumber, 58 validatePort, 59} = require('internal/validators'); 60const { Buffer } = require('buffer'); 61const { deprecate } = require('internal/util'); 62const { isArrayBufferView } = require('internal/util/types'); 63const EventEmitter = require('events'); 64const { 65 defaultTriggerAsyncIdScope, 66 symbols: { async_id_symbol, owner_symbol } 67} = require('internal/async_hooks'); 68const { UV_UDP_REUSEADDR } = internalBinding('constants').os; 69 70const { 71 constants: { UV_UDP_IPV6ONLY }, 72 UDP, 73 SendWrap 74} = internalBinding('udp_wrap'); 75 76const BIND_STATE_UNBOUND = 0; 77const BIND_STATE_BINDING = 1; 78const BIND_STATE_BOUND = 2; 79 80const CONNECT_STATE_DISCONNECTED = 0; 81const CONNECT_STATE_CONNECTING = 1; 82const CONNECT_STATE_CONNECTED = 2; 83 84const RECV_BUFFER = true; 85const SEND_BUFFER = false; 86 87// Lazily loaded 88let _cluster = null; 89function lazyLoadCluster() { 90 if (!_cluster) _cluster = require('cluster'); 91 return _cluster; 92} 93 94const errnoException = errors.errnoException; 95const exceptionWithHostPort = errors.exceptionWithHostPort; 96 97 98function Socket(type, listener) { 99 FunctionPrototypeCall(EventEmitter, this); 100 let lookup; 101 let recvBufferSize; 102 let sendBufferSize; 103 104 let options; 105 if (type !== null && typeof type === 'object') { 106 options = type; 107 type = options.type; 108 lookup = options.lookup; 109 recvBufferSize = options.recvBufferSize; 110 sendBufferSize = options.sendBufferSize; 111 } 112 113 const handle = newHandle(type, lookup); 114 handle[owner_symbol] = this; 115 116 this[async_id_symbol] = handle.getAsyncId(); 117 this.type = type; 118 119 if (typeof listener === 'function') 120 this.on('message', listener); 121 122 this[kStateSymbol] = { 123 handle, 124 receiving: false, 125 bindState: BIND_STATE_UNBOUND, 126 connectState: CONNECT_STATE_DISCONNECTED, 127 queue: undefined, 128 reuseAddr: options && options.reuseAddr, // Use UV_UDP_REUSEADDR if true. 129 ipv6Only: options && options.ipv6Only, 130 recvBufferSize, 131 sendBufferSize 132 }; 133 134 if (options?.signal !== undefined) { 135 const { signal } = options; 136 validateAbortSignal(signal, 'options.signal'); 137 const onAborted = () => { 138 this.close(); 139 }; 140 if (signal.aborted) { 141 onAborted(); 142 } else { 143 signal.addEventListener('abort', onAborted); 144 this.once('close', () => signal.removeEventListener('abort', onAborted)); 145 } 146 } 147} 148ObjectSetPrototypeOf(Socket.prototype, EventEmitter.prototype); 149ObjectSetPrototypeOf(Socket, EventEmitter); 150 151 152function createSocket(type, listener) { 153 return new Socket(type, listener); 154} 155 156 157function startListening(socket) { 158 const state = socket[kStateSymbol]; 159 160 state.handle.onmessage = onMessage; 161 // Todo: handle errors 162 state.handle.recvStart(); 163 state.receiving = true; 164 state.bindState = BIND_STATE_BOUND; 165 166 if (state.recvBufferSize) 167 bufferSize(socket, state.recvBufferSize, RECV_BUFFER); 168 169 if (state.sendBufferSize) 170 bufferSize(socket, state.sendBufferSize, SEND_BUFFER); 171 172 socket.emit('listening'); 173} 174 175function replaceHandle(self, newHandle) { 176 const state = self[kStateSymbol]; 177 const oldHandle = state.handle; 178 179 // Set up the handle that we got from master. 180 newHandle.lookup = oldHandle.lookup; 181 newHandle.bind = oldHandle.bind; 182 newHandle.send = oldHandle.send; 183 newHandle[owner_symbol] = self; 184 185 // Replace the existing handle by the handle we got from master. 186 oldHandle.close(); 187 state.handle = newHandle; 188 // Check if the udp handle was connected and set the state accordingly 189 if (isConnected(self)) 190 state.connectState = CONNECT_STATE_CONNECTED; 191} 192 193function bufferSize(self, size, buffer) { 194 if (size >>> 0 !== size) 195 throw new ERR_SOCKET_BAD_BUFFER_SIZE(); 196 197 const ctx = {}; 198 const ret = self[kStateSymbol].handle.bufferSize(size, buffer, ctx); 199 if (ret === undefined) { 200 throw new ERR_SOCKET_BUFFER_SIZE(ctx); 201 } 202 return ret; 203} 204 205// Query master process to get the server handle and utilize it. 206function bindServerHandle(self, options, errCb) { 207 const cluster = lazyLoadCluster(); 208 209 const state = self[kStateSymbol]; 210 cluster._getServer(self, options, (err, handle) => { 211 if (err) { 212 errCb(err); 213 return; 214 } 215 216 if (!state.handle) { 217 // Handle has been closed in the mean time. 218 return handle.close(); 219 } 220 221 replaceHandle(self, handle); 222 startListening(self); 223 }); 224} 225 226Socket.prototype.bind = function(port_, address_ /* , callback */) { 227 let port = port_; 228 229 healthCheck(this); 230 const state = this[kStateSymbol]; 231 232 if (state.bindState !== BIND_STATE_UNBOUND) 233 throw new ERR_SOCKET_ALREADY_BOUND(); 234 235 state.bindState = BIND_STATE_BINDING; 236 237 const cb = arguments.length && arguments[arguments.length - 1]; 238 if (typeof cb === 'function') { 239 function removeListeners() { 240 this.removeListener('error', removeListeners); 241 this.removeListener('listening', onListening); 242 } 243 244 function onListening() { 245 FunctionPrototypeCall(removeListeners, this); 246 FunctionPrototypeCall(cb, this); 247 } 248 249 this.on('error', removeListeners); 250 this.on('listening', onListening); 251 } 252 253 if (port !== null && 254 typeof port === 'object' && 255 typeof port.recvStart === 'function') { 256 replaceHandle(this, port); 257 startListening(this); 258 return this; 259 } 260 261 // Open an existing fd instead of creating a new one. 262 if (port !== null && typeof port === 'object' && 263 isInt32(port.fd) && port.fd > 0) { 264 const fd = port.fd; 265 const exclusive = !!port.exclusive; 266 const state = this[kStateSymbol]; 267 268 const cluster = lazyLoadCluster(); 269 270 if (cluster.isWorker && !exclusive) { 271 bindServerHandle(this, { 272 address: null, 273 port: null, 274 addressType: this.type, 275 fd, 276 flags: null 277 }, (err) => { 278 // Callback to handle error. 279 const ex = errnoException(err, 'open'); 280 state.bindState = BIND_STATE_UNBOUND; 281 this.emit('error', ex); 282 }); 283 return this; 284 } 285 286 const type = guessHandleType(fd); 287 if (type !== 'UDP') 288 throw new ERR_INVALID_FD_TYPE(type); 289 const err = state.handle.open(fd); 290 291 if (err) 292 throw errnoException(err, 'open'); 293 294 // Check if the udp handle was connected and set the state accordingly 295 if (isConnected(this)) 296 state.connectState = CONNECT_STATE_CONNECTED; 297 298 startListening(this); 299 return this; 300 } 301 302 let address; 303 let exclusive; 304 305 if (port !== null && typeof port === 'object') { 306 address = port.address || ''; 307 exclusive = !!port.exclusive; 308 port = port.port; 309 } else { 310 address = typeof address_ === 'function' ? '' : address_; 311 exclusive = false; 312 } 313 314 // Defaulting address for bind to all interfaces 315 if (!address) { 316 if (this.type === 'udp4') 317 address = '0.0.0.0'; 318 else 319 address = '::'; 320 } 321 322 // Resolve address first 323 state.handle.lookup(address, (err, ip) => { 324 if (err) { 325 state.bindState = BIND_STATE_UNBOUND; 326 this.emit('error', err); 327 return; 328 } 329 330 const cluster = lazyLoadCluster(); 331 332 let flags = 0; 333 if (state.reuseAddr) 334 flags |= UV_UDP_REUSEADDR; 335 if (state.ipv6Only) 336 flags |= UV_UDP_IPV6ONLY; 337 338 if (cluster.isWorker && !exclusive) { 339 bindServerHandle(this, { 340 address: ip, 341 port: port, 342 addressType: this.type, 343 fd: -1, 344 flags: flags 345 }, (err) => { 346 // Callback to handle error. 347 const ex = exceptionWithHostPort(err, 'bind', ip, port); 348 state.bindState = BIND_STATE_UNBOUND; 349 this.emit('error', ex); 350 }); 351 } else { 352 if (!state.handle) 353 return; // Handle has been closed in the mean time 354 355 const err = state.handle.bind(ip, port || 0, flags); 356 if (err) { 357 const ex = exceptionWithHostPort(err, 'bind', ip, port); 358 state.bindState = BIND_STATE_UNBOUND; 359 this.emit('error', ex); 360 // Todo: close? 361 return; 362 } 363 364 startListening(this); 365 } 366 }); 367 368 return this; 369}; 370 371Socket.prototype.connect = function(port, address, callback) { 372 port = validatePort(port, 'Port', false); 373 if (typeof address === 'function') { 374 callback = address; 375 address = ''; 376 } else if (address === undefined) { 377 address = ''; 378 } 379 380 validateString(address, 'address'); 381 382 const state = this[kStateSymbol]; 383 384 if (state.connectState !== CONNECT_STATE_DISCONNECTED) 385 throw new ERR_SOCKET_DGRAM_IS_CONNECTED(); 386 387 state.connectState = CONNECT_STATE_CONNECTING; 388 if (state.bindState === BIND_STATE_UNBOUND) 389 this.bind({ port: 0, exclusive: true }, null); 390 391 if (state.bindState !== BIND_STATE_BOUND) { 392 enqueue(this, FunctionPrototypeBind(_connect, this, 393 port, address, callback)); 394 return; 395 } 396 397 ReflectApply(_connect, this, [port, address, callback]); 398}; 399 400 401function _connect(port, address, callback) { 402 const state = this[kStateSymbol]; 403 if (callback) 404 this.once('connect', callback); 405 406 const afterDns = (ex, ip) => { 407 defaultTriggerAsyncIdScope( 408 this[async_id_symbol], 409 doConnect, 410 ex, this, ip, address, port, callback 411 ); 412 }; 413 414 state.handle.lookup(address, afterDns); 415} 416 417 418function doConnect(ex, self, ip, address, port, callback) { 419 const state = self[kStateSymbol]; 420 if (!state.handle) 421 return; 422 423 if (!ex) { 424 const err = state.handle.connect(ip, port); 425 if (err) { 426 ex = exceptionWithHostPort(err, 'connect', address, port); 427 } 428 } 429 430 if (ex) { 431 state.connectState = CONNECT_STATE_DISCONNECTED; 432 return process.nextTick(() => { 433 if (callback) { 434 self.removeListener('connect', callback); 435 callback(ex); 436 } else { 437 self.emit('error', ex); 438 } 439 }); 440 } 441 442 state.connectState = CONNECT_STATE_CONNECTED; 443 process.nextTick(() => self.emit('connect')); 444} 445 446 447Socket.prototype.disconnect = function() { 448 const state = this[kStateSymbol]; 449 if (state.connectState !== CONNECT_STATE_CONNECTED) 450 throw new ERR_SOCKET_DGRAM_NOT_CONNECTED(); 451 452 const err = state.handle.disconnect(); 453 if (err) 454 throw errnoException(err, 'connect'); 455 else 456 state.connectState = CONNECT_STATE_DISCONNECTED; 457}; 458 459 460// Thin wrapper around `send`, here for compatibility with dgram_legacy.js 461Socket.prototype.sendto = function(buffer, 462 offset, 463 length, 464 port, 465 address, 466 callback) { 467 validateNumber(offset, 'offset'); 468 validateNumber(length, 'length'); 469 validateNumber(port, 'port'); 470 validateString(address, 'address'); 471 472 this.send(buffer, offset, length, port, address, callback); 473}; 474 475 476function sliceBuffer(buffer, offset, length) { 477 if (typeof buffer === 'string') { 478 buffer = Buffer.from(buffer); 479 } else if (!isArrayBufferView(buffer)) { 480 throw new ERR_INVALID_ARG_TYPE('buffer', 481 ['Buffer', 482 'TypedArray', 483 'DataView', 484 'string'], 485 buffer); 486 } 487 488 offset = offset >>> 0; 489 length = length >>> 0; 490 491 return Buffer.from(buffer.buffer, buffer.byteOffset + offset, length); 492} 493 494 495function fixBufferList(list) { 496 const newlist = new Array(list.length); 497 498 for (let i = 0, l = list.length; i < l; i++) { 499 const buf = list[i]; 500 if (typeof buf === 'string') 501 newlist[i] = Buffer.from(buf); 502 else if (!isArrayBufferView(buf)) 503 return null; 504 else 505 newlist[i] = Buffer.from(buf.buffer, buf.byteOffset, buf.byteLength); 506 } 507 508 return newlist; 509} 510 511 512function enqueue(self, toEnqueue) { 513 const state = self[kStateSymbol]; 514 515 // If the send queue hasn't been initialized yet, do it, and install an 516 // event handler that flushes the send queue after binding is done. 517 if (state.queue === undefined) { 518 state.queue = []; 519 self.once(EventEmitter.errorMonitor, onListenError); 520 self.once('listening', onListenSuccess); 521 } 522 ArrayPrototypePush(state.queue, toEnqueue); 523} 524 525 526function onListenSuccess() { 527 this.removeListener(EventEmitter.errorMonitor, onListenError); 528 FunctionPrototypeCall(clearQueue, this); 529} 530 531 532function onListenError(err) { 533 this.removeListener('listening', onListenSuccess); 534 this[kStateSymbol].queue = undefined; 535} 536 537 538function clearQueue() { 539 const state = this[kStateSymbol]; 540 const queue = state.queue; 541 state.queue = undefined; 542 543 // Flush the send queue. 544 for (const queueEntry of queue) 545 queueEntry(); 546} 547 548function isConnected(self) { 549 try { 550 self.remoteAddress(); 551 return true; 552 } catch { 553 return false; 554 } 555} 556 557 558// valid combinations 559// For connectionless sockets 560// send(buffer, offset, length, port, address, callback) 561// send(buffer, offset, length, port, address) 562// send(buffer, offset, length, port, callback) 563// send(buffer, offset, length, port) 564// send(bufferOrList, port, address, callback) 565// send(bufferOrList, port, address) 566// send(bufferOrList, port, callback) 567// send(bufferOrList, port) 568// For connected sockets 569// send(buffer, offset, length, callback) 570// send(buffer, offset, length) 571// send(bufferOrList, callback) 572// send(bufferOrList) 573Socket.prototype.send = function(buffer, 574 offset, 575 length, 576 port, 577 address, 578 callback) { 579 580 let list; 581 const state = this[kStateSymbol]; 582 const connected = state.connectState === CONNECT_STATE_CONNECTED; 583 if (!connected) { 584 if (address || (port && typeof port !== 'function')) { 585 buffer = sliceBuffer(buffer, offset, length); 586 } else { 587 callback = port; 588 port = offset; 589 address = length; 590 } 591 } else { 592 if (typeof length === 'number') { 593 buffer = sliceBuffer(buffer, offset, length); 594 if (typeof port === 'function') { 595 callback = port; 596 port = null; 597 } 598 } else { 599 callback = offset; 600 } 601 602 if (port || address) 603 throw new ERR_SOCKET_DGRAM_IS_CONNECTED(); 604 } 605 606 if (!ArrayIsArray(buffer)) { 607 if (typeof buffer === 'string') { 608 list = [ Buffer.from(buffer) ]; 609 } else if (!isArrayBufferView(buffer)) { 610 throw new ERR_INVALID_ARG_TYPE('buffer', 611 ['Buffer', 612 'TypedArray', 613 'DataView', 614 'string'], 615 buffer); 616 } else { 617 list = [ buffer ]; 618 } 619 } else if (!(list = fixBufferList(buffer))) { 620 throw new ERR_INVALID_ARG_TYPE('buffer list arguments', 621 ['Buffer', 622 'TypedArray', 623 'DataView', 624 'string'], 625 buffer); 626 } 627 628 if (!connected) 629 port = validatePort(port, 'Port', false); 630 631 // Normalize callback so it's either a function or undefined but not anything 632 // else. 633 if (typeof callback !== 'function') 634 callback = undefined; 635 636 if (typeof address === 'function') { 637 callback = address; 638 address = undefined; 639 } else if (address && typeof address !== 'string') { 640 throw new ERR_INVALID_ARG_TYPE('address', ['string', 'falsy'], address); 641 } 642 643 healthCheck(this); 644 645 if (state.bindState === BIND_STATE_UNBOUND) 646 this.bind({ port: 0, exclusive: true }, null); 647 648 if (list.length === 0) 649 ArrayPrototypePush(list, Buffer.alloc(0)); 650 651 // If the socket hasn't been bound yet, push the outbound packet onto the 652 // send queue and send after binding is complete. 653 if (state.bindState !== BIND_STATE_BOUND) { 654 enqueue(this, FunctionPrototypeBind(this.send, this, 655 list, port, address, callback)); 656 return; 657 } 658 659 const afterDns = (ex, ip) => { 660 defaultTriggerAsyncIdScope( 661 this[async_id_symbol], 662 doSend, 663 ex, this, ip, list, address, port, callback 664 ); 665 }; 666 667 if (!connected) { 668 state.handle.lookup(address, afterDns); 669 } else { 670 afterDns(null, null); 671 } 672}; 673 674function doSend(ex, self, ip, list, address, port, callback) { 675 const state = self[kStateSymbol]; 676 677 if (ex) { 678 if (typeof callback === 'function') { 679 process.nextTick(callback, ex); 680 return; 681 } 682 683 process.nextTick(() => self.emit('error', ex)); 684 return; 685 } else if (!state.handle) { 686 return; 687 } 688 689 const req = new SendWrap(); 690 req.list = list; // Keep reference alive. 691 req.address = address; 692 req.port = port; 693 if (callback) { 694 req.callback = callback; 695 req.oncomplete = afterSend; 696 } 697 698 let err; 699 if (port) 700 err = state.handle.send(req, list, list.length, port, ip, !!callback); 701 else 702 err = state.handle.send(req, list, list.length, !!callback); 703 704 if (err >= 1) { 705 // Synchronous finish. The return code is msg_length + 1 so that we can 706 // distinguish between synchronous success and asynchronous success. 707 if (callback) 708 process.nextTick(callback, null, err - 1); 709 return; 710 } 711 712 if (err && callback) { 713 // Don't emit as error, dgram_legacy.js compatibility 714 const ex = exceptionWithHostPort(err, 'send', address, port); 715 process.nextTick(callback, ex); 716 } 717} 718 719function afterSend(err, sent) { 720 if (err) { 721 err = exceptionWithHostPort(err, 'send', this.address, this.port); 722 } else { 723 err = null; 724 } 725 726 this.callback(err, sent); 727} 728 729Socket.prototype.close = function(callback) { 730 const state = this[kStateSymbol]; 731 const queue = state.queue; 732 733 if (typeof callback === 'function') 734 this.on('close', callback); 735 736 if (queue !== undefined) { 737 ArrayPrototypePush(queue, FunctionPrototypeBind(this.close, this)); 738 return this; 739 } 740 741 healthCheck(this); 742 stopReceiving(this); 743 state.handle.close(); 744 state.handle = null; 745 defaultTriggerAsyncIdScope(this[async_id_symbol], 746 process.nextTick, 747 socketCloseNT, 748 this); 749 750 return this; 751}; 752 753 754function socketCloseNT(self) { 755 self.emit('close'); 756} 757 758 759Socket.prototype.address = function() { 760 healthCheck(this); 761 762 const out = {}; 763 const err = this[kStateSymbol].handle.getsockname(out); 764 if (err) { 765 throw errnoException(err, 'getsockname'); 766 } 767 768 return out; 769}; 770 771Socket.prototype.remoteAddress = function() { 772 healthCheck(this); 773 774 const state = this[kStateSymbol]; 775 if (state.connectState !== CONNECT_STATE_CONNECTED) 776 throw new ERR_SOCKET_DGRAM_NOT_CONNECTED(); 777 778 const out = {}; 779 const err = state.handle.getpeername(out); 780 if (err) 781 throw errnoException(err, 'getpeername'); 782 783 return out; 784}; 785 786 787Socket.prototype.setBroadcast = function(arg) { 788 const err = this[kStateSymbol].handle.setBroadcast(arg ? 1 : 0); 789 if (err) { 790 throw errnoException(err, 'setBroadcast'); 791 } 792}; 793 794 795Socket.prototype.setTTL = function(ttl) { 796 validateNumber(ttl, 'ttl'); 797 798 const err = this[kStateSymbol].handle.setTTL(ttl); 799 if (err) { 800 throw errnoException(err, 'setTTL'); 801 } 802 803 return ttl; 804}; 805 806 807Socket.prototype.setMulticastTTL = function(ttl) { 808 validateNumber(ttl, 'ttl'); 809 810 const err = this[kStateSymbol].handle.setMulticastTTL(ttl); 811 if (err) { 812 throw errnoException(err, 'setMulticastTTL'); 813 } 814 815 return ttl; 816}; 817 818 819Socket.prototype.setMulticastLoopback = function(arg) { 820 const err = this[kStateSymbol].handle.setMulticastLoopback(arg ? 1 : 0); 821 if (err) { 822 throw errnoException(err, 'setMulticastLoopback'); 823 } 824 825 return arg; // 0.4 compatibility 826}; 827 828 829Socket.prototype.setMulticastInterface = function(interfaceAddress) { 830 healthCheck(this); 831 validateString(interfaceAddress, 'interfaceAddress'); 832 833 const err = this[kStateSymbol].handle.setMulticastInterface(interfaceAddress); 834 if (err) { 835 throw errnoException(err, 'setMulticastInterface'); 836 } 837}; 838 839Socket.prototype.addMembership = function(multicastAddress, 840 interfaceAddress) { 841 healthCheck(this); 842 843 if (!multicastAddress) { 844 throw new ERR_MISSING_ARGS('multicastAddress'); 845 } 846 847 const { handle } = this[kStateSymbol]; 848 const err = handle.addMembership(multicastAddress, interfaceAddress); 849 if (err) { 850 throw errnoException(err, 'addMembership'); 851 } 852}; 853 854 855Socket.prototype.dropMembership = function(multicastAddress, 856 interfaceAddress) { 857 healthCheck(this); 858 859 if (!multicastAddress) { 860 throw new ERR_MISSING_ARGS('multicastAddress'); 861 } 862 863 const { handle } = this[kStateSymbol]; 864 const err = handle.dropMembership(multicastAddress, interfaceAddress); 865 if (err) { 866 throw errnoException(err, 'dropMembership'); 867 } 868}; 869 870Socket.prototype.addSourceSpecificMembership = function(sourceAddress, 871 groupAddress, 872 interfaceAddress) { 873 healthCheck(this); 874 875 validateString(sourceAddress, 'sourceAddress'); 876 validateString(groupAddress, 'groupAddress'); 877 878 const err = 879 this[kStateSymbol].handle.addSourceSpecificMembership(sourceAddress, 880 groupAddress, 881 interfaceAddress); 882 if (err) { 883 throw errnoException(err, 'addSourceSpecificMembership'); 884 } 885}; 886 887 888Socket.prototype.dropSourceSpecificMembership = function(sourceAddress, 889 groupAddress, 890 interfaceAddress) { 891 healthCheck(this); 892 893 validateString(sourceAddress, 'sourceAddress'); 894 validateString(groupAddress, 'groupAddress'); 895 896 const err = 897 this[kStateSymbol].handle.dropSourceSpecificMembership(sourceAddress, 898 groupAddress, 899 interfaceAddress); 900 if (err) { 901 throw errnoException(err, 'dropSourceSpecificMembership'); 902 } 903}; 904 905 906function healthCheck(socket) { 907 if (!socket[kStateSymbol].handle) { 908 // Error message from dgram_legacy.js. 909 throw new ERR_SOCKET_DGRAM_NOT_RUNNING(); 910 } 911} 912 913 914function stopReceiving(socket) { 915 const state = socket[kStateSymbol]; 916 917 if (!state.receiving) 918 return; 919 920 state.handle.recvStop(); 921 state.receiving = false; 922} 923 924 925function onMessage(nread, handle, buf, rinfo) { 926 const self = handle[owner_symbol]; 927 if (nread < 0) { 928 return self.emit('error', errnoException(nread, 'recvmsg')); 929 } 930 rinfo.size = buf.length; // compatibility 931 self.emit('message', buf, rinfo); 932} 933 934 935Socket.prototype.ref = function() { 936 const handle = this[kStateSymbol].handle; 937 938 if (handle) 939 handle.ref(); 940 941 return this; 942}; 943 944 945Socket.prototype.unref = function() { 946 const handle = this[kStateSymbol].handle; 947 948 if (handle) 949 handle.unref(); 950 951 return this; 952}; 953 954 955Socket.prototype.setRecvBufferSize = function(size) { 956 bufferSize(this, size, RECV_BUFFER); 957}; 958 959 960Socket.prototype.setSendBufferSize = function(size) { 961 bufferSize(this, size, SEND_BUFFER); 962}; 963 964 965Socket.prototype.getRecvBufferSize = function() { 966 return bufferSize(this, 0, RECV_BUFFER); 967}; 968 969 970Socket.prototype.getSendBufferSize = function() { 971 return bufferSize(this, 0, SEND_BUFFER); 972}; 973 974 975// Deprecated private APIs. 976ObjectDefineProperty(Socket.prototype, '_handle', { 977 get: deprecate(function() { 978 return this[kStateSymbol].handle; 979 }, 'Socket.prototype._handle is deprecated', 'DEP0112'), 980 set: deprecate(function(val) { 981 this[kStateSymbol].handle = val; 982 }, 'Socket.prototype._handle is deprecated', 'DEP0112') 983}); 984 985 986ObjectDefineProperty(Socket.prototype, '_receiving', { 987 get: deprecate(function() { 988 return this[kStateSymbol].receiving; 989 }, 'Socket.prototype._receiving is deprecated', 'DEP0112'), 990 set: deprecate(function(val) { 991 this[kStateSymbol].receiving = val; 992 }, 'Socket.prototype._receiving is deprecated', 'DEP0112') 993}); 994 995 996ObjectDefineProperty(Socket.prototype, '_bindState', { 997 get: deprecate(function() { 998 return this[kStateSymbol].bindState; 999 }, 'Socket.prototype._bindState is deprecated', 'DEP0112'), 1000 set: deprecate(function(val) { 1001 this[kStateSymbol].bindState = val; 1002 }, 'Socket.prototype._bindState is deprecated', 'DEP0112') 1003}); 1004 1005 1006ObjectDefineProperty(Socket.prototype, '_queue', { 1007 get: deprecate(function() { 1008 return this[kStateSymbol].queue; 1009 }, 'Socket.prototype._queue is deprecated', 'DEP0112'), 1010 set: deprecate(function(val) { 1011 this[kStateSymbol].queue = val; 1012 }, 'Socket.prototype._queue is deprecated', 'DEP0112') 1013}); 1014 1015 1016ObjectDefineProperty(Socket.prototype, '_reuseAddr', { 1017 get: deprecate(function() { 1018 return this[kStateSymbol].reuseAddr; 1019 }, 'Socket.prototype._reuseAddr is deprecated', 'DEP0112'), 1020 set: deprecate(function(val) { 1021 this[kStateSymbol].reuseAddr = val; 1022 }, 'Socket.prototype._reuseAddr is deprecated', 'DEP0112') 1023}); 1024 1025 1026Socket.prototype._healthCheck = deprecate(function() { 1027 healthCheck(this); 1028}, 'Socket.prototype._healthCheck() is deprecated', 'DEP0112'); 1029 1030 1031Socket.prototype._stopReceiving = deprecate(function() { 1032 stopReceiving(this); 1033}, 'Socket.prototype._stopReceiving() is deprecated', 'DEP0112'); 1034 1035 1036// Legacy alias on the C++ wrapper object. This is not public API, so we may 1037// want to runtime-deprecate it at some point. There's no hurry, though. 1038ObjectDefineProperty(UDP.prototype, 'owner', { 1039 get() { return this[owner_symbol]; }, 1040 set(v) { return this[owner_symbol] = v; } 1041}); 1042 1043 1044module.exports = { 1045 _createSocketHandle: deprecate( 1046 _createSocketHandle, 1047 'dgram._createSocketHandle() is deprecated', 1048 'DEP0112' 1049 ), 1050 createSocket, 1051 Socket 1052}; 1053