• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright Joyent, Inc. and other Node contributors.
2 //
3 // Permission is hereby granted, free of charge, to any person obtaining a
4 // copy of this software and associated documentation files (the
5 // "Software"), to deal in the Software without restriction, including
6 // without limitation the rights to use, copy, modify, merge, publish,
7 // distribute, sublicense, and/or sell copies of the Software, and to permit
8 // persons to whom the Software is furnished to do so, subject to the
9 // following conditions:
10 //
11 // The above copyright notice and this permission notice shall be included
12 // in all copies or substantial portions of the Software.
13 //
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17 // NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18 // DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19 // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20 // USE OR OTHER DEALINGS IN THE SOFTWARE.
21 
22 #include "stream_wrap.h"
23 #include "stream_base-inl.h"
24 
25 #include "env-inl.h"
26 #include "handle_wrap.h"
27 #include "node_buffer.h"
28 #include "pipe_wrap.h"
29 #include "req_wrap-inl.h"
30 #include "tcp_wrap.h"
31 #include "udp_wrap.h"
32 #include "util-inl.h"
33 
34 #include <cstring>  // memcpy()
35 #include <climits>  // INT_MAX
36 
37 
38 namespace node {
39 
40 using v8::Context;
41 using v8::DontDelete;
42 using v8::EscapableHandleScope;
43 using v8::FunctionCallbackInfo;
44 using v8::FunctionTemplate;
45 using v8::HandleScope;
46 using v8::Local;
47 using v8::MaybeLocal;
48 using v8::Object;
49 using v8::ReadOnly;
50 using v8::Signature;
51 using v8::Value;
52 
53 
Initialize(Local<Object> target,Local<Value> unused,Local<Context> context,void * priv)54 void LibuvStreamWrap::Initialize(Local<Object> target,
55                                  Local<Value> unused,
56                                  Local<Context> context,
57                                  void* priv) {
58   Environment* env = Environment::GetCurrent(context);
59 
60   auto is_construct_call_callback =
61       [](const FunctionCallbackInfo<Value>& args) {
62     CHECK(args.IsConstructCall());
63     StreamReq::ResetObject(args.This());
64   };
65   Local<FunctionTemplate> sw =
66       FunctionTemplate::New(env->isolate(), is_construct_call_callback);
67   sw->InstanceTemplate()->SetInternalFieldCount(StreamReq::kInternalFieldCount);
68   Local<String> wrapString =
69       FIXED_ONE_BYTE_STRING(env->isolate(), "ShutdownWrap");
70   sw->SetClassName(wrapString);
71 
72   // we need to set handle and callback to null,
73   // so that those fields are created and functions
74   // do not become megamorphic
75   // Fields:
76   // - oncomplete
77   // - callback
78   // - handle
79   sw->InstanceTemplate()->Set(
80       env->oncomplete_string(),
81       v8::Null(env->isolate()));
82   sw->InstanceTemplate()->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "callback"),
83       v8::Null(env->isolate()));
84   sw->InstanceTemplate()->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "handle"),
85       v8::Null(env->isolate()));
86 
87   sw->Inherit(AsyncWrap::GetConstructorTemplate(env));
88 
89   target->Set(env->context(),
90               wrapString,
91               sw->GetFunction(env->context()).ToLocalChecked()).Check();
92   env->set_shutdown_wrap_template(sw->InstanceTemplate());
93 
94   Local<FunctionTemplate> ww =
95       FunctionTemplate::New(env->isolate(), is_construct_call_callback);
96   ww->InstanceTemplate()->SetInternalFieldCount(
97       StreamReq::kInternalFieldCount);
98   Local<String> writeWrapString =
99       FIXED_ONE_BYTE_STRING(env->isolate(), "WriteWrap");
100   ww->SetClassName(writeWrapString);
101   ww->Inherit(AsyncWrap::GetConstructorTemplate(env));
102   target->Set(env->context(),
103               writeWrapString,
104               ww->GetFunction(env->context()).ToLocalChecked()).Check();
105   env->set_write_wrap_template(ww->InstanceTemplate());
106 
107   NODE_DEFINE_CONSTANT(target, kReadBytesOrError);
108   NODE_DEFINE_CONSTANT(target, kArrayBufferOffset);
109   NODE_DEFINE_CONSTANT(target, kBytesWritten);
110   NODE_DEFINE_CONSTANT(target, kLastWriteWasAsync);
111   target->Set(context, FIXED_ONE_BYTE_STRING(env->isolate(), "streamBaseState"),
112               env->stream_base_state().GetJSArray()).Check();
113 }
114 
115 
LibuvStreamWrap(Environment * env,Local<Object> object,uv_stream_t * stream,AsyncWrap::ProviderType provider)116 LibuvStreamWrap::LibuvStreamWrap(Environment* env,
117                                  Local<Object> object,
118                                  uv_stream_t* stream,
119                                  AsyncWrap::ProviderType provider)
120     : HandleWrap(env,
121                  object,
122                  reinterpret_cast<uv_handle_t*>(stream),
123                  provider),
124       StreamBase(env),
125       stream_(stream) {
126   StreamBase::AttachToObject(object);
127 }
128 
129 
GetConstructorTemplate(Environment * env)130 Local<FunctionTemplate> LibuvStreamWrap::GetConstructorTemplate(
131     Environment* env) {
132   Local<FunctionTemplate> tmpl = env->libuv_stream_wrap_ctor_template();
133   if (tmpl.IsEmpty()) {
134     tmpl = env->NewFunctionTemplate(nullptr);
135     tmpl->SetClassName(
136         FIXED_ONE_BYTE_STRING(env->isolate(), "LibuvStreamWrap"));
137     tmpl->Inherit(HandleWrap::GetConstructorTemplate(env));
138     tmpl->InstanceTemplate()->SetInternalFieldCount(
139         StreamBase::kInternalFieldCount);
140     Local<FunctionTemplate> get_write_queue_size =
141         FunctionTemplate::New(env->isolate(),
142                               GetWriteQueueSize,
143                               env->as_callback_data(),
144                               Signature::New(env->isolate(), tmpl));
145     tmpl->PrototypeTemplate()->SetAccessorProperty(
146         env->write_queue_size_string(),
147         get_write_queue_size,
148         Local<FunctionTemplate>(),
149         static_cast<PropertyAttribute>(ReadOnly | DontDelete));
150     env->SetProtoMethod(tmpl, "setBlocking", SetBlocking);
151     StreamBase::AddMethods(env, tmpl);
152     env->set_libuv_stream_wrap_ctor_template(tmpl);
153   }
154   return tmpl;
155 }
156 
157 
From(Environment * env,Local<Object> object)158 LibuvStreamWrap* LibuvStreamWrap::From(Environment* env, Local<Object> object) {
159   Local<FunctionTemplate> sw = env->libuv_stream_wrap_ctor_template();
160   CHECK(!sw.IsEmpty() && sw->HasInstance(object));
161   return Unwrap<LibuvStreamWrap>(object);
162 }
163 
164 
GetFD()165 int LibuvStreamWrap::GetFD() {
166 #ifdef _WIN32
167   return fd_;
168 #else
169   int fd = -1;
170   if (stream() != nullptr)
171     uv_fileno(reinterpret_cast<uv_handle_t*>(stream()), &fd);
172   return fd;
173 #endif
174 }
175 
176 
IsAlive()177 bool LibuvStreamWrap::IsAlive() {
178   return HandleWrap::IsAlive(this);
179 }
180 
181 
IsClosing()182 bool LibuvStreamWrap::IsClosing() {
183   return uv_is_closing(reinterpret_cast<uv_handle_t*>(stream()));
184 }
185 
186 
GetAsyncWrap()187 AsyncWrap* LibuvStreamWrap::GetAsyncWrap() {
188   return static_cast<AsyncWrap*>(this);
189 }
190 
191 
IsIPCPipe()192 bool LibuvStreamWrap::IsIPCPipe() {
193   return is_named_pipe_ipc();
194 }
195 
196 
ReadStart()197 int LibuvStreamWrap::ReadStart() {
198   return uv_read_start(stream(), [](uv_handle_t* handle,
199                                     size_t suggested_size,
200                                     uv_buf_t* buf) {
201     static_cast<LibuvStreamWrap*>(handle->data)->OnUvAlloc(suggested_size, buf);
202   }, [](uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
203     static_cast<LibuvStreamWrap*>(stream->data)->OnUvRead(nread, buf);
204   });
205 }
206 
207 
ReadStop()208 int LibuvStreamWrap::ReadStop() {
209   return uv_read_stop(stream());
210 }
211 
212 
OnUvAlloc(size_t suggested_size,uv_buf_t * buf)213 void LibuvStreamWrap::OnUvAlloc(size_t suggested_size, uv_buf_t* buf) {
214   HandleScope scope(env()->isolate());
215   Context::Scope context_scope(env()->context());
216 
217   *buf = EmitAlloc(suggested_size);
218 }
219 
220 template <class WrapType>
AcceptHandle(Environment * env,LibuvStreamWrap * parent)221 static MaybeLocal<Object> AcceptHandle(Environment* env,
222                                        LibuvStreamWrap* parent) {
223   static_assert(std::is_base_of<LibuvStreamWrap, WrapType>::value ||
224                 std::is_base_of<UDPWrap, WrapType>::value,
225                 "Can only accept stream handles");
226 
227   EscapableHandleScope scope(env->isolate());
228   Local<Object> wrap_obj;
229 
230   if (!WrapType::Instantiate(env, parent, WrapType::SOCKET).ToLocal(&wrap_obj))
231     return Local<Object>();
232 
233   HandleWrap* wrap = Unwrap<HandleWrap>(wrap_obj);
234   CHECK_NOT_NULL(wrap);
235   uv_stream_t* stream = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
236   CHECK_NOT_NULL(stream);
237 
238   if (uv_accept(parent->stream(), stream))
239     ABORT();
240 
241   return scope.Escape(wrap_obj);
242 }
243 
244 
OnUvRead(ssize_t nread,const uv_buf_t * buf)245 void LibuvStreamWrap::OnUvRead(ssize_t nread, const uv_buf_t* buf) {
246   HandleScope scope(env()->isolate());
247   Context::Scope context_scope(env()->context());
248   uv_handle_type type = UV_UNKNOWN_HANDLE;
249 
250   if (is_named_pipe_ipc() &&
251       uv_pipe_pending_count(reinterpret_cast<uv_pipe_t*>(stream())) > 0) {
252     type = uv_pipe_pending_type(reinterpret_cast<uv_pipe_t*>(stream()));
253   }
254 
255   // We should not be getting this callback if someone has already called
256   // uv_close() on the handle.
257   CHECK_EQ(persistent().IsEmpty(), false);
258 
259   if (nread > 0) {
260     MaybeLocal<Object> pending_obj;
261 
262     if (type == UV_TCP) {
263       pending_obj = AcceptHandle<TCPWrap>(env(), this);
264     } else if (type == UV_NAMED_PIPE) {
265       pending_obj = AcceptHandle<PipeWrap>(env(), this);
266     } else if (type == UV_UDP) {
267       pending_obj = AcceptHandle<UDPWrap>(env(), this);
268     } else {
269       CHECK_EQ(type, UV_UNKNOWN_HANDLE);
270     }
271 
272     if (!pending_obj.IsEmpty()) {
273       object()
274           ->Set(env()->context(),
275                 env()->pending_handle_string(),
276                 pending_obj.ToLocalChecked())
277           .Check();
278     }
279   }
280 
281   EmitRead(nread, *buf);
282 }
283 
284 
GetWriteQueueSize(const FunctionCallbackInfo<Value> & info)285 void LibuvStreamWrap::GetWriteQueueSize(
286     const FunctionCallbackInfo<Value>& info) {
287   LibuvStreamWrap* wrap;
288   ASSIGN_OR_RETURN_UNWRAP(&wrap, info.This());
289 
290   if (wrap->stream() == nullptr) {
291     info.GetReturnValue().Set(0);
292     return;
293   }
294 
295   uint32_t write_queue_size = wrap->stream()->write_queue_size;
296   info.GetReturnValue().Set(write_queue_size);
297 }
298 
299 
SetBlocking(const FunctionCallbackInfo<Value> & args)300 void LibuvStreamWrap::SetBlocking(const FunctionCallbackInfo<Value>& args) {
301   LibuvStreamWrap* wrap;
302   ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
303 
304   CHECK_GT(args.Length(), 0);
305   if (!wrap->IsAlive())
306     return args.GetReturnValue().Set(UV_EINVAL);
307 
308   bool enable = args[0]->IsTrue();
309   args.GetReturnValue().Set(uv_stream_set_blocking(wrap->stream(), enable));
310 }
311 
312 typedef SimpleShutdownWrap<ReqWrap<uv_shutdown_t>> LibuvShutdownWrap;
313 typedef SimpleWriteWrap<ReqWrap<uv_write_t>> LibuvWriteWrap;
314 
CreateShutdownWrap(Local<Object> object)315 ShutdownWrap* LibuvStreamWrap::CreateShutdownWrap(Local<Object> object) {
316   return new LibuvShutdownWrap(this, object);
317 }
318 
CreateWriteWrap(Local<Object> object)319 WriteWrap* LibuvStreamWrap::CreateWriteWrap(Local<Object> object) {
320   return new LibuvWriteWrap(this, object);
321 }
322 
323 
DoShutdown(ShutdownWrap * req_wrap_)324 int LibuvStreamWrap::DoShutdown(ShutdownWrap* req_wrap_) {
325   LibuvShutdownWrap* req_wrap = static_cast<LibuvShutdownWrap*>(req_wrap_);
326   return req_wrap->Dispatch(uv_shutdown, stream(), AfterUvShutdown);
327 }
328 
329 
AfterUvShutdown(uv_shutdown_t * req,int status)330 void LibuvStreamWrap::AfterUvShutdown(uv_shutdown_t* req, int status) {
331   LibuvShutdownWrap* req_wrap = static_cast<LibuvShutdownWrap*>(
332       LibuvShutdownWrap::from_req(req));
333   CHECK_NOT_NULL(req_wrap);
334   HandleScope scope(req_wrap->env()->isolate());
335   Context::Scope context_scope(req_wrap->env()->context());
336   req_wrap->Done(status);
337 }
338 
339 
340 // NOTE: Call to this function could change both `buf`'s and `count`'s
341 // values, shifting their base and decrementing their length. This is
342 // required in order to skip the data that was successfully written via
343 // uv_try_write().
DoTryWrite(uv_buf_t ** bufs,size_t * count)344 int LibuvStreamWrap::DoTryWrite(uv_buf_t** bufs, size_t* count) {
345   int err;
346   size_t written;
347   uv_buf_t* vbufs = *bufs;
348   size_t vcount = *count;
349 
350   err = uv_try_write(stream(), vbufs, vcount);
351   if (err == UV_ENOSYS || err == UV_EAGAIN)
352     return 0;
353   if (err < 0)
354     return err;
355 
356   // Slice off the buffers: skip all written buffers and slice the one that
357   // was partially written.
358   written = err;
359   for (; vcount > 0; vbufs++, vcount--) {
360     // Slice
361     if (vbufs[0].len > written) {
362       vbufs[0].base += written;
363       vbufs[0].len -= written;
364       written = 0;
365       break;
366 
367     // Discard
368     } else {
369       written -= vbufs[0].len;
370     }
371   }
372 
373   *bufs = vbufs;
374   *count = vcount;
375 
376   return 0;
377 }
378 
379 
DoWrite(WriteWrap * req_wrap,uv_buf_t * bufs,size_t count,uv_stream_t * send_handle)380 int LibuvStreamWrap::DoWrite(WriteWrap* req_wrap,
381                              uv_buf_t* bufs,
382                              size_t count,
383                              uv_stream_t* send_handle) {
384   LibuvWriteWrap* w = static_cast<LibuvWriteWrap*>(req_wrap);
385   return w->Dispatch(uv_write2,
386                      stream(),
387                      bufs,
388                      count,
389                      send_handle,
390                      AfterUvWrite);
391 }
392 
393 
394 
AfterUvWrite(uv_write_t * req,int status)395 void LibuvStreamWrap::AfterUvWrite(uv_write_t* req, int status) {
396   LibuvWriteWrap* req_wrap = static_cast<LibuvWriteWrap*>(
397       LibuvWriteWrap::from_req(req));
398   CHECK_NOT_NULL(req_wrap);
399   HandleScope scope(req_wrap->env()->isolate());
400   Context::Scope context_scope(req_wrap->env()->context());
401   req_wrap->Done(status);
402 }
403 
404 }  // namespace node
405 
406 NODE_MODULE_CONTEXT_AWARE_INTERNAL(stream_wrap,
407                                    node::LibuvStreamWrap::Initialize)
408