• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const {
4  JSONParse,
5  JSONStringify,
6  StringPrototypeSplit,
7  Symbol,
8  TypedArrayPrototypeSubarray,
9} = primordials;
10const { Buffer } = require('buffer');
11const { StringDecoder } = require('string_decoder');
12const v8 = require('v8');
13const { isArrayBufferView } = require('internal/util/types');
14const assert = require('internal/assert');
15const { streamBaseState, kLastWriteWasAsync } = internalBinding('stream_wrap');
16
17const kMessageBuffer = Symbol('kMessageBuffer');
18const kJSONBuffer = Symbol('kJSONBuffer');
19const kStringDecoder = Symbol('kStringDecoder');
20
21// Extend V8's serializer APIs to give more JSON-like behaviour in
22// some cases; in particular, for native objects this serializes them the same
23// way that JSON does rather than throwing an exception.
// Tags written in front of every host object so the reading side knows
// which of the two encodings below follows.
const kArrayBufferViewTag = 0;
const kNotArrayBufferViewTag = 1;

// Serializer used for 'advanced' IPC messages.
class ChildProcessSerializer extends v8.DefaultSerializer {
  // Called for values that V8 hands over as host objects.
  // - Anything that is not an ArrayBufferView is written as
  //   kNotArrayBufferViewTag followed by a plain-object copy of its own
  //   enumerable properties.
  // - ArrayBufferViews are written as kArrayBufferViewTag followed by the
  //   encoding produced by v8.DefaultSerializer.
  _writeHostObject(object) {
    if (!isArrayBufferView(object)) {
      this.writeUint32(kNotArrayBufferViewTag);
      this.writeValue({ ...object });
      return;
    }

    this.writeUint32(kArrayBufferViewTag);
    return super._writeHostObject(object);
  }
}
36
// Deserializer used for 'advanced' IPC messages; reads the tagged host
// object encoding: a uint32 tag followed by the payload.
class ChildProcessDeserializer extends v8.DefaultDeserializer {
  // Reads the uint32 tag and decodes the payload accordingly:
  // kArrayBufferViewTag defers to v8.DefaultDeserializer, while
  // kNotArrayBufferViewTag is followed by an ordinary serialized value.
  // Any other tag trips the assertion.
  _readHostObject() {
    const tag = this.readUint32();
    if (tag !== kArrayBufferViewTag) {
      assert(tag === kNotArrayBufferViewTag);
      return this.readValue();
    }

    return super._readHostObject();
  }
}
47
48// Messages are parsed in either of the following formats:
49// - Newline-delimited JSON, or
50// - V8-serialized buffers, prefixed with their length as a big endian uint32
51//   (aka 'advanced')
const advanced = {
  // Gives `channel` an empty receive buffer and marks it as not holding a
  // partially received message.
  initMessageChannel(channel) {
    channel[kMessageBuffer] = Buffer.alloc(0);
    channel.buffering = false;
  },

  // Generator. Appends `readData` to the bytes already buffered on `channel`
  // and yields one deserialized value for every complete frame, where a frame
  // is a big endian uint32 payload length followed by that many bytes of
  // V8-serialized data. Bytes that do not yet form a complete frame stay
  // buffered on the channel, and `channel.buffering` tells whether any remain.
  *parseChannelMessages(channel, readData) {
    if (readData.length === 0) return;

    let messageBuffer = Buffer.concat([channel[kMessageBuffer], readData]);
    try {
      while (messageBuffer.length > 4) {
        const size = messageBuffer.readUInt32BE();
        const frameEnd = 4 + size;
        if (messageBuffer.length < frameEnd) {
          // The payload has not fully arrived yet.
          break;
        }

        const deserializer = new ChildProcessDeserializer(
          TypedArrayPrototypeSubarray(messageBuffer, 4, frameEnd));
        // Step past the frame before decoding it, so that a frame that fails
        // to deserialize is not looked at again.
        messageBuffer = TypedArrayPrototypeSubarray(messageBuffer, frameEnd);

        deserializer.readHeader();
        yield deserializer.readValue();
      }
    } finally {
      // Store the unread remainder even if deserialization threw or the
      // consumer stopped iterating early. Otherwise the bytes of `readData`
      // (including a trailing partial frame) would be dropped and every
      // later read would start in the middle of a frame.
      channel[kMessageBuffer] = messageBuffer;
      channel.buffering = messageBuffer.length > 0;
    }
  },

  // Serializes `message` with ChildProcessSerializer, prefixes it with its
  // byte length as a big endian uint32 and passes the result to
  // `channel.writeBuffer(req, buffer, handle)`, whose return value is
  // returned unchanged.
  writeChannelMessage(channel, req, message, handle) {
    const ser = new ChildProcessSerializer();
    ser.writeHeader();
    ser.writeValue(message);
    const serializedMessage = ser.releaseBuffer();

    const sizeBuffer = Buffer.allocUnsafe(4);
    sizeBuffer.writeUInt32BE(serializedMessage.length);
    const buffer = Buffer.concat([sizeBuffer, serializedMessage]);

    const result = channel.writeBuffer(req, buffer, handle);
    // Mirror what stream_base_commons.js does for Buffer retention.
    if (streamBaseState[kLastWriteWasAsync])
      req.buffer = buffer;
    return result;
  },
};
98
const json = {
  // Gives `channel` an empty pending-line buffer. The UTF-8 decoder is
  // created on the first non-empty read.
  initMessageChannel(channel) {
    channel[kJSONBuffer] = '';
    channel[kStringDecoder] = undefined;
  },

  // Generator. Decodes `readData` as UTF-8, splits it on '\n' and yields the
  // parsed JSON value of every line that is now complete. Text after the
  // last line break stays buffered on the channel until a later read
  // supplies its '\n'; `channel.buffering` tells whether any text is pending.
  *parseChannelMessages(channel, readData) {
    if (readData.length === 0) return;

    if (channel[kStringDecoder] === undefined)
      channel[kStringDecoder] = new StringDecoder('utf8');
    const chunks =
      StringPrototypeSplit(channel[kStringDecoder].write(readData), '\n');
    const numCompleteChunks = chunks.length - 1;
    // Last line does not have trailing linebreak
    const incompleteChunk = chunks[numCompleteChunks];

    if (numCompleteChunks === 0) {
      // No line break in this read: keep accumulating.
      channel[kJSONBuffer] += incompleteChunk;
      channel.buffering = channel[kJSONBuffer].length !== 0;
      return;
    }

    // The text pending from earlier reads is the start of the first line.
    chunks[0] = channel[kJSONBuffer] + chunks[0];
    try {
      for (let i = 0; i < numCompleteChunks; i++)
        yield JSONParse(chunks[i]);
    } finally {
      // The old pending text has been merged into chunks[0], so it must be
      // replaced even if JSONParse threw or the consumer stopped iterating
      // early. Otherwise it would be prepended to the next message and the
      // new trailing partial line would be lost.
      channel[kJSONBuffer] = incompleteChunk;
      channel.buffering = incompleteChunk.length !== 0;
    }
  },

  // Writes `message` as one line of JSON through
  // `channel.writeUtf8String(req, string, handle)` and returns its result.
  writeChannelMessage(channel, req, message, handle) {
    const string = JSONStringify(message) + '\n';
    return channel.writeUtf8String(req, string, handle);
  },
};
131
132module.exports = { advanced, json };
133