• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "stream_pipe.h"
2 #include "allocated_buffer-inl.h"
3 #include "stream_base-inl.h"
4 #include "node_buffer.h"
5 #include "util-inl.h"
6 
7 namespace node {
8 
9 using v8::Context;
10 using v8::Function;
11 using v8::FunctionCallbackInfo;
12 using v8::FunctionTemplate;
13 using v8::HandleScope;
14 using v8::Local;
15 using v8::Object;
16 using v8::Value;
17 
StreamPipe(StreamBase * source,StreamBase * sink,Local<Object> obj)18 StreamPipe::StreamPipe(StreamBase* source,
19                        StreamBase* sink,
20                        Local<Object> obj)
21     : AsyncWrap(source->stream_env(), obj, AsyncWrap::PROVIDER_STREAMPIPE) {
22   MakeWeak();
23 
24   CHECK_NOT_NULL(sink);
25   CHECK_NOT_NULL(source);
26 
27   source->PushStreamListener(&readable_listener_);
28   sink->PushStreamListener(&writable_listener_);
29 
30   uses_wants_write_ = sink->HasWantsWrite();
31 
32   // Set up links between this object and the source/sink objects.
33   // In particular, this makes sure that they are garbage collected as a group,
34   // if that applies to the given streams (for example, Http2Streams use
35   // weak references).
36   obj->Set(env()->context(), env()->source_string(), source->GetObject())
37       .Check();
38   source->GetObject()->Set(env()->context(), env()->pipe_target_string(), obj)
39       .Check();
40   obj->Set(env()->context(), env()->sink_string(), sink->GetObject())
41       .Check();
42   sink->GetObject()->Set(env()->context(), env()->pipe_source_string(), obj)
43       .Check();
44 }
45 
~StreamPipe()46 StreamPipe::~StreamPipe() {
47   Unpipe(true);
48 }
49 
source()50 StreamBase* StreamPipe::source() {
51   return static_cast<StreamBase*>(readable_listener_.stream());
52 }
53 
sink()54 StreamBase* StreamPipe::sink() {
55   return static_cast<StreamBase*>(writable_listener_.stream());
56 }
57 
Unpipe(bool is_in_deletion)58 void StreamPipe::Unpipe(bool is_in_deletion) {
59   if (is_closed_)
60     return;
61 
62   // Note that we possibly cannot use virtual methods on `source` and `sink`
63   // here, because this function can be called from their destructors via
64   // `OnStreamDestroy()`.
65   if (!source_destroyed_)
66     source()->ReadStop();
67 
68   is_closed_ = true;
69   is_reading_ = false;
70   source()->RemoveStreamListener(&readable_listener_);
71   if (pending_writes_ == 0)
72     sink()->RemoveStreamListener(&writable_listener_);
73 
74   if (is_in_deletion) return;
75 
76   // Delay the JS-facing part with SetImmediate, because this might be from
77   // inside the garbage collector, so we can’t run JS here.
78   HandleScope handle_scope(env()->isolate());
79   BaseObjectPtr<StreamPipe> strong_ref{this};
80   env()->SetImmediate([this, strong_ref](Environment* env) {
81     HandleScope handle_scope(env->isolate());
82     Context::Scope context_scope(env->context());
83     Local<Object> object = this->object();
84 
85     Local<Value> onunpipe;
86     if (!object->Get(env->context(), env->onunpipe_string()).ToLocal(&onunpipe))
87       return;
88     if (onunpipe->IsFunction() &&
89         MakeCallback(onunpipe.As<Function>(), 0, nullptr).IsEmpty()) {
90       return;
91     }
92 
93     // Set all the links established in the constructor to `null`.
94     Local<Value> null = Null(env->isolate());
95 
96     Local<Value> source_v;
97     Local<Value> sink_v;
98     if (!object->Get(env->context(), env->source_string()).ToLocal(&source_v) ||
99         !object->Get(env->context(), env->sink_string()).ToLocal(&sink_v) ||
100         !source_v->IsObject() || !sink_v->IsObject()) {
101       return;
102     }
103 
104     if (object->Set(env->context(), env->source_string(), null).IsNothing() ||
105         object->Set(env->context(), env->sink_string(), null).IsNothing() ||
106         source_v.As<Object>()
107             ->Set(env->context(), env->pipe_target_string(), null)
108             .IsNothing() ||
109         sink_v.As<Object>()
110             ->Set(env->context(), env->pipe_source_string(), null)
111             .IsNothing()) {
112       return;
113     }
114   });
115 }
116 
OnStreamAlloc(size_t suggested_size)117 uv_buf_t StreamPipe::ReadableListener::OnStreamAlloc(size_t suggested_size) {
118   StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
119   size_t size = std::min(suggested_size, pipe->wanted_data_);
120   CHECK_GT(size, 0);
121   return AllocatedBuffer::AllocateManaged(pipe->env(), size).release();
122 }
123 
OnStreamRead(ssize_t nread,const uv_buf_t & buf_)124 void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread,
125                                                 const uv_buf_t& buf_) {
126   StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
127   AllocatedBuffer buf(pipe->env(), buf_);
128   if (nread < 0) {
129     // EOF or error; stop reading and pass the error to the previous listener
130     // (which might end up in JS).
131     pipe->is_eof_ = true;
132     // Cache `sink()` here because the previous listener might do things
133     // that eventually lead to an `Unpipe()` call.
134     StreamBase* sink = pipe->sink();
135     stream()->ReadStop();
136     CHECK_NOT_NULL(previous_listener_);
137     previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0));
138     // If we’re not writing, close now. Otherwise, we’ll do that in
139     // `OnStreamAfterWrite()`.
140     if (pipe->pending_writes_ == 0) {
141       sink->Shutdown();
142       pipe->Unpipe();
143     }
144     return;
145   }
146 
147   pipe->ProcessData(nread, std::move(buf));
148 }
149 
ProcessData(size_t nread,AllocatedBuffer && buf)150 void StreamPipe::ProcessData(size_t nread, AllocatedBuffer&& buf) {
151   CHECK(uses_wants_write_ || pending_writes_ == 0);
152   uv_buf_t buffer = uv_buf_init(buf.data(), nread);
153   StreamWriteResult res = sink()->Write(&buffer, 1);
154   pending_writes_++;
155   if (!res.async) {
156     writable_listener_.OnStreamAfterWrite(nullptr, res.err);
157   } else {
158     is_reading_ = false;
159     res.wrap->SetAllocatedStorage(std::move(buf));
160     if (source() != nullptr)
161       source()->ReadStop();
162   }
163 }
164 
OnStreamAfterWrite(WriteWrap * w,int status)165 void StreamPipe::WritableListener::OnStreamAfterWrite(WriteWrap* w,
166                                                       int status) {
167   StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
168   pipe->pending_writes_--;
169   if (pipe->is_closed_) {
170     if (pipe->pending_writes_ == 0) {
171       Environment* env = pipe->env();
172       HandleScope handle_scope(env->isolate());
173       Context::Scope context_scope(env->context());
174       pipe->MakeCallback(env->oncomplete_string(), 0, nullptr).ToLocalChecked();
175       stream()->RemoveStreamListener(this);
176     }
177     return;
178   }
179 
180   if (pipe->is_eof_) {
181     HandleScope handle_scope(pipe->env()->isolate());
182     InternalCallbackScope callback_scope(pipe,
183         InternalCallbackScope::kSkipTaskQueues);
184     pipe->sink()->Shutdown();
185     pipe->Unpipe();
186     return;
187   }
188 
189   if (status != 0) {
190     CHECK_NOT_NULL(previous_listener_);
191     StreamListener* prev = previous_listener_;
192     pipe->Unpipe();
193     prev->OnStreamAfterWrite(w, status);
194     return;
195   }
196 
197   if (!pipe->uses_wants_write_) {
198     OnStreamWantsWrite(65536);
199   }
200 }
201 
OnStreamAfterShutdown(ShutdownWrap * w,int status)202 void StreamPipe::WritableListener::OnStreamAfterShutdown(ShutdownWrap* w,
203                                                          int status) {
204   StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
205   CHECK_NOT_NULL(previous_listener_);
206   StreamListener* prev = previous_listener_;
207   pipe->Unpipe();
208   prev->OnStreamAfterShutdown(w, status);
209 }
210 
OnStreamDestroy()211 void StreamPipe::ReadableListener::OnStreamDestroy() {
212   StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
213   pipe->source_destroyed_ = true;
214   if (!pipe->is_eof_) {
215     OnStreamRead(UV_EPIPE, uv_buf_init(nullptr, 0));
216   }
217 }
218 
OnStreamDestroy()219 void StreamPipe::WritableListener::OnStreamDestroy() {
220   StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
221   pipe->sink_destroyed_ = true;
222   pipe->is_eof_ = true;
223   pipe->pending_writes_ = 0;
224   pipe->Unpipe();
225 }
226 
OnStreamWantsWrite(size_t suggested_size)227 void StreamPipe::WritableListener::OnStreamWantsWrite(size_t suggested_size) {
228   StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
229   pipe->wanted_data_ = suggested_size;
230   if (pipe->is_reading_ || pipe->is_closed_)
231     return;
232   HandleScope handle_scope(pipe->env()->isolate());
233   InternalCallbackScope callback_scope(pipe,
234       InternalCallbackScope::kSkipTaskQueues);
235   pipe->is_reading_ = true;
236   pipe->source()->ReadStart();
237 }
238 
OnStreamAlloc(size_t suggested_size)239 uv_buf_t StreamPipe::WritableListener::OnStreamAlloc(size_t suggested_size) {
240   CHECK_NOT_NULL(previous_listener_);
241   return previous_listener_->OnStreamAlloc(suggested_size);
242 }
243 
OnStreamRead(ssize_t nread,const uv_buf_t & buf)244 void StreamPipe::WritableListener::OnStreamRead(ssize_t nread,
245                                                 const uv_buf_t& buf) {
246   CHECK_NOT_NULL(previous_listener_);
247   return previous_listener_->OnStreamRead(nread, buf);
248 }
249 
New(const FunctionCallbackInfo<Value> & args)250 void StreamPipe::New(const FunctionCallbackInfo<Value>& args) {
251   CHECK(args.IsConstructCall());
252   CHECK(args[0]->IsObject());
253   CHECK(args[1]->IsObject());
254   StreamBase* source = StreamBase::FromObject(args[0].As<Object>());
255   StreamBase* sink = StreamBase::FromObject(args[1].As<Object>());
256 
257   new StreamPipe(source, sink, args.This());
258 }
259 
Start(const FunctionCallbackInfo<Value> & args)260 void StreamPipe::Start(const FunctionCallbackInfo<Value>& args) {
261   StreamPipe* pipe;
262   ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
263   pipe->is_closed_ = false;
264   pipe->writable_listener_.OnStreamWantsWrite(65536);
265 }
266 
Unpipe(const FunctionCallbackInfo<Value> & args)267 void StreamPipe::Unpipe(const FunctionCallbackInfo<Value>& args) {
268   StreamPipe* pipe;
269   ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
270   pipe->Unpipe();
271 }
272 
IsClosed(const FunctionCallbackInfo<Value> & args)273 void StreamPipe::IsClosed(const FunctionCallbackInfo<Value>& args) {
274   StreamPipe* pipe;
275   ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
276   args.GetReturnValue().Set(pipe->is_closed_);
277 }
278 
PendingWrites(const FunctionCallbackInfo<Value> & args)279 void StreamPipe::PendingWrites(const FunctionCallbackInfo<Value>& args) {
280   StreamPipe* pipe;
281   ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
282   args.GetReturnValue().Set(pipe->pending_writes_);
283 }
284 
285 namespace {
286 
InitializeStreamPipe(Local<Object> target,Local<Value> unused,Local<Context> context,void * priv)287 void InitializeStreamPipe(Local<Object> target,
288                           Local<Value> unused,
289                           Local<Context> context,
290                           void* priv) {
291   Environment* env = Environment::GetCurrent(context);
292 
293   // Create FunctionTemplate for FileHandle::CloseReq
294   Local<FunctionTemplate> pipe = env->NewFunctionTemplate(StreamPipe::New);
295   env->SetProtoMethod(pipe, "unpipe", StreamPipe::Unpipe);
296   env->SetProtoMethod(pipe, "start", StreamPipe::Start);
297   env->SetProtoMethod(pipe, "isClosed", StreamPipe::IsClosed);
298   env->SetProtoMethod(pipe, "pendingWrites", StreamPipe::PendingWrites);
299   pipe->Inherit(AsyncWrap::GetConstructorTemplate(env));
300   pipe->InstanceTemplate()->SetInternalFieldCount(
301       StreamPipe::kInternalFieldCount);
302   env->SetConstructorFunction(target, "StreamPipe", pipe);
303 }
304 
305 }  // anonymous namespace
306 
307 }  // namespace node
308 
309 NODE_MODULE_CONTEXT_AWARE_INTERNAL(stream_pipe,
310                                    node::InitializeStreamPipe)
311