• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #ifndef SRC_STREAM_BASE_H_
2 #define SRC_STREAM_BASE_H_
3 
4 #if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
5 
6 #include "env.h"
7 #include "async_wrap.h"
8 #include "node.h"
9 #include "util.h"
10 
11 #include "v8.h"
12 
13 namespace node {
14 
15 // Forward declarations
16 class Environment;
17 class ShutdownWrap;
18 class WriteWrap;
19 class StreamBase;
20 class StreamResource;
21 
22 struct StreamWriteResult {
23   bool async;
24   int err;
25   WriteWrap* wrap;
26   size_t bytes;
27   BaseObjectPtr<AsyncWrap> wrap_obj;
28 };
29 
30 using JSMethodFunction = void(const v8::FunctionCallbackInfo<v8::Value>& args);
31 
32 class StreamReq {
33  public:
34   // The kSlot internal field here mirrors BaseObject::InternalFields::kSlot
35   // here because instances derived from StreamReq will also derive from
36   // BaseObject, and the slots are used for the identical purpose.
37   enum InternalFields {
38     kSlot = BaseObject::kSlot,
39     kStreamReqField = BaseObject::kInternalFieldCount,
40     kInternalFieldCount
41   };
42 
StreamReq(StreamBase * stream,v8::Local<v8::Object> req_wrap_obj)43   explicit StreamReq(StreamBase* stream,
44                      v8::Local<v8::Object> req_wrap_obj) : stream_(stream) {
45     AttachToObject(req_wrap_obj);
46   }
47 
48   virtual ~StreamReq() = default;
49   virtual AsyncWrap* GetAsyncWrap() = 0;
50   v8::Local<v8::Object> object();
51 
52   void Done(int status, const char* error_str = nullptr);
53   void Dispose();
54 
stream()55   inline StreamBase* stream() const { return stream_; }
56 
57   static StreamReq* FromObject(v8::Local<v8::Object> req_wrap_obj);
58 
59   // Sets all internal fields of `req_wrap_obj` to `nullptr`.
60   // This is what the `WriteWrap` and `ShutdownWrap` JS constructors do,
61   // and what we use in C++ after creating these objects from their
62   // v8::ObjectTemplates, to avoid the overhead of calling the
63   // constructor explicitly.
64   static inline void ResetObject(v8::Local<v8::Object> req_wrap_obj);
65 
66  protected:
67   virtual void OnDone(int status) = 0;
68 
69   void AttachToObject(v8::Local<v8::Object> req_wrap_obj);
70 
71  private:
72   StreamBase* const stream_;
73 };
74 
75 class ShutdownWrap : public StreamReq {
76  public:
ShutdownWrap(StreamBase * stream,v8::Local<v8::Object> req_wrap_obj)77   ShutdownWrap(StreamBase* stream,
78                v8::Local<v8::Object> req_wrap_obj)
79     : StreamReq(stream, req_wrap_obj) { }
80 
81   // Call stream()->EmitAfterShutdown() and dispose of this request wrap.
82   void OnDone(int status) override;
83 };
84 
85 class WriteWrap : public StreamReq {
86  public:
87   void SetAllocatedStorage(AllocatedBuffer&& storage);
88 
WriteWrap(StreamBase * stream,v8::Local<v8::Object> req_wrap_obj)89   WriteWrap(StreamBase* stream,
90             v8::Local<v8::Object> req_wrap_obj)
91     : StreamReq(stream, req_wrap_obj) { }
92 
93   // Call stream()->EmitAfterWrite() and dispose of this request wrap.
94   void OnDone(int status) override;
95 
96  private:
97   AllocatedBuffer storage_;
98 };
99 
100 
101 // This is the generic interface for objects that control Node.js' C++ streams.
102 // For example, the default `EmitToJSStreamListener` emits a stream's data
103 // as Buffers in JS, or `TLSWrap` reads and decrypts data from a stream.
104 class StreamListener {
105  public:
106   virtual ~StreamListener();
107 
108   // This is called when a stream wants to allocate memory before
109   // reading data into the freshly allocated buffer (i.e. it is always followed
110   // by a `OnStreamRead()` call).
111   // This memory may be statically or dynamically allocated; for example,
112   // a protocol parser may want to read data into a static buffer if it knows
113   // that all data is going to be fully handled during the next
114   // `OnStreamRead()` call.
115   // The returned buffer does not need to contain `suggested_size` bytes.
116   // The default implementation of this method returns a buffer that has exactly
117   // the suggested size and is allocated using malloc().
118   // It is not valid to return a zero-length buffer from this method.
119   // It is not guaranteed that the corresponding `OnStreamRead()` call
120   // happens in the same event loop turn as this call.
121   virtual uv_buf_t OnStreamAlloc(size_t suggested_size) = 0;
122 
123   // `OnStreamRead()` is called when data is available on the socket and has
124   // been read into the buffer provided by `OnStreamAlloc()`.
125   // The `buf` argument is the return value of `uv_buf_t`, or may be a buffer
126   // with base nullptr in case of an error.
127   // `nread` is the number of read bytes (which is at most the buffer length),
128   // or, if negative, a libuv error code.
129   virtual void OnStreamRead(ssize_t nread,
130                             const uv_buf_t& buf) = 0;
131 
132   // This is called once a write has finished. `status` may be 0 or,
133   // if negative, a libuv error code.
134   // By default, this is simply passed on to the previous listener
135   // (and raises an assertion if there is none).
136   virtual void OnStreamAfterWrite(WriteWrap* w, int status);
137 
138   // This is called once a shutdown has finished. `status` may be 0 or,
139   // if negative, a libuv error code.
140   // By default, this is simply passed on to the previous listener
141   // (and raises an assertion if there is none).
142   virtual void OnStreamAfterShutdown(ShutdownWrap* w, int status);
143 
144   // This is called by the stream if it determines that it wants more data
145   // to be written to it. Not all streams support this.
146   // This callback will not be called as long as there are active writes.
147   // It is not supported by all streams; `stream->HasWantsWrite()` returns
148   // true if it is supported by a stream.
OnStreamWantsWrite(size_t suggested_size)149   virtual void OnStreamWantsWrite(size_t suggested_size) {}
150 
151   // This is called immediately before the stream is destroyed.
OnStreamDestroy()152   virtual void OnStreamDestroy() {}
153 
154   // The stream this is currently associated with, or nullptr if there is none.
stream()155   inline StreamResource* stream() { return stream_; }
156 
157  protected:
158   // Pass along a read error to the `StreamListener` instance that was active
159   // before this one. For example, a protocol parser does not care about read
160   // errors and may instead want to let the original handler
161   // (e.g. the JS handler) take care of the situation.
162   void PassReadErrorToPreviousListener(ssize_t nread);
163 
164   StreamResource* stream_ = nullptr;
165   StreamListener* previous_listener_ = nullptr;
166 
167   friend class StreamResource;
168 };
169 
170 
171 // An (incomplete) stream listener class that calls the `.oncomplete()`
172 // method of the JS objects associated with the wrap objects.
173 class ReportWritesToJSStreamListener : public StreamListener {
174  public:
175   void OnStreamAfterWrite(WriteWrap* w, int status) override;
176   void OnStreamAfterShutdown(ShutdownWrap* w, int status) override;
177 
178  private:
179   void OnStreamAfterReqFinished(StreamReq* req_wrap, int status);
180 };
181 
182 
183 // A default emitter that just pushes data chunks as Buffer instances to
184 // JS land via the handle’s .ondata method.
185 class EmitToJSStreamListener : public ReportWritesToJSStreamListener {
186  public:
187   uv_buf_t OnStreamAlloc(size_t suggested_size) override;
188   void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;
189 };
190 
191 
192 // An alternative listener that uses a custom, user-provided buffer
193 // for reading data.
194 class CustomBufferJSListener : public ReportWritesToJSStreamListener {
195  public:
196   uv_buf_t OnStreamAlloc(size_t suggested_size) override;
197   void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;
OnStreamDestroy()198   void OnStreamDestroy() override { delete this; }
199 
CustomBufferJSListener(uv_buf_t buffer)200   explicit CustomBufferJSListener(uv_buf_t buffer) : buffer_(buffer) {}
201 
202  private:
203   uv_buf_t buffer_;
204 };
205 
206 
207 // A generic stream, comparable to JS land’s `Duplex` streams.
208 // A stream is always controlled through one `StreamListener` instance.
209 class StreamResource {
210  public:
211   virtual ~StreamResource();
212 
213   // These need to be implemented on the readable side of this stream:
214 
215   // Start reading from the underlying resource. This is called by the consumer
216   // when more data is desired. Use `EmitAlloc()` and `EmitData()` to
217   // pass data along to the consumer.
218   virtual int ReadStart() = 0;
219   // Stop reading from the underlying resource. This is called by the
220   // consumer when its buffers are full and no more data can be handled.
221   virtual int ReadStop() = 0;
222 
223   // These need to be implemented on the writable side of this stream:
224   // All of these methods may return an error code synchronously.
225   // In that case, the finish callback should *not* be called.
226 
227   // Perform a shutdown operation, and either call req_wrap->Done() when
228   // finished and return 0, return 1 for synchronous success, or
229   // a libuv error code for synchronous failures.
230   virtual int DoShutdown(ShutdownWrap* req_wrap) = 0;
231   // Try to write as much data as possible synchronously, and modify
232   // `*bufs` and `*count` accordingly. This is a no-op by default.
233   // Return 0 for success and a libuv error code for failures.
234   virtual int DoTryWrite(uv_buf_t** bufs, size_t* count);
235   // Initiate a write of data. If the write completes synchronously, return 0 on
236   // success (with bufs modified to indicate how much data was consumed) or a
237   // libuv error code on failure. If the write will complete asynchronously,
238   // return 0. When the write completes asynchronously, call req_wrap->Done()
239   // with 0 on success (with bufs modified to indicate how much data was
240   // consumed) or a libuv error code on failure. Do not call req_wrap->Done() if
241   // the write completes synchronously, that is, it should never be called
242   // before DoWrite() has returned.
243   virtual int DoWrite(WriteWrap* w,
244                       uv_buf_t* bufs,
245                       size_t count,
246                       uv_stream_t* send_handle) = 0;
247 
248   // Returns true if the stream supports the `OnStreamWantsWrite()` interface.
HasWantsWrite()249   virtual bool HasWantsWrite() const { return false; }
250 
251   // Optionally, this may provide an error message to be used for
252   // failing writes.
253   virtual const char* Error() const;
254   // Clear the current error (i.e. that would be returned by Error()).
255   virtual void ClearError();
256 
257   // Transfer ownership of this stream to `listener`. The previous listener
258   // will not receive any more callbacks while the new listener was active.
259   void PushStreamListener(StreamListener* listener);
260   // Remove a listener, and, if this was the currently active one,
261   // transfer ownership back to the previous listener.
262   void RemoveStreamListener(StreamListener* listener);
263 
264  protected:
265   // Call the current listener's OnStreamAlloc() method.
266   uv_buf_t EmitAlloc(size_t suggested_size);
267   // Call the current listener's OnStreamRead() method and update the
268   // stream's read byte counter.
269   void EmitRead(ssize_t nread, const uv_buf_t& buf = uv_buf_init(nullptr, 0));
270   // Call the current listener's OnStreamAfterWrite() method.
271   void EmitAfterWrite(WriteWrap* w, int status);
272   // Call the current listener's OnStreamAfterShutdown() method.
273   void EmitAfterShutdown(ShutdownWrap* w, int status);
274   // Call the current listener's OnStreamWantsWrite() method.
275   void EmitWantsWrite(size_t suggested_size);
276 
277   StreamListener* listener_ = nullptr;
278   uint64_t bytes_read_ = 0;
279   uint64_t bytes_written_ = 0;
280 
281   friend class StreamListener;
282 };
283 
284 
285 class StreamBase : public StreamResource {
286  public:
287   // The kSlot field here mirrors that of BaseObject::InternalFields::kSlot
288   // because instances deriving from StreamBase generally also derived from
289   // BaseObject (it's possible for it not to, however).
290   enum InternalFields {
291     kSlot = BaseObject::kSlot,
292     kStreamBaseField = BaseObject::kInternalFieldCount,
293     kOnReadFunctionField,
294     kInternalFieldCount
295   };
296 
297   static void AddMethods(Environment* env,
298                          v8::Local<v8::FunctionTemplate> target);
299 
300   virtual bool IsAlive() = 0;
301   virtual bool IsClosing() = 0;
302   virtual bool IsIPCPipe();
303   virtual int GetFD();
304 
305   enum StreamBaseJSChecks { DONT_SKIP_NREAD_CHECKS, SKIP_NREAD_CHECKS };
306 
307   v8::MaybeLocal<v8::Value> CallJSOnreadMethod(
308       ssize_t nread,
309       v8::Local<v8::ArrayBuffer> ab,
310       size_t offset = 0,
311       StreamBaseJSChecks checks = DONT_SKIP_NREAD_CHECKS);
312 
313   // This is named `stream_env` to avoid name clashes, because a lot of
314   // subclasses are also `BaseObject`s.
315   Environment* stream_env() const;
316 
317   // Shut down the current stream. This request can use an existing
318   // ShutdownWrap object (that was created in JS), or a new one will be created.
319   // Returns 1 in case of a synchronous completion, 0 in case of asynchronous
320   // completion, and a libuv error case in case of synchronous failure.
321   int Shutdown(v8::Local<v8::Object> req_wrap_obj = v8::Local<v8::Object>());
322 
323   // Write data to the current stream. This request can use an existing
324   // WriteWrap object (that was created in JS), or a new one will be created.
325   // This will first try to write synchronously using `DoTryWrite()`, then
326   // asynchronously using `DoWrite()`.
327   // If the return value indicates a synchronous completion, no callback will
328   // be invoked.
329   StreamWriteResult Write(
330       uv_buf_t* bufs,
331       size_t count,
332       uv_stream_t* send_handle = nullptr,
333       v8::Local<v8::Object> req_wrap_obj = v8::Local<v8::Object>());
334 
335   // These can be overridden by subclasses to get more specific wrap instances.
336   // For example, a subclass Foo could create a FooWriteWrap or FooShutdownWrap
337   // (inheriting from ShutdownWrap/WriteWrap) that has extra fields, like
338   // an associated libuv request.
339   virtual ShutdownWrap* CreateShutdownWrap(v8::Local<v8::Object> object);
340   virtual WriteWrap* CreateWriteWrap(v8::Local<v8::Object> object);
341 
342   // One of these must be implemented
343   virtual AsyncWrap* GetAsyncWrap() = 0;
344   virtual v8::Local<v8::Object> GetObject();
345 
346   static StreamBase* FromObject(v8::Local<v8::Object> obj);
347 
348  protected:
349   explicit StreamBase(Environment* env);
350 
351   // JS Methods
352   int ReadStartJS(const v8::FunctionCallbackInfo<v8::Value>& args);
353   int ReadStopJS(const v8::FunctionCallbackInfo<v8::Value>& args);
354   int Shutdown(const v8::FunctionCallbackInfo<v8::Value>& args);
355   int Writev(const v8::FunctionCallbackInfo<v8::Value>& args);
356   int WriteBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);
357   template <enum encoding enc>
358   int WriteString(const v8::FunctionCallbackInfo<v8::Value>& args);
359   int UseUserBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);
360 
361   static void GetFD(const v8::FunctionCallbackInfo<v8::Value>& args);
362   static void GetExternal(const v8::FunctionCallbackInfo<v8::Value>& args);
363   static void GetBytesRead(const v8::FunctionCallbackInfo<v8::Value>& args);
364   static void GetBytesWritten(const v8::FunctionCallbackInfo<v8::Value>& args);
365   void AttachToObject(v8::Local<v8::Object> obj);
366 
367   template <int (StreamBase::*Method)(
368       const v8::FunctionCallbackInfo<v8::Value>& args)>
369   static void JSMethod(const v8::FunctionCallbackInfo<v8::Value>& args);
370 
371   // Internal, used only in StreamBase methods + env.cc.
372   enum StreamBaseStateFields {
373     kReadBytesOrError,
374     kArrayBufferOffset,
375     kBytesWritten,
376     kLastWriteWasAsync,
377     kNumStreamBaseStateFields
378   };
379 
380  private:
381   Environment* env_;
382   EmitToJSStreamListener default_listener_;
383 
384   void SetWriteResult(const StreamWriteResult& res);
385   static void AddMethod(Environment* env,
386                         v8::Local<v8::Signature> sig,
387                         enum v8::PropertyAttribute attributes,
388                         v8::Local<v8::FunctionTemplate> t,
389                         JSMethodFunction* stream_method,
390                         v8::Local<v8::String> str);
391 
392   friend class WriteWrap;
393   friend class ShutdownWrap;
394   friend class Environment;  // For kNumStreamBaseStateFields.
395 };
396 
397 
398 // These are helpers for creating `ShutdownWrap`/`WriteWrap` instances.
399 // `OtherBase` must have a constructor that matches the `AsyncWrap`
400 // constructors’s (Environment*, Local<Object>, AsyncWrap::Provider) signature
401 // and be a subclass of `AsyncWrap`.
402 template <typename OtherBase>
403 class SimpleShutdownWrap : public ShutdownWrap, public OtherBase {
404  public:
405   SimpleShutdownWrap(StreamBase* stream,
406                      v8::Local<v8::Object> req_wrap_obj);
407 
GetAsyncWrap()408   AsyncWrap* GetAsyncWrap() override { return this; }
409 
410   SET_NO_MEMORY_INFO()
411   SET_MEMORY_INFO_NAME(SimpleShutdownWrap)
412   SET_SELF_SIZE(SimpleShutdownWrap)
413 };
414 
415 template <typename OtherBase>
416 class SimpleWriteWrap : public WriteWrap, public OtherBase {
417  public:
418   SimpleWriteWrap(StreamBase* stream,
419                   v8::Local<v8::Object> req_wrap_obj);
420 
GetAsyncWrap()421   AsyncWrap* GetAsyncWrap() override { return this; }
422 
423   SET_NO_MEMORY_INFO()
424   SET_MEMORY_INFO_NAME(SimpleWriteWrap)
425   SET_SELF_SIZE(SimpleWriteWrap)
426 };
427 
428 }  // namespace node
429 
430 #endif  // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
431 
432 #endif  // SRC_STREAM_BASE_H_
433