1'use strict'; 2 3const common = require('../common'); 4const assert = require('assert'); 5 6const { 7 Transform, 8 Readable, 9 Writable, 10 compose 11} = require('stream'); 12 13const { 14 TransformStream, 15 ReadableStream, 16 WritableStream, 17} = require('stream/web'); 18 19{ 20 let res = ''; 21 22 const d = compose( 23 new TransformStream({ 24 transform: common.mustCall((chunk, controller) => { 25 controller.enqueue(chunk?.toString()?.replace(' ', '_')); 26 }) 27 }), 28 new TransformStream({ 29 transform: common.mustCall((chunk, controller) => { 30 controller.enqueue(chunk?.toString()?.toUpperCase()); 31 }) 32 }) 33 ); 34 35 d.on('data', common.mustCall((chunk) => { 36 res += chunk; 37 })); 38 39 d.on('end', common.mustCall(() => { 40 assert.strictEqual(res, 'HELLO_WORLD'); 41 })); 42 43 d.end('hello world'); 44} 45 46{ 47 let res = ''; 48 49 compose( 50 new Transform({ 51 transform: common.mustCall((chunk, encoding, callback) => { 52 callback(null, chunk + chunk); 53 }) 54 }), 55 new TransformStream({ 56 transform: common.mustCall((chunk, controller) => { 57 controller.enqueue(chunk.toString().toUpperCase()); 58 }) 59 }) 60 ) 61 .end('asd') 62 .on('data', common.mustCall((buf) => { 63 res += buf; 64 })) 65 .on('end', common.mustCall(() => { 66 assert.strictEqual(res, 'ASDASD'); 67 })); 68} 69 70{ 71 let res = ''; 72 73 compose( 74 async function*(source) { 75 for await (const chunk of source) { 76 yield chunk + chunk; 77 } 78 }, 79 new TransformStream({ 80 transform: common.mustCall((chunk, controller) => { 81 controller.enqueue(chunk.toString().toUpperCase()); 82 }), 83 }) 84 ) 85 .end('asd') 86 .on('data', common.mustCall((buf) => { 87 res += buf; 88 })) 89 .on('end', common.mustCall(() => { 90 assert.strictEqual(res, 'ASDASD'); 91 })); 92} 93 94{ 95 let res = ''; 96 97 compose( 98 new TransformStream({ 99 transform: common.mustCall((chunk, controller) => { 100 controller.enqueue(chunk.toString().toUpperCase()); 101 }), 102 }), 103 async function*(source) { 104 for await (const chunk of source) { 105 yield chunk + chunk; 106 } 107 }, 108 new Transform({ 109 transform: common.mustCall((chunk, enc, clb) => { 110 clb(null, chunk?.toString()?.replaceAll('A', 'B')); 111 }) 112 }) 113 ) 114 .end('asd') 115 .on('data', common.mustCall((buf) => { 116 res += buf; 117 })) 118 .on('end', common.mustCall(() => { 119 assert.strictEqual(res, 'BSDBSD'); 120 })); 121} 122 123{ 124 let res = ''; 125 126 compose( 127 new TransformStream({ 128 transform: common.mustCall((chunk, controller) => { 129 controller.enqueue(chunk.toString().toUpperCase()); 130 }), 131 }), 132 async function*(source) { 133 for await (const chunk of source) { 134 yield chunk + chunk; 135 } 136 }, 137 new TransformStream({ 138 transform: common.mustCall((chunk, controller) => { 139 controller.enqueue(chunk?.toString()?.replaceAll('A', 'B')); 140 }) 141 }) 142 ) 143 .end('asd') 144 .on('data', common.mustCall((buf) => { 145 res += buf; 146 })) 147 .on('end', common.mustCall(() => { 148 assert.strictEqual(res, 'BSDBSD'); 149 })); 150} 151 152{ 153 let res = ''; 154 compose( 155 new ReadableStream({ 156 start(controller) { 157 controller.enqueue('asd'); 158 controller.close(); 159 } 160 }), 161 new TransformStream({ 162 transform: common.mustCall((chunk, controller) => { 163 controller.enqueue(chunk?.toString()?.toUpperCase()); 164 }) 165 }) 166 ) 167 .on('data', common.mustCall((buf) => { 168 res += buf; 169 })) 170 .on('end', common.mustCall(() => { 171 assert.strictEqual(res, 'ASD'); 172 })); 173} 174 175{ 176 let res = ''; 177 compose( 178 new ReadableStream({ 179 start(controller) { 180 controller.enqueue('asd'); 181 controller.close(); 182 } 183 }), 184 new Transform({ 185 transform: common.mustCall((chunk, enc, clb) => { 186 clb(null, chunk?.toString()?.toUpperCase()); 187 }) 188 }) 189 ) 190 .on('data', common.mustCall((buf) => { 191 res += buf; 192 })) 193 .on('end', common.mustCall(() => { 194 assert.strictEqual(res, 'ASD'); 195 })); 196} 197 198{ 199 let res = ''; 200 compose( 201 Readable.from(['asd']), 202 new TransformStream({ 203 transform: common.mustCall((chunk, controller) => { 204 controller.enqueue(chunk?.toString()?.toUpperCase()); 205 }) 206 }) 207 ) 208 .on('data', common.mustCall((buf) => { 209 res += buf; 210 })) 211 .on('end', common.mustCall(() => { 212 assert.strictEqual(res, 'ASD'); 213 })); 214} 215 216{ 217 let res = ''; 218 compose( 219 new TransformStream({ 220 transform: common.mustCall((chunk, controller) => { 221 controller.enqueue(chunk.toString().toUpperCase()); 222 }) 223 }), 224 async function*(source) { 225 for await (const chunk of source) { 226 yield chunk; 227 } 228 }, 229 new Writable({ 230 write: common.mustCall((chunk, encoding, callback) => { 231 res += chunk; 232 callback(null); 233 }) 234 }) 235 ) 236 .end('asd') 237 .on('finish', common.mustCall(() => { 238 assert.strictEqual(res, 'ASD'); 239 })); 240} 241 242{ 243 let res = ''; 244 compose( 245 new Transform({ 246 transform: common.mustCall((chunk, encoding, callback) => { 247 callback(null, chunk.toString().toUpperCase()); 248 }) 249 }), 250 async function*(source) { 251 for await (const chunk of source) { 252 yield chunk; 253 } 254 }, 255 new WritableStream({ 256 write: common.mustCall((chunk) => { 257 res += chunk; 258 }) 259 }) 260 ) 261 .end('asd') 262 .on('finish', common.mustCall(() => { 263 assert.strictEqual(res, 'ASD'); 264 })); 265} 266 267{ 268 let res = ''; 269 compose( 270 new TransformStream({ 271 transform: common.mustCall((chunk, controller) => { 272 controller.enqueue(chunk.toString().toUpperCase()); 273 }) 274 }), 275 async function*(source) { 276 for await (const chunk of source) { 277 yield chunk; 278 } 279 }, 280 new WritableStream({ 281 write: common.mustCall((chunk) => { 282 res += chunk; 283 }) 284 }) 285 ) 286 .end('asd') 287 .on('finish', common.mustCall(() => { 288 assert.strictEqual(res, 'ASD'); 289 })); 290} 291 292{ 293 let res = ''; 294 compose( 295 new TransformStream({ 296 transform: common.mustCall((chunk, controller) => { 297 controller.enqueue(chunk.toString().toUpperCase()); 298 }) 299 }), 300 async function*(source) { 301 for await (const chunk of source) { 302 yield chunk; 303 } 304 }, 305 async function(source) { 306 for await (const chunk of source) { 307 res += chunk; 308 } 309 } 310 ) 311 .end('asd') 312 .on('finish', common.mustCall(() => { 313 assert.strictEqual(res, 'ASD'); 314 })); 315} 316 317{ 318 319 compose( 320 new TransformStream({ 321 transform: common.mustCall((chunk, controller) => { 322 controller.error(new Error('asd')); 323 }) 324 }), 325 new TransformStream({ 326 transform: common.mustNotCall() 327 }) 328 ) 329 .on('data', common.mustNotCall()) 330 .on('end', common.mustNotCall()) 331 .on('error', (err) => { 332 assert.strictEqual(err?.message, 'asd'); 333 }) 334 .end('xyz'); 335} 336 337{ 338 339 compose( 340 new TransformStream({ 341 transform: common.mustCall((chunk, controller) => { 342 controller.enqueue(chunk); 343 }) 344 }), 345 new TransformStream({ 346 transform: common.mustCall((chunk, controller) => { 347 controller.error(new Error('asd')); 348 }) 349 }) 350 ) 351 .on('data', common.mustNotCall()) 352 .on('end', common.mustNotCall()) 353 .on('error', (err) => { 354 assert.strictEqual(err?.message, 'asd'); 355 }) 356 .end('xyz'); 357} 358 359{ 360 361 compose( 362 new TransformStream({ 363 transform: common.mustCall((chunk, controller) => { 364 controller.enqueue(chunk); 365 }) 366 }), 367 async function*(source) { // eslint-disable-line require-yield 368 let tmp = ''; 369 for await (const chunk of source) { 370 tmp += chunk; 371 throw new Error('asd'); 372 } 373 return tmp; 374 }, 375 new TransformStream({ 376 transform: common.mustNotCall() 377 }) 378 ) 379 .on('data', common.mustNotCall()) 380 .on('end', common.mustNotCall()) 381 .on('error', (err) => { 382 assert.strictEqual(err?.message, 'asd'); 383 }) 384 .end('xyz'); 385} 386 387{ 388 389 compose( 390 new TransformStream({ 391 transform: common.mustCall((chunk, controller) => { 392 controller.error(new Error('asd')); 393 }) 394 }), 395 new Transform({ 396 transform: common.mustNotCall() 397 }) 398 ) 399 .on('data', common.mustNotCall()) 400 .on('end', common.mustNotCall()) 401 .on('error', (err) => { 402 assert.strictEqual(err?.message, 'asd'); 403 }) 404 .end('xyz'); 405} 406 407{ 408 409 compose( 410 new Transform({ 411 transform: common.mustCall((chunk, enc, clb) => { 412 clb(new Error('asd')); 413 }) 414 }), 415 new TransformStream({ 416 transform: common.mustNotCall() 417 }) 418 ) 419 .on('data', common.mustNotCall()) 420 .on('end', common.mustNotCall()) 421 .on('error', (err) => { 422 assert.strictEqual(err?.message, 'asd'); 423 }) 424 .end('xyz'); 425} 426 427{ 428 compose( 429 new ReadableStream({ 430 start(controller) { 431 controller.enqueue(new Error('asd')); 432 } 433 }), 434 new TransformStream({ 435 transform: common.mustNotCall() 436 }) 437 ) 438 .on('data', common.mustNotCall()) 439 .on('end', common.mustNotCall()) 440 .on('error', (err) => { 441 assert.strictEqual(err?.message, 'asd'); 442 }) 443 .end('xyz'); 444} 445 446{ 447 compose( 448 new TransformStream({ 449 transform: common.mustCall((chunk, controller) => { 450 controller.enqueue(chunk.toString().toUpperCase()); 451 }) 452 }), 453 new WritableStream({ 454 write: common.mustCall((chunk, controller) => { 455 controller.error(new Error('asd')); 456 }) 457 }) 458 ) 459 .on('error', (err) => { 460 assert.strictEqual(err?.message, 'asd'); 461 }) 462 .end('xyz'); 463} 464 465{ 466 compose( 467 new TransformStream({ 468 transform: common.mustCall((chunk, controller) => { 469 controller.enqueue(chunk.toString().toUpperCase()); 470 }) 471 }), 472 async function*(source) { 473 for await (const chunk of source) { 474 yield chunk; 475 } 476 }, 477 async function(source) { 478 throw new Error('asd'); 479 } 480 ).on('error', (err) => { 481 assert.strictEqual(err?.message, 'asd'); 482 }).end('xyz'); 483} 484