// Flags: --expose-internals

'use strict';

// Tests for the internal `compose()` stream helper: combining streams,
// async generators and async functions into a single Duplex.

const common = require('../common');
const {
  Readable,
  Transform,
  Writable,
  finished,
  PassThrough
} = require('stream');
const compose = require('internal/streams/compose');
const assert = require('assert');

{
  // Two Transform streams: the first doubles the chunk, the second
  // upper-cases it.
  let res = '';
  compose(
    new Transform({
      transform: common.mustCall((chunk, encoding, callback) => {
        callback(null, chunk + chunk);
      })
    }),
    new Transform({
      transform: common.mustCall((chunk, encoding, callback) => {
        callback(null, chunk.toString().toUpperCase());
      })
    })
  )
  .end('asd')
  .on('data', common.mustCall((buf) => {
    res += buf;
  }))
  .on('end', common.mustCall(() => {
    assert.strictEqual(res, 'ASDASD');
  }));
}

{
  // Same as above, but both stages are async generator functions.
  let res = '';
  compose(
    async function*(source) {
      for await (const chunk of source) {
        yield chunk + chunk;
      }
    },
    async function*(source) {
      for await (const chunk of source) {
        yield chunk.toString().toUpperCase();
      }
    }
  )
  .end('asd')
  .on('data', common.mustCall((buf) => {
    res += buf;
  }))
  .on('end', common.mustCall(() => {
    assert.strictEqual(res, 'ASDASD');
  }));
}

{
  // A single async generator function is enough to compose.
  let res = '';
  compose(
    async function*(source) {
      for await (const chunk of source) {
        yield chunk + chunk;
      }
    }
  )
  .end('asd')
  .on('data', common.mustCall((buf) => {
    res += buf;
  }))
  .on('end', common.mustCall(() => {
    assert.strictEqual(res, 'asdasd');
  }));
}

{
  // A Readable as the first stage: nothing is written to the composed stream.
  let res = '';
  compose(
    Readable.from(['asd']),
    new Transform({
      transform: common.mustCall((chunk, encoding, callback) => {
        callback(null, chunk.toString().toUpperCase());
      })
    })
  )
  .on('data', common.mustCall((buf) => {
    res += buf;
  }))
  .on('end', common.mustCall(() => {
    assert.strictEqual(res, 'ASD');
  }));
}

{
  // An already-invoked async generator (an async iterable) as the first stage.
  let res = '';
  compose(
    async function* () {
      yield 'asd';
    }(),
    new Transform({
      transform: common.mustCall((chunk, encoding, callback) => {
        callback(null, chunk.toString().toUpperCase());
      })
    })
  )
  .on('data', common.mustCall((buf) => {
    res += buf;
  }))
  .on('end', common.mustCall(() => {
    assert.strictEqual(res, 'ASD');
  }));
}

{
  // Transform -> async generator -> Writable: the result is observed through
  // the Writable and the 'finish' event.
  let res = '';
  compose(
    new Transform({
      transform: common.mustCall((chunk, encoding, callback) => {
        callback(null, chunk.toString().toUpperCase());
      })
    }),
    async function*(source) {
      for await (const chunk of source) {
        yield chunk;
      }
    },
    new Writable({
      write: common.mustCall((chunk, encoding, callback) => {
        res += chunk;
        callback(null);
      })
    })
  )
  .end('asd')
  .on('finish', common.mustCall(() => {
    assert.strictEqual(res, 'ASD');
  }));
}

{
  // Transform -> async generator -> async function acting as the sink.
  let res = '';
  compose(
    new Transform({
      transform: common.mustCall((chunk, encoding, callback) => {
        callback(null, chunk.toString().toUpperCase());
      })
    }),
    async function*(source) {
      for await (const chunk of source) {
        yield chunk;
      }
    },
    async function(source) {
      for await (const chunk of source) {
        res += chunk;
      }
    }
  )
  .end('asd')
  .on('finish', common.mustCall(() => {
    assert.strictEqual(res, 'ASD');
  }));
}

{
  // Object mode: each Transform wraps the incoming value in `{ chunk }`.
  let res;
  compose(
    new Transform({
      objectMode: true,
      transform: common.mustCall((chunk, encoding, callback) => {
        callback(null, { chunk });
      })
    }),
    async function*(source) {
      for await (const chunk of source) {
        yield chunk;
      }
    },
    new Transform({
      objectMode: true,
      transform: common.mustCall((chunk, encoding, callback) => {
        callback(null, { chunk });
      })
    })
  )
  .end(true)
  .on('data', common.mustCall((buf) => {
    res = buf;
  }))
  .on('end', common.mustCall(() => {
    assert.strictEqual(res.chunk.chunk, true);
  }));
}

{
  // An error passed to the first Transform's callback surfaces on the
  // composed stream; later stages never see any data.
  const _err = new Error('asd');
  compose(
    new Transform({
      objectMode: true,
      transform: common.mustCall((chunk, encoding, callback) => {
        callback(_err);
      })
    }),
    async function*(source) {
      for await (const chunk of source) {
        yield chunk;
      }
    },
    new Transform({
      objectMode: true,
      transform: common.mustNotCall((chunk, encoding, callback) => {
        callback(null, { chunk });
      })
    })
  )
  .end(true)
  .on('data', common.mustNotCall())
  .on('end', common.mustNotCall())
  // mustCall: without it the test would pass silently if no error were
  // emitted at all.
  .on('error', common.mustCall((err) => {
    assert.strictEqual(err, _err);
  }));
}

{
  // An error thrown inside the async generator stage surfaces on the
  // composed stream; the last Transform never sees any data.
  const _err = new Error('asd');
  compose(
    new Transform({
      objectMode: true,
      transform: common.mustCall((chunk, encoding, callback) => {
        callback(null, chunk);
      })
    }),
    async function*(source) { // eslint-disable-line require-yield
      let tmp = '';
      for await (const chunk of source) {
        tmp += chunk;
        throw _err;
      }
      return tmp;
    },
    new Transform({
      objectMode: true,
      transform: common.mustNotCall((chunk, encoding, callback) => {
        callback(null, { chunk });
      })
    })
  )
  .end(true)
  .on('data', common.mustNotCall())
  .on('end', common.mustNotCall())
  // mustCall: without it the test would pass silently if no error were
  // emitted at all.
  .on('error', common.mustCall((err) => {
    assert.strictEqual(err, _err);
  }));
}

{
  let buf = '';

  // Source, transform and sink composed together: the resulting Duplex is
  // neither readable nor writable (asserted below).
  const s1 = compose(async function* () {
    yield 'Hello';
    yield 'World';
  }(), async function* (source) {
    for await (const chunk of source) {
      yield String(chunk).toUpperCase();
    }
  }, async function(source) {
    for await (const chunk of source) {
      buf += chunk;
    }
  });

  assert.strictEqual(s1.writable, false);
  assert.strictEqual(s1.readable, false);

  finished(s1.resume(), common.mustCall((err) => {
    assert(!err);
    assert.strictEqual(buf, 'HELLOWORLD');
  }));
}

{
  let buf = '';
  // Convert into transform duplex.
  const s2 = compose(async function* (source) {
    for await (const chunk of source) {
      yield String(chunk).toUpperCase();
    }
  });
  s2.end('helloworld');
  s2.on('data', (chunk) => {
    buf += chunk;
  });

  finished(s2.resume(), common.mustCall((err) => {
    assert(!err);
    assert.strictEqual(buf, 'HELLOWORLD');
  }));
}

{
  let buf = '';

  // Convert into readable Duplex.
  const s1 = compose(async function* () {
    yield 'Hello';
    yield 'World';
  }());

  // Convert into transform duplex.
  const s2 = compose(async function* (source) {
    for await (const chunk of source) {
      yield String(chunk).toUpperCase();
    }
  });

  // Convert into writable duplex.
  const s3 = compose(async function(source) {
    for await (const chunk of source) {
      buf += chunk;
    }
  });

  // Composed streams can themselves be composed.
  const s4 = compose(s1, s2, s3);

  finished(s4, common.mustCall((err) => {
    assert(!err);
    assert.strictEqual(buf, 'HELLOWORLD');
  }));
}

{
  let buf = '';

  // Source, transform and sink composed together, completing without an
  // explicit resume().
  const s1 = compose(async function* () {
    yield 'Hello';
    yield 'World';
  }(), async function* (source) {
    for await (const chunk of source) {
      yield String(chunk).toUpperCase();
    }
  }, async function(source) {
    for await (const chunk of source) {
      buf += chunk;
    }
  });

  finished(s1, common.mustCall((err) => {
    assert(!err);
    assert.strictEqual(buf, 'HELLOWORLD');
  }));
}

{
  // Calling compose() without arguments throws.
  assert.throws(
    () => compose(),
    { code: 'ERR_MISSING_ARGS' }
  );
}

{
  // A Writable followed by another stream is rejected.
  assert.throws(
    () => compose(new Writable(), new PassThrough()),
    { code: 'ERR_INVALID_ARG_VALUE' }
  );
}

{
  // A Readable in the middle of the chain is rejected.
  assert.throws(
    () => compose(new PassThrough(), new Readable({ read() {} }), new PassThrough()),
    { code: 'ERR_INVALID_ARG_VALUE' }
  );
}

{
  let buf = '';

  // The async function used as the sink returns a value, which is reported
  // as ERR_INVALID_RETURN_VALUE.
  const s1 = compose(async function* () {
    yield 'Hello';
    yield 'World';
  }(), async function* (source) {
    for await (const chunk of source) {
      yield String(chunk).toUpperCase();
    }
  }, async function(source) {
    for await (const chunk of source) {
      buf += chunk;
    }
    return buf;
  });

  finished(s1, common.mustCall((err) => {
    assert.strictEqual(err.code, 'ERR_INVALID_RETURN_VALUE');
  }));
}

{
  let buf = '';

  // A plain string is used as the source.
  const s1 = compose('HelloWorld', async function* (source) {
    for await (const chunk of source) {
      yield String(chunk).toUpperCase();
    }
  }, async function(source) {
    for await (const chunk of source) {
      buf += chunk;
    }
  });

  finished(s1, common.mustCall((err) => {
    assert(!err);
    assert.strictEqual(buf, 'HELLOWORLD');
  }));
}

{
  // The new stream should use the writable side of the first stream and the
  // readable side of the last stream.
  // #46829
  (async () => {
    const newStream = compose(
      new PassThrough({
        // reading FROM you in object mode or not
        readableObjectMode: false,

        // writing TO you in object mode or not
        writableObjectMode: false,
      }),
      new Transform({
        // reading FROM you in object mode or not
        readableObjectMode: true,

        // writing TO you in object mode or not
        writableObjectMode: false,
        transform: (chunk, encoding, callback) => {
          callback(null, {
            value: chunk.toString()
          });
        }
      })
    );

    assert.strictEqual(newStream.writableObjectMode, false);
    assert.strictEqual(newStream.readableObjectMode, true);

    newStream.write('Steve Rogers');
    newStream.write('On your left');

    newStream.end();

    assert.deepStrictEqual(await newStream.toArray(), [{ value: 'Steve Rogers' }, { value: 'On your left' }]);
  })().then(common.mustCall());
}

{
  // The new stream should use the writable side of the first stream and the
  // readable side of the last stream.
  // #46829
  (async () => {
    const newStream = compose(
      new PassThrough({
        // reading FROM you in object mode or not
        readableObjectMode: true,

        // writing TO you in object mode or not
        writableObjectMode: true,
      }),
      new Transform({
        // reading FROM you in object mode or not
        readableObjectMode: false,

        // writing TO you in object mode or not
        writableObjectMode: true,
        transform: (chunk, encoding, callback) => {
          callback(null, chunk.value);
        }
      })
    );

    assert.strictEqual(newStream.writableObjectMode, true);
    assert.strictEqual(newStream.readableObjectMode, false);

    newStream.write({ value: 'Steve Rogers' });
    newStream.write({ value: 'On your left' });

    newStream.end();

    assert.deepStrictEqual(await newStream.toArray(), [Buffer.from('Steve RogersOn your left')]);
  })().then(common.mustCall());
}