• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2012 Google Inc.
 *
 * Use of this source code is governed by a BSD-style license that can be
 * found in the LICENSE file.
 */
7 
8 #include "SkRunnable.h"
9 #include "SkThreadPool.h"
10 #include "SkThreadUtils.h"
11 #include "SkTypes.h"
12 
13 #if defined(SK_BUILD_FOR_UNIX) || defined(SK_BUILD_FOR_MAC) || defined(SK_BUILD_FOR_ANDROID)
14 #include <unistd.h>
15 #endif
16 
// Returns the number of cores on this machine, and never less than 1.
//
// The platform query is not guaranteed to produce a usable number: sysconf()
// reports failure by returning -1.  The result of this function is used as a
// thread count, so any non-positive answer is reported as a single core.
static int num_cores() {
#if defined(SK_BUILD_FOR_WIN32)
    SYSTEM_INFO sysinfo;
    GetSystemInfo(&sysinfo);
    const long cores = static_cast<long>(sysinfo.dwNumberOfProcessors);
#elif defined(SK_BUILD_FOR_UNIX) || defined(SK_BUILD_FOR_MAC) || defined(SK_BUILD_FOR_ANDROID)
    const long cores = sysconf(_SC_NPROCESSORS_ONLN);
#else
    // Unknown platform: assume a single core.
    const long cores = 1;
#endif
    // Clamp failures (-1) and nonsensical values (0) to one core.
    return cores > 0 ? static_cast<int>(cores) : 1;
}
29 
SkThreadPool(int count)30 SkThreadPool::SkThreadPool(int count)
31 : fState(kRunning_State), fBusyThreads(0) {
32     if (count < 0) count = num_cores();
33     // Create count threads, all running SkThreadPool::Loop.
34     for (int i = 0; i < count; i++) {
35         SkThread* thread = SkNEW_ARGS(SkThread, (&SkThreadPool::Loop, this));
36         *fThreads.append() = thread;
37         thread->start();
38     }
39 }
40 
~SkThreadPool()41 SkThreadPool::~SkThreadPool() {
42     if (kRunning_State == fState) {
43         this->wait();
44     }
45 }
46 
wait()47 void SkThreadPool::wait() {
48     fReady.lock();
49     fState = kWaiting_State;
50     fReady.broadcast();
51     fReady.unlock();
52 
53     // Wait for all threads to stop.
54     for (int i = 0; i < fThreads.count(); i++) {
55         fThreads[i]->join();
56         SkDELETE(fThreads[i]);
57     }
58     SkASSERT(fQueue.isEmpty());
59 }
60 
Loop(void * arg)61 /*static*/ void SkThreadPool::Loop(void* arg) {
62     // The SkThreadPool passes itself as arg to each thread as they're created.
63     SkThreadPool* pool = static_cast<SkThreadPool*>(arg);
64 
65     while (true) {
66         // We have to be holding the lock to read the queue and to call wait.
67         pool->fReady.lock();
68         while(pool->fQueue.isEmpty()) {
69             // Does the client want to stop and are all the threads ready to stop?
70             // If so, we move into the halting state, and whack all the threads so they notice.
71             if (kWaiting_State == pool->fState && pool->fBusyThreads == 0) {
72                 pool->fState = kHalting_State;
73                 pool->fReady.broadcast();
74             }
75             // Any time we find ourselves in the halting state, it's quitting time.
76             if (kHalting_State == pool->fState) {
77                 pool->fReady.unlock();
78                 return;
79             }
80             // wait yields the lock while waiting, but will have it again when awoken.
81             pool->fReady.wait();
82         }
83         // We've got the lock back here, no matter if we ran wait or not.
84 
85         // The queue is not empty, so we have something to run.  Claim it.
86         LinkedRunnable* r = pool->fQueue.tail();
87 
88         pool->fQueue.remove(r);
89 
90         // Having claimed our SkRunnable, we now give up the lock while we run it.
91         // Otherwise, we'd only ever do work on one thread at a time, which rather
92         // defeats the point of this code.
93         pool->fBusyThreads++;
94         pool->fReady.unlock();
95 
96         // OK, now really do the work.
97         r->fRunnable->run();
98         SkDELETE(r);
99 
100         // Let everyone know we're not busy.
101         pool->fReady.lock();
102         pool->fBusyThreads--;
103         pool->fReady.unlock();
104     }
105 
106     SkASSERT(false); // Unreachable.  The only exit happens when pool->fState is kHalting_State.
107 }
108 
add(SkRunnable * r)109 void SkThreadPool::add(SkRunnable* r) {
110     if (NULL == r) {
111         return;
112     }
113 
114     // If we don't have any threads, obligingly just run the thing now.
115     if (fThreads.isEmpty()) {
116         return r->run();
117     }
118 
119     // We have some threads.  Queue it up!
120     fReady.lock();
121     SkASSERT(fState != kHalting_State);  // Shouldn't be able to add work when we're halting.
122     LinkedRunnable* linkedRunnable = SkNEW(LinkedRunnable);
123     linkedRunnable->fRunnable = r;
124     fQueue.addToHead(linkedRunnable);
125     fReady.signal();
126     fReady.unlock();
127 }
128