#include "tracing/node_trace_writer.h"

#include "util-inl.h"

#include <fcntl.h>
#include <cstring>

namespace node {
namespace tracing {
// Stores the output file name pattern; all real setup (async handles,
// event loop binding) is deferred to InitializeOnThread().
NodeTraceWriter::NodeTraceWriter(const std::string& log_file_pattern)
    : log_file_pattern_(log_file_pattern) {}
InitializeOnThread(uv_loop_t * loop)14 void NodeTraceWriter::InitializeOnThread(uv_loop_t* loop) {
15 CHECK_NULL(tracing_loop_);
16 tracing_loop_ = loop;
17
18 flush_signal_.data = this;
19 int err = uv_async_init(tracing_loop_, &flush_signal_,
20 [](uv_async_t* signal) {
21 NodeTraceWriter* trace_writer =
22 ContainerOf(&NodeTraceWriter::flush_signal_, signal);
23 trace_writer->FlushPrivate();
24 });
25 CHECK_EQ(err, 0);
26
27 exit_signal_.data = this;
28 err = uv_async_init(tracing_loop_, &exit_signal_, ExitSignalCb);
29 CHECK_EQ(err, 0);
30 }
WriteSuffix()32 void NodeTraceWriter::WriteSuffix() {
33 // If our final log file has traces, then end the file appropriately.
34 // This means that if no trace events are recorded, then no trace file is
35 // produced.
36 bool should_flush = false;
37 {
38 Mutex::ScopedLock scoped_lock(stream_mutex_);
39 if (total_traces_ > 0) {
40 total_traces_ = kTracesPerFile; // Act as if we reached the file limit.
41 should_flush = true;
42 }
43 }
44 if (should_flush) {
45 Flush(true);
46 }
47 }
NodeTraceWriter::~NodeTraceWriter() {
  // Flush any buffered trace data; blocks until it has been written out.
  WriteSuffix();
  uv_fs_t req;
  if (fd_ != -1) {
    // Synchronous close (nullptr loop + nullptr callback => blocking call).
    CHECK_EQ(0, uv_fs_close(nullptr, &req, fd_, nullptr));
    uv_fs_req_cleanup(&req);
  }
  // Ask the tracing thread to close both async handles (ExitSignalCb),
  // then wait until it confirms completion via exited_ / exit_cond_.
  uv_async_send(&exit_signal_);
  Mutex::ScopedLock scoped_lock(request_mutex_);
  while (!exited_) {
    exit_cond_.Wait(scoped_lock);
  }
}
// Replaces every occurrence of `search` inside `*target` with `insert`,
// scanning left to right. Text introduced by a replacement is not
// re-examined: the scan resumes just past the inserted string, so this
// terminates even when `insert` contains `search`.
void replace_substring(std::string* target,
                       const std::string& search,
                       const std::string& insert) {
  size_t offset = target->find(search);
  while (offset != std::string::npos) {
    target->replace(offset, search.size(), insert);
    offset = target->find(search, offset + insert.size());
  }
}
OpenNewFileForStreaming()73 void NodeTraceWriter::OpenNewFileForStreaming() {
74 ++file_num_;
75 uv_fs_t req;
76
77 // Evaluate a JS-style template string, it accepts the values ${pid} and
78 // ${rotation}
79 std::string filepath(log_file_pattern_);
80 replace_substring(&filepath, "${pid}", std::to_string(uv_os_getpid()));
81 replace_substring(&filepath, "${rotation}", std::to_string(file_num_));
82
83 if (fd_ != -1) {
84 CHECK_EQ(uv_fs_close(nullptr, &req, fd_, nullptr), 0);
85 uv_fs_req_cleanup(&req);
86 }
87
88 fd_ = uv_fs_open(nullptr, &req, filepath.c_str(),
89 O_CREAT | O_WRONLY | O_TRUNC, 0644, nullptr);
90 uv_fs_req_cleanup(&req);
91 if (fd_ < 0) {
92 fprintf(stderr, "Could not open trace file %s: %s\n",
93 filepath.c_str(),
94 uv_strerror(fd_));
95 fd_ = -1;
96 }
97 }
AppendTraceEvent(TraceObject * trace_event)99 void NodeTraceWriter::AppendTraceEvent(TraceObject* trace_event) {
100 Mutex::ScopedLock scoped_lock(stream_mutex_);
101 // If this is the first trace event, open a new file for streaming.
102 if (total_traces_ == 0) {
103 OpenNewFileForStreaming();
104 // Constructing a new JSONTraceWriter object appends "{\"traceEvents\":["
105 // to stream_.
106 // In other words, the constructor initializes the serialization stream
107 // to a state where we can start writing trace events to it.
108 // Repeatedly constructing and destroying json_trace_writer_ allows
109 // us to use V8's JSON writer instead of implementing our own.
110 json_trace_writer_.reset(TraceWriter::CreateJSONTraceWriter(stream_));
111 }
112 ++total_traces_;
113 json_trace_writer_->AppendTraceEvent(trace_event);
114 }
// Runs on the tracing thread (triggered via flush_signal_). Drains the
// serialized JSON accumulated in stream_ and hands it to WriteToFile()
// together with the id of the newest flush request it covers.
void NodeTraceWriter::FlushPrivate() {
  std::string str;
  int highest_request_id;
  {
    Mutex::ScopedLock stream_scoped_lock(stream_mutex_);
    if (total_traces_ >= kTracesPerFile) {
      // File limit reached: reset so the next event starts a new file.
      total_traces_ = 0;
      // Destroying the member JSONTraceWriter object appends "]}" to
      // stream_ - in other words, ending a JSON file.
      json_trace_writer_.reset();
    }
    // str() makes a copy of the contents of the stream.
    str = stream_.str();
    stream_.str("");
    stream_.clear();
  }
  {
    // Snapshot the request counter; completing this write satisfies every
    // Flush() call issued up to this id (see AfterWrite()).
    Mutex::ScopedLock request_scoped_lock(request_mutex_);
    highest_request_id = num_write_requests_;
  }
  WriteToFile(std::move(str), highest_request_id);
}
// Requests that everything buffered so far be written to disk. Callable
// from any thread. If `blocking` is true, waits until the write covering
// this request id has completed on the tracing thread.
void NodeTraceWriter::Flush(bool blocking) {
  Mutex::ScopedLock scoped_lock(request_mutex_);
  {
    // We need to lock the mutexes here in a nested fashion; stream_mutex_
    // protects json_trace_writer_, and without request_mutex_ there might be
    // a time window in which the stream state changes?
    Mutex::ScopedLock stream_mutex_lock(stream_mutex_);
    if (!json_trace_writer_)
      return;  // Nothing buffered (no file in progress); nothing to flush.
  }
  // Take a ticket for this flush and wake the tracing thread, which will
  // run FlushPrivate() for us.
  int request_id = ++num_write_requests_;
  int err = uv_async_send(&flush_signal_);
  CHECK_EQ(err, 0);
  if (blocking) {
    // Wait until data associated with this request id has been written to disk.
    // This guarantees that data from all earlier requests have also been
    // written.
    while (request_id > highest_request_id_completed_) {
      request_cond_.Wait(scoped_lock);
    }
  }
}
// Queues `str` for asynchronous writing to the current trace file. Only
// the request at the head of write_req_queue_ may have a uv_fs_write in
// flight: if the queue was empty we start that write here; otherwise
// AfterWrite() chains to it once the preceding write finishes.
void NodeTraceWriter::WriteToFile(std::string&& str, int highest_request_id) {
  if (fd_ == -1) return;  // Opening the trace file failed; drop the data.

  uv_buf_t buf = uv_buf_init(nullptr, 0);
  {
    Mutex::ScopedLock lock(request_mutex_);
    write_req_queue_.emplace(WriteRequest {
      std::move(str), highest_request_id
    });
    if (write_req_queue_.size() == 1) {
      // No write currently in flight: this request becomes the active one.
      // The buffer aliases the queued string, which stays alive until
      // AfterWrite() pops it from the queue.
      buf = uv_buf_init(
          const_cast<char*>(write_req_queue_.front().str.c_str()),
          write_req_queue_.front().str.length());
    }
  }
  // Only one write request for the same file descriptor should be active at
  // a time.
  if (buf.base != nullptr && fd_ != -1) {
    StartWrite(buf);
  }
}
StartWrite(uv_buf_t buf)184 void NodeTraceWriter::StartWrite(uv_buf_t buf) {
185 int err = uv_fs_write(
186 tracing_loop_, &write_req_, fd_, &buf, 1, -1,
187 [](uv_fs_t* req) {
188 NodeTraceWriter* writer =
189 ContainerOf(&NodeTraceWriter::write_req_, req);
190 writer->AfterWrite();
191 });
192 CHECK_EQ(err, 0);
193 }
// Completion callback for the active uv_fs_write (runs on the tracing
// thread). Pops the finished request, wakes blocking Flush() callers that
// it satisfied, and starts the next queued write if one is pending.
void NodeTraceWriter::AfterWrite() {
  CHECK_GE(write_req_.result, 0);  // Abort the process on write errors.
  uv_fs_req_cleanup(&write_req_);

  uv_buf_t buf = uv_buf_init(nullptr, 0);
  {
    Mutex::ScopedLock scoped_lock(request_mutex_);
    int highest_request_id = write_req_queue_.front().highest_request_id;
    write_req_queue_.pop();
    // All flush requests up to this id are now on disk; unblock waiters
    // in Flush(true).
    highest_request_id_completed_ = highest_request_id;
    request_cond_.Broadcast(scoped_lock);
    if (!write_req_queue_.empty()) {
      // Chain to the next pending write (see WriteToFile()); the buffer
      // aliases the queued string, which outlives the write.
      buf = uv_buf_init(
          const_cast<char*>(write_req_queue_.front().str.c_str()),
          write_req_queue_.front().str.length());
    }
  }
  if (buf.base != nullptr && fd_ != -1) {
    StartWrite(buf);
  }
}
// static
// Runs on the tracing thread when the destructor fires exit_signal_.
// Closes flush_signal_, then (from its close callback) exit_signal_, and
// finally sets exited_ and signals exit_cond_ so that ~NodeTraceWriter(),
// which is blocked waiting on it, can return.
void NodeTraceWriter::ExitSignalCb(uv_async_t* signal) {
  NodeTraceWriter* trace_writer =
      ContainerOf(&NodeTraceWriter::exit_signal_, signal);
  // Close both flush_signal_ and exit_signal_.
  uv_close(reinterpret_cast<uv_handle_t*>(&trace_writer->flush_signal_),
           [](uv_handle_t* signal) {
             NodeTraceWriter* trace_writer =
                 ContainerOf(&NodeTraceWriter::flush_signal_,
                             reinterpret_cast<uv_async_t*>(signal));
             uv_close(
                 reinterpret_cast<uv_handle_t*>(&trace_writer->exit_signal_),
                 [](uv_handle_t* signal) {
                   NodeTraceWriter* trace_writer =
                       ContainerOf(&NodeTraceWriter::exit_signal_,
                                   reinterpret_cast<uv_async_t*>(signal));
                   // Both handles closed; release the waiting destructor.
                   Mutex::ScopedLock scoped_lock(trace_writer->request_mutex_);
                   trace_writer->exited_ = true;
                   trace_writer->exit_cond_.Signal(scoped_lock);
                 });
           });
}
}  // namespace tracing
}  // namespace node