1 #ifndef SRC_STREAM_BASE_INL_H_
2 #define SRC_STREAM_BASE_INL_H_
3
4 #if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
5
6 #include "async_wrap-inl.h"
7 #include "base_object-inl.h"
8 #include "node.h"
9 #include "stream_base.h"
10 #include "v8.h"
11
12 namespace node {
13
StreamReq(StreamBase * stream,v8::Local<v8::Object> req_wrap_obj)14 StreamReq::StreamReq(
15 StreamBase* stream,
16 v8::Local<v8::Object> req_wrap_obj) : stream_(stream) {
17 AttachToObject(req_wrap_obj);
18 }
19
AttachToObject(v8::Local<v8::Object> req_wrap_obj)20 void StreamReq::AttachToObject(v8::Local<v8::Object> req_wrap_obj) {
21 CHECK_EQ(req_wrap_obj->GetAlignedPointerFromInternalField(
22 StreamReq::kStreamReqField),
23 nullptr);
24 req_wrap_obj->SetAlignedPointerInInternalField(
25 StreamReq::kStreamReqField, this);
26 }
27
FromObject(v8::Local<v8::Object> req_wrap_obj)28 StreamReq* StreamReq::FromObject(v8::Local<v8::Object> req_wrap_obj) {
29 return static_cast<StreamReq*>(
30 req_wrap_obj->GetAlignedPointerFromInternalField(
31 StreamReq::kStreamReqField));
32 }
33
Dispose()34 void StreamReq::Dispose() {
35 BaseObjectPtr<AsyncWrap> destroy_me{GetAsyncWrap()};
36 object()->SetAlignedPointerInInternalField(
37 StreamReq::kStreamReqField, nullptr);
38 destroy_me->Detach();
39 }
40
object()41 v8::Local<v8::Object> StreamReq::object() {
42 return GetAsyncWrap()->object();
43 }
44
ShutdownWrap(StreamBase * stream,v8::Local<v8::Object> req_wrap_obj)45 ShutdownWrap::ShutdownWrap(
46 StreamBase* stream,
47 v8::Local<v8::Object> req_wrap_obj)
48 : StreamReq(stream, req_wrap_obj) { }
49
WriteWrap(StreamBase * stream,v8::Local<v8::Object> req_wrap_obj)50 WriteWrap::WriteWrap(
51 StreamBase* stream,
52 v8::Local<v8::Object> req_wrap_obj)
53 : StreamReq(stream, req_wrap_obj) { }
54
PassReadErrorToPreviousListener(ssize_t nread)55 void StreamListener::PassReadErrorToPreviousListener(ssize_t nread) {
56 CHECK_NOT_NULL(previous_listener_);
57 previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0));
58 }
59
PushStreamListener(StreamListener * listener)60 void StreamResource::PushStreamListener(StreamListener* listener) {
61 CHECK_NOT_NULL(listener);
62 CHECK_NULL(listener->stream_);
63
64 listener->previous_listener_ = listener_;
65 listener->stream_ = this;
66
67 listener_ = listener;
68 }
69
EmitAlloc(size_t suggested_size)70 uv_buf_t StreamResource::EmitAlloc(size_t suggested_size) {
71 DebugSealHandleScope seal_handle_scope;
72 return listener_->OnStreamAlloc(suggested_size);
73 }
74
EmitRead(ssize_t nread,const uv_buf_t & buf)75 void StreamResource::EmitRead(ssize_t nread, const uv_buf_t& buf) {
76 DebugSealHandleScope seal_handle_scope;
77 if (nread > 0)
78 bytes_read_ += static_cast<uint64_t>(nread);
79 listener_->OnStreamRead(nread, buf);
80 }
81
EmitAfterWrite(WriteWrap * w,int status)82 void StreamResource::EmitAfterWrite(WriteWrap* w, int status) {
83 DebugSealHandleScope seal_handle_scope;
84 listener_->OnStreamAfterWrite(w, status);
85 }
86
EmitAfterShutdown(ShutdownWrap * w,int status)87 void StreamResource::EmitAfterShutdown(ShutdownWrap* w, int status) {
88 DebugSealHandleScope seal_handle_scope;
89 listener_->OnStreamAfterShutdown(w, status);
90 }
91
EmitWantsWrite(size_t suggested_size)92 void StreamResource::EmitWantsWrite(size_t suggested_size) {
93 DebugSealHandleScope seal_handle_scope;
94 listener_->OnStreamWantsWrite(suggested_size);
95 }
96
StreamBase(Environment * env)97 StreamBase::StreamBase(Environment* env) : env_(env) {
98 PushStreamListener(&default_listener_);
99 }
100
101 template <typename OtherBase>
SimpleShutdownWrap(StreamBase * stream,v8::Local<v8::Object> req_wrap_obj)102 SimpleShutdownWrap<OtherBase>::SimpleShutdownWrap(
103 StreamBase* stream,
104 v8::Local<v8::Object> req_wrap_obj)
105 : ShutdownWrap(stream, req_wrap_obj),
106 OtherBase(stream->stream_env(),
107 req_wrap_obj,
108 AsyncWrap::PROVIDER_SHUTDOWNWRAP) {
109 }
110
111 template <typename OtherBase>
SimpleWriteWrap(StreamBase * stream,v8::Local<v8::Object> req_wrap_obj)112 SimpleWriteWrap<OtherBase>::SimpleWriteWrap(
113 StreamBase* stream,
114 v8::Local<v8::Object> req_wrap_obj)
115 : WriteWrap(stream, req_wrap_obj),
116 OtherBase(stream->stream_env(),
117 req_wrap_obj,
118 AsyncWrap::PROVIDER_WRITEWRAP) {
119 }
120
AttachToObject(v8::Local<v8::Object> obj)121 void StreamBase::AttachToObject(v8::Local<v8::Object> obj) {
122 obj->SetAlignedPointerInInternalField(
123 StreamBase::kStreamBaseField, this);
124 }
125
FromObject(v8::Local<v8::Object> obj)126 StreamBase* StreamBase::FromObject(v8::Local<v8::Object> obj) {
127 if (obj->GetAlignedPointerFromInternalField(StreamBase::kSlot) == nullptr)
128 return nullptr;
129
130 return static_cast<StreamBase*>(
131 obj->GetAlignedPointerFromInternalField(
132 StreamBase::kStreamBaseField));
133 }
134
FromObject(v8::Local<v8::Object> req_wrap_obj)135 WriteWrap* WriteWrap::FromObject(v8::Local<v8::Object> req_wrap_obj) {
136 return static_cast<WriteWrap*>(StreamReq::FromObject(req_wrap_obj));
137 }
138
139 template <typename T, bool kIsWeak>
FromObject(const BaseObjectPtrImpl<T,kIsWeak> & base_obj)140 WriteWrap* WriteWrap::FromObject(
141 const BaseObjectPtrImpl<T, kIsWeak>& base_obj) {
142 if (!base_obj) return nullptr;
143 return FromObject(base_obj->object());
144 }
145
FromObject(v8::Local<v8::Object> req_wrap_obj)146 ShutdownWrap* ShutdownWrap::FromObject(v8::Local<v8::Object> req_wrap_obj) {
147 return static_cast<ShutdownWrap*>(StreamReq::FromObject(req_wrap_obj));
148 }
149
150 template <typename T, bool kIsWeak>
FromObject(const BaseObjectPtrImpl<T,kIsWeak> & base_obj)151 ShutdownWrap* ShutdownWrap::FromObject(
152 const BaseObjectPtrImpl<T, kIsWeak>& base_obj) {
153 if (!base_obj) return nullptr;
154 return FromObject(base_obj->object());
155 }
156
SetBackingStore(std::unique_ptr<v8::BackingStore> bs)157 void WriteWrap::SetBackingStore(std::unique_ptr<v8::BackingStore> bs) {
158 CHECK(!backing_store_);
159 backing_store_ = std::move(bs);
160 }
161
ResetObject(v8::Local<v8::Object> obj)162 void StreamReq::ResetObject(v8::Local<v8::Object> obj) {
163 DCHECK_GT(obj->InternalFieldCount(), StreamReq::kStreamReqField);
164
165 obj->SetAlignedPointerInInternalField(StreamReq::kSlot, nullptr);
166 obj->SetAlignedPointerInInternalField(StreamReq::kStreamReqField, nullptr);
167 }
168
169 } // namespace node
170
171 #endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
172
173 #endif // SRC_STREAM_BASE_INL_H_
174