1 // Copyright Joyent, Inc. and other Node contributors.
2 //
3 // Permission is hereby granted, free of charge, to any person obtaining a
4 // copy of this software and associated documentation files (the
5 // "Software"), to deal in the Software without restriction, including
6 // without limitation the rights to use, copy, modify, merge, publish,
7 // distribute, sublicense, and/or sell copies of the Software, and to permit
8 // persons to whom the Software is furnished to do so, subject to the
9 // following conditions:
10 //
11 // The above copyright notice and this permission notice shall be included
12 // in all copies or substantial portions of the Software.
13 //
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17 // NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18 // DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19 // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20 // USE OR OTHER DEALINGS IN THE SOFTWARE.
21
22 #include "stream_wrap.h"
23 #include "stream_base-inl.h"
24
25 #include "env-inl.h"
26 #include "handle_wrap.h"
27 #include "node_buffer.h"
28 #include "pipe_wrap.h"
29 #include "req_wrap-inl.h"
30 #include "tcp_wrap.h"
31 #include "udp_wrap.h"
32 #include "util-inl.h"
33
34 #include <cstring> // memcpy()
35 #include <climits> // INT_MAX
36
37
38 namespace node {
39
40 using v8::Context;
41 using v8::DontDelete;
42 using v8::EscapableHandleScope;
43 using v8::FunctionCallbackInfo;
44 using v8::FunctionTemplate;
45 using v8::HandleScope;
46 using v8::Local;
47 using v8::MaybeLocal;
48 using v8::Object;
49 using v8::ReadOnly;
50 using v8::Signature;
51 using v8::Value;
52
53
Initialize(Local<Object> target,Local<Value> unused,Local<Context> context,void * priv)54 void LibuvStreamWrap::Initialize(Local<Object> target,
55 Local<Value> unused,
56 Local<Context> context,
57 void* priv) {
58 Environment* env = Environment::GetCurrent(context);
59
60 auto is_construct_call_callback =
61 [](const FunctionCallbackInfo<Value>& args) {
62 CHECK(args.IsConstructCall());
63 StreamReq::ResetObject(args.This());
64 };
65 Local<FunctionTemplate> sw =
66 FunctionTemplate::New(env->isolate(), is_construct_call_callback);
67 sw->InstanceTemplate()->SetInternalFieldCount(StreamReq::kInternalFieldCount);
68 Local<String> wrapString =
69 FIXED_ONE_BYTE_STRING(env->isolate(), "ShutdownWrap");
70 sw->SetClassName(wrapString);
71
72 // we need to set handle and callback to null,
73 // so that those fields are created and functions
74 // do not become megamorphic
75 // Fields:
76 // - oncomplete
77 // - callback
78 // - handle
79 sw->InstanceTemplate()->Set(
80 env->oncomplete_string(),
81 v8::Null(env->isolate()));
82 sw->InstanceTemplate()->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "callback"),
83 v8::Null(env->isolate()));
84 sw->InstanceTemplate()->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "handle"),
85 v8::Null(env->isolate()));
86
87 sw->Inherit(AsyncWrap::GetConstructorTemplate(env));
88
89 target->Set(env->context(),
90 wrapString,
91 sw->GetFunction(env->context()).ToLocalChecked()).Check();
92 env->set_shutdown_wrap_template(sw->InstanceTemplate());
93
94 Local<FunctionTemplate> ww =
95 FunctionTemplate::New(env->isolate(), is_construct_call_callback);
96 ww->InstanceTemplate()->SetInternalFieldCount(
97 StreamReq::kInternalFieldCount);
98 Local<String> writeWrapString =
99 FIXED_ONE_BYTE_STRING(env->isolate(), "WriteWrap");
100 ww->SetClassName(writeWrapString);
101 ww->Inherit(AsyncWrap::GetConstructorTemplate(env));
102 target->Set(env->context(),
103 writeWrapString,
104 ww->GetFunction(env->context()).ToLocalChecked()).Check();
105 env->set_write_wrap_template(ww->InstanceTemplate());
106
107 NODE_DEFINE_CONSTANT(target, kReadBytesOrError);
108 NODE_DEFINE_CONSTANT(target, kArrayBufferOffset);
109 NODE_DEFINE_CONSTANT(target, kBytesWritten);
110 NODE_DEFINE_CONSTANT(target, kLastWriteWasAsync);
111 target->Set(context, FIXED_ONE_BYTE_STRING(env->isolate(), "streamBaseState"),
112 env->stream_base_state().GetJSArray()).Check();
113 }
114
115
LibuvStreamWrap(Environment * env,Local<Object> object,uv_stream_t * stream,AsyncWrap::ProviderType provider)116 LibuvStreamWrap::LibuvStreamWrap(Environment* env,
117 Local<Object> object,
118 uv_stream_t* stream,
119 AsyncWrap::ProviderType provider)
120 : HandleWrap(env,
121 object,
122 reinterpret_cast<uv_handle_t*>(stream),
123 provider),
124 StreamBase(env),
125 stream_(stream) {
126 StreamBase::AttachToObject(object);
127 }
128
129
GetConstructorTemplate(Environment * env)130 Local<FunctionTemplate> LibuvStreamWrap::GetConstructorTemplate(
131 Environment* env) {
132 Local<FunctionTemplate> tmpl = env->libuv_stream_wrap_ctor_template();
133 if (tmpl.IsEmpty()) {
134 tmpl = env->NewFunctionTemplate(nullptr);
135 tmpl->SetClassName(
136 FIXED_ONE_BYTE_STRING(env->isolate(), "LibuvStreamWrap"));
137 tmpl->Inherit(HandleWrap::GetConstructorTemplate(env));
138 tmpl->InstanceTemplate()->SetInternalFieldCount(
139 StreamBase::kInternalFieldCount);
140 Local<FunctionTemplate> get_write_queue_size =
141 FunctionTemplate::New(env->isolate(),
142 GetWriteQueueSize,
143 env->as_callback_data(),
144 Signature::New(env->isolate(), tmpl));
145 tmpl->PrototypeTemplate()->SetAccessorProperty(
146 env->write_queue_size_string(),
147 get_write_queue_size,
148 Local<FunctionTemplate>(),
149 static_cast<PropertyAttribute>(ReadOnly | DontDelete));
150 env->SetProtoMethod(tmpl, "setBlocking", SetBlocking);
151 StreamBase::AddMethods(env, tmpl);
152 env->set_libuv_stream_wrap_ctor_template(tmpl);
153 }
154 return tmpl;
155 }
156
157
From(Environment * env,Local<Object> object)158 LibuvStreamWrap* LibuvStreamWrap::From(Environment* env, Local<Object> object) {
159 Local<FunctionTemplate> sw = env->libuv_stream_wrap_ctor_template();
160 CHECK(!sw.IsEmpty() && sw->HasInstance(object));
161 return Unwrap<LibuvStreamWrap>(object);
162 }
163
164
GetFD()165 int LibuvStreamWrap::GetFD() {
166 #ifdef _WIN32
167 return fd_;
168 #else
169 int fd = -1;
170 if (stream() != nullptr)
171 uv_fileno(reinterpret_cast<uv_handle_t*>(stream()), &fd);
172 return fd;
173 #endif
174 }
175
176
IsAlive()177 bool LibuvStreamWrap::IsAlive() {
178 return HandleWrap::IsAlive(this);
179 }
180
181
IsClosing()182 bool LibuvStreamWrap::IsClosing() {
183 return uv_is_closing(reinterpret_cast<uv_handle_t*>(stream()));
184 }
185
186
GetAsyncWrap()187 AsyncWrap* LibuvStreamWrap::GetAsyncWrap() {
188 return static_cast<AsyncWrap*>(this);
189 }
190
191
IsIPCPipe()192 bool LibuvStreamWrap::IsIPCPipe() {
193 return is_named_pipe_ipc();
194 }
195
196
ReadStart()197 int LibuvStreamWrap::ReadStart() {
198 return uv_read_start(stream(), [](uv_handle_t* handle,
199 size_t suggested_size,
200 uv_buf_t* buf) {
201 static_cast<LibuvStreamWrap*>(handle->data)->OnUvAlloc(suggested_size, buf);
202 }, [](uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
203 static_cast<LibuvStreamWrap*>(stream->data)->OnUvRead(nread, buf);
204 });
205 }
206
207
ReadStop()208 int LibuvStreamWrap::ReadStop() {
209 return uv_read_stop(stream());
210 }
211
212
OnUvAlloc(size_t suggested_size,uv_buf_t * buf)213 void LibuvStreamWrap::OnUvAlloc(size_t suggested_size, uv_buf_t* buf) {
214 HandleScope scope(env()->isolate());
215 Context::Scope context_scope(env()->context());
216
217 *buf = EmitAlloc(suggested_size);
218 }
219
220 template <class WrapType>
AcceptHandle(Environment * env,LibuvStreamWrap * parent)221 static MaybeLocal<Object> AcceptHandle(Environment* env,
222 LibuvStreamWrap* parent) {
223 static_assert(std::is_base_of<LibuvStreamWrap, WrapType>::value ||
224 std::is_base_of<UDPWrap, WrapType>::value,
225 "Can only accept stream handles");
226
227 EscapableHandleScope scope(env->isolate());
228 Local<Object> wrap_obj;
229
230 if (!WrapType::Instantiate(env, parent, WrapType::SOCKET).ToLocal(&wrap_obj))
231 return Local<Object>();
232
233 HandleWrap* wrap = Unwrap<HandleWrap>(wrap_obj);
234 CHECK_NOT_NULL(wrap);
235 uv_stream_t* stream = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
236 CHECK_NOT_NULL(stream);
237
238 if (uv_accept(parent->stream(), stream))
239 ABORT();
240
241 return scope.Escape(wrap_obj);
242 }
243
244
OnUvRead(ssize_t nread,const uv_buf_t * buf)245 void LibuvStreamWrap::OnUvRead(ssize_t nread, const uv_buf_t* buf) {
246 HandleScope scope(env()->isolate());
247 Context::Scope context_scope(env()->context());
248 uv_handle_type type = UV_UNKNOWN_HANDLE;
249
250 if (is_named_pipe_ipc() &&
251 uv_pipe_pending_count(reinterpret_cast<uv_pipe_t*>(stream())) > 0) {
252 type = uv_pipe_pending_type(reinterpret_cast<uv_pipe_t*>(stream()));
253 }
254
255 // We should not be getting this callback if someone has already called
256 // uv_close() on the handle.
257 CHECK_EQ(persistent().IsEmpty(), false);
258
259 if (nread > 0) {
260 MaybeLocal<Object> pending_obj;
261
262 if (type == UV_TCP) {
263 pending_obj = AcceptHandle<TCPWrap>(env(), this);
264 } else if (type == UV_NAMED_PIPE) {
265 pending_obj = AcceptHandle<PipeWrap>(env(), this);
266 } else if (type == UV_UDP) {
267 pending_obj = AcceptHandle<UDPWrap>(env(), this);
268 } else {
269 CHECK_EQ(type, UV_UNKNOWN_HANDLE);
270 }
271
272 if (!pending_obj.IsEmpty()) {
273 object()
274 ->Set(env()->context(),
275 env()->pending_handle_string(),
276 pending_obj.ToLocalChecked())
277 .Check();
278 }
279 }
280
281 EmitRead(nread, *buf);
282 }
283
284
GetWriteQueueSize(const FunctionCallbackInfo<Value> & info)285 void LibuvStreamWrap::GetWriteQueueSize(
286 const FunctionCallbackInfo<Value>& info) {
287 LibuvStreamWrap* wrap;
288 ASSIGN_OR_RETURN_UNWRAP(&wrap, info.This());
289
290 if (wrap->stream() == nullptr) {
291 info.GetReturnValue().Set(0);
292 return;
293 }
294
295 uint32_t write_queue_size = wrap->stream()->write_queue_size;
296 info.GetReturnValue().Set(write_queue_size);
297 }
298
299
SetBlocking(const FunctionCallbackInfo<Value> & args)300 void LibuvStreamWrap::SetBlocking(const FunctionCallbackInfo<Value>& args) {
301 LibuvStreamWrap* wrap;
302 ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
303
304 CHECK_GT(args.Length(), 0);
305 if (!wrap->IsAlive())
306 return args.GetReturnValue().Set(UV_EINVAL);
307
308 bool enable = args[0]->IsTrue();
309 args.GetReturnValue().Set(uv_stream_set_blocking(wrap->stream(), enable));
310 }
311
312 typedef SimpleShutdownWrap<ReqWrap<uv_shutdown_t>> LibuvShutdownWrap;
313 typedef SimpleWriteWrap<ReqWrap<uv_write_t>> LibuvWriteWrap;
314
CreateShutdownWrap(Local<Object> object)315 ShutdownWrap* LibuvStreamWrap::CreateShutdownWrap(Local<Object> object) {
316 return new LibuvShutdownWrap(this, object);
317 }
318
CreateWriteWrap(Local<Object> object)319 WriteWrap* LibuvStreamWrap::CreateWriteWrap(Local<Object> object) {
320 return new LibuvWriteWrap(this, object);
321 }
322
323
DoShutdown(ShutdownWrap * req_wrap_)324 int LibuvStreamWrap::DoShutdown(ShutdownWrap* req_wrap_) {
325 LibuvShutdownWrap* req_wrap = static_cast<LibuvShutdownWrap*>(req_wrap_);
326 return req_wrap->Dispatch(uv_shutdown, stream(), AfterUvShutdown);
327 }
328
329
AfterUvShutdown(uv_shutdown_t * req,int status)330 void LibuvStreamWrap::AfterUvShutdown(uv_shutdown_t* req, int status) {
331 LibuvShutdownWrap* req_wrap = static_cast<LibuvShutdownWrap*>(
332 LibuvShutdownWrap::from_req(req));
333 CHECK_NOT_NULL(req_wrap);
334 HandleScope scope(req_wrap->env()->isolate());
335 Context::Scope context_scope(req_wrap->env()->context());
336 req_wrap->Done(status);
337 }
338
339
340 // NOTE: Call to this function could change both `buf`'s and `count`'s
341 // values, shifting their base and decrementing their length. This is
342 // required in order to skip the data that was successfully written via
343 // uv_try_write().
DoTryWrite(uv_buf_t ** bufs,size_t * count)344 int LibuvStreamWrap::DoTryWrite(uv_buf_t** bufs, size_t* count) {
345 int err;
346 size_t written;
347 uv_buf_t* vbufs = *bufs;
348 size_t vcount = *count;
349
350 err = uv_try_write(stream(), vbufs, vcount);
351 if (err == UV_ENOSYS || err == UV_EAGAIN)
352 return 0;
353 if (err < 0)
354 return err;
355
356 // Slice off the buffers: skip all written buffers and slice the one that
357 // was partially written.
358 written = err;
359 for (; vcount > 0; vbufs++, vcount--) {
360 // Slice
361 if (vbufs[0].len > written) {
362 vbufs[0].base += written;
363 vbufs[0].len -= written;
364 written = 0;
365 break;
366
367 // Discard
368 } else {
369 written -= vbufs[0].len;
370 }
371 }
372
373 *bufs = vbufs;
374 *count = vcount;
375
376 return 0;
377 }
378
379
DoWrite(WriteWrap * req_wrap,uv_buf_t * bufs,size_t count,uv_stream_t * send_handle)380 int LibuvStreamWrap::DoWrite(WriteWrap* req_wrap,
381 uv_buf_t* bufs,
382 size_t count,
383 uv_stream_t* send_handle) {
384 LibuvWriteWrap* w = static_cast<LibuvWriteWrap*>(req_wrap);
385 return w->Dispatch(uv_write2,
386 stream(),
387 bufs,
388 count,
389 send_handle,
390 AfterUvWrite);
391 }
392
393
394
AfterUvWrite(uv_write_t * req,int status)395 void LibuvStreamWrap::AfterUvWrite(uv_write_t* req, int status) {
396 LibuvWriteWrap* req_wrap = static_cast<LibuvWriteWrap*>(
397 LibuvWriteWrap::from_req(req));
398 CHECK_NOT_NULL(req_wrap);
399 HandleScope scope(req_wrap->env()->isolate());
400 Context::Scope context_scope(req_wrap->env()->context());
401 req_wrap->Done(status);
402 }
403
404 } // namespace node
405
406 NODE_MODULE_CONTEXT_AWARE_INTERNAL(stream_wrap,
407 node::LibuvStreamWrap::Initialize)
408