• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #ifndef SRC_STREAM_PIPE_H_
2 #define SRC_STREAM_PIPE_H_
3 
4 #if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
5 
6 #include "stream_base.h"
7 
8 namespace node {
9 
// StreamPipe: an AsyncWrap subclass constructed from two StreamBase pointers
// (`source` and `sink`) plus a v8 object. It owns one ReadableListener and one
// WritableListener (declared below).
// NOTE(review): presumably forwards data read from `source` into `sink`; the
// implementation is not visible in this header -- confirm in the .cc file.
10 class StreamPipe : public AsyncWrap {
11  public:
12   StreamPipe(StreamBase* source, StreamBase* sink, v8::Local<v8::Object> obj);
13   ~StreamPipe() override;
14 
     // Instance-level unpipe. `is_in_deletion` defaults to false.
     // NOTE(review): presumably set to true only when invoked during
     // destruction -- verify against the definition.
15   void Unpipe(bool is_in_deletion = false);
16 
     // Static entry points with the v8 function-callback signature.
     // NOTE(review): presumably the JavaScript-facing bindings for this class;
     // where they are registered is not shown here.
17   static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
18   static void Start(const v8::FunctionCallbackInfo<v8::Value>& args);
19   static void Unpipe(const v8::FunctionCallbackInfo<v8::Value>& args);
20   static void IsClosed(const v8::FunctionCallbackInfo<v8::Value>& args);
21   static void PendingWrites(const v8::FunctionCallbackInfo<v8::Value>& args);
22 
     // Macros defined outside this header.
     // NOTE(review): by their names these look like memory-tracking
     // boilerplate (no extra info, reported name, self size) -- confirm.
23   SET_NO_MEMORY_INFO()
24   SET_MEMORY_INFO_NAME(StreamPipe)
25   SET_SELF_SIZE(StreamPipe)
26 
27  private:
     // Accessors for the two streams; declared inline, defined elsewhere.
28   inline StreamBase* source();
29   inline StreamBase* sink();
30 
     // Pipe state, default-initialized in-class. Note that `is_closed_`
     // starts out true while every other flag starts out false.
31   int pending_writes_ = 0;
32   bool is_reading_ = false;
33   bool is_eof_ = false;
34   bool is_closed_ = true;
35   bool sink_destroyed_ = false;
36   bool source_destroyed_ = false;
37   bool uses_wants_write_ = false;
38 
39   // Set a default value so that when we’re coming from Start(), we know
40   // that we don’t want to read just yet.
41   // This will likely need to be changed when supporting streams without
42   // `OnStreamWantsWrite()` support.
43   size_t wanted_data_ = 0;
44 
     // Takes ownership of `buf` (rvalue reference).
     // NOTE(review): `nread` is presumably the number of valid bytes in
     // `buf` -- confirm against the definition.
45   void ProcessData(size_t nread, AllocatedBuffer&& buf);
46 
     // StreamListener that overrides the alloc/read/destroy callbacks.
     // NOTE(review): presumably the listener attached to the source stream.
47   class ReadableListener : public StreamListener {
48    public:
49     uv_buf_t OnStreamAlloc(size_t suggested_size) override;
50     void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;
51     void OnStreamDestroy() override;
52   };
53 
     // StreamListener that, in addition to alloc/read/destroy, overrides the
     // after-write, after-shutdown and wants-write callbacks.
     // NOTE(review): presumably the listener attached to the sink stream.
54   class WritableListener : public StreamListener {
55    public:
56     uv_buf_t OnStreamAlloc(size_t suggested_size) override;
57     void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;
58     void OnStreamAfterWrite(WriteWrap* w, int status) override;
59     void OnStreamAfterShutdown(ShutdownWrap* w, int status) override;
60     void OnStreamWantsWrite(size_t suggested_size) override;
61     void OnStreamDestroy() override;
62   };
63 
     // One listener instance of each kind, stored by value in the pipe.
64   ReadableListener readable_listener_;
65   WritableListener writable_listener_;
66 };
67 
68 }  // namespace node
69 
70 #endif
71 
72 #endif  // SRC_STREAM_PIPE_H_
73