1 #include "stream_pipe.h"
2 #include "stream_base-inl.h"
3 #include "node_buffer.h"
4 #include "util-inl.h"
5
6 using v8::Context;
7 using v8::Function;
8 using v8::FunctionCallbackInfo;
9 using v8::FunctionTemplate;
10 using v8::Local;
11 using v8::Object;
12 using v8::Value;
13
14 namespace node {
15
StreamPipe(StreamBase * source,StreamBase * sink,Local<Object> obj)16 StreamPipe::StreamPipe(StreamBase* source,
17 StreamBase* sink,
18 Local<Object> obj)
19 : AsyncWrap(source->stream_env(), obj, AsyncWrap::PROVIDER_STREAMPIPE) {
20 MakeWeak();
21
22 CHECK_NOT_NULL(sink);
23 CHECK_NOT_NULL(source);
24
25 source->PushStreamListener(&readable_listener_);
26 sink->PushStreamListener(&writable_listener_);
27
28 uses_wants_write_ = sink->HasWantsWrite();
29
30 // Set up links between this object and the source/sink objects.
31 // In particular, this makes sure that they are garbage collected as a group,
32 // if that applies to the given streams (for example, Http2Streams use
33 // weak references).
34 obj->Set(env()->context(), env()->source_string(), source->GetObject())
35 .Check();
36 source->GetObject()->Set(env()->context(), env()->pipe_target_string(), obj)
37 .Check();
38 obj->Set(env()->context(), env()->sink_string(), sink->GetObject())
39 .Check();
40 sink->GetObject()->Set(env()->context(), env()->pipe_source_string(), obj)
41 .Check();
42 }
43
~StreamPipe()44 StreamPipe::~StreamPipe() {
45 Unpipe(true);
46 }
47
source()48 StreamBase* StreamPipe::source() {
49 return static_cast<StreamBase*>(readable_listener_.stream());
50 }
51
sink()52 StreamBase* StreamPipe::sink() {
53 return static_cast<StreamBase*>(writable_listener_.stream());
54 }
55
Unpipe(bool is_in_deletion)56 void StreamPipe::Unpipe(bool is_in_deletion) {
57 if (is_closed_)
58 return;
59
60 // Note that we possibly cannot use virtual methods on `source` and `sink`
61 // here, because this function can be called from their destructors via
62 // `OnStreamDestroy()`.
63 if (!source_destroyed_)
64 source()->ReadStop();
65
66 is_closed_ = true;
67 is_reading_ = false;
68 source()->RemoveStreamListener(&readable_listener_);
69 if (pending_writes_ == 0)
70 sink()->RemoveStreamListener(&writable_listener_);
71
72 if (is_in_deletion) return;
73
74 // Delay the JS-facing part with SetImmediate, because this might be from
75 // inside the garbage collector, so we can’t run JS here.
76 HandleScope handle_scope(env()->isolate());
77 BaseObjectPtr<StreamPipe> strong_ref{this};
78 env()->SetImmediate([this, strong_ref](Environment* env) {
79 HandleScope handle_scope(env->isolate());
80 Context::Scope context_scope(env->context());
81 Local<Object> object = this->object();
82
83 Local<Value> onunpipe;
84 if (!object->Get(env->context(), env->onunpipe_string()).ToLocal(&onunpipe))
85 return;
86 if (onunpipe->IsFunction() &&
87 MakeCallback(onunpipe.As<Function>(), 0, nullptr).IsEmpty()) {
88 return;
89 }
90
91 // Set all the links established in the constructor to `null`.
92 Local<Value> null = Null(env->isolate());
93
94 Local<Value> source_v;
95 Local<Value> sink_v;
96 if (!object->Get(env->context(), env->source_string()).ToLocal(&source_v) ||
97 !object->Get(env->context(), env->sink_string()).ToLocal(&sink_v) ||
98 !source_v->IsObject() || !sink_v->IsObject()) {
99 return;
100 }
101
102 if (object->Set(env->context(), env->source_string(), null).IsNothing() ||
103 object->Set(env->context(), env->sink_string(), null).IsNothing() ||
104 source_v.As<Object>()
105 ->Set(env->context(), env->pipe_target_string(), null)
106 .IsNothing() ||
107 sink_v.As<Object>()
108 ->Set(env->context(), env->pipe_source_string(), null)
109 .IsNothing()) {
110 return;
111 }
112 });
113 }
114
OnStreamAlloc(size_t suggested_size)115 uv_buf_t StreamPipe::ReadableListener::OnStreamAlloc(size_t suggested_size) {
116 StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
117 size_t size = std::min(suggested_size, pipe->wanted_data_);
118 CHECK_GT(size, 0);
119 return pipe->env()->AllocateManaged(size).release();
120 }
121
OnStreamRead(ssize_t nread,const uv_buf_t & buf_)122 void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread,
123 const uv_buf_t& buf_) {
124 StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
125 AllocatedBuffer buf(pipe->env(), buf_);
126 if (nread < 0) {
127 // EOF or error; stop reading and pass the error to the previous listener
128 // (which might end up in JS).
129 pipe->is_eof_ = true;
130 // Cache `sink()` here because the previous listener might do things
131 // that eventually lead to an `Unpipe()` call.
132 StreamBase* sink = pipe->sink();
133 stream()->ReadStop();
134 CHECK_NOT_NULL(previous_listener_);
135 previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0));
136 // If we’re not writing, close now. Otherwise, we’ll do that in
137 // `OnStreamAfterWrite()`.
138 if (pipe->pending_writes_ == 0) {
139 sink->Shutdown();
140 pipe->Unpipe();
141 }
142 return;
143 }
144
145 pipe->ProcessData(nread, std::move(buf));
146 }
147
ProcessData(size_t nread,AllocatedBuffer && buf)148 void StreamPipe::ProcessData(size_t nread, AllocatedBuffer&& buf) {
149 CHECK(uses_wants_write_ || pending_writes_ == 0);
150 uv_buf_t buffer = uv_buf_init(buf.data(), nread);
151 StreamWriteResult res = sink()->Write(&buffer, 1);
152 pending_writes_++;
153 if (!res.async) {
154 writable_listener_.OnStreamAfterWrite(nullptr, res.err);
155 } else {
156 is_reading_ = false;
157 res.wrap->SetAllocatedStorage(std::move(buf));
158 if (source() != nullptr)
159 source()->ReadStop();
160 }
161 }
162
OnStreamAfterWrite(WriteWrap * w,int status)163 void StreamPipe::WritableListener::OnStreamAfterWrite(WriteWrap* w,
164 int status) {
165 StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
166 pipe->pending_writes_--;
167 if (pipe->is_closed_) {
168 if (pipe->pending_writes_ == 0) {
169 Environment* env = pipe->env();
170 HandleScope handle_scope(env->isolate());
171 Context::Scope context_scope(env->context());
172 pipe->MakeCallback(env->oncomplete_string(), 0, nullptr).ToLocalChecked();
173 stream()->RemoveStreamListener(this);
174 }
175 return;
176 }
177
178 if (pipe->is_eof_) {
179 HandleScope handle_scope(pipe->env()->isolate());
180 InternalCallbackScope callback_scope(pipe,
181 InternalCallbackScope::kSkipTaskQueues);
182 pipe->sink()->Shutdown();
183 pipe->Unpipe();
184 return;
185 }
186
187 if (status != 0) {
188 CHECK_NOT_NULL(previous_listener_);
189 StreamListener* prev = previous_listener_;
190 pipe->Unpipe();
191 prev->OnStreamAfterWrite(w, status);
192 return;
193 }
194
195 if (!pipe->uses_wants_write_) {
196 OnStreamWantsWrite(65536);
197 }
198 }
199
OnStreamAfterShutdown(ShutdownWrap * w,int status)200 void StreamPipe::WritableListener::OnStreamAfterShutdown(ShutdownWrap* w,
201 int status) {
202 StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
203 CHECK_NOT_NULL(previous_listener_);
204 StreamListener* prev = previous_listener_;
205 pipe->Unpipe();
206 prev->OnStreamAfterShutdown(w, status);
207 }
208
OnStreamDestroy()209 void StreamPipe::ReadableListener::OnStreamDestroy() {
210 StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
211 pipe->source_destroyed_ = true;
212 if (!pipe->is_eof_) {
213 OnStreamRead(UV_EPIPE, uv_buf_init(nullptr, 0));
214 }
215 }
216
OnStreamDestroy()217 void StreamPipe::WritableListener::OnStreamDestroy() {
218 StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
219 pipe->sink_destroyed_ = true;
220 pipe->is_eof_ = true;
221 pipe->pending_writes_ = 0;
222 pipe->Unpipe();
223 }
224
OnStreamWantsWrite(size_t suggested_size)225 void StreamPipe::WritableListener::OnStreamWantsWrite(size_t suggested_size) {
226 StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
227 pipe->wanted_data_ = suggested_size;
228 if (pipe->is_reading_ || pipe->is_closed_)
229 return;
230 HandleScope handle_scope(pipe->env()->isolate());
231 InternalCallbackScope callback_scope(pipe,
232 InternalCallbackScope::kSkipTaskQueues);
233 pipe->is_reading_ = true;
234 pipe->source()->ReadStart();
235 }
236
OnStreamAlloc(size_t suggested_size)237 uv_buf_t StreamPipe::WritableListener::OnStreamAlloc(size_t suggested_size) {
238 CHECK_NOT_NULL(previous_listener_);
239 return previous_listener_->OnStreamAlloc(suggested_size);
240 }
241
OnStreamRead(ssize_t nread,const uv_buf_t & buf)242 void StreamPipe::WritableListener::OnStreamRead(ssize_t nread,
243 const uv_buf_t& buf) {
244 CHECK_NOT_NULL(previous_listener_);
245 return previous_listener_->OnStreamRead(nread, buf);
246 }
247
New(const FunctionCallbackInfo<Value> & args)248 void StreamPipe::New(const FunctionCallbackInfo<Value>& args) {
249 CHECK(args.IsConstructCall());
250 CHECK(args[0]->IsObject());
251 CHECK(args[1]->IsObject());
252 StreamBase* source = StreamBase::FromObject(args[0].As<Object>());
253 StreamBase* sink = StreamBase::FromObject(args[1].As<Object>());
254
255 new StreamPipe(source, sink, args.This());
256 }
257
Start(const FunctionCallbackInfo<Value> & args)258 void StreamPipe::Start(const FunctionCallbackInfo<Value>& args) {
259 StreamPipe* pipe;
260 ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
261 pipe->is_closed_ = false;
262 pipe->writable_listener_.OnStreamWantsWrite(65536);
263 }
264
Unpipe(const FunctionCallbackInfo<Value> & args)265 void StreamPipe::Unpipe(const FunctionCallbackInfo<Value>& args) {
266 StreamPipe* pipe;
267 ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
268 pipe->Unpipe();
269 }
270
IsClosed(const FunctionCallbackInfo<Value> & args)271 void StreamPipe::IsClosed(const FunctionCallbackInfo<Value>& args) {
272 StreamPipe* pipe;
273 ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
274 args.GetReturnValue().Set(pipe->is_closed_);
275 }
276
PendingWrites(const FunctionCallbackInfo<Value> & args)277 void StreamPipe::PendingWrites(const FunctionCallbackInfo<Value>& args) {
278 StreamPipe* pipe;
279 ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
280 args.GetReturnValue().Set(pipe->pending_writes_);
281 }
282
283 namespace {
284
InitializeStreamPipe(Local<Object> target,Local<Value> unused,Local<Context> context,void * priv)285 void InitializeStreamPipe(Local<Object> target,
286 Local<Value> unused,
287 Local<Context> context,
288 void* priv) {
289 Environment* env = Environment::GetCurrent(context);
290
291 // Create FunctionTemplate for FileHandle::CloseReq
292 Local<FunctionTemplate> pipe = env->NewFunctionTemplate(StreamPipe::New);
293 Local<String> stream_pipe_string =
294 FIXED_ONE_BYTE_STRING(env->isolate(), "StreamPipe");
295 env->SetProtoMethod(pipe, "unpipe", StreamPipe::Unpipe);
296 env->SetProtoMethod(pipe, "start", StreamPipe::Start);
297 env->SetProtoMethod(pipe, "isClosed", StreamPipe::IsClosed);
298 env->SetProtoMethod(pipe, "pendingWrites", StreamPipe::PendingWrites);
299 pipe->Inherit(AsyncWrap::GetConstructorTemplate(env));
300 pipe->SetClassName(stream_pipe_string);
301 pipe->InstanceTemplate()->SetInternalFieldCount(
302 StreamPipe::kInternalFieldCount);
303 target
304 ->Set(context, stream_pipe_string,
305 pipe->GetFunction(context).ToLocalChecked())
306 .Check();
307 }
308
309 } // anonymous namespace
310
311 } // namespace node
312
313 NODE_MODULE_CONTEXT_AWARE_INTERNAL(stream_pipe,
314 node::InitializeStreamPipe)
315