• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "RecordReadThread.h"
18 
19 #include <sys/resource.h>
20 #include <unistd.h>
21 
22 #include <algorithm>
23 #include <unordered_map>
24 
25 #include "environment.h"
26 #include "record.h"
27 
28 namespace simpleperf {
29 
// Upper bounds (in bytes) for the "low" and "critical" free-space thresholds of the record buffer.
// The RecordReadThread constructor caps its thresholds with these values.
static constexpr size_t kDefaultLowBufferLevel = 10u * 1024 * 1024;      // 10 MiB
static constexpr size_t kDefaultCriticalBufferLevel = 5u * 1024 * 1024;  // 5 MiB
32 
RecordBuffer(size_t buffer_size)33 RecordBuffer::RecordBuffer(size_t buffer_size)
34     : read_head_(0), write_head_(0), buffer_size_(buffer_size), buffer_(new char[buffer_size]) {
35 }
36 
GetFreeSize() const37 size_t RecordBuffer::GetFreeSize() const {
38     size_t write_head = write_head_.load(std::memory_order_relaxed);
39     size_t read_head = read_head_.load(std::memory_order_relaxed);
40     size_t write_tail = read_head > 0 ? read_head - 1 : buffer_size_ - 1;
41     if (write_head <= write_tail) {
42       return write_tail - write_head;
43     }
44     return buffer_size_ - write_head + write_tail;
45 }
46 
AllocWriteSpace(size_t record_size)47 char* RecordBuffer::AllocWriteSpace(size_t record_size) {
48   size_t write_head = write_head_.load(std::memory_order_relaxed);
49   size_t read_head = read_head_.load(std::memory_order_acquire);
50   size_t write_tail = read_head > 0 ? read_head - 1 : buffer_size_ - 1;
51   cur_write_record_size_ = record_size;
52   if (write_head < write_tail) {
53     if (write_head + record_size > write_tail) {
54       return nullptr;
55     }
56   } else if (write_head + record_size > buffer_size_) {
57     // Not enough space at the end of the buffer, need to wrap to the start of the buffer.
58     if (write_tail < record_size) {
59       return nullptr;
60     }
61     if (buffer_size_ - write_head >= sizeof(perf_event_header)) {
62       // Set the size field in perf_event_header to 0. So GetCurrentRecord() can wrap to the start
63       // of the buffer when size is 0.
64       memset(buffer_.get() + write_head, 0, sizeof(perf_event_header));
65     }
66     cur_write_record_size_ += buffer_size_ - write_head;
67     write_head = 0;
68   }
69   return buffer_.get() + write_head;
70 }
71 
FinishWrite()72 void RecordBuffer::FinishWrite() {
73   size_t write_head = write_head_.load(std::memory_order_relaxed);
74   write_head = (write_head + cur_write_record_size_) % buffer_size_;
75   write_head_.store(write_head, std::memory_order_release);
76 }
77 
GetCurrentRecord()78 char* RecordBuffer::GetCurrentRecord() {
79   size_t write_head = write_head_.load(std::memory_order_acquire);
80   size_t read_head = read_head_.load(std::memory_order_relaxed);
81   if (read_head == write_head) {
82     return nullptr;
83   }
84   perf_event_header header;
85   if (read_head > write_head) {
86     if (buffer_size_ - read_head < sizeof(header) ||
87         (memcpy(&header, buffer_.get() + read_head, sizeof(header)) && header.size == 0)) {
88       // Need to wrap to the start of the buffer.
89       cur_read_record_size_ += buffer_size_ - read_head;
90       read_head = 0;
91       memcpy(&header, buffer_.get(), sizeof(header));
92     }
93   } else {
94     memcpy(&header, buffer_.get() + read_head, sizeof(header));
95   }
96   cur_read_record_size_ += header.size;
97   return buffer_.get() + read_head;
98 }
99 
MoveToNextRecord()100 void RecordBuffer::MoveToNextRecord() {
101   size_t read_head = read_head_.load(std::memory_order_relaxed);
102   read_head = (read_head + cur_read_record_size_) % buffer_size_;
103   read_head_.store(read_head, std::memory_order_release);
104   cur_read_record_size_ = 0;
105 }
106 
RecordParser(const perf_event_attr & attr)107 RecordParser::RecordParser(const perf_event_attr& attr)
108     : sample_type_(attr.sample_type),
109       sample_regs_count_(__builtin_popcountll(attr.sample_regs_user)) {
110   size_t pos = sizeof(perf_event_header);
111   uint64_t mask = PERF_SAMPLE_IDENTIFIER | PERF_SAMPLE_IP | PERF_SAMPLE_TID;
112   pos += __builtin_popcountll(sample_type_ & mask) * sizeof(uint64_t);
113   if (sample_type_ & PERF_SAMPLE_TIME) {
114     time_pos_in_sample_records_ = pos;
115     pos += sizeof(uint64_t);
116   }
117   mask = PERF_SAMPLE_ADDR | PERF_SAMPLE_ID | PERF_SAMPLE_STREAM_ID | PERF_SAMPLE_CPU |
118       PERF_SAMPLE_PERIOD;
119   pos += __builtin_popcountll(sample_type_ & mask) * sizeof(uint64_t);
120   callchain_pos_in_sample_records_ = pos;
121   if ((sample_type_ & PERF_SAMPLE_TIME) && attr.sample_id_all) {
122     mask = PERF_SAMPLE_IDENTIFIER | PERF_SAMPLE_CPU | PERF_SAMPLE_STREAM_ID | PERF_SAMPLE_ID;
123     time_rpos_in_non_sample_records_ = (__builtin_popcountll(sample_type_ & mask) + 1) *
124         sizeof(uint64_t);
125   }
126 }
127 
GetTimePos(const perf_event_header & header) const128 size_t RecordParser::GetTimePos(const perf_event_header& header) const {
129   if (header.type == PERF_RECORD_SAMPLE) {
130     return time_pos_in_sample_records_;
131   }
132   if (time_rpos_in_non_sample_records_ != 0u &&
133       time_rpos_in_non_sample_records_ < header.size - sizeof(perf_event_header)) {
134     return header.size - time_rpos_in_non_sample_records_;
135   }
136   return 0;
137 }
138 
GetStackSizePos(const std::function<void (size_t,size_t,void *)> & read_record_fn) const139 size_t RecordParser::GetStackSizePos(
140     const std::function<void(size_t,size_t,void*)>& read_record_fn) const{
141   size_t pos = callchain_pos_in_sample_records_;
142   if (sample_type_ & PERF_SAMPLE_CALLCHAIN) {
143     uint64_t ip_nr;
144     read_record_fn(pos, sizeof(ip_nr), &ip_nr);
145     pos += (ip_nr + 1) * sizeof(uint64_t);
146   }
147   if (sample_type_ & PERF_SAMPLE_RAW) {
148     uint32_t size;
149     read_record_fn(pos, sizeof(size), &size);
150     pos += size + sizeof(uint32_t);
151   }
152   if (sample_type_ & PERF_SAMPLE_BRANCH_STACK) {
153     uint64_t stack_nr;
154     read_record_fn(pos, sizeof(stack_nr), &stack_nr);
155     pos += sizeof(uint64_t) + stack_nr * sizeof(BranchStackItemType);
156   }
157   if (sample_type_ & PERF_SAMPLE_REGS_USER) {
158     uint64_t abi;
159     read_record_fn(pos, sizeof(abi), &abi);
160     pos += (1 + (abi == 0 ? 0 : sample_regs_count_)) * sizeof(uint64_t);
161   }
162   return (sample_type_ & PERF_SAMPLE_STACK_USER) ? pos : 0;
163 }
164 
KernelRecordReader(EventFd * event_fd)165 KernelRecordReader::KernelRecordReader(EventFd* event_fd) : event_fd_(event_fd) {
166   size_t buffer_size;
167   buffer_ = event_fd_->GetMappedBuffer(buffer_size);
168   buffer_mask_ = buffer_size - 1;
169 }
170 
GetDataFromKernelBuffer()171 bool KernelRecordReader::GetDataFromKernelBuffer() {
172   data_size_ = event_fd_->GetAvailableMmapDataSize(data_pos_);
173   if (data_size_ == 0) {
174     return false;
175   }
176   init_data_size_ = data_size_;
177   record_header_.size = 0;
178   return true;
179 }
180 
ReadRecord(size_t pos,size_t size,void * dest)181 void KernelRecordReader::ReadRecord(size_t pos, size_t size, void* dest) {
182   pos = (pos + data_pos_) & buffer_mask_;
183   size_t copy_size = std::min(size, buffer_mask_ + 1 - pos);
184   memcpy(dest, buffer_ + pos, copy_size);
185   if (copy_size < size) {
186     memcpy(static_cast<char*>(dest) + copy_size, buffer_, size - copy_size);
187   }
188 }
189 
MoveToNextRecord(const RecordParser & parser)190 bool KernelRecordReader::MoveToNextRecord(const RecordParser& parser) {
191   data_pos_ = (data_pos_ + record_header_.size) & buffer_mask_;
192   data_size_ -= record_header_.size;
193   if (data_size_ == 0) {
194     event_fd_->DiscardMmapData(init_data_size_);
195     init_data_size_ = 0;
196     return false;
197   }
198   ReadRecord(0, sizeof(record_header_), &record_header_);
199   size_t time_pos = parser.GetTimePos(record_header_);
200   if (time_pos != 0) {
201     ReadRecord(time_pos, sizeof(record_time_), &record_time_);
202   }
203   return true;
204 }
205 
RecordReadThread(size_t record_buffer_size,const perf_event_attr & attr,size_t min_mmap_pages,size_t max_mmap_pages)206 RecordReadThread::RecordReadThread(size_t record_buffer_size, const perf_event_attr& attr,
207                                    size_t min_mmap_pages, size_t max_mmap_pages)
208     : record_buffer_(record_buffer_size), record_parser_(attr), attr_(attr),
209       min_mmap_pages_(min_mmap_pages), max_mmap_pages_(max_mmap_pages) {
210   if (attr.sample_type & PERF_SAMPLE_STACK_USER) {
211     stack_size_in_sample_record_ = attr.sample_stack_user;
212   }
213   record_buffer_low_level_ = std::min(record_buffer_size / 4, kDefaultLowBufferLevel);
214   record_buffer_critical_level_ = std::min(record_buffer_size / 6, kDefaultCriticalBufferLevel);
215 }
216 
~RecordReadThread()217 RecordReadThread::~RecordReadThread() {
218   if (read_thread_) {
219     StopReadThread();
220   }
221 }
222 
RegisterDataCallback(IOEventLoop & loop,const std::function<bool ()> & data_callback)223 bool RecordReadThread::RegisterDataCallback(IOEventLoop& loop,
224                                             const std::function<bool()>& data_callback) {
225   int cmd_fd[2];
226   int data_fd[2];
227   if (pipe2(cmd_fd, O_CLOEXEC) != 0 || pipe2(data_fd, O_CLOEXEC) != 0) {
228     PLOG(ERROR) << "pipe2";
229     return false;
230   }
231   read_cmd_fd_.reset(cmd_fd[0]);
232   write_cmd_fd_.reset(cmd_fd[1]);
233   cmd_ = NO_CMD;
234   read_data_fd_.reset(data_fd[0]);
235   write_data_fd_.reset(data_fd[1]);
236   has_data_notification_ = false;
237   if (!loop.AddReadEvent(read_data_fd_, data_callback)) {
238     return false;
239   }
240   read_thread_.reset(new std::thread([&]() { RunReadThread(); }));
241   return true;
242 }
243 
AddEventFds(const std::vector<EventFd * > & event_fds)244 bool RecordReadThread::AddEventFds(const std::vector<EventFd*>& event_fds) {
245   return SendCmdToReadThread(CMD_ADD_EVENT_FDS, const_cast<std::vector<EventFd*>*>(&event_fds));
246 }
247 
RemoveEventFds(const std::vector<EventFd * > & event_fds)248 bool RecordReadThread::RemoveEventFds(const std::vector<EventFd*>& event_fds) {
249   return SendCmdToReadThread(CMD_REMOVE_EVENT_FDS, const_cast<std::vector<EventFd*>*>(&event_fds));
250 }
251 
SyncKernelBuffer()252 bool RecordReadThread::SyncKernelBuffer() {
253   return SendCmdToReadThread(CMD_SYNC_KERNEL_BUFFER, nullptr);
254 }
255 
StopReadThread()256 bool RecordReadThread::StopReadThread() {
257   bool result = SendCmdToReadThread(CMD_STOP_THREAD, nullptr);
258   if (result) {
259     read_thread_->join();
260     read_thread_ = nullptr;
261   }
262   return result;
263 }
264 
SendCmdToReadThread(Cmd cmd,void * cmd_arg)265 bool RecordReadThread::SendCmdToReadThread(Cmd cmd, void* cmd_arg) {
266   {
267     std::lock_guard<std::mutex> lock(cmd_mutex_);
268     cmd_ = cmd;
269     cmd_arg_ = cmd_arg;
270   }
271   char dummy = 0;
272   if (TEMP_FAILURE_RETRY(write(write_cmd_fd_, &dummy, 1)) != 1) {
273     return false;
274   }
275   std::unique_lock<std::mutex> lock(cmd_mutex_);
276   while (cmd_ != NO_CMD) {
277     cmd_finish_cond_.wait(lock);
278   }
279   return cmd_result_;
280 }
281 
GetRecord()282 std::unique_ptr<Record> RecordReadThread::GetRecord() {
283   record_buffer_.MoveToNextRecord();
284   char* p = record_buffer_.GetCurrentRecord();
285   if (p != nullptr) {
286     return ReadRecordFromBuffer(attr_, p);
287   }
288   if (has_data_notification_) {
289     char dummy;
290     TEMP_FAILURE_RETRY(read(read_data_fd_, &dummy, 1));
291     has_data_notification_ = false;
292   }
293   return nullptr;
294 }
295 
RunReadThread()296 void RecordReadThread::RunReadThread() {
297   IncreaseThreadPriority();
298   IOEventLoop loop;
299   CHECK(loop.AddReadEvent(read_cmd_fd_, [&]() { return HandleCmd(loop); }));
300   loop.RunLoop();
301 }
302 
IncreaseThreadPriority()303 void RecordReadThread::IncreaseThreadPriority() {
304   // TODO: use real time priority for root.
305   rlimit rlim;
306   int result = getrlimit(RLIMIT_NICE, &rlim);
307   if (result == 0 && rlim.rlim_cur == 40) {
308     result = setpriority(PRIO_PROCESS, gettid(), -20);
309     if (result == 0) {
310       LOG(VERBOSE) << "Priority of record read thread is increased";
311     }
312   }
313 }
314 
GetCmd()315 RecordReadThread::Cmd RecordReadThread::GetCmd() {
316   std::lock_guard<std::mutex> lock(cmd_mutex_);
317   return cmd_;
318 }
319 
HandleCmd(IOEventLoop & loop)320 bool RecordReadThread::HandleCmd(IOEventLoop& loop) {
321   char dummy;
322   TEMP_FAILURE_RETRY(read(read_cmd_fd_, &dummy, 1));
323   bool result = true;
324   switch (GetCmd()) {
325     case CMD_ADD_EVENT_FDS:
326       result = HandleAddEventFds(loop, *static_cast<std::vector<EventFd*>*>(cmd_arg_));
327       break;
328     case CMD_REMOVE_EVENT_FDS:
329       result = HandleRemoveEventFds(*static_cast<std::vector<EventFd*>*>(cmd_arg_));
330       break;
331     case CMD_SYNC_KERNEL_BUFFER:
332       result = ReadRecordsFromKernelBuffer();
333       break;
334     case CMD_STOP_THREAD:
335       result = loop.ExitLoop();
336       break;
337     default:
338       LOG(ERROR) << "Unknown cmd: " << GetCmd();
339       result = false;
340       break;
341   }
342   std::lock_guard<std::mutex> lock(cmd_mutex_);
343   cmd_ = NO_CMD;
344   cmd_result_ = result;
345   cmd_finish_cond_.notify_one();
346   return true;
347 }
348 
HandleAddEventFds(IOEventLoop & loop,const std::vector<EventFd * > & event_fds)349 bool RecordReadThread::HandleAddEventFds(IOEventLoop& loop,
350                                          const std::vector<EventFd*>& event_fds) {
351   std::unordered_map<int, EventFd*> cpu_map;
352   for (size_t pages = max_mmap_pages_; pages >= min_mmap_pages_; pages >>= 1) {
353     bool success = true;
354     for (EventFd* fd : event_fds) {
355       auto it = cpu_map.find(fd->Cpu());
356       if (it == cpu_map.end()) {
357         if (!fd->CreateMappedBuffer(pages, pages == min_mmap_pages_)) {
358           success = false;
359           break;
360         }
361         cpu_map[fd->Cpu()] = fd;
362       } else {
363         if (!fd->ShareMappedBuffer(*(it->second), pages == min_mmap_pages_)) {
364           success = false;
365           break;
366         }
367       }
368     }
369     if (success) {
370       LOG(VERBOSE) << "Each kernel buffer is " << pages << " pages.";
371       break;
372     }
373     for (auto& pair : cpu_map) {
374       pair.second->DestroyMappedBuffer();
375     }
376     cpu_map.clear();
377   }
378   if (cpu_map.empty()) {
379     return false;
380   }
381   for (auto& pair : cpu_map) {
382     if (!pair.second->StartPolling(loop, [this]() { return ReadRecordsFromKernelBuffer(); })) {
383       return false;
384     }
385     kernel_record_readers_.emplace_back(pair.second);
386   }
387   return true;
388 }
389 
HandleRemoveEventFds(const std::vector<EventFd * > & event_fds)390 bool RecordReadThread::HandleRemoveEventFds(const std::vector<EventFd*>& event_fds) {
391   for (auto& event_fd : event_fds) {
392     if (event_fd->HasMappedBuffer()) {
393       auto it = std::find_if(kernel_record_readers_.begin(), kernel_record_readers_.end(),
394                              [&](const KernelRecordReader& reader) {
395                                return reader.GetEventFd() == event_fd;
396       });
397       if (it != kernel_record_readers_.end()) {
398         kernel_record_readers_.erase(it);
399         event_fd->StopPolling();
400         event_fd->DestroyMappedBuffer();
401       }
402     }
403   }
404   return true;
405 }
406 
CompareRecordTime(KernelRecordReader * r1,KernelRecordReader * r2)407 static bool CompareRecordTime(KernelRecordReader* r1, KernelRecordReader* r2) {
408   return r1->RecordTime() > r2->RecordTime();
409 }
410 
411 // When reading from mmap buffers, we prefer reading from all buffers at once rather than reading
412 // one buffer at a time. Because by reading all buffers at once, we can merge records from
413 // different buffers easily in memory. Otherwise, we have to sort records with greater effort.
ReadRecordsFromKernelBuffer()414 bool RecordReadThread::ReadRecordsFromKernelBuffer() {
415   do {
416     std::vector<KernelRecordReader*> readers;
417     for (auto& reader : kernel_record_readers_) {
418       if (reader.GetDataFromKernelBuffer()) {
419         readers.push_back(&reader);
420       }
421     }
422     if (readers.empty()) {
423       break;
424     }
425     if (readers.size() == 1u) {
426       // Only one buffer has data, process it directly.
427       while (readers[0]->MoveToNextRecord(record_parser_)) {
428         PushRecordToRecordBuffer(readers[0]);
429       }
430     } else {
431       // Use a binary heap to merge records from different buffers. As records from the same buffer
432       // are already ordered by time, we only need to merge the first record from all buffers. And
433       // each time a record is popped from the heap, we put the next record from its buffer into
434       // the heap.
435       for (auto& reader : readers) {
436         reader->MoveToNextRecord(record_parser_);
437       }
438       std::make_heap(readers.begin(), readers.end(), CompareRecordTime);
439       size_t size = readers.size();
440       while (size > 0) {
441         std::pop_heap(readers.begin(), readers.begin() + size, CompareRecordTime);
442         PushRecordToRecordBuffer(readers[size - 1]);
443         if (readers[size - 1]->MoveToNextRecord(record_parser_)) {
444           std::push_heap(readers.begin(), readers.begin() + size, CompareRecordTime);
445         } else {
446           size--;
447         }
448       }
449     }
450     if (!SendDataNotificationToMainThread()) {
451       return false;
452     }
453     // If there are no commands, we can loop until there is no more data from the kernel.
454   } while (GetCmd() == NO_CMD);
455   return true;
456 }
457 
PushRecordToRecordBuffer(KernelRecordReader * kernel_record_reader)458 void RecordReadThread::PushRecordToRecordBuffer(KernelRecordReader* kernel_record_reader) {
459   const perf_event_header& header = kernel_record_reader->RecordHeader();
460   if (header.type == PERF_RECORD_SAMPLE && stack_size_in_sample_record_ > 1024) {
461     size_t free_size = record_buffer_.GetFreeSize();
462     if (free_size < record_buffer_critical_level_) {
463       // When the free size in record buffer is below critical level, drop sample records to save
464       // space for more important records (like mmap or fork records).
465       lost_samples_++;
466       return;
467     }
468     size_t stack_size_limit = stack_size_in_sample_record_;
469     if (free_size < record_buffer_low_level_) {
470       // When the free size in record buffer is below low level, cut the stack data in sample
471       // records to 1K. This makes the unwinder unwind only part of the callchains, but hopefully
472       // the call chain joiner can complete the callchains.
473       stack_size_limit = 1024;
474     }
475     size_t stack_size_pos = record_parser_.GetStackSizePos(
476         [&](size_t pos, size_t size, void* dest) {
477           return kernel_record_reader->ReadRecord(pos, size, dest);
478     });
479     uint64_t stack_size;
480     kernel_record_reader->ReadRecord(stack_size_pos, sizeof(stack_size), &stack_size);
481     if (stack_size > 0) {
482       size_t dyn_stack_size_pos = stack_size_pos + sizeof(stack_size) + stack_size;
483       uint64_t dyn_stack_size;
484       kernel_record_reader->ReadRecord(dyn_stack_size_pos, sizeof(dyn_stack_size), &dyn_stack_size);
485       if (dyn_stack_size == 0) {
486         // If stack_user_data.dyn_size == 0, it may be because the kernel misses the patch to
487         // update dyn_size, like in N9 (See b/22612370). So assume all stack data is valid if
488         // dyn_size == 0.
489         // TODO: Add cts test.
490         dyn_stack_size = stack_size;
491       }
492       // When simpleperf requests the kernel to dump 64K stack per sample, it will allocate 64K
493       // space in each sample to store stack data. However, a thread may use less stack than 64K.
494       // So not all the 64K stack data in a sample is valid, and we only need to keep valid stack
495       // data, whose size is dyn_stack_size.
496       uint64_t new_stack_size = std::min<uint64_t>(dyn_stack_size, stack_size_limit);
497       if (stack_size > new_stack_size) {
498         // Remove part of the stack data.
499         perf_event_header new_header = header;
500         new_header.size -= stack_size - new_stack_size;
501         char* p = record_buffer_.AllocWriteSpace(new_header.size);
502         if (p != nullptr) {
503           memcpy(p, &new_header, sizeof(new_header));
504           size_t pos = sizeof(new_header);
505           kernel_record_reader->ReadRecord(pos, stack_size_pos - pos, p + pos);
506           memcpy(p + stack_size_pos, &new_stack_size, sizeof(uint64_t));
507           pos = stack_size_pos + sizeof(uint64_t);
508           kernel_record_reader->ReadRecord(pos, new_stack_size, p + pos);
509           memcpy(p + pos + new_stack_size, &new_stack_size, sizeof(uint64_t));
510           record_buffer_.FinishWrite();
511           if (new_stack_size < dyn_stack_size) {
512             cut_stack_samples_++;
513           }
514         } else {
515           lost_samples_++;
516         }
517         return;
518       }
519     }
520   }
521   char* p = record_buffer_.AllocWriteSpace(header.size);
522   if (p != nullptr) {
523     kernel_record_reader->ReadRecord(0, header.size, p);
524     record_buffer_.FinishWrite();
525   } else {
526     if (header.type == PERF_RECORD_SAMPLE) {
527       lost_samples_++;
528     } else {
529       lost_non_samples_++;
530     }
531   }
532 }
533 
SendDataNotificationToMainThread()534 bool RecordReadThread::SendDataNotificationToMainThread() {
535   if (!has_data_notification_.load(std::memory_order_relaxed)) {
536     has_data_notification_ = true;
537     char dummy = 0;
538     if (TEMP_FAILURE_RETRY(write(write_data_fd_, &dummy, 1)) != 1) {
539       PLOG(ERROR) << "write";
540       return false;
541     }
542   }
543   return true;
544 }
545 
546 }  // namespace simpleperf
547