• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2006-2008 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 // Multi-threaded tests of ConditionVariable class.
6 
7 #include <time.h>
8 #include <algorithm>
9 #include <vector>
10 
11 #include "base/condition_variable.h"
12 #include "base/lock.h"
13 #include "base/logging.h"
14 #include "base/platform_thread.h"
15 #include "base/scoped_ptr.h"
16 #include "base/spin_wait.h"
17 #include "base/thread_collision_warner.h"
18 #include "base/time.h"
19 #include "testing/gtest/include/gtest/gtest.h"
20 #include "testing/platform_test.h"
21 
22 using base::TimeDelta;
23 using base::TimeTicks;
24 
25 namespace {
26 //------------------------------------------------------------------------------
27 // Define our test class, with several common variables.
28 //------------------------------------------------------------------------------
29 
30 class ConditionVariableTest : public PlatformTest {
31  public:
32   const TimeDelta kZeroMs;
33   const TimeDelta kTenMs;
34   const TimeDelta kThirtyMs;
35   const TimeDelta kFortyFiveMs;
36   const TimeDelta kSixtyMs;
37   const TimeDelta kOneHundredMs;
38 
ConditionVariableTest()39   explicit ConditionVariableTest()
40     : kZeroMs(TimeDelta::FromMilliseconds(0)),
41       kTenMs(TimeDelta::FromMilliseconds(10)),
42       kThirtyMs(TimeDelta::FromMilliseconds(30)),
43       kFortyFiveMs(TimeDelta::FromMilliseconds(45)),
44       kSixtyMs(TimeDelta::FromMilliseconds(60)),
45       kOneHundredMs(TimeDelta::FromMilliseconds(100)) {
46   }
47 };
48 
//------------------------------------------------------------------------------
// Define a class that will control activities in several multi-threaded tests.
// The general structure of multi-threaded tests is that a test case will
// construct an instance of a WorkQueue.  The WorkQueue will spin up some
// threads and control them throughout their lifetime, as well as maintaining
// a central repository of the work threads' activity.  Finally, the WorkQueue
// will command the worker threads to terminate.  At that point, the test
// cases will validate that the WorkQueue has records showing that the desired
// activities were performed.
//------------------------------------------------------------------------------
59 
60 // Callers are responsible for synchronizing access to the following class.
61 // The WorkQueue::lock_, as accessed via WorkQueue::lock(), should be used for
62 // all synchronized access.
63 class WorkQueue : public PlatformThread::Delegate {
64  public:
65   explicit WorkQueue(int thread_count);
66   ~WorkQueue();
67 
68   // PlatformThread::Delegate interface.
69   void ThreadMain();
70 
71   //----------------------------------------------------------------------------
72   // Worker threads only call the following methods.
73   // They should use the lock to get exclusive access.
74   int GetThreadId();  // Get an ID assigned to a thread..
75   bool EveryIdWasAllocated() const;  // Indicates that all IDs were handed out.
76   TimeDelta GetAnAssignment(int thread_id);  // Get a work task duration.
77   void WorkIsCompleted(int thread_id);
78 
79   int task_count() const;
80   bool allow_help_requests() const;  // Workers can signal more workers.
81   bool shutdown() const;  // Check if shutdown has been requested.
82 
83   void thread_shutting_down();
84 
85 
86   //----------------------------------------------------------------------------
87   // Worker threads can call them but not needed to acquire a lock.
88   Lock* lock();
89 
90   ConditionVariable* work_is_available();
91   ConditionVariable* all_threads_have_ids();
92   ConditionVariable* no_more_tasks();
93 
94   //----------------------------------------------------------------------------
95   // The rest of the methods are for use by the controlling master thread (the
96   // test case code).
97   void ResetHistory();
98   int GetMinCompletionsByWorkerThread() const;
99   int GetMaxCompletionsByWorkerThread() const;
100   int GetNumThreadsTakingAssignments() const;
101   int GetNumThreadsCompletingTasks() const;
102   int GetNumberOfCompletedTasks() const;
103   TimeDelta GetWorkTime() const;
104 
105   void SetWorkTime(TimeDelta delay);
106   void SetTaskCount(int count);
107   void SetAllowHelp(bool allow);
108 
109   // Caller must acquire lock before calling.
110   void SetShutdown();
111 
112   // Compares the |shutdown_task_count_| to the |thread_count| and returns true
113   // if they are equal.  This check will acquire the |lock_| so the caller
114   // should not hold the lock when calling this method.
115   bool ThreadSafeCheckShutdown(int thread_count);
116 
117  private:
118   // Both worker threads and controller use the following to synchronize.
119   Lock lock_;
120   ConditionVariable work_is_available_;  // To tell threads there is work.
121 
122   // Conditions to notify the controlling process (if it is interested).
123   ConditionVariable all_threads_have_ids_;  // All threads are running.
124   ConditionVariable no_more_tasks_;  // Task count is zero.
125 
126   const int thread_count_;
127   scoped_array<PlatformThreadHandle> thread_handles_;
128   std::vector<int> assignment_history_;  // Number of assignment per worker.
129   std::vector<int> completion_history_;  // Number of completions per worker.
130   int thread_started_counter_;  // Used to issue unique id to workers.
131   int shutdown_task_count_;  // Number of tasks told to shutdown
132   int task_count_;  // Number of assignment tasks waiting to be processed.
133   TimeDelta worker_delay_;  // Time each task takes to complete.
134   bool allow_help_requests_;  // Workers can signal more workers.
135   bool shutdown_;  // Set when threads need to terminate.
136 
137   DFAKE_MUTEX(locked_methods_);
138 };
139 
140 //------------------------------------------------------------------------------
141 // The next section contains the actual tests.
142 //------------------------------------------------------------------------------
143 
// Smoke test: a ConditionVariable can be created and destroyed, both without
// ever being used and after a handful of timed waits that nobody signals.
TEST_F(ConditionVariableTest, StartupShutdownTest) {
  Lock lock;

  // Trivial case first: construct and immediately destroy.
  {
    ConditionVariable cv1(&lock);
  }  // cv1 is destroyed here.

  // Now exercise one with at least a few waits before it is destroyed.
  ConditionVariable cv(&lock);

  {
    AutoLock auto_lock(lock);
    for (int i = 0; i < 2; ++i)
      cv.TimedWait(kTenMs);  // Wait for 10 ms.
  }

  {
    AutoLock auto_lock(lock);
    for (int i = 0; i < 3; ++i)
      cv.TimedWait(kTenMs);  // Wait for 10 ms.
  }
}  // cv is destroyed here.
166 
// Checks that an unsignalled TimedWait() does not return before its timeout.
TEST_F(ConditionVariableTest, TimeoutTest) {
  Lock lock;
  ConditionVariable cv(&lock);
  AutoLock auto_lock(lock);  // Held for the rest of the test.

  const TimeTicks start = TimeTicks::Now();
  const TimeDelta WAIT_TIME = TimeDelta::FromMilliseconds(300);
  // Extra time requested on top of WAIT_TIME to allow for clock granularity.
  const TimeDelta FUDGE_TIME = TimeDelta::FromMilliseconds(50);
  const TimeDelta requested_wait = WAIT_TIME + FUDGE_TIME;

  cv.TimedWait(requested_wait);
  const TimeDelta duration = TimeTicks::Now() - start;
  // EXPECT_GE cannot be used here: TimeDelta does not provide the stream
  // conversion that macro needs.
  EXPECT_TRUE(duration >= WAIT_TIME);
}
185 
186 // Test serial task servicing, as well as two parallel task servicing methods.
187 // TODO(maruel): This test is flaky, see http://crbug.com/10607
TEST_F(ConditionVariableTest,FLAKY_MultiThreadConsumerTest)188 TEST_F(ConditionVariableTest, FLAKY_MultiThreadConsumerTest) {
189   const int kThreadCount = 10;
190   WorkQueue queue(kThreadCount);  // Start the threads.
191 
192   const int kTaskCount = 10;  // Number of tasks in each mini-test here.
193 
194   base::Time start_time;  // Used to time task processing.
195 
196   {
197     AutoLock auto_lock(*queue.lock());
198     while (!queue.EveryIdWasAllocated())
199       queue.all_threads_have_ids()->Wait();
200   }
201 
202   // Wait a bit more to allow threads to reach their wait state.
203   // If threads aren't in a wait state, they may start to gobble up tasks in
204   // parallel, short-circuiting (breaking) this test.
205   PlatformThread::Sleep(100);
206 
207   {
208     // Since we have no tasks yet, all threads should be waiting by now.
209     AutoLock auto_lock(*queue.lock());
210     EXPECT_EQ(0, queue.GetNumThreadsTakingAssignments());
211     EXPECT_EQ(0, queue.GetNumThreadsCompletingTasks());
212     EXPECT_EQ(0, queue.task_count());
213     EXPECT_EQ(0, queue.GetMaxCompletionsByWorkerThread());
214     EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread());
215     EXPECT_EQ(0, queue.GetNumberOfCompletedTasks());
216 
217     // Set up to make one worker do 30ms tasks sequentially.
218     queue.ResetHistory();
219     queue.SetTaskCount(kTaskCount);
220     queue.SetWorkTime(kThirtyMs);
221     queue.SetAllowHelp(false);
222 
223     start_time = base::Time::Now();
224   }
225 
226   queue.work_is_available()->Signal();  // Start up one thread.
227 
228 
229   {
230     // Wait until all 10 work tasks have at least been assigned.
231     AutoLock auto_lock(*queue.lock());
232     while (queue.task_count())
233       queue.no_more_tasks()->Wait();
234     // The last of the tasks *might* still be running, but... all but one should
235     // be done by now, since tasks are being done serially.
236     EXPECT_LE(queue.GetWorkTime().InMilliseconds() * (kTaskCount - 1),
237               (base::Time::Now() - start_time).InMilliseconds());
238 
239     EXPECT_EQ(1, queue.GetNumThreadsTakingAssignments());
240     EXPECT_EQ(1, queue.GetNumThreadsCompletingTasks());
241     EXPECT_LE(kTaskCount - 1, queue.GetMaxCompletionsByWorkerThread());
242     EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread());
243     EXPECT_LE(kTaskCount - 1, queue.GetNumberOfCompletedTasks());
244   }
245 
246   // Wait to be sure all tasks are done.
247   while (1) {
248     {
249       AutoLock auto_lock(*queue.lock());
250       if (kTaskCount == queue.GetNumberOfCompletedTasks())
251         break;
252     }
253     PlatformThread::Sleep(30);  // Wait a little.
254   }
255 
256   {
257     // Check that all work was done by one thread id.
258     AutoLock auto_lock(*queue.lock());
259     EXPECT_EQ(1, queue.GetNumThreadsTakingAssignments());
260     EXPECT_EQ(1, queue.GetNumThreadsCompletingTasks());
261     EXPECT_EQ(0, queue.task_count());
262     EXPECT_EQ(kTaskCount, queue.GetMaxCompletionsByWorkerThread());
263     EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread());
264     EXPECT_EQ(kTaskCount, queue.GetNumberOfCompletedTasks());
265 
266     // Set up to make each task include getting help from another worker, so
267     // so that the work gets done in paralell.
268     queue.ResetHistory();
269     queue.SetTaskCount(kTaskCount);
270     queue.SetWorkTime(kThirtyMs);
271     queue.SetAllowHelp(true);
272 
273     start_time = base::Time::Now();
274   }
275 
276   queue.work_is_available()->Signal();  // But each worker can signal another.
277   // Wait to allow the all workers to get done.
278   while (1) {
279     {
280       AutoLock auto_lock(*queue.lock());
281       if (kTaskCount == queue.GetNumberOfCompletedTasks())
282         break;
283     }
284     PlatformThread::Sleep(30);  // Wait a little.
285   }
286 
287   {
288     // Wait until all work tasks have at least been assigned.
289     AutoLock auto_lock(*queue.lock());
290     while (queue.task_count())
291       queue.no_more_tasks()->Wait();
292     // Since they can all run almost in parallel, there is no guarantee that all
293     // tasks are finished, but we should have gotten here faster than it would
294     // take to run all tasks serially.
295     EXPECT_GT(queue.GetWorkTime().InMilliseconds() * (kTaskCount - 1),
296               (base::Time::Now() - start_time).InMilliseconds());
297 
298     // To avoid racy assumptions, we'll just assert that at least 2 threads
299     // did work.
300     EXPECT_LE(2, queue.GetNumThreadsTakingAssignments());
301     EXPECT_EQ(kTaskCount, queue.GetNumberOfCompletedTasks());
302 
303     // Try to ask all workers to help, and only a few will do the work.
304     queue.ResetHistory();
305     queue.SetTaskCount(3);
306     queue.SetWorkTime(kThirtyMs);
307     queue.SetAllowHelp(false);
308   }
309   queue.work_is_available()->Broadcast();  // Make them all try.
310   // Wait to allow the 3 workers to get done.
311   PlatformThread::Sleep(45);
312 
313   {
314     AutoLock auto_lock(*queue.lock());
315     EXPECT_EQ(3, queue.GetNumThreadsTakingAssignments());
316     EXPECT_EQ(3, queue.GetNumThreadsCompletingTasks());
317     EXPECT_EQ(0, queue.task_count());
318     EXPECT_EQ(1, queue.GetMaxCompletionsByWorkerThread());
319     EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread());
320     EXPECT_EQ(3, queue.GetNumberOfCompletedTasks());
321 
322     // Set up to make each task get help from another worker.
323     queue.ResetHistory();
324     queue.SetTaskCount(3);
325     queue.SetWorkTime(kThirtyMs);
326     queue.SetAllowHelp(true);  // Allow (unnecessary) help requests.
327   }
328   queue.work_is_available()->Broadcast();  // We already signal all threads.
329   // Wait to allow the 3 workers to get done.
330   PlatformThread::Sleep(100);
331 
332   {
333     AutoLock auto_lock(*queue.lock());
334     EXPECT_EQ(3, queue.GetNumThreadsTakingAssignments());
335     EXPECT_EQ(3, queue.GetNumThreadsCompletingTasks());
336     EXPECT_EQ(0, queue.task_count());
337     EXPECT_EQ(1, queue.GetMaxCompletionsByWorkerThread());
338     EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread());
339     EXPECT_EQ(3, queue.GetNumberOfCompletedTasks());
340 
341     // Set up to make each task get help from another worker.
342     queue.ResetHistory();
343     queue.SetTaskCount(20);
344     queue.SetWorkTime(kThirtyMs);
345     queue.SetAllowHelp(true);
346   }
347   queue.work_is_available()->Signal();  // But each worker can signal another.
348   // Wait to allow the 10 workers to get done.
349   PlatformThread::Sleep(100);  // Should take about 60 ms.
350 
351   {
352     AutoLock auto_lock(*queue.lock());
353     EXPECT_EQ(10, queue.GetNumThreadsTakingAssignments());
354     EXPECT_EQ(10, queue.GetNumThreadsCompletingTasks());
355     EXPECT_EQ(0, queue.task_count());
356     EXPECT_EQ(2, queue.GetMaxCompletionsByWorkerThread());
357     EXPECT_EQ(2, queue.GetMinCompletionsByWorkerThread());
358     EXPECT_EQ(20, queue.GetNumberOfCompletedTasks());
359 
360     // Same as last test, but with Broadcast().
361     queue.ResetHistory();
362     queue.SetTaskCount(20);  // 2 tasks per process.
363     queue.SetWorkTime(kThirtyMs);
364     queue.SetAllowHelp(true);
365   }
366   queue.work_is_available()->Broadcast();
367   // Wait to allow the 10 workers to get done.
368   PlatformThread::Sleep(100);  // Should take about 60 ms.
369 
370   {
371     AutoLock auto_lock(*queue.lock());
372     EXPECT_EQ(10, queue.GetNumThreadsTakingAssignments());
373     EXPECT_EQ(10, queue.GetNumThreadsCompletingTasks());
374     EXPECT_EQ(0, queue.task_count());
375     EXPECT_EQ(2, queue.GetMaxCompletionsByWorkerThread());
376     EXPECT_EQ(2, queue.GetMinCompletionsByWorkerThread());
377     EXPECT_EQ(20, queue.GetNumberOfCompletedTasks());
378 
379     queue.SetShutdown();
380   }
381   queue.work_is_available()->Broadcast();  // Force check for shutdown.
382 
383   SPIN_FOR_TIMEDELTA_OR_UNTIL_TRUE(TimeDelta::FromMinutes(1),
384                                    queue.ThreadSafeCheckShutdown(kThreadCount));
385   PlatformThread::Sleep(10);  // Be sure they're all shutdown.
386 }
387 
TEST_F(ConditionVariableTest,LargeFastTaskTest)388 TEST_F(ConditionVariableTest, LargeFastTaskTest) {
389   const int kThreadCount = 200;
390   WorkQueue queue(kThreadCount);  // Start the threads.
391 
392   Lock private_lock;  // Used locally for master to wait.
393   AutoLock private_held_lock(private_lock);
394   ConditionVariable private_cv(&private_lock);
395 
396   {
397     AutoLock auto_lock(*queue.lock());
398     while (!queue.EveryIdWasAllocated())
399       queue.all_threads_have_ids()->Wait();
400   }
401 
402   // Wait a bit more to allow threads to reach their wait state.
403   private_cv.TimedWait(kThirtyMs);
404 
405   {
406     // Since we have no tasks, all threads should be waiting by now.
407     AutoLock auto_lock(*queue.lock());
408     EXPECT_EQ(0, queue.GetNumThreadsTakingAssignments());
409     EXPECT_EQ(0, queue.GetNumThreadsCompletingTasks());
410     EXPECT_EQ(0, queue.task_count());
411     EXPECT_EQ(0, queue.GetMaxCompletionsByWorkerThread());
412     EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread());
413     EXPECT_EQ(0, queue.GetNumberOfCompletedTasks());
414 
415     // Set up to make all workers do (an average of) 20 tasks.
416     queue.ResetHistory();
417     queue.SetTaskCount(20 * kThreadCount);
418     queue.SetWorkTime(kFortyFiveMs);
419     queue.SetAllowHelp(false);
420   }
421   queue.work_is_available()->Broadcast();  // Start up all threads.
422   // Wait until we've handed out all tasks.
423   {
424     AutoLock auto_lock(*queue.lock());
425     while (queue.task_count() != 0)
426       queue.no_more_tasks()->Wait();
427   }
428 
429   // Wait till the last of the tasks complete.
430   // Don't bother to use locks: We may not get info in time... but we'll see it
431   // eventually.
432   SPIN_FOR_TIMEDELTA_OR_UNTIL_TRUE(TimeDelta::FromMinutes(1),
433                                     20 * kThreadCount ==
434                                       queue.GetNumberOfCompletedTasks());
435 
436   {
437     // With Broadcast(), every thread should have participated.
438     // but with racing.. they may not all have done equal numbers of tasks.
439     AutoLock auto_lock(*queue.lock());
440     EXPECT_EQ(kThreadCount, queue.GetNumThreadsTakingAssignments());
441     EXPECT_EQ(kThreadCount, queue.GetNumThreadsCompletingTasks());
442     EXPECT_EQ(0, queue.task_count());
443     EXPECT_LE(20, queue.GetMaxCompletionsByWorkerThread());
444     EXPECT_EQ(20 * kThreadCount, queue.GetNumberOfCompletedTasks());
445 
446     // Set up to make all workers do (an average of) 4 tasks.
447     queue.ResetHistory();
448     queue.SetTaskCount(kThreadCount * 4);
449     queue.SetWorkTime(kFortyFiveMs);
450     queue.SetAllowHelp(true);  // Might outperform Broadcast().
451   }
452   queue.work_is_available()->Signal();  // Start up one thread.
453 
454   // Wait until we've handed out all tasks
455   {
456     AutoLock auto_lock(*queue.lock());
457     while (queue.task_count() != 0)
458       queue.no_more_tasks()->Wait();
459   }
460 
461   // Wait till the last of the tasks complete.
462   // Don't bother to use locks: We may not get info in time... but we'll see it
463   // eventually.
464   SPIN_FOR_TIMEDELTA_OR_UNTIL_TRUE(TimeDelta::FromMinutes(1),
465                                     4 * kThreadCount ==
466                                       queue.GetNumberOfCompletedTasks());
467 
468   {
469     // With Signal(), every thread should have participated.
470     // but with racing.. they may not all have done four tasks.
471     AutoLock auto_lock(*queue.lock());
472     EXPECT_EQ(kThreadCount, queue.GetNumThreadsTakingAssignments());
473     EXPECT_EQ(kThreadCount, queue.GetNumThreadsCompletingTasks());
474     EXPECT_EQ(0, queue.task_count());
475     EXPECT_LE(4, queue.GetMaxCompletionsByWorkerThread());
476     EXPECT_EQ(4 * kThreadCount, queue.GetNumberOfCompletedTasks());
477 
478     queue.SetShutdown();
479   }
480   queue.work_is_available()->Broadcast();  // Force check for shutdown.
481 
482   // Wait for shutdowns to complete.
483   SPIN_FOR_TIMEDELTA_OR_UNTIL_TRUE(TimeDelta::FromMinutes(1),
484                                    queue.ThreadSafeCheckShutdown(kThreadCount));
485   PlatformThread::Sleep(10);  // Be sure they're all shutdown.
486 }
487 
488 //------------------------------------------------------------------------------
489 // Finally we provide the implementation for the methods in the WorkQueue class.
490 //------------------------------------------------------------------------------
491 
WorkQueue(int thread_count)492 WorkQueue::WorkQueue(int thread_count)
493   : lock_(),
494     work_is_available_(&lock_),
495     all_threads_have_ids_(&lock_),
496     no_more_tasks_(&lock_),
497     thread_count_(thread_count),
498     thread_handles_(new PlatformThreadHandle[thread_count]),
499     assignment_history_(thread_count),
500     completion_history_(thread_count),
501     thread_started_counter_(0),
502     shutdown_task_count_(0),
503     task_count_(0),
504     allow_help_requests_(false),
505     shutdown_(false) {
506   EXPECT_GE(thread_count_, 1);
507   ResetHistory();
508   SetTaskCount(0);
509   SetWorkTime(TimeDelta::FromMilliseconds(30));
510 
511   for (int i = 0; i < thread_count_; ++i) {
512     PlatformThreadHandle pth;
513     EXPECT_TRUE(PlatformThread::Create(0, this, &pth));
514     thread_handles_[i] = pth;
515   }
516 }
517 
~WorkQueue()518 WorkQueue::~WorkQueue() {
519   {
520     AutoLock auto_lock(lock_);
521     SetShutdown();
522   }
523   work_is_available_.Broadcast();  // Tell them all to terminate.
524 
525   for (int i = 0; i < thread_count_; ++i) {
526     PlatformThread::Join(thread_handles_[i]);
527   }
528 }
529 
GetThreadId()530 int WorkQueue::GetThreadId() {
531   DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_);
532   DCHECK(!EveryIdWasAllocated());
533   return thread_started_counter_++;  // Give out Unique IDs.
534 }
535 
EveryIdWasAllocated() const536 bool WorkQueue::EveryIdWasAllocated() const {
537   DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_);
538   return thread_count_ == thread_started_counter_;
539 }
540 
GetAnAssignment(int thread_id)541 TimeDelta WorkQueue::GetAnAssignment(int thread_id) {
542   DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_);
543   DCHECK_LT(0, task_count_);
544   assignment_history_[thread_id]++;
545   if (0 == --task_count_) {
546     no_more_tasks_.Signal();
547   }
548   return worker_delay_;
549 }
550 
WorkIsCompleted(int thread_id)551 void WorkQueue::WorkIsCompleted(int thread_id) {
552   DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_);
553   completion_history_[thread_id]++;
554 }
555 
task_count() const556 int WorkQueue::task_count() const {
557   DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_);
558   return task_count_;
559 }
560 
allow_help_requests() const561 bool WorkQueue::allow_help_requests() const {
562   DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_);
563   return allow_help_requests_;
564 }
565 
shutdown() const566 bool WorkQueue::shutdown() const {
567   lock_.AssertAcquired();
568   DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_);
569   return shutdown_;
570 }
571 
572 // Because this method is called from the test's main thread we need to actually
573 // take the lock.  Threads will call the thread_shutting_down() method with the
574 // lock already acquired.
ThreadSafeCheckShutdown(int thread_count)575 bool WorkQueue::ThreadSafeCheckShutdown(int thread_count) {
576   bool all_shutdown;
577   AutoLock auto_lock(lock_);
578   {
579     // Declare in scope so DFAKE is guranteed to be destroyed before AutoLock.
580     DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_);
581     all_shutdown = (shutdown_task_count_ == thread_count);
582   }
583   return all_shutdown;
584 }
585 
thread_shutting_down()586 void WorkQueue::thread_shutting_down() {
587   lock_.AssertAcquired();
588   DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_);
589   shutdown_task_count_++;
590 }
591 
lock()592 Lock* WorkQueue::lock() {
593   return &lock_;
594 }
595 
work_is_available()596 ConditionVariable* WorkQueue::work_is_available() {
597   return &work_is_available_;
598 }
599 
all_threads_have_ids()600 ConditionVariable* WorkQueue::all_threads_have_ids() {
601   return &all_threads_have_ids_;
602 }
603 
no_more_tasks()604 ConditionVariable* WorkQueue::no_more_tasks() {
605   return &no_more_tasks_;
606 }
607 
ResetHistory()608 void WorkQueue::ResetHistory() {
609   for (int i = 0; i < thread_count_; ++i) {
610     assignment_history_[i] = 0;
611     completion_history_[i] = 0;
612   }
613 }
614 
GetMinCompletionsByWorkerThread() const615 int WorkQueue::GetMinCompletionsByWorkerThread() const {
616   int minumum = completion_history_[0];
617   for (int i = 0; i < thread_count_; ++i)
618     minumum = std::min(minumum, completion_history_[i]);
619   return minumum;
620 }
621 
GetMaxCompletionsByWorkerThread() const622 int WorkQueue::GetMaxCompletionsByWorkerThread() const {
623   int maximum = completion_history_[0];
624   for (int i = 0; i < thread_count_; ++i)
625     maximum = std::max(maximum, completion_history_[i]);
626   return maximum;
627 }
628 
GetNumThreadsTakingAssignments() const629 int WorkQueue::GetNumThreadsTakingAssignments() const {
630   int count = 0;
631   for (int i = 0; i < thread_count_; ++i)
632     if (assignment_history_[i])
633       count++;
634   return count;
635 }
636 
GetNumThreadsCompletingTasks() const637 int WorkQueue::GetNumThreadsCompletingTasks() const {
638   int count = 0;
639   for (int i = 0; i < thread_count_; ++i)
640     if (completion_history_[i])
641       count++;
642   return count;
643 }
644 
GetNumberOfCompletedTasks() const645 int WorkQueue::GetNumberOfCompletedTasks() const {
646   int total = 0;
647   for (int i = 0; i < thread_count_; ++i)
648     total += completion_history_[i];
649   return total;
650 }
651 
GetWorkTime() const652 TimeDelta WorkQueue::GetWorkTime() const {
653   return worker_delay_;
654 }
655 
SetWorkTime(TimeDelta delay)656 void WorkQueue::SetWorkTime(TimeDelta delay) {
657   worker_delay_ = delay;
658 }
659 
SetTaskCount(int count)660 void WorkQueue::SetTaskCount(int count) {
661   task_count_ = count;
662 }
663 
SetAllowHelp(bool allow)664 void WorkQueue::SetAllowHelp(bool allow) {
665   allow_help_requests_ = allow;
666 }
667 
SetShutdown()668 void WorkQueue::SetShutdown() {
669   lock_.AssertAcquired();
670   shutdown_ = true;
671 }
672 
673 //------------------------------------------------------------------------------
674 // Define the standard worker task. Several tests will spin out many of these
675 // threads.
676 //------------------------------------------------------------------------------
677 
678 // The multithread tests involve several threads with a task to perform as
679 // directed by an instance of the class WorkQueue.
680 // The task is to:
681 // a) Check to see if there are more tasks (there is a task counter).
682 //    a1) Wait on condition variable if there are no tasks currently.
683 // b) Call a function to see what should be done.
684 // c) Do some computation based on the number of milliseconds returned in (b).
685 // d) go back to (a).
686 
687 // WorkQueue::ThreadMain() implements the above task for all threads.
688 // It calls the controlling object to tell the creator about progress, and to
689 // ask about tasks.
690 
ThreadMain()691 void WorkQueue::ThreadMain() {
692   int thread_id;
693   {
694     AutoLock auto_lock(lock_);
695     thread_id = GetThreadId();
696     if (EveryIdWasAllocated())
697       all_threads_have_ids()->Signal();  // Tell creator we're ready.
698   }
699 
700   Lock private_lock;  // Used to waste time on "our work".
701   while (1) {  // This is the main consumer loop.
702     TimeDelta work_time;
703     bool could_use_help;
704     {
705       AutoLock auto_lock(lock_);
706       while (0 == task_count() && !shutdown()) {
707         work_is_available()->Wait();
708       }
709       if (shutdown()) {
710         // Ack the notification of a shutdown message back to the controller.
711         thread_shutting_down();
712         return;  // Terminate.
713       }
714       // Get our task duration from the queue.
715       work_time = GetAnAssignment(thread_id);
716       could_use_help = (task_count() > 0) && allow_help_requests();
717     }  // Release lock
718 
719     // Do work (outside of locked region.
720     if (could_use_help)
721       work_is_available()->Signal();  // Get help from other threads.
722 
723     if (work_time > TimeDelta::FromMilliseconds(0)) {
724       // We could just sleep(), but we'll instead further exercise the
725       // condition variable class, and do a timed wait.
726       AutoLock auto_lock(private_lock);
727       ConditionVariable private_cv(&private_lock);
728       private_cv.TimedWait(work_time);  // Unsynchronized waiting.
729     }
730 
731     {
732       AutoLock auto_lock(lock_);
733       // Send notification that we completed our "work."
734       WorkIsCompleted(thread_id);
735     }
736   }
737 }
738 
739 }  // namespace
740