• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const {
4  ArrayPrototypeIndexOf,
5  ArrayPrototypePush,
6  ArrayPrototypeSplice,
7  ObjectCreate,
8  ObjectGetPrototypeOf,
9  ObjectSetPrototypeOf,
10  SymbolHasInstance,
11} = primordials;
12
13const {
14  codes: {
15    ERR_INVALID_ARG_TYPE,
16  },
17} = require('internal/errors');
18const {
19  validateFunction,
20} = require('internal/validators');
21
22const { triggerUncaughtException } = internalBinding('errors');
23
24const { WeakReference } = internalBinding('util');
25
26// TODO(qard): should there be a C++ channel interface?
/**
 * Behaviour of a channel while it has at least one subscriber.
 *
 * Instances are never constructed directly: `Channel.prototype.subscribe`
 * swaps an object's prototype to this class, and `unsubscribe` swaps it back
 * once the last subscriber is removed.
 *
 * The subscriber list is treated as copy-on-write: `subscribe` and
 * `unsubscribe` install a new array instead of mutating the current one, so
 * a `publish` that is in progress keeps iterating the list it started with.
 */
class ActiveChannel {
  /**
   * Adds a subscriber to this channel.
   * @param {Function} subscription Invoked as `subscription(data, name)` for
   *   every published message. Checked with `validateFunction`.
   */
  subscribe(subscription) {
    validateFunction(subscription, 'subscription');
    const next = copySubscriberList(this._subscribers);
    ArrayPrototypePush(next, subscription);
    this._subscribers = next;
  }

  /**
   * Removes the first registration of `subscription`.
   * @param {Function} subscription The function previously subscribed.
   * @returns {boolean} `true` if a registration was removed, `false` if the
   *   function was not subscribed.
   */
  unsubscribe(subscription) {
    const index = ArrayPrototypeIndexOf(this._subscribers, subscription);
    if (index === -1) return false;

    // Remove from a copy. Splicing the live array would shift the entries
    // under a running publish() loop and make it skip the next subscriber
    // (e.g. when a subscriber unsubscribes itself from inside its handler).
    const next = copySubscriberList(this._subscribers);
    ArrayPrototypeSplice(next, index, 1);
    this._subscribers = next;

    // When there are no more active subscribers, restore to fast prototype.
    if (!next.length) {
      // eslint-disable-next-line no-use-before-define
      ObjectSetPrototypeOf(this, Channel.prototype);
    }

    return true;
  }

  /**
   * Always `true`: an object only has this prototype while subscribed to.
   * @returns {boolean}
   */
  get hasSubscribers() {
    return true;
  }

  /**
   * Delivers `data` to every subscriber registered when the call started.
   * @param {any} data Message passed to each subscriber together with the
   *   channel name.
   */
  publish(data) {
    // Snapshot the list: subscribe()/unsubscribe() replace `_subscribers`
    // rather than mutating it, so this reference stays stable for the loop.
    const subscribers = this._subscribers;
    for (let i = 0; i < subscribers.length; i++) {
      try {
        const onMessage = subscribers[i];
        onMessage(data, this.name);
      } catch (err) {
        // Report the failure on a later tick so the remaining subscribers
        // still receive this message.
        process.nextTick(() => {
          triggerUncaughtException(err, false);
        });
      }
    }
  }
}

// Returns a shallow copy of a subscriber array. Written as an explicit loop
// because only the push/indexOf/splice primordials are imported in this file.
function copySubscriberList(list) {
  const copy = [];
  for (let i = 0; i < list.length; i++) {
    ArrayPrototypePush(copy, list[i]);
  }
  return copy;
}
65
/**
 * A named channel with no subscribers.
 *
 * Every operation here is the cheap "nobody is listening" variant. The first
 * successful `subscribe` call switches the object over to
 * `ActiveChannel.prototype`, which carries the real implementations.
 */
class Channel {
  /**
   * @param {string|symbol} name Identifier stored on the channel and passed
   *   to subscribers as the second argument of each message.
   */
  constructor(name) {
    this._subscribers = undefined;
    this.name = name;
  }

  /**
   * Makes `instanceof Channel` hold for objects in either state.
   * @param {any} instance Value on the left-hand side of `instanceof`.
   * @returns {boolean} `true` when the direct prototype is `Channel` or
   *   `ActiveChannel`.
   */
  static [SymbolHasInstance](instance) {
    const prototype = ObjectGetPrototypeOf(instance);
    return prototype === Channel.prototype ||
           prototype === ActiveChannel.prototype;
  }

  /**
   * Registers the first subscriber and activates the channel.
   * @param {Function} subscription Invoked for every published message.
   */
  subscribe(subscription) {
    // Validate before touching any state. If the prototype were switched
    // first, a rejected argument would leave the channel active with an
    // empty subscriber list and `hasSubscribers` would wrongly be `true`.
    validateFunction(subscription, 'subscription');
    ObjectSetPrototypeOf(this, ActiveChannel.prototype);
    this._subscribers = [];
    // Resolves to ActiveChannel.prototype.subscribe after the switch above.
    this.subscribe(subscription);
  }

  /**
   * Nothing can be registered on an inactive channel.
   * @returns {boolean} Always `false`.
   */
  unsubscribe() {
    return false;
  }

  /**
   * @returns {boolean} Always `false` while the channel is inactive.
   */
  get hasSubscribers() {
    return false;
  }

  /** No-op: there is nobody to deliver the message to. */
  publish() {}
}
94
// Registry of channels keyed by name. Created without a prototype so that
// arbitrary channel names cannot collide with inherited object properties.
// Values are WeakReference wrappers around Channel objects (see channel()).
const channels = ObjectCreate(null);
96
/**
 * Returns the channel registered under `name`, creating and registering a
 * new one when no live channel is found.
 * @param {string|symbol} name Channel name.
 * @returns {Channel} The existing or newly created channel.
 * @throws {ERR_INVALID_ARG_TYPE} If no live channel is found and `name` is
 *   neither a string nor a symbol.
 */
function channel(name) {
  // Try the registry first; the name is only validated when a new channel
  // actually has to be created.
  const existingRef = channels[name];
  if (existingRef) {
    const live = existingRef.get();
    if (live) return live;
  }

  const nameType = typeof name;
  if (nameType !== 'string' && nameType !== 'symbol') {
    throw new ERR_INVALID_ARG_TYPE('channel', ['string', 'symbol'], name);
  }

  const created = new Channel(name);
  channels[name] = new WeakReference(created);
  return created;
}
111
/**
 * Subscribes `subscription` to the channel called `name`, creating the
 * channel if needed, and takes a reference on its registry entry.
 * @param {string|symbol} name Channel name.
 * @param {Function} subscription Invoked for every message published on the
 *   channel.
 */
function subscribe(name, subscription) {
  const chan = channel(name);
  // Register first: chan.subscribe() rejects invalid subscriptions by
  // throwing. Taking the reference only after it succeeds keeps the count in
  // step with unsubscribe(), which releases one reference per removed
  // subscriber. `chan` keeps the channel reachable in the meantime.
  chan.subscribe(subscription);
  channels[name].incRef();
}
117
/**
 * Removes `subscription` from the channel called `name` and releases the
 * reference taken by the module-level subscribe().
 * @param {string|symbol} name Channel name.
 * @param {Function} subscription The function to remove.
 * @returns {boolean} `true` if the subscription was found and removed,
 *   otherwise `false`.
 */
function unsubscribe(name, subscription) {
  const target = channel(name);
  const removed = target.unsubscribe(subscription);

  if (removed) {
    const ref = channels[name];
    ref.decRef();
    // Drop the registry entry once nothing holds a reference to it.
    if (ref.getRef() === 0) {
      delete channels[name];
    }
    return true;
  }

  return false;
}
130
/**
 * Reports whether the channel called `name` currently has subscribers,
 * without creating the channel when it does not exist.
 * @param {string|symbol} name Channel name.
 * @returns {boolean} `false` when there is no registry entry or the entry no
 *   longer yields a channel; otherwise the channel's `hasSubscribers` value.
 */
function hasSubscribers(name) {
  const entry = channels[name];
  if (!entry) {
    return false;
  }

  const target = entry.get();
  return target ? target.hasSubscribers : false;
}
141
// Public surface of this module: the channel factory, the name-based
// helpers, and the Channel class itself.
module.exports = {
  channel,
  hasSubscribers,
  subscribe,
  unsubscribe,
  Channel,
};
149