• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2019 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
#include "platform/impl/task_runner.h"

#include <unistd.h>

#include <atomic>
#include <chrono>
#include <mutex>
#include <string>
#include <thread>

#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "platform/api/time.h"
#include "platform/test/fake_clock.h"
#include "util/chrono_helpers.h"
19 namespace openscreen {
20 namespace {
21 
22 using ::testing::_;
23 
24 const auto kTaskRunnerSleepTime = milliseconds(1);
25 constexpr Clock::duration kWaitTimeout = milliseconds(1000);
26 
WaitUntilCondition(std::function<bool ()> predicate)27 void WaitUntilCondition(std::function<bool()> predicate) {
28   while (!predicate()) {
29     std::this_thread::sleep_for(kTaskRunnerSleepTime);
30   }
31 }
32 
33 class FakeTaskWaiter final : public TaskRunnerImpl::TaskWaiter {
34  public:
FakeTaskWaiter(ClockNowFunctionPtr now_function)35   explicit FakeTaskWaiter(ClockNowFunctionPtr now_function)
36       : now_function_(now_function) {}
37   ~FakeTaskWaiter() override = default;
38 
WaitForTaskToBePosted(Clock::duration timeout)39   Error WaitForTaskToBePosted(Clock::duration timeout) override {
40     Clock::time_point start = now_function_();
41     waiting_.store(true);
42     while (!has_event_.load() && (now_function_() - start) < timeout) {
43       EXPECT_EQ(usleep(100 /* microseconds */), 0);
44     }
45     waiting_.store(false);
46     has_event_.store(false);
47     return Error::None();
48   }
49 
OnTaskPosted()50   void OnTaskPosted() override { has_event_.store(true); }
51 
WakeUpAndStop()52   void WakeUpAndStop() {
53     OnTaskPosted();
54     task_runner_->RequestStopSoon();
55   }
56 
IsWaiting() const57   bool IsWaiting() const { return waiting_.load(); }
58 
SetTaskRunner(TaskRunnerImpl * task_runner)59   void SetTaskRunner(TaskRunnerImpl* task_runner) {
60     task_runner_ = task_runner;
61   }
62 
63  private:
64   const ClockNowFunctionPtr now_function_;
65   TaskRunnerImpl* task_runner_;
66   std::atomic<bool> has_event_{false};
67   std::atomic<bool> waiting_{false};
68 };
69 
70 class TaskRunnerWithWaiterFactory {
71  public:
Create(ClockNowFunctionPtr now_function)72   static std::unique_ptr<TaskRunnerImpl> Create(
73       ClockNowFunctionPtr now_function) {
74     fake_waiter = std::make_unique<FakeTaskWaiter>(now_function);
75     auto runner = std::make_unique<TaskRunnerImpl>(
76         now_function, fake_waiter.get(), std::chrono::hours(1));
77     fake_waiter->SetTaskRunner(runner.get());
78     return runner;
79   }
80 
81   static std::unique_ptr<FakeTaskWaiter> fake_waiter;
82 };
83 
84 // static
85 std::unique_ptr<FakeTaskWaiter> TaskRunnerWithWaiterFactory::fake_waiter;
86 
87 }  // anonymous namespace
88 
// Verifies that a task posted before RequestStopSoon() is still executed by
// RunUntilStopped(), and that RunUntilStopped() then returns.
TEST(TaskRunnerImplTest, TaskRunnerExecutesTaskAndStops) {
  FakeClock fake_clock{Clock::time_point(milliseconds(1337))};
  TaskRunnerImpl runner(&fake_clock.now);

  std::string executed;
  const auto append_one = [&executed] { executed += "1"; };
  runner.PostTask(append_one);
  runner.RequestStopSoon();

  runner.RunUntilStopped();
  EXPECT_EQ(executed, "1");
}
100 
// Verifies that delayed tasks run only after the fake clock has been advanced
// far enough, and that they run in order of their delays. The task runner
// loops on thread |t| while this thread advances the clock and polls for the
// expected results.
TEST(TaskRunnerImplTest, TaskRunnerRunsDelayedTasksInOrder) {
  FakeClock fake_clock{Clock::time_point(milliseconds(1337))};
  TaskRunnerImpl runner(&fake_clock.now);

  // |ran_tasks| is appended to by the posted tasks (which are expected to run
  // inside RunUntilStopped() on thread |t|) and is read by this thread while
  // polling. Every access therefore goes through |ran_tasks_mutex| to avoid a
  // data race on the std::string.
  std::mutex ran_tasks_mutex;
  std::string ran_tasks;
  const auto append = [&ran_tasks_mutex, &ran_tasks](const char* suffix) {
    std::lock_guard<std::mutex> lock(ran_tasks_mutex);
    ran_tasks += suffix;
  };
  const auto snapshot = [&ran_tasks_mutex, &ran_tasks] {
    std::lock_guard<std::mutex> lock(ran_tasks_mutex);
    return ran_tasks;
  };

  std::thread t([&runner] { runner.RunUntilStopped(); });

  const auto kDelayTime = milliseconds(5);
  const auto task_one = [&append] { append("1"); };
  runner.PostTaskWithDelay(task_one, kDelayTime);

  const auto task_two = [&append] { append("2"); };
  runner.PostTaskWithDelay(task_two, kDelayTime * 2);

  // Nothing may run until the fake clock reaches the first deadline.
  EXPECT_EQ(snapshot(), "");
  fake_clock.Advance(kDelayTime);
  WaitUntilCondition([&snapshot] { return snapshot() == "1"; });
  EXPECT_EQ(snapshot(), "1");

  fake_clock.Advance(kDelayTime);
  WaitUntilCondition([&snapshot] { return snapshot() == "12"; });
  EXPECT_EQ(snapshot(), "12");

  // Stop the runner and join before the captured locals go out of scope.
  runner.RequestStopSoon();
  t.join();
}
128 
// Verifies that immediate tasks are executed in the order they were posted,
// and that none of them run before RunUntilStopped() is called.
TEST(TaskRunnerImplTest, SingleThreadedTaskRunnerRunsSequentially) {
  FakeClock fake_clock{Clock::time_point(milliseconds(1337))};
  TaskRunnerImpl runner(&fake_clock.now);

  std::string order;
  // Post five tasks, each appending its own label to |order|.
  for (const char* label : {"1", "2", "3", "4", "5"}) {
    runner.PostTask([&order, label] { order += label; });
  }
  runner.RequestStopSoon();
  EXPECT_EQ(order, "");

  runner.RunUntilStopped();
  EXPECT_EQ(order, "12345");
}
151 
// Verifies that RunUntilStopped() drains every immediately-runnable task
// before returning, including tasks posted by other tasks after the stop was
// requested.
TEST(TaskRunnerImplTest, RunsAllImmediateTasksBeforeStopping) {
  FakeClock fake_clock{Clock::time_point(milliseconds(1337))};
  TaskRunnerImpl runner(&fake_clock.now);

  std::string message;
  runner.PostTask([&runner, &message] {
    message += "Alice";

    // This task is queued ahead of the quit task posted below.
    runner.PostTask([&runner, &message] {
      message += " says goodbye";

      // The tasks below are queued after the quit task *and* after the main
      // loop has broken out; the flushing phase is what executes them.
      runner.PostTask([&runner, &message] {
        message += " and is not";
        runner.PostTask([&message] { message += " forgotten."; });
      });
    });

    // Queue the quit task.
    runner.RequestStopSoon();
  });

  EXPECT_EQ(message, "");
  runner.RunUntilStopped();
  // Every task must have executed: RequestStopSoon() guarantees that all
  // immediately-runnable tasks run before exit, even those posted while the
  // runner is already shutting down.
  EXPECT_EQ(message, "Alice says goodbye and is not forgotten.");
}
183 
// Verifies that a large batch of immediate tasks posted up front is executed
// in full by a single RunUntilStopped() call.
TEST(TaskRunnerImplTest, TaskRunnerIsStableWithLotsOfTasks) {
  FakeClock fake_clock{Clock::time_point(milliseconds(1337))};
  TaskRunnerImpl runner(&fake_clock.now);

  constexpr int kNumberOfTasks = 500;
  // One '1' character is expected per task that ran.
  const std::string expected_ran_tasks(kNumberOfTasks, '1');

  std::string ran_tasks;
  int remaining = kNumberOfTasks;
  while (remaining-- > 0) {
    runner.PostTask([&ran_tasks] { ran_tasks += "1"; });
  }

  runner.RequestStopSoon();
  runner.RunUntilStopped();
  EXPECT_EQ(ran_tasks, expected_ran_tasks);
}
202 
// Verifies that an immediate task still runs when a not-yet-due delayed task
// was posted ahead of it. This test uses the real clock (Clock::now).
TEST(TaskRunnerImplTest, TaskRunnerDelayedTasksDontBlockImmediateTasks) {
  TaskRunnerImpl runner(Clock::now);

  std::string log;
  const auto delayed = [&log] { log += "A"; };
  const auto immediate = [&log] { log += "1"; };

  // The delayed task goes in first, with a ten-second delay.
  runner.PostTaskWithDelay(delayed, milliseconds(10000));
  runner.PostTask(immediate);

  runner.RequestStopSoon();
  runner.RunUntilStopped();

  // Only the immediate task is expected to have run, despite being posted
  // after the delayed one.
  EXPECT_EQ(log, "1");
}
220 
// Verifies that an idle task runner blocks in its TaskWaiter, and that waking
// the waiter after requesting a stop makes RunUntilStopped() return.
//
// Only non-fatal EXPECT_* checks are used while |t| is joinable. A fatal
// ASSERT_* would return from the test body with the thread still running, and
// destroying a joinable std::thread calls std::terminate().
TEST(TaskRunnerImplTest, TaskRunnerUsesEventWaiter) {
  std::unique_ptr<TaskRunnerImpl> runner =
      TaskRunnerWithWaiterFactory::Create(Clock::now);

  // Set to 1 by |t| once RunUntilStopped() has returned.
  std::atomic<int> x{0};
  std::thread t([&runner, &x] {
    runner->RunUntilStopped();
    x = 1;
  });

  // Give the runner up to kWaitTimeout to block inside the waiter.
  const Clock::time_point start1 = Clock::now();
  FakeTaskWaiter* fake_waiter = TaskRunnerWithWaiterFactory::fake_waiter.get();
  while ((Clock::now() - start1) < kWaitTimeout && !fake_waiter->IsWaiting()) {
    std::this_thread::sleep_for(kTaskRunnerSleepTime);
  }
  EXPECT_TRUE(fake_waiter->IsWaiting());

  // Always request the stop, even if the check above failed, so that |t| can
  // finish and be joined below.
  fake_waiter->WakeUpAndStop();
  const Clock::time_point start2 = Clock::now();
  while ((Clock::now() - start2) < kWaitTimeout && x == 0) {
    std::this_thread::sleep_for(kTaskRunnerSleepTime);
  }
  EXPECT_EQ(x.load(), 1);
  EXPECT_FALSE(fake_waiter->IsWaiting());
  t.join();
}
247 
// Verifies that posting a task while the task runner is blocked in its
// TaskWaiter wakes the runner up so that the task gets executed.
//
// Only non-fatal EXPECT_* checks are used while |t| is joinable. A fatal
// ASSERT_* would return from the test body without stopping or joining the
// thread, and destroying a joinable std::thread calls std::terminate().
TEST(TaskRunnerImplTest, WakesEventWaiterOnPostTask) {
  std::unique_ptr<TaskRunnerImpl> runner =
      TaskRunnerWithWaiterFactory::Create(Clock::now);

  // Set to 1 by the posted task.
  std::atomic<int> x{0};
  std::thread t([&runner] { runner->RunUntilStopped(); });

  // Give the runner up to kWaitTimeout to block inside the waiter.
  const Clock::time_point start1 = Clock::now();
  FakeTaskWaiter* fake_waiter = TaskRunnerWithWaiterFactory::fake_waiter.get();
  while ((Clock::now() - start1) < kWaitTimeout && !fake_waiter->IsWaiting()) {
    std::this_thread::sleep_for(kTaskRunnerSleepTime);
  }
  EXPECT_TRUE(fake_waiter->IsWaiting());

  runner->PostTask([&x]() { x = 1; });
  const Clock::time_point start2 = Clock::now();
  while ((Clock::now() - start2) < kWaitTimeout && x == 0) {
    std::this_thread::sleep_for(kTaskRunnerSleepTime);
  }
  EXPECT_EQ(x.load(), 1);

  // Always stop and join, regardless of the outcome of the checks above.
  fake_waiter->WakeUpAndStop();
  t.join();
}
272 
273 class RepeatedClass {
274  public:
275   MOCK_METHOD0(Repeat, absl::optional<Clock::duration>());
276 
DoCall()277   absl::optional<Clock::duration> DoCall() {
278     auto result = Repeat();
279     execution_count++;
280     return result;
281   }
282 
283   std::atomic<int> execution_count{0};
284 };
285 
286 }  // namespace openscreen
287