1 // Copyright 2019 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "platform/impl/task_runner.h"
6
7 #include <csignal>
8 #include <thread>
9
10 #include "util/osp_logging.h"
11
12 namespace openscreen {
13
14 namespace {
15
16 // This is mutated by the signal handler installed by RunUntilSignaled(), and is
17 // checked by RunUntilStopped().
18 //
19 // Per the C++14 spec, passing visible changes to memory between a signal
20 // handler and a program thread must be done through a volatile variable.
21 volatile enum {
22 kNotRunning,
23 kNotSignaled,
24 kSignaled
25 } g_signal_state = kNotRunning;
26
OnReceivedSignal(int signal)27 void OnReceivedSignal(int signal) {
28 g_signal_state = kSignaled;
29 }
30
31 } // namespace
32
TaskRunnerImpl(ClockNowFunctionPtr now_function,TaskWaiter * event_waiter,Clock::duration waiter_timeout)33 TaskRunnerImpl::TaskRunnerImpl(ClockNowFunctionPtr now_function,
34 TaskWaiter* event_waiter,
35 Clock::duration waiter_timeout)
36 : now_function_(now_function),
37 is_running_(false),
38 task_waiter_(event_waiter),
39 waiter_timeout_(waiter_timeout) {}
40
~TaskRunnerImpl()41 TaskRunnerImpl::~TaskRunnerImpl() {
42 // Ensure no thread is currently executing inside RunUntilStopped().
43 OSP_DCHECK_EQ(task_runner_thread_id_, std::thread::id());
44 }
45
PostPackagedTask(Task task)46 void TaskRunnerImpl::PostPackagedTask(Task task) {
47 std::lock_guard<std::mutex> lock(task_mutex_);
48 tasks_.emplace_back(std::move(task));
49 if (task_waiter_) {
50 task_waiter_->OnTaskPosted();
51 } else {
52 run_loop_wakeup_.notify_one();
53 }
54 }
55
PostPackagedTaskWithDelay(Task task,Clock::duration delay)56 void TaskRunnerImpl::PostPackagedTaskWithDelay(Task task,
57 Clock::duration delay) {
58 std::lock_guard<std::mutex> lock(task_mutex_);
59 if (delay <= Clock::duration::zero()) {
60 tasks_.emplace_back(std::move(task));
61 } else {
62 delayed_tasks_.emplace(
63 std::make_pair(now_function_() + delay, std::move(task)));
64 }
65 if (task_waiter_) {
66 task_waiter_->OnTaskPosted();
67 } else {
68 run_loop_wakeup_.notify_one();
69 }
70 }
71
IsRunningOnTaskRunner()72 bool TaskRunnerImpl::IsRunningOnTaskRunner() {
73 return task_runner_thread_id_ == std::this_thread::get_id();
74 }
75
RunUntilStopped()76 void TaskRunnerImpl::RunUntilStopped() {
77 OSP_DCHECK(!is_running_);
78 task_runner_thread_id_ = std::this_thread::get_id();
79 is_running_ = true;
80
81 OSP_DVLOG << "Running tasks until stopped...";
82 // Main loop: Run until the |is_running_| flag is set back to false by the
83 // "quit task" posted by RequestStopSoon(), or the process received a
84 // termination signal.
85 while (is_running_) {
86 ScheduleDelayedTasks();
87 if (GrabMoreRunnableTasks()) {
88 RunRunnableTasks();
89 }
90 if (g_signal_state == kSignaled) {
91 is_running_ = false;
92 }
93 }
94
95 OSP_DVLOG << "Finished running, entering flushing phase...";
96 // Flushing phase: Ensure all immediately-runnable tasks are run before
97 // returning. Since running some tasks might cause more immediately-runnable
98 // tasks to be posted, loop until there is no more work.
99 //
100 // If there is bad code that posts tasks indefinitely, this loop will never
101 // break. However, that also means there is a code path spinning a CPU core at
102 // 100% all the time. Rather than mitigate this problem scenario, purposely
103 // let it manifest here in the hopes that unit testing will reveal it (e.g., a
104 // unit test that never finishes running).
105 while (GrabMoreRunnableTasks()) {
106 RunRunnableTasks();
107 }
108 OSP_DVLOG << "Finished flushing...";
109 task_runner_thread_id_ = std::thread::id();
110 }
111
RunUntilSignaled()112 void TaskRunnerImpl::RunUntilSignaled() {
113 OSP_CHECK_EQ(g_signal_state, kNotRunning)
114 << __func__ << " may not be invoked concurrently.";
115 g_signal_state = kNotSignaled;
116 const auto old_sigint_handler = std::signal(SIGINT, &OnReceivedSignal);
117 const auto old_sigterm_handler = std::signal(SIGTERM, &OnReceivedSignal);
118
119 RunUntilStopped();
120
121 std::signal(SIGINT, old_sigint_handler);
122 std::signal(SIGTERM, old_sigterm_handler);
123 OSP_DVLOG << "Received SIGNIT or SIGTERM, setting state to not running...";
124 g_signal_state = kNotRunning;
125 }
126
RequestStopSoon()127 void TaskRunnerImpl::RequestStopSoon() {
128 PostTask([this]() { is_running_ = false; });
129 }
130
RunRunnableTasks()131 void TaskRunnerImpl::RunRunnableTasks() {
132 for (TaskWithMetadata& running_task : running_tasks_) {
133 // Move the task to the stack so that its bound state is freed immediately
134 // after being run.
135 TaskWithMetadata task = std::move(running_task);
136 task();
137 }
138 running_tasks_.clear();
139 }
140
ScheduleDelayedTasks()141 void TaskRunnerImpl::ScheduleDelayedTasks() {
142 std::lock_guard<std::mutex> lock(task_mutex_);
143
144 // Getting the time can be expensive on some platforms, so only get it once.
145 const auto current_time = now_function_();
146 const auto end_of_range = delayed_tasks_.upper_bound(current_time);
147 for (auto it = delayed_tasks_.begin(); it != end_of_range; ++it) {
148 tasks_.push_back(std::move(it->second));
149 }
150 delayed_tasks_.erase(delayed_tasks_.begin(), end_of_range);
151 }
152
GrabMoreRunnableTasks()153 bool TaskRunnerImpl::GrabMoreRunnableTasks() {
154 OSP_DCHECK(running_tasks_.empty());
155
156 std::unique_lock<std::mutex> lock(task_mutex_);
157 if (!tasks_.empty()) {
158 running_tasks_.swap(tasks_);
159 return true;
160 }
161
162 if (!is_running_) {
163 return false; // Stop was requested. Don't wait for more tasks.
164 }
165
166 if (task_waiter_) {
167 Clock::duration timeout = waiter_timeout_;
168 if (!delayed_tasks_.empty()) {
169 Clock::duration next_task_delta =
170 delayed_tasks_.begin()->first - now_function_();
171 if (next_task_delta < timeout) {
172 timeout = next_task_delta;
173 }
174 }
175 lock.unlock();
176 task_waiter_->WaitForTaskToBePosted(timeout);
177 return false;
178 }
179
180 if (delayed_tasks_.empty()) {
181 run_loop_wakeup_.wait(lock);
182 } else {
183 run_loop_wakeup_.wait_for(lock,
184 delayed_tasks_.begin()->first - now_function_());
185 }
186 return false;
187 }
188
189 } // namespace openscreen
190