• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2   This file is part of ThreadSanitizer, a dynamic data race detector.
3 
4   Copyright (C) 2008-2009 Google Inc
5      opensource@google.com
6 
7   This program is free software; you can redistribute it and/or
8   modify it under the terms of the GNU General Public License as
9   published by the Free Software Foundation; either version 2 of the
10   License, or (at your option) any later version.
11 
12   This program is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15   General Public License for more details.
16 
17   You should have received a copy of the GNU General Public License
18   along with this program; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
20   02111-1307, USA.
21 
22   The GNU General Public License is contained in the file COPYING.
23 */
24 
25 // Author: Konstantin Serebryany <opensource@google.com>
26 //
27 // Here we define a few simple classes that wrap threading primitives.
28 //
29 // We need this to create unit tests for ThreadSanitizer (or similar tools)
30 // that will work with different threading frameworks.
31 //
32 // Note, that some of the methods defined here are annotated with
33 // ANNOTATE_* macros defined in dynamic_annotations.h.
34 //
35 // DISCLAIMER: the classes defined in this header file
36 // are NOT intended for general use -- only for unit tests.
37 
38 #ifndef THREAD_WRAPPERS_H
39 #define THREAD_WRAPPERS_H
40 
41 #include <assert.h>
42 #include <limits.h>   // INT_MAX
43 #include <queue>
44 #include <stdio.h>
45 #include <string>
46 #include <time.h>
47 
48 #include "dynamic_annotations.h"
49 
50 using namespace std;
51 
52 #ifdef NDEBUG
53 # error "Pleeease, do not define NDEBUG"
54 #endif
55 
56 #ifdef WIN32
57 # define CHECK(x) do { if (!(x)) { \
58    fprintf(stderr, "Assertion failed: %s (%s:%d) %s\n", \
59           __FUNCTION__, __FILE__, __LINE__, #x); \
60    exit(1); }} while (0)
61 #else
62 # define CHECK assert
63 #endif
64 
65 /// Just a boolean condition. Used by Mutex::LockWhen and similar.
66 class Condition {
67  public:
68   typedef bool (*func_t)(void*);
69 
70   template <typename T>
Condition(bool (* func)(T *),T * arg)71   Condition(bool (*func)(T*), T* arg)
72   : func_(reinterpret_cast<func_t>(func)), arg_(arg) {}
73 
Condition(bool (* func)())74   Condition(bool (*func)())
75   : func_(reinterpret_cast<func_t>(func)), arg_(NULL) {}
76 
Eval()77   bool Eval() { return func_(arg_); }
78  private:
79   func_t func_;
80   void *arg_;
81 };
82 
83 // Define platform-specific types, constant and functions {{{1
84 static int AtomicIncrement(volatile int *value, int increment);
85 static int GetTimeInMs();
86 
87 class CondVar;
88 class MyThread;
89 class Mutex;
90 //}}}
91 
92 // Include platform-specific header with declaraions.
93 #ifndef WIN32
94 // Include pthread primitives (Linux, Mac)
95 #include "thread_wrappers_pthread.h"
96 #else
97 // Include Windows primitives
98 #include "thread_wrappers_win.h"
99 #endif
100 
101 // Define cross-platform types synchronization primitives {{{1
102 /// Just a message queue.
103 class ProducerConsumerQueue {
104  public:
ProducerConsumerQueue(int unused)105   ProducerConsumerQueue(int unused) {
106     //ANNOTATE_PCQ_CREATE(this);
107   }
~ProducerConsumerQueue()108   ~ProducerConsumerQueue() {
109     CHECK(q_.empty());
110     //ANNOTATE_PCQ_DESTROY(this);
111   }
112 
113   // Put.
Put(void * item)114   void Put(void *item) {
115     mu_.Lock();
116       q_.push(item);
117       ANNOTATE_CONDVAR_SIGNAL(&mu_); // LockWhen in Get()
118       //ANNOTATE_PCQ_PUT(this);
119     mu_.Unlock();
120   }
121 
122   // Get.
123   // Blocks if the queue is empty.
Get()124   void *Get() {
125     mu_.LockWhen(Condition(IsQueueNotEmpty, &q_));
126       void * item;
127       bool ok = TryGetInternal(&item);
128       CHECK(ok);
129     mu_.Unlock();
130     return item;
131   }
132 
133   // If queue is not empty,
134   // remove an element from queue, put it into *res and return true.
135   // Otherwise return false.
TryGet(void ** res)136   bool TryGet(void **res) {
137     mu_.Lock();
138       bool ok = TryGetInternal(res);
139     mu_.Unlock();
140     return ok;
141   }
142 
143  private:
144   Mutex mu_;
145   std::queue<void*> q_; // protected by mu_
146 
147   // Requires mu_
TryGetInternal(void ** item_ptr)148   bool TryGetInternal(void ** item_ptr) {
149     if (q_.empty())
150       return false;
151     *item_ptr = q_.front();
152     q_.pop();
153     //ANNOTATE_PCQ_GET(this);
154     return true;
155   }
156 
IsQueueNotEmpty(std::queue<void * > * queue)157   static bool IsQueueNotEmpty(std::queue<void*> * queue) {
158      return !queue->empty();
159   }
160 };
161 
162 /// Function pointer with zero, one or two parameters.
163 struct Closure {
164   typedef void (*F0)();
165   typedef void (*F1)(void *arg1);
166   typedef void (*F2)(void *arg1, void *arg2);
167   int  n_params;
168   void *f;
169   void *param1;
170   void *param2;
171 
ExecuteClosure172   void Execute() {
173     if (n_params == 0) {
174       (F0(f))();
175     } else if (n_params == 1) {
176       (F1(f))(param1);
177     } else {
178       CHECK(n_params == 2);
179       (F2(f))(param1, param2);
180     }
181     delete this;
182   }
183 };
184 
NewCallback(void (* f)())185 static Closure *NewCallback(void (*f)()) {
186   Closure *res = new Closure;
187   res->n_params = 0;
188   res->f = (void*)(f);
189   res->param1 = NULL;
190   res->param2 = NULL;
191   return res;
192 }
193 
194 template <class P1>
NewCallback(void (* f)(P1),P1 p1)195 Closure *NewCallback(void (*f)(P1), P1 p1) {
196   CHECK(sizeof(P1) <= sizeof(void*));
197   Closure *res = new Closure;
198   res->n_params = 1;
199   res->f = (void*)(f);
200   res->param1 = (void*)(intptr_t)p1;
201   res->param2 = NULL;
202   return res;
203 }
204 
205 template <class P1, class P2>
NewCallback(void (* f)(P1,P2),P1 p1,P2 p2)206 Closure *NewCallback(void (*f)(P1, P2), P1 p1, P2 p2) {
207   CHECK(sizeof(P1) <= sizeof(void*));
208   CHECK(sizeof(P2) <= sizeof(void*));
209   Closure *res = new Closure;
210   res->n_params = 2;
211   res->f = (void*)(f);
212   res->param1 = (void*)p1;
213   res->param2 = (void*)p2;
214   return res;
215 }
216 
217 /*! A thread pool that uses ProducerConsumerQueue.
218   Usage:
219   {
220     ThreadPool pool(n_workers);
221     pool.StartWorkers();
222     pool.Add(NewCallback(func_with_no_args));
223     pool.Add(NewCallback(func_with_one_arg, arg));
224     pool.Add(NewCallback(func_with_two_args, arg1, arg2));
225     ... // more calls to pool.Add()
226 
227     // the ~ThreadPool() is called: we wait workers to finish
228     // and then join all threads in the pool.
229   }
230 */
231 class ThreadPool {
232  public:
233   //! Create n_threads threads, but do not start.
ThreadPool(int n_threads)234   explicit ThreadPool(int n_threads)
235     : queue_(INT_MAX) {
236     for (int i = 0; i < n_threads; i++) {
237       MyThread *thread = new MyThread(&ThreadPool::Worker, this);
238       workers_.push_back(thread);
239     }
240   }
241 
242   //! Start all threads.
StartWorkers()243   void StartWorkers() {
244     for (size_t i = 0; i < workers_.size(); i++) {
245       workers_[i]->Start();
246     }
247   }
248 
249   //! Add a closure.
Add(Closure * closure)250   void Add(Closure *closure) {
251     queue_.Put(closure);
252   }
253 
num_threads()254   int num_threads() { return workers_.size();}
255 
256   //! Wait workers to finish, then join all threads.
~ThreadPool()257   ~ThreadPool() {
258     for (size_t i = 0; i < workers_.size(); i++) {
259       Add(NULL);
260     }
261     for (size_t i = 0; i < workers_.size(); i++) {
262       workers_[i]->Join();
263       delete workers_[i];
264     }
265   }
266  private:
267   std::vector<MyThread*>   workers_;
268   ProducerConsumerQueue  queue_;
269 
Worker(void * p)270   static void *Worker(void *p) {
271     ThreadPool *pool = reinterpret_cast<ThreadPool*>(p);
272     while (true) {
273       Closure *closure = reinterpret_cast<Closure*>(pool->queue_.Get());
274       if(closure == NULL) {
275         return NULL;
276       }
277       closure->Execute();
278     }
279   }
280 };
281 
282 class MutexLock {  // Scoped Mutex Locker/Unlocker
283  public:
MutexLock(Mutex * mu)284   MutexLock(Mutex *mu)
285     : mu_(mu) {
286     mu_->Lock();
287   }
~MutexLock()288   ~MutexLock() {
289     mu_->Unlock();
290   }
291  private:
292   Mutex *mu_;
293 };
294 
295 class BlockingCounter {
296  public:
BlockingCounter(int initial_count)297   explicit BlockingCounter(int initial_count) :
298     count_(initial_count) {}
DecrementCount()299   bool DecrementCount() {
300     MutexLock lock(&mu_);
301     count_--;
302     return count_ == 0;
303   }
Wait()304   void Wait() {
305     mu_.LockWhen(Condition(&IsZero, &count_));
306     mu_.Unlock();
307   }
308  private:
IsZero(int * arg)309   static bool IsZero(int *arg) { return *arg == 0; }
310   Mutex mu_;
311   int count_;
312 };
313 
314 //}}}
315 
316 #endif // THREAD_WRAPPERS_H
317 // vim:shiftwidth=2:softtabstop=2:expandtab:foldmethod=marker
318