1'use strict'; 2 3const { 4 JSONParse, 5 JSONStringify, 6 StringPrototypeSplit, 7 ArrayPrototypePush, 8 Symbol, 9 TypedArrayPrototypeSubarray, 10} = primordials; 11const { Buffer } = require('buffer'); 12const { StringDecoder } = require('string_decoder'); 13const v8 = require('v8'); 14const { isArrayBufferView } = require('internal/util/types'); 15const assert = require('internal/assert'); 16const { streamBaseState, kLastWriteWasAsync } = internalBinding('stream_wrap'); 17 18const kMessageBuffer = Symbol('kMessageBuffer'); 19const kMessageBufferSize = Symbol('kMessageBufferSize'); 20const kJSONBuffer = Symbol('kJSONBuffer'); 21const kStringDecoder = Symbol('kStringDecoder'); 22 23// Extend V8's serializer APIs to give more JSON-like behaviour in 24// some cases; in particular, for native objects this serializes them the same 25// way that JSON does rather than throwing an exception. 26const kArrayBufferViewTag = 0; 27const kNotArrayBufferViewTag = 1; 28class ChildProcessSerializer extends v8.DefaultSerializer { 29 _writeHostObject(object) { 30 if (isArrayBufferView(object)) { 31 this.writeUint32(kArrayBufferViewTag); 32 return super._writeHostObject(object); 33 } 34 this.writeUint32(kNotArrayBufferViewTag); 35 this.writeValue({ ...object }); 36 } 37} 38 39class ChildProcessDeserializer extends v8.DefaultDeserializer { 40 _readHostObject() { 41 const tag = this.readUint32(); 42 if (tag === kArrayBufferViewTag) 43 return super._readHostObject(); 44 45 assert(tag === kNotArrayBufferViewTag); 46 return this.readValue(); 47 } 48} 49 50// Messages are parsed in either of the following formats: 51// - Newline-delimited JSON, or 52// - V8-serialized buffers, prefixed with their length as a big endian uint32 53// (aka 'advanced') 54const advanced = { 55 initMessageChannel(channel) { 56 channel[kMessageBuffer] = []; 57 channel[kMessageBufferSize] = 0; 58 channel.buffering = false; 59 }, 60 61 *parseChannelMessages(channel, readData) { 62 if (readData.length === 0) return; 63 64 ArrayPrototypePush(channel[kMessageBuffer], readData); 65 channel[kMessageBufferSize] += readData.length; 66 67 // Index 0 should always be present because we just pushed data into it. 68 let messageBufferHead = channel[kMessageBuffer][0]; 69 while (messageBufferHead.length >= 4) { 70 // We call `readUInt32BE` manually here, because this is faster than first converting 71 // it to a buffer and using `readUInt32BE` on that. 72 const fullMessageSize = ( 73 messageBufferHead[0] << 24 | 74 messageBufferHead[1] << 16 | 75 messageBufferHead[2] << 8 | 76 messageBufferHead[3] 77 ) + 4; 78 79 if (channel[kMessageBufferSize] < fullMessageSize) break; 80 81 const concatenatedBuffer = channel[kMessageBuffer].length === 1 ? 82 channel[kMessageBuffer][0] : 83 Buffer.concat( 84 channel[kMessageBuffer], 85 channel[kMessageBufferSize], 86 ); 87 88 const deserializer = new ChildProcessDeserializer( 89 TypedArrayPrototypeSubarray(concatenatedBuffer, 4, fullMessageSize), 90 ); 91 92 messageBufferHead = TypedArrayPrototypeSubarray(concatenatedBuffer, fullMessageSize); 93 channel[kMessageBufferSize] = messageBufferHead.length; 94 channel[kMessageBuffer] = 95 channel[kMessageBufferSize] !== 0 ? [messageBufferHead] : []; 96 97 deserializer.readHeader(); 98 yield deserializer.readValue(); 99 } 100 101 channel.buffering = channel[kMessageBufferSize] > 0; 102 }, 103 104 writeChannelMessage(channel, req, message, handle) { 105 const ser = new ChildProcessSerializer(); 106 // Add 4 bytes, to later populate with message length 107 ser.writeRawBytes(Buffer.allocUnsafe(4)); 108 ser.writeHeader(); 109 ser.writeValue(message); 110 111 const serializedMessage = ser.releaseBuffer(); 112 const serializedMessageLength = serializedMessage.length - 4; 113 114 serializedMessage.set([ 115 serializedMessageLength >> 24 & 0xFF, 116 serializedMessageLength >> 16 & 0xFF, 117 serializedMessageLength >> 8 & 0xFF, 118 serializedMessageLength & 0xFF, 119 ], 0); 120 121 const result = channel.writeBuffer(req, serializedMessage, handle); 122 123 // Mirror what stream_base_commons.js does for Buffer retention. 124 if (streamBaseState[kLastWriteWasAsync]) 125 req.buffer = serializedMessage; 126 127 return result; 128 }, 129}; 130 131const json = { 132 initMessageChannel(channel) { 133 channel[kJSONBuffer] = ''; 134 channel[kStringDecoder] = undefined; 135 }, 136 137 *parseChannelMessages(channel, readData) { 138 if (readData.length === 0) return; 139 140 if (channel[kStringDecoder] === undefined) 141 channel[kStringDecoder] = new StringDecoder('utf8'); 142 const chunks = 143 StringPrototypeSplit(channel[kStringDecoder].write(readData), '\n'); 144 const numCompleteChunks = chunks.length - 1; 145 // Last line does not have trailing linebreak 146 const incompleteChunk = chunks[numCompleteChunks]; 147 if (numCompleteChunks === 0) { 148 channel[kJSONBuffer] += incompleteChunk; 149 } else { 150 chunks[0] = channel[kJSONBuffer] + chunks[0]; 151 for (let i = 0; i < numCompleteChunks; i++) 152 yield JSONParse(chunks[i]); 153 channel[kJSONBuffer] = incompleteChunk; 154 } 155 channel.buffering = channel[kJSONBuffer].length !== 0; 156 }, 157 158 writeChannelMessage(channel, req, message, handle) { 159 const string = JSONStringify(message) + '\n'; 160 return channel.writeUtf8String(req, string, handle); 161 }, 162}; 163 164module.exports = { advanced, json }; 165