1 // Copyright 2019 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "platform/impl/task_runner.h"
6
7 #include <unistd.h>
8
9 #include <atomic>
10 #include <chrono>
11 #include <string>
12 #include <thread>
13
14 #include "gmock/gmock.h"
15 #include "gtest/gtest.h"
16 #include "platform/api/time.h"
17 #include "platform/test/fake_clock.h"
18 #include "util/chrono_helpers.h"
19 namespace openscreen {
20 namespace {
21
22 using ::testing::_;
23
24 const auto kTaskRunnerSleepTime = milliseconds(1);
25 constexpr Clock::duration kWaitTimeout = milliseconds(1000);
26
WaitUntilCondition(std::function<bool ()> predicate)27 void WaitUntilCondition(std::function<bool()> predicate) {
28 while (!predicate()) {
29 std::this_thread::sleep_for(kTaskRunnerSleepTime);
30 }
31 }
32
33 class FakeTaskWaiter final : public TaskRunnerImpl::TaskWaiter {
34 public:
FakeTaskWaiter(ClockNowFunctionPtr now_function)35 explicit FakeTaskWaiter(ClockNowFunctionPtr now_function)
36 : now_function_(now_function) {}
37 ~FakeTaskWaiter() override = default;
38
WaitForTaskToBePosted(Clock::duration timeout)39 Error WaitForTaskToBePosted(Clock::duration timeout) override {
40 Clock::time_point start = now_function_();
41 waiting_.store(true);
42 while (!has_event_.load() && (now_function_() - start) < timeout) {
43 EXPECT_EQ(usleep(100 /* microseconds */), 0);
44 }
45 waiting_.store(false);
46 has_event_.store(false);
47 return Error::None();
48 }
49
OnTaskPosted()50 void OnTaskPosted() override { has_event_.store(true); }
51
WakeUpAndStop()52 void WakeUpAndStop() {
53 OnTaskPosted();
54 task_runner_->RequestStopSoon();
55 }
56
IsWaiting() const57 bool IsWaiting() const { return waiting_.load(); }
58
SetTaskRunner(TaskRunnerImpl * task_runner)59 void SetTaskRunner(TaskRunnerImpl* task_runner) {
60 task_runner_ = task_runner;
61 }
62
63 private:
64 const ClockNowFunctionPtr now_function_;
65 TaskRunnerImpl* task_runner_;
66 std::atomic<bool> has_event_{false};
67 std::atomic<bool> waiting_{false};
68 };
69
70 class TaskRunnerWithWaiterFactory {
71 public:
Create(ClockNowFunctionPtr now_function)72 static std::unique_ptr<TaskRunnerImpl> Create(
73 ClockNowFunctionPtr now_function) {
74 fake_waiter = std::make_unique<FakeTaskWaiter>(now_function);
75 auto runner = std::make_unique<TaskRunnerImpl>(
76 now_function, fake_waiter.get(), std::chrono::hours(1));
77 fake_waiter->SetTaskRunner(runner.get());
78 return runner;
79 }
80
81 static std::unique_ptr<FakeTaskWaiter> fake_waiter;
82 };
83
84 // static
85 std::unique_ptr<FakeTaskWaiter> TaskRunnerWithWaiterFactory::fake_waiter;
86
87 } // anonymous namespace
88
TEST(TaskRunnerImplTest,TaskRunnerExecutesTaskAndStops)89 TEST(TaskRunnerImplTest, TaskRunnerExecutesTaskAndStops) {
90 FakeClock fake_clock{Clock::time_point(milliseconds(1337))};
91 TaskRunnerImpl runner(&fake_clock.now);
92
93 std::string ran_tasks = "";
94 runner.PostTask([&ran_tasks] { ran_tasks += "1"; });
95 runner.RequestStopSoon();
96
97 runner.RunUntilStopped();
98 EXPECT_EQ(ran_tasks, "1");
99 }
100
TEST(TaskRunnerImplTest,TaskRunnerRunsDelayedTasksInOrder)101 TEST(TaskRunnerImplTest, TaskRunnerRunsDelayedTasksInOrder) {
102 FakeClock fake_clock{Clock::time_point(milliseconds(1337))};
103 TaskRunnerImpl runner(&fake_clock.now);
104
105 std::thread t([&runner] { runner.RunUntilStopped(); });
106
107 std::string ran_tasks = "";
108
109 const auto kDelayTime = milliseconds(5);
110 const auto task_one = [&ran_tasks] { ran_tasks += "1"; };
111 runner.PostTaskWithDelay(task_one, kDelayTime);
112
113 const auto task_two = [&ran_tasks] { ran_tasks += "2"; };
114 runner.PostTaskWithDelay(task_two, kDelayTime * 2);
115
116 EXPECT_EQ(ran_tasks, "");
117 fake_clock.Advance(kDelayTime);
118 WaitUntilCondition([&ran_tasks] { return ran_tasks == "1"; });
119 EXPECT_EQ(ran_tasks, "1");
120
121 fake_clock.Advance(kDelayTime);
122 WaitUntilCondition([&ran_tasks] { return ran_tasks == "12"; });
123 EXPECT_EQ(ran_tasks, "12");
124
125 runner.RequestStopSoon();
126 t.join();
127 }
128
TEST(TaskRunnerImplTest,SingleThreadedTaskRunnerRunsSequentially)129 TEST(TaskRunnerImplTest, SingleThreadedTaskRunnerRunsSequentially) {
130 FakeClock fake_clock{Clock::time_point(milliseconds(1337))};
131 TaskRunnerImpl runner(&fake_clock.now);
132
133 std::string ran_tasks;
134 const auto task_one = [&ran_tasks] { ran_tasks += "1"; };
135 const auto task_two = [&ran_tasks] { ran_tasks += "2"; };
136 const auto task_three = [&ran_tasks] { ran_tasks += "3"; };
137 const auto task_four = [&ran_tasks] { ran_tasks += "4"; };
138 const auto task_five = [&ran_tasks] { ran_tasks += "5"; };
139
140 runner.PostTask(task_one);
141 runner.PostTask(task_two);
142 runner.PostTask(task_three);
143 runner.PostTask(task_four);
144 runner.PostTask(task_five);
145 runner.RequestStopSoon();
146 EXPECT_EQ(ran_tasks, "");
147
148 runner.RunUntilStopped();
149 EXPECT_EQ(ran_tasks, "12345");
150 }
151
TEST(TaskRunnerImplTest,RunsAllImmediateTasksBeforeStopping)152 TEST(TaskRunnerImplTest, RunsAllImmediateTasksBeforeStopping) {
153 FakeClock fake_clock{Clock::time_point(milliseconds(1337))};
154 TaskRunnerImpl runner(&fake_clock.now);
155
156 std::string result;
157 runner.PostTask([&] {
158 result += "Alice";
159
160 // Post a task that runs just before the quit task.
161 runner.PostTask([&] {
162 result += " says goodbye";
163
164 // These tasks will enter the queue after the quit task *and* after the
165 // main loop breaks. They will be executed by the flushing phase.
166 runner.PostTask([&] {
167 result += " and is not";
168 runner.PostTask([&] { result += " forgotten."; });
169 });
170 });
171
172 // Post the quit task.
173 runner.RequestStopSoon();
174 });
175
176 EXPECT_EQ(result, "");
177 runner.RunUntilStopped();
178 // All posted tasks will execute because RequestStopSoon() guarantees all
179 // immediately-runnable tasks will run before exiting, even if new
180 // immediately-runnable tasks are posted in the meantime.
181 EXPECT_EQ(result, "Alice says goodbye and is not forgotten.");
182 }
183
TEST(TaskRunnerImplTest,TaskRunnerIsStableWithLotsOfTasks)184 TEST(TaskRunnerImplTest, TaskRunnerIsStableWithLotsOfTasks) {
185 FakeClock fake_clock{Clock::time_point(milliseconds(1337))};
186 TaskRunnerImpl runner(&fake_clock.now);
187
188 const int kNumberOfTasks = 500;
189 std::string expected_ran_tasks;
190 expected_ran_tasks.append(kNumberOfTasks, '1');
191
192 std::string ran_tasks;
193 for (int i = 0; i < kNumberOfTasks; ++i) {
194 const auto task = [&ran_tasks] { ran_tasks += "1"; };
195 runner.PostTask(task);
196 }
197
198 runner.RequestStopSoon();
199 runner.RunUntilStopped();
200 EXPECT_EQ(ran_tasks, expected_ran_tasks);
201 }
202
TEST(TaskRunnerImplTest,TaskRunnerDelayedTasksDontBlockImmediateTasks)203 TEST(TaskRunnerImplTest, TaskRunnerDelayedTasksDontBlockImmediateTasks) {
204 TaskRunnerImpl runner(Clock::now);
205
206 std::string ran_tasks;
207 const auto task = [&ran_tasks] { ran_tasks += "1"; };
208 const auto delayed_task = [&ran_tasks] { ran_tasks += "A"; };
209
210 runner.PostTaskWithDelay(delayed_task, milliseconds(10000));
211 runner.PostTask(task);
212
213 runner.RequestStopSoon();
214 runner.RunUntilStopped();
215 // The immediate task should have run, even though the delayed task
216 // was added first.
217
218 EXPECT_EQ(ran_tasks, "1");
219 }
220
TEST(TaskRunnerImplTest,TaskRunnerUsesEventWaiter)221 TEST(TaskRunnerImplTest, TaskRunnerUsesEventWaiter) {
222 std::unique_ptr<TaskRunnerImpl> runner =
223 TaskRunnerWithWaiterFactory::Create(Clock::now);
224
225 std::atomic<int> x{0};
226 std::thread t([&runner, &x] {
227 runner.get()->RunUntilStopped();
228 x = 1;
229 });
230
231 const Clock::time_point start1 = Clock::now();
232 FakeTaskWaiter* fake_waiter = TaskRunnerWithWaiterFactory::fake_waiter.get();
233 while ((Clock::now() - start1) < kWaitTimeout && !fake_waiter->IsWaiting()) {
234 std::this_thread::sleep_for(kTaskRunnerSleepTime);
235 }
236 ASSERT_TRUE(fake_waiter->IsWaiting());
237
238 fake_waiter->WakeUpAndStop();
239 const Clock::time_point start2 = Clock::now();
240 while ((Clock::now() - start2) < kWaitTimeout && x == 0) {
241 std::this_thread::sleep_for(kTaskRunnerSleepTime);
242 }
243 ASSERT_EQ(x, 1);
244 ASSERT_FALSE(fake_waiter->IsWaiting());
245 t.join();
246 }
247
TEST(TaskRunnerImplTest,WakesEventWaiterOnPostTask)248 TEST(TaskRunnerImplTest, WakesEventWaiterOnPostTask) {
249 std::unique_ptr<TaskRunnerImpl> runner =
250 TaskRunnerWithWaiterFactory::Create(Clock::now);
251
252 std::atomic<int> x{0};
253 std::thread t([&runner] { runner.get()->RunUntilStopped(); });
254
255 const Clock::time_point start1 = Clock::now();
256 FakeTaskWaiter* fake_waiter = TaskRunnerWithWaiterFactory::fake_waiter.get();
257 while ((Clock::now() - start1) < kWaitTimeout && !fake_waiter->IsWaiting()) {
258 std::this_thread::sleep_for(kTaskRunnerSleepTime);
259 }
260 ASSERT_TRUE(fake_waiter->IsWaiting());
261
262 runner->PostTask([&x]() { x = 1; });
263 const Clock::time_point start2 = Clock::now();
264 while ((Clock::now() - start2) < kWaitTimeout && x == 0) {
265 std::this_thread::sleep_for(kTaskRunnerSleepTime);
266 }
267 ASSERT_EQ(x, 1);
268
269 fake_waiter->WakeUpAndStop();
270 t.join();
271 }
272
273 class RepeatedClass {
274 public:
275 MOCK_METHOD0(Repeat, absl::optional<Clock::duration>());
276
DoCall()277 absl::optional<Clock::duration> DoCall() {
278 auto result = Repeat();
279 execution_count++;
280 return result;
281 }
282
283 std::atomic<int> execution_count{0};
284 };
285
286 } // namespace openscreen
287