1 // Copyright Joyent, Inc. and other Node contributors.
2 //
3 // Permission is hereby granted, free of charge, to any person obtaining a
4 // copy of this software and associated documentation files (the
5 // "Software"), to deal in the Software without restriction, including
6 // without limitation the rights to use, copy, modify, merge, publish,
7 // distribute, sublicense, and/or sell copies of the Software, and to permit
8 // persons to whom the Software is furnished to do so, subject to the
9 // following conditions:
10 //
11 // The above copyright notice and this permission notice shall be included
12 // in all copies or substantial portions of the Software.
13 //
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17 // NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18 // DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19 // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20 // USE OR OTHER DEALINGS IN THE SOFTWARE.
21
22 #include "stream_wrap.h"
23 #include "stream_base-inl.h"
24
25 #include "env-inl.h"
26 #include "handle_wrap.h"
27 #include "node_buffer.h"
28 #include "node_errors.h"
29 #include "node_external_reference.h"
30 #include "pipe_wrap.h"
31 #include "req_wrap-inl.h"
32 #include "tcp_wrap.h"
33 #include "udp_wrap.h"
34 #include "util-inl.h"
35
36 #include <cstring> // memcpy()
37 #include <climits> // INT_MAX
38
39
40 namespace node {
41
42 using errors::TryCatchScope;
43 using v8::Context;
44 using v8::DontDelete;
45 using v8::EscapableHandleScope;
46 using v8::FunctionCallbackInfo;
47 using v8::FunctionTemplate;
48 using v8::HandleScope;
49 using v8::Isolate;
50 using v8::JustVoid;
51 using v8::Local;
52 using v8::Maybe;
53 using v8::MaybeLocal;
54 using v8::Nothing;
55 using v8::Object;
56 using v8::PropertyAttribute;
57 using v8::ReadOnly;
58 using v8::Signature;
59 using v8::Value;
60
IsConstructCallCallback(const FunctionCallbackInfo<Value> & args)61 void IsConstructCallCallback(const FunctionCallbackInfo<Value>& args) {
62 CHECK(args.IsConstructCall());
63 StreamReq::ResetObject(args.This());
64 }
65
Initialize(Local<Object> target,Local<Value> unused,Local<Context> context,void * priv)66 void LibuvStreamWrap::Initialize(Local<Object> target,
67 Local<Value> unused,
68 Local<Context> context,
69 void* priv) {
70 Environment* env = Environment::GetCurrent(context);
71 Isolate* isolate = env->isolate();
72
73 Local<FunctionTemplate> sw =
74 NewFunctionTemplate(isolate, IsConstructCallCallback);
75 sw->InstanceTemplate()->SetInternalFieldCount(StreamReq::kInternalFieldCount);
76
77 // we need to set handle and callback to null,
78 // so that those fields are created and functions
79 // do not become megamorphic
80 // Fields:
81 // - oncomplete
82 // - callback
83 // - handle
84 sw->InstanceTemplate()->Set(env->oncomplete_string(), v8::Null(isolate));
85 sw->InstanceTemplate()->Set(FIXED_ONE_BYTE_STRING(isolate, "callback"),
86 v8::Null(isolate));
87 sw->InstanceTemplate()->Set(FIXED_ONE_BYTE_STRING(isolate, "handle"),
88 v8::Null(isolate));
89
90 sw->Inherit(AsyncWrap::GetConstructorTemplate(env));
91
92 SetConstructorFunction(context, target, "ShutdownWrap", sw);
93 env->set_shutdown_wrap_template(sw->InstanceTemplate());
94
95 Local<FunctionTemplate> ww =
96 FunctionTemplate::New(isolate, IsConstructCallCallback);
97 ww->InstanceTemplate()->SetInternalFieldCount(
98 StreamReq::kInternalFieldCount);
99 ww->Inherit(AsyncWrap::GetConstructorTemplate(env));
100 SetConstructorFunction(context, target, "WriteWrap", ww);
101 env->set_write_wrap_template(ww->InstanceTemplate());
102
103 NODE_DEFINE_CONSTANT(target, kReadBytesOrError);
104 NODE_DEFINE_CONSTANT(target, kArrayBufferOffset);
105 NODE_DEFINE_CONSTANT(target, kBytesWritten);
106 NODE_DEFINE_CONSTANT(target, kLastWriteWasAsync);
107 target
108 ->Set(context,
109 FIXED_ONE_BYTE_STRING(isolate, "streamBaseState"),
110 env->stream_base_state().GetJSArray())
111 .Check();
112 }
113
RegisterExternalReferences(ExternalReferenceRegistry * registry)114 void LibuvStreamWrap::RegisterExternalReferences(
115 ExternalReferenceRegistry* registry) {
116 registry->Register(IsConstructCallCallback);
117 registry->Register(GetWriteQueueSize);
118 registry->Register(SetBlocking);
119 StreamBase::RegisterExternalReferences(registry);
120 }
121
LibuvStreamWrap(Environment * env,Local<Object> object,uv_stream_t * stream,AsyncWrap::ProviderType provider)122 LibuvStreamWrap::LibuvStreamWrap(Environment* env,
123 Local<Object> object,
124 uv_stream_t* stream,
125 AsyncWrap::ProviderType provider)
126 : HandleWrap(env,
127 object,
128 reinterpret_cast<uv_handle_t*>(stream),
129 provider),
130 StreamBase(env),
131 stream_(stream) {
132 StreamBase::AttachToObject(object);
133 }
134
135
GetConstructorTemplate(Environment * env)136 Local<FunctionTemplate> LibuvStreamWrap::GetConstructorTemplate(
137 Environment* env) {
138 Local<FunctionTemplate> tmpl = env->libuv_stream_wrap_ctor_template();
139 if (tmpl.IsEmpty()) {
140 Isolate* isolate = env->isolate();
141 tmpl = NewFunctionTemplate(isolate, nullptr);
142 tmpl->SetClassName(FIXED_ONE_BYTE_STRING(isolate, "LibuvStreamWrap"));
143 tmpl->Inherit(HandleWrap::GetConstructorTemplate(env));
144 tmpl->InstanceTemplate()->SetInternalFieldCount(
145 StreamBase::kInternalFieldCount);
146 Local<FunctionTemplate> get_write_queue_size =
147 FunctionTemplate::New(isolate,
148 GetWriteQueueSize,
149 Local<Value>(),
150 Signature::New(isolate, tmpl));
151 tmpl->PrototypeTemplate()->SetAccessorProperty(
152 env->write_queue_size_string(),
153 get_write_queue_size,
154 Local<FunctionTemplate>(),
155 static_cast<PropertyAttribute>(ReadOnly | DontDelete));
156 SetProtoMethod(isolate, tmpl, "setBlocking", SetBlocking);
157 StreamBase::AddMethods(env, tmpl);
158 env->set_libuv_stream_wrap_ctor_template(tmpl);
159 }
160 return tmpl;
161 }
162
163
From(Environment * env,Local<Object> object)164 LibuvStreamWrap* LibuvStreamWrap::From(Environment* env, Local<Object> object) {
165 Local<FunctionTemplate> sw = env->libuv_stream_wrap_ctor_template();
166 CHECK(!sw.IsEmpty() && sw->HasInstance(object));
167 return Unwrap<LibuvStreamWrap>(object);
168 }
169
170
GetFD()171 int LibuvStreamWrap::GetFD() {
172 #ifdef _WIN32
173 return fd_;
174 #else
175 int fd = -1;
176 if (stream() != nullptr)
177 uv_fileno(reinterpret_cast<uv_handle_t*>(stream()), &fd);
178 return fd;
179 #endif
180 }
181
182
IsAlive()183 bool LibuvStreamWrap::IsAlive() {
184 return HandleWrap::IsAlive(this);
185 }
186
187
IsClosing()188 bool LibuvStreamWrap::IsClosing() {
189 return uv_is_closing(reinterpret_cast<uv_handle_t*>(stream()));
190 }
191
192
GetAsyncWrap()193 AsyncWrap* LibuvStreamWrap::GetAsyncWrap() {
194 return static_cast<AsyncWrap*>(this);
195 }
196
197
IsIPCPipe()198 bool LibuvStreamWrap::IsIPCPipe() {
199 return is_named_pipe_ipc();
200 }
201
ReadStart()202 int LibuvStreamWrap::ReadStart() {
203 return uv_read_start(
204 stream(),
205 [](uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
206 static_cast<LibuvStreamWrap*>(handle->data)
207 ->OnUvAlloc(suggested_size, buf);
208 },
209 [](uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
210 LibuvStreamWrap* wrap = static_cast<LibuvStreamWrap*>(stream->data);
211 TryCatchScope try_catch(wrap->env());
212 try_catch.SetVerbose(true);
213 wrap->OnUvRead(nread, buf);
214 });
215 }
216
217
ReadStop()218 int LibuvStreamWrap::ReadStop() {
219 return uv_read_stop(stream());
220 }
221
222
OnUvAlloc(size_t suggested_size,uv_buf_t * buf)223 void LibuvStreamWrap::OnUvAlloc(size_t suggested_size, uv_buf_t* buf) {
224 HandleScope scope(env()->isolate());
225 Context::Scope context_scope(env()->context());
226
227 *buf = EmitAlloc(suggested_size);
228 }
229
230 template <class WrapType>
AcceptHandle(Environment * env,LibuvStreamWrap * parent)231 static MaybeLocal<Object> AcceptHandle(Environment* env,
232 LibuvStreamWrap* parent) {
233 static_assert(std::is_base_of<LibuvStreamWrap, WrapType>::value ||
234 std::is_base_of<UDPWrap, WrapType>::value,
235 "Can only accept stream handles");
236
237 EscapableHandleScope scope(env->isolate());
238 Local<Object> wrap_obj;
239
240 if (!WrapType::Instantiate(env, parent, WrapType::SOCKET).ToLocal(&wrap_obj))
241 return Local<Object>();
242
243 HandleWrap* wrap = Unwrap<HandleWrap>(wrap_obj);
244 CHECK_NOT_NULL(wrap);
245 uv_stream_t* stream = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
246 CHECK_NOT_NULL(stream);
247
248 if (uv_accept(parent->stream(), stream))
249 ABORT();
250
251 return scope.Escape(wrap_obj);
252 }
253
OnUvRead(ssize_t nread,const uv_buf_t * buf)254 Maybe<void> LibuvStreamWrap::OnUvRead(ssize_t nread, const uv_buf_t* buf) {
255 HandleScope scope(env()->isolate());
256 Context::Scope context_scope(env()->context());
257 uv_handle_type type = UV_UNKNOWN_HANDLE;
258
259 if (is_named_pipe_ipc() &&
260 uv_pipe_pending_count(reinterpret_cast<uv_pipe_t*>(stream())) > 0) {
261 type = uv_pipe_pending_type(reinterpret_cast<uv_pipe_t*>(stream()));
262 }
263
264 // We should not be getting this callback if someone has already called
265 // uv_close() on the handle.
266 CHECK_EQ(persistent().IsEmpty(), false);
267
268 if (nread > 0) {
269 MaybeLocal<Object> pending_obj;
270
271 if (type == UV_TCP) {
272 pending_obj = AcceptHandle<TCPWrap>(env(), this);
273 } else if (type == UV_NAMED_PIPE) {
274 pending_obj = AcceptHandle<PipeWrap>(env(), this);
275 } else if (type == UV_UDP) {
276 pending_obj = AcceptHandle<UDPWrap>(env(), this);
277 } else {
278 CHECK_EQ(type, UV_UNKNOWN_HANDLE);
279 }
280
281 Local<Object> local_pending_obj;
282 if (type != UV_UNKNOWN_HANDLE &&
283 (!pending_obj.ToLocal(&local_pending_obj) ||
284 object()
285 ->Set(env()->context(),
286 env()->pending_handle_string(),
287 local_pending_obj)
288 .IsNothing())) {
289 return Nothing<void>();
290 }
291 }
292
293 EmitRead(nread, *buf);
294 return JustVoid();
295 }
296
GetWriteQueueSize(const FunctionCallbackInfo<Value> & info)297 void LibuvStreamWrap::GetWriteQueueSize(
298 const FunctionCallbackInfo<Value>& info) {
299 LibuvStreamWrap* wrap;
300 ASSIGN_OR_RETURN_UNWRAP(&wrap, info.This());
301
302 if (wrap->stream() == nullptr) {
303 info.GetReturnValue().Set(0);
304 return;
305 }
306
307 uint32_t write_queue_size = wrap->stream()->write_queue_size;
308 info.GetReturnValue().Set(write_queue_size);
309 }
310
311
SetBlocking(const FunctionCallbackInfo<Value> & args)312 void LibuvStreamWrap::SetBlocking(const FunctionCallbackInfo<Value>& args) {
313 LibuvStreamWrap* wrap;
314 ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
315
316 CHECK_GT(args.Length(), 0);
317 if (!wrap->IsAlive())
318 return args.GetReturnValue().Set(UV_EINVAL);
319
320 bool enable = args[0]->IsTrue();
321 args.GetReturnValue().Set(uv_stream_set_blocking(wrap->stream(), enable));
322 }
323
324 typedef SimpleShutdownWrap<ReqWrap<uv_shutdown_t>> LibuvShutdownWrap;
325 typedef SimpleWriteWrap<ReqWrap<uv_write_t>> LibuvWriteWrap;
326
CreateShutdownWrap(Local<Object> object)327 ShutdownWrap* LibuvStreamWrap::CreateShutdownWrap(Local<Object> object) {
328 return new LibuvShutdownWrap(this, object);
329 }
330
CreateWriteWrap(Local<Object> object)331 WriteWrap* LibuvStreamWrap::CreateWriteWrap(Local<Object> object) {
332 return new LibuvWriteWrap(this, object);
333 }
334
335
DoShutdown(ShutdownWrap * req_wrap_)336 int LibuvStreamWrap::DoShutdown(ShutdownWrap* req_wrap_) {
337 LibuvShutdownWrap* req_wrap = static_cast<LibuvShutdownWrap*>(req_wrap_);
338 return req_wrap->Dispatch(uv_shutdown, stream(), AfterUvShutdown);
339 }
340
341
AfterUvShutdown(uv_shutdown_t * req,int status)342 void LibuvStreamWrap::AfterUvShutdown(uv_shutdown_t* req, int status) {
343 LibuvShutdownWrap* req_wrap = static_cast<LibuvShutdownWrap*>(
344 LibuvShutdownWrap::from_req(req));
345 CHECK_NOT_NULL(req_wrap);
346 HandleScope scope(req_wrap->env()->isolate());
347 Context::Scope context_scope(req_wrap->env()->context());
348 req_wrap->Done(status);
349 }
350
351
352 // NOTE: Call to this function could change both `buf`'s and `count`'s
353 // values, shifting their base and decrementing their length. This is
354 // required in order to skip the data that was successfully written via
355 // uv_try_write().
DoTryWrite(uv_buf_t ** bufs,size_t * count)356 int LibuvStreamWrap::DoTryWrite(uv_buf_t** bufs, size_t* count) {
357 int err;
358 size_t written;
359 uv_buf_t* vbufs = *bufs;
360 size_t vcount = *count;
361
362 err = uv_try_write(stream(), vbufs, vcount);
363 if (err == UV_ENOSYS || err == UV_EAGAIN)
364 return 0;
365 if (err < 0)
366 return err;
367
368 // Slice off the buffers: skip all written buffers and slice the one that
369 // was partially written.
370 written = err;
371 for (; vcount > 0; vbufs++, vcount--) {
372 // Slice
373 if (vbufs[0].len > written) {
374 vbufs[0].base += written;
375 vbufs[0].len -= written;
376 written = 0;
377 break;
378
379 // Discard
380 } else {
381 written -= vbufs[0].len;
382 }
383 }
384
385 *bufs = vbufs;
386 *count = vcount;
387
388 return 0;
389 }
390
391
DoWrite(WriteWrap * req_wrap,uv_buf_t * bufs,size_t count,uv_stream_t * send_handle)392 int LibuvStreamWrap::DoWrite(WriteWrap* req_wrap,
393 uv_buf_t* bufs,
394 size_t count,
395 uv_stream_t* send_handle) {
396 LibuvWriteWrap* w = static_cast<LibuvWriteWrap*>(req_wrap);
397 return w->Dispatch(uv_write2,
398 stream(),
399 bufs,
400 count,
401 send_handle,
402 AfterUvWrite);
403 }
404
405
406
AfterUvWrite(uv_write_t * req,int status)407 void LibuvStreamWrap::AfterUvWrite(uv_write_t* req, int status) {
408 LibuvWriteWrap* req_wrap = static_cast<LibuvWriteWrap*>(
409 LibuvWriteWrap::from_req(req));
410 CHECK_NOT_NULL(req_wrap);
411 HandleScope scope(req_wrap->env()->isolate());
412 Context::Scope context_scope(req_wrap->env()->context());
413 req_wrap->Done(status);
414 }
415
416 } // namespace node
417
418 NODE_BINDING_CONTEXT_AWARE_INTERNAL(stream_wrap,
419 node::LibuvStreamWrap::Initialize)
420 NODE_BINDING_EXTERNAL_REFERENCE(
421 stream_wrap, node::LibuvStreamWrap::RegisterExternalReferences)
422