1'use strict'; 2 3const { 4 ArrayPrototypeIndexOf, 5 ArrayPrototypePush, 6 ArrayPrototypeSplice, 7 ObjectCreate, 8 ObjectGetPrototypeOf, 9 ObjectSetPrototypeOf, 10 SymbolHasInstance, 11} = primordials; 12 13const { 14 codes: { 15 ERR_INVALID_ARG_TYPE, 16 } 17} = require('internal/errors'); 18 19const { triggerUncaughtException } = internalBinding('errors'); 20 21const { WeakReference } = internalBinding('util'); 22 23// TODO(qard): should there be a C++ channel interface? 24class ActiveChannel { 25 subscribe(subscription) { 26 if (typeof subscription !== 'function') { 27 throw new ERR_INVALID_ARG_TYPE('subscription', ['function'], 28 subscription); 29 } 30 ArrayPrototypePush(this._subscribers, subscription); 31 } 32 33 unsubscribe(subscription) { 34 const index = ArrayPrototypeIndexOf(this._subscribers, subscription); 35 if (index === -1) return false; 36 37 ArrayPrototypeSplice(this._subscribers, index, 1); 38 39 // When there are no more active subscribers, restore to fast prototype. 40 if (!this._subscribers.length) { 41 // eslint-disable-next-line no-use-before-define 42 ObjectSetPrototypeOf(this, Channel.prototype); 43 } 44 45 return true; 46 } 47 48 get hasSubscribers() { 49 return true; 50 } 51 52 publish(data) { 53 for (let i = 0; i < this._subscribers.length; i++) { 54 try { 55 const onMessage = this._subscribers[i]; 56 onMessage(data, this.name); 57 } catch (err) { 58 process.nextTick(() => { 59 triggerUncaughtException(err, false); 60 }); 61 } 62 } 63 } 64} 65 66class Channel { 67 constructor(name) { 68 this._subscribers = undefined; 69 this.name = name; 70 } 71 72 static [SymbolHasInstance](instance) { 73 const prototype = ObjectGetPrototypeOf(instance); 74 return prototype === Channel.prototype || 75 prototype === ActiveChannel.prototype; 76 } 77 78 subscribe(subscription) { 79 ObjectSetPrototypeOf(this, ActiveChannel.prototype); 80 this._subscribers = []; 81 this.subscribe(subscription); 82 } 83 84 unsubscribe() { 85 return false; 86 } 87 88 get hasSubscribers() { 89 return false; 90 } 91 92 publish() {} 93} 94 95const channels = ObjectCreate(null); 96 97function channel(name) { 98 let channel; 99 const ref = channels[name]; 100 if (ref) channel = ref.get(); 101 if (channel) return channel; 102 103 if (typeof name !== 'string' && typeof name !== 'symbol') { 104 throw new ERR_INVALID_ARG_TYPE('channel', ['string', 'symbol'], name); 105 } 106 107 channel = new Channel(name); 108 channels[name] = new WeakReference(channel); 109 return channel; 110} 111 112function hasSubscribers(name) { 113 let channel; 114 const ref = channels[name]; 115 if (ref) channel = ref.get(); 116 if (!channel) { 117 return false; 118 } 119 120 return channel.hasSubscribers; 121} 122 123module.exports = { 124 channel, 125 hasSubscribers, 126 Channel 127}; 128