• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "stream_pipe.h"
2 #include "stream_base-inl.h"
3 #include "node_buffer.h"
4 #include "util-inl.h"
5 
6 using v8::Context;
7 using v8::Function;
8 using v8::FunctionCallbackInfo;
9 using v8::FunctionTemplate;
10 using v8::Local;
11 using v8::Object;
12 using v8::Value;
13 
14 namespace node {
15 
StreamPipe(StreamBase * source,StreamBase * sink,Local<Object> obj)16 StreamPipe::StreamPipe(StreamBase* source,
17                        StreamBase* sink,
18                        Local<Object> obj)
19     : AsyncWrap(source->stream_env(), obj, AsyncWrap::PROVIDER_STREAMPIPE) {
20   MakeWeak();
21 
22   CHECK_NOT_NULL(sink);
23   CHECK_NOT_NULL(source);
24 
25   source->PushStreamListener(&readable_listener_);
26   sink->PushStreamListener(&writable_listener_);
27 
28   uses_wants_write_ = sink->HasWantsWrite();
29 
30   // Set up links between this object and the source/sink objects.
31   // In particular, this makes sure that they are garbage collected as a group,
32   // if that applies to the given streams (for example, Http2Streams use
33   // weak references).
34   obj->Set(env()->context(), env()->source_string(), source->GetObject())
35       .Check();
36   source->GetObject()->Set(env()->context(), env()->pipe_target_string(), obj)
37       .Check();
38   obj->Set(env()->context(), env()->sink_string(), sink->GetObject())
39       .Check();
40   sink->GetObject()->Set(env()->context(), env()->pipe_source_string(), obj)
41       .Check();
42 }
43 
~StreamPipe()44 StreamPipe::~StreamPipe() {
45   Unpipe(true);
46 }
47 
source()48 StreamBase* StreamPipe::source() {
49   return static_cast<StreamBase*>(readable_listener_.stream());
50 }
51 
sink()52 StreamBase* StreamPipe::sink() {
53   return static_cast<StreamBase*>(writable_listener_.stream());
54 }
55 
Unpipe(bool is_in_deletion)56 void StreamPipe::Unpipe(bool is_in_deletion) {
57   if (is_closed_)
58     return;
59 
60   // Note that we possibly cannot use virtual methods on `source` and `sink`
61   // here, because this function can be called from their destructors via
62   // `OnStreamDestroy()`.
63   if (!source_destroyed_)
64     source()->ReadStop();
65 
66   is_closed_ = true;
67   is_reading_ = false;
68   source()->RemoveStreamListener(&readable_listener_);
69   if (pending_writes_ == 0)
70     sink()->RemoveStreamListener(&writable_listener_);
71 
72   if (is_in_deletion) return;
73 
74   // Delay the JS-facing part with SetImmediate, because this might be from
75   // inside the garbage collector, so we can’t run JS here.
76   HandleScope handle_scope(env()->isolate());
77   BaseObjectPtr<StreamPipe> strong_ref{this};
78   env()->SetImmediate([this, strong_ref](Environment* env) {
79     HandleScope handle_scope(env->isolate());
80     Context::Scope context_scope(env->context());
81     Local<Object> object = this->object();
82 
83     Local<Value> onunpipe;
84     if (!object->Get(env->context(), env->onunpipe_string()).ToLocal(&onunpipe))
85       return;
86     if (onunpipe->IsFunction() &&
87         MakeCallback(onunpipe.As<Function>(), 0, nullptr).IsEmpty()) {
88       return;
89     }
90 
91     // Set all the links established in the constructor to `null`.
92     Local<Value> null = Null(env->isolate());
93 
94     Local<Value> source_v;
95     Local<Value> sink_v;
96     if (!object->Get(env->context(), env->source_string()).ToLocal(&source_v) ||
97         !object->Get(env->context(), env->sink_string()).ToLocal(&sink_v) ||
98         !source_v->IsObject() || !sink_v->IsObject()) {
99       return;
100     }
101 
102     if (object->Set(env->context(), env->source_string(), null).IsNothing() ||
103         object->Set(env->context(), env->sink_string(), null).IsNothing() ||
104         source_v.As<Object>()
105             ->Set(env->context(), env->pipe_target_string(), null)
106             .IsNothing() ||
107         sink_v.As<Object>()
108             ->Set(env->context(), env->pipe_source_string(), null)
109             .IsNothing()) {
110       return;
111     }
112   });
113 }
114 
OnStreamAlloc(size_t suggested_size)115 uv_buf_t StreamPipe::ReadableListener::OnStreamAlloc(size_t suggested_size) {
116   StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
117   size_t size = std::min(suggested_size, pipe->wanted_data_);
118   CHECK_GT(size, 0);
119   return pipe->env()->AllocateManaged(size).release();
120 }
121 
OnStreamRead(ssize_t nread,const uv_buf_t & buf_)122 void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread,
123                                                 const uv_buf_t& buf_) {
124   StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
125   AllocatedBuffer buf(pipe->env(), buf_);
126   if (nread < 0) {
127     // EOF or error; stop reading and pass the error to the previous listener
128     // (which might end up in JS).
129     pipe->is_eof_ = true;
130     // Cache `sink()` here because the previous listener might do things
131     // that eventually lead to an `Unpipe()` call.
132     StreamBase* sink = pipe->sink();
133     stream()->ReadStop();
134     CHECK_NOT_NULL(previous_listener_);
135     previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0));
136     // If we’re not writing, close now. Otherwise, we’ll do that in
137     // `OnStreamAfterWrite()`.
138     if (pipe->pending_writes_ == 0) {
139       sink->Shutdown();
140       pipe->Unpipe();
141     }
142     return;
143   }
144 
145   pipe->ProcessData(nread, std::move(buf));
146 }
147 
ProcessData(size_t nread,AllocatedBuffer && buf)148 void StreamPipe::ProcessData(size_t nread, AllocatedBuffer&& buf) {
149   CHECK(uses_wants_write_ || pending_writes_ == 0);
150   uv_buf_t buffer = uv_buf_init(buf.data(), nread);
151   StreamWriteResult res = sink()->Write(&buffer, 1);
152   pending_writes_++;
153   if (!res.async) {
154     writable_listener_.OnStreamAfterWrite(nullptr, res.err);
155   } else {
156     is_reading_ = false;
157     res.wrap->SetAllocatedStorage(std::move(buf));
158     if (source() != nullptr)
159       source()->ReadStop();
160   }
161 }
162 
OnStreamAfterWrite(WriteWrap * w,int status)163 void StreamPipe::WritableListener::OnStreamAfterWrite(WriteWrap* w,
164                                                       int status) {
165   StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
166   pipe->pending_writes_--;
167   if (pipe->is_closed_) {
168     if (pipe->pending_writes_ == 0) {
169       Environment* env = pipe->env();
170       HandleScope handle_scope(env->isolate());
171       Context::Scope context_scope(env->context());
172       pipe->MakeCallback(env->oncomplete_string(), 0, nullptr).ToLocalChecked();
173       stream()->RemoveStreamListener(this);
174     }
175     return;
176   }
177 
178   if (pipe->is_eof_) {
179     HandleScope handle_scope(pipe->env()->isolate());
180     InternalCallbackScope callback_scope(pipe,
181         InternalCallbackScope::kSkipTaskQueues);
182     pipe->sink()->Shutdown();
183     pipe->Unpipe();
184     return;
185   }
186 
187   if (status != 0) {
188     CHECK_NOT_NULL(previous_listener_);
189     StreamListener* prev = previous_listener_;
190     pipe->Unpipe();
191     prev->OnStreamAfterWrite(w, status);
192     return;
193   }
194 
195   if (!pipe->uses_wants_write_) {
196     OnStreamWantsWrite(65536);
197   }
198 }
199 
OnStreamAfterShutdown(ShutdownWrap * w,int status)200 void StreamPipe::WritableListener::OnStreamAfterShutdown(ShutdownWrap* w,
201                                                          int status) {
202   StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
203   CHECK_NOT_NULL(previous_listener_);
204   StreamListener* prev = previous_listener_;
205   pipe->Unpipe();
206   prev->OnStreamAfterShutdown(w, status);
207 }
208 
OnStreamDestroy()209 void StreamPipe::ReadableListener::OnStreamDestroy() {
210   StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
211   pipe->source_destroyed_ = true;
212   if (!pipe->is_eof_) {
213     OnStreamRead(UV_EPIPE, uv_buf_init(nullptr, 0));
214   }
215 }
216 
OnStreamDestroy()217 void StreamPipe::WritableListener::OnStreamDestroy() {
218   StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
219   pipe->sink_destroyed_ = true;
220   pipe->is_eof_ = true;
221   pipe->pending_writes_ = 0;
222   pipe->Unpipe();
223 }
224 
OnStreamWantsWrite(size_t suggested_size)225 void StreamPipe::WritableListener::OnStreamWantsWrite(size_t suggested_size) {
226   StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
227   pipe->wanted_data_ = suggested_size;
228   if (pipe->is_reading_ || pipe->is_closed_)
229     return;
230   HandleScope handle_scope(pipe->env()->isolate());
231   InternalCallbackScope callback_scope(pipe,
232       InternalCallbackScope::kSkipTaskQueues);
233   pipe->is_reading_ = true;
234   pipe->source()->ReadStart();
235 }
236 
OnStreamAlloc(size_t suggested_size)237 uv_buf_t StreamPipe::WritableListener::OnStreamAlloc(size_t suggested_size) {
238   CHECK_NOT_NULL(previous_listener_);
239   return previous_listener_->OnStreamAlloc(suggested_size);
240 }
241 
OnStreamRead(ssize_t nread,const uv_buf_t & buf)242 void StreamPipe::WritableListener::OnStreamRead(ssize_t nread,
243                                                 const uv_buf_t& buf) {
244   CHECK_NOT_NULL(previous_listener_);
245   return previous_listener_->OnStreamRead(nread, buf);
246 }
247 
New(const FunctionCallbackInfo<Value> & args)248 void StreamPipe::New(const FunctionCallbackInfo<Value>& args) {
249   CHECK(args.IsConstructCall());
250   CHECK(args[0]->IsObject());
251   CHECK(args[1]->IsObject());
252   StreamBase* source = StreamBase::FromObject(args[0].As<Object>());
253   StreamBase* sink = StreamBase::FromObject(args[1].As<Object>());
254 
255   new StreamPipe(source, sink, args.This());
256 }
257 
Start(const FunctionCallbackInfo<Value> & args)258 void StreamPipe::Start(const FunctionCallbackInfo<Value>& args) {
259   StreamPipe* pipe;
260   ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
261   pipe->is_closed_ = false;
262   pipe->writable_listener_.OnStreamWantsWrite(65536);
263 }
264 
Unpipe(const FunctionCallbackInfo<Value> & args)265 void StreamPipe::Unpipe(const FunctionCallbackInfo<Value>& args) {
266   StreamPipe* pipe;
267   ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
268   pipe->Unpipe();
269 }
270 
IsClosed(const FunctionCallbackInfo<Value> & args)271 void StreamPipe::IsClosed(const FunctionCallbackInfo<Value>& args) {
272   StreamPipe* pipe;
273   ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
274   args.GetReturnValue().Set(pipe->is_closed_);
275 }
276 
PendingWrites(const FunctionCallbackInfo<Value> & args)277 void StreamPipe::PendingWrites(const FunctionCallbackInfo<Value>& args) {
278   StreamPipe* pipe;
279   ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
280   args.GetReturnValue().Set(pipe->pending_writes_);
281 }
282 
283 namespace {
284 
InitializeStreamPipe(Local<Object> target,Local<Value> unused,Local<Context> context,void * priv)285 void InitializeStreamPipe(Local<Object> target,
286                           Local<Value> unused,
287                           Local<Context> context,
288                           void* priv) {
289   Environment* env = Environment::GetCurrent(context);
290 
291   // Create FunctionTemplate for FileHandle::CloseReq
292   Local<FunctionTemplate> pipe = env->NewFunctionTemplate(StreamPipe::New);
293   Local<String> stream_pipe_string =
294       FIXED_ONE_BYTE_STRING(env->isolate(), "StreamPipe");
295   env->SetProtoMethod(pipe, "unpipe", StreamPipe::Unpipe);
296   env->SetProtoMethod(pipe, "start", StreamPipe::Start);
297   env->SetProtoMethod(pipe, "isClosed", StreamPipe::IsClosed);
298   env->SetProtoMethod(pipe, "pendingWrites", StreamPipe::PendingWrites);
299   pipe->Inherit(AsyncWrap::GetConstructorTemplate(env));
300   pipe->SetClassName(stream_pipe_string);
301   pipe->InstanceTemplate()->SetInternalFieldCount(
302       StreamPipe::kInternalFieldCount);
303   target
304       ->Set(context, stream_pipe_string,
305             pipe->GetFunction(context).ToLocalChecked())
306       .Check();
307 }
308 
309 }  // anonymous namespace
310 
311 }  // namespace node
312 
313 NODE_MODULE_CONTEXT_AWARE_INTERNAL(stream_pipe,
314                                    node::InitializeStreamPipe)
315