1 #ifndef SRC_STREAM_BASE_INL_H_
2 #define SRC_STREAM_BASE_INL_H_
3
4 #if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
5
6 #include "async_wrap-inl.h"
7 #include "base_object-inl.h"
8 #include "node.h"
9 #include "stream_base.h"
10 #include "v8.h"
11
12 namespace node {
13
14 using v8::Signature;
15 using v8::FunctionCallbackInfo;
16 using v8::FunctionTemplate;
17 using v8::HandleScope;
18 using v8::Local;
19 using v8::Object;
20 using v8::PropertyAttribute;
21 using v8::PropertyCallbackInfo;
22 using v8::String;
23 using v8::Value;
24
AttachToObject(v8::Local<v8::Object> req_wrap_obj)25 inline void StreamReq::AttachToObject(v8::Local<v8::Object> req_wrap_obj) {
26 CHECK_EQ(req_wrap_obj->GetAlignedPointerFromInternalField(
27 StreamReq::kStreamReqField),
28 nullptr);
29 req_wrap_obj->SetAlignedPointerInInternalField(
30 StreamReq::kStreamReqField, this);
31 }
32
FromObject(v8::Local<v8::Object> req_wrap_obj)33 inline StreamReq* StreamReq::FromObject(v8::Local<v8::Object> req_wrap_obj) {
34 return static_cast<StreamReq*>(
35 req_wrap_obj->GetAlignedPointerFromInternalField(
36 StreamReq::kStreamReqField));
37 }
38
Dispose()39 inline void StreamReq::Dispose() {
40 BaseObjectPtr<AsyncWrap> destroy_me{GetAsyncWrap()};
41 object()->SetAlignedPointerInInternalField(
42 StreamReq::kStreamReqField, nullptr);
43 destroy_me->Detach();
44 }
45
object()46 inline v8::Local<v8::Object> StreamReq::object() {
47 return GetAsyncWrap()->object();
48 }
49
~StreamListener()50 inline StreamListener::~StreamListener() {
51 if (stream_ != nullptr)
52 stream_->RemoveStreamListener(this);
53 }
54
PassReadErrorToPreviousListener(ssize_t nread)55 inline void StreamListener::PassReadErrorToPreviousListener(ssize_t nread) {
56 CHECK_NOT_NULL(previous_listener_);
57 previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0));
58 }
59
OnStreamAfterShutdown(ShutdownWrap * w,int status)60 inline void StreamListener::OnStreamAfterShutdown(ShutdownWrap* w, int status) {
61 CHECK_NOT_NULL(previous_listener_);
62 previous_listener_->OnStreamAfterShutdown(w, status);
63 }
64
OnStreamAfterWrite(WriteWrap * w,int status)65 inline void StreamListener::OnStreamAfterWrite(WriteWrap* w, int status) {
66 CHECK_NOT_NULL(previous_listener_);
67 previous_listener_->OnStreamAfterWrite(w, status);
68 }
69
~StreamResource()70 inline StreamResource::~StreamResource() {
71 while (listener_ != nullptr) {
72 StreamListener* listener = listener_;
73 listener->OnStreamDestroy();
74 // Remove the listener if it didn’t remove itself. This makes the logic
75 // in `OnStreamDestroy()` implementations easier, because they
76 // may call generic cleanup functions which can just remove the
77 // listener unconditionally.
78 if (listener == listener_)
79 RemoveStreamListener(listener_);
80 }
81 }
82
PushStreamListener(StreamListener * listener)83 inline void StreamResource::PushStreamListener(StreamListener* listener) {
84 CHECK_NOT_NULL(listener);
85 CHECK_NULL(listener->stream_);
86
87 listener->previous_listener_ = listener_;
88 listener->stream_ = this;
89
90 listener_ = listener;
91 }
92
RemoveStreamListener(StreamListener * listener)93 inline void StreamResource::RemoveStreamListener(StreamListener* listener) {
94 CHECK_NOT_NULL(listener);
95
96 StreamListener* previous;
97 StreamListener* current;
98
99 // Remove from the linked list.
100 for (current = listener_, previous = nullptr;
101 /* No loop condition because we want a crash if listener is not found */
102 ; previous = current, current = current->previous_listener_) {
103 CHECK_NOT_NULL(current);
104 if (current == listener) {
105 if (previous != nullptr)
106 previous->previous_listener_ = current->previous_listener_;
107 else
108 listener_ = listener->previous_listener_;
109 break;
110 }
111 }
112
113 listener->stream_ = nullptr;
114 listener->previous_listener_ = nullptr;
115 }
116
EmitAlloc(size_t suggested_size)117 inline uv_buf_t StreamResource::EmitAlloc(size_t suggested_size) {
118 DebugSealHandleScope seal_handle_scope;
119 return listener_->OnStreamAlloc(suggested_size);
120 }
121
EmitRead(ssize_t nread,const uv_buf_t & buf)122 inline void StreamResource::EmitRead(ssize_t nread, const uv_buf_t& buf) {
123 DebugSealHandleScope seal_handle_scope;
124 if (nread > 0)
125 bytes_read_ += static_cast<uint64_t>(nread);
126 listener_->OnStreamRead(nread, buf);
127 }
128
EmitAfterWrite(WriteWrap * w,int status)129 inline void StreamResource::EmitAfterWrite(WriteWrap* w, int status) {
130 DebugSealHandleScope seal_handle_scope;
131 listener_->OnStreamAfterWrite(w, status);
132 }
133
EmitAfterShutdown(ShutdownWrap * w,int status)134 inline void StreamResource::EmitAfterShutdown(ShutdownWrap* w, int status) {
135 DebugSealHandleScope seal_handle_scope;
136 listener_->OnStreamAfterShutdown(w, status);
137 }
138
EmitWantsWrite(size_t suggested_size)139 inline void StreamResource::EmitWantsWrite(size_t suggested_size) {
140 DebugSealHandleScope seal_handle_scope;
141 listener_->OnStreamWantsWrite(suggested_size);
142 }
143
StreamBase(Environment * env)144 inline StreamBase::StreamBase(Environment* env) : env_(env) {
145 PushStreamListener(&default_listener_);
146 }
147
stream_env()148 inline Environment* StreamBase::stream_env() const {
149 return env_;
150 }
151
Shutdown(v8::Local<v8::Object> req_wrap_obj)152 inline int StreamBase::Shutdown(v8::Local<v8::Object> req_wrap_obj) {
153 Environment* env = stream_env();
154
155 HandleScope handle_scope(env->isolate());
156
157 if (req_wrap_obj.IsEmpty()) {
158 if (!env->shutdown_wrap_template()
159 ->NewInstance(env->context())
160 .ToLocal(&req_wrap_obj)) {
161 return UV_EBUSY;
162 }
163 StreamReq::ResetObject(req_wrap_obj);
164 }
165
166 BaseObjectPtr<AsyncWrap> req_wrap_ptr;
167 AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(GetAsyncWrap());
168 ShutdownWrap* req_wrap = CreateShutdownWrap(req_wrap_obj);
169 if (req_wrap != nullptr)
170 req_wrap_ptr.reset(req_wrap->GetAsyncWrap());
171 int err = DoShutdown(req_wrap);
172
173 if (err != 0 && req_wrap != nullptr) {
174 req_wrap->Dispose();
175 }
176
177 const char* msg = Error();
178 if (msg != nullptr) {
179 req_wrap_obj->Set(
180 env->context(),
181 env->error_string(), OneByteString(env->isolate(), msg)).Check();
182 ClearError();
183 }
184
185 return err;
186 }
187
Write(uv_buf_t * bufs,size_t count,uv_stream_t * send_handle,v8::Local<v8::Object> req_wrap_obj)188 inline StreamWriteResult StreamBase::Write(
189 uv_buf_t* bufs,
190 size_t count,
191 uv_stream_t* send_handle,
192 v8::Local<v8::Object> req_wrap_obj) {
193 Environment* env = stream_env();
194 int err;
195
196 size_t total_bytes = 0;
197 for (size_t i = 0; i < count; ++i)
198 total_bytes += bufs[i].len;
199 bytes_written_ += total_bytes;
200
201 if (send_handle == nullptr) {
202 err = DoTryWrite(&bufs, &count);
203 if (err != 0 || count == 0) {
204 return StreamWriteResult { false, err, nullptr, total_bytes, {} };
205 }
206 }
207
208 HandleScope handle_scope(env->isolate());
209
210 if (req_wrap_obj.IsEmpty()) {
211 if (!env->write_wrap_template()
212 ->NewInstance(env->context())
213 .ToLocal(&req_wrap_obj)) {
214 return StreamWriteResult { false, UV_EBUSY, nullptr, 0, {} };
215 }
216 StreamReq::ResetObject(req_wrap_obj);
217 }
218
219 AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(GetAsyncWrap());
220 WriteWrap* req_wrap = CreateWriteWrap(req_wrap_obj);
221 BaseObjectPtr<AsyncWrap> req_wrap_ptr(req_wrap->GetAsyncWrap());
222
223 err = DoWrite(req_wrap, bufs, count, send_handle);
224 bool async = err == 0;
225
226 if (!async) {
227 req_wrap->Dispose();
228 req_wrap = nullptr;
229 }
230
231 const char* msg = Error();
232 if (msg != nullptr) {
233 req_wrap_obj->Set(env->context(),
234 env->error_string(),
235 OneByteString(env->isolate(), msg)).Check();
236 ClearError();
237 }
238
239 return StreamWriteResult {
240 async, err, req_wrap, total_bytes, std::move(req_wrap_ptr) };
241 }
242
243 template <typename OtherBase>
SimpleShutdownWrap(StreamBase * stream,v8::Local<v8::Object> req_wrap_obj)244 SimpleShutdownWrap<OtherBase>::SimpleShutdownWrap(
245 StreamBase* stream,
246 v8::Local<v8::Object> req_wrap_obj)
247 : ShutdownWrap(stream, req_wrap_obj),
248 OtherBase(stream->stream_env(),
249 req_wrap_obj,
250 AsyncWrap::PROVIDER_SHUTDOWNWRAP) {
251 }
252
CreateShutdownWrap(v8::Local<v8::Object> object)253 inline ShutdownWrap* StreamBase::CreateShutdownWrap(
254 v8::Local<v8::Object> object) {
255 return new SimpleShutdownWrap<AsyncWrap>(this, object);
256 }
257
258 template <typename OtherBase>
SimpleWriteWrap(StreamBase * stream,v8::Local<v8::Object> req_wrap_obj)259 SimpleWriteWrap<OtherBase>::SimpleWriteWrap(
260 StreamBase* stream,
261 v8::Local<v8::Object> req_wrap_obj)
262 : WriteWrap(stream, req_wrap_obj),
263 OtherBase(stream->stream_env(),
264 req_wrap_obj,
265 AsyncWrap::PROVIDER_WRITEWRAP) {
266 }
267
CreateWriteWrap(v8::Local<v8::Object> object)268 inline WriteWrap* StreamBase::CreateWriteWrap(
269 v8::Local<v8::Object> object) {
270 return new SimpleWriteWrap<AsyncWrap>(this, object);
271 }
272
AttachToObject(v8::Local<v8::Object> obj)273 inline void StreamBase::AttachToObject(v8::Local<v8::Object> obj) {
274 obj->SetAlignedPointerInInternalField(
275 StreamBase::kStreamBaseField, this);
276 }
277
FromObject(v8::Local<v8::Object> obj)278 inline StreamBase* StreamBase::FromObject(v8::Local<v8::Object> obj) {
279 if (obj->GetAlignedPointerFromInternalField(StreamBase::kSlot) == nullptr)
280 return nullptr;
281
282 return static_cast<StreamBase*>(
283 obj->GetAlignedPointerFromInternalField(
284 StreamBase::kStreamBaseField));
285 }
286
287
OnDone(int status)288 inline void ShutdownWrap::OnDone(int status) {
289 stream()->EmitAfterShutdown(this, status);
290 Dispose();
291 }
292
SetAllocatedStorage(AllocatedBuffer && storage)293 inline void WriteWrap::SetAllocatedStorage(AllocatedBuffer&& storage) {
294 CHECK_NULL(storage_.data());
295 storage_ = std::move(storage);
296 }
297
OnDone(int status)298 inline void WriteWrap::OnDone(int status) {
299 stream()->EmitAfterWrite(this, status);
300 Dispose();
301 }
302
Done(int status,const char * error_str)303 inline void StreamReq::Done(int status, const char* error_str) {
304 AsyncWrap* async_wrap = GetAsyncWrap();
305 Environment* env = async_wrap->env();
306 if (error_str != nullptr) {
307 async_wrap->object()->Set(env->context(),
308 env->error_string(),
309 OneByteString(env->isolate(), error_str))
310 .Check();
311 }
312
313 OnDone(status);
314 }
315
ResetObject(v8::Local<v8::Object> obj)316 inline void StreamReq::ResetObject(v8::Local<v8::Object> obj) {
317 DCHECK_GT(obj->InternalFieldCount(), StreamReq::kStreamReqField);
318
319 obj->SetAlignedPointerInInternalField(StreamReq::kSlot, nullptr);
320 obj->SetAlignedPointerInInternalField(StreamReq::kStreamReqField, nullptr);
321 }
322
323
324 } // namespace node
325
326 #endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
327
328 #endif // SRC_STREAM_BASE_INL_H_
329