1 #include "stream_base.h" // NOLINT(build/include_inline)
2 #include "stream_base-inl.h"
3 #include "stream_wrap.h"
4
5 #include "env-inl.h"
6 #include "js_stream.h"
7 #include "node.h"
8 #include "node_buffer.h"
9 #include "node_errors.h"
10 #include "node_external_reference.h"
11 #include "string_bytes.h"
12 #include "util-inl.h"
13 #include "v8.h"
14
15 #include <climits> // INT_MAX
16
17 namespace node {
18
19 using v8::Array;
20 using v8::ArrayBuffer;
21 using v8::BackingStore;
22 using v8::ConstructorBehavior;
23 using v8::Context;
24 using v8::DontDelete;
25 using v8::DontEnum;
26 using v8::External;
27 using v8::Function;
28 using v8::FunctionCallbackInfo;
29 using v8::FunctionTemplate;
30 using v8::HandleScope;
31 using v8::Integer;
32 using v8::Isolate;
33 using v8::Local;
34 using v8::MaybeLocal;
35 using v8::Object;
36 using v8::PropertyAttribute;
37 using v8::ReadOnly;
38 using v8::SideEffectType;
39 using v8::Signature;
40 using v8::String;
41 using v8::Value;
42
Shutdown(v8::Local<v8::Object> req_wrap_obj)43 int StreamBase::Shutdown(v8::Local<v8::Object> req_wrap_obj) {
44 Environment* env = stream_env();
45
46 v8::HandleScope handle_scope(env->isolate());
47
48 if (req_wrap_obj.IsEmpty()) {
49 if (!env->shutdown_wrap_template()
50 ->NewInstance(env->context())
51 .ToLocal(&req_wrap_obj)) {
52 return UV_EBUSY;
53 }
54 StreamReq::ResetObject(req_wrap_obj);
55 }
56
57 BaseObjectPtr<AsyncWrap> req_wrap_ptr;
58 AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(GetAsyncWrap());
59 ShutdownWrap* req_wrap = CreateShutdownWrap(req_wrap_obj);
60 if (req_wrap != nullptr) req_wrap_ptr.reset(req_wrap->GetAsyncWrap());
61 int err = DoShutdown(req_wrap);
62
63 if (err != 0 && req_wrap != nullptr) {
64 req_wrap->Dispose();
65 }
66
67 const char* msg = Error();
68 if (msg != nullptr) {
69 if (req_wrap_obj
70 ->Set(env->context(),
71 env->error_string(),
72 OneByteString(env->isolate(), msg))
73 .IsNothing()) {
74 return UV_EBUSY;
75 }
76 ClearError();
77 }
78
79 return err;
80 }
81
Write(uv_buf_t * bufs,size_t count,uv_stream_t * send_handle,v8::Local<v8::Object> req_wrap_obj,bool skip_try_write)82 StreamWriteResult StreamBase::Write(uv_buf_t* bufs,
83 size_t count,
84 uv_stream_t* send_handle,
85 v8::Local<v8::Object> req_wrap_obj,
86 bool skip_try_write) {
87 Environment* env = stream_env();
88 int err;
89
90 size_t total_bytes = 0;
91 for (size_t i = 0; i < count; ++i) total_bytes += bufs[i].len;
92 bytes_written_ += total_bytes;
93
94 if (send_handle == nullptr && !skip_try_write) {
95 err = DoTryWrite(&bufs, &count);
96 if (err != 0 || count == 0) {
97 return StreamWriteResult{false, err, nullptr, total_bytes, {}};
98 }
99 }
100
101 v8::HandleScope handle_scope(env->isolate());
102
103 if (req_wrap_obj.IsEmpty()) {
104 if (!env->write_wrap_template()
105 ->NewInstance(env->context())
106 .ToLocal(&req_wrap_obj)) {
107 return StreamWriteResult{false, UV_EBUSY, nullptr, 0, {}};
108 }
109 StreamReq::ResetObject(req_wrap_obj);
110 }
111
112 AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(GetAsyncWrap());
113 WriteWrap* req_wrap = CreateWriteWrap(req_wrap_obj);
114 BaseObjectPtr<AsyncWrap> req_wrap_ptr(req_wrap->GetAsyncWrap());
115
116 err = DoWrite(req_wrap, bufs, count, send_handle);
117 bool async = err == 0;
118
119 if (!async) {
120 req_wrap->Dispose();
121 req_wrap = nullptr;
122 }
123
124 const char* msg = Error();
125 if (msg != nullptr) {
126 if (req_wrap_obj
127 ->Set(env->context(),
128 env->error_string(),
129 OneByteString(env->isolate(), msg))
130 .IsNothing()) {
131 return StreamWriteResult{false, UV_EBUSY, nullptr, 0, {}};
132 }
133 ClearError();
134 }
135
136 return StreamWriteResult{
137 async, err, req_wrap, total_bytes, std::move(req_wrap_ptr)};
138 }
139
// Explicit instantiations of StreamBase::WriteString for the ASCII, UTF8,
// UCS2 and LATIN1 encodings. The template itself is defined further down in
// this file.
template int StreamBase::WriteString<ASCII>(
    const FunctionCallbackInfo<Value>& args);
template int StreamBase::WriteString<UTF8>(
    const FunctionCallbackInfo<Value>& args);
template int StreamBase::WriteString<UCS2>(
    const FunctionCallbackInfo<Value>& args);
template int StreamBase::WriteString<LATIN1>(
    const FunctionCallbackInfo<Value>& args);
148
149
ReadStartJS(const FunctionCallbackInfo<Value> & args)150 int StreamBase::ReadStartJS(const FunctionCallbackInfo<Value>& args) {
151 return ReadStart();
152 }
153
154
ReadStopJS(const FunctionCallbackInfo<Value> & args)155 int StreamBase::ReadStopJS(const FunctionCallbackInfo<Value>& args) {
156 return ReadStop();
157 }
158
UseUserBuffer(const FunctionCallbackInfo<Value> & args)159 int StreamBase::UseUserBuffer(const FunctionCallbackInfo<Value>& args) {
160 CHECK(Buffer::HasInstance(args[0]));
161
162 uv_buf_t buf = uv_buf_init(Buffer::Data(args[0]), Buffer::Length(args[0]));
163 PushStreamListener(new CustomBufferJSListener(buf));
164 return 0;
165 }
166
Shutdown(const FunctionCallbackInfo<Value> & args)167 int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
168 CHECK(args[0]->IsObject());
169 Local<Object> req_wrap_obj = args[0].As<Object>();
170
171 return Shutdown(req_wrap_obj);
172 }
173
SetWriteResult(const StreamWriteResult & res)174 void StreamBase::SetWriteResult(const StreamWriteResult& res) {
175 env_->stream_base_state()[kBytesWritten] = res.bytes;
176 env_->stream_base_state()[kLastWriteWasAsync] = res.async;
177 }
178
Writev(const FunctionCallbackInfo<Value> & args)179 int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
180 Environment* env = Environment::GetCurrent(args);
181 Isolate* isolate = env->isolate();
182 Local<Context> context = env->context();
183
184 CHECK(args[0]->IsObject());
185 CHECK(args[1]->IsArray());
186
187 Local<Object> req_wrap_obj = args[0].As<Object>();
188 Local<Array> chunks = args[1].As<Array>();
189 bool all_buffers = args[2]->IsTrue();
190
191 size_t count;
192 if (all_buffers)
193 count = chunks->Length();
194 else
195 count = chunks->Length() >> 1;
196
197 MaybeStackBuffer<uv_buf_t, 16> bufs(count);
198
199 size_t storage_size = 0;
200 size_t offset;
201
202 if (!all_buffers) {
203 // Determine storage size first
204 for (size_t i = 0; i < count; i++) {
205 Local<Value> chunk;
206 if (!chunks->Get(context, i * 2).ToLocal(&chunk))
207 return -1;
208
209 if (Buffer::HasInstance(chunk))
210 continue;
211 // Buffer chunk, no additional storage required
212
213 // String chunk
214 Local<String> string;
215 if (!chunk->ToString(context).ToLocal(&string))
216 return -1;
217 Local<Value> next_chunk;
218 if (!chunks->Get(context, i * 2 + 1).ToLocal(&next_chunk))
219 return -1;
220 enum encoding encoding = ParseEncoding(isolate, next_chunk);
221 size_t chunk_size;
222 if ((encoding == UTF8 &&
223 string->Length() > 65535 &&
224 !StringBytes::Size(isolate, string, encoding).To(&chunk_size)) ||
225 !StringBytes::StorageSize(isolate, string, encoding)
226 .To(&chunk_size)) {
227 return -1;
228 }
229 storage_size += chunk_size;
230 }
231
232 if (storage_size > INT_MAX)
233 return UV_ENOBUFS;
234 } else {
235 for (size_t i = 0; i < count; i++) {
236 Local<Value> chunk;
237 if (!chunks->Get(context, i).ToLocal(&chunk))
238 return -1;
239 bufs[i].base = Buffer::Data(chunk);
240 bufs[i].len = Buffer::Length(chunk);
241 }
242 }
243
244 std::unique_ptr<BackingStore> bs;
245 if (storage_size > 0) {
246 NoArrayBufferZeroFillScope no_zero_fill_scope(env->isolate_data());
247 bs = ArrayBuffer::NewBackingStore(isolate, storage_size);
248 }
249
250 offset = 0;
251 if (!all_buffers) {
252 for (size_t i = 0; i < count; i++) {
253 Local<Value> chunk;
254 if (!chunks->Get(context, i * 2).ToLocal(&chunk))
255 return -1;
256
257 // Write buffer
258 if (Buffer::HasInstance(chunk)) {
259 bufs[i].base = Buffer::Data(chunk);
260 bufs[i].len = Buffer::Length(chunk);
261 continue;
262 }
263
264 // Write string
265 CHECK_LE(offset, storage_size);
266 char* str_storage =
267 static_cast<char*>(bs ? bs->Data() : nullptr) + offset;
268 size_t str_size = (bs ? bs->ByteLength() : 0) - offset;
269
270 Local<String> string;
271 if (!chunk->ToString(context).ToLocal(&string))
272 return -1;
273 Local<Value> next_chunk;
274 if (!chunks->Get(context, i * 2 + 1).ToLocal(&next_chunk))
275 return -1;
276 enum encoding encoding = ParseEncoding(isolate, next_chunk);
277 str_size = StringBytes::Write(isolate,
278 str_storage,
279 str_size,
280 string,
281 encoding);
282 bufs[i].base = str_storage;
283 bufs[i].len = str_size;
284 offset += str_size;
285 }
286 }
287
288 StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj);
289 SetWriteResult(res);
290 if (res.wrap != nullptr && storage_size > 0)
291 res.wrap->SetBackingStore(std::move(bs));
292 return res.err;
293 }
294
295
WriteBuffer(const FunctionCallbackInfo<Value> & args)296 int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
297 CHECK(args[0]->IsObject());
298
299 Environment* env = Environment::GetCurrent(args);
300
301 if (!args[1]->IsUint8Array()) {
302 node::THROW_ERR_INVALID_ARG_TYPE(env, "Second argument must be a buffer");
303 return 0;
304 }
305
306 Local<Object> req_wrap_obj = args[0].As<Object>();
307 uv_buf_t buf;
308 buf.base = Buffer::Data(args[1]);
309 buf.len = Buffer::Length(args[1]);
310
311 uv_stream_t* send_handle = nullptr;
312
313 if (args[2]->IsObject() && IsIPCPipe()) {
314 Local<Object> send_handle_obj = args[2].As<Object>();
315
316 HandleWrap* wrap;
317 ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
318 send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
319 // Reference LibuvStreamWrap instance to prevent it from being garbage
320 // collected before `AfterWrite` is called.
321 if (req_wrap_obj->Set(env->context(),
322 env->handle_string(),
323 send_handle_obj).IsNothing()) {
324 return -1;
325 }
326 }
327
328 StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
329 SetWriteResult(res);
330
331 return res.err;
332 }
333
334
335 template <enum encoding enc>
WriteString(const FunctionCallbackInfo<Value> & args)336 int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
337 Environment* env = Environment::GetCurrent(args);
338 Isolate* isolate = env->isolate();
339 CHECK(args[0]->IsObject());
340 CHECK(args[1]->IsString());
341
342 Local<Object> req_wrap_obj = args[0].As<Object>();
343 Local<String> string = args[1].As<String>();
344 Local<Object> send_handle_obj;
345 if (args[2]->IsObject())
346 send_handle_obj = args[2].As<Object>();
347
348 // Compute the size of the storage that the string will be flattened into.
349 // For UTF8 strings that are very long, go ahead and take the hit for
350 // computing their actual size, rather than tripling the storage.
351 size_t storage_size;
352 if ((enc == UTF8 &&
353 string->Length() > 65535 &&
354 !StringBytes::Size(isolate, string, enc).To(&storage_size)) ||
355 !StringBytes::StorageSize(isolate, string, enc).To(&storage_size)) {
356 return -1;
357 }
358
359 if (storage_size > INT_MAX)
360 return UV_ENOBUFS;
361
362 // Try writing immediately if write size isn't too big
363 char stack_storage[16384]; // 16kb
364 size_t data_size;
365 size_t synchronously_written = 0;
366 uv_buf_t buf;
367
368 bool try_write = storage_size <= sizeof(stack_storage) &&
369 (!IsIPCPipe() || send_handle_obj.IsEmpty());
370 if (try_write) {
371 data_size = StringBytes::Write(isolate,
372 stack_storage,
373 storage_size,
374 string,
375 enc);
376 buf = uv_buf_init(stack_storage, data_size);
377
378 uv_buf_t* bufs = &buf;
379 size_t count = 1;
380 const int err = DoTryWrite(&bufs, &count);
381 // Keep track of the bytes written here, because we're taking a shortcut
382 // by using `DoTryWrite()` directly instead of using the utilities
383 // provided by `Write()`.
384 synchronously_written = count == 0 ? data_size : data_size - buf.len;
385 bytes_written_ += synchronously_written;
386
387 // Immediate failure or success
388 if (err != 0 || count == 0) {
389 SetWriteResult(StreamWriteResult { false, err, nullptr, data_size, {} });
390 return err;
391 }
392
393 // Partial write
394 CHECK_EQ(count, 1);
395 }
396
397 std::unique_ptr<BackingStore> bs;
398
399 if (try_write) {
400 // Copy partial data
401 NoArrayBufferZeroFillScope no_zero_fill_scope(env->isolate_data());
402 bs = ArrayBuffer::NewBackingStore(isolate, buf.len);
403 memcpy(static_cast<char*>(bs->Data()), buf.base, buf.len);
404 data_size = buf.len;
405 } else {
406 // Write it
407 NoArrayBufferZeroFillScope no_zero_fill_scope(env->isolate_data());
408 bs = ArrayBuffer::NewBackingStore(isolate, storage_size);
409 data_size = StringBytes::Write(isolate,
410 static_cast<char*>(bs->Data()),
411 storage_size,
412 string,
413 enc);
414 }
415
416 CHECK_LE(data_size, storage_size);
417
418 buf = uv_buf_init(static_cast<char*>(bs->Data()), data_size);
419
420 uv_stream_t* send_handle = nullptr;
421
422 if (IsIPCPipe() && !send_handle_obj.IsEmpty()) {
423 HandleWrap* wrap;
424 ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
425 send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
426 // Reference LibuvStreamWrap instance to prevent it from being garbage
427 // collected before `AfterWrite` is called.
428 if (req_wrap_obj->Set(env->context(),
429 env->handle_string(),
430 send_handle_obj).IsNothing()) {
431 return -1;
432 }
433 }
434
435 StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj, try_write);
436 res.bytes += synchronously_written;
437
438 SetWriteResult(res);
439 if (res.wrap != nullptr)
440 res.wrap->SetBackingStore(std::move(bs));
441
442 return res.err;
443 }
444
445
CallJSOnreadMethod(ssize_t nread,Local<ArrayBuffer> ab,size_t offset,StreamBaseJSChecks checks)446 MaybeLocal<Value> StreamBase::CallJSOnreadMethod(ssize_t nread,
447 Local<ArrayBuffer> ab,
448 size_t offset,
449 StreamBaseJSChecks checks) {
450 Environment* env = env_;
451
452 DCHECK_EQ(static_cast<int32_t>(nread), nread);
453 DCHECK_LE(offset, INT32_MAX);
454
455 if (checks == DONT_SKIP_NREAD_CHECKS) {
456 if (ab.IsEmpty()) {
457 DCHECK_EQ(offset, 0);
458 DCHECK_LE(nread, 0);
459 } else {
460 DCHECK_GE(nread, 0);
461 }
462 }
463
464 env->stream_base_state()[kReadBytesOrError] = static_cast<int32_t>(nread);
465 env->stream_base_state()[kArrayBufferOffset] = offset;
466
467 Local<Value> argv[] = {
468 ab.IsEmpty() ? Undefined(env->isolate()).As<Value>() : ab.As<Value>()
469 };
470
471 AsyncWrap* wrap = GetAsyncWrap();
472 CHECK_NOT_NULL(wrap);
473 Local<Value> onread = wrap->object()
474 ->GetInternalField(StreamBase::kOnReadFunctionField)
475 .As<Value>();
476 CHECK(onread->IsFunction());
477 return wrap->MakeCallback(onread.As<Function>(), arraysize(argv), argv);
478 }
479
480
IsIPCPipe()481 bool StreamBase::IsIPCPipe() {
482 return false;
483 }
484
485
GetFD()486 int StreamBase::GetFD() {
487 return -1;
488 }
489
490
GetObject()491 Local<Object> StreamBase::GetObject() {
492 return GetAsyncWrap()->object();
493 }
494
AddMethod(Environment * env,Local<Signature> signature,enum PropertyAttribute attributes,Local<FunctionTemplate> t,JSMethodFunction * stream_method,Local<String> string)495 void StreamBase::AddMethod(Environment* env,
496 Local<Signature> signature,
497 enum PropertyAttribute attributes,
498 Local<FunctionTemplate> t,
499 JSMethodFunction* stream_method,
500 Local<String> string) {
501 Isolate* isolate = env->isolate();
502 Local<FunctionTemplate> templ =
503 NewFunctionTemplate(isolate,
504 stream_method,
505 signature,
506 ConstructorBehavior::kThrow,
507 SideEffectType::kHasNoSideEffect);
508 t->PrototypeTemplate()->SetAccessorProperty(
509 string, templ, Local<FunctionTemplate>(), attributes);
510 }
511
AddMethods(Environment * env,Local<FunctionTemplate> t)512 void StreamBase::AddMethods(Environment* env, Local<FunctionTemplate> t) {
513 Isolate* isolate = env->isolate();
514 HandleScope scope(isolate);
515
516 enum PropertyAttribute attributes =
517 static_cast<PropertyAttribute>(ReadOnly | DontDelete | DontEnum);
518 Local<Signature> sig = Signature::New(isolate, t);
519
520 AddMethod(env, sig, attributes, t, GetFD, env->fd_string());
521 AddMethod(
522 env, sig, attributes, t, GetExternal, env->external_stream_string());
523 AddMethod(env, sig, attributes, t, GetBytesRead, env->bytes_read_string());
524 AddMethod(
525 env, sig, attributes, t, GetBytesWritten, env->bytes_written_string());
526 SetProtoMethod(isolate, t, "readStart", JSMethod<&StreamBase::ReadStartJS>);
527 SetProtoMethod(isolate, t, "readStop", JSMethod<&StreamBase::ReadStopJS>);
528 SetProtoMethod(isolate, t, "shutdown", JSMethod<&StreamBase::Shutdown>);
529 SetProtoMethod(
530 isolate, t, "useUserBuffer", JSMethod<&StreamBase::UseUserBuffer>);
531 SetProtoMethod(isolate, t, "writev", JSMethod<&StreamBase::Writev>);
532 SetProtoMethod(isolate, t, "writeBuffer", JSMethod<&StreamBase::WriteBuffer>);
533 SetProtoMethod(isolate,
534 t,
535 "writeAsciiString",
536 JSMethod<&StreamBase::WriteString<ASCII>>);
537 SetProtoMethod(
538 isolate, t, "writeUtf8String", JSMethod<&StreamBase::WriteString<UTF8>>);
539 SetProtoMethod(
540 isolate, t, "writeUcs2String", JSMethod<&StreamBase::WriteString<UCS2>>);
541 SetProtoMethod(isolate,
542 t,
543 "writeLatin1String",
544 JSMethod<&StreamBase::WriteString<LATIN1>>);
545 t->PrototypeTemplate()->Set(FIXED_ONE_BYTE_STRING(isolate, "isStreamBase"),
546 True(isolate));
547 t->PrototypeTemplate()->SetAccessor(
548 FIXED_ONE_BYTE_STRING(isolate, "onread"),
549 BaseObject::InternalFieldGet<StreamBase::kOnReadFunctionField>,
550 BaseObject::InternalFieldSet<StreamBase::kOnReadFunctionField,
551 &Value::IsFunction>);
552 }
553
RegisterExternalReferences(ExternalReferenceRegistry * registry)554 void StreamBase::RegisterExternalReferences(
555 ExternalReferenceRegistry* registry) {
556 // This function is called by a single thread during start up, so it is safe
557 // to use a local static variable here.
558 static bool is_registered = false;
559 if (is_registered) return;
560 registry->Register(GetFD);
561 registry->Register(GetExternal);
562 registry->Register(GetBytesRead);
563 registry->Register(GetBytesWritten);
564 registry->Register(JSMethod<&StreamBase::ReadStartJS>);
565 registry->Register(JSMethod<&StreamBase::ReadStopJS>);
566 registry->Register(JSMethod<&StreamBase::Shutdown>);
567 registry->Register(JSMethod<&StreamBase::UseUserBuffer>);
568 registry->Register(JSMethod<&StreamBase::Writev>);
569 registry->Register(JSMethod<&StreamBase::WriteBuffer>);
570 registry->Register(JSMethod<&StreamBase::WriteString<ASCII>>);
571 registry->Register(JSMethod<&StreamBase::WriteString<UTF8>>);
572 registry->Register(JSMethod<&StreamBase::WriteString<UCS2>>);
573 registry->Register(JSMethod<&StreamBase::WriteString<LATIN1>>);
574 registry->Register(
575 BaseObject::InternalFieldGet<StreamBase::kOnReadFunctionField>);
576 registry->Register(
577 BaseObject::InternalFieldSet<StreamBase::kOnReadFunctionField,
578 &Value::IsFunction>);
579 is_registered = true;
580 }
581
GetFD(const FunctionCallbackInfo<Value> & args)582 void StreamBase::GetFD(const FunctionCallbackInfo<Value>& args) {
583 // Mimic implementation of StreamBase::GetFD() and UDPWrap::GetFD().
584 StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
585 if (wrap == nullptr) return args.GetReturnValue().Set(UV_EINVAL);
586
587 if (!wrap->IsAlive()) return args.GetReturnValue().Set(UV_EINVAL);
588
589 args.GetReturnValue().Set(wrap->GetFD());
590 }
591
GetBytesRead(const FunctionCallbackInfo<Value> & args)592 void StreamBase::GetBytesRead(const FunctionCallbackInfo<Value>& args) {
593 StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
594 if (wrap == nullptr) return args.GetReturnValue().Set(0);
595
596 // uint64_t -> double. 53bits is enough for all real cases.
597 args.GetReturnValue().Set(static_cast<double>(wrap->bytes_read_));
598 }
599
GetBytesWritten(const FunctionCallbackInfo<Value> & args)600 void StreamBase::GetBytesWritten(const FunctionCallbackInfo<Value>& args) {
601 StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
602 if (wrap == nullptr) return args.GetReturnValue().Set(0);
603
604 // uint64_t -> double. 53bits is enough for all real cases.
605 args.GetReturnValue().Set(static_cast<double>(wrap->bytes_written_));
606 }
607
GetExternal(const FunctionCallbackInfo<Value> & args)608 void StreamBase::GetExternal(const FunctionCallbackInfo<Value>& args) {
609 StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
610 if (wrap == nullptr) return;
611
612 Local<External> ext = External::New(args.GetIsolate(), wrap);
613 args.GetReturnValue().Set(ext);
614 }
615
616 template <int (StreamBase::*Method)(const FunctionCallbackInfo<Value>& args)>
JSMethod(const FunctionCallbackInfo<Value> & args)617 void StreamBase::JSMethod(const FunctionCallbackInfo<Value>& args) {
618 StreamBase* wrap = StreamBase::FromObject(args.Holder().As<Object>());
619 if (wrap == nullptr) return;
620
621 if (!wrap->IsAlive()) return args.GetReturnValue().Set(UV_EINVAL);
622
623 AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(wrap->GetAsyncWrap());
624 args.GetReturnValue().Set((wrap->*Method)(args));
625 }
626
DoTryWrite(uv_buf_t ** bufs,size_t * count)627 int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) {
628 // No TryWrite by default
629 return 0;
630 }
631
632
Error() const633 const char* StreamResource::Error() const {
634 return nullptr;
635 }
636
637
ClearError()638 void StreamResource::ClearError() {
639 // No-op
640 }
641
642
OnStreamAlloc(size_t suggested_size)643 uv_buf_t EmitToJSStreamListener::OnStreamAlloc(size_t suggested_size) {
644 CHECK_NOT_NULL(stream_);
645 Environment* env = static_cast<StreamBase*>(stream_)->stream_env();
646 return env->allocate_managed_buffer(suggested_size);
647 }
648
OnStreamRead(ssize_t nread,const uv_buf_t & buf_)649 void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) {
650 CHECK_NOT_NULL(stream_);
651 StreamBase* stream = static_cast<StreamBase*>(stream_);
652 Environment* env = stream->stream_env();
653 Isolate* isolate = env->isolate();
654 HandleScope handle_scope(isolate);
655 Context::Scope context_scope(env->context());
656 std::unique_ptr<BackingStore> bs = env->release_managed_buffer(buf_);
657
658 if (nread <= 0) {
659 if (nread < 0)
660 stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
661 return;
662 }
663
664 CHECK_LE(static_cast<size_t>(nread), bs->ByteLength());
665 bs = BackingStore::Reallocate(isolate, std::move(bs), nread);
666
667 stream->CallJSOnreadMethod(nread, ArrayBuffer::New(isolate, std::move(bs)));
668 }
669
670
OnStreamAlloc(size_t suggested_size)671 uv_buf_t CustomBufferJSListener::OnStreamAlloc(size_t suggested_size) {
672 return buffer_;
673 }
674
675
OnStreamRead(ssize_t nread,const uv_buf_t & buf)676 void CustomBufferJSListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
677 CHECK_NOT_NULL(stream_);
678
679 StreamBase* stream = static_cast<StreamBase*>(stream_);
680 Environment* env = stream->stream_env();
681 HandleScope handle_scope(env->isolate());
682 Context::Scope context_scope(env->context());
683
684 // In the case that there's an error and buf is null, return immediately.
685 // This can happen on unices when POLLHUP is received and UV_EOF is returned
686 // or when getting an error while performing a UV_HANDLE_ZERO_READ on Windows.
687 if (buf.base == nullptr && nread < 0) {
688 stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
689 return;
690 }
691
692 CHECK_EQ(buf.base, buffer_.base);
693
694 MaybeLocal<Value> ret = stream->CallJSOnreadMethod(nread,
695 Local<ArrayBuffer>(),
696 0,
697 StreamBase::SKIP_NREAD_CHECKS);
698 Local<Value> next_buf_v;
699 if (ret.ToLocal(&next_buf_v) && !next_buf_v->IsUndefined()) {
700 buffer_.base = Buffer::Data(next_buf_v);
701 buffer_.len = Buffer::Length(next_buf_v);
702 }
703 }
704
705
OnStreamAfterReqFinished(StreamReq * req_wrap,int status)706 void ReportWritesToJSStreamListener::OnStreamAfterReqFinished(
707 StreamReq* req_wrap, int status) {
708 StreamBase* stream = static_cast<StreamBase*>(stream_);
709 Environment* env = stream->stream_env();
710 if (!env->can_call_into_js()) return;
711 AsyncWrap* async_wrap = req_wrap->GetAsyncWrap();
712 HandleScope handle_scope(env->isolate());
713 Context::Scope context_scope(env->context());
714 CHECK(!async_wrap->persistent().IsEmpty());
715 Local<Object> req_wrap_obj = async_wrap->object();
716
717 Local<Value> argv[] = {
718 Integer::New(env->isolate(), status),
719 stream->GetObject(),
720 Undefined(env->isolate())
721 };
722
723 const char* msg = stream->Error();
724 if (msg != nullptr) {
725 argv[2] = OneByteString(env->isolate(), msg);
726 stream->ClearError();
727 }
728
729 if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())
730 async_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
731 }
732
OnStreamAfterWrite(WriteWrap * req_wrap,int status)733 void ReportWritesToJSStreamListener::OnStreamAfterWrite(
734 WriteWrap* req_wrap, int status) {
735 OnStreamAfterReqFinished(req_wrap, status);
736 }
737
OnStreamAfterShutdown(ShutdownWrap * req_wrap,int status)738 void ReportWritesToJSStreamListener::OnStreamAfterShutdown(
739 ShutdownWrap* req_wrap, int status) {
740 OnStreamAfterReqFinished(req_wrap, status);
741 }
742
OnDone(int status)743 void ShutdownWrap::OnDone(int status) {
744 stream()->EmitAfterShutdown(this, status);
745 Dispose();
746 }
747
OnDone(int status)748 void WriteWrap::OnDone(int status) {
749 stream()->EmitAfterWrite(this, status);
750 Dispose();
751 }
752
~StreamListener()753 StreamListener::~StreamListener() {
754 if (stream_ != nullptr)
755 stream_->RemoveStreamListener(this);
756 }
757
OnStreamAfterShutdown(ShutdownWrap * w,int status)758 void StreamListener::OnStreamAfterShutdown(ShutdownWrap* w, int status) {
759 CHECK_NOT_NULL(previous_listener_);
760 previous_listener_->OnStreamAfterShutdown(w, status);
761 }
762
OnStreamAfterWrite(WriteWrap * w,int status)763 void StreamListener::OnStreamAfterWrite(WriteWrap* w, int status) {
764 CHECK_NOT_NULL(previous_listener_);
765 previous_listener_->OnStreamAfterWrite(w, status);
766 }
767
~StreamResource()768 StreamResource::~StreamResource() {
769 while (listener_ != nullptr) {
770 StreamListener* listener = listener_;
771 listener->OnStreamDestroy();
772 // Remove the listener if it didn’t remove itself. This makes the logic
773 // in `OnStreamDestroy()` implementations easier, because they
774 // may call generic cleanup functions which can just remove the
775 // listener unconditionally.
776 if (listener == listener_)
777 RemoveStreamListener(listener_);
778 }
779 }
780
RemoveStreamListener(StreamListener * listener)781 void StreamResource::RemoveStreamListener(StreamListener* listener) {
782 CHECK_NOT_NULL(listener);
783
784 StreamListener* previous;
785 StreamListener* current;
786
787 // Remove from the linked list.
788 // No loop condition because we want a crash if listener is not found.
789 for (current = listener_, previous = nullptr;;
790 previous = current, current = current->previous_listener_) {
791 CHECK_NOT_NULL(current);
792 if (current == listener) {
793 if (previous != nullptr)
794 previous->previous_listener_ = current->previous_listener_;
795 else
796 listener_ = listener->previous_listener_;
797 break;
798 }
799 }
800
801 listener->stream_ = nullptr;
802 listener->previous_listener_ = nullptr;
803 }
804
CreateShutdownWrap(Local<Object> object)805 ShutdownWrap* StreamBase::CreateShutdownWrap(
806 Local<Object> object) {
807 auto* wrap = new SimpleShutdownWrap<AsyncWrap>(this, object);
808 wrap->MakeWeak();
809 return wrap;
810 }
811
CreateWriteWrap(Local<Object> object)812 WriteWrap* StreamBase::CreateWriteWrap(
813 Local<Object> object) {
814 auto* wrap = new SimpleWriteWrap<AsyncWrap>(this, object);
815 wrap->MakeWeak();
816 return wrap;
817 }
818
Done(int status,const char * error_str)819 void StreamReq::Done(int status, const char* error_str) {
820 AsyncWrap* async_wrap = GetAsyncWrap();
821 Environment* env = async_wrap->env();
822 if (error_str != nullptr) {
823 v8::HandleScope handle_scope(env->isolate());
824 if (async_wrap->object()
825 ->Set(env->context(),
826 env->error_string(),
827 OneByteString(env->isolate(), error_str))
828 .IsNothing()) {
829 return;
830 }
831 }
832
833 OnDone(status);
834 }
835
836 } // namespace node
837