1 #include "stream_pipe.h"
2 #include "stream_base-inl.h"
3 #include "node_buffer.h"
4 #include "util-inl.h"
5
6 namespace node {
7
8 using v8::BackingStore;
9 using v8::Context;
10 using v8::Function;
11 using v8::FunctionCallbackInfo;
12 using v8::FunctionTemplate;
13 using v8::HandleScope;
14 using v8::Isolate;
15 using v8::Just;
16 using v8::Local;
17 using v8::Maybe;
18 using v8::Nothing;
19 using v8::Object;
20 using v8::Value;
21
StreamPipe(StreamBase * source,StreamBase * sink,Local<Object> obj)22 StreamPipe::StreamPipe(StreamBase* source,
23 StreamBase* sink,
24 Local<Object> obj)
25 : AsyncWrap(source->stream_env(), obj, AsyncWrap::PROVIDER_STREAMPIPE) {
26 MakeWeak();
27
28 CHECK_NOT_NULL(sink);
29 CHECK_NOT_NULL(source);
30
31 source->PushStreamListener(&readable_listener_);
32 sink->PushStreamListener(&writable_listener_);
33
34 uses_wants_write_ = sink->HasWantsWrite();
35 }
36
~StreamPipe()37 StreamPipe::~StreamPipe() {
38 Unpipe(true);
39 }
40
source()41 StreamBase* StreamPipe::source() {
42 return static_cast<StreamBase*>(readable_listener_.stream());
43 }
44
sink()45 StreamBase* StreamPipe::sink() {
46 return static_cast<StreamBase*>(writable_listener_.stream());
47 }
48
Unpipe(bool is_in_deletion)49 void StreamPipe::Unpipe(bool is_in_deletion) {
50 if (is_closed_)
51 return;
52
53 // Note that we possibly cannot use virtual methods on `source` and `sink`
54 // here, because this function can be called from their destructors via
55 // `OnStreamDestroy()`.
56 if (!source_destroyed_)
57 source()->ReadStop();
58
59 is_closed_ = true;
60 is_reading_ = false;
61 source()->RemoveStreamListener(&readable_listener_);
62 if (pending_writes_ == 0)
63 sink()->RemoveStreamListener(&writable_listener_);
64
65 if (is_in_deletion) return;
66
67 // Delay the JS-facing part with SetImmediate, because this might be from
68 // inside the garbage collector, so we can’t run JS here.
69 HandleScope handle_scope(env()->isolate());
70 BaseObjectPtr<StreamPipe> strong_ref{this};
71 env()->SetImmediate([this, strong_ref](Environment* env) {
72 HandleScope handle_scope(env->isolate());
73 Context::Scope context_scope(env->context());
74 Local<Object> object = this->object();
75
76 Local<Value> onunpipe;
77 if (!object->Get(env->context(), env->onunpipe_string()).ToLocal(&onunpipe))
78 return;
79 if (onunpipe->IsFunction() &&
80 MakeCallback(onunpipe.As<Function>(), 0, nullptr).IsEmpty()) {
81 return;
82 }
83
84 // Set all the links established in the constructor to `null`.
85 Local<Value> null = Null(env->isolate());
86
87 Local<Value> source_v;
88 Local<Value> sink_v;
89 if (!object->Get(env->context(), env->source_string()).ToLocal(&source_v) ||
90 !object->Get(env->context(), env->sink_string()).ToLocal(&sink_v) ||
91 !source_v->IsObject() || !sink_v->IsObject()) {
92 return;
93 }
94
95 if (object->Set(env->context(), env->source_string(), null).IsNothing() ||
96 object->Set(env->context(), env->sink_string(), null).IsNothing() ||
97 source_v.As<Object>()
98 ->Set(env->context(), env->pipe_target_string(), null)
99 .IsNothing() ||
100 sink_v.As<Object>()
101 ->Set(env->context(), env->pipe_source_string(), null)
102 .IsNothing()) {
103 return;
104 }
105 });
106 }
107
OnStreamAlloc(size_t suggested_size)108 uv_buf_t StreamPipe::ReadableListener::OnStreamAlloc(size_t suggested_size) {
109 StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
110 size_t size = std::min(suggested_size, pipe->wanted_data_);
111 CHECK_GT(size, 0);
112 return pipe->env()->allocate_managed_buffer(size);
113 }
114
OnStreamRead(ssize_t nread,const uv_buf_t & buf_)115 void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread,
116 const uv_buf_t& buf_) {
117 StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
118 std::unique_ptr<BackingStore> bs = pipe->env()->release_managed_buffer(buf_);
119 if (nread < 0) {
120 // EOF or error; stop reading and pass the error to the previous listener
121 // (which might end up in JS).
122 pipe->is_eof_ = true;
123 // Cache `sink()` here because the previous listener might do things
124 // that eventually lead to an `Unpipe()` call.
125 StreamBase* sink = pipe->sink();
126 stream()->ReadStop();
127 CHECK_NOT_NULL(previous_listener_);
128 previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0));
129 // If we’re not writing, close now. Otherwise, we’ll do that in
130 // `OnStreamAfterWrite()`.
131 if (pipe->pending_writes_ == 0) {
132 sink->Shutdown();
133 pipe->Unpipe();
134 }
135 return;
136 }
137
138 pipe->ProcessData(nread, std::move(bs));
139 }
140
ProcessData(size_t nread,std::unique_ptr<BackingStore> bs)141 void StreamPipe::ProcessData(size_t nread,
142 std::unique_ptr<BackingStore> bs) {
143 CHECK(uses_wants_write_ || pending_writes_ == 0);
144 uv_buf_t buffer = uv_buf_init(static_cast<char*>(bs->Data()), nread);
145 StreamWriteResult res = sink()->Write(&buffer, 1);
146 pending_writes_++;
147 if (!res.async) {
148 writable_listener_.OnStreamAfterWrite(nullptr, res.err);
149 } else {
150 is_reading_ = false;
151 res.wrap->SetBackingStore(std::move(bs));
152 if (source() != nullptr)
153 source()->ReadStop();
154 }
155 }
156
OnStreamAfterWrite(WriteWrap * w,int status)157 void StreamPipe::WritableListener::OnStreamAfterWrite(WriteWrap* w,
158 int status) {
159 StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
160 pipe->pending_writes_--;
161 if (pipe->is_closed_) {
162 if (pipe->pending_writes_ == 0) {
163 Environment* env = pipe->env();
164 HandleScope handle_scope(env->isolate());
165 Context::Scope context_scope(env->context());
166 if (pipe->MakeCallback(env->oncomplete_string(), 0, nullptr).IsEmpty())
167 return;
168 stream()->RemoveStreamListener(this);
169 }
170 return;
171 }
172
173 if (pipe->is_eof_) {
174 HandleScope handle_scope(pipe->env()->isolate());
175 InternalCallbackScope callback_scope(pipe,
176 InternalCallbackScope::kSkipTaskQueues);
177 pipe->sink()->Shutdown();
178 pipe->Unpipe();
179 return;
180 }
181
182 if (status != 0) {
183 CHECK_NOT_NULL(previous_listener_);
184 StreamListener* prev = previous_listener_;
185 pipe->Unpipe();
186 prev->OnStreamAfterWrite(w, status);
187 return;
188 }
189
190 if (!pipe->uses_wants_write_) {
191 OnStreamWantsWrite(65536);
192 }
193 }
194
OnStreamAfterShutdown(ShutdownWrap * w,int status)195 void StreamPipe::WritableListener::OnStreamAfterShutdown(ShutdownWrap* w,
196 int status) {
197 StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
198 CHECK_NOT_NULL(previous_listener_);
199 StreamListener* prev = previous_listener_;
200 pipe->Unpipe();
201 prev->OnStreamAfterShutdown(w, status);
202 }
203
OnStreamDestroy()204 void StreamPipe::ReadableListener::OnStreamDestroy() {
205 StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
206 pipe->source_destroyed_ = true;
207 if (!pipe->is_eof_) {
208 OnStreamRead(UV_EPIPE, uv_buf_init(nullptr, 0));
209 }
210 }
211
OnStreamDestroy()212 void StreamPipe::WritableListener::OnStreamDestroy() {
213 StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
214 pipe->sink_destroyed_ = true;
215 pipe->is_eof_ = true;
216 pipe->pending_writes_ = 0;
217 pipe->Unpipe();
218 }
219
OnStreamWantsWrite(size_t suggested_size)220 void StreamPipe::WritableListener::OnStreamWantsWrite(size_t suggested_size) {
221 StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
222 pipe->wanted_data_ = suggested_size;
223 if (pipe->is_reading_ || pipe->is_closed_)
224 return;
225 HandleScope handle_scope(pipe->env()->isolate());
226 InternalCallbackScope callback_scope(pipe,
227 InternalCallbackScope::kSkipTaskQueues);
228 pipe->is_reading_ = true;
229 pipe->source()->ReadStart();
230 }
231
OnStreamAlloc(size_t suggested_size)232 uv_buf_t StreamPipe::WritableListener::OnStreamAlloc(size_t suggested_size) {
233 CHECK_NOT_NULL(previous_listener_);
234 return previous_listener_->OnStreamAlloc(suggested_size);
235 }
236
OnStreamRead(ssize_t nread,const uv_buf_t & buf)237 void StreamPipe::WritableListener::OnStreamRead(ssize_t nread,
238 const uv_buf_t& buf) {
239 CHECK_NOT_NULL(previous_listener_);
240 return previous_listener_->OnStreamRead(nread, buf);
241 }
242
New(StreamBase * source,StreamBase * sink,Local<Object> obj)243 Maybe<StreamPipe*> StreamPipe::New(StreamBase* source,
244 StreamBase* sink,
245 Local<Object> obj) {
246 std::unique_ptr<StreamPipe> stream_pipe(new StreamPipe(source, sink, obj));
247
248 // Set up links between this object and the source/sink objects.
249 // In particular, this makes sure that they are garbage collected as a group,
250 // if that applies to the given streams (for example, Http2Streams use
251 // weak references).
252 Environment* env = source->stream_env();
253 if (obj->Set(env->context(), env->source_string(), source->GetObject())
254 .IsNothing()) {
255 return Nothing<StreamPipe*>();
256 }
257 if (source->GetObject()
258 ->Set(env->context(), env->pipe_target_string(), obj)
259 .IsNothing()) {
260 return Nothing<StreamPipe*>();
261 }
262 if (obj->Set(env->context(), env->sink_string(), sink->GetObject())
263 .IsNothing()) {
264 return Nothing<StreamPipe*>();
265 }
266 if (sink->GetObject()
267 ->Set(env->context(), env->pipe_source_string(), obj)
268 .IsNothing()) {
269 return Nothing<StreamPipe*>();
270 }
271
272 return Just(stream_pipe.release());
273 }
274
New(const FunctionCallbackInfo<Value> & args)275 void StreamPipe::New(const FunctionCallbackInfo<Value>& args) {
276 CHECK(args.IsConstructCall());
277 CHECK(args[0]->IsObject());
278 CHECK(args[1]->IsObject());
279 StreamBase* source = StreamBase::FromObject(args[0].As<Object>());
280 StreamBase* sink = StreamBase::FromObject(args[1].As<Object>());
281
282 if (StreamPipe::New(source, sink, args.This()).IsNothing()) return;
283 }
284
Start(const FunctionCallbackInfo<Value> & args)285 void StreamPipe::Start(const FunctionCallbackInfo<Value>& args) {
286 StreamPipe* pipe;
287 ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
288 pipe->is_closed_ = false;
289 pipe->writable_listener_.OnStreamWantsWrite(65536);
290 }
291
Unpipe(const FunctionCallbackInfo<Value> & args)292 void StreamPipe::Unpipe(const FunctionCallbackInfo<Value>& args) {
293 StreamPipe* pipe;
294 ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
295 pipe->Unpipe();
296 }
297
IsClosed(const FunctionCallbackInfo<Value> & args)298 void StreamPipe::IsClosed(const FunctionCallbackInfo<Value>& args) {
299 StreamPipe* pipe;
300 ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
301 args.GetReturnValue().Set(pipe->is_closed_);
302 }
303
PendingWrites(const FunctionCallbackInfo<Value> & args)304 void StreamPipe::PendingWrites(const FunctionCallbackInfo<Value>& args) {
305 StreamPipe* pipe;
306 ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
307 args.GetReturnValue().Set(pipe->pending_writes_);
308 }
309
310 namespace {
311
InitializeStreamPipe(Local<Object> target,Local<Value> unused,Local<Context> context,void * priv)312 void InitializeStreamPipe(Local<Object> target,
313 Local<Value> unused,
314 Local<Context> context,
315 void* priv) {
316 Environment* env = Environment::GetCurrent(context);
317 Isolate* isolate = env->isolate();
318
319 // Create FunctionTemplate for FileHandle::CloseReq
320 Local<FunctionTemplate> pipe = NewFunctionTemplate(isolate, StreamPipe::New);
321 SetProtoMethod(isolate, pipe, "unpipe", StreamPipe::Unpipe);
322 SetProtoMethod(isolate, pipe, "start", StreamPipe::Start);
323 SetProtoMethod(isolate, pipe, "isClosed", StreamPipe::IsClosed);
324 SetProtoMethod(isolate, pipe, "pendingWrites", StreamPipe::PendingWrites);
325 pipe->Inherit(AsyncWrap::GetConstructorTemplate(env));
326 pipe->InstanceTemplate()->SetInternalFieldCount(
327 StreamPipe::kInternalFieldCount);
328 SetConstructorFunction(context, target, "StreamPipe", pipe);
329 }
330
331 } // anonymous namespace
332
333 } // namespace node
334
335 NODE_BINDING_CONTEXT_AWARE_INTERNAL(stream_pipe, node::InitializeStreamPipe)
336