• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Flags: --expose-internals
2
3'use strict';
4
5const common = require('../common');
6const {
7  Readable,
8  Transform,
9  Writable,
10  finished,
11  PassThrough
12} = require('stream');
13const compose = require('internal/streams/compose');
14const assert = require('assert');
15
{
  // Two Transform streams composed together: the first duplicates each chunk
  // and the second upper-cases it, so writing 'asd' must read back 'ASDASD'.
  const doubler = new Transform({
    transform: common.mustCall((chunk, encoding, callback) => {
      callback(null, chunk + chunk);
    })
  });
  const upperCaser = new Transform({
    transform: common.mustCall((chunk, encoding, callback) => {
      callback(null, String(chunk).toUpperCase());
    })
  });

  let output = '';
  const composed = compose(doubler, upperCaser);
  composed.end('asd');
  composed.on('data', common.mustCall((data) => {
    output += data;
  }));
  composed.on('end', common.mustCall(() => {
    assert.strictEqual(output, 'ASDASD');
  }));
}
38
{
  // Two async generator functions composed together: the first duplicates
  // each chunk and the second upper-cases it.
  const duplicate = async function*(source) {
    for await (const piece of source) {
      yield piece + piece;
    }
  };
  const shout = async function*(source) {
    for await (const piece of source) {
      yield String(piece).toUpperCase();
    }
  };

  let output = '';
  const composed = compose(duplicate, shout);
  composed.end('asd');
  composed.on('data', common.mustCall((data) => {
    output += data;
  }));
  composed.on('end', common.mustCall(() => {
    assert.strictEqual(output, 'ASDASD');
  }));
}
61
{
  // A single async generator function is enough to build a composed stream;
  // it duplicates the one chunk written to it.
  const duplicate = async function*(source) {
    for await (const piece of source) {
      yield piece + piece;
    }
  };

  let output = '';
  const composed = compose(duplicate);
  composed.end('asd');
  composed.on('data', common.mustCall((data) => {
    output += data;
  }));
  composed.on('end', common.mustCall(() => {
    assert.strictEqual(output, 'asdasd');
  }));
}
79
{
  // A Readable in first position feeds the chain, so nothing is written to
  // the composed stream; the Transform upper-cases the single chunk.
  const source = Readable.from(['asd']);
  const upperCaser = new Transform({
    transform: common.mustCall((chunk, encoding, callback) => {
      callback(null, String(chunk).toUpperCase());
    })
  });

  let output = '';
  const composed = compose(source, upperCaser);
  composed.on('data', common.mustCall((data) => {
    output += data;
  }));
  composed.on('end', common.mustCall(() => {
    assert.strictEqual(output, 'ASD');
  }));
}
97
{
  // An already-invoked async generator (an async iterable, not a function)
  // in first position acts as the data source for the Transform.
  const source = (async function* () {
    yield 'asd';
  })();
  const upperCaser = new Transform({
    transform: common.mustCall((chunk, encoding, callback) => {
      callback(null, String(chunk).toUpperCase());
    })
  });

  let output = '';
  const composed = compose(source, upperCaser);
  composed.on('data', common.mustCall((data) => {
    output += data;
  }));
  composed.on('end', common.mustCall(() => {
    assert.strictEqual(output, 'ASD');
  }));
}
117
{
  // Transform -> pass-through async generator -> Writable. The Writable
  // collects the data, and the result is checked on 'finish'.
  let collected = '';

  const upperCaser = new Transform({
    transform: common.mustCall((chunk, encoding, callback) => {
      callback(null, String(chunk).toUpperCase());
    })
  });
  const relay = async function*(source) {
    for await (const piece of source) {
      yield piece;
    }
  };
  const sink = new Writable({
    write: common.mustCall((chunk, encoding, callback) => {
      collected += chunk;
      callback(null);
    })
  });

  const composed = compose(upperCaser, relay, sink);
  composed.end('asd');
  composed.on('finish', common.mustCall(() => {
    assert.strictEqual(collected, 'ASD');
  }));
}
143
{
  // Transform -> pass-through async generator -> async function sink. The
  // sink consumes the chain and accumulates what it receives.
  let collected = '';

  const upperCaser = new Transform({
    transform: common.mustCall((chunk, encoding, callback) => {
      callback(null, String(chunk).toUpperCase());
    })
  });
  const relay = async function*(source) {
    for await (const piece of source) {
      yield piece;
    }
  };
  const sink = async function(source) {
    for await (const piece of source) {
      collected += piece;
    }
  };

  const composed = compose(upperCaser, relay, sink);
  composed.end('asd');
  composed.on('finish', common.mustCall(() => {
    assert.strictEqual(collected, 'ASD');
  }));
}
168
{
  // Object-mode chain: each Transform wraps its input as `{ chunk }`, so the
  // written value `true` comes out nested two levels deep.
  const makeWrapper = () => new Transform({
    objectMode: true,
    transform: common.mustCall((chunk, encoding, callback) => {
      callback(null, { chunk });
    })
  });
  const relay = async function*(source) {
    for await (const item of source) {
      yield item;
    }
  };

  let lastValue;
  const composed = compose(makeWrapper(), relay, makeWrapper());
  composed.end(true);
  composed.on('data', common.mustCall((value) => {
    lastValue = value;
  }));
  composed.on('end', common.mustCall(() => {
    assert.strictEqual(lastValue.chunk.chunk, true);
  }));
}
198
{
  // The first Transform fails with `_err`. The error must surface on the
  // composed stream, and nothing may reach the last Transform or be emitted
  // as 'data' / 'end'.
  const _err = new Error('asd');
  compose(
    new Transform({
      objectMode: true,
      transform: common.mustCall((chunk, encoding, callback) => {
        callback(_err);
      })
    }),
    async function*(source) {
      for await (const chunk of source) {
        yield chunk;
      }
    },
    new Transform({
      objectMode: true,
      transform: common.mustNotCall((chunk, encoding, callback) => {
        callback(null, { chunk });
      })
    })
  )
  .end(true)
  .on('data', common.mustNotCall())
  .on('end', common.mustNotCall())
  // Wrapped in mustCall so the test fails if the error is never emitted.
  .on('error', common.mustCall((err) => {
    assert.strictEqual(err, _err);
  }));
}
227
{
  // The async generator in the middle throws `_err` on the first chunk it
  // receives. The error must surface on the composed stream, and nothing may
  // reach the last Transform or be emitted as 'data' / 'end'.
  const _err = new Error('asd');
  compose(
    new Transform({
      objectMode: true,
      transform: common.mustCall((chunk, encoding, callback) => {
        callback(null, chunk);
      })
    }),
    async function*(source) { // eslint-disable-line require-yield
      let tmp = '';
      for await (const chunk of source) {
        tmp += chunk;
        throw _err;
      }
      return tmp;
    },
    new Transform({
      objectMode: true,
      transform: common.mustNotCall((chunk, encoding, callback) => {
        callback(null, { chunk });
      })
    })
  )
  .end(true)
  .on('data', common.mustNotCall())
  .on('end', common.mustNotCall())
  // Wrapped in mustCall so the test fails if the error is never emitted.
  .on('error', common.mustCall((err) => {
    assert.strictEqual(err, _err);
  }));
}
259
{
  // A chain that starts with an async generator object and ends with an
  // async function sink reports both `writable` and `readable` as false.
  let received = '';

  const producer = (async function* () {
    yield 'Hello';
    yield 'World';
  })();
  const upperCase = async function* (source) {
    for await (const piece of source) {
      yield String(piece).toUpperCase();
    }
  };
  const sink = async function(source) {
    for await (const piece of source) {
      received += piece;
    }
  };

  const s1 = compose(producer, upperCase, sink);

  assert.strictEqual(s1.writable, false);
  assert.strictEqual(s1.readable, false);

  const resumed = s1.resume();
  finished(resumed, common.mustCall((err) => {
    assert.ok(!err);
    assert.strictEqual(received, 'HELLOWORLD');
  }));
}
285
{
  // An async generator function that takes a source becomes a transform
  // duplex: data written to it comes back upper-cased.
  let received = '';

  const upperCase = async function* (source) {
    for await (const piece of source) {
      yield String(piece).toUpperCase();
    }
  };
  const s2 = compose(upperCase);

  s2.end('helloworld');
  s2.resume();
  s2.on('data', (piece) => {
    received += piece;
  });

  const resumed = s2.resume();
  finished(resumed, common.mustCall((err) => {
    assert.ok(!err);
    assert.strictEqual(received, 'HELLOWORLD');
  }));
}
305
{
  // Streams produced by compose() can themselves be composed: a readable,
  // a transform and a writable duplex are chained into one stream.
  let received = '';

  // Readable duplex built from an async generator object.
  const producer = (async function* () {
    yield 'Hello';
    yield 'World';
  })();
  const s1 = compose(producer);

  // Transform duplex built from an async generator function.
  const upperCase = async function* (source) {
    for await (const piece of source) {
      yield String(piece).toUpperCase();
    }
  };
  const s2 = compose(upperCase);

  // Writable duplex built from an async function.
  const sink = async function(source) {
    for await (const piece of source) {
      received += piece;
    }
  };
  const s3 = compose(sink);

  const s4 = compose(s1, s2, s3);

  finished(s4, common.mustCall((err) => {
    assert.ok(!err);
    assert.strictEqual(received, 'HELLOWORLD');
  }));
}
336
{
  // Source, transform and sink in a single compose() call; the composed
  // stream finishes without the test calling resume() on it.
  let received = '';

  const producer = (async function* () {
    yield 'Hello';
    yield 'World';
  })();
  const upperCase = async function* (source) {
    for await (const piece of source) {
      yield String(piece).toUpperCase();
    }
  };
  const sink = async function(source) {
    for await (const piece of source) {
      received += piece;
    }
  };

  const s1 = compose(producer, upperCase, sink);

  finished(s1, common.mustCall((err) => {
    assert.ok(!err);
    assert.strictEqual(received, 'HELLOWORLD');
  }));
}
359
{
  // Calling compose() with no streams at all must throw.
  const callWithoutStreams = () => compose();
  const expected = { code: 'ERR_MISSING_ARGS' };
  assert.throws(callWithoutStreams, expected);
}
366
{
  // A plain Writable followed by another stream is rejected.
  const writableBeforeDuplex = () => compose(new Writable(), new PassThrough());
  const expected = { code: 'ERR_INVALID_ARG_VALUE' };
  assert.throws(writableBeforeDuplex, expected);
}
373
{
  // A plain Readable placed between two other streams is rejected.
  const readableInMiddle = () => compose(
    new PassThrough(),
    new Readable({ read() {} }),
    new PassThrough()
  );
  const expected = { code: 'ERR_INVALID_ARG_VALUE' };
  assert.throws(readableInMiddle, expected);
}
380
{
  // The final async function returns a value (the accumulated string); the
  // composed stream must then fail with ERR_INVALID_RETURN_VALUE.
  let received = '';

  const producer = (async function* () {
    yield 'Hello';
    yield 'World';
  })();
  const upperCase = async function* (source) {
    for await (const piece of source) {
      yield String(piece).toUpperCase();
    }
  };
  const returningSink = async function(source) {
    for await (const piece of source) {
      received += piece;
    }
    return received;
  };

  const s1 = compose(producer, upperCase, returningSink);

  finished(s1, common.mustCall((err) => {
    assert.strictEqual(err.code, 'ERR_INVALID_RETURN_VALUE');
  }));
}
403
{
  // A plain string in first position is used as the data source.
  let received = '';

  const upperCase = async function* (source) {
    for await (const piece of source) {
      yield String(piece).toUpperCase();
    }
  };
  const sink = async function(source) {
    for await (const piece of source) {
      received += piece;
    }
  };

  const s1 = compose('HelloWorld', upperCase, sink);

  finished(s1, common.mustCall((err) => {
    assert.ok(!err);
    assert.strictEqual(received, 'HELLOWORLD');
  }));
}
423
{
  // The composed stream should use the writable side of the first stream
  // and the readable side of the last stream.
  // #46829
  const run = async () => {
    // First stream: byte mode on both sides.
    const head = new PassThrough({
      readableObjectMode: false,
      writableObjectMode: false,
    });

    // Last stream: accepts bytes, emits objects.
    const tail = new Transform({
      readableObjectMode: true,
      writableObjectMode: false,
      transform: (chunk, encoding, callback) => {
        callback(null, {
          value: chunk.toString()
        });
      }
    });

    const newStream = compose(head, tail);

    assert.strictEqual(newStream.writableObjectMode, false);
    assert.strictEqual(newStream.readableObjectMode, true);

    newStream.write('Steve Rogers');
    newStream.write('On your left');

    newStream.end();

    const items = await newStream.toArray();
    assert.deepStrictEqual(items, [{ value: 'Steve Rogers' }, { value: 'On your left' }]);
  };

  run().then(common.mustCall());
}
461
{
  // The composed stream should use the writable side of the first stream
  // and the readable side of the last stream.
  // #46829
  const run = async () => {
    // First stream: object mode on both sides.
    const head = new PassThrough({
      readableObjectMode: true,
      writableObjectMode: true,
    });

    // Last stream: accepts objects, emits bytes.
    const tail = new Transform({
      readableObjectMode: false,
      writableObjectMode: true,
      transform: (chunk, encoding, callback) => {
        callback(null, chunk.value);
      }
    });

    const newStream = compose(head, tail);

    assert.strictEqual(newStream.writableObjectMode, true);
    assert.strictEqual(newStream.readableObjectMode, false);

    newStream.write({ value: 'Steve Rogers' });
    newStream.write({ value: 'On your left' });

    newStream.end();

    const items = await newStream.toArray();
    assert.deepStrictEqual(items, [Buffer.from('Steve RogersOn your left')]);
  };

  run().then(common.mustCall());
}
497