• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "stream_base.h"  // NOLINT(build/include_inline)
2 #include "stream_base-inl.h"
3 #include "stream_wrap.h"
4 #include "allocated_buffer-inl.h"
5 
6 #include "node.h"
7 #include "node_buffer.h"
8 #include "node_errors.h"
9 #include "env-inl.h"
10 #include "js_stream.h"
11 #include "string_bytes.h"
12 #include "util-inl.h"
13 #include "v8.h"
14 
15 #include <climits>  // INT_MAX
16 
17 namespace node {
18 
19 using v8::Array;
20 using v8::ArrayBuffer;
21 using v8::ConstructorBehavior;
22 using v8::Context;
23 using v8::DontDelete;
24 using v8::DontEnum;
25 using v8::External;
26 using v8::Function;
27 using v8::FunctionCallbackInfo;
28 using v8::FunctionTemplate;
29 using v8::HandleScope;
30 using v8::Integer;
31 using v8::Local;
32 using v8::MaybeLocal;
33 using v8::Object;
34 using v8::PropertyAttribute;
35 using v8::ReadOnly;
36 using v8::SideEffectType;
37 using v8::Signature;
38 using v8::String;
39 using v8::Value;
40 
// Explicit instantiations of StreamBase::WriteString, one per encoding
// (ASCII, UTF8, UCS2, LATIN1). The template itself is defined further down in
// this translation unit.
41 template int StreamBase::WriteString<ASCII>(
42     const FunctionCallbackInfo<Value>& args);
43 template int StreamBase::WriteString<UTF8>(
44     const FunctionCallbackInfo<Value>& args);
45 template int StreamBase::WriteString<UCS2>(
46     const FunctionCallbackInfo<Value>& args);
47 template int StreamBase::WriteString<LATIN1>(
48     const FunctionCallbackInfo<Value>& args);
49 
50 
ReadStartJS(const FunctionCallbackInfo<Value> & args)51 int StreamBase::ReadStartJS(const FunctionCallbackInfo<Value>& args) {
52   return ReadStart();
53 }
54 
55 
ReadStopJS(const FunctionCallbackInfo<Value> & args)56 int StreamBase::ReadStopJS(const FunctionCallbackInfo<Value>& args) {
57   return ReadStop();
58 }
59 
UseUserBuffer(const FunctionCallbackInfo<Value> & args)60 int StreamBase::UseUserBuffer(const FunctionCallbackInfo<Value>& args) {
61   CHECK(Buffer::HasInstance(args[0]));
62 
63   uv_buf_t buf = uv_buf_init(Buffer::Data(args[0]), Buffer::Length(args[0]));
64   PushStreamListener(new CustomBufferJSListener(buf));
65   return 0;
66 }
67 
Shutdown(const FunctionCallbackInfo<Value> & args)68 int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
69   CHECK(args[0]->IsObject());
70   Local<Object> req_wrap_obj = args[0].As<Object>();
71 
72   return Shutdown(req_wrap_obj);
73 }
74 
SetWriteResult(const StreamWriteResult & res)75 void StreamBase::SetWriteResult(const StreamWriteResult& res) {
76   env_->stream_base_state()[kBytesWritten] = res.bytes;
77   env_->stream_base_state()[kLastWriteWasAsync] = res.async;
78 }
79 
Writev(const FunctionCallbackInfo<Value> & args)80 int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
81   Environment* env = Environment::GetCurrent(args);
82 
83   CHECK(args[0]->IsObject());
84   CHECK(args[1]->IsArray());
85 
86   Local<Object> req_wrap_obj = args[0].As<Object>();
87   Local<Array> chunks = args[1].As<Array>();
88   bool all_buffers = args[2]->IsTrue();
89 
90   size_t count;
91   if (all_buffers)
92     count = chunks->Length();
93   else
94     count = chunks->Length() >> 1;
95 
96   MaybeStackBuffer<uv_buf_t, 16> bufs(count);
97 
98   size_t storage_size = 0;
99   size_t offset;
100 
101   if (!all_buffers) {
102     // Determine storage size first
103     for (size_t i = 0; i < count; i++) {
104       Local<Value> chunk = chunks->Get(env->context(), i * 2).ToLocalChecked();
105 
106       if (Buffer::HasInstance(chunk))
107         continue;
108         // Buffer chunk, no additional storage required
109 
110       // String chunk
111       Local<String> string = chunk->ToString(env->context()).ToLocalChecked();
112       enum encoding encoding = ParseEncoding(env->isolate(),
113           chunks->Get(env->context(), i * 2 + 1).ToLocalChecked());
114       size_t chunk_size;
115       if (encoding == UTF8 && string->Length() > 65535 &&
116           !StringBytes::Size(env->isolate(), string, encoding).To(&chunk_size))
117         return 0;
118       else if (!StringBytes::StorageSize(env->isolate(), string, encoding)
119                     .To(&chunk_size))
120         return 0;
121       storage_size += chunk_size;
122     }
123 
124     if (storage_size > INT_MAX)
125       return UV_ENOBUFS;
126   } else {
127     for (size_t i = 0; i < count; i++) {
128       Local<Value> chunk = chunks->Get(env->context(), i).ToLocalChecked();
129       bufs[i].base = Buffer::Data(chunk);
130       bufs[i].len = Buffer::Length(chunk);
131     }
132   }
133 
134   AllocatedBuffer storage;
135   if (storage_size > 0)
136     storage = AllocatedBuffer::AllocateManaged(env, storage_size);
137 
138   offset = 0;
139   if (!all_buffers) {
140     for (size_t i = 0; i < count; i++) {
141       Local<Value> chunk = chunks->Get(env->context(), i * 2).ToLocalChecked();
142 
143       // Write buffer
144       if (Buffer::HasInstance(chunk)) {
145         bufs[i].base = Buffer::Data(chunk);
146         bufs[i].len = Buffer::Length(chunk);
147         continue;
148       }
149 
150       // Write string
151       CHECK_LE(offset, storage_size);
152       char* str_storage = storage.data() + offset;
153       size_t str_size = storage.size() - offset;
154 
155       Local<String> string = chunk->ToString(env->context()).ToLocalChecked();
156       enum encoding encoding = ParseEncoding(env->isolate(),
157           chunks->Get(env->context(), i * 2 + 1).ToLocalChecked());
158       str_size = StringBytes::Write(env->isolate(),
159                                     str_storage,
160                                     str_size,
161                                     string,
162                                     encoding);
163       bufs[i].base = str_storage;
164       bufs[i].len = str_size;
165       offset += str_size;
166     }
167   }
168 
169   StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj);
170   SetWriteResult(res);
171   if (res.wrap != nullptr && storage_size > 0) {
172     res.wrap->SetAllocatedStorage(std::move(storage));
173   }
174   return res.err;
175 }
176 
177 
WriteBuffer(const FunctionCallbackInfo<Value> & args)178 int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
179   CHECK(args[0]->IsObject());
180 
181   Environment* env = Environment::GetCurrent(args);
182 
183   if (!args[1]->IsUint8Array()) {
184     node::THROW_ERR_INVALID_ARG_TYPE(env, "Second argument must be a buffer");
185     return 0;
186   }
187 
188   Local<Object> req_wrap_obj = args[0].As<Object>();
189   uv_buf_t buf;
190   buf.base = Buffer::Data(args[1]);
191   buf.len = Buffer::Length(args[1]);
192 
193   uv_stream_t* send_handle = nullptr;
194 
195   if (args[2]->IsObject() && IsIPCPipe()) {
196     Local<Object> send_handle_obj = args[2].As<Object>();
197 
198     HandleWrap* wrap;
199     ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
200     send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
201     // Reference LibuvStreamWrap instance to prevent it from being garbage
202     // collected before `AfterWrite` is called.
203     req_wrap_obj->Set(env->context(),
204                       env->handle_string(),
205                       send_handle_obj).Check();
206   }
207 
208   StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
209   SetWriteResult(res);
210 
211   return res.err;
212 }
213 
214 
215 template <enum encoding enc>
WriteString(const FunctionCallbackInfo<Value> & args)216 int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
217   Environment* env = Environment::GetCurrent(args);
218   CHECK(args[0]->IsObject());
219   CHECK(args[1]->IsString());
220 
221   Local<Object> req_wrap_obj = args[0].As<Object>();
222   Local<String> string = args[1].As<String>();
223   Local<Object> send_handle_obj;
224   if (args[2]->IsObject())
225     send_handle_obj = args[2].As<Object>();
226 
227   // Compute the size of the storage that the string will be flattened into.
228   // For UTF8 strings that are very long, go ahead and take the hit for
229   // computing their actual size, rather than tripling the storage.
230   size_t storage_size;
231   if (enc == UTF8 && string->Length() > 65535 &&
232       !StringBytes::Size(env->isolate(), string, enc).To(&storage_size))
233     return 0;
234   else if (!StringBytes::StorageSize(env->isolate(), string, enc)
235                 .To(&storage_size))
236     return 0;
237 
238   if (storage_size > INT_MAX)
239     return UV_ENOBUFS;
240 
241   // Try writing immediately if write size isn't too big
242   char stack_storage[16384];  // 16kb
243   size_t data_size;
244   size_t synchronously_written = 0;
245   uv_buf_t buf;
246 
247   bool try_write = storage_size <= sizeof(stack_storage) &&
248                    (!IsIPCPipe() || send_handle_obj.IsEmpty());
249   if (try_write) {
250     data_size = StringBytes::Write(env->isolate(),
251                                    stack_storage,
252                                    storage_size,
253                                    string,
254                                    enc);
255     buf = uv_buf_init(stack_storage, data_size);
256 
257     uv_buf_t* bufs = &buf;
258     size_t count = 1;
259     const int err = DoTryWrite(&bufs, &count);
260     // Keep track of the bytes written here, because we're taking a shortcut
261     // by using `DoTryWrite()` directly instead of using the utilities
262     // provided by `Write()`.
263     synchronously_written = count == 0 ? data_size : data_size - buf.len;
264     bytes_written_ += synchronously_written;
265 
266     // Immediate failure or success
267     if (err != 0 || count == 0) {
268       SetWriteResult(StreamWriteResult { false, err, nullptr, data_size, {} });
269       return err;
270     }
271 
272     // Partial write
273     CHECK_EQ(count, 1);
274   }
275 
276   AllocatedBuffer data;
277 
278   if (try_write) {
279     // Copy partial data
280     data = AllocatedBuffer::AllocateManaged(env, buf.len);
281     memcpy(data.data(), buf.base, buf.len);
282     data_size = buf.len;
283   } else {
284     // Write it
285     data = AllocatedBuffer::AllocateManaged(env, storage_size);
286     data_size = StringBytes::Write(env->isolate(),
287                                    data.data(),
288                                    storage_size,
289                                    string,
290                                    enc);
291   }
292 
293   CHECK_LE(data_size, storage_size);
294 
295   buf = uv_buf_init(data.data(), data_size);
296 
297   uv_stream_t* send_handle = nullptr;
298 
299   if (IsIPCPipe() && !send_handle_obj.IsEmpty()) {
300     HandleWrap* wrap;
301     ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
302     send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
303     // Reference LibuvStreamWrap instance to prevent it from being garbage
304     // collected before `AfterWrite` is called.
305     req_wrap_obj->Set(env->context(),
306                       env->handle_string(),
307                       send_handle_obj).Check();
308   }
309 
310   StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
311   res.bytes += synchronously_written;
312 
313   SetWriteResult(res);
314   if (res.wrap != nullptr) {
315     res.wrap->SetAllocatedStorage(std::move(data));
316   }
317 
318   return res.err;
319 }
320 
321 
CallJSOnreadMethod(ssize_t nread,Local<ArrayBuffer> ab,size_t offset,StreamBaseJSChecks checks)322 MaybeLocal<Value> StreamBase::CallJSOnreadMethod(ssize_t nread,
323                                                  Local<ArrayBuffer> ab,
324                                                  size_t offset,
325                                                  StreamBaseJSChecks checks) {
326   Environment* env = env_;
327 
328   DCHECK_EQ(static_cast<int32_t>(nread), nread);
329   DCHECK_LE(offset, INT32_MAX);
330 
331   if (checks == DONT_SKIP_NREAD_CHECKS) {
332     if (ab.IsEmpty()) {
333       DCHECK_EQ(offset, 0);
334       DCHECK_LE(nread, 0);
335     } else {
336       DCHECK_GE(nread, 0);
337     }
338   }
339 
340   env->stream_base_state()[kReadBytesOrError] = static_cast<int32_t>(nread);
341   env->stream_base_state()[kArrayBufferOffset] = offset;
342 
343   Local<Value> argv[] = {
344     ab.IsEmpty() ? Undefined(env->isolate()).As<Value>() : ab.As<Value>()
345   };
346 
347   AsyncWrap* wrap = GetAsyncWrap();
348   CHECK_NOT_NULL(wrap);
349   Local<Value> onread = wrap->object()->GetInternalField(
350       StreamBase::kOnReadFunctionField);
351   CHECK(onread->IsFunction());
352   return wrap->MakeCallback(onread.As<Function>(), arraysize(argv), argv);
353 }
354 
355 
IsIPCPipe()356 bool StreamBase::IsIPCPipe() {
357   return false;
358 }
359 
360 
GetFD()361 int StreamBase::GetFD() {
362   return -1;
363 }
364 
365 
GetObject()366 Local<Object> StreamBase::GetObject() {
367   return GetAsyncWrap()->object();
368 }
369 
AddMethod(Environment * env,Local<Signature> signature,enum PropertyAttribute attributes,Local<FunctionTemplate> t,JSMethodFunction * stream_method,Local<String> string)370 void StreamBase::AddMethod(Environment* env,
371                            Local<Signature> signature,
372                            enum PropertyAttribute attributes,
373                            Local<FunctionTemplate> t,
374                            JSMethodFunction* stream_method,
375                            Local<String> string) {
376   Local<FunctionTemplate> templ =
377       env->NewFunctionTemplate(stream_method,
378                                signature,
379                                ConstructorBehavior::kThrow,
380                                SideEffectType::kHasNoSideEffect);
381   t->PrototypeTemplate()->SetAccessorProperty(
382       string, templ, Local<FunctionTemplate>(), attributes);
383 }
384 
AddMethods(Environment * env,Local<FunctionTemplate> t)385 void StreamBase::AddMethods(Environment* env, Local<FunctionTemplate> t) {
386   HandleScope scope(env->isolate());
387 
388   enum PropertyAttribute attributes =
389       static_cast<PropertyAttribute>(ReadOnly | DontDelete | DontEnum);
390   Local<Signature> sig = Signature::New(env->isolate(), t);
391 
392   AddMethod(env, sig, attributes, t, GetFD, env->fd_string());
393   AddMethod(
394       env, sig, attributes, t, GetExternal, env->external_stream_string());
395   AddMethod(env, sig, attributes, t, GetBytesRead, env->bytes_read_string());
396   AddMethod(
397       env, sig, attributes, t, GetBytesWritten, env->bytes_written_string());
398   env->SetProtoMethod(t, "readStart", JSMethod<&StreamBase::ReadStartJS>);
399   env->SetProtoMethod(t, "readStop", JSMethod<&StreamBase::ReadStopJS>);
400   env->SetProtoMethod(t, "shutdown", JSMethod<&StreamBase::Shutdown>);
401   env->SetProtoMethod(t,
402                       "useUserBuffer",
403                       JSMethod<&StreamBase::UseUserBuffer>);
404   env->SetProtoMethod(t, "writev", JSMethod<&StreamBase::Writev>);
405   env->SetProtoMethod(t, "writeBuffer", JSMethod<&StreamBase::WriteBuffer>);
406   env->SetProtoMethod(
407       t, "writeAsciiString", JSMethod<&StreamBase::WriteString<ASCII>>);
408   env->SetProtoMethod(
409       t, "writeUtf8String", JSMethod<&StreamBase::WriteString<UTF8>>);
410   env->SetProtoMethod(
411       t, "writeUcs2String", JSMethod<&StreamBase::WriteString<UCS2>>);
412   env->SetProtoMethod(
413       t, "writeLatin1String", JSMethod<&StreamBase::WriteString<LATIN1>>);
414   t->PrototypeTemplate()->Set(FIXED_ONE_BYTE_STRING(env->isolate(),
415                                                     "isStreamBase"),
416                               True(env->isolate()));
417   t->PrototypeTemplate()->SetAccessor(
418       FIXED_ONE_BYTE_STRING(env->isolate(), "onread"),
419       BaseObject::InternalFieldGet<
420           StreamBase::kOnReadFunctionField>,
421       BaseObject::InternalFieldSet<
422           StreamBase::kOnReadFunctionField,
423           &Value::IsFunction>);
424 }
425 
GetFD(const FunctionCallbackInfo<Value> & args)426 void StreamBase::GetFD(const FunctionCallbackInfo<Value>& args) {
427   // Mimic implementation of StreamBase::GetFD() and UDPWrap::GetFD().
428   StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
429   if (wrap == nullptr) return args.GetReturnValue().Set(UV_EINVAL);
430 
431   if (!wrap->IsAlive()) return args.GetReturnValue().Set(UV_EINVAL);
432 
433   args.GetReturnValue().Set(wrap->GetFD());
434 }
435 
GetBytesRead(const FunctionCallbackInfo<Value> & args)436 void StreamBase::GetBytesRead(const FunctionCallbackInfo<Value>& args) {
437   StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
438   if (wrap == nullptr) return args.GetReturnValue().Set(0);
439 
440   // uint64_t -> double. 53bits is enough for all real cases.
441   args.GetReturnValue().Set(static_cast<double>(wrap->bytes_read_));
442 }
443 
GetBytesWritten(const FunctionCallbackInfo<Value> & args)444 void StreamBase::GetBytesWritten(const FunctionCallbackInfo<Value>& args) {
445   StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
446   if (wrap == nullptr) return args.GetReturnValue().Set(0);
447 
448   // uint64_t -> double. 53bits is enough for all real cases.
449   args.GetReturnValue().Set(static_cast<double>(wrap->bytes_written_));
450 }
451 
GetExternal(const FunctionCallbackInfo<Value> & args)452 void StreamBase::GetExternal(const FunctionCallbackInfo<Value>& args) {
453   StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
454   if (wrap == nullptr) return;
455 
456   Local<External> ext = External::New(args.GetIsolate(), wrap);
457   args.GetReturnValue().Set(ext);
458 }
459 
460 template <int (StreamBase::*Method)(const FunctionCallbackInfo<Value>& args)>
JSMethod(const FunctionCallbackInfo<Value> & args)461 void StreamBase::JSMethod(const FunctionCallbackInfo<Value>& args) {
462   StreamBase* wrap = StreamBase::FromObject(args.Holder().As<Object>());
463   if (wrap == nullptr) return;
464 
465   if (!wrap->IsAlive()) return args.GetReturnValue().Set(UV_EINVAL);
466 
467   AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(wrap->GetAsyncWrap());
468   args.GetReturnValue().Set((wrap->*Method)(args));
469 }
470 
DoTryWrite(uv_buf_t ** bufs,size_t * count)471 int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) {
472   // No TryWrite by default
473   return 0;
474 }
475 
476 
Error() const477 const char* StreamResource::Error() const {
478   return nullptr;
479 }
480 
481 
ClearError()482 void StreamResource::ClearError() {
483   // No-op
484 }
485 
486 
OnStreamAlloc(size_t suggested_size)487 uv_buf_t EmitToJSStreamListener::OnStreamAlloc(size_t suggested_size) {
488   CHECK_NOT_NULL(stream_);
489   Environment* env = static_cast<StreamBase*>(stream_)->stream_env();
490   return AllocatedBuffer::AllocateManaged(env, suggested_size).release();
491 }
492 
OnStreamRead(ssize_t nread,const uv_buf_t & buf_)493 void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) {
494   CHECK_NOT_NULL(stream_);
495   StreamBase* stream = static_cast<StreamBase*>(stream_);
496   Environment* env = stream->stream_env();
497   HandleScope handle_scope(env->isolate());
498   Context::Scope context_scope(env->context());
499   AllocatedBuffer buf(env, buf_);
500 
501   if (nread <= 0)  {
502     if (nread < 0)
503       stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
504     return;
505   }
506 
507   CHECK_LE(static_cast<size_t>(nread), buf.size());
508   buf.Resize(nread);
509 
510   stream->CallJSOnreadMethod(nread, buf.ToArrayBuffer());
511 }
512 
513 
OnStreamAlloc(size_t suggested_size)514 uv_buf_t CustomBufferJSListener::OnStreamAlloc(size_t suggested_size) {
515   return buffer_;
516 }
517 
518 
OnStreamRead(ssize_t nread,const uv_buf_t & buf)519 void CustomBufferJSListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
520   CHECK_NOT_NULL(stream_);
521 
522   StreamBase* stream = static_cast<StreamBase*>(stream_);
523   Environment* env = stream->stream_env();
524   HandleScope handle_scope(env->isolate());
525   Context::Scope context_scope(env->context());
526 
527   // To deal with the case where POLLHUP is received and UV_EOF is returned, as
528   // libuv returns an empty buffer (on unices only).
529   if (nread == UV_EOF && buf.base == nullptr) {
530     stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
531     return;
532   }
533 
534   CHECK_EQ(buf.base, buffer_.base);
535 
536   MaybeLocal<Value> ret = stream->CallJSOnreadMethod(nread,
537                              Local<ArrayBuffer>(),
538                              0,
539                              StreamBase::SKIP_NREAD_CHECKS);
540   Local<Value> next_buf_v;
541   if (ret.ToLocal(&next_buf_v) && !next_buf_v->IsUndefined()) {
542     buffer_.base = Buffer::Data(next_buf_v);
543     buffer_.len = Buffer::Length(next_buf_v);
544   }
545 }
546 
547 
OnStreamAfterReqFinished(StreamReq * req_wrap,int status)548 void ReportWritesToJSStreamListener::OnStreamAfterReqFinished(
549     StreamReq* req_wrap, int status) {
550   StreamBase* stream = static_cast<StreamBase*>(stream_);
551   Environment* env = stream->stream_env();
552   AsyncWrap* async_wrap = req_wrap->GetAsyncWrap();
553   HandleScope handle_scope(env->isolate());
554   Context::Scope context_scope(env->context());
555   CHECK(!async_wrap->persistent().IsEmpty());
556   Local<Object> req_wrap_obj = async_wrap->object();
557 
558   Local<Value> argv[] = {
559     Integer::New(env->isolate(), status),
560     stream->GetObject(),
561     Undefined(env->isolate())
562   };
563 
564   const char* msg = stream->Error();
565   if (msg != nullptr) {
566     argv[2] = OneByteString(env->isolate(), msg);
567     stream->ClearError();
568   }
569 
570   if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())
571     async_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
572 }
573 
OnStreamAfterWrite(WriteWrap * req_wrap,int status)574 void ReportWritesToJSStreamListener::OnStreamAfterWrite(
575     WriteWrap* req_wrap, int status) {
576   OnStreamAfterReqFinished(req_wrap, status);
577 }
578 
OnStreamAfterShutdown(ShutdownWrap * req_wrap,int status)579 void ReportWritesToJSStreamListener::OnStreamAfterShutdown(
580     ShutdownWrap* req_wrap, int status) {
581   OnStreamAfterReqFinished(req_wrap, status);
582 }
583 
OnDone(int status)584 void ShutdownWrap::OnDone(int status) {
585   stream()->EmitAfterShutdown(this, status);
586   Dispose();
587 }
588 
OnDone(int status)589 void WriteWrap::OnDone(int status) {
590   stream()->EmitAfterWrite(this, status);
591   Dispose();
592 }
593 
~StreamListener()594 StreamListener::~StreamListener() {
595   if (stream_ != nullptr)
596     stream_->RemoveStreamListener(this);
597 }
598 
OnStreamAfterShutdown(ShutdownWrap * w,int status)599 void StreamListener::OnStreamAfterShutdown(ShutdownWrap* w, int status) {
600   CHECK_NOT_NULL(previous_listener_);
601   previous_listener_->OnStreamAfterShutdown(w, status);
602 }
603 
OnStreamAfterWrite(WriteWrap * w,int status)604 void StreamListener::OnStreamAfterWrite(WriteWrap* w, int status) {
605   CHECK_NOT_NULL(previous_listener_);
606   previous_listener_->OnStreamAfterWrite(w, status);
607 }
608 
~StreamResource()609 StreamResource::~StreamResource() {
610   while (listener_ != nullptr) {
611     StreamListener* listener = listener_;
612     listener->OnStreamDestroy();
613     // Remove the listener if it didn’t remove itself. This makes the logic
614     // in `OnStreamDestroy()` implementations easier, because they
615     // may call generic cleanup functions which can just remove the
616     // listener unconditionally.
617     if (listener == listener_)
618       RemoveStreamListener(listener_);
619   }
620 }
621 
CreateShutdownWrap(Local<Object> object)622 ShutdownWrap* StreamBase::CreateShutdownWrap(
623     Local<Object> object) {
624   auto* wrap = new SimpleShutdownWrap<AsyncWrap>(this, object);
625   wrap->MakeWeak();
626   return wrap;
627 }
628 
CreateWriteWrap(Local<Object> object)629 WriteWrap* StreamBase::CreateWriteWrap(
630     Local<Object> object) {
631   auto* wrap = new SimpleWriteWrap<AsyncWrap>(this, object);
632   wrap->MakeWeak();
633   return wrap;
634 }
635 
636 }  // namespace node
637