1 #if HAVE_OPENSSL && NODE_OPENSSL_HAS_QUIC
2
3 #include "logstream.h"
4 #include <async_wrap-inl.h>
5 #include <base_object-inl.h>
6 #include <env-inl.h>
7 #include <memory_tracker-inl.h>
8 #include <node_external_reference.h>
9 #include <stream_base-inl.h>
10 #include <uv.h>
11 #include <v8.h>
12 #include "bindingdata.h"
13
14 namespace node {
15
16 using v8::FunctionTemplate;
17 using v8::Local;
18 using v8::Object;
19
20 namespace quic {
21
GetConstructorTemplate(Environment * env)22 Local<FunctionTemplate> LogStream::GetConstructorTemplate(Environment* env) {
23 auto& state = BindingData::Get(env);
24 auto tmpl = state.logstream_constructor_template();
25 if (tmpl.IsEmpty()) {
26 tmpl = FunctionTemplate::New(env->isolate());
27 tmpl->Inherit(AsyncWrap::GetConstructorTemplate(env));
28 tmpl->InstanceTemplate()->SetInternalFieldCount(
29 StreamBase::kInternalFieldCount);
30 tmpl->SetClassName(state.logstream_string());
31 StreamBase::AddMethods(env, tmpl);
32 state.set_logstream_constructor_template(tmpl);
33 }
34 return tmpl;
35 }
36
Create(Environment * env)37 BaseObjectPtr<LogStream> LogStream::Create(Environment* env) {
38 v8::Local<v8::Object> obj;
39 if (!GetConstructorTemplate(env)
40 ->InstanceTemplate()
41 ->NewInstance(env->context())
42 .ToLocal(&obj)) {
43 return BaseObjectPtr<LogStream>();
44 }
45 return MakeDetachedBaseObject<LogStream>(env, obj);
46 }
47
LogStream(Environment * env,Local<Object> obj)48 LogStream::LogStream(Environment* env, Local<Object> obj)
49 : AsyncWrap(env, obj, AsyncWrap::PROVIDER_QUIC_LOGSTREAM), StreamBase(env) {
50 MakeWeak();
51 StreamBase::AttachToObject(GetObject());
52 }
53
Emit(const uint8_t * data,size_t len,EmitOption option)54 void LogStream::Emit(const uint8_t* data, size_t len, EmitOption option) {
55 if (fin_seen_) return;
56 fin_seen_ = option == EmitOption::FIN;
57
58 size_t remaining = len;
59 // If the len is greater than the size of the buffer returned by
60 // EmitAlloc then EmitRead will be called multiple times.
61 while (remaining != 0) {
62 uv_buf_t buf = EmitAlloc(len);
63 size_t len = std::min<size_t>(remaining, buf.len);
64 memcpy(buf.base, data, len);
65 remaining -= len;
66 data += len;
67 // If we are actively reading from the stream, we'll call emit
68 // read immediately. Otherwise we buffer the chunk and will push
69 // the chunks out the next time ReadStart() is called.
70 if (reading_) {
71 EmitRead(len, buf);
72 } else {
73 // The total measures the total memory used so we always
74 // increment but buf.len and not chunk len.
75 ensure_space(buf.len);
76 total_ += buf.len;
77 buffer_.push_back(Chunk{len, buf});
78 }
79 }
80
81 if (ended_ && reading_) {
82 EmitRead(UV_EOF);
83 }
84 }
85
Emit(const std::string_view line,EmitOption option)86 void LogStream::Emit(const std::string_view line, EmitOption option) {
87 Emit(reinterpret_cast<const uint8_t*>(line.data()), line.length(), option);
88 }
89
End()90 void LogStream::End() {
91 ended_ = true;
92 }
93
ReadStart()94 int LogStream::ReadStart() {
95 if (reading_) return 0;
96 // Flush any chunks that have already been buffered.
97 for (const auto& chunk : buffer_) EmitRead(chunk.len, chunk.buf);
98 total_ = 0;
99 buffer_.clear();
100 if (fin_seen_) {
101 // If we've already received the fin, there's nothing else to wait for.
102 EmitRead(UV_EOF);
103 return ReadStop();
104 }
105 // Otherwise, we're going to wait for more chunks to be written.
106 reading_ = true;
107 return 0;
108 }
109
ReadStop()110 int LogStream::ReadStop() {
111 reading_ = false;
112 return 0;
113 }
114
115 // We do not use either of these.
DoShutdown(ShutdownWrap * req_wrap)116 int LogStream::DoShutdown(ShutdownWrap* req_wrap) {
117 UNREACHABLE();
118 }
DoWrite(WriteWrap * w,uv_buf_t * bufs,size_t count,uv_stream_t * send_handle)119 int LogStream::DoWrite(WriteWrap* w,
120 uv_buf_t* bufs,
121 size_t count,
122 uv_stream_t* send_handle) {
123 UNREACHABLE();
124 }
125
IsAlive()126 bool LogStream::IsAlive() {
127 return !ended_;
128 }
129
IsClosing()130 bool LogStream::IsClosing() {
131 return ended_;
132 }
133
GetAsyncWrap()134 AsyncWrap* LogStream::GetAsyncWrap() {
135 return this;
136 }
137
MemoryInfo(MemoryTracker * tracker) const138 void LogStream::MemoryInfo(MemoryTracker* tracker) const {
139 tracker->TrackFieldWithSize("buffer", total_);
140 }
141
142 // The LogStream buffer enforces a maximum size of kMaxLogStreamBuffer.
ensure_space(size_t amt)143 void LogStream::ensure_space(size_t amt) {
144 while (total_ + amt > kMaxLogStreamBuffer) {
145 total_ -= buffer_.front().buf.len;
146 buffer_.pop_front();
147 }
148 }
149 } // namespace quic
150 } // namespace node
151
152 #endif // HAVE_OPENSSL && NODE_OPENSSL_HAS_QUIC
153