• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #ifndef SRC_STREAM_BASE_H_
2 #define SRC_STREAM_BASE_H_
3 
4 #if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
5 
6 #include "env.h"
7 #include "async_wrap.h"
8 #include "node.h"
9 #include "util.h"
10 
11 #include "v8.h"
12 
13 namespace node {
14 
15 // Forward declarations
16 class Environment;
17 class ShutdownWrap;
18 class WriteWrap;
19 class StreamBase;
20 class StreamResource;
21 class ExternalReferenceRegistry;
22 
23 struct StreamWriteResult {
24   bool async;
25   int err;
26   WriteWrap* wrap;
27   size_t bytes;
28   BaseObjectPtr<AsyncWrap> wrap_obj;
29 };
30 
31 using JSMethodFunction = void(const v8::FunctionCallbackInfo<v8::Value>& args);
32 
33 class StreamReq {
34  public:
35   // The kSlot internal field here mirrors BaseObject::InternalFields::kSlot
36   // here because instances derived from StreamReq will also derive from
37   // BaseObject, and the slots are used for the identical purpose.
38   enum InternalFields {
39     kSlot = BaseObject::kSlot,
40     kStreamReqField = BaseObject::kInternalFieldCount,
41     kInternalFieldCount
42   };
43 
44   inline explicit StreamReq(
45       StreamBase* stream,
46       v8::Local<v8::Object> req_wrap_obj);
47 
48   virtual ~StreamReq() = default;
49   virtual AsyncWrap* GetAsyncWrap() = 0;
50   inline v8::Local<v8::Object> object();
51 
52   // TODO(RaisinTen): Update the return type to a Maybe, so that we can indicate
53   // if there is a pending exception/termination.
54   void Done(int status, const char* error_str = nullptr);
55   inline void Dispose();
56 
stream()57   StreamBase* stream() const { return stream_; }
58 
59   static inline StreamReq* FromObject(v8::Local<v8::Object> req_wrap_obj);
60 
61   // Sets all internal fields of `req_wrap_obj` to `nullptr`.
62   // This is what the `WriteWrap` and `ShutdownWrap` JS constructors do,
63   // and what we use in C++ after creating these objects from their
64   // v8::ObjectTemplates, to avoid the overhead of calling the
65   // constructor explicitly.
66   static inline void ResetObject(v8::Local<v8::Object> req_wrap_obj);
67 
68  protected:
69   virtual void OnDone(int status) = 0;
70 
71   inline void AttachToObject(v8::Local<v8::Object> req_wrap_obj);
72 
73  private:
74   StreamBase* const stream_;
75 };
76 
77 class ShutdownWrap : public StreamReq {
78  public:
79   inline ShutdownWrap(
80       StreamBase* stream,
81       v8::Local<v8::Object> req_wrap_obj);
82 
83   static inline ShutdownWrap* FromObject(v8::Local<v8::Object> req_wrap_obj);
84   template <typename T, bool kIsWeak>
85   static inline ShutdownWrap* FromObject(
86       const BaseObjectPtrImpl<T, kIsWeak>& base_obj);
87 
88   // Call stream()->EmitAfterShutdown() and dispose of this request wrap.
89   void OnDone(int status) override;
90 };
91 
92 class WriteWrap : public StreamReq {
93  public:
94   inline void SetBackingStore(std::unique_ptr<v8::BackingStore> bs);
95 
96   inline WriteWrap(
97       StreamBase* stream,
98       v8::Local<v8::Object> req_wrap_obj);
99 
100   static inline WriteWrap* FromObject(v8::Local<v8::Object> req_wrap_obj);
101   template <typename T, bool kIsWeak>
102   static inline WriteWrap* FromObject(
103       const BaseObjectPtrImpl<T, kIsWeak>& base_obj);
104 
105   // Call stream()->EmitAfterWrite() and dispose of this request wrap.
106   void OnDone(int status) override;
107 
108  private:
109   std::unique_ptr<v8::BackingStore> backing_store_;
110 };
111 
112 
113 // This is the generic interface for objects that control Node.js' C++ streams.
114 // For example, the default `EmitToJSStreamListener` emits a stream's data
115 // as Buffers in JS, or `TLSWrap` reads and decrypts data from a stream.
116 class StreamListener {
117  public:
118   virtual ~StreamListener();
119 
120   // This is called when a stream wants to allocate memory before
121   // reading data into the freshly allocated buffer (i.e. it is always followed
122   // by a `OnStreamRead()` call).
123   // This memory may be statically or dynamically allocated; for example,
124   // a protocol parser may want to read data into a static buffer if it knows
125   // that all data is going to be fully handled during the next
126   // `OnStreamRead()` call.
127   // The returned buffer does not need to contain `suggested_size` bytes.
128   // The default implementation of this method returns a buffer that has exactly
129   // the suggested size and is allocated using malloc().
130   // It is not valid to return a zero-length buffer from this method.
131   // It is not guaranteed that the corresponding `OnStreamRead()` call
132   // happens in the same event loop turn as this call.
133   virtual uv_buf_t OnStreamAlloc(size_t suggested_size) = 0;
134 
135   // `OnStreamRead()` is called when data is available on the socket and has
136   // been read into the buffer provided by `OnStreamAlloc()`.
137   // The `buf` argument is the return value of `uv_buf_t`, or may be a buffer
138   // with base nullptr in case of an error.
139   // `nread` is the number of read bytes (which is at most the buffer length),
140   // or, if negative, a libuv error code.
141   virtual void OnStreamRead(ssize_t nread,
142                             const uv_buf_t& buf) = 0;
143 
144   // This is called once a write has finished. `status` may be 0 or,
145   // if negative, a libuv error code.
146   // By default, this is simply passed on to the previous listener
147   // (and raises an assertion if there is none).
148   virtual void OnStreamAfterWrite(WriteWrap* w, int status);
149 
150   // This is called once a shutdown has finished. `status` may be 0 or,
151   // if negative, a libuv error code.
152   // By default, this is simply passed on to the previous listener
153   // (and raises an assertion if there is none).
154   virtual void OnStreamAfterShutdown(ShutdownWrap* w, int status);
155 
156   // This is called by the stream if it determines that it wants more data
157   // to be written to it. Not all streams support this.
158   // This callback will not be called as long as there are active writes.
159   // It is not supported by all streams; `stream->HasWantsWrite()` returns
160   // true if it is supported by a stream.
OnStreamWantsWrite(size_t suggested_size)161   virtual void OnStreamWantsWrite(size_t suggested_size) {}
162 
163   // This is called immediately before the stream is destroyed.
OnStreamDestroy()164   virtual void OnStreamDestroy() {}
165 
166   // The stream this is currently associated with, or nullptr if there is none.
stream()167   StreamResource* stream() const { return stream_; }
168 
169  protected:
170   // Pass along a read error to the `StreamListener` instance that was active
171   // before this one. For example, a protocol parser does not care about read
172   // errors and may instead want to let the original handler
173   // (e.g. the JS handler) take care of the situation.
174   inline void PassReadErrorToPreviousListener(ssize_t nread);
175 
176   StreamResource* stream_ = nullptr;
177   StreamListener* previous_listener_ = nullptr;
178 
179   friend class StreamResource;
180 };
181 
182 
183 // An (incomplete) stream listener class that calls the `.oncomplete()`
184 // method of the JS objects associated with the wrap objects.
185 class ReportWritesToJSStreamListener : public StreamListener {
186  public:
187   void OnStreamAfterWrite(WriteWrap* w, int status) override;
188   void OnStreamAfterShutdown(ShutdownWrap* w, int status) override;
189 
190  private:
191   void OnStreamAfterReqFinished(StreamReq* req_wrap, int status);
192 };
193 
194 
195 // A default emitter that just pushes data chunks as Buffer instances to
196 // JS land via the handle’s .ondata method.
197 class EmitToJSStreamListener : public ReportWritesToJSStreamListener {
198  public:
199   uv_buf_t OnStreamAlloc(size_t suggested_size) override;
200   void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;
201 };
202 
203 
204 // An alternative listener that uses a custom, user-provided buffer
205 // for reading data.
206 class CustomBufferJSListener : public ReportWritesToJSStreamListener {
207  public:
208   uv_buf_t OnStreamAlloc(size_t suggested_size) override;
209   void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;
OnStreamDestroy()210   void OnStreamDestroy() override { delete this; }
211 
CustomBufferJSListener(uv_buf_t buffer)212   explicit CustomBufferJSListener(uv_buf_t buffer) : buffer_(buffer) {}
213 
214  private:
215   uv_buf_t buffer_;
216 };
217 
218 
219 // A generic stream, comparable to JS land’s `Duplex` streams.
220 // A stream is always controlled through one `StreamListener` instance.
221 class StreamResource {
222  public:
223   virtual ~StreamResource();
224 
225   // These need to be implemented on the readable side of this stream:
226 
227   // Start reading from the underlying resource. This is called by the consumer
228   // when more data is desired. Use `EmitAlloc()` and `EmitData()` to
229   // pass data along to the consumer.
230   virtual int ReadStart() = 0;
231   // Stop reading from the underlying resource. This is called by the
232   // consumer when its buffers are full and no more data can be handled.
233   virtual int ReadStop() = 0;
234 
235   // These need to be implemented on the writable side of this stream:
236   // All of these methods may return an error code synchronously.
237   // In that case, the finish callback should *not* be called.
238 
239   // Perform a shutdown operation, and either call req_wrap->Done() when
240   // finished and return 0, return 1 for synchronous success, or
241   // a libuv error code for synchronous failures.
242   virtual int DoShutdown(ShutdownWrap* req_wrap) = 0;
243   // Try to write as much data as possible synchronously, and modify
244   // `*bufs` and `*count` accordingly. This is a no-op by default.
245   // Return 0 for success and a libuv error code for failures.
246   virtual int DoTryWrite(uv_buf_t** bufs, size_t* count);
247   // Initiate a write of data.
248   // Upon an immediate failure, a libuv error code is returned,
249   // w->Done() will never be called and caller should free `bufs`.
250   // Otherwise, 0 is returned and w->Done(status) will be called
251   // with status set to either
252   //  (1) 0 after all data are written, or
253   //  (2) a libuv error code when an error occurs
254   // in either case, w->Done() will never be called before DoWrite() returns.
255   // When 0 is returned:
256   //  (1) memory specified by `bufs` and `count` must remain valid until
257   //      w->Done() gets called.
258   //  (2) `bufs` might or might not be changed, caller should not rely on this.
259   //  (3) `bufs` should be freed after w->Done() gets called.
260   virtual int DoWrite(WriteWrap* w,
261                       uv_buf_t* bufs,
262                       size_t count,
263                       uv_stream_t* send_handle) = 0;
264 
265   // Returns true if the stream supports the `OnStreamWantsWrite()` interface.
HasWantsWrite()266   virtual bool HasWantsWrite() const { return false; }
267 
268   // Optionally, this may provide an error message to be used for
269   // failing writes.
270   virtual const char* Error() const;
271   // Clear the current error (i.e. that would be returned by Error()).
272   virtual void ClearError();
273 
274   // Transfer ownership of this stream to `listener`. The previous listener
275   // will not receive any more callbacks while the new listener was active.
276   inline void PushStreamListener(StreamListener* listener);
277   // Remove a listener, and, if this was the currently active one,
278   // transfer ownership back to the previous listener.
279   void RemoveStreamListener(StreamListener* listener);
280 
281  protected:
282   // Call the current listener's OnStreamAlloc() method.
283   inline uv_buf_t EmitAlloc(size_t suggested_size);
284   // Call the current listener's OnStreamRead() method and update the
285   // stream's read byte counter.
286   inline void EmitRead(
287       ssize_t nread,
288       const uv_buf_t& buf = uv_buf_init(nullptr, 0));
289   // Call the current listener's OnStreamAfterWrite() method.
290   inline void EmitAfterWrite(WriteWrap* w, int status);
291   // Call the current listener's OnStreamAfterShutdown() method.
292   inline void EmitAfterShutdown(ShutdownWrap* w, int status);
293   // Call the current listener's OnStreamWantsWrite() method.
294   inline void EmitWantsWrite(size_t suggested_size);
295 
296   StreamListener* listener_ = nullptr;
297   uint64_t bytes_read_ = 0;
298   uint64_t bytes_written_ = 0;
299 
300   friend class StreamListener;
301 };
302 
303 
304 class StreamBase : public StreamResource {
305  public:
306   // The kSlot field here mirrors that of BaseObject::InternalFields::kSlot
307   // because instances deriving from StreamBase generally also derived from
308   // BaseObject (it's possible for it not to, however).
309   enum InternalFields {
310     kSlot = BaseObject::kSlot,
311     kStreamBaseField = BaseObject::kInternalFieldCount,
312     kOnReadFunctionField,
313     kInternalFieldCount
314   };
315 
316   static void AddMethods(Environment* env,
317                          v8::Local<v8::FunctionTemplate> target);
318   static void RegisterExternalReferences(ExternalReferenceRegistry* registry);
319   virtual bool IsAlive() = 0;
320   virtual bool IsClosing() = 0;
321   virtual bool IsIPCPipe();
322   virtual int GetFD();
323 
324   enum StreamBaseJSChecks { DONT_SKIP_NREAD_CHECKS, SKIP_NREAD_CHECKS };
325 
326   v8::MaybeLocal<v8::Value> CallJSOnreadMethod(
327       ssize_t nread,
328       v8::Local<v8::ArrayBuffer> ab,
329       size_t offset = 0,
330       StreamBaseJSChecks checks = DONT_SKIP_NREAD_CHECKS);
331 
332   // This is named `stream_env` to avoid name clashes, because a lot of
333   // subclasses are also `BaseObject`s.
stream_env()334   Environment* stream_env() const { return env_; }
335 
336   // TODO(RaisinTen): Update the return type to a Maybe, so that we can indicate
337   // if there is a pending exception/termination.
338   // Shut down the current stream. This request can use an existing
339   // ShutdownWrap object (that was created in JS), or a new one will be created.
340   // Returns 1 in case of a synchronous completion, 0 in case of asynchronous
341   // completion, and a libuv error case in case of synchronous failure.
342   int Shutdown(v8::Local<v8::Object> req_wrap_obj = v8::Local<v8::Object>());
343 
344   // TODO(RaisinTen): Update the return type to a Maybe, so that we can indicate
345   // if there is a pending exception/termination.
346   // Write data to the current stream. This request can use an existing
347   // WriteWrap object (that was created in JS), or a new one will be created.
348   // This will first try to write synchronously using `DoTryWrite()`, then
349   // asynchronously using `DoWrite()`.
350   // Caller can pass `skip_try_write` as true if it has already called
351   // `DoTryWrite()` and ends up with a partial write, or it knows that the
352   // write is too large to finish synchronously.
353   // If the return value indicates a synchronous completion, no callback will
354   // be invoked.
355   StreamWriteResult Write(
356       uv_buf_t* bufs,
357       size_t count,
358       uv_stream_t* send_handle = nullptr,
359       v8::Local<v8::Object> req_wrap_obj = v8::Local<v8::Object>(),
360       bool skip_try_write = false);
361 
362   // These can be overridden by subclasses to get more specific wrap instances.
363   // For example, a subclass Foo could create a FooWriteWrap or FooShutdownWrap
364   // (inheriting from ShutdownWrap/WriteWrap) that has extra fields, like
365   // an associated libuv request.
366   virtual ShutdownWrap* CreateShutdownWrap(v8::Local<v8::Object> object);
367   virtual WriteWrap* CreateWriteWrap(v8::Local<v8::Object> object);
368 
369   // One of these must be implemented
370   virtual AsyncWrap* GetAsyncWrap() = 0;
371   virtual v8::Local<v8::Object> GetObject();
372 
373   static inline StreamBase* FromObject(v8::Local<v8::Object> obj);
374 
375  protected:
376   inline explicit StreamBase(Environment* env);
377 
378   // JS Methods
379   int ReadStartJS(const v8::FunctionCallbackInfo<v8::Value>& args);
380   int ReadStopJS(const v8::FunctionCallbackInfo<v8::Value>& args);
381   int Shutdown(const v8::FunctionCallbackInfo<v8::Value>& args);
382   int Writev(const v8::FunctionCallbackInfo<v8::Value>& args);
383   int WriteBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);
384   template <enum encoding enc>
385   int WriteString(const v8::FunctionCallbackInfo<v8::Value>& args);
386   int UseUserBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);
387 
388   static void GetFD(const v8::FunctionCallbackInfo<v8::Value>& args);
389   static void GetExternal(const v8::FunctionCallbackInfo<v8::Value>& args);
390   static void GetBytesRead(const v8::FunctionCallbackInfo<v8::Value>& args);
391   static void GetBytesWritten(const v8::FunctionCallbackInfo<v8::Value>& args);
392   inline void AttachToObject(v8::Local<v8::Object> obj);
393 
394   template <int (StreamBase::*Method)(
395       const v8::FunctionCallbackInfo<v8::Value>& args)>
396   static void JSMethod(const v8::FunctionCallbackInfo<v8::Value>& args);
397 
398   // Internal, used only in StreamBase methods + env.cc.
399   enum StreamBaseStateFields {
400     kReadBytesOrError,
401     kArrayBufferOffset,
402     kBytesWritten,
403     kLastWriteWasAsync,
404     kNumStreamBaseStateFields
405   };
406 
407  private:
408   Environment* env_;
409   EmitToJSStreamListener default_listener_;
410 
411   void SetWriteResult(const StreamWriteResult& res);
412   static void AddMethod(Environment* env,
413                         v8::Local<v8::Signature> sig,
414                         enum v8::PropertyAttribute attributes,
415                         v8::Local<v8::FunctionTemplate> t,
416                         JSMethodFunction* stream_method,
417                         v8::Local<v8::String> str);
418 
419   friend class WriteWrap;
420   friend class ShutdownWrap;
421   friend class Environment;  // For kNumStreamBaseStateFields.
422 };
423 
424 
425 // These are helpers for creating `ShutdownWrap`/`WriteWrap` instances.
426 // `OtherBase` must have a constructor that matches the `AsyncWrap`
427 // constructors’s (Environment*, Local<Object>, AsyncWrap::Provider) signature
428 // and be a subclass of `AsyncWrap`.
429 template <typename OtherBase>
430 class SimpleShutdownWrap : public ShutdownWrap, public OtherBase {
431  public:
432   SimpleShutdownWrap(StreamBase* stream,
433                      v8::Local<v8::Object> req_wrap_obj);
434 
GetAsyncWrap()435   AsyncWrap* GetAsyncWrap() override { return this; }
436 
437   SET_NO_MEMORY_INFO()
SET_MEMORY_INFO_NAME(SimpleShutdownWrap)438   SET_MEMORY_INFO_NAME(SimpleShutdownWrap)
439   SET_SELF_SIZE(SimpleShutdownWrap)
440 
441   bool IsNotIndicativeOfMemoryLeakAtExit() const override {
442     return OtherBase::IsNotIndicativeOfMemoryLeakAtExit();
443   }
444 };
445 
446 template <typename OtherBase>
447 class SimpleWriteWrap : public WriteWrap, public OtherBase {
448  public:
449   SimpleWriteWrap(StreamBase* stream,
450                   v8::Local<v8::Object> req_wrap_obj);
451 
GetAsyncWrap()452   AsyncWrap* GetAsyncWrap() override { return this; }
453 
454   SET_NO_MEMORY_INFO()
SET_MEMORY_INFO_NAME(SimpleWriteWrap)455   SET_MEMORY_INFO_NAME(SimpleWriteWrap)
456   SET_SELF_SIZE(SimpleWriteWrap)
457 
458   bool IsNotIndicativeOfMemoryLeakAtExit() const override {
459     return OtherBase::IsNotIndicativeOfMemoryLeakAtExit();
460   }
461 };
462 
463 }  // namespace node
464 
465 #endif  // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
466 
467 #endif  // SRC_STREAM_BASE_H_
468