• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright Joyent, Inc. and other Node contributors.
2 //
3 // Permission is hereby granted, free of charge, to any person obtaining a
4 // copy of this software and associated documentation files (the
5 // "Software"), to deal in the Software without restriction, including
6 // without limitation the rights to use, copy, modify, merge, publish,
7 // distribute, sublicense, and/or sell copies of the Software, and to permit
8 // persons to whom the Software is furnished to do so, subject to the
9 // following conditions:
10 //
11 // The above copyright notice and this permission notice shall be included
12 // in all copies or substantial portions of the Software.
13 //
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17 // NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18 // DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19 // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20 // USE OR OTHER DEALINGS IN THE SOFTWARE.
21 
22 #include "stream_wrap.h"
23 #include "stream_base-inl.h"
24 
25 #include "env-inl.h"
26 #include "handle_wrap.h"
27 #include "node_buffer.h"
28 #include "node_errors.h"
29 #include "node_external_reference.h"
30 #include "pipe_wrap.h"
31 #include "req_wrap-inl.h"
32 #include "tcp_wrap.h"
33 #include "udp_wrap.h"
34 #include "util-inl.h"
35 
36 #include <cstring>  // memcpy()
37 #include <climits>  // INT_MAX
38 
39 
40 namespace node {
41 
42 using errors::TryCatchScope;
43 using v8::Context;
44 using v8::DontDelete;
45 using v8::EscapableHandleScope;
46 using v8::FunctionCallbackInfo;
47 using v8::FunctionTemplate;
48 using v8::HandleScope;
49 using v8::Isolate;
50 using v8::JustVoid;
51 using v8::Local;
52 using v8::Maybe;
53 using v8::MaybeLocal;
54 using v8::Nothing;
55 using v8::Object;
56 using v8::PropertyAttribute;
57 using v8::ReadOnly;
58 using v8::Signature;
59 using v8::Value;
60 
IsConstructCallCallback(const FunctionCallbackInfo<Value> & args)61 void IsConstructCallCallback(const FunctionCallbackInfo<Value>& args) {
62   CHECK(args.IsConstructCall());
63   StreamReq::ResetObject(args.This());
64 }
65 
Initialize(Local<Object> target,Local<Value> unused,Local<Context> context,void * priv)66 void LibuvStreamWrap::Initialize(Local<Object> target,
67                                  Local<Value> unused,
68                                  Local<Context> context,
69                                  void* priv) {
70   Environment* env = Environment::GetCurrent(context);
71   Isolate* isolate = env->isolate();
72 
73   Local<FunctionTemplate> sw =
74       NewFunctionTemplate(isolate, IsConstructCallCallback);
75   sw->InstanceTemplate()->SetInternalFieldCount(StreamReq::kInternalFieldCount);
76 
77   // we need to set handle and callback to null,
78   // so that those fields are created and functions
79   // do not become megamorphic
80   // Fields:
81   // - oncomplete
82   // - callback
83   // - handle
84   sw->InstanceTemplate()->Set(env->oncomplete_string(), v8::Null(isolate));
85   sw->InstanceTemplate()->Set(FIXED_ONE_BYTE_STRING(isolate, "callback"),
86                               v8::Null(isolate));
87   sw->InstanceTemplate()->Set(FIXED_ONE_BYTE_STRING(isolate, "handle"),
88                               v8::Null(isolate));
89 
90   sw->Inherit(AsyncWrap::GetConstructorTemplate(env));
91 
92   SetConstructorFunction(context, target, "ShutdownWrap", sw);
93   env->set_shutdown_wrap_template(sw->InstanceTemplate());
94 
95   Local<FunctionTemplate> ww =
96       FunctionTemplate::New(isolate, IsConstructCallCallback);
97   ww->InstanceTemplate()->SetInternalFieldCount(
98       StreamReq::kInternalFieldCount);
99   ww->Inherit(AsyncWrap::GetConstructorTemplate(env));
100   SetConstructorFunction(context, target, "WriteWrap", ww);
101   env->set_write_wrap_template(ww->InstanceTemplate());
102 
103   NODE_DEFINE_CONSTANT(target, kReadBytesOrError);
104   NODE_DEFINE_CONSTANT(target, kArrayBufferOffset);
105   NODE_DEFINE_CONSTANT(target, kBytesWritten);
106   NODE_DEFINE_CONSTANT(target, kLastWriteWasAsync);
107   target
108       ->Set(context,
109             FIXED_ONE_BYTE_STRING(isolate, "streamBaseState"),
110             env->stream_base_state().GetJSArray())
111       .Check();
112 }
113 
RegisterExternalReferences(ExternalReferenceRegistry * registry)114 void LibuvStreamWrap::RegisterExternalReferences(
115     ExternalReferenceRegistry* registry) {
116   registry->Register(IsConstructCallCallback);
117   registry->Register(GetWriteQueueSize);
118   registry->Register(SetBlocking);
119   StreamBase::RegisterExternalReferences(registry);
120 }
121 
LibuvStreamWrap(Environment * env,Local<Object> object,uv_stream_t * stream,AsyncWrap::ProviderType provider)122 LibuvStreamWrap::LibuvStreamWrap(Environment* env,
123                                  Local<Object> object,
124                                  uv_stream_t* stream,
125                                  AsyncWrap::ProviderType provider)
126     : HandleWrap(env,
127                  object,
128                  reinterpret_cast<uv_handle_t*>(stream),
129                  provider),
130       StreamBase(env),
131       stream_(stream) {
132   StreamBase::AttachToObject(object);
133 }
134 
135 
GetConstructorTemplate(Environment * env)136 Local<FunctionTemplate> LibuvStreamWrap::GetConstructorTemplate(
137     Environment* env) {
138   Local<FunctionTemplate> tmpl = env->libuv_stream_wrap_ctor_template();
139   if (tmpl.IsEmpty()) {
140     Isolate* isolate = env->isolate();
141     tmpl = NewFunctionTemplate(isolate, nullptr);
142     tmpl->SetClassName(FIXED_ONE_BYTE_STRING(isolate, "LibuvStreamWrap"));
143     tmpl->Inherit(HandleWrap::GetConstructorTemplate(env));
144     tmpl->InstanceTemplate()->SetInternalFieldCount(
145         StreamBase::kInternalFieldCount);
146     Local<FunctionTemplate> get_write_queue_size =
147         FunctionTemplate::New(isolate,
148                               GetWriteQueueSize,
149                               Local<Value>(),
150                               Signature::New(isolate, tmpl));
151     tmpl->PrototypeTemplate()->SetAccessorProperty(
152         env->write_queue_size_string(),
153         get_write_queue_size,
154         Local<FunctionTemplate>(),
155         static_cast<PropertyAttribute>(ReadOnly | DontDelete));
156     SetProtoMethod(isolate, tmpl, "setBlocking", SetBlocking);
157     StreamBase::AddMethods(env, tmpl);
158     env->set_libuv_stream_wrap_ctor_template(tmpl);
159   }
160   return tmpl;
161 }
162 
163 
From(Environment * env,Local<Object> object)164 LibuvStreamWrap* LibuvStreamWrap::From(Environment* env, Local<Object> object) {
165   Local<FunctionTemplate> sw = env->libuv_stream_wrap_ctor_template();
166   CHECK(!sw.IsEmpty() && sw->HasInstance(object));
167   return Unwrap<LibuvStreamWrap>(object);
168 }
169 
170 
GetFD()171 int LibuvStreamWrap::GetFD() {
172 #ifdef _WIN32
173   return fd_;
174 #else
175   int fd = -1;
176   if (stream() != nullptr)
177     uv_fileno(reinterpret_cast<uv_handle_t*>(stream()), &fd);
178   return fd;
179 #endif
180 }
181 
182 
IsAlive()183 bool LibuvStreamWrap::IsAlive() {
184   return HandleWrap::IsAlive(this);
185 }
186 
187 
IsClosing()188 bool LibuvStreamWrap::IsClosing() {
189   return uv_is_closing(reinterpret_cast<uv_handle_t*>(stream()));
190 }
191 
192 
GetAsyncWrap()193 AsyncWrap* LibuvStreamWrap::GetAsyncWrap() {
194   return static_cast<AsyncWrap*>(this);
195 }
196 
197 
IsIPCPipe()198 bool LibuvStreamWrap::IsIPCPipe() {
199   return is_named_pipe_ipc();
200 }
201 
ReadStart()202 int LibuvStreamWrap::ReadStart() {
203   return uv_read_start(
204       stream(),
205       [](uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
206         static_cast<LibuvStreamWrap*>(handle->data)
207             ->OnUvAlloc(suggested_size, buf);
208       },
209       [](uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
210         LibuvStreamWrap* wrap = static_cast<LibuvStreamWrap*>(stream->data);
211         TryCatchScope try_catch(wrap->env());
212         try_catch.SetVerbose(true);
213         wrap->OnUvRead(nread, buf);
214       });
215 }
216 
217 
ReadStop()218 int LibuvStreamWrap::ReadStop() {
219   return uv_read_stop(stream());
220 }
221 
222 
OnUvAlloc(size_t suggested_size,uv_buf_t * buf)223 void LibuvStreamWrap::OnUvAlloc(size_t suggested_size, uv_buf_t* buf) {
224   HandleScope scope(env()->isolate());
225   Context::Scope context_scope(env()->context());
226 
227   *buf = EmitAlloc(suggested_size);
228 }
229 
230 template <class WrapType>
AcceptHandle(Environment * env,LibuvStreamWrap * parent)231 static MaybeLocal<Object> AcceptHandle(Environment* env,
232                                        LibuvStreamWrap* parent) {
233   static_assert(std::is_base_of<LibuvStreamWrap, WrapType>::value ||
234                 std::is_base_of<UDPWrap, WrapType>::value,
235                 "Can only accept stream handles");
236 
237   EscapableHandleScope scope(env->isolate());
238   Local<Object> wrap_obj;
239 
240   if (!WrapType::Instantiate(env, parent, WrapType::SOCKET).ToLocal(&wrap_obj))
241     return Local<Object>();
242 
243   HandleWrap* wrap = Unwrap<HandleWrap>(wrap_obj);
244   CHECK_NOT_NULL(wrap);
245   uv_stream_t* stream = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
246   CHECK_NOT_NULL(stream);
247 
248   if (uv_accept(parent->stream(), stream))
249     ABORT();
250 
251   return scope.Escape(wrap_obj);
252 }
253 
OnUvRead(ssize_t nread,const uv_buf_t * buf)254 Maybe<void> LibuvStreamWrap::OnUvRead(ssize_t nread, const uv_buf_t* buf) {
255   HandleScope scope(env()->isolate());
256   Context::Scope context_scope(env()->context());
257   uv_handle_type type = UV_UNKNOWN_HANDLE;
258 
259   if (is_named_pipe_ipc() &&
260       uv_pipe_pending_count(reinterpret_cast<uv_pipe_t*>(stream())) > 0) {
261     type = uv_pipe_pending_type(reinterpret_cast<uv_pipe_t*>(stream()));
262   }
263 
264   // We should not be getting this callback if someone has already called
265   // uv_close() on the handle.
266   CHECK_EQ(persistent().IsEmpty(), false);
267 
268   if (nread > 0) {
269     MaybeLocal<Object> pending_obj;
270 
271     if (type == UV_TCP) {
272       pending_obj = AcceptHandle<TCPWrap>(env(), this);
273     } else if (type == UV_NAMED_PIPE) {
274       pending_obj = AcceptHandle<PipeWrap>(env(), this);
275     } else if (type == UV_UDP) {
276       pending_obj = AcceptHandle<UDPWrap>(env(), this);
277     } else {
278       CHECK_EQ(type, UV_UNKNOWN_HANDLE);
279     }
280 
281     Local<Object> local_pending_obj;
282     if (type != UV_UNKNOWN_HANDLE &&
283         (!pending_obj.ToLocal(&local_pending_obj) ||
284          object()
285              ->Set(env()->context(),
286                    env()->pending_handle_string(),
287                    local_pending_obj)
288              .IsNothing())) {
289       return Nothing<void>();
290     }
291   }
292 
293   EmitRead(nread, *buf);
294   return JustVoid();
295 }
296 
GetWriteQueueSize(const FunctionCallbackInfo<Value> & info)297 void LibuvStreamWrap::GetWriteQueueSize(
298     const FunctionCallbackInfo<Value>& info) {
299   LibuvStreamWrap* wrap;
300   ASSIGN_OR_RETURN_UNWRAP(&wrap, info.This());
301 
302   if (wrap->stream() == nullptr) {
303     info.GetReturnValue().Set(0);
304     return;
305   }
306 
307   uint32_t write_queue_size = wrap->stream()->write_queue_size;
308   info.GetReturnValue().Set(write_queue_size);
309 }
310 
311 
SetBlocking(const FunctionCallbackInfo<Value> & args)312 void LibuvStreamWrap::SetBlocking(const FunctionCallbackInfo<Value>& args) {
313   LibuvStreamWrap* wrap;
314   ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
315 
316   CHECK_GT(args.Length(), 0);
317   if (!wrap->IsAlive())
318     return args.GetReturnValue().Set(UV_EINVAL);
319 
320   bool enable = args[0]->IsTrue();
321   args.GetReturnValue().Set(uv_stream_set_blocking(wrap->stream(), enable));
322 }
323 
324 typedef SimpleShutdownWrap<ReqWrap<uv_shutdown_t>> LibuvShutdownWrap;
325 typedef SimpleWriteWrap<ReqWrap<uv_write_t>> LibuvWriteWrap;
326 
CreateShutdownWrap(Local<Object> object)327 ShutdownWrap* LibuvStreamWrap::CreateShutdownWrap(Local<Object> object) {
328   return new LibuvShutdownWrap(this, object);
329 }
330 
CreateWriteWrap(Local<Object> object)331 WriteWrap* LibuvStreamWrap::CreateWriteWrap(Local<Object> object) {
332   return new LibuvWriteWrap(this, object);
333 }
334 
335 
DoShutdown(ShutdownWrap * req_wrap_)336 int LibuvStreamWrap::DoShutdown(ShutdownWrap* req_wrap_) {
337   LibuvShutdownWrap* req_wrap = static_cast<LibuvShutdownWrap*>(req_wrap_);
338   return req_wrap->Dispatch(uv_shutdown, stream(), AfterUvShutdown);
339 }
340 
341 
AfterUvShutdown(uv_shutdown_t * req,int status)342 void LibuvStreamWrap::AfterUvShutdown(uv_shutdown_t* req, int status) {
343   LibuvShutdownWrap* req_wrap = static_cast<LibuvShutdownWrap*>(
344       LibuvShutdownWrap::from_req(req));
345   CHECK_NOT_NULL(req_wrap);
346   HandleScope scope(req_wrap->env()->isolate());
347   Context::Scope context_scope(req_wrap->env()->context());
348   req_wrap->Done(status);
349 }
350 
351 
352 // NOTE: Call to this function could change both `buf`'s and `count`'s
353 // values, shifting their base and decrementing their length. This is
354 // required in order to skip the data that was successfully written via
355 // uv_try_write().
DoTryWrite(uv_buf_t ** bufs,size_t * count)356 int LibuvStreamWrap::DoTryWrite(uv_buf_t** bufs, size_t* count) {
357   int err;
358   size_t written;
359   uv_buf_t* vbufs = *bufs;
360   size_t vcount = *count;
361 
362   err = uv_try_write(stream(), vbufs, vcount);
363   if (err == UV_ENOSYS || err == UV_EAGAIN)
364     return 0;
365   if (err < 0)
366     return err;
367 
368   // Slice off the buffers: skip all written buffers and slice the one that
369   // was partially written.
370   written = err;
371   for (; vcount > 0; vbufs++, vcount--) {
372     // Slice
373     if (vbufs[0].len > written) {
374       vbufs[0].base += written;
375       vbufs[0].len -= written;
376       written = 0;
377       break;
378 
379     // Discard
380     } else {
381       written -= vbufs[0].len;
382     }
383   }
384 
385   *bufs = vbufs;
386   *count = vcount;
387 
388   return 0;
389 }
390 
391 
DoWrite(WriteWrap * req_wrap,uv_buf_t * bufs,size_t count,uv_stream_t * send_handle)392 int LibuvStreamWrap::DoWrite(WriteWrap* req_wrap,
393                              uv_buf_t* bufs,
394                              size_t count,
395                              uv_stream_t* send_handle) {
396   LibuvWriteWrap* w = static_cast<LibuvWriteWrap*>(req_wrap);
397   return w->Dispatch(uv_write2,
398                      stream(),
399                      bufs,
400                      count,
401                      send_handle,
402                      AfterUvWrite);
403 }
404 
405 
406 
AfterUvWrite(uv_write_t * req,int status)407 void LibuvStreamWrap::AfterUvWrite(uv_write_t* req, int status) {
408   LibuvWriteWrap* req_wrap = static_cast<LibuvWriteWrap*>(
409       LibuvWriteWrap::from_req(req));
410   CHECK_NOT_NULL(req_wrap);
411   HandleScope scope(req_wrap->env()->isolate());
412   Context::Scope context_scope(req_wrap->env()->context());
413   req_wrap->Done(status);
414 }
415 
416 }  // namespace node
417 
418 NODE_BINDING_CONTEXT_AWARE_INTERNAL(stream_wrap,
419                                     node::LibuvStreamWrap::Initialize)
420 NODE_BINDING_EXTERNAL_REFERENCE(
421     stream_wrap, node::LibuvStreamWrap::RegisterExternalReferences)
422