1 #include "stream_pipe.h"
2 #include "allocated_buffer-inl.h"
3 #include "stream_base-inl.h"
4 #include "node_buffer.h"
5 #include "util-inl.h"
6
7 namespace node {
8
9 using v8::Context;
10 using v8::Function;
11 using v8::FunctionCallbackInfo;
12 using v8::FunctionTemplate;
13 using v8::HandleScope;
14 using v8::Local;
15 using v8::Object;
16 using v8::Value;
17
StreamPipe(StreamBase * source,StreamBase * sink,Local<Object> obj)18 StreamPipe::StreamPipe(StreamBase* source,
19 StreamBase* sink,
20 Local<Object> obj)
21 : AsyncWrap(source->stream_env(), obj, AsyncWrap::PROVIDER_STREAMPIPE) {
22 MakeWeak();
23
24 CHECK_NOT_NULL(sink);
25 CHECK_NOT_NULL(source);
26
27 source->PushStreamListener(&readable_listener_);
28 sink->PushStreamListener(&writable_listener_);
29
30 uses_wants_write_ = sink->HasWantsWrite();
31
32 // Set up links between this object and the source/sink objects.
33 // In particular, this makes sure that they are garbage collected as a group,
34 // if that applies to the given streams (for example, Http2Streams use
35 // weak references).
36 obj->Set(env()->context(), env()->source_string(), source->GetObject())
37 .Check();
38 source->GetObject()->Set(env()->context(), env()->pipe_target_string(), obj)
39 .Check();
40 obj->Set(env()->context(), env()->sink_string(), sink->GetObject())
41 .Check();
42 sink->GetObject()->Set(env()->context(), env()->pipe_source_string(), obj)
43 .Check();
44 }
45
~StreamPipe()46 StreamPipe::~StreamPipe() {
47 Unpipe(true);
48 }
49
source()50 StreamBase* StreamPipe::source() {
51 return static_cast<StreamBase*>(readable_listener_.stream());
52 }
53
sink()54 StreamBase* StreamPipe::sink() {
55 return static_cast<StreamBase*>(writable_listener_.stream());
56 }
57
Unpipe(bool is_in_deletion)58 void StreamPipe::Unpipe(bool is_in_deletion) {
59 if (is_closed_)
60 return;
61
62 // Note that we possibly cannot use virtual methods on `source` and `sink`
63 // here, because this function can be called from their destructors via
64 // `OnStreamDestroy()`.
65 if (!source_destroyed_)
66 source()->ReadStop();
67
68 is_closed_ = true;
69 is_reading_ = false;
70 source()->RemoveStreamListener(&readable_listener_);
71 if (pending_writes_ == 0)
72 sink()->RemoveStreamListener(&writable_listener_);
73
74 if (is_in_deletion) return;
75
76 // Delay the JS-facing part with SetImmediate, because this might be from
77 // inside the garbage collector, so we can’t run JS here.
78 HandleScope handle_scope(env()->isolate());
79 BaseObjectPtr<StreamPipe> strong_ref{this};
80 env()->SetImmediate([this, strong_ref](Environment* env) {
81 HandleScope handle_scope(env->isolate());
82 Context::Scope context_scope(env->context());
83 Local<Object> object = this->object();
84
85 Local<Value> onunpipe;
86 if (!object->Get(env->context(), env->onunpipe_string()).ToLocal(&onunpipe))
87 return;
88 if (onunpipe->IsFunction() &&
89 MakeCallback(onunpipe.As<Function>(), 0, nullptr).IsEmpty()) {
90 return;
91 }
92
93 // Set all the links established in the constructor to `null`.
94 Local<Value> null = Null(env->isolate());
95
96 Local<Value> source_v;
97 Local<Value> sink_v;
98 if (!object->Get(env->context(), env->source_string()).ToLocal(&source_v) ||
99 !object->Get(env->context(), env->sink_string()).ToLocal(&sink_v) ||
100 !source_v->IsObject() || !sink_v->IsObject()) {
101 return;
102 }
103
104 if (object->Set(env->context(), env->source_string(), null).IsNothing() ||
105 object->Set(env->context(), env->sink_string(), null).IsNothing() ||
106 source_v.As<Object>()
107 ->Set(env->context(), env->pipe_target_string(), null)
108 .IsNothing() ||
109 sink_v.As<Object>()
110 ->Set(env->context(), env->pipe_source_string(), null)
111 .IsNothing()) {
112 return;
113 }
114 });
115 }
116
OnStreamAlloc(size_t suggested_size)117 uv_buf_t StreamPipe::ReadableListener::OnStreamAlloc(size_t suggested_size) {
118 StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
119 size_t size = std::min(suggested_size, pipe->wanted_data_);
120 CHECK_GT(size, 0);
121 return AllocatedBuffer::AllocateManaged(pipe->env(), size).release();
122 }
123
OnStreamRead(ssize_t nread,const uv_buf_t & buf_)124 void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread,
125 const uv_buf_t& buf_) {
126 StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
127 AllocatedBuffer buf(pipe->env(), buf_);
128 if (nread < 0) {
129 // EOF or error; stop reading and pass the error to the previous listener
130 // (which might end up in JS).
131 pipe->is_eof_ = true;
132 // Cache `sink()` here because the previous listener might do things
133 // that eventually lead to an `Unpipe()` call.
134 StreamBase* sink = pipe->sink();
135 stream()->ReadStop();
136 CHECK_NOT_NULL(previous_listener_);
137 previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0));
138 // If we’re not writing, close now. Otherwise, we’ll do that in
139 // `OnStreamAfterWrite()`.
140 if (pipe->pending_writes_ == 0) {
141 sink->Shutdown();
142 pipe->Unpipe();
143 }
144 return;
145 }
146
147 pipe->ProcessData(nread, std::move(buf));
148 }
149
ProcessData(size_t nread,AllocatedBuffer && buf)150 void StreamPipe::ProcessData(size_t nread, AllocatedBuffer&& buf) {
151 CHECK(uses_wants_write_ || pending_writes_ == 0);
152 uv_buf_t buffer = uv_buf_init(buf.data(), nread);
153 StreamWriteResult res = sink()->Write(&buffer, 1);
154 pending_writes_++;
155 if (!res.async) {
156 writable_listener_.OnStreamAfterWrite(nullptr, res.err);
157 } else {
158 is_reading_ = false;
159 res.wrap->SetAllocatedStorage(std::move(buf));
160 if (source() != nullptr)
161 source()->ReadStop();
162 }
163 }
164
OnStreamAfterWrite(WriteWrap * w,int status)165 void StreamPipe::WritableListener::OnStreamAfterWrite(WriteWrap* w,
166 int status) {
167 StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
168 pipe->pending_writes_--;
169 if (pipe->is_closed_) {
170 if (pipe->pending_writes_ == 0) {
171 Environment* env = pipe->env();
172 HandleScope handle_scope(env->isolate());
173 Context::Scope context_scope(env->context());
174 pipe->MakeCallback(env->oncomplete_string(), 0, nullptr).ToLocalChecked();
175 stream()->RemoveStreamListener(this);
176 }
177 return;
178 }
179
180 if (pipe->is_eof_) {
181 HandleScope handle_scope(pipe->env()->isolate());
182 InternalCallbackScope callback_scope(pipe,
183 InternalCallbackScope::kSkipTaskQueues);
184 pipe->sink()->Shutdown();
185 pipe->Unpipe();
186 return;
187 }
188
189 if (status != 0) {
190 CHECK_NOT_NULL(previous_listener_);
191 StreamListener* prev = previous_listener_;
192 pipe->Unpipe();
193 prev->OnStreamAfterWrite(w, status);
194 return;
195 }
196
197 if (!pipe->uses_wants_write_) {
198 OnStreamWantsWrite(65536);
199 }
200 }
201
OnStreamAfterShutdown(ShutdownWrap * w,int status)202 void StreamPipe::WritableListener::OnStreamAfterShutdown(ShutdownWrap* w,
203 int status) {
204 StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
205 CHECK_NOT_NULL(previous_listener_);
206 StreamListener* prev = previous_listener_;
207 pipe->Unpipe();
208 prev->OnStreamAfterShutdown(w, status);
209 }
210
OnStreamDestroy()211 void StreamPipe::ReadableListener::OnStreamDestroy() {
212 StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
213 pipe->source_destroyed_ = true;
214 if (!pipe->is_eof_) {
215 OnStreamRead(UV_EPIPE, uv_buf_init(nullptr, 0));
216 }
217 }
218
OnStreamDestroy()219 void StreamPipe::WritableListener::OnStreamDestroy() {
220 StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
221 pipe->sink_destroyed_ = true;
222 pipe->is_eof_ = true;
223 pipe->pending_writes_ = 0;
224 pipe->Unpipe();
225 }
226
OnStreamWantsWrite(size_t suggested_size)227 void StreamPipe::WritableListener::OnStreamWantsWrite(size_t suggested_size) {
228 StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
229 pipe->wanted_data_ = suggested_size;
230 if (pipe->is_reading_ || pipe->is_closed_)
231 return;
232 HandleScope handle_scope(pipe->env()->isolate());
233 InternalCallbackScope callback_scope(pipe,
234 InternalCallbackScope::kSkipTaskQueues);
235 pipe->is_reading_ = true;
236 pipe->source()->ReadStart();
237 }
238
OnStreamAlloc(size_t suggested_size)239 uv_buf_t StreamPipe::WritableListener::OnStreamAlloc(size_t suggested_size) {
240 CHECK_NOT_NULL(previous_listener_);
241 return previous_listener_->OnStreamAlloc(suggested_size);
242 }
243
OnStreamRead(ssize_t nread,const uv_buf_t & buf)244 void StreamPipe::WritableListener::OnStreamRead(ssize_t nread,
245 const uv_buf_t& buf) {
246 CHECK_NOT_NULL(previous_listener_);
247 return previous_listener_->OnStreamRead(nread, buf);
248 }
249
New(const FunctionCallbackInfo<Value> & args)250 void StreamPipe::New(const FunctionCallbackInfo<Value>& args) {
251 CHECK(args.IsConstructCall());
252 CHECK(args[0]->IsObject());
253 CHECK(args[1]->IsObject());
254 StreamBase* source = StreamBase::FromObject(args[0].As<Object>());
255 StreamBase* sink = StreamBase::FromObject(args[1].As<Object>());
256
257 new StreamPipe(source, sink, args.This());
258 }
259
Start(const FunctionCallbackInfo<Value> & args)260 void StreamPipe::Start(const FunctionCallbackInfo<Value>& args) {
261 StreamPipe* pipe;
262 ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
263 pipe->is_closed_ = false;
264 pipe->writable_listener_.OnStreamWantsWrite(65536);
265 }
266
Unpipe(const FunctionCallbackInfo<Value> & args)267 void StreamPipe::Unpipe(const FunctionCallbackInfo<Value>& args) {
268 StreamPipe* pipe;
269 ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
270 pipe->Unpipe();
271 }
272
IsClosed(const FunctionCallbackInfo<Value> & args)273 void StreamPipe::IsClosed(const FunctionCallbackInfo<Value>& args) {
274 StreamPipe* pipe;
275 ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
276 args.GetReturnValue().Set(pipe->is_closed_);
277 }
278
PendingWrites(const FunctionCallbackInfo<Value> & args)279 void StreamPipe::PendingWrites(const FunctionCallbackInfo<Value>& args) {
280 StreamPipe* pipe;
281 ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
282 args.GetReturnValue().Set(pipe->pending_writes_);
283 }
284
285 namespace {
286
InitializeStreamPipe(Local<Object> target,Local<Value> unused,Local<Context> context,void * priv)287 void InitializeStreamPipe(Local<Object> target,
288 Local<Value> unused,
289 Local<Context> context,
290 void* priv) {
291 Environment* env = Environment::GetCurrent(context);
292
293 // Create FunctionTemplate for FileHandle::CloseReq
294 Local<FunctionTemplate> pipe = env->NewFunctionTemplate(StreamPipe::New);
295 env->SetProtoMethod(pipe, "unpipe", StreamPipe::Unpipe);
296 env->SetProtoMethod(pipe, "start", StreamPipe::Start);
297 env->SetProtoMethod(pipe, "isClosed", StreamPipe::IsClosed);
298 env->SetProtoMethod(pipe, "pendingWrites", StreamPipe::PendingWrites);
299 pipe->Inherit(AsyncWrap::GetConstructorTemplate(env));
300 pipe->InstanceTemplate()->SetInternalFieldCount(
301 StreamPipe::kInternalFieldCount);
302 env->SetConstructorFunction(target, "StreamPipe", pipe);
303 }
304
305 } // anonymous namespace
306
307 } // namespace node
308
309 NODE_MODULE_CONTEXT_AWARE_INTERNAL(stream_pipe,
310 node::InitializeStreamPipe)
311