• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #ifndef SRC_STREAM_BASE_H_
2 #define SRC_STREAM_BASE_H_
3 
4 #if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
5 
6 #include "env.h"
7 #include "allocated_buffer.h"
8 #include "async_wrap.h"
9 #include "node.h"
10 #include "util.h"
11 
12 #include "v8.h"
13 
14 namespace node {
15 
16 // Forward declarations
17 class Environment;
18 class ShutdownWrap;
19 class WriteWrap;
20 class StreamBase;
21 class StreamResource;
22 
23 struct StreamWriteResult {
24   bool async;
25   int err;
26   WriteWrap* wrap;
27   size_t bytes;
28   BaseObjectPtr<AsyncWrap> wrap_obj;
29 };
30 
31 using JSMethodFunction = void(const v8::FunctionCallbackInfo<v8::Value>& args);
32 
33 class StreamReq {
34  public:
35   // The kSlot internal field here mirrors BaseObject::InternalFields::kSlot
36   // here because instances derived from StreamReq will also derive from
37   // BaseObject, and the slots are used for the identical purpose.
38   enum InternalFields {
39     kSlot = BaseObject::kSlot,
40     kStreamReqField = BaseObject::kInternalFieldCount,
41     kInternalFieldCount
42   };
43 
44   inline explicit StreamReq(
45       StreamBase* stream,
46       v8::Local<v8::Object> req_wrap_obj);
47 
48   virtual ~StreamReq() = default;
49   virtual AsyncWrap* GetAsyncWrap() = 0;
50   inline v8::Local<v8::Object> object();
51 
52   inline void Done(int status, const char* error_str = nullptr);
53   inline void Dispose();
54 
stream()55   StreamBase* stream() const { return stream_; }
56 
57   static inline StreamReq* FromObject(v8::Local<v8::Object> req_wrap_obj);
58 
59   // Sets all internal fields of `req_wrap_obj` to `nullptr`.
60   // This is what the `WriteWrap` and `ShutdownWrap` JS constructors do,
61   // and what we use in C++ after creating these objects from their
62   // v8::ObjectTemplates, to avoid the overhead of calling the
63   // constructor explicitly.
64   static inline void ResetObject(v8::Local<v8::Object> req_wrap_obj);
65 
66  protected:
67   virtual void OnDone(int status) = 0;
68 
69   inline void AttachToObject(v8::Local<v8::Object> req_wrap_obj);
70 
71  private:
72   StreamBase* const stream_;
73 };
74 
75 class ShutdownWrap : public StreamReq {
76  public:
77   inline ShutdownWrap(
78       StreamBase* stream,
79       v8::Local<v8::Object> req_wrap_obj);
80 
81   static inline ShutdownWrap* FromObject(v8::Local<v8::Object> req_wrap_obj);
82   template <typename T, bool kIsWeak>
83   static inline ShutdownWrap* FromObject(
84       const BaseObjectPtrImpl<T, kIsWeak>& base_obj);
85 
86   // Call stream()->EmitAfterShutdown() and dispose of this request wrap.
87   void OnDone(int status) override;
88 };
89 
90 class WriteWrap : public StreamReq {
91  public:
92   inline void SetAllocatedStorage(AllocatedBuffer&& storage);
93 
94   inline WriteWrap(
95       StreamBase* stream,
96       v8::Local<v8::Object> req_wrap_obj);
97 
98   static inline WriteWrap* FromObject(v8::Local<v8::Object> req_wrap_obj);
99   template <typename T, bool kIsWeak>
100   static inline WriteWrap* FromObject(
101       const BaseObjectPtrImpl<T, kIsWeak>& base_obj);
102 
103   // Call stream()->EmitAfterWrite() and dispose of this request wrap.
104   void OnDone(int status) override;
105 
106  private:
107   AllocatedBuffer storage_;
108 };
109 
110 
111 // This is the generic interface for objects that control Node.js' C++ streams.
112 // For example, the default `EmitToJSStreamListener` emits a stream's data
113 // as Buffers in JS, or `TLSWrap` reads and decrypts data from a stream.
114 class StreamListener {
115  public:
116   virtual ~StreamListener();
117 
118   // This is called when a stream wants to allocate memory before
119   // reading data into the freshly allocated buffer (i.e. it is always followed
120   // by a `OnStreamRead()` call).
121   // This memory may be statically or dynamically allocated; for example,
122   // a protocol parser may want to read data into a static buffer if it knows
123   // that all data is going to be fully handled during the next
124   // `OnStreamRead()` call.
125   // The returned buffer does not need to contain `suggested_size` bytes.
126   // The default implementation of this method returns a buffer that has exactly
127   // the suggested size and is allocated using malloc().
128   // It is not valid to return a zero-length buffer from this method.
129   // It is not guaranteed that the corresponding `OnStreamRead()` call
130   // happens in the same event loop turn as this call.
131   virtual uv_buf_t OnStreamAlloc(size_t suggested_size) = 0;
132 
133   // `OnStreamRead()` is called when data is available on the socket and has
134   // been read into the buffer provided by `OnStreamAlloc()`.
135   // The `buf` argument is the return value of `uv_buf_t`, or may be a buffer
136   // with base nullptr in case of an error.
137   // `nread` is the number of read bytes (which is at most the buffer length),
138   // or, if negative, a libuv error code.
139   virtual void OnStreamRead(ssize_t nread,
140                             const uv_buf_t& buf) = 0;
141 
142   // This is called once a write has finished. `status` may be 0 or,
143   // if negative, a libuv error code.
144   // By default, this is simply passed on to the previous listener
145   // (and raises an assertion if there is none).
146   virtual void OnStreamAfterWrite(WriteWrap* w, int status);
147 
148   // This is called once a shutdown has finished. `status` may be 0 or,
149   // if negative, a libuv error code.
150   // By default, this is simply passed on to the previous listener
151   // (and raises an assertion if there is none).
152   virtual void OnStreamAfterShutdown(ShutdownWrap* w, int status);
153 
154   // This is called by the stream if it determines that it wants more data
155   // to be written to it. Not all streams support this.
156   // This callback will not be called as long as there are active writes.
157   // It is not supported by all streams; `stream->HasWantsWrite()` returns
158   // true if it is supported by a stream.
OnStreamWantsWrite(size_t suggested_size)159   virtual void OnStreamWantsWrite(size_t suggested_size) {}
160 
161   // This is called immediately before the stream is destroyed.
OnStreamDestroy()162   virtual void OnStreamDestroy() {}
163 
164   // The stream this is currently associated with, or nullptr if there is none.
stream()165   StreamResource* stream() const { return stream_; }
166 
167  protected:
168   // Pass along a read error to the `StreamListener` instance that was active
169   // before this one. For example, a protocol parser does not care about read
170   // errors and may instead want to let the original handler
171   // (e.g. the JS handler) take care of the situation.
172   inline void PassReadErrorToPreviousListener(ssize_t nread);
173 
174   StreamResource* stream_ = nullptr;
175   StreamListener* previous_listener_ = nullptr;
176 
177   friend class StreamResource;
178 };
179 
180 
181 // An (incomplete) stream listener class that calls the `.oncomplete()`
182 // method of the JS objects associated with the wrap objects.
183 class ReportWritesToJSStreamListener : public StreamListener {
184  public:
185   void OnStreamAfterWrite(WriteWrap* w, int status) override;
186   void OnStreamAfterShutdown(ShutdownWrap* w, int status) override;
187 
188  private:
189   void OnStreamAfterReqFinished(StreamReq* req_wrap, int status);
190 };
191 
192 
193 // A default emitter that just pushes data chunks as Buffer instances to
194 // JS land via the handle’s .ondata method.
195 class EmitToJSStreamListener : public ReportWritesToJSStreamListener {
196  public:
197   uv_buf_t OnStreamAlloc(size_t suggested_size) override;
198   void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;
199 };
200 
201 
202 // An alternative listener that uses a custom, user-provided buffer
203 // for reading data.
204 class CustomBufferJSListener : public ReportWritesToJSStreamListener {
205  public:
206   uv_buf_t OnStreamAlloc(size_t suggested_size) override;
207   void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;
OnStreamDestroy()208   void OnStreamDestroy() override { delete this; }
209 
CustomBufferJSListener(uv_buf_t buffer)210   explicit CustomBufferJSListener(uv_buf_t buffer) : buffer_(buffer) {}
211 
212  private:
213   uv_buf_t buffer_;
214 };
215 
216 
217 // A generic stream, comparable to JS land’s `Duplex` streams.
218 // A stream is always controlled through one `StreamListener` instance.
219 class StreamResource {
220  public:
221   virtual ~StreamResource();
222 
223   // These need to be implemented on the readable side of this stream:
224 
225   // Start reading from the underlying resource. This is called by the consumer
226   // when more data is desired. Use `EmitAlloc()` and `EmitData()` to
227   // pass data along to the consumer.
228   virtual int ReadStart() = 0;
229   // Stop reading from the underlying resource. This is called by the
230   // consumer when its buffers are full and no more data can be handled.
231   virtual int ReadStop() = 0;
232 
233   // These need to be implemented on the writable side of this stream:
234   // All of these methods may return an error code synchronously.
235   // In that case, the finish callback should *not* be called.
236 
237   // Perform a shutdown operation, and either call req_wrap->Done() when
238   // finished and return 0, return 1 for synchronous success, or
239   // a libuv error code for synchronous failures.
240   virtual int DoShutdown(ShutdownWrap* req_wrap) = 0;
241   // Try to write as much data as possible synchronously, and modify
242   // `*bufs` and `*count` accordingly. This is a no-op by default.
243   // Return 0 for success and a libuv error code for failures.
244   virtual int DoTryWrite(uv_buf_t** bufs, size_t* count);
245   // Initiate a write of data. If the write completes synchronously, return 0 on
246   // success (with bufs modified to indicate how much data was consumed) or a
247   // libuv error code on failure. If the write will complete asynchronously,
248   // return 0. When the write completes asynchronously, call req_wrap->Done()
249   // with 0 on success (with bufs modified to indicate how much data was
250   // consumed) or a libuv error code on failure. Do not call req_wrap->Done() if
251   // the write completes synchronously, that is, it should never be called
252   // before DoWrite() has returned.
253   virtual int DoWrite(WriteWrap* w,
254                       uv_buf_t* bufs,
255                       size_t count,
256                       uv_stream_t* send_handle) = 0;
257 
258   // Returns true if the stream supports the `OnStreamWantsWrite()` interface.
HasWantsWrite()259   virtual bool HasWantsWrite() const { return false; }
260 
261   // Optionally, this may provide an error message to be used for
262   // failing writes.
263   virtual const char* Error() const;
264   // Clear the current error (i.e. that would be returned by Error()).
265   virtual void ClearError();
266 
267   // Transfer ownership of this stream to `listener`. The previous listener
268   // will not receive any more callbacks while the new listener was active.
269   inline void PushStreamListener(StreamListener* listener);
270   // Remove a listener, and, if this was the currently active one,
271   // transfer ownership back to the previous listener.
272   inline void RemoveStreamListener(StreamListener* listener);
273 
274  protected:
275   // Call the current listener's OnStreamAlloc() method.
276   inline uv_buf_t EmitAlloc(size_t suggested_size);
277   // Call the current listener's OnStreamRead() method and update the
278   // stream's read byte counter.
279   inline void EmitRead(
280       ssize_t nread,
281       const uv_buf_t& buf = uv_buf_init(nullptr, 0));
282   // Call the current listener's OnStreamAfterWrite() method.
283   inline void EmitAfterWrite(WriteWrap* w, int status);
284   // Call the current listener's OnStreamAfterShutdown() method.
285   inline void EmitAfterShutdown(ShutdownWrap* w, int status);
286   // Call the current listener's OnStreamWantsWrite() method.
287   inline void EmitWantsWrite(size_t suggested_size);
288 
289   StreamListener* listener_ = nullptr;
290   uint64_t bytes_read_ = 0;
291   uint64_t bytes_written_ = 0;
292 
293   friend class StreamListener;
294 };
295 
296 
297 class StreamBase : public StreamResource {
298  public:
299   // The kSlot field here mirrors that of BaseObject::InternalFields::kSlot
300   // because instances deriving from StreamBase generally also derived from
301   // BaseObject (it's possible for it not to, however).
302   enum InternalFields {
303     kSlot = BaseObject::kSlot,
304     kStreamBaseField = BaseObject::kInternalFieldCount,
305     kOnReadFunctionField,
306     kInternalFieldCount
307   };
308 
309   static void AddMethods(Environment* env,
310                          v8::Local<v8::FunctionTemplate> target);
311 
312   virtual bool IsAlive() = 0;
313   virtual bool IsClosing() = 0;
314   virtual bool IsIPCPipe();
315   virtual int GetFD();
316 
317   enum StreamBaseJSChecks { DONT_SKIP_NREAD_CHECKS, SKIP_NREAD_CHECKS };
318 
319   v8::MaybeLocal<v8::Value> CallJSOnreadMethod(
320       ssize_t nread,
321       v8::Local<v8::ArrayBuffer> ab,
322       size_t offset = 0,
323       StreamBaseJSChecks checks = DONT_SKIP_NREAD_CHECKS);
324 
325   // This is named `stream_env` to avoid name clashes, because a lot of
326   // subclasses are also `BaseObject`s.
stream_env()327   Environment* stream_env() const { return env_; }
328 
329   // Shut down the current stream. This request can use an existing
330   // ShutdownWrap object (that was created in JS), or a new one will be created.
331   // Returns 1 in case of a synchronous completion, 0 in case of asynchronous
332   // completion, and a libuv error case in case of synchronous failure.
333   inline int Shutdown(
334       v8::Local<v8::Object> req_wrap_obj = v8::Local<v8::Object>());
335 
336   // Write data to the current stream. This request can use an existing
337   // WriteWrap object (that was created in JS), or a new one will be created.
338   // This will first try to write synchronously using `DoTryWrite()`, then
339   // asynchronously using `DoWrite()`.
340   // If the return value indicates a synchronous completion, no callback will
341   // be invoked.
342   inline StreamWriteResult Write(
343       uv_buf_t* bufs,
344       size_t count,
345       uv_stream_t* send_handle = nullptr,
346       v8::Local<v8::Object> req_wrap_obj = v8::Local<v8::Object>());
347 
348   // These can be overridden by subclasses to get more specific wrap instances.
349   // For example, a subclass Foo could create a FooWriteWrap or FooShutdownWrap
350   // (inheriting from ShutdownWrap/WriteWrap) that has extra fields, like
351   // an associated libuv request.
352   virtual ShutdownWrap* CreateShutdownWrap(v8::Local<v8::Object> object);
353   virtual WriteWrap* CreateWriteWrap(v8::Local<v8::Object> object);
354 
355   // One of these must be implemented
356   virtual AsyncWrap* GetAsyncWrap() = 0;
357   virtual v8::Local<v8::Object> GetObject();
358 
359   static inline StreamBase* FromObject(v8::Local<v8::Object> obj);
360 
361  protected:
362   inline explicit StreamBase(Environment* env);
363 
364   // JS Methods
365   int ReadStartJS(const v8::FunctionCallbackInfo<v8::Value>& args);
366   int ReadStopJS(const v8::FunctionCallbackInfo<v8::Value>& args);
367   int Shutdown(const v8::FunctionCallbackInfo<v8::Value>& args);
368   int Writev(const v8::FunctionCallbackInfo<v8::Value>& args);
369   int WriteBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);
370   template <enum encoding enc>
371   int WriteString(const v8::FunctionCallbackInfo<v8::Value>& args);
372   int UseUserBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);
373 
374   static void GetFD(const v8::FunctionCallbackInfo<v8::Value>& args);
375   static void GetExternal(const v8::FunctionCallbackInfo<v8::Value>& args);
376   static void GetBytesRead(const v8::FunctionCallbackInfo<v8::Value>& args);
377   static void GetBytesWritten(const v8::FunctionCallbackInfo<v8::Value>& args);
378   inline void AttachToObject(v8::Local<v8::Object> obj);
379 
380   template <int (StreamBase::*Method)(
381       const v8::FunctionCallbackInfo<v8::Value>& args)>
382   static void JSMethod(const v8::FunctionCallbackInfo<v8::Value>& args);
383 
384   // Internal, used only in StreamBase methods + env.cc.
385   enum StreamBaseStateFields {
386     kReadBytesOrError,
387     kArrayBufferOffset,
388     kBytesWritten,
389     kLastWriteWasAsync,
390     kNumStreamBaseStateFields
391   };
392 
393  private:
394   Environment* env_;
395   EmitToJSStreamListener default_listener_;
396 
397   void SetWriteResult(const StreamWriteResult& res);
398   static void AddMethod(Environment* env,
399                         v8::Local<v8::Signature> sig,
400                         enum v8::PropertyAttribute attributes,
401                         v8::Local<v8::FunctionTemplate> t,
402                         JSMethodFunction* stream_method,
403                         v8::Local<v8::String> str);
404 
405   friend class WriteWrap;
406   friend class ShutdownWrap;
407   friend class Environment;  // For kNumStreamBaseStateFields.
408 };
409 
410 
411 // These are helpers for creating `ShutdownWrap`/`WriteWrap` instances.
412 // `OtherBase` must have a constructor that matches the `AsyncWrap`
413 // constructors’s (Environment*, Local<Object>, AsyncWrap::Provider) signature
414 // and be a subclass of `AsyncWrap`.
415 template <typename OtherBase>
416 class SimpleShutdownWrap : public ShutdownWrap, public OtherBase {
417  public:
418   SimpleShutdownWrap(StreamBase* stream,
419                      v8::Local<v8::Object> req_wrap_obj);
420 
GetAsyncWrap()421   AsyncWrap* GetAsyncWrap() override { return this; }
422 
423   SET_NO_MEMORY_INFO()
SET_MEMORY_INFO_NAME(SimpleShutdownWrap)424   SET_MEMORY_INFO_NAME(SimpleShutdownWrap)
425   SET_SELF_SIZE(SimpleShutdownWrap)
426 
427   bool IsNotIndicativeOfMemoryLeakAtExit() const override {
428     return OtherBase::IsNotIndicativeOfMemoryLeakAtExit();
429   }
430 };
431 
432 template <typename OtherBase>
433 class SimpleWriteWrap : public WriteWrap, public OtherBase {
434  public:
435   SimpleWriteWrap(StreamBase* stream,
436                   v8::Local<v8::Object> req_wrap_obj);
437 
GetAsyncWrap()438   AsyncWrap* GetAsyncWrap() override { return this; }
439 
440   SET_NO_MEMORY_INFO()
SET_MEMORY_INFO_NAME(SimpleWriteWrap)441   SET_MEMORY_INFO_NAME(SimpleWriteWrap)
442   SET_SELF_SIZE(SimpleWriteWrap)
443 
444   bool IsNotIndicativeOfMemoryLeakAtExit() const override {
445     return OtherBase::IsNotIndicativeOfMemoryLeakAtExit();
446   }
447 };
448 
449 }  // namespace node
450 
451 #endif  // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
452 
453 #endif  // SRC_STREAM_BASE_H_
454