1 #include "stream_base.h" // NOLINT(build/include_inline)
2 #include "stream_base-inl.h"
3 #include "stream_wrap.h"
4
5 #include "env-inl.h"
6 #include "js_stream.h"
7 #include "node.h"
8 #include "node_buffer.h"
9 #include "node_errors.h"
10 #include "node_external_reference.h"
11 #include "string_bytes.h"
12 #include "util-inl.h"
13 #include "v8.h"
14
15 #include <climits> // INT_MAX
16
17 namespace node {
18
19 using v8::Array;
20 using v8::ArrayBuffer;
21 using v8::BackingStore;
22 using v8::ConstructorBehavior;
23 using v8::Context;
24 using v8::DontDelete;
25 using v8::DontEnum;
26 using v8::External;
27 using v8::Function;
28 using v8::FunctionCallbackInfo;
29 using v8::FunctionTemplate;
30 using v8::HandleScope;
31 using v8::Integer;
32 using v8::Isolate;
33 using v8::Local;
34 using v8::MaybeLocal;
35 using v8::Object;
36 using v8::PropertyAttribute;
37 using v8::ReadOnly;
38 using v8::SideEffectType;
39 using v8::Signature;
40 using v8::String;
41 using v8::Value;
42
Shutdown(v8::Local<v8::Object> req_wrap_obj)43 int StreamBase::Shutdown(v8::Local<v8::Object> req_wrap_obj) {
44 Environment* env = stream_env();
45
46 v8::HandleScope handle_scope(env->isolate());
47
48 if (req_wrap_obj.IsEmpty()) {
49 if (!env->shutdown_wrap_template()
50 ->NewInstance(env->context())
51 .ToLocal(&req_wrap_obj)) {
52 return UV_EBUSY;
53 }
54 StreamReq::ResetObject(req_wrap_obj);
55 }
56
57 BaseObjectPtr<AsyncWrap> req_wrap_ptr;
58 AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(GetAsyncWrap());
59 ShutdownWrap* req_wrap = CreateShutdownWrap(req_wrap_obj);
60 if (req_wrap != nullptr) req_wrap_ptr.reset(req_wrap->GetAsyncWrap());
61 int err = DoShutdown(req_wrap);
62
63 if (err != 0 && req_wrap != nullptr) {
64 req_wrap->Dispose();
65 }
66
67 const char* msg = Error();
68 if (msg != nullptr) {
69 if (req_wrap_obj
70 ->Set(env->context(),
71 env->error_string(),
72 OneByteString(env->isolate(), msg))
73 .IsNothing()) {
74 return UV_EBUSY;
75 }
76 ClearError();
77 }
78
79 return err;
80 }
81
Write(uv_buf_t * bufs,size_t count,uv_stream_t * send_handle,v8::Local<v8::Object> req_wrap_obj,bool skip_try_write)82 StreamWriteResult StreamBase::Write(uv_buf_t* bufs,
83 size_t count,
84 uv_stream_t* send_handle,
85 v8::Local<v8::Object> req_wrap_obj,
86 bool skip_try_write) {
87 Environment* env = stream_env();
88 int err;
89
90 size_t total_bytes = 0;
91 for (size_t i = 0; i < count; ++i) total_bytes += bufs[i].len;
92 bytes_written_ += total_bytes;
93
94 if (send_handle == nullptr && !skip_try_write) {
95 err = DoTryWrite(&bufs, &count);
96 if (err != 0 || count == 0) {
97 return StreamWriteResult{false, err, nullptr, total_bytes, {}};
98 }
99 }
100
101 v8::HandleScope handle_scope(env->isolate());
102
103 if (req_wrap_obj.IsEmpty()) {
104 if (!env->write_wrap_template()
105 ->NewInstance(env->context())
106 .ToLocal(&req_wrap_obj)) {
107 return StreamWriteResult{false, UV_EBUSY, nullptr, 0, {}};
108 }
109 StreamReq::ResetObject(req_wrap_obj);
110 }
111
112 AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(GetAsyncWrap());
113 WriteWrap* req_wrap = CreateWriteWrap(req_wrap_obj);
114 BaseObjectPtr<AsyncWrap> req_wrap_ptr(req_wrap->GetAsyncWrap());
115
116 err = DoWrite(req_wrap, bufs, count, send_handle);
117 bool async = err == 0;
118
119 if (!async) {
120 req_wrap->Dispose();
121 req_wrap = nullptr;
122 }
123
124 const char* msg = Error();
125 if (msg != nullptr) {
126 if (req_wrap_obj
127 ->Set(env->context(),
128 env->error_string(),
129 OneByteString(env->isolate(), msg))
130 .IsNothing()) {
131 return StreamWriteResult{false, UV_EBUSY, nullptr, 0, {}};
132 }
133 ClearError();
134 }
135
136 return StreamWriteResult{
137 async, err, req_wrap, total_bytes, std::move(req_wrap_ptr)};
138 }
139
140 template int StreamBase::WriteString<ASCII>(
141 const FunctionCallbackInfo<Value>& args);
142 template int StreamBase::WriteString<UTF8>(
143 const FunctionCallbackInfo<Value>& args);
144 template int StreamBase::WriteString<UCS2>(
145 const FunctionCallbackInfo<Value>& args);
146 template int StreamBase::WriteString<LATIN1>(
147 const FunctionCallbackInfo<Value>& args);
148
149
ReadStartJS(const FunctionCallbackInfo<Value> & args)150 int StreamBase::ReadStartJS(const FunctionCallbackInfo<Value>& args) {
151 return ReadStart();
152 }
153
154
ReadStopJS(const FunctionCallbackInfo<Value> & args)155 int StreamBase::ReadStopJS(const FunctionCallbackInfo<Value>& args) {
156 return ReadStop();
157 }
158
UseUserBuffer(const FunctionCallbackInfo<Value> & args)159 int StreamBase::UseUserBuffer(const FunctionCallbackInfo<Value>& args) {
160 CHECK(Buffer::HasInstance(args[0]));
161
162 uv_buf_t buf = uv_buf_init(Buffer::Data(args[0]), Buffer::Length(args[0]));
163 PushStreamListener(new CustomBufferJSListener(buf));
164 return 0;
165 }
166
Shutdown(const FunctionCallbackInfo<Value> & args)167 int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
168 CHECK(args[0]->IsObject());
169 Local<Object> req_wrap_obj = args[0].As<Object>();
170
171 return Shutdown(req_wrap_obj);
172 }
173
SetWriteResult(const StreamWriteResult & res)174 void StreamBase::SetWriteResult(const StreamWriteResult& res) {
175 env_->stream_base_state()[kBytesWritten] = res.bytes;
176 env_->stream_base_state()[kLastWriteWasAsync] = res.async;
177 }
178
Writev(const FunctionCallbackInfo<Value> & args)179 int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
180 Environment* env = Environment::GetCurrent(args);
181 Isolate* isolate = env->isolate();
182 Local<Context> context = env->context();
183
184 CHECK(args[0]->IsObject());
185 CHECK(args[1]->IsArray());
186
187 Local<Object> req_wrap_obj = args[0].As<Object>();
188 Local<Array> chunks = args[1].As<Array>();
189 bool all_buffers = args[2]->IsTrue();
190
191 size_t count;
192 if (all_buffers)
193 count = chunks->Length();
194 else
195 count = chunks->Length() >> 1;
196
197 MaybeStackBuffer<uv_buf_t, 16> bufs(count);
198
199 size_t storage_size = 0;
200 size_t offset;
201
202 if (!all_buffers) {
203 // Determine storage size first
204 for (size_t i = 0; i < count; i++) {
205 Local<Value> chunk;
206 if (!chunks->Get(context, i * 2).ToLocal(&chunk))
207 return -1;
208
209 if (Buffer::HasInstance(chunk))
210 continue;
211 // Buffer chunk, no additional storage required
212
213 // String chunk
214 Local<String> string;
215 if (!chunk->ToString(context).ToLocal(&string))
216 return -1;
217 Local<Value> next_chunk;
218 if (!chunks->Get(context, i * 2 + 1).ToLocal(&next_chunk))
219 return -1;
220 enum encoding encoding = ParseEncoding(isolate, next_chunk);
221 size_t chunk_size;
222 if ((encoding == UTF8 &&
223 string->Length() > 65535 &&
224 !StringBytes::Size(isolate, string, encoding).To(&chunk_size)) ||
225 !StringBytes::StorageSize(isolate, string, encoding)
226 .To(&chunk_size)) {
227 return -1;
228 }
229 storage_size += chunk_size;
230 }
231
232 if (storage_size > INT_MAX)
233 return UV_ENOBUFS;
234 } else {
235 for (size_t i = 0; i < count; i++) {
236 Local<Value> chunk;
237 if (!chunks->Get(context, i).ToLocal(&chunk))
238 return -1;
239 bufs[i].base = Buffer::Data(chunk);
240 bufs[i].len = Buffer::Length(chunk);
241 }
242 }
243
244 std::unique_ptr<BackingStore> bs;
245 if (storage_size > 0) {
246 NoArrayBufferZeroFillScope no_zero_fill_scope(env->isolate_data());
247 bs = ArrayBuffer::NewBackingStore(isolate, storage_size);
248 }
249
250 offset = 0;
251 if (!all_buffers) {
252 for (size_t i = 0; i < count; i++) {
253 Local<Value> chunk;
254 if (!chunks->Get(context, i * 2).ToLocal(&chunk))
255 return -1;
256
257 // Write buffer
258 if (Buffer::HasInstance(chunk)) {
259 bufs[i].base = Buffer::Data(chunk);
260 bufs[i].len = Buffer::Length(chunk);
261 continue;
262 }
263
264 // Write string
265 CHECK_LE(offset, storage_size);
266 char* str_storage =
267 static_cast<char*>(bs ? bs->Data() : nullptr) + offset;
268 size_t str_size = (bs ? bs->ByteLength() : 0) - offset;
269
270 Local<String> string;
271 if (!chunk->ToString(context).ToLocal(&string))
272 return -1;
273 Local<Value> next_chunk;
274 if (!chunks->Get(context, i * 2 + 1).ToLocal(&next_chunk))
275 return -1;
276 enum encoding encoding = ParseEncoding(isolate, next_chunk);
277 str_size = StringBytes::Write(isolate,
278 str_storage,
279 str_size,
280 string,
281 encoding);
282 bufs[i].base = str_storage;
283 bufs[i].len = str_size;
284 offset += str_size;
285 }
286 }
287
288 StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj);
289 SetWriteResult(res);
290 if (res.wrap != nullptr && storage_size > 0)
291 res.wrap->SetBackingStore(std::move(bs));
292 return res.err;
293 }
294
295
WriteBuffer(const FunctionCallbackInfo<Value> & args)296 int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
297 CHECK(args[0]->IsObject());
298
299 Environment* env = Environment::GetCurrent(args);
300
301 if (!args[1]->IsUint8Array()) {
302 node::THROW_ERR_INVALID_ARG_TYPE(env, "Second argument must be a buffer");
303 return 0;
304 }
305
306 Local<Object> req_wrap_obj = args[0].As<Object>();
307 uv_buf_t buf;
308 buf.base = Buffer::Data(args[1]);
309 buf.len = Buffer::Length(args[1]);
310
311 uv_stream_t* send_handle = nullptr;
312
313 if (args[2]->IsObject() && IsIPCPipe()) {
314 Local<Object> send_handle_obj = args[2].As<Object>();
315
316 HandleWrap* wrap;
317 ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
318 send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
319 // Reference LibuvStreamWrap instance to prevent it from being garbage
320 // collected before `AfterWrite` is called.
321 if (req_wrap_obj->Set(env->context(),
322 env->handle_string(),
323 send_handle_obj).IsNothing()) {
324 return -1;
325 }
326 }
327
328 StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
329 SetWriteResult(res);
330
331 return res.err;
332 }
333
334
335 template <enum encoding enc>
WriteString(const FunctionCallbackInfo<Value> & args)336 int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
337 Environment* env = Environment::GetCurrent(args);
338 Isolate* isolate = env->isolate();
339 CHECK(args[0]->IsObject());
340 CHECK(args[1]->IsString());
341
342 Local<Object> req_wrap_obj = args[0].As<Object>();
343 Local<String> string = args[1].As<String>();
344 Local<Object> send_handle_obj;
345 if (args[2]->IsObject())
346 send_handle_obj = args[2].As<Object>();
347
348 // Compute the size of the storage that the string will be flattened into.
349 // For UTF8 strings that are very long, go ahead and take the hit for
350 // computing their actual size, rather than tripling the storage.
351 size_t storage_size;
352 if ((enc == UTF8 &&
353 string->Length() > 65535 &&
354 !StringBytes::Size(isolate, string, enc).To(&storage_size)) ||
355 !StringBytes::StorageSize(isolate, string, enc).To(&storage_size)) {
356 return -1;
357 }
358
359 if (storage_size > INT_MAX)
360 return UV_ENOBUFS;
361
362 // Try writing immediately if write size isn't too big
363 char stack_storage[16384]; // 16kb
364 size_t data_size;
365 size_t synchronously_written = 0;
366 uv_buf_t buf;
367
368 bool try_write = storage_size <= sizeof(stack_storage) &&
369 (!IsIPCPipe() || send_handle_obj.IsEmpty());
370 if (try_write) {
371 data_size = StringBytes::Write(isolate,
372 stack_storage,
373 storage_size,
374 string,
375 enc);
376 buf = uv_buf_init(stack_storage, data_size);
377
378 uv_buf_t* bufs = &buf;
379 size_t count = 1;
380 const int err = DoTryWrite(&bufs, &count);
381 // Keep track of the bytes written here, because we're taking a shortcut
382 // by using `DoTryWrite()` directly instead of using the utilities
383 // provided by `Write()`.
384 synchronously_written = count == 0 ? data_size : data_size - buf.len;
385 bytes_written_ += synchronously_written;
386
387 // Immediate failure or success
388 if (err != 0 || count == 0) {
389 SetWriteResult(StreamWriteResult { false, err, nullptr, data_size, {} });
390 return err;
391 }
392
393 // Partial write
394 CHECK_EQ(count, 1);
395 }
396
397 std::unique_ptr<BackingStore> bs;
398
399 if (try_write) {
400 // Copy partial data
401 NoArrayBufferZeroFillScope no_zero_fill_scope(env->isolate_data());
402 bs = ArrayBuffer::NewBackingStore(isolate, buf.len);
403 memcpy(static_cast<char*>(bs->Data()), buf.base, buf.len);
404 data_size = buf.len;
405 } else {
406 // Write it
407 NoArrayBufferZeroFillScope no_zero_fill_scope(env->isolate_data());
408 bs = ArrayBuffer::NewBackingStore(isolate, storage_size);
409 data_size = StringBytes::Write(isolate,
410 static_cast<char*>(bs->Data()),
411 storage_size,
412 string,
413 enc);
414 }
415
416 CHECK_LE(data_size, storage_size);
417
418 buf = uv_buf_init(static_cast<char*>(bs->Data()), data_size);
419
420 uv_stream_t* send_handle = nullptr;
421
422 if (IsIPCPipe() && !send_handle_obj.IsEmpty()) {
423 HandleWrap* wrap;
424 ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
425 send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
426 // Reference LibuvStreamWrap instance to prevent it from being garbage
427 // collected before `AfterWrite` is called.
428 if (req_wrap_obj->Set(env->context(),
429 env->handle_string(),
430 send_handle_obj).IsNothing()) {
431 return -1;
432 }
433 }
434
435 StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj, try_write);
436 res.bytes += synchronously_written;
437
438 SetWriteResult(res);
439 if (res.wrap != nullptr)
440 res.wrap->SetBackingStore(std::move(bs));
441
442 return res.err;
443 }
444
445
CallJSOnreadMethod(ssize_t nread,Local<ArrayBuffer> ab,size_t offset,StreamBaseJSChecks checks)446 MaybeLocal<Value> StreamBase::CallJSOnreadMethod(ssize_t nread,
447 Local<ArrayBuffer> ab,
448 size_t offset,
449 StreamBaseJSChecks checks) {
450 Environment* env = env_;
451
452 DCHECK_EQ(static_cast<int32_t>(nread), nread);
453 DCHECK_LE(offset, INT32_MAX);
454
455 if (checks == DONT_SKIP_NREAD_CHECKS) {
456 if (ab.IsEmpty()) {
457 DCHECK_EQ(offset, 0);
458 DCHECK_LE(nread, 0);
459 } else {
460 DCHECK_GE(nread, 0);
461 }
462 }
463
464 env->stream_base_state()[kReadBytesOrError] = static_cast<int32_t>(nread);
465 env->stream_base_state()[kArrayBufferOffset] = offset;
466
467 Local<Value> argv[] = {
468 ab.IsEmpty() ? Undefined(env->isolate()).As<Value>() : ab.As<Value>()
469 };
470
471 AsyncWrap* wrap = GetAsyncWrap();
472 CHECK_NOT_NULL(wrap);
473 Local<Value> onread = wrap->object()->GetInternalField(
474 StreamBase::kOnReadFunctionField);
475 CHECK(onread->IsFunction());
476 return wrap->MakeCallback(onread.As<Function>(), arraysize(argv), argv);
477 }
478
479
IsIPCPipe()480 bool StreamBase::IsIPCPipe() {
481 return false;
482 }
483
484
GetFD()485 int StreamBase::GetFD() {
486 return -1;
487 }
488
489
GetObject()490 Local<Object> StreamBase::GetObject() {
491 return GetAsyncWrap()->object();
492 }
493
AddMethod(Environment * env,Local<Signature> signature,enum PropertyAttribute attributes,Local<FunctionTemplate> t,JSMethodFunction * stream_method,Local<String> string)494 void StreamBase::AddMethod(Environment* env,
495 Local<Signature> signature,
496 enum PropertyAttribute attributes,
497 Local<FunctionTemplate> t,
498 JSMethodFunction* stream_method,
499 Local<String> string) {
500 Isolate* isolate = env->isolate();
501 Local<FunctionTemplate> templ =
502 NewFunctionTemplate(isolate,
503 stream_method,
504 signature,
505 ConstructorBehavior::kThrow,
506 SideEffectType::kHasNoSideEffect);
507 t->PrototypeTemplate()->SetAccessorProperty(
508 string, templ, Local<FunctionTemplate>(), attributes);
509 }
510
AddMethods(Environment * env,Local<FunctionTemplate> t)511 void StreamBase::AddMethods(Environment* env, Local<FunctionTemplate> t) {
512 Isolate* isolate = env->isolate();
513 HandleScope scope(isolate);
514
515 enum PropertyAttribute attributes =
516 static_cast<PropertyAttribute>(ReadOnly | DontDelete | DontEnum);
517 Local<Signature> sig = Signature::New(isolate, t);
518
519 AddMethod(env, sig, attributes, t, GetFD, env->fd_string());
520 AddMethod(
521 env, sig, attributes, t, GetExternal, env->external_stream_string());
522 AddMethod(env, sig, attributes, t, GetBytesRead, env->bytes_read_string());
523 AddMethod(
524 env, sig, attributes, t, GetBytesWritten, env->bytes_written_string());
525 SetProtoMethod(isolate, t, "readStart", JSMethod<&StreamBase::ReadStartJS>);
526 SetProtoMethod(isolate, t, "readStop", JSMethod<&StreamBase::ReadStopJS>);
527 SetProtoMethod(isolate, t, "shutdown", JSMethod<&StreamBase::Shutdown>);
528 SetProtoMethod(
529 isolate, t, "useUserBuffer", JSMethod<&StreamBase::UseUserBuffer>);
530 SetProtoMethod(isolate, t, "writev", JSMethod<&StreamBase::Writev>);
531 SetProtoMethod(isolate, t, "writeBuffer", JSMethod<&StreamBase::WriteBuffer>);
532 SetProtoMethod(isolate,
533 t,
534 "writeAsciiString",
535 JSMethod<&StreamBase::WriteString<ASCII>>);
536 SetProtoMethod(
537 isolate, t, "writeUtf8String", JSMethod<&StreamBase::WriteString<UTF8>>);
538 SetProtoMethod(
539 isolate, t, "writeUcs2String", JSMethod<&StreamBase::WriteString<UCS2>>);
540 SetProtoMethod(isolate,
541 t,
542 "writeLatin1String",
543 JSMethod<&StreamBase::WriteString<LATIN1>>);
544 t->PrototypeTemplate()->Set(FIXED_ONE_BYTE_STRING(isolate, "isStreamBase"),
545 True(isolate));
546 t->PrototypeTemplate()->SetAccessor(
547 FIXED_ONE_BYTE_STRING(isolate, "onread"),
548 BaseObject::InternalFieldGet<StreamBase::kOnReadFunctionField>,
549 BaseObject::InternalFieldSet<StreamBase::kOnReadFunctionField,
550 &Value::IsFunction>);
551 }
552
RegisterExternalReferences(ExternalReferenceRegistry * registry)553 void StreamBase::RegisterExternalReferences(
554 ExternalReferenceRegistry* registry) {
555 // This function is called by a single thread during start up, so it is safe
556 // to use a local static variable here.
557 static bool is_registered = false;
558 if (is_registered) return;
559 registry->Register(GetFD);
560 registry->Register(GetExternal);
561 registry->Register(GetBytesRead);
562 registry->Register(GetBytesWritten);
563 registry->Register(JSMethod<&StreamBase::ReadStartJS>);
564 registry->Register(JSMethod<&StreamBase::ReadStopJS>);
565 registry->Register(JSMethod<&StreamBase::Shutdown>);
566 registry->Register(JSMethod<&StreamBase::UseUserBuffer>);
567 registry->Register(JSMethod<&StreamBase::Writev>);
568 registry->Register(JSMethod<&StreamBase::WriteBuffer>);
569 registry->Register(JSMethod<&StreamBase::WriteString<ASCII>>);
570 registry->Register(JSMethod<&StreamBase::WriteString<UTF8>>);
571 registry->Register(JSMethod<&StreamBase::WriteString<UCS2>>);
572 registry->Register(JSMethod<&StreamBase::WriteString<LATIN1>>);
573 registry->Register(
574 BaseObject::InternalFieldGet<StreamBase::kOnReadFunctionField>);
575 registry->Register(
576 BaseObject::InternalFieldSet<StreamBase::kOnReadFunctionField,
577 &Value::IsFunction>);
578 is_registered = true;
579 }
580
GetFD(const FunctionCallbackInfo<Value> & args)581 void StreamBase::GetFD(const FunctionCallbackInfo<Value>& args) {
582 // Mimic implementation of StreamBase::GetFD() and UDPWrap::GetFD().
583 StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
584 if (wrap == nullptr) return args.GetReturnValue().Set(UV_EINVAL);
585
586 if (!wrap->IsAlive()) return args.GetReturnValue().Set(UV_EINVAL);
587
588 args.GetReturnValue().Set(wrap->GetFD());
589 }
590
GetBytesRead(const FunctionCallbackInfo<Value> & args)591 void StreamBase::GetBytesRead(const FunctionCallbackInfo<Value>& args) {
592 StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
593 if (wrap == nullptr) return args.GetReturnValue().Set(0);
594
595 // uint64_t -> double. 53bits is enough for all real cases.
596 args.GetReturnValue().Set(static_cast<double>(wrap->bytes_read_));
597 }
598
GetBytesWritten(const FunctionCallbackInfo<Value> & args)599 void StreamBase::GetBytesWritten(const FunctionCallbackInfo<Value>& args) {
600 StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
601 if (wrap == nullptr) return args.GetReturnValue().Set(0);
602
603 // uint64_t -> double. 53bits is enough for all real cases.
604 args.GetReturnValue().Set(static_cast<double>(wrap->bytes_written_));
605 }
606
GetExternal(const FunctionCallbackInfo<Value> & args)607 void StreamBase::GetExternal(const FunctionCallbackInfo<Value>& args) {
608 StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
609 if (wrap == nullptr) return;
610
611 Local<External> ext = External::New(args.GetIsolate(), wrap);
612 args.GetReturnValue().Set(ext);
613 }
614
615 template <int (StreamBase::*Method)(const FunctionCallbackInfo<Value>& args)>
JSMethod(const FunctionCallbackInfo<Value> & args)616 void StreamBase::JSMethod(const FunctionCallbackInfo<Value>& args) {
617 StreamBase* wrap = StreamBase::FromObject(args.Holder().As<Object>());
618 if (wrap == nullptr) return;
619
620 if (!wrap->IsAlive()) return args.GetReturnValue().Set(UV_EINVAL);
621
622 AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(wrap->GetAsyncWrap());
623 args.GetReturnValue().Set((wrap->*Method)(args));
624 }
625
DoTryWrite(uv_buf_t ** bufs,size_t * count)626 int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) {
627 // No TryWrite by default
628 return 0;
629 }
630
631
Error() const632 const char* StreamResource::Error() const {
633 return nullptr;
634 }
635
636
ClearError()637 void StreamResource::ClearError() {
638 // No-op
639 }
640
641
OnStreamAlloc(size_t suggested_size)642 uv_buf_t EmitToJSStreamListener::OnStreamAlloc(size_t suggested_size) {
643 CHECK_NOT_NULL(stream_);
644 Environment* env = static_cast<StreamBase*>(stream_)->stream_env();
645 return env->allocate_managed_buffer(suggested_size);
646 }
647
OnStreamRead(ssize_t nread,const uv_buf_t & buf_)648 void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) {
649 CHECK_NOT_NULL(stream_);
650 StreamBase* stream = static_cast<StreamBase*>(stream_);
651 Environment* env = stream->stream_env();
652 Isolate* isolate = env->isolate();
653 HandleScope handle_scope(isolate);
654 Context::Scope context_scope(env->context());
655 std::unique_ptr<BackingStore> bs = env->release_managed_buffer(buf_);
656
657 if (nread <= 0) {
658 if (nread < 0)
659 stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
660 return;
661 }
662
663 CHECK_LE(static_cast<size_t>(nread), bs->ByteLength());
664 bs = BackingStore::Reallocate(isolate, std::move(bs), nread);
665
666 stream->CallJSOnreadMethod(nread, ArrayBuffer::New(isolate, std::move(bs)));
667 }
668
669
OnStreamAlloc(size_t suggested_size)670 uv_buf_t CustomBufferJSListener::OnStreamAlloc(size_t suggested_size) {
671 return buffer_;
672 }
673
674
OnStreamRead(ssize_t nread,const uv_buf_t & buf)675 void CustomBufferJSListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
676 CHECK_NOT_NULL(stream_);
677
678 StreamBase* stream = static_cast<StreamBase*>(stream_);
679 Environment* env = stream->stream_env();
680 HandleScope handle_scope(env->isolate());
681 Context::Scope context_scope(env->context());
682
683 // In the case that there's an error and buf is null, return immediately.
684 // This can happen on unices when POLLHUP is received and UV_EOF is returned
685 // or when getting an error while performing a UV_HANDLE_ZERO_READ on Windows.
686 if (buf.base == nullptr && nread < 0) {
687 stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
688 return;
689 }
690
691 CHECK_EQ(buf.base, buffer_.base);
692
693 MaybeLocal<Value> ret = stream->CallJSOnreadMethod(nread,
694 Local<ArrayBuffer>(),
695 0,
696 StreamBase::SKIP_NREAD_CHECKS);
697 Local<Value> next_buf_v;
698 if (ret.ToLocal(&next_buf_v) && !next_buf_v->IsUndefined()) {
699 buffer_.base = Buffer::Data(next_buf_v);
700 buffer_.len = Buffer::Length(next_buf_v);
701 }
702 }
703
704
OnStreamAfterReqFinished(StreamReq * req_wrap,int status)705 void ReportWritesToJSStreamListener::OnStreamAfterReqFinished(
706 StreamReq* req_wrap, int status) {
707 StreamBase* stream = static_cast<StreamBase*>(stream_);
708 Environment* env = stream->stream_env();
709 if (!env->can_call_into_js()) return;
710 AsyncWrap* async_wrap = req_wrap->GetAsyncWrap();
711 HandleScope handle_scope(env->isolate());
712 Context::Scope context_scope(env->context());
713 CHECK(!async_wrap->persistent().IsEmpty());
714 Local<Object> req_wrap_obj = async_wrap->object();
715
716 Local<Value> argv[] = {
717 Integer::New(env->isolate(), status),
718 stream->GetObject(),
719 Undefined(env->isolate())
720 };
721
722 const char* msg = stream->Error();
723 if (msg != nullptr) {
724 argv[2] = OneByteString(env->isolate(), msg);
725 stream->ClearError();
726 }
727
728 if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())
729 async_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
730 }
731
OnStreamAfterWrite(WriteWrap * req_wrap,int status)732 void ReportWritesToJSStreamListener::OnStreamAfterWrite(
733 WriteWrap* req_wrap, int status) {
734 OnStreamAfterReqFinished(req_wrap, status);
735 }
736
OnStreamAfterShutdown(ShutdownWrap * req_wrap,int status)737 void ReportWritesToJSStreamListener::OnStreamAfterShutdown(
738 ShutdownWrap* req_wrap, int status) {
739 OnStreamAfterReqFinished(req_wrap, status);
740 }
741
OnDone(int status)742 void ShutdownWrap::OnDone(int status) {
743 stream()->EmitAfterShutdown(this, status);
744 Dispose();
745 }
746
OnDone(int status)747 void WriteWrap::OnDone(int status) {
748 stream()->EmitAfterWrite(this, status);
749 Dispose();
750 }
751
~StreamListener()752 StreamListener::~StreamListener() {
753 if (stream_ != nullptr)
754 stream_->RemoveStreamListener(this);
755 }
756
OnStreamAfterShutdown(ShutdownWrap * w,int status)757 void StreamListener::OnStreamAfterShutdown(ShutdownWrap* w, int status) {
758 CHECK_NOT_NULL(previous_listener_);
759 previous_listener_->OnStreamAfterShutdown(w, status);
760 }
761
OnStreamAfterWrite(WriteWrap * w,int status)762 void StreamListener::OnStreamAfterWrite(WriteWrap* w, int status) {
763 CHECK_NOT_NULL(previous_listener_);
764 previous_listener_->OnStreamAfterWrite(w, status);
765 }
766
~StreamResource()767 StreamResource::~StreamResource() {
768 while (listener_ != nullptr) {
769 StreamListener* listener = listener_;
770 listener->OnStreamDestroy();
771 // Remove the listener if it didn’t remove itself. This makes the logic
772 // in `OnStreamDestroy()` implementations easier, because they
773 // may call generic cleanup functions which can just remove the
774 // listener unconditionally.
775 if (listener == listener_)
776 RemoveStreamListener(listener_);
777 }
778 }
779
RemoveStreamListener(StreamListener * listener)780 void StreamResource::RemoveStreamListener(StreamListener* listener) {
781 CHECK_NOT_NULL(listener);
782
783 StreamListener* previous;
784 StreamListener* current;
785
786 // Remove from the linked list.
787 // No loop condition because we want a crash if listener is not found.
788 for (current = listener_, previous = nullptr;;
789 previous = current, current = current->previous_listener_) {
790 CHECK_NOT_NULL(current);
791 if (current == listener) {
792 if (previous != nullptr)
793 previous->previous_listener_ = current->previous_listener_;
794 else
795 listener_ = listener->previous_listener_;
796 break;
797 }
798 }
799
800 listener->stream_ = nullptr;
801 listener->previous_listener_ = nullptr;
802 }
803
CreateShutdownWrap(Local<Object> object)804 ShutdownWrap* StreamBase::CreateShutdownWrap(
805 Local<Object> object) {
806 auto* wrap = new SimpleShutdownWrap<AsyncWrap>(this, object);
807 wrap->MakeWeak();
808 return wrap;
809 }
810
CreateWriteWrap(Local<Object> object)811 WriteWrap* StreamBase::CreateWriteWrap(
812 Local<Object> object) {
813 auto* wrap = new SimpleWriteWrap<AsyncWrap>(this, object);
814 wrap->MakeWeak();
815 return wrap;
816 }
817
Done(int status,const char * error_str)818 void StreamReq::Done(int status, const char* error_str) {
819 AsyncWrap* async_wrap = GetAsyncWrap();
820 Environment* env = async_wrap->env();
821 if (error_str != nullptr) {
822 v8::HandleScope handle_scope(env->isolate());
823 if (async_wrap->object()
824 ->Set(env->context(),
825 env->error_string(),
826 OneByteString(env->isolate(), error_str))
827 .IsNothing()) {
828 return;
829 }
830 }
831
832 OnDone(status);
833 }
834
835 } // namespace node
836