// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
'use strict';

const common = require('../common');
const { Writable: W, Duplex: D } = require('stream');
const assert = require('assert');

/**
 * Writable used by most cases below. Every chunk handed to `_write()` is
 * recorded (as a string) in `this.buffer`, and its length is added to
 * `this.written`, once a short random delay has elapsed.
 */
class TestWriter extends W {
  constructor(opts) {
    super(opts);
    this.buffer = [];
    this.written = 0;
  }

  _write(chunk, encoding, cb) {
    // Complete the write after a random 0-9 ms delay so that completion
    // timing is not predictable.
    const latency = Math.floor(Math.random() * 10);
    setTimeout(() => {
      this.buffer.push(chunk.toString());
      this.written += chunk.length;
      cb();
    }, latency);
  }
}

// Fixture: 50 strings of 'x', where the string at index n has length n
// (so the first entry is the empty string).
const chunks = Array.from({ length: 50 }, (_, n) => 'x'.repeat(n));

// Encodings cycled through by the two encoding cases further down. The
// trailing `undefined` means "no encoding argument".
const ENCODINGS = [
  'hex',
  'utf8',
  'utf-8',
  'ascii',
  'latin1',
  'binary',
  'base64',
  'ucs2',
  'ucs-2',
  'utf16le',
  'utf-16le',
  undefined,
];

// Writes every fixture chunk to `stream`, first converting it to a string in
// the encoding picked for its index and passing that same encoding to write().
const writeChunksWithRotatingEncodings = (stream) => {
  chunks.forEach((text, index) => {
    const enc = ENCODINGS[index % ENCODINGS.length];
    const encoded = Buffer.from(text).toString(enc);
    stream.write(encoded, enc);
  });
};

{
  // Fast writing: push every chunk synchronously, ignoring the return value
  // of write() (i.e. ignoring backpressure), then end the stream.
  const tw = new TestWriter({ highWaterMark: 100 });

  tw.on('finish', common.mustCall(() => {
    // All chunks arrived, in the order they were written.
    assert.deepStrictEqual(tw.buffer, chunks);
  }));

  for (const chunk of chunks) {
    tw.write(chunk);
  }
  tw.end();
}

{
  // Slow writing: one chunk every 10 ms, ending after the last one.
  const tw = new TestWriter({ highWaterMark: 100 });

  tw.on('finish', common.mustCall(() => {
    // All chunks arrived, in the order they were written.
    assert.deepStrictEqual(tw.buffer, chunks);
  }));

  let next = 0;
  const writeNext = () => {
    tw.write(chunks[next]);
    next += 1;
    if (next >= chunks.length) {
      tw.end();
      return;
    }
    setTimeout(writeNext, 10);
  };
  writeNext();
}

{
  // Write backpressure: keep writing until write() returns false, then wait
  // for 'drain' before resuming.
  const tw = new TestWriter({ highWaterMark: 50 });

  let drainCount = 0;

  tw.on('finish', common.mustCall(() => {
    // All chunks arrived in order, after the expected number of drains.
    assert.deepStrictEqual(tw.buffer, chunks);
    assert.strictEqual(drainCount, 17);
  }));

  tw.on('drain', () => {
    drainCount += 1;
  });

  let next = 0;
  const pump = () => {
    let accepted;
    do {
      accepted = tw.write(chunks[next]);
      next += 1;
    } while (accepted !== false && next < chunks.length);

    if (next >= chunks.length) {
      tw.end();
      return;
    }

    // The loop stopped early, so write() returned false: the amount buffered
    // must have reached the highWaterMark configured above.
    assert(tw.writableLength >= 50);
    tw.once('drain', pump);
  };
  pump();
}

{
  // Write each chunk as a string in a rotating encoding; the stream decodes
  // strings before they reach _write().
  const tw = new TestWriter({ highWaterMark: 100 });

  // NOTE(review): end() is never called on this stream within this block, so
  // this 'finish' handler is presumably never reached - confirm whether that
  // is intentional before relying on the assertion.
  tw.on('finish', () => {
    // Got the expected chunks
    assert.deepStrictEqual(tw.buffer, chunks);
  });

  writeChunksWithRotatingEncodings(tw);
}

{
  // Same as the previous case, but with decodeStrings disabled: _write() must
  // receive the string itself and decodes it here using the given encoding.
  const tw = new TestWriter({
    highWaterMark: 100,
    decodeStrings: false,
  });

  tw._write = function(chunk, encoding, cb) {
    assert.strictEqual(typeof chunk, 'string');
    const decoded = Buffer.from(chunk, encoding);
    return TestWriter.prototype._write.call(this, decoded, encoding, cb);
  };

  // NOTE(review): end() is never called on this stream within this block, so
  // this 'finish' handler is presumably never reached - confirm whether that
  // is intentional before relying on the assertion.
  tw.on('finish', () => {
    // Got the expected chunks
    assert.deepStrictEqual(tw.buffer, chunks);
  });

  writeChunksWithRotatingEncodings(tw);
}

{
  // Write callbacks: every write() gets its own callback, which records the
  // chunk it was registered for at that chunk's index.
  const callbacks = {};
  chunks.forEach((chunk, index) => {
    callbacks[`callback-${index}`] = () => {
      callbacks._called[index] = chunk;
    };
  });
  callbacks._called = [];

  const tw = new TestWriter({ highWaterMark: 100 });

  tw.on('finish', common.mustCall(() => {
    // Check on the next tick rather than synchronously inside 'finish'.
    process.nextTick(common.mustCall(() => {
      // All chunks arrived, in the order they were written.
      assert.deepStrictEqual(tw.buffer, chunks);
      // Every callback ran.
      assert.deepStrictEqual(callbacks._called, chunks);
    }));
  }));

  chunks.forEach((chunk, index) => {
    tw.write(chunk, callbacks[`callback-${index}`]);
  });
  tw.end();
}

{
  // end(callback): the callback must be invoked.
  const tw = new TestWriter();
  tw.end(common.mustCall());
}

const helloWorldBuffer = Buffer.from('hello world');

{
  // end(chunk, callback): the callback must be invoked.
  const tw = new TestWriter();
  tw.end(helloWorldBuffer, common.mustCall());
}

{
  // end(chunk, encoding, callback): the callback must be invoked.
  const tw = new TestWriter();
  tw.end('hello world', 'ascii', common.mustCall());
}

{
  // end(callback) following a write(): the callback must be invoked.
  const tw = new TestWriter();
  tw.write(helloWorldBuffer);
  tw.end(common.mustCall());
}

{
  // The end() callback must run only after the write() callback has run.
  const tw = new TestWriter();
  let writeCalledback = false;
  tw.write(helloWorldBuffer, () => {
    writeCalledback = true;
  });
  tw.end(common.mustCall(() => {
    assert.strictEqual(writeCalledback, true);
  }));
}

{
  // The encoding argument is ignored when the chunk is already a Buffer: the
  // bytes reaching _write() are unchanged.
  const tw = new W();
  const hex = '018b5e9a8f6236ffe30e31baf80d2cf6eb';
  tw._write = common.mustCall((chunk) => {
    assert.strictEqual(chunk.toString('hex'), hex);
  });
  tw.write(Buffer.from(hex, 'hex'), 'latin1');
}

{
  // Writables cannot be piped: pipe() emits 'error' synchronously.
  const w = new W({ autoDestroy: false });
  w._write = common.mustNotCall();
  let gotError = false;
  w.on('error', () => {
    gotError = true;
  });
  w.pipe(process.stdout);
  assert.strictEqual(gotError, true);
}

{
  // Duplex streams, by contrast, can be piped: pipe() emits no error, _read()
  // is expected to be called and _write() is not.
  const d = new D();
  d._read = common.mustCall();
  d._write = common.mustNotCall();
  let gotError = false;
  d.on('error', () => {
    gotError = true;
  });
  d.pipe(process.stdout);
  assert.strictEqual(gotError, false);
}

{
  // Calling end(chunk) twice is an error: only the first chunk is written
  // and the second call results in a 'write after end' error event.
  const w = new W();
  w._write = common.mustCall((msg) => {
    assert.strictEqual(msg.toString(), 'this is the end');
  });
  let gotError = false;
  w.on('error', (er) => {
    gotError = true;
    assert.strictEqual(er.message, 'write after end');
  });
  w.end('this is the end');
  w.end('and so is this');
  process.nextTick(common.mustCall(() => {
    assert.strictEqual(gotError, true);
  }));
}

{
  // The stream must not finish while a write is still in flight.
  const w = new W();
  let wrote = false;
  // `this` is the stream here, so a regular function is required.
  w._write = function(chunk, e, cb) {
    // _write() must not have been entered before on this stream.
    assert.strictEqual(this.writing, undefined);
    wrote = true;
    this.writing = true;
    setTimeout(() => {
      this.writing = false;
      cb();
    }, 1);
  };
  w.on('finish', common.mustCall(function() {
    assert.strictEqual(wrote, true);
    assert.strictEqual(this.writing, false);
  }));
  w.write(Buffer.alloc(0));
  w.end();
}

{
  // 'finish' must not be emitted before an asynchronous _write() has invoked
  // its callback.
  const w = new W();
  let writeCb = false;
  w._write = (chunk, e, cb) => {
    setTimeout(() => {
      writeCb = true;
      cb();
    }, 10);
  };
  w.on('finish', common.mustCall(() => {
    assert.strictEqual(writeCb, true);
  }));
  w.write(Buffer.alloc(0));
  w.end();
}

{
  // 'finish' must not be emitted before the write() callback, even when
  // _write() invokes its own callback synchronously.
  const w = new W();
  let writeCb = false;
  w._write = (chunk, e, cb) => {
    cb();
  };
  w.on('finish', common.mustCall(() => {
    assert.strictEqual(writeCb, true);
  }));
  w.write(Buffer.alloc(0), () => {
    writeCb = true;
  });
  w.end();
}

{
  // 'finish' is still emitted when the chunk passed to end() is empty.
  const w = new W();
  w._write = (chunk, e, cb) => {
    process.nextTick(cb);
  };
  w.on('finish', common.mustCall());
  w.write(Buffer.allocUnsafe(1));
  w.end(Buffer.alloc(0));
}

{
  // 'finish' is emitted only after _final() has invoked its callback.
  const w = new W();
  let shutdown = false;

  // `this` is asserted below, so a regular function is required.
  w._final = common.mustCall(function(cb) {
    assert.strictEqual(this, w);
    setTimeout(() => {
      shutdown = true;
      cb();
    }, 100);
  });
  w._write = (chunk, e, cb) => {
    process.nextTick(cb);
  };
  w.on('finish', common.mustCall(() => {
    assert.strictEqual(shutdown, true);
  }));
  w.write(Buffer.allocUnsafe(1));
  w.end(Buffer.allocUnsafe(0));
}

{
  // 'error' is emitted only once when _final() fails: a destroy(err) issued
  // from inside the error handler must not trigger a second 'error'.
  const w = new W();

  w._final = common.mustCall((cb) => {
    cb(new Error('test'));
  });
  w.on('error', common.mustCall((err) => {
    assert.strictEqual(w._writableState.errorEmitted, true);
    assert.strictEqual(err.message, 'test');
    w.on('error', common.mustNotCall());
    w.destroy(new Error());
  }));
  w.end();
}

{
  // write(null) throws ERR_STREAM_NULL_VALUES synchronously and does not
  // emit 'error'.
  const w = new W();
  w.on('error', common.mustNotCall());
  assert.throws(() => {
    w.write(null);
  }, {
    code: 'ERR_STREAM_NULL_VALUES'
  });
}

{
  // 'error' is emitted only once for a write after end(): the handler must
  // run exactly once, with the write-after-end error, even though destroy()
  // is subsequently called with another error.
  const w = new W();
  w.on('error', common.mustCall((err) => {
    assert.strictEqual(w._writableState.errorEmitted, true);
    assert.strictEqual(err.code, 'ERR_STREAM_WRITE_AFTER_END');
  }));
  w.end();
  w.write('hello');
  w.destroy(new Error());
}

{
  // Neither 'prefinish' nor 'finish' is emitted once _final() has reported
  // an error.
  const w = new W();

  w._final = common.mustCall((cb) => {
    cb(new Error());
  });
  w._write = (chunk, e, cb) => {
    process.nextTick(cb);
  };
  w.on('error', common.mustCall());
  w.on('prefinish', common.mustNotCall());
  w.on('finish', common.mustNotCall());
  w.write(Buffer.allocUnsafe(1));
  w.end(Buffer.allocUnsafe(0));
}