1 /*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "perfetto/base/unix_task_runner.h"
18
19 #include "perfetto/base/build_config.h"
20
21 #include <errno.h>
22 #include <stdlib.h>
23 #include <unistd.h>
24
25 #include <limits>
26
27 namespace perfetto {
28 namespace base {
29
UnixTaskRunner()30 UnixTaskRunner::UnixTaskRunner() {
31 AddFileDescriptorWatch(event_.fd(), [] {
32 // Not reached -- see PostFileDescriptorWatches().
33 PERFETTO_DFATAL("Should be unreachable.");
34 });
35 }
36
37 UnixTaskRunner::~UnixTaskRunner() = default;
38
WakeUp()39 void UnixTaskRunner::WakeUp() {
40 event_.Notify();
41 }
42
Run()43 void UnixTaskRunner::Run() {
44 PERFETTO_DCHECK_THREAD(thread_checker_);
45 created_thread_id_ = GetThreadId();
46 quit_ = false;
47 for (;;) {
48 int poll_timeout_ms;
49 {
50 std::lock_guard<std::mutex> lock(lock_);
51 if (quit_)
52 return;
53 poll_timeout_ms = GetDelayMsToNextTaskLocked();
54 UpdateWatchTasksLocked();
55 }
56 int ret = PERFETTO_EINTR(poll(
57 &poll_fds_[0], static_cast<nfds_t>(poll_fds_.size()), poll_timeout_ms));
58 PERFETTO_CHECK(ret >= 0);
59
60 // To avoid starvation we always interleave all types of tasks -- immediate,
61 // delayed and file descriptor watches.
62 PostFileDescriptorWatches();
63 RunImmediateAndDelayedTask();
64 }
65 }
66
Quit()67 void UnixTaskRunner::Quit() {
68 std::lock_guard<std::mutex> lock(lock_);
69 quit_ = true;
70 WakeUp();
71 }
72
QuitCalled()73 bool UnixTaskRunner::QuitCalled() {
74 std::lock_guard<std::mutex> lock(lock_);
75 return quit_;
76 }
77
IsIdleForTesting()78 bool UnixTaskRunner::IsIdleForTesting() {
79 std::lock_guard<std::mutex> lock(lock_);
80 return immediate_tasks_.empty();
81 }
82
UpdateWatchTasksLocked()83 void UnixTaskRunner::UpdateWatchTasksLocked() {
84 PERFETTO_DCHECK_THREAD(thread_checker_);
85 if (!watch_tasks_changed_)
86 return;
87 watch_tasks_changed_ = false;
88 poll_fds_.clear();
89 for (auto& it : watch_tasks_) {
90 it.second.poll_fd_index = poll_fds_.size();
91 poll_fds_.push_back({it.first, POLLIN | POLLHUP, 0});
92 }
93 }
94
RunImmediateAndDelayedTask()95 void UnixTaskRunner::RunImmediateAndDelayedTask() {
96 // If locking overhead becomes an issue, add a separate work queue.
97 std::function<void()> immediate_task;
98 std::function<void()> delayed_task;
99 TimeMillis now = GetWallTimeMs();
100 {
101 std::lock_guard<std::mutex> lock(lock_);
102 if (!immediate_tasks_.empty()) {
103 immediate_task = std::move(immediate_tasks_.front());
104 immediate_tasks_.pop_front();
105 }
106 if (!delayed_tasks_.empty()) {
107 auto it = delayed_tasks_.begin();
108 if (now >= it->first) {
109 delayed_task = std::move(it->second);
110 delayed_tasks_.erase(it);
111 }
112 }
113 }
114
115 errno = 0;
116 if (immediate_task)
117 RunTask(immediate_task);
118 errno = 0;
119 if (delayed_task)
120 RunTask(delayed_task);
121 }
122
PostFileDescriptorWatches()123 void UnixTaskRunner::PostFileDescriptorWatches() {
124 PERFETTO_DCHECK_THREAD(thread_checker_);
125 for (size_t i = 0; i < poll_fds_.size(); i++) {
126 if (!(poll_fds_[i].revents & (POLLIN | POLLHUP)))
127 continue;
128 poll_fds_[i].revents = 0;
129
130 // The wake-up event is handled inline to avoid an infinite recursion of
131 // posted tasks.
132 if (poll_fds_[i].fd == event_.fd()) {
133 event_.Clear();
134 continue;
135 }
136
137 // Binding to |this| is safe since we are the only object executing the
138 // task.
139 PostTask(std::bind(&UnixTaskRunner::RunFileDescriptorWatch, this,
140 poll_fds_[i].fd));
141
142 // Make the fd negative while a posted task is pending. This makes poll(2)
143 // ignore the fd.
144 PERFETTO_DCHECK(poll_fds_[i].fd >= 0);
145 poll_fds_[i].fd = -poll_fds_[i].fd;
146 }
147 }
148
RunFileDescriptorWatch(int fd)149 void UnixTaskRunner::RunFileDescriptorWatch(int fd) {
150 std::function<void()> task;
151 {
152 std::lock_guard<std::mutex> lock(lock_);
153 auto it = watch_tasks_.find(fd);
154 if (it == watch_tasks_.end())
155 return;
156 // Make poll(2) pay attention to the fd again. Since another thread may have
157 // updated this watch we need to refresh the set first.
158 UpdateWatchTasksLocked();
159 size_t fd_index = it->second.poll_fd_index;
160 PERFETTO_DCHECK(fd_index < poll_fds_.size());
161 PERFETTO_DCHECK(::abs(poll_fds_[fd_index].fd) == fd);
162 poll_fds_[fd_index].fd = fd;
163 task = it->second.callback;
164 }
165 errno = 0;
166 RunTask(task);
167 }
168
GetDelayMsToNextTaskLocked() const169 int UnixTaskRunner::GetDelayMsToNextTaskLocked() const {
170 PERFETTO_DCHECK_THREAD(thread_checker_);
171 if (!immediate_tasks_.empty())
172 return 0;
173 if (!delayed_tasks_.empty()) {
174 TimeMillis diff = delayed_tasks_.begin()->first - GetWallTimeMs();
175 return std::max(0, static_cast<int>(diff.count()));
176 }
177 return -1;
178 }
179
PostTask(std::function<void ()> task)180 void UnixTaskRunner::PostTask(std::function<void()> task) {
181 bool was_empty;
182 {
183 std::lock_guard<std::mutex> lock(lock_);
184 was_empty = immediate_tasks_.empty();
185 immediate_tasks_.push_back(std::move(task));
186 }
187 if (was_empty)
188 WakeUp();
189 }
190
PostDelayedTask(std::function<void ()> task,uint32_t delay_ms)191 void UnixTaskRunner::PostDelayedTask(std::function<void()> task,
192 uint32_t delay_ms) {
193 TimeMillis runtime = GetWallTimeMs() + TimeMillis(delay_ms);
194 {
195 std::lock_guard<std::mutex> lock(lock_);
196 delayed_tasks_.insert(std::make_pair(runtime, std::move(task)));
197 }
198 WakeUp();
199 }
200
AddFileDescriptorWatch(int fd,std::function<void ()> task)201 void UnixTaskRunner::AddFileDescriptorWatch(int fd,
202 std::function<void()> task) {
203 PERFETTO_DCHECK(fd >= 0);
204 {
205 std::lock_guard<std::mutex> lock(lock_);
206 PERFETTO_DCHECK(!watch_tasks_.count(fd));
207 watch_tasks_[fd] = {std::move(task), SIZE_MAX};
208 watch_tasks_changed_ = true;
209 }
210 WakeUp();
211 }
212
RemoveFileDescriptorWatch(int fd)213 void UnixTaskRunner::RemoveFileDescriptorWatch(int fd) {
214 PERFETTO_DCHECK(fd >= 0);
215 {
216 std::lock_guard<std::mutex> lock(lock_);
217 PERFETTO_DCHECK(watch_tasks_.count(fd));
218 watch_tasks_.erase(fd);
219 watch_tasks_changed_ = true;
220 }
221 // No need to schedule a wake-up for this.
222 }
223
RunsTasksOnCurrentThread() const224 bool UnixTaskRunner::RunsTasksOnCurrentThread() const {
225 return GetThreadId() == created_thread_id_;
226 }
227
228 } // namespace base
229 } // namespace perfetto
230