1 #ifndef SRC_STREAM_BASE_H_ 2 #define SRC_STREAM_BASE_H_ 3 4 #if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS 5 6 #include "env.h" 7 #include "async_wrap.h" 8 #include "node.h" 9 #include "util.h" 10 11 #include "v8.h" 12 13 namespace node { 14 15 // Forward declarations 16 class Environment; 17 class ShutdownWrap; 18 class WriteWrap; 19 class StreamBase; 20 class StreamResource; 21 22 struct StreamWriteResult { 23 bool async; 24 int err; 25 WriteWrap* wrap; 26 size_t bytes; 27 BaseObjectPtr<AsyncWrap> wrap_obj; 28 }; 29 30 using JSMethodFunction = void(const v8::FunctionCallbackInfo<v8::Value>& args); 31 32 class StreamReq { 33 public: 34 // The kSlot internal field here mirrors BaseObject::InternalFields::kSlot 35 // here because instances derived from StreamReq will also derive from 36 // BaseObject, and the slots are used for the identical purpose. 37 enum InternalFields { 38 kSlot = BaseObject::kSlot, 39 kStreamReqField = BaseObject::kInternalFieldCount, 40 kInternalFieldCount 41 }; 42 StreamReq(StreamBase * stream,v8::Local<v8::Object> req_wrap_obj)43 explicit StreamReq(StreamBase* stream, 44 v8::Local<v8::Object> req_wrap_obj) : stream_(stream) { 45 AttachToObject(req_wrap_obj); 46 } 47 48 virtual ~StreamReq() = default; 49 virtual AsyncWrap* GetAsyncWrap() = 0; 50 v8::Local<v8::Object> object(); 51 52 void Done(int status, const char* error_str = nullptr); 53 void Dispose(); 54 stream()55 inline StreamBase* stream() const { return stream_; } 56 57 static StreamReq* FromObject(v8::Local<v8::Object> req_wrap_obj); 58 59 // Sets all internal fields of `req_wrap_obj` to `nullptr`. 60 // This is what the `WriteWrap` and `ShutdownWrap` JS constructors do, 61 // and what we use in C++ after creating these objects from their 62 // v8::ObjectTemplates, to avoid the overhead of calling the 63 // constructor explicitly. 64 static inline void ResetObject(v8::Local<v8::Object> req_wrap_obj); 65 66 protected: 67 virtual void OnDone(int status) = 0; 68 69 void AttachToObject(v8::Local<v8::Object> req_wrap_obj); 70 71 private: 72 StreamBase* const stream_; 73 }; 74 75 class ShutdownWrap : public StreamReq { 76 public: ShutdownWrap(StreamBase * stream,v8::Local<v8::Object> req_wrap_obj)77 ShutdownWrap(StreamBase* stream, 78 v8::Local<v8::Object> req_wrap_obj) 79 : StreamReq(stream, req_wrap_obj) { } 80 81 // Call stream()->EmitAfterShutdown() and dispose of this request wrap. 82 void OnDone(int status) override; 83 }; 84 85 class WriteWrap : public StreamReq { 86 public: 87 void SetAllocatedStorage(AllocatedBuffer&& storage); 88 WriteWrap(StreamBase * stream,v8::Local<v8::Object> req_wrap_obj)89 WriteWrap(StreamBase* stream, 90 v8::Local<v8::Object> req_wrap_obj) 91 : StreamReq(stream, req_wrap_obj) { } 92 93 // Call stream()->EmitAfterWrite() and dispose of this request wrap. 94 void OnDone(int status) override; 95 96 private: 97 AllocatedBuffer storage_; 98 }; 99 100 101 // This is the generic interface for objects that control Node.js' C++ streams. 102 // For example, the default `EmitToJSStreamListener` emits a stream's data 103 // as Buffers in JS, or `TLSWrap` reads and decrypts data from a stream. 104 class StreamListener { 105 public: 106 virtual ~StreamListener(); 107 108 // This is called when a stream wants to allocate memory before 109 // reading data into the freshly allocated buffer (i.e. it is always followed 110 // by a `OnStreamRead()` call). 111 // This memory may be statically or dynamically allocated; for example, 112 // a protocol parser may want to read data into a static buffer if it knows 113 // that all data is going to be fully handled during the next 114 // `OnStreamRead()` call. 115 // The returned buffer does not need to contain `suggested_size` bytes. 116 // The default implementation of this method returns a buffer that has exactly 117 // the suggested size and is allocated using malloc(). 118 // It is not valid to return a zero-length buffer from this method. 119 // It is not guaranteed that the corresponding `OnStreamRead()` call 120 // happens in the same event loop turn as this call. 121 virtual uv_buf_t OnStreamAlloc(size_t suggested_size) = 0; 122 123 // `OnStreamRead()` is called when data is available on the socket and has 124 // been read into the buffer provided by `OnStreamAlloc()`. 125 // The `buf` argument is the return value of `uv_buf_t`, or may be a buffer 126 // with base nullptr in case of an error. 127 // `nread` is the number of read bytes (which is at most the buffer length), 128 // or, if negative, a libuv error code. 129 virtual void OnStreamRead(ssize_t nread, 130 const uv_buf_t& buf) = 0; 131 132 // This is called once a write has finished. `status` may be 0 or, 133 // if negative, a libuv error code. 134 // By default, this is simply passed on to the previous listener 135 // (and raises an assertion if there is none). 136 virtual void OnStreamAfterWrite(WriteWrap* w, int status); 137 138 // This is called once a shutdown has finished. `status` may be 0 or, 139 // if negative, a libuv error code. 140 // By default, this is simply passed on to the previous listener 141 // (and raises an assertion if there is none). 142 virtual void OnStreamAfterShutdown(ShutdownWrap* w, int status); 143 144 // This is called by the stream if it determines that it wants more data 145 // to be written to it. Not all streams support this. 146 // This callback will not be called as long as there are active writes. 147 // It is not supported by all streams; `stream->HasWantsWrite()` returns 148 // true if it is supported by a stream. OnStreamWantsWrite(size_t suggested_size)149 virtual void OnStreamWantsWrite(size_t suggested_size) {} 150 151 // This is called immediately before the stream is destroyed. OnStreamDestroy()152 virtual void OnStreamDestroy() {} 153 154 // The stream this is currently associated with, or nullptr if there is none. stream()155 inline StreamResource* stream() { return stream_; } 156 157 protected: 158 // Pass along a read error to the `StreamListener` instance that was active 159 // before this one. For example, a protocol parser does not care about read 160 // errors and may instead want to let the original handler 161 // (e.g. the JS handler) take care of the situation. 162 void PassReadErrorToPreviousListener(ssize_t nread); 163 164 StreamResource* stream_ = nullptr; 165 StreamListener* previous_listener_ = nullptr; 166 167 friend class StreamResource; 168 }; 169 170 171 // An (incomplete) stream listener class that calls the `.oncomplete()` 172 // method of the JS objects associated with the wrap objects. 173 class ReportWritesToJSStreamListener : public StreamListener { 174 public: 175 void OnStreamAfterWrite(WriteWrap* w, int status) override; 176 void OnStreamAfterShutdown(ShutdownWrap* w, int status) override; 177 178 private: 179 void OnStreamAfterReqFinished(StreamReq* req_wrap, int status); 180 }; 181 182 183 // A default emitter that just pushes data chunks as Buffer instances to 184 // JS land via the handle’s .ondata method. 185 class EmitToJSStreamListener : public ReportWritesToJSStreamListener { 186 public: 187 uv_buf_t OnStreamAlloc(size_t suggested_size) override; 188 void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override; 189 }; 190 191 192 // An alternative listener that uses a custom, user-provided buffer 193 // for reading data. 194 class CustomBufferJSListener : public ReportWritesToJSStreamListener { 195 public: 196 uv_buf_t OnStreamAlloc(size_t suggested_size) override; 197 void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override; OnStreamDestroy()198 void OnStreamDestroy() override { delete this; } 199 CustomBufferJSListener(uv_buf_t buffer)200 explicit CustomBufferJSListener(uv_buf_t buffer) : buffer_(buffer) {} 201 202 private: 203 uv_buf_t buffer_; 204 }; 205 206 207 // A generic stream, comparable to JS land’s `Duplex` streams. 208 // A stream is always controlled through one `StreamListener` instance. 209 class StreamResource { 210 public: 211 virtual ~StreamResource(); 212 213 // These need to be implemented on the readable side of this stream: 214 215 // Start reading from the underlying resource. This is called by the consumer 216 // when more data is desired. Use `EmitAlloc()` and `EmitData()` to 217 // pass data along to the consumer. 218 virtual int ReadStart() = 0; 219 // Stop reading from the underlying resource. This is called by the 220 // consumer when its buffers are full and no more data can be handled. 221 virtual int ReadStop() = 0; 222 223 // These need to be implemented on the writable side of this stream: 224 // All of these methods may return an error code synchronously. 225 // In that case, the finish callback should *not* be called. 226 227 // Perform a shutdown operation, and either call req_wrap->Done() when 228 // finished and return 0, return 1 for synchronous success, or 229 // a libuv error code for synchronous failures. 230 virtual int DoShutdown(ShutdownWrap* req_wrap) = 0; 231 // Try to write as much data as possible synchronously, and modify 232 // `*bufs` and `*count` accordingly. This is a no-op by default. 233 // Return 0 for success and a libuv error code for failures. 234 virtual int DoTryWrite(uv_buf_t** bufs, size_t* count); 235 // Initiate a write of data. If the write completes synchronously, return 0 on 236 // success (with bufs modified to indicate how much data was consumed) or a 237 // libuv error code on failure. If the write will complete asynchronously, 238 // return 0. When the write completes asynchronously, call req_wrap->Done() 239 // with 0 on success (with bufs modified to indicate how much data was 240 // consumed) or a libuv error code on failure. Do not call req_wrap->Done() if 241 // the write completes synchronously, that is, it should never be called 242 // before DoWrite() has returned. 243 virtual int DoWrite(WriteWrap* w, 244 uv_buf_t* bufs, 245 size_t count, 246 uv_stream_t* send_handle) = 0; 247 248 // Returns true if the stream supports the `OnStreamWantsWrite()` interface. HasWantsWrite()249 virtual bool HasWantsWrite() const { return false; } 250 251 // Optionally, this may provide an error message to be used for 252 // failing writes. 253 virtual const char* Error() const; 254 // Clear the current error (i.e. that would be returned by Error()). 255 virtual void ClearError(); 256 257 // Transfer ownership of this stream to `listener`. The previous listener 258 // will not receive any more callbacks while the new listener was active. 259 void PushStreamListener(StreamListener* listener); 260 // Remove a listener, and, if this was the currently active one, 261 // transfer ownership back to the previous listener. 262 void RemoveStreamListener(StreamListener* listener); 263 264 protected: 265 // Call the current listener's OnStreamAlloc() method. 266 uv_buf_t EmitAlloc(size_t suggested_size); 267 // Call the current listener's OnStreamRead() method and update the 268 // stream's read byte counter. 269 void EmitRead(ssize_t nread, const uv_buf_t& buf = uv_buf_init(nullptr, 0)); 270 // Call the current listener's OnStreamAfterWrite() method. 271 void EmitAfterWrite(WriteWrap* w, int status); 272 // Call the current listener's OnStreamAfterShutdown() method. 273 void EmitAfterShutdown(ShutdownWrap* w, int status); 274 // Call the current listener's OnStreamWantsWrite() method. 275 void EmitWantsWrite(size_t suggested_size); 276 277 StreamListener* listener_ = nullptr; 278 uint64_t bytes_read_ = 0; 279 uint64_t bytes_written_ = 0; 280 281 friend class StreamListener; 282 }; 283 284 285 class StreamBase : public StreamResource { 286 public: 287 // The kSlot field here mirrors that of BaseObject::InternalFields::kSlot 288 // because instances deriving from StreamBase generally also derived from 289 // BaseObject (it's possible for it not to, however). 290 enum InternalFields { 291 kSlot = BaseObject::kSlot, 292 kStreamBaseField = BaseObject::kInternalFieldCount, 293 kOnReadFunctionField, 294 kInternalFieldCount 295 }; 296 297 static void AddMethods(Environment* env, 298 v8::Local<v8::FunctionTemplate> target); 299 300 virtual bool IsAlive() = 0; 301 virtual bool IsClosing() = 0; 302 virtual bool IsIPCPipe(); 303 virtual int GetFD(); 304 305 enum StreamBaseJSChecks { DONT_SKIP_NREAD_CHECKS, SKIP_NREAD_CHECKS }; 306 307 v8::MaybeLocal<v8::Value> CallJSOnreadMethod( 308 ssize_t nread, 309 v8::Local<v8::ArrayBuffer> ab, 310 size_t offset = 0, 311 StreamBaseJSChecks checks = DONT_SKIP_NREAD_CHECKS); 312 313 // This is named `stream_env` to avoid name clashes, because a lot of 314 // subclasses are also `BaseObject`s. 315 Environment* stream_env() const; 316 317 // Shut down the current stream. This request can use an existing 318 // ShutdownWrap object (that was created in JS), or a new one will be created. 319 // Returns 1 in case of a synchronous completion, 0 in case of asynchronous 320 // completion, and a libuv error case in case of synchronous failure. 321 int Shutdown(v8::Local<v8::Object> req_wrap_obj = v8::Local<v8::Object>()); 322 323 // Write data to the current stream. This request can use an existing 324 // WriteWrap object (that was created in JS), or a new one will be created. 325 // This will first try to write synchronously using `DoTryWrite()`, then 326 // asynchronously using `DoWrite()`. 327 // If the return value indicates a synchronous completion, no callback will 328 // be invoked. 329 StreamWriteResult Write( 330 uv_buf_t* bufs, 331 size_t count, 332 uv_stream_t* send_handle = nullptr, 333 v8::Local<v8::Object> req_wrap_obj = v8::Local<v8::Object>()); 334 335 // These can be overridden by subclasses to get more specific wrap instances. 336 // For example, a subclass Foo could create a FooWriteWrap or FooShutdownWrap 337 // (inheriting from ShutdownWrap/WriteWrap) that has extra fields, like 338 // an associated libuv request. 339 virtual ShutdownWrap* CreateShutdownWrap(v8::Local<v8::Object> object); 340 virtual WriteWrap* CreateWriteWrap(v8::Local<v8::Object> object); 341 342 // One of these must be implemented 343 virtual AsyncWrap* GetAsyncWrap() = 0; 344 virtual v8::Local<v8::Object> GetObject(); 345 346 static StreamBase* FromObject(v8::Local<v8::Object> obj); 347 348 protected: 349 explicit StreamBase(Environment* env); 350 351 // JS Methods 352 int ReadStartJS(const v8::FunctionCallbackInfo<v8::Value>& args); 353 int ReadStopJS(const v8::FunctionCallbackInfo<v8::Value>& args); 354 int Shutdown(const v8::FunctionCallbackInfo<v8::Value>& args); 355 int Writev(const v8::FunctionCallbackInfo<v8::Value>& args); 356 int WriteBuffer(const v8::FunctionCallbackInfo<v8::Value>& args); 357 template <enum encoding enc> 358 int WriteString(const v8::FunctionCallbackInfo<v8::Value>& args); 359 int UseUserBuffer(const v8::FunctionCallbackInfo<v8::Value>& args); 360 361 static void GetFD(const v8::FunctionCallbackInfo<v8::Value>& args); 362 static void GetExternal(const v8::FunctionCallbackInfo<v8::Value>& args); 363 static void GetBytesRead(const v8::FunctionCallbackInfo<v8::Value>& args); 364 static void GetBytesWritten(const v8::FunctionCallbackInfo<v8::Value>& args); 365 void AttachToObject(v8::Local<v8::Object> obj); 366 367 template <int (StreamBase::*Method)( 368 const v8::FunctionCallbackInfo<v8::Value>& args)> 369 static void JSMethod(const v8::FunctionCallbackInfo<v8::Value>& args); 370 371 // Internal, used only in StreamBase methods + env.cc. 372 enum StreamBaseStateFields { 373 kReadBytesOrError, 374 kArrayBufferOffset, 375 kBytesWritten, 376 kLastWriteWasAsync, 377 kNumStreamBaseStateFields 378 }; 379 380 private: 381 Environment* env_; 382 EmitToJSStreamListener default_listener_; 383 384 void SetWriteResult(const StreamWriteResult& res); 385 static void AddMethod(Environment* env, 386 v8::Local<v8::Signature> sig, 387 enum v8::PropertyAttribute attributes, 388 v8::Local<v8::FunctionTemplate> t, 389 JSMethodFunction* stream_method, 390 v8::Local<v8::String> str); 391 392 friend class WriteWrap; 393 friend class ShutdownWrap; 394 friend class Environment; // For kNumStreamBaseStateFields. 395 }; 396 397 398 // These are helpers for creating `ShutdownWrap`/`WriteWrap` instances. 399 // `OtherBase` must have a constructor that matches the `AsyncWrap` 400 // constructors’s (Environment*, Local<Object>, AsyncWrap::Provider) signature 401 // and be a subclass of `AsyncWrap`. 402 template <typename OtherBase> 403 class SimpleShutdownWrap : public ShutdownWrap, public OtherBase { 404 public: 405 SimpleShutdownWrap(StreamBase* stream, 406 v8::Local<v8::Object> req_wrap_obj); 407 GetAsyncWrap()408 AsyncWrap* GetAsyncWrap() override { return this; } 409 410 SET_NO_MEMORY_INFO() 411 SET_MEMORY_INFO_NAME(SimpleShutdownWrap) 412 SET_SELF_SIZE(SimpleShutdownWrap) 413 }; 414 415 template <typename OtherBase> 416 class SimpleWriteWrap : public WriteWrap, public OtherBase { 417 public: 418 SimpleWriteWrap(StreamBase* stream, 419 v8::Local<v8::Object> req_wrap_obj); 420 GetAsyncWrap()421 AsyncWrap* GetAsyncWrap() override { return this; } 422 423 SET_NO_MEMORY_INFO() 424 SET_MEMORY_INFO_NAME(SimpleWriteWrap) 425 SET_SELF_SIZE(SimpleWriteWrap) 426 }; 427 428 } // namespace node 429 430 #endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS 431 432 #endif // SRC_STREAM_BASE_H_ 433