• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "tracing/node_trace_writer.h"
2 
3 #include "util-inl.h"
4 
5 #include <fcntl.h>
6 #include <cstring>
7 
8 namespace node {
9 namespace tracing {
10 
NodeTraceWriter(const std::string & log_file_pattern)11 NodeTraceWriter::NodeTraceWriter(const std::string& log_file_pattern)
12     : log_file_pattern_(log_file_pattern) {}
13 
InitializeOnThread(uv_loop_t * loop)14 void NodeTraceWriter::InitializeOnThread(uv_loop_t* loop) {
15   CHECK_NULL(tracing_loop_);
16   tracing_loop_ = loop;
17 
18   flush_signal_.data = this;
19   int err = uv_async_init(tracing_loop_, &flush_signal_,
20                           [](uv_async_t* signal) {
21     NodeTraceWriter* trace_writer =
22         ContainerOf(&NodeTraceWriter::flush_signal_, signal);
23     trace_writer->FlushPrivate();
24   });
25   CHECK_EQ(err, 0);
26 
27   exit_signal_.data = this;
28   err = uv_async_init(tracing_loop_, &exit_signal_, ExitSignalCb);
29   CHECK_EQ(err, 0);
30 }
31 
WriteSuffix()32 void NodeTraceWriter::WriteSuffix() {
33   // If our final log file has traces, then end the file appropriately.
34   // This means that if no trace events are recorded, then no trace file is
35   // produced.
36   bool should_flush = false;
37   {
38     Mutex::ScopedLock scoped_lock(stream_mutex_);
39     if (total_traces_ > 0) {
40       total_traces_ = kTracesPerFile;  // Act as if we reached the file limit.
41       should_flush = true;
42     }
43   }
44   if (should_flush) {
45     Flush(true);
46   }
47 }
48 
~NodeTraceWriter()49 NodeTraceWriter::~NodeTraceWriter() {
50   WriteSuffix();
51   uv_fs_t req;
52   if (fd_ != -1) {
53     CHECK_EQ(0, uv_fs_close(nullptr, &req, fd_, nullptr));
54     uv_fs_req_cleanup(&req);
55   }
56   uv_async_send(&exit_signal_);
57   Mutex::ScopedLock scoped_lock(request_mutex_);
58   while (!exited_) {
59     exit_cond_.Wait(scoped_lock);
60   }
61 }
62 
// Replaces every occurrence of `search` in `*target` with `insert`, in place.
//
// Matches are found left to right and do not overlap. Text that was just
// inserted is never rescanned, so an `insert` that itself contains `search`
// cannot cause runaway expansion.
//
// An empty `search` is a no-op: it would match at every position without
// consuming any input, and the loop would never terminate.
void replace_substring(std::string* target,
                       const std::string& search,
                       const std::string& insert) {
  if (search.empty()) return;

  std::string::size_type pos = target->find(search);
  while (pos != std::string::npos) {
    target->replace(pos, search.size(), insert);
    // Continue right after the freshly inserted text.
    pos = target->find(search, pos + insert.size());
  }
}
72 
OpenNewFileForStreaming()73 void NodeTraceWriter::OpenNewFileForStreaming() {
74   ++file_num_;
75   uv_fs_t req;
76 
77   // Evaluate a JS-style template string, it accepts the values ${pid} and
78   // ${rotation}
79   std::string filepath(log_file_pattern_);
80   replace_substring(&filepath, "${pid}", std::to_string(uv_os_getpid()));
81   replace_substring(&filepath, "${rotation}", std::to_string(file_num_));
82 
83   if (fd_ != -1) {
84     CHECK_EQ(uv_fs_close(nullptr, &req, fd_, nullptr), 0);
85     uv_fs_req_cleanup(&req);
86   }
87 
88   fd_ = uv_fs_open(nullptr, &req, filepath.c_str(),
89       O_CREAT | O_WRONLY | O_TRUNC, 0644, nullptr);
90   uv_fs_req_cleanup(&req);
91   if (fd_ < 0) {
92     fprintf(stderr, "Could not open trace file %s: %s\n",
93                     filepath.c_str(),
94                     uv_strerror(fd_));
95     fd_ = -1;
96   }
97 }
98 
AppendTraceEvent(TraceObject * trace_event)99 void NodeTraceWriter::AppendTraceEvent(TraceObject* trace_event) {
100   Mutex::ScopedLock scoped_lock(stream_mutex_);
101   // If this is the first trace event, open a new file for streaming.
102   if (total_traces_ == 0) {
103     OpenNewFileForStreaming();
104     // Constructing a new JSONTraceWriter object appends "{\"traceEvents\":["
105     // to stream_.
106     // In other words, the constructor initializes the serialization stream
107     // to a state where we can start writing trace events to it.
108     // Repeatedly constructing and destroying json_trace_writer_ allows
109     // us to use V8's JSON writer instead of implementing our own.
110     json_trace_writer_.reset(TraceWriter::CreateJSONTraceWriter(stream_));
111   }
112   ++total_traces_;
113   json_trace_writer_->AppendTraceEvent(trace_event);
114 }
115 
FlushPrivate()116 void NodeTraceWriter::FlushPrivate() {
117   std::string str;
118   int highest_request_id;
119   {
120     Mutex::ScopedLock stream_scoped_lock(stream_mutex_);
121     if (total_traces_ >= kTracesPerFile) {
122       total_traces_ = 0;
123       // Destroying the member JSONTraceWriter object appends "]}" to
124       // stream_ - in other words, ending a JSON file.
125       json_trace_writer_.reset();
126     }
127     // str() makes a copy of the contents of the stream.
128     str = stream_.str();
129     stream_.str("");
130     stream_.clear();
131   }
132   {
133     Mutex::ScopedLock request_scoped_lock(request_mutex_);
134     highest_request_id = num_write_requests_;
135   }
136   WriteToFile(std::move(str), highest_request_id);
137 }
138 
Flush(bool blocking)139 void NodeTraceWriter::Flush(bool blocking) {
140   Mutex::ScopedLock scoped_lock(request_mutex_);
141   {
142     // We need to lock the mutexes here in a nested fashion; stream_mutex_
143     // protects json_trace_writer_, and without request_mutex_ there might be
144     // a time window in which the stream state changes?
145     Mutex::ScopedLock stream_mutex_lock(stream_mutex_);
146     if (!json_trace_writer_)
147       return;
148   }
149   int request_id = ++num_write_requests_;
150   int err = uv_async_send(&flush_signal_);
151   CHECK_EQ(err, 0);
152   if (blocking) {
153     // Wait until data associated with this request id has been written to disk.
154     // This guarantees that data from all earlier requests have also been
155     // written.
156     while (request_id > highest_request_id_completed_) {
157       request_cond_.Wait(scoped_lock);
158     }
159   }
160 }
161 
WriteToFile(std::string && str,int highest_request_id)162 void NodeTraceWriter::WriteToFile(std::string&& str, int highest_request_id) {
163   if (fd_ == -1) return;
164 
165   uv_buf_t buf = uv_buf_init(nullptr, 0);
166   {
167     Mutex::ScopedLock lock(request_mutex_);
168     write_req_queue_.emplace(WriteRequest {
169       std::move(str), highest_request_id
170     });
171     if (write_req_queue_.size() == 1) {
172       buf = uv_buf_init(
173           const_cast<char*>(write_req_queue_.front().str.c_str()),
174           write_req_queue_.front().str.length());
175     }
176   }
177   // Only one write request for the same file descriptor should be active at
178   // a time.
179   if (buf.base != nullptr && fd_ != -1) {
180     StartWrite(buf);
181   }
182 }
183 
StartWrite(uv_buf_t buf)184 void NodeTraceWriter::StartWrite(uv_buf_t buf) {
185   int err = uv_fs_write(
186       tracing_loop_, &write_req_, fd_, &buf, 1, -1,
187       [](uv_fs_t* req) {
188         NodeTraceWriter* writer =
189             ContainerOf(&NodeTraceWriter::write_req_, req);
190         writer->AfterWrite();
191       });
192   CHECK_EQ(err, 0);
193 }
194 
AfterWrite()195 void NodeTraceWriter::AfterWrite() {
196   CHECK_GE(write_req_.result, 0);
197   uv_fs_req_cleanup(&write_req_);
198 
199   uv_buf_t buf = uv_buf_init(nullptr, 0);
200   {
201     Mutex::ScopedLock scoped_lock(request_mutex_);
202     int highest_request_id = write_req_queue_.front().highest_request_id;
203     write_req_queue_.pop();
204     highest_request_id_completed_ = highest_request_id;
205     request_cond_.Broadcast(scoped_lock);
206     if (!write_req_queue_.empty()) {
207       buf = uv_buf_init(
208           const_cast<char*>(write_req_queue_.front().str.c_str()),
209           write_req_queue_.front().str.length());
210     }
211   }
212   if (buf.base != nullptr && fd_ != -1) {
213     StartWrite(buf);
214   }
215 }
216 
217 // static
ExitSignalCb(uv_async_t * signal)218 void NodeTraceWriter::ExitSignalCb(uv_async_t* signal) {
219   NodeTraceWriter* trace_writer =
220       ContainerOf(&NodeTraceWriter::exit_signal_, signal);
221   // Close both flush_signal_ and exit_signal_.
222   uv_close(reinterpret_cast<uv_handle_t*>(&trace_writer->flush_signal_),
223            [](uv_handle_t* signal) {
224              NodeTraceWriter* trace_writer =
225                  ContainerOf(&NodeTraceWriter::flush_signal_,
226                              reinterpret_cast<uv_async_t*>(signal));
227              uv_close(
228                  reinterpret_cast<uv_handle_t*>(&trace_writer->exit_signal_),
229                  [](uv_handle_t* signal) {
230                    NodeTraceWriter* trace_writer =
231                        ContainerOf(&NodeTraceWriter::exit_signal_,
232                                    reinterpret_cast<uv_async_t*>(signal));
233                    Mutex::ScopedLock scoped_lock(trace_writer->request_mutex_);
234                    trace_writer->exited_ = true;
235                    trace_writer->exit_cond_.Signal(scoped_lock);
236                  });
237            });
238 }
239 }  // namespace tracing
240 }  // namespace node
241