• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #if HAVE_OPENSSL && NODE_OPENSSL_HAS_QUIC
2 
3 #include "logstream.h"
4 #include <async_wrap-inl.h>
5 #include <base_object-inl.h>
6 #include <env-inl.h>
7 #include <memory_tracker-inl.h>
8 #include <node_external_reference.h>
9 #include <stream_base-inl.h>
10 #include <uv.h>
11 #include <v8.h>
12 #include "bindingdata.h"
13 
14 namespace node {
15 
16 using v8::FunctionTemplate;
17 using v8::Local;
18 using v8::Object;
19 
20 namespace quic {
21 
GetConstructorTemplate(Environment * env)22 Local<FunctionTemplate> LogStream::GetConstructorTemplate(Environment* env) {
23   auto& state = BindingData::Get(env);
24   auto tmpl = state.logstream_constructor_template();
25   if (tmpl.IsEmpty()) {
26     tmpl = FunctionTemplate::New(env->isolate());
27     tmpl->Inherit(AsyncWrap::GetConstructorTemplate(env));
28     tmpl->InstanceTemplate()->SetInternalFieldCount(
29         StreamBase::kInternalFieldCount);
30     tmpl->SetClassName(state.logstream_string());
31     StreamBase::AddMethods(env, tmpl);
32     state.set_logstream_constructor_template(tmpl);
33   }
34   return tmpl;
35 }
36 
Create(Environment * env)37 BaseObjectPtr<LogStream> LogStream::Create(Environment* env) {
38   v8::Local<v8::Object> obj;
39   if (!GetConstructorTemplate(env)
40            ->InstanceTemplate()
41            ->NewInstance(env->context())
42            .ToLocal(&obj)) {
43     return BaseObjectPtr<LogStream>();
44   }
45   return MakeDetachedBaseObject<LogStream>(env, obj);
46 }
47 
LogStream(Environment * env,Local<Object> obj)48 LogStream::LogStream(Environment* env, Local<Object> obj)
49     : AsyncWrap(env, obj, AsyncWrap::PROVIDER_QUIC_LOGSTREAM), StreamBase(env) {
50   MakeWeak();
51   StreamBase::AttachToObject(GetObject());
52 }
53 
Emit(const uint8_t * data,size_t len,EmitOption option)54 void LogStream::Emit(const uint8_t* data, size_t len, EmitOption option) {
55   if (fin_seen_) return;
56   fin_seen_ = option == EmitOption::FIN;
57 
58   size_t remaining = len;
59   // If the len is greater than the size of the buffer returned by
60   // EmitAlloc then EmitRead will be called multiple times.
61   while (remaining != 0) {
62     uv_buf_t buf = EmitAlloc(len);
63     size_t len = std::min<size_t>(remaining, buf.len);
64     memcpy(buf.base, data, len);
65     remaining -= len;
66     data += len;
67     // If we are actively reading from the stream, we'll call emit
68     // read immediately. Otherwise we buffer the chunk and will push
69     // the chunks out the next time ReadStart() is called.
70     if (reading_) {
71       EmitRead(len, buf);
72     } else {
73       // The total measures the total memory used so we always
74       // increment but buf.len and not chunk len.
75       ensure_space(buf.len);
76       total_ += buf.len;
77       buffer_.push_back(Chunk{len, buf});
78     }
79   }
80 
81   if (ended_ && reading_) {
82     EmitRead(UV_EOF);
83   }
84 }
85 
Emit(const std::string_view line,EmitOption option)86 void LogStream::Emit(const std::string_view line, EmitOption option) {
87   Emit(reinterpret_cast<const uint8_t*>(line.data()), line.length(), option);
88 }
89 
End()90 void LogStream::End() {
91   ended_ = true;
92 }
93 
ReadStart()94 int LogStream::ReadStart() {
95   if (reading_) return 0;
96   // Flush any chunks that have already been buffered.
97   for (const auto& chunk : buffer_) EmitRead(chunk.len, chunk.buf);
98   total_ = 0;
99   buffer_.clear();
100   if (fin_seen_) {
101     // If we've already received the fin, there's nothing else to wait for.
102     EmitRead(UV_EOF);
103     return ReadStop();
104   }
105   // Otherwise, we're going to wait for more chunks to be written.
106   reading_ = true;
107   return 0;
108 }
109 
ReadStop()110 int LogStream::ReadStop() {
111   reading_ = false;
112   return 0;
113 }
114 
115 // We do not use either of these.
DoShutdown(ShutdownWrap * req_wrap)116 int LogStream::DoShutdown(ShutdownWrap* req_wrap) {
117   UNREACHABLE();
118 }
DoWrite(WriteWrap * w,uv_buf_t * bufs,size_t count,uv_stream_t * send_handle)119 int LogStream::DoWrite(WriteWrap* w,
120                        uv_buf_t* bufs,
121                        size_t count,
122                        uv_stream_t* send_handle) {
123   UNREACHABLE();
124 }
125 
IsAlive()126 bool LogStream::IsAlive() {
127   return !ended_;
128 }
129 
IsClosing()130 bool LogStream::IsClosing() {
131   return ended_;
132 }
133 
GetAsyncWrap()134 AsyncWrap* LogStream::GetAsyncWrap() {
135   return this;
136 }
137 
MemoryInfo(MemoryTracker * tracker) const138 void LogStream::MemoryInfo(MemoryTracker* tracker) const {
139   tracker->TrackFieldWithSize("buffer", total_);
140 }
141 
142 // The LogStream buffer enforces a maximum size of kMaxLogStreamBuffer.
ensure_space(size_t amt)143 void LogStream::ensure_space(size_t amt) {
144   while (total_ + amt > kMaxLogStreamBuffer) {
145     total_ -= buffer_.front().buf.len;
146     buffer_.pop_front();
147   }
148 }
149 }  // namespace quic
150 }  // namespace node
151 
152 #endif  // HAVE_OPENSSL && NODE_OPENSSL_HAS_QUIC
153