1 // Copyright Joyent, Inc. and other Node contributors.
2 //
3 // Permission is hereby granted, free of charge, to any person obtaining a
4 // copy of this software and associated documentation files (the
5 // "Software"), to deal in the Software without restriction, including
6 // without limitation the rights to use, copy, modify, merge, publish,
7 // distribute, sublicense, and/or sell copies of the Software, and to permit
8 // persons to whom the Software is furnished to do so, subject to the
9 // following conditions:
10 //
11 // The above copyright notice and this permission notice shall be included
12 // in all copies or substantial portions of the Software.
13 //
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17 // NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18 // DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19 // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20 // USE OR OTHER DEALINGS IN THE SOFTWARE.
21
22 #include "stream_wrap.h"
23 #include "stream_base-inl.h"
24
25 #include "env-inl.h"
26 #include "handle_wrap.h"
27 #include "node_buffer.h"
28 #include "pipe_wrap.h"
29 #include "req_wrap-inl.h"
30 #include "tcp_wrap.h"
31 #include "udp_wrap.h"
32 #include "util-inl.h"
33
34 #include <cstring> // memcpy()
35 #include <climits> // INT_MAX
36
37
38 namespace node {
39
40 using v8::Context;
41 using v8::DontDelete;
42 using v8::EscapableHandleScope;
43 using v8::FunctionCallbackInfo;
44 using v8::FunctionTemplate;
45 using v8::HandleScope;
46 using v8::Local;
47 using v8::MaybeLocal;
48 using v8::Object;
49 using v8::PropertyAttribute;
50 using v8::ReadOnly;
51 using v8::Signature;
52 using v8::Value;
53
54
Initialize(Local<Object> target,Local<Value> unused,Local<Context> context,void * priv)55 void LibuvStreamWrap::Initialize(Local<Object> target,
56 Local<Value> unused,
57 Local<Context> context,
58 void* priv) {
59 Environment* env = Environment::GetCurrent(context);
60
61 auto is_construct_call_callback =
62 [](const FunctionCallbackInfo<Value>& args) {
63 CHECK(args.IsConstructCall());
64 StreamReq::ResetObject(args.This());
65 };
66 Local<FunctionTemplate> sw =
67 FunctionTemplate::New(env->isolate(), is_construct_call_callback);
68 sw->InstanceTemplate()->SetInternalFieldCount(StreamReq::kInternalFieldCount);
69
70 // we need to set handle and callback to null,
71 // so that those fields are created and functions
72 // do not become megamorphic
73 // Fields:
74 // - oncomplete
75 // - callback
76 // - handle
77 sw->InstanceTemplate()->Set(
78 env->oncomplete_string(),
79 v8::Null(env->isolate()));
80 sw->InstanceTemplate()->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "callback"),
81 v8::Null(env->isolate()));
82 sw->InstanceTemplate()->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "handle"),
83 v8::Null(env->isolate()));
84
85 sw->Inherit(AsyncWrap::GetConstructorTemplate(env));
86
87 env->SetConstructorFunction(target, "ShutdownWrap", sw);
88 env->set_shutdown_wrap_template(sw->InstanceTemplate());
89
90 Local<FunctionTemplate> ww =
91 FunctionTemplate::New(env->isolate(), is_construct_call_callback);
92 ww->InstanceTemplate()->SetInternalFieldCount(
93 StreamReq::kInternalFieldCount);
94 ww->Inherit(AsyncWrap::GetConstructorTemplate(env));
95 env->SetConstructorFunction(target, "WriteWrap", ww);
96 env->set_write_wrap_template(ww->InstanceTemplate());
97
98 NODE_DEFINE_CONSTANT(target, kReadBytesOrError);
99 NODE_DEFINE_CONSTANT(target, kArrayBufferOffset);
100 NODE_DEFINE_CONSTANT(target, kBytesWritten);
101 NODE_DEFINE_CONSTANT(target, kLastWriteWasAsync);
102 target->Set(context, FIXED_ONE_BYTE_STRING(env->isolate(), "streamBaseState"),
103 env->stream_base_state().GetJSArray()).Check();
104 }
105
106
LibuvStreamWrap(Environment * env,Local<Object> object,uv_stream_t * stream,AsyncWrap::ProviderType provider)107 LibuvStreamWrap::LibuvStreamWrap(Environment* env,
108 Local<Object> object,
109 uv_stream_t* stream,
110 AsyncWrap::ProviderType provider)
111 : HandleWrap(env,
112 object,
113 reinterpret_cast<uv_handle_t*>(stream),
114 provider),
115 StreamBase(env),
116 stream_(stream) {
117 StreamBase::AttachToObject(object);
118 }
119
120
GetConstructorTemplate(Environment * env)121 Local<FunctionTemplate> LibuvStreamWrap::GetConstructorTemplate(
122 Environment* env) {
123 Local<FunctionTemplate> tmpl = env->libuv_stream_wrap_ctor_template();
124 if (tmpl.IsEmpty()) {
125 tmpl = env->NewFunctionTemplate(nullptr);
126 tmpl->SetClassName(
127 FIXED_ONE_BYTE_STRING(env->isolate(), "LibuvStreamWrap"));
128 tmpl->Inherit(HandleWrap::GetConstructorTemplate(env));
129 tmpl->InstanceTemplate()->SetInternalFieldCount(
130 StreamBase::kInternalFieldCount);
131 Local<FunctionTemplate> get_write_queue_size =
132 FunctionTemplate::New(env->isolate(),
133 GetWriteQueueSize,
134 Local<Value>(),
135 Signature::New(env->isolate(), tmpl));
136 tmpl->PrototypeTemplate()->SetAccessorProperty(
137 env->write_queue_size_string(),
138 get_write_queue_size,
139 Local<FunctionTemplate>(),
140 static_cast<PropertyAttribute>(ReadOnly | DontDelete));
141 env->SetProtoMethod(tmpl, "setBlocking", SetBlocking);
142 StreamBase::AddMethods(env, tmpl);
143 env->set_libuv_stream_wrap_ctor_template(tmpl);
144 }
145 return tmpl;
146 }
147
148
From(Environment * env,Local<Object> object)149 LibuvStreamWrap* LibuvStreamWrap::From(Environment* env, Local<Object> object) {
150 Local<FunctionTemplate> sw = env->libuv_stream_wrap_ctor_template();
151 CHECK(!sw.IsEmpty() && sw->HasInstance(object));
152 return Unwrap<LibuvStreamWrap>(object);
153 }
154
155
GetFD()156 int LibuvStreamWrap::GetFD() {
157 #ifdef _WIN32
158 return fd_;
159 #else
160 int fd = -1;
161 if (stream() != nullptr)
162 uv_fileno(reinterpret_cast<uv_handle_t*>(stream()), &fd);
163 return fd;
164 #endif
165 }
166
167
IsAlive()168 bool LibuvStreamWrap::IsAlive() {
169 return HandleWrap::IsAlive(this);
170 }
171
172
IsClosing()173 bool LibuvStreamWrap::IsClosing() {
174 return uv_is_closing(reinterpret_cast<uv_handle_t*>(stream()));
175 }
176
177
GetAsyncWrap()178 AsyncWrap* LibuvStreamWrap::GetAsyncWrap() {
179 return static_cast<AsyncWrap*>(this);
180 }
181
182
IsIPCPipe()183 bool LibuvStreamWrap::IsIPCPipe() {
184 return is_named_pipe_ipc();
185 }
186
187
ReadStart()188 int LibuvStreamWrap::ReadStart() {
189 return uv_read_start(stream(), [](uv_handle_t* handle,
190 size_t suggested_size,
191 uv_buf_t* buf) {
192 static_cast<LibuvStreamWrap*>(handle->data)->OnUvAlloc(suggested_size, buf);
193 }, [](uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
194 static_cast<LibuvStreamWrap*>(stream->data)->OnUvRead(nread, buf);
195 });
196 }
197
198
ReadStop()199 int LibuvStreamWrap::ReadStop() {
200 return uv_read_stop(stream());
201 }
202
203
OnUvAlloc(size_t suggested_size,uv_buf_t * buf)204 void LibuvStreamWrap::OnUvAlloc(size_t suggested_size, uv_buf_t* buf) {
205 HandleScope scope(env()->isolate());
206 Context::Scope context_scope(env()->context());
207
208 *buf = EmitAlloc(suggested_size);
209 }
210
211 template <class WrapType>
AcceptHandle(Environment * env,LibuvStreamWrap * parent)212 static MaybeLocal<Object> AcceptHandle(Environment* env,
213 LibuvStreamWrap* parent) {
214 static_assert(std::is_base_of<LibuvStreamWrap, WrapType>::value ||
215 std::is_base_of<UDPWrap, WrapType>::value,
216 "Can only accept stream handles");
217
218 EscapableHandleScope scope(env->isolate());
219 Local<Object> wrap_obj;
220
221 if (!WrapType::Instantiate(env, parent, WrapType::SOCKET).ToLocal(&wrap_obj))
222 return Local<Object>();
223
224 HandleWrap* wrap = Unwrap<HandleWrap>(wrap_obj);
225 CHECK_NOT_NULL(wrap);
226 uv_stream_t* stream = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
227 CHECK_NOT_NULL(stream);
228
229 if (uv_accept(parent->stream(), stream))
230 ABORT();
231
232 return scope.Escape(wrap_obj);
233 }
234
235
OnUvRead(ssize_t nread,const uv_buf_t * buf)236 void LibuvStreamWrap::OnUvRead(ssize_t nread, const uv_buf_t* buf) {
237 HandleScope scope(env()->isolate());
238 Context::Scope context_scope(env()->context());
239 uv_handle_type type = UV_UNKNOWN_HANDLE;
240
241 if (is_named_pipe_ipc() &&
242 uv_pipe_pending_count(reinterpret_cast<uv_pipe_t*>(stream())) > 0) {
243 type = uv_pipe_pending_type(reinterpret_cast<uv_pipe_t*>(stream()));
244 }
245
246 // We should not be getting this callback if someone has already called
247 // uv_close() on the handle.
248 CHECK_EQ(persistent().IsEmpty(), false);
249
250 if (nread > 0) {
251 MaybeLocal<Object> pending_obj;
252
253 if (type == UV_TCP) {
254 pending_obj = AcceptHandle<TCPWrap>(env(), this);
255 } else if (type == UV_NAMED_PIPE) {
256 pending_obj = AcceptHandle<PipeWrap>(env(), this);
257 } else if (type == UV_UDP) {
258 pending_obj = AcceptHandle<UDPWrap>(env(), this);
259 } else {
260 CHECK_EQ(type, UV_UNKNOWN_HANDLE);
261 }
262
263 if (!pending_obj.IsEmpty()) {
264 object()
265 ->Set(env()->context(),
266 env()->pending_handle_string(),
267 pending_obj.ToLocalChecked())
268 .Check();
269 }
270 }
271
272 EmitRead(nread, *buf);
273 }
274
275
GetWriteQueueSize(const FunctionCallbackInfo<Value> & info)276 void LibuvStreamWrap::GetWriteQueueSize(
277 const FunctionCallbackInfo<Value>& info) {
278 LibuvStreamWrap* wrap;
279 ASSIGN_OR_RETURN_UNWRAP(&wrap, info.This());
280
281 if (wrap->stream() == nullptr) {
282 info.GetReturnValue().Set(0);
283 return;
284 }
285
286 uint32_t write_queue_size = wrap->stream()->write_queue_size;
287 info.GetReturnValue().Set(write_queue_size);
288 }
289
290
SetBlocking(const FunctionCallbackInfo<Value> & args)291 void LibuvStreamWrap::SetBlocking(const FunctionCallbackInfo<Value>& args) {
292 LibuvStreamWrap* wrap;
293 ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
294
295 CHECK_GT(args.Length(), 0);
296 if (!wrap->IsAlive())
297 return args.GetReturnValue().Set(UV_EINVAL);
298
299 bool enable = args[0]->IsTrue();
300 args.GetReturnValue().Set(uv_stream_set_blocking(wrap->stream(), enable));
301 }
302
303 typedef SimpleShutdownWrap<ReqWrap<uv_shutdown_t>> LibuvShutdownWrap;
304 typedef SimpleWriteWrap<ReqWrap<uv_write_t>> LibuvWriteWrap;
305
CreateShutdownWrap(Local<Object> object)306 ShutdownWrap* LibuvStreamWrap::CreateShutdownWrap(Local<Object> object) {
307 return new LibuvShutdownWrap(this, object);
308 }
309
CreateWriteWrap(Local<Object> object)310 WriteWrap* LibuvStreamWrap::CreateWriteWrap(Local<Object> object) {
311 return new LibuvWriteWrap(this, object);
312 }
313
314
DoShutdown(ShutdownWrap * req_wrap_)315 int LibuvStreamWrap::DoShutdown(ShutdownWrap* req_wrap_) {
316 LibuvShutdownWrap* req_wrap = static_cast<LibuvShutdownWrap*>(req_wrap_);
317 return req_wrap->Dispatch(uv_shutdown, stream(), AfterUvShutdown);
318 }
319
320
AfterUvShutdown(uv_shutdown_t * req,int status)321 void LibuvStreamWrap::AfterUvShutdown(uv_shutdown_t* req, int status) {
322 LibuvShutdownWrap* req_wrap = static_cast<LibuvShutdownWrap*>(
323 LibuvShutdownWrap::from_req(req));
324 CHECK_NOT_NULL(req_wrap);
325 HandleScope scope(req_wrap->env()->isolate());
326 Context::Scope context_scope(req_wrap->env()->context());
327 req_wrap->Done(status);
328 }
329
330
331 // NOTE: Call to this function could change both `buf`'s and `count`'s
332 // values, shifting their base and decrementing their length. This is
333 // required in order to skip the data that was successfully written via
334 // uv_try_write().
DoTryWrite(uv_buf_t ** bufs,size_t * count)335 int LibuvStreamWrap::DoTryWrite(uv_buf_t** bufs, size_t* count) {
336 int err;
337 size_t written;
338 uv_buf_t* vbufs = *bufs;
339 size_t vcount = *count;
340
341 err = uv_try_write(stream(), vbufs, vcount);
342 if (err == UV_ENOSYS || err == UV_EAGAIN)
343 return 0;
344 if (err < 0)
345 return err;
346
347 // Slice off the buffers: skip all written buffers and slice the one that
348 // was partially written.
349 written = err;
350 for (; vcount > 0; vbufs++, vcount--) {
351 // Slice
352 if (vbufs[0].len > written) {
353 vbufs[0].base += written;
354 vbufs[0].len -= written;
355 written = 0;
356 break;
357
358 // Discard
359 } else {
360 written -= vbufs[0].len;
361 }
362 }
363
364 *bufs = vbufs;
365 *count = vcount;
366
367 return 0;
368 }
369
370
DoWrite(WriteWrap * req_wrap,uv_buf_t * bufs,size_t count,uv_stream_t * send_handle)371 int LibuvStreamWrap::DoWrite(WriteWrap* req_wrap,
372 uv_buf_t* bufs,
373 size_t count,
374 uv_stream_t* send_handle) {
375 LibuvWriteWrap* w = static_cast<LibuvWriteWrap*>(req_wrap);
376 return w->Dispatch(uv_write2,
377 stream(),
378 bufs,
379 count,
380 send_handle,
381 AfterUvWrite);
382 }
383
384
385
AfterUvWrite(uv_write_t * req,int status)386 void LibuvStreamWrap::AfterUvWrite(uv_write_t* req, int status) {
387 LibuvWriteWrap* req_wrap = static_cast<LibuvWriteWrap*>(
388 LibuvWriteWrap::from_req(req));
389 CHECK_NOT_NULL(req_wrap);
390 HandleScope scope(req_wrap->env()->isolate());
391 Context::Scope context_scope(req_wrap->env()->context());
392 req_wrap->Done(status);
393 }
394
395 } // namespace node
396
397 NODE_MODULE_CONTEXT_AWARE_INTERNAL(stream_wrap,
398 node::LibuvStreamWrap::Initialize)
399