1 // Copyright (c) 2006-2008 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 // Multi-threaded tests of ConditionVariable class.
6
7 #include <time.h>
8 #include <algorithm>
9 #include <vector>
10
11 #include "base/condition_variable.h"
12 #include "base/lock.h"
13 #include "base/logging.h"
14 #include "base/platform_thread.h"
15 #include "base/scoped_ptr.h"
16 #include "base/spin_wait.h"
17 #include "base/thread_collision_warner.h"
18 #include "base/time.h"
19 #include "testing/gtest/include/gtest/gtest.h"
20 #include "testing/platform_test.h"
21
22 using base::TimeDelta;
23 using base::TimeTicks;
24
25 namespace {
26 //------------------------------------------------------------------------------
27 // Define our test class, with several common variables.
28 //------------------------------------------------------------------------------
29
30 class ConditionVariableTest : public PlatformTest {
31 public:
32 const TimeDelta kZeroMs;
33 const TimeDelta kTenMs;
34 const TimeDelta kThirtyMs;
35 const TimeDelta kFortyFiveMs;
36 const TimeDelta kSixtyMs;
37 const TimeDelta kOneHundredMs;
38
ConditionVariableTest()39 explicit ConditionVariableTest()
40 : kZeroMs(TimeDelta::FromMilliseconds(0)),
41 kTenMs(TimeDelta::FromMilliseconds(10)),
42 kThirtyMs(TimeDelta::FromMilliseconds(30)),
43 kFortyFiveMs(TimeDelta::FromMilliseconds(45)),
44 kSixtyMs(TimeDelta::FromMilliseconds(60)),
45 kOneHundredMs(TimeDelta::FromMilliseconds(100)) {
46 }
47 };
48
//------------------------------------------------------------------------------
// Define a class that will control activities in several multi-threaded tests.
// The general structure of multi-threaded tests is that a test case will
// construct an instance of a WorkQueue.  The WorkQueue will spin up some
// threads and control them throughout their lifetime, as well as maintaining
// a central repository of the worker threads' activity.  Finally, the WorkQueue
// will command the worker threads to terminate.  At that point, the test
// cases will validate that the WorkQueue has records showing that the desired
// activities were performed.
//------------------------------------------------------------------------------
59
60 // Callers are responsible for synchronizing access to the following class.
61 // The WorkQueue::lock_, as accessed via WorkQueue::lock(), should be used for
62 // all synchronized access.
63 class WorkQueue : public PlatformThread::Delegate {
64 public:
65 explicit WorkQueue(int thread_count);
66 ~WorkQueue();
67
68 // PlatformThread::Delegate interface.
69 void ThreadMain();
70
71 //----------------------------------------------------------------------------
72 // Worker threads only call the following methods.
73 // They should use the lock to get exclusive access.
74 int GetThreadId(); // Get an ID assigned to a thread..
75 bool EveryIdWasAllocated() const; // Indicates that all IDs were handed out.
76 TimeDelta GetAnAssignment(int thread_id); // Get a work task duration.
77 void WorkIsCompleted(int thread_id);
78
79 int task_count() const;
80 bool allow_help_requests() const; // Workers can signal more workers.
81 bool shutdown() const; // Check if shutdown has been requested.
82
83 void thread_shutting_down();
84
85
86 //----------------------------------------------------------------------------
87 // Worker threads can call them but not needed to acquire a lock.
88 Lock* lock();
89
90 ConditionVariable* work_is_available();
91 ConditionVariable* all_threads_have_ids();
92 ConditionVariable* no_more_tasks();
93
94 //----------------------------------------------------------------------------
95 // The rest of the methods are for use by the controlling master thread (the
96 // test case code).
97 void ResetHistory();
98 int GetMinCompletionsByWorkerThread() const;
99 int GetMaxCompletionsByWorkerThread() const;
100 int GetNumThreadsTakingAssignments() const;
101 int GetNumThreadsCompletingTasks() const;
102 int GetNumberOfCompletedTasks() const;
103 TimeDelta GetWorkTime() const;
104
105 void SetWorkTime(TimeDelta delay);
106 void SetTaskCount(int count);
107 void SetAllowHelp(bool allow);
108
109 // Caller must acquire lock before calling.
110 void SetShutdown();
111
112 // Compares the |shutdown_task_count_| to the |thread_count| and returns true
113 // if they are equal. This check will acquire the |lock_| so the caller
114 // should not hold the lock when calling this method.
115 bool ThreadSafeCheckShutdown(int thread_count);
116
117 private:
118 // Both worker threads and controller use the following to synchronize.
119 Lock lock_;
120 ConditionVariable work_is_available_; // To tell threads there is work.
121
122 // Conditions to notify the controlling process (if it is interested).
123 ConditionVariable all_threads_have_ids_; // All threads are running.
124 ConditionVariable no_more_tasks_; // Task count is zero.
125
126 const int thread_count_;
127 scoped_array<PlatformThreadHandle> thread_handles_;
128 std::vector<int> assignment_history_; // Number of assignment per worker.
129 std::vector<int> completion_history_; // Number of completions per worker.
130 int thread_started_counter_; // Used to issue unique id to workers.
131 int shutdown_task_count_; // Number of tasks told to shutdown
132 int task_count_; // Number of assignment tasks waiting to be processed.
133 TimeDelta worker_delay_; // Time each task takes to complete.
134 bool allow_help_requests_; // Workers can signal more workers.
135 bool shutdown_; // Set when threads need to terminate.
136
137 DFAKE_MUTEX(locked_methods_);
138 };
139
140 //------------------------------------------------------------------------------
141 // The next section contains the actual tests.
142 //------------------------------------------------------------------------------
143
// Checks that a ConditionVariable can be constructed and destroyed, both
// without ever being used and after a handful of timed waits.
TEST_F(ConditionVariableTest, StartupShutdownTest) {
  Lock lock;

  // Trivial case first: the variable is destroyed without ever being used.
  {
    ConditionVariable unused_cv(&lock);
  }

  // Now exercise one with a few waits.  Nothing ever signals |cv|, so each
  // call is simply a 10 ms wait.
  ConditionVariable cv(&lock);

  {
    AutoLock auto_lock(lock);
    for (int wait = 0; wait < 2; ++wait)
      cv.TimedWait(kTenMs);
  }

  {
    AutoLock auto_lock(lock);
    for (int wait = 0; wait < 3; ++wait)
      cv.TimedWait(kTenMs);
  }
}  // |cv| is destroyed here.
166
// Checks that a TimedWait() nobody signals does not return before the
// requested time has passed.
TEST_F(ConditionVariableTest, TimeoutTest) {
  Lock lock;
  ConditionVariable cv(&lock);
  AutoLock auto_lock(lock);  // Held for the rest of the test.

  const TimeTicks wait_began = TimeTicks::Now();
  const TimeDelta kWaitTime = TimeDelta::FromMilliseconds(300);
  // Extra time requested on top of |kWaitTime| to allow for clocking rate
  // granularity; only |kWaitTime| itself is required to have elapsed.
  const TimeDelta kFudgeTime = TimeDelta::FromMilliseconds(50);

  cv.TimedWait(kWaitTime + kFudgeTime);
  const TimeDelta elapsed = TimeTicks::Now() - wait_began;

  // EXPECT_GE cannot be used because TimeDelta does not support the stream
  // conversion that macro requires.
  EXPECT_TRUE(elapsed >= kWaitTime);
}
185
186 // Test serial task servicing, as well as two parallel task servicing methods.
187 // TODO(maruel): This test is flaky, see http://crbug.com/10607
TEST_F(ConditionVariableTest,FLAKY_MultiThreadConsumerTest)188 TEST_F(ConditionVariableTest, FLAKY_MultiThreadConsumerTest) {
189 const int kThreadCount = 10;
190 WorkQueue queue(kThreadCount); // Start the threads.
191
192 const int kTaskCount = 10; // Number of tasks in each mini-test here.
193
194 base::Time start_time; // Used to time task processing.
195
196 {
197 AutoLock auto_lock(*queue.lock());
198 while (!queue.EveryIdWasAllocated())
199 queue.all_threads_have_ids()->Wait();
200 }
201
202 // Wait a bit more to allow threads to reach their wait state.
203 // If threads aren't in a wait state, they may start to gobble up tasks in
204 // parallel, short-circuiting (breaking) this test.
205 PlatformThread::Sleep(100);
206
207 {
208 // Since we have no tasks yet, all threads should be waiting by now.
209 AutoLock auto_lock(*queue.lock());
210 EXPECT_EQ(0, queue.GetNumThreadsTakingAssignments());
211 EXPECT_EQ(0, queue.GetNumThreadsCompletingTasks());
212 EXPECT_EQ(0, queue.task_count());
213 EXPECT_EQ(0, queue.GetMaxCompletionsByWorkerThread());
214 EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread());
215 EXPECT_EQ(0, queue.GetNumberOfCompletedTasks());
216
217 // Set up to make one worker do 30ms tasks sequentially.
218 queue.ResetHistory();
219 queue.SetTaskCount(kTaskCount);
220 queue.SetWorkTime(kThirtyMs);
221 queue.SetAllowHelp(false);
222
223 start_time = base::Time::Now();
224 }
225
226 queue.work_is_available()->Signal(); // Start up one thread.
227
228
229 {
230 // Wait until all 10 work tasks have at least been assigned.
231 AutoLock auto_lock(*queue.lock());
232 while (queue.task_count())
233 queue.no_more_tasks()->Wait();
234 // The last of the tasks *might* still be running, but... all but one should
235 // be done by now, since tasks are being done serially.
236 EXPECT_LE(queue.GetWorkTime().InMilliseconds() * (kTaskCount - 1),
237 (base::Time::Now() - start_time).InMilliseconds());
238
239 EXPECT_EQ(1, queue.GetNumThreadsTakingAssignments());
240 EXPECT_EQ(1, queue.GetNumThreadsCompletingTasks());
241 EXPECT_LE(kTaskCount - 1, queue.GetMaxCompletionsByWorkerThread());
242 EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread());
243 EXPECT_LE(kTaskCount - 1, queue.GetNumberOfCompletedTasks());
244 }
245
246 // Wait to be sure all tasks are done.
247 while (1) {
248 {
249 AutoLock auto_lock(*queue.lock());
250 if (kTaskCount == queue.GetNumberOfCompletedTasks())
251 break;
252 }
253 PlatformThread::Sleep(30); // Wait a little.
254 }
255
256 {
257 // Check that all work was done by one thread id.
258 AutoLock auto_lock(*queue.lock());
259 EXPECT_EQ(1, queue.GetNumThreadsTakingAssignments());
260 EXPECT_EQ(1, queue.GetNumThreadsCompletingTasks());
261 EXPECT_EQ(0, queue.task_count());
262 EXPECT_EQ(kTaskCount, queue.GetMaxCompletionsByWorkerThread());
263 EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread());
264 EXPECT_EQ(kTaskCount, queue.GetNumberOfCompletedTasks());
265
266 // Set up to make each task include getting help from another worker, so
267 // so that the work gets done in paralell.
268 queue.ResetHistory();
269 queue.SetTaskCount(kTaskCount);
270 queue.SetWorkTime(kThirtyMs);
271 queue.SetAllowHelp(true);
272
273 start_time = base::Time::Now();
274 }
275
276 queue.work_is_available()->Signal(); // But each worker can signal another.
277 // Wait to allow the all workers to get done.
278 while (1) {
279 {
280 AutoLock auto_lock(*queue.lock());
281 if (kTaskCount == queue.GetNumberOfCompletedTasks())
282 break;
283 }
284 PlatformThread::Sleep(30); // Wait a little.
285 }
286
287 {
288 // Wait until all work tasks have at least been assigned.
289 AutoLock auto_lock(*queue.lock());
290 while (queue.task_count())
291 queue.no_more_tasks()->Wait();
292 // Since they can all run almost in parallel, there is no guarantee that all
293 // tasks are finished, but we should have gotten here faster than it would
294 // take to run all tasks serially.
295 EXPECT_GT(queue.GetWorkTime().InMilliseconds() * (kTaskCount - 1),
296 (base::Time::Now() - start_time).InMilliseconds());
297
298 // To avoid racy assumptions, we'll just assert that at least 2 threads
299 // did work.
300 EXPECT_LE(2, queue.GetNumThreadsTakingAssignments());
301 EXPECT_EQ(kTaskCount, queue.GetNumberOfCompletedTasks());
302
303 // Try to ask all workers to help, and only a few will do the work.
304 queue.ResetHistory();
305 queue.SetTaskCount(3);
306 queue.SetWorkTime(kThirtyMs);
307 queue.SetAllowHelp(false);
308 }
309 queue.work_is_available()->Broadcast(); // Make them all try.
310 // Wait to allow the 3 workers to get done.
311 PlatformThread::Sleep(45);
312
313 {
314 AutoLock auto_lock(*queue.lock());
315 EXPECT_EQ(3, queue.GetNumThreadsTakingAssignments());
316 EXPECT_EQ(3, queue.GetNumThreadsCompletingTasks());
317 EXPECT_EQ(0, queue.task_count());
318 EXPECT_EQ(1, queue.GetMaxCompletionsByWorkerThread());
319 EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread());
320 EXPECT_EQ(3, queue.GetNumberOfCompletedTasks());
321
322 // Set up to make each task get help from another worker.
323 queue.ResetHistory();
324 queue.SetTaskCount(3);
325 queue.SetWorkTime(kThirtyMs);
326 queue.SetAllowHelp(true); // Allow (unnecessary) help requests.
327 }
328 queue.work_is_available()->Broadcast(); // We already signal all threads.
329 // Wait to allow the 3 workers to get done.
330 PlatformThread::Sleep(100);
331
332 {
333 AutoLock auto_lock(*queue.lock());
334 EXPECT_EQ(3, queue.GetNumThreadsTakingAssignments());
335 EXPECT_EQ(3, queue.GetNumThreadsCompletingTasks());
336 EXPECT_EQ(0, queue.task_count());
337 EXPECT_EQ(1, queue.GetMaxCompletionsByWorkerThread());
338 EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread());
339 EXPECT_EQ(3, queue.GetNumberOfCompletedTasks());
340
341 // Set up to make each task get help from another worker.
342 queue.ResetHistory();
343 queue.SetTaskCount(20);
344 queue.SetWorkTime(kThirtyMs);
345 queue.SetAllowHelp(true);
346 }
347 queue.work_is_available()->Signal(); // But each worker can signal another.
348 // Wait to allow the 10 workers to get done.
349 PlatformThread::Sleep(100); // Should take about 60 ms.
350
351 {
352 AutoLock auto_lock(*queue.lock());
353 EXPECT_EQ(10, queue.GetNumThreadsTakingAssignments());
354 EXPECT_EQ(10, queue.GetNumThreadsCompletingTasks());
355 EXPECT_EQ(0, queue.task_count());
356 EXPECT_EQ(2, queue.GetMaxCompletionsByWorkerThread());
357 EXPECT_EQ(2, queue.GetMinCompletionsByWorkerThread());
358 EXPECT_EQ(20, queue.GetNumberOfCompletedTasks());
359
360 // Same as last test, but with Broadcast().
361 queue.ResetHistory();
362 queue.SetTaskCount(20); // 2 tasks per process.
363 queue.SetWorkTime(kThirtyMs);
364 queue.SetAllowHelp(true);
365 }
366 queue.work_is_available()->Broadcast();
367 // Wait to allow the 10 workers to get done.
368 PlatformThread::Sleep(100); // Should take about 60 ms.
369
370 {
371 AutoLock auto_lock(*queue.lock());
372 EXPECT_EQ(10, queue.GetNumThreadsTakingAssignments());
373 EXPECT_EQ(10, queue.GetNumThreadsCompletingTasks());
374 EXPECT_EQ(0, queue.task_count());
375 EXPECT_EQ(2, queue.GetMaxCompletionsByWorkerThread());
376 EXPECT_EQ(2, queue.GetMinCompletionsByWorkerThread());
377 EXPECT_EQ(20, queue.GetNumberOfCompletedTasks());
378
379 queue.SetShutdown();
380 }
381 queue.work_is_available()->Broadcast(); // Force check for shutdown.
382
383 SPIN_FOR_TIMEDELTA_OR_UNTIL_TRUE(TimeDelta::FromMinutes(1),
384 queue.ThreadSafeCheckShutdown(kThreadCount));
385 PlatformThread::Sleep(10); // Be sure they're all shutdown.
386 }
387
// Runs two large batches of tasks through a pool of 200 workers: the first
// batch is started with Broadcast(), the second with a single Signal() plus
// worker-to-worker help requests.  After each batch the test checks that every
// worker took part and that every task was completed.
TEST_F(ConditionVariableTest, LargeFastTaskTest) {
  // Helper for the polling waits performed by the controlling thread.
  struct Poll {
    // Returns true when exactly |expected| tasks are recorded as completed.
    // The completion counters are updated by the workers under the queue's
    // lock, so the probe takes that lock too instead of reading them
    // unsynchronized.  The caller must not already hold the queue's lock.
    static bool CompletedTaskCountIs(WorkQueue* queue, int expected) {
      AutoLock auto_lock(*queue->lock());
      return expected == queue->GetNumberOfCompletedTasks();
    }
  };

  const int kThreadCount = 200;
  const int kFirstBatchTasks = 20 * kThreadCount;   // Average of 20 per worker.
  const int kSecondBatchTasks = 4 * kThreadCount;   // Average of 4 per worker.

  WorkQueue queue(kThreadCount);  // Start the threads.

  Lock private_lock;  // Used locally for master to wait.
  AutoLock private_held_lock(private_lock);
  ConditionVariable private_cv(&private_lock);

  {
    AutoLock auto_lock(*queue.lock());
    while (!queue.EveryIdWasAllocated())
      queue.all_threads_have_ids()->Wait();
  }

  // Wait a bit more to allow threads to reach their wait state.
  private_cv.TimedWait(kThirtyMs);

  {
    // Since we have no tasks, all threads should be waiting by now.
    AutoLock auto_lock(*queue.lock());
    EXPECT_EQ(0, queue.GetNumThreadsTakingAssignments());
    EXPECT_EQ(0, queue.GetNumThreadsCompletingTasks());
    EXPECT_EQ(0, queue.task_count());
    EXPECT_EQ(0, queue.GetMaxCompletionsByWorkerThread());
    EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread());
    EXPECT_EQ(0, queue.GetNumberOfCompletedTasks());

    // Set up to make all workers do (an average of) 20 tasks.
    queue.ResetHistory();
    queue.SetTaskCount(kFirstBatchTasks);
    queue.SetWorkTime(kFortyFiveMs);
    queue.SetAllowHelp(false);
  }
  queue.work_is_available()->Broadcast();  // Start up all threads.
  // Wait until we've handed out all tasks.
  {
    AutoLock auto_lock(*queue.lock());
    while (queue.task_count() != 0)
      queue.no_more_tasks()->Wait();
  }

  // Wait till the last of the tasks complete.
  SPIN_FOR_TIMEDELTA_OR_UNTIL_TRUE(
      TimeDelta::FromMinutes(1),
      Poll::CompletedTaskCountIs(&queue, kFirstBatchTasks));

  {
    // With Broadcast(), every thread should have participated.
    // but with racing.. they may not all have done equal numbers of tasks.
    AutoLock auto_lock(*queue.lock());
    EXPECT_EQ(kThreadCount, queue.GetNumThreadsTakingAssignments());
    EXPECT_EQ(kThreadCount, queue.GetNumThreadsCompletingTasks());
    EXPECT_EQ(0, queue.task_count());
    EXPECT_LE(20, queue.GetMaxCompletionsByWorkerThread());
    EXPECT_EQ(kFirstBatchTasks, queue.GetNumberOfCompletedTasks());

    // Set up to make all workers do (an average of) 4 tasks.
    queue.ResetHistory();
    queue.SetTaskCount(kSecondBatchTasks);
    queue.SetWorkTime(kFortyFiveMs);
    queue.SetAllowHelp(true);  // Might outperform Broadcast().
  }
  queue.work_is_available()->Signal();  // Start up one thread.

  // Wait until we've handed out all tasks.
  {
    AutoLock auto_lock(*queue.lock());
    while (queue.task_count() != 0)
      queue.no_more_tasks()->Wait();
  }

  // Wait till the last of the tasks complete.
  SPIN_FOR_TIMEDELTA_OR_UNTIL_TRUE(
      TimeDelta::FromMinutes(1),
      Poll::CompletedTaskCountIs(&queue, kSecondBatchTasks));

  {
    // With Signal(), every thread should have participated.
    // but with racing.. they may not all have done four tasks.
    AutoLock auto_lock(*queue.lock());
    EXPECT_EQ(kThreadCount, queue.GetNumThreadsTakingAssignments());
    EXPECT_EQ(kThreadCount, queue.GetNumThreadsCompletingTasks());
    EXPECT_EQ(0, queue.task_count());
    EXPECT_LE(4, queue.GetMaxCompletionsByWorkerThread());
    EXPECT_EQ(kSecondBatchTasks, queue.GetNumberOfCompletedTasks());

    queue.SetShutdown();
  }
  queue.work_is_available()->Broadcast();  // Force check for shutdown.

  // Wait for shutdowns to complete.
  SPIN_FOR_TIMEDELTA_OR_UNTIL_TRUE(TimeDelta::FromMinutes(1),
                                   queue.ThreadSafeCheckShutdown(kThreadCount));
  PlatformThread::Sleep(10);  // Be sure they're all shutdown.
}
487
488 //------------------------------------------------------------------------------
489 // Finally we provide the implementation for the methods in the WorkQueue class.
490 //------------------------------------------------------------------------------
491
WorkQueue(int thread_count)492 WorkQueue::WorkQueue(int thread_count)
493 : lock_(),
494 work_is_available_(&lock_),
495 all_threads_have_ids_(&lock_),
496 no_more_tasks_(&lock_),
497 thread_count_(thread_count),
498 thread_handles_(new PlatformThreadHandle[thread_count]),
499 assignment_history_(thread_count),
500 completion_history_(thread_count),
501 thread_started_counter_(0),
502 shutdown_task_count_(0),
503 task_count_(0),
504 allow_help_requests_(false),
505 shutdown_(false) {
506 EXPECT_GE(thread_count_, 1);
507 ResetHistory();
508 SetTaskCount(0);
509 SetWorkTime(TimeDelta::FromMilliseconds(30));
510
511 for (int i = 0; i < thread_count_; ++i) {
512 PlatformThreadHandle pth;
513 EXPECT_TRUE(PlatformThread::Create(0, this, &pth));
514 thread_handles_[i] = pth;
515 }
516 }
517
~WorkQueue()518 WorkQueue::~WorkQueue() {
519 {
520 AutoLock auto_lock(lock_);
521 SetShutdown();
522 }
523 work_is_available_.Broadcast(); // Tell them all to terminate.
524
525 for (int i = 0; i < thread_count_; ++i) {
526 PlatformThread::Join(thread_handles_[i]);
527 }
528 }
529
GetThreadId()530 int WorkQueue::GetThreadId() {
531 DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_);
532 DCHECK(!EveryIdWasAllocated());
533 return thread_started_counter_++; // Give out Unique IDs.
534 }
535
EveryIdWasAllocated() const536 bool WorkQueue::EveryIdWasAllocated() const {
537 DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_);
538 return thread_count_ == thread_started_counter_;
539 }
540
GetAnAssignment(int thread_id)541 TimeDelta WorkQueue::GetAnAssignment(int thread_id) {
542 DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_);
543 DCHECK_LT(0, task_count_);
544 assignment_history_[thread_id]++;
545 if (0 == --task_count_) {
546 no_more_tasks_.Signal();
547 }
548 return worker_delay_;
549 }
550
WorkIsCompleted(int thread_id)551 void WorkQueue::WorkIsCompleted(int thread_id) {
552 DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_);
553 completion_history_[thread_id]++;
554 }
555
task_count() const556 int WorkQueue::task_count() const {
557 DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_);
558 return task_count_;
559 }
560
allow_help_requests() const561 bool WorkQueue::allow_help_requests() const {
562 DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_);
563 return allow_help_requests_;
564 }
565
shutdown() const566 bool WorkQueue::shutdown() const {
567 lock_.AssertAcquired();
568 DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_);
569 return shutdown_;
570 }
571
572 // Because this method is called from the test's main thread we need to actually
573 // take the lock. Threads will call the thread_shutting_down() method with the
574 // lock already acquired.
ThreadSafeCheckShutdown(int thread_count)575 bool WorkQueue::ThreadSafeCheckShutdown(int thread_count) {
576 bool all_shutdown;
577 AutoLock auto_lock(lock_);
578 {
579 // Declare in scope so DFAKE is guranteed to be destroyed before AutoLock.
580 DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_);
581 all_shutdown = (shutdown_task_count_ == thread_count);
582 }
583 return all_shutdown;
584 }
585
thread_shutting_down()586 void WorkQueue::thread_shutting_down() {
587 lock_.AssertAcquired();
588 DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_);
589 shutdown_task_count_++;
590 }
591
lock()592 Lock* WorkQueue::lock() {
593 return &lock_;
594 }
595
work_is_available()596 ConditionVariable* WorkQueue::work_is_available() {
597 return &work_is_available_;
598 }
599
all_threads_have_ids()600 ConditionVariable* WorkQueue::all_threads_have_ids() {
601 return &all_threads_have_ids_;
602 }
603
no_more_tasks()604 ConditionVariable* WorkQueue::no_more_tasks() {
605 return &no_more_tasks_;
606 }
607
ResetHistory()608 void WorkQueue::ResetHistory() {
609 for (int i = 0; i < thread_count_; ++i) {
610 assignment_history_[i] = 0;
611 completion_history_[i] = 0;
612 }
613 }
614
GetMinCompletionsByWorkerThread() const615 int WorkQueue::GetMinCompletionsByWorkerThread() const {
616 int minumum = completion_history_[0];
617 for (int i = 0; i < thread_count_; ++i)
618 minumum = std::min(minumum, completion_history_[i]);
619 return minumum;
620 }
621
GetMaxCompletionsByWorkerThread() const622 int WorkQueue::GetMaxCompletionsByWorkerThread() const {
623 int maximum = completion_history_[0];
624 for (int i = 0; i < thread_count_; ++i)
625 maximum = std::max(maximum, completion_history_[i]);
626 return maximum;
627 }
628
GetNumThreadsTakingAssignments() const629 int WorkQueue::GetNumThreadsTakingAssignments() const {
630 int count = 0;
631 for (int i = 0; i < thread_count_; ++i)
632 if (assignment_history_[i])
633 count++;
634 return count;
635 }
636
GetNumThreadsCompletingTasks() const637 int WorkQueue::GetNumThreadsCompletingTasks() const {
638 int count = 0;
639 for (int i = 0; i < thread_count_; ++i)
640 if (completion_history_[i])
641 count++;
642 return count;
643 }
644
GetNumberOfCompletedTasks() const645 int WorkQueue::GetNumberOfCompletedTasks() const {
646 int total = 0;
647 for (int i = 0; i < thread_count_; ++i)
648 total += completion_history_[i];
649 return total;
650 }
651
GetWorkTime() const652 TimeDelta WorkQueue::GetWorkTime() const {
653 return worker_delay_;
654 }
655
SetWorkTime(TimeDelta delay)656 void WorkQueue::SetWorkTime(TimeDelta delay) {
657 worker_delay_ = delay;
658 }
659
SetTaskCount(int count)660 void WorkQueue::SetTaskCount(int count) {
661 task_count_ = count;
662 }
663
SetAllowHelp(bool allow)664 void WorkQueue::SetAllowHelp(bool allow) {
665 allow_help_requests_ = allow;
666 }
667
SetShutdown()668 void WorkQueue::SetShutdown() {
669 lock_.AssertAcquired();
670 shutdown_ = true;
671 }
672
673 //------------------------------------------------------------------------------
674 // Define the standard worker task. Several tests will spin out many of these
675 // threads.
676 //------------------------------------------------------------------------------
677
678 // The multithread tests involve several threads with a task to perform as
679 // directed by an instance of the class WorkQueue.
680 // The task is to:
681 // a) Check to see if there are more tasks (there is a task counter).
682 // a1) Wait on condition variable if there are no tasks currently.
683 // b) Call a function to see what should be done.
684 // c) Do some computation based on the number of milliseconds returned in (b).
685 // d) go back to (a).
686
687 // WorkQueue::ThreadMain() implements the above task for all threads.
688 // It calls the controlling object to tell the creator about progress, and to
689 // ask about tasks.
690
ThreadMain()691 void WorkQueue::ThreadMain() {
692 int thread_id;
693 {
694 AutoLock auto_lock(lock_);
695 thread_id = GetThreadId();
696 if (EveryIdWasAllocated())
697 all_threads_have_ids()->Signal(); // Tell creator we're ready.
698 }
699
700 Lock private_lock; // Used to waste time on "our work".
701 while (1) { // This is the main consumer loop.
702 TimeDelta work_time;
703 bool could_use_help;
704 {
705 AutoLock auto_lock(lock_);
706 while (0 == task_count() && !shutdown()) {
707 work_is_available()->Wait();
708 }
709 if (shutdown()) {
710 // Ack the notification of a shutdown message back to the controller.
711 thread_shutting_down();
712 return; // Terminate.
713 }
714 // Get our task duration from the queue.
715 work_time = GetAnAssignment(thread_id);
716 could_use_help = (task_count() > 0) && allow_help_requests();
717 } // Release lock
718
719 // Do work (outside of locked region.
720 if (could_use_help)
721 work_is_available()->Signal(); // Get help from other threads.
722
723 if (work_time > TimeDelta::FromMilliseconds(0)) {
724 // We could just sleep(), but we'll instead further exercise the
725 // condition variable class, and do a timed wait.
726 AutoLock auto_lock(private_lock);
727 ConditionVariable private_cv(&private_lock);
728 private_cv.TimedWait(work_time); // Unsynchronized waiting.
729 }
730
731 {
732 AutoLock auto_lock(lock_);
733 // Send notification that we completed our "work."
734 WorkIsCompleted(thread_id);
735 }
736 }
737 }
738
739 } // namespace
740