• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const common = require('../common');
4const assert = require('assert');
5
6const {
7  Transform,
8  Readable,
9  Writable,
10  compose
11} = require('stream');
12
13const {
14  TransformStream,
15  ReadableStream,
16  WritableStream,
17} = require('stream/web');
18
{
  // Two web TransformStreams composed into a single duplex: the first replaces
  // the first space with an underscore, the second upper-cases the text.
  let res = '';

  const d = compose(
    new TransformStream({
      transform: common.mustCall((chunk, controller) => {
        // The chunk must exist here; plain member access makes an unexpected
        // null/undefined chunk throw instead of silently enqueuing undefined.
        controller.enqueue(chunk.toString().replace(' ', '_'));
      })
    }),
    new TransformStream({
      transform: common.mustCall((chunk, controller) => {
        controller.enqueue(chunk.toString().toUpperCase());
      })
    })
  );

  // Collect everything that comes out of the readable side.
  d.on('data', common.mustCall((chunk) => {
    res += chunk;
  }));

  // Once the readable side ends, the full transformed text must be present.
  d.on('end', common.mustCall(() => {
    assert.strictEqual(res, 'HELLO_WORLD');
  }));

  // Write the single input chunk and close the writable side.
  d.end('hello world');
}
45
{
  // A Node.js Transform (doubles the chunk) feeding a web TransformStream
  // (upper-cases it); the composed output for 'asd' must be 'ASDASD'.
  const doubler = new Transform({
    transform: common.mustCall((chunk, encoding, callback) => {
      callback(null, chunk + chunk);
    })
  });
  const upperCaser = new TransformStream({
    transform: common.mustCall((chunk, controller) => {
      controller.enqueue(chunk.toString().toUpperCase());
    })
  });

  const composed = compose(doubler, upperCaser);
  let collected = '';

  // Input is written before the listeners are attached, as in the chained form.
  composed.end('asd');
  composed.on('data', common.mustCall((buf) => {
    collected += buf;
  }));
  composed.on('end', common.mustCall(() => {
    assert.strictEqual(collected, 'ASDASD');
  }));
}
69
{
  // An async generator (doubles each chunk) feeding a web TransformStream
  // (upper-cases it); the composed output for 'asd' must be 'ASDASD'.
  const doubleEachChunk = async function*(source) {
    for await (const chunk of source) {
      yield chunk + chunk;
    }
  };
  const upperCaser = new TransformStream({
    transform: common.mustCall((chunk, controller) => {
      controller.enqueue(chunk.toString().toUpperCase());
    }),
  });

  const composed = compose(doubleEachChunk, upperCaser);
  let collected = '';

  // Input is written before the listeners are attached, as in the chained form.
  composed.end('asd');
  composed.on('data', common.mustCall((buf) => {
    collected += buf;
  }));
  composed.on('end', common.mustCall(() => {
    assert.strictEqual(collected, 'ASDASD');
  }));
}
93
{
  // Web TransformStream (upper-case) -> async generator (double the chunk) ->
  // Node.js Transform (replace every 'A' with 'B').
  let res = '';

  compose(
    new TransformStream({
      transform: common.mustCall((chunk, controller) => {
        controller.enqueue(chunk.toString().toUpperCase());
      }),
    }),
    async function*(source) {
      for await (const chunk of source) {
        yield chunk + chunk;
      }
    },
    new Transform({
      transform: common.mustCall((chunk, enc, clb) => {
        // The chunk must exist here; plain member access keeps this in line
        // with the other stages and fails loudly on an unexpected value.
        clb(null, chunk.toString().replaceAll('A', 'B'));
      })
    })
  )
  .end('asd')
  .on('data', common.mustCall((buf) => {
    res += buf;
  }))
  .on('end', common.mustCall(() => {
    // 'asd' -> 'ASD' -> 'ASDASD' -> 'BSDBSD'
    assert.strictEqual(res, 'BSDBSD');
  }));
}
122
{
  // Web TransformStream (upper-case) -> async generator (double the chunk) ->
  // web TransformStream (replace every 'A' with 'B').
  let res = '';

  compose(
    new TransformStream({
      transform: common.mustCall((chunk, controller) => {
        controller.enqueue(chunk.toString().toUpperCase());
      }),
    }),
    async function*(source) {
      for await (const chunk of source) {
        yield chunk + chunk;
      }
    },
    new TransformStream({
      transform: common.mustCall((chunk, controller) => {
        // The chunk must exist here; plain member access keeps this in line
        // with the first stage and fails loudly on an unexpected value.
        controller.enqueue(chunk.toString().replaceAll('A', 'B'));
      })
    })
  )
  .end('asd')
  .on('data', common.mustCall((buf) => {
    res += buf;
  }))
  .on('end', common.mustCall(() => {
    // 'asd' -> 'ASD' -> 'ASDASD' -> 'BSDBSD'
    assert.strictEqual(res, 'BSDBSD');
  }));
}
151
{
  // A web ReadableStream source (one chunk, then closed) feeding a web
  // TransformStream that upper-cases it. Nothing is written to the composition.
  let res = '';
  compose(
    new ReadableStream({
      start(controller) {
        controller.enqueue('asd');
        controller.close();
      }
    }),
    new TransformStream({
      transform: common.mustCall((chunk, controller) => {
        // The source enqueues a string, so the chunk is accessed directly
        // instead of through optional chaining.
        controller.enqueue(chunk.toString().toUpperCase());
      })
    })
  )
  .on('data', common.mustCall((buf) => {
    res += buf;
  }))
  .on('end', common.mustCall(() => {
    assert.strictEqual(res, 'ASD');
  }));
}
174
{
  // A web ReadableStream source (one chunk, then closed) feeding a Node.js
  // Transform that upper-cases it. Nothing is written to the composition.
  let res = '';
  compose(
    new ReadableStream({
      start(controller) {
        controller.enqueue('asd');
        controller.close();
      }
    }),
    new Transform({
      transform: common.mustCall((chunk, enc, clb) => {
        // The chunk must exist here, so it is accessed directly instead of
        // through optional chaining.
        clb(null, chunk.toString().toUpperCase());
      })
    })
  )
  .on('data', common.mustCall((buf) => {
    res += buf;
  }))
  .on('end', common.mustCall(() => {
    assert.strictEqual(res, 'ASD');
  }));
}
197
{
  // A Node.js Readable built from an array feeding a web TransformStream that
  // upper-cases the single chunk.
  let res = '';
  compose(
    Readable.from(['asd']),
    new TransformStream({
      transform: common.mustCall((chunk, controller) => {
        // The chunk must exist here, so it is accessed directly instead of
        // through optional chaining.
        controller.enqueue(chunk.toString().toUpperCase());
      })
    })
  )
  .on('data', common.mustCall((buf) => {
    res += buf;
  }))
  .on('end', common.mustCall(() => {
    assert.strictEqual(res, 'ASD');
  }));
}
215
{
  // Web TransformStream (upper-case) -> pass-through async generator ->
  // Node.js Writable sink that accumulates what it receives.
  let collected = '';

  const upperCaser = new TransformStream({
    transform: common.mustCall((chunk, controller) => {
      controller.enqueue(chunk.toString().toUpperCase());
    })
  });
  const passThrough = async function*(source) {
    for await (const chunk of source) {
      yield chunk;
    }
  };
  const sink = new Writable({
    write: common.mustCall((chunk, encoding, callback) => {
      collected += chunk;
      callback(null);
    })
  });

  const composed = compose(upperCaser, passThrough, sink);

  // The composition ends in a sink, so completion is observed via 'finish'.
  composed.end('asd');
  composed.on('finish', common.mustCall(() => {
    assert.strictEqual(collected, 'ASD');
  }));
}
241
{
  // Node.js Transform (upper-case) -> pass-through async generator ->
  // web WritableStream sink that accumulates what it receives.
  let collected = '';

  const upperCaser = new Transform({
    transform: common.mustCall((chunk, encoding, callback) => {
      callback(null, chunk.toString().toUpperCase());
    })
  });
  const passThrough = async function*(source) {
    for await (const chunk of source) {
      yield chunk;
    }
  };
  const sink = new WritableStream({
    write: common.mustCall((chunk) => {
      collected += chunk;
    })
  });

  const composed = compose(upperCaser, passThrough, sink);

  // The composition ends in a sink, so completion is observed via 'finish'.
  composed.end('asd');
  composed.on('finish', common.mustCall(() => {
    assert.strictEqual(collected, 'ASD');
  }));
}
266
{
  // Web TransformStream (upper-case) -> pass-through async generator ->
  // web WritableStream sink that accumulates what it receives.
  let collected = '';

  const upperCaser = new TransformStream({
    transform: common.mustCall((chunk, controller) => {
      controller.enqueue(chunk.toString().toUpperCase());
    })
  });
  const passThrough = async function*(source) {
    for await (const chunk of source) {
      yield chunk;
    }
  };
  const sink = new WritableStream({
    write: common.mustCall((chunk) => {
      collected += chunk;
    })
  });

  const composed = compose(upperCaser, passThrough, sink);

  // The composition ends in a sink, so completion is observed via 'finish'.
  composed.end('asd');
  composed.on('finish', common.mustCall(() => {
    assert.strictEqual(collected, 'ASD');
  }));
}
291
{
  // Web TransformStream (upper-case) -> pass-through async generator ->
  // plain async function that drains the source into a string.
  let collected = '';

  const upperCaser = new TransformStream({
    transform: common.mustCall((chunk, controller) => {
      controller.enqueue(chunk.toString().toUpperCase());
    })
  });
  const passThrough = async function*(source) {
    for await (const chunk of source) {
      yield chunk;
    }
  };
  const drain = async function(source) {
    for await (const chunk of source) {
      collected += chunk;
    }
  };

  const composed = compose(upperCaser, passThrough, drain);

  // The composition ends in a consumer, so completion is observed via 'finish'.
  composed.end('asd');
  composed.on('finish', common.mustCall(() => {
    assert.strictEqual(collected, 'ASD');
  }));
}
316
{
  // The first TransformStream errors its controller on the first chunk; the
  // second stage must never run and no data/end may be observed.
  compose(
    new TransformStream({
      transform: common.mustCall((chunk, controller) => {
        controller.error(new Error('asd'));
      })
    }),
    new TransformStream({
      transform: common.mustNotCall()
    })
  )
  .on('data', common.mustNotCall())
  .on('end', common.mustNotCall())
  // Wrapped in mustCall so the test fails if the error is never surfaced,
  // instead of passing without running the assertion.
  .on('error', common.mustCall((err) => {
    assert.strictEqual(err.message, 'asd');
  }))
  .end('xyz');
}
336
{
  // The first TransformStream passes the chunk through; the second errors its
  // controller on that chunk. No data/end may be observed.
  compose(
    new TransformStream({
      transform: common.mustCall((chunk, controller) => {
        controller.enqueue(chunk);
      })
    }),
    new TransformStream({
      transform: common.mustCall((chunk, controller) => {
        controller.error(new Error('asd'));
      })
    })
  )
  .on('data', common.mustNotCall())
  .on('end', common.mustNotCall())
  // Wrapped in mustCall so the test fails if the error is never surfaced,
  // instead of passing without running the assertion.
  .on('error', common.mustCall((err) => {
    assert.strictEqual(err.message, 'asd');
  }))
  .end('xyz');
}
358
{
  // The first TransformStream passes the chunk through; the async generator in
  // the middle throws on the first chunk it receives, so the trailing
  // TransformStream must never run and no data/end may be observed.
  compose(
    new TransformStream({
      transform: common.mustCall((chunk, controller) => {
        controller.enqueue(chunk);
      })
    }),
    async function*(source) { // eslint-disable-line require-yield
      let tmp = '';
      for await (const chunk of source) {
        tmp += chunk;
        throw new Error('asd');
      }
      return tmp;
    },
    new TransformStream({
      transform: common.mustNotCall()
    })
  )
  .on('data', common.mustNotCall())
  .on('end', common.mustNotCall())
  // Wrapped in mustCall so the test fails if the error is never surfaced,
  // instead of passing without running the assertion.
  .on('error', common.mustCall((err) => {
    assert.strictEqual(err.message, 'asd');
  }))
  .end('xyz');
}
386
{
  // The web TransformStream errors its controller on the first chunk; the
  // Node.js Transform after it must never run and no data/end may be observed.
  compose(
    new TransformStream({
      transform: common.mustCall((chunk, controller) => {
        controller.error(new Error('asd'));
      })
    }),
    new Transform({
      transform: common.mustNotCall()
    })
  )
  .on('data', common.mustNotCall())
  .on('end', common.mustNotCall())
  // Wrapped in mustCall so the test fails if the error is never surfaced,
  // instead of passing without running the assertion.
  .on('error', common.mustCall((err) => {
    assert.strictEqual(err.message, 'asd');
  }))
  .end('xyz');
}
406
{
  // The Node.js Transform reports an error through its callback on the first
  // chunk; the web TransformStream after it must never run and no data/end may
  // be observed.
  compose(
    new Transform({
      transform: common.mustCall((chunk, enc, clb) => {
        clb(new Error('asd'));
      })
    }),
    new TransformStream({
      transform: common.mustNotCall()
    })
  )
  .on('data', common.mustNotCall())
  .on('end', common.mustNotCall())
  // Wrapped in mustCall so the test fails if the error is never surfaced,
  // instead of passing without running the assertion.
  .on('error', common.mustCall((err) => {
    assert.strictEqual(err.message, 'asd');
  }))
  .end('xyz');
}
426
{
  // A web ReadableStream whose start() enqueues an Error object, composed with
  // a TransformStream whose transform must never be called.
  // NOTE(review): the Error is enqueued as an ordinary chunk (controller.error
  // is not used) and the 'error' listener is not wrapped in common.mustCall, so
  // nothing here requires an 'error' event to fire - confirm what this case is
  // meant to verify.
  const source = new ReadableStream({
    start(controller) {
      controller.enqueue(new Error('asd'));
    }
  });
  const neverRuns = new TransformStream({
    transform: common.mustNotCall()
  });

  const composed = compose(source, neverRuns);

  composed.on('data', common.mustNotCall());
  composed.on('end', common.mustNotCall());
  composed.on('error', (err) => {
    assert.strictEqual(err?.message, 'asd');
  });
  composed.end('xyz');
}
445
{
  // The TransformStream upper-cases the chunk; the web WritableStream sink
  // errors its controller when it receives that chunk.
  compose(
    new TransformStream({
      transform: common.mustCall((chunk, controller) => {
        controller.enqueue(chunk.toString().toUpperCase());
      })
    }),
    new WritableStream({
      write: common.mustCall((chunk, controller) => {
        controller.error(new Error('asd'));
      })
    })
  )
  // Wrapped in mustCall so the test fails if the error is never surfaced,
  // instead of passing without running the assertion.
  .on('error', common.mustCall((err) => {
    assert.strictEqual(err.message, 'asd');
  }))
  .end('xyz');
}
464
{
  // TransformStream (upper-case) -> pass-through async generator -> async
  // function sink that throws without reading its source.
  compose(
    new TransformStream({
      transform: common.mustCall((chunk, controller) => {
        controller.enqueue(chunk.toString().toUpperCase());
      })
    }),
    async function*(source) {
      for await (const chunk of source) {
        yield chunk;
      }
    },
    async function(source) {
      throw new Error('asd');
    }
  )
  // Wrapped in mustCall so the test fails if the error is never surfaced,
  // instead of passing without running the assertion.
  .on('error', common.mustCall((err) => {
    assert.strictEqual(err.message, 'asd');
  }))
  .end('xyz');
}
484