• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "stream_base.h"  // NOLINT(build/include_inline)
2 #include "stream_base-inl.h"
3 #include "stream_wrap.h"
4 
5 #include "env-inl.h"
6 #include "js_stream.h"
7 #include "node.h"
8 #include "node_buffer.h"
9 #include "node_errors.h"
10 #include "node_external_reference.h"
11 #include "string_bytes.h"
12 #include "util-inl.h"
13 #include "v8.h"
14 
15 #include <climits>  // INT_MAX
16 
17 namespace node {
18 
19 using v8::Array;
20 using v8::ArrayBuffer;
21 using v8::BackingStore;
22 using v8::ConstructorBehavior;
23 using v8::Context;
24 using v8::DontDelete;
25 using v8::DontEnum;
26 using v8::External;
27 using v8::Function;
28 using v8::FunctionCallbackInfo;
29 using v8::FunctionTemplate;
30 using v8::HandleScope;
31 using v8::Integer;
32 using v8::Isolate;
33 using v8::Local;
34 using v8::MaybeLocal;
35 using v8::Object;
36 using v8::PropertyAttribute;
37 using v8::ReadOnly;
38 using v8::SideEffectType;
39 using v8::Signature;
40 using v8::String;
41 using v8::Value;
42 
Shutdown(v8::Local<v8::Object> req_wrap_obj)43 int StreamBase::Shutdown(v8::Local<v8::Object> req_wrap_obj) {
44   Environment* env = stream_env();
45 
46   v8::HandleScope handle_scope(env->isolate());
47 
48   if (req_wrap_obj.IsEmpty()) {
49     if (!env->shutdown_wrap_template()
50              ->NewInstance(env->context())
51              .ToLocal(&req_wrap_obj)) {
52       return UV_EBUSY;
53     }
54     StreamReq::ResetObject(req_wrap_obj);
55   }
56 
57   BaseObjectPtr<AsyncWrap> req_wrap_ptr;
58   AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(GetAsyncWrap());
59   ShutdownWrap* req_wrap = CreateShutdownWrap(req_wrap_obj);
60   if (req_wrap != nullptr) req_wrap_ptr.reset(req_wrap->GetAsyncWrap());
61   int err = DoShutdown(req_wrap);
62 
63   if (err != 0 && req_wrap != nullptr) {
64     req_wrap->Dispose();
65   }
66 
67   const char* msg = Error();
68   if (msg != nullptr) {
69     if (req_wrap_obj
70             ->Set(env->context(),
71                   env->error_string(),
72                   OneByteString(env->isolate(), msg))
73             .IsNothing()) {
74       return UV_EBUSY;
75     }
76     ClearError();
77   }
78 
79   return err;
80 }
81 
Write(uv_buf_t * bufs,size_t count,uv_stream_t * send_handle,v8::Local<v8::Object> req_wrap_obj,bool skip_try_write)82 StreamWriteResult StreamBase::Write(uv_buf_t* bufs,
83                                     size_t count,
84                                     uv_stream_t* send_handle,
85                                     v8::Local<v8::Object> req_wrap_obj,
86                                     bool skip_try_write) {
87   Environment* env = stream_env();
88   int err;
89 
90   size_t total_bytes = 0;
91   for (size_t i = 0; i < count; ++i) total_bytes += bufs[i].len;
92   bytes_written_ += total_bytes;
93 
94   if (send_handle == nullptr && !skip_try_write) {
95     err = DoTryWrite(&bufs, &count);
96     if (err != 0 || count == 0) {
97       return StreamWriteResult{false, err, nullptr, total_bytes, {}};
98     }
99   }
100 
101   v8::HandleScope handle_scope(env->isolate());
102 
103   if (req_wrap_obj.IsEmpty()) {
104     if (!env->write_wrap_template()
105              ->NewInstance(env->context())
106              .ToLocal(&req_wrap_obj)) {
107       return StreamWriteResult{false, UV_EBUSY, nullptr, 0, {}};
108     }
109     StreamReq::ResetObject(req_wrap_obj);
110   }
111 
112   AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(GetAsyncWrap());
113   WriteWrap* req_wrap = CreateWriteWrap(req_wrap_obj);
114   BaseObjectPtr<AsyncWrap> req_wrap_ptr(req_wrap->GetAsyncWrap());
115 
116   err = DoWrite(req_wrap, bufs, count, send_handle);
117   bool async = err == 0;
118 
119   if (!async) {
120     req_wrap->Dispose();
121     req_wrap = nullptr;
122   }
123 
124   const char* msg = Error();
125   if (msg != nullptr) {
126     if (req_wrap_obj
127             ->Set(env->context(),
128                   env->error_string(),
129                   OneByteString(env->isolate(), msg))
130             .IsNothing()) {
131       return StreamWriteResult{false, UV_EBUSY, nullptr, 0, {}};
132     }
133     ClearError();
134   }
135 
136   return StreamWriteResult{
137       async, err, req_wrap, total_bytes, std::move(req_wrap_ptr)};
138 }
139 
140 template int StreamBase::WriteString<ASCII>(
141     const FunctionCallbackInfo<Value>& args);
142 template int StreamBase::WriteString<UTF8>(
143     const FunctionCallbackInfo<Value>& args);
144 template int StreamBase::WriteString<UCS2>(
145     const FunctionCallbackInfo<Value>& args);
146 template int StreamBase::WriteString<LATIN1>(
147     const FunctionCallbackInfo<Value>& args);
148 
149 
ReadStartJS(const FunctionCallbackInfo<Value> & args)150 int StreamBase::ReadStartJS(const FunctionCallbackInfo<Value>& args) {
151   return ReadStart();
152 }
153 
154 
ReadStopJS(const FunctionCallbackInfo<Value> & args)155 int StreamBase::ReadStopJS(const FunctionCallbackInfo<Value>& args) {
156   return ReadStop();
157 }
158 
UseUserBuffer(const FunctionCallbackInfo<Value> & args)159 int StreamBase::UseUserBuffer(const FunctionCallbackInfo<Value>& args) {
160   CHECK(Buffer::HasInstance(args[0]));
161 
162   uv_buf_t buf = uv_buf_init(Buffer::Data(args[0]), Buffer::Length(args[0]));
163   PushStreamListener(new CustomBufferJSListener(buf));
164   return 0;
165 }
166 
Shutdown(const FunctionCallbackInfo<Value> & args)167 int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
168   CHECK(args[0]->IsObject());
169   Local<Object> req_wrap_obj = args[0].As<Object>();
170 
171   return Shutdown(req_wrap_obj);
172 }
173 
SetWriteResult(const StreamWriteResult & res)174 void StreamBase::SetWriteResult(const StreamWriteResult& res) {
175   env_->stream_base_state()[kBytesWritten] = res.bytes;
176   env_->stream_base_state()[kLastWriteWasAsync] = res.async;
177 }
178 
Writev(const FunctionCallbackInfo<Value> & args)179 int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
180   Environment* env = Environment::GetCurrent(args);
181   Isolate* isolate = env->isolate();
182   Local<Context> context = env->context();
183 
184   CHECK(args[0]->IsObject());
185   CHECK(args[1]->IsArray());
186 
187   Local<Object> req_wrap_obj = args[0].As<Object>();
188   Local<Array> chunks = args[1].As<Array>();
189   bool all_buffers = args[2]->IsTrue();
190 
191   size_t count;
192   if (all_buffers)
193     count = chunks->Length();
194   else
195     count = chunks->Length() >> 1;
196 
197   MaybeStackBuffer<uv_buf_t, 16> bufs(count);
198 
199   size_t storage_size = 0;
200   size_t offset;
201 
202   if (!all_buffers) {
203     // Determine storage size first
204     for (size_t i = 0; i < count; i++) {
205       Local<Value> chunk;
206       if (!chunks->Get(context, i * 2).ToLocal(&chunk))
207         return -1;
208 
209       if (Buffer::HasInstance(chunk))
210         continue;
211         // Buffer chunk, no additional storage required
212 
213       // String chunk
214       Local<String> string;
215       if (!chunk->ToString(context).ToLocal(&string))
216         return -1;
217       Local<Value> next_chunk;
218       if (!chunks->Get(context, i * 2 + 1).ToLocal(&next_chunk))
219         return -1;
220       enum encoding encoding = ParseEncoding(isolate, next_chunk);
221       size_t chunk_size;
222       if ((encoding == UTF8 &&
223              string->Length() > 65535 &&
224              !StringBytes::Size(isolate, string, encoding).To(&chunk_size)) ||
225               !StringBytes::StorageSize(isolate, string, encoding)
226                   .To(&chunk_size)) {
227         return -1;
228       }
229       storage_size += chunk_size;
230     }
231 
232     if (storage_size > INT_MAX)
233       return UV_ENOBUFS;
234   } else {
235     for (size_t i = 0; i < count; i++) {
236       Local<Value> chunk;
237       if (!chunks->Get(context, i).ToLocal(&chunk))
238         return -1;
239       bufs[i].base = Buffer::Data(chunk);
240       bufs[i].len = Buffer::Length(chunk);
241     }
242   }
243 
244   std::unique_ptr<BackingStore> bs;
245   if (storage_size > 0) {
246     NoArrayBufferZeroFillScope no_zero_fill_scope(env->isolate_data());
247     bs = ArrayBuffer::NewBackingStore(isolate, storage_size);
248   }
249 
250   offset = 0;
251   if (!all_buffers) {
252     for (size_t i = 0; i < count; i++) {
253       Local<Value> chunk;
254       if (!chunks->Get(context, i * 2).ToLocal(&chunk))
255         return -1;
256 
257       // Write buffer
258       if (Buffer::HasInstance(chunk)) {
259         bufs[i].base = Buffer::Data(chunk);
260         bufs[i].len = Buffer::Length(chunk);
261         continue;
262       }
263 
264       // Write string
265       CHECK_LE(offset, storage_size);
266       char* str_storage =
267           static_cast<char*>(bs ? bs->Data() : nullptr) + offset;
268       size_t str_size = (bs ? bs->ByteLength() : 0) - offset;
269 
270       Local<String> string;
271       if (!chunk->ToString(context).ToLocal(&string))
272         return -1;
273       Local<Value> next_chunk;
274       if (!chunks->Get(context, i * 2 + 1).ToLocal(&next_chunk))
275         return -1;
276       enum encoding encoding = ParseEncoding(isolate, next_chunk);
277       str_size = StringBytes::Write(isolate,
278                                     str_storage,
279                                     str_size,
280                                     string,
281                                     encoding);
282       bufs[i].base = str_storage;
283       bufs[i].len = str_size;
284       offset += str_size;
285     }
286   }
287 
288   StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj);
289   SetWriteResult(res);
290   if (res.wrap != nullptr && storage_size > 0)
291     res.wrap->SetBackingStore(std::move(bs));
292   return res.err;
293 }
294 
295 
WriteBuffer(const FunctionCallbackInfo<Value> & args)296 int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
297   CHECK(args[0]->IsObject());
298 
299   Environment* env = Environment::GetCurrent(args);
300 
301   if (!args[1]->IsUint8Array()) {
302     node::THROW_ERR_INVALID_ARG_TYPE(env, "Second argument must be a buffer");
303     return 0;
304   }
305 
306   Local<Object> req_wrap_obj = args[0].As<Object>();
307   uv_buf_t buf;
308   buf.base = Buffer::Data(args[1]);
309   buf.len = Buffer::Length(args[1]);
310 
311   uv_stream_t* send_handle = nullptr;
312 
313   if (args[2]->IsObject() && IsIPCPipe()) {
314     Local<Object> send_handle_obj = args[2].As<Object>();
315 
316     HandleWrap* wrap;
317     ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
318     send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
319     // Reference LibuvStreamWrap instance to prevent it from being garbage
320     // collected before `AfterWrite` is called.
321     if (req_wrap_obj->Set(env->context(),
322                           env->handle_string(),
323                           send_handle_obj).IsNothing()) {
324       return -1;
325     }
326   }
327 
328   StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
329   SetWriteResult(res);
330 
331   return res.err;
332 }
333 
334 
335 template <enum encoding enc>
WriteString(const FunctionCallbackInfo<Value> & args)336 int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
337   Environment* env = Environment::GetCurrent(args);
338   Isolate* isolate = env->isolate();
339   CHECK(args[0]->IsObject());
340   CHECK(args[1]->IsString());
341 
342   Local<Object> req_wrap_obj = args[0].As<Object>();
343   Local<String> string = args[1].As<String>();
344   Local<Object> send_handle_obj;
345   if (args[2]->IsObject())
346     send_handle_obj = args[2].As<Object>();
347 
348   // Compute the size of the storage that the string will be flattened into.
349   // For UTF8 strings that are very long, go ahead and take the hit for
350   // computing their actual size, rather than tripling the storage.
351   size_t storage_size;
352   if ((enc == UTF8 &&
353          string->Length() > 65535 &&
354          !StringBytes::Size(isolate, string, enc).To(&storage_size)) ||
355           !StringBytes::StorageSize(isolate, string, enc).To(&storage_size)) {
356     return -1;
357   }
358 
359   if (storage_size > INT_MAX)
360     return UV_ENOBUFS;
361 
362   // Try writing immediately if write size isn't too big
363   char stack_storage[16384];  // 16kb
364   size_t data_size;
365   size_t synchronously_written = 0;
366   uv_buf_t buf;
367 
368   bool try_write = storage_size <= sizeof(stack_storage) &&
369                    (!IsIPCPipe() || send_handle_obj.IsEmpty());
370   if (try_write) {
371     data_size = StringBytes::Write(isolate,
372                                    stack_storage,
373                                    storage_size,
374                                    string,
375                                    enc);
376     buf = uv_buf_init(stack_storage, data_size);
377 
378     uv_buf_t* bufs = &buf;
379     size_t count = 1;
380     const int err = DoTryWrite(&bufs, &count);
381     // Keep track of the bytes written here, because we're taking a shortcut
382     // by using `DoTryWrite()` directly instead of using the utilities
383     // provided by `Write()`.
384     synchronously_written = count == 0 ? data_size : data_size - buf.len;
385     bytes_written_ += synchronously_written;
386 
387     // Immediate failure or success
388     if (err != 0 || count == 0) {
389       SetWriteResult(StreamWriteResult { false, err, nullptr, data_size, {} });
390       return err;
391     }
392 
393     // Partial write
394     CHECK_EQ(count, 1);
395   }
396 
397   std::unique_ptr<BackingStore> bs;
398 
399   if (try_write) {
400     // Copy partial data
401     NoArrayBufferZeroFillScope no_zero_fill_scope(env->isolate_data());
402     bs = ArrayBuffer::NewBackingStore(isolate, buf.len);
403     memcpy(static_cast<char*>(bs->Data()), buf.base, buf.len);
404     data_size = buf.len;
405   } else {
406     // Write it
407     NoArrayBufferZeroFillScope no_zero_fill_scope(env->isolate_data());
408     bs = ArrayBuffer::NewBackingStore(isolate, storage_size);
409     data_size = StringBytes::Write(isolate,
410                                    static_cast<char*>(bs->Data()),
411                                    storage_size,
412                                    string,
413                                    enc);
414   }
415 
416   CHECK_LE(data_size, storage_size);
417 
418   buf = uv_buf_init(static_cast<char*>(bs->Data()), data_size);
419 
420   uv_stream_t* send_handle = nullptr;
421 
422   if (IsIPCPipe() && !send_handle_obj.IsEmpty()) {
423     HandleWrap* wrap;
424     ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
425     send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
426     // Reference LibuvStreamWrap instance to prevent it from being garbage
427     // collected before `AfterWrite` is called.
428     if (req_wrap_obj->Set(env->context(),
429                           env->handle_string(),
430                           send_handle_obj).IsNothing()) {
431       return -1;
432     }
433   }
434 
435   StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj, try_write);
436   res.bytes += synchronously_written;
437 
438   SetWriteResult(res);
439   if (res.wrap != nullptr)
440     res.wrap->SetBackingStore(std::move(bs));
441 
442   return res.err;
443 }
444 
445 
CallJSOnreadMethod(ssize_t nread,Local<ArrayBuffer> ab,size_t offset,StreamBaseJSChecks checks)446 MaybeLocal<Value> StreamBase::CallJSOnreadMethod(ssize_t nread,
447                                                  Local<ArrayBuffer> ab,
448                                                  size_t offset,
449                                                  StreamBaseJSChecks checks) {
450   Environment* env = env_;
451 
452   DCHECK_EQ(static_cast<int32_t>(nread), nread);
453   DCHECK_LE(offset, INT32_MAX);
454 
455   if (checks == DONT_SKIP_NREAD_CHECKS) {
456     if (ab.IsEmpty()) {
457       DCHECK_EQ(offset, 0);
458       DCHECK_LE(nread, 0);
459     } else {
460       DCHECK_GE(nread, 0);
461     }
462   }
463 
464   env->stream_base_state()[kReadBytesOrError] = static_cast<int32_t>(nread);
465   env->stream_base_state()[kArrayBufferOffset] = offset;
466 
467   Local<Value> argv[] = {
468     ab.IsEmpty() ? Undefined(env->isolate()).As<Value>() : ab.As<Value>()
469   };
470 
471   AsyncWrap* wrap = GetAsyncWrap();
472   CHECK_NOT_NULL(wrap);
473   Local<Value> onread = wrap->object()
474                             ->GetInternalField(StreamBase::kOnReadFunctionField)
475                             .As<Value>();
476   CHECK(onread->IsFunction());
477   return wrap->MakeCallback(onread.As<Function>(), arraysize(argv), argv);
478 }
479 
480 
IsIPCPipe()481 bool StreamBase::IsIPCPipe() {
482   return false;
483 }
484 
485 
GetFD()486 int StreamBase::GetFD() {
487   return -1;
488 }
489 
490 
GetObject()491 Local<Object> StreamBase::GetObject() {
492   return GetAsyncWrap()->object();
493 }
494 
AddMethod(Environment * env,Local<Signature> signature,enum PropertyAttribute attributes,Local<FunctionTemplate> t,JSMethodFunction * stream_method,Local<String> string)495 void StreamBase::AddMethod(Environment* env,
496                            Local<Signature> signature,
497                            enum PropertyAttribute attributes,
498                            Local<FunctionTemplate> t,
499                            JSMethodFunction* stream_method,
500                            Local<String> string) {
501   Isolate* isolate = env->isolate();
502   Local<FunctionTemplate> templ =
503       NewFunctionTemplate(isolate,
504                           stream_method,
505                           signature,
506                           ConstructorBehavior::kThrow,
507                           SideEffectType::kHasNoSideEffect);
508   t->PrototypeTemplate()->SetAccessorProperty(
509       string, templ, Local<FunctionTemplate>(), attributes);
510 }
511 
AddMethods(Environment * env,Local<FunctionTemplate> t)512 void StreamBase::AddMethods(Environment* env, Local<FunctionTemplate> t) {
513   Isolate* isolate = env->isolate();
514   HandleScope scope(isolate);
515 
516   enum PropertyAttribute attributes =
517       static_cast<PropertyAttribute>(ReadOnly | DontDelete | DontEnum);
518   Local<Signature> sig = Signature::New(isolate, t);
519 
520   AddMethod(env, sig, attributes, t, GetFD, env->fd_string());
521   AddMethod(
522       env, sig, attributes, t, GetExternal, env->external_stream_string());
523   AddMethod(env, sig, attributes, t, GetBytesRead, env->bytes_read_string());
524   AddMethod(
525       env, sig, attributes, t, GetBytesWritten, env->bytes_written_string());
526   SetProtoMethod(isolate, t, "readStart", JSMethod<&StreamBase::ReadStartJS>);
527   SetProtoMethod(isolate, t, "readStop", JSMethod<&StreamBase::ReadStopJS>);
528   SetProtoMethod(isolate, t, "shutdown", JSMethod<&StreamBase::Shutdown>);
529   SetProtoMethod(
530       isolate, t, "useUserBuffer", JSMethod<&StreamBase::UseUserBuffer>);
531   SetProtoMethod(isolate, t, "writev", JSMethod<&StreamBase::Writev>);
532   SetProtoMethod(isolate, t, "writeBuffer", JSMethod<&StreamBase::WriteBuffer>);
533   SetProtoMethod(isolate,
534                  t,
535                  "writeAsciiString",
536                  JSMethod<&StreamBase::WriteString<ASCII>>);
537   SetProtoMethod(
538       isolate, t, "writeUtf8String", JSMethod<&StreamBase::WriteString<UTF8>>);
539   SetProtoMethod(
540       isolate, t, "writeUcs2String", JSMethod<&StreamBase::WriteString<UCS2>>);
541   SetProtoMethod(isolate,
542                  t,
543                  "writeLatin1String",
544                  JSMethod<&StreamBase::WriteString<LATIN1>>);
545   t->PrototypeTemplate()->Set(FIXED_ONE_BYTE_STRING(isolate, "isStreamBase"),
546                               True(isolate));
547   t->PrototypeTemplate()->SetAccessor(
548       FIXED_ONE_BYTE_STRING(isolate, "onread"),
549       BaseObject::InternalFieldGet<StreamBase::kOnReadFunctionField>,
550       BaseObject::InternalFieldSet<StreamBase::kOnReadFunctionField,
551                                    &Value::IsFunction>);
552 }
553 
RegisterExternalReferences(ExternalReferenceRegistry * registry)554 void StreamBase::RegisterExternalReferences(
555     ExternalReferenceRegistry* registry) {
556   // This function is called by a single thread during start up, so it is safe
557   // to use a local static variable here.
558   static bool is_registered = false;
559   if (is_registered) return;
560   registry->Register(GetFD);
561   registry->Register(GetExternal);
562   registry->Register(GetBytesRead);
563   registry->Register(GetBytesWritten);
564   registry->Register(JSMethod<&StreamBase::ReadStartJS>);
565   registry->Register(JSMethod<&StreamBase::ReadStopJS>);
566   registry->Register(JSMethod<&StreamBase::Shutdown>);
567   registry->Register(JSMethod<&StreamBase::UseUserBuffer>);
568   registry->Register(JSMethod<&StreamBase::Writev>);
569   registry->Register(JSMethod<&StreamBase::WriteBuffer>);
570   registry->Register(JSMethod<&StreamBase::WriteString<ASCII>>);
571   registry->Register(JSMethod<&StreamBase::WriteString<UTF8>>);
572   registry->Register(JSMethod<&StreamBase::WriteString<UCS2>>);
573   registry->Register(JSMethod<&StreamBase::WriteString<LATIN1>>);
574   registry->Register(
575       BaseObject::InternalFieldGet<StreamBase::kOnReadFunctionField>);
576   registry->Register(
577       BaseObject::InternalFieldSet<StreamBase::kOnReadFunctionField,
578                                    &Value::IsFunction>);
579   is_registered = true;
580 }
581 
GetFD(const FunctionCallbackInfo<Value> & args)582 void StreamBase::GetFD(const FunctionCallbackInfo<Value>& args) {
583   // Mimic implementation of StreamBase::GetFD() and UDPWrap::GetFD().
584   StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
585   if (wrap == nullptr) return args.GetReturnValue().Set(UV_EINVAL);
586 
587   if (!wrap->IsAlive()) return args.GetReturnValue().Set(UV_EINVAL);
588 
589   args.GetReturnValue().Set(wrap->GetFD());
590 }
591 
GetBytesRead(const FunctionCallbackInfo<Value> & args)592 void StreamBase::GetBytesRead(const FunctionCallbackInfo<Value>& args) {
593   StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
594   if (wrap == nullptr) return args.GetReturnValue().Set(0);
595 
596   // uint64_t -> double. 53bits is enough for all real cases.
597   args.GetReturnValue().Set(static_cast<double>(wrap->bytes_read_));
598 }
599 
GetBytesWritten(const FunctionCallbackInfo<Value> & args)600 void StreamBase::GetBytesWritten(const FunctionCallbackInfo<Value>& args) {
601   StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
602   if (wrap == nullptr) return args.GetReturnValue().Set(0);
603 
604   // uint64_t -> double. 53bits is enough for all real cases.
605   args.GetReturnValue().Set(static_cast<double>(wrap->bytes_written_));
606 }
607 
GetExternal(const FunctionCallbackInfo<Value> & args)608 void StreamBase::GetExternal(const FunctionCallbackInfo<Value>& args) {
609   StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
610   if (wrap == nullptr) return;
611 
612   Local<External> ext = External::New(args.GetIsolate(), wrap);
613   args.GetReturnValue().Set(ext);
614 }
615 
616 template <int (StreamBase::*Method)(const FunctionCallbackInfo<Value>& args)>
JSMethod(const FunctionCallbackInfo<Value> & args)617 void StreamBase::JSMethod(const FunctionCallbackInfo<Value>& args) {
618   StreamBase* wrap = StreamBase::FromObject(args.Holder().As<Object>());
619   if (wrap == nullptr) return;
620 
621   if (!wrap->IsAlive()) return args.GetReturnValue().Set(UV_EINVAL);
622 
623   AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(wrap->GetAsyncWrap());
624   args.GetReturnValue().Set((wrap->*Method)(args));
625 }
626 
DoTryWrite(uv_buf_t ** bufs,size_t * count)627 int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) {
628   // No TryWrite by default
629   return 0;
630 }
631 
632 
Error() const633 const char* StreamResource::Error() const {
634   return nullptr;
635 }
636 
637 
ClearError()638 void StreamResource::ClearError() {
639   // No-op
640 }
641 
642 
OnStreamAlloc(size_t suggested_size)643 uv_buf_t EmitToJSStreamListener::OnStreamAlloc(size_t suggested_size) {
644   CHECK_NOT_NULL(stream_);
645   Environment* env = static_cast<StreamBase*>(stream_)->stream_env();
646   return env->allocate_managed_buffer(suggested_size);
647 }
648 
OnStreamRead(ssize_t nread,const uv_buf_t & buf_)649 void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) {
650   CHECK_NOT_NULL(stream_);
651   StreamBase* stream = static_cast<StreamBase*>(stream_);
652   Environment* env = stream->stream_env();
653   Isolate* isolate = env->isolate();
654   HandleScope handle_scope(isolate);
655   Context::Scope context_scope(env->context());
656   std::unique_ptr<BackingStore> bs = env->release_managed_buffer(buf_);
657 
658   if (nread <= 0)  {
659     if (nread < 0)
660       stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
661     return;
662   }
663 
664   CHECK_LE(static_cast<size_t>(nread), bs->ByteLength());
665   bs = BackingStore::Reallocate(isolate, std::move(bs), nread);
666 
667   stream->CallJSOnreadMethod(nread, ArrayBuffer::New(isolate, std::move(bs)));
668 }
669 
670 
OnStreamAlloc(size_t suggested_size)671 uv_buf_t CustomBufferJSListener::OnStreamAlloc(size_t suggested_size) {
672   return buffer_;
673 }
674 
675 
OnStreamRead(ssize_t nread,const uv_buf_t & buf)676 void CustomBufferJSListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
677   CHECK_NOT_NULL(stream_);
678 
679   StreamBase* stream = static_cast<StreamBase*>(stream_);
680   Environment* env = stream->stream_env();
681   HandleScope handle_scope(env->isolate());
682   Context::Scope context_scope(env->context());
683 
684   // In the case that there's an error and buf is null, return immediately.
685   // This can happen on unices when POLLHUP is received and UV_EOF is returned
686   // or when getting an error while performing a UV_HANDLE_ZERO_READ on Windows.
687   if (buf.base == nullptr && nread < 0) {
688     stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
689     return;
690   }
691 
692   CHECK_EQ(buf.base, buffer_.base);
693 
694   MaybeLocal<Value> ret = stream->CallJSOnreadMethod(nread,
695                              Local<ArrayBuffer>(),
696                              0,
697                              StreamBase::SKIP_NREAD_CHECKS);
698   Local<Value> next_buf_v;
699   if (ret.ToLocal(&next_buf_v) && !next_buf_v->IsUndefined()) {
700     buffer_.base = Buffer::Data(next_buf_v);
701     buffer_.len = Buffer::Length(next_buf_v);
702   }
703 }
704 
705 
OnStreamAfterReqFinished(StreamReq * req_wrap,int status)706 void ReportWritesToJSStreamListener::OnStreamAfterReqFinished(
707     StreamReq* req_wrap, int status) {
708   StreamBase* stream = static_cast<StreamBase*>(stream_);
709   Environment* env = stream->stream_env();
710   if (!env->can_call_into_js()) return;
711   AsyncWrap* async_wrap = req_wrap->GetAsyncWrap();
712   HandleScope handle_scope(env->isolate());
713   Context::Scope context_scope(env->context());
714   CHECK(!async_wrap->persistent().IsEmpty());
715   Local<Object> req_wrap_obj = async_wrap->object();
716 
717   Local<Value> argv[] = {
718     Integer::New(env->isolate(), status),
719     stream->GetObject(),
720     Undefined(env->isolate())
721   };
722 
723   const char* msg = stream->Error();
724   if (msg != nullptr) {
725     argv[2] = OneByteString(env->isolate(), msg);
726     stream->ClearError();
727   }
728 
729   if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())
730     async_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
731 }
732 
OnStreamAfterWrite(WriteWrap * req_wrap,int status)733 void ReportWritesToJSStreamListener::OnStreamAfterWrite(
734     WriteWrap* req_wrap, int status) {
735   OnStreamAfterReqFinished(req_wrap, status);
736 }
737 
OnStreamAfterShutdown(ShutdownWrap * req_wrap,int status)738 void ReportWritesToJSStreamListener::OnStreamAfterShutdown(
739     ShutdownWrap* req_wrap, int status) {
740   OnStreamAfterReqFinished(req_wrap, status);
741 }
742 
OnDone(int status)743 void ShutdownWrap::OnDone(int status) {
744   stream()->EmitAfterShutdown(this, status);
745   Dispose();
746 }
747 
OnDone(int status)748 void WriteWrap::OnDone(int status) {
749   stream()->EmitAfterWrite(this, status);
750   Dispose();
751 }
752 
~StreamListener()753 StreamListener::~StreamListener() {
754   if (stream_ != nullptr)
755     stream_->RemoveStreamListener(this);
756 }
757 
OnStreamAfterShutdown(ShutdownWrap * w,int status)758 void StreamListener::OnStreamAfterShutdown(ShutdownWrap* w, int status) {
759   CHECK_NOT_NULL(previous_listener_);
760   previous_listener_->OnStreamAfterShutdown(w, status);
761 }
762 
OnStreamAfterWrite(WriteWrap * w,int status)763 void StreamListener::OnStreamAfterWrite(WriteWrap* w, int status) {
764   CHECK_NOT_NULL(previous_listener_);
765   previous_listener_->OnStreamAfterWrite(w, status);
766 }
767 
~StreamResource()768 StreamResource::~StreamResource() {
769   while (listener_ != nullptr) {
770     StreamListener* listener = listener_;
771     listener->OnStreamDestroy();
772     // Remove the listener if it didn’t remove itself. This makes the logic
773     // in `OnStreamDestroy()` implementations easier, because they
774     // may call generic cleanup functions which can just remove the
775     // listener unconditionally.
776     if (listener == listener_)
777       RemoveStreamListener(listener_);
778   }
779 }
780 
RemoveStreamListener(StreamListener * listener)781 void StreamResource::RemoveStreamListener(StreamListener* listener) {
782   CHECK_NOT_NULL(listener);
783 
784   StreamListener* previous;
785   StreamListener* current;
786 
787   // Remove from the linked list.
788   // No loop condition because we want a crash if listener is not found.
789   for (current = listener_, previous = nullptr;;
790        previous = current, current = current->previous_listener_) {
791     CHECK_NOT_NULL(current);
792     if (current == listener) {
793       if (previous != nullptr)
794         previous->previous_listener_ = current->previous_listener_;
795       else
796         listener_ = listener->previous_listener_;
797       break;
798     }
799   }
800 
801   listener->stream_ = nullptr;
802   listener->previous_listener_ = nullptr;
803 }
804 
CreateShutdownWrap(Local<Object> object)805 ShutdownWrap* StreamBase::CreateShutdownWrap(
806     Local<Object> object) {
807   auto* wrap = new SimpleShutdownWrap<AsyncWrap>(this, object);
808   wrap->MakeWeak();
809   return wrap;
810 }
811 
CreateWriteWrap(Local<Object> object)812 WriteWrap* StreamBase::CreateWriteWrap(
813     Local<Object> object) {
814   auto* wrap = new SimpleWriteWrap<AsyncWrap>(this, object);
815   wrap->MakeWeak();
816   return wrap;
817 }
818 
Done(int status,const char * error_str)819 void StreamReq::Done(int status, const char* error_str) {
820   AsyncWrap* async_wrap = GetAsyncWrap();
821   Environment* env = async_wrap->env();
822   if (error_str != nullptr) {
823     v8::HandleScope handle_scope(env->isolate());
824     if (async_wrap->object()
825             ->Set(env->context(),
826                   env->error_string(),
827                   OneByteString(env->isolate(), error_str))
828             .IsNothing()) {
829       return;
830     }
831   }
832 
833   OnDone(status);
834 }
835 
836 }  // namespace node
837