• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #ifndef SRC_STREAM_BASE_INL_H_
2 #define SRC_STREAM_BASE_INL_H_
3 
4 #if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
5 
6 #include "allocated_buffer-inl.h"
7 #include "async_wrap-inl.h"
8 #include "base_object-inl.h"
9 #include "node.h"
10 #include "stream_base.h"
11 #include "v8.h"
12 
13 namespace node {
14 
StreamReq(StreamBase * stream,v8::Local<v8::Object> req_wrap_obj)15 StreamReq::StreamReq(
16     StreamBase* stream,
17     v8::Local<v8::Object> req_wrap_obj) : stream_(stream) {
18   AttachToObject(req_wrap_obj);
19 }
20 
AttachToObject(v8::Local<v8::Object> req_wrap_obj)21 void StreamReq::AttachToObject(v8::Local<v8::Object> req_wrap_obj) {
22   CHECK_EQ(req_wrap_obj->GetAlignedPointerFromInternalField(
23                StreamReq::kStreamReqField),
24            nullptr);
25   req_wrap_obj->SetAlignedPointerInInternalField(
26       StreamReq::kStreamReqField, this);
27 }
28 
FromObject(v8::Local<v8::Object> req_wrap_obj)29 StreamReq* StreamReq::FromObject(v8::Local<v8::Object> req_wrap_obj) {
30   return static_cast<StreamReq*>(
31       req_wrap_obj->GetAlignedPointerFromInternalField(
32           StreamReq::kStreamReqField));
33 }
34 
Dispose()35 void StreamReq::Dispose() {
36   BaseObjectPtr<AsyncWrap> destroy_me{GetAsyncWrap()};
37   object()->SetAlignedPointerInInternalField(
38       StreamReq::kStreamReqField, nullptr);
39   destroy_me->Detach();
40 }
41 
object()42 v8::Local<v8::Object> StreamReq::object() {
43   return GetAsyncWrap()->object();
44 }
45 
ShutdownWrap(StreamBase * stream,v8::Local<v8::Object> req_wrap_obj)46 ShutdownWrap::ShutdownWrap(
47     StreamBase* stream,
48     v8::Local<v8::Object> req_wrap_obj)
49     : StreamReq(stream, req_wrap_obj) { }
50 
WriteWrap(StreamBase * stream,v8::Local<v8::Object> req_wrap_obj)51 WriteWrap::WriteWrap(
52     StreamBase* stream,
53     v8::Local<v8::Object> req_wrap_obj)
54     : StreamReq(stream, req_wrap_obj) { }
55 
PassReadErrorToPreviousListener(ssize_t nread)56 void StreamListener::PassReadErrorToPreviousListener(ssize_t nread) {
57   CHECK_NOT_NULL(previous_listener_);
58   previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0));
59 }
60 
PushStreamListener(StreamListener * listener)61 void StreamResource::PushStreamListener(StreamListener* listener) {
62   CHECK_NOT_NULL(listener);
63   CHECK_NULL(listener->stream_);
64 
65   listener->previous_listener_ = listener_;
66   listener->stream_ = this;
67 
68   listener_ = listener;
69 }
70 
RemoveStreamListener(StreamListener * listener)71 void StreamResource::RemoveStreamListener(StreamListener* listener) {
72   CHECK_NOT_NULL(listener);
73 
74   StreamListener* previous;
75   StreamListener* current;
76 
77   // Remove from the linked list.
78   for (current = listener_, previous = nullptr;
79        /* No loop condition because we want a crash if listener is not found */
80        ; previous = current, current = current->previous_listener_) {
81     CHECK_NOT_NULL(current);
82     if (current == listener) {
83       if (previous != nullptr)
84         previous->previous_listener_ = current->previous_listener_;
85       else
86         listener_ = listener->previous_listener_;
87       break;
88     }
89   }
90 
91   listener->stream_ = nullptr;
92   listener->previous_listener_ = nullptr;
93 }
94 
EmitAlloc(size_t suggested_size)95 uv_buf_t StreamResource::EmitAlloc(size_t suggested_size) {
96   DebugSealHandleScope seal_handle_scope;
97   return listener_->OnStreamAlloc(suggested_size);
98 }
99 
EmitRead(ssize_t nread,const uv_buf_t & buf)100 void StreamResource::EmitRead(ssize_t nread, const uv_buf_t& buf) {
101   DebugSealHandleScope seal_handle_scope;
102   if (nread > 0)
103     bytes_read_ += static_cast<uint64_t>(nread);
104   listener_->OnStreamRead(nread, buf);
105 }
106 
EmitAfterWrite(WriteWrap * w,int status)107 void StreamResource::EmitAfterWrite(WriteWrap* w, int status) {
108   DebugSealHandleScope seal_handle_scope;
109   listener_->OnStreamAfterWrite(w, status);
110 }
111 
EmitAfterShutdown(ShutdownWrap * w,int status)112 void StreamResource::EmitAfterShutdown(ShutdownWrap* w, int status) {
113   DebugSealHandleScope seal_handle_scope;
114   listener_->OnStreamAfterShutdown(w, status);
115 }
116 
EmitWantsWrite(size_t suggested_size)117 void StreamResource::EmitWantsWrite(size_t suggested_size) {
118   DebugSealHandleScope seal_handle_scope;
119   listener_->OnStreamWantsWrite(suggested_size);
120 }
121 
StreamBase(Environment * env)122 StreamBase::StreamBase(Environment* env) : env_(env) {
123   PushStreamListener(&default_listener_);
124 }
125 
Shutdown(v8::Local<v8::Object> req_wrap_obj)126 int StreamBase::Shutdown(v8::Local<v8::Object> req_wrap_obj) {
127   Environment* env = stream_env();
128 
129   v8::HandleScope handle_scope(env->isolate());
130 
131   if (req_wrap_obj.IsEmpty()) {
132     if (!env->shutdown_wrap_template()
133              ->NewInstance(env->context())
134              .ToLocal(&req_wrap_obj)) {
135       return UV_EBUSY;
136     }
137     StreamReq::ResetObject(req_wrap_obj);
138   }
139 
140   BaseObjectPtr<AsyncWrap> req_wrap_ptr;
141   AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(GetAsyncWrap());
142   ShutdownWrap* req_wrap = CreateShutdownWrap(req_wrap_obj);
143   if (req_wrap != nullptr)
144     req_wrap_ptr.reset(req_wrap->GetAsyncWrap());
145   int err = DoShutdown(req_wrap);
146 
147   if (err != 0 && req_wrap != nullptr) {
148     req_wrap->Dispose();
149   }
150 
151   const char* msg = Error();
152   if (msg != nullptr) {
153     req_wrap_obj->Set(
154         env->context(),
155         env->error_string(), OneByteString(env->isolate(), msg)).Check();
156     ClearError();
157   }
158 
159   return err;
160 }
161 
Write(uv_buf_t * bufs,size_t count,uv_stream_t * send_handle,v8::Local<v8::Object> req_wrap_obj)162 StreamWriteResult StreamBase::Write(
163     uv_buf_t* bufs,
164     size_t count,
165     uv_stream_t* send_handle,
166     v8::Local<v8::Object> req_wrap_obj) {
167   Environment* env = stream_env();
168   int err;
169 
170   size_t total_bytes = 0;
171   for (size_t i = 0; i < count; ++i)
172     total_bytes += bufs[i].len;
173   bytes_written_ += total_bytes;
174 
175   if (send_handle == nullptr) {
176     err = DoTryWrite(&bufs, &count);
177     if (err != 0 || count == 0) {
178       return StreamWriteResult { false, err, nullptr, total_bytes, {} };
179     }
180   }
181 
182   v8::HandleScope handle_scope(env->isolate());
183 
184   if (req_wrap_obj.IsEmpty()) {
185     if (!env->write_wrap_template()
186              ->NewInstance(env->context())
187              .ToLocal(&req_wrap_obj)) {
188       return StreamWriteResult { false, UV_EBUSY, nullptr, 0, {} };
189     }
190     StreamReq::ResetObject(req_wrap_obj);
191   }
192 
193   AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(GetAsyncWrap());
194   WriteWrap* req_wrap = CreateWriteWrap(req_wrap_obj);
195   BaseObjectPtr<AsyncWrap> req_wrap_ptr(req_wrap->GetAsyncWrap());
196 
197   err = DoWrite(req_wrap, bufs, count, send_handle);
198   bool async = err == 0;
199 
200   if (!async) {
201     req_wrap->Dispose();
202     req_wrap = nullptr;
203   }
204 
205   const char* msg = Error();
206   if (msg != nullptr) {
207     req_wrap_obj->Set(env->context(),
208                       env->error_string(),
209                       OneByteString(env->isolate(), msg)).Check();
210     ClearError();
211   }
212 
213   return StreamWriteResult {
214       async, err, req_wrap, total_bytes, std::move(req_wrap_ptr) };
215 }
216 
217 template <typename OtherBase>
SimpleShutdownWrap(StreamBase * stream,v8::Local<v8::Object> req_wrap_obj)218 SimpleShutdownWrap<OtherBase>::SimpleShutdownWrap(
219     StreamBase* stream,
220     v8::Local<v8::Object> req_wrap_obj)
221   : ShutdownWrap(stream, req_wrap_obj),
222     OtherBase(stream->stream_env(),
223               req_wrap_obj,
224               AsyncWrap::PROVIDER_SHUTDOWNWRAP) {
225 }
226 
227 template <typename OtherBase>
SimpleWriteWrap(StreamBase * stream,v8::Local<v8::Object> req_wrap_obj)228 SimpleWriteWrap<OtherBase>::SimpleWriteWrap(
229     StreamBase* stream,
230     v8::Local<v8::Object> req_wrap_obj)
231   : WriteWrap(stream, req_wrap_obj),
232     OtherBase(stream->stream_env(),
233               req_wrap_obj,
234               AsyncWrap::PROVIDER_WRITEWRAP) {
235 }
236 
AttachToObject(v8::Local<v8::Object> obj)237 void StreamBase::AttachToObject(v8::Local<v8::Object> obj) {
238   obj->SetAlignedPointerInInternalField(
239       StreamBase::kStreamBaseField, this);
240 }
241 
FromObject(v8::Local<v8::Object> obj)242 StreamBase* StreamBase::FromObject(v8::Local<v8::Object> obj) {
243   if (obj->GetAlignedPointerFromInternalField(StreamBase::kSlot) == nullptr)
244     return nullptr;
245 
246   return static_cast<StreamBase*>(
247       obj->GetAlignedPointerFromInternalField(
248           StreamBase::kStreamBaseField));
249 }
250 
FromObject(v8::Local<v8::Object> req_wrap_obj)251 WriteWrap* WriteWrap::FromObject(v8::Local<v8::Object> req_wrap_obj) {
252   return static_cast<WriteWrap*>(StreamReq::FromObject(req_wrap_obj));
253 }
254 
255 template <typename T, bool kIsWeak>
FromObject(const BaseObjectPtrImpl<T,kIsWeak> & base_obj)256 WriteWrap* WriteWrap::FromObject(
257     const BaseObjectPtrImpl<T, kIsWeak>& base_obj) {
258   if (!base_obj) return nullptr;
259   return FromObject(base_obj->object());
260 }
261 
FromObject(v8::Local<v8::Object> req_wrap_obj)262 ShutdownWrap* ShutdownWrap::FromObject(v8::Local<v8::Object> req_wrap_obj) {
263   return static_cast<ShutdownWrap*>(StreamReq::FromObject(req_wrap_obj));
264 }
265 
266 template <typename T, bool kIsWeak>
FromObject(const BaseObjectPtrImpl<T,kIsWeak> & base_obj)267 ShutdownWrap* ShutdownWrap::FromObject(
268     const BaseObjectPtrImpl<T, kIsWeak>& base_obj) {
269   if (!base_obj) return nullptr;
270   return FromObject(base_obj->object());
271 }
272 
SetAllocatedStorage(AllocatedBuffer && storage)273 void WriteWrap::SetAllocatedStorage(AllocatedBuffer&& storage) {
274   CHECK_NULL(storage_.data());
275   storage_ = std::move(storage);
276 }
277 
Done(int status,const char * error_str)278 void StreamReq::Done(int status, const char* error_str) {
279   AsyncWrap* async_wrap = GetAsyncWrap();
280   Environment* env = async_wrap->env();
281   if (error_str != nullptr) {
282     v8::HandleScope handle_scope(env->isolate());
283     async_wrap->object()->Set(env->context(),
284                               env->error_string(),
285                               OneByteString(env->isolate(), error_str))
286                               .Check();
287   }
288 
289   OnDone(status);
290 }
291 
ResetObject(v8::Local<v8::Object> obj)292 void StreamReq::ResetObject(v8::Local<v8::Object> obj) {
293   DCHECK_GT(obj->InternalFieldCount(), StreamReq::kStreamReqField);
294 
295   obj->SetAlignedPointerInInternalField(StreamReq::kSlot, nullptr);
296   obj->SetAlignedPointerInInternalField(StreamReq::kStreamReqField, nullptr);
297 }
298 
299 }  // namespace node
300 
301 #endif  // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
302 
303 #endif  // SRC_STREAM_BASE_INL_H_
304