1 /*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "perfetto/base/unix_task_runner.h"
18
19 #include "perfetto/base/build_config.h"
20
21 #include <errno.h>
22 #include <fcntl.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25
26 #include <limits>
27
28 namespace perfetto {
29 namespace base {
30
UnixTaskRunner()31 UnixTaskRunner::UnixTaskRunner() {
32 // Create a self-pipe which is used to wake up the main thread from inside
33 // poll(2).
34 int pipe_fds[2];
35 PERFETTO_CHECK(pipe(pipe_fds) == 0);
36
37 // Make the pipe non-blocking so that we never block the waking thread (either
38 // the main thread or another one) when scheduling a wake-up.
39 for (auto fd : pipe_fds) {
40 int flags = fcntl(fd, F_GETFL, 0);
41 PERFETTO_CHECK(flags != -1);
42 PERFETTO_CHECK(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0);
43 PERFETTO_CHECK(fcntl(fd, F_SETFD, FD_CLOEXEC) == 0);
44 }
45 control_read_.reset(pipe_fds[0]);
46 control_write_.reset(pipe_fds[1]);
47
48 #if PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX)
49 // We are never expecting to have more than a few bytes in the wake-up pipe.
50 // Reduce the buffer size on Linux. Note that this gets rounded up to the page
51 // size.
52 PERFETTO_CHECK(fcntl(control_read_.get(), F_SETPIPE_SZ, 1) > 0);
53 #endif
54
55 AddFileDescriptorWatch(control_read_.get(), [] {
56 // Not reached -- see PostFileDescriptorWatches().
57 PERFETTO_DCHECK(false);
58 });
59 }
60
61 UnixTaskRunner::~UnixTaskRunner() = default;
62
WakeUp()63 void UnixTaskRunner::WakeUp() {
64 const char dummy = 'P';
65 if (write(control_write_.get(), &dummy, 1) <= 0 && errno != EAGAIN)
66 PERFETTO_DPLOG("write()");
67 }
68
Run()69 void UnixTaskRunner::Run() {
70 PERFETTO_DCHECK_THREAD(thread_checker_);
71 quit_ = false;
72 while (true) {
73 int poll_timeout_ms;
74 {
75 std::lock_guard<std::mutex> lock(lock_);
76 if (quit_)
77 return;
78 poll_timeout_ms = GetDelayMsToNextTaskLocked();
79 UpdateWatchTasksLocked();
80 }
81 int ret = PERFETTO_EINTR(poll(
82 &poll_fds_[0], static_cast<nfds_t>(poll_fds_.size()), poll_timeout_ms));
83 PERFETTO_CHECK(ret >= 0);
84
85 // To avoid starvation we always interleave all types of tasks -- immediate,
86 // delayed and file descriptor watches.
87 PostFileDescriptorWatches();
88 RunImmediateAndDelayedTask();
89 }
90 }
91
Quit()92 void UnixTaskRunner::Quit() {
93 {
94 std::lock_guard<std::mutex> lock(lock_);
95 quit_ = true;
96 }
97 WakeUp();
98 }
99
IsIdleForTesting()100 bool UnixTaskRunner::IsIdleForTesting() {
101 std::lock_guard<std::mutex> lock(lock_);
102 return immediate_tasks_.empty();
103 }
104
UpdateWatchTasksLocked()105 void UnixTaskRunner::UpdateWatchTasksLocked() {
106 PERFETTO_DCHECK_THREAD(thread_checker_);
107 if (!watch_tasks_changed_)
108 return;
109 watch_tasks_changed_ = false;
110 poll_fds_.clear();
111 for (auto& it : watch_tasks_) {
112 it.second.poll_fd_index = poll_fds_.size();
113 poll_fds_.push_back({it.first, POLLIN | POLLHUP, 0});
114 }
115 }
116
RunImmediateAndDelayedTask()117 void UnixTaskRunner::RunImmediateAndDelayedTask() {
118 // TODO(skyostil): Add a separate work queue in case in case locking overhead
119 // becomes an issue.
120 std::function<void()> immediate_task;
121 std::function<void()> delayed_task;
122 TimeMillis now = GetWallTimeMs();
123 {
124 std::lock_guard<std::mutex> lock(lock_);
125 if (!immediate_tasks_.empty()) {
126 immediate_task = std::move(immediate_tasks_.front());
127 immediate_tasks_.pop_front();
128 }
129 if (!delayed_tasks_.empty()) {
130 auto it = delayed_tasks_.begin();
131 if (now >= it->first) {
132 delayed_task = std::move(it->second);
133 delayed_tasks_.erase(it);
134 }
135 }
136 }
137
138 errno = 0;
139 if (immediate_task)
140 RunTask(immediate_task);
141 errno = 0;
142 if (delayed_task)
143 RunTask(delayed_task);
144 }
145
PostFileDescriptorWatches()146 void UnixTaskRunner::PostFileDescriptorWatches() {
147 PERFETTO_DCHECK_THREAD(thread_checker_);
148 for (size_t i = 0; i < poll_fds_.size(); i++) {
149 if (!(poll_fds_[i].revents & (POLLIN | POLLHUP)))
150 continue;
151 poll_fds_[i].revents = 0;
152
153 // The wake-up event is handled inline to avoid an infinite recursion of
154 // posted tasks.
155 if (poll_fds_[i].fd == control_read_.get()) {
156 // Drain the byte(s) written to the wake-up pipe. We can potentially read
157 // more than one byte if several wake-ups have been scheduled.
158 char buffer[16];
159 if (read(control_read_.get(), &buffer[0], sizeof(buffer)) <= 0 &&
160 errno != EAGAIN) {
161 PERFETTO_DPLOG("read()");
162 }
163 continue;
164 }
165
166 // Binding to |this| is safe since we are the only object executing the
167 // task.
168 PostTask(std::bind(&UnixTaskRunner::RunFileDescriptorWatch, this,
169 poll_fds_[i].fd));
170
171 // Make the fd negative while a posted task is pending. This makes poll(2)
172 // ignore the fd.
173 PERFETTO_DCHECK(poll_fds_[i].fd >= 0);
174 poll_fds_[i].fd = -poll_fds_[i].fd;
175 }
176 }
177
RunFileDescriptorWatch(int fd)178 void UnixTaskRunner::RunFileDescriptorWatch(int fd) {
179 std::function<void()> task;
180 {
181 std::lock_guard<std::mutex> lock(lock_);
182 auto it = watch_tasks_.find(fd);
183 if (it == watch_tasks_.end())
184 return;
185 // Make poll(2) pay attention to the fd again. Since another thread may have
186 // updated this watch we need to refresh the set first.
187 UpdateWatchTasksLocked();
188 size_t fd_index = it->second.poll_fd_index;
189 PERFETTO_DCHECK(fd_index < poll_fds_.size());
190 PERFETTO_DCHECK(::abs(poll_fds_[fd_index].fd) == fd);
191 poll_fds_[fd_index].fd = fd;
192 task = it->second.callback;
193 }
194 errno = 0;
195 RunTask(task);
196 }
197
GetDelayMsToNextTaskLocked() const198 int UnixTaskRunner::GetDelayMsToNextTaskLocked() const {
199 PERFETTO_DCHECK_THREAD(thread_checker_);
200 if (!immediate_tasks_.empty())
201 return 0;
202 if (!delayed_tasks_.empty()) {
203 TimeMillis diff = delayed_tasks_.begin()->first - GetWallTimeMs();
204 return std::max(0, static_cast<int>(diff.count()));
205 }
206 return -1;
207 }
208
PostTask(std::function<void ()> task)209 void UnixTaskRunner::PostTask(std::function<void()> task) {
210 bool was_empty;
211 {
212 std::lock_guard<std::mutex> lock(lock_);
213 was_empty = immediate_tasks_.empty();
214 immediate_tasks_.push_back(std::move(task));
215 }
216 if (was_empty)
217 WakeUp();
218 }
219
PostDelayedTask(std::function<void ()> task,uint32_t delay_ms)220 void UnixTaskRunner::PostDelayedTask(std::function<void()> task,
221 uint32_t delay_ms) {
222 TimeMillis runtime = GetWallTimeMs() + TimeMillis(delay_ms);
223 {
224 std::lock_guard<std::mutex> lock(lock_);
225 delayed_tasks_.insert(std::make_pair(runtime, std::move(task)));
226 }
227 WakeUp();
228 }
229
AddFileDescriptorWatch(int fd,std::function<void ()> task)230 void UnixTaskRunner::AddFileDescriptorWatch(int fd,
231 std::function<void()> task) {
232 PERFETTO_DCHECK(fd >= 0);
233 {
234 std::lock_guard<std::mutex> lock(lock_);
235 PERFETTO_DCHECK(!watch_tasks_.count(fd));
236 watch_tasks_[fd] = {std::move(task), SIZE_MAX};
237 watch_tasks_changed_ = true;
238 }
239 WakeUp();
240 }
241
RemoveFileDescriptorWatch(int fd)242 void UnixTaskRunner::RemoveFileDescriptorWatch(int fd) {
243 PERFETTO_DCHECK(fd >= 0);
244 {
245 std::lock_guard<std::mutex> lock(lock_);
246 PERFETTO_DCHECK(watch_tasks_.count(fd));
247 watch_tasks_.erase(fd);
248 watch_tasks_changed_ = true;
249 }
250 // No need to schedule a wake-up for this.
251 }
252
253 } // namespace base
254 } // namespace perfetto
255