// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.

// a transform stream is a readable/writable stream where you do
// something with the data. Sometimes it's called a "filter",
// but that's not a great name for it, since that implies a thing where
// some bits pass through, and others are simply ignored. (That would
// be a valid example of a transform, of course.)
//
// While the output is causally related to the input, it's not a
// necessarily symmetric or synchronous transformation. For example,
// a zlib stream might take multiple plain-text writes(), and then
// emit a single compressed chunk some time in the future.
//
// Here's how this works:
//
// The Transform stream has all the aspects of the readable and writable
// stream classes. When you write(chunk), that calls _write(chunk,cb)
// internally, and returns false if there's a lot of pending writes
// buffered up. When you call read(), that calls _read(n) until
// there's enough pending readable data buffered up.
//
// In a transform stream, the written data is placed in a buffer. When
// _read(n) is called, it transforms the queued up data, calling the
// buffered _write cb's as it consumes chunks. If consuming a single
// written chunk would result in multiple output chunks, then the first
// outputted bit calls the readcb, and subsequent chunks just go into
// the read buffer, and will cause it to emit 'readable' if necessary.
//
// This way, back-pressure is actually determined by the reading side,
// since _read has to be called to start processing a new chunk. However,
// a pathological inflate type of transform can cause excessive buffering
// here. For example, imagine a stream where every byte of input is
// interpreted as an integer from 0-255, and then results in that many
// bytes of output. Writing the 4 bytes {ff,ff,ff,ff} would result in
// 1kb of data being output. In this case, you could write a very small
// amount of input, and end up with a very large amount of output. In
// such a pathological inflating mechanism, there'd be no way to tell
// the system to stop doing the transform. A single 4MB write could
// cause the system to run out of memory.
//
// However, even in such a pathological case, only a single written chunk
// would be consumed, and then the rest would wait (un-transformed) until
// the results of the previous transformed chunk were consumed.
'use strict';

const {
  ObjectSetPrototypeOf,
} = primordials;

module.exports = Transform;
const {
  ERR_METHOD_NOT_IMPLEMENTED,
  ERR_MULTIPLE_CALLBACK,
  ERR_TRANSFORM_ALREADY_TRANSFORMING,
  ERR_TRANSFORM_WITH_LENGTH_0
} = require('internal/errors').codes;
const Duplex = require('_stream_duplex');
ObjectSetPrototypeOf(Transform.prototype, Duplex.prototype);
ObjectSetPrototypeOf(Transform, Duplex);


// Bound to the stream instance and handed to the user's _transform() as its
// callback. Completes the pending write: pushes any produced data to the
// readable side, invokes the buffered write callback, and kicks off the next
// _read() if the readable side still wants data.
function afterTransform(er, data) {
  const ts = this._transformState;
  ts.transforming = false;

  const cb = ts.writecb;

  // writecb is cleared below before being invoked, so seeing null here means
  // the user's _transform() called its callback more than once.
  if (cb === null) {
    return this.emit('error', new ERR_MULTIPLE_CALLBACK());
  }

  ts.writechunk = null;
  ts.writecb = null;

  if (data != null) // Single equals check for both `null` and `undefined`
    this.push(data);

  cb(er);

  const rs = this._readableState;
  rs.reading = false;
  if (rs.needReadable || rs.length < rs.highWaterMark) {
    this._read(rs.highWaterMark);
  }
}


// Transform stream constructor. Accepts the usual Duplex options, plus
// optional `transform` and `flush` functions as a shorthand for overriding
// _transform()/_flush() on a subclass.
function Transform(options) {
  if (!(this instanceof Transform))
    return new Transform(options);

  Duplex.call(this, options);

  // Per-instance bookkeeping linking the writable side (pending chunk and
  // its callback) to the readable side's demand.
  this._transformState = {
    afterTransform: afterTransform.bind(this),
    needTransform: false,
    transforming: false,
    writecb: null,
    writechunk: null,
    writeencoding: null
  };

  // We have implemented the _read method, and done the other things
  // that Readable wants before the first _read call, so unset the
  // sync guard flag.
  this._readableState.sync = false;

  if (options) {
    if (typeof options.transform === 'function')
      this._transform = options.transform;

    if (typeof options.flush === 'function')
      this._flush = options.flush;
  }

  // When the writable side finishes, then flush out anything remaining.
  this.on('prefinish', prefinish);
}

// 'prefinish' listener: give the user's _flush() (if any) a chance to emit
// trailing data before the readable side is ended via done().
function prefinish() {
  if (typeof this._flush === 'function' && !this._readableState.destroyed) {
    this._flush((er, data) => {
      done(this, er, data);
    });
  } else {
    done(this, null, null);
  }
}

// Any push() output means the current demand has been satisfied, so clear
// needTransform before delegating to the Duplex implementation.
Transform.prototype.push = function(chunk, encoding) {
  this._transformState.needTransform = false;
  return Duplex.prototype.push.call(this, chunk, encoding);
};

// This is the part where you do stuff!
// override this function in implementation classes.
// 'chunk' is an input chunk.
//
// Call `push(newChunk)` to pass along transformed output
// to the readable side. You may call 'push' zero or more times.
//
// Call `cb(err)` when you are done with this chunk. If you pass
// an error, then that'll put the hurt on the whole operation. If you
// never call cb(), then you'll never get another chunk.
Transform.prototype._transform = function(chunk, encoding, cb) {
  cb(new ERR_METHOD_NOT_IMPLEMENTED('_transform()'));
};

// Buffer exactly one written chunk (plus its callback) and, if the readable
// side is already waiting for data, start transforming it immediately.
Transform.prototype._write = function(chunk, encoding, cb) {
  const ts = this._transformState;
  ts.writecb = cb;
  ts.writechunk = chunk;
  ts.writeencoding = encoding;
  if (!ts.transforming) {
    const rs = this._readableState;
    if (ts.needTransform ||
        rs.needReadable ||
        rs.length < rs.highWaterMark)
      this._read(rs.highWaterMark);
  }
};

// Doesn't matter what the args are here.
// _transform does all the work.
// That we got here means that the readable side wants more data.
Transform.prototype._read = function(n) {
  const ts = this._transformState;

  if (ts.writechunk !== null && !ts.transforming) {
    ts.transforming = true;
    this._transform(ts.writechunk, ts.writeencoding, ts.afterTransform);
  } else {
    // Mark that we need a transform, so that any data that comes in
    // will get processed, now that we've asked for it.
    ts.needTransform = true;
  }
};


// Delegate teardown to Duplex; the wrapper keeps the callback arity explicit.
Transform.prototype._destroy = function(err, cb) {
  Duplex.prototype._destroy.call(this, err, (err2) => {
    cb(err2);
  });
};


// Finish the stream after the final flush: emit any trailing data and end
// the readable side with push(null).
function done(stream, er, data) {
  if (er)
    return stream.emit('error', er);

  if (data != null) // Single equals check for both `null` and `undefined`
    stream.push(data);

  // These two error cases are coherence checks that can likely not be tested.
  if (stream._writableState.length)
    throw new ERR_TRANSFORM_WITH_LENGTH_0();

  if (stream._transformState.transforming)
    throw new ERR_TRANSFORM_ALREADY_TRANSFORMING();
  return stream.push(null);
}