'use strict'; const { Array, Symbol, } = primordials; const { Buffer } = require('buffer'); const { FastBuffer } = require('internal/buffer'); const { WriteWrap, kReadBytesOrError, kArrayBufferOffset, kBytesWritten, kLastWriteWasAsync, streamBaseState } = internalBinding('stream_wrap'); const { UV_EOF } = internalBinding('uv'); const { codes: { ERR_INVALID_CALLBACK }, errnoException } = require('internal/errors'); const { owner_symbol } = require('internal/async_hooks').symbols; const { kTimeout, setUnrefTimeout, getTimerDuration } = require('internal/timers'); const { isUint8Array } = require('internal/util/types'); const { clearTimeout } = require('timers'); const kMaybeDestroy = Symbol('kMaybeDestroy'); const kUpdateTimer = Symbol('kUpdateTimer'); const kAfterAsyncWrite = Symbol('kAfterAsyncWrite'); const kHandle = Symbol('kHandle'); const kSession = Symbol('kSession'); let debug = require('internal/util/debuglog').debuglog('stream', (fn) => { debug = fn; }); const kBuffer = Symbol('kBuffer'); const kBufferGen = Symbol('kBufferGen'); const kBufferCb = Symbol('kBufferCb'); function handleWriteReq(req, data, encoding) { const { handle } = req; switch (encoding) { case 'buffer': { const ret = handle.writeBuffer(req, data); if (streamBaseState[kLastWriteWasAsync]) req.buffer = data; return ret; } case 'latin1': case 'binary': return handle.writeLatin1String(req, data); case 'utf8': case 'utf-8': return handle.writeUtf8String(req, data); case 'ascii': return handle.writeAsciiString(req, data); case 'ucs2': case 'ucs-2': case 'utf16le': case 'utf-16le': return handle.writeUcs2String(req, data); default: { const buffer = Buffer.from(data, encoding); const ret = handle.writeBuffer(req, buffer); if (streamBaseState[kLastWriteWasAsync]) req.buffer = buffer; return ret; } } } function onWriteComplete(status) { debug('onWriteComplete', status, this.error); const stream = this.handle[owner_symbol]; if (stream.destroyed) { if (typeof this.callback === 'function') this.callback(null); return; } if (status < 0) { const ex = errnoException(status, 'write', this.error); stream.destroy(ex, this.callback); return; } stream[kUpdateTimer](); stream[kAfterAsyncWrite](this); if (typeof this.callback === 'function') this.callback(null); } function createWriteWrap(handle) { const req = new WriteWrap(); req.handle = handle; req.oncomplete = onWriteComplete; req.async = false; req.bytes = 0; req.buffer = null; return req; } function writevGeneric(self, data, cb) { const req = createWriteWrap(self[kHandle]); const allBuffers = data.allBuffers; let chunks; if (allBuffers) { chunks = data; for (let i = 0; i < data.length; i++) data[i] = data[i].chunk; } else { chunks = new Array(data.length << 1); for (let i = 0; i < data.length; i++) { const entry = data[i]; chunks[i * 2] = entry.chunk; chunks[i * 2 + 1] = entry.encoding; } } const err = req.handle.writev(req, chunks, allBuffers); // Retain chunks if (err === 0) req._chunks = chunks; afterWriteDispatched(self, req, err, cb); return req; } function writeGeneric(self, data, encoding, cb) { const req = createWriteWrap(self[kHandle]); const err = handleWriteReq(req, data, encoding); afterWriteDispatched(self, req, err, cb); return req; } function afterWriteDispatched(self, req, err, cb) { req.bytes = streamBaseState[kBytesWritten]; req.async = !!streamBaseState[kLastWriteWasAsync]; if (err !== 0) return self.destroy(errnoException(err, 'write', req.error), cb); if (!req.async) { cb(); } else { req.callback = cb; } } function onStreamRead(arrayBuffer) { const nread = streamBaseState[kReadBytesOrError]; const handle = this; const stream = this[owner_symbol]; stream[kUpdateTimer](); if (nread > 0 && !stream.destroyed) { let ret; let result; const userBuf = stream[kBuffer]; if (userBuf) { result = (stream[kBufferCb](nread, userBuf) !== false); const bufGen = stream[kBufferGen]; if (bufGen !== null) { const nextBuf = bufGen(); if (isUint8Array(nextBuf)) stream[kBuffer] = ret = nextBuf; } } else { const offset = streamBaseState[kArrayBufferOffset]; const buf = new FastBuffer(arrayBuffer, offset, nread); result = stream.push(buf); } if (!result) { handle.reading = false; if (!stream.destroyed) { const err = handle.readStop(); if (err) stream.destroy(errnoException(err, 'read')); } } return ret; } if (nread === 0) { return; } if (nread !== UV_EOF) { // CallJSOnreadMethod expects the return value to be a buffer. // Ref: https://github.com/nodejs/node/pull/34375 stream.destroy(errnoException(nread, 'read')); return; } // Defer this until we actually emit end if (stream._readableState.endEmitted) { if (stream[kMaybeDestroy]) stream[kMaybeDestroy](); } else { if (stream[kMaybeDestroy]) stream.on('end', stream[kMaybeDestroy]); // TODO(ronag): Without this `readStop`, `onStreamRead` // will be called once more (i.e. after Readable.ended) // on Windows causing a ECONNRESET, failing the // test-https-truncate test. if (handle.readStop) { const err = handle.readStop(); if (err) { // CallJSOnreadMethod expects the return value to be a buffer. // Ref: https://github.com/nodejs/node/pull/34375 stream.destroy(errnoException(err, 'read')); return; } } // Push a null to signal the end of data. // Do it before `maybeDestroy` for correct order of events: // `end` -> `close` stream.push(null); stream.read(0); } } function setStreamTimeout(msecs, callback) { if (this.destroyed) return this; this.timeout = msecs; // Type checking identical to timers.enroll() msecs = getTimerDuration(msecs, 'msecs'); // Attempt to clear an existing timer in both cases - // even if it will be rescheduled we don't want to leak an existing timer. clearTimeout(this[kTimeout]); if (msecs === 0) { if (callback !== undefined) { if (typeof callback !== 'function') throw new ERR_INVALID_CALLBACK(callback); this.removeListener('timeout', callback); } } else { this[kTimeout] = setUnrefTimeout(this._onTimeout.bind(this), msecs); if (this[kSession]) this[kSession][kUpdateTimer](); if (callback !== undefined) { if (typeof callback !== 'function') throw new ERR_INVALID_CALLBACK(callback); this.once('timeout', callback); } } return this; } module.exports = { createWriteWrap, writevGeneric, writeGeneric, onStreamRead, kAfterAsyncWrite, kMaybeDestroy, kUpdateTimer, kHandle, kSession, setStreamTimeout, kBuffer, kBufferCb, kBufferGen };