• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright Joyent, Inc. and other Node contributors.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the
5// "Software"), to deal in the Software without restriction, including
6// without limitation the rights to use, copy, modify, merge, publish,
7// distribute, sublicense, and/or sell copies of the Software, and to permit
8// persons to whom the Software is furnished to do so, subject to the
9// following conditions:
10//
11// The above copyright notice and this permission notice shall be included
12// in all copies or substantial portions of the Software.
13//
14// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20// USE OR OTHER DEALINGS IN THE SOFTWARE.
21
22'use strict';
23
24const common = require('../common');
25const W = require('_stream_writable');
26const D = require('_stream_duplex');
27const assert = require('assert');
28
/**
 * Writable used throughout this file. Each chunk handed to `_write()` is
 * recorded (stringified) in `this.buffer` and its length is added to
 * `this.written`; the write is acknowledged after a short random delay.
 */
class TestWriter extends W {
  constructor(opts) {
    super(opts);
    // Stringified chunks, appended as each write completes.
    this.buffer = [];
    // Running total of `chunk.length` over completed writes.
    this.written = 0;
  }

  _write(chunk, encoding, cb) {
    // Whole number of milliseconds in [0, 10): a small, unpredictable latency.
    const delayMs = Math.floor(Math.random() * 10);

    const recordAndAck = () => {
      this.buffer.push(chunk.toString());
      this.written += chunk.length;
      cb();
    };

    setTimeout(recordAndAck, delayMs);
  }
}
45
// Shared fixture: 50 strings where entry `i` is 'x' repeated `i` times
// (so the first entry is the empty string).
const chunks = Array.from({ length: 50 }, (_, i) => 'x'.repeat(i));
50
{
  // Fast writing: hand every chunk to write() back-to-back, deliberately
  // ignoring its return value, then end the stream.
  const writer = new TestWriter({ highWaterMark: 100 });

  // Once finished, the recorded chunks must match the input, in order.
  writer.on('finish', common.mustCall(() => {
    assert.deepStrictEqual(writer.buffer, chunks);
  }));

  for (const chunk of chunks) {
    writer.write(chunk);
  }
  writer.end();
}
68
{
  // Slow writing: submit one chunk, wait 10 ms, submit the next, and end the
  // stream right after the last chunk has been submitted.
  const writer = new TestWriter({ highWaterMark: 100 });

  // Once finished, the recorded chunks must match the input, in order.
  writer.on('finish', common.mustCall(() => {
    assert.deepStrictEqual(writer.buffer, chunks);
  }));

  let next = 0;
  const writeNext = () => {
    writer.write(chunks[next]);
    next += 1;
    if (next >= chunks.length) {
      writer.end();
      return;
    }
    setTimeout(writeNext, 10);
  };
  writeNext();
}
89
{
  // Backpressure: keep writing until write() returns false, then wait for
  // 'drain' before resuming. The number of 'drain' events is checked too.
  const writer = new TestWriter({ highWaterMark: 50 });

  let drainCount = 0;

  writer.on('finish', common.mustCall(() => {
    // Chunks arrived in order, and exactly 17 'drain' events were seen.
    assert.deepStrictEqual(writer.buffer, chunks);
    assert.strictEqual(drainCount, 17);
  }));

  writer.on('drain', () => {
    drainCount += 1;
  });

  let next = 0;
  const pump = () => {
    // Always submit at least one chunk, then continue while the stream keeps
    // accepting data and there is input left.
    let accepted = writer.write(chunks[next++]);
    while (accepted !== false && next < chunks.length) {
      accepted = writer.write(chunks[next++]);
    }

    if (next >= chunks.length) {
      writer.end();
      return;
    }

    // Stopped early, so the queued amount must have reached the
    // highWaterMark; resume on the next 'drain'.
    assert(writer.writableLength >= 50);
    writer.once('drain', pump);
  };
  pump();
}
123
{
  // Verify write buffersize: strings written together with an encoding must
  // be recorded by the writer as the bytes that encoding decodes them to,
  // in write order.
  const tw = new TestWriter({
    highWaterMark: 100
  });

  const encodings =
    [ 'hex',
      'utf8',
      'utf-8',
      'ascii',
      'latin1',
      'binary',
      'base64',
      'ucs2',
      'ucs-2',
      'utf16le',
      'utf-16le',
      undefined ];

  // Encoding paired with chunk `i` (cycles through the list above).
  const encodingFor = (i) => encodings[i % encodings.length];

  // Expected recording for each chunk. It is derived by performing the same
  // bytes -> string -> bytes round trip locally, because some encodings may
  // not round-trip losslessly (e.g. the UTF-16 family applied to an odd
  // number of bytes), so comparing against `chunks` directly is not safe.
  const expected = chunks.map(function(chunk, i) {
    const enc = encodingFor(i);
    return Buffer.from(Buffer.from(chunk).toString(enc), enc).toString();
  });

  // mustCall makes the test fail if 'finish' is never emitted, so the
  // assertion below can no longer be silently skipped.
  tw.on('finish', common.mustCall(function() {
    // Got the expected chunks
    assert.deepStrictEqual(tw.buffer, expected);
  }));

  chunks.forEach(function(chunk, i) {
    const enc = encodingFor(i);
    tw.write(Buffer.from(chunk).toString(enc), enc);
  });
  // The stream has to be ended for 'finish' (and the assertion) to run.
  tw.end();
}
155
{
  // Verify write with no buffersize: with `decodeStrings: false` the stream
  // must hand strings to _write() untouched, together with their encoding.
  const tw = new TestWriter({
    highWaterMark: 100,
    decodeStrings: false
  });

  // Check that the chunk is still a string, decode it here using the
  // encoding passed along, then defer to the regular recording logic.
  tw._write = function(chunk, encoding, cb) {
    assert.strictEqual(typeof chunk, 'string');
    chunk = Buffer.from(chunk, encoding);
    return TestWriter.prototype._write.call(this, chunk, encoding, cb);
  };

  const encodings =
    [ 'hex',
      'utf8',
      'utf-8',
      'ascii',
      'latin1',
      'binary',
      'base64',
      'ucs2',
      'ucs-2',
      'utf16le',
      'utf-16le',
      undefined ];

  // Encoding paired with chunk `i` (cycles through the list above).
  const encodingFor = (i) => encodings[i % encodings.length];

  // Expected recording for each chunk. It is derived by performing the same
  // bytes -> string -> bytes round trip locally, because some encodings may
  // not round-trip losslessly (e.g. the UTF-16 family applied to an odd
  // number of bytes), so comparing against `chunks` directly is not safe.
  const expected = chunks.map(function(chunk, i) {
    const enc = encodingFor(i);
    return Buffer.from(Buffer.from(chunk).toString(enc), enc).toString();
  });

  // mustCall makes the test fail if 'finish' is never emitted, so the
  // assertion below can no longer be silently skipped.
  tw.on('finish', common.mustCall(function() {
    // Got the expected chunks
    assert.deepStrictEqual(tw.buffer, expected);
  }));

  chunks.forEach(function(chunk, i) {
    const enc = encodingFor(i);
    tw.write(Buffer.from(chunk).toString(enc), enc);
  });
  // The stream has to be ended for 'finish' (and the assertion) to run.
  tw.end();
}
194
{
  // Write callbacks: every write() is given its own callback, and each one
  // must have run by the tick after 'finish'.

  // calledWith[i] is set to chunk i when the callback for write i runs.
  const calledWith = [];

  // One callback per chunk, keyed as `callback-<index>`.
  const callbacks = {};
  chunks.forEach((chunk, i) => {
    callbacks[`callback-${i}`] = () => {
      calledWith[i] = chunk;
    };
  });

  const writer = new TestWriter({ highWaterMark: 100 });

  writer.on('finish', common.mustCall(() => {
    process.nextTick(common.mustCall(() => {
      // Chunks were recorded in order ...
      assert.deepStrictEqual(writer.buffer, chunks);
      // ... and every write callback was invoked.
      assert.deepStrictEqual(calledWith, chunks);
    }));
  }));

  chunks.forEach((chunk, i) => {
    writer.write(chunk, callbacks[`callback-${i}`]);
  });
  writer.end();
}
225
{
  // end(cb) with nothing written: the callback must still be invoked.
  const writer = new TestWriter();
  writer.end(common.mustCall());
}

// Payload shared by the end()/write() callback checks below.
const helloWorldBuffer = Buffer.from('hello world');

{
  // end(chunk, cb): the callback must be invoked.
  const writer = new TestWriter();
  writer.end(helloWorldBuffer, common.mustCall());
}

{
  // end(chunk, encoding, cb): the callback must be invoked.
  const writer = new TestWriter();
  writer.end('hello world', 'ascii', common.mustCall());
}

{
  // write(chunk) followed by end(cb): the end callback must be invoked.
  const writer = new TestWriter();
  writer.write(helloWorldBuffer);
  writer.end(common.mustCall());
}

{
  // write(chunk, cb) followed by end(cb): by the time the end callback runs,
  // the write callback must already have run.
  const writer = new TestWriter();
  let writeCallbackRan = false;
  writer.write(helloWorldBuffer, () => {
    writeCallbackRan = true;
  });
  writer.end(common.mustCall(() => {
    assert.strictEqual(writeCallbackRan, true);
  }));
}
264
{
  // A Buffer passed to write() together with an encoding must arrive in
  // _write() with its bytes unchanged.
  const expectedHex = '018b5e9a8f6236ffe30e31baf80d2cf6eb';
  const writable = new W();
  writable._write = common.mustCall((chunk) => {
    assert.strictEqual(chunk.toString('hex'), expectedHex);
  });
  writable.write(Buffer.from(expectedHex, 'hex'), 'latin1');
}
275
{
  // Calling pipe() on a write-only stream must emit 'error' before pipe()
  // returns, and must never reach _write().
  const writable = new W({ autoDestroy: false });
  writable._write = common.mustNotCall();

  let errorSeen = false;
  writable.on('error', () => {
    errorSeen = true;
  });

  writable.pipe(process.stdout);
  assert.strictEqual(errorSeen, true);
}
287
{
  // Calling pipe() on a Duplex must NOT emit 'error' by the time pipe()
  // returns (in contrast to the plain Writable case). _read() is expected to
  // be invoked, while _write() must not be.
  const duplex = new D();
  duplex._read = common.mustCall();
  duplex._write = common.mustNotCall();

  let errorSeen = false;
  duplex.on('error', () => {
    errorSeen = true;
  });

  duplex.pipe(process.stdout);
  assert.strictEqual(errorSeen, false);
}
300
{
  // Calling end(chunk) a second time is an error: only the first chunk may
  // reach _write(), and a 'write after end' error must have been emitted by
  // the next tick.
  const writable = new W();
  writable._write = common.mustCall((msg) => {
    assert.strictEqual(msg.toString(), 'this is the end');
  });

  let errorSeen = false;
  writable.on('error', (er) => {
    errorSeen = true;
    assert.strictEqual(er.message, 'write after end');
  });

  writable.end('this is the end');
  writable.end('and so is this');

  process.nextTick(common.mustCall(() => {
    assert.strictEqual(errorSeen, true);
  }));
}
318
{
  // The stream must not finish while a write is still in flight. The test
  // tracks the in-flight state in an ad-hoc `writing` property on the stream.
  const stream = new W();
  let sawWrite = false;

  stream._write = (chunk, e, cb) => {
    // Only a single write is expected, so the marker must not be set yet.
    assert.strictEqual(stream.writing, undefined);
    sawWrite = true;
    stream.writing = true;
    setTimeout(() => {
      stream.writing = false;
      cb();
    }, 1);
  };

  stream.on('finish', common.mustCall(() => {
    assert.strictEqual(sawWrite, true);
    assert.strictEqual(stream.writing, false);
  }));

  stream.write(Buffer.alloc(0));
  stream.end();
}
339
{
  // 'finish' must not be emitted before an asynchronous _write() has invoked
  // its callback.
  const stream = new W();
  let writeAcked = false;

  stream._write = (chunk, e, cb) => {
    setTimeout(() => {
      writeAcked = true;
      cb();
    }, 10);
  };

  stream.on('finish', common.mustCall(() => {
    assert.strictEqual(writeAcked, true);
  }));

  stream.write(Buffer.alloc(0));
  stream.end();
}
356
{
  // Even when _write() acknowledges synchronously, the callback passed to
  // write() must have run before 'finish' is emitted.
  const stream = new W();
  let writeCallbackRan = false;

  stream._write = (chunk, e, cb) => {
    cb();
  };

  stream.on('finish', common.mustCall(() => {
    assert.strictEqual(writeCallbackRan, true);
  }));

  stream.write(Buffer.alloc(0), () => {
    writeCallbackRan = true;
  });
  stream.end();
}
372
{
  // 'finish' must still be emitted when the chunk passed to end() is empty.
  const stream = new W();

  // Acknowledge every write on the next tick.
  stream._write = (chunk, e, cb) => {
    process.nextTick(cb);
  };

  stream.on('finish', common.mustCall());

  stream.write(Buffer.allocUnsafe(1));
  stream.end(Buffer.alloc(0));
}
383
{
  // 'finish' must only be emitted after _final() has invoked its callback.
  const w = new W();
  let finalCompleted = false;

  // Acknowledge every write on the next tick.
  w._write = (chunk, e, cb) => {
    process.nextTick(cb);
  };

  // Kept as a regular function because the receiver (`this`) is asserted.
  w._final = common.mustCall(function(done) {
    assert.strictEqual(this, w);
    setTimeout(() => {
      finalCompleted = true;
      done();
    }, 100);
  });

  w.on('finish', common.mustCall(() => {
    assert.strictEqual(finalCompleted, true);
  }));

  w.write(Buffer.allocUnsafe(1));
  w.end(Buffer.allocUnsafe(0));
}
405
{
  // When _final() fails, 'error' must be emitted only once: a listener added
  // from inside the first handler must never fire, even though destroy() is
  // then called with another error.
  const stream = new W();

  stream._final = common.mustCall((done) => {
    done(new Error('test'));
  });

  stream.on('error', common.mustCall((err) => {
    assert.strictEqual(stream._writableState.errorEmitted, true);
    assert.strictEqual(err.message, 'test');
    stream.on('error', common.mustNotCall());
    stream.destroy(new Error());
  }));

  stream.end();
}
421
{
  // write(null) must throw ERR_STREAM_NULL_VALUES to the caller and must not
  // emit 'error' on the stream.
  const stream = new W();
  stream.on('error', common.mustNotCall());

  const writeNull = () => {
    stream.write(null);
  };
  assert.throws(writeNull, { code: 'ERR_STREAM_NULL_VALUES' });
}
432
{
  // A write after end() must surface ERR_STREAM_WRITE_AFTER_END through the
  // 'error' listener (wrapped in common.mustCall); destroy() is then called
  // with a second error.
  const stream = new W();

  stream.on('error', common.mustCall((err) => {
    assert.strictEqual(stream._writableState.errorEmitted, true);
    assert.strictEqual(err.code, 'ERR_STREAM_WRITE_AFTER_END');
  }));

  stream.end();
  stream.write('hello');
  stream.destroy(new Error());
}
444
{
  // When _final() reports an error, 'error' must be emitted and neither
  // 'prefinish' nor 'finish' may follow.
  const stream = new W();

  // Acknowledge every write on the next tick.
  stream._write = (chunk, e, cb) => {
    process.nextTick(cb);
  };
  stream._final = common.mustCall((done) => {
    done(new Error());
  });

  stream.on('error', common.mustCall());
  stream.on('prefinish', common.mustNotCall());
  stream.on('finish', common.mustNotCall());

  stream.write(Buffer.allocUnsafe(1));
  stream.end(Buffer.allocUnsafe(0));
}
461