• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // This file is part of Eigen, a lightweight C++ template library
2 // for linear algebra.
3 //
4 // Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com>
5 // Copyright (C) 2016 Benoit Steiner <benoit.steiner.goog@gmail.com>
6 //
7 // This Source Code Form is subject to the terms of the Mozilla
8 // Public License v. 2.0. If a copy of the MPL was not distributed
9 // with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
10 
11 #define EIGEN_USE_THREADS
12 #include "main.h"
13 #include "Eigen/CXX11/ThreadPool"
14 #include "Eigen/CXX11/Tensor"
15 
test_create_destroy_empty_pool()16 static void test_create_destroy_empty_pool()
17 {
18   // Just create and destroy the pool. This will wind up and tear down worker
19   // threads. Ensure there are no issues in that logic.
20   for (int i = 0; i < 16; ++i) {
21     ThreadPool tp(i);
22   }
23 }
24 
25 
test_parallelism(bool allow_spinning)26 static void test_parallelism(bool allow_spinning)
27 {
28   // Test we never-ever fail to match available tasks with idle threads.
29   const int kThreads = 16;  // code below expects that this is a multiple of 4
30   ThreadPool tp(kThreads, allow_spinning);
31   VERIFY_IS_EQUAL(tp.NumThreads(), kThreads);
32   VERIFY_IS_EQUAL(tp.CurrentThreadId(), -1);
33   for (int iter = 0; iter < 100; ++iter) {
34     std::atomic<int> running(0);
35     std::atomic<int> done(0);
36     std::atomic<int> phase(0);
37     // Schedule kThreads tasks and ensure that they all are running.
38     for (int i = 0; i < kThreads; ++i) {
39       tp.Schedule([&]() {
40         const int thread_id = tp.CurrentThreadId();
41         VERIFY_GE(thread_id, 0);
42         VERIFY_LE(thread_id, kThreads - 1);
43         running++;
44         while (phase < 1) {
45         }
46         done++;
47       });
48     }
49     while (running != kThreads) {
50     }
51     running = 0;
52     phase = 1;
53     // Now, while the previous tasks exit, schedule another kThreads tasks and
54     // ensure that they are running.
55     for (int i = 0; i < kThreads; ++i) {
56       tp.Schedule([&, i]() {
57         running++;
58         while (phase < 2) {
59         }
60         // When all tasks are running, half of tasks exit, quarter of tasks
61         // continue running and quarter of tasks schedule another 2 tasks each.
62         // Concurrently main thread schedules another quarter of tasks.
63         // This gives us another kThreads tasks and we ensure that they all
64         // are running.
65         if (i < kThreads / 2) {
66         } else if (i < 3 * kThreads / 4) {
67           running++;
68           while (phase < 3) {
69           }
70           done++;
71         } else {
72           for (int j = 0; j < 2; ++j) {
73             tp.Schedule([&]() {
74               running++;
75               while (phase < 3) {
76               }
77               done++;
78             });
79           }
80         }
81         done++;
82       });
83     }
84     while (running != kThreads) {
85     }
86     running = 0;
87     phase = 2;
88     for (int i = 0; i < kThreads / 4; ++i) {
89       tp.Schedule([&]() {
90         running++;
91         while (phase < 3) {
92         }
93         done++;
94       });
95     }
96     while (running != kThreads) {
97     }
98     phase = 3;
99     while (done != 3 * kThreads) {
100     }
101   }
102 }
103 
104 
test_cancel()105 static void test_cancel()
106 {
107   ThreadPool tp(2);
108 
109   // Schedule a large number of closure that each sleeps for one second. This
110   // will keep the thread pool busy for much longer than the default test timeout.
111   for (int i = 0; i < 1000; ++i) {
112     tp.Schedule([]() {
113       std::this_thread::sleep_for(std::chrono::milliseconds(2000));
114     });
115   }
116 
117   // Cancel the processing of all the closures that are still pending.
118   tp.Cancel();
119 }
120 
test_pool_partitions()121 static void test_pool_partitions() {
122   const int kThreads = 2;
123   ThreadPool tp(kThreads);
124 
125   // Assign each thread to its own partition, so that stealing other work only
126   // occurs globally when a thread is idle.
127   std::vector<std::pair<unsigned, unsigned>> steal_partitions(kThreads);
128   for (int i = 0; i < kThreads; ++i) {
129     steal_partitions[i] = std::make_pair(i, i + 1);
130   }
131   tp.SetStealPartitions(steal_partitions);
132 
133   std::atomic<int> running(0);
134   std::atomic<int> done(0);
135   std::atomic<int> phase(0);
136 
137   // Schedule kThreads tasks and ensure that they all are running.
138   for (int i = 0; i < kThreads; ++i) {
139     tp.Schedule([&]() {
140       const int thread_id = tp.CurrentThreadId();
141       VERIFY_GE(thread_id, 0);
142       VERIFY_LE(thread_id, kThreads - 1);
143       ++running;
144       while (phase < 1) {
145       }
146       ++done;
147     });
148   }
149   while (running != kThreads) {
150   }
151   // Schedule each closure to only run on thread 'i' and verify that it does.
152   for (int i = 0; i < kThreads; ++i) {
153     tp.ScheduleWithHint(
154         [&, i]() {
155           ++running;
156           const int thread_id = tp.CurrentThreadId();
157           VERIFY_IS_EQUAL(thread_id, i);
158           while (phase < 2) {
159           }
160           ++done;
161         },
162         i, i + 1);
163   }
164   running = 0;
165   phase = 1;
166   while (running != kThreads) {
167   }
168   running = 0;
169   phase = 2;
170 }
171 
172 
EIGEN_DECLARE_TEST(cxx11_non_blocking_thread_pool)173 EIGEN_DECLARE_TEST(cxx11_non_blocking_thread_pool)
174 {
175   CALL_SUBTEST(test_create_destroy_empty_pool());
176   CALL_SUBTEST(test_parallelism(true));
177   CALL_SUBTEST(test_parallelism(false));
178   CALL_SUBTEST(test_cancel());
179   CALL_SUBTEST(test_pool_partitions());
180 }
181