1 #ifndef SRC_STREAM_BASE_H_ 2 #define SRC_STREAM_BASE_H_ 3 4 #if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS 5 6 #include "env.h" 7 #include "async_wrap.h" 8 #include "node.h" 9 #include "util.h" 10 11 #include "v8.h" 12 13 namespace node { 14 15 // Forward declarations 16 class Environment; 17 class ShutdownWrap; 18 class WriteWrap; 19 class StreamBase; 20 class StreamResource; 21 class ExternalReferenceRegistry; 22 23 struct StreamWriteResult { 24 bool async; 25 int err; 26 WriteWrap* wrap; 27 size_t bytes; 28 BaseObjectPtr<AsyncWrap> wrap_obj; 29 }; 30 31 using JSMethodFunction = void(const v8::FunctionCallbackInfo<v8::Value>& args); 32 33 class StreamReq { 34 public: 35 // The kSlot internal field here mirrors BaseObject::InternalFields::kSlot 36 // here because instances derived from StreamReq will also derive from 37 // BaseObject, and the slots are used for the identical purpose. 38 enum InternalFields { 39 kSlot = BaseObject::kSlot, 40 kStreamReqField = BaseObject::kInternalFieldCount, 41 kInternalFieldCount 42 }; 43 44 inline explicit StreamReq( 45 StreamBase* stream, 46 v8::Local<v8::Object> req_wrap_obj); 47 48 virtual ~StreamReq() = default; 49 virtual AsyncWrap* GetAsyncWrap() = 0; 50 inline v8::Local<v8::Object> object(); 51 52 // TODO(RaisinTen): Update the return type to a Maybe, so that we can indicate 53 // if there is a pending exception/termination. 54 void Done(int status, const char* error_str = nullptr); 55 inline void Dispose(); 56 stream()57 StreamBase* stream() const { return stream_; } 58 59 static inline StreamReq* FromObject(v8::Local<v8::Object> req_wrap_obj); 60 61 // Sets all internal fields of `req_wrap_obj` to `nullptr`. 62 // This is what the `WriteWrap` and `ShutdownWrap` JS constructors do, 63 // and what we use in C++ after creating these objects from their 64 // v8::ObjectTemplates, to avoid the overhead of calling the 65 // constructor explicitly. 66 static inline void ResetObject(v8::Local<v8::Object> req_wrap_obj); 67 68 protected: 69 virtual void OnDone(int status) = 0; 70 71 inline void AttachToObject(v8::Local<v8::Object> req_wrap_obj); 72 73 private: 74 StreamBase* const stream_; 75 }; 76 77 class ShutdownWrap : public StreamReq { 78 public: 79 inline ShutdownWrap( 80 StreamBase* stream, 81 v8::Local<v8::Object> req_wrap_obj); 82 83 static inline ShutdownWrap* FromObject(v8::Local<v8::Object> req_wrap_obj); 84 template <typename T, bool kIsWeak> 85 static inline ShutdownWrap* FromObject( 86 const BaseObjectPtrImpl<T, kIsWeak>& base_obj); 87 88 // Call stream()->EmitAfterShutdown() and dispose of this request wrap. 89 void OnDone(int status) override; 90 }; 91 92 class WriteWrap : public StreamReq { 93 public: 94 inline void SetBackingStore(std::unique_ptr<v8::BackingStore> bs); 95 96 inline WriteWrap( 97 StreamBase* stream, 98 v8::Local<v8::Object> req_wrap_obj); 99 100 static inline WriteWrap* FromObject(v8::Local<v8::Object> req_wrap_obj); 101 template <typename T, bool kIsWeak> 102 static inline WriteWrap* FromObject( 103 const BaseObjectPtrImpl<T, kIsWeak>& base_obj); 104 105 // Call stream()->EmitAfterWrite() and dispose of this request wrap. 106 void OnDone(int status) override; 107 108 private: 109 std::unique_ptr<v8::BackingStore> backing_store_; 110 }; 111 112 113 // This is the generic interface for objects that control Node.js' C++ streams. 114 // For example, the default `EmitToJSStreamListener` emits a stream's data 115 // as Buffers in JS, or `TLSWrap` reads and decrypts data from a stream. 116 class StreamListener { 117 public: 118 virtual ~StreamListener(); 119 120 // This is called when a stream wants to allocate memory before 121 // reading data into the freshly allocated buffer (i.e. it is always followed 122 // by a `OnStreamRead()` call). 123 // This memory may be statically or dynamically allocated; for example, 124 // a protocol parser may want to read data into a static buffer if it knows 125 // that all data is going to be fully handled during the next 126 // `OnStreamRead()` call. 127 // The returned buffer does not need to contain `suggested_size` bytes. 128 // The default implementation of this method returns a buffer that has exactly 129 // the suggested size and is allocated using malloc(). 130 // It is not valid to return a zero-length buffer from this method. 131 // It is not guaranteed that the corresponding `OnStreamRead()` call 132 // happens in the same event loop turn as this call. 133 virtual uv_buf_t OnStreamAlloc(size_t suggested_size) = 0; 134 135 // `OnStreamRead()` is called when data is available on the socket and has 136 // been read into the buffer provided by `OnStreamAlloc()`. 137 // The `buf` argument is the return value of `uv_buf_t`, or may be a buffer 138 // with base nullptr in case of an error. 139 // `nread` is the number of read bytes (which is at most the buffer length), 140 // or, if negative, a libuv error code. 141 virtual void OnStreamRead(ssize_t nread, 142 const uv_buf_t& buf) = 0; 143 144 // This is called once a write has finished. `status` may be 0 or, 145 // if negative, a libuv error code. 146 // By default, this is simply passed on to the previous listener 147 // (and raises an assertion if there is none). 148 virtual void OnStreamAfterWrite(WriteWrap* w, int status); 149 150 // This is called once a shutdown has finished. `status` may be 0 or, 151 // if negative, a libuv error code. 152 // By default, this is simply passed on to the previous listener 153 // (and raises an assertion if there is none). 154 virtual void OnStreamAfterShutdown(ShutdownWrap* w, int status); 155 156 // This is called by the stream if it determines that it wants more data 157 // to be written to it. Not all streams support this. 158 // This callback will not be called as long as there are active writes. 159 // It is not supported by all streams; `stream->HasWantsWrite()` returns 160 // true if it is supported by a stream. OnStreamWantsWrite(size_t suggested_size)161 virtual void OnStreamWantsWrite(size_t suggested_size) {} 162 163 // This is called immediately before the stream is destroyed. OnStreamDestroy()164 virtual void OnStreamDestroy() {} 165 166 // The stream this is currently associated with, or nullptr if there is none. stream()167 StreamResource* stream() const { return stream_; } 168 169 protected: 170 // Pass along a read error to the `StreamListener` instance that was active 171 // before this one. For example, a protocol parser does not care about read 172 // errors and may instead want to let the original handler 173 // (e.g. the JS handler) take care of the situation. 174 inline void PassReadErrorToPreviousListener(ssize_t nread); 175 176 StreamResource* stream_ = nullptr; 177 StreamListener* previous_listener_ = nullptr; 178 179 friend class StreamResource; 180 }; 181 182 183 // An (incomplete) stream listener class that calls the `.oncomplete()` 184 // method of the JS objects associated with the wrap objects. 185 class ReportWritesToJSStreamListener : public StreamListener { 186 public: 187 void OnStreamAfterWrite(WriteWrap* w, int status) override; 188 void OnStreamAfterShutdown(ShutdownWrap* w, int status) override; 189 190 private: 191 void OnStreamAfterReqFinished(StreamReq* req_wrap, int status); 192 }; 193 194 195 // A default emitter that just pushes data chunks as Buffer instances to 196 // JS land via the handle’s .ondata method. 197 class EmitToJSStreamListener : public ReportWritesToJSStreamListener { 198 public: 199 uv_buf_t OnStreamAlloc(size_t suggested_size) override; 200 void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override; 201 }; 202 203 204 // An alternative listener that uses a custom, user-provided buffer 205 // for reading data. 206 class CustomBufferJSListener : public ReportWritesToJSStreamListener { 207 public: 208 uv_buf_t OnStreamAlloc(size_t suggested_size) override; 209 void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override; OnStreamDestroy()210 void OnStreamDestroy() override { delete this; } 211 CustomBufferJSListener(uv_buf_t buffer)212 explicit CustomBufferJSListener(uv_buf_t buffer) : buffer_(buffer) {} 213 214 private: 215 uv_buf_t buffer_; 216 }; 217 218 219 // A generic stream, comparable to JS land’s `Duplex` streams. 220 // A stream is always controlled through one `StreamListener` instance. 221 class StreamResource { 222 public: 223 virtual ~StreamResource(); 224 225 // These need to be implemented on the readable side of this stream: 226 227 // Start reading from the underlying resource. This is called by the consumer 228 // when more data is desired. Use `EmitAlloc()` and `EmitData()` to 229 // pass data along to the consumer. 230 virtual int ReadStart() = 0; 231 // Stop reading from the underlying resource. This is called by the 232 // consumer when its buffers are full and no more data can be handled. 233 virtual int ReadStop() = 0; 234 235 // These need to be implemented on the writable side of this stream: 236 // All of these methods may return an error code synchronously. 237 // In that case, the finish callback should *not* be called. 238 239 // Perform a shutdown operation, and either call req_wrap->Done() when 240 // finished and return 0, return 1 for synchronous success, or 241 // a libuv error code for synchronous failures. 242 virtual int DoShutdown(ShutdownWrap* req_wrap) = 0; 243 // Try to write as much data as possible synchronously, and modify 244 // `*bufs` and `*count` accordingly. This is a no-op by default. 245 // Return 0 for success and a libuv error code for failures. 246 virtual int DoTryWrite(uv_buf_t** bufs, size_t* count); 247 // Initiate a write of data. 248 // Upon an immediate failure, a libuv error code is returned, 249 // w->Done() will never be called and caller should free `bufs`. 250 // Otherwise, 0 is returned and w->Done(status) will be called 251 // with status set to either 252 // (1) 0 after all data are written, or 253 // (2) a libuv error code when an error occurs 254 // in either case, w->Done() will never be called before DoWrite() returns. 255 // When 0 is returned: 256 // (1) memory specified by `bufs` and `count` must remain valid until 257 // w->Done() gets called. 258 // (2) `bufs` might or might not be changed, caller should not rely on this. 259 // (3) `bufs` should be freed after w->Done() gets called. 260 virtual int DoWrite(WriteWrap* w, 261 uv_buf_t* bufs, 262 size_t count, 263 uv_stream_t* send_handle) = 0; 264 265 // Returns true if the stream supports the `OnStreamWantsWrite()` interface. HasWantsWrite()266 virtual bool HasWantsWrite() const { return false; } 267 268 // Optionally, this may provide an error message to be used for 269 // failing writes. 270 virtual const char* Error() const; 271 // Clear the current error (i.e. that would be returned by Error()). 272 virtual void ClearError(); 273 274 // Transfer ownership of this stream to `listener`. The previous listener 275 // will not receive any more callbacks while the new listener was active. 276 inline void PushStreamListener(StreamListener* listener); 277 // Remove a listener, and, if this was the currently active one, 278 // transfer ownership back to the previous listener. 279 void RemoveStreamListener(StreamListener* listener); 280 281 protected: 282 // Call the current listener's OnStreamAlloc() method. 283 inline uv_buf_t EmitAlloc(size_t suggested_size); 284 // Call the current listener's OnStreamRead() method and update the 285 // stream's read byte counter. 286 inline void EmitRead( 287 ssize_t nread, 288 const uv_buf_t& buf = uv_buf_init(nullptr, 0)); 289 // Call the current listener's OnStreamAfterWrite() method. 290 inline void EmitAfterWrite(WriteWrap* w, int status); 291 // Call the current listener's OnStreamAfterShutdown() method. 292 inline void EmitAfterShutdown(ShutdownWrap* w, int status); 293 // Call the current listener's OnStreamWantsWrite() method. 294 inline void EmitWantsWrite(size_t suggested_size); 295 296 StreamListener* listener_ = nullptr; 297 uint64_t bytes_read_ = 0; 298 uint64_t bytes_written_ = 0; 299 300 friend class StreamListener; 301 }; 302 303 304 class StreamBase : public StreamResource { 305 public: 306 // The kSlot field here mirrors that of BaseObject::InternalFields::kSlot 307 // because instances deriving from StreamBase generally also derived from 308 // BaseObject (it's possible for it not to, however). 309 enum InternalFields { 310 kSlot = BaseObject::kSlot, 311 kStreamBaseField = BaseObject::kInternalFieldCount, 312 kOnReadFunctionField, 313 kInternalFieldCount 314 }; 315 316 static void AddMethods(Environment* env, 317 v8::Local<v8::FunctionTemplate> target); 318 static void RegisterExternalReferences(ExternalReferenceRegistry* registry); 319 virtual bool IsAlive() = 0; 320 virtual bool IsClosing() = 0; 321 virtual bool IsIPCPipe(); 322 virtual int GetFD(); 323 324 enum StreamBaseJSChecks { DONT_SKIP_NREAD_CHECKS, SKIP_NREAD_CHECKS }; 325 326 v8::MaybeLocal<v8::Value> CallJSOnreadMethod( 327 ssize_t nread, 328 v8::Local<v8::ArrayBuffer> ab, 329 size_t offset = 0, 330 StreamBaseJSChecks checks = DONT_SKIP_NREAD_CHECKS); 331 332 // This is named `stream_env` to avoid name clashes, because a lot of 333 // subclasses are also `BaseObject`s. stream_env()334 Environment* stream_env() const { return env_; } 335 336 // TODO(RaisinTen): Update the return type to a Maybe, so that we can indicate 337 // if there is a pending exception/termination. 338 // Shut down the current stream. This request can use an existing 339 // ShutdownWrap object (that was created in JS), or a new one will be created. 340 // Returns 1 in case of a synchronous completion, 0 in case of asynchronous 341 // completion, and a libuv error case in case of synchronous failure. 342 int Shutdown(v8::Local<v8::Object> req_wrap_obj = v8::Local<v8::Object>()); 343 344 // TODO(RaisinTen): Update the return type to a Maybe, so that we can indicate 345 // if there is a pending exception/termination. 346 // Write data to the current stream. This request can use an existing 347 // WriteWrap object (that was created in JS), or a new one will be created. 348 // This will first try to write synchronously using `DoTryWrite()`, then 349 // asynchronously using `DoWrite()`. 350 // Caller can pass `skip_try_write` as true if it has already called 351 // `DoTryWrite()` and ends up with a partial write, or it knows that the 352 // write is too large to finish synchronously. 353 // If the return value indicates a synchronous completion, no callback will 354 // be invoked. 355 StreamWriteResult Write( 356 uv_buf_t* bufs, 357 size_t count, 358 uv_stream_t* send_handle = nullptr, 359 v8::Local<v8::Object> req_wrap_obj = v8::Local<v8::Object>(), 360 bool skip_try_write = false); 361 362 // These can be overridden by subclasses to get more specific wrap instances. 363 // For example, a subclass Foo could create a FooWriteWrap or FooShutdownWrap 364 // (inheriting from ShutdownWrap/WriteWrap) that has extra fields, like 365 // an associated libuv request. 366 virtual ShutdownWrap* CreateShutdownWrap(v8::Local<v8::Object> object); 367 virtual WriteWrap* CreateWriteWrap(v8::Local<v8::Object> object); 368 369 // One of these must be implemented 370 virtual AsyncWrap* GetAsyncWrap() = 0; 371 virtual v8::Local<v8::Object> GetObject(); 372 373 static inline StreamBase* FromObject(v8::Local<v8::Object> obj); 374 375 protected: 376 inline explicit StreamBase(Environment* env); 377 378 // JS Methods 379 int ReadStartJS(const v8::FunctionCallbackInfo<v8::Value>& args); 380 int ReadStopJS(const v8::FunctionCallbackInfo<v8::Value>& args); 381 int Shutdown(const v8::FunctionCallbackInfo<v8::Value>& args); 382 int Writev(const v8::FunctionCallbackInfo<v8::Value>& args); 383 int WriteBuffer(const v8::FunctionCallbackInfo<v8::Value>& args); 384 template <enum encoding enc> 385 int WriteString(const v8::FunctionCallbackInfo<v8::Value>& args); 386 int UseUserBuffer(const v8::FunctionCallbackInfo<v8::Value>& args); 387 388 static void GetFD(const v8::FunctionCallbackInfo<v8::Value>& args); 389 static void GetExternal(const v8::FunctionCallbackInfo<v8::Value>& args); 390 static void GetBytesRead(const v8::FunctionCallbackInfo<v8::Value>& args); 391 static void GetBytesWritten(const v8::FunctionCallbackInfo<v8::Value>& args); 392 inline void AttachToObject(v8::Local<v8::Object> obj); 393 394 template <int (StreamBase::*Method)( 395 const v8::FunctionCallbackInfo<v8::Value>& args)> 396 static void JSMethod(const v8::FunctionCallbackInfo<v8::Value>& args); 397 398 // Internal, used only in StreamBase methods + env.cc. 399 enum StreamBaseStateFields { 400 kReadBytesOrError, 401 kArrayBufferOffset, 402 kBytesWritten, 403 kLastWriteWasAsync, 404 kNumStreamBaseStateFields 405 }; 406 407 private: 408 Environment* env_; 409 EmitToJSStreamListener default_listener_; 410 411 void SetWriteResult(const StreamWriteResult& res); 412 static void AddMethod(Environment* env, 413 v8::Local<v8::Signature> sig, 414 enum v8::PropertyAttribute attributes, 415 v8::Local<v8::FunctionTemplate> t, 416 JSMethodFunction* stream_method, 417 v8::Local<v8::String> str); 418 419 friend class WriteWrap; 420 friend class ShutdownWrap; 421 friend class Environment; // For kNumStreamBaseStateFields. 422 }; 423 424 425 // These are helpers for creating `ShutdownWrap`/`WriteWrap` instances. 426 // `OtherBase` must have a constructor that matches the `AsyncWrap` 427 // constructors’s (Environment*, Local<Object>, AsyncWrap::Provider) signature 428 // and be a subclass of `AsyncWrap`. 429 template <typename OtherBase> 430 class SimpleShutdownWrap : public ShutdownWrap, public OtherBase { 431 public: 432 SimpleShutdownWrap(StreamBase* stream, 433 v8::Local<v8::Object> req_wrap_obj); 434 GetAsyncWrap()435 AsyncWrap* GetAsyncWrap() override { return this; } 436 437 SET_NO_MEMORY_INFO() SET_MEMORY_INFO_NAME(SimpleShutdownWrap)438 SET_MEMORY_INFO_NAME(SimpleShutdownWrap) 439 SET_SELF_SIZE(SimpleShutdownWrap) 440 441 bool IsNotIndicativeOfMemoryLeakAtExit() const override { 442 return OtherBase::IsNotIndicativeOfMemoryLeakAtExit(); 443 } 444 }; 445 446 template <typename OtherBase> 447 class SimpleWriteWrap : public WriteWrap, public OtherBase { 448 public: 449 SimpleWriteWrap(StreamBase* stream, 450 v8::Local<v8::Object> req_wrap_obj); 451 GetAsyncWrap()452 AsyncWrap* GetAsyncWrap() override { return this; } 453 454 SET_NO_MEMORY_INFO() SET_MEMORY_INFO_NAME(SimpleWriteWrap)455 SET_MEMORY_INFO_NAME(SimpleWriteWrap) 456 SET_SELF_SIZE(SimpleWriteWrap) 457 458 bool IsNotIndicativeOfMemoryLeakAtExit() const override { 459 return OtherBase::IsNotIndicativeOfMemoryLeakAtExit(); 460 } 461 }; 462 463 } // namespace node 464 465 #endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS 466 467 #endif // SRC_STREAM_BASE_H_ 468