1'use strict'; 2 3const { 4 ArrayPrototypeIndexOf, 5 ArrayPrototypePush, 6 ArrayPrototypeSplice, 7 ObjectCreate, 8 ObjectGetPrototypeOf, 9 ObjectSetPrototypeOf, 10 SymbolHasInstance, 11} = primordials; 12 13const { 14 codes: { 15 ERR_INVALID_ARG_TYPE, 16 }, 17} = require('internal/errors'); 18const { 19 validateFunction, 20} = require('internal/validators'); 21 22const { triggerUncaughtException } = internalBinding('errors'); 23 24const { WeakReference } = internalBinding('util'); 25 26// TODO(qard): should there be a C++ channel interface? 27class ActiveChannel { 28 subscribe(subscription) { 29 validateFunction(subscription, 'subscription'); 30 ArrayPrototypePush(this._subscribers, subscription); 31 } 32 33 unsubscribe(subscription) { 34 const index = ArrayPrototypeIndexOf(this._subscribers, subscription); 35 if (index === -1) return false; 36 37 ArrayPrototypeSplice(this._subscribers, index, 1); 38 39 // When there are no more active subscribers, restore to fast prototype. 40 if (!this._subscribers.length) { 41 // eslint-disable-next-line no-use-before-define 42 ObjectSetPrototypeOf(this, Channel.prototype); 43 } 44 45 return true; 46 } 47 48 get hasSubscribers() { 49 return true; 50 } 51 52 publish(data) { 53 for (let i = 0; i < this._subscribers.length; i++) { 54 try { 55 const onMessage = this._subscribers[i]; 56 onMessage(data, this.name); 57 } catch (err) { 58 process.nextTick(() => { 59 triggerUncaughtException(err, false); 60 }); 61 } 62 } 63 } 64} 65 66class Channel { 67 constructor(name) { 68 this._subscribers = undefined; 69 this.name = name; 70 } 71 72 static [SymbolHasInstance](instance) { 73 const prototype = ObjectGetPrototypeOf(instance); 74 return prototype === Channel.prototype || 75 prototype === ActiveChannel.prototype; 76 } 77 78 subscribe(subscription) { 79 ObjectSetPrototypeOf(this, ActiveChannel.prototype); 80 this._subscribers = []; 81 this.subscribe(subscription); 82 } 83 84 unsubscribe() { 85 return false; 86 } 87 88 get hasSubscribers() { 89 return false; 90 } 91 92 publish() {} 93} 94 95const channels = ObjectCreate(null); 96 97function channel(name) { 98 let channel; 99 const ref = channels[name]; 100 if (ref) channel = ref.get(); 101 if (channel) return channel; 102 103 if (typeof name !== 'string' && typeof name !== 'symbol') { 104 throw new ERR_INVALID_ARG_TYPE('channel', ['string', 'symbol'], name); 105 } 106 107 channel = new Channel(name); 108 channels[name] = new WeakReference(channel); 109 return channel; 110} 111 112function subscribe(name, subscription) { 113 const chan = channel(name); 114 channels[name].incRef(); 115 chan.subscribe(subscription); 116} 117 118function unsubscribe(name, subscription) { 119 const chan = channel(name); 120 if (!chan.unsubscribe(subscription)) { 121 return false; 122 } 123 124 channels[name].decRef(); 125 if (channels[name].getRef() === 0) { 126 delete channels[name]; 127 } 128 return true; 129} 130 131function hasSubscribers(name) { 132 let channel; 133 const ref = channels[name]; 134 if (ref) channel = ref.get(); 135 if (!channel) { 136 return false; 137 } 138 139 return channel.hasSubscribers; 140} 141 142module.exports = { 143 channel, 144 hasSubscribers, 145 subscribe, 146 unsubscribe, 147 Channel, 148}; 149