• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2019 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "platform/impl/task_runner.h"
6 
7 #include <csignal>
8 #include <thread>
9 
10 #include "util/osp_logging.h"
11 
12 namespace openscreen {
13 
14 namespace {
15 
16 // This is mutated by the signal handler installed by RunUntilSignaled(), and is
17 // checked by RunUntilStopped().
18 //
19 // Per the C++14 spec, passing visible changes to memory between a signal
20 // handler and a program thread must be done through a volatile variable.
21 volatile enum {
22   kNotRunning,
23   kNotSignaled,
24   kSignaled
25 } g_signal_state = kNotRunning;
26 
OnReceivedSignal(int signal)27 void OnReceivedSignal(int signal) {
28   g_signal_state = kSignaled;
29 }
30 
31 }  // namespace
32 
TaskRunnerImpl(ClockNowFunctionPtr now_function,TaskWaiter * event_waiter,Clock::duration waiter_timeout)33 TaskRunnerImpl::TaskRunnerImpl(ClockNowFunctionPtr now_function,
34                                TaskWaiter* event_waiter,
35                                Clock::duration waiter_timeout)
36     : now_function_(now_function),
37       is_running_(false),
38       task_waiter_(event_waiter),
39       waiter_timeout_(waiter_timeout) {}
40 
~TaskRunnerImpl()41 TaskRunnerImpl::~TaskRunnerImpl() {
42   // Ensure no thread is currently executing inside RunUntilStopped().
43   OSP_DCHECK_EQ(task_runner_thread_id_, std::thread::id());
44 }
45 
PostPackagedTask(Task task)46 void TaskRunnerImpl::PostPackagedTask(Task task) {
47   std::lock_guard<std::mutex> lock(task_mutex_);
48   tasks_.emplace_back(std::move(task));
49   if (task_waiter_) {
50     task_waiter_->OnTaskPosted();
51   } else {
52     run_loop_wakeup_.notify_one();
53   }
54 }
55 
PostPackagedTaskWithDelay(Task task,Clock::duration delay)56 void TaskRunnerImpl::PostPackagedTaskWithDelay(Task task,
57                                                Clock::duration delay) {
58   std::lock_guard<std::mutex> lock(task_mutex_);
59   if (delay <= Clock::duration::zero()) {
60     tasks_.emplace_back(std::move(task));
61   } else {
62     delayed_tasks_.emplace(
63         std::make_pair(now_function_() + delay, std::move(task)));
64   }
65   if (task_waiter_) {
66     task_waiter_->OnTaskPosted();
67   } else {
68     run_loop_wakeup_.notify_one();
69   }
70 }
71 
IsRunningOnTaskRunner()72 bool TaskRunnerImpl::IsRunningOnTaskRunner() {
73   return task_runner_thread_id_ == std::this_thread::get_id();
74 }
75 
RunUntilStopped()76 void TaskRunnerImpl::RunUntilStopped() {
77   OSP_DCHECK(!is_running_);
78   task_runner_thread_id_ = std::this_thread::get_id();
79   is_running_ = true;
80 
81   OSP_DVLOG << "Running tasks until stopped...";
82   // Main loop: Run until the |is_running_| flag is set back to false by the
83   // "quit task" posted by RequestStopSoon(), or the process received a
84   // termination signal.
85   while (is_running_) {
86     ScheduleDelayedTasks();
87     if (GrabMoreRunnableTasks()) {
88       RunRunnableTasks();
89     }
90     if (g_signal_state == kSignaled) {
91       is_running_ = false;
92     }
93   }
94 
95   OSP_DVLOG << "Finished running, entering flushing phase...";
96   // Flushing phase: Ensure all immediately-runnable tasks are run before
97   // returning. Since running some tasks might cause more immediately-runnable
98   // tasks to be posted, loop until there is no more work.
99   //
100   // If there is bad code that posts tasks indefinitely, this loop will never
101   // break. However, that also means there is a code path spinning a CPU core at
102   // 100% all the time. Rather than mitigate this problem scenario, purposely
103   // let it manifest here in the hopes that unit testing will reveal it (e.g., a
104   // unit test that never finishes running).
105   while (GrabMoreRunnableTasks()) {
106     RunRunnableTasks();
107   }
108   OSP_DVLOG << "Finished flushing...";
109   task_runner_thread_id_ = std::thread::id();
110 }
111 
RunUntilSignaled()112 void TaskRunnerImpl::RunUntilSignaled() {
113   OSP_CHECK_EQ(g_signal_state, kNotRunning)
114       << __func__ << " may not be invoked concurrently.";
115   g_signal_state = kNotSignaled;
116   const auto old_sigint_handler = std::signal(SIGINT, &OnReceivedSignal);
117   const auto old_sigterm_handler = std::signal(SIGTERM, &OnReceivedSignal);
118 
119   RunUntilStopped();
120 
121   std::signal(SIGINT, old_sigint_handler);
122   std::signal(SIGTERM, old_sigterm_handler);
123   OSP_DVLOG << "Received SIGNIT or SIGTERM, setting state to not running...";
124   g_signal_state = kNotRunning;
125 }
126 
RequestStopSoon()127 void TaskRunnerImpl::RequestStopSoon() {
128   PostTask([this]() { is_running_ = false; });
129 }
130 
RunRunnableTasks()131 void TaskRunnerImpl::RunRunnableTasks() {
132   OSP_DVLOG << "Running " << running_tasks_.size() << " tasks...";
133   for (TaskWithMetadata& running_task : running_tasks_) {
134     // Move the task to the stack so that its bound state is freed immediately
135     // after being run.
136     TaskWithMetadata task = std::move(running_task);
137     task();
138   }
139   running_tasks_.clear();
140 }
141 
ScheduleDelayedTasks()142 void TaskRunnerImpl::ScheduleDelayedTasks() {
143   std::lock_guard<std::mutex> lock(task_mutex_);
144 
145   // Getting the time can be expensive on some platforms, so only get it once.
146   const auto current_time = now_function_();
147   const auto end_of_range = delayed_tasks_.upper_bound(current_time);
148   for (auto it = delayed_tasks_.begin(); it != end_of_range; ++it) {
149     tasks_.push_back(std::move(it->second));
150   }
151   delayed_tasks_.erase(delayed_tasks_.begin(), end_of_range);
152 }
153 
GrabMoreRunnableTasks()154 bool TaskRunnerImpl::GrabMoreRunnableTasks() {
155   OSP_DCHECK(running_tasks_.empty());
156 
157   std::unique_lock<std::mutex> lock(task_mutex_);
158   if (!tasks_.empty()) {
159     running_tasks_.swap(tasks_);
160     return true;
161   }
162 
163   if (!is_running_) {
164     return false;  // Stop was requested. Don't wait for more tasks.
165   }
166 
167   if (task_waiter_) {
168     Clock::duration timeout = waiter_timeout_;
169     if (!delayed_tasks_.empty()) {
170       Clock::duration next_task_delta =
171           delayed_tasks_.begin()->first - now_function_();
172       if (next_task_delta < timeout) {
173         timeout = next_task_delta;
174       }
175     }
176     lock.unlock();
177     task_waiter_->WaitForTaskToBePosted(timeout);
178     return false;
179   }
180 
181   if (delayed_tasks_.empty()) {
182     run_loop_wakeup_.wait(lock);
183   } else {
184     run_loop_wakeup_.wait_for(lock,
185                               delayed_tasks_.begin()->first - now_function_());
186   }
187   return false;
188 }
189 
190 }  // namespace openscreen
191