'use strict';

const {
  Array,
  Symbol,
} = primordials;

const { Buffer } = require('buffer');
const { FastBuffer } = require('internal/buffer');
const {
  WriteWrap,
  kReadBytesOrError,
  kArrayBufferOffset,
  kBytesWritten,
  kLastWriteWasAsync,
  streamBaseState,
} = internalBinding('stream_wrap');
const { UV_EOF } = internalBinding('uv');
const {
  errnoException,
} = require('internal/errors');
const { owner_symbol } = require('internal/async_hooks').symbols;
const {
  kTimeout,
  setUnrefTimeout,
  getTimerDuration,
} = require('internal/timers');
const { isUint8Array } = require('internal/util/types');
const { clearTimeout } = require('timers');
const { validateFunction } = require('internal/validators');

// Symbols used as property keys on the stream objects handled in this file.
const kMaybeDestroy = Symbol('kMaybeDestroy');
const kUpdateTimer = Symbol('kUpdateTimer');
const kAfterAsyncWrite = Symbol('kAfterAsyncWrite');
const kHandle = Symbol('kHandle');
const kSession = Symbol('kSession');

// The callback handed to debuglog() rebinds `debug` to the function it
// receives, so later calls go straight to that function.
let debug = require('internal/util/debuglog').debuglog('stream', (fn) => {
  debug = fn;
});

// Property keys for the caller-supplied read buffer, the generator that
// provides the next buffer, and the callback notified about filled bytes.
const kBuffer = Symbol('kBuffer');
const kBufferGen = Symbol('kBufferGen');
const kBufferCb = Symbol('kBufferCb');
44
/**
 * Writes `data` through the handle method that matches `encoding`.
 *
 * String encodings that have a dedicated handle method are passed through
 * unchanged. Any other encoding is converted with `Buffer.from()` and sent
 * via `writeBuffer()`.
 *
 * @param {object} req Write request; `req.handle` is the handle written to.
 * @param {Buffer|string} data Payload to write.
 * @param {string} encoding `'buffer'` or a string encoding name.
 * @returns {*} Whatever the invoked handle write method returned.
 */
function handleWriteReq(req, data, encoding) {
  const { handle } = req;

  switch (encoding) {
    case 'buffer':
    {
      const ret = handle.writeBuffer(req, data);
      // When the write was flagged as asynchronous, store the buffer on the
      // request (presumably so it stays reachable until completion).
      if (streamBaseState[kLastWriteWasAsync])
        req.buffer = data;
      return ret;
    }
    case 'latin1':
    case 'binary':
      return handle.writeLatin1String(req, data);
    case 'utf8':
    case 'utf-8':
      return handle.writeUtf8String(req, data);
    case 'ascii':
      return handle.writeAsciiString(req, data);
    case 'ucs2':
    case 'ucs-2':
    case 'utf16le':
    case 'utf-16le':
      return handle.writeUcs2String(req, data);
    default:
    {
      // No dedicated string writer for this encoding: convert first.
      const buffer = Buffer.from(data, encoding);
      const ret = handle.writeBuffer(req, buffer);
      if (streamBaseState[kLastWriteWasAsync])
        req.buffer = buffer;
      return ret;
    }
  }
}
79
/**
 * Completion callback installed on write requests (`req.oncomplete`).
 * Invoked with the write request as `this`.
 *
 * - If the owning stream is already destroyed, the request callback (if any)
 *   is called with `null` and nothing else happens.
 * - If `status` is negative, an errno exception is passed to the request
 *   callback, or used to destroy the stream when there is no callback.
 * - Otherwise the stream's timer and async-write hooks run, then the
 *   callback is called with `null`.
 *
 * @param {number} status Result of the write; negative values are errors.
 */
function onWriteComplete(status) {
  debug('onWriteComplete', status, this.error);

  const stream = this.handle[owner_symbol];

  if (stream.destroyed) {
    if (typeof this.callback === 'function')
      this.callback(null);
    return;
  }

  // TODO (ronag): This should be moved before if(stream.destroyed)
  // in order to avoid swallowing error.
  if (status < 0) {
    const ex = errnoException(status, 'write', this.error);
    if (typeof this.callback === 'function')
      this.callback(ex);
    else
      stream.destroy(ex);
    return;
  }

  stream[kUpdateTimer]();
  stream[kAfterAsyncWrite](this);

  if (typeof this.callback === 'function')
    this.callback(null);
}
108
/**
 * Creates a `WriteWrap` request bound to `handle`, with its bookkeeping
 * fields reset and `onWriteComplete` installed as completion handler.
 *
 * @param {object} handle Handle the request will be written to.
 * @param {Function} [callback] Stored on the request as `req.callback`.
 * @returns {WriteWrap} The initialized request.
 */
function createWriteWrap(handle, callback) {
  const req = new WriteWrap();

  req.handle = handle;
  req.oncomplete = onWriteComplete;
  req.async = false;
  req.bytes = 0;
  req.buffer = null;
  req.callback = callback;

  return req;
}
121
/**
 * Writes several chunks with a single `writev()` call on `self[kHandle]`.
 *
 * When `data.allBuffers` is truthy, `data` itself is flattened in place
 * (each entry is replaced by its `.chunk`) and passed to the handle.
 * Otherwise a new array of alternating chunk / encoding values is built.
 *
 * @param {object} self Stream whose `kHandle` property is the handle.
 * @param {Array<{chunk: *, encoding: string}>} data Entries to write; may
 *   carry an `allBuffers` flag.
 * @param {Function} cb Completion callback.
 * @returns {object} The write request that was dispatched.
 */
function writevGeneric(self, data, cb) {
  const req = createWriteWrap(self[kHandle], cb);
  const allBuffers = data.allBuffers;
  let chunks;
  if (allBuffers) {
    chunks = data;
    for (let i = 0; i < data.length; i++)
      data[i] = data[i].chunk;
  } else {
    // Layout: [chunk0, encoding0, chunk1, encoding1, ...]
    chunks = new Array(data.length << 1);
    for (let i = 0; i < data.length; i++) {
      const entry = data[i];
      chunks[i * 2] = entry.chunk;
      chunks[i * 2 + 1] = entry.encoding;
    }
  }
  const err = req.handle.writev(req, chunks, allBuffers);

  // Retain chunks
  if (err === 0) req._chunks = chunks;

  afterWriteDispatched(req, err, cb);
  return req;
}
146
/**
 * Writes a single chunk to `self[kHandle]` and runs the post-dispatch
 * bookkeeping.
 *
 * @param {object} self Stream whose `kHandle` property is the handle.
 * @param {Buffer|string} data Payload to write.
 * @param {string} encoding `'buffer'` or a string encoding name.
 * @param {Function} cb Completion callback.
 * @returns {object} The write request that was dispatched.
 */
function writeGeneric(self, data, encoding, cb) {
  const req = createWriteWrap(self[kHandle], cb);
  const err = handleWriteReq(req, data, encoding);

  afterWriteDispatched(req, err, cb);
  return req;
}
154
/**
 * Records the outcome of a write that was just handed to the handle.
 *
 * Copies the byte count and the async flag from `streamBaseState` onto the
 * request. A non-zero `err` is reported through `cb`. A write that was not
 * flagged as asynchronous invokes `req.callback` right away, with no
 * arguments.
 *
 * @param {object} req Write request that was dispatched.
 * @param {number} err Return value of the handle write call; 0 on success.
 * @param {Function} cb Callback that receives the error when `err !== 0`.
 * @returns {*} The return value of `cb` on error, otherwise `undefined`.
 */
function afterWriteDispatched(req, err, cb) {
  req.bytes = streamBaseState[kBytesWritten];
  req.async = !!streamBaseState[kLastWriteWasAsync];

  if (err !== 0)
    return cb(errnoException(err, 'write', req.error));

  if (!req.async && typeof req.callback === 'function') {
    req.callback();
  }
}
166
/**
 * Read callback, invoked with the handle as `this`. The byte count (or a
 * negative error code) is taken from `streamBaseState[kReadBytesOrError]`.
 *
 * - `nread > 0`: the data goes either to the stream's `kBufferCb` (when a
 *   `kBuffer` is set on the stream) or to `stream.push()` as a `FastBuffer`.
 *   If the consumer signals backpressure, reading is stopped on the handle.
 * - `nread === 0`: nothing to do.
 * - `nread === UV_EOF`: the readable side is ended with `push(null)`.
 * - any other negative value: the stream is destroyed with an errno
 *   exception.
 *
 * @param {ArrayBuffer} arrayBuffer Storage holding the bytes that were read
 *   (only used when the stream has no `kBuffer` set).
 * @returns {Uint8Array|undefined} The next buffer produced by the stream's
 *   `kBufferGen`, if it returned a Uint8Array; otherwise `undefined`.
 */
function onStreamRead(arrayBuffer) {
  const nread = streamBaseState[kReadBytesOrError];

  const handle = this;
  const stream = this[owner_symbol];

  stream[kUpdateTimer]();

  if (nread > 0 && !stream.destroyed) {
    let ret;
    let result;
    const userBuf = stream[kBuffer];
    if (userBuf) {
      // Only an explicit `false` from the callback stops reading.
      result = (stream[kBufferCb](nread, userBuf) !== false);
      const bufGen = stream[kBufferGen];
      if (bufGen !== null) {
        const nextBuf = bufGen();
        if (isUint8Array(nextBuf))
          stream[kBuffer] = ret = nextBuf;
      }
    } else {
      const offset = streamBaseState[kArrayBufferOffset];
      const buf = new FastBuffer(arrayBuffer, offset, nread);
      result = stream.push(buf);
    }
    if (!result) {
      handle.reading = false;
      // The consumer may have destroyed the stream while handling the data.
      if (!stream.destroyed) {
        const err = handle.readStop();
        if (err)
          stream.destroy(errnoException(err, 'read'));
      }
    }

    return ret;
  }

  if (nread === 0) {
    return;
  }

  // After seeing EOF, most streams will be closed permanently,
  // and will not deliver any more read events after this point.
  // (equivalently, it should have called readStop on itself already).
  // Some streams may be reset and explicitly started again with a call
  // to readStart, such as TTY.

  if (nread !== UV_EOF) {
    // Do not return the result of destroy() here: CallJSOnreadMethod
    // expects the return value to be a buffer.
    // Ref: https://github.com/nodejs/node/pull/34375
    stream.destroy(errnoException(nread, 'read'));
    return;
  }

  // Defer this until we actually emit end
  if (stream._readableState.endEmitted) {
    if (stream[kMaybeDestroy])
      stream[kMaybeDestroy]();
  } else {
    if (stream[kMaybeDestroy])
      stream.on('end', stream[kMaybeDestroy]);

    // Push a null to signal the end of data.
    // Do it before `maybeDestroy` for correct order of events:
    // `end` -> `close`
    stream.push(null);
    stream.read(0);
  }
}
236
/**
 * Sets or clears the idle timeout of the stream bound as `this`.
 *
 * A destroyed stream is returned untouched. Otherwise `this.timeout` is set
 * to the raw `msecs` value, any existing timer is cleared, and:
 * - `msecs === 0`: `callback`, if given, is removed from `'timeout'`;
 * - otherwise: a new unref'd timer calling `this._onTimeout` is armed and
 *   `callback`, if given, is registered once for `'timeout'`.
 *
 * @param {number} msecs Timeout in milliseconds; 0 disables the timeout.
 * @param {Function} [callback] Listener for the `'timeout'` event.
 * @returns {this}
 * @throws Whatever `getTimerDuration()` or `validateFunction()` throw for
 *   invalid arguments.
 */
function setStreamTimeout(msecs, callback) {
  if (this.destroyed)
    return this;

  this.timeout = msecs;

  // Type checking identical to timers.enroll()
  msecs = getTimerDuration(msecs, 'msecs');

  // Attempt to clear an existing timer in both cases -
  //  even if it will be rescheduled we don't want to leak an existing timer.
  clearTimeout(this[kTimeout]);

  if (msecs === 0) {
    if (callback !== undefined) {
      validateFunction(callback, 'callback');
      this.removeListener('timeout', callback);
    }
  } else {
    this[kTimeout] = setUnrefTimeout(this._onTimeout.bind(this), msecs);
    // Let the owning session, if any, refresh its own timer as well.
    if (this[kSession]) this[kSession][kUpdateTimer]();

    if (callback !== undefined) {
      validateFunction(callback, 'callback');
      this.once('timeout', callback);
    }
  }
  return this;
}
266
267module.exports = {
268  writevGeneric,
269  writeGeneric,
270  onStreamRead,
271  kAfterAsyncWrite,
272  kMaybeDestroy,
273  kUpdateTimer,
274  kHandle,
275  kSession,
276  setStreamTimeout,
277  kBuffer,
278  kBufferCb,
279  kBufferGen,
280};
281