• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const {
4  SymbolAsyncIterator,
5  SymbolIterator,
6  SymbolFor,
7} = primordials;
8
// We need to use SymbolFor to make these keys globally available for
// interop with readable-stream, i.e. readable-stream and node core need to
// be able to read/write private state from each other for proper
// interoperability.
// Each of the five stream keys below is consulted by the predicate of the
// same name in this file (e.g. isDestroyed() reads stream[kIsDestroyed]).
const kIsDestroyed = SymbolFor('nodejs.stream.destroyed');
const kIsErrored = SymbolFor('nodejs.stream.errored');
const kIsReadable = SymbolFor('nodejs.stream.readable');
const kIsWritable = SymbolFor('nodejs.stream.writable');
const kIsDisturbed = SymbolFor('nodejs.stream.disturbed');

// Web-stream keys. Nothing in this file reads them; they are only created
// here and re-exported for other modules.
const kIsClosedPromise = SymbolFor('nodejs.webstream.isClosedPromise');
const kControllerErrorFunction = SymbolFor('nodejs.webstream.controllerErrorFunction');
21
/**
 * Duck-type check for the readable side of a Node.js stream.
 *
 * @param {any} obj Candidate value.
 * @param {boolean} [strict] When true, `pause()` and `resume()` must also
 *   be functions.
 * @returns {boolean} True when `obj` has `pipe()` and `on()` and, if it also
 *   carries a `_writableState`, a `_readableState` whose `readable` flag is
 *   not `false`.
 */
function isReadableNodeStream(obj, strict = false) {
  if (!obj) return false;

  if (typeof obj.pipe !== 'function' || typeof obj.on !== 'function') {
    return false;
  }

  if (
    strict &&
    (typeof obj.pause !== 'function' || typeof obj.resume !== 'function')
  ) {
    return false;
  }

  // No writable state: nothing more to verify.
  if (!obj._writableState) return true;

  // Writable also has .pipe, so an object with writable state only counts
  // as readable when it has readable state too (i.e. it is a Duplex) ...
  const readableState = obj._readableState;
  if (!readableState) return false;

  // ... and that Duplex has not had its readable side switched off.
  return readableState.readable !== false;
}
35
/**
 * Duck-type check for the writable side of a Node.js stream.
 *
 * @param {any} obj Candidate value.
 * @returns {boolean} True when `obj` has `write()` and `on()` and, if it
 *   also carries a `_readableState`, its `_writableState.writable` flag is
 *   not `false`.
 */
function isWritableNodeStream(obj) {
  if (!obj) return false;

  if (typeof obj.write !== 'function' || typeof obj.on !== 'function') {
    return false;
  }

  // No readable state: the method check above is sufficient.
  if (!obj._readableState) return true;

  // Duplex: the writable side must not have been switched off.
  return obj._writableState?.writable !== false;
}
44
/**
 * Duck-type check for a Node.js Duplex stream.
 *
 * @param {any} obj Candidate value.
 * @returns {boolean} True when `obj` has `pipe()`, `on()` and `write()`
 *   methods together with a truthy `_readableState`.
 */
function isDuplexNodeStream(obj) {
  if (!obj) return false;

  // Readable half: pipe() plus readable state.
  if (typeof obj.pipe !== 'function' || !obj._readableState) return false;

  if (typeof obj.on !== 'function') return false;

  // Writable half.
  return typeof obj.write === 'function';
}
53
/**
 * Loose duck-type check for any Node.js stream (readable, writable or both).
 *
 * The result is coerced to a boolean so that, like the other `is*NodeStream`
 * predicates, this never leaks the inspected value or one of its state
 * objects to the caller.
 *
 * @param {any} obj Candidate value.
 * @returns {boolean} True when `obj` is truthy and either carries a
 *   `_readableState` / `_writableState`, or has `on()` together with
 *   `write()` or `pipe()`.
 */
function isNodeStream(obj) {
  return !!(
    obj &&
    (
      obj._readableState ||
      obj._writableState ||
      (typeof obj.write === 'function' && typeof obj.on === 'function') ||
      (typeof obj.pipe === 'function' && typeof obj.on === 'function')
    )
  );
}
65
/**
 * Duck-type check for a web ReadableStream.
 *
 * @param {any} obj Candidate value.
 * @returns {boolean} True when `obj` is not a Node.js stream and exposes
 *   `pipeThrough()`, `getReader()` and `cancel()`.
 */
function isReadableStream(obj) {
  if (!obj || isNodeStream(obj)) return false;

  if (typeof obj.pipeThrough !== 'function') return false;
  if (typeof obj.getReader !== 'function') return false;

  return typeof obj.cancel === 'function';
}
75
/**
 * Duck-type check for a web WritableStream.
 *
 * @param {any} obj Candidate value.
 * @returns {boolean} True when `obj` is not a Node.js stream and exposes
 *   `getWriter()` and `abort()`.
 */
function isWritableStream(obj) {
  if (!obj || isNodeStream(obj)) return false;

  if (typeof obj.getWriter !== 'function') return false;

  return typeof obj.abort === 'function';
}
84
/**
 * Duck-type check for a web TransformStream-like `{ readable, writable }`
 * pair.
 *
 * `typeof null` is also `'object'`, so both sides are checked against `null`
 * explicitly; a pair with a missing side is not reported as a transform
 * stream.
 *
 * @param {any} obj Candidate value.
 * @returns {boolean} True when `obj` is not a Node.js stream and both
 *   `obj.readable` and `obj.writable` are non-null objects.
 */
function isTransformStream(obj) {
  if (!obj || isNodeStream(obj)) return false;

  const { readable, writable } = obj;
  return (
    typeof readable === 'object' && readable !== null &&
    typeof writable === 'object' && writable !== null
  );
}
93
/**
 * Checks whether `obj` looks like any kind of web stream.
 *
 * @param {any} obj Candidate value.
 * @returns {boolean} True when `obj` passes isReadableStream(),
 *   isWritableStream() or isTransformStream(), tried in that order.
 */
function isWebStream(obj) {
  if (isReadableStream(obj)) return true;
  if (isWritableStream(obj)) return true;
  return isTransformStream(obj);
}
97
/**
 * Checks whether `obj` implements an iteration protocol.
 *
 * @param {any} obj Candidate value; `null` and `undefined` are never iterable.
 * @param {boolean} [isAsync] `true` checks only the async iterator method,
 *   `false` checks only the sync iterator method, and any other value
 *   accepts either.
 * @returns {boolean}
 */
function isIterable(obj, isAsync) {
  if (obj == null) return false;

  const hasAsyncIterator = () => typeof obj[SymbolAsyncIterator] === 'function';
  const hasSyncIterator = () => typeof obj[SymbolIterator] === 'function';

  switch (isAsync) {
    case true:
      return hasAsyncIterator();
    case false:
      return hasSyncIterator();
    default:
      return hasAsyncIterator() || hasSyncIterator();
  }
}
105
/**
 * Reports whether a Node.js stream has been destroyed.
 *
 * @param {any} stream Candidate stream.
 * @returns {boolean|null} `null` when `stream` is not a Node.js stream;
 *   otherwise true when `stream.destroyed`, `stream[kIsDestroyed]` or the
 *   `destroyed` flag of its state object is truthy.
 */
function isDestroyed(stream) {
  if (!isNodeStream(stream)) return null;

  if (stream.destroyed || stream[kIsDestroyed]) return true;

  // The writable state takes precedence over the readable state.
  const state = stream._writableState || stream._readableState;
  return !!state?.destroyed;
}
113
/**
 * Reports whether the writable side has been end()ed.
 *
 * @param {any} stream Candidate stream.
 * @returns {boolean|null} `null` when `stream` is not a writable Node.js
 *   stream or its state has no boolean `ended` flag; `false` when the
 *   writable state is errored; otherwise the `ended` flag.
 */
function isWritableEnded(stream) {
  if (!isWritableNodeStream(stream)) return null;
  if (stream.writableEnded === true) return true;

  const state = stream._writableState;
  if (state?.errored) return false;

  const ended = state?.ended;
  return typeof ended === 'boolean' ? ended : null;
}
123
/**
 * Reports whether the writable side has emitted 'finish'.
 *
 * @param {any} stream Candidate stream.
 * @param {boolean} [strict] When exactly `false`, a stream that is ended
 *   with nothing left buffered (`length === 0`) also counts as finished.
 * @returns {boolean|null} `null` when `stream` is not a writable Node.js
 *   stream or its state has no boolean `finished` flag; `false` when the
 *   writable state is errored.
 */
function isWritableFinished(stream, strict) {
  if (!isWritableNodeStream(stream)) return null;
  if (stream.writableFinished === true) return true;

  const state = stream._writableState;
  if (state?.errored) return false;
  if (typeof state?.finished !== 'boolean') return null;

  if (state.finished) return true;

  // Lenient mode: ended and fully drained is good enough.
  return strict === false && state.ended === true && state.length === 0;
}
136
/**
 * Reports whether the readable side has been push(null)ed.
 *
 * @param {any} stream Candidate stream.
 * @returns {boolean|null} `null` when `stream` is not a readable Node.js
 *   stream or its state has no boolean `ended` flag; `false` when there is
 *   no readable state or it is errored; otherwise the `ended` flag.
 */
function isReadableEnded(stream) {
  if (!isReadableNodeStream(stream)) return null;
  if (stream.readableEnded === true) return true;

  const state = stream._readableState;
  if (!state || state.errored) return false;

  return typeof state.ended === 'boolean' ? state.ended : null;
}
146
/**
 * Reports whether the readable side has emitted 'end'.
 *
 * @param {any} stream Candidate stream.
 * @param {boolean} [strict] When exactly `false`, a stream that is ended
 *   with nothing left buffered (`length === 0`) also counts as finished.
 * @returns {boolean|null} `null` when `stream` is not a readable Node.js
 *   stream or its state has no boolean `endEmitted` flag; `false` when the
 *   readable state is errored.
 */
function isReadableFinished(stream, strict) {
  if (!isReadableNodeStream(stream)) return null;

  const state = stream._readableState;
  if (state?.errored) return false;
  if (typeof state?.endEmitted !== 'boolean') return null;

  if (state.endEmitted) return true;

  // Lenient mode: ended and fully consumed is good enough.
  return strict === false && state.ended === true && state.length === 0;
}
158
/**
 * Reports whether a stream can currently be read from.
 *
 * @param {any} stream Candidate stream.
 * @returns {any} The value stored under `kIsReadable` when it is not
 *   null/undefined; `null` when `stream.readable` is not a boolean;
 *   otherwise a boolean that is true only for a non-destroyed readable
 *   Node.js stream whose `readable` flag is set and that has not finished.
 */
function isReadable(stream) {
  // An explicit override on the stream wins over any inspection.
  const override = stream?.[kIsReadable];
  if (override != null) return override;

  if (typeof stream?.readable !== 'boolean') return null;
  if (isDestroyed(stream)) return false;

  if (!isReadableNodeStream(stream)) return false;
  if (!stream.readable) return false;

  return !isReadableFinished(stream);
}
167
/**
 * Reports whether a stream can currently be written to.
 *
 * @param {any} stream Candidate stream.
 * @returns {any} The value stored under `kIsWritable` when it is not
 *   null/undefined; `null` when `stream.writable` is not a boolean;
 *   otherwise a boolean that is true only for a non-destroyed writable
 *   Node.js stream whose `writable` flag is set and that has not been ended.
 */
function isWritable(stream) {
  // An explicit override on the stream wins over any inspection.
  const override = stream?.[kIsWritable];
  if (override != null) return override;

  if (typeof stream?.writable !== 'boolean') return null;
  if (isDestroyed(stream)) return false;

  if (!isWritableNodeStream(stream)) return false;
  if (!stream.writable) return false;

  return !isWritableEnded(stream);
}
176
/**
 * Reports whether a Node.js stream is done on the sides the caller cares
 * about.
 *
 * @param {any} stream Candidate stream.
 * @param {{readable?: boolean, writable?: boolean}} [opts] Setting
 *   `readable` or `writable` to `false` skips the check for that side.
 * @returns {boolean|null} `null` when `stream` is not a Node.js stream;
 *   `true` when it is destroyed or no checked side is still
 *   readable/writable; `false` otherwise.
 */
function isFinished(stream, opts) {
  if (!isNodeStream(stream)) return null;
  if (isDestroyed(stream)) return true;

  const stillReadable = opts?.readable !== false && isReadable(stream);
  if (stillReadable) return false;

  const stillWritable = opts?.writable !== false && isWritable(stream);
  return !stillWritable;
}
196
/**
 * Returns the error recorded on the writable side, if any.
 *
 * @param {any} stream Candidate stream.
 * @returns {any} `null` when `stream` is not a Node.js stream; a truthy
 *   `stream.writableErrored` when present; otherwise
 *   `stream._writableState.errored`, or `null` when that is missing.
 */
function isWritableErrored(stream) {
  if (!isNodeStream(stream)) return null;

  return stream.writableErrored || (stream._writableState?.errored ?? null);
}
208
/**
 * Returns the error recorded on the readable side, if any.
 *
 * @param {any} stream Candidate stream.
 * @returns {any} `null` when `stream` is not a Node.js stream; a truthy
 *   `stream.readableErrored` when present; otherwise
 *   `stream._readableState.errored`, or `null` when that is missing.
 */
function isReadableErrored(stream) {
  if (!isNodeStream(stream)) return null;

  return stream.readableErrored || (stream._readableState?.errored ?? null);
}
220
/**
 * Reports whether a Node.js stream has closed.
 *
 * Sources are consulted in order: the public `closed` property, the
 * `closed` flags on the writable/readable state objects, and finally the
 * `_closed` flag of an object that passes isOutgoingMessage().
 *
 * @param {any} stream Candidate stream.
 * @returns {boolean|null} `null` when `stream` is not a Node.js stream or
 *   none of the sources provides a boolean; otherwise a strict boolean.
 */
function isClosed(stream) {
  if (!isNodeStream(stream)) {
    return null;
  }

  if (typeof stream.closed === 'boolean') {
    return stream.closed;
  }

  const writableClosed = stream._writableState?.closed;
  const readableClosed = stream._readableState?.closed;

  if (
    typeof writableClosed === 'boolean' ||
    typeof readableClosed === 'boolean'
  ) {
    // Coerce: with only one side carrying a boolean, a bare `a || b` would
    // return the other side's `undefined` (e.g. false || undefined) instead
    // of `false`.
    return !!(writableClosed || readableClosed);
  }

  // isOutgoingMessage() itself verifies that `_closed` is a boolean.
  if (isOutgoingMessage(stream)) {
    return stream._closed;
  }

  return null;
}
246
/**
 * Duck-type check based on the boolean fields `_closed`,
 * `_defaultKeepAlive`, `_removedConnection` and `_removedContLen`.
 *
 * @param {object} stream Object to inspect; must not be null/undefined
 *   (properties are read from it directly).
 * @returns {boolean} True when all four fields are booleans.
 */
function isOutgoingMessage(stream) {
  if (typeof stream._closed !== 'boolean') return false;
  if (typeof stream._defaultKeepAlive !== 'boolean') return false;
  if (typeof stream._removedConnection !== 'boolean') return false;

  return typeof stream._removedContLen === 'boolean';
}
255
/**
 * Duck-type check: an object that passes isOutgoingMessage() and also has
 * a boolean `_sent100` field.
 *
 * @param {object} stream Object to inspect; must not be null/undefined.
 * @returns {boolean}
 */
function isServerResponse(stream) {
  if (typeof stream._sent100 !== 'boolean') return false;

  return isOutgoingMessage(stream);
}
262
/**
 * Duck-type check based on boolean `_consuming` and `_dumped` fields.
 *
 * @param {object} stream Object to inspect; must not be null/undefined.
 * @returns {boolean} True when both fields are booleans and
 *   `stream.req?.upgradeOrConnect` is `undefined`.
 */
function isServerRequest(stream) {
  if (typeof stream._consuming !== 'boolean') return false;
  if (typeof stream._dumped !== 'boolean') return false;

  return stream.req?.upgradeOrConnect === undefined;
}
270
/**
 * Reports whether a Node.js stream is still expected to emit 'close'.
 *
 * @param {any} stream Candidate stream.
 * @returns {boolean|null} `null` when `stream` is not a Node.js stream.
 *   Without any state object the answer is isServerResponse(stream);
 *   otherwise true only when the state has `autoDestroy` and `emitClose`
 *   set and `closed` is exactly `false`.
 */
function willEmitClose(stream) {
  if (!isNodeStream(stream)) return null;

  // The writable state takes precedence over the readable state.
  const state = stream._writableState || stream._readableState;

  if (!state) return isServerResponse(stream);

  return !!(state.autoDestroy && state.emitClose && state.closed === false);
}
285
/**
 * Reports whether a stream has been read from or aborted.
 *
 * @param {any} stream Candidate stream.
 * @returns {boolean} False for a falsy `stream`. Otherwise the truthiness
 *   of `stream[kIsDisturbed]` when that is not null/undefined, else whether
 *   `readableDidRead` or `readableAborted` is truthy.
 */
function isDisturbed(stream) {
  if (!stream) return false;

  const flag = stream[kIsDisturbed];
  if (flag != null) return !!flag;

  return !!(stream.readableDidRead || stream.readableAborted);
}
292
/**
 * Reports whether a stream has errored.
 *
 * The first source below that is not null/undefined decides the result:
 * `stream[kIsErrored]`, `readableErrored`, `writableErrored`, the
 * `errorEmitted` flag of the readable then writable state, and finally the
 * `errored` value of the readable then writable state.
 *
 * @param {any} stream Candidate stream.
 * @returns {boolean} False for a falsy `stream`; otherwise the truthiness
 *   of the deciding source.
 */
function isErrored(stream) {
  if (!stream) return false;

  let verdict = stream[kIsErrored];
  if (verdict == null) verdict = stream.readableErrored;
  if (verdict == null) verdict = stream.writableErrored;
  if (verdict == null) verdict = stream._readableState?.errorEmitted;
  if (verdict == null) verdict = stream._writableState?.errorEmitted;
  if (verdict == null) verdict = stream._readableState?.errored;
  if (verdict == null) verdict = stream._writableState?.errored;

  return !!verdict;
}
304
// Public surface of this module: the shared symbol keys (k*) and the
// stream-inspection predicates defined above.
module.exports = {
  isDestroyed,
  kIsDestroyed,
  isDisturbed,
  kIsDisturbed,
  isErrored,
  kIsErrored,
  isReadable,
  kIsReadable,
  kIsClosedPromise,
  kControllerErrorFunction,
  kIsWritable,
  isClosed,
  isDuplexNodeStream,
  isFinished,
  isIterable,
  isReadableNodeStream,
  isReadableStream,
  isReadableEnded,
  isReadableFinished,
  isReadableErrored,
  isNodeStream,
  isWebStream,
  isWritable,
  isWritableNodeStream,
  isWritableStream,
  isWritableEnded,
  isWritableFinished,
  isWritableErrored,
  isServerRequest,
  isServerResponse,
  willEmitClose,
  isTransformStream,
};
339