• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #ifndef SRC_STREAM_BASE_INL_H_
2 #define SRC_STREAM_BASE_INL_H_
3 
4 #if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
5 
6 #include "async_wrap-inl.h"
7 #include "base_object-inl.h"
8 #include "node.h"
9 #include "stream_base.h"
10 #include "v8.h"
11 
12 namespace node {
13 
StreamReq(StreamBase * stream,v8::Local<v8::Object> req_wrap_obj)14 StreamReq::StreamReq(
15     StreamBase* stream,
16     v8::Local<v8::Object> req_wrap_obj) : stream_(stream) {
17   AttachToObject(req_wrap_obj);
18 }
19 
AttachToObject(v8::Local<v8::Object> req_wrap_obj)20 void StreamReq::AttachToObject(v8::Local<v8::Object> req_wrap_obj) {
21   CHECK_EQ(req_wrap_obj->GetAlignedPointerFromInternalField(
22                StreamReq::kStreamReqField),
23            nullptr);
24   req_wrap_obj->SetAlignedPointerInInternalField(
25       StreamReq::kStreamReqField, this);
26 }
27 
FromObject(v8::Local<v8::Object> req_wrap_obj)28 StreamReq* StreamReq::FromObject(v8::Local<v8::Object> req_wrap_obj) {
29   return static_cast<StreamReq*>(
30       req_wrap_obj->GetAlignedPointerFromInternalField(
31           StreamReq::kStreamReqField));
32 }
33 
Dispose()34 void StreamReq::Dispose() {
35   BaseObjectPtr<AsyncWrap> destroy_me{GetAsyncWrap()};
36   object()->SetAlignedPointerInInternalField(
37       StreamReq::kStreamReqField, nullptr);
38   destroy_me->Detach();
39 }
40 
object()41 v8::Local<v8::Object> StreamReq::object() {
42   return GetAsyncWrap()->object();
43 }
44 
ShutdownWrap(StreamBase * stream,v8::Local<v8::Object> req_wrap_obj)45 ShutdownWrap::ShutdownWrap(
46     StreamBase* stream,
47     v8::Local<v8::Object> req_wrap_obj)
48     : StreamReq(stream, req_wrap_obj) { }
49 
WriteWrap(StreamBase * stream,v8::Local<v8::Object> req_wrap_obj)50 WriteWrap::WriteWrap(
51     StreamBase* stream,
52     v8::Local<v8::Object> req_wrap_obj)
53     : StreamReq(stream, req_wrap_obj) { }
54 
PassReadErrorToPreviousListener(ssize_t nread)55 void StreamListener::PassReadErrorToPreviousListener(ssize_t nread) {
56   CHECK_NOT_NULL(previous_listener_);
57   previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0));
58 }
59 
PushStreamListener(StreamListener * listener)60 void StreamResource::PushStreamListener(StreamListener* listener) {
61   CHECK_NOT_NULL(listener);
62   CHECK_NULL(listener->stream_);
63 
64   listener->previous_listener_ = listener_;
65   listener->stream_ = this;
66 
67   listener_ = listener;
68 }
69 
EmitAlloc(size_t suggested_size)70 uv_buf_t StreamResource::EmitAlloc(size_t suggested_size) {
71   DebugSealHandleScope seal_handle_scope;
72   return listener_->OnStreamAlloc(suggested_size);
73 }
74 
EmitRead(ssize_t nread,const uv_buf_t & buf)75 void StreamResource::EmitRead(ssize_t nread, const uv_buf_t& buf) {
76   DebugSealHandleScope seal_handle_scope;
77   if (nread > 0)
78     bytes_read_ += static_cast<uint64_t>(nread);
79   listener_->OnStreamRead(nread, buf);
80 }
81 
EmitAfterWrite(WriteWrap * w,int status)82 void StreamResource::EmitAfterWrite(WriteWrap* w, int status) {
83   DebugSealHandleScope seal_handle_scope;
84   listener_->OnStreamAfterWrite(w, status);
85 }
86 
EmitAfterShutdown(ShutdownWrap * w,int status)87 void StreamResource::EmitAfterShutdown(ShutdownWrap* w, int status) {
88   DebugSealHandleScope seal_handle_scope;
89   listener_->OnStreamAfterShutdown(w, status);
90 }
91 
EmitWantsWrite(size_t suggested_size)92 void StreamResource::EmitWantsWrite(size_t suggested_size) {
93   DebugSealHandleScope seal_handle_scope;
94   listener_->OnStreamWantsWrite(suggested_size);
95 }
96 
StreamBase(Environment * env)97 StreamBase::StreamBase(Environment* env) : env_(env) {
98   PushStreamListener(&default_listener_);
99 }
100 
101 template <typename OtherBase>
SimpleShutdownWrap(StreamBase * stream,v8::Local<v8::Object> req_wrap_obj)102 SimpleShutdownWrap<OtherBase>::SimpleShutdownWrap(
103     StreamBase* stream,
104     v8::Local<v8::Object> req_wrap_obj)
105   : ShutdownWrap(stream, req_wrap_obj),
106     OtherBase(stream->stream_env(),
107               req_wrap_obj,
108               AsyncWrap::PROVIDER_SHUTDOWNWRAP) {
109 }
110 
111 template <typename OtherBase>
SimpleWriteWrap(StreamBase * stream,v8::Local<v8::Object> req_wrap_obj)112 SimpleWriteWrap<OtherBase>::SimpleWriteWrap(
113     StreamBase* stream,
114     v8::Local<v8::Object> req_wrap_obj)
115   : WriteWrap(stream, req_wrap_obj),
116     OtherBase(stream->stream_env(),
117               req_wrap_obj,
118               AsyncWrap::PROVIDER_WRITEWRAP) {
119 }
120 
AttachToObject(v8::Local<v8::Object> obj)121 void StreamBase::AttachToObject(v8::Local<v8::Object> obj) {
122   obj->SetAlignedPointerInInternalField(
123       StreamBase::kStreamBaseField, this);
124 }
125 
FromObject(v8::Local<v8::Object> obj)126 StreamBase* StreamBase::FromObject(v8::Local<v8::Object> obj) {
127   if (obj->GetAlignedPointerFromInternalField(StreamBase::kSlot) == nullptr)
128     return nullptr;
129 
130   return static_cast<StreamBase*>(
131       obj->GetAlignedPointerFromInternalField(
132           StreamBase::kStreamBaseField));
133 }
134 
FromObject(v8::Local<v8::Object> req_wrap_obj)135 WriteWrap* WriteWrap::FromObject(v8::Local<v8::Object> req_wrap_obj) {
136   return static_cast<WriteWrap*>(StreamReq::FromObject(req_wrap_obj));
137 }
138 
139 template <typename T, bool kIsWeak>
FromObject(const BaseObjectPtrImpl<T,kIsWeak> & base_obj)140 WriteWrap* WriteWrap::FromObject(
141     const BaseObjectPtrImpl<T, kIsWeak>& base_obj) {
142   if (!base_obj) return nullptr;
143   return FromObject(base_obj->object());
144 }
145 
FromObject(v8::Local<v8::Object> req_wrap_obj)146 ShutdownWrap* ShutdownWrap::FromObject(v8::Local<v8::Object> req_wrap_obj) {
147   return static_cast<ShutdownWrap*>(StreamReq::FromObject(req_wrap_obj));
148 }
149 
150 template <typename T, bool kIsWeak>
FromObject(const BaseObjectPtrImpl<T,kIsWeak> & base_obj)151 ShutdownWrap* ShutdownWrap::FromObject(
152     const BaseObjectPtrImpl<T, kIsWeak>& base_obj) {
153   if (!base_obj) return nullptr;
154   return FromObject(base_obj->object());
155 }
156 
SetBackingStore(std::unique_ptr<v8::BackingStore> bs)157 void WriteWrap::SetBackingStore(std::unique_ptr<v8::BackingStore> bs) {
158   CHECK(!backing_store_);
159   backing_store_ = std::move(bs);
160 }
161 
ResetObject(v8::Local<v8::Object> obj)162 void StreamReq::ResetObject(v8::Local<v8::Object> obj) {
163   DCHECK_GT(obj->InternalFieldCount(), StreamReq::kStreamReqField);
164 
165   obj->SetAlignedPointerInInternalField(StreamReq::kSlot, nullptr);
166   obj->SetAlignedPointerInInternalField(StreamReq::kStreamReqField, nullptr);
167 }
168 
169 }  // namespace node
170 
171 #endif  // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
172 
173 #endif  // SRC_STREAM_BASE_INL_H_
174