• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright Joyent, Inc. and other Node contributors.
2 //
3 // Permission is hereby granted, free of charge, to any person obtaining a
4 // copy of this software and associated documentation files (the
5 // "Software"), to deal in the Software without restriction, including
6 // without limitation the rights to use, copy, modify, merge, publish,
7 // distribute, sublicense, and/or sell copies of the Software, and to permit
8 // persons to whom the Software is furnished to do so, subject to the
9 // following conditions:
10 //
11 // The above copyright notice and this permission notice shall be included
12 // in all copies or substantial portions of the Software.
13 //
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17 // NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18 // DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19 // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20 // USE OR OTHER DEALINGS IN THE SOFTWARE.
21 
22 #include "stream_wrap.h"
23 #include "stream_base-inl.h"
24 
25 #include "env-inl.h"
26 #include "handle_wrap.h"
27 #include "node_buffer.h"
28 #include "pipe_wrap.h"
29 #include "req_wrap-inl.h"
30 #include "tcp_wrap.h"
31 #include "udp_wrap.h"
32 #include "util-inl.h"
33 
34 #include <cstring>  // memcpy()
35 #include <climits>  // INT_MAX
36 
37 
38 namespace node {
39 
40 using v8::Context;
41 using v8::DontDelete;
42 using v8::EscapableHandleScope;
43 using v8::FunctionCallbackInfo;
44 using v8::FunctionTemplate;
45 using v8::HandleScope;
46 using v8::Local;
47 using v8::MaybeLocal;
48 using v8::Object;
49 using v8::PropertyAttribute;
50 using v8::ReadOnly;
51 using v8::Signature;
52 using v8::Value;
53 
54 
Initialize(Local<Object> target,Local<Value> unused,Local<Context> context,void * priv)55 void LibuvStreamWrap::Initialize(Local<Object> target,
56                                  Local<Value> unused,
57                                  Local<Context> context,
58                                  void* priv) {
59   Environment* env = Environment::GetCurrent(context);
60 
61   auto is_construct_call_callback =
62       [](const FunctionCallbackInfo<Value>& args) {
63     CHECK(args.IsConstructCall());
64     StreamReq::ResetObject(args.This());
65   };
66   Local<FunctionTemplate> sw =
67       FunctionTemplate::New(env->isolate(), is_construct_call_callback);
68   sw->InstanceTemplate()->SetInternalFieldCount(StreamReq::kInternalFieldCount);
69 
70   // we need to set handle and callback to null,
71   // so that those fields are created and functions
72   // do not become megamorphic
73   // Fields:
74   // - oncomplete
75   // - callback
76   // - handle
77   sw->InstanceTemplate()->Set(
78       env->oncomplete_string(),
79       v8::Null(env->isolate()));
80   sw->InstanceTemplate()->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "callback"),
81       v8::Null(env->isolate()));
82   sw->InstanceTemplate()->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "handle"),
83       v8::Null(env->isolate()));
84 
85   sw->Inherit(AsyncWrap::GetConstructorTemplate(env));
86 
87   env->SetConstructorFunction(target, "ShutdownWrap", sw);
88   env->set_shutdown_wrap_template(sw->InstanceTemplate());
89 
90   Local<FunctionTemplate> ww =
91       FunctionTemplate::New(env->isolate(), is_construct_call_callback);
92   ww->InstanceTemplate()->SetInternalFieldCount(
93       StreamReq::kInternalFieldCount);
94   ww->Inherit(AsyncWrap::GetConstructorTemplate(env));
95   env->SetConstructorFunction(target, "WriteWrap", ww);
96   env->set_write_wrap_template(ww->InstanceTemplate());
97 
98   NODE_DEFINE_CONSTANT(target, kReadBytesOrError);
99   NODE_DEFINE_CONSTANT(target, kArrayBufferOffset);
100   NODE_DEFINE_CONSTANT(target, kBytesWritten);
101   NODE_DEFINE_CONSTANT(target, kLastWriteWasAsync);
102   target->Set(context, FIXED_ONE_BYTE_STRING(env->isolate(), "streamBaseState"),
103               env->stream_base_state().GetJSArray()).Check();
104 }
105 
106 
LibuvStreamWrap(Environment * env,Local<Object> object,uv_stream_t * stream,AsyncWrap::ProviderType provider)107 LibuvStreamWrap::LibuvStreamWrap(Environment* env,
108                                  Local<Object> object,
109                                  uv_stream_t* stream,
110                                  AsyncWrap::ProviderType provider)
111     : HandleWrap(env,
112                  object,
113                  reinterpret_cast<uv_handle_t*>(stream),
114                  provider),
115       StreamBase(env),
116       stream_(stream) {
117   StreamBase::AttachToObject(object);
118 }
119 
120 
GetConstructorTemplate(Environment * env)121 Local<FunctionTemplate> LibuvStreamWrap::GetConstructorTemplate(
122     Environment* env) {
123   Local<FunctionTemplate> tmpl = env->libuv_stream_wrap_ctor_template();
124   if (tmpl.IsEmpty()) {
125     tmpl = env->NewFunctionTemplate(nullptr);
126     tmpl->SetClassName(
127         FIXED_ONE_BYTE_STRING(env->isolate(), "LibuvStreamWrap"));
128     tmpl->Inherit(HandleWrap::GetConstructorTemplate(env));
129     tmpl->InstanceTemplate()->SetInternalFieldCount(
130         StreamBase::kInternalFieldCount);
131     Local<FunctionTemplate> get_write_queue_size =
132         FunctionTemplate::New(env->isolate(),
133                               GetWriteQueueSize,
134                               Local<Value>(),
135                               Signature::New(env->isolate(), tmpl));
136     tmpl->PrototypeTemplate()->SetAccessorProperty(
137         env->write_queue_size_string(),
138         get_write_queue_size,
139         Local<FunctionTemplate>(),
140         static_cast<PropertyAttribute>(ReadOnly | DontDelete));
141     env->SetProtoMethod(tmpl, "setBlocking", SetBlocking);
142     StreamBase::AddMethods(env, tmpl);
143     env->set_libuv_stream_wrap_ctor_template(tmpl);
144   }
145   return tmpl;
146 }
147 
148 
From(Environment * env,Local<Object> object)149 LibuvStreamWrap* LibuvStreamWrap::From(Environment* env, Local<Object> object) {
150   Local<FunctionTemplate> sw = env->libuv_stream_wrap_ctor_template();
151   CHECK(!sw.IsEmpty() && sw->HasInstance(object));
152   return Unwrap<LibuvStreamWrap>(object);
153 }
154 
155 
GetFD()156 int LibuvStreamWrap::GetFD() {
157 #ifdef _WIN32
158   return fd_;
159 #else
160   int fd = -1;
161   if (stream() != nullptr)
162     uv_fileno(reinterpret_cast<uv_handle_t*>(stream()), &fd);
163   return fd;
164 #endif
165 }
166 
167 
IsAlive()168 bool LibuvStreamWrap::IsAlive() {
169   return HandleWrap::IsAlive(this);
170 }
171 
172 
IsClosing()173 bool LibuvStreamWrap::IsClosing() {
174   return uv_is_closing(reinterpret_cast<uv_handle_t*>(stream()));
175 }
176 
177 
GetAsyncWrap()178 AsyncWrap* LibuvStreamWrap::GetAsyncWrap() {
179   return static_cast<AsyncWrap*>(this);
180 }
181 
182 
IsIPCPipe()183 bool LibuvStreamWrap::IsIPCPipe() {
184   return is_named_pipe_ipc();
185 }
186 
187 
ReadStart()188 int LibuvStreamWrap::ReadStart() {
189   return uv_read_start(stream(), [](uv_handle_t* handle,
190                                     size_t suggested_size,
191                                     uv_buf_t* buf) {
192     static_cast<LibuvStreamWrap*>(handle->data)->OnUvAlloc(suggested_size, buf);
193   }, [](uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
194     static_cast<LibuvStreamWrap*>(stream->data)->OnUvRead(nread, buf);
195   });
196 }
197 
198 
ReadStop()199 int LibuvStreamWrap::ReadStop() {
200   return uv_read_stop(stream());
201 }
202 
203 
OnUvAlloc(size_t suggested_size,uv_buf_t * buf)204 void LibuvStreamWrap::OnUvAlloc(size_t suggested_size, uv_buf_t* buf) {
205   HandleScope scope(env()->isolate());
206   Context::Scope context_scope(env()->context());
207 
208   *buf = EmitAlloc(suggested_size);
209 }
210 
211 template <class WrapType>
AcceptHandle(Environment * env,LibuvStreamWrap * parent)212 static MaybeLocal<Object> AcceptHandle(Environment* env,
213                                        LibuvStreamWrap* parent) {
214   static_assert(std::is_base_of<LibuvStreamWrap, WrapType>::value ||
215                 std::is_base_of<UDPWrap, WrapType>::value,
216                 "Can only accept stream handles");
217 
218   EscapableHandleScope scope(env->isolate());
219   Local<Object> wrap_obj;
220 
221   if (!WrapType::Instantiate(env, parent, WrapType::SOCKET).ToLocal(&wrap_obj))
222     return Local<Object>();
223 
224   HandleWrap* wrap = Unwrap<HandleWrap>(wrap_obj);
225   CHECK_NOT_NULL(wrap);
226   uv_stream_t* stream = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
227   CHECK_NOT_NULL(stream);
228 
229   if (uv_accept(parent->stream(), stream))
230     ABORT();
231 
232   return scope.Escape(wrap_obj);
233 }
234 
235 
OnUvRead(ssize_t nread,const uv_buf_t * buf)236 void LibuvStreamWrap::OnUvRead(ssize_t nread, const uv_buf_t* buf) {
237   HandleScope scope(env()->isolate());
238   Context::Scope context_scope(env()->context());
239   uv_handle_type type = UV_UNKNOWN_HANDLE;
240 
241   if (is_named_pipe_ipc() &&
242       uv_pipe_pending_count(reinterpret_cast<uv_pipe_t*>(stream())) > 0) {
243     type = uv_pipe_pending_type(reinterpret_cast<uv_pipe_t*>(stream()));
244   }
245 
246   // We should not be getting this callback if someone has already called
247   // uv_close() on the handle.
248   CHECK_EQ(persistent().IsEmpty(), false);
249 
250   if (nread > 0) {
251     MaybeLocal<Object> pending_obj;
252 
253     if (type == UV_TCP) {
254       pending_obj = AcceptHandle<TCPWrap>(env(), this);
255     } else if (type == UV_NAMED_PIPE) {
256       pending_obj = AcceptHandle<PipeWrap>(env(), this);
257     } else if (type == UV_UDP) {
258       pending_obj = AcceptHandle<UDPWrap>(env(), this);
259     } else {
260       CHECK_EQ(type, UV_UNKNOWN_HANDLE);
261     }
262 
263     if (!pending_obj.IsEmpty()) {
264       object()
265           ->Set(env()->context(),
266                 env()->pending_handle_string(),
267                 pending_obj.ToLocalChecked())
268           .Check();
269     }
270   }
271 
272   EmitRead(nread, *buf);
273 }
274 
275 
GetWriteQueueSize(const FunctionCallbackInfo<Value> & info)276 void LibuvStreamWrap::GetWriteQueueSize(
277     const FunctionCallbackInfo<Value>& info) {
278   LibuvStreamWrap* wrap;
279   ASSIGN_OR_RETURN_UNWRAP(&wrap, info.This());
280 
281   if (wrap->stream() == nullptr) {
282     info.GetReturnValue().Set(0);
283     return;
284   }
285 
286   uint32_t write_queue_size = wrap->stream()->write_queue_size;
287   info.GetReturnValue().Set(write_queue_size);
288 }
289 
290 
SetBlocking(const FunctionCallbackInfo<Value> & args)291 void LibuvStreamWrap::SetBlocking(const FunctionCallbackInfo<Value>& args) {
292   LibuvStreamWrap* wrap;
293   ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
294 
295   CHECK_GT(args.Length(), 0);
296   if (!wrap->IsAlive())
297     return args.GetReturnValue().Set(UV_EINVAL);
298 
299   bool enable = args[0]->IsTrue();
300   args.GetReturnValue().Set(uv_stream_set_blocking(wrap->stream(), enable));
301 }
302 
303 typedef SimpleShutdownWrap<ReqWrap<uv_shutdown_t>> LibuvShutdownWrap;
304 typedef SimpleWriteWrap<ReqWrap<uv_write_t>> LibuvWriteWrap;
305 
CreateShutdownWrap(Local<Object> object)306 ShutdownWrap* LibuvStreamWrap::CreateShutdownWrap(Local<Object> object) {
307   return new LibuvShutdownWrap(this, object);
308 }
309 
CreateWriteWrap(Local<Object> object)310 WriteWrap* LibuvStreamWrap::CreateWriteWrap(Local<Object> object) {
311   return new LibuvWriteWrap(this, object);
312 }
313 
314 
DoShutdown(ShutdownWrap * req_wrap_)315 int LibuvStreamWrap::DoShutdown(ShutdownWrap* req_wrap_) {
316   LibuvShutdownWrap* req_wrap = static_cast<LibuvShutdownWrap*>(req_wrap_);
317   return req_wrap->Dispatch(uv_shutdown, stream(), AfterUvShutdown);
318 }
319 
320 
AfterUvShutdown(uv_shutdown_t * req,int status)321 void LibuvStreamWrap::AfterUvShutdown(uv_shutdown_t* req, int status) {
322   LibuvShutdownWrap* req_wrap = static_cast<LibuvShutdownWrap*>(
323       LibuvShutdownWrap::from_req(req));
324   CHECK_NOT_NULL(req_wrap);
325   HandleScope scope(req_wrap->env()->isolate());
326   Context::Scope context_scope(req_wrap->env()->context());
327   req_wrap->Done(status);
328 }
329 
330 
331 // NOTE: Call to this function could change both `buf`'s and `count`'s
332 // values, shifting their base and decrementing their length. This is
333 // required in order to skip the data that was successfully written via
334 // uv_try_write().
DoTryWrite(uv_buf_t ** bufs,size_t * count)335 int LibuvStreamWrap::DoTryWrite(uv_buf_t** bufs, size_t* count) {
336   int err;
337   size_t written;
338   uv_buf_t* vbufs = *bufs;
339   size_t vcount = *count;
340 
341   err = uv_try_write(stream(), vbufs, vcount);
342   if (err == UV_ENOSYS || err == UV_EAGAIN)
343     return 0;
344   if (err < 0)
345     return err;
346 
347   // Slice off the buffers: skip all written buffers and slice the one that
348   // was partially written.
349   written = err;
350   for (; vcount > 0; vbufs++, vcount--) {
351     // Slice
352     if (vbufs[0].len > written) {
353       vbufs[0].base += written;
354       vbufs[0].len -= written;
355       written = 0;
356       break;
357 
358     // Discard
359     } else {
360       written -= vbufs[0].len;
361     }
362   }
363 
364   *bufs = vbufs;
365   *count = vcount;
366 
367   return 0;
368 }
369 
370 
DoWrite(WriteWrap * req_wrap,uv_buf_t * bufs,size_t count,uv_stream_t * send_handle)371 int LibuvStreamWrap::DoWrite(WriteWrap* req_wrap,
372                              uv_buf_t* bufs,
373                              size_t count,
374                              uv_stream_t* send_handle) {
375   LibuvWriteWrap* w = static_cast<LibuvWriteWrap*>(req_wrap);
376   return w->Dispatch(uv_write2,
377                      stream(),
378                      bufs,
379                      count,
380                      send_handle,
381                      AfterUvWrite);
382 }
383 
384 
385 
AfterUvWrite(uv_write_t * req,int status)386 void LibuvStreamWrap::AfterUvWrite(uv_write_t* req, int status) {
387   LibuvWriteWrap* req_wrap = static_cast<LibuvWriteWrap*>(
388       LibuvWriteWrap::from_req(req));
389   CHECK_NOT_NULL(req_wrap);
390   HandleScope scope(req_wrap->env()->isolate());
391   Context::Scope context_scope(req_wrap->env()->context());
392   req_wrap->Done(status);
393 }
394 
395 }  // namespace node
396 
397 NODE_MODULE_CONTEXT_AWARE_INTERNAL(stream_wrap,
398                                    node::LibuvStreamWrap::Initialize)
399