• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
const {
  ArrayPrototypeIndexOf,
  ArrayPrototypePush,
  ArrayPrototypeSlice,
  ArrayPrototypeSplice,
  ObjectCreate,
  ObjectGetPrototypeOf,
  ObjectSetPrototypeOf,
  SymbolHasInstance,
} = primordials;
12
13const {
14  codes: {
15    ERR_INVALID_ARG_TYPE,
16  }
17} = require('internal/errors');
18
19const { triggerUncaughtException } = internalBinding('errors');
20
21const { WeakReference } = internalBinding('util');
22
23// TODO(qard): should there be a C++ channel interface?
/**
 * Behavior of a channel while it has subscribers. A channel object is switched
 * onto this prototype by `Channel.prototype.subscribe` and switched back to
 * `Channel.prototype` by `unsubscribe` once its subscriber list is empty.
 *
 * The subscriber list is treated as copy-on-write: `subscribe` and
 * `unsubscribe` replace `this._subscribers` with a new array instead of
 * mutating it, so a `publish` that is already iterating keeps a stable list
 * even when a handler subscribes or unsubscribes re-entrantly.
 */
class ActiveChannel {
  /**
   * Adds a message handler to this channel.
   * @param {Function} subscription Invoked as `subscription(data, name)`.
   * @throws {ERR_INVALID_ARG_TYPE} If `subscription` is not a function.
   */
  subscribe(subscription) {
    if (typeof subscription !== 'function') {
      throw new ERR_INVALID_ARG_TYPE('subscription', ['function'],
                                     subscription);
    }
    // Build a new array rather than pushing in place so that an in-progress
    // publish() does not observe the change.
    const subscribers = ArrayPrototypeSlice(this._subscribers);
    ArrayPrototypePush(subscribers, subscription);
    this._subscribers = subscribers;
  }

  /**
   * Removes the first occurrence of a previously added handler.
   * @param {Function} subscription The handler to remove.
   * @returns {boolean} `true` if the handler was found and removed,
   *   `false` otherwise.
   */
  unsubscribe(subscription) {
    const index = ArrayPrototypeIndexOf(this._subscribers, subscription);
    if (index === -1) return false;

    // Remove from a copy: splicing the live array while publish() is walking
    // it would shift later handlers down and cause one to be skipped.
    const subscribers = ArrayPrototypeSlice(this._subscribers);
    ArrayPrototypeSplice(subscribers, index, 1);
    this._subscribers = subscribers;

    // When there are no more active subscribers, restore to fast prototype.
    if (!subscribers.length) {
      // eslint-disable-next-line no-use-before-define
      ObjectSetPrototypeOf(this, Channel.prototype);
    }

    return true;
  }

  /**
   * Always `true` on this prototype.
   * @returns {boolean}
   */
  get hasSubscribers() {
    return true;
  }

  /**
   * Calls every handler that was subscribed when publishing started, in
   * subscription order, passing `data` and this channel's `name`.
   * @param {any} data Message passed through to each handler unchanged.
   */
  publish(data) {
    // Capture the list once; see the copy-on-write note on the class.
    const subscribers = this._subscribers;
    for (let i = 0; i < subscribers.length; i++) {
      try {
        const onMessage = subscribers[i];
        onMessage(data, this.name);
      } catch (err) {
        // Defer reporting so one throwing handler does not prevent the
        // remaining handlers from receiving the message.
        process.nextTick(() => {
          triggerUncaughtException(err, false);
        });
      }
    }
  }
}
65
/**
 * A named channel in its inactive state (no subscribers). `publish` is a
 * no-op and `hasSubscribers` is `false` here; the first successful
 * `subscribe` switches the object onto `ActiveChannel.prototype`.
 */
class Channel {
  /**
   * @param {string|symbol} name Stored as `this.name`; not validated here.
   */
  constructor(name) {
    this._subscribers = undefined;
    this.name = name;
  }

  /**
   * Makes `instanceof Channel` true for objects whose direct prototype is
   * either `Channel.prototype` or `ActiveChannel.prototype`, so a channel
   * still passes the check after it has been switched to the active state.
   * @param {any} instance Left-hand operand of `instanceof`.
   * @returns {boolean}
   */
  static [SymbolHasInstance](instance) {
    // ObjectGetPrototypeOf throws on null/undefined; `instanceof` should
    // simply answer false for them.
    if (instance === null || instance === undefined) return false;

    const prototype = ObjectGetPrototypeOf(instance);
    return prototype === Channel.prototype ||
           prototype === ActiveChannel.prototype;
  }

  /**
   * Activates the channel and registers its first handler.
   * @param {Function} subscription Handler to register.
   * @throws {ERR_INVALID_ARG_TYPE} If `subscription` is not a function.
   */
  subscribe(subscription) {
    // Validate before switching prototypes; otherwise a rejected argument
    // would leave the channel active (hasSubscribers === true) with an
    // empty subscriber list.
    if (typeof subscription !== 'function') {
      throw new ERR_INVALID_ARG_TYPE('subscription', ['function'],
                                     subscription);
    }

    ObjectSetPrototypeOf(this, ActiveChannel.prototype);
    this._subscribers = [];
    // Resolves to ActiveChannel.prototype.subscribe after the switch above.
    this.subscribe(subscription);
  }

  /**
   * Nothing can be subscribed in this state, so nothing is removed.
   * @returns {boolean} Always `false`.
   */
  unsubscribe() {
    return false;
  }

  /**
   * Always `false` on this prototype.
   * @returns {boolean}
   */
  get hasSubscribers() {
    return false;
  }

  /** No-op: there are no subscribers to notify. */
  publish() {}
}
94
// Registry mapping a channel name to the WeakReference wrapping its Channel
// (entries are written by channel() below). Created without a prototype so
// lookups by arbitrary name never hit inherited Object.prototype properties.
// NOTE(review): WeakReference is presumably used so unreferenced channels can
// be collected -- confirm against internalBinding('util').
const channels = ObjectCreate(null);
96
/**
 * Returns the channel registered under `name`, creating and registering a
 * new one when no live channel is found.
 * @param {string|symbol} name Channel identifier.
 * @returns {Channel} The existing or newly created channel.
 * @throws {ERR_INVALID_ARG_TYPE} If no existing channel was found and `name`
 *   is neither a string nor a symbol.
 */
function channel(name) {
  // Reuse the registered channel if its reference still yields an object.
  const existingRef = channels[name];
  const existing = existingRef ? existingRef.get() : undefined;
  if (existing) return existing;

  const isValidName = typeof name === 'string' || typeof name === 'symbol';
  if (!isValidName) {
    throw new ERR_INVALID_ARG_TYPE('channel', ['string', 'symbol'], name);
  }

  const created = new Channel(name);
  channels[name] = new WeakReference(created);
  return created;
}
111
/**
 * Reports whether the channel registered under `name` currently has
 * subscribers, without creating the channel if it does not exist.
 * @param {string|symbol} name Channel identifier.
 * @returns {boolean} `false` when no live channel is registered under
 *   `name`; otherwise that channel's `hasSubscribers` value.
 */
function hasSubscribers(name) {
  const registeredRef = channels[name];
  const found = registeredRef ? registeredRef.get() : undefined;
  return found ? found.hasSubscribers : false;
}
122
// Exported surface of this module: the channel() factory/lookup, the
// hasSubscribers() query, and the Channel class itself. ActiveChannel is
// intentionally not listed here.
module.exports = {
  channel,
  hasSubscribers,
  Channel
};
128