• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "tracing/node_trace_buffer.h"
2 
3 #include <memory>
4 #include "util-inl.h"
5 
6 namespace node {
7 namespace tracing {
8 
InternalTraceBuffer(size_t max_chunks,uint32_t id,Agent * agent)9 InternalTraceBuffer::InternalTraceBuffer(size_t max_chunks, uint32_t id,
10                                          Agent* agent)
11     : flushing_(false), max_chunks_(max_chunks),
12       agent_(agent), id_(id) {
13   chunks_.resize(max_chunks);
14 }
15 
AddTraceEvent(uint64_t * handle)16 TraceObject* InternalTraceBuffer::AddTraceEvent(uint64_t* handle) {
17   Mutex::ScopedLock scoped_lock(mutex_);
18   // Create new chunk if last chunk is full or there is no chunk.
19   if (total_chunks_ == 0 || chunks_[total_chunks_ - 1]->IsFull()) {
20     auto& chunk = chunks_[total_chunks_++];
21     if (chunk) {
22       chunk->Reset(current_chunk_seq_++);
23     } else {
24       chunk = std::make_unique<TraceBufferChunk>(current_chunk_seq_++);
25     }
26   }
27   auto& chunk = chunks_[total_chunks_ - 1];
28   size_t event_index;
29   TraceObject* trace_object = chunk->AddTraceEvent(&event_index);
30   *handle = MakeHandle(total_chunks_ - 1, chunk->seq(), event_index);
31   return trace_object;
32 }
33 
GetEventByHandle(uint64_t handle)34 TraceObject* InternalTraceBuffer::GetEventByHandle(uint64_t handle) {
35   Mutex::ScopedLock scoped_lock(mutex_);
36   if (handle == 0) {
37     // A handle value of zero never has a trace event associated with it.
38     return nullptr;
39   }
40   size_t chunk_index, event_index;
41   uint32_t buffer_id, chunk_seq;
42   ExtractHandle(handle, &buffer_id, &chunk_index, &chunk_seq, &event_index);
43   if (buffer_id != id_ || chunk_index >= total_chunks_) {
44     // Either the chunk belongs to the other buffer, or is outside the current
45     // range of chunks loaded in memory (the latter being true suggests that
46     // the chunk has already been flushed and is no longer in memory.)
47     return nullptr;
48   }
49   auto& chunk = chunks_[chunk_index];
50   if (chunk->seq() != chunk_seq) {
51     // Chunk is no longer in memory.
52     return nullptr;
53   }
54   return chunk->GetEventAt(event_index);
55 }
56 
Flush(bool blocking)57 void InternalTraceBuffer::Flush(bool blocking) {
58   {
59     Mutex::ScopedLock scoped_lock(mutex_);
60     if (total_chunks_ > 0) {
61       flushing_ = true;
62       for (size_t i = 0; i < total_chunks_; ++i) {
63         auto& chunk = chunks_[i];
64         for (size_t j = 0; j < chunk->size(); ++j) {
65           TraceObject* trace_event = chunk->GetEventAt(j);
66           // Another thread may have added a trace that is yet to be
67           // initialized. Skip such traces.
68           // https://github.com/nodejs/node/issues/21038.
69           if (trace_event->name()) {
70             agent_->AppendTraceEvent(trace_event);
71           }
72         }
73       }
74       total_chunks_ = 0;
75       flushing_ = false;
76     }
77   }
78   agent_->Flush(blocking);
79 }
80 
MakeHandle(size_t chunk_index,uint32_t chunk_seq,size_t event_index) const81 uint64_t InternalTraceBuffer::MakeHandle(
82     size_t chunk_index, uint32_t chunk_seq, size_t event_index) const {
83   return ((static_cast<uint64_t>(chunk_seq) * Capacity() +
84           chunk_index * TraceBufferChunk::kChunkSize + event_index) << 1) + id_;
85 }
86 
ExtractHandle(uint64_t handle,uint32_t * buffer_id,size_t * chunk_index,uint32_t * chunk_seq,size_t * event_index) const87 void InternalTraceBuffer::ExtractHandle(
88     uint64_t handle, uint32_t* buffer_id, size_t* chunk_index,
89     uint32_t* chunk_seq, size_t* event_index) const {
90   *buffer_id = static_cast<uint32_t>(handle & 0x1);
91   handle >>= 1;
92   *chunk_seq = static_cast<uint32_t>(handle / Capacity());
93   size_t indices = handle % Capacity();
94   *chunk_index = indices / TraceBufferChunk::kChunkSize;
95   *event_index = indices % TraceBufferChunk::kChunkSize;
96 }
97 
NodeTraceBuffer(size_t max_chunks,Agent * agent,uv_loop_t * tracing_loop)98 NodeTraceBuffer::NodeTraceBuffer(size_t max_chunks,
99     Agent* agent, uv_loop_t* tracing_loop)
100     : tracing_loop_(tracing_loop),
101       buffer1_(max_chunks, 0, agent),
102       buffer2_(max_chunks, 1, agent) {
103   current_buf_.store(&buffer1_);
104 
105   flush_signal_.data = this;
106   int err = uv_async_init(tracing_loop_, &flush_signal_,
107                           NonBlockingFlushSignalCb);
108   CHECK_EQ(err, 0);
109 
110   exit_signal_.data = this;
111   err = uv_async_init(tracing_loop_, &exit_signal_, ExitSignalCb);
112   CHECK_EQ(err, 0);
113 }
114 
~NodeTraceBuffer()115 NodeTraceBuffer::~NodeTraceBuffer() {
116   uv_async_send(&exit_signal_);
117   Mutex::ScopedLock scoped_lock(exit_mutex_);
118   while (!exited_) {
119     exit_cond_.Wait(scoped_lock);
120   }
121 }
122 
AddTraceEvent(uint64_t * handle)123 TraceObject* NodeTraceBuffer::AddTraceEvent(uint64_t* handle) {
124   // If the buffer is full, attempt to perform a flush.
125   if (!TryLoadAvailableBuffer()) {
126     // Assign a value of zero as the trace event handle.
127     // This is equivalent to calling InternalTraceBuffer::MakeHandle(0, 0, 0),
128     // and will cause GetEventByHandle to return NULL if passed as an argument.
129     *handle = 0;
130     return nullptr;
131   }
132   return current_buf_.load()->AddTraceEvent(handle);
133 }
134 
GetEventByHandle(uint64_t handle)135 TraceObject* NodeTraceBuffer::GetEventByHandle(uint64_t handle) {
136   return current_buf_.load()->GetEventByHandle(handle);
137 }
138 
Flush()139 bool NodeTraceBuffer::Flush() {
140   buffer1_.Flush(true);
141   buffer2_.Flush(true);
142   return true;
143 }
144 
145 // Attempts to set current_buf_ such that it references a buffer that can
146 // write at least one trace event. If both buffers are unavailable this
147 // method returns false; otherwise it returns true.
TryLoadAvailableBuffer()148 bool NodeTraceBuffer::TryLoadAvailableBuffer() {
149   InternalTraceBuffer* prev_buf = current_buf_.load();
150   if (prev_buf->IsFull()) {
151     uv_async_send(&flush_signal_);  // trigger flush on a separate thread
152     InternalTraceBuffer* other_buf = prev_buf == &buffer1_ ?
153       &buffer2_ : &buffer1_;
154     if (!other_buf->IsFull()) {
155       current_buf_.store(other_buf);
156     } else {
157       return false;
158     }
159   }
160   return true;
161 }
162 
163 // static
NonBlockingFlushSignalCb(uv_async_t * signal)164 void NodeTraceBuffer::NonBlockingFlushSignalCb(uv_async_t* signal) {
165   NodeTraceBuffer* buffer = static_cast<NodeTraceBuffer*>(signal->data);
166   if (buffer->buffer1_.IsFull() && !buffer->buffer1_.IsFlushing()) {
167     buffer->buffer1_.Flush(false);
168   }
169   if (buffer->buffer2_.IsFull() && !buffer->buffer2_.IsFlushing()) {
170     buffer->buffer2_.Flush(false);
171   }
172 }
173 
174 // static
ExitSignalCb(uv_async_t * signal)175 void NodeTraceBuffer::ExitSignalCb(uv_async_t* signal) {
176   NodeTraceBuffer* buffer =
177       ContainerOf(&NodeTraceBuffer::exit_signal_, signal);
178 
179   // Close both flush_signal_ and exit_signal_.
180   uv_close(reinterpret_cast<uv_handle_t*>(&buffer->flush_signal_),
181            [](uv_handle_t* signal) {
182     NodeTraceBuffer* buffer =
183         ContainerOf(&NodeTraceBuffer::flush_signal_,
184                     reinterpret_cast<uv_async_t*>(signal));
185 
186     uv_close(reinterpret_cast<uv_handle_t*>(&buffer->exit_signal_),
187              [](uv_handle_t* signal) {
188       NodeTraceBuffer* buffer =
189           ContainerOf(&NodeTraceBuffer::exit_signal_,
190                       reinterpret_cast<uv_async_t*>(signal));
191         Mutex::ScopedLock scoped_lock(buffer->exit_mutex_);
192         buffer->exited_ = true;
193         buffer->exit_cond_.Signal(scoped_lock);
194     });
195   });
196 }
197 
198 }  // namespace tracing
199 }  // namespace node
200