1 #include "js_stream.h"
2
3 #include "async_wrap.h"
4 #include "env-inl.h"
5 #include "node_errors.h"
6 #include "stream_base-inl.h"
7 #include "util-inl.h"
8 #include "v8.h"
9
10 namespace node {
11
12 using errors::TryCatchScope;
13
14 using v8::Array;
15 using v8::Context;
16 using v8::FunctionCallbackInfo;
17 using v8::FunctionTemplate;
18 using v8::HandleScope;
19 using v8::Int32;
20 using v8::Local;
21 using v8::Object;
22 using v8::Value;
23
24
JSStream(Environment * env,Local<Object> obj)25 JSStream::JSStream(Environment* env, Local<Object> obj)
26 : AsyncWrap(env, obj, AsyncWrap::PROVIDER_JSSTREAM),
27 StreamBase(env) {
28 MakeWeak();
29 StreamBase::AttachToObject(obj);
30 }
31
32
GetAsyncWrap()33 AsyncWrap* JSStream::GetAsyncWrap() {
34 return static_cast<AsyncWrap*>(this);
35 }
36
37
IsAlive()38 bool JSStream::IsAlive() {
39 return true;
40 }
41
42
IsClosing()43 bool JSStream::IsClosing() {
44 HandleScope scope(env()->isolate());
45 Context::Scope context_scope(env()->context());
46 TryCatchScope try_catch(env());
47 Local<Value> value;
48 if (!MakeCallback(env()->isclosing_string(), 0, nullptr).ToLocal(&value)) {
49 if (try_catch.HasCaught() && !try_catch.HasTerminated())
50 errors::TriggerUncaughtException(env()->isolate(), try_catch);
51 return true;
52 }
53 return value->IsTrue();
54 }
55
56
ReadStart()57 int JSStream::ReadStart() {
58 HandleScope scope(env()->isolate());
59 Context::Scope context_scope(env()->context());
60 TryCatchScope try_catch(env());
61 Local<Value> value;
62 int value_int = UV_EPROTO;
63 if (!MakeCallback(env()->onreadstart_string(), 0, nullptr).ToLocal(&value) ||
64 !value->Int32Value(env()->context()).To(&value_int)) {
65 if (try_catch.HasCaught() && !try_catch.HasTerminated())
66 errors::TriggerUncaughtException(env()->isolate(), try_catch);
67 }
68 return value_int;
69 }
70
71
ReadStop()72 int JSStream::ReadStop() {
73 HandleScope scope(env()->isolate());
74 Context::Scope context_scope(env()->context());
75 TryCatchScope try_catch(env());
76 Local<Value> value;
77 int value_int = UV_EPROTO;
78 if (!MakeCallback(env()->onreadstop_string(), 0, nullptr).ToLocal(&value) ||
79 !value->Int32Value(env()->context()).To(&value_int)) {
80 if (try_catch.HasCaught() && !try_catch.HasTerminated())
81 errors::TriggerUncaughtException(env()->isolate(), try_catch);
82 }
83 return value_int;
84 }
85
86
DoShutdown(ShutdownWrap * req_wrap)87 int JSStream::DoShutdown(ShutdownWrap* req_wrap) {
88 HandleScope scope(env()->isolate());
89 Context::Scope context_scope(env()->context());
90
91 Local<Value> argv[] = {
92 req_wrap->object()
93 };
94
95 TryCatchScope try_catch(env());
96 Local<Value> value;
97 int value_int = UV_EPROTO;
98 if (!MakeCallback(env()->onshutdown_string(),
99 arraysize(argv),
100 argv).ToLocal(&value) ||
101 !value->Int32Value(env()->context()).To(&value_int)) {
102 if (try_catch.HasCaught() && !try_catch.HasTerminated())
103 errors::TriggerUncaughtException(env()->isolate(), try_catch);
104 }
105 return value_int;
106 }
107
108
DoWrite(WriteWrap * w,uv_buf_t * bufs,size_t count,uv_stream_t * send_handle)109 int JSStream::DoWrite(WriteWrap* w,
110 uv_buf_t* bufs,
111 size_t count,
112 uv_stream_t* send_handle) {
113 CHECK_NULL(send_handle);
114
115 HandleScope scope(env()->isolate());
116 Context::Scope context_scope(env()->context());
117
118 MaybeStackBuffer<Local<Value>, 16> bufs_arr(count);
119 for (size_t i = 0; i < count; i++) {
120 bufs_arr[i] =
121 Buffer::Copy(env(), bufs[i].base, bufs[i].len).ToLocalChecked();
122 }
123
124 Local<Value> argv[] = {
125 w->object(),
126 Array::New(env()->isolate(), bufs_arr.out(), count)
127 };
128
129 TryCatchScope try_catch(env());
130 Local<Value> value;
131 int value_int = UV_EPROTO;
132 if (!MakeCallback(env()->onwrite_string(),
133 arraysize(argv),
134 argv).ToLocal(&value) ||
135 !value->Int32Value(env()->context()).To(&value_int)) {
136 if (try_catch.HasCaught() && !try_catch.HasTerminated())
137 errors::TriggerUncaughtException(env()->isolate(), try_catch);
138 }
139 return value_int;
140 }
141
142
New(const FunctionCallbackInfo<Value> & args)143 void JSStream::New(const FunctionCallbackInfo<Value>& args) {
144 // This constructor should not be exposed to public javascript.
145 // Therefore we assert that we are not trying to call this as a
146 // normal function.
147 CHECK(args.IsConstructCall());
148 Environment* env = Environment::GetCurrent(args);
149 new JSStream(env, args.This());
150 }
151
152
153 template <class Wrap>
Finish(const FunctionCallbackInfo<Value> & args)154 void JSStream::Finish(const FunctionCallbackInfo<Value>& args) {
155 CHECK(args[0]->IsObject());
156 Wrap* w = static_cast<Wrap*>(StreamReq::FromObject(args[0].As<Object>()));
157
158 CHECK(args[1]->IsInt32());
159 w->Done(args[1].As<Int32>()->Value());
160 }
161
162
ReadBuffer(const FunctionCallbackInfo<Value> & args)163 void JSStream::ReadBuffer(const FunctionCallbackInfo<Value>& args) {
164 JSStream* wrap;
165 ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
166
167 ArrayBufferViewContents<char> buffer(args[0]);
168 const char* data = buffer.data();
169 int len = buffer.length();
170
171 // Repeatedly ask the stream's owner for memory, copy the data that we
172 // just read from JS into those buffers and emit them as reads.
173 while (len != 0) {
174 uv_buf_t buf = wrap->EmitAlloc(len);
175 ssize_t avail = len;
176 if (static_cast<ssize_t>(buf.len) < avail)
177 avail = buf.len;
178
179 memcpy(buf.base, data, avail);
180 data += avail;
181 len -= avail;
182 wrap->EmitRead(avail, buf);
183 }
184 }
185
186
EmitEOF(const FunctionCallbackInfo<Value> & args)187 void JSStream::EmitEOF(const FunctionCallbackInfo<Value>& args) {
188 JSStream* wrap;
189 ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
190
191 wrap->EmitRead(UV_EOF);
192 }
193
194
Initialize(Local<Object> target,Local<Value> unused,Local<Context> context,void * priv)195 void JSStream::Initialize(Local<Object> target,
196 Local<Value> unused,
197 Local<Context> context,
198 void* priv) {
199 Environment* env = Environment::GetCurrent(context);
200
201 Local<FunctionTemplate> t = env->NewFunctionTemplate(New);
202 t->InstanceTemplate()
203 ->SetInternalFieldCount(StreamBase::kInternalFieldCount);
204 t->Inherit(AsyncWrap::GetConstructorTemplate(env));
205
206 env->SetProtoMethod(t, "finishWrite", Finish<WriteWrap>);
207 env->SetProtoMethod(t, "finishShutdown", Finish<ShutdownWrap>);
208 env->SetProtoMethod(t, "readBuffer", ReadBuffer);
209 env->SetProtoMethod(t, "emitEOF", EmitEOF);
210
211 StreamBase::AddMethods(env, t);
212 env->SetConstructorFunction(target, "JSStream", t);
213 }
214
215 } // namespace node
216
// Module registration: names this binding `js_stream` and hands
// node::JSStream::Initialize to the registration macro (presumably as the
// per-context initializer; confirm against the macro's definition).
NODE_MODULE_CONTEXT_AWARE_INTERNAL(js_stream, node::JSStream::Initialize)
218