1 // This file is part of Eigen, a lightweight C++ template library
2 // for linear algebra.
3 //
4 // Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com>
5 // Copyright (C) 2016 Benoit Steiner <benoit.steiner.goog@gmail.com>
6 //
7 // This Source Code Form is subject to the terms of the Mozilla
8 // Public License v. 2.0. If a copy of the MPL was not distributed
9 // with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
10
11 #define EIGEN_USE_THREADS
12 #include "main.h"
13 #include "Eigen/CXX11/ThreadPool"
14 #include "Eigen/CXX11/Tensor"
15
test_create_destroy_empty_pool()16 static void test_create_destroy_empty_pool()
17 {
18 // Just create and destroy the pool. This will wind up and tear down worker
19 // threads. Ensure there are no issues in that logic.
20 for (int i = 0; i < 16; ++i) {
21 ThreadPool tp(i);
22 }
23 }
24
25
test_parallelism(bool allow_spinning)26 static void test_parallelism(bool allow_spinning)
27 {
28 // Test we never-ever fail to match available tasks with idle threads.
29 const int kThreads = 16; // code below expects that this is a multiple of 4
30 ThreadPool tp(kThreads, allow_spinning);
31 VERIFY_IS_EQUAL(tp.NumThreads(), kThreads);
32 VERIFY_IS_EQUAL(tp.CurrentThreadId(), -1);
33 for (int iter = 0; iter < 100; ++iter) {
34 std::atomic<int> running(0);
35 std::atomic<int> done(0);
36 std::atomic<int> phase(0);
37 // Schedule kThreads tasks and ensure that they all are running.
38 for (int i = 0; i < kThreads; ++i) {
39 tp.Schedule([&]() {
40 const int thread_id = tp.CurrentThreadId();
41 VERIFY_GE(thread_id, 0);
42 VERIFY_LE(thread_id, kThreads - 1);
43 running++;
44 while (phase < 1) {
45 }
46 done++;
47 });
48 }
49 while (running != kThreads) {
50 }
51 running = 0;
52 phase = 1;
53 // Now, while the previous tasks exit, schedule another kThreads tasks and
54 // ensure that they are running.
55 for (int i = 0; i < kThreads; ++i) {
56 tp.Schedule([&, i]() {
57 running++;
58 while (phase < 2) {
59 }
60 // When all tasks are running, half of tasks exit, quarter of tasks
61 // continue running and quarter of tasks schedule another 2 tasks each.
62 // Concurrently main thread schedules another quarter of tasks.
63 // This gives us another kThreads tasks and we ensure that they all
64 // are running.
65 if (i < kThreads / 2) {
66 } else if (i < 3 * kThreads / 4) {
67 running++;
68 while (phase < 3) {
69 }
70 done++;
71 } else {
72 for (int j = 0; j < 2; ++j) {
73 tp.Schedule([&]() {
74 running++;
75 while (phase < 3) {
76 }
77 done++;
78 });
79 }
80 }
81 done++;
82 });
83 }
84 while (running != kThreads) {
85 }
86 running = 0;
87 phase = 2;
88 for (int i = 0; i < kThreads / 4; ++i) {
89 tp.Schedule([&]() {
90 running++;
91 while (phase < 3) {
92 }
93 done++;
94 });
95 }
96 while (running != kThreads) {
97 }
98 phase = 3;
99 while (done != 3 * kThreads) {
100 }
101 }
102 }
103
104
test_cancel()105 static void test_cancel()
106 {
107 ThreadPool tp(2);
108
109 // Schedule a large number of closure that each sleeps for one second. This
110 // will keep the thread pool busy for much longer than the default test timeout.
111 for (int i = 0; i < 1000; ++i) {
112 tp.Schedule([]() {
113 std::this_thread::sleep_for(std::chrono::milliseconds(2000));
114 });
115 }
116
117 // Cancel the processing of all the closures that are still pending.
118 tp.Cancel();
119 }
120
test_pool_partitions()121 static void test_pool_partitions() {
122 const int kThreads = 2;
123 ThreadPool tp(kThreads);
124
125 // Assign each thread to its own partition, so that stealing other work only
126 // occurs globally when a thread is idle.
127 std::vector<std::pair<unsigned, unsigned>> steal_partitions(kThreads);
128 for (int i = 0; i < kThreads; ++i) {
129 steal_partitions[i] = std::make_pair(i, i + 1);
130 }
131 tp.SetStealPartitions(steal_partitions);
132
133 std::atomic<int> running(0);
134 std::atomic<int> done(0);
135 std::atomic<int> phase(0);
136
137 // Schedule kThreads tasks and ensure that they all are running.
138 for (int i = 0; i < kThreads; ++i) {
139 tp.Schedule([&]() {
140 const int thread_id = tp.CurrentThreadId();
141 VERIFY_GE(thread_id, 0);
142 VERIFY_LE(thread_id, kThreads - 1);
143 ++running;
144 while (phase < 1) {
145 }
146 ++done;
147 });
148 }
149 while (running != kThreads) {
150 }
151 // Schedule each closure to only run on thread 'i' and verify that it does.
152 for (int i = 0; i < kThreads; ++i) {
153 tp.ScheduleWithHint(
154 [&, i]() {
155 ++running;
156 const int thread_id = tp.CurrentThreadId();
157 VERIFY_IS_EQUAL(thread_id, i);
158 while (phase < 2) {
159 }
160 ++done;
161 },
162 i, i + 1);
163 }
164 running = 0;
165 phase = 1;
166 while (running != kThreads) {
167 }
168 running = 0;
169 phase = 2;
170 }
171
172
// Test entry point: runs each thread-pool subtest once, in a fixed order.
// The parallelism check is executed twice, once for each value of its
// allow_spinning argument.
EIGEN_DECLARE_TEST(cxx11_non_blocking_thread_pool)
{
  // Pool construction and teardown with no work scheduled.
  CALL_SUBTEST(test_create_destroy_empty_pool());

  // Task/worker matching, with allow_spinning set and then cleared.
  CALL_SUBTEST(test_parallelism(true));
  CALL_SUBTEST(test_parallelism(false));

  // Cancelling a pool that still has queued closures.
  CALL_SUBTEST(test_cancel());

  // Steal partitions combined with hinted scheduling.
  CALL_SUBTEST(test_pool_partitions());
}
181