1 #include "stream_base.h" // NOLINT(build/include_inline)
2 #include "stream_base-inl.h"
3 #include "stream_wrap.h"
4 #include "allocated_buffer-inl.h"
5
6 #include "node.h"
7 #include "node_buffer.h"
8 #include "node_errors.h"
9 #include "env-inl.h"
10 #include "js_stream.h"
11 #include "string_bytes.h"
12 #include "util-inl.h"
13 #include "v8.h"
14
15 #include <climits> // INT_MAX
16
17 namespace node {
18
19 using v8::Array;
20 using v8::ArrayBuffer;
21 using v8::ConstructorBehavior;
22 using v8::Context;
23 using v8::DontDelete;
24 using v8::DontEnum;
25 using v8::External;
26 using v8::Function;
27 using v8::FunctionCallbackInfo;
28 using v8::FunctionTemplate;
29 using v8::HandleScope;
30 using v8::Integer;
31 using v8::Local;
32 using v8::MaybeLocal;
33 using v8::Object;
34 using v8::PropertyAttribute;
35 using v8::ReadOnly;
36 using v8::SideEffectType;
37 using v8::Signature;
38 using v8::String;
39 using v8::Value;
40
41 template int StreamBase::WriteString<ASCII>(
42 const FunctionCallbackInfo<Value>& args);
43 template int StreamBase::WriteString<UTF8>(
44 const FunctionCallbackInfo<Value>& args);
45 template int StreamBase::WriteString<UCS2>(
46 const FunctionCallbackInfo<Value>& args);
47 template int StreamBase::WriteString<LATIN1>(
48 const FunctionCallbackInfo<Value>& args);
49
50
ReadStartJS(const FunctionCallbackInfo<Value> & args)51 int StreamBase::ReadStartJS(const FunctionCallbackInfo<Value>& args) {
52 return ReadStart();
53 }
54
55
ReadStopJS(const FunctionCallbackInfo<Value> & args)56 int StreamBase::ReadStopJS(const FunctionCallbackInfo<Value>& args) {
57 return ReadStop();
58 }
59
UseUserBuffer(const FunctionCallbackInfo<Value> & args)60 int StreamBase::UseUserBuffer(const FunctionCallbackInfo<Value>& args) {
61 CHECK(Buffer::HasInstance(args[0]));
62
63 uv_buf_t buf = uv_buf_init(Buffer::Data(args[0]), Buffer::Length(args[0]));
64 PushStreamListener(new CustomBufferJSListener(buf));
65 return 0;
66 }
67
Shutdown(const FunctionCallbackInfo<Value> & args)68 int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
69 CHECK(args[0]->IsObject());
70 Local<Object> req_wrap_obj = args[0].As<Object>();
71
72 return Shutdown(req_wrap_obj);
73 }
74
SetWriteResult(const StreamWriteResult & res)75 void StreamBase::SetWriteResult(const StreamWriteResult& res) {
76 env_->stream_base_state()[kBytesWritten] = res.bytes;
77 env_->stream_base_state()[kLastWriteWasAsync] = res.async;
78 }
79
Writev(const FunctionCallbackInfo<Value> & args)80 int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
81 Environment* env = Environment::GetCurrent(args);
82
83 CHECK(args[0]->IsObject());
84 CHECK(args[1]->IsArray());
85
86 Local<Object> req_wrap_obj = args[0].As<Object>();
87 Local<Array> chunks = args[1].As<Array>();
88 bool all_buffers = args[2]->IsTrue();
89
90 size_t count;
91 if (all_buffers)
92 count = chunks->Length();
93 else
94 count = chunks->Length() >> 1;
95
96 MaybeStackBuffer<uv_buf_t, 16> bufs(count);
97
98 size_t storage_size = 0;
99 size_t offset;
100
101 if (!all_buffers) {
102 // Determine storage size first
103 for (size_t i = 0; i < count; i++) {
104 Local<Value> chunk = chunks->Get(env->context(), i * 2).ToLocalChecked();
105
106 if (Buffer::HasInstance(chunk))
107 continue;
108 // Buffer chunk, no additional storage required
109
110 // String chunk
111 Local<String> string = chunk->ToString(env->context()).ToLocalChecked();
112 enum encoding encoding = ParseEncoding(env->isolate(),
113 chunks->Get(env->context(), i * 2 + 1).ToLocalChecked());
114 size_t chunk_size;
115 if (encoding == UTF8 && string->Length() > 65535 &&
116 !StringBytes::Size(env->isolate(), string, encoding).To(&chunk_size))
117 return 0;
118 else if (!StringBytes::StorageSize(env->isolate(), string, encoding)
119 .To(&chunk_size))
120 return 0;
121 storage_size += chunk_size;
122 }
123
124 if (storage_size > INT_MAX)
125 return UV_ENOBUFS;
126 } else {
127 for (size_t i = 0; i < count; i++) {
128 Local<Value> chunk = chunks->Get(env->context(), i).ToLocalChecked();
129 bufs[i].base = Buffer::Data(chunk);
130 bufs[i].len = Buffer::Length(chunk);
131 }
132 }
133
134 AllocatedBuffer storage;
135 if (storage_size > 0)
136 storage = AllocatedBuffer::AllocateManaged(env, storage_size);
137
138 offset = 0;
139 if (!all_buffers) {
140 for (size_t i = 0; i < count; i++) {
141 Local<Value> chunk = chunks->Get(env->context(), i * 2).ToLocalChecked();
142
143 // Write buffer
144 if (Buffer::HasInstance(chunk)) {
145 bufs[i].base = Buffer::Data(chunk);
146 bufs[i].len = Buffer::Length(chunk);
147 continue;
148 }
149
150 // Write string
151 CHECK_LE(offset, storage_size);
152 char* str_storage = storage.data() + offset;
153 size_t str_size = storage.size() - offset;
154
155 Local<String> string = chunk->ToString(env->context()).ToLocalChecked();
156 enum encoding encoding = ParseEncoding(env->isolate(),
157 chunks->Get(env->context(), i * 2 + 1).ToLocalChecked());
158 str_size = StringBytes::Write(env->isolate(),
159 str_storage,
160 str_size,
161 string,
162 encoding);
163 bufs[i].base = str_storage;
164 bufs[i].len = str_size;
165 offset += str_size;
166 }
167 }
168
169 StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj);
170 SetWriteResult(res);
171 if (res.wrap != nullptr && storage_size > 0) {
172 res.wrap->SetAllocatedStorage(std::move(storage));
173 }
174 return res.err;
175 }
176
177
WriteBuffer(const FunctionCallbackInfo<Value> & args)178 int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
179 CHECK(args[0]->IsObject());
180
181 Environment* env = Environment::GetCurrent(args);
182
183 if (!args[1]->IsUint8Array()) {
184 node::THROW_ERR_INVALID_ARG_TYPE(env, "Second argument must be a buffer");
185 return 0;
186 }
187
188 Local<Object> req_wrap_obj = args[0].As<Object>();
189 uv_buf_t buf;
190 buf.base = Buffer::Data(args[1]);
191 buf.len = Buffer::Length(args[1]);
192
193 uv_stream_t* send_handle = nullptr;
194
195 if (args[2]->IsObject() && IsIPCPipe()) {
196 Local<Object> send_handle_obj = args[2].As<Object>();
197
198 HandleWrap* wrap;
199 ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
200 send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
201 // Reference LibuvStreamWrap instance to prevent it from being garbage
202 // collected before `AfterWrite` is called.
203 req_wrap_obj->Set(env->context(),
204 env->handle_string(),
205 send_handle_obj).Check();
206 }
207
208 StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
209 SetWriteResult(res);
210
211 return res.err;
212 }
213
214
215 template <enum encoding enc>
WriteString(const FunctionCallbackInfo<Value> & args)216 int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
217 Environment* env = Environment::GetCurrent(args);
218 CHECK(args[0]->IsObject());
219 CHECK(args[1]->IsString());
220
221 Local<Object> req_wrap_obj = args[0].As<Object>();
222 Local<String> string = args[1].As<String>();
223 Local<Object> send_handle_obj;
224 if (args[2]->IsObject())
225 send_handle_obj = args[2].As<Object>();
226
227 // Compute the size of the storage that the string will be flattened into.
228 // For UTF8 strings that are very long, go ahead and take the hit for
229 // computing their actual size, rather than tripling the storage.
230 size_t storage_size;
231 if (enc == UTF8 && string->Length() > 65535 &&
232 !StringBytes::Size(env->isolate(), string, enc).To(&storage_size))
233 return 0;
234 else if (!StringBytes::StorageSize(env->isolate(), string, enc)
235 .To(&storage_size))
236 return 0;
237
238 if (storage_size > INT_MAX)
239 return UV_ENOBUFS;
240
241 // Try writing immediately if write size isn't too big
242 char stack_storage[16384]; // 16kb
243 size_t data_size;
244 size_t synchronously_written = 0;
245 uv_buf_t buf;
246
247 bool try_write = storage_size <= sizeof(stack_storage) &&
248 (!IsIPCPipe() || send_handle_obj.IsEmpty());
249 if (try_write) {
250 data_size = StringBytes::Write(env->isolate(),
251 stack_storage,
252 storage_size,
253 string,
254 enc);
255 buf = uv_buf_init(stack_storage, data_size);
256
257 uv_buf_t* bufs = &buf;
258 size_t count = 1;
259 const int err = DoTryWrite(&bufs, &count);
260 // Keep track of the bytes written here, because we're taking a shortcut
261 // by using `DoTryWrite()` directly instead of using the utilities
262 // provided by `Write()`.
263 synchronously_written = count == 0 ? data_size : data_size - buf.len;
264 bytes_written_ += synchronously_written;
265
266 // Immediate failure or success
267 if (err != 0 || count == 0) {
268 SetWriteResult(StreamWriteResult { false, err, nullptr, data_size, {} });
269 return err;
270 }
271
272 // Partial write
273 CHECK_EQ(count, 1);
274 }
275
276 AllocatedBuffer data;
277
278 if (try_write) {
279 // Copy partial data
280 data = AllocatedBuffer::AllocateManaged(env, buf.len);
281 memcpy(data.data(), buf.base, buf.len);
282 data_size = buf.len;
283 } else {
284 // Write it
285 data = AllocatedBuffer::AllocateManaged(env, storage_size);
286 data_size = StringBytes::Write(env->isolate(),
287 data.data(),
288 storage_size,
289 string,
290 enc);
291 }
292
293 CHECK_LE(data_size, storage_size);
294
295 buf = uv_buf_init(data.data(), data_size);
296
297 uv_stream_t* send_handle = nullptr;
298
299 if (IsIPCPipe() && !send_handle_obj.IsEmpty()) {
300 HandleWrap* wrap;
301 ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
302 send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
303 // Reference LibuvStreamWrap instance to prevent it from being garbage
304 // collected before `AfterWrite` is called.
305 req_wrap_obj->Set(env->context(),
306 env->handle_string(),
307 send_handle_obj).Check();
308 }
309
310 StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
311 res.bytes += synchronously_written;
312
313 SetWriteResult(res);
314 if (res.wrap != nullptr) {
315 res.wrap->SetAllocatedStorage(std::move(data));
316 }
317
318 return res.err;
319 }
320
321
CallJSOnreadMethod(ssize_t nread,Local<ArrayBuffer> ab,size_t offset,StreamBaseJSChecks checks)322 MaybeLocal<Value> StreamBase::CallJSOnreadMethod(ssize_t nread,
323 Local<ArrayBuffer> ab,
324 size_t offset,
325 StreamBaseJSChecks checks) {
326 Environment* env = env_;
327
328 DCHECK_EQ(static_cast<int32_t>(nread), nread);
329 DCHECK_LE(offset, INT32_MAX);
330
331 if (checks == DONT_SKIP_NREAD_CHECKS) {
332 if (ab.IsEmpty()) {
333 DCHECK_EQ(offset, 0);
334 DCHECK_LE(nread, 0);
335 } else {
336 DCHECK_GE(nread, 0);
337 }
338 }
339
340 env->stream_base_state()[kReadBytesOrError] = static_cast<int32_t>(nread);
341 env->stream_base_state()[kArrayBufferOffset] = offset;
342
343 Local<Value> argv[] = {
344 ab.IsEmpty() ? Undefined(env->isolate()).As<Value>() : ab.As<Value>()
345 };
346
347 AsyncWrap* wrap = GetAsyncWrap();
348 CHECK_NOT_NULL(wrap);
349 Local<Value> onread = wrap->object()->GetInternalField(
350 StreamBase::kOnReadFunctionField);
351 CHECK(onread->IsFunction());
352 return wrap->MakeCallback(onread.As<Function>(), arraysize(argv), argv);
353 }
354
355
IsIPCPipe()356 bool StreamBase::IsIPCPipe() {
357 return false;
358 }
359
360
GetFD()361 int StreamBase::GetFD() {
362 return -1;
363 }
364
365
GetObject()366 Local<Object> StreamBase::GetObject() {
367 return GetAsyncWrap()->object();
368 }
369
AddMethod(Environment * env,Local<Signature> signature,enum PropertyAttribute attributes,Local<FunctionTemplate> t,JSMethodFunction * stream_method,Local<String> string)370 void StreamBase::AddMethod(Environment* env,
371 Local<Signature> signature,
372 enum PropertyAttribute attributes,
373 Local<FunctionTemplate> t,
374 JSMethodFunction* stream_method,
375 Local<String> string) {
376 Local<FunctionTemplate> templ =
377 env->NewFunctionTemplate(stream_method,
378 signature,
379 ConstructorBehavior::kThrow,
380 SideEffectType::kHasNoSideEffect);
381 t->PrototypeTemplate()->SetAccessorProperty(
382 string, templ, Local<FunctionTemplate>(), attributes);
383 }
384
AddMethods(Environment * env,Local<FunctionTemplate> t)385 void StreamBase::AddMethods(Environment* env, Local<FunctionTemplate> t) {
386 HandleScope scope(env->isolate());
387
388 enum PropertyAttribute attributes =
389 static_cast<PropertyAttribute>(ReadOnly | DontDelete | DontEnum);
390 Local<Signature> sig = Signature::New(env->isolate(), t);
391
392 AddMethod(env, sig, attributes, t, GetFD, env->fd_string());
393 AddMethod(
394 env, sig, attributes, t, GetExternal, env->external_stream_string());
395 AddMethod(env, sig, attributes, t, GetBytesRead, env->bytes_read_string());
396 AddMethod(
397 env, sig, attributes, t, GetBytesWritten, env->bytes_written_string());
398 env->SetProtoMethod(t, "readStart", JSMethod<&StreamBase::ReadStartJS>);
399 env->SetProtoMethod(t, "readStop", JSMethod<&StreamBase::ReadStopJS>);
400 env->SetProtoMethod(t, "shutdown", JSMethod<&StreamBase::Shutdown>);
401 env->SetProtoMethod(t,
402 "useUserBuffer",
403 JSMethod<&StreamBase::UseUserBuffer>);
404 env->SetProtoMethod(t, "writev", JSMethod<&StreamBase::Writev>);
405 env->SetProtoMethod(t, "writeBuffer", JSMethod<&StreamBase::WriteBuffer>);
406 env->SetProtoMethod(
407 t, "writeAsciiString", JSMethod<&StreamBase::WriteString<ASCII>>);
408 env->SetProtoMethod(
409 t, "writeUtf8String", JSMethod<&StreamBase::WriteString<UTF8>>);
410 env->SetProtoMethod(
411 t, "writeUcs2String", JSMethod<&StreamBase::WriteString<UCS2>>);
412 env->SetProtoMethod(
413 t, "writeLatin1String", JSMethod<&StreamBase::WriteString<LATIN1>>);
414 t->PrototypeTemplate()->Set(FIXED_ONE_BYTE_STRING(env->isolate(),
415 "isStreamBase"),
416 True(env->isolate()));
417 t->PrototypeTemplate()->SetAccessor(
418 FIXED_ONE_BYTE_STRING(env->isolate(), "onread"),
419 BaseObject::InternalFieldGet<
420 StreamBase::kOnReadFunctionField>,
421 BaseObject::InternalFieldSet<
422 StreamBase::kOnReadFunctionField,
423 &Value::IsFunction>);
424 }
425
GetFD(const FunctionCallbackInfo<Value> & args)426 void StreamBase::GetFD(const FunctionCallbackInfo<Value>& args) {
427 // Mimic implementation of StreamBase::GetFD() and UDPWrap::GetFD().
428 StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
429 if (wrap == nullptr) return args.GetReturnValue().Set(UV_EINVAL);
430
431 if (!wrap->IsAlive()) return args.GetReturnValue().Set(UV_EINVAL);
432
433 args.GetReturnValue().Set(wrap->GetFD());
434 }
435
GetBytesRead(const FunctionCallbackInfo<Value> & args)436 void StreamBase::GetBytesRead(const FunctionCallbackInfo<Value>& args) {
437 StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
438 if (wrap == nullptr) return args.GetReturnValue().Set(0);
439
440 // uint64_t -> double. 53bits is enough for all real cases.
441 args.GetReturnValue().Set(static_cast<double>(wrap->bytes_read_));
442 }
443
GetBytesWritten(const FunctionCallbackInfo<Value> & args)444 void StreamBase::GetBytesWritten(const FunctionCallbackInfo<Value>& args) {
445 StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
446 if (wrap == nullptr) return args.GetReturnValue().Set(0);
447
448 // uint64_t -> double. 53bits is enough for all real cases.
449 args.GetReturnValue().Set(static_cast<double>(wrap->bytes_written_));
450 }
451
GetExternal(const FunctionCallbackInfo<Value> & args)452 void StreamBase::GetExternal(const FunctionCallbackInfo<Value>& args) {
453 StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
454 if (wrap == nullptr) return;
455
456 Local<External> ext = External::New(args.GetIsolate(), wrap);
457 args.GetReturnValue().Set(ext);
458 }
459
460 template <int (StreamBase::*Method)(const FunctionCallbackInfo<Value>& args)>
JSMethod(const FunctionCallbackInfo<Value> & args)461 void StreamBase::JSMethod(const FunctionCallbackInfo<Value>& args) {
462 StreamBase* wrap = StreamBase::FromObject(args.Holder().As<Object>());
463 if (wrap == nullptr) return;
464
465 if (!wrap->IsAlive()) return args.GetReturnValue().Set(UV_EINVAL);
466
467 AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(wrap->GetAsyncWrap());
468 args.GetReturnValue().Set((wrap->*Method)(args));
469 }
470
DoTryWrite(uv_buf_t ** bufs,size_t * count)471 int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) {
472 // No TryWrite by default
473 return 0;
474 }
475
476
Error() const477 const char* StreamResource::Error() const {
478 return nullptr;
479 }
480
481
ClearError()482 void StreamResource::ClearError() {
483 // No-op
484 }
485
486
OnStreamAlloc(size_t suggested_size)487 uv_buf_t EmitToJSStreamListener::OnStreamAlloc(size_t suggested_size) {
488 CHECK_NOT_NULL(stream_);
489 Environment* env = static_cast<StreamBase*>(stream_)->stream_env();
490 return AllocatedBuffer::AllocateManaged(env, suggested_size).release();
491 }
492
OnStreamRead(ssize_t nread,const uv_buf_t & buf_)493 void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) {
494 CHECK_NOT_NULL(stream_);
495 StreamBase* stream = static_cast<StreamBase*>(stream_);
496 Environment* env = stream->stream_env();
497 HandleScope handle_scope(env->isolate());
498 Context::Scope context_scope(env->context());
499 AllocatedBuffer buf(env, buf_);
500
501 if (nread <= 0) {
502 if (nread < 0)
503 stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
504 return;
505 }
506
507 CHECK_LE(static_cast<size_t>(nread), buf.size());
508 buf.Resize(nread);
509
510 stream->CallJSOnreadMethod(nread, buf.ToArrayBuffer());
511 }
512
513
OnStreamAlloc(size_t suggested_size)514 uv_buf_t CustomBufferJSListener::OnStreamAlloc(size_t suggested_size) {
515 return buffer_;
516 }
517
518
OnStreamRead(ssize_t nread,const uv_buf_t & buf)519 void CustomBufferJSListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
520 CHECK_NOT_NULL(stream_);
521
522 StreamBase* stream = static_cast<StreamBase*>(stream_);
523 Environment* env = stream->stream_env();
524 HandleScope handle_scope(env->isolate());
525 Context::Scope context_scope(env->context());
526
527 // To deal with the case where POLLHUP is received and UV_EOF is returned, as
528 // libuv returns an empty buffer (on unices only).
529 if (nread == UV_EOF && buf.base == nullptr) {
530 stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
531 return;
532 }
533
534 CHECK_EQ(buf.base, buffer_.base);
535
536 MaybeLocal<Value> ret = stream->CallJSOnreadMethod(nread,
537 Local<ArrayBuffer>(),
538 0,
539 StreamBase::SKIP_NREAD_CHECKS);
540 Local<Value> next_buf_v;
541 if (ret.ToLocal(&next_buf_v) && !next_buf_v->IsUndefined()) {
542 buffer_.base = Buffer::Data(next_buf_v);
543 buffer_.len = Buffer::Length(next_buf_v);
544 }
545 }
546
547
OnStreamAfterReqFinished(StreamReq * req_wrap,int status)548 void ReportWritesToJSStreamListener::OnStreamAfterReqFinished(
549 StreamReq* req_wrap, int status) {
550 StreamBase* stream = static_cast<StreamBase*>(stream_);
551 Environment* env = stream->stream_env();
552 AsyncWrap* async_wrap = req_wrap->GetAsyncWrap();
553 HandleScope handle_scope(env->isolate());
554 Context::Scope context_scope(env->context());
555 CHECK(!async_wrap->persistent().IsEmpty());
556 Local<Object> req_wrap_obj = async_wrap->object();
557
558 Local<Value> argv[] = {
559 Integer::New(env->isolate(), status),
560 stream->GetObject(),
561 Undefined(env->isolate())
562 };
563
564 const char* msg = stream->Error();
565 if (msg != nullptr) {
566 argv[2] = OneByteString(env->isolate(), msg);
567 stream->ClearError();
568 }
569
570 if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())
571 async_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
572 }
573
OnStreamAfterWrite(WriteWrap * req_wrap,int status)574 void ReportWritesToJSStreamListener::OnStreamAfterWrite(
575 WriteWrap* req_wrap, int status) {
576 OnStreamAfterReqFinished(req_wrap, status);
577 }
578
OnStreamAfterShutdown(ShutdownWrap * req_wrap,int status)579 void ReportWritesToJSStreamListener::OnStreamAfterShutdown(
580 ShutdownWrap* req_wrap, int status) {
581 OnStreamAfterReqFinished(req_wrap, status);
582 }
583
OnDone(int status)584 void ShutdownWrap::OnDone(int status) {
585 stream()->EmitAfterShutdown(this, status);
586 Dispose();
587 }
588
OnDone(int status)589 void WriteWrap::OnDone(int status) {
590 stream()->EmitAfterWrite(this, status);
591 Dispose();
592 }
593
~StreamListener()594 StreamListener::~StreamListener() {
595 if (stream_ != nullptr)
596 stream_->RemoveStreamListener(this);
597 }
598
OnStreamAfterShutdown(ShutdownWrap * w,int status)599 void StreamListener::OnStreamAfterShutdown(ShutdownWrap* w, int status) {
600 CHECK_NOT_NULL(previous_listener_);
601 previous_listener_->OnStreamAfterShutdown(w, status);
602 }
603
OnStreamAfterWrite(WriteWrap * w,int status)604 void StreamListener::OnStreamAfterWrite(WriteWrap* w, int status) {
605 CHECK_NOT_NULL(previous_listener_);
606 previous_listener_->OnStreamAfterWrite(w, status);
607 }
608
~StreamResource()609 StreamResource::~StreamResource() {
610 while (listener_ != nullptr) {
611 StreamListener* listener = listener_;
612 listener->OnStreamDestroy();
613 // Remove the listener if it didn’t remove itself. This makes the logic
614 // in `OnStreamDestroy()` implementations easier, because they
615 // may call generic cleanup functions which can just remove the
616 // listener unconditionally.
617 if (listener == listener_)
618 RemoveStreamListener(listener_);
619 }
620 }
621
CreateShutdownWrap(Local<Object> object)622 ShutdownWrap* StreamBase::CreateShutdownWrap(
623 Local<Object> object) {
624 auto* wrap = new SimpleShutdownWrap<AsyncWrap>(this, object);
625 wrap->MakeWeak();
626 return wrap;
627 }
628
CreateWriteWrap(Local<Object> object)629 WriteWrap* StreamBase::CreateWriteWrap(
630 Local<Object> object) {
631 auto* wrap = new SimpleWriteWrap<AsyncWrap>(this, object);
632 wrap->MakeWeak();
633 return wrap;
634 }
635
636 } // namespace node
637