#include "stream_base.h" // NOLINT(build/include_inline) #include "stream_base-inl.h" #include "stream_wrap.h" #include "env-inl.h" #include "js_stream.h" #include "node.h" #include "node_buffer.h" #include "node_errors.h" #include "node_external_reference.h" #include "string_bytes.h" #include "util-inl.h" #include "v8.h" #include // INT_MAX namespace node { using v8::Array; using v8::ArrayBuffer; using v8::BackingStore; using v8::ConstructorBehavior; using v8::Context; using v8::DontDelete; using v8::DontEnum; using v8::External; using v8::Function; using v8::FunctionCallbackInfo; using v8::FunctionTemplate; using v8::HandleScope; using v8::Integer; using v8::Isolate; using v8::Local; using v8::MaybeLocal; using v8::Object; using v8::PropertyAttribute; using v8::ReadOnly; using v8::SideEffectType; using v8::Signature; using v8::String; using v8::Value; int StreamBase::Shutdown(v8::Local req_wrap_obj) { Environment* env = stream_env(); v8::HandleScope handle_scope(env->isolate()); if (req_wrap_obj.IsEmpty()) { if (!env->shutdown_wrap_template() ->NewInstance(env->context()) .ToLocal(&req_wrap_obj)) { return UV_EBUSY; } StreamReq::ResetObject(req_wrap_obj); } BaseObjectPtr req_wrap_ptr; AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(GetAsyncWrap()); ShutdownWrap* req_wrap = CreateShutdownWrap(req_wrap_obj); if (req_wrap != nullptr) req_wrap_ptr.reset(req_wrap->GetAsyncWrap()); int err = DoShutdown(req_wrap); if (err != 0 && req_wrap != nullptr) { req_wrap->Dispose(); } const char* msg = Error(); if (msg != nullptr) { if (req_wrap_obj ->Set(env->context(), env->error_string(), OneByteString(env->isolate(), msg)) .IsNothing()) { return UV_EBUSY; } ClearError(); } return err; } StreamWriteResult StreamBase::Write(uv_buf_t* bufs, size_t count, uv_stream_t* send_handle, v8::Local req_wrap_obj, bool skip_try_write) { Environment* env = stream_env(); int err; size_t total_bytes = 0; for (size_t i = 0; i < count; ++i) total_bytes += bufs[i].len; bytes_written_ += total_bytes; if (send_handle == nullptr && !skip_try_write) { err = DoTryWrite(&bufs, &count); if (err != 0 || count == 0) { return StreamWriteResult{false, err, nullptr, total_bytes, {}}; } } v8::HandleScope handle_scope(env->isolate()); if (req_wrap_obj.IsEmpty()) { if (!env->write_wrap_template() ->NewInstance(env->context()) .ToLocal(&req_wrap_obj)) { return StreamWriteResult{false, UV_EBUSY, nullptr, 0, {}}; } StreamReq::ResetObject(req_wrap_obj); } AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(GetAsyncWrap()); WriteWrap* req_wrap = CreateWriteWrap(req_wrap_obj); BaseObjectPtr req_wrap_ptr(req_wrap->GetAsyncWrap()); err = DoWrite(req_wrap, bufs, count, send_handle); bool async = err == 0; if (!async) { req_wrap->Dispose(); req_wrap = nullptr; } const char* msg = Error(); if (msg != nullptr) { if (req_wrap_obj ->Set(env->context(), env->error_string(), OneByteString(env->isolate(), msg)) .IsNothing()) { return StreamWriteResult{false, UV_EBUSY, nullptr, 0, {}}; } ClearError(); } return StreamWriteResult{ async, err, req_wrap, total_bytes, std::move(req_wrap_ptr)}; } template int StreamBase::WriteString( const FunctionCallbackInfo& args); template int StreamBase::WriteString( const FunctionCallbackInfo& args); template int StreamBase::WriteString( const FunctionCallbackInfo& args); template int StreamBase::WriteString( const FunctionCallbackInfo& args); int StreamBase::ReadStartJS(const FunctionCallbackInfo& args) { return ReadStart(); } int StreamBase::ReadStopJS(const FunctionCallbackInfo& args) { return ReadStop(); } int StreamBase::UseUserBuffer(const FunctionCallbackInfo& args) { CHECK(Buffer::HasInstance(args[0])); uv_buf_t buf = uv_buf_init(Buffer::Data(args[0]), Buffer::Length(args[0])); PushStreamListener(new CustomBufferJSListener(buf)); return 0; } int StreamBase::Shutdown(const FunctionCallbackInfo& args) { CHECK(args[0]->IsObject()); Local req_wrap_obj = args[0].As(); return Shutdown(req_wrap_obj); } void StreamBase::SetWriteResult(const StreamWriteResult& res) { env_->stream_base_state()[kBytesWritten] = res.bytes; env_->stream_base_state()[kLastWriteWasAsync] = res.async; } int StreamBase::Writev(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); Isolate* isolate = env->isolate(); Local context = env->context(); CHECK(args[0]->IsObject()); CHECK(args[1]->IsArray()); Local req_wrap_obj = args[0].As(); Local chunks = args[1].As(); bool all_buffers = args[2]->IsTrue(); size_t count; if (all_buffers) count = chunks->Length(); else count = chunks->Length() >> 1; MaybeStackBuffer bufs(count); size_t storage_size = 0; size_t offset; if (!all_buffers) { // Determine storage size first for (size_t i = 0; i < count; i++) { Local chunk; if (!chunks->Get(context, i * 2).ToLocal(&chunk)) return -1; if (Buffer::HasInstance(chunk)) continue; // Buffer chunk, no additional storage required // String chunk Local string; if (!chunk->ToString(context).ToLocal(&string)) return -1; Local next_chunk; if (!chunks->Get(context, i * 2 + 1).ToLocal(&next_chunk)) return -1; enum encoding encoding = ParseEncoding(isolate, next_chunk); size_t chunk_size; if ((encoding == UTF8 && string->Length() > 65535 && !StringBytes::Size(isolate, string, encoding).To(&chunk_size)) || !StringBytes::StorageSize(isolate, string, encoding) .To(&chunk_size)) { return -1; } storage_size += chunk_size; } if (storage_size > INT_MAX) return UV_ENOBUFS; } else { for (size_t i = 0; i < count; i++) { Local chunk; if (!chunks->Get(context, i).ToLocal(&chunk)) return -1; bufs[i].base = Buffer::Data(chunk); bufs[i].len = Buffer::Length(chunk); } } std::unique_ptr bs; if (storage_size > 0) { NoArrayBufferZeroFillScope no_zero_fill_scope(env->isolate_data()); bs = ArrayBuffer::NewBackingStore(isolate, storage_size); } offset = 0; if (!all_buffers) { for (size_t i = 0; i < count; i++) { Local chunk; if (!chunks->Get(context, i * 2).ToLocal(&chunk)) return -1; // Write buffer if (Buffer::HasInstance(chunk)) { bufs[i].base = Buffer::Data(chunk); bufs[i].len = Buffer::Length(chunk); continue; } // Write string CHECK_LE(offset, storage_size); char* str_storage = static_cast(bs ? bs->Data() : nullptr) + offset; size_t str_size = (bs ? bs->ByteLength() : 0) - offset; Local string; if (!chunk->ToString(context).ToLocal(&string)) return -1; Local next_chunk; if (!chunks->Get(context, i * 2 + 1).ToLocal(&next_chunk)) return -1; enum encoding encoding = ParseEncoding(isolate, next_chunk); str_size = StringBytes::Write(isolate, str_storage, str_size, string, encoding); bufs[i].base = str_storage; bufs[i].len = str_size; offset += str_size; } } StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj); SetWriteResult(res); if (res.wrap != nullptr && storage_size > 0) res.wrap->SetBackingStore(std::move(bs)); return res.err; } int StreamBase::WriteBuffer(const FunctionCallbackInfo& args) { CHECK(args[0]->IsObject()); Environment* env = Environment::GetCurrent(args); if (!args[1]->IsUint8Array()) { node::THROW_ERR_INVALID_ARG_TYPE(env, "Second argument must be a buffer"); return 0; } Local req_wrap_obj = args[0].As(); uv_buf_t buf; buf.base = Buffer::Data(args[1]); buf.len = Buffer::Length(args[1]); uv_stream_t* send_handle = nullptr; if (args[2]->IsObject() && IsIPCPipe()) { Local send_handle_obj = args[2].As(); HandleWrap* wrap; ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL); send_handle = reinterpret_cast(wrap->GetHandle()); // Reference LibuvStreamWrap instance to prevent it from being garbage // collected before `AfterWrite` is called. if (req_wrap_obj->Set(env->context(), env->handle_string(), send_handle_obj).IsNothing()) { return -1; } } StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj); SetWriteResult(res); return res.err; } template int StreamBase::WriteString(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); Isolate* isolate = env->isolate(); CHECK(args[0]->IsObject()); CHECK(args[1]->IsString()); Local req_wrap_obj = args[0].As(); Local string = args[1].As(); Local send_handle_obj; if (args[2]->IsObject()) send_handle_obj = args[2].As(); // Compute the size of the storage that the string will be flattened into. // For UTF8 strings that are very long, go ahead and take the hit for // computing their actual size, rather than tripling the storage. size_t storage_size; if ((enc == UTF8 && string->Length() > 65535 && !StringBytes::Size(isolate, string, enc).To(&storage_size)) || !StringBytes::StorageSize(isolate, string, enc).To(&storage_size)) { return -1; } if (storage_size > INT_MAX) return UV_ENOBUFS; // Try writing immediately if write size isn't too big char stack_storage[16384]; // 16kb size_t data_size; size_t synchronously_written = 0; uv_buf_t buf; bool try_write = storage_size <= sizeof(stack_storage) && (!IsIPCPipe() || send_handle_obj.IsEmpty()); if (try_write) { data_size = StringBytes::Write(isolate, stack_storage, storage_size, string, enc); buf = uv_buf_init(stack_storage, data_size); uv_buf_t* bufs = &buf; size_t count = 1; const int err = DoTryWrite(&bufs, &count); // Keep track of the bytes written here, because we're taking a shortcut // by using `DoTryWrite()` directly instead of using the utilities // provided by `Write()`. synchronously_written = count == 0 ? data_size : data_size - buf.len; bytes_written_ += synchronously_written; // Immediate failure or success if (err != 0 || count == 0) { SetWriteResult(StreamWriteResult { false, err, nullptr, data_size, {} }); return err; } // Partial write CHECK_EQ(count, 1); } std::unique_ptr bs; if (try_write) { // Copy partial data NoArrayBufferZeroFillScope no_zero_fill_scope(env->isolate_data()); bs = ArrayBuffer::NewBackingStore(isolate, buf.len); memcpy(static_cast(bs->Data()), buf.base, buf.len); data_size = buf.len; } else { // Write it NoArrayBufferZeroFillScope no_zero_fill_scope(env->isolate_data()); bs = ArrayBuffer::NewBackingStore(isolate, storage_size); data_size = StringBytes::Write(isolate, static_cast(bs->Data()), storage_size, string, enc); } CHECK_LE(data_size, storage_size); buf = uv_buf_init(static_cast(bs->Data()), data_size); uv_stream_t* send_handle = nullptr; if (IsIPCPipe() && !send_handle_obj.IsEmpty()) { HandleWrap* wrap; ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL); send_handle = reinterpret_cast(wrap->GetHandle()); // Reference LibuvStreamWrap instance to prevent it from being garbage // collected before `AfterWrite` is called. if (req_wrap_obj->Set(env->context(), env->handle_string(), send_handle_obj).IsNothing()) { return -1; } } StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj, try_write); res.bytes += synchronously_written; SetWriteResult(res); if (res.wrap != nullptr) res.wrap->SetBackingStore(std::move(bs)); return res.err; } MaybeLocal StreamBase::CallJSOnreadMethod(ssize_t nread, Local ab, size_t offset, StreamBaseJSChecks checks) { Environment* env = env_; DCHECK_EQ(static_cast(nread), nread); DCHECK_LE(offset, INT32_MAX); if (checks == DONT_SKIP_NREAD_CHECKS) { if (ab.IsEmpty()) { DCHECK_EQ(offset, 0); DCHECK_LE(nread, 0); } else { DCHECK_GE(nread, 0); } } env->stream_base_state()[kReadBytesOrError] = static_cast(nread); env->stream_base_state()[kArrayBufferOffset] = offset; Local argv[] = { ab.IsEmpty() ? Undefined(env->isolate()).As() : ab.As() }; AsyncWrap* wrap = GetAsyncWrap(); CHECK_NOT_NULL(wrap); Local onread = wrap->object()->GetInternalField( StreamBase::kOnReadFunctionField); CHECK(onread->IsFunction()); return wrap->MakeCallback(onread.As(), arraysize(argv), argv); } bool StreamBase::IsIPCPipe() { return false; } int StreamBase::GetFD() { return -1; } Local StreamBase::GetObject() { return GetAsyncWrap()->object(); } void StreamBase::AddMethod(Environment* env, Local signature, enum PropertyAttribute attributes, Local t, JSMethodFunction* stream_method, Local string) { Isolate* isolate = env->isolate(); Local templ = NewFunctionTemplate(isolate, stream_method, signature, ConstructorBehavior::kThrow, SideEffectType::kHasNoSideEffect); t->PrototypeTemplate()->SetAccessorProperty( string, templ, Local(), attributes); } void StreamBase::AddMethods(Environment* env, Local t) { Isolate* isolate = env->isolate(); HandleScope scope(isolate); enum PropertyAttribute attributes = static_cast(ReadOnly | DontDelete | DontEnum); Local sig = Signature::New(isolate, t); AddMethod(env, sig, attributes, t, GetFD, env->fd_string()); AddMethod( env, sig, attributes, t, GetExternal, env->external_stream_string()); AddMethod(env, sig, attributes, t, GetBytesRead, env->bytes_read_string()); AddMethod( env, sig, attributes, t, GetBytesWritten, env->bytes_written_string()); SetProtoMethod(isolate, t, "readStart", JSMethod<&StreamBase::ReadStartJS>); SetProtoMethod(isolate, t, "readStop", JSMethod<&StreamBase::ReadStopJS>); SetProtoMethod(isolate, t, "shutdown", JSMethod<&StreamBase::Shutdown>); SetProtoMethod( isolate, t, "useUserBuffer", JSMethod<&StreamBase::UseUserBuffer>); SetProtoMethod(isolate, t, "writev", JSMethod<&StreamBase::Writev>); SetProtoMethod(isolate, t, "writeBuffer", JSMethod<&StreamBase::WriteBuffer>); SetProtoMethod(isolate, t, "writeAsciiString", JSMethod<&StreamBase::WriteString>); SetProtoMethod( isolate, t, "writeUtf8String", JSMethod<&StreamBase::WriteString>); SetProtoMethod( isolate, t, "writeUcs2String", JSMethod<&StreamBase::WriteString>); SetProtoMethod(isolate, t, "writeLatin1String", JSMethod<&StreamBase::WriteString>); t->PrototypeTemplate()->Set(FIXED_ONE_BYTE_STRING(isolate, "isStreamBase"), True(isolate)); t->PrototypeTemplate()->SetAccessor( FIXED_ONE_BYTE_STRING(isolate, "onread"), BaseObject::InternalFieldGet, BaseObject::InternalFieldSet); } void StreamBase::RegisterExternalReferences( ExternalReferenceRegistry* registry) { // This function is called by a single thread during start up, so it is safe // to use a local static variable here. static bool is_registered = false; if (is_registered) return; registry->Register(GetFD); registry->Register(GetExternal); registry->Register(GetBytesRead); registry->Register(GetBytesWritten); registry->Register(JSMethod<&StreamBase::ReadStartJS>); registry->Register(JSMethod<&StreamBase::ReadStopJS>); registry->Register(JSMethod<&StreamBase::Shutdown>); registry->Register(JSMethod<&StreamBase::UseUserBuffer>); registry->Register(JSMethod<&StreamBase::Writev>); registry->Register(JSMethod<&StreamBase::WriteBuffer>); registry->Register(JSMethod<&StreamBase::WriteString>); registry->Register(JSMethod<&StreamBase::WriteString>); registry->Register(JSMethod<&StreamBase::WriteString>); registry->Register(JSMethod<&StreamBase::WriteString>); registry->Register( BaseObject::InternalFieldGet); registry->Register( BaseObject::InternalFieldSet); is_registered = true; } void StreamBase::GetFD(const FunctionCallbackInfo& args) { // Mimic implementation of StreamBase::GetFD() and UDPWrap::GetFD(). StreamBase* wrap = StreamBase::FromObject(args.This().As()); if (wrap == nullptr) return args.GetReturnValue().Set(UV_EINVAL); if (!wrap->IsAlive()) return args.GetReturnValue().Set(UV_EINVAL); args.GetReturnValue().Set(wrap->GetFD()); } void StreamBase::GetBytesRead(const FunctionCallbackInfo& args) { StreamBase* wrap = StreamBase::FromObject(args.This().As()); if (wrap == nullptr) return args.GetReturnValue().Set(0); // uint64_t -> double. 53bits is enough for all real cases. args.GetReturnValue().Set(static_cast(wrap->bytes_read_)); } void StreamBase::GetBytesWritten(const FunctionCallbackInfo& args) { StreamBase* wrap = StreamBase::FromObject(args.This().As()); if (wrap == nullptr) return args.GetReturnValue().Set(0); // uint64_t -> double. 53bits is enough for all real cases. args.GetReturnValue().Set(static_cast(wrap->bytes_written_)); } void StreamBase::GetExternal(const FunctionCallbackInfo& args) { StreamBase* wrap = StreamBase::FromObject(args.This().As()); if (wrap == nullptr) return; Local ext = External::New(args.GetIsolate(), wrap); args.GetReturnValue().Set(ext); } template & args)> void StreamBase::JSMethod(const FunctionCallbackInfo& args) { StreamBase* wrap = StreamBase::FromObject(args.Holder().As()); if (wrap == nullptr) return; if (!wrap->IsAlive()) return args.GetReturnValue().Set(UV_EINVAL); AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(wrap->GetAsyncWrap()); args.GetReturnValue().Set((wrap->*Method)(args)); } int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) { // No TryWrite by default return 0; } const char* StreamResource::Error() const { return nullptr; } void StreamResource::ClearError() { // No-op } uv_buf_t EmitToJSStreamListener::OnStreamAlloc(size_t suggested_size) { CHECK_NOT_NULL(stream_); Environment* env = static_cast(stream_)->stream_env(); return env->allocate_managed_buffer(suggested_size); } void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) { CHECK_NOT_NULL(stream_); StreamBase* stream = static_cast(stream_); Environment* env = stream->stream_env(); Isolate* isolate = env->isolate(); HandleScope handle_scope(isolate); Context::Scope context_scope(env->context()); std::unique_ptr bs = env->release_managed_buffer(buf_); if (nread <= 0) { if (nread < 0) stream->CallJSOnreadMethod(nread, Local()); return; } CHECK_LE(static_cast(nread), bs->ByteLength()); bs = BackingStore::Reallocate(isolate, std::move(bs), nread); stream->CallJSOnreadMethod(nread, ArrayBuffer::New(isolate, std::move(bs))); } uv_buf_t CustomBufferJSListener::OnStreamAlloc(size_t suggested_size) { return buffer_; } void CustomBufferJSListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) { CHECK_NOT_NULL(stream_); StreamBase* stream = static_cast(stream_); Environment* env = stream->stream_env(); HandleScope handle_scope(env->isolate()); Context::Scope context_scope(env->context()); // In the case that there's an error and buf is null, return immediately. // This can happen on unices when POLLHUP is received and UV_EOF is returned // or when getting an error while performing a UV_HANDLE_ZERO_READ on Windows. if (buf.base == nullptr && nread < 0) { stream->CallJSOnreadMethod(nread, Local()); return; } CHECK_EQ(buf.base, buffer_.base); MaybeLocal ret = stream->CallJSOnreadMethod(nread, Local(), 0, StreamBase::SKIP_NREAD_CHECKS); Local next_buf_v; if (ret.ToLocal(&next_buf_v) && !next_buf_v->IsUndefined()) { buffer_.base = Buffer::Data(next_buf_v); buffer_.len = Buffer::Length(next_buf_v); } } void ReportWritesToJSStreamListener::OnStreamAfterReqFinished( StreamReq* req_wrap, int status) { StreamBase* stream = static_cast(stream_); Environment* env = stream->stream_env(); if (!env->can_call_into_js()) return; AsyncWrap* async_wrap = req_wrap->GetAsyncWrap(); HandleScope handle_scope(env->isolate()); Context::Scope context_scope(env->context()); CHECK(!async_wrap->persistent().IsEmpty()); Local req_wrap_obj = async_wrap->object(); Local argv[] = { Integer::New(env->isolate(), status), stream->GetObject(), Undefined(env->isolate()) }; const char* msg = stream->Error(); if (msg != nullptr) { argv[2] = OneByteString(env->isolate(), msg); stream->ClearError(); } if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust()) async_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv); } void ReportWritesToJSStreamListener::OnStreamAfterWrite( WriteWrap* req_wrap, int status) { OnStreamAfterReqFinished(req_wrap, status); } void ReportWritesToJSStreamListener::OnStreamAfterShutdown( ShutdownWrap* req_wrap, int status) { OnStreamAfterReqFinished(req_wrap, status); } void ShutdownWrap::OnDone(int status) { stream()->EmitAfterShutdown(this, status); Dispose(); } void WriteWrap::OnDone(int status) { stream()->EmitAfterWrite(this, status); Dispose(); } StreamListener::~StreamListener() { if (stream_ != nullptr) stream_->RemoveStreamListener(this); } void StreamListener::OnStreamAfterShutdown(ShutdownWrap* w, int status) { CHECK_NOT_NULL(previous_listener_); previous_listener_->OnStreamAfterShutdown(w, status); } void StreamListener::OnStreamAfterWrite(WriteWrap* w, int status) { CHECK_NOT_NULL(previous_listener_); previous_listener_->OnStreamAfterWrite(w, status); } StreamResource::~StreamResource() { while (listener_ != nullptr) { StreamListener* listener = listener_; listener->OnStreamDestroy(); // Remove the listener if it didn’t remove itself. This makes the logic // in `OnStreamDestroy()` implementations easier, because they // may call generic cleanup functions which can just remove the // listener unconditionally. if (listener == listener_) RemoveStreamListener(listener_); } } void StreamResource::RemoveStreamListener(StreamListener* listener) { CHECK_NOT_NULL(listener); StreamListener* previous; StreamListener* current; // Remove from the linked list. // No loop condition because we want a crash if listener is not found. for (current = listener_, previous = nullptr;; previous = current, current = current->previous_listener_) { CHECK_NOT_NULL(current); if (current == listener) { if (previous != nullptr) previous->previous_listener_ = current->previous_listener_; else listener_ = listener->previous_listener_; break; } } listener->stream_ = nullptr; listener->previous_listener_ = nullptr; } ShutdownWrap* StreamBase::CreateShutdownWrap( Local object) { auto* wrap = new SimpleShutdownWrap(this, object); wrap->MakeWeak(); return wrap; } WriteWrap* StreamBase::CreateWriteWrap( Local object) { auto* wrap = new SimpleWriteWrap(this, object); wrap->MakeWeak(); return wrap; } void StreamReq::Done(int status, const char* error_str) { AsyncWrap* async_wrap = GetAsyncWrap(); Environment* env = async_wrap->env(); if (error_str != nullptr) { v8::HandleScope handle_scope(env->isolate()); if (async_wrap->object() ->Set(env->context(), env->error_string(), OneByteString(env->isolate(), error_str)) .IsNothing()) { return; } } OnDone(status); } } // namespace node