• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2015 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include <brillo/message_loops/base_message_loop.h>
6 
7 #include <fcntl.h>
8 #include <sys/stat.h>
9 #include <sys/types.h>
10 #include <unistd.h>
11 
12 #ifndef __APPLE__
13 #include <sys/sysmacros.h>
14 #endif
15 
16 #ifndef __ANDROID_HOST__
17 // Used for MISC_MAJOR. Only required for the target and not always available
18 // for the host.
19 #include <linux/major.h>
20 #endif
21 
22 #include <vector>
23 
24 #include <base/bind.h>
25 #include <base/files/file_path.h>
26 #include <base/files/file_util.h>
27 #include <base/run_loop.h>
28 #include <base/strings/string_number_conversions.h>
29 #include <base/strings/string_split.h>
30 
31 #include <brillo/location_logging.h>
32 #include <brillo/strings/string_utils.h>
33 
34 using base::Closure;
35 
namespace {

// File read by GetBinderMinor(); ParseBinderMinor() treats its contents as
// one "<minor> <name>" entry per line.
constexpr char kMiscMinorPath[] = "/proc/misc";
// Entry name that identifies the binder driver in |kMiscMinorPath|.
constexpr char kBinderDriverName[] = "binder";

}  // namespace
42 
43 namespace brillo {
44 
// Result of ParseBinderMinor() when no usable "binder" entry was found.
const int BaseMessageLoop::kInvalidMinor = -1;
// Marks |binder_minor_| as not looked up yet; GetBinderMinor() reads
// /proc/misc only while the cached value still equals this sentinel.
const int BaseMessageLoop::kUninitializedMinor = -2;
47 
BaseMessageLoop()48 BaseMessageLoop::BaseMessageLoop() {
49   CHECK(!base::MessageLoop::current())
50       << "You can't create a base::MessageLoopForIO when another "
51          "base::MessageLoop is already created for this thread.";
52   owned_base_loop_.reset(new base::MessageLoopForIO);
53   base_loop_ = owned_base_loop_.get();
54 }
55 
BaseMessageLoop(base::MessageLoopForIO * base_loop)56 BaseMessageLoop::BaseMessageLoop(base::MessageLoopForIO* base_loop)
57     : base_loop_(base_loop) {}
58 
~BaseMessageLoop()59 BaseMessageLoop::~BaseMessageLoop() {
60   for (auto& io_task : io_tasks_) {
61     DVLOG_LOC(io_task.second.location(), 1)
62         << "Removing file descriptor watcher task_id " << io_task.first
63         << " leaked on BaseMessageLoop, scheduled from this location.";
64     io_task.second.StopWatching();
65   }
66 
67   // Note all pending canceled delayed tasks when destroying the message loop.
68   size_t lazily_deleted_tasks = 0;
69   for (const auto& delayed_task : delayed_tasks_) {
70     if (delayed_task.second.closure.is_null()) {
71       lazily_deleted_tasks++;
72     } else {
73       DVLOG_LOC(delayed_task.second.location, 1)
74           << "Removing delayed task_id " << delayed_task.first
75           << " leaked on BaseMessageLoop, scheduled from this location.";
76     }
77   }
78   if (lazily_deleted_tasks) {
79     LOG(INFO) << "Leaking " << lazily_deleted_tasks << " canceled tasks.";
80   }
81 }
82 
PostDelayedTask(const tracked_objects::Location & from_here,const Closure & task,base::TimeDelta delay)83 MessageLoop::TaskId BaseMessageLoop::PostDelayedTask(
84     const tracked_objects::Location& from_here,
85     const Closure &task,
86     base::TimeDelta delay) {
87   TaskId task_id =  NextTaskId();
88   bool base_scheduled = base_loop_->task_runner()->PostDelayedTask(
89       from_here,
90       base::Bind(&BaseMessageLoop::OnRanPostedTask,
91                  weak_ptr_factory_.GetWeakPtr(),
92                  task_id),
93       delay);
94   DVLOG_LOC(from_here, 1) << "Scheduling delayed task_id " << task_id
95                           << " to run in " << delay << ".";
96   if (!base_scheduled)
97     return MessageLoop::kTaskIdNull;
98 
99   delayed_tasks_.emplace(task_id,
100                          DelayedTask{from_here, task_id, std::move(task)});
101   return task_id;
102 }
103 
WatchFileDescriptor(const tracked_objects::Location & from_here,int fd,WatchMode mode,bool persistent,const Closure & task)104 MessageLoop::TaskId BaseMessageLoop::WatchFileDescriptor(
105     const tracked_objects::Location& from_here,
106     int fd,
107     WatchMode mode,
108     bool persistent,
109     const Closure &task) {
110   // base::MessageLoopForIO CHECKS that "fd >= 0", so we handle that case here.
111   if (fd < 0)
112     return MessageLoop::kTaskIdNull;
113 
114   base::MessageLoopForIO::Mode base_mode = base::MessageLoopForIO::WATCH_READ;
115   switch (mode) {
116     case MessageLoop::kWatchRead:
117       base_mode = base::MessageLoopForIO::WATCH_READ;
118       break;
119     case MessageLoop::kWatchWrite:
120       base_mode = base::MessageLoopForIO::WATCH_WRITE;
121       break;
122     default:
123       return MessageLoop::kTaskIdNull;
124   }
125 
126   TaskId task_id =  NextTaskId();
127   auto it_bool = io_tasks_.emplace(
128       std::piecewise_construct,
129       std::forward_as_tuple(task_id),
130       std::forward_as_tuple(
131           from_here, this, task_id, fd, base_mode, persistent, task));
132   // This should always insert a new element.
133   DCHECK(it_bool.second);
134   bool scheduled = it_bool.first->second.StartWatching();
135   DVLOG_LOC(from_here, 1)
136       << "Watching fd " << fd << " for "
137       << (mode == MessageLoop::kWatchRead ? "reading" : "writing")
138       << (persistent ? " persistently" : " just once")
139       << " as task_id " << task_id
140       << (scheduled ? " successfully" : " failed.");
141 
142   if (!scheduled) {
143     io_tasks_.erase(task_id);
144     return MessageLoop::kTaskIdNull;
145   }
146 
147 #ifndef __ANDROID_HOST__
148   // Determine if the passed fd is the binder file descriptor. For that, we need
149   // to check that is a special char device and that the major and minor device
150   // numbers match. The binder file descriptor can't be removed and added back
151   // to an epoll group when there's work available to be done by the file
152   // descriptor due to bugs in the binder driver (b/26524111) when used with
153   // epoll. Therefore, we flag the binder fd and never attempt to remove it.
154   // This may cause the binder file descriptor to be attended with higher
155   // priority and cause starvation of other events.
156   struct stat buf;
157   if (fstat(fd, &buf) == 0 &&
158       S_ISCHR(buf.st_mode) &&
159       major(buf.st_rdev) == MISC_MAJOR &&
160       minor(buf.st_rdev) == GetBinderMinor()) {
161     it_bool.first->second.RunImmediately();
162   }
163 #endif
164 
165   return task_id;
166 }
167 
CancelTask(TaskId task_id)168 bool BaseMessageLoop::CancelTask(TaskId task_id) {
169   if (task_id == kTaskIdNull)
170     return false;
171   auto delayed_task_it = delayed_tasks_.find(task_id);
172   if (delayed_task_it == delayed_tasks_.end()) {
173     // This might be an IOTask then.
174     auto io_task_it = io_tasks_.find(task_id);
175     if (io_task_it == io_tasks_.end())
176       return false;
177     return io_task_it->second.CancelTask();
178   }
179   // A DelayedTask was found for this task_id at this point.
180 
181   // Check if the callback was already canceled but we have the entry in
182   // delayed_tasks_ since it didn't fire yet in the message loop.
183   if (delayed_task_it->second.closure.is_null())
184     return false;
185 
186   DVLOG_LOC(delayed_task_it->second.location, 1)
187       << "Removing task_id " << task_id << " scheduled from this location.";
188   // We reset to closure to a null Closure to release all the resources
189   // used by this closure at this point, but we don't remove the task_id from
190   // delayed_tasks_ since we can't tell base::MessageLoopForIO to not run it.
191   delayed_task_it->second.closure = Closure();
192 
193   return true;
194 }
195 
RunOnce(bool may_block)196 bool BaseMessageLoop::RunOnce(bool may_block) {
197   run_once_ = true;
198   base::RunLoop run_loop;  // Uses the base::MessageLoopForIO implicitly.
199   base_run_loop_ = &run_loop;
200   if (!may_block)
201     run_loop.RunUntilIdle();
202   else
203     run_loop.Run();
204   base_run_loop_ = nullptr;
205   // If the flag was reset to false, it means a closure was run.
206   if (!run_once_)
207     return true;
208 
209   run_once_ = false;
210   return false;
211 }
212 
Run()213 void BaseMessageLoop::Run() {
214   base::RunLoop run_loop;  // Uses the base::MessageLoopForIO implicitly.
215   base_run_loop_ = &run_loop;
216   run_loop.Run();
217   base_run_loop_ = nullptr;
218 }
219 
BreakLoop()220 void BaseMessageLoop::BreakLoop() {
221   if (base_run_loop_ == nullptr) {
222     DVLOG(1) << "Message loop not running, ignoring BreakLoop().";
223     return;  // Message loop not running, nothing to do.
224   }
225   base_run_loop_->Quit();
226 }
227 
QuitClosure() const228 Closure BaseMessageLoop::QuitClosure() const {
229   if (base_run_loop_ == nullptr)
230     return base::Bind(&base::DoNothing);
231   return base_run_loop_->QuitClosure();
232 }
233 
NextTaskId()234 MessageLoop::TaskId BaseMessageLoop::NextTaskId() {
235   TaskId res;
236   do {
237     res = ++last_id_;
238     // We would run out of memory before we run out of task ids.
239   } while (!res ||
240            delayed_tasks_.find(res) != delayed_tasks_.end() ||
241            io_tasks_.find(res) != io_tasks_.end());
242   return res;
243 }
244 
OnRanPostedTask(MessageLoop::TaskId task_id)245 void BaseMessageLoop::OnRanPostedTask(MessageLoop::TaskId task_id) {
246   auto task_it = delayed_tasks_.find(task_id);
247   DCHECK(task_it != delayed_tasks_.end());
248   if (!task_it->second.closure.is_null()) {
249     DVLOG_LOC(task_it->second.location, 1)
250         << "Running delayed task_id " << task_id
251         << " scheduled from this location.";
252     // Mark the task as canceled while we are running it so CancelTask returns
253     // false.
254     Closure closure = std::move(task_it->second.closure);
255     task_it->second.closure = Closure();
256     closure.Run();
257 
258     // If the |run_once_| flag is set, it is because we are instructed to run
259     // only once callback.
260     if (run_once_) {
261       run_once_ = false;
262       BreakLoop();
263     }
264   }
265   delayed_tasks_.erase(task_it);
266 }
267 
OnFileReadyPostedTask(MessageLoop::TaskId task_id)268 void BaseMessageLoop::OnFileReadyPostedTask(MessageLoop::TaskId task_id) {
269   auto task_it = io_tasks_.find(task_id);
270   // Even if this task was canceled while we were waiting in the message loop
271   // for this method to run, the entry in io_tasks_ should still be present, but
272   // won't do anything.
273   DCHECK(task_it != io_tasks_.end());
274   task_it->second.OnFileReadyPostedTask();
275 }
276 
ParseBinderMinor(const std::string & file_contents)277 int BaseMessageLoop::ParseBinderMinor(
278     const std::string& file_contents) {
279   int result = kInvalidMinor;
280   // Split along '\n', then along the ' '. Note that base::SplitString trims all
281   // white spaces at the beginning and end after splitting.
282   std::vector<std::string> lines =
283       base::SplitString(file_contents, "\n", base::TRIM_WHITESPACE,
284                         base::SPLIT_WANT_ALL);
285   for (const std::string& line : lines) {
286     if (line.empty())
287       continue;
288     std::string number;
289     std::string name;
290     if (!string_utils::SplitAtFirst(line, " ", &number, &name, false))
291       continue;
292 
293     if (name == kBinderDriverName && base::StringToInt(number, &result))
294       break;
295   }
296   return result;
297 }
298 
GetBinderMinor()299 unsigned int BaseMessageLoop::GetBinderMinor() {
300   if (binder_minor_ != kUninitializedMinor)
301     return binder_minor_;
302 
303   std::string proc_misc;
304   if (!base::ReadFileToString(base::FilePath(kMiscMinorPath), &proc_misc))
305     return binder_minor_;
306   binder_minor_ = ParseBinderMinor(proc_misc);
307   return binder_minor_;
308 }
309 
IOTask(const tracked_objects::Location & location,BaseMessageLoop * loop,MessageLoop::TaskId task_id,int fd,base::MessageLoopForIO::Mode base_mode,bool persistent,const Closure & task)310 BaseMessageLoop::IOTask::IOTask(const tracked_objects::Location& location,
311                                 BaseMessageLoop* loop,
312                                 MessageLoop::TaskId task_id,
313                                 int fd,
314                                 base::MessageLoopForIO::Mode base_mode,
315                                 bool persistent,
316                                 const Closure& task)
317     : location_(location), loop_(loop), task_id_(task_id),
318       fd_(fd), base_mode_(base_mode), persistent_(persistent), closure_(task) {}
319 
StartWatching()320 bool BaseMessageLoop::IOTask::StartWatching() {
321   return loop_->base_loop_->WatchFileDescriptor(
322       fd_, persistent_, base_mode_, &fd_watcher_, this);
323 }
324 
StopWatching()325 void BaseMessageLoop::IOTask::StopWatching() {
326   // This is safe to call even if we are not watching for it.
327   fd_watcher_.StopWatchingFileDescriptor();
328 }
329 
OnFileCanReadWithoutBlocking(int)330 void BaseMessageLoop::IOTask::OnFileCanReadWithoutBlocking(int /* fd */) {
331   OnFileReady();
332 }
333 
OnFileCanWriteWithoutBlocking(int)334 void BaseMessageLoop::IOTask::OnFileCanWriteWithoutBlocking(int /* fd */) {
335   OnFileReady();
336 }
337 
OnFileReady()338 void BaseMessageLoop::IOTask::OnFileReady() {
339   // For file descriptors marked with the immediate_run flag, we don't call
340   // StopWatching() and wait, instead we dispatch the callback immediately.
341   if (immediate_run_) {
342     posted_task_pending_ = true;
343     OnFileReadyPostedTask();
344     return;
345   }
346 
347   // When the file descriptor becomes available we stop watching for it and
348   // schedule a task to run the callback from the main loop. The callback will
349   // run using the same scheduler used to run other delayed tasks, avoiding
350   // starvation of the available posted tasks if there are file descriptors
351   // always available. The new posted task will use the same TaskId as the
352   // current file descriptor watching task an could be canceled in either state,
353   // when waiting for the file descriptor or waiting in the main loop.
354   StopWatching();
355   bool base_scheduled = loop_->base_loop_->task_runner()->PostTask(
356       location_,
357       base::Bind(&BaseMessageLoop::OnFileReadyPostedTask,
358                  loop_->weak_ptr_factory_.GetWeakPtr(),
359                  task_id_));
360   posted_task_pending_ = true;
361   if (base_scheduled) {
362     DVLOG_LOC(location_, 1)
363         << "Dispatching task_id " << task_id_ << " for "
364         << (base_mode_ == base::MessageLoopForIO::WATCH_READ ?
365             "reading" : "writing")
366         << " file descriptor " << fd_ << ", scheduled from this location.";
367   } else {
368     // In the rare case that PostTask() fails, we fall back to run it directly.
369     // This would indicate a bigger problem with the message loop setup.
370     LOG(ERROR) << "Error on base::MessageLoopForIO::PostTask().";
371     OnFileReadyPostedTask();
372   }
373 }
374 
OnFileReadyPostedTask()375 void BaseMessageLoop::IOTask::OnFileReadyPostedTask() {
376   // We can't access |this| after running the |closure_| since it could call
377   // CancelTask on its own task_id, so we copy the members we need now.
378   BaseMessageLoop* loop_ptr = loop_;
379   DCHECK(posted_task_pending_ = true);
380   posted_task_pending_ = false;
381 
382   // If this task was already canceled, the closure will be null and there is
383   // nothing else to do here. This execution doesn't count a step for RunOnce()
384   // unless we have a callback to run.
385   if (closure_.is_null()) {
386     loop_->io_tasks_.erase(task_id_);
387     return;
388   }
389 
390   DVLOG_LOC(location_, 1)
391       << "Running task_id " << task_id_ << " for "
392       << (base_mode_ == base::MessageLoopForIO::WATCH_READ ?
393           "reading" : "writing")
394       << " file descriptor " << fd_ << ", scheduled from this location.";
395 
396   if (persistent_) {
397     // In the persistent case we just run the callback. If this callback cancels
398     // the task id, we can't access |this| anymore, so we re-start watching the
399     // file descriptor before running the callback, unless this is a fd where
400     // we didn't stop watching the file descriptor when it became available.
401     if (!immediate_run_)
402       StartWatching();
403     closure_.Run();
404   } else {
405     // This will destroy |this|, the fd_watcher and therefore stop watching this
406     // file descriptor.
407     Closure closure_copy = std::move(closure_);
408     loop_->io_tasks_.erase(task_id_);
409     // Run the closure from the local copy we just made.
410     closure_copy.Run();
411   }
412 
413   if (loop_ptr->run_once_) {
414     loop_ptr->run_once_ = false;
415     loop_ptr->BreakLoop();
416   }
417 }
418 
CancelTask()419 bool BaseMessageLoop::IOTask::CancelTask() {
420   if (closure_.is_null())
421     return false;
422 
423   DVLOG_LOC(location_, 1)
424       << "Removing task_id " << task_id_ << " scheduled from this location.";
425 
426   if (!posted_task_pending_) {
427     // Destroying the FileDescriptorWatcher implicitly stops watching the file
428     // descriptor. This will delete our instance.
429     loop_->io_tasks_.erase(task_id_);
430     return true;
431   }
432   // The IOTask is waiting for the message loop to run its delayed task, so
433   // it is not watching for the file descriptor. We release the closure
434   // resources now but keep the IOTask instance alive while we wait for the
435   // callback to run and delete the IOTask.
436   closure_ = Closure();
437   return true;
438 }
439 
440 }  // namespace brillo
441