1'use strict'; 2 3const { 4 Array, 5 Symbol, 6} = primordials; 7 8const { Buffer } = require('buffer'); 9const { FastBuffer } = require('internal/buffer'); 10const { 11 WriteWrap, 12 kReadBytesOrError, 13 kArrayBufferOffset, 14 kBytesWritten, 15 kLastWriteWasAsync, 16 streamBaseState 17} = internalBinding('stream_wrap'); 18const { UV_EOF } = internalBinding('uv'); 19const { 20 codes: { 21 ERR_INVALID_CALLBACK 22 }, 23 errnoException 24} = require('internal/errors'); 25const { owner_symbol } = require('internal/async_hooks').symbols; 26const { 27 kTimeout, 28 setUnrefTimeout, 29 getTimerDuration 30} = require('internal/timers'); 31const { isUint8Array } = require('internal/util/types'); 32const { clearTimeout } = require('timers'); 33 34const kMaybeDestroy = Symbol('kMaybeDestroy'); 35const kUpdateTimer = Symbol('kUpdateTimer'); 36const kAfterAsyncWrite = Symbol('kAfterAsyncWrite'); 37const kHandle = Symbol('kHandle'); 38const kSession = Symbol('kSession'); 39 40let debug = require('internal/util/debuglog').debuglog('stream', (fn) => { 41 debug = fn; 42}); 43const kBuffer = Symbol('kBuffer'); 44const kBufferGen = Symbol('kBufferGen'); 45const kBufferCb = Symbol('kBufferCb'); 46 47function handleWriteReq(req, data, encoding) { 48 const { handle } = req; 49 50 switch (encoding) { 51 case 'buffer': 52 { 53 const ret = handle.writeBuffer(req, data); 54 if (streamBaseState[kLastWriteWasAsync]) 55 req.buffer = data; 56 return ret; 57 } 58 case 'latin1': 59 case 'binary': 60 return handle.writeLatin1String(req, data); 61 case 'utf8': 62 case 'utf-8': 63 return handle.writeUtf8String(req, data); 64 case 'ascii': 65 return handle.writeAsciiString(req, data); 66 case 'ucs2': 67 case 'ucs-2': 68 case 'utf16le': 69 case 'utf-16le': 70 return handle.writeUcs2String(req, data); 71 default: 72 { 73 const buffer = Buffer.from(data, encoding); 74 const ret = handle.writeBuffer(req, buffer); 75 if (streamBaseState[kLastWriteWasAsync]) 76 req.buffer = buffer; 77 return ret; 78 } 79 } 80} 81 82function onWriteComplete(status) { 83 debug('onWriteComplete', status, this.error); 84 85 const stream = this.handle[owner_symbol]; 86 87 if (stream.destroyed) { 88 if (typeof this.callback === 'function') 89 this.callback(null); 90 return; 91 } 92 93 if (status < 0) { 94 const ex = errnoException(status, 'write', this.error); 95 stream.destroy(ex, this.callback); 96 return; 97 } 98 99 stream[kUpdateTimer](); 100 stream[kAfterAsyncWrite](this); 101 102 if (typeof this.callback === 'function') 103 this.callback(null); 104} 105 106function createWriteWrap(handle) { 107 const req = new WriteWrap(); 108 109 req.handle = handle; 110 req.oncomplete = onWriteComplete; 111 req.async = false; 112 req.bytes = 0; 113 req.buffer = null; 114 115 return req; 116} 117 118function writevGeneric(self, data, cb) { 119 const req = createWriteWrap(self[kHandle]); 120 const allBuffers = data.allBuffers; 121 let chunks; 122 if (allBuffers) { 123 chunks = data; 124 for (let i = 0; i < data.length; i++) 125 data[i] = data[i].chunk; 126 } else { 127 chunks = new Array(data.length << 1); 128 for (let i = 0; i < data.length; i++) { 129 const entry = data[i]; 130 chunks[i * 2] = entry.chunk; 131 chunks[i * 2 + 1] = entry.encoding; 132 } 133 } 134 const err = req.handle.writev(req, chunks, allBuffers); 135 136 // Retain chunks 137 if (err === 0) req._chunks = chunks; 138 139 afterWriteDispatched(self, req, err, cb); 140 return req; 141} 142 143function writeGeneric(self, data, encoding, cb) { 144 const req = createWriteWrap(self[kHandle]); 145 const err = handleWriteReq(req, data, encoding); 146 147 afterWriteDispatched(self, req, err, cb); 148 return req; 149} 150 151function afterWriteDispatched(self, req, err, cb) { 152 req.bytes = streamBaseState[kBytesWritten]; 153 req.async = !!streamBaseState[kLastWriteWasAsync]; 154 155 if (err !== 0) 156 return self.destroy(errnoException(err, 'write', req.error), cb); 157 158 if (!req.async) { 159 cb(); 160 } else { 161 req.callback = cb; 162 } 163} 164 165function onStreamRead(arrayBuffer) { 166 const nread = streamBaseState[kReadBytesOrError]; 167 168 const handle = this; 169 const stream = this[owner_symbol]; 170 171 stream[kUpdateTimer](); 172 173 if (nread > 0 && !stream.destroyed) { 174 let ret; 175 let result; 176 const userBuf = stream[kBuffer]; 177 if (userBuf) { 178 result = (stream[kBufferCb](nread, userBuf) !== false); 179 const bufGen = stream[kBufferGen]; 180 if (bufGen !== null) { 181 const nextBuf = bufGen(); 182 if (isUint8Array(nextBuf)) 183 stream[kBuffer] = ret = nextBuf; 184 } 185 } else { 186 const offset = streamBaseState[kArrayBufferOffset]; 187 const buf = new FastBuffer(arrayBuffer, offset, nread); 188 result = stream.push(buf); 189 } 190 if (!result) { 191 handle.reading = false; 192 if (!stream.destroyed) { 193 const err = handle.readStop(); 194 if (err) 195 stream.destroy(errnoException(err, 'read')); 196 } 197 } 198 199 return ret; 200 } 201 202 if (nread === 0) { 203 return; 204 } 205 206 if (nread !== UV_EOF) { 207 // CallJSOnreadMethod expects the return value to be a buffer. 208 // Ref: https://github.com/nodejs/node/pull/34375 209 stream.destroy(errnoException(nread, 'read')); 210 return; 211 } 212 213 // Defer this until we actually emit end 214 if (stream._readableState.endEmitted) { 215 if (stream[kMaybeDestroy]) 216 stream[kMaybeDestroy](); 217 } else { 218 if (stream[kMaybeDestroy]) 219 stream.on('end', stream[kMaybeDestroy]); 220 221 // TODO(ronag): Without this `readStop`, `onStreamRead` 222 // will be called once more (i.e. after Readable.ended) 223 // on Windows causing a ECONNRESET, failing the 224 // test-https-truncate test. 225 if (handle.readStop) { 226 const err = handle.readStop(); 227 if (err) { 228 // CallJSOnreadMethod expects the return value to be a buffer. 229 // Ref: https://github.com/nodejs/node/pull/34375 230 stream.destroy(errnoException(err, 'read')); 231 return; 232 } 233 } 234 235 // Push a null to signal the end of data. 236 // Do it before `maybeDestroy` for correct order of events: 237 // `end` -> `close` 238 stream.push(null); 239 stream.read(0); 240 } 241} 242 243function setStreamTimeout(msecs, callback) { 244 if (this.destroyed) 245 return this; 246 247 this.timeout = msecs; 248 249 // Type checking identical to timers.enroll() 250 msecs = getTimerDuration(msecs, 'msecs'); 251 252 // Attempt to clear an existing timer in both cases - 253 // even if it will be rescheduled we don't want to leak an existing timer. 254 clearTimeout(this[kTimeout]); 255 256 if (msecs === 0) { 257 if (callback !== undefined) { 258 if (typeof callback !== 'function') 259 throw new ERR_INVALID_CALLBACK(callback); 260 this.removeListener('timeout', callback); 261 } 262 } else { 263 this[kTimeout] = setUnrefTimeout(this._onTimeout.bind(this), msecs); 264 if (this[kSession]) this[kSession][kUpdateTimer](); 265 266 if (callback !== undefined) { 267 if (typeof callback !== 'function') 268 throw new ERR_INVALID_CALLBACK(callback); 269 this.once('timeout', callback); 270 } 271 } 272 return this; 273} 274 275module.exports = { 276 createWriteWrap, 277 writevGeneric, 278 writeGeneric, 279 onStreamRead, 280 kAfterAsyncWrite, 281 kMaybeDestroy, 282 kUpdateTimer, 283 kHandle, 284 kSession, 285 setStreamTimeout, 286 kBuffer, 287 kBufferCb, 288 kBufferGen 289}; 290