1 /*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "perfetto/base/build_config.h"
18 #if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
19
20 #include "perfetto/ext/base/unix_task_runner.h"
21
22 #include <errno.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25
26 #include <limits>
27
28 #include "perfetto/ext/base/watchdog.h"
29
30 namespace perfetto {
31 namespace base {
32
UnixTaskRunner()33 UnixTaskRunner::UnixTaskRunner() {
34 AddFileDescriptorWatch(event_.fd(), [] {
35 // Not reached -- see PostFileDescriptorWatches().
36 PERFETTO_DFATAL("Should be unreachable.");
37 });
38 }
39
// Defaulted destructor: no explicit teardown is performed here; each member is
// cleaned up by its own destructor.
40 UnixTaskRunner::~UnixTaskRunner() = default;
41
WakeUp()42 void UnixTaskRunner::WakeUp() {
43 event_.Notify();
44 }
45
Run()46 void UnixTaskRunner::Run() {
47 PERFETTO_DCHECK_THREAD(thread_checker_);
48 created_thread_id_ = GetThreadId();
49 quit_ = false;
50 for (;;) {
51 int poll_timeout_ms;
52 {
53 std::lock_guard<std::mutex> lock(lock_);
54 if (quit_)
55 return;
56 poll_timeout_ms = GetDelayMsToNextTaskLocked();
57 UpdateWatchTasksLocked();
58 }
59 int ret = PERFETTO_EINTR(poll(
60 &poll_fds_[0], static_cast<nfds_t>(poll_fds_.size()), poll_timeout_ms));
61 PERFETTO_CHECK(ret >= 0);
62
63 // To avoid starvation we always interleave all types of tasks -- immediate,
64 // delayed and file descriptor watches.
65 PostFileDescriptorWatches();
66 RunImmediateAndDelayedTask();
67 }
68 }
69
Quit()70 void UnixTaskRunner::Quit() {
71 std::lock_guard<std::mutex> lock(lock_);
72 quit_ = true;
73 WakeUp();
74 }
75
QuitCalled()76 bool UnixTaskRunner::QuitCalled() {
77 std::lock_guard<std::mutex> lock(lock_);
78 return quit_;
79 }
80
IsIdleForTesting()81 bool UnixTaskRunner::IsIdleForTesting() {
82 std::lock_guard<std::mutex> lock(lock_);
83 return immediate_tasks_.empty();
84 }
85
UpdateWatchTasksLocked()86 void UnixTaskRunner::UpdateWatchTasksLocked() {
87 PERFETTO_DCHECK_THREAD(thread_checker_);
88 if (!watch_tasks_changed_)
89 return;
90 watch_tasks_changed_ = false;
91 poll_fds_.clear();
92 for (auto& it : watch_tasks_) {
93 it.second.poll_fd_index = poll_fds_.size();
94 poll_fds_.push_back({it.first, POLLIN | POLLHUP, 0});
95 }
96 }
97
RunImmediateAndDelayedTask()98 void UnixTaskRunner::RunImmediateAndDelayedTask() {
99 // If locking overhead becomes an issue, add a separate work queue.
100 std::function<void()> immediate_task;
101 std::function<void()> delayed_task;
102 TimeMillis now = GetWallTimeMs();
103 {
104 std::lock_guard<std::mutex> lock(lock_);
105 if (!immediate_tasks_.empty()) {
106 immediate_task = std::move(immediate_tasks_.front());
107 immediate_tasks_.pop_front();
108 }
109 if (!delayed_tasks_.empty()) {
110 auto it = delayed_tasks_.begin();
111 if (now >= it->first) {
112 delayed_task = std::move(it->second);
113 delayed_tasks_.erase(it);
114 }
115 }
116 }
117
118 errno = 0;
119 if (immediate_task)
120 RunTaskWithWatchdogGuard(immediate_task);
121 errno = 0;
122 if (delayed_task)
123 RunTaskWithWatchdogGuard(delayed_task);
124 }
125
PostFileDescriptorWatches()126 void UnixTaskRunner::PostFileDescriptorWatches() {
127 PERFETTO_DCHECK_THREAD(thread_checker_);
128 for (size_t i = 0; i < poll_fds_.size(); i++) {
129 if (!(poll_fds_[i].revents & (POLLIN | POLLHUP)))
130 continue;
131 poll_fds_[i].revents = 0;
132
133 // The wake-up event is handled inline to avoid an infinite recursion of
134 // posted tasks.
135 if (poll_fds_[i].fd == event_.fd()) {
136 event_.Clear();
137 continue;
138 }
139
140 // Binding to |this| is safe since we are the only object executing the
141 // task.
142 PostTask(std::bind(&UnixTaskRunner::RunFileDescriptorWatch, this,
143 poll_fds_[i].fd));
144
145 // Make the fd negative while a posted task is pending. This makes poll(2)
146 // ignore the fd.
147 PERFETTO_DCHECK(poll_fds_[i].fd >= 0);
148 poll_fds_[i].fd = -poll_fds_[i].fd;
149 }
150 }
151
RunFileDescriptorWatch(int fd)152 void UnixTaskRunner::RunFileDescriptorWatch(int fd) {
153 std::function<void()> task;
154 {
155 std::lock_guard<std::mutex> lock(lock_);
156 auto it = watch_tasks_.find(fd);
157 if (it == watch_tasks_.end())
158 return;
159 // Make poll(2) pay attention to the fd again. Since another thread may have
160 // updated this watch we need to refresh the set first.
161 UpdateWatchTasksLocked();
162 size_t fd_index = it->second.poll_fd_index;
163 PERFETTO_DCHECK(fd_index < poll_fds_.size());
164 PERFETTO_DCHECK(::abs(poll_fds_[fd_index].fd) == fd);
165 poll_fds_[fd_index].fd = fd;
166 task = it->second.callback;
167 }
168 errno = 0;
169 RunTaskWithWatchdogGuard(task);
170 }
171
GetDelayMsToNextTaskLocked() const172 int UnixTaskRunner::GetDelayMsToNextTaskLocked() const {
173 PERFETTO_DCHECK_THREAD(thread_checker_);
174 if (!immediate_tasks_.empty())
175 return 0;
176 if (!delayed_tasks_.empty()) {
177 TimeMillis diff = delayed_tasks_.begin()->first - GetWallTimeMs();
178 return std::max(0, static_cast<int>(diff.count()));
179 }
180 return -1;
181 }
182
PostTask(std::function<void ()> task)183 void UnixTaskRunner::PostTask(std::function<void()> task) {
184 bool was_empty;
185 {
186 std::lock_guard<std::mutex> lock(lock_);
187 was_empty = immediate_tasks_.empty();
188 immediate_tasks_.push_back(std::move(task));
189 }
190 if (was_empty)
191 WakeUp();
192 }
193
PostDelayedTask(std::function<void ()> task,uint32_t delay_ms)194 void UnixTaskRunner::PostDelayedTask(std::function<void()> task,
195 uint32_t delay_ms) {
196 TimeMillis runtime = GetWallTimeMs() + TimeMillis(delay_ms);
197 {
198 std::lock_guard<std::mutex> lock(lock_);
199 delayed_tasks_.insert(std::make_pair(runtime, std::move(task)));
200 }
201 WakeUp();
202 }
203
AddFileDescriptorWatch(int fd,std::function<void ()> task)204 void UnixTaskRunner::AddFileDescriptorWatch(int fd,
205 std::function<void()> task) {
206 PERFETTO_DCHECK(fd >= 0);
207 {
208 std::lock_guard<std::mutex> lock(lock_);
209 PERFETTO_DCHECK(!watch_tasks_.count(fd));
210 watch_tasks_[fd] = {std::move(task), SIZE_MAX};
211 watch_tasks_changed_ = true;
212 }
213 WakeUp();
214 }
215
RemoveFileDescriptorWatch(int fd)216 void UnixTaskRunner::RemoveFileDescriptorWatch(int fd) {
217 PERFETTO_DCHECK(fd >= 0);
218 {
219 std::lock_guard<std::mutex> lock(lock_);
220 PERFETTO_DCHECK(watch_tasks_.count(fd));
221 watch_tasks_.erase(fd);
222 watch_tasks_changed_ = true;
223 }
224 // No need to schedule a wake-up for this.
225 }
226
RunsTasksOnCurrentThread() const227 bool UnixTaskRunner::RunsTasksOnCurrentThread() const {
228 return GetThreadId() == created_thread_id_;
229 }
230
231 } // namespace base
232 } // namespace perfetto
233
234 #endif // !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
235