// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.

// A transform stream is a readable/writable stream where you do
// something with the data.  Sometimes it's called a "filter",
// but that's not a great name for it, since that implies a thing where
// some bits pass through, and others are simply ignored.  (That would
// be a valid example of a transform, of course.)
//
// While the output is causally related to the input, it is not
// necessarily a symmetric or synchronous transformation.  For example,
// a zlib stream might take multiple plain-text writes(), and then
// emit a single compressed chunk some time in the future.
//
// Here's how this works:
//
// The Transform stream has all the aspects of the readable and writable
// stream classes.  When you write(chunk), that calls _write(chunk,cb)
// internally, and returns false if there's a lot of pending writes
// buffered up.  When you call read(), that calls _read(n) until
// there's enough pending readable data buffered up.
//
// In a transform stream, the written data is placed in a buffer.  When
// _read(n) is called, it transforms the queued up data, calling the
// buffered _write cb's as it consumes chunks.  If consuming a single
// written chunk would result in multiple output chunks, then the first
// outputted bit calls the readcb, and subsequent chunks just go into
// the read buffer, and will cause it to emit 'readable' if necessary.
//
// This way, back-pressure is actually determined by the reading side,
// since _read has to be called to start processing a new chunk.  However,
// a pathological inflate type of transform can cause excessive buffering
// here.  For example, imagine a stream where every byte of input is
// interpreted as an integer from 0-255, and then results in that many
// bytes of output.  Writing the 4 bytes {ff,ff,ff,ff} would result in
// 1kb of data being output.  In this case, you could write a very small
// amount of input, and end up with a very large amount of output.  In
// such a pathological inflating mechanism, there'd be no way to tell
// the system to stop doing the transform.  A single 4MB write could
// cause the system to run out of memory.
//
// However, even in such a pathological case, only a single written chunk
// would be consumed, and then the rest would wait (un-transformed) until
// the results of the previous transformed chunk were consumed.
'use strict';

const {
  ObjectDefineProperty,
  ObjectSetPrototypeOf,
  Symbol
} = primordials;

module.exports = Transform;
const {
  ERR_METHOD_NOT_IMPLEMENTED,
  ERR_MULTIPLE_CALLBACK,
  ERR_TRANSFORM_ALREADY_TRANSFORMING,
  ERR_TRANSFORM_WITH_LENGTH_0
} = require('internal/errors').codes;
const Duplex = require('internal/streams/duplex');
const internalUtil = require('internal/util');

// Transform inherits from Duplex on both the prototype and the constructor.
ObjectSetPrototypeOf(Transform.prototype, Duplex.prototype);
ObjectSetPrototypeOf(Transform, Duplex);

// Symbol key under which every instance stores its transform bookkeeping.
const kTransformState = Symbol('kTransformState');

/**
 * Completion callback passed to `_transform()`. It runs with the stream as
 * `this`; the constructor stores a bound copy in the transform state.
 *
 * @param {*} er Value forwarded as-is to the pending write callback.
 * @param {*} data Optional output chunk; pushed unless `null`/`undefined`.
 * @returns {boolean|undefined} The result of `emit('error', ...)` when no
 *   write callback is pending, otherwise `undefined`.
 */
function afterTransform(er, data) {
  const state = this[kTransformState];
  state.transforming = false;

  const pendingWriteCb = state.writecb;
  if (pendingWriteCb === null) {
    // No write callback is pending; report it as a multiple-callback error.
    return this.emit('error', new ERR_MULTIPLE_CALLBACK());
  }

  // Release the buffered write before notifying the writable side.
  state.writechunk = null;
  state.writecb = null;

  // Skip only `null` and `undefined`; other falsy chunks are still pushed.
  if (data !== null && data !== undefined) {
    this.push(data);
  }

  pendingWriteCb(er);

  const readable = this._readableState;
  readable.reading = false;
  const wantsMore =
    readable.needReadable || readable.length < readable.highWaterMark;
  if (wantsMore) {
    this._read(readable.highWaterMark);
  }
}


/**
 * Transform stream constructor. May be called with or without `new`.
 *
 * @param {object} [options] Passed through to `Duplex`. When
 *   `options.transform` and/or `options.flush` are functions they are
 *   installed as this instance's `_transform` / `_flush`.
 */
function Transform(options) {
  if (!(this instanceof Transform)) {
    return new Transform(options);
  }

  Duplex.call(this, options);

  this[kTransformState] = {
    afterTransform: afterTransform.bind(this),
    needTransform: false,
    transforming: false,
    writecb: null,
    writechunk: null,
    writeencoding: null
  };

  // _read is implemented here and everything Readable expects before the
  // first _read call has been done, so clear the sync guard flag.
  this._readableState.sync = false;

  if (options) {
    if (typeof options.transform === 'function') {
      this._transform = options.transform;
    }

    if (typeof options.flush === 'function') {
      this._flush = options.flush;
    }
  }

  // Once the writable side is about to finish, flush whatever remains.
  this.on('prefinish', prefinish);
}

/**
 * 'prefinish' listener. Runs the user `_flush()` hook when one exists and
 * the readable side has not been destroyed, then completes via `done()`;
 * otherwise completes immediately.
 */
function prefinish() {
  const shouldFlush =
    typeof this._flush === 'function' && !this._readableState.destroyed;

  if (!shouldFlush) {
    done(this, null, null);
    return;
  }

  this._flush((er, data) => {
    done(this, er, data);
  });
}

// Deprecated public alias (DEP0143) for the symbol-keyed transform state.
const kTransformStateDeprecation =
  'Transform.prototype._transformState is deprecated';

ObjectDefineProperty(Transform.prototype, '_transformState', {
  get: internalUtil.deprecate(function() {
    return this[kTransformState];
  }, kTransformStateDeprecation, 'DEP0143'),
  set: internalUtil.deprecate(function(val) {
    this[kTransformState] = val;
  }, kTransformStateDeprecation, 'DEP0143')
});

/**
 * Pushes a chunk to the readable side and clears the `needTransform` flag
 * before delegating to `Duplex.prototype.push`.
 *
 * @param {*} chunk Chunk to hand to the readable side.
 * @param {string} [encoding] Encoding forwarded along with the chunk.
 * @returns {*} Whatever `Duplex.prototype.push` returns.
 */
Transform.prototype.push = function(chunk, encoding) {
  const state = this[kTransformState];
  state.needTransform = false;
  return Duplex.prototype.push.call(this, chunk, encoding);
};

// This is the part where you do stuff!
// Override this function in implementation classes.
// 'chunk' is an input chunk.
//
// Call `push(newChunk)` to pass along transformed output
// to the readable side.  You may call 'push' zero or more times.
//
// Call `cb(err)` when you are done with this chunk.  If you pass
// an error, then that'll put the hurt on the whole operation.  If you
// never call cb(), then you'll never get another chunk.
Transform.prototype._transform = function(chunk, encoding, cb) {
  throw new ERR_METHOD_NOT_IMPLEMENTED('_transform()');
};

/**
 * Writable-side hook: parks the chunk and its callback in the transform
 * state and, unless a transform is already running, triggers `_read()` when
 * a transform was requested or the readable side can take more data.
 *
 * @param {*} chunk Chunk being written.
 * @param {string} encoding Encoding that accompanies the chunk.
 * @param {Function} cb Write callback, invoked later by `afterTransform`.
 */
Transform.prototype._write = function(chunk, encoding, cb) {
  const state = this[kTransformState];
  state.writecb = cb;
  state.writechunk = chunk;
  state.writeencoding = encoding;

  if (state.transforming) {
    return;
  }

  const readable = this._readableState;
  const readyForTransform =
    state.needTransform ||
    readable.needReadable ||
    readable.length < readable.highWaterMark;
  if (readyForTransform) {
    this._read(readable.highWaterMark);
  }
};

// Doesn't matter what the args are here.
// _transform does all the work.
// That we got here means that the readable side wants more data.
Transform.prototype._read = function(n) {
  const state = this[kTransformState];

  if (state.writechunk === null || state.transforming) {
    // Nothing to process right now (or a transform is already running):
    // remember that data was requested so the next write is handled.
    state.needTransform = true;
    return;
  }

  state.transforming = true;
  this._transform(state.writechunk, state.writeencoding, state.afterTransform);
};


/**
 * Delegates destruction to `Duplex.prototype._destroy`, forwarding only the
 * first argument that implementation reports to `cb`.
 *
 * @param {*} err Error (if any) passed to the Duplex implementation.
 * @param {Function} cb Called with the value Duplex reports.
 */
Transform.prototype._destroy = function(err, cb) {
  const forwardResult = (destroyErr) => {
    cb(destroyErr);
  };
  Duplex.prototype._destroy.call(this, err, forwardResult);
};


/**
 * Final step of the prefinish handling: reports a flush error, or pushes
 * the trailing chunk (if any) and then signals end-of-stream.
 *
 * @param {object} stream Stream being finished.
 * @param {*} er When truthy, emitted as 'error' and nothing else happens.
 * @param {*} data Optional trailing chunk; pushed unless `null`/`undefined`.
 * @returns {*} The result of `emit('error', er)` or of `push(null)`.
 * @throws {ERR_TRANSFORM_WITH_LENGTH_0} If the writable state still reports
 *   a non-zero length.
 * @throws {ERR_TRANSFORM_ALREADY_TRANSFORMING} If a transform is still
 *   marked as in progress.
 */
function done(stream, er, data) {
  if (er) {
    return stream.emit('error', er);
  }

  // Skip only `null` and `undefined`; other falsy chunks are still pushed.
  if (data !== null && data !== undefined) {
    stream.push(data);
  }

  // These two error cases are coherence checks that can likely not be tested.
  if (stream._writableState.length) {
    throw new ERR_TRANSFORM_WITH_LENGTH_0();
  }

  const state = stream[kTransformState];
  if (state.transforming) {
    throw new ERR_TRANSFORM_ALREADY_TRANSFORMING();
  }

  return stream.push(null);
}