• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "stream_base.h"  // NOLINT(build/include_inline)
2 #include "stream_base-inl.h"
3 #include "stream_wrap.h"
4 
5 #include "env-inl.h"
6 #include "js_stream.h"
7 #include "node.h"
8 #include "node_buffer.h"
9 #include "node_errors.h"
10 #include "node_external_reference.h"
11 #include "string_bytes.h"
12 #include "util-inl.h"
13 #include "v8.h"
14 
15 #include <climits>  // INT_MAX
16 
17 namespace node {
18 
19 using v8::Array;
20 using v8::ArrayBuffer;
21 using v8::BackingStore;
22 using v8::ConstructorBehavior;
23 using v8::Context;
24 using v8::DontDelete;
25 using v8::DontEnum;
26 using v8::External;
27 using v8::Function;
28 using v8::FunctionCallbackInfo;
29 using v8::FunctionTemplate;
30 using v8::HandleScope;
31 using v8::Integer;
32 using v8::Isolate;
33 using v8::Local;
34 using v8::MaybeLocal;
35 using v8::Object;
36 using v8::PropertyAttribute;
37 using v8::ReadOnly;
38 using v8::SideEffectType;
39 using v8::Signature;
40 using v8::String;
41 using v8::Value;
42 
Shutdown(v8::Local<v8::Object> req_wrap_obj)43 int StreamBase::Shutdown(v8::Local<v8::Object> req_wrap_obj) {
44   Environment* env = stream_env();
45 
46   v8::HandleScope handle_scope(env->isolate());
47 
48   if (req_wrap_obj.IsEmpty()) {
49     if (!env->shutdown_wrap_template()
50              ->NewInstance(env->context())
51              .ToLocal(&req_wrap_obj)) {
52       return UV_EBUSY;
53     }
54     StreamReq::ResetObject(req_wrap_obj);
55   }
56 
57   BaseObjectPtr<AsyncWrap> req_wrap_ptr;
58   AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(GetAsyncWrap());
59   ShutdownWrap* req_wrap = CreateShutdownWrap(req_wrap_obj);
60   if (req_wrap != nullptr) req_wrap_ptr.reset(req_wrap->GetAsyncWrap());
61   int err = DoShutdown(req_wrap);
62 
63   if (err != 0 && req_wrap != nullptr) {
64     req_wrap->Dispose();
65   }
66 
67   const char* msg = Error();
68   if (msg != nullptr) {
69     if (req_wrap_obj
70             ->Set(env->context(),
71                   env->error_string(),
72                   OneByteString(env->isolate(), msg))
73             .IsNothing()) {
74       return UV_EBUSY;
75     }
76     ClearError();
77   }
78 
79   return err;
80 }
81 
Write(uv_buf_t * bufs,size_t count,uv_stream_t * send_handle,v8::Local<v8::Object> req_wrap_obj,bool skip_try_write)82 StreamWriteResult StreamBase::Write(uv_buf_t* bufs,
83                                     size_t count,
84                                     uv_stream_t* send_handle,
85                                     v8::Local<v8::Object> req_wrap_obj,
86                                     bool skip_try_write) {
87   Environment* env = stream_env();
88   int err;
89 
90   size_t total_bytes = 0;
91   for (size_t i = 0; i < count; ++i) total_bytes += bufs[i].len;
92   bytes_written_ += total_bytes;
93 
94   if (send_handle == nullptr && !skip_try_write) {
95     err = DoTryWrite(&bufs, &count);
96     if (err != 0 || count == 0) {
97       return StreamWriteResult{false, err, nullptr, total_bytes, {}};
98     }
99   }
100 
101   v8::HandleScope handle_scope(env->isolate());
102 
103   if (req_wrap_obj.IsEmpty()) {
104     if (!env->write_wrap_template()
105              ->NewInstance(env->context())
106              .ToLocal(&req_wrap_obj)) {
107       return StreamWriteResult{false, UV_EBUSY, nullptr, 0, {}};
108     }
109     StreamReq::ResetObject(req_wrap_obj);
110   }
111 
112   AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(GetAsyncWrap());
113   WriteWrap* req_wrap = CreateWriteWrap(req_wrap_obj);
114   BaseObjectPtr<AsyncWrap> req_wrap_ptr(req_wrap->GetAsyncWrap());
115 
116   err = DoWrite(req_wrap, bufs, count, send_handle);
117   bool async = err == 0;
118 
119   if (!async) {
120     req_wrap->Dispose();
121     req_wrap = nullptr;
122   }
123 
124   const char* msg = Error();
125   if (msg != nullptr) {
126     if (req_wrap_obj
127             ->Set(env->context(),
128                   env->error_string(),
129                   OneByteString(env->isolate(), msg))
130             .IsNothing()) {
131       return StreamWriteResult{false, UV_EBUSY, nullptr, 0, {}};
132     }
133     ClearError();
134   }
135 
136   return StreamWriteResult{
137       async, err, req_wrap, total_bytes, std::move(req_wrap_ptr)};
138 }
139 
140 template int StreamBase::WriteString<ASCII>(
141     const FunctionCallbackInfo<Value>& args);
142 template int StreamBase::WriteString<UTF8>(
143     const FunctionCallbackInfo<Value>& args);
144 template int StreamBase::WriteString<UCS2>(
145     const FunctionCallbackInfo<Value>& args);
146 template int StreamBase::WriteString<LATIN1>(
147     const FunctionCallbackInfo<Value>& args);
148 
149 
ReadStartJS(const FunctionCallbackInfo<Value> & args)150 int StreamBase::ReadStartJS(const FunctionCallbackInfo<Value>& args) {
151   return ReadStart();
152 }
153 
154 
ReadStopJS(const FunctionCallbackInfo<Value> & args)155 int StreamBase::ReadStopJS(const FunctionCallbackInfo<Value>& args) {
156   return ReadStop();
157 }
158 
UseUserBuffer(const FunctionCallbackInfo<Value> & args)159 int StreamBase::UseUserBuffer(const FunctionCallbackInfo<Value>& args) {
160   CHECK(Buffer::HasInstance(args[0]));
161 
162   uv_buf_t buf = uv_buf_init(Buffer::Data(args[0]), Buffer::Length(args[0]));
163   PushStreamListener(new CustomBufferJSListener(buf));
164   return 0;
165 }
166 
Shutdown(const FunctionCallbackInfo<Value> & args)167 int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
168   CHECK(args[0]->IsObject());
169   Local<Object> req_wrap_obj = args[0].As<Object>();
170 
171   return Shutdown(req_wrap_obj);
172 }
173 
SetWriteResult(const StreamWriteResult & res)174 void StreamBase::SetWriteResult(const StreamWriteResult& res) {
175   env_->stream_base_state()[kBytesWritten] = res.bytes;
176   env_->stream_base_state()[kLastWriteWasAsync] = res.async;
177 }
178 
Writev(const FunctionCallbackInfo<Value> & args)179 int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
180   Environment* env = Environment::GetCurrent(args);
181   Isolate* isolate = env->isolate();
182   Local<Context> context = env->context();
183 
184   CHECK(args[0]->IsObject());
185   CHECK(args[1]->IsArray());
186 
187   Local<Object> req_wrap_obj = args[0].As<Object>();
188   Local<Array> chunks = args[1].As<Array>();
189   bool all_buffers = args[2]->IsTrue();
190 
191   size_t count;
192   if (all_buffers)
193     count = chunks->Length();
194   else
195     count = chunks->Length() >> 1;
196 
197   MaybeStackBuffer<uv_buf_t, 16> bufs(count);
198 
199   size_t storage_size = 0;
200   size_t offset;
201 
202   if (!all_buffers) {
203     // Determine storage size first
204     for (size_t i = 0; i < count; i++) {
205       Local<Value> chunk;
206       if (!chunks->Get(context, i * 2).ToLocal(&chunk))
207         return -1;
208 
209       if (Buffer::HasInstance(chunk))
210         continue;
211         // Buffer chunk, no additional storage required
212 
213       // String chunk
214       Local<String> string;
215       if (!chunk->ToString(context).ToLocal(&string))
216         return -1;
217       Local<Value> next_chunk;
218       if (!chunks->Get(context, i * 2 + 1).ToLocal(&next_chunk))
219         return -1;
220       enum encoding encoding = ParseEncoding(isolate, next_chunk);
221       size_t chunk_size;
222       if ((encoding == UTF8 &&
223              string->Length() > 65535 &&
224              !StringBytes::Size(isolate, string, encoding).To(&chunk_size)) ||
225               !StringBytes::StorageSize(isolate, string, encoding)
226                   .To(&chunk_size)) {
227         return -1;
228       }
229       storage_size += chunk_size;
230     }
231 
232     if (storage_size > INT_MAX)
233       return UV_ENOBUFS;
234   } else {
235     for (size_t i = 0; i < count; i++) {
236       Local<Value> chunk;
237       if (!chunks->Get(context, i).ToLocal(&chunk))
238         return -1;
239       bufs[i].base = Buffer::Data(chunk);
240       bufs[i].len = Buffer::Length(chunk);
241     }
242   }
243 
244   std::unique_ptr<BackingStore> bs;
245   if (storage_size > 0) {
246     NoArrayBufferZeroFillScope no_zero_fill_scope(env->isolate_data());
247     bs = ArrayBuffer::NewBackingStore(isolate, storage_size);
248   }
249 
250   offset = 0;
251   if (!all_buffers) {
252     for (size_t i = 0; i < count; i++) {
253       Local<Value> chunk;
254       if (!chunks->Get(context, i * 2).ToLocal(&chunk))
255         return -1;
256 
257       // Write buffer
258       if (Buffer::HasInstance(chunk)) {
259         bufs[i].base = Buffer::Data(chunk);
260         bufs[i].len = Buffer::Length(chunk);
261         continue;
262       }
263 
264       // Write string
265       CHECK_LE(offset, storage_size);
266       char* str_storage =
267           static_cast<char*>(bs ? bs->Data() : nullptr) + offset;
268       size_t str_size = (bs ? bs->ByteLength() : 0) - offset;
269 
270       Local<String> string;
271       if (!chunk->ToString(context).ToLocal(&string))
272         return -1;
273       Local<Value> next_chunk;
274       if (!chunks->Get(context, i * 2 + 1).ToLocal(&next_chunk))
275         return -1;
276       enum encoding encoding = ParseEncoding(isolate, next_chunk);
277       str_size = StringBytes::Write(isolate,
278                                     str_storage,
279                                     str_size,
280                                     string,
281                                     encoding);
282       bufs[i].base = str_storage;
283       bufs[i].len = str_size;
284       offset += str_size;
285     }
286   }
287 
288   StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj);
289   SetWriteResult(res);
290   if (res.wrap != nullptr && storage_size > 0)
291     res.wrap->SetBackingStore(std::move(bs));
292   return res.err;
293 }
294 
295 
WriteBuffer(const FunctionCallbackInfo<Value> & args)296 int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
297   CHECK(args[0]->IsObject());
298 
299   Environment* env = Environment::GetCurrent(args);
300 
301   if (!args[1]->IsUint8Array()) {
302     node::THROW_ERR_INVALID_ARG_TYPE(env, "Second argument must be a buffer");
303     return 0;
304   }
305 
306   Local<Object> req_wrap_obj = args[0].As<Object>();
307   uv_buf_t buf;
308   buf.base = Buffer::Data(args[1]);
309   buf.len = Buffer::Length(args[1]);
310 
311   uv_stream_t* send_handle = nullptr;
312 
313   if (args[2]->IsObject() && IsIPCPipe()) {
314     Local<Object> send_handle_obj = args[2].As<Object>();
315 
316     HandleWrap* wrap;
317     ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
318     send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
319     // Reference LibuvStreamWrap instance to prevent it from being garbage
320     // collected before `AfterWrite` is called.
321     if (req_wrap_obj->Set(env->context(),
322                           env->handle_string(),
323                           send_handle_obj).IsNothing()) {
324       return -1;
325     }
326   }
327 
328   StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
329   SetWriteResult(res);
330 
331   return res.err;
332 }
333 
334 
335 template <enum encoding enc>
WriteString(const FunctionCallbackInfo<Value> & args)336 int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
337   Environment* env = Environment::GetCurrent(args);
338   Isolate* isolate = env->isolate();
339   CHECK(args[0]->IsObject());
340   CHECK(args[1]->IsString());
341 
342   Local<Object> req_wrap_obj = args[0].As<Object>();
343   Local<String> string = args[1].As<String>();
344   Local<Object> send_handle_obj;
345   if (args[2]->IsObject())
346     send_handle_obj = args[2].As<Object>();
347 
348   // Compute the size of the storage that the string will be flattened into.
349   // For UTF8 strings that are very long, go ahead and take the hit for
350   // computing their actual size, rather than tripling the storage.
351   size_t storage_size;
352   if ((enc == UTF8 &&
353          string->Length() > 65535 &&
354          !StringBytes::Size(isolate, string, enc).To(&storage_size)) ||
355           !StringBytes::StorageSize(isolate, string, enc).To(&storage_size)) {
356     return -1;
357   }
358 
359   if (storage_size > INT_MAX)
360     return UV_ENOBUFS;
361 
362   // Try writing immediately if write size isn't too big
363   char stack_storage[16384];  // 16kb
364   size_t data_size;
365   size_t synchronously_written = 0;
366   uv_buf_t buf;
367 
368   bool try_write = storage_size <= sizeof(stack_storage) &&
369                    (!IsIPCPipe() || send_handle_obj.IsEmpty());
370   if (try_write) {
371     data_size = StringBytes::Write(isolate,
372                                    stack_storage,
373                                    storage_size,
374                                    string,
375                                    enc);
376     buf = uv_buf_init(stack_storage, data_size);
377 
378     uv_buf_t* bufs = &buf;
379     size_t count = 1;
380     const int err = DoTryWrite(&bufs, &count);
381     // Keep track of the bytes written here, because we're taking a shortcut
382     // by using `DoTryWrite()` directly instead of using the utilities
383     // provided by `Write()`.
384     synchronously_written = count == 0 ? data_size : data_size - buf.len;
385     bytes_written_ += synchronously_written;
386 
387     // Immediate failure or success
388     if (err != 0 || count == 0) {
389       SetWriteResult(StreamWriteResult { false, err, nullptr, data_size, {} });
390       return err;
391     }
392 
393     // Partial write
394     CHECK_EQ(count, 1);
395   }
396 
397   std::unique_ptr<BackingStore> bs;
398 
399   if (try_write) {
400     // Copy partial data
401     NoArrayBufferZeroFillScope no_zero_fill_scope(env->isolate_data());
402     bs = ArrayBuffer::NewBackingStore(isolate, buf.len);
403     memcpy(static_cast<char*>(bs->Data()), buf.base, buf.len);
404     data_size = buf.len;
405   } else {
406     // Write it
407     NoArrayBufferZeroFillScope no_zero_fill_scope(env->isolate_data());
408     bs = ArrayBuffer::NewBackingStore(isolate, storage_size);
409     data_size = StringBytes::Write(isolate,
410                                    static_cast<char*>(bs->Data()),
411                                    storage_size,
412                                    string,
413                                    enc);
414   }
415 
416   CHECK_LE(data_size, storage_size);
417 
418   buf = uv_buf_init(static_cast<char*>(bs->Data()), data_size);
419 
420   uv_stream_t* send_handle = nullptr;
421 
422   if (IsIPCPipe() && !send_handle_obj.IsEmpty()) {
423     HandleWrap* wrap;
424     ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
425     send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
426     // Reference LibuvStreamWrap instance to prevent it from being garbage
427     // collected before `AfterWrite` is called.
428     if (req_wrap_obj->Set(env->context(),
429                           env->handle_string(),
430                           send_handle_obj).IsNothing()) {
431       return -1;
432     }
433   }
434 
435   StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj, try_write);
436   res.bytes += synchronously_written;
437 
438   SetWriteResult(res);
439   if (res.wrap != nullptr)
440     res.wrap->SetBackingStore(std::move(bs));
441 
442   return res.err;
443 }
444 
445 
CallJSOnreadMethod(ssize_t nread,Local<ArrayBuffer> ab,size_t offset,StreamBaseJSChecks checks)446 MaybeLocal<Value> StreamBase::CallJSOnreadMethod(ssize_t nread,
447                                                  Local<ArrayBuffer> ab,
448                                                  size_t offset,
449                                                  StreamBaseJSChecks checks) {
450   Environment* env = env_;
451 
452   DCHECK_EQ(static_cast<int32_t>(nread), nread);
453   DCHECK_LE(offset, INT32_MAX);
454 
455   if (checks == DONT_SKIP_NREAD_CHECKS) {
456     if (ab.IsEmpty()) {
457       DCHECK_EQ(offset, 0);
458       DCHECK_LE(nread, 0);
459     } else {
460       DCHECK_GE(nread, 0);
461     }
462   }
463 
464   env->stream_base_state()[kReadBytesOrError] = static_cast<int32_t>(nread);
465   env->stream_base_state()[kArrayBufferOffset] = offset;
466 
467   Local<Value> argv[] = {
468     ab.IsEmpty() ? Undefined(env->isolate()).As<Value>() : ab.As<Value>()
469   };
470 
471   AsyncWrap* wrap = GetAsyncWrap();
472   CHECK_NOT_NULL(wrap);
473   Local<Value> onread = wrap->object()->GetInternalField(
474       StreamBase::kOnReadFunctionField);
475   CHECK(onread->IsFunction());
476   return wrap->MakeCallback(onread.As<Function>(), arraysize(argv), argv);
477 }
478 
479 
IsIPCPipe()480 bool StreamBase::IsIPCPipe() {
481   return false;
482 }
483 
484 
GetFD()485 int StreamBase::GetFD() {
486   return -1;
487 }
488 
489 
GetObject()490 Local<Object> StreamBase::GetObject() {
491   return GetAsyncWrap()->object();
492 }
493 
AddMethod(Environment * env,Local<Signature> signature,enum PropertyAttribute attributes,Local<FunctionTemplate> t,JSMethodFunction * stream_method,Local<String> string)494 void StreamBase::AddMethod(Environment* env,
495                            Local<Signature> signature,
496                            enum PropertyAttribute attributes,
497                            Local<FunctionTemplate> t,
498                            JSMethodFunction* stream_method,
499                            Local<String> string) {
500   Isolate* isolate = env->isolate();
501   Local<FunctionTemplate> templ =
502       NewFunctionTemplate(isolate,
503                           stream_method,
504                           signature,
505                           ConstructorBehavior::kThrow,
506                           SideEffectType::kHasNoSideEffect);
507   t->PrototypeTemplate()->SetAccessorProperty(
508       string, templ, Local<FunctionTemplate>(), attributes);
509 }
510 
AddMethods(Environment * env,Local<FunctionTemplate> t)511 void StreamBase::AddMethods(Environment* env, Local<FunctionTemplate> t) {
512   Isolate* isolate = env->isolate();
513   HandleScope scope(isolate);
514 
515   enum PropertyAttribute attributes =
516       static_cast<PropertyAttribute>(ReadOnly | DontDelete | DontEnum);
517   Local<Signature> sig = Signature::New(isolate, t);
518 
519   AddMethod(env, sig, attributes, t, GetFD, env->fd_string());
520   AddMethod(
521       env, sig, attributes, t, GetExternal, env->external_stream_string());
522   AddMethod(env, sig, attributes, t, GetBytesRead, env->bytes_read_string());
523   AddMethod(
524       env, sig, attributes, t, GetBytesWritten, env->bytes_written_string());
525   SetProtoMethod(isolate, t, "readStart", JSMethod<&StreamBase::ReadStartJS>);
526   SetProtoMethod(isolate, t, "readStop", JSMethod<&StreamBase::ReadStopJS>);
527   SetProtoMethod(isolate, t, "shutdown", JSMethod<&StreamBase::Shutdown>);
528   SetProtoMethod(
529       isolate, t, "useUserBuffer", JSMethod<&StreamBase::UseUserBuffer>);
530   SetProtoMethod(isolate, t, "writev", JSMethod<&StreamBase::Writev>);
531   SetProtoMethod(isolate, t, "writeBuffer", JSMethod<&StreamBase::WriteBuffer>);
532   SetProtoMethod(isolate,
533                  t,
534                  "writeAsciiString",
535                  JSMethod<&StreamBase::WriteString<ASCII>>);
536   SetProtoMethod(
537       isolate, t, "writeUtf8String", JSMethod<&StreamBase::WriteString<UTF8>>);
538   SetProtoMethod(
539       isolate, t, "writeUcs2String", JSMethod<&StreamBase::WriteString<UCS2>>);
540   SetProtoMethod(isolate,
541                  t,
542                  "writeLatin1String",
543                  JSMethod<&StreamBase::WriteString<LATIN1>>);
544   t->PrototypeTemplate()->Set(FIXED_ONE_BYTE_STRING(isolate, "isStreamBase"),
545                               True(isolate));
546   t->PrototypeTemplate()->SetAccessor(
547       FIXED_ONE_BYTE_STRING(isolate, "onread"),
548       BaseObject::InternalFieldGet<StreamBase::kOnReadFunctionField>,
549       BaseObject::InternalFieldSet<StreamBase::kOnReadFunctionField,
550                                    &Value::IsFunction>);
551 }
552 
RegisterExternalReferences(ExternalReferenceRegistry * registry)553 void StreamBase::RegisterExternalReferences(
554     ExternalReferenceRegistry* registry) {
555   // This function is called by a single thread during start up, so it is safe
556   // to use a local static variable here.
557   static bool is_registered = false;
558   if (is_registered) return;
559   registry->Register(GetFD);
560   registry->Register(GetExternal);
561   registry->Register(GetBytesRead);
562   registry->Register(GetBytesWritten);
563   registry->Register(JSMethod<&StreamBase::ReadStartJS>);
564   registry->Register(JSMethod<&StreamBase::ReadStopJS>);
565   registry->Register(JSMethod<&StreamBase::Shutdown>);
566   registry->Register(JSMethod<&StreamBase::UseUserBuffer>);
567   registry->Register(JSMethod<&StreamBase::Writev>);
568   registry->Register(JSMethod<&StreamBase::WriteBuffer>);
569   registry->Register(JSMethod<&StreamBase::WriteString<ASCII>>);
570   registry->Register(JSMethod<&StreamBase::WriteString<UTF8>>);
571   registry->Register(JSMethod<&StreamBase::WriteString<UCS2>>);
572   registry->Register(JSMethod<&StreamBase::WriteString<LATIN1>>);
573   registry->Register(
574       BaseObject::InternalFieldGet<StreamBase::kOnReadFunctionField>);
575   registry->Register(
576       BaseObject::InternalFieldSet<StreamBase::kOnReadFunctionField,
577                                    &Value::IsFunction>);
578   is_registered = true;
579 }
580 
GetFD(const FunctionCallbackInfo<Value> & args)581 void StreamBase::GetFD(const FunctionCallbackInfo<Value>& args) {
582   // Mimic implementation of StreamBase::GetFD() and UDPWrap::GetFD().
583   StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
584   if (wrap == nullptr) return args.GetReturnValue().Set(UV_EINVAL);
585 
586   if (!wrap->IsAlive()) return args.GetReturnValue().Set(UV_EINVAL);
587 
588   args.GetReturnValue().Set(wrap->GetFD());
589 }
590 
GetBytesRead(const FunctionCallbackInfo<Value> & args)591 void StreamBase::GetBytesRead(const FunctionCallbackInfo<Value>& args) {
592   StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
593   if (wrap == nullptr) return args.GetReturnValue().Set(0);
594 
595   // uint64_t -> double. 53bits is enough for all real cases.
596   args.GetReturnValue().Set(static_cast<double>(wrap->bytes_read_));
597 }
598 
GetBytesWritten(const FunctionCallbackInfo<Value> & args)599 void StreamBase::GetBytesWritten(const FunctionCallbackInfo<Value>& args) {
600   StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
601   if (wrap == nullptr) return args.GetReturnValue().Set(0);
602 
603   // uint64_t -> double. 53bits is enough for all real cases.
604   args.GetReturnValue().Set(static_cast<double>(wrap->bytes_written_));
605 }
606 
GetExternal(const FunctionCallbackInfo<Value> & args)607 void StreamBase::GetExternal(const FunctionCallbackInfo<Value>& args) {
608   StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
609   if (wrap == nullptr) return;
610 
611   Local<External> ext = External::New(args.GetIsolate(), wrap);
612   args.GetReturnValue().Set(ext);
613 }
614 
615 template <int (StreamBase::*Method)(const FunctionCallbackInfo<Value>& args)>
JSMethod(const FunctionCallbackInfo<Value> & args)616 void StreamBase::JSMethod(const FunctionCallbackInfo<Value>& args) {
617   StreamBase* wrap = StreamBase::FromObject(args.Holder().As<Object>());
618   if (wrap == nullptr) return;
619 
620   if (!wrap->IsAlive()) return args.GetReturnValue().Set(UV_EINVAL);
621 
622   AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(wrap->GetAsyncWrap());
623   args.GetReturnValue().Set((wrap->*Method)(args));
624 }
625 
DoTryWrite(uv_buf_t ** bufs,size_t * count)626 int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) {
627   // No TryWrite by default
628   return 0;
629 }
630 
631 
Error() const632 const char* StreamResource::Error() const {
633   return nullptr;
634 }
635 
636 
ClearError()637 void StreamResource::ClearError() {
638   // No-op
639 }
640 
641 
OnStreamAlloc(size_t suggested_size)642 uv_buf_t EmitToJSStreamListener::OnStreamAlloc(size_t suggested_size) {
643   CHECK_NOT_NULL(stream_);
644   Environment* env = static_cast<StreamBase*>(stream_)->stream_env();
645   return env->allocate_managed_buffer(suggested_size);
646 }
647 
OnStreamRead(ssize_t nread,const uv_buf_t & buf_)648 void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) {
649   CHECK_NOT_NULL(stream_);
650   StreamBase* stream = static_cast<StreamBase*>(stream_);
651   Environment* env = stream->stream_env();
652   Isolate* isolate = env->isolate();
653   HandleScope handle_scope(isolate);
654   Context::Scope context_scope(env->context());
655   std::unique_ptr<BackingStore> bs = env->release_managed_buffer(buf_);
656 
657   if (nread <= 0)  {
658     if (nread < 0)
659       stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
660     return;
661   }
662 
663   CHECK_LE(static_cast<size_t>(nread), bs->ByteLength());
664   bs = BackingStore::Reallocate(isolate, std::move(bs), nread);
665 
666   stream->CallJSOnreadMethod(nread, ArrayBuffer::New(isolate, std::move(bs)));
667 }
668 
669 
OnStreamAlloc(size_t suggested_size)670 uv_buf_t CustomBufferJSListener::OnStreamAlloc(size_t suggested_size) {
671   return buffer_;
672 }
673 
674 
OnStreamRead(ssize_t nread,const uv_buf_t & buf)675 void CustomBufferJSListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
676   CHECK_NOT_NULL(stream_);
677 
678   StreamBase* stream = static_cast<StreamBase*>(stream_);
679   Environment* env = stream->stream_env();
680   HandleScope handle_scope(env->isolate());
681   Context::Scope context_scope(env->context());
682 
683   // In the case that there's an error and buf is null, return immediately.
684   // This can happen on unices when POLLHUP is received and UV_EOF is returned
685   // or when getting an error while performing a UV_HANDLE_ZERO_READ on Windows.
686   if (buf.base == nullptr && nread < 0) {
687     stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
688     return;
689   }
690 
691   CHECK_EQ(buf.base, buffer_.base);
692 
693   MaybeLocal<Value> ret = stream->CallJSOnreadMethod(nread,
694                              Local<ArrayBuffer>(),
695                              0,
696                              StreamBase::SKIP_NREAD_CHECKS);
697   Local<Value> next_buf_v;
698   if (ret.ToLocal(&next_buf_v) && !next_buf_v->IsUndefined()) {
699     buffer_.base = Buffer::Data(next_buf_v);
700     buffer_.len = Buffer::Length(next_buf_v);
701   }
702 }
703 
704 
OnStreamAfterReqFinished(StreamReq * req_wrap,int status)705 void ReportWritesToJSStreamListener::OnStreamAfterReqFinished(
706     StreamReq* req_wrap, int status) {
707   StreamBase* stream = static_cast<StreamBase*>(stream_);
708   Environment* env = stream->stream_env();
709   if (!env->can_call_into_js()) return;
710   AsyncWrap* async_wrap = req_wrap->GetAsyncWrap();
711   HandleScope handle_scope(env->isolate());
712   Context::Scope context_scope(env->context());
713   CHECK(!async_wrap->persistent().IsEmpty());
714   Local<Object> req_wrap_obj = async_wrap->object();
715 
716   Local<Value> argv[] = {
717     Integer::New(env->isolate(), status),
718     stream->GetObject(),
719     Undefined(env->isolate())
720   };
721 
722   const char* msg = stream->Error();
723   if (msg != nullptr) {
724     argv[2] = OneByteString(env->isolate(), msg);
725     stream->ClearError();
726   }
727 
728   if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())
729     async_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
730 }
731 
OnStreamAfterWrite(WriteWrap * req_wrap,int status)732 void ReportWritesToJSStreamListener::OnStreamAfterWrite(
733     WriteWrap* req_wrap, int status) {
734   OnStreamAfterReqFinished(req_wrap, status);
735 }
736 
OnStreamAfterShutdown(ShutdownWrap * req_wrap,int status)737 void ReportWritesToJSStreamListener::OnStreamAfterShutdown(
738     ShutdownWrap* req_wrap, int status) {
739   OnStreamAfterReqFinished(req_wrap, status);
740 }
741 
OnDone(int status)742 void ShutdownWrap::OnDone(int status) {
743   stream()->EmitAfterShutdown(this, status);
744   Dispose();
745 }
746 
OnDone(int status)747 void WriteWrap::OnDone(int status) {
748   stream()->EmitAfterWrite(this, status);
749   Dispose();
750 }
751 
~StreamListener()752 StreamListener::~StreamListener() {
753   if (stream_ != nullptr)
754     stream_->RemoveStreamListener(this);
755 }
756 
OnStreamAfterShutdown(ShutdownWrap * w,int status)757 void StreamListener::OnStreamAfterShutdown(ShutdownWrap* w, int status) {
758   CHECK_NOT_NULL(previous_listener_);
759   previous_listener_->OnStreamAfterShutdown(w, status);
760 }
761 
OnStreamAfterWrite(WriteWrap * w,int status)762 void StreamListener::OnStreamAfterWrite(WriteWrap* w, int status) {
763   CHECK_NOT_NULL(previous_listener_);
764   previous_listener_->OnStreamAfterWrite(w, status);
765 }
766 
~StreamResource()767 StreamResource::~StreamResource() {
768   while (listener_ != nullptr) {
769     StreamListener* listener = listener_;
770     listener->OnStreamDestroy();
771     // Remove the listener if it didn’t remove itself. This makes the logic
772     // in `OnStreamDestroy()` implementations easier, because they
773     // may call generic cleanup functions which can just remove the
774     // listener unconditionally.
775     if (listener == listener_)
776       RemoveStreamListener(listener_);
777   }
778 }
779 
RemoveStreamListener(StreamListener * listener)780 void StreamResource::RemoveStreamListener(StreamListener* listener) {
781   CHECK_NOT_NULL(listener);
782 
783   StreamListener* previous;
784   StreamListener* current;
785 
786   // Remove from the linked list.
787   // No loop condition because we want a crash if listener is not found.
788   for (current = listener_, previous = nullptr;;
789        previous = current, current = current->previous_listener_) {
790     CHECK_NOT_NULL(current);
791     if (current == listener) {
792       if (previous != nullptr)
793         previous->previous_listener_ = current->previous_listener_;
794       else
795         listener_ = listener->previous_listener_;
796       break;
797     }
798   }
799 
800   listener->stream_ = nullptr;
801   listener->previous_listener_ = nullptr;
802 }
803 
CreateShutdownWrap(Local<Object> object)804 ShutdownWrap* StreamBase::CreateShutdownWrap(
805     Local<Object> object) {
806   auto* wrap = new SimpleShutdownWrap<AsyncWrap>(this, object);
807   wrap->MakeWeak();
808   return wrap;
809 }
810 
CreateWriteWrap(Local<Object> object)811 WriteWrap* StreamBase::CreateWriteWrap(
812     Local<Object> object) {
813   auto* wrap = new SimpleWriteWrap<AsyncWrap>(this, object);
814   wrap->MakeWeak();
815   return wrap;
816 }
817 
Done(int status,const char * error_str)818 void StreamReq::Done(int status, const char* error_str) {
819   AsyncWrap* async_wrap = GetAsyncWrap();
820   Environment* env = async_wrap->env();
821   if (error_str != nullptr) {
822     v8::HandleScope handle_scope(env->isolate());
823     if (async_wrap->object()
824             ->Set(env->context(),
825                   env->error_string(),
826                   OneByteString(env->isolate(), error_str))
827             .IsNothing()) {
828       return;
829     }
830   }
831 
832   OnDone(status);
833 }
834 
835 }  // namespace node
836