• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "perfetto/base/build_config.h"
18 #if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
19 
20 #include "perfetto/ext/base/unix_task_runner.h"
21 
22 #include <errno.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25 
26 #include <limits>
27 
28 #include "perfetto/ext/base/watchdog.h"
29 
30 namespace perfetto {
31 namespace base {
32 
UnixTaskRunner()33 UnixTaskRunner::UnixTaskRunner() {
34   AddFileDescriptorWatch(event_.fd(), [] {
35     // Not reached -- see PostFileDescriptorWatches().
36     PERFETTO_DFATAL("Should be unreachable.");
37   });
38 }
39 
// Defaulted out-of-line: performs only member-wise destruction.
UnixTaskRunner::~UnixTaskRunner() = default;
41 
// Signals |event_|. Its fd is registered as a watch in the constructor and is
// therefore part of the set polled by Run(); PostFileDescriptorWatches()
// clears the event when it observes the fd as ready.
void UnixTaskRunner::WakeUp() {
  event_.Notify();
}
45 
Run()46 void UnixTaskRunner::Run() {
47   PERFETTO_DCHECK_THREAD(thread_checker_);
48   created_thread_id_ = GetThreadId();
49   quit_ = false;
50   for (;;) {
51     int poll_timeout_ms;
52     {
53       std::lock_guard<std::mutex> lock(lock_);
54       if (quit_)
55         return;
56       poll_timeout_ms = GetDelayMsToNextTaskLocked();
57       UpdateWatchTasksLocked();
58     }
59     int ret = PERFETTO_EINTR(poll(
60         &poll_fds_[0], static_cast<nfds_t>(poll_fds_.size()), poll_timeout_ms));
61     PERFETTO_CHECK(ret >= 0);
62 
63     // To avoid starvation we always interleave all types of tasks -- immediate,
64     // delayed and file descriptor watches.
65     PostFileDescriptorWatches();
66     RunImmediateAndDelayedTask();
67   }
68 }
69 
Quit()70 void UnixTaskRunner::Quit() {
71   std::lock_guard<std::mutex> lock(lock_);
72   quit_ = true;
73   WakeUp();
74 }
75 
QuitCalled()76 bool UnixTaskRunner::QuitCalled() {
77   std::lock_guard<std::mutex> lock(lock_);
78   return quit_;
79 }
80 
IsIdleForTesting()81 bool UnixTaskRunner::IsIdleForTesting() {
82   std::lock_guard<std::mutex> lock(lock_);
83   return immediate_tasks_.empty();
84 }
85 
UpdateWatchTasksLocked()86 void UnixTaskRunner::UpdateWatchTasksLocked() {
87   PERFETTO_DCHECK_THREAD(thread_checker_);
88   if (!watch_tasks_changed_)
89     return;
90   watch_tasks_changed_ = false;
91   poll_fds_.clear();
92   for (auto& it : watch_tasks_) {
93     it.second.poll_fd_index = poll_fds_.size();
94     poll_fds_.push_back({it.first, POLLIN | POLLHUP, 0});
95   }
96 }
97 
RunImmediateAndDelayedTask()98 void UnixTaskRunner::RunImmediateAndDelayedTask() {
99   // If locking overhead becomes an issue, add a separate work queue.
100   std::function<void()> immediate_task;
101   std::function<void()> delayed_task;
102   TimeMillis now = GetWallTimeMs();
103   {
104     std::lock_guard<std::mutex> lock(lock_);
105     if (!immediate_tasks_.empty()) {
106       immediate_task = std::move(immediate_tasks_.front());
107       immediate_tasks_.pop_front();
108     }
109     if (!delayed_tasks_.empty()) {
110       auto it = delayed_tasks_.begin();
111       if (now >= it->first) {
112         delayed_task = std::move(it->second);
113         delayed_tasks_.erase(it);
114       }
115     }
116   }
117 
118   errno = 0;
119   if (immediate_task)
120     RunTaskWithWatchdogGuard(immediate_task);
121   errno = 0;
122   if (delayed_task)
123     RunTaskWithWatchdogGuard(delayed_task);
124 }
125 
PostFileDescriptorWatches()126 void UnixTaskRunner::PostFileDescriptorWatches() {
127   PERFETTO_DCHECK_THREAD(thread_checker_);
128   for (size_t i = 0; i < poll_fds_.size(); i++) {
129     if (!(poll_fds_[i].revents & (POLLIN | POLLHUP)))
130       continue;
131     poll_fds_[i].revents = 0;
132 
133     // The wake-up event is handled inline to avoid an infinite recursion of
134     // posted tasks.
135     if (poll_fds_[i].fd == event_.fd()) {
136       event_.Clear();
137       continue;
138     }
139 
140     // Binding to |this| is safe since we are the only object executing the
141     // task.
142     PostTask(std::bind(&UnixTaskRunner::RunFileDescriptorWatch, this,
143                        poll_fds_[i].fd));
144 
145     // Make the fd negative while a posted task is pending. This makes poll(2)
146     // ignore the fd.
147     PERFETTO_DCHECK(poll_fds_[i].fd >= 0);
148     poll_fds_[i].fd = -poll_fds_[i].fd;
149   }
150 }
151 
RunFileDescriptorWatch(int fd)152 void UnixTaskRunner::RunFileDescriptorWatch(int fd) {
153   std::function<void()> task;
154   {
155     std::lock_guard<std::mutex> lock(lock_);
156     auto it = watch_tasks_.find(fd);
157     if (it == watch_tasks_.end())
158       return;
159     // Make poll(2) pay attention to the fd again. Since another thread may have
160     // updated this watch we need to refresh the set first.
161     UpdateWatchTasksLocked();
162     size_t fd_index = it->second.poll_fd_index;
163     PERFETTO_DCHECK(fd_index < poll_fds_.size());
164     PERFETTO_DCHECK(::abs(poll_fds_[fd_index].fd) == fd);
165     poll_fds_[fd_index].fd = fd;
166     task = it->second.callback;
167   }
168   errno = 0;
169   RunTaskWithWatchdogGuard(task);
170 }
171 
GetDelayMsToNextTaskLocked() const172 int UnixTaskRunner::GetDelayMsToNextTaskLocked() const {
173   PERFETTO_DCHECK_THREAD(thread_checker_);
174   if (!immediate_tasks_.empty())
175     return 0;
176   if (!delayed_tasks_.empty()) {
177     TimeMillis diff = delayed_tasks_.begin()->first - GetWallTimeMs();
178     return std::max(0, static_cast<int>(diff.count()));
179   }
180   return -1;
181 }
182 
PostTask(std::function<void ()> task)183 void UnixTaskRunner::PostTask(std::function<void()> task) {
184   bool was_empty;
185   {
186     std::lock_guard<std::mutex> lock(lock_);
187     was_empty = immediate_tasks_.empty();
188     immediate_tasks_.push_back(std::move(task));
189   }
190   if (was_empty)
191     WakeUp();
192 }
193 
PostDelayedTask(std::function<void ()> task,uint32_t delay_ms)194 void UnixTaskRunner::PostDelayedTask(std::function<void()> task,
195                                      uint32_t delay_ms) {
196   TimeMillis runtime = GetWallTimeMs() + TimeMillis(delay_ms);
197   {
198     std::lock_guard<std::mutex> lock(lock_);
199     delayed_tasks_.insert(std::make_pair(runtime, std::move(task)));
200   }
201   WakeUp();
202 }
203 
AddFileDescriptorWatch(int fd,std::function<void ()> task)204 void UnixTaskRunner::AddFileDescriptorWatch(int fd,
205                                             std::function<void()> task) {
206   PERFETTO_DCHECK(fd >= 0);
207   {
208     std::lock_guard<std::mutex> lock(lock_);
209     PERFETTO_DCHECK(!watch_tasks_.count(fd));
210     watch_tasks_[fd] = {std::move(task), SIZE_MAX};
211     watch_tasks_changed_ = true;
212   }
213   WakeUp();
214 }
215 
RemoveFileDescriptorWatch(int fd)216 void UnixTaskRunner::RemoveFileDescriptorWatch(int fd) {
217   PERFETTO_DCHECK(fd >= 0);
218   {
219     std::lock_guard<std::mutex> lock(lock_);
220     PERFETTO_DCHECK(watch_tasks_.count(fd));
221     watch_tasks_.erase(fd);
222     watch_tasks_changed_ = true;
223   }
224   // No need to schedule a wake-up for this.
225 }
226 
RunsTasksOnCurrentThread() const227 bool UnixTaskRunner::RunsTasksOnCurrentThread() const {
228   return GetThreadId() == created_thread_id_;
229 }
230 
231 }  // namespace base
232 }  // namespace perfetto
233 
234 #endif  // !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
235