// Declares node::StreamPipe. The declarations below are only visible to
// translation units that define NODE_WANT_INTERNALS to a non-zero value.
#ifndef SRC_STREAM_PIPE_H_
#define SRC_STREAM_PIPE_H_

#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

#include "stream_base.h"
#include "allocated_buffer.h"

namespace node {

// An AsyncWrap subclass constructed from two StreamBase pointers (`source`
// and `sink`) and a JS object.
// NOTE(review): judging by the names, this forwards data read from `source`
// into `sink`; the implementation is not part of this header -- confirm
// against the corresponding .cc file.
class StreamPipe : public AsyncWrap {
 public:
  // `source` and `sink` are taken as raw pointers.
  // NOTE(review): ownership/lifetime expectations are not visible here --
  // confirm in the implementation.
  StreamPipe(StreamBase* source, StreamBase* sink, v8::Local<v8::Object> obj);
  ~StreamPipe() override;

  // C++-side overload of Unpipe. `is_in_deletion` defaults to false.
  // NOTE(review): presumably passed as true when called from the
  // destructor -- confirm.
  void Unpipe(bool is_in_deletion = false);

  // Static functions taking a v8::FunctionCallbackInfo, i.e. the shape V8
  // expects for function callbacks. Note that the static Unpipe overloads
  // the member function above.
  // NOTE(review): presumably these are registered as methods of the JS
  // object; the registration code is not in this header -- confirm.
  static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
  static void Start(const v8::FunctionCallbackInfo<v8::Value>& args);
  static void Unpipe(const v8::FunctionCallbackInfo<v8::Value>& args);
  static void IsClosed(const v8::FunctionCallbackInfo<v8::Value>& args);
  static void PendingWrites(const v8::FunctionCallbackInfo<v8::Value>& args);

  // Macros defined outside this header.
  // NOTE(review): presumably they provide the memory-tracking overrides
  // (no tracked fields, name "StreamPipe", self size) -- confirm against
  // the macro definitions.
  SET_NO_MEMORY_INFO()
  SET_MEMORY_INFO_NAME(StreamPipe)
  SET_SELF_SIZE(StreamPipe)

 private:
  // Accessors for the two streams; declared inline, defined elsewhere.
  inline StreamBase* source();
  inline StreamBase* sink();

  // State fields with their initial values. `is_closed_` is the only flag
  // that starts out true.
  // NOTE(review): the per-field meanings below are inferred from the names
  // only -- confirm in the implementation.
  int pending_writes_ = 0;         // presumably: writes not yet completed
  bool is_reading_ = false;
  bool is_eof_ = false;
  bool is_closed_ = true;
  bool sink_destroyed_ = false;
  bool source_destroyed_ = false;
  bool uses_wants_write_ = false;

  // Set a default value so that when we’re coming from Start(), we know
  // that we don’t want to read just yet.
  // This will likely need to be changed when supporting streams without
  // `OnStreamWantsWrite()` support.
  size_t wanted_data_ = 0;

  // Takes `buf` by rvalue reference; `nread` is unsigned here, unlike the
  // signed `nread` of the OnStreamRead() callbacks below.
  void ProcessData(size_t nread, AllocatedBuffer&& buf);

  // StreamListener subclass overriding the allocation, read and destroy
  // callbacks.
  // NOTE(review): presumably attached to the source stream -- confirm.
  class ReadableListener : public StreamListener {
   public:
    uv_buf_t OnStreamAlloc(size_t suggested_size) override;
    void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;
    void OnStreamDestroy() override;
  };

  // StreamListener subclass that additionally overrides the after-write,
  // after-shutdown and wants-write callbacks.
  // NOTE(review): presumably attached to the sink stream -- confirm.
  class WritableListener : public StreamListener {
   public:
    uv_buf_t OnStreamAlloc(size_t suggested_size) override;
    void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;
    void OnStreamAfterWrite(WriteWrap* w, int status) override;
    void OnStreamAfterShutdown(ShutdownWrap* w, int status) override;
    void OnStreamWantsWrite(size_t suggested_size) override;
    void OnStreamDestroy() override;
  };

  // One listener instance of each kind is embedded directly in the pipe.
  ReadableListener readable_listener_;
  WritableListener writable_listener_;
};

}  // namespace node

#endif  // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

#endif  // SRC_STREAM_PIPE_H_