1 /*
2 * Copyright (C) 2012 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 // #define LOG_NDEBUG 0
18 #define LOG_TAG "WorkQueue"
19
20 #include <utils/Log.h>
21 #include "WorkQueue.h"
22
23 namespace android {
24
25 // --- WorkQueue ---
26
WorkQueue(size_t maxThreads,bool canCallJava)27 WorkQueue::WorkQueue(size_t maxThreads, bool canCallJava) :
28 mMaxThreads(maxThreads), mCanCallJava(canCallJava),
29 mCanceled(false), mFinished(false), mIdleThreads(0) {
30 }
31
~WorkQueue()32 WorkQueue::~WorkQueue() {
33 if (!cancel()) {
34 finish();
35 }
36 }
37
schedule(WorkUnit * workUnit,size_t backlog)38 status_t WorkQueue::schedule(WorkUnit* workUnit, size_t backlog) {
39 AutoMutex _l(mLock);
40
41 if (mFinished || mCanceled) {
42 return INVALID_OPERATION;
43 }
44
45 if (mWorkThreads.size() < mMaxThreads
46 && mIdleThreads < mWorkUnits.size() + 1) {
47 sp<WorkThread> workThread = new WorkThread(this, mCanCallJava);
48 status_t status = workThread->run("WorkQueue::WorkThread");
49 if (status) {
50 return status;
51 }
52 mWorkThreads.add(workThread);
53 mIdleThreads += 1;
54 } else if (backlog) {
55 while (mWorkUnits.size() >= mMaxThreads * backlog) {
56 mWorkDequeuedCondition.wait(mLock);
57 if (mFinished || mCanceled) {
58 return INVALID_OPERATION;
59 }
60 }
61 }
62
63 mWorkUnits.add(workUnit);
64 mWorkChangedCondition.broadcast();
65 return OK;
66 }
67
cancel()68 status_t WorkQueue::cancel() {
69 AutoMutex _l(mLock);
70
71 return cancelLocked();
72 }
73
cancelLocked()74 status_t WorkQueue::cancelLocked() {
75 if (mFinished) {
76 return INVALID_OPERATION;
77 }
78
79 if (!mCanceled) {
80 mCanceled = true;
81
82 size_t count = mWorkUnits.size();
83 for (size_t i = 0; i < count; i++) {
84 delete mWorkUnits.itemAt(i);
85 }
86 mWorkUnits.clear();
87 mWorkChangedCondition.broadcast();
88 mWorkDequeuedCondition.broadcast();
89 }
90 return OK;
91 }
92
finish()93 status_t WorkQueue::finish() {
94 { // acquire lock
95 AutoMutex _l(mLock);
96
97 if (mFinished) {
98 return INVALID_OPERATION;
99 }
100
101 mFinished = true;
102 mWorkChangedCondition.broadcast();
103 } // release lock
104
105 // It is not possible for the list of work threads to change once the mFinished
106 // flag has been set, so we can access mWorkThreads outside of the lock here.
107 size_t count = mWorkThreads.size();
108 for (size_t i = 0; i < count; i++) {
109 mWorkThreads.itemAt(i)->join();
110 }
111 mWorkThreads.clear();
112 return OK;
113 }
114
threadLoop()115 bool WorkQueue::threadLoop() {
116 WorkUnit* workUnit;
117 { // acquire lock
118 AutoMutex _l(mLock);
119
120 for (;;) {
121 if (mCanceled) {
122 return false;
123 }
124
125 if (!mWorkUnits.isEmpty()) {
126 workUnit = mWorkUnits.itemAt(0);
127 mWorkUnits.removeAt(0);
128 mIdleThreads -= 1;
129 mWorkDequeuedCondition.broadcast();
130 break;
131 }
132
133 if (mFinished) {
134 return false;
135 }
136
137 mWorkChangedCondition.wait(mLock);
138 }
139 } // release lock
140
141 bool shouldContinue = workUnit->run();
142 delete workUnit;
143
144 { // acquire lock
145 AutoMutex _l(mLock);
146
147 mIdleThreads += 1;
148
149 if (!shouldContinue) {
150 cancelLocked();
151 return false;
152 }
153 } // release lock
154
155 return true;
156 }
157
158 // --- WorkQueue::WorkThread ---
159
WorkThread(WorkQueue * workQueue,bool canCallJava)160 WorkQueue::WorkThread::WorkThread(WorkQueue* workQueue, bool canCallJava) :
161 Thread(canCallJava), mWorkQueue(workQueue) {
162 }
163
~WorkThread()164 WorkQueue::WorkThread::~WorkThread() {
165 }
166
threadLoop()167 bool WorkQueue::WorkThread::threadLoop() {
168 return mWorkQueue->threadLoop();
169 }
170
171 }; // namespace android
172