• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright Joyent, Inc. and other Node contributors.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the
5// "Software"), to deal in the Software without restriction, including
6// without limitation the rights to use, copy, modify, merge, publish,
7// distribute, sublicense, and/or sell copies of the Software, and to permit
8// persons to whom the Software is furnished to do so, subject to the
9// following conditions:
10//
11// The above copyright notice and this permission notice shall be included
12// in all copies or substantial portions of the Software.
13//
14// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20// USE OR OTHER DEALINGS IN THE SOFTWARE.
21
22'use strict';
23
24const common = require('../common');
25const { Writable: W, Duplex: D } = require('stream');
26const assert = require('assert');
27
/**
 * Writable used by the checks in this file. Every chunk handed to _write()
 * is recorded after a short random delay so that the caller can later
 * inspect what arrived and in which order.
 *
 * Instance state:
 *   buffer  - array of the received chunks, each converted with toString()
 *   written - running total of `chunk.length` over all received chunks
 */
class TestWriter extends W {
  constructor(opts) {
    super(opts);
    this.buffer = [];
    this.written = 0;
  }

  /**
   * Records `chunk` and then signals completion through `cb`.
   * The work is deferred by 0-9 ms to mimic a sink with jittery latency;
   * `encoding` is not consulted.
   */
  _write(chunk, encoding, cb) {
    const latency = Math.floor(Math.random() * 10);
    const record = () => {
      this.buffer.push(chunk.toString());
      this.written += chunk.length;
      cb();
    };
    setTimeout(record, latency);
  }
}
44
// Shared fixture: 50 strings where entry `i` is the letter 'x' repeated `i`
// times (so the first entry is the empty string).
const chunks = Array.from({ length: 50 }, (_, size) => 'x'.repeat(size));
49
{
  // Fast writing: every chunk is written back-to-back in the same tick and
  // the stream is ended right afterwards.
  const writer = new TestWriter({ highWaterMark: 100 });

  const onFinish = common.mustCall(() => {
    // All chunks must have been received, in the order they were written.
    assert.deepStrictEqual(writer.buffer, chunks);
  });
  writer.on('finish', onFinish);

  // The return value of write() is deliberately ignored, i.e. backpressure
  // is not honored and the stream has to buffer everything itself.
  for (const chunk of chunks) {
    writer.write(chunk);
  }
  writer.end();
}
67
{
  // Slow writing: one chunk is written immediately and each following chunk
  // 10 ms after the previous one; the stream is ended after the last chunk.
  const writer = new TestWriter({ highWaterMark: 100 });

  writer.on('finish', common.mustCall(() => {
    // All chunks must have been received, in the order they were written.
    assert.deepStrictEqual(writer.buffer, chunks);
  }));

  let next = 0;
  const writeNext = () => {
    writer.write(chunks[next]);
    next += 1;
    if (next >= chunks.length) {
      writer.end();
      return;
    }
    setTimeout(writeNext, 10);
  };
  writeNext();
}
88
{
  // Write backpressure: keep writing until write() returns false, then wait
  // for 'drain' before continuing.
  const writer = new TestWriter({ highWaterMark: 50 });

  let drainCount = 0;

  writer.on('finish', common.mustCall(() => {
    // All chunks must have been received, in the order they were written.
    assert.deepStrictEqual(writer.buffer, chunks);
    // Number of 'drain' events expected for this fixture and highWaterMark.
    assert.strictEqual(drainCount, 17);
  }));

  writer.on('drain', () => {
    drainCount += 1;
  });

  let next = 0;
  const pump = () => {
    // Always write at least one chunk, then continue until either write()
    // reports false or the fixture is exhausted.
    for (;;) {
      const accepted = writer.write(chunks[next++]);
      if (accepted === false || next >= chunks.length) {
        break;
      }
    }

    if (next >= chunks.length) {
      writer.end();
      return;
    }

    // Chunks remain, so the loop stopped on write() returning false: at
    // least highWaterMark bytes must be pending at this point.
    assert(writer.writableLength >= 50);
    writer.once('drain', pump);
  };
  pump();
}
122
{
  // Verify write buffersize: strings written together with an encoding are
  // decoded by the stream using that encoding before they reach _write().
  const tw = new TestWriter({
    highWaterMark: 100
  });

  const encodings =
    [ 'hex',
      'utf8',
      'utf-8',
      'ascii',
      'latin1',
      'binary',
      'base64',
      'ucs2',
      'ucs-2',
      'utf16le',
      'utf-16le',
      undefined ];

  // Pair every chunk, rendered as a string in one of the encodings, with
  // the encoding that is passed along to write().
  const writes = chunks.map(function(chunk, i) {
    const enc = encodings[i % encodings.length];
    return [Buffer.from(chunk).toString(enc), enc];
  });

  // What TestWriter is expected to record: the written string decoded with
  // its encoding and converted back with toString(). This equals the
  // original chunk for the byte-preserving encodings, but not always for
  // the UTF-16 ones, where rendering an odd number of bytes as a string
  // cannot keep the last byte.
  const expected = writes.map(function([str, enc]) {
    return Buffer.from(str, enc).toString();
  });

  // mustCall guards against the check silently not running.
  tw.on('finish', common.mustCall(function() {
    // Got the expected chunks
    assert.deepStrictEqual(tw.buffer, expected);
  }));

  writes.forEach(function([str, enc]) {
    tw.write(str, enc);
  });
  // Without end() 'finish' is never emitted and nothing would be verified.
  tw.end();
}
154
{
  // Verify write with decodeStrings disabled: _write() must receive the
  // strings as written, together with the encoding given to write().
  const tw = new TestWriter({
    highWaterMark: 100,
    decodeStrings: false
  });

  // Check that the chunk arrives undecoded, then decode it here and hand
  // the resulting Buffer to the regular TestWriter implementation.
  tw._write = function(chunk, encoding, cb) {
    assert.strictEqual(typeof chunk, 'string');
    chunk = Buffer.from(chunk, encoding);
    return TestWriter.prototype._write.call(this, chunk, encoding, cb);
  };

  const encodings =
    [ 'hex',
      'utf8',
      'utf-8',
      'ascii',
      'latin1',
      'binary',
      'base64',
      'ucs2',
      'ucs-2',
      'utf16le',
      'utf-16le',
      undefined ];

  // Pair every chunk, rendered as a string in one of the encodings, with
  // the encoding that is passed along to write().
  const writes = chunks.map(function(chunk, i) {
    const enc = encodings[i % encodings.length];
    return [Buffer.from(chunk).toString(enc), enc];
  });

  // What TestWriter is expected to record: the written string decoded with
  // its encoding and converted back with toString(). This equals the
  // original chunk for the byte-preserving encodings, but not always for
  // the UTF-16 ones, where rendering an odd number of bytes as a string
  // cannot keep the last byte.
  const expected = writes.map(function([str, enc]) {
    return Buffer.from(str, enc).toString();
  });

  // mustCall guards against the check silently not running.
  tw.on('finish', common.mustCall(function() {
    // Got the expected chunks
    assert.deepStrictEqual(tw.buffer, expected);
  }));

  writes.forEach(function([str, enc]) {
    tw.write(str, enc);
  });
  // Without end() 'finish' is never emitted and nothing would be verified.
  tw.end();
}
193
{
  // Write callbacks: each write() gets its own callback, and by the tick
  // after 'finish' every one of them must have run.
  //
  // `called[i]` is filled with chunk `i` when the callback that was passed
  // along with that chunk is invoked.
  const called = [];
  const callbacks = {};
  chunks.forEach((chunk, i) => {
    callbacks[`callback-${i}`] = () => {
      called[i] = chunk;
    };
  });

  const writer = new TestWriter({ highWaterMark: 100 });

  const verify = common.mustCall(() => {
    // All chunks were received, in the order they were written.
    assert.deepStrictEqual(writer.buffer, chunks);
    // Every write callback was invoked.
    assert.deepStrictEqual(called, chunks);
  });

  writer.on('finish', common.mustCall(() => {
    // The checks are deferred by one tick after 'finish'.
    process.nextTick(verify);
  }));

  for (let i = 0; i < chunks.length; i++) {
    writer.write(chunks[i], callbacks[`callback-${i}`]);
  }
  writer.end();
}
224
{
  // end(callback): a callback passed as the only argument must be invoked.
  const writer = new TestWriter();
  const onEnd = common.mustCall();
  writer.end(onEnd);
}
230
// Buffer payload shared by the end()/write() callback checks below.
const helloWorldBuffer = Buffer.from('hello world');

{
  // end(chunk, callback): the callback must be invoked when a final chunk
  // is supplied as well.
  const writer = new TestWriter();
  const onEnd = common.mustCall();
  writer.end(helloWorldBuffer, onEnd);
}
238
{
  // end(chunk, encoding, callback): the callback must be invoked when both
  // a string chunk and its encoding are supplied.
  const writer = new TestWriter();
  const onEnd = common.mustCall();
  writer.end('hello world', 'ascii', onEnd);
}
244
{
  // write() followed by end(callback): the end callback must still be
  // invoked when a write was issued beforehand.
  const writer = new TestWriter();
  const onEnd = common.mustCall();
  writer.write(helloWorldBuffer);
  writer.end(onEnd);
}
251
{
  // Ordering: the callback given to end() must not run before the callback
  // given to the preceding write().
  const writer = new TestWriter();
  let writeCallbackRan = false;

  const onWritten = () => {
    writeCallbackRan = true;
  };
  const onEnd = common.mustCall(() => {
    assert.strictEqual(writeCallbackRan, true);
  });

  writer.write(helloWorldBuffer, onWritten);
  writer.end(onEnd);
}
263
{
  // Buffers are passed through untouched: an encoding given alongside a
  // Buffer must not alter the bytes that reach _write().
  const hex = '018b5e9a8f6236ffe30e31baf80d2cf6eb';
  const payload = Buffer.from(hex, 'hex');

  const writer = new W();
  writer._write = common.mustCall((chunk) => {
    // The received bytes still render as the original hex string.
    assert.strictEqual(chunk.toString('hex'), hex);
  });

  writer.write(payload, 'latin1');
}
274
{
  // Calling pipe() on a plain Writable must emit 'error', and it must do so
  // before pipe() returns.
  const writer = new W({ autoDestroy: false });
  writer._write = common.mustNotCall();

  let errorSeen = false;
  writer.on('error', () => {
    errorSeen = true;
  });

  writer.pipe(process.stdout);
  // Checked right after the call, so the emission has to be synchronous.
  assert.strictEqual(errorSeen, true);
}
286
{
  // Calling pipe() on a Duplex must not emit 'error' (at least not before
  // pipe() returns). _read() is expected to be called as a result of the
  // pipe, while _write() must stay untouched.
  const duplex = new D();
  duplex._read = common.mustCall();
  duplex._write = common.mustNotCall();

  let errorSeen = false;
  duplex.on('error', () => {
    errorSeen = true;
  });

  duplex.pipe(process.stdout);
  assert.strictEqual(errorSeen, false);
}
299
{
  // A second end(chunk) must fail with 'write after end'; only the chunk of
  // the first end() call is allowed to reach _write().
  const writer = new W();
  writer._write = common.mustCall((chunk) => {
    assert.strictEqual(chunk.toString(), 'this is the end');
  });

  let errorSeen = false;
  writer.on('error', (err) => {
    errorSeen = true;
    assert.strictEqual(err.message, 'write after end');
  });

  writer.end('this is the end');
  writer.end('and so is this');

  // The error has to be observable by the next tick.
  const verify = common.mustCall(() => {
    assert.strictEqual(errorSeen, true);
  });
  process.nextTick(verify);
}
317
{
  // 'finish' must not be emitted while a write is still in flight. The
  // ad-hoc `writing` property on the stream is set for the duration of the
  // asynchronous write and has to be cleared again by the time 'finish'
  // fires.
  const writer = new W();
  let sawWrite = false;

  writer._write = (chunk, encoding, done) => {
    // Not set yet, i.e. this is the first _write() call on this stream.
    assert.strictEqual(writer.writing, undefined);
    sawWrite = true;
    writer.writing = true;
    setTimeout(() => {
      writer.writing = false;
      done();
    }, 1);
  };

  writer.on('finish', common.mustCall(() => {
    assert.strictEqual(sawWrite, true);
    assert.strictEqual(writer.writing, false);
  }));

  writer.write(Buffer.alloc(0));
  writer.end();
}
338
{
  // 'finish' must wait for an asynchronous _write() to signal completion:
  // here the completion callback is only invoked after 10 ms.
  const writer = new W();
  let writeCompleted = false;

  writer._write = (chunk, encoding, done) => {
    const complete = () => {
      writeCompleted = true;
      done();
    };
    setTimeout(complete, 10);
  };

  writer.on('finish', common.mustCall(() => {
    assert.strictEqual(writeCompleted, true);
  }));

  writer.write(Buffer.alloc(0));
  writer.end();
}
355
{
  // Even when _write() completes synchronously, the callback passed to
  // write() must have run by the time 'finish' is emitted.
  const writer = new W();
  let writeCallbackRan = false;

  writer._write = (chunk, encoding, done) => {
    done();
  };

  writer.on('finish', common.mustCall(() => {
    assert.strictEqual(writeCallbackRan, true);
  }));

  const onWritten = () => {
    writeCallbackRan = true;
  };
  writer.write(Buffer.alloc(0), onWritten);
  writer.end();
}
371
{
  // 'finish' must still be emitted when the chunk handed to end() is a
  // zero-length Buffer.
  const writer = new W();
  writer._write = (chunk, encoding, done) => {
    process.nextTick(done);
  };
  writer.on('finish', common.mustCall());

  const firstChunk = Buffer.allocUnsafe(1);
  const emptyChunk = Buffer.alloc(0);
  writer.write(firstChunk);
  writer.end(emptyChunk);
}
382
{
  // 'finish' must not be emitted before _final() has signalled completion;
  // here _final() takes 100 ms to do so.
  const writer = new W();
  let shutdownDone = false;

  writer._final = common.mustCall(function(done) {
    // _final() has to be invoked with the stream as its receiver.
    assert.strictEqual(this, writer);
    const complete = () => {
      shutdownDone = true;
      done();
    };
    setTimeout(complete, 100);
  });

  writer._write = (chunk, encoding, done) => {
    process.nextTick(done);
  };

  writer.on('finish', common.mustCall(() => {
    assert.strictEqual(shutdownDone, true);
  }));

  writer.write(Buffer.allocUnsafe(1));
  writer.end(Buffer.allocUnsafe(0));
}
404
{
  // Verify that 'error' is only emitted once when _final() fails, even if
  // the stream is destroyed with another error from inside the handler.
  const writer = new W();

  writer._final = common.mustCall((done) => {
    done(new Error('test'));
  });

  const onError = common.mustCall((err) => {
    assert.strictEqual(writer._writableState.errorEmitted, true);
    assert.strictEqual(err.message, 'test');
    // From here on, no further 'error' event may be delivered.
    writer.on('error', common.mustNotCall());
    writer.destroy(new Error());
  });
  writer.on('error', onError);

  writer.end();
}
420
{
  // write(null) must throw ERR_STREAM_NULL_VALUES to the caller and must
  // not additionally surface as an 'error' event on the stream.
  const writer = new W();
  writer.on('error', common.mustNotCall());

  const writeNull = () => {
    writer.write(null);
  };
  const expectedError = { code: 'ERR_STREAM_NULL_VALUES' };
  assert.throws(writeNull, expectedError);
}
431
{
  // Verify that 'error' is only emitted once when writing after end(): the
  // handler must see ERR_STREAM_WRITE_AFTER_END, and the destroy() call
  // that follows with a different error must not produce a second event.
  const writer = new W();

  const onError = common.mustCall((err) => {
    assert.strictEqual(writer._writableState.errorEmitted, true);
    assert.strictEqual(err.code, 'ERR_STREAM_WRITE_AFTER_END');
  });
  writer.on('error', onError);

  writer.end();
  writer.write('hello');
  writer.destroy(new Error());
}
443
{
  // When _final() reports an error, the stream must emit 'error' and must
  // emit neither 'prefinish' nor 'finish'.
  const writer = new W();

  writer._final = common.mustCall((done) => {
    done(new Error());
  });
  writer._write = (chunk, encoding, done) => {
    process.nextTick(done);
  };

  writer.on('error', common.mustCall());
  for (const forbidden of ['prefinish', 'finish']) {
    writer.on(forbidden, common.mustNotCall());
  }

  writer.write(Buffer.allocUnsafe(1));
  writer.end(Buffer.allocUnsafe(0));
}
460