1 #include "stream_base.h" // NOLINT(build/include_inline)
2 #include "stream_base-inl.h"
3 #include "stream_wrap.h"
4
5 #include "node.h"
6 #include "node_buffer.h"
7 #include "node_errors.h"
8 #include "env-inl.h"
9 #include "js_stream.h"
10 #include "string_bytes.h"
11 #include "util-inl.h"
12 #include "v8.h"
13
14 #include <climits> // INT_MAX
15
16 namespace node {
17
18 using v8::Array;
19 using v8::ArrayBuffer;
20 using v8::Context;
21 using v8::DontDelete;
22 using v8::DontEnum;
23 using v8::External;
24 using v8::Function;
25 using v8::FunctionCallbackInfo;
26 using v8::HandleScope;
27 using v8::Integer;
28 using v8::Local;
29 using v8::MaybeLocal;
30 using v8::Object;
31 using v8::ReadOnly;
32 using v8::String;
33 using v8::Value;
34
35 template int StreamBase::WriteString<ASCII>(
36 const FunctionCallbackInfo<Value>& args);
37 template int StreamBase::WriteString<UTF8>(
38 const FunctionCallbackInfo<Value>& args);
39 template int StreamBase::WriteString<UCS2>(
40 const FunctionCallbackInfo<Value>& args);
41 template int StreamBase::WriteString<LATIN1>(
42 const FunctionCallbackInfo<Value>& args);
43
44
ReadStartJS(const FunctionCallbackInfo<Value> & args)45 int StreamBase::ReadStartJS(const FunctionCallbackInfo<Value>& args) {
46 return ReadStart();
47 }
48
49
ReadStopJS(const FunctionCallbackInfo<Value> & args)50 int StreamBase::ReadStopJS(const FunctionCallbackInfo<Value>& args) {
51 return ReadStop();
52 }
53
UseUserBuffer(const FunctionCallbackInfo<Value> & args)54 int StreamBase::UseUserBuffer(const FunctionCallbackInfo<Value>& args) {
55 CHECK(Buffer::HasInstance(args[0]));
56
57 uv_buf_t buf = uv_buf_init(Buffer::Data(args[0]), Buffer::Length(args[0]));
58 PushStreamListener(new CustomBufferJSListener(buf));
59 return 0;
60 }
61
Shutdown(const FunctionCallbackInfo<Value> & args)62 int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
63 CHECK(args[0]->IsObject());
64 Local<Object> req_wrap_obj = args[0].As<Object>();
65
66 return Shutdown(req_wrap_obj);
67 }
68
SetWriteResult(const StreamWriteResult & res)69 void StreamBase::SetWriteResult(const StreamWriteResult& res) {
70 env_->stream_base_state()[kBytesWritten] = res.bytes;
71 env_->stream_base_state()[kLastWriteWasAsync] = res.async;
72 }
73
Writev(const FunctionCallbackInfo<Value> & args)74 int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
75 Environment* env = Environment::GetCurrent(args);
76
77 CHECK(args[0]->IsObject());
78 CHECK(args[1]->IsArray());
79
80 Local<Object> req_wrap_obj = args[0].As<Object>();
81 Local<Array> chunks = args[1].As<Array>();
82 bool all_buffers = args[2]->IsTrue();
83
84 size_t count;
85 if (all_buffers)
86 count = chunks->Length();
87 else
88 count = chunks->Length() >> 1;
89
90 MaybeStackBuffer<uv_buf_t, 16> bufs(count);
91
92 size_t storage_size = 0;
93 size_t offset;
94
95 if (!all_buffers) {
96 // Determine storage size first
97 for (size_t i = 0; i < count; i++) {
98 Local<Value> chunk = chunks->Get(env->context(), i * 2).ToLocalChecked();
99
100 if (Buffer::HasInstance(chunk))
101 continue;
102 // Buffer chunk, no additional storage required
103
104 // String chunk
105 Local<String> string = chunk->ToString(env->context()).ToLocalChecked();
106 enum encoding encoding = ParseEncoding(env->isolate(),
107 chunks->Get(env->context(), i * 2 + 1).ToLocalChecked());
108 size_t chunk_size;
109 if (encoding == UTF8 && string->Length() > 65535 &&
110 !StringBytes::Size(env->isolate(), string, encoding).To(&chunk_size))
111 return 0;
112 else if (!StringBytes::StorageSize(env->isolate(), string, encoding)
113 .To(&chunk_size))
114 return 0;
115 storage_size += chunk_size;
116 }
117
118 if (storage_size > INT_MAX)
119 return UV_ENOBUFS;
120 } else {
121 for (size_t i = 0; i < count; i++) {
122 Local<Value> chunk = chunks->Get(env->context(), i).ToLocalChecked();
123 bufs[i].base = Buffer::Data(chunk);
124 bufs[i].len = Buffer::Length(chunk);
125 }
126 }
127
128 AllocatedBuffer storage;
129 if (storage_size > 0)
130 storage = env->AllocateManaged(storage_size);
131
132 offset = 0;
133 if (!all_buffers) {
134 for (size_t i = 0; i < count; i++) {
135 Local<Value> chunk = chunks->Get(env->context(), i * 2).ToLocalChecked();
136
137 // Write buffer
138 if (Buffer::HasInstance(chunk)) {
139 bufs[i].base = Buffer::Data(chunk);
140 bufs[i].len = Buffer::Length(chunk);
141 continue;
142 }
143
144 // Write string
145 CHECK_LE(offset, storage_size);
146 char* str_storage = storage.data() + offset;
147 size_t str_size = storage.size() - offset;
148
149 Local<String> string = chunk->ToString(env->context()).ToLocalChecked();
150 enum encoding encoding = ParseEncoding(env->isolate(),
151 chunks->Get(env->context(), i * 2 + 1).ToLocalChecked());
152 str_size = StringBytes::Write(env->isolate(),
153 str_storage,
154 str_size,
155 string,
156 encoding);
157 bufs[i].base = str_storage;
158 bufs[i].len = str_size;
159 offset += str_size;
160 }
161 }
162
163 StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj);
164 SetWriteResult(res);
165 if (res.wrap != nullptr && storage_size > 0) {
166 res.wrap->SetAllocatedStorage(std::move(storage));
167 }
168 return res.err;
169 }
170
171
WriteBuffer(const FunctionCallbackInfo<Value> & args)172 int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
173 CHECK(args[0]->IsObject());
174
175 Environment* env = Environment::GetCurrent(args);
176
177 if (!args[1]->IsUint8Array()) {
178 node::THROW_ERR_INVALID_ARG_TYPE(env, "Second argument must be a buffer");
179 return 0;
180 }
181
182 Local<Object> req_wrap_obj = args[0].As<Object>();
183 uv_buf_t buf;
184 buf.base = Buffer::Data(args[1]);
185 buf.len = Buffer::Length(args[1]);
186
187 uv_stream_t* send_handle = nullptr;
188
189 if (args[2]->IsObject() && IsIPCPipe()) {
190 Local<Object> send_handle_obj = args[2].As<Object>();
191
192 HandleWrap* wrap;
193 ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
194 send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
195 // Reference LibuvStreamWrap instance to prevent it from being garbage
196 // collected before `AfterWrite` is called.
197 req_wrap_obj->Set(env->context(),
198 env->handle_string(),
199 send_handle_obj).Check();
200 }
201
202 StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
203 SetWriteResult(res);
204
205 return res.err;
206 }
207
208
209 template <enum encoding enc>
WriteString(const FunctionCallbackInfo<Value> & args)210 int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
211 Environment* env = Environment::GetCurrent(args);
212 CHECK(args[0]->IsObject());
213 CHECK(args[1]->IsString());
214
215 Local<Object> req_wrap_obj = args[0].As<Object>();
216 Local<String> string = args[1].As<String>();
217 Local<Object> send_handle_obj;
218 if (args[2]->IsObject())
219 send_handle_obj = args[2].As<Object>();
220
221 // Compute the size of the storage that the string will be flattened into.
222 // For UTF8 strings that are very long, go ahead and take the hit for
223 // computing their actual size, rather than tripling the storage.
224 size_t storage_size;
225 if (enc == UTF8 && string->Length() > 65535 &&
226 !StringBytes::Size(env->isolate(), string, enc).To(&storage_size))
227 return 0;
228 else if (!StringBytes::StorageSize(env->isolate(), string, enc)
229 .To(&storage_size))
230 return 0;
231
232 if (storage_size > INT_MAX)
233 return UV_ENOBUFS;
234
235 // Try writing immediately if write size isn't too big
236 char stack_storage[16384]; // 16kb
237 size_t data_size;
238 size_t synchronously_written = 0;
239 uv_buf_t buf;
240
241 bool try_write = storage_size <= sizeof(stack_storage) &&
242 (!IsIPCPipe() || send_handle_obj.IsEmpty());
243 if (try_write) {
244 data_size = StringBytes::Write(env->isolate(),
245 stack_storage,
246 storage_size,
247 string,
248 enc);
249 buf = uv_buf_init(stack_storage, data_size);
250
251 uv_buf_t* bufs = &buf;
252 size_t count = 1;
253 const int err = DoTryWrite(&bufs, &count);
254 // Keep track of the bytes written here, because we're taking a shortcut
255 // by using `DoTryWrite()` directly instead of using the utilities
256 // provided by `Write()`.
257 synchronously_written = count == 0 ? data_size : data_size - buf.len;
258 bytes_written_ += synchronously_written;
259
260 // Immediate failure or success
261 if (err != 0 || count == 0) {
262 SetWriteResult(StreamWriteResult { false, err, nullptr, data_size, {} });
263 return err;
264 }
265
266 // Partial write
267 CHECK_EQ(count, 1);
268 }
269
270 AllocatedBuffer data;
271
272 if (try_write) {
273 // Copy partial data
274 data = env->AllocateManaged(buf.len);
275 memcpy(data.data(), buf.base, buf.len);
276 data_size = buf.len;
277 } else {
278 // Write it
279 data = env->AllocateManaged(storage_size);
280 data_size = StringBytes::Write(env->isolate(),
281 data.data(),
282 storage_size,
283 string,
284 enc);
285 }
286
287 CHECK_LE(data_size, storage_size);
288
289 buf = uv_buf_init(data.data(), data_size);
290
291 uv_stream_t* send_handle = nullptr;
292
293 if (IsIPCPipe() && !send_handle_obj.IsEmpty()) {
294 HandleWrap* wrap;
295 ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
296 send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
297 // Reference LibuvStreamWrap instance to prevent it from being garbage
298 // collected before `AfterWrite` is called.
299 req_wrap_obj->Set(env->context(),
300 env->handle_string(),
301 send_handle_obj).Check();
302 }
303
304 StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
305 res.bytes += synchronously_written;
306
307 SetWriteResult(res);
308 if (res.wrap != nullptr) {
309 res.wrap->SetAllocatedStorage(std::move(data));
310 }
311
312 return res.err;
313 }
314
315
CallJSOnreadMethod(ssize_t nread,Local<ArrayBuffer> ab,size_t offset,StreamBaseJSChecks checks)316 MaybeLocal<Value> StreamBase::CallJSOnreadMethod(ssize_t nread,
317 Local<ArrayBuffer> ab,
318 size_t offset,
319 StreamBaseJSChecks checks) {
320 Environment* env = env_;
321
322 DCHECK_EQ(static_cast<int32_t>(nread), nread);
323 DCHECK_LE(offset, INT32_MAX);
324
325 if (checks == DONT_SKIP_NREAD_CHECKS) {
326 if (ab.IsEmpty()) {
327 DCHECK_EQ(offset, 0);
328 DCHECK_LE(nread, 0);
329 } else {
330 DCHECK_GE(nread, 0);
331 }
332 }
333
334 env->stream_base_state()[kReadBytesOrError] = nread;
335 env->stream_base_state()[kArrayBufferOffset] = offset;
336
337 Local<Value> argv[] = {
338 ab.IsEmpty() ? Undefined(env->isolate()).As<Value>() : ab.As<Value>()
339 };
340
341 AsyncWrap* wrap = GetAsyncWrap();
342 CHECK_NOT_NULL(wrap);
343 Local<Value> onread = wrap->object()->GetInternalField(
344 StreamBase::kOnReadFunctionField);
345 CHECK(onread->IsFunction());
346 return wrap->MakeCallback(onread.As<Function>(), arraysize(argv), argv);
347 }
348
349
IsIPCPipe()350 bool StreamBase::IsIPCPipe() {
351 return false;
352 }
353
354
GetFD()355 int StreamBase::GetFD() {
356 return -1;
357 }
358
359
GetObject()360 Local<Object> StreamBase::GetObject() {
361 return GetAsyncWrap()->object();
362 }
363
AddMethod(Environment * env,Local<Signature> signature,enum PropertyAttribute attributes,Local<FunctionTemplate> t,JSMethodFunction * stream_method,Local<String> string)364 void StreamBase::AddMethod(Environment* env,
365 Local<Signature> signature,
366 enum PropertyAttribute attributes,
367 Local<FunctionTemplate> t,
368 JSMethodFunction* stream_method,
369 Local<String> string) {
370 Local<FunctionTemplate> templ =
371 env->NewFunctionTemplate(stream_method,
372 signature,
373 v8::ConstructorBehavior::kThrow,
374 v8::SideEffectType::kHasNoSideEffect);
375 t->PrototypeTemplate()->SetAccessorProperty(
376 string, templ, Local<FunctionTemplate>(), attributes);
377 }
378
AddMethods(Environment * env,Local<FunctionTemplate> t)379 void StreamBase::AddMethods(Environment* env, Local<FunctionTemplate> t) {
380 HandleScope scope(env->isolate());
381
382 enum PropertyAttribute attributes =
383 static_cast<PropertyAttribute>(ReadOnly | DontDelete | DontEnum);
384 Local<Signature> sig = Signature::New(env->isolate(), t);
385
386 AddMethod(env, sig, attributes, t, GetFD, env->fd_string());
387 AddMethod(
388 env, sig, attributes, t, GetExternal, env->external_stream_string());
389 AddMethod(env, sig, attributes, t, GetBytesRead, env->bytes_read_string());
390 AddMethod(
391 env, sig, attributes, t, GetBytesWritten, env->bytes_written_string());
392 env->SetProtoMethod(t, "readStart", JSMethod<&StreamBase::ReadStartJS>);
393 env->SetProtoMethod(t, "readStop", JSMethod<&StreamBase::ReadStopJS>);
394 env->SetProtoMethod(t, "shutdown", JSMethod<&StreamBase::Shutdown>);
395 env->SetProtoMethod(t,
396 "useUserBuffer",
397 JSMethod<&StreamBase::UseUserBuffer>);
398 env->SetProtoMethod(t, "writev", JSMethod<&StreamBase::Writev>);
399 env->SetProtoMethod(t, "writeBuffer", JSMethod<&StreamBase::WriteBuffer>);
400 env->SetProtoMethod(
401 t, "writeAsciiString", JSMethod<&StreamBase::WriteString<ASCII>>);
402 env->SetProtoMethod(
403 t, "writeUtf8String", JSMethod<&StreamBase::WriteString<UTF8>>);
404 env->SetProtoMethod(
405 t, "writeUcs2String", JSMethod<&StreamBase::WriteString<UCS2>>);
406 env->SetProtoMethod(
407 t, "writeLatin1String", JSMethod<&StreamBase::WriteString<LATIN1>>);
408 t->PrototypeTemplate()->Set(FIXED_ONE_BYTE_STRING(env->isolate(),
409 "isStreamBase"),
410 True(env->isolate()));
411 t->PrototypeTemplate()->SetAccessor(
412 FIXED_ONE_BYTE_STRING(env->isolate(), "onread"),
413 BaseObject::InternalFieldGet<
414 StreamBase::kOnReadFunctionField>,
415 BaseObject::InternalFieldSet<
416 StreamBase::kOnReadFunctionField,
417 &Value::IsFunction>);
418 }
419
GetFD(const FunctionCallbackInfo<Value> & args)420 void StreamBase::GetFD(const FunctionCallbackInfo<Value>& args) {
421 // Mimic implementation of StreamBase::GetFD() and UDPWrap::GetFD().
422 StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
423 if (wrap == nullptr) return args.GetReturnValue().Set(UV_EINVAL);
424
425 if (!wrap->IsAlive()) return args.GetReturnValue().Set(UV_EINVAL);
426
427 args.GetReturnValue().Set(wrap->GetFD());
428 }
429
GetBytesRead(const FunctionCallbackInfo<Value> & args)430 void StreamBase::GetBytesRead(const FunctionCallbackInfo<Value>& args) {
431 StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
432 if (wrap == nullptr) return args.GetReturnValue().Set(0);
433
434 // uint64_t -> double. 53bits is enough for all real cases.
435 args.GetReturnValue().Set(static_cast<double>(wrap->bytes_read_));
436 }
437
GetBytesWritten(const FunctionCallbackInfo<Value> & args)438 void StreamBase::GetBytesWritten(const FunctionCallbackInfo<Value>& args) {
439 StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
440 if (wrap == nullptr) return args.GetReturnValue().Set(0);
441
442 // uint64_t -> double. 53bits is enough for all real cases.
443 args.GetReturnValue().Set(static_cast<double>(wrap->bytes_written_));
444 }
445
GetExternal(const FunctionCallbackInfo<Value> & args)446 void StreamBase::GetExternal(const FunctionCallbackInfo<Value>& args) {
447 StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
448 if (wrap == nullptr) return;
449
450 Local<External> ext = External::New(args.GetIsolate(), wrap);
451 args.GetReturnValue().Set(ext);
452 }
453
454 template <int (StreamBase::*Method)(const FunctionCallbackInfo<Value>& args)>
JSMethod(const FunctionCallbackInfo<Value> & args)455 void StreamBase::JSMethod(const FunctionCallbackInfo<Value>& args) {
456 StreamBase* wrap = StreamBase::FromObject(args.Holder().As<Object>());
457 if (wrap == nullptr) return;
458
459 if (!wrap->IsAlive()) return args.GetReturnValue().Set(UV_EINVAL);
460
461 AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(wrap->GetAsyncWrap());
462 args.GetReturnValue().Set((wrap->*Method)(args));
463 }
464
DoTryWrite(uv_buf_t ** bufs,size_t * count)465 int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) {
466 // No TryWrite by default
467 return 0;
468 }
469
470
Error() const471 const char* StreamResource::Error() const {
472 return nullptr;
473 }
474
475
ClearError()476 void StreamResource::ClearError() {
477 // No-op
478 }
479
480
OnStreamAlloc(size_t suggested_size)481 uv_buf_t EmitToJSStreamListener::OnStreamAlloc(size_t suggested_size) {
482 CHECK_NOT_NULL(stream_);
483 Environment* env = static_cast<StreamBase*>(stream_)->stream_env();
484 return env->AllocateManaged(suggested_size).release();
485 }
486
OnStreamRead(ssize_t nread,const uv_buf_t & buf_)487 void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) {
488 CHECK_NOT_NULL(stream_);
489 StreamBase* stream = static_cast<StreamBase*>(stream_);
490 Environment* env = stream->stream_env();
491 HandleScope handle_scope(env->isolate());
492 Context::Scope context_scope(env->context());
493 AllocatedBuffer buf(env, buf_);
494
495 if (nread <= 0) {
496 if (nread < 0)
497 stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
498 return;
499 }
500
501 CHECK_LE(static_cast<size_t>(nread), buf.size());
502 buf.Resize(nread);
503
504 stream->CallJSOnreadMethod(nread, buf.ToArrayBuffer());
505 }
506
507
OnStreamAlloc(size_t suggested_size)508 uv_buf_t CustomBufferJSListener::OnStreamAlloc(size_t suggested_size) {
509 return buffer_;
510 }
511
512
OnStreamRead(ssize_t nread,const uv_buf_t & buf)513 void CustomBufferJSListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
514 CHECK_NOT_NULL(stream_);
515
516 StreamBase* stream = static_cast<StreamBase*>(stream_);
517 Environment* env = stream->stream_env();
518 HandleScope handle_scope(env->isolate());
519 Context::Scope context_scope(env->context());
520
521 // To deal with the case where POLLHUP is received and UV_EOF is returned, as
522 // libuv returns an empty buffer (on unices only).
523 if (nread == UV_EOF && buf.base == nullptr) {
524 stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
525 return;
526 }
527
528 CHECK_EQ(buf.base, buffer_.base);
529
530 MaybeLocal<Value> ret = stream->CallJSOnreadMethod(nread,
531 Local<ArrayBuffer>(),
532 0,
533 StreamBase::SKIP_NREAD_CHECKS);
534 Local<Value> next_buf_v;
535 if (ret.ToLocal(&next_buf_v) && !next_buf_v->IsUndefined()) {
536 buffer_.base = Buffer::Data(next_buf_v);
537 buffer_.len = Buffer::Length(next_buf_v);
538 }
539 }
540
541
OnStreamAfterReqFinished(StreamReq * req_wrap,int status)542 void ReportWritesToJSStreamListener::OnStreamAfterReqFinished(
543 StreamReq* req_wrap, int status) {
544 StreamBase* stream = static_cast<StreamBase*>(stream_);
545 Environment* env = stream->stream_env();
546 AsyncWrap* async_wrap = req_wrap->GetAsyncWrap();
547 HandleScope handle_scope(env->isolate());
548 Context::Scope context_scope(env->context());
549 CHECK(!async_wrap->persistent().IsEmpty());
550 Local<Object> req_wrap_obj = async_wrap->object();
551
552 Local<Value> argv[] = {
553 Integer::New(env->isolate(), status),
554 stream->GetObject(),
555 Undefined(env->isolate())
556 };
557
558 const char* msg = stream->Error();
559 if (msg != nullptr) {
560 argv[2] = OneByteString(env->isolate(), msg);
561 stream->ClearError();
562 }
563
564 if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())
565 async_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
566 }
567
OnStreamAfterWrite(WriteWrap * req_wrap,int status)568 void ReportWritesToJSStreamListener::OnStreamAfterWrite(
569 WriteWrap* req_wrap, int status) {
570 OnStreamAfterReqFinished(req_wrap, status);
571 }
572
OnStreamAfterShutdown(ShutdownWrap * req_wrap,int status)573 void ReportWritesToJSStreamListener::OnStreamAfterShutdown(
574 ShutdownWrap* req_wrap, int status) {
575 OnStreamAfterReqFinished(req_wrap, status);
576 }
577
578
579 } // namespace node
580