1'use strict'; 2 3const { 4 Array, 5 Symbol, 6} = primordials; 7 8const { Buffer } = require('buffer'); 9const { FastBuffer } = require('internal/buffer'); 10const { 11 WriteWrap, 12 kReadBytesOrError, 13 kArrayBufferOffset, 14 kBytesWritten, 15 kLastWriteWasAsync, 16 streamBaseState, 17} = internalBinding('stream_wrap'); 18const { UV_EOF } = internalBinding('uv'); 19const { 20 errnoException, 21} = require('internal/errors'); 22const { owner_symbol } = require('internal/async_hooks').symbols; 23const { 24 kTimeout, 25 setUnrefTimeout, 26 getTimerDuration, 27} = require('internal/timers'); 28const { isUint8Array } = require('internal/util/types'); 29const { clearTimeout } = require('timers'); 30const { validateFunction } = require('internal/validators'); 31 32const kMaybeDestroy = Symbol('kMaybeDestroy'); 33const kUpdateTimer = Symbol('kUpdateTimer'); 34const kAfterAsyncWrite = Symbol('kAfterAsyncWrite'); 35const kHandle = Symbol('kHandle'); 36const kSession = Symbol('kSession'); 37 38let debug = require('internal/util/debuglog').debuglog('stream', (fn) => { 39 debug = fn; 40}); 41const kBuffer = Symbol('kBuffer'); 42const kBufferGen = Symbol('kBufferGen'); 43const kBufferCb = Symbol('kBufferCb'); 44 45function handleWriteReq(req, data, encoding) { 46 const { handle } = req; 47 48 switch (encoding) { 49 case 'buffer': 50 { 51 const ret = handle.writeBuffer(req, data); 52 if (streamBaseState[kLastWriteWasAsync]) 53 req.buffer = data; 54 return ret; 55 } 56 case 'latin1': 57 case 'binary': 58 return handle.writeLatin1String(req, data); 59 case 'utf8': 60 case 'utf-8': 61 return handle.writeUtf8String(req, data); 62 case 'ascii': 63 return handle.writeAsciiString(req, data); 64 case 'ucs2': 65 case 'ucs-2': 66 case 'utf16le': 67 case 'utf-16le': 68 return handle.writeUcs2String(req, data); 69 default: 70 { 71 const buffer = Buffer.from(data, encoding); 72 const ret = handle.writeBuffer(req, buffer); 73 if (streamBaseState[kLastWriteWasAsync]) 74 req.buffer = buffer; 75 return ret; 76 } 77 } 78} 79 80function onWriteComplete(status) { 81 debug('onWriteComplete', status, this.error); 82 83 const stream = this.handle[owner_symbol]; 84 85 if (stream.destroyed) { 86 if (typeof this.callback === 'function') 87 this.callback(null); 88 return; 89 } 90 91 // TODO (ronag): This should be moved before if(stream.destroyed) 92 // in order to avoid swallowing error. 93 if (status < 0) { 94 const ex = errnoException(status, 'write', this.error); 95 if (typeof this.callback === 'function') 96 this.callback(ex); 97 else 98 stream.destroy(ex); 99 return; 100 } 101 102 stream[kUpdateTimer](); 103 stream[kAfterAsyncWrite](this); 104 105 if (typeof this.callback === 'function') 106 this.callback(null); 107} 108 109function createWriteWrap(handle, callback) { 110 const req = new WriteWrap(); 111 112 req.handle = handle; 113 req.oncomplete = onWriteComplete; 114 req.async = false; 115 req.bytes = 0; 116 req.buffer = null; 117 req.callback = callback; 118 119 return req; 120} 121 122function writevGeneric(self, data, cb) { 123 const req = createWriteWrap(self[kHandle], cb); 124 const allBuffers = data.allBuffers; 125 let chunks; 126 if (allBuffers) { 127 chunks = data; 128 for (let i = 0; i < data.length; i++) 129 data[i] = data[i].chunk; 130 } else { 131 chunks = new Array(data.length << 1); 132 for (let i = 0; i < data.length; i++) { 133 const entry = data[i]; 134 chunks[i * 2] = entry.chunk; 135 chunks[i * 2 + 1] = entry.encoding; 136 } 137 } 138 const err = req.handle.writev(req, chunks, allBuffers); 139 140 // Retain chunks 141 if (err === 0) req._chunks = chunks; 142 143 afterWriteDispatched(req, err, cb); 144 return req; 145} 146 147function writeGeneric(self, data, encoding, cb) { 148 const req = createWriteWrap(self[kHandle], cb); 149 const err = handleWriteReq(req, data, encoding); 150 151 afterWriteDispatched(req, err, cb); 152 return req; 153} 154 155function afterWriteDispatched(req, err, cb) { 156 req.bytes = streamBaseState[kBytesWritten]; 157 req.async = !!streamBaseState[kLastWriteWasAsync]; 158 159 if (err !== 0) 160 return cb(errnoException(err, 'write', req.error)); 161 162 if (!req.async && typeof req.callback === 'function') { 163 req.callback(); 164 } 165} 166 167function onStreamRead(arrayBuffer) { 168 const nread = streamBaseState[kReadBytesOrError]; 169 170 const handle = this; 171 const stream = this[owner_symbol]; 172 173 stream[kUpdateTimer](); 174 175 if (nread > 0 && !stream.destroyed) { 176 let ret; 177 let result; 178 const userBuf = stream[kBuffer]; 179 if (userBuf) { 180 result = (stream[kBufferCb](nread, userBuf) !== false); 181 const bufGen = stream[kBufferGen]; 182 if (bufGen !== null) { 183 const nextBuf = bufGen(); 184 if (isUint8Array(nextBuf)) 185 stream[kBuffer] = ret = nextBuf; 186 } 187 } else { 188 const offset = streamBaseState[kArrayBufferOffset]; 189 const buf = new FastBuffer(arrayBuffer, offset, nread); 190 result = stream.push(buf); 191 } 192 if (!result) { 193 handle.reading = false; 194 if (!stream.destroyed) { 195 const err = handle.readStop(); 196 if (err) 197 stream.destroy(errnoException(err, 'read')); 198 } 199 } 200 201 return ret; 202 } 203 204 if (nread === 0) { 205 return; 206 } 207 208 // After seeing EOF, most streams will be closed permanently, 209 // and will not deliver any more read events after this point. 210 // (equivalently, it should have called readStop on itself already). 211 // Some streams may be reset and explicitly started again with a call 212 // to readStart, such as TTY. 213 214 if (nread !== UV_EOF) { 215 // CallJSOnreadMethod expects the return value to be a buffer. 216 // Ref: https://github.com/nodejs/node/pull/34375 217 stream.destroy(errnoException(nread, 'read')); 218 return; 219 } 220 221 // Defer this until we actually emit end 222 if (stream._readableState.endEmitted) { 223 if (stream[kMaybeDestroy]) 224 stream[kMaybeDestroy](); 225 } else { 226 if (stream[kMaybeDestroy]) 227 stream.on('end', stream[kMaybeDestroy]); 228 229 // Push a null to signal the end of data. 230 // Do it before `maybeDestroy` for correct order of events: 231 // `end` -> `close` 232 stream.push(null); 233 stream.read(0); 234 } 235} 236 237function setStreamTimeout(msecs, callback) { 238 if (this.destroyed) 239 return this; 240 241 this.timeout = msecs; 242 243 // Type checking identical to timers.enroll() 244 msecs = getTimerDuration(msecs, 'msecs'); 245 246 // Attempt to clear an existing timer in both cases - 247 // even if it will be rescheduled we don't want to leak an existing timer. 248 clearTimeout(this[kTimeout]); 249 250 if (msecs === 0) { 251 if (callback !== undefined) { 252 validateFunction(callback, 'callback'); 253 this.removeListener('timeout', callback); 254 } 255 } else { 256 this[kTimeout] = setUnrefTimeout(this._onTimeout.bind(this), msecs); 257 if (this[kSession]) this[kSession][kUpdateTimer](); 258 259 if (callback !== undefined) { 260 validateFunction(callback, 'callback'); 261 this.once('timeout', callback); 262 } 263 } 264 return this; 265} 266 267module.exports = { 268 writevGeneric, 269 writeGeneric, 270 onStreamRead, 271 kAfterAsyncWrite, 272 kMaybeDestroy, 273 kUpdateTimer, 274 kHandle, 275 kSession, 276 setStreamTimeout, 277 kBuffer, 278 kBufferCb, 279 kBufferGen, 280}; 281