1'use strict'; 2 3const { 4 JSONParse, 5 JSONStringify, 6 StringPrototypeSplit, 7 Symbol, 8 TypedArrayPrototypeSubarray, 9} = primordials; 10const { Buffer } = require('buffer'); 11const { StringDecoder } = require('string_decoder'); 12const v8 = require('v8'); 13const { isArrayBufferView } = require('internal/util/types'); 14const assert = require('internal/assert'); 15const { streamBaseState, kLastWriteWasAsync } = internalBinding('stream_wrap'); 16 17const kMessageBuffer = Symbol('kMessageBuffer'); 18const kJSONBuffer = Symbol('kJSONBuffer'); 19const kStringDecoder = Symbol('kStringDecoder'); 20 21// Extend V8's serializer APIs to give more JSON-like behaviour in 22// some cases; in particular, for native objects this serializes them the same 23// way that JSON does rather than throwing an exception. 24const kArrayBufferViewTag = 0; 25const kNotArrayBufferViewTag = 1; 26class ChildProcessSerializer extends v8.DefaultSerializer { 27 _writeHostObject(object) { 28 if (isArrayBufferView(object)) { 29 this.writeUint32(kArrayBufferViewTag); 30 return super._writeHostObject(object); 31 } 32 this.writeUint32(kNotArrayBufferViewTag); 33 this.writeValue({ ...object }); 34 } 35} 36 37class ChildProcessDeserializer extends v8.DefaultDeserializer { 38 _readHostObject() { 39 const tag = this.readUint32(); 40 if (tag === kArrayBufferViewTag) 41 return super._readHostObject(); 42 43 assert(tag === kNotArrayBufferViewTag); 44 return this.readValue(); 45 } 46} 47 48// Messages are parsed in either of the following formats: 49// - Newline-delimited JSON, or 50// - V8-serialized buffers, prefixed with their length as a big endian uint32 51// (aka 'advanced') 52const advanced = { 53 initMessageChannel(channel) { 54 channel[kMessageBuffer] = Buffer.alloc(0); 55 channel.buffering = false; 56 }, 57 58 *parseChannelMessages(channel, readData) { 59 if (readData.length === 0) return; 60 61 let messageBuffer = Buffer.concat([channel[kMessageBuffer], readData]); 62 while (messageBuffer.length > 4) { 63 const size = messageBuffer.readUInt32BE(); 64 if (messageBuffer.length < 4 + size) { 65 break; 66 } 67 68 const deserializer = new ChildProcessDeserializer( 69 TypedArrayPrototypeSubarray(messageBuffer, 4, 4 + size)); 70 messageBuffer = TypedArrayPrototypeSubarray(messageBuffer, 4 + size); 71 72 deserializer.readHeader(); 73 yield deserializer.readValue(); 74 } 75 channel[kMessageBuffer] = messageBuffer; 76 channel.buffering = messageBuffer.length > 0; 77 }, 78 79 writeChannelMessage(channel, req, message, handle) { 80 const ser = new ChildProcessSerializer(); 81 ser.writeHeader(); 82 ser.writeValue(message); 83 const serializedMessage = ser.releaseBuffer(); 84 const sizeBuffer = Buffer.allocUnsafe(4); 85 sizeBuffer.writeUInt32BE(serializedMessage.length); 86 87 const buffer = Buffer.concat([ 88 sizeBuffer, 89 serializedMessage, 90 ]); 91 const result = channel.writeBuffer(req, buffer, handle); 92 // Mirror what stream_base_commons.js does for Buffer retention. 93 if (streamBaseState[kLastWriteWasAsync]) 94 req.buffer = buffer; 95 return result; 96 }, 97}; 98 99const json = { 100 initMessageChannel(channel) { 101 channel[kJSONBuffer] = ''; 102 channel[kStringDecoder] = undefined; 103 }, 104 105 *parseChannelMessages(channel, readData) { 106 if (readData.length === 0) return; 107 108 if (channel[kStringDecoder] === undefined) 109 channel[kStringDecoder] = new StringDecoder('utf8'); 110 const chunks = 111 StringPrototypeSplit(channel[kStringDecoder].write(readData), '\n'); 112 const numCompleteChunks = chunks.length - 1; 113 // Last line does not have trailing linebreak 114 const incompleteChunk = chunks[numCompleteChunks]; 115 if (numCompleteChunks === 0) { 116 channel[kJSONBuffer] += incompleteChunk; 117 } else { 118 chunks[0] = channel[kJSONBuffer] + chunks[0]; 119 for (let i = 0; i < numCompleteChunks; i++) 120 yield JSONParse(chunks[i]); 121 channel[kJSONBuffer] = incompleteChunk; 122 } 123 channel.buffering = channel[kJSONBuffer].length !== 0; 124 }, 125 126 writeChannelMessage(channel, req, message, handle) { 127 const string = JSONStringify(message) + '\n'; 128 return channel.writeUtf8String(req, string, handle); 129 }, 130}; 131 132module.exports = { advanced, json }; 133