• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const {
4  Array,
5  Symbol,
6} = primordials;
7
8const { Buffer } = require('buffer');
9const { FastBuffer } = require('internal/buffer');
10const {
11  WriteWrap,
12  kReadBytesOrError,
13  kArrayBufferOffset,
14  kBytesWritten,
15  kLastWriteWasAsync,
16  streamBaseState
17} = internalBinding('stream_wrap');
18const { UV_EOF } = internalBinding('uv');
19const {
20  codes: {
21    ERR_INVALID_CALLBACK
22  },
23  errnoException
24} = require('internal/errors');
25const { owner_symbol } = require('internal/async_hooks').symbols;
26const {
27  kTimeout,
28  setUnrefTimeout,
29  getTimerDuration
30} = require('internal/timers');
31const { isUint8Array } = require('internal/util/types');
32const { clearTimeout } = require('timers');
33
// Symbol keys shared between this module and the stream implementations that
// use it. `kMaybeDestroy`, `kUpdateTimer` and `kAfterAsyncWrite` are looked up
// as methods on the owning stream; `kHandle` and `kSession` are looked up as
// plain properties.
const kMaybeDestroy = Symbol('kMaybeDestroy');
const kUpdateTimer = Symbol('kUpdateTimer');
const kAfterAsyncWrite = Symbol('kAfterAsyncWrite');
const kHandle = Symbol('kHandle');
const kSession = Symbol('kSession');

// Symbol keys read by onStreamRead() when the stream supplies its own read
// buffer: the buffer itself, a generator for the next buffer, and the callback
// that is told how many bytes were read.
const kBuffer = Symbol('kBuffer');
const kBufferGen = Symbol('kBufferGen');
const kBufferCb = Symbol('kBufferCb');

// `debug` starts out as whatever debuglog() returns; the callback handed to
// debuglog() rebinds it to the function it is given.
const { debuglog } = require('internal/util/debuglog');
let debug = debuglog('stream', (fn) => {
  debug = fn;
});
46
// Issues a single write on `req.handle`, choosing the handle method from
// `encoding`.
//
// String encodings that have a dedicated handle method are forwarded as-is.
// Everything else is written as a Buffer: `data` itself for 'buffer', or
// `Buffer.from(data, encoding)` for any other encoding. When the state array
// reports the write as asynchronous, the Buffer is stored on `req.buffer`
// (presumably to keep it referenced until the write completes).
//
// Returns whatever the handle's write method returned.
function handleWriteReq(req, data, encoding) {
  const target = req.handle;
  let payload = data;

  if (encoding !== 'buffer') {
    switch (encoding) {
      case 'latin1':
      case 'binary':
        return target.writeLatin1String(req, data);
      case 'utf8':
      case 'utf-8':
        return target.writeUtf8String(req, data);
      case 'ascii':
        return target.writeAsciiString(req, data);
      case 'ucs2':
      case 'ucs-2':
      case 'utf16le':
      case 'utf-16le':
        return target.writeUcs2String(req, data);
      default:
        // No dedicated string writer for this encoding: convert first.
        payload = Buffer.from(data, encoding);
    }
  }

  const status = target.writeBuffer(req, payload);
  if (streamBaseState[kLastWriteWasAsync])
    req.buffer = payload;
  return status;
}
81
// `oncomplete` handler installed on every write request by createWriteWrap().
// Runs with `this` bound to the request.
//
// - Owning stream already destroyed: only the pending callback (if any) is
//   invoked, with `null`.
// - Negative `status`: the stream is destroyed with an errno exception and the
//   pending callback is handed to destroy().
// - Otherwise: the stream's timer and async-write hooks run, then the pending
//   callback (if any) is invoked with `null`.
function onWriteComplete(status) {
  debug('onWriteComplete', status, this.error);

  const owner = this.handle[owner_symbol];

  if (!owner.destroyed) {
    if (status < 0) {
      const failure = errnoException(status, 'write', this.error);
      owner.destroy(failure, this.callback);
      return;
    }

    owner[kUpdateTimer]();
    owner[kAfterAsyncWrite](this);
  }

  if (typeof this.callback === 'function') {
    this.callback(null);
  }
}
105
// Builds a fresh WriteWrap request bound to `handle`, with onWriteComplete()
// as its completion handler and its bookkeeping fields (`async`, `bytes`,
// `buffer`) reset to their initial values.
function createWriteWrap(handle) {
  const wrap = new WriteWrap();

  wrap.handle = handle;
  wrap.oncomplete = onWriteComplete;

  // Filled in later by afterWriteDispatched() / handleWriteReq().
  wrap.async = false;
  wrap.bytes = 0;
  wrap.buffer = null;

  return wrap;
}
117
// Performs a vectored write of `data` (an array of `{ chunk, encoding }`
// entries carrying an `allBuffers` flag) on the handle stored at
// `self[kHandle]`.
//
// When `data.allBuffers` is truthy the entries of `data` are replaced in place
// by their chunks and `data` itself is passed to the handle. Otherwise a new
// array of interleaved `chunk, encoding` pairs is built.
//
// Returns the write request that was created.
function writevGeneric(self, data, cb) {
  const req = createWriteWrap(self[kHandle]);
  const { allBuffers } = data;
  const count = data.length;
  let chunks;

  if (!allBuffers) {
    // Flatten into [chunk0, encoding0, chunk1, encoding1, ...].
    chunks = new Array(count << 1);
    for (let i = 0, slot = 0; i < count; i++) {
      const { chunk, encoding } = data[i];
      chunks[slot++] = chunk;
      chunks[slot++] = encoding;
    }
  } else {
    // Reuse the caller's array; each entry is overwritten with its chunk.
    for (let i = 0; i < count; i++) {
      data[i] = data[i].chunk;
    }
    chunks = data;
  }

  const status = req.handle.writev(req, chunks, allBuffers);

  // Retain chunks
  if (status === 0) {
    req._chunks = chunks;
  }

  afterWriteDispatched(self, req, status, cb);
  return req;
}
142
// Writes a single chunk on the handle stored at `self[kHandle]` and runs the
// common post-dispatch bookkeeping. Returns the write request that was
// created.
function writeGeneric(self, data, encoding, cb) {
  const writeReq = createWriteWrap(self[kHandle]);
  const status = handleWriteReq(writeReq, data, encoding);

  afterWriteDispatched(self, writeReq, status, cb);

  return writeReq;
}
150
// Bookkeeping shared by writeGeneric() and writevGeneric() once the handle's
// write method has returned.
//
// Copies the byte count and the "was async" flag from the shared state array
// onto `req`. A non-zero `err` destroys the stream (the result of destroy() is
// returned). A synchronous write invokes `cb` right away; an asynchronous one
// parks `cb` on `req.callback` for onWriteComplete() to call.
function afterWriteDispatched(self, req, err, cb) {
  req.bytes = streamBaseState[kBytesWritten];
  req.async = !!streamBaseState[kLastWriteWasAsync];

  if (err !== 0) {
    const failure = errnoException(err, 'write', req.error);
    return self.destroy(failure, cb);
  }

  if (req.async) {
    req.callback = cb;
    return;
  }

  cb();
}
164
// Read callback for a stream handle; runs with `this` bound to the handle,
// whose owning stream is found at `this[owner_symbol]`.
//
// The outcome of the read is taken from `streamBaseState[kReadBytesOrError]`:
//   > 0       bytes of data were read,
//   0         nothing to do,
//   UV_EOF    end of stream,
//   otherwise treated as an error code and the stream is destroyed.
//
// Data is either reported through the stream's own buffer (`kBuffer` /
// `kBufferCb` / `kBufferGen`) or wrapped in a FastBuffer over `arrayBuffer`
// and pushed into the stream.
//
// Returns the next user-supplied buffer when the buffer generator produced a
// Uint8Array, otherwise `undefined`.
function onStreamRead(arrayBuffer) {
  const nread = streamBaseState[kReadBytesOrError];

  const handle = this;
  const stream = this[owner_symbol];

  stream[kUpdateTimer]();

  if (nread > 0) {
    // A positive value is a byte count, never an error code. If the stream
    // has already been destroyed there is nobody left to deliver the data to,
    // so drop it here instead of letting the count fall through to the error
    // path below and be turned into a bogus errno exception.
    if (stream.destroyed)
      return;

    let ret;
    let result;
    const userBuf = stream[kBuffer];
    if (userBuf) {
      // The stream owns the read buffer: report the byte count and treat
      // anything other than an explicit `false` as "keep reading".
      result = (stream[kBufferCb](nread, userBuf) !== false);
      const bufGen = stream[kBufferGen];
      if (bufGen !== null) {
        const nextBuf = bufGen();
        if (isUint8Array(nextBuf))
          stream[kBuffer] = ret = nextBuf;
      }
    } else {
      const offset = streamBaseState[kArrayBufferOffset];
      const buf = new FastBuffer(arrayBuffer, offset, nread);
      result = stream.push(buf);
    }

    if (!result) {
      // The consumer asked us to pause.
      handle.reading = false;
      // The callbacks above may have destroyed the stream in the meantime.
      if (!stream.destroyed) {
        const err = handle.readStop();
        if (err)
          stream.destroy(errnoException(err, 'read'));
      }
    }

    return ret;
  }

  if (nread === 0) {
    return;
  }

  if (nread !== UV_EOF) {
    // CallJSOnreadMethod expects the return value to be a buffer, so the
    // result of destroy() must not be returned.
    // Ref: https://github.com/nodejs/node/pull/34375
    stream.destroy(errnoException(nread, 'read'));
    return;
  }

  // Defer this until we actually emit end
  if (stream._readableState.endEmitted) {
    if (stream[kMaybeDestroy])
      stream[kMaybeDestroy]();
  } else {
    if (stream[kMaybeDestroy])
      stream.on('end', stream[kMaybeDestroy]);

    // TODO(ronag): Without this `readStop`, `onStreamRead`
    // will be called once more (i.e. after Readable.ended)
    // on Windows causing a ECONNRESET, failing the
    // test-https-truncate test.
    if (handle.readStop) {
      const err = handle.readStop();
      if (err) {
        // CallJSOnreadMethod expects the return value to be a buffer, so the
        // result of destroy() must not be returned.
        // Ref: https://github.com/nodejs/node/pull/34375
        stream.destroy(errnoException(err, 'read'));
        return;
      }
    }

    // Push a null to signal the end of data.
    // Do it before `maybeDestroy` for correct order of events:
    // `end` -> `close`
    stream.push(null);
    stream.read(0);
  }
}
242
// Shared `setTimeout(msecs[, callback])` implementation; runs with `this`
// bound to the stream.
//
// - `msecs`: the idle timeout. It is validated and converted by
//   getTimerDuration(); a resulting duration of 0 disables the timeout.
// - `callback` (optional): when the timeout is being armed it is added as a
//   one-time 'timeout' listener; when the timeout is being disabled it is
//   removed as a 'timeout' listener.
//
// Returns `this`. A destroyed stream is returned untouched.
// Throws whatever getTimerDuration() throws for an invalid `msecs`, and
// ERR_INVALID_CALLBACK when `callback` is given but is not a function.
//
// Both arguments are validated before any state is changed, so a rejected
// call leaves `this.timeout`, the existing timer and the listeners exactly as
// they were.
function setStreamTimeout(msecs, callback) {
  if (this.destroyed)
    return this;

  // Type checking identical to timers.enroll()
  const duration = getTimerDuration(msecs, 'msecs');

  if (callback !== undefined && typeof callback !== 'function')
    throw new ERR_INVALID_CALLBACK(callback);

  // Expose the value exactly as the caller supplied it.
  this.timeout = msecs;

  // Attempt to clear an existing timer in both cases -
  //  even if it will be rescheduled we don't want to leak an existing timer.
  clearTimeout(this[kTimeout]);

  if (duration === 0) {
    if (callback !== undefined)
      this.removeListener('timeout', callback);
  } else {
    this[kTimeout] = setUnrefTimeout(this._onTimeout.bind(this), duration);
    if (this[kSession]) this[kSession][kUpdateTimer]();

    if (callback !== undefined)
      this.once('timeout', callback);
  }
  return this;
}
274
275module.exports = {
276  createWriteWrap,
277  writevGeneric,
278  writeGeneric,
279  onStreamRead,
280  kAfterAsyncWrite,
281  kMaybeDestroy,
282  kUpdateTimer,
283  kHandle,
284  kSession,
285  setStreamTimeout,
286  kBuffer,
287  kBufferCb,
288  kBufferGen
289};
290