• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const {
4  JSONParse,
5  JSONStringify,
6  StringPrototypeSplit,
7  ArrayPrototypePush,
8  Symbol,
9  TypedArrayPrototypeSubarray,
10} = primordials;
11const { Buffer } = require('buffer');
12const { StringDecoder } = require('string_decoder');
13const v8 = require('v8');
14const { isArrayBufferView } = require('internal/util/types');
15const assert = require('internal/assert');
16const { streamBaseState, kLastWriteWasAsync } = internalBinding('stream_wrap');
17
// Private, per-channel state keys. Symbols are used so that the bookkeeping
// below never collides with regular properties on the channel object.

// 'advanced' format: array of received chunks that have not been consumed yet.
const kMessageBuffer = Symbol('kMessageBuffer');
// 'advanced' format: combined byte length of the chunks in kMessageBuffer.
const kMessageBufferSize = Symbol('kMessageBufferSize');
// 'json' format: text received after the most recent newline (a partial line).
const kJSONBuffer = Symbol('kJSONBuffer');
// 'json' format: StringDecoder instance, created on the first non-empty read.
const kStringDecoder = Symbol('kStringDecoder');
22
// The V8 serializer APIs are extended here to behave more like JSON in some
// cases: host objects that are not ArrayBufferViews are serialized as a plain
// copy of their own enumerable properties instead of causing an exception.
//
// Every host object is prefixed with one of the two tags below so that the
// reading side knows which of the two encodings follows.
const kArrayBufferViewTag = 0;
const kNotArrayBufferViewTag = 1;
class ChildProcessSerializer extends v8.DefaultSerializer {
  /**
   * Writes a host object preceded by a uint32 tag describing its encoding.
   * @param {object} object The host object handed over by the serializer.
   * @returns {any} Whatever the default implementation returns for
   *   ArrayBufferViews; `undefined` for every other object.
   */
  _writeHostObject(object) {
    if (!isArrayBufferView(object)) {
      // Not a view: fall back to serializing a shallow plain-object copy.
      this.writeUint32(kNotArrayBufferViewTag);
      this.writeValue({ ...object });
      return;
    }

    // Views keep the default encoding, merely prefixed with their tag.
    this.writeUint32(kArrayBufferViewTag);
    return super._writeHostObject(object);
  }
}
38
class ChildProcessDeserializer extends v8.DefaultDeserializer {
  /**
   * Reads a host object that was written with a leading uint32 tag.
   * @returns {any} The ArrayBufferView decoded by the default implementation,
   *   or the plain value that follows a "not an ArrayBufferView" tag.
   */
  _readHostObject() {
    const kind = this.readUint32();
    if (kind !== kArrayBufferViewTag) {
      // Only two tags are ever written; anything else is a corrupt stream.
      assert(kind === kNotArrayBufferViewTag);
      return this.readValue();
    }

    return super._readHostObject();
  }
}
49
// Messages are parsed in either of the following formats:
// - Newline-delimited JSON, or
// - V8-serialized buffers, prefixed with their length as a big endian uint32
//   (aka 'advanced')
const advanced = {
  /**
   * Installs empty receive-side state on `channel`.
   * @param {object} channel Channel object that will carry the parser state.
   */
  initMessageChannel(channel) {
    channel[kMessageBuffer] = [];
    channel[kMessageBufferSize] = 0;
    channel.buffering = false;
  },

  /**
   * Appends `readData` to the bytes buffered on `channel` and yields every
   * message that is now completely available. Afterwards `channel.buffering`
   * tells whether bytes of an incomplete message are still being held.
   * @param {object} channel Channel prepared by `initMessageChannel()`.
   * @param {Uint8Array} readData Newly received bytes; an empty chunk is
   *   ignored.
   * @yields {any} Each deserialized message, in arrival order.
   */
  *parseChannelMessages(channel, readData) {
    if (readData.length === 0) return;

    ArrayPrototypePush(channel[kMessageBuffer], readData);
    channel[kMessageBufferSize] += readData.length;

    // Loop on the total number of buffered bytes rather than on the size of
    // the first chunk: the 4-byte length prefix may itself arrive split over
    // several chunks, and checking only the first chunk would stall forever.
    while (channel[kMessageBufferSize] >= 4) {
      let messageBufferHead = channel[kMessageBuffer][0];
      if (messageBufferHead.length < 4) {
        // The length prefix straddles chunk boundaries; coalesce everything
        // received so far so that it can be read from a single buffer.
        messageBufferHead = Buffer.concat(
          channel[kMessageBuffer],
          channel[kMessageBufferSize],
        );
        channel[kMessageBuffer] = [messageBufferHead];
      }

      // We read the big endian uint32 manually here, because this is faster
      // than first converting it to a buffer and using `readUInt32BE` on that.
      // `>>> 0` keeps the result unsigned; the bitwise operators alone would
      // produce a negative size whenever the most significant bit is set.
      const fullMessageSize = ((
        messageBufferHead[0] << 24 |
        messageBufferHead[1] << 16 |
        messageBufferHead[2] << 8 |
        messageBufferHead[3]
      ) >>> 0) + 4;

      // Wait for more data until the whole message has been received.
      if (channel[kMessageBufferSize] < fullMessageSize) break;

      const concatenatedBuffer = channel[kMessageBuffer].length === 1 ?
        messageBufferHead :
        Buffer.concat(
          channel[kMessageBuffer],
          channel[kMessageBufferSize],
        );

      const deserializer = new ChildProcessDeserializer(
        TypedArrayPrototypeSubarray(concatenatedBuffer, 4, fullMessageSize),
      );

      // Update the channel state before yielding, so that it is consistent
      // while the consumer handles the message.
      const remainder =
        TypedArrayPrototypeSubarray(concatenatedBuffer, fullMessageSize);
      channel[kMessageBufferSize] = remainder.length;
      channel[kMessageBuffer] =
        channel[kMessageBufferSize] !== 0 ? [remainder] : [];

      deserializer.readHeader();
      yield deserializer.readValue();
    }

    channel.buffering = channel[kMessageBufferSize] > 0;
  },

  /**
   * Serializes `message`, prefixes it with its byte length as a big endian
   * uint32 and writes the result to `channel`.
   * @param {object} channel Object providing `writeBuffer(req, buffer, handle)`.
   * @param {object} req Write request; receives a `buffer` property when the
   *   last write was asynchronous.
   * @param {any} message Value to serialize.
   * @param {any} handle Passed through to `channel.writeBuffer()` unchanged.
   * @returns {any} The return value of `channel.writeBuffer()`.
   */
  writeChannelMessage(channel, req, message, handle) {
    const ser = new ChildProcessSerializer();
    // Add 4 bytes, to later populate with message length
    ser.writeRawBytes(Buffer.allocUnsafe(4));
    ser.writeHeader();
    ser.writeValue(message);

    const serializedMessage = ser.releaseBuffer();
    const serializedMessageLength = serializedMessage.length - 4;

    // Fill in the placeholder with the payload length, big endian.
    serializedMessage.set([
      serializedMessageLength >> 24 & 0xFF,
      serializedMessageLength >> 16 & 0xFF,
      serializedMessageLength >> 8 & 0xFF,
      serializedMessageLength & 0xFF,
    ], 0);

    const result = channel.writeBuffer(req, serializedMessage, handle);

    // Mirror what stream_base_commons.js does for Buffer retention.
    if (streamBaseState[kLastWriteWasAsync])
      req.buffer = serializedMessage;

    return result;
  },
};
130
// Newline-delimited JSON message format.
const json = {
  /**
   * Installs empty receive-side state on `channel`.
   * @param {object} channel Channel object that will carry the parser state.
   */
  initMessageChannel(channel) {
    channel[kJSONBuffer] = '';
    channel[kStringDecoder] = undefined;
  },

  /**
   * Decodes `readData` as UTF-8, splits it at newlines and yields the parsed
   * value of every line that is now complete. Text after the last newline is
   * kept on the channel; `channel.buffering` reports whether any is held.
   * @param {object} channel Channel prepared by `initMessageChannel()`.
   * @param {Uint8Array} readData Newly received bytes; an empty chunk is
   *   ignored.
   * @yields {any} Each parsed message, in arrival order.
   */
  *parseChannelMessages(channel, readData) {
    if (readData.length === 0) return;

    // The decoder is created on demand and then reused, so a multi-byte
    // character split over two reads is still decoded correctly.
    let decoder = channel[kStringDecoder];
    if (decoder === undefined) {
      decoder = new StringDecoder('utf8');
      channel[kStringDecoder] = decoder;
    }

    const lines = StringPrototypeSplit(decoder.write(readData), '\n');
    // Everything but the final entry was terminated by a linebreak.
    const completeCount = lines.length - 1;
    const partialLine = lines[completeCount];

    if (completeCount !== 0) {
      // The previously buffered text is the beginning of the first line.
      lines[0] = channel[kJSONBuffer] + lines[0];
      let index = 0;
      while (index < completeCount) {
        yield JSONParse(lines[index]);
        index++;
      }
      channel[kJSONBuffer] = partialLine;
    } else {
      // No linebreak seen yet: keep accumulating.
      channel[kJSONBuffer] += partialLine;
    }

    channel.buffering = channel[kJSONBuffer].length !== 0;
  },

  /**
   * Writes `message` to `channel` as a single line of JSON.
   * @param {object} channel Object providing
   *   `writeUtf8String(req, string, handle)`.
   * @param {object} req Write request, passed through unchanged.
   * @param {any} message Value to stringify.
   * @param {any} handle Passed through unchanged.
   * @returns {any} The return value of `channel.writeUtf8String()`.
   */
  writeChannelMessage(channel, req, message, handle) {
    const line = `${JSONStringify(message)}\n`;
    return channel.writeUtf8String(req, line, handle);
  },
};
163
164module.exports = { advanced, json };
165