• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #ifndef SRC_STREAM_PIPE_H_
2 #define SRC_STREAM_PIPE_H_
3 
4 #if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
5 
6 #include "stream_base.h"
7 #include "allocated_buffer.h"
8 
9 namespace node {
10 
11 class StreamPipe : public AsyncWrap {
12  public:
13   StreamPipe(StreamBase* source, StreamBase* sink, v8::Local<v8::Object> obj);
14   ~StreamPipe() override;
15 
16   void Unpipe(bool is_in_deletion = false);
17 
18   static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
19   static void Start(const v8::FunctionCallbackInfo<v8::Value>& args);
20   static void Unpipe(const v8::FunctionCallbackInfo<v8::Value>& args);
21   static void IsClosed(const v8::FunctionCallbackInfo<v8::Value>& args);
22   static void PendingWrites(const v8::FunctionCallbackInfo<v8::Value>& args);
23 
24   SET_NO_MEMORY_INFO()
25   SET_MEMORY_INFO_NAME(StreamPipe)
26   SET_SELF_SIZE(StreamPipe)
27 
28  private:
29   inline StreamBase* source();
30   inline StreamBase* sink();
31 
32   int pending_writes_ = 0;
33   bool is_reading_ = false;
34   bool is_eof_ = false;
35   bool is_closed_ = true;
36   bool sink_destroyed_ = false;
37   bool source_destroyed_ = false;
38   bool uses_wants_write_ = false;
39 
40   // Set a default value so that when we’re coming from Start(), we know
41   // that we don’t want to read just yet.
42   // This will likely need to be changed when supporting streams without
43   // `OnStreamWantsWrite()` support.
44   size_t wanted_data_ = 0;
45 
46   void ProcessData(size_t nread, AllocatedBuffer&& buf);
47 
48   class ReadableListener : public StreamListener {
49    public:
50     uv_buf_t OnStreamAlloc(size_t suggested_size) override;
51     void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;
52     void OnStreamDestroy() override;
53   };
54 
55   class WritableListener : public StreamListener {
56    public:
57     uv_buf_t OnStreamAlloc(size_t suggested_size) override;
58     void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;
59     void OnStreamAfterWrite(WriteWrap* w, int status) override;
60     void OnStreamAfterShutdown(ShutdownWrap* w, int status) override;
61     void OnStreamWantsWrite(size_t suggested_size) override;
62     void OnStreamDestroy() override;
63   };
64 
65   ReadableListener readable_listener_;
66   WritableListener writable_listener_;
67 };
68 
69 }  // namespace node
70 
71 #endif
72 
73 #endif  // SRC_STREAM_PIPE_H_
74