// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
'use strict';
const common = require('../common');
const assert = require('assert');
const { PassThrough, Transform } = require('stream');

// Tests for Transform and PassThrough streams. Every scenario lives in its
// own block scope so that stream instances and counters cannot leak from one
// scenario into the next. Callbacks that must run are wrapped in
// common.mustCall().

{
  // Verify writable side consumption
  const tx = new Transform({
    highWaterMark: 10
  });

  let transformed = 0;
  tx._transform = function(chunk, encoding, cb) {
    transformed += chunk.length;
    tx.push(chunk);
    cb();
  };

  // Write chunks of 1, 2, ..., 10 bytes.
  for (let i = 1; i <= 10; i++) {
    tx.write(Buffer.allocUnsafe(i));
  }
  tx.end();

  // Only the first four chunks (1 + 2 + 3 + 4 = 10 bytes) are expected to
  // have been transformed; the chunks of 5..10 bytes must still be queued
  // on the writable side.
  assert.strictEqual(tx.readableLength, 10);
  assert.strictEqual(transformed, 10);
  assert.deepStrictEqual(tx.writableBuffer.map(function(c) {
    return c.chunk.length;
  }), [5, 6, 7, 8, 9, 10]);
}

{
  // Verify passthrough behavior
  const pt = new PassThrough();

  pt.write(Buffer.from('foog'));
  pt.write(Buffer.from('bark'));
  pt.write(Buffer.from('bazy'));
  pt.write(Buffer.from('kuel'));
  pt.end();

  // 16 bytes were written as 4-byte chunks; reading 5 bytes at a time must
  // join data across chunk boundaries, and the last read yields the single
  // remaining byte.
  assert.strictEqual(pt.read(5).toString(), 'foogb');
  assert.strictEqual(pt.read(5).toString(), 'arkba');
  assert.strictEqual(pt.read(5).toString(), 'zykue');
  assert.strictEqual(pt.read(5).toString(), 'l');
}

{
  // Verify object passthrough behavior
  const pt = new PassThrough({ objectMode: true });

  // Falsy values (false, 0, '') are included on purpose: they must come
  // back out unchanged, one per read().
  pt.write(1);
  pt.write(true);
  pt.write(false);
  pt.write(0);
  pt.write('foo');
  pt.write('');
  pt.write({ a: 'b' });
  pt.end();

  assert.strictEqual(pt.read(), 1);
  assert.strictEqual(pt.read(), true);
  assert.strictEqual(pt.read(), false);
  assert.strictEqual(pt.read(), 0);
  assert.strictEqual(pt.read(), 'foo');
  assert.strictEqual(pt.read(), '');
  assert.deepStrictEqual(pt.read(), { a: 'b' });
}

{
  // Verify passthrough constructor behavior
  // PassThrough is deliberately called without `new` here.
  const pt = PassThrough();

  assert(pt instanceof PassThrough);
}

{
  // Verify transform constructor behavior
  // Transform is deliberately called without `new` here.
  const pt = Transform();

  assert(pt instanceof Transform);
}

{
  // Perform a simple transform
  const pt = new Transform();
  // Replace every input chunk with the same number of 'x' bytes.
  pt._transform = function(c, e, cb) {
    const ret = Buffer.alloc(c.length, 'x');
    pt.push(ret);
    cb();
  };

  pt.write(Buffer.from('foog'));
  pt.write(Buffer.from('bark'));
  pt.write(Buffer.from('bazy'));
  pt.write(Buffer.from('kuel'));
  pt.end();

  assert.strictEqual(pt.read(5).toString(), 'xxxxx');
  assert.strictEqual(pt.read(5).toString(), 'xxxxx');
  assert.strictEqual(pt.read(5).toString(), 'xxxxx');
  assert.strictEqual(pt.read(5).toString(), 'x');
}

{
  // Verify simple object transform
  const pt = new Transform({ objectMode: true });
  // Each written value is emitted as its JSON text.
  pt._transform = function(c, e, cb) {
    pt.push(JSON.stringify(c));
    cb();
  };

  pt.write(1);
  pt.write(true);
  pt.write(false);
  pt.write(0);
  pt.write('foo');
  pt.write('');
  pt.write({ a: 'b' });
  pt.end();

  assert.strictEqual(pt.read(), '1');
  assert.strictEqual(pt.read(), 'true');
  assert.strictEqual(pt.read(), 'false');
  assert.strictEqual(pt.read(), '0');
  assert.strictEqual(pt.read(), '"foo"');
  assert.strictEqual(pt.read(), '""');
  assert.strictEqual(pt.read(), '{"a":"b"}');
}

{
  // Verify async passthrough
  const pt = new Transform();
  // Same as a passthrough, but every chunk is forwarded from a timer.
  pt._transform = function(chunk, encoding, cb) {
    setTimeout(function() {
      pt.push(chunk);
      cb();
    }, 10);
  };

  pt.write(Buffer.from('foog'));
  pt.write(Buffer.from('bark'));
  pt.write(Buffer.from('bazy'));
  pt.write(Buffer.from('kuel'));
  pt.end();

  // The output is only inspected once 'finish' has fired.
  pt.on('finish', common.mustCall(function() {
    assert.strictEqual(pt.read(5).toString(), 'foogb');
    assert.strictEqual(pt.read(5).toString(), 'arkba');
    assert.strictEqual(pt.read(5).toString(), 'zykue');
    assert.strictEqual(pt.read(5).toString(), 'l');
  }));
}

{
  // Verify asymmetric transform (expand)
  const pt = new Transform();

  // Emit each chunk 2 times.
  pt._transform = function(chunk, encoding, cb) {
    setTimeout(function() {
      pt.push(chunk);
      setTimeout(function() {
        pt.push(chunk);
        cb();
      }, 10);
    }, 10);
  };

  pt.write(Buffer.from('foog'));
  pt.write(Buffer.from('bark'));
  pt.write(Buffer.from('bazy'));
  pt.write(Buffer.from('kuel'));
  pt.end();

  // Expected output: 'foogfoogbarkbarkbazybazykuelkuel' (32 bytes), read
  // back 5 bytes at a time.
  pt.on('finish', common.mustCall(function() {
    assert.strictEqual(pt.read(5).toString(), 'foogf');
    assert.strictEqual(pt.read(5).toString(), 'oogba');
    assert.strictEqual(pt.read(5).toString(), 'rkbar');
    assert.strictEqual(pt.read(5).toString(), 'kbazy');
    assert.strictEqual(pt.read(5).toString(), 'bazyk');
    assert.strictEqual(pt.read(5).toString(), 'uelku');
    assert.strictEqual(pt.read(5).toString(), 'el');
  }));
}

{
  // Verify asymmetric transform (compress)
  const pt = new Transform();

  // Each output is the first char of 3 consecutive chunks,
  // or whatever's left.
  pt.state = '';

  pt._transform = function(chunk, encoding, cb) {
    if (!chunk)
      chunk = '';
    const s = chunk.toString();
    // The arrow function keeps `this` bound to the receiver of _transform.
    setTimeout(() => {
      this.state += s.charAt(0);
      if (this.state.length === 3) {
        pt.push(Buffer.from(this.state));
        this.state = '';
      }
      cb();
    }, 10);
  };

  pt._flush = function(cb) {
    // Just output whatever we have.
    pt.push(Buffer.from(this.state));
    this.state = '';
    cb();
  };

  pt.write(Buffer.from('aaaa'));
  pt.write(Buffer.from('bbbb'));
  pt.write(Buffer.from('cccc'));
  pt.write(Buffer.from('dddd'));
  pt.write(Buffer.from('eeee'));
  pt.write(Buffer.from('aaaa'));
  pt.write(Buffer.from('bbbb'));
  pt.write(Buffer.from('cccc'));
  pt.write(Buffer.from('dddd'));
  pt.write(Buffer.from('eeee'));
  pt.write(Buffer.from('aaaa'));
  pt.write(Buffer.from('bbbb'));
  pt.write(Buffer.from('cccc'));
  pt.write(Buffer.from('dddd'));
  pt.end();

  // 'abcdeabcdeabcd'
  pt.on('finish', common.mustCall(function() {
    assert.strictEqual(pt.read(5).toString(), 'abcde');
    assert.strictEqual(pt.read(5).toString(), 'abcde');
    assert.strictEqual(pt.read(5).toString(), 'abcd');
  }));
}

// This tests for a stall when data is written to a full stream
// that has empty transforms.
{
  // Verify complex transform behavior
  let count = 0;
  let saved = null;
  const pt = new Transform({ highWaterMark: 3 });
  // The second chunk produces no output: it is held back in `saved` and
  // pushed right before the chunk that follows it.
  pt._transform = function(c, e, cb) {
    if (count++ === 1)
      saved = c;
    else {
      if (saved) {
        pt.push(saved);
        saved = null;
      }
      pt.push(c);
    }

    cb();
  };

  pt.once('readable', function() {
    process.nextTick(function() {
      pt.write(Buffer.from('d'));
      pt.write(Buffer.from('ef'), common.mustCall(function() {
        pt.end();
      }));
      // All three chunks must be readable at once, with nothing left over.
      assert.strictEqual(pt.read().toString(), 'abcdef');
      assert.strictEqual(pt.read(), null);
    });
  });

  // 'abc' alone is as large as the highWaterMark of 3.
  pt.write(Buffer.from('abc'));
}


{
  // Verify passthrough event emission
  const pt = new PassThrough();
  let emits = 0;
  pt.on('readable', function() {
    emits++;
  });

  pt.write(Buffer.from('foog'));
  pt.write(Buffer.from('bark'));

  assert.strictEqual(emits, 0);
  assert.strictEqual(pt.read(5).toString(), 'foogb');
  // Only 3 bytes ('ark') remain and the stream has not ended, so a 5-byte
  // read must return null itself, not merely something that stringifies to
  // 'null'.
  assert.strictEqual(pt.read(5), null);
  assert.strictEqual(emits, 0);

  pt.write(Buffer.from('bazy'));
  pt.write(Buffer.from('kuel'));

  assert.strictEqual(emits, 0);
  assert.strictEqual(pt.read(5).toString(), 'arkba');
  assert.strictEqual(pt.read(5).toString(), 'zykue');
  assert.strictEqual(pt.read(5), null);

  pt.end();

  // Ending the stream is expected to produce exactly one 'readable' event,
  // after which the last byte can be read.
  assert.strictEqual(emits, 1);
  assert.strictEqual(pt.read(5).toString(), 'l');
  assert.strictEqual(pt.read(5), null);
  assert.strictEqual(emits, 1);
}

{
  // Verify passthrough event emission reordering
  const pt = new PassThrough();
  let emits = 0;
  pt.on('readable', function() {
    emits++;
  });

  pt.write(Buffer.from('foog'));
  pt.write(Buffer.from('bark'));

  assert.strictEqual(emits, 0);
  assert.strictEqual(pt.read(5).toString(), 'foogb');
  assert.strictEqual(pt.read(5), null);

  // Each nested handler is registered before the write()/end() call that
  // is expected to trigger it; the final handler checks that three
  // 'readable' events were counted in total.
  pt.once('readable', common.mustCall(function() {
    assert.strictEqual(pt.read(5).toString(), 'arkba');
    assert.strictEqual(pt.read(5), null);

    pt.once('readable', common.mustCall(function() {
      assert.strictEqual(pt.read(5).toString(), 'zykue');
      assert.strictEqual(pt.read(5), null);
      pt.once('readable', common.mustCall(function() {
        assert.strictEqual(pt.read(5).toString(), 'l');
        assert.strictEqual(pt.read(5), null);
        assert.strictEqual(emits, 3);
      }));
      pt.end();
    }));
    pt.write(Buffer.from('kuel'));
  }));

  pt.write(Buffer.from('bazy'));
}

{
  // Verify passthrough facade
  const pt = new PassThrough();
  const datas = [];
  pt.on('data', function(chunk) {
    datas.push(chunk.toString());
  });

  // With a 'data' listener attached, every write is expected to arrive as
  // its own chunk, in write order.
  pt.on('end', common.mustCall(function() {
    assert.deepStrictEqual(datas, ['foog', 'bark', 'bazy', 'kuel']);
  }));

  pt.write(Buffer.from('foog'));
  setTimeout(function() {
    pt.write(Buffer.from('bark'));
    setTimeout(function() {
      pt.write(Buffer.from('bazy'));
      setTimeout(function() {
        pt.write(Buffer.from('kuel'));
        setTimeout(function() {
          pt.end();
        }, 10);
      }, 10);
    }, 10);
  }, 10);
}

{
  // Verify object transform (JSON parse)
  const jp = new Transform({ objectMode: true });
  // Parse failures are handed to the callback instead of being thrown.
  jp._transform = function(data, encoding, cb) {
    try {
      jp.push(JSON.parse(data));
      cb();
    } catch (er) {
      cb(er);
    }
  };

  // Anything except null/undefined is fine.
  // Those are "magic" in the stream API, because they signal EOF.
  const objects = [
    { foo: 'bar' },
    100,
    'string',
    { nested: { things: [ { foo: 'bar' }, 100, 'string' ] } },
  ];

  let ended = false;
  jp.on('end', function() {
    ended = true;
  });

  // Every written JSON text must be readable right away as the parsed
  // value.
  objects.forEach(function(obj) {
    jp.write(JSON.stringify(obj));
    const res = jp.read();
    assert.deepStrictEqual(res, obj);
  });

  jp.end();
  // Read one more time to get the 'end' event
  jp.read();

  process.nextTick(common.mustCall(function() {
    assert.strictEqual(ended, true);
  }));
}

{
  // Verify object transform (JSON stringify)
  const js = new Transform({ objectMode: true });
  // Serialization failures are handed to the callback instead of being
  // thrown.
  js._transform = function(data, encoding, cb) {
    try {
      js.push(JSON.stringify(data));
      cb();
    } catch (er) {
      cb(er);
    }
  };

  // Anything except null/undefined is fine.
  // Those are "magic" in the stream API, because they signal EOF.
  const objects = [
    { foo: 'bar' },
    100,
    'string',
    { nested: { things: [ { foo: 'bar' }, 100, 'string' ] } },
  ];

  let ended = false;
  js.on('end', function() {
    ended = true;
  });

  // Every written value must be readable right away as its JSON text.
  objects.forEach(function(obj) {
    js.write(obj);
    const res = js.read();
    assert.strictEqual(res, JSON.stringify(obj));
  });

  js.end();
  // Read one more time to get the 'end' event
  js.read();

  process.nextTick(common.mustCall(function() {
    assert.strictEqual(ended, true);
  }));
}

{
  // Verify that data pushed from construct() is emitted ahead of the data
  // that is written to the stream afterwards.
  const s = new Transform({
    objectMode: true,
    construct(callback) {
      this.push('header from constructor');
      callback();
    },
    transform: (row, encoding, callback) => {
      callback(null, row);
    },
  });

  const expected = [
    'header from constructor',
    'firstLine',
    'secondLine',
  ];
  // Exactly three 'data' events are expected, in the order listed above.
  s.on('data', common.mustCall((data) => {
    assert.strictEqual(data.toString(), expected.shift());
  }, 3));
  s.write('firstLine');
  process.nextTick(() => s.write('secondLine'));
}