1 #ifndef SRC_STREAM_BASE_INL_H_
2 #define SRC_STREAM_BASE_INL_H_
3
4 #if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
5
6 #include "allocated_buffer-inl.h"
7 #include "async_wrap-inl.h"
8 #include "base_object-inl.h"
9 #include "node.h"
10 #include "stream_base.h"
11 #include "v8.h"
12
13 namespace node {
14
StreamReq(StreamBase * stream,v8::Local<v8::Object> req_wrap_obj)15 StreamReq::StreamReq(
16 StreamBase* stream,
17 v8::Local<v8::Object> req_wrap_obj) : stream_(stream) {
18 AttachToObject(req_wrap_obj);
19 }
20
AttachToObject(v8::Local<v8::Object> req_wrap_obj)21 void StreamReq::AttachToObject(v8::Local<v8::Object> req_wrap_obj) {
22 CHECK_EQ(req_wrap_obj->GetAlignedPointerFromInternalField(
23 StreamReq::kStreamReqField),
24 nullptr);
25 req_wrap_obj->SetAlignedPointerInInternalField(
26 StreamReq::kStreamReqField, this);
27 }
28
FromObject(v8::Local<v8::Object> req_wrap_obj)29 StreamReq* StreamReq::FromObject(v8::Local<v8::Object> req_wrap_obj) {
30 return static_cast<StreamReq*>(
31 req_wrap_obj->GetAlignedPointerFromInternalField(
32 StreamReq::kStreamReqField));
33 }
34
Dispose()35 void StreamReq::Dispose() {
36 BaseObjectPtr<AsyncWrap> destroy_me{GetAsyncWrap()};
37 object()->SetAlignedPointerInInternalField(
38 StreamReq::kStreamReqField, nullptr);
39 destroy_me->Detach();
40 }
41
object()42 v8::Local<v8::Object> StreamReq::object() {
43 return GetAsyncWrap()->object();
44 }
45
ShutdownWrap(StreamBase * stream,v8::Local<v8::Object> req_wrap_obj)46 ShutdownWrap::ShutdownWrap(
47 StreamBase* stream,
48 v8::Local<v8::Object> req_wrap_obj)
49 : StreamReq(stream, req_wrap_obj) { }
50
WriteWrap(StreamBase * stream,v8::Local<v8::Object> req_wrap_obj)51 WriteWrap::WriteWrap(
52 StreamBase* stream,
53 v8::Local<v8::Object> req_wrap_obj)
54 : StreamReq(stream, req_wrap_obj) { }
55
PassReadErrorToPreviousListener(ssize_t nread)56 void StreamListener::PassReadErrorToPreviousListener(ssize_t nread) {
57 CHECK_NOT_NULL(previous_listener_);
58 previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0));
59 }
60
PushStreamListener(StreamListener * listener)61 void StreamResource::PushStreamListener(StreamListener* listener) {
62 CHECK_NOT_NULL(listener);
63 CHECK_NULL(listener->stream_);
64
65 listener->previous_listener_ = listener_;
66 listener->stream_ = this;
67
68 listener_ = listener;
69 }
70
RemoveStreamListener(StreamListener * listener)71 void StreamResource::RemoveStreamListener(StreamListener* listener) {
72 CHECK_NOT_NULL(listener);
73
74 StreamListener* previous;
75 StreamListener* current;
76
77 // Remove from the linked list.
78 for (current = listener_, previous = nullptr;
79 /* No loop condition because we want a crash if listener is not found */
80 ; previous = current, current = current->previous_listener_) {
81 CHECK_NOT_NULL(current);
82 if (current == listener) {
83 if (previous != nullptr)
84 previous->previous_listener_ = current->previous_listener_;
85 else
86 listener_ = listener->previous_listener_;
87 break;
88 }
89 }
90
91 listener->stream_ = nullptr;
92 listener->previous_listener_ = nullptr;
93 }
94
EmitAlloc(size_t suggested_size)95 uv_buf_t StreamResource::EmitAlloc(size_t suggested_size) {
96 DebugSealHandleScope seal_handle_scope;
97 return listener_->OnStreamAlloc(suggested_size);
98 }
99
EmitRead(ssize_t nread,const uv_buf_t & buf)100 void StreamResource::EmitRead(ssize_t nread, const uv_buf_t& buf) {
101 DebugSealHandleScope seal_handle_scope;
102 if (nread > 0)
103 bytes_read_ += static_cast<uint64_t>(nread);
104 listener_->OnStreamRead(nread, buf);
105 }
106
EmitAfterWrite(WriteWrap * w,int status)107 void StreamResource::EmitAfterWrite(WriteWrap* w, int status) {
108 DebugSealHandleScope seal_handle_scope;
109 listener_->OnStreamAfterWrite(w, status);
110 }
111
EmitAfterShutdown(ShutdownWrap * w,int status)112 void StreamResource::EmitAfterShutdown(ShutdownWrap* w, int status) {
113 DebugSealHandleScope seal_handle_scope;
114 listener_->OnStreamAfterShutdown(w, status);
115 }
116
EmitWantsWrite(size_t suggested_size)117 void StreamResource::EmitWantsWrite(size_t suggested_size) {
118 DebugSealHandleScope seal_handle_scope;
119 listener_->OnStreamWantsWrite(suggested_size);
120 }
121
StreamBase(Environment * env)122 StreamBase::StreamBase(Environment* env) : env_(env) {
123 PushStreamListener(&default_listener_);
124 }
125
Shutdown(v8::Local<v8::Object> req_wrap_obj)126 int StreamBase::Shutdown(v8::Local<v8::Object> req_wrap_obj) {
127 Environment* env = stream_env();
128
129 v8::HandleScope handle_scope(env->isolate());
130
131 if (req_wrap_obj.IsEmpty()) {
132 if (!env->shutdown_wrap_template()
133 ->NewInstance(env->context())
134 .ToLocal(&req_wrap_obj)) {
135 return UV_EBUSY;
136 }
137 StreamReq::ResetObject(req_wrap_obj);
138 }
139
140 BaseObjectPtr<AsyncWrap> req_wrap_ptr;
141 AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(GetAsyncWrap());
142 ShutdownWrap* req_wrap = CreateShutdownWrap(req_wrap_obj);
143 if (req_wrap != nullptr)
144 req_wrap_ptr.reset(req_wrap->GetAsyncWrap());
145 int err = DoShutdown(req_wrap);
146
147 if (err != 0 && req_wrap != nullptr) {
148 req_wrap->Dispose();
149 }
150
151 const char* msg = Error();
152 if (msg != nullptr) {
153 req_wrap_obj->Set(
154 env->context(),
155 env->error_string(), OneByteString(env->isolate(), msg)).Check();
156 ClearError();
157 }
158
159 return err;
160 }
161
Write(uv_buf_t * bufs,size_t count,uv_stream_t * send_handle,v8::Local<v8::Object> req_wrap_obj)162 StreamWriteResult StreamBase::Write(
163 uv_buf_t* bufs,
164 size_t count,
165 uv_stream_t* send_handle,
166 v8::Local<v8::Object> req_wrap_obj) {
167 Environment* env = stream_env();
168 int err;
169
170 size_t total_bytes = 0;
171 for (size_t i = 0; i < count; ++i)
172 total_bytes += bufs[i].len;
173 bytes_written_ += total_bytes;
174
175 if (send_handle == nullptr) {
176 err = DoTryWrite(&bufs, &count);
177 if (err != 0 || count == 0) {
178 return StreamWriteResult { false, err, nullptr, total_bytes, {} };
179 }
180 }
181
182 v8::HandleScope handle_scope(env->isolate());
183
184 if (req_wrap_obj.IsEmpty()) {
185 if (!env->write_wrap_template()
186 ->NewInstance(env->context())
187 .ToLocal(&req_wrap_obj)) {
188 return StreamWriteResult { false, UV_EBUSY, nullptr, 0, {} };
189 }
190 StreamReq::ResetObject(req_wrap_obj);
191 }
192
193 AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(GetAsyncWrap());
194 WriteWrap* req_wrap = CreateWriteWrap(req_wrap_obj);
195 BaseObjectPtr<AsyncWrap> req_wrap_ptr(req_wrap->GetAsyncWrap());
196
197 err = DoWrite(req_wrap, bufs, count, send_handle);
198 bool async = err == 0;
199
200 if (!async) {
201 req_wrap->Dispose();
202 req_wrap = nullptr;
203 }
204
205 const char* msg = Error();
206 if (msg != nullptr) {
207 req_wrap_obj->Set(env->context(),
208 env->error_string(),
209 OneByteString(env->isolate(), msg)).Check();
210 ClearError();
211 }
212
213 return StreamWriteResult {
214 async, err, req_wrap, total_bytes, std::move(req_wrap_ptr) };
215 }
216
217 template <typename OtherBase>
SimpleShutdownWrap(StreamBase * stream,v8::Local<v8::Object> req_wrap_obj)218 SimpleShutdownWrap<OtherBase>::SimpleShutdownWrap(
219 StreamBase* stream,
220 v8::Local<v8::Object> req_wrap_obj)
221 : ShutdownWrap(stream, req_wrap_obj),
222 OtherBase(stream->stream_env(),
223 req_wrap_obj,
224 AsyncWrap::PROVIDER_SHUTDOWNWRAP) {
225 }
226
227 template <typename OtherBase>
SimpleWriteWrap(StreamBase * stream,v8::Local<v8::Object> req_wrap_obj)228 SimpleWriteWrap<OtherBase>::SimpleWriteWrap(
229 StreamBase* stream,
230 v8::Local<v8::Object> req_wrap_obj)
231 : WriteWrap(stream, req_wrap_obj),
232 OtherBase(stream->stream_env(),
233 req_wrap_obj,
234 AsyncWrap::PROVIDER_WRITEWRAP) {
235 }
236
AttachToObject(v8::Local<v8::Object> obj)237 void StreamBase::AttachToObject(v8::Local<v8::Object> obj) {
238 obj->SetAlignedPointerInInternalField(
239 StreamBase::kStreamBaseField, this);
240 }
241
FromObject(v8::Local<v8::Object> obj)242 StreamBase* StreamBase::FromObject(v8::Local<v8::Object> obj) {
243 if (obj->GetAlignedPointerFromInternalField(StreamBase::kSlot) == nullptr)
244 return nullptr;
245
246 return static_cast<StreamBase*>(
247 obj->GetAlignedPointerFromInternalField(
248 StreamBase::kStreamBaseField));
249 }
250
FromObject(v8::Local<v8::Object> req_wrap_obj)251 WriteWrap* WriteWrap::FromObject(v8::Local<v8::Object> req_wrap_obj) {
252 return static_cast<WriteWrap*>(StreamReq::FromObject(req_wrap_obj));
253 }
254
255 template <typename T, bool kIsWeak>
FromObject(const BaseObjectPtrImpl<T,kIsWeak> & base_obj)256 WriteWrap* WriteWrap::FromObject(
257 const BaseObjectPtrImpl<T, kIsWeak>& base_obj) {
258 if (!base_obj) return nullptr;
259 return FromObject(base_obj->object());
260 }
261
FromObject(v8::Local<v8::Object> req_wrap_obj)262 ShutdownWrap* ShutdownWrap::FromObject(v8::Local<v8::Object> req_wrap_obj) {
263 return static_cast<ShutdownWrap*>(StreamReq::FromObject(req_wrap_obj));
264 }
265
266 template <typename T, bool kIsWeak>
FromObject(const BaseObjectPtrImpl<T,kIsWeak> & base_obj)267 ShutdownWrap* ShutdownWrap::FromObject(
268 const BaseObjectPtrImpl<T, kIsWeak>& base_obj) {
269 if (!base_obj) return nullptr;
270 return FromObject(base_obj->object());
271 }
272
SetAllocatedStorage(AllocatedBuffer && storage)273 void WriteWrap::SetAllocatedStorage(AllocatedBuffer&& storage) {
274 CHECK_NULL(storage_.data());
275 storage_ = std::move(storage);
276 }
277
Done(int status,const char * error_str)278 void StreamReq::Done(int status, const char* error_str) {
279 AsyncWrap* async_wrap = GetAsyncWrap();
280 Environment* env = async_wrap->env();
281 if (error_str != nullptr) {
282 v8::HandleScope handle_scope(env->isolate());
283 async_wrap->object()->Set(env->context(),
284 env->error_string(),
285 OneByteString(env->isolate(), error_str))
286 .Check();
287 }
288
289 OnDone(status);
290 }
291
ResetObject(v8::Local<v8::Object> obj)292 void StreamReq::ResetObject(v8::Local<v8::Object> obj) {
293 DCHECK_GT(obj->InternalFieldCount(), StreamReq::kStreamReqField);
294
295 obj->SetAlignedPointerInInternalField(StreamReq::kSlot, nullptr);
296 obj->SetAlignedPointerInInternalField(StreamReq::kStreamReqField, nullptr);
297 }
298
299 } // namespace node
300
301 #endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
302
303 #endif // SRC_STREAM_BASE_INL_H_
304