• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "stream_pipe.h"
2 #include "stream_base-inl.h"
3 #include "node_buffer.h"
4 #include "util-inl.h"
5 
6 namespace node {
7 
8 using v8::BackingStore;
9 using v8::Context;
10 using v8::Function;
11 using v8::FunctionCallbackInfo;
12 using v8::FunctionTemplate;
13 using v8::HandleScope;
14 using v8::Isolate;
15 using v8::Just;
16 using v8::Local;
17 using v8::Maybe;
18 using v8::Nothing;
19 using v8::Object;
20 using v8::Value;
21 
StreamPipe(StreamBase * source,StreamBase * sink,Local<Object> obj)22 StreamPipe::StreamPipe(StreamBase* source,
23                        StreamBase* sink,
24                        Local<Object> obj)
25     : AsyncWrap(source->stream_env(), obj, AsyncWrap::PROVIDER_STREAMPIPE) {
26   MakeWeak();
27 
28   CHECK_NOT_NULL(sink);
29   CHECK_NOT_NULL(source);
30 
31   source->PushStreamListener(&readable_listener_);
32   sink->PushStreamListener(&writable_listener_);
33 
34   uses_wants_write_ = sink->HasWantsWrite();
35 }
36 
~StreamPipe()37 StreamPipe::~StreamPipe() {
38   Unpipe(true);
39 }
40 
source()41 StreamBase* StreamPipe::source() {
42   return static_cast<StreamBase*>(readable_listener_.stream());
43 }
44 
sink()45 StreamBase* StreamPipe::sink() {
46   return static_cast<StreamBase*>(writable_listener_.stream());
47 }
48 
Unpipe(bool is_in_deletion)49 void StreamPipe::Unpipe(bool is_in_deletion) {
50   if (is_closed_)
51     return;
52 
53   // Note that we possibly cannot use virtual methods on `source` and `sink`
54   // here, because this function can be called from their destructors via
55   // `OnStreamDestroy()`.
56   if (!source_destroyed_)
57     source()->ReadStop();
58 
59   is_closed_ = true;
60   is_reading_ = false;
61   source()->RemoveStreamListener(&readable_listener_);
62   if (pending_writes_ == 0)
63     sink()->RemoveStreamListener(&writable_listener_);
64 
65   if (is_in_deletion) return;
66 
67   // Delay the JS-facing part with SetImmediate, because this might be from
68   // inside the garbage collector, so we can’t run JS here.
69   HandleScope handle_scope(env()->isolate());
70   BaseObjectPtr<StreamPipe> strong_ref{this};
71   env()->SetImmediate([this, strong_ref](Environment* env) {
72     HandleScope handle_scope(env->isolate());
73     Context::Scope context_scope(env->context());
74     Local<Object> object = this->object();
75 
76     Local<Value> onunpipe;
77     if (!object->Get(env->context(), env->onunpipe_string()).ToLocal(&onunpipe))
78       return;
79     if (onunpipe->IsFunction() &&
80         MakeCallback(onunpipe.As<Function>(), 0, nullptr).IsEmpty()) {
81       return;
82     }
83 
84     // Set all the links established in the constructor to `null`.
85     Local<Value> null = Null(env->isolate());
86 
87     Local<Value> source_v;
88     Local<Value> sink_v;
89     if (!object->Get(env->context(), env->source_string()).ToLocal(&source_v) ||
90         !object->Get(env->context(), env->sink_string()).ToLocal(&sink_v) ||
91         !source_v->IsObject() || !sink_v->IsObject()) {
92       return;
93     }
94 
95     if (object->Set(env->context(), env->source_string(), null).IsNothing() ||
96         object->Set(env->context(), env->sink_string(), null).IsNothing() ||
97         source_v.As<Object>()
98             ->Set(env->context(), env->pipe_target_string(), null)
99             .IsNothing() ||
100         sink_v.As<Object>()
101             ->Set(env->context(), env->pipe_source_string(), null)
102             .IsNothing()) {
103       return;
104     }
105   });
106 }
107 
OnStreamAlloc(size_t suggested_size)108 uv_buf_t StreamPipe::ReadableListener::OnStreamAlloc(size_t suggested_size) {
109   StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
110   size_t size = std::min(suggested_size, pipe->wanted_data_);
111   CHECK_GT(size, 0);
112   return pipe->env()->allocate_managed_buffer(size);
113 }
114 
OnStreamRead(ssize_t nread,const uv_buf_t & buf_)115 void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread,
116                                                 const uv_buf_t& buf_) {
117   StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
118   std::unique_ptr<BackingStore> bs = pipe->env()->release_managed_buffer(buf_);
119   if (nread < 0) {
120     // EOF or error; stop reading and pass the error to the previous listener
121     // (which might end up in JS).
122     pipe->is_eof_ = true;
123     // Cache `sink()` here because the previous listener might do things
124     // that eventually lead to an `Unpipe()` call.
125     StreamBase* sink = pipe->sink();
126     stream()->ReadStop();
127     CHECK_NOT_NULL(previous_listener_);
128     previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0));
129     // If we’re not writing, close now. Otherwise, we’ll do that in
130     // `OnStreamAfterWrite()`.
131     if (pipe->pending_writes_ == 0) {
132       sink->Shutdown();
133       pipe->Unpipe();
134     }
135     return;
136   }
137 
138   pipe->ProcessData(nread, std::move(bs));
139 }
140 
ProcessData(size_t nread,std::unique_ptr<BackingStore> bs)141 void StreamPipe::ProcessData(size_t nread,
142                              std::unique_ptr<BackingStore> bs) {
143   CHECK(uses_wants_write_ || pending_writes_ == 0);
144   uv_buf_t buffer = uv_buf_init(static_cast<char*>(bs->Data()), nread);
145   StreamWriteResult res = sink()->Write(&buffer, 1);
146   pending_writes_++;
147   if (!res.async) {
148     writable_listener_.OnStreamAfterWrite(nullptr, res.err);
149   } else {
150     is_reading_ = false;
151     res.wrap->SetBackingStore(std::move(bs));
152     if (source() != nullptr)
153       source()->ReadStop();
154   }
155 }
156 
OnStreamAfterWrite(WriteWrap * w,int status)157 void StreamPipe::WritableListener::OnStreamAfterWrite(WriteWrap* w,
158                                                       int status) {
159   StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
160   pipe->pending_writes_--;
161   if (pipe->is_closed_) {
162     if (pipe->pending_writes_ == 0) {
163       Environment* env = pipe->env();
164       HandleScope handle_scope(env->isolate());
165       Context::Scope context_scope(env->context());
166       if (pipe->MakeCallback(env->oncomplete_string(), 0, nullptr).IsEmpty())
167         return;
168       stream()->RemoveStreamListener(this);
169     }
170     return;
171   }
172 
173   if (pipe->is_eof_) {
174     HandleScope handle_scope(pipe->env()->isolate());
175     InternalCallbackScope callback_scope(pipe,
176         InternalCallbackScope::kSkipTaskQueues);
177     pipe->sink()->Shutdown();
178     pipe->Unpipe();
179     return;
180   }
181 
182   if (status != 0) {
183     CHECK_NOT_NULL(previous_listener_);
184     StreamListener* prev = previous_listener_;
185     pipe->Unpipe();
186     prev->OnStreamAfterWrite(w, status);
187     return;
188   }
189 
190   if (!pipe->uses_wants_write_) {
191     OnStreamWantsWrite(65536);
192   }
193 }
194 
OnStreamAfterShutdown(ShutdownWrap * w,int status)195 void StreamPipe::WritableListener::OnStreamAfterShutdown(ShutdownWrap* w,
196                                                          int status) {
197   StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
198   CHECK_NOT_NULL(previous_listener_);
199   StreamListener* prev = previous_listener_;
200   pipe->Unpipe();
201   prev->OnStreamAfterShutdown(w, status);
202 }
203 
OnStreamDestroy()204 void StreamPipe::ReadableListener::OnStreamDestroy() {
205   StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
206   pipe->source_destroyed_ = true;
207   if (!pipe->is_eof_) {
208     OnStreamRead(UV_EPIPE, uv_buf_init(nullptr, 0));
209   }
210 }
211 
OnStreamDestroy()212 void StreamPipe::WritableListener::OnStreamDestroy() {
213   StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
214   pipe->sink_destroyed_ = true;
215   pipe->is_eof_ = true;
216   pipe->pending_writes_ = 0;
217   pipe->Unpipe();
218 }
219 
OnStreamWantsWrite(size_t suggested_size)220 void StreamPipe::WritableListener::OnStreamWantsWrite(size_t suggested_size) {
221   StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
222   pipe->wanted_data_ = suggested_size;
223   if (pipe->is_reading_ || pipe->is_closed_)
224     return;
225   HandleScope handle_scope(pipe->env()->isolate());
226   InternalCallbackScope callback_scope(pipe,
227       InternalCallbackScope::kSkipTaskQueues);
228   pipe->is_reading_ = true;
229   pipe->source()->ReadStart();
230 }
231 
OnStreamAlloc(size_t suggested_size)232 uv_buf_t StreamPipe::WritableListener::OnStreamAlloc(size_t suggested_size) {
233   CHECK_NOT_NULL(previous_listener_);
234   return previous_listener_->OnStreamAlloc(suggested_size);
235 }
236 
OnStreamRead(ssize_t nread,const uv_buf_t & buf)237 void StreamPipe::WritableListener::OnStreamRead(ssize_t nread,
238                                                 const uv_buf_t& buf) {
239   CHECK_NOT_NULL(previous_listener_);
240   return previous_listener_->OnStreamRead(nread, buf);
241 }
242 
New(StreamBase * source,StreamBase * sink,Local<Object> obj)243 Maybe<StreamPipe*> StreamPipe::New(StreamBase* source,
244                                    StreamBase* sink,
245                                    Local<Object> obj) {
246   std::unique_ptr<StreamPipe> stream_pipe(new StreamPipe(source, sink, obj));
247 
248   // Set up links between this object and the source/sink objects.
249   // In particular, this makes sure that they are garbage collected as a group,
250   // if that applies to the given streams (for example, Http2Streams use
251   // weak references).
252   Environment* env = source->stream_env();
253   if (obj->Set(env->context(), env->source_string(), source->GetObject())
254           .IsNothing()) {
255     return Nothing<StreamPipe*>();
256   }
257   if (source->GetObject()
258           ->Set(env->context(), env->pipe_target_string(), obj)
259           .IsNothing()) {
260     return Nothing<StreamPipe*>();
261   }
262   if (obj->Set(env->context(), env->sink_string(), sink->GetObject())
263           .IsNothing()) {
264     return Nothing<StreamPipe*>();
265   }
266   if (sink->GetObject()
267           ->Set(env->context(), env->pipe_source_string(), obj)
268           .IsNothing()) {
269     return Nothing<StreamPipe*>();
270   }
271 
272   return Just(stream_pipe.release());
273 }
274 
New(const FunctionCallbackInfo<Value> & args)275 void StreamPipe::New(const FunctionCallbackInfo<Value>& args) {
276   CHECK(args.IsConstructCall());
277   CHECK(args[0]->IsObject());
278   CHECK(args[1]->IsObject());
279   StreamBase* source = StreamBase::FromObject(args[0].As<Object>());
280   StreamBase* sink = StreamBase::FromObject(args[1].As<Object>());
281 
282   if (StreamPipe::New(source, sink, args.This()).IsNothing()) return;
283 }
284 
Start(const FunctionCallbackInfo<Value> & args)285 void StreamPipe::Start(const FunctionCallbackInfo<Value>& args) {
286   StreamPipe* pipe;
287   ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
288   pipe->is_closed_ = false;
289   pipe->writable_listener_.OnStreamWantsWrite(65536);
290 }
291 
Unpipe(const FunctionCallbackInfo<Value> & args)292 void StreamPipe::Unpipe(const FunctionCallbackInfo<Value>& args) {
293   StreamPipe* pipe;
294   ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
295   pipe->Unpipe();
296 }
297 
IsClosed(const FunctionCallbackInfo<Value> & args)298 void StreamPipe::IsClosed(const FunctionCallbackInfo<Value>& args) {
299   StreamPipe* pipe;
300   ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
301   args.GetReturnValue().Set(pipe->is_closed_);
302 }
303 
PendingWrites(const FunctionCallbackInfo<Value> & args)304 void StreamPipe::PendingWrites(const FunctionCallbackInfo<Value>& args) {
305   StreamPipe* pipe;
306   ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
307   args.GetReturnValue().Set(pipe->pending_writes_);
308 }
309 
310 namespace {
311 
InitializeStreamPipe(Local<Object> target,Local<Value> unused,Local<Context> context,void * priv)312 void InitializeStreamPipe(Local<Object> target,
313                           Local<Value> unused,
314                           Local<Context> context,
315                           void* priv) {
316   Environment* env = Environment::GetCurrent(context);
317   Isolate* isolate = env->isolate();
318 
319   // Create FunctionTemplate for FileHandle::CloseReq
320   Local<FunctionTemplate> pipe = NewFunctionTemplate(isolate, StreamPipe::New);
321   SetProtoMethod(isolate, pipe, "unpipe", StreamPipe::Unpipe);
322   SetProtoMethod(isolate, pipe, "start", StreamPipe::Start);
323   SetProtoMethod(isolate, pipe, "isClosed", StreamPipe::IsClosed);
324   SetProtoMethod(isolate, pipe, "pendingWrites", StreamPipe::PendingWrites);
325   pipe->Inherit(AsyncWrap::GetConstructorTemplate(env));
326   pipe->InstanceTemplate()->SetInternalFieldCount(
327       StreamPipe::kInternalFieldCount);
328   SetConstructorFunction(context, target, "StreamPipe", pipe);
329 }
330 
331 }  // anonymous namespace
332 
333 }  // namespace node
334 
335 NODE_BINDING_CONTEXT_AWARE_INTERNAL(stream_pipe, node::InitializeStreamPipe)
336