• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2019 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "platform/impl/task_runner.h"
6 
7 #include <csignal>
8 #include <thread>
9 
10 #include "util/osp_logging.h"
11 
12 namespace openscreen {
13 
14 namespace {
15 
16 // This is mutated by the signal handler installed by RunUntilSignaled(), and is
17 // checked by RunUntilStopped().
18 //
19 // Per the C++14 spec, passing visible changes to memory between a signal
20 // handler and a program thread must be done through a volatile variable.
21 volatile enum {
22   kNotRunning,
23   kNotSignaled,
24   kSignaled
25 } g_signal_state = kNotRunning;
26 
OnReceivedSignal(int signal)27 void OnReceivedSignal(int signal) {
28   g_signal_state = kSignaled;
29 }
30 
31 }  // namespace
32 
TaskRunnerImpl(ClockNowFunctionPtr now_function,TaskWaiter * event_waiter,Clock::duration waiter_timeout)33 TaskRunnerImpl::TaskRunnerImpl(ClockNowFunctionPtr now_function,
34                                TaskWaiter* event_waiter,
35                                Clock::duration waiter_timeout)
36     : now_function_(now_function),
37       is_running_(false),
38       task_waiter_(event_waiter),
39       waiter_timeout_(waiter_timeout) {}
40 
~TaskRunnerImpl()41 TaskRunnerImpl::~TaskRunnerImpl() {
42   // Ensure no thread is currently executing inside RunUntilStopped().
43   OSP_DCHECK_EQ(task_runner_thread_id_, std::thread::id());
44 }
45 
PostPackagedTask(Task task)46 void TaskRunnerImpl::PostPackagedTask(Task task) {
47   std::lock_guard<std::mutex> lock(task_mutex_);
48   tasks_.emplace_back(std::move(task));
49   if (task_waiter_) {
50     task_waiter_->OnTaskPosted();
51   } else {
52     run_loop_wakeup_.notify_one();
53   }
54 }
55 
PostPackagedTaskWithDelay(Task task,Clock::duration delay)56 void TaskRunnerImpl::PostPackagedTaskWithDelay(Task task,
57                                                Clock::duration delay) {
58   std::lock_guard<std::mutex> lock(task_mutex_);
59   if (delay <= Clock::duration::zero()) {
60     tasks_.emplace_back(std::move(task));
61   } else {
62     delayed_tasks_.emplace(
63         std::make_pair(now_function_() + delay, std::move(task)));
64   }
65   if (task_waiter_) {
66     task_waiter_->OnTaskPosted();
67   } else {
68     run_loop_wakeup_.notify_one();
69   }
70 }
71 
IsRunningOnTaskRunner()72 bool TaskRunnerImpl::IsRunningOnTaskRunner() {
73   return task_runner_thread_id_ == std::this_thread::get_id();
74 }
75 
RunUntilStopped()76 void TaskRunnerImpl::RunUntilStopped() {
77   OSP_DCHECK(!is_running_);
78   task_runner_thread_id_ = std::this_thread::get_id();
79   is_running_ = true;
80 
81   OSP_DVLOG << "Running tasks until stopped...";
82   // Main loop: Run until the |is_running_| flag is set back to false by the
83   // "quit task" posted by RequestStopSoon(), or the process received a
84   // termination signal.
85   while (is_running_) {
86     ScheduleDelayedTasks();
87     if (GrabMoreRunnableTasks()) {
88       RunRunnableTasks();
89     }
90     if (g_signal_state == kSignaled) {
91       is_running_ = false;
92     }
93   }
94 
95   OSP_DVLOG << "Finished running, entering flushing phase...";
96   // Flushing phase: Ensure all immediately-runnable tasks are run before
97   // returning. Since running some tasks might cause more immediately-runnable
98   // tasks to be posted, loop until there is no more work.
99   //
100   // If there is bad code that posts tasks indefinitely, this loop will never
101   // break. However, that also means there is a code path spinning a CPU core at
102   // 100% all the time. Rather than mitigate this problem scenario, purposely
103   // let it manifest here in the hopes that unit testing will reveal it (e.g., a
104   // unit test that never finishes running).
105   while (GrabMoreRunnableTasks()) {
106     RunRunnableTasks();
107   }
108   OSP_DVLOG << "Finished flushing...";
109   task_runner_thread_id_ = std::thread::id();
110 }
111 
RunUntilSignaled()112 void TaskRunnerImpl::RunUntilSignaled() {
113   OSP_CHECK_EQ(g_signal_state, kNotRunning)
114       << __func__ << " may not be invoked concurrently.";
115   g_signal_state = kNotSignaled;
116   const auto old_sigint_handler = std::signal(SIGINT, &OnReceivedSignal);
117   const auto old_sigterm_handler = std::signal(SIGTERM, &OnReceivedSignal);
118 
119   RunUntilStopped();
120 
121   std::signal(SIGINT, old_sigint_handler);
122   std::signal(SIGTERM, old_sigterm_handler);
123   OSP_DVLOG << "Received SIGNIT or SIGTERM, setting state to not running...";
124   g_signal_state = kNotRunning;
125 }
126 
RequestStopSoon()127 void TaskRunnerImpl::RequestStopSoon() {
128   PostTask([this]() { is_running_ = false; });
129 }
130 
RunRunnableTasks()131 void TaskRunnerImpl::RunRunnableTasks() {
132   for (TaskWithMetadata& running_task : running_tasks_) {
133     // Move the task to the stack so that its bound state is freed immediately
134     // after being run.
135     TaskWithMetadata task = std::move(running_task);
136     task();
137   }
138   running_tasks_.clear();
139 }
140 
ScheduleDelayedTasks()141 void TaskRunnerImpl::ScheduleDelayedTasks() {
142   std::lock_guard<std::mutex> lock(task_mutex_);
143 
144   // Getting the time can be expensive on some platforms, so only get it once.
145   const auto current_time = now_function_();
146   const auto end_of_range = delayed_tasks_.upper_bound(current_time);
147   for (auto it = delayed_tasks_.begin(); it != end_of_range; ++it) {
148     tasks_.push_back(std::move(it->second));
149   }
150   delayed_tasks_.erase(delayed_tasks_.begin(), end_of_range);
151 }
152 
GrabMoreRunnableTasks()153 bool TaskRunnerImpl::GrabMoreRunnableTasks() {
154   OSP_DCHECK(running_tasks_.empty());
155 
156   std::unique_lock<std::mutex> lock(task_mutex_);
157   if (!tasks_.empty()) {
158     running_tasks_.swap(tasks_);
159     return true;
160   }
161 
162   if (!is_running_) {
163     return false;  // Stop was requested. Don't wait for more tasks.
164   }
165 
166   if (task_waiter_) {
167     Clock::duration timeout = waiter_timeout_;
168     if (!delayed_tasks_.empty()) {
169       Clock::duration next_task_delta =
170           delayed_tasks_.begin()->first - now_function_();
171       if (next_task_delta < timeout) {
172         timeout = next_task_delta;
173       }
174     }
175     lock.unlock();
176     task_waiter_->WaitForTaskToBePosted(timeout);
177     return false;
178   }
179 
180   if (delayed_tasks_.empty()) {
181     run_loop_wakeup_.wait(lock);
182   } else {
183     run_loop_wakeup_.wait_for(lock,
184                               delayed_tasks_.begin()->first - now_function_());
185   }
186   return false;
187 }
188 
189 }  // namespace openscreen
190