• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #ifndef SRC_STREAM_BASE_INL_H_
2 #define SRC_STREAM_BASE_INL_H_
3 
4 #if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
5 
6 #include "async_wrap-inl.h"
7 #include "base_object-inl.h"
8 #include "node.h"
9 #include "stream_base.h"
10 #include "v8.h"
11 
12 namespace node {
13 
14 using v8::Signature;
15 using v8::FunctionCallbackInfo;
16 using v8::FunctionTemplate;
17 using v8::HandleScope;
18 using v8::Local;
19 using v8::Object;
20 using v8::PropertyAttribute;
21 using v8::PropertyCallbackInfo;
22 using v8::String;
23 using v8::Value;
24 
AttachToObject(v8::Local<v8::Object> req_wrap_obj)25 inline void StreamReq::AttachToObject(v8::Local<v8::Object> req_wrap_obj) {
26   CHECK_EQ(req_wrap_obj->GetAlignedPointerFromInternalField(
27                StreamReq::kStreamReqField),
28            nullptr);
29   req_wrap_obj->SetAlignedPointerInInternalField(
30       StreamReq::kStreamReqField, this);
31 }
32 
FromObject(v8::Local<v8::Object> req_wrap_obj)33 inline StreamReq* StreamReq::FromObject(v8::Local<v8::Object> req_wrap_obj) {
34   return static_cast<StreamReq*>(
35       req_wrap_obj->GetAlignedPointerFromInternalField(
36           StreamReq::kStreamReqField));
37 }
38 
Dispose()39 inline void StreamReq::Dispose() {
40   BaseObjectPtr<AsyncWrap> destroy_me{GetAsyncWrap()};
41   object()->SetAlignedPointerInInternalField(
42       StreamReq::kStreamReqField, nullptr);
43   destroy_me->Detach();
44 }
45 
object()46 inline v8::Local<v8::Object> StreamReq::object() {
47   return GetAsyncWrap()->object();
48 }
49 
~StreamListener()50 inline StreamListener::~StreamListener() {
51   if (stream_ != nullptr)
52     stream_->RemoveStreamListener(this);
53 }
54 
PassReadErrorToPreviousListener(ssize_t nread)55 inline void StreamListener::PassReadErrorToPreviousListener(ssize_t nread) {
56   CHECK_NOT_NULL(previous_listener_);
57   previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0));
58 }
59 
OnStreamAfterShutdown(ShutdownWrap * w,int status)60 inline void StreamListener::OnStreamAfterShutdown(ShutdownWrap* w, int status) {
61   CHECK_NOT_NULL(previous_listener_);
62   previous_listener_->OnStreamAfterShutdown(w, status);
63 }
64 
OnStreamAfterWrite(WriteWrap * w,int status)65 inline void StreamListener::OnStreamAfterWrite(WriteWrap* w, int status) {
66   CHECK_NOT_NULL(previous_listener_);
67   previous_listener_->OnStreamAfterWrite(w, status);
68 }
69 
~StreamResource()70 inline StreamResource::~StreamResource() {
71   while (listener_ != nullptr) {
72     StreamListener* listener = listener_;
73     listener->OnStreamDestroy();
74     // Remove the listener if it didn’t remove itself. This makes the logic
75     // in `OnStreamDestroy()` implementations easier, because they
76     // may call generic cleanup functions which can just remove the
77     // listener unconditionally.
78     if (listener == listener_)
79       RemoveStreamListener(listener_);
80   }
81 }
82 
PushStreamListener(StreamListener * listener)83 inline void StreamResource::PushStreamListener(StreamListener* listener) {
84   CHECK_NOT_NULL(listener);
85   CHECK_NULL(listener->stream_);
86 
87   listener->previous_listener_ = listener_;
88   listener->stream_ = this;
89 
90   listener_ = listener;
91 }
92 
RemoveStreamListener(StreamListener * listener)93 inline void StreamResource::RemoveStreamListener(StreamListener* listener) {
94   CHECK_NOT_NULL(listener);
95 
96   StreamListener* previous;
97   StreamListener* current;
98 
99   // Remove from the linked list.
100   for (current = listener_, previous = nullptr;
101        /* No loop condition because we want a crash if listener is not found */
102        ; previous = current, current = current->previous_listener_) {
103     CHECK_NOT_NULL(current);
104     if (current == listener) {
105       if (previous != nullptr)
106         previous->previous_listener_ = current->previous_listener_;
107       else
108         listener_ = listener->previous_listener_;
109       break;
110     }
111   }
112 
113   listener->stream_ = nullptr;
114   listener->previous_listener_ = nullptr;
115 }
116 
EmitAlloc(size_t suggested_size)117 inline uv_buf_t StreamResource::EmitAlloc(size_t suggested_size) {
118   DebugSealHandleScope seal_handle_scope;
119   return listener_->OnStreamAlloc(suggested_size);
120 }
121 
EmitRead(ssize_t nread,const uv_buf_t & buf)122 inline void StreamResource::EmitRead(ssize_t nread, const uv_buf_t& buf) {
123   DebugSealHandleScope seal_handle_scope;
124   if (nread > 0)
125     bytes_read_ += static_cast<uint64_t>(nread);
126   listener_->OnStreamRead(nread, buf);
127 }
128 
EmitAfterWrite(WriteWrap * w,int status)129 inline void StreamResource::EmitAfterWrite(WriteWrap* w, int status) {
130   DebugSealHandleScope seal_handle_scope;
131   listener_->OnStreamAfterWrite(w, status);
132 }
133 
EmitAfterShutdown(ShutdownWrap * w,int status)134 inline void StreamResource::EmitAfterShutdown(ShutdownWrap* w, int status) {
135   DebugSealHandleScope seal_handle_scope;
136   listener_->OnStreamAfterShutdown(w, status);
137 }
138 
EmitWantsWrite(size_t suggested_size)139 inline void StreamResource::EmitWantsWrite(size_t suggested_size) {
140   DebugSealHandleScope seal_handle_scope;
141   listener_->OnStreamWantsWrite(suggested_size);
142 }
143 
StreamBase(Environment * env)144 inline StreamBase::StreamBase(Environment* env) : env_(env) {
145   PushStreamListener(&default_listener_);
146 }
147 
stream_env()148 inline Environment* StreamBase::stream_env() const {
149   return env_;
150 }
151 
Shutdown(v8::Local<v8::Object> req_wrap_obj)152 inline int StreamBase::Shutdown(v8::Local<v8::Object> req_wrap_obj) {
153   Environment* env = stream_env();
154 
155   HandleScope handle_scope(env->isolate());
156 
157   if (req_wrap_obj.IsEmpty()) {
158     if (!env->shutdown_wrap_template()
159              ->NewInstance(env->context())
160              .ToLocal(&req_wrap_obj)) {
161       return UV_EBUSY;
162     }
163     StreamReq::ResetObject(req_wrap_obj);
164   }
165 
166   BaseObjectPtr<AsyncWrap> req_wrap_ptr;
167   AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(GetAsyncWrap());
168   ShutdownWrap* req_wrap = CreateShutdownWrap(req_wrap_obj);
169   if (req_wrap != nullptr)
170     req_wrap_ptr.reset(req_wrap->GetAsyncWrap());
171   int err = DoShutdown(req_wrap);
172 
173   if (err != 0 && req_wrap != nullptr) {
174     req_wrap->Dispose();
175   }
176 
177   const char* msg = Error();
178   if (msg != nullptr) {
179     req_wrap_obj->Set(
180         env->context(),
181         env->error_string(), OneByteString(env->isolate(), msg)).Check();
182     ClearError();
183   }
184 
185   return err;
186 }
187 
Write(uv_buf_t * bufs,size_t count,uv_stream_t * send_handle,v8::Local<v8::Object> req_wrap_obj)188 inline StreamWriteResult StreamBase::Write(
189     uv_buf_t* bufs,
190     size_t count,
191     uv_stream_t* send_handle,
192     v8::Local<v8::Object> req_wrap_obj) {
193   Environment* env = stream_env();
194   int err;
195 
196   size_t total_bytes = 0;
197   for (size_t i = 0; i < count; ++i)
198     total_bytes += bufs[i].len;
199   bytes_written_ += total_bytes;
200 
201   if (send_handle == nullptr) {
202     err = DoTryWrite(&bufs, &count);
203     if (err != 0 || count == 0) {
204       return StreamWriteResult { false, err, nullptr, total_bytes, {} };
205     }
206   }
207 
208   HandleScope handle_scope(env->isolate());
209 
210   if (req_wrap_obj.IsEmpty()) {
211     if (!env->write_wrap_template()
212              ->NewInstance(env->context())
213              .ToLocal(&req_wrap_obj)) {
214       return StreamWriteResult { false, UV_EBUSY, nullptr, 0, {} };
215     }
216     StreamReq::ResetObject(req_wrap_obj);
217   }
218 
219   AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(GetAsyncWrap());
220   WriteWrap* req_wrap = CreateWriteWrap(req_wrap_obj);
221   BaseObjectPtr<AsyncWrap> req_wrap_ptr(req_wrap->GetAsyncWrap());
222 
223   err = DoWrite(req_wrap, bufs, count, send_handle);
224   bool async = err == 0;
225 
226   if (!async) {
227     req_wrap->Dispose();
228     req_wrap = nullptr;
229   }
230 
231   const char* msg = Error();
232   if (msg != nullptr) {
233     req_wrap_obj->Set(env->context(),
234                       env->error_string(),
235                       OneByteString(env->isolate(), msg)).Check();
236     ClearError();
237   }
238 
239   return StreamWriteResult {
240       async, err, req_wrap, total_bytes, std::move(req_wrap_ptr) };
241 }
242 
243 template <typename OtherBase>
SimpleShutdownWrap(StreamBase * stream,v8::Local<v8::Object> req_wrap_obj)244 SimpleShutdownWrap<OtherBase>::SimpleShutdownWrap(
245     StreamBase* stream,
246     v8::Local<v8::Object> req_wrap_obj)
247   : ShutdownWrap(stream, req_wrap_obj),
248     OtherBase(stream->stream_env(),
249               req_wrap_obj,
250               AsyncWrap::PROVIDER_SHUTDOWNWRAP) {
251 }
252 
CreateShutdownWrap(v8::Local<v8::Object> object)253 inline ShutdownWrap* StreamBase::CreateShutdownWrap(
254     v8::Local<v8::Object> object) {
255   return new SimpleShutdownWrap<AsyncWrap>(this, object);
256 }
257 
258 template <typename OtherBase>
SimpleWriteWrap(StreamBase * stream,v8::Local<v8::Object> req_wrap_obj)259 SimpleWriteWrap<OtherBase>::SimpleWriteWrap(
260     StreamBase* stream,
261     v8::Local<v8::Object> req_wrap_obj)
262   : WriteWrap(stream, req_wrap_obj),
263     OtherBase(stream->stream_env(),
264               req_wrap_obj,
265               AsyncWrap::PROVIDER_WRITEWRAP) {
266 }
267 
CreateWriteWrap(v8::Local<v8::Object> object)268 inline WriteWrap* StreamBase::CreateWriteWrap(
269     v8::Local<v8::Object> object) {
270   return new SimpleWriteWrap<AsyncWrap>(this, object);
271 }
272 
AttachToObject(v8::Local<v8::Object> obj)273 inline void StreamBase::AttachToObject(v8::Local<v8::Object> obj) {
274   obj->SetAlignedPointerInInternalField(
275       StreamBase::kStreamBaseField, this);
276 }
277 
FromObject(v8::Local<v8::Object> obj)278 inline StreamBase* StreamBase::FromObject(v8::Local<v8::Object> obj) {
279   if (obj->GetAlignedPointerFromInternalField(StreamBase::kSlot) == nullptr)
280     return nullptr;
281 
282   return static_cast<StreamBase*>(
283       obj->GetAlignedPointerFromInternalField(
284           StreamBase::kStreamBaseField));
285 }
286 
287 
OnDone(int status)288 inline void ShutdownWrap::OnDone(int status) {
289   stream()->EmitAfterShutdown(this, status);
290   Dispose();
291 }
292 
SetAllocatedStorage(AllocatedBuffer && storage)293 inline void WriteWrap::SetAllocatedStorage(AllocatedBuffer&& storage) {
294   CHECK_NULL(storage_.data());
295   storage_ = std::move(storage);
296 }
297 
OnDone(int status)298 inline void WriteWrap::OnDone(int status) {
299   stream()->EmitAfterWrite(this, status);
300   Dispose();
301 }
302 
Done(int status,const char * error_str)303 inline void StreamReq::Done(int status, const char* error_str) {
304   AsyncWrap* async_wrap = GetAsyncWrap();
305   Environment* env = async_wrap->env();
306   if (error_str != nullptr) {
307     async_wrap->object()->Set(env->context(),
308                               env->error_string(),
309                               OneByteString(env->isolate(), error_str))
310                               .Check();
311   }
312 
313   OnDone(status);
314 }
315 
ResetObject(v8::Local<v8::Object> obj)316 inline void StreamReq::ResetObject(v8::Local<v8::Object> obj) {
317   DCHECK_GT(obj->InternalFieldCount(), StreamReq::kStreamReqField);
318 
319   obj->SetAlignedPointerInInternalField(StreamReq::kSlot, nullptr);
320   obj->SetAlignedPointerInInternalField(StreamReq::kStreamReqField, nullptr);
321 }
322 
323 
324 }  // namespace node
325 
326 #endif  // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
327 
328 #endif  // SRC_STREAM_BASE_INL_H_
329