• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright Joyent, Inc. and other Node contributors.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the
5// "Software"), to deal in the Software without restriction, including
6// without limitation the rights to use, copy, modify, merge, publish,
7// distribute, sublicense, and/or sell copies of the Software, and to permit
8// persons to whom the Software is furnished to do so, subject to the
9// following conditions:
10//
11// The above copyright notice and this permission notice shall be included
12// in all copies or substantial portions of the Software.
13//
14// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20// USE OR OTHER DEALINGS IN THE SOFTWARE.
21
22'use strict';
23
24const {
25  Array,
26  ArrayIsArray,
27  ObjectDefineProperty,
28  ObjectSetPrototypeOf,
29} = primordials;
30
31const errors = require('internal/errors');
32const {
33  kStateSymbol,
34  _createSocketHandle,
35  newHandle,
36} = require('internal/dgram');
37const { guessHandleType } = internalBinding('util');
38const {
39  ERR_INVALID_ARG_TYPE,
40  ERR_MISSING_ARGS,
41  ERR_SOCKET_ALREADY_BOUND,
42  ERR_SOCKET_BAD_BUFFER_SIZE,
43  ERR_SOCKET_BUFFER_SIZE,
44  ERR_SOCKET_CANNOT_SEND,
45  ERR_SOCKET_DGRAM_IS_CONNECTED,
46  ERR_SOCKET_DGRAM_NOT_CONNECTED,
47  ERR_SOCKET_DGRAM_NOT_RUNNING,
48  ERR_INVALID_FD_TYPE
49} = errors.codes;
50const {
51  isInt32,
52  validateString,
53  validateNumber,
54  validatePort,
55} = require('internal/validators');
56const { Buffer } = require('buffer');
57const { deprecate } = require('internal/util');
58const { isArrayBufferView } = require('internal/util/types');
59const EventEmitter = require('events');
60const {
61  defaultTriggerAsyncIdScope,
62  symbols: { async_id_symbol, owner_symbol }
63} = require('internal/async_hooks');
64const { UV_UDP_REUSEADDR } = internalBinding('constants').os;
65
66const {
67  constants: { UV_UDP_IPV6ONLY },
68  UDP,
69  SendWrap
70} = internalBinding('udp_wrap');
71
72const BIND_STATE_UNBOUND = 0;
73const BIND_STATE_BINDING = 1;
74const BIND_STATE_BOUND = 2;
75
76const CONNECT_STATE_DISCONNECTED = 0;
77const CONNECT_STATE_CONNECTING = 1;
78const CONNECT_STATE_CONNECTED = 2;
79
80const RECV_BUFFER = true;
81const SEND_BUFFER = false;
82
83// Lazily loaded
84let cluster = null;
85
86const errnoException = errors.errnoException;
87const exceptionWithHostPort = errors.exceptionWithHostPort;
88
89
90function Socket(type, listener) {
91  EventEmitter.call(this);
92  let lookup;
93  let recvBufferSize;
94  let sendBufferSize;
95
96  let options;
97  if (type !== null && typeof type === 'object') {
98    options = type;
99    type = options.type;
100    lookup = options.lookup;
101    recvBufferSize = options.recvBufferSize;
102    sendBufferSize = options.sendBufferSize;
103  }
104
105  const handle = newHandle(type, lookup);
106  handle[owner_symbol] = this;
107
108  this[async_id_symbol] = handle.getAsyncId();
109  this.type = type;
110
111  if (typeof listener === 'function')
112    this.on('message', listener);
113
114  this[kStateSymbol] = {
115    handle,
116    receiving: false,
117    bindState: BIND_STATE_UNBOUND,
118    connectState: CONNECT_STATE_DISCONNECTED,
119    queue: undefined,
120    reuseAddr: options && options.reuseAddr, // Use UV_UDP_REUSEADDR if true.
121    ipv6Only: options && options.ipv6Only,
122    recvBufferSize,
123    sendBufferSize
124  };
125}
126ObjectSetPrototypeOf(Socket.prototype, EventEmitter.prototype);
127ObjectSetPrototypeOf(Socket, EventEmitter);
128
129
130function createSocket(type, listener) {
131  return new Socket(type, listener);
132}
133
134
135function startListening(socket) {
136  const state = socket[kStateSymbol];
137
138  state.handle.onmessage = onMessage;
139  // Todo: handle errors
140  state.handle.recvStart();
141  state.receiving = true;
142  state.bindState = BIND_STATE_BOUND;
143
144  if (state.recvBufferSize)
145    bufferSize(socket, state.recvBufferSize, RECV_BUFFER);
146
147  if (state.sendBufferSize)
148    bufferSize(socket, state.sendBufferSize, SEND_BUFFER);
149
150  socket.emit('listening');
151}
152
153function replaceHandle(self, newHandle) {
154  const state = self[kStateSymbol];
155  const oldHandle = state.handle;
156
157  // Set up the handle that we got from master.
158  newHandle.lookup = oldHandle.lookup;
159  newHandle.bind = oldHandle.bind;
160  newHandle.send = oldHandle.send;
161  newHandle[owner_symbol] = self;
162
163  // Replace the existing handle by the handle we got from master.
164  oldHandle.close();
165  state.handle = newHandle;
166  // Check if the udp handle was connected and set the state accordingly
167  if (isConnected(self))
168    state.connectState = CONNECT_STATE_CONNECTED;
169}
170
171function bufferSize(self, size, buffer) {
172  if (size >>> 0 !== size)
173    throw new ERR_SOCKET_BAD_BUFFER_SIZE();
174
175  const ctx = {};
176  const ret = self[kStateSymbol].handle.bufferSize(size, buffer, ctx);
177  if (ret === undefined) {
178    throw new ERR_SOCKET_BUFFER_SIZE(ctx);
179  }
180  return ret;
181}
182
183// Query master process to get the server handle and utilize it.
184function bindServerHandle(self, options, errCb) {
185  if (!cluster)
186    cluster = require('cluster');
187
188  const state = self[kStateSymbol];
189  cluster._getServer(self, options, (err, handle) => {
190    if (err) {
191      errCb(err);
192      return;
193    }
194
195    if (!state.handle) {
196      // Handle has been closed in the mean time.
197      return handle.close();
198    }
199
200    replaceHandle(self, handle);
201    startListening(self);
202  });
203}
204
205Socket.prototype.bind = function(port_, address_ /* , callback */) {
206  let port = port_;
207
208  healthCheck(this);
209  const state = this[kStateSymbol];
210
211  if (state.bindState !== BIND_STATE_UNBOUND)
212    throw new ERR_SOCKET_ALREADY_BOUND();
213
214  state.bindState = BIND_STATE_BINDING;
215
216  const cb = arguments.length && arguments[arguments.length - 1];
217  if (typeof cb === 'function') {
218    function removeListeners() {
219      this.removeListener('error', removeListeners);
220      this.removeListener('listening', onListening);
221    }
222
223    function onListening() {
224      removeListeners.call(this);
225      cb.call(this);
226    }
227
228    this.on('error', removeListeners);
229    this.on('listening', onListening);
230  }
231
232  if (port !== null &&
233      typeof port === 'object' &&
234      typeof port.recvStart === 'function') {
235    replaceHandle(this, port);
236    startListening(this);
237    return this;
238  }
239
240  // Open an existing fd instead of creating a new one.
241  if (port !== null && typeof port === 'object' &&
242      isInt32(port.fd) && port.fd > 0) {
243    const fd = port.fd;
244    const exclusive = !!port.exclusive;
245    const state = this[kStateSymbol];
246
247    if (!cluster)
248      cluster = require('cluster');
249
250    if (cluster.isWorker && !exclusive) {
251      bindServerHandle(this, {
252        address: null,
253        port: null,
254        addressType: this.type,
255        fd,
256        flags: null
257      }, (err) => {
258        // Callback to handle error.
259        const ex = errnoException(err, 'open');
260        state.bindState = BIND_STATE_UNBOUND;
261        this.emit('error', ex);
262      });
263      return this;
264    }
265
266    const type = guessHandleType(fd);
267    if (type !== 'UDP')
268      throw new ERR_INVALID_FD_TYPE(type);
269    const err = state.handle.open(fd);
270
271    if (err)
272      throw errnoException(err, 'open');
273
274    // Check if the udp handle was connected and set the state accordingly
275    if (isConnected(this))
276      state.connectState = CONNECT_STATE_CONNECTED;
277
278    startListening(this);
279    return this;
280  }
281
282  let address;
283  let exclusive;
284
285  if (port !== null && typeof port === 'object') {
286    address = port.address || '';
287    exclusive = !!port.exclusive;
288    port = port.port;
289  } else {
290    address = typeof address_ === 'function' ? '' : address_;
291    exclusive = false;
292  }
293
294  // Defaulting address for bind to all interfaces
295  if (!address) {
296    if (this.type === 'udp4')
297      address = '0.0.0.0';
298    else
299      address = '::';
300  }
301
302  // Resolve address first
303  state.handle.lookup(address, (err, ip) => {
304    if (err) {
305      state.bindState = BIND_STATE_UNBOUND;
306      this.emit('error', err);
307      return;
308    }
309
310    if (!cluster)
311      cluster = require('cluster');
312
313    let flags = 0;
314    if (state.reuseAddr)
315      flags |= UV_UDP_REUSEADDR;
316    if (state.ipv6Only)
317      flags |= UV_UDP_IPV6ONLY;
318
319    if (cluster.isWorker && !exclusive) {
320      bindServerHandle(this, {
321        address: ip,
322        port: port,
323        addressType: this.type,
324        fd: -1,
325        flags: flags
326      }, (err) => {
327        // Callback to handle error.
328        const ex = exceptionWithHostPort(err, 'bind', ip, port);
329        state.bindState = BIND_STATE_UNBOUND;
330        this.emit('error', ex);
331      });
332    } else {
333      if (!state.handle)
334        return; // Handle has been closed in the mean time
335
336      const err = state.handle.bind(ip, port || 0, flags);
337      if (err) {
338        const ex = exceptionWithHostPort(err, 'bind', ip, port);
339        state.bindState = BIND_STATE_UNBOUND;
340        this.emit('error', ex);
341        // Todo: close?
342        return;
343      }
344
345      startListening(this);
346    }
347  });
348
349  return this;
350};
351
352Socket.prototype.connect = function(port, address, callback) {
353  port = validatePort(port, 'Port', { allowZero: false });
354  if (typeof address === 'function') {
355    callback = address;
356    address = '';
357  } else if (address === undefined) {
358    address = '';
359  }
360
361  validateString(address, 'address');
362
363  const state = this[kStateSymbol];
364
365  if (state.connectState !== CONNECT_STATE_DISCONNECTED)
366    throw new ERR_SOCKET_DGRAM_IS_CONNECTED();
367
368  state.connectState = CONNECT_STATE_CONNECTING;
369  if (state.bindState === BIND_STATE_UNBOUND)
370    this.bind({ port: 0, exclusive: true }, null);
371
372  if (state.bindState !== BIND_STATE_BOUND) {
373    enqueue(this, _connect.bind(this, port, address, callback));
374    return;
375  }
376
377  _connect.call(this, port, address, callback);
378};
379
380
381function _connect(port, address, callback) {
382  const state = this[kStateSymbol];
383  if (callback)
384    this.once('connect', callback);
385
386  const afterDns = (ex, ip) => {
387    defaultTriggerAsyncIdScope(
388      this[async_id_symbol],
389      doConnect,
390      ex, this, ip, address, port, callback
391    );
392  };
393
394  state.handle.lookup(address, afterDns);
395}
396
397
398function doConnect(ex, self, ip, address, port, callback) {
399  const state = self[kStateSymbol];
400  if (!state.handle)
401    return;
402
403  if (!ex) {
404    const err = state.handle.connect(ip, port);
405    if (err) {
406      ex = exceptionWithHostPort(err, 'connect', address, port);
407    }
408  }
409
410  if (ex) {
411    state.connectState = CONNECT_STATE_DISCONNECTED;
412    return process.nextTick(() => {
413      if (callback) {
414        self.removeListener('connect', callback);
415        callback(ex);
416      } else {
417        self.emit('error', ex);
418      }
419    });
420  }
421
422  state.connectState = CONNECT_STATE_CONNECTED;
423  process.nextTick(() => self.emit('connect'));
424}
425
426
427Socket.prototype.disconnect = function() {
428  const state = this[kStateSymbol];
429  if (state.connectState !== CONNECT_STATE_CONNECTED)
430    throw new ERR_SOCKET_DGRAM_NOT_CONNECTED();
431
432  const err = state.handle.disconnect();
433  if (err)
434    throw errnoException(err, 'connect');
435  else
436    state.connectState = CONNECT_STATE_DISCONNECTED;
437};
438
439
440// Thin wrapper around `send`, here for compatibility with dgram_legacy.js
441Socket.prototype.sendto = function(buffer,
442                                   offset,
443                                   length,
444                                   port,
445                                   address,
446                                   callback) {
447  validateNumber(offset, 'offset');
448  validateNumber(length, 'length');
449  validateNumber(port, 'port');
450  validateString(address, 'address');
451
452  this.send(buffer, offset, length, port, address, callback);
453};
454
455
456function sliceBuffer(buffer, offset, length) {
457  if (typeof buffer === 'string') {
458    buffer = Buffer.from(buffer);
459  } else if (!isArrayBufferView(buffer)) {
460    throw new ERR_INVALID_ARG_TYPE('buffer',
461                                   ['Buffer',
462                                    'TypedArray',
463                                    'DataView',
464                                    'string'],
465                                   buffer);
466  }
467
468  offset = offset >>> 0;
469  length = length >>> 0;
470
471  return Buffer.from(buffer.buffer, buffer.byteOffset + offset, length);
472}
473
474
475function fixBufferList(list) {
476  const newlist = new Array(list.length);
477
478  for (let i = 0, l = list.length; i < l; i++) {
479    const buf = list[i];
480    if (typeof buf === 'string')
481      newlist[i] = Buffer.from(buf);
482    else if (!isArrayBufferView(buf))
483      return null;
484    else
485      newlist[i] = Buffer.from(buf.buffer, buf.byteOffset, buf.byteLength);
486  }
487
488  return newlist;
489}
490
491
492function enqueue(self, toEnqueue) {
493  const state = self[kStateSymbol];
494
495  // If the send queue hasn't been initialized yet, do it, and install an
496  // event handler that flushes the send queue after binding is done.
497  if (state.queue === undefined) {
498    state.queue = [];
499    self.once('error', onListenError);
500    self.once('listening', onListenSuccess);
501  }
502  state.queue.push(toEnqueue);
503}
504
505
506function onListenSuccess() {
507  this.removeListener('error', onListenError);
508  clearQueue.call(this);
509}
510
511
512function onListenError(err) {
513  this.removeListener('listening', onListenSuccess);
514  this[kStateSymbol].queue = undefined;
515  this.emit('error', new ERR_SOCKET_CANNOT_SEND());
516}
517
518
519function clearQueue() {
520  const state = this[kStateSymbol];
521  const queue = state.queue;
522  state.queue = undefined;
523
524  // Flush the send queue.
525  for (const queueEntry of queue)
526    queueEntry();
527}
528
529function isConnected(self) {
530  try {
531    self.remoteAddress();
532    return true;
533  } catch {
534    return false;
535  }
536}
537
538
539// valid combinations
540// For connectionless sockets
541// send(buffer, offset, length, port, address, callback)
542// send(buffer, offset, length, port, address)
543// send(buffer, offset, length, port, callback)
544// send(buffer, offset, length, port)
545// send(bufferOrList, port, address, callback)
546// send(bufferOrList, port, address)
547// send(bufferOrList, port, callback)
548// send(bufferOrList, port)
549// For connected sockets
550// send(buffer, offset, length, callback)
551// send(buffer, offset, length)
552// send(bufferOrList, callback)
553// send(bufferOrList)
554Socket.prototype.send = function(buffer,
555                                 offset,
556                                 length,
557                                 port,
558                                 address,
559                                 callback) {
560
561  let list;
562  const state = this[kStateSymbol];
563  const connected = state.connectState === CONNECT_STATE_CONNECTED;
564  if (!connected) {
565    if (address || (port && typeof port !== 'function')) {
566      buffer = sliceBuffer(buffer, offset, length);
567    } else {
568      callback = port;
569      port = offset;
570      address = length;
571    }
572  } else {
573    if (typeof length === 'number') {
574      buffer = sliceBuffer(buffer, offset, length);
575      if (typeof port === 'function') {
576        callback = port;
577        port = null;
578      }
579    } else {
580      callback = offset;
581    }
582
583    if (port || address)
584      throw new ERR_SOCKET_DGRAM_IS_CONNECTED();
585  }
586
587  if (!ArrayIsArray(buffer)) {
588    if (typeof buffer === 'string') {
589      list = [ Buffer.from(buffer) ];
590    } else if (!isArrayBufferView(buffer)) {
591      throw new ERR_INVALID_ARG_TYPE('buffer',
592                                     ['Buffer',
593                                      'TypedArray',
594                                      'DataView',
595                                      'string'],
596                                     buffer);
597    } else {
598      list = [ buffer ];
599    }
600  } else if (!(list = fixBufferList(buffer))) {
601    throw new ERR_INVALID_ARG_TYPE('buffer list arguments',
602                                   ['Buffer',
603                                    'TypedArray',
604                                    'DataView',
605                                    'string'],
606                                   buffer);
607  }
608
609  if (!connected)
610    port = validatePort(port, 'Port', { allowZero: false });
611
612  // Normalize callback so it's either a function or undefined but not anything
613  // else.
614  if (typeof callback !== 'function')
615    callback = undefined;
616
617  if (typeof address === 'function') {
618    callback = address;
619    address = undefined;
620  } else if (address && typeof address !== 'string') {
621    throw new ERR_INVALID_ARG_TYPE('address', ['string', 'falsy'], address);
622  }
623
624  healthCheck(this);
625
626  if (state.bindState === BIND_STATE_UNBOUND)
627    this.bind({ port: 0, exclusive: true }, null);
628
629  if (list.length === 0)
630    list.push(Buffer.alloc(0));
631
632  // If the socket hasn't been bound yet, push the outbound packet onto the
633  // send queue and send after binding is complete.
634  if (state.bindState !== BIND_STATE_BOUND) {
635    enqueue(this, this.send.bind(this, list, port, address, callback));
636    return;
637  }
638
639  const afterDns = (ex, ip) => {
640    defaultTriggerAsyncIdScope(
641      this[async_id_symbol],
642      doSend,
643      ex, this, ip, list, address, port, callback
644    );
645  };
646
647  if (!connected) {
648    state.handle.lookup(address, afterDns);
649  } else {
650    afterDns(null, null);
651  }
652};
653
654function doSend(ex, self, ip, list, address, port, callback) {
655  const state = self[kStateSymbol];
656
657  if (ex) {
658    if (typeof callback === 'function') {
659      process.nextTick(callback, ex);
660      return;
661    }
662
663    process.nextTick(() => self.emit('error', ex));
664    return;
665  } else if (!state.handle) {
666    return;
667  }
668
669  const req = new SendWrap();
670  req.list = list;  // Keep reference alive.
671  req.address = address;
672  req.port = port;
673  if (callback) {
674    req.callback = callback;
675    req.oncomplete = afterSend;
676  }
677
678  let err;
679  if (port)
680    err = state.handle.send(req, list, list.length, port, ip, !!callback);
681  else
682    err = state.handle.send(req, list, list.length, !!callback);
683
684  if (err >= 1) {
685    // Synchronous finish. The return code is msg_length + 1 so that we can
686    // distinguish between synchronous success and asynchronous success.
687    if (callback)
688      process.nextTick(callback, null, err - 1);
689    return;
690  }
691
692  if (err && callback) {
693    // Don't emit as error, dgram_legacy.js compatibility
694    const ex = exceptionWithHostPort(err, 'send', address, port);
695    process.nextTick(callback, ex);
696  }
697}
698
699function afterSend(err, sent) {
700  if (err) {
701    err = exceptionWithHostPort(err, 'send', this.address, this.port);
702  } else {
703    err = null;
704  }
705
706  this.callback(err, sent);
707}
708
709Socket.prototype.close = function(callback) {
710  const state = this[kStateSymbol];
711  const queue = state.queue;
712
713  if (typeof callback === 'function')
714    this.on('close', callback);
715
716  if (queue !== undefined) {
717    queue.push(this.close.bind(this));
718    return this;
719  }
720
721  healthCheck(this);
722  stopReceiving(this);
723  state.handle.close();
724  state.handle = null;
725  defaultTriggerAsyncIdScope(this[async_id_symbol],
726                             process.nextTick,
727                             socketCloseNT,
728                             this);
729
730  return this;
731};
732
733
734function socketCloseNT(self) {
735  self.emit('close');
736}
737
738
739Socket.prototype.address = function() {
740  healthCheck(this);
741
742  const out = {};
743  const err = this[kStateSymbol].handle.getsockname(out);
744  if (err) {
745    throw errnoException(err, 'getsockname');
746  }
747
748  return out;
749};
750
751Socket.prototype.remoteAddress = function() {
752  healthCheck(this);
753
754  const state = this[kStateSymbol];
755  if (state.connectState !== CONNECT_STATE_CONNECTED)
756    throw new ERR_SOCKET_DGRAM_NOT_CONNECTED();
757
758  const out = {};
759  const err = state.handle.getpeername(out);
760  if (err)
761    throw errnoException(err, 'getpeername');
762
763  return out;
764};
765
766
767Socket.prototype.setBroadcast = function(arg) {
768  const err = this[kStateSymbol].handle.setBroadcast(arg ? 1 : 0);
769  if (err) {
770    throw errnoException(err, 'setBroadcast');
771  }
772};
773
774
775Socket.prototype.setTTL = function(ttl) {
776  validateNumber(ttl, 'ttl');
777
778  const err = this[kStateSymbol].handle.setTTL(ttl);
779  if (err) {
780    throw errnoException(err, 'setTTL');
781  }
782
783  return ttl;
784};
785
786
787Socket.prototype.setMulticastTTL = function(ttl) {
788  validateNumber(ttl, 'ttl');
789
790  const err = this[kStateSymbol].handle.setMulticastTTL(ttl);
791  if (err) {
792    throw errnoException(err, 'setMulticastTTL');
793  }
794
795  return ttl;
796};
797
798
799Socket.prototype.setMulticastLoopback = function(arg) {
800  const err = this[kStateSymbol].handle.setMulticastLoopback(arg ? 1 : 0);
801  if (err) {
802    throw errnoException(err, 'setMulticastLoopback');
803  }
804
805  return arg; // 0.4 compatibility
806};
807
808
809Socket.prototype.setMulticastInterface = function(interfaceAddress) {
810  healthCheck(this);
811  validateString(interfaceAddress, 'interfaceAddress');
812
813  const err = this[kStateSymbol].handle.setMulticastInterface(interfaceAddress);
814  if (err) {
815    throw errnoException(err, 'setMulticastInterface');
816  }
817};
818
819Socket.prototype.addMembership = function(multicastAddress,
820                                          interfaceAddress) {
821  healthCheck(this);
822
823  if (!multicastAddress) {
824    throw new ERR_MISSING_ARGS('multicastAddress');
825  }
826
827  const { handle } = this[kStateSymbol];
828  const err = handle.addMembership(multicastAddress, interfaceAddress);
829  if (err) {
830    throw errnoException(err, 'addMembership');
831  }
832};
833
834
835Socket.prototype.dropMembership = function(multicastAddress,
836                                           interfaceAddress) {
837  healthCheck(this);
838
839  if (!multicastAddress) {
840    throw new ERR_MISSING_ARGS('multicastAddress');
841  }
842
843  const { handle } = this[kStateSymbol];
844  const err = handle.dropMembership(multicastAddress, interfaceAddress);
845  if (err) {
846    throw errnoException(err, 'dropMembership');
847  }
848};
849
850Socket.prototype.addSourceSpecificMembership = function(sourceAddress,
851                                                        groupAddress,
852                                                        interfaceAddress) {
853  healthCheck(this);
854
855  if (typeof sourceAddress !== 'string') {
856    throw new ERR_INVALID_ARG_TYPE('sourceAddress', 'string', sourceAddress);
857  }
858
859  if (typeof groupAddress !== 'string') {
860    throw new ERR_INVALID_ARG_TYPE('groupAddress', 'string', groupAddress);
861  }
862
863  const err =
864    this[kStateSymbol].handle.addSourceSpecificMembership(sourceAddress,
865                                                          groupAddress,
866                                                          interfaceAddress);
867  if (err) {
868    throw errnoException(err, 'addSourceSpecificMembership');
869  }
870};
871
872
873Socket.prototype.dropSourceSpecificMembership = function(sourceAddress,
874                                                         groupAddress,
875                                                         interfaceAddress) {
876  healthCheck(this);
877
878  if (typeof sourceAddress !== 'string') {
879    throw new ERR_INVALID_ARG_TYPE('sourceAddress', 'string', sourceAddress);
880  }
881
882  if (typeof groupAddress !== 'string') {
883    throw new ERR_INVALID_ARG_TYPE('groupAddress', 'string', groupAddress);
884  }
885
886  const err =
887    this[kStateSymbol].handle.dropSourceSpecificMembership(sourceAddress,
888                                                           groupAddress,
889                                                           interfaceAddress);
890  if (err) {
891    throw errnoException(err, 'dropSourceSpecificMembership');
892  }
893};
894
895
896function healthCheck(socket) {
897  if (!socket[kStateSymbol].handle) {
898    // Error message from dgram_legacy.js.
899    throw new ERR_SOCKET_DGRAM_NOT_RUNNING();
900  }
901}
902
903
904function stopReceiving(socket) {
905  const state = socket[kStateSymbol];
906
907  if (!state.receiving)
908    return;
909
910  state.handle.recvStop();
911  state.receiving = false;
912}
913
914
915function onMessage(nread, handle, buf, rinfo) {
916  const self = handle[owner_symbol];
917  if (nread < 0) {
918    return self.emit('error', errnoException(nread, 'recvmsg'));
919  }
920  rinfo.size = buf.length; // compatibility
921  self.emit('message', buf, rinfo);
922}
923
924
925Socket.prototype.ref = function() {
926  const handle = this[kStateSymbol].handle;
927
928  if (handle)
929    handle.ref();
930
931  return this;
932};
933
934
935Socket.prototype.unref = function() {
936  const handle = this[kStateSymbol].handle;
937
938  if (handle)
939    handle.unref();
940
941  return this;
942};
943
944
945Socket.prototype.setRecvBufferSize = function(size) {
946  bufferSize(this, size, RECV_BUFFER);
947};
948
949
950Socket.prototype.setSendBufferSize = function(size) {
951  bufferSize(this, size, SEND_BUFFER);
952};
953
954
955Socket.prototype.getRecvBufferSize = function() {
956  return bufferSize(this, 0, RECV_BUFFER);
957};
958
959
960Socket.prototype.getSendBufferSize = function() {
961  return bufferSize(this, 0, SEND_BUFFER);
962};
963
964
965// Deprecated private APIs.
966ObjectDefineProperty(Socket.prototype, '_handle', {
967  get: deprecate(function() {
968    return this[kStateSymbol].handle;
969  }, 'Socket.prototype._handle is deprecated', 'DEP0112'),
970  set: deprecate(function(val) {
971    this[kStateSymbol].handle = val;
972  }, 'Socket.prototype._handle is deprecated', 'DEP0112')
973});
974
975
976ObjectDefineProperty(Socket.prototype, '_receiving', {
977  get: deprecate(function() {
978    return this[kStateSymbol].receiving;
979  }, 'Socket.prototype._receiving is deprecated', 'DEP0112'),
980  set: deprecate(function(val) {
981    this[kStateSymbol].receiving = val;
982  }, 'Socket.prototype._receiving is deprecated', 'DEP0112')
983});
984
985
986ObjectDefineProperty(Socket.prototype, '_bindState', {
987  get: deprecate(function() {
988    return this[kStateSymbol].bindState;
989  }, 'Socket.prototype._bindState is deprecated', 'DEP0112'),
990  set: deprecate(function(val) {
991    this[kStateSymbol].bindState = val;
992  }, 'Socket.prototype._bindState is deprecated', 'DEP0112')
993});
994
995
996ObjectDefineProperty(Socket.prototype, '_queue', {
997  get: deprecate(function() {
998    return this[kStateSymbol].queue;
999  }, 'Socket.prototype._queue is deprecated', 'DEP0112'),
1000  set: deprecate(function(val) {
1001    this[kStateSymbol].queue = val;
1002  }, 'Socket.prototype._queue is deprecated', 'DEP0112')
1003});
1004
1005
1006ObjectDefineProperty(Socket.prototype, '_reuseAddr', {
1007  get: deprecate(function() {
1008    return this[kStateSymbol].reuseAddr;
1009  }, 'Socket.prototype._reuseAddr is deprecated', 'DEP0112'),
1010  set: deprecate(function(val) {
1011    this[kStateSymbol].reuseAddr = val;
1012  }, 'Socket.prototype._reuseAddr is deprecated', 'DEP0112')
1013});
1014
1015
1016Socket.prototype._healthCheck = deprecate(function() {
1017  healthCheck(this);
1018}, 'Socket.prototype._healthCheck() is deprecated', 'DEP0112');
1019
1020
1021Socket.prototype._stopReceiving = deprecate(function() {
1022  stopReceiving(this);
1023}, 'Socket.prototype._stopReceiving() is deprecated', 'DEP0112');
1024
1025
1026// Legacy alias on the C++ wrapper object. This is not public API, so we may
1027// want to runtime-deprecate it at some point. There's no hurry, though.
1028ObjectDefineProperty(UDP.prototype, 'owner', {
1029  get() { return this[owner_symbol]; },
1030  set(v) { return this[owner_symbol] = v; }
1031});
1032
1033
1034module.exports = {
1035  _createSocketHandle: deprecate(
1036    _createSocketHandle,
1037    'dgram._createSocketHandle() is deprecated',
1038    'DEP0112'
1039  ),
1040  createSocket,
1041  Socket
1042};
1043