#include "tracing/node_trace_buffer.h"

#include <memory>
#include "util-inl.h"

namespace node {
namespace tracing {

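// InternalTraceBuffer is one half of NodeTraceBuffer's double-buffering
// scheme. It owns up to max_chunks recyclable TraceBufferChunks and tags
// every handle it hands out with its buffer id (the low bit) so events can
// be routed back to the buffer that produced them.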
InternalTraceBuffer::InternalTraceBuffer(size_t max_chunks, uint32_t id,
                                         Agent* agent)
    : flushing_(false), max_chunks_(max_chunks),
      agent_(agent), id_(id) {
  chunks_.resize(max_chunks);
}

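// Reserves a slot for a new trace event, allocating or recycling a chunk if
// the current one is full, and encodes the slot's location into *handle.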
TraceObject* InternalTraceBuffer::AddTraceEvent(uint64_t* handle) {
  Mutex::ScopedLock scoped_lock(mutex_);
  // Create new chunk if last chunk is full or there is no chunk.
  if (total_chunks_ == 0 || chunks_[total_chunks_ - 1]->IsFull()) {
    auto& chunk = chunks_[total_chunks_++];
    if (chunk) {
      chunk->Reset(current_chunk_seq_++);
    } else {
      chunk = std::make_unique<TraceBufferChunk>(current_chunk_seq_++);
    }
  }
  auto& chunk = chunks_[total_chunks_ - 1];
  size_t event_index;
  TraceObject* trace_object = chunk->AddTraceEvent(&event_index);
  *handle = MakeHandle(total_chunks_ - 1, chunk->seq(), event_index);
  return trace_object;
}

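// Resolves a handle produced by AddTraceEvent back to its TraceObject, or
// returns nullptr if the event is no longer (or never was) in memory.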
TraceObject* InternalTraceBuffer::GetEventByHandle(uint64_t handle) {
  Mutex::ScopedLock scoped_lock(mutex_);
  if (handle == 0) {
    // A handle value of zero never has a trace event associated with it.
    return nullptr;
  }
  size_t chunk_index, event_index;
  uint32_t buffer_id, chunk_seq;
  ExtractHandle(handle, &buffer_id, &chunk_index, &chunk_seq, &event_index);
  if (buffer_id != id_ || chunk_index >= total_chunks_) {
    // Either the chunk belongs to the other buffer, or it lies outside the
    // current range of chunks loaded in memory (the latter suggests that the
    // chunk has already been flushed and is no longer in memory).
    return nullptr;
  }
  auto& chunk = chunks_[chunk_index];
  if (chunk->seq() != chunk_seq) {
    // Chunk is no longer in memory.
    return nullptr;
  }
  return chunk->GetEventAt(event_index);
}

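// Hands every initialized event in every chunk to the agent, then resets
// total_chunks_ so the chunks can be reused by subsequent AddTraceEvent
// calls. Resetting invalidates all outstanding handles into this buffer.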
void InternalTraceBuffer::Flush(bool blocking) {
  {
    Mutex::ScopedLock scoped_lock(mutex_);
    if (total_chunks_ > 0) {
      flushing_ = true;
      for (size_t i = 0; i < total_chunks_; ++i) {
        auto& chunk = chunks_[i];
        for (size_t j = 0; j < chunk->size(); ++j) {
          TraceObject* trace_event = chunk->GetEventAt(j);
          // Another thread may have added a trace that is yet to be
          // initialized. Skip such traces.
          // https://github.com/nodejs/node/issues/21038.
          if (trace_event->name()) {
            agent_->AppendTraceEvent(trace_event);
          }
        }
      }
      total_chunks_ = 0;
      flushing_ = false;
    }
  }
  agent_->Flush(blocking);
}

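// Packs (chunk_seq, chunk_index, event_index) into a single integer. The low
// bit stores the buffer id (0 or 1); the remaining bits encode the event's
// global position: chunk_seq selects a "generation" of Capacity() slots,
// inside which the event occupies slot
// chunk_index * kChunkSize + event_index.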
uint64_t InternalTraceBuffer::MakeHandle(
    size_t chunk_index, uint32_t chunk_seq, size_t event_index) const {
  return ((static_cast<uint64_t>(chunk_seq) * Capacity() +
          chunk_index * TraceBufferChunk::kChunkSize + event_index) << 1) + id_;
}

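// Inverse of MakeHandle: peels off the buffer id bit, then recovers the
// sequence number and the two indices by division and remainder.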
void InternalTraceBuffer::ExtractHandle(
    uint64_t handle, uint32_t* buffer_id, size_t* chunk_index,
    uint32_t* chunk_seq, size_t* event_index) const {
  *buffer_id = static_cast<uint32_t>(handle & 0x1);
  handle >>= 1;
  *chunk_seq = static_cast<uint32_t>(handle / Capacity());
  size_t indices = handle % Capacity();
  *chunk_index = indices / TraceBufferChunk::kChunkSize;
  *event_index = indices % TraceBufferChunk::kChunkSize;
}

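// NodeTraceBuffer multiplexes two InternalTraceBuffers so that one can be
// flushed on the tracing thread while the other keeps accepting events. The
// two uv_async handles let any thread wake the tracing loop to flush or to
// shut down.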
NodeTraceBuffer::NodeTraceBuffer(size_t max_chunks,
    Agent* agent, uv_loop_t* tracing_loop)
    : tracing_loop_(tracing_loop),
      buffer1_(max_chunks, 0, agent),
      buffer2_(max_chunks, 1, agent) {
  current_buf_.store(&buffer1_);

  flush_signal_.data = this;
  int err = uv_async_init(tracing_loop_, &flush_signal_,
                          NonBlockingFlushSignalCb);
  CHECK_EQ(err, 0);

  exit_signal_.data = this;
  err = uv_async_init(tracing_loop_, &exit_signal_, ExitSignalCb);
  CHECK_EQ(err, 0);
}

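// Wakes the tracing thread so it can close both async handles, then blocks
// until ExitSignalCb has finished and signalled exited_.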
NodeTraceBuffer::~NodeTraceBuffer() {
  uv_async_send(&exit_signal_);
  Mutex::ScopedLock scoped_lock(exit_mutex_);
  while (!exited_) {
    exit_cond_.Wait(scoped_lock);
  }
}

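// Writes into whichever internal buffer is current, swapping buffers first
// if the current one has filled up.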
TraceObject* NodeTraceBuffer::AddTraceEvent(uint64_t* handle) {
  // If the buffer is full, attempt to perform a flush.
  if (!TryLoadAvailableBuffer()) {
    // Assign a value of zero as the trace event handle.
    // This is equivalent to calling InternalTraceBuffer::MakeHandle(0, 0, 0),
    // and will cause GetEventByHandle to return nullptr if passed as an
    // argument.
    *handle = 0;
    return nullptr;
  }
  return current_buf_.load()->AddTraceEvent(handle);
}

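// Note that only the current buffer is consulted; a handle minted by the
// other buffer fails the buffer-id check inside GetEventByHandle and
// resolves to nullptr.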
TraceObject* NodeTraceBuffer::GetEventByHandle(uint64_t handle) {
  return current_buf_.load()->GetEventByHandle(handle);
}

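// Blocking flush of both internal buffers, regardless of which is current.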
bool NodeTraceBuffer::Flush() {
  buffer1_.Flush(true);
  buffer2_.Flush(true);
  return true;
}

// Attempts to set current_buf_ such that it references a buffer that can
// write at least one trace event. If both buffers are unavailable this
// method returns false; otherwise it returns true.
bool NodeTraceBuffer::TryLoadAvailableBuffer() {
  InternalTraceBuffer* prev_buf = current_buf_.load();
  if (prev_buf->IsFull()) {
    uv_async_send(&flush_signal_);  // trigger flush on a separate thread
    InternalTraceBuffer* other_buf = prev_buf == &buffer1_ ?
        &buffer2_ : &buffer1_;
    if (!other_buf->IsFull()) {
      current_buf_.store(other_buf);
    } else {
      return false;
    }
  }
  return true;
}

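// Runs on the tracing thread in response to flush_signal_: flushes any full
// buffer that is not already being flushed, without blocking.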
// static
void NodeTraceBuffer::NonBlockingFlushSignalCb(uv_async_t* signal) {
  NodeTraceBuffer* buffer = static_cast<NodeTraceBuffer*>(signal->data);
  if (buffer->buffer1_.IsFull() && !buffer->buffer1_.IsFlushing()) {
    buffer->buffer1_.Flush(false);
  }
  if (buffer->buffer2_.IsFull() && !buffer->buffer2_.IsFlushing()) {
    buffer->buffer2_.Flush(false);
  }
}

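// Runs on the tracing thread during destruction: closes flush_signal_ and,
// from its close callback, exit_signal_, then signals the destructor that it
// is safe to tear the object down.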
// static
void NodeTraceBuffer::ExitSignalCb(uv_async_t* signal) {
  NodeTraceBuffer* buffer =
      ContainerOf(&NodeTraceBuffer::exit_signal_, signal);

  // Close both flush_signal_ and exit_signal_.
  uv_close(reinterpret_cast<uv_handle_t*>(&buffer->flush_signal_),
           [](uv_handle_t* signal) {
             NodeTraceBuffer* buffer =
                 ContainerOf(&NodeTraceBuffer::flush_signal_,
                             reinterpret_cast<uv_async_t*>(signal));

             uv_close(reinterpret_cast<uv_handle_t*>(&buffer->exit_signal_),
                      [](uv_handle_t* signal) {
                        NodeTraceBuffer* buffer =
                            ContainerOf(&NodeTraceBuffer::exit_signal_,
                                        reinterpret_cast<uv_async_t*>(signal));
                        Mutex::ScopedLock scoped_lock(buffer->exit_mutex_);
                        buffer->exited_ = true;
                        buffer->exit_cond_.Signal(scoped_lock);
                      });
           });
}

}  // namespace tracing
}  // namespace node