// Source-browser navigation captured with this file (not part of the code):
// Home, Line#, Scopes#, Navigate#, Raw, Download
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.

'use strict';
// `common` is the shared test helper module; the test cases below use its
// `mustCall()` wrapper around callbacks.
const common = require('../common');
const assert = require('assert');
const { PassThrough, Transform } = require('stream');

{
  // Verify writable side consumption: ten chunks of 1..10 bytes are written
  // into a transform whose highWaterMark is 10. The assertions pin that 10
  // bytes (1 + 2 + 3 + 4) have been transformed and are readable, while the
  // chunks of 5..10 bytes are still queued on the writable side.
  const tx = new Transform({
    highWaterMark: 10
  });

  let transformed = 0;
  tx._transform = (chunk, encoding, cb) => {
    transformed += chunk.length;
    tx.push(chunk);
    cb();
  };

  for (let size = 1; size <= 10; size++) {
    tx.write(Buffer.allocUnsafe(size));
  }
  tx.end();

  assert.strictEqual(tx.readableLength, 10);
  assert.strictEqual(transformed, 10);

  // Sizes of the chunks that have not been handed to _transform yet.
  const pendingSizes = tx.writableBuffer.map(({ chunk }) => chunk.length);
  assert.deepStrictEqual(pendingSizes, [5, 6, 7, 8, 9, 10]);
}

{
  // Verify passthrough behavior: four 4-byte writes are read back in 5-byte
  // slices, so reads span chunk boundaries and the last read returns the
  // single leftover byte.
  const pt = new PassThrough();

  for (const word of ['foog', 'bark', 'bazy', 'kuel']) {
    pt.write(Buffer.from(word));
  }
  pt.end();

  for (const expected of ['foogb', 'arkba', 'zykue', 'l']) {
    assert.strictEqual(pt.read(5).toString(), expected);
  }
}

{
  // Verify object passthrough behavior: in object mode every written value,
  // including falsy ones (false, 0, ''), comes back from read() unchanged
  // and in write order.
  const pt = new PassThrough({ objectMode: true });

  const primitives = [1, true, false, 0, 'foo', ''];
  for (const value of primitives) {
    pt.write(value);
  }
  pt.write({ a: 'b' });
  pt.end();

  for (const value of primitives) {
    assert.strictEqual(pt.read(), value);
  }
  assert.deepStrictEqual(pt.read(), { a: 'b' });
}

{
  // Verify passthrough constructor behavior: calling PassThrough as a plain
  // function (without `new`) must still yield a PassThrough instance.
  const pt = PassThrough();

  assert(pt instanceof PassThrough);
}

{
  // Verify transform constructor behavior: calling Transform as a plain
  // function (without `new`) must still yield a Transform instance.
  const pt = Transform();

  assert(pt instanceof Transform);
}

{
  // Perform a simple transform: every input chunk is replaced by a buffer of
  // the same length filled with 'x', then read back in 5-byte slices.
  const pt = new Transform();
  pt._transform = (chunk, encoding, cb) => {
    pt.push(Buffer.alloc(chunk.length, 'x'));
    cb();
  };

  for (const word of ['foog', 'bark', 'bazy', 'kuel']) {
    pt.write(Buffer.from(word));
  }
  pt.end();

  // 16 bytes in total: three full 5-byte reads and one leftover byte.
  for (const expected of ['xxxxx', 'xxxxx', 'xxxxx', 'x']) {
    assert.strictEqual(pt.read(5).toString(), expected);
  }
}

{
  // Verify simple object transform: each written value is pushed as its
  // JSON.stringify() form, one output per input, in write order.
  const pt = new Transform({ objectMode: true });
  pt._transform = (value, encoding, cb) => {
    pt.push(JSON.stringify(value));
    cb();
  };

  // [written value, expected serialized output]
  const cases = [
    [1, '1'],
    [true, 'true'],
    [false, 'false'],
    [0, '0'],
    ['foo', '"foo"'],
    ['', '""'],
    [{ a: 'b' }, '{"a":"b"}'],
  ];

  for (const [input] of cases) {
    pt.write(input);
  }
  pt.end();

  for (const [, serialized] of cases) {
    assert.strictEqual(pt.read(), serialized);
  }
}

{
  // Verify async passthrough: each chunk is pushed unchanged from a 10 ms
  // timer, and the output is inspected once 'finish' has been emitted.
  const pt = new Transform();
  pt._transform = (chunk, encoding, cb) => {
    setTimeout(() => {
      pt.push(chunk);
      cb();
    }, 10);
  };

  for (const word of ['foog', 'bark', 'bazy', 'kuel']) {
    pt.write(Buffer.from(word));
  }
  pt.end();

  pt.on('finish', common.mustCall(() => {
    for (const expected of ['foogb', 'arkba', 'zykue', 'l']) {
      assert.strictEqual(pt.read(5).toString(), expected);
    }
  }));
}

{
  // Verify asymmetric transform (expand): the output is twice the input.
  const pt = new Transform();

  // Emit each chunk 2 times, with a 10 ms timer before each push. The
  // callback is only invoked after the second push.
  pt._transform = (chunk, encoding, cb) => {
    setTimeout(() => {
      pt.push(chunk);
      setTimeout(() => {
        pt.push(chunk);
        cb();
      }, 10);
    }, 10);
  };

  for (const word of ['foog', 'bark', 'bazy', 'kuel']) {
    pt.write(Buffer.from(word));
  }
  pt.end();

  pt.on('finish', common.mustCall(() => {
    // 'foogfoogbarkbarkbazybazykuelkuel' read back in 5-byte slices.
    const slices = [
      'foogf', 'oogba', 'rkbar', 'kbazy', 'bazyk', 'uelku', 'el',
    ];
    for (const expected of slices) {
      assert.strictEqual(pt.read(5).toString(), expected);
    }
  }));
}

{
  // Verify asymmetric transform (compress)
  const pt = new Transform();

  // Each output is the first char of 3 consecutive chunks,
  // or whatever's left.
  pt.state = '';

  // Must stay a regular function: the timer callback reads `this.state`,
  // and `this` is the stream the method is invoked on.
  pt._transform = function(chunk, encoding, cb) {
    const text = (chunk || '').toString();
    setTimeout(() => {
      this.state += text.charAt(0);
      if (this.state.length === 3) {
        pt.push(Buffer.from(this.state));
        this.state = '';
      }
      cb();
    }, 10);
  };

  pt._flush = function(cb) {
    // Just output whatever we have.
    pt.push(Buffer.from(this.state));
    this.state = '';
    cb();
  };

  // 14 writes: two full a..e rounds followed by a..d.
  const round = ['aaaa', 'bbbb', 'cccc', 'dddd', 'eeee'];
  for (const word of [...round, ...round, ...round.slice(0, 4)]) {
    pt.write(Buffer.from(word));
  }
  pt.end();

  // 'abcdeabcdeabcd'
  pt.on('finish', common.mustCall(() => {
    for (const expected of ['abcde', 'abcde', 'abcd']) {
      assert.strictEqual(pt.read(5).toString(), expected);
    }
  }));
}

// This tests for a stall when data is written to a full stream
// that has empty transforms.
{
  // Verify complex transform behavior
  let count = 0;
  let saved = null;
  const pt = new Transform({ highWaterMark: 3 });

  // The second chunk seen (count === 1) is held back without any output,
  // i.e. an "empty" transform. Every other call first pushes the held-back
  // chunk, if there is one, and then pushes its own chunk.
  pt._transform = (chunk, encoding, cb) => {
    if (count++ === 1) {
      saved = chunk;
    } else {
      if (saved) {
        pt.push(saved);
        saved = null;
      }
      pt.push(chunk);
    }

    cb();
  };

  pt.once('readable', () => {
    process.nextTick(() => {
      // 'abc' is already buffered at this point; 'd' is the held-back chunk
      // and 'ef' releases it.
      pt.write(Buffer.from('d'));
      pt.write(Buffer.from('ef'), common.mustCall(() => {
        pt.end();
      }));
      assert.strictEqual(pt.read().toString(), 'abcdef');
      assert.strictEqual(pt.read(), null);
    });
  });

  pt.write(Buffer.from('abc'));
}


{
  // Verify passthrough event emission: counts 'readable' events while
  // writing and reading synchronously. The statement order below is the
  // behavior under test, so it must not be rearranged.
  const pt = new PassThrough();
  let emits = 0;
  pt.on('readable', () => {
    emits++;
  });

  pt.write(Buffer.from('foog'));
  pt.write(Buffer.from('bark'));

  // No 'readable' has been emitted synchronously by the writes.
  assert.strictEqual(emits, 0);
  assert.strictEqual(pt.read(5).toString(), 'foogb');
  // Only 3 bytes remain and the stream has not ended, so read(5) gives null.
  assert.strictEqual(String(pt.read(5)), 'null');
  assert.strictEqual(emits, 0);

  pt.write(Buffer.from('bazy'));
  pt.write(Buffer.from('kuel'));

  assert.strictEqual(emits, 0);
  assert.strictEqual(pt.read(5).toString(), 'arkba');
  assert.strictEqual(pt.read(5).toString(), 'zykue');
  assert.strictEqual(pt.read(5), null);

  pt.end();

  // Ending the stream is what produces the single 'readable' emission; the
  // short final read then returns the leftover byte.
  assert.strictEqual(emits, 1);
  assert.strictEqual(pt.read(5).toString(), 'l');
  assert.strictEqual(pt.read(5), null);
  assert.strictEqual(emits, 1);
}

{
  // Verify passthrough event emission reordering: each one-shot 'readable'
  // handler drains what is available, installs the next handler, and only
  // then supplies more input (or ends the stream).
  const pt = new PassThrough();
  let emits = 0;
  pt.on('readable', () => {
    emits++;
  });

  pt.write(Buffer.from('foog'));
  pt.write(Buffer.from('bark'));

  assert.strictEqual(emits, 0);
  assert.strictEqual(pt.read(5).toString(), 'foogb');
  assert.strictEqual(pt.read(5), null);

  pt.once('readable', common.mustCall(() => {
    assert.strictEqual(pt.read(5).toString(), 'arkba');
    assert.strictEqual(pt.read(5), null);

    pt.once('readable', common.mustCall(() => {
      assert.strictEqual(pt.read(5).toString(), 'zykue');
      assert.strictEqual(pt.read(5), null);

      pt.once('readable', common.mustCall(() => {
        assert.strictEqual(pt.read(5).toString(), 'l');
        assert.strictEqual(pt.read(5), null);
        // One 'readable' per stage of this chain.
        assert.strictEqual(emits, 3);
      }));
      pt.end();
    }));
    pt.write(Buffer.from('kuel'));
  }));

  pt.write(Buffer.from('bazy'));
}

{
  // Verify passthrough facade: chunks written 10 ms apart are collected via
  // 'data' events and must equal the written words, in order, at 'end'.
  const pt = new PassThrough();
  const datas = [];
  pt.on('data', (chunk) => {
    datas.push(chunk.toString());
  });

  pt.on('end', common.mustCall(() => {
    assert.deepStrictEqual(datas, ['foog', 'bark', 'bazy', 'kuel']);
  }));

  // The first word is written immediately. Each later step runs 10 ms after
  // the previous one; once the words run out, the next step ends the stream.
  const laterWords = ['bark', 'bazy', 'kuel'];
  const scheduleNext = () => {
    setTimeout(() => {
      if (laterWords.length === 0) {
        pt.end();
        return;
      }
      pt.write(Buffer.from(laterWords.shift()));
      scheduleNext();
    }, 10);
  };

  pt.write(Buffer.from('foog'));
  scheduleNext();
}

{
  // Verify object transform (JSON parse): JSON text goes in, the parsed
  // value comes out of the very next read().
  const jp = new Transform({ objectMode: true });
  jp._transform = (data, encoding, cb) => {
    try {
      jp.push(JSON.parse(data));
      cb();
    } catch (er) {
      // Report parse failures through the transform callback.
      cb(er);
    }
  };

  // Anything except null/undefined is fine.
  // Those are "magic" in the stream API, because they signal EOF.
  const objects = [
    { foo: 'bar' },
    100,
    'string',
    { nested: { things: [ { foo: 'bar' }, 100, 'string' ] } },
  ];

  let ended = false;
  jp.on('end', () => {
    ended = true;
  });

  for (const obj of objects) {
    jp.write(JSON.stringify(obj));
    assert.deepStrictEqual(jp.read(), obj);
  }

  jp.end();
  // Read one more time to get the 'end' event
  jp.read();

  process.nextTick(common.mustCall(() => {
    assert.strictEqual(ended, true);
  }));
}

{
  // Verify object transform (JSON stringify): values go in, their JSON text
  // comes out of the very next read().
  const js = new Transform({ objectMode: true });
  js._transform = (data, encoding, cb) => {
    try {
      js.push(JSON.stringify(data));
      cb();
    } catch (er) {
      // Report serialization failures through the transform callback.
      cb(er);
    }
  };

  // Anything except null/undefined is fine.
  // Those are "magic" in the stream API, because they signal EOF.
  const objects = [
    { foo: 'bar' },
    100,
    'string',
    { nested: { things: [ { foo: 'bar' }, 100, 'string' ] } },
  ];

  let ended = false;
  js.on('end', () => {
    ended = true;
  });

  for (const obj of objects) {
    js.write(obj);
    assert.strictEqual(js.read(), JSON.stringify(obj));
  }

  js.end();
  // Read one more time to get the 'end' event
  js.read();

  process.nextTick(common.mustCall(() => {
    assert.strictEqual(ended, true);
  }));
}

{
  // Verify the `construct` option: a value pushed from construct() must be
  // delivered as the first 'data' event, ahead of the rows that are written
  // afterwards and passed through unchanged by transform().
  const s = new Transform({
    objectMode: true,
    construct(callback) {
      this.push('header from constructor');
      callback();
    },
    transform: (row, encoding, callback) => {
      callback(null, row);
    },
  });

  // Consumed front to back, one entry per 'data' event.
  const expected = [
    'header from constructor',
    'firstLine',
    'secondLine',
  ];
  s.on('data', common.mustCall((data) => {
    assert.strictEqual(data.toString(), expected.shift());
  }, 3));

  s.write('firstLine');
  process.nextTick(() => s.write('secondLine'));
}
