/*
 * Copyright (C) 2014 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "task_processor.h"
#include "base/time_utils.h"
#include "common_runtime_test.h"
#include "thread-current-inl.h"
#include "thread_pool.h"

namespace art {
namespace gc {

class TaskProcessorTest : public CommonRuntimeTest {
 public:
};

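// A task scheduled 10ms in the future that, when run, enqueues a copy of itself with a
// decremented recursion count and bumps a shared counter, so the tests can observe how many
// recursive tasks have executed.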
class RecursiveTask : public HeapTask {
 public:
  RecursiveTask(TaskProcessor* task_processor, Atomic<size_t>* counter, size_t max_recursion)
     : HeapTask(NanoTime() + MsToNs(10)), task_processor_(task_processor), counter_(counter),
       max_recursion_(max_recursion) {
  }
  void Run(Thread* self) override {
    if (max_recursion_ > 0) {
      task_processor_->AddTask(self,
                               new RecursiveTask(task_processor_, counter_, max_recursion_ - 1));
      counter_->fetch_add(1U, std::memory_order_seq_cst);
    }
  }

 private:
  TaskProcessor* const task_processor_;
  Atomic<size_t>* const counter_;
  const size_t max_recursion_;
};

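// A thread pool task that drives the TaskProcessor: RunAllTasks() keeps executing queued tasks
// and returns once the processor has been stopped and its queue is empty, at which point
// done_running is set to true.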
class WorkUntilDoneTask : public SelfDeletingTask {
 public:
  WorkUntilDoneTask(TaskProcessor* task_processor, Atomic<bool>* done_running)
      : task_processor_(task_processor), done_running_(done_running) {
  }
  void Run(Thread* self) override {
    task_processor_->RunAllTasks(self);
    done_running_->store(true, std::memory_order_seq_cst);
  }

 private:
  TaskProcessor* const task_processor_;
  Atomic<bool>* done_running_;
};

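// Checks that the processor keeps accepting and running tasks until Stop() is called, and that
// all queued tasks are completed before RunAllTasks() returns.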
TEST_F(TaskProcessorTest, Interrupt) {
  ThreadPool thread_pool("task processor test", 1U);
  Thread* const self = Thread::Current();
  TaskProcessor task_processor;
  static constexpr size_t kRecursion = 10;
  Atomic<bool> done_running(false);
  Atomic<size_t> counter(0);
  task_processor.AddTask(self, new RecursiveTask(&task_processor, &counter, kRecursion));
  task_processor.Start(self);
  // Add a task to the thread pool that runs the processor's tasks until it is stopped.
  thread_pool.AddTask(self, new WorkUntilDoneTask(&task_processor, &done_running));
  thread_pool.StartWorkers(self);
  ASSERT_FALSE(done_running);
  // Wait until all the tasks are done, but since we didn't stop the processor,
  // done_running should still be false.
  while (counter.load(std::memory_order_seq_cst) != kRecursion) {
    usleep(10);
  }
  ASSERT_FALSE(done_running);
  task_processor.Stop(self);
  thread_pool.Wait(self, true, false);
  // After stopping the processor and waiting for the thread pool, the WorkUntilDoneTask should
  // have terminated and set done_running to true.
  ASSERT_TRUE(done_running.load(std::memory_order_seq_cst));

  // Test that we finish the remaining tasks before returning from RunAllTasks.
  counter.store(0, std::memory_order_seq_cst);
  done_running.store(false, std::memory_order_seq_cst);
  // Stop the processor before any of the tasks run; since they still get added below,
  // RunAllTasks should keep working until every queued task has completed.
  task_processor.Stop(self);
  task_processor.AddTask(self, new RecursiveTask(&task_processor, &counter, kRecursion));
  thread_pool.AddTask(self, new WorkUntilDoneTask(&task_processor, &done_running));
  thread_pool.StartWorkers(self);
  thread_pool.Wait(self, true, false);
  ASSERT_TRUE(done_running.load(std::memory_order_seq_cst));
  ASSERT_EQ(counter.load(std::memory_order_seq_cst), kRecursion);
}

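// A task with a fixed target run time that asserts it executes in the expected position relative
// to the other tasks.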
class TestOrderTask : public HeapTask {
 public:
  TestOrderTask(uint64_t expected_time, size_t expected_counter, size_t* counter)
     : HeapTask(expected_time), expected_counter_(expected_counter), counter_(counter) {
  }
  void Run(Thread* thread ATTRIBUTE_UNUSED) override {
    ASSERT_EQ(*counter_, expected_counter_);
    ++*counter_;
  }

 private:
  const size_t expected_counter_;
  size_t* const counter_;
};

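// Checks that tasks added in shuffled order are executed in ascending order of their target run
// times.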
TEST_F(TaskProcessorTest, Ordering) {
  static const size_t kNumTasks = 25;
  const uint64_t current_time = NanoTime();
  Thread* const self = Thread::Current();
  TaskProcessor task_processor;
  task_processor.Stop(self);
  size_t counter = 0;
  std::vector<std::pair<uint64_t, size_t>> orderings;
  for (size_t i = 0; i < kNumTasks; ++i) {
    orderings.push_back(std::make_pair(current_time + MsToNs(10U * i), i));
  }
  for (size_t i = 0; i < kNumTasks; ++i) {
    std::swap(orderings[i], orderings[(i * 87654231 + 12345) % orderings.size()]);
  }
  for (const auto& pair : orderings) {
    auto* task = new TestOrderTask(pair.first, pair.second, &counter);
    task_processor.AddTask(self, task);
  }
  ThreadPool thread_pool("task processor test", 1U);
  Atomic<bool> done_running(false);
  // Add a task to the thread pool that runs all of the task processor's queued tasks.
  thread_pool.AddTask(self, new WorkUntilDoneTask(&task_processor, &done_running));
  ASSERT_FALSE(done_running.load(std::memory_order_seq_cst));
  thread_pool.StartWorkers(self);
  thread_pool.Wait(self, true, false);
  ASSERT_TRUE(done_running.load(std::memory_order_seq_cst));
  ASSERT_EQ(counter, kNumTasks);
}

}  // namespace gc
}  // namespace art