1 #include "js_stream.h"
2
3 #include "async_wrap.h"
4 #include "env-inl.h"
5 #include "node_errors.h"
6 #include "stream_base-inl.h"
7 #include "util-inl.h"
8 #include "v8.h"
9
10 namespace node {
11
12 using errors::TryCatchScope;
13
14 using v8::Array;
15 using v8::Context;
16 using v8::FunctionCallbackInfo;
17 using v8::FunctionTemplate;
18 using v8::HandleScope;
19 using v8::Int32;
20 using v8::Isolate;
21 using v8::Local;
22 using v8::Object;
23 using v8::Value;
24
25
JSStream(Environment * env,Local<Object> obj)26 JSStream::JSStream(Environment* env, Local<Object> obj)
27 : AsyncWrap(env, obj, AsyncWrap::PROVIDER_JSSTREAM),
28 StreamBase(env) {
29 MakeWeak();
30 StreamBase::AttachToObject(obj);
31 }
32
33
GetAsyncWrap()34 AsyncWrap* JSStream::GetAsyncWrap() {
35 return static_cast<AsyncWrap*>(this);
36 }
37
38
IsAlive()39 bool JSStream::IsAlive() {
40 return true;
41 }
42
43
IsClosing()44 bool JSStream::IsClosing() {
45 HandleScope scope(env()->isolate());
46 Context::Scope context_scope(env()->context());
47 TryCatchScope try_catch(env());
48 Local<Value> value;
49 if (!MakeCallback(env()->isclosing_string(), 0, nullptr).ToLocal(&value)) {
50 if (try_catch.HasCaught() && !try_catch.HasTerminated())
51 errors::TriggerUncaughtException(env()->isolate(), try_catch);
52 return true;
53 }
54 return value->IsTrue();
55 }
56
57
ReadStart()58 int JSStream::ReadStart() {
59 HandleScope scope(env()->isolate());
60 Context::Scope context_scope(env()->context());
61 TryCatchScope try_catch(env());
62 Local<Value> value;
63 int value_int = UV_EPROTO;
64 if (!MakeCallback(env()->onreadstart_string(), 0, nullptr).ToLocal(&value) ||
65 !value->Int32Value(env()->context()).To(&value_int)) {
66 if (try_catch.HasCaught() && !try_catch.HasTerminated())
67 errors::TriggerUncaughtException(env()->isolate(), try_catch);
68 }
69 return value_int;
70 }
71
72
ReadStop()73 int JSStream::ReadStop() {
74 HandleScope scope(env()->isolate());
75 Context::Scope context_scope(env()->context());
76 TryCatchScope try_catch(env());
77 Local<Value> value;
78 int value_int = UV_EPROTO;
79 if (!MakeCallback(env()->onreadstop_string(), 0, nullptr).ToLocal(&value) ||
80 !value->Int32Value(env()->context()).To(&value_int)) {
81 if (try_catch.HasCaught() && !try_catch.HasTerminated())
82 errors::TriggerUncaughtException(env()->isolate(), try_catch);
83 }
84 return value_int;
85 }
86
87
DoShutdown(ShutdownWrap * req_wrap)88 int JSStream::DoShutdown(ShutdownWrap* req_wrap) {
89 HandleScope scope(env()->isolate());
90 Context::Scope context_scope(env()->context());
91
92 Local<Value> argv[] = {
93 req_wrap->object()
94 };
95
96 TryCatchScope try_catch(env());
97 Local<Value> value;
98 int value_int = UV_EPROTO;
99 if (!MakeCallback(env()->onshutdown_string(),
100 arraysize(argv),
101 argv).ToLocal(&value) ||
102 !value->Int32Value(env()->context()).To(&value_int)) {
103 if (try_catch.HasCaught() && !try_catch.HasTerminated())
104 errors::TriggerUncaughtException(env()->isolate(), try_catch);
105 }
106 return value_int;
107 }
108
109
DoWrite(WriteWrap * w,uv_buf_t * bufs,size_t count,uv_stream_t * send_handle)110 int JSStream::DoWrite(WriteWrap* w,
111 uv_buf_t* bufs,
112 size_t count,
113 uv_stream_t* send_handle) {
114 CHECK_NULL(send_handle);
115
116 HandleScope scope(env()->isolate());
117 Context::Scope context_scope(env()->context());
118
119 MaybeStackBuffer<Local<Value>, 16> bufs_arr(count);
120 for (size_t i = 0; i < count; i++) {
121 bufs_arr[i] =
122 Buffer::Copy(env(), bufs[i].base, bufs[i].len).ToLocalChecked();
123 }
124
125 Local<Value> argv[] = {
126 w->object(),
127 Array::New(env()->isolate(), bufs_arr.out(), count)
128 };
129
130 TryCatchScope try_catch(env());
131 Local<Value> value;
132 int value_int = UV_EPROTO;
133 if (!MakeCallback(env()->onwrite_string(),
134 arraysize(argv),
135 argv).ToLocal(&value) ||
136 !value->Int32Value(env()->context()).To(&value_int)) {
137 if (try_catch.HasCaught() && !try_catch.HasTerminated())
138 errors::TriggerUncaughtException(env()->isolate(), try_catch);
139 }
140 return value_int;
141 }
142
143
New(const FunctionCallbackInfo<Value> & args)144 void JSStream::New(const FunctionCallbackInfo<Value>& args) {
145 // This constructor should not be exposed to public javascript.
146 // Therefore we assert that we are not trying to call this as a
147 // normal function.
148 CHECK(args.IsConstructCall());
149 Environment* env = Environment::GetCurrent(args);
150 new JSStream(env, args.This());
151 }
152
153
154 template <class Wrap>
Finish(const FunctionCallbackInfo<Value> & args)155 void JSStream::Finish(const FunctionCallbackInfo<Value>& args) {
156 CHECK(args[0]->IsObject());
157 Wrap* w = static_cast<Wrap*>(StreamReq::FromObject(args[0].As<Object>()));
158
159 CHECK(args[1]->IsInt32());
160 w->Done(args[1].As<Int32>()->Value());
161 }
162
163
ReadBuffer(const FunctionCallbackInfo<Value> & args)164 void JSStream::ReadBuffer(const FunctionCallbackInfo<Value>& args) {
165 JSStream* wrap;
166 ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
167
168 ArrayBufferViewContents<char> buffer(args[0]);
169 const char* data = buffer.data();
170 int len = buffer.length();
171
172 // Repeatedly ask the stream's owner for memory, copy the data that we
173 // just read from JS into those buffers and emit them as reads.
174 while (len != 0) {
175 uv_buf_t buf = wrap->EmitAlloc(len);
176 ssize_t avail = len;
177 if (static_cast<ssize_t>(buf.len) < avail)
178 avail = buf.len;
179
180 memcpy(buf.base, data, avail);
181 data += avail;
182 len -= static_cast<int>(avail);
183 wrap->EmitRead(avail, buf);
184 }
185 }
186
187
EmitEOF(const FunctionCallbackInfo<Value> & args)188 void JSStream::EmitEOF(const FunctionCallbackInfo<Value>& args) {
189 JSStream* wrap;
190 ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
191
192 wrap->EmitRead(UV_EOF);
193 }
194
195
Initialize(Local<Object> target,Local<Value> unused,Local<Context> context,void * priv)196 void JSStream::Initialize(Local<Object> target,
197 Local<Value> unused,
198 Local<Context> context,
199 void* priv) {
200 Environment* env = Environment::GetCurrent(context);
201 Isolate* isolate = env->isolate();
202
203 Local<FunctionTemplate> t = NewFunctionTemplate(isolate, New);
204 t->InstanceTemplate()
205 ->SetInternalFieldCount(StreamBase::kInternalFieldCount);
206 t->Inherit(AsyncWrap::GetConstructorTemplate(env));
207
208 SetProtoMethod(isolate, t, "finishWrite", Finish<WriteWrap>);
209 SetProtoMethod(isolate, t, "finishShutdown", Finish<ShutdownWrap>);
210 SetProtoMethod(isolate, t, "readBuffer", ReadBuffer);
211 SetProtoMethod(isolate, t, "emitEOF", EmitEOF);
212
213 StreamBase::AddMethods(env, t);
214 SetConstructorFunction(context, target, "JSStream", t);
215 }
216
217 } // namespace node
218
219 NODE_BINDING_CONTEXT_AWARE_INTERNAL(js_stream, node::JSStream::Initialize)
220