1// Copyright Joyent, Inc. and other Node contributors. 2// 3// Permission is hereby granted, free of charge, to any person obtaining a 4// copy of this software and associated documentation files (the 5// "Software"), to deal in the Software without restriction, including 6// without limitation the rights to use, copy, modify, merge, publish, 7// distribute, sublicense, and/or sell copies of the Software, and to permit 8// persons to whom the Software is furnished to do so, subject to the 9// following conditions: 10// 11// The above copyright notice and this permission notice shall be included 12// in all copies or substantial portions of the Software. 13// 14// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS 15// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 16// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN 17// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, 18// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR 19// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE 20// USE OR OTHER DEALINGS IN THE SOFTWARE. 21 22'use strict'; 23 24const common = require('../common'); 25const W = require('_stream_writable'); 26const D = require('_stream_duplex'); 27const assert = require('assert'); 28 29class TestWriter extends W { 30 constructor(opts) { 31 super(opts); 32 this.buffer = []; 33 this.written = 0; 34 } 35 36 _write(chunk, encoding, cb) { 37 // Simulate a small unpredictable latency 38 setTimeout(() => { 39 this.buffer.push(chunk.toString()); 40 this.written += chunk.length; 41 cb(); 42 }, Math.floor(Math.random() * 10)); 43 } 44} 45 46const chunks = new Array(50); 47for (let i = 0; i < chunks.length; i++) { 48 chunks[i] = 'x'.repeat(i); 49} 50 51{ 52 // Verify fast writing 53 const tw = new TestWriter({ 54 highWaterMark: 100 55 }); 56 57 tw.on('finish', common.mustCall(function() { 58 // Got chunks in the right order 59 assert.deepStrictEqual(tw.buffer, chunks); 60 })); 61 62 chunks.forEach(function(chunk) { 63 // Ignore backpressure. Just buffer it all up. 64 tw.write(chunk); 65 }); 66 tw.end(); 67} 68 69{ 70 // Verify slow writing 71 const tw = new TestWriter({ 72 highWaterMark: 100 73 }); 74 75 tw.on('finish', common.mustCall(function() { 76 // Got chunks in the right order 77 assert.deepStrictEqual(tw.buffer, chunks); 78 })); 79 80 let i = 0; 81 (function W() { 82 tw.write(chunks[i++]); 83 if (i < chunks.length) 84 setTimeout(W, 10); 85 else 86 tw.end(); 87 })(); 88} 89 90{ 91 // Verify write backpressure 92 const tw = new TestWriter({ 93 highWaterMark: 50 94 }); 95 96 let drains = 0; 97 98 tw.on('finish', common.mustCall(function() { 99 // Got chunks in the right order 100 assert.deepStrictEqual(tw.buffer, chunks); 101 assert.strictEqual(drains, 17); 102 })); 103 104 tw.on('drain', function() { 105 drains++; 106 }); 107 108 let i = 0; 109 (function W() { 110 let ret; 111 do { 112 ret = tw.write(chunks[i++]); 113 } while (ret !== false && i < chunks.length); 114 115 if (i < chunks.length) { 116 assert(tw.writableLength >= 50); 117 tw.once('drain', W); 118 } else { 119 tw.end(); 120 } 121 })(); 122} 123 124{ 125 // Verify write buffersize 126 const tw = new TestWriter({ 127 highWaterMark: 100 128 }); 129 130 const encodings = 131 [ 'hex', 132 'utf8', 133 'utf-8', 134 'ascii', 135 'latin1', 136 'binary', 137 'base64', 138 'ucs2', 139 'ucs-2', 140 'utf16le', 141 'utf-16le', 142 undefined ]; 143 144 tw.on('finish', function() { 145 // Got the expected chunks 146 assert.deepStrictEqual(tw.buffer, chunks); 147 }); 148 149 chunks.forEach(function(chunk, i) { 150 const enc = encodings[i % encodings.length]; 151 chunk = Buffer.from(chunk); 152 tw.write(chunk.toString(enc), enc); 153 }); 154} 155 156{ 157 // Verify write with no buffersize 158 const tw = new TestWriter({ 159 highWaterMark: 100, 160 decodeStrings: false 161 }); 162 163 tw._write = function(chunk, encoding, cb) { 164 assert.strictEqual(typeof chunk, 'string'); 165 chunk = Buffer.from(chunk, encoding); 166 return TestWriter.prototype._write.call(this, chunk, encoding, cb); 167 }; 168 169 const encodings = 170 [ 'hex', 171 'utf8', 172 'utf-8', 173 'ascii', 174 'latin1', 175 'binary', 176 'base64', 177 'ucs2', 178 'ucs-2', 179 'utf16le', 180 'utf-16le', 181 undefined ]; 182 183 tw.on('finish', function() { 184 // Got the expected chunks 185 assert.deepStrictEqual(tw.buffer, chunks); 186 }); 187 188 chunks.forEach(function(chunk, i) { 189 const enc = encodings[i % encodings.length]; 190 chunk = Buffer.from(chunk); 191 tw.write(chunk.toString(enc), enc); 192 }); 193} 194 195{ 196 // Verify write callbacks 197 const callbacks = chunks.map(function(chunk, i) { 198 return [i, function() { 199 callbacks._called[i] = chunk; 200 }]; 201 }).reduce(function(set, x) { 202 set[`callback-${x[0]}`] = x[1]; 203 return set; 204 }, {}); 205 callbacks._called = []; 206 207 const tw = new TestWriter({ 208 highWaterMark: 100 209 }); 210 211 tw.on('finish', common.mustCall(function() { 212 process.nextTick(common.mustCall(function() { 213 // Got chunks in the right order 214 assert.deepStrictEqual(tw.buffer, chunks); 215 // Called all callbacks 216 assert.deepStrictEqual(callbacks._called, chunks); 217 })); 218 })); 219 220 chunks.forEach(function(chunk, i) { 221 tw.write(chunk, callbacks[`callback-${i}`]); 222 }); 223 tw.end(); 224} 225 226{ 227 // Verify end() callback 228 const tw = new TestWriter(); 229 tw.end(common.mustCall()); 230} 231 232const helloWorldBuffer = Buffer.from('hello world'); 233 234{ 235 // Verify end() callback with chunk 236 const tw = new TestWriter(); 237 tw.end(helloWorldBuffer, common.mustCall()); 238} 239 240{ 241 // Verify end() callback with chunk and encoding 242 const tw = new TestWriter(); 243 tw.end('hello world', 'ascii', common.mustCall()); 244} 245 246{ 247 // Verify end() callback after write() call 248 const tw = new TestWriter(); 249 tw.write(helloWorldBuffer); 250 tw.end(common.mustCall()); 251} 252 253{ 254 // Verify end() callback after write() callback 255 const tw = new TestWriter(); 256 let writeCalledback = false; 257 tw.write(helloWorldBuffer, function() { 258 writeCalledback = true; 259 }); 260 tw.end(common.mustCall(function() { 261 assert.strictEqual(writeCalledback, true); 262 })); 263} 264 265{ 266 // Verify encoding is ignored for buffers 267 const tw = new W(); 268 const hex = '018b5e9a8f6236ffe30e31baf80d2cf6eb'; 269 tw._write = common.mustCall(function(chunk) { 270 assert.strictEqual(chunk.toString('hex'), hex); 271 }); 272 const buf = Buffer.from(hex, 'hex'); 273 tw.write(buf, 'latin1'); 274} 275 276{ 277 // Verify writables cannot be piped 278 const w = new W({ autoDestroy: false }); 279 w._write = common.mustNotCall(); 280 let gotError = false; 281 w.on('error', function() { 282 gotError = true; 283 }); 284 w.pipe(process.stdout); 285 assert.strictEqual(gotError, true); 286} 287 288{ 289 // Verify that duplex streams cannot be piped 290 const d = new D(); 291 d._read = common.mustCall(); 292 d._write = common.mustNotCall(); 293 let gotError = false; 294 d.on('error', function() { 295 gotError = true; 296 }); 297 d.pipe(process.stdout); 298 assert.strictEqual(gotError, false); 299} 300 301{ 302 // Verify that end(chunk) twice is an error 303 const w = new W(); 304 w._write = common.mustCall((msg) => { 305 assert.strictEqual(msg.toString(), 'this is the end'); 306 }); 307 let gotError = false; 308 w.on('error', function(er) { 309 gotError = true; 310 assert.strictEqual(er.message, 'write after end'); 311 }); 312 w.end('this is the end'); 313 w.end('and so is this'); 314 process.nextTick(common.mustCall(function() { 315 assert.strictEqual(gotError, true); 316 })); 317} 318 319{ 320 // Verify stream doesn't end while writing 321 const w = new W(); 322 let wrote = false; 323 w._write = function(chunk, e, cb) { 324 assert.strictEqual(this.writing, undefined); 325 wrote = true; 326 this.writing = true; 327 setTimeout(() => { 328 this.writing = false; 329 cb(); 330 }, 1); 331 }; 332 w.on('finish', common.mustCall(function() { 333 assert.strictEqual(wrote, true); 334 assert.strictEqual(this.writing, false); 335 })); 336 w.write(Buffer.alloc(0)); 337 w.end(); 338} 339 340{ 341 // Verify finish does not come before write() callback 342 const w = new W(); 343 let writeCb = false; 344 w._write = function(chunk, e, cb) { 345 setTimeout(function() { 346 writeCb = true; 347 cb(); 348 }, 10); 349 }; 350 w.on('finish', common.mustCall(function() { 351 assert.strictEqual(writeCb, true); 352 })); 353 w.write(Buffer.alloc(0)); 354 w.end(); 355} 356 357{ 358 // Verify finish does not come before synchronous _write() callback 359 const w = new W(); 360 let writeCb = false; 361 w._write = function(chunk, e, cb) { 362 cb(); 363 }; 364 w.on('finish', common.mustCall(function() { 365 assert.strictEqual(writeCb, true); 366 })); 367 w.write(Buffer.alloc(0), function() { 368 writeCb = true; 369 }); 370 w.end(); 371} 372 373{ 374 // Verify finish is emitted if the last chunk is empty 375 const w = new W(); 376 w._write = function(chunk, e, cb) { 377 process.nextTick(cb); 378 }; 379 w.on('finish', common.mustCall()); 380 w.write(Buffer.allocUnsafe(1)); 381 w.end(Buffer.alloc(0)); 382} 383 384{ 385 // Verify that finish is emitted after shutdown 386 const w = new W(); 387 let shutdown = false; 388 389 w._final = common.mustCall(function(cb) { 390 assert.strictEqual(this, w); 391 setTimeout(function() { 392 shutdown = true; 393 cb(); 394 }, 100); 395 }); 396 w._write = function(chunk, e, cb) { 397 process.nextTick(cb); 398 }; 399 w.on('finish', common.mustCall(function() { 400 assert.strictEqual(shutdown, true); 401 })); 402 w.write(Buffer.allocUnsafe(1)); 403 w.end(Buffer.allocUnsafe(0)); 404} 405 406{ 407 // Verify that error is only emitted once when failing in _finish. 408 const w = new W(); 409 410 w._final = common.mustCall(function(cb) { 411 cb(new Error('test')); 412 }); 413 w.on('error', common.mustCall((err) => { 414 assert.strictEqual(w._writableState.errorEmitted, true); 415 assert.strictEqual(err.message, 'test'); 416 w.on('error', common.mustNotCall()); 417 w.destroy(new Error()); 418 })); 419 w.end(); 420} 421 422{ 423 // Verify that error is only emitted once when failing in write. 424 const w = new W(); 425 w.on('error', common.mustNotCall()); 426 assert.throws(() => { 427 w.write(null); 428 }, { 429 code: 'ERR_STREAM_NULL_VALUES' 430 }); 431} 432 433{ 434 // Verify that error is only emitted once when failing in write after end. 435 const w = new W(); 436 w.on('error', common.mustCall((err) => { 437 assert.strictEqual(w._writableState.errorEmitted, true); 438 assert.strictEqual(err.code, 'ERR_STREAM_WRITE_AFTER_END'); 439 })); 440 w.end(); 441 w.write('hello'); 442 w.destroy(new Error()); 443} 444 445{ 446 // Verify that finish is not emitted after error 447 const w = new W(); 448 449 w._final = common.mustCall(function(cb) { 450 cb(new Error()); 451 }); 452 w._write = function(chunk, e, cb) { 453 process.nextTick(cb); 454 }; 455 w.on('error', common.mustCall()); 456 w.on('prefinish', common.mustNotCall()); 457 w.on('finish', common.mustNotCall()); 458 w.write(Buffer.allocUnsafe(1)); 459 w.end(Buffer.allocUnsafe(0)); 460} 461