• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "perfetto/base/build_config.h"
18 
19 #include "perfetto/ext/base/platform.h"
20 #include "perfetto/ext/base/unix_task_runner.h"
21 
22 #include <errno.h>
23 #include <stdlib.h>
24 
25 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
26 #include <Windows.h>
27 #include <synchapi.h>
28 #else
29 #include <unistd.h>
30 #endif
31 
32 #include <algorithm>
33 #include <limits>
34 
35 #include "perfetto/ext/base/watchdog.h"
36 
37 namespace perfetto {
38 namespace base {
39 
UnixTaskRunner()40 UnixTaskRunner::UnixTaskRunner() {
41   AddFileDescriptorWatch(event_.fd(), [] {
42     // Not reached -- see PostFileDescriptorWatches().
43     PERFETTO_DFATAL("Should be unreachable.");
44   });
45 }
46 
// Defaulted out-of-line: no explicit teardown is performed here, members are
// destroyed by their own destructors.
UnixTaskRunner::~UnixTaskRunner() = default;
48 
// Notifies |event_|. The event's fd is registered as a watch by the
// constructor, so it is part of the handle set that Run() waits on.
void UnixTaskRunner::WakeUp() {
  event_.Notify();
}
52 
Run()53 void UnixTaskRunner::Run() {
54   PERFETTO_DCHECK_THREAD(thread_checker_);
55   created_thread_id_ = GetThreadId();
56   quit_ = false;
57   for (;;) {
58     int poll_timeout_ms;
59     {
60       std::lock_guard<std::mutex> lock(lock_);
61       if (quit_)
62         return;
63       poll_timeout_ms = GetDelayMsToNextTaskLocked();
64       UpdateWatchTasksLocked();
65     }
66 
67 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
68     DWORD timeout =
69         poll_timeout_ms >= 0 ? static_cast<DWORD>(poll_timeout_ms) : INFINITE;
70     DWORD ret =
71         WaitForMultipleObjects(static_cast<DWORD>(poll_fds_.size()),
72                                &poll_fds_[0], /*bWaitAll=*/false, timeout);
73     // Unlike poll(2), WaitForMultipleObjects() returns only *one* handle in the
74     // set, even when >1 is signalled. In order to avoid starvation,
75     // PostFileDescriptorWatches() will WaitForSingleObject() each other handle
76     // to ensure fairness. |ret| here is passed just to avoid an extra
77     // WaitForSingleObject() for the one handle that WaitForMultipleObject()
78     // returned.
79     PostFileDescriptorWatches(ret);
80 #else
81     platform::BeforeMaybeBlockingSyscall();
82     int ret = PERFETTO_EINTR(poll(
83         &poll_fds_[0], static_cast<nfds_t>(poll_fds_.size()), poll_timeout_ms));
84     platform::AfterMaybeBlockingSyscall();
85     PERFETTO_CHECK(ret >= 0);
86     PostFileDescriptorWatches(0 /*ignored*/);
87 #endif
88 
89     // To avoid starvation we always interleave all types of tasks -- immediate,
90     // delayed and file descriptor watches.
91     RunImmediateAndDelayedTask();
92   }
93 }
94 
Quit()95 void UnixTaskRunner::Quit() {
96   std::lock_guard<std::mutex> lock(lock_);
97   quit_ = true;
98   WakeUp();
99 }
100 
QuitCalled()101 bool UnixTaskRunner::QuitCalled() {
102   std::lock_guard<std::mutex> lock(lock_);
103   return quit_;
104 }
105 
IsIdleForTesting()106 bool UnixTaskRunner::IsIdleForTesting() {
107   std::lock_guard<std::mutex> lock(lock_);
108   return immediate_tasks_.empty();
109 }
110 
UpdateWatchTasksLocked()111 void UnixTaskRunner::UpdateWatchTasksLocked() {
112   PERFETTO_DCHECK_THREAD(thread_checker_);
113 #if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
114   if (!watch_tasks_changed_)
115     return;
116   watch_tasks_changed_ = false;
117 #endif
118   poll_fds_.clear();
119   for (auto& it : watch_tasks_) {
120     PlatformHandle handle = it.first;
121     WatchTask& watch_task = it.second;
122 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
123     if (!watch_task.pending)
124       poll_fds_.push_back(handle);
125 #else
126     watch_task.poll_fd_index = poll_fds_.size();
127     poll_fds_.push_back({handle, POLLIN | POLLHUP, 0});
128 #endif
129   }
130 }
131 
RunImmediateAndDelayedTask()132 void UnixTaskRunner::RunImmediateAndDelayedTask() {
133   // If locking overhead becomes an issue, add a separate work queue.
134   std::function<void()> immediate_task;
135   std::function<void()> delayed_task;
136   TimeMillis now = GetWallTimeMs();
137   {
138     std::lock_guard<std::mutex> lock(lock_);
139     if (!immediate_tasks_.empty()) {
140       immediate_task = std::move(immediate_tasks_.front());
141       immediate_tasks_.pop_front();
142     }
143     if (!delayed_tasks_.empty()) {
144       auto it = delayed_tasks_.begin();
145       if (now >= it->first) {
146         delayed_task = std::move(it->second);
147         delayed_tasks_.erase(it);
148       }
149     }
150   }
151 
152   errno = 0;
153   if (immediate_task)
154     RunTaskWithWatchdogGuard(immediate_task);
155   errno = 0;
156   if (delayed_task)
157     RunTaskWithWatchdogGuard(delayed_task);
158 }
159 
PostFileDescriptorWatches(uint64_t windows_wait_result)160 void UnixTaskRunner::PostFileDescriptorWatches(uint64_t windows_wait_result) {
161   PERFETTO_DCHECK_THREAD(thread_checker_);
162   for (size_t i = 0; i < poll_fds_.size(); i++) {
163 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
164     const PlatformHandle handle = poll_fds_[i];
165     // |windows_wait_result| is the result of WaitForMultipleObjects() call. If
166     // one of the objects was signalled, it will have a value between
167     // [0, poll_fds_.size()].
168     if (i != windows_wait_result &&
169         WaitForSingleObject(handle, 0) != WAIT_OBJECT_0) {
170       continue;
171     }
172 #else
173     base::ignore_result(windows_wait_result);
174     const PlatformHandle handle = poll_fds_[i].fd;
175     if (!(poll_fds_[i].revents & (POLLIN | POLLHUP)))
176       continue;
177     poll_fds_[i].revents = 0;
178 #endif
179 
180     // The wake-up event is handled inline to avoid an infinite recursion of
181     // posted tasks.
182     if (handle == event_.fd()) {
183       event_.Clear();
184       continue;
185     }
186 
187     // Binding to |this| is safe since we are the only object executing the
188     // task.
189     PostTask(std::bind(&UnixTaskRunner::RunFileDescriptorWatch, this, handle));
190 
191     // Flag the task as pending.
192 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
193     // On Windows this is done by marking the WatchTask entry as pending. This
194     // is more expensive than Linux as requires rebuilding the |poll_fds_|
195     // vector on each call. There doesn't seem to be a good alternative though.
196     auto it = watch_tasks_.find(handle);
197     PERFETTO_CHECK(it != watch_tasks_.end());
198     PERFETTO_DCHECK(!it->second.pending);
199     it->second.pending = true;
200 #else
201     // On UNIX systems instead, we just make the fd negative while its task is
202     // pending. This makes poll(2) ignore the fd.
203     PERFETTO_DCHECK(poll_fds_[i].fd >= 0);
204     poll_fds_[i].fd = -poll_fds_[i].fd;
205 #endif
206   }
207 }
208 
RunFileDescriptorWatch(PlatformHandle fd)209 void UnixTaskRunner::RunFileDescriptorWatch(PlatformHandle fd) {
210   std::function<void()> task;
211   {
212     std::lock_guard<std::mutex> lock(lock_);
213     auto it = watch_tasks_.find(fd);
214     if (it == watch_tasks_.end())
215       return;
216     WatchTask& watch_task = it->second;
217 
218     // Make poll(2) pay attention to the fd again. Since another thread may have
219     // updated this watch we need to refresh the set first.
220     UpdateWatchTasksLocked();
221 
222 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
223     // On Windows we manually track the presence of outstanding tasks for the
224     // watch. The UpdateWatchTasksLocked() in the Run() loop will re-add the
225     // task to the |poll_fds_| vector.
226     PERFETTO_DCHECK(watch_task.pending);
227     watch_task.pending = false;
228 #else
229     size_t fd_index = watch_task.poll_fd_index;
230     PERFETTO_DCHECK(fd_index < poll_fds_.size());
231     PERFETTO_DCHECK(::abs(poll_fds_[fd_index].fd) == fd);
232     poll_fds_[fd_index].fd = fd;
233 #endif
234     task = watch_task.callback;
235   }
236   errno = 0;
237   RunTaskWithWatchdogGuard(task);
238 }
239 
GetDelayMsToNextTaskLocked() const240 int UnixTaskRunner::GetDelayMsToNextTaskLocked() const {
241   PERFETTO_DCHECK_THREAD(thread_checker_);
242   if (!immediate_tasks_.empty())
243     return 0;
244   if (!delayed_tasks_.empty()) {
245     TimeMillis diff = delayed_tasks_.begin()->first - GetWallTimeMs();
246     return std::max(0, static_cast<int>(diff.count()));
247   }
248   return -1;
249 }
250 
PostTask(std::function<void ()> task)251 void UnixTaskRunner::PostTask(std::function<void()> task) {
252   bool was_empty;
253   {
254     std::lock_guard<std::mutex> lock(lock_);
255     was_empty = immediate_tasks_.empty();
256     immediate_tasks_.push_back(std::move(task));
257   }
258   if (was_empty)
259     WakeUp();
260 }
261 
PostDelayedTask(std::function<void ()> task,uint32_t delay_ms)262 void UnixTaskRunner::PostDelayedTask(std::function<void()> task,
263                                      uint32_t delay_ms) {
264   TimeMillis runtime = GetWallTimeMs() + TimeMillis(delay_ms);
265   {
266     std::lock_guard<std::mutex> lock(lock_);
267     delayed_tasks_.insert(std::make_pair(runtime, std::move(task)));
268   }
269   WakeUp();
270 }
271 
AddFileDescriptorWatch(PlatformHandle fd,std::function<void ()> task)272 void UnixTaskRunner::AddFileDescriptorWatch(PlatformHandle fd,
273                                             std::function<void()> task) {
274   PERFETTO_DCHECK(PlatformHandleChecker::IsValid(fd));
275   {
276     std::lock_guard<std::mutex> lock(lock_);
277     PERFETTO_DCHECK(!watch_tasks_.count(fd));
278     WatchTask& watch_task = watch_tasks_[fd];
279     watch_task.callback = std::move(task);
280 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
281     watch_task.pending = false;
282 #else
283     watch_task.poll_fd_index = SIZE_MAX;
284 #endif
285     watch_tasks_changed_ = true;
286   }
287   WakeUp();
288 }
289 
RemoveFileDescriptorWatch(PlatformHandle fd)290 void UnixTaskRunner::RemoveFileDescriptorWatch(PlatformHandle fd) {
291   PERFETTO_DCHECK(PlatformHandleChecker::IsValid(fd));
292   {
293     std::lock_guard<std::mutex> lock(lock_);
294     PERFETTO_DCHECK(watch_tasks_.count(fd));
295     watch_tasks_.erase(fd);
296     watch_tasks_changed_ = true;
297   }
298   // No need to schedule a wake-up for this.
299 }
300 
RunsTasksOnCurrentThread() const301 bool UnixTaskRunner::RunsTasksOnCurrentThread() const {
302   return GetThreadId() == created_thread_id_;
303 }
304 
305 }  // namespace base
306 }  // namespace perfetto
307