1 #ifndef SRC_STREAM_BASE_H_ 2 #define SRC_STREAM_BASE_H_ 3 4 #if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS 5 6 #include "env.h" 7 #include "allocated_buffer.h" 8 #include "async_wrap.h" 9 #include "node.h" 10 #include "util.h" 11 12 #include "v8.h" 13 14 namespace node { 15 16 // Forward declarations 17 class Environment; 18 class ShutdownWrap; 19 class WriteWrap; 20 class StreamBase; 21 class StreamResource; 22 23 struct StreamWriteResult { 24 bool async; 25 int err; 26 WriteWrap* wrap; 27 size_t bytes; 28 BaseObjectPtr<AsyncWrap> wrap_obj; 29 }; 30 31 using JSMethodFunction = void(const v8::FunctionCallbackInfo<v8::Value>& args); 32 33 class StreamReq { 34 public: 35 // The kSlot internal field here mirrors BaseObject::InternalFields::kSlot 36 // here because instances derived from StreamReq will also derive from 37 // BaseObject, and the slots are used for the identical purpose. 38 enum InternalFields { 39 kSlot = BaseObject::kSlot, 40 kStreamReqField = BaseObject::kInternalFieldCount, 41 kInternalFieldCount 42 }; 43 44 inline explicit StreamReq( 45 StreamBase* stream, 46 v8::Local<v8::Object> req_wrap_obj); 47 48 virtual ~StreamReq() = default; 49 virtual AsyncWrap* GetAsyncWrap() = 0; 50 inline v8::Local<v8::Object> object(); 51 52 inline void Done(int status, const char* error_str = nullptr); 53 inline void Dispose(); 54 stream()55 StreamBase* stream() const { return stream_; } 56 57 static inline StreamReq* FromObject(v8::Local<v8::Object> req_wrap_obj); 58 59 // Sets all internal fields of `req_wrap_obj` to `nullptr`. 60 // This is what the `WriteWrap` and `ShutdownWrap` JS constructors do, 61 // and what we use in C++ after creating these objects from their 62 // v8::ObjectTemplates, to avoid the overhead of calling the 63 // constructor explicitly. 64 static inline void ResetObject(v8::Local<v8::Object> req_wrap_obj); 65 66 protected: 67 virtual void OnDone(int status) = 0; 68 69 inline void AttachToObject(v8::Local<v8::Object> req_wrap_obj); 70 71 private: 72 StreamBase* const stream_; 73 }; 74 75 class ShutdownWrap : public StreamReq { 76 public: 77 inline ShutdownWrap( 78 StreamBase* stream, 79 v8::Local<v8::Object> req_wrap_obj); 80 81 static inline ShutdownWrap* FromObject(v8::Local<v8::Object> req_wrap_obj); 82 template <typename T, bool kIsWeak> 83 static inline ShutdownWrap* FromObject( 84 const BaseObjectPtrImpl<T, kIsWeak>& base_obj); 85 86 // Call stream()->EmitAfterShutdown() and dispose of this request wrap. 87 void OnDone(int status) override; 88 }; 89 90 class WriteWrap : public StreamReq { 91 public: 92 inline void SetAllocatedStorage(AllocatedBuffer&& storage); 93 94 inline WriteWrap( 95 StreamBase* stream, 96 v8::Local<v8::Object> req_wrap_obj); 97 98 static inline WriteWrap* FromObject(v8::Local<v8::Object> req_wrap_obj); 99 template <typename T, bool kIsWeak> 100 static inline WriteWrap* FromObject( 101 const BaseObjectPtrImpl<T, kIsWeak>& base_obj); 102 103 // Call stream()->EmitAfterWrite() and dispose of this request wrap. 104 void OnDone(int status) override; 105 106 private: 107 AllocatedBuffer storage_; 108 }; 109 110 111 // This is the generic interface for objects that control Node.js' C++ streams. 112 // For example, the default `EmitToJSStreamListener` emits a stream's data 113 // as Buffers in JS, or `TLSWrap` reads and decrypts data from a stream. 114 class StreamListener { 115 public: 116 virtual ~StreamListener(); 117 118 // This is called when a stream wants to allocate memory before 119 // reading data into the freshly allocated buffer (i.e. it is always followed 120 // by a `OnStreamRead()` call). 121 // This memory may be statically or dynamically allocated; for example, 122 // a protocol parser may want to read data into a static buffer if it knows 123 // that all data is going to be fully handled during the next 124 // `OnStreamRead()` call. 125 // The returned buffer does not need to contain `suggested_size` bytes. 126 // The default implementation of this method returns a buffer that has exactly 127 // the suggested size and is allocated using malloc(). 128 // It is not valid to return a zero-length buffer from this method. 129 // It is not guaranteed that the corresponding `OnStreamRead()` call 130 // happens in the same event loop turn as this call. 131 virtual uv_buf_t OnStreamAlloc(size_t suggested_size) = 0; 132 133 // `OnStreamRead()` is called when data is available on the socket and has 134 // been read into the buffer provided by `OnStreamAlloc()`. 135 // The `buf` argument is the return value of `uv_buf_t`, or may be a buffer 136 // with base nullptr in case of an error. 137 // `nread` is the number of read bytes (which is at most the buffer length), 138 // or, if negative, a libuv error code. 139 virtual void OnStreamRead(ssize_t nread, 140 const uv_buf_t& buf) = 0; 141 142 // This is called once a write has finished. `status` may be 0 or, 143 // if negative, a libuv error code. 144 // By default, this is simply passed on to the previous listener 145 // (and raises an assertion if there is none). 146 virtual void OnStreamAfterWrite(WriteWrap* w, int status); 147 148 // This is called once a shutdown has finished. `status` may be 0 or, 149 // if negative, a libuv error code. 150 // By default, this is simply passed on to the previous listener 151 // (and raises an assertion if there is none). 152 virtual void OnStreamAfterShutdown(ShutdownWrap* w, int status); 153 154 // This is called by the stream if it determines that it wants more data 155 // to be written to it. Not all streams support this. 156 // This callback will not be called as long as there are active writes. 157 // It is not supported by all streams; `stream->HasWantsWrite()` returns 158 // true if it is supported by a stream. OnStreamWantsWrite(size_t suggested_size)159 virtual void OnStreamWantsWrite(size_t suggested_size) {} 160 161 // This is called immediately before the stream is destroyed. OnStreamDestroy()162 virtual void OnStreamDestroy() {} 163 164 // The stream this is currently associated with, or nullptr if there is none. stream()165 StreamResource* stream() const { return stream_; } 166 167 protected: 168 // Pass along a read error to the `StreamListener` instance that was active 169 // before this one. For example, a protocol parser does not care about read 170 // errors and may instead want to let the original handler 171 // (e.g. the JS handler) take care of the situation. 172 inline void PassReadErrorToPreviousListener(ssize_t nread); 173 174 StreamResource* stream_ = nullptr; 175 StreamListener* previous_listener_ = nullptr; 176 177 friend class StreamResource; 178 }; 179 180 181 // An (incomplete) stream listener class that calls the `.oncomplete()` 182 // method of the JS objects associated with the wrap objects. 183 class ReportWritesToJSStreamListener : public StreamListener { 184 public: 185 void OnStreamAfterWrite(WriteWrap* w, int status) override; 186 void OnStreamAfterShutdown(ShutdownWrap* w, int status) override; 187 188 private: 189 void OnStreamAfterReqFinished(StreamReq* req_wrap, int status); 190 }; 191 192 193 // A default emitter that just pushes data chunks as Buffer instances to 194 // JS land via the handle’s .ondata method. 195 class EmitToJSStreamListener : public ReportWritesToJSStreamListener { 196 public: 197 uv_buf_t OnStreamAlloc(size_t suggested_size) override; 198 void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override; 199 }; 200 201 202 // An alternative listener that uses a custom, user-provided buffer 203 // for reading data. 204 class CustomBufferJSListener : public ReportWritesToJSStreamListener { 205 public: 206 uv_buf_t OnStreamAlloc(size_t suggested_size) override; 207 void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override; OnStreamDestroy()208 void OnStreamDestroy() override { delete this; } 209 CustomBufferJSListener(uv_buf_t buffer)210 explicit CustomBufferJSListener(uv_buf_t buffer) : buffer_(buffer) {} 211 212 private: 213 uv_buf_t buffer_; 214 }; 215 216 217 // A generic stream, comparable to JS land’s `Duplex` streams. 218 // A stream is always controlled through one `StreamListener` instance. 219 class StreamResource { 220 public: 221 virtual ~StreamResource(); 222 223 // These need to be implemented on the readable side of this stream: 224 225 // Start reading from the underlying resource. This is called by the consumer 226 // when more data is desired. Use `EmitAlloc()` and `EmitData()` to 227 // pass data along to the consumer. 228 virtual int ReadStart() = 0; 229 // Stop reading from the underlying resource. This is called by the 230 // consumer when its buffers are full and no more data can be handled. 231 virtual int ReadStop() = 0; 232 233 // These need to be implemented on the writable side of this stream: 234 // All of these methods may return an error code synchronously. 235 // In that case, the finish callback should *not* be called. 236 237 // Perform a shutdown operation, and either call req_wrap->Done() when 238 // finished and return 0, return 1 for synchronous success, or 239 // a libuv error code for synchronous failures. 240 virtual int DoShutdown(ShutdownWrap* req_wrap) = 0; 241 // Try to write as much data as possible synchronously, and modify 242 // `*bufs` and `*count` accordingly. This is a no-op by default. 243 // Return 0 for success and a libuv error code for failures. 244 virtual int DoTryWrite(uv_buf_t** bufs, size_t* count); 245 // Initiate a write of data. If the write completes synchronously, return 0 on 246 // success (with bufs modified to indicate how much data was consumed) or a 247 // libuv error code on failure. If the write will complete asynchronously, 248 // return 0. When the write completes asynchronously, call req_wrap->Done() 249 // with 0 on success (with bufs modified to indicate how much data was 250 // consumed) or a libuv error code on failure. Do not call req_wrap->Done() if 251 // the write completes synchronously, that is, it should never be called 252 // before DoWrite() has returned. 253 virtual int DoWrite(WriteWrap* w, 254 uv_buf_t* bufs, 255 size_t count, 256 uv_stream_t* send_handle) = 0; 257 258 // Returns true if the stream supports the `OnStreamWantsWrite()` interface. HasWantsWrite()259 virtual bool HasWantsWrite() const { return false; } 260 261 // Optionally, this may provide an error message to be used for 262 // failing writes. 263 virtual const char* Error() const; 264 // Clear the current error (i.e. that would be returned by Error()). 265 virtual void ClearError(); 266 267 // Transfer ownership of this stream to `listener`. The previous listener 268 // will not receive any more callbacks while the new listener was active. 269 inline void PushStreamListener(StreamListener* listener); 270 // Remove a listener, and, if this was the currently active one, 271 // transfer ownership back to the previous listener. 272 inline void RemoveStreamListener(StreamListener* listener); 273 274 protected: 275 // Call the current listener's OnStreamAlloc() method. 276 inline uv_buf_t EmitAlloc(size_t suggested_size); 277 // Call the current listener's OnStreamRead() method and update the 278 // stream's read byte counter. 279 inline void EmitRead( 280 ssize_t nread, 281 const uv_buf_t& buf = uv_buf_init(nullptr, 0)); 282 // Call the current listener's OnStreamAfterWrite() method. 283 inline void EmitAfterWrite(WriteWrap* w, int status); 284 // Call the current listener's OnStreamAfterShutdown() method. 285 inline void EmitAfterShutdown(ShutdownWrap* w, int status); 286 // Call the current listener's OnStreamWantsWrite() method. 287 inline void EmitWantsWrite(size_t suggested_size); 288 289 StreamListener* listener_ = nullptr; 290 uint64_t bytes_read_ = 0; 291 uint64_t bytes_written_ = 0; 292 293 friend class StreamListener; 294 }; 295 296 297 class StreamBase : public StreamResource { 298 public: 299 // The kSlot field here mirrors that of BaseObject::InternalFields::kSlot 300 // because instances deriving from StreamBase generally also derived from 301 // BaseObject (it's possible for it not to, however). 302 enum InternalFields { 303 kSlot = BaseObject::kSlot, 304 kStreamBaseField = BaseObject::kInternalFieldCount, 305 kOnReadFunctionField, 306 kInternalFieldCount 307 }; 308 309 static void AddMethods(Environment* env, 310 v8::Local<v8::FunctionTemplate> target); 311 312 virtual bool IsAlive() = 0; 313 virtual bool IsClosing() = 0; 314 virtual bool IsIPCPipe(); 315 virtual int GetFD(); 316 317 enum StreamBaseJSChecks { DONT_SKIP_NREAD_CHECKS, SKIP_NREAD_CHECKS }; 318 319 v8::MaybeLocal<v8::Value> CallJSOnreadMethod( 320 ssize_t nread, 321 v8::Local<v8::ArrayBuffer> ab, 322 size_t offset = 0, 323 StreamBaseJSChecks checks = DONT_SKIP_NREAD_CHECKS); 324 325 // This is named `stream_env` to avoid name clashes, because a lot of 326 // subclasses are also `BaseObject`s. stream_env()327 Environment* stream_env() const { return env_; } 328 329 // Shut down the current stream. This request can use an existing 330 // ShutdownWrap object (that was created in JS), or a new one will be created. 331 // Returns 1 in case of a synchronous completion, 0 in case of asynchronous 332 // completion, and a libuv error case in case of synchronous failure. 333 inline int Shutdown( 334 v8::Local<v8::Object> req_wrap_obj = v8::Local<v8::Object>()); 335 336 // Write data to the current stream. This request can use an existing 337 // WriteWrap object (that was created in JS), or a new one will be created. 338 // This will first try to write synchronously using `DoTryWrite()`, then 339 // asynchronously using `DoWrite()`. 340 // If the return value indicates a synchronous completion, no callback will 341 // be invoked. 342 inline StreamWriteResult Write( 343 uv_buf_t* bufs, 344 size_t count, 345 uv_stream_t* send_handle = nullptr, 346 v8::Local<v8::Object> req_wrap_obj = v8::Local<v8::Object>()); 347 348 // These can be overridden by subclasses to get more specific wrap instances. 349 // For example, a subclass Foo could create a FooWriteWrap or FooShutdownWrap 350 // (inheriting from ShutdownWrap/WriteWrap) that has extra fields, like 351 // an associated libuv request. 352 virtual ShutdownWrap* CreateShutdownWrap(v8::Local<v8::Object> object); 353 virtual WriteWrap* CreateWriteWrap(v8::Local<v8::Object> object); 354 355 // One of these must be implemented 356 virtual AsyncWrap* GetAsyncWrap() = 0; 357 virtual v8::Local<v8::Object> GetObject(); 358 359 static inline StreamBase* FromObject(v8::Local<v8::Object> obj); 360 361 protected: 362 inline explicit StreamBase(Environment* env); 363 364 // JS Methods 365 int ReadStartJS(const v8::FunctionCallbackInfo<v8::Value>& args); 366 int ReadStopJS(const v8::FunctionCallbackInfo<v8::Value>& args); 367 int Shutdown(const v8::FunctionCallbackInfo<v8::Value>& args); 368 int Writev(const v8::FunctionCallbackInfo<v8::Value>& args); 369 int WriteBuffer(const v8::FunctionCallbackInfo<v8::Value>& args); 370 template <enum encoding enc> 371 int WriteString(const v8::FunctionCallbackInfo<v8::Value>& args); 372 int UseUserBuffer(const v8::FunctionCallbackInfo<v8::Value>& args); 373 374 static void GetFD(const v8::FunctionCallbackInfo<v8::Value>& args); 375 static void GetExternal(const v8::FunctionCallbackInfo<v8::Value>& args); 376 static void GetBytesRead(const v8::FunctionCallbackInfo<v8::Value>& args); 377 static void GetBytesWritten(const v8::FunctionCallbackInfo<v8::Value>& args); 378 inline void AttachToObject(v8::Local<v8::Object> obj); 379 380 template <int (StreamBase::*Method)( 381 const v8::FunctionCallbackInfo<v8::Value>& args)> 382 static void JSMethod(const v8::FunctionCallbackInfo<v8::Value>& args); 383 384 // Internal, used only in StreamBase methods + env.cc. 385 enum StreamBaseStateFields { 386 kReadBytesOrError, 387 kArrayBufferOffset, 388 kBytesWritten, 389 kLastWriteWasAsync, 390 kNumStreamBaseStateFields 391 }; 392 393 private: 394 Environment* env_; 395 EmitToJSStreamListener default_listener_; 396 397 void SetWriteResult(const StreamWriteResult& res); 398 static void AddMethod(Environment* env, 399 v8::Local<v8::Signature> sig, 400 enum v8::PropertyAttribute attributes, 401 v8::Local<v8::FunctionTemplate> t, 402 JSMethodFunction* stream_method, 403 v8::Local<v8::String> str); 404 405 friend class WriteWrap; 406 friend class ShutdownWrap; 407 friend class Environment; // For kNumStreamBaseStateFields. 408 }; 409 410 411 // These are helpers for creating `ShutdownWrap`/`WriteWrap` instances. 412 // `OtherBase` must have a constructor that matches the `AsyncWrap` 413 // constructors’s (Environment*, Local<Object>, AsyncWrap::Provider) signature 414 // and be a subclass of `AsyncWrap`. 415 template <typename OtherBase> 416 class SimpleShutdownWrap : public ShutdownWrap, public OtherBase { 417 public: 418 SimpleShutdownWrap(StreamBase* stream, 419 v8::Local<v8::Object> req_wrap_obj); 420 GetAsyncWrap()421 AsyncWrap* GetAsyncWrap() override { return this; } 422 423 SET_NO_MEMORY_INFO() SET_MEMORY_INFO_NAME(SimpleShutdownWrap)424 SET_MEMORY_INFO_NAME(SimpleShutdownWrap) 425 SET_SELF_SIZE(SimpleShutdownWrap) 426 427 bool IsNotIndicativeOfMemoryLeakAtExit() const override { 428 return OtherBase::IsNotIndicativeOfMemoryLeakAtExit(); 429 } 430 }; 431 432 template <typename OtherBase> 433 class SimpleWriteWrap : public WriteWrap, public OtherBase { 434 public: 435 SimpleWriteWrap(StreamBase* stream, 436 v8::Local<v8::Object> req_wrap_obj); 437 GetAsyncWrap()438 AsyncWrap* GetAsyncWrap() override { return this; } 439 440 SET_NO_MEMORY_INFO() SET_MEMORY_INFO_NAME(SimpleWriteWrap)441 SET_MEMORY_INFO_NAME(SimpleWriteWrap) 442 SET_SELF_SIZE(SimpleWriteWrap) 443 444 bool IsNotIndicativeOfMemoryLeakAtExit() const override { 445 return OtherBase::IsNotIndicativeOfMemoryLeakAtExit(); 446 } 447 }; 448 449 } // namespace node 450 451 #endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS 452 453 #endif // SRC_STREAM_BASE_H_ 454