• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "stream_base.h"  // NOLINT(build/include_inline)
2 #include "stream_base-inl.h"
3 #include "stream_wrap.h"
4 
5 #include "node.h"
6 #include "node_buffer.h"
7 #include "node_errors.h"
8 #include "env-inl.h"
9 #include "js_stream.h"
10 #include "string_bytes.h"
11 #include "util-inl.h"
12 #include "v8.h"
13 
14 #include <climits>  // INT_MAX
15 
16 namespace node {
17 
18 using v8::Array;
19 using v8::ArrayBuffer;
20 using v8::Context;
21 using v8::DontDelete;
22 using v8::DontEnum;
23 using v8::External;
24 using v8::Function;
25 using v8::FunctionCallbackInfo;
26 using v8::HandleScope;
27 using v8::Integer;
28 using v8::Local;
29 using v8::MaybeLocal;
30 using v8::Object;
31 using v8::ReadOnly;
32 using v8::String;
33 using v8::Value;
34 
35 template int StreamBase::WriteString<ASCII>(
36     const FunctionCallbackInfo<Value>& args);
37 template int StreamBase::WriteString<UTF8>(
38     const FunctionCallbackInfo<Value>& args);
39 template int StreamBase::WriteString<UCS2>(
40     const FunctionCallbackInfo<Value>& args);
41 template int StreamBase::WriteString<LATIN1>(
42     const FunctionCallbackInfo<Value>& args);
43 
44 
ReadStartJS(const FunctionCallbackInfo<Value> & args)45 int StreamBase::ReadStartJS(const FunctionCallbackInfo<Value>& args) {
46   return ReadStart();
47 }
48 
49 
ReadStopJS(const FunctionCallbackInfo<Value> & args)50 int StreamBase::ReadStopJS(const FunctionCallbackInfo<Value>& args) {
51   return ReadStop();
52 }
53 
UseUserBuffer(const FunctionCallbackInfo<Value> & args)54 int StreamBase::UseUserBuffer(const FunctionCallbackInfo<Value>& args) {
55   CHECK(Buffer::HasInstance(args[0]));
56 
57   uv_buf_t buf = uv_buf_init(Buffer::Data(args[0]), Buffer::Length(args[0]));
58   PushStreamListener(new CustomBufferJSListener(buf));
59   return 0;
60 }
61 
Shutdown(const FunctionCallbackInfo<Value> & args)62 int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
63   CHECK(args[0]->IsObject());
64   Local<Object> req_wrap_obj = args[0].As<Object>();
65 
66   return Shutdown(req_wrap_obj);
67 }
68 
SetWriteResult(const StreamWriteResult & res)69 void StreamBase::SetWriteResult(const StreamWriteResult& res) {
70   env_->stream_base_state()[kBytesWritten] = res.bytes;
71   env_->stream_base_state()[kLastWriteWasAsync] = res.async;
72 }
73 
Writev(const FunctionCallbackInfo<Value> & args)74 int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
75   Environment* env = Environment::GetCurrent(args);
76 
77   CHECK(args[0]->IsObject());
78   CHECK(args[1]->IsArray());
79 
80   Local<Object> req_wrap_obj = args[0].As<Object>();
81   Local<Array> chunks = args[1].As<Array>();
82   bool all_buffers = args[2]->IsTrue();
83 
84   size_t count;
85   if (all_buffers)
86     count = chunks->Length();
87   else
88     count = chunks->Length() >> 1;
89 
90   MaybeStackBuffer<uv_buf_t, 16> bufs(count);
91 
92   size_t storage_size = 0;
93   size_t offset;
94 
95   if (!all_buffers) {
96     // Determine storage size first
97     for (size_t i = 0; i < count; i++) {
98       Local<Value> chunk = chunks->Get(env->context(), i * 2).ToLocalChecked();
99 
100       if (Buffer::HasInstance(chunk))
101         continue;
102         // Buffer chunk, no additional storage required
103 
104       // String chunk
105       Local<String> string = chunk->ToString(env->context()).ToLocalChecked();
106       enum encoding encoding = ParseEncoding(env->isolate(),
107           chunks->Get(env->context(), i * 2 + 1).ToLocalChecked());
108       size_t chunk_size;
109       if (encoding == UTF8 && string->Length() > 65535 &&
110           !StringBytes::Size(env->isolate(), string, encoding).To(&chunk_size))
111         return 0;
112       else if (!StringBytes::StorageSize(env->isolate(), string, encoding)
113                     .To(&chunk_size))
114         return 0;
115       storage_size += chunk_size;
116     }
117 
118     if (storage_size > INT_MAX)
119       return UV_ENOBUFS;
120   } else {
121     for (size_t i = 0; i < count; i++) {
122       Local<Value> chunk = chunks->Get(env->context(), i).ToLocalChecked();
123       bufs[i].base = Buffer::Data(chunk);
124       bufs[i].len = Buffer::Length(chunk);
125     }
126   }
127 
128   AllocatedBuffer storage;
129   if (storage_size > 0)
130     storage = env->AllocateManaged(storage_size);
131 
132   offset = 0;
133   if (!all_buffers) {
134     for (size_t i = 0; i < count; i++) {
135       Local<Value> chunk = chunks->Get(env->context(), i * 2).ToLocalChecked();
136 
137       // Write buffer
138       if (Buffer::HasInstance(chunk)) {
139         bufs[i].base = Buffer::Data(chunk);
140         bufs[i].len = Buffer::Length(chunk);
141         continue;
142       }
143 
144       // Write string
145       CHECK_LE(offset, storage_size);
146       char* str_storage = storage.data() + offset;
147       size_t str_size = storage.size() - offset;
148 
149       Local<String> string = chunk->ToString(env->context()).ToLocalChecked();
150       enum encoding encoding = ParseEncoding(env->isolate(),
151           chunks->Get(env->context(), i * 2 + 1).ToLocalChecked());
152       str_size = StringBytes::Write(env->isolate(),
153                                     str_storage,
154                                     str_size,
155                                     string,
156                                     encoding);
157       bufs[i].base = str_storage;
158       bufs[i].len = str_size;
159       offset += str_size;
160     }
161   }
162 
163   StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj);
164   SetWriteResult(res);
165   if (res.wrap != nullptr && storage_size > 0) {
166     res.wrap->SetAllocatedStorage(std::move(storage));
167   }
168   return res.err;
169 }
170 
171 
WriteBuffer(const FunctionCallbackInfo<Value> & args)172 int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
173   CHECK(args[0]->IsObject());
174 
175   Environment* env = Environment::GetCurrent(args);
176 
177   if (!args[1]->IsUint8Array()) {
178     node::THROW_ERR_INVALID_ARG_TYPE(env, "Second argument must be a buffer");
179     return 0;
180   }
181 
182   Local<Object> req_wrap_obj = args[0].As<Object>();
183   uv_buf_t buf;
184   buf.base = Buffer::Data(args[1]);
185   buf.len = Buffer::Length(args[1]);
186 
187   uv_stream_t* send_handle = nullptr;
188 
189   if (args[2]->IsObject() && IsIPCPipe()) {
190     Local<Object> send_handle_obj = args[2].As<Object>();
191 
192     HandleWrap* wrap;
193     ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
194     send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
195     // Reference LibuvStreamWrap instance to prevent it from being garbage
196     // collected before `AfterWrite` is called.
197     req_wrap_obj->Set(env->context(),
198                       env->handle_string(),
199                       send_handle_obj).Check();
200   }
201 
202   StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
203   SetWriteResult(res);
204 
205   return res.err;
206 }
207 
208 
209 template <enum encoding enc>
WriteString(const FunctionCallbackInfo<Value> & args)210 int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
211   Environment* env = Environment::GetCurrent(args);
212   CHECK(args[0]->IsObject());
213   CHECK(args[1]->IsString());
214 
215   Local<Object> req_wrap_obj = args[0].As<Object>();
216   Local<String> string = args[1].As<String>();
217   Local<Object> send_handle_obj;
218   if (args[2]->IsObject())
219     send_handle_obj = args[2].As<Object>();
220 
221   // Compute the size of the storage that the string will be flattened into.
222   // For UTF8 strings that are very long, go ahead and take the hit for
223   // computing their actual size, rather than tripling the storage.
224   size_t storage_size;
225   if (enc == UTF8 && string->Length() > 65535 &&
226       !StringBytes::Size(env->isolate(), string, enc).To(&storage_size))
227     return 0;
228   else if (!StringBytes::StorageSize(env->isolate(), string, enc)
229                 .To(&storage_size))
230     return 0;
231 
232   if (storage_size > INT_MAX)
233     return UV_ENOBUFS;
234 
235   // Try writing immediately if write size isn't too big
236   char stack_storage[16384];  // 16kb
237   size_t data_size;
238   size_t synchronously_written = 0;
239   uv_buf_t buf;
240 
241   bool try_write = storage_size <= sizeof(stack_storage) &&
242                    (!IsIPCPipe() || send_handle_obj.IsEmpty());
243   if (try_write) {
244     data_size = StringBytes::Write(env->isolate(),
245                                    stack_storage,
246                                    storage_size,
247                                    string,
248                                    enc);
249     buf = uv_buf_init(stack_storage, data_size);
250 
251     uv_buf_t* bufs = &buf;
252     size_t count = 1;
253     const int err = DoTryWrite(&bufs, &count);
254     // Keep track of the bytes written here, because we're taking a shortcut
255     // by using `DoTryWrite()` directly instead of using the utilities
256     // provided by `Write()`.
257     synchronously_written = count == 0 ? data_size : data_size - buf.len;
258     bytes_written_ += synchronously_written;
259 
260     // Immediate failure or success
261     if (err != 0 || count == 0) {
262       SetWriteResult(StreamWriteResult { false, err, nullptr, data_size, {} });
263       return err;
264     }
265 
266     // Partial write
267     CHECK_EQ(count, 1);
268   }
269 
270   AllocatedBuffer data;
271 
272   if (try_write) {
273     // Copy partial data
274     data = env->AllocateManaged(buf.len);
275     memcpy(data.data(), buf.base, buf.len);
276     data_size = buf.len;
277   } else {
278     // Write it
279     data = env->AllocateManaged(storage_size);
280     data_size = StringBytes::Write(env->isolate(),
281                                    data.data(),
282                                    storage_size,
283                                    string,
284                                    enc);
285   }
286 
287   CHECK_LE(data_size, storage_size);
288 
289   buf = uv_buf_init(data.data(), data_size);
290 
291   uv_stream_t* send_handle = nullptr;
292 
293   if (IsIPCPipe() && !send_handle_obj.IsEmpty()) {
294     HandleWrap* wrap;
295     ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
296     send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
297     // Reference LibuvStreamWrap instance to prevent it from being garbage
298     // collected before `AfterWrite` is called.
299     req_wrap_obj->Set(env->context(),
300                       env->handle_string(),
301                       send_handle_obj).Check();
302   }
303 
304   StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
305   res.bytes += synchronously_written;
306 
307   SetWriteResult(res);
308   if (res.wrap != nullptr) {
309     res.wrap->SetAllocatedStorage(std::move(data));
310   }
311 
312   return res.err;
313 }
314 
315 
CallJSOnreadMethod(ssize_t nread,Local<ArrayBuffer> ab,size_t offset,StreamBaseJSChecks checks)316 MaybeLocal<Value> StreamBase::CallJSOnreadMethod(ssize_t nread,
317                                                  Local<ArrayBuffer> ab,
318                                                  size_t offset,
319                                                  StreamBaseJSChecks checks) {
320   Environment* env = env_;
321 
322   DCHECK_EQ(static_cast<int32_t>(nread), nread);
323   DCHECK_LE(offset, INT32_MAX);
324 
325   if (checks == DONT_SKIP_NREAD_CHECKS) {
326     if (ab.IsEmpty()) {
327       DCHECK_EQ(offset, 0);
328       DCHECK_LE(nread, 0);
329     } else {
330       DCHECK_GE(nread, 0);
331     }
332   }
333 
334   env->stream_base_state()[kReadBytesOrError] = nread;
335   env->stream_base_state()[kArrayBufferOffset] = offset;
336 
337   Local<Value> argv[] = {
338     ab.IsEmpty() ? Undefined(env->isolate()).As<Value>() : ab.As<Value>()
339   };
340 
341   AsyncWrap* wrap = GetAsyncWrap();
342   CHECK_NOT_NULL(wrap);
343   Local<Value> onread = wrap->object()->GetInternalField(
344       StreamBase::kOnReadFunctionField);
345   CHECK(onread->IsFunction());
346   return wrap->MakeCallback(onread.As<Function>(), arraysize(argv), argv);
347 }
348 
349 
IsIPCPipe()350 bool StreamBase::IsIPCPipe() {
351   return false;
352 }
353 
354 
GetFD()355 int StreamBase::GetFD() {
356   return -1;
357 }
358 
359 
GetObject()360 Local<Object> StreamBase::GetObject() {
361   return GetAsyncWrap()->object();
362 }
363 
AddMethod(Environment * env,Local<Signature> signature,enum PropertyAttribute attributes,Local<FunctionTemplate> t,JSMethodFunction * stream_method,Local<String> string)364 void StreamBase::AddMethod(Environment* env,
365                            Local<Signature> signature,
366                            enum PropertyAttribute attributes,
367                            Local<FunctionTemplate> t,
368                            JSMethodFunction* stream_method,
369                            Local<String> string) {
370   Local<FunctionTemplate> templ =
371       env->NewFunctionTemplate(stream_method,
372                                signature,
373                                v8::ConstructorBehavior::kThrow,
374                                v8::SideEffectType::kHasNoSideEffect);
375   t->PrototypeTemplate()->SetAccessorProperty(
376       string, templ, Local<FunctionTemplate>(), attributes);
377 }
378 
AddMethods(Environment * env,Local<FunctionTemplate> t)379 void StreamBase::AddMethods(Environment* env, Local<FunctionTemplate> t) {
380   HandleScope scope(env->isolate());
381 
382   enum PropertyAttribute attributes =
383       static_cast<PropertyAttribute>(ReadOnly | DontDelete | DontEnum);
384   Local<Signature> sig = Signature::New(env->isolate(), t);
385 
386   AddMethod(env, sig, attributes, t, GetFD, env->fd_string());
387   AddMethod(
388       env, sig, attributes, t, GetExternal, env->external_stream_string());
389   AddMethod(env, sig, attributes, t, GetBytesRead, env->bytes_read_string());
390   AddMethod(
391       env, sig, attributes, t, GetBytesWritten, env->bytes_written_string());
392   env->SetProtoMethod(t, "readStart", JSMethod<&StreamBase::ReadStartJS>);
393   env->SetProtoMethod(t, "readStop", JSMethod<&StreamBase::ReadStopJS>);
394   env->SetProtoMethod(t, "shutdown", JSMethod<&StreamBase::Shutdown>);
395   env->SetProtoMethod(t,
396                       "useUserBuffer",
397                       JSMethod<&StreamBase::UseUserBuffer>);
398   env->SetProtoMethod(t, "writev", JSMethod<&StreamBase::Writev>);
399   env->SetProtoMethod(t, "writeBuffer", JSMethod<&StreamBase::WriteBuffer>);
400   env->SetProtoMethod(
401       t, "writeAsciiString", JSMethod<&StreamBase::WriteString<ASCII>>);
402   env->SetProtoMethod(
403       t, "writeUtf8String", JSMethod<&StreamBase::WriteString<UTF8>>);
404   env->SetProtoMethod(
405       t, "writeUcs2String", JSMethod<&StreamBase::WriteString<UCS2>>);
406   env->SetProtoMethod(
407       t, "writeLatin1String", JSMethod<&StreamBase::WriteString<LATIN1>>);
408   t->PrototypeTemplate()->Set(FIXED_ONE_BYTE_STRING(env->isolate(),
409                                                     "isStreamBase"),
410                               True(env->isolate()));
411   t->PrototypeTemplate()->SetAccessor(
412       FIXED_ONE_BYTE_STRING(env->isolate(), "onread"),
413       BaseObject::InternalFieldGet<
414           StreamBase::kOnReadFunctionField>,
415       BaseObject::InternalFieldSet<
416           StreamBase::kOnReadFunctionField,
417           &Value::IsFunction>);
418 }
419 
GetFD(const FunctionCallbackInfo<Value> & args)420 void StreamBase::GetFD(const FunctionCallbackInfo<Value>& args) {
421   // Mimic implementation of StreamBase::GetFD() and UDPWrap::GetFD().
422   StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
423   if (wrap == nullptr) return args.GetReturnValue().Set(UV_EINVAL);
424 
425   if (!wrap->IsAlive()) return args.GetReturnValue().Set(UV_EINVAL);
426 
427   args.GetReturnValue().Set(wrap->GetFD());
428 }
429 
GetBytesRead(const FunctionCallbackInfo<Value> & args)430 void StreamBase::GetBytesRead(const FunctionCallbackInfo<Value>& args) {
431   StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
432   if (wrap == nullptr) return args.GetReturnValue().Set(0);
433 
434   // uint64_t -> double. 53bits is enough for all real cases.
435   args.GetReturnValue().Set(static_cast<double>(wrap->bytes_read_));
436 }
437 
GetBytesWritten(const FunctionCallbackInfo<Value> & args)438 void StreamBase::GetBytesWritten(const FunctionCallbackInfo<Value>& args) {
439   StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
440   if (wrap == nullptr) return args.GetReturnValue().Set(0);
441 
442   // uint64_t -> double. 53bits is enough for all real cases.
443   args.GetReturnValue().Set(static_cast<double>(wrap->bytes_written_));
444 }
445 
GetExternal(const FunctionCallbackInfo<Value> & args)446 void StreamBase::GetExternal(const FunctionCallbackInfo<Value>& args) {
447   StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
448   if (wrap == nullptr) return;
449 
450   Local<External> ext = External::New(args.GetIsolate(), wrap);
451   args.GetReturnValue().Set(ext);
452 }
453 
454 template <int (StreamBase::*Method)(const FunctionCallbackInfo<Value>& args)>
JSMethod(const FunctionCallbackInfo<Value> & args)455 void StreamBase::JSMethod(const FunctionCallbackInfo<Value>& args) {
456   StreamBase* wrap = StreamBase::FromObject(args.Holder().As<Object>());
457   if (wrap == nullptr) return;
458 
459   if (!wrap->IsAlive()) return args.GetReturnValue().Set(UV_EINVAL);
460 
461   AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(wrap->GetAsyncWrap());
462   args.GetReturnValue().Set((wrap->*Method)(args));
463 }
464 
DoTryWrite(uv_buf_t ** bufs,size_t * count)465 int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) {
466   // No TryWrite by default
467   return 0;
468 }
469 
470 
Error() const471 const char* StreamResource::Error() const {
472   return nullptr;
473 }
474 
475 
ClearError()476 void StreamResource::ClearError() {
477   // No-op
478 }
479 
480 
OnStreamAlloc(size_t suggested_size)481 uv_buf_t EmitToJSStreamListener::OnStreamAlloc(size_t suggested_size) {
482   CHECK_NOT_NULL(stream_);
483   Environment* env = static_cast<StreamBase*>(stream_)->stream_env();
484   return env->AllocateManaged(suggested_size).release();
485 }
486 
OnStreamRead(ssize_t nread,const uv_buf_t & buf_)487 void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) {
488   CHECK_NOT_NULL(stream_);
489   StreamBase* stream = static_cast<StreamBase*>(stream_);
490   Environment* env = stream->stream_env();
491   HandleScope handle_scope(env->isolate());
492   Context::Scope context_scope(env->context());
493   AllocatedBuffer buf(env, buf_);
494 
495   if (nread <= 0)  {
496     if (nread < 0)
497       stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
498     return;
499   }
500 
501   CHECK_LE(static_cast<size_t>(nread), buf.size());
502   buf.Resize(nread);
503 
504   stream->CallJSOnreadMethod(nread, buf.ToArrayBuffer());
505 }
506 
507 
OnStreamAlloc(size_t suggested_size)508 uv_buf_t CustomBufferJSListener::OnStreamAlloc(size_t suggested_size) {
509   return buffer_;
510 }
511 
512 
OnStreamRead(ssize_t nread,const uv_buf_t & buf)513 void CustomBufferJSListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
514   CHECK_NOT_NULL(stream_);
515 
516   StreamBase* stream = static_cast<StreamBase*>(stream_);
517   Environment* env = stream->stream_env();
518   HandleScope handle_scope(env->isolate());
519   Context::Scope context_scope(env->context());
520 
521   // To deal with the case where POLLHUP is received and UV_EOF is returned, as
522   // libuv returns an empty buffer (on unices only).
523   if (nread == UV_EOF && buf.base == nullptr) {
524     stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
525     return;
526   }
527 
528   CHECK_EQ(buf.base, buffer_.base);
529 
530   MaybeLocal<Value> ret = stream->CallJSOnreadMethod(nread,
531                              Local<ArrayBuffer>(),
532                              0,
533                              StreamBase::SKIP_NREAD_CHECKS);
534   Local<Value> next_buf_v;
535   if (ret.ToLocal(&next_buf_v) && !next_buf_v->IsUndefined()) {
536     buffer_.base = Buffer::Data(next_buf_v);
537     buffer_.len = Buffer::Length(next_buf_v);
538   }
539 }
540 
541 
OnStreamAfterReqFinished(StreamReq * req_wrap,int status)542 void ReportWritesToJSStreamListener::OnStreamAfterReqFinished(
543     StreamReq* req_wrap, int status) {
544   StreamBase* stream = static_cast<StreamBase*>(stream_);
545   Environment* env = stream->stream_env();
546   AsyncWrap* async_wrap = req_wrap->GetAsyncWrap();
547   HandleScope handle_scope(env->isolate());
548   Context::Scope context_scope(env->context());
549   CHECK(!async_wrap->persistent().IsEmpty());
550   Local<Object> req_wrap_obj = async_wrap->object();
551 
552   Local<Value> argv[] = {
553     Integer::New(env->isolate(), status),
554     stream->GetObject(),
555     Undefined(env->isolate())
556   };
557 
558   const char* msg = stream->Error();
559   if (msg != nullptr) {
560     argv[2] = OneByteString(env->isolate(), msg);
561     stream->ClearError();
562   }
563 
564   if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())
565     async_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
566 }
567 
OnStreamAfterWrite(WriteWrap * req_wrap,int status)568 void ReportWritesToJSStreamListener::OnStreamAfterWrite(
569     WriteWrap* req_wrap, int status) {
570   OnStreamAfterReqFinished(req_wrap, status);
571 }
572 
OnStreamAfterShutdown(ShutdownWrap * req_wrap,int status)573 void ReportWritesToJSStreamListener::OnStreamAfterShutdown(
574     ShutdownWrap* req_wrap, int status) {
575   OnStreamAfterReqFinished(req_wrap, status);
576 }
577 
578 
579 }  // namespace node
580