• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "perfetto/base/unix_task_runner.h"
18 
19 #include "perfetto/base/build_config.h"
20 
21 #include <errno.h>
22 #include <fcntl.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25 
26 #include <limits>
27 
28 namespace perfetto {
29 namespace base {
30 
UnixTaskRunner()31 UnixTaskRunner::UnixTaskRunner() {
32   // Create a self-pipe which is used to wake up the main thread from inside
33   // poll(2).
34   int pipe_fds[2];
35   PERFETTO_CHECK(pipe(pipe_fds) == 0);
36 
37   // Make the pipe non-blocking so that we never block the waking thread (either
38   // the main thread or another one) when scheduling a wake-up.
39   for (auto fd : pipe_fds) {
40     int flags = fcntl(fd, F_GETFL, 0);
41     PERFETTO_CHECK(flags != -1);
42     PERFETTO_CHECK(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0);
43     PERFETTO_CHECK(fcntl(fd, F_SETFD, FD_CLOEXEC) == 0);
44   }
45   control_read_.reset(pipe_fds[0]);
46   control_write_.reset(pipe_fds[1]);
47 
48 #if PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX)
49   // We are never expecting to have more than a few bytes in the wake-up pipe.
50   // Reduce the buffer size on Linux. Note that this gets rounded up to the page
51   // size.
52   PERFETTO_CHECK(fcntl(control_read_.get(), F_SETPIPE_SZ, 1) > 0);
53 #endif
54 
55   AddFileDescriptorWatch(control_read_.get(), [] {
56     // Not reached -- see PostFileDescriptorWatches().
57     PERFETTO_DCHECK(false);
58   });
59 }
60 
61 UnixTaskRunner::~UnixTaskRunner() = default;
62 
WakeUp()63 void UnixTaskRunner::WakeUp() {
64   const char dummy = 'P';
65   if (write(control_write_.get(), &dummy, 1) <= 0 && errno != EAGAIN)
66     PERFETTO_DPLOG("write()");
67 }
68 
Run()69 void UnixTaskRunner::Run() {
70   PERFETTO_DCHECK_THREAD(thread_checker_);
71   quit_ = false;
72   while (true) {
73     int poll_timeout_ms;
74     {
75       std::lock_guard<std::mutex> lock(lock_);
76       if (quit_)
77         return;
78       poll_timeout_ms = GetDelayMsToNextTaskLocked();
79       UpdateWatchTasksLocked();
80     }
81     int ret = PERFETTO_EINTR(poll(
82         &poll_fds_[0], static_cast<nfds_t>(poll_fds_.size()), poll_timeout_ms));
83     PERFETTO_CHECK(ret >= 0);
84 
85     // To avoid starvation we always interleave all types of tasks -- immediate,
86     // delayed and file descriptor watches.
87     PostFileDescriptorWatches();
88     RunImmediateAndDelayedTask();
89   }
90 }
91 
Quit()92 void UnixTaskRunner::Quit() {
93   {
94     std::lock_guard<std::mutex> lock(lock_);
95     quit_ = true;
96   }
97   WakeUp();
98 }
99 
IsIdleForTesting()100 bool UnixTaskRunner::IsIdleForTesting() {
101   std::lock_guard<std::mutex> lock(lock_);
102   return immediate_tasks_.empty();
103 }
104 
UpdateWatchTasksLocked()105 void UnixTaskRunner::UpdateWatchTasksLocked() {
106   PERFETTO_DCHECK_THREAD(thread_checker_);
107   if (!watch_tasks_changed_)
108     return;
109   watch_tasks_changed_ = false;
110   poll_fds_.clear();
111   for (auto& it : watch_tasks_) {
112     it.second.poll_fd_index = poll_fds_.size();
113     poll_fds_.push_back({it.first, POLLIN | POLLHUP, 0});
114   }
115 }
116 
RunImmediateAndDelayedTask()117 void UnixTaskRunner::RunImmediateAndDelayedTask() {
118   // TODO(skyostil): Add a separate work queue in case in case locking overhead
119   // becomes an issue.
120   std::function<void()> immediate_task;
121   std::function<void()> delayed_task;
122   TimeMillis now = GetWallTimeMs();
123   {
124     std::lock_guard<std::mutex> lock(lock_);
125     if (!immediate_tasks_.empty()) {
126       immediate_task = std::move(immediate_tasks_.front());
127       immediate_tasks_.pop_front();
128     }
129     if (!delayed_tasks_.empty()) {
130       auto it = delayed_tasks_.begin();
131       if (now >= it->first) {
132         delayed_task = std::move(it->second);
133         delayed_tasks_.erase(it);
134       }
135     }
136   }
137 
138   errno = 0;
139   if (immediate_task)
140     RunTask(immediate_task);
141   errno = 0;
142   if (delayed_task)
143     RunTask(delayed_task);
144 }
145 
PostFileDescriptorWatches()146 void UnixTaskRunner::PostFileDescriptorWatches() {
147   PERFETTO_DCHECK_THREAD(thread_checker_);
148   for (size_t i = 0; i < poll_fds_.size(); i++) {
149     if (!(poll_fds_[i].revents & (POLLIN | POLLHUP)))
150       continue;
151     poll_fds_[i].revents = 0;
152 
153     // The wake-up event is handled inline to avoid an infinite recursion of
154     // posted tasks.
155     if (poll_fds_[i].fd == control_read_.get()) {
156       // Drain the byte(s) written to the wake-up pipe. We can potentially read
157       // more than one byte if several wake-ups have been scheduled.
158       char buffer[16];
159       if (read(control_read_.get(), &buffer[0], sizeof(buffer)) <= 0 &&
160           errno != EAGAIN) {
161         PERFETTO_DPLOG("read()");
162       }
163       continue;
164     }
165 
166     // Binding to |this| is safe since we are the only object executing the
167     // task.
168     PostTask(std::bind(&UnixTaskRunner::RunFileDescriptorWatch, this,
169                        poll_fds_[i].fd));
170 
171     // Make the fd negative while a posted task is pending. This makes poll(2)
172     // ignore the fd.
173     PERFETTO_DCHECK(poll_fds_[i].fd >= 0);
174     poll_fds_[i].fd = -poll_fds_[i].fd;
175   }
176 }
177 
RunFileDescriptorWatch(int fd)178 void UnixTaskRunner::RunFileDescriptorWatch(int fd) {
179   std::function<void()> task;
180   {
181     std::lock_guard<std::mutex> lock(lock_);
182     auto it = watch_tasks_.find(fd);
183     if (it == watch_tasks_.end())
184       return;
185     // Make poll(2) pay attention to the fd again. Since another thread may have
186     // updated this watch we need to refresh the set first.
187     UpdateWatchTasksLocked();
188     size_t fd_index = it->second.poll_fd_index;
189     PERFETTO_DCHECK(fd_index < poll_fds_.size());
190     PERFETTO_DCHECK(::abs(poll_fds_[fd_index].fd) == fd);
191     poll_fds_[fd_index].fd = fd;
192     task = it->second.callback;
193   }
194   errno = 0;
195   RunTask(task);
196 }
197 
GetDelayMsToNextTaskLocked() const198 int UnixTaskRunner::GetDelayMsToNextTaskLocked() const {
199   PERFETTO_DCHECK_THREAD(thread_checker_);
200   if (!immediate_tasks_.empty())
201     return 0;
202   if (!delayed_tasks_.empty()) {
203     TimeMillis diff = delayed_tasks_.begin()->first - GetWallTimeMs();
204     return std::max(0, static_cast<int>(diff.count()));
205   }
206   return -1;
207 }
208 
PostTask(std::function<void ()> task)209 void UnixTaskRunner::PostTask(std::function<void()> task) {
210   bool was_empty;
211   {
212     std::lock_guard<std::mutex> lock(lock_);
213     was_empty = immediate_tasks_.empty();
214     immediate_tasks_.push_back(std::move(task));
215   }
216   if (was_empty)
217     WakeUp();
218 }
219 
PostDelayedTask(std::function<void ()> task,uint32_t delay_ms)220 void UnixTaskRunner::PostDelayedTask(std::function<void()> task,
221                                      uint32_t delay_ms) {
222   TimeMillis runtime = GetWallTimeMs() + TimeMillis(delay_ms);
223   {
224     std::lock_guard<std::mutex> lock(lock_);
225     delayed_tasks_.insert(std::make_pair(runtime, std::move(task)));
226   }
227   WakeUp();
228 }
229 
AddFileDescriptorWatch(int fd,std::function<void ()> task)230 void UnixTaskRunner::AddFileDescriptorWatch(int fd,
231                                             std::function<void()> task) {
232   PERFETTO_DCHECK(fd >= 0);
233   {
234     std::lock_guard<std::mutex> lock(lock_);
235     PERFETTO_DCHECK(!watch_tasks_.count(fd));
236     watch_tasks_[fd] = {std::move(task), SIZE_MAX};
237     watch_tasks_changed_ = true;
238   }
239   WakeUp();
240 }
241 
RemoveFileDescriptorWatch(int fd)242 void UnixTaskRunner::RemoveFileDescriptorWatch(int fd) {
243   PERFETTO_DCHECK(fd >= 0);
244   {
245     std::lock_guard<std::mutex> lock(lock_);
246     PERFETTO_DCHECK(watch_tasks_.count(fd));
247     watch_tasks_.erase(fd);
248     watch_tasks_changed_ = true;
249   }
250   // No need to schedule a wake-up for this.
251 }
252 
253 }  // namespace base
254 }  // namespace perfetto
255