1 /**
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "logging.h"
18 #include "status.h"
19 #include "worker.h"
20
21 #include <time.h>
22
// Uncomment the next line to route DBG() trace statements to LOGD().
//#define WORKER_DEBUG

// DBG(...) forwards its arguments to LOGD when WORKER_DEBUG is defined;
// otherwise it expands to nothing, so its arguments are never evaluated.
#if defined(WORKER_DEBUG)
#define DBG(...) LOGD(__VA_ARGS__)
#else
#define DBG(...)
#endif
33
Work(void * param)34 void * WorkerThread::Work(void *param) {
35 WorkerThread *t = (WorkerThread *)param;
36 android_atomic_acquire_store(STATE_RUNNING, &t->state_);
37 void * v = t->Worker(t->workerParam_);
38 android_atomic_acquire_store(STATE_STOPPED, &t->state_);
39 return v;
40 }
41
isRunning()42 bool WorkerThread::isRunning() {
43 DBG("WorkerThread::isRunning E");
44 bool ret_value = android_atomic_acquire_load(&state_) == STATE_RUNNING;
45 DBG("WorkerThread::isRunning X ret_value=%d", ret_value);
46 return ret_value;
47 }
48
WorkerThread()49 WorkerThread::WorkerThread() {
50 DBG("WorkerThread::WorkerThread E");
51 state_ = STATE_INITIALIZED;
52 pthread_mutex_init(&mutex_, NULL);
53 pthread_cond_init(&cond_, NULL);
54 DBG("WorkerThread::WorkerThread X");
55 }
56
~WorkerThread()57 WorkerThread::~WorkerThread() {
58 DBG("WorkerThread::~WorkerThread E");
59 Stop();
60 pthread_mutex_destroy(&mutex_);
61 DBG("WorkerThread::~WorkerThread X");
62 }
63
64 // Return true if changed from STATE_RUNNING to STATE_STOPPING
BeginStopping()65 bool WorkerThread::BeginStopping() {
66 DBG("WorkerThread::BeginStopping E");
67 bool ret_value = (android_atomic_acquire_cas(STATE_RUNNING, STATE_STOPPING, &state_) == 0);
68 DBG("WorkerThread::BeginStopping X ret_value=%d", ret_value);
69 return ret_value;
70 }
71
72 // Wait until state is not STATE_STOPPING
WaitUntilStopped()73 void WorkerThread::WaitUntilStopped() {
74 DBG("WorkerThread::WaitUntilStopped E");
75 pthread_cond_signal(&cond_);
76 while(android_atomic_release_load(&state_) == STATE_STOPPING) {
77 usleep(200000);
78 }
79 DBG("WorkerThread::WaitUntilStopped X");
80 }
81
Stop()82 void WorkerThread::Stop() {
83 DBG("WorkerThread::Stop E");
84 if (BeginStopping()) {
85 WaitUntilStopped();
86 }
87 DBG("WorkerThread::Stop X");
88 }
89
Run(void * workerParam)90 int WorkerThread::Run(void *workerParam) {
91 DBG("WorkerThread::Run E workerParam=%p", workerParam);
92 int status;
93 int ret;
94
95 workerParam_ = workerParam;
96
97 ret = pthread_attr_init(&attr_);
98 if (ret != 0) {
99 LOGE("RIL_Init X: pthread_attr_init failed err=%s", strerror(ret));
100 return STATUS_ERR;
101 }
102 ret = pthread_attr_setdetachstate(&attr_, PTHREAD_CREATE_DETACHED);
103 if (ret != 0) {
104 LOGE("RIL_Init X: pthread_attr_setdetachstate failed err=%s",
105 strerror(ret));
106 return STATUS_ERR;
107 }
108 ret = pthread_create(&tid_, &attr_,
109 (void * (*)(void *))&WorkerThread::Work, this);
110 if (ret != 0) {
111 LOGE("RIL_Init X: pthread_create failed err=%s", strerror(ret));
112 return STATUS_ERR;
113 }
114
115 // Wait until worker is running
116 while (android_atomic_acquire_load(&state_) == STATE_INITIALIZED) {
117 usleep(200000);
118 }
119
120 DBG("WorkerThread::Run X workerParam=%p", workerParam);
121 return STATUS_OK;
122 }
123
124
125 class WorkerQueueThread : public WorkerThread {
126 private:
127 friend class WorkerQueue;
128
129 public:
WorkerQueueThread()130 WorkerQueueThread() {
131 }
132
~WorkerQueueThread()133 virtual ~WorkerQueueThread() {
134 Stop();
135 }
136
Worker(void * param)137 void * Worker(void *param) {
138 DBG("WorkerQueueThread::Worker E");
139 WorkerQueue *wq = (WorkerQueue *)param;
140
141 // Do the work until we're told to stop
142 while (isRunning()) {
143 pthread_mutex_lock(&mutex_);
144 while (isRunning() && wq->q_.size() == 0) {
145 if (wq->delayed_q_.size() == 0) {
146 // Both queue's are empty so wait
147 pthread_cond_wait(&cond_, &mutex_);
148 } else {
149 // delayed_q_ is not empty, move any
150 // timed out records to q_.
151 int64_t now = android::elapsedRealtime();
152 while((wq->delayed_q_.size() != 0) &&
153 ((wq->delayed_q_.top()->time - now) <= 0)) {
154 struct WorkerQueue::Record *r = wq->delayed_q_.top();
155 DBG("WorkerQueueThread::Worker move p=%p time=%lldms",
156 r->p, r->time);
157 wq->delayed_q_.pop();
158 wq->q_.push_back(r);
159 }
160
161 if ((wq->q_.size() == 0) && (wq->delayed_q_.size() != 0)) {
162 // We need to do a timed wait
163 struct timeval tv;
164 struct timespec ts;
165 struct WorkerQueue::Record *r = wq->delayed_q_.top();
166 int64_t delay_ms = r->time - now;
167 DBG("WorkerQueueThread::Worker wait"
168 " p=%p time=%lldms delay_ms=%lldms",
169 r->p, r->time, delay_ms);
170 gettimeofday(&tv, NULL);
171 ts.tv_sec = tv.tv_sec + (delay_ms / 1000);
172 ts.tv_nsec = (tv.tv_usec +
173 ((delay_ms % 1000) * 1000)) * 1000;
174 pthread_cond_timedwait(&cond_, &mutex_, &ts);
175 }
176 }
177 }
178 if (isRunning()) {
179 struct WorkerQueue::Record *r = wq->q_.front();
180 wq->q_.pop_front();
181 void *p = r->p;
182 wq->release_record(r);
183 pthread_mutex_unlock(&mutex_);
184 wq->Process(r->p);
185 } else {
186 pthread_mutex_unlock(&mutex_);
187 }
188 }
189 DBG("WorkerQueueThread::Worker X");
190 return NULL;
191 }
192 };
193
WorkerQueue()194 WorkerQueue::WorkerQueue() {
195 DBG("WorkerQueue::WorkerQueue E");
196 wqt_ = new WorkerQueueThread();
197 DBG("WorkerQueue::WorkerQueue X");
198 }
199
~WorkerQueue()200 WorkerQueue::~WorkerQueue() {
201 DBG("WorkerQueue::~WorkerQueue E");
202 Stop();
203
204 Record *r;
205 pthread_mutex_lock(&wqt_->mutex_);
206 while(free_list_.size() != 0) {
207 r = free_list_.front();
208 free_list_.pop_front();
209 DBG("WorkerQueue::~WorkerQueue delete free_list_ r=%p", r);
210 delete r;
211 }
212 while(delayed_q_.size() != 0) {
213 r = delayed_q_.top();
214 delayed_q_.pop();
215 DBG("WorkerQueue::~WorkerQueue delete delayed_q_ r=%p", r);
216 delete r;
217 }
218 pthread_mutex_unlock(&wqt_->mutex_);
219
220 delete wqt_;
221 DBG("WorkerQueue::~WorkerQueue X");
222 }
223
Run()224 int WorkerQueue::Run() {
225 return wqt_->Run(this);
226 }
227
Stop()228 void WorkerQueue::Stop() {
229 wqt_->Stop();
230 }
231
232 /**
233 * Obtain a record from free_list if it is not empty, fill in the record with provided
234 * information: *p and delay_in_ms
235 */
obtain_record(void * p,int delay_in_ms)236 struct WorkerQueue::Record *WorkerQueue::obtain_record(void *p, int delay_in_ms) {
237 struct Record *r;
238 if (free_list_.size() == 0) {
239 r = new Record();
240 DBG("WorkerQueue::obtain_record new r=%p", r);
241 } else {
242 r = free_list_.front();
243 DBG("WorkerQueue::obtain_record reuse r=%p", r);
244 free_list_.pop_front();
245 }
246 r->p = p;
247 if (delay_in_ms != 0) {
248 r->time = android::elapsedRealtime() + delay_in_ms;
249 } else {
250 r->time = 0;
251 }
252 return r;
253 }
254
255 /**
256 * release a record and insert into the front of the free_list
257 */
release_record(struct Record * r)258 void WorkerQueue::release_record(struct Record *r) {
259 DBG("WorkerQueue::release_record r=%p", r);
260 free_list_.push_front(r);
261 }
262
263 /**
264 * Add a record to processing queue q_
265 */
Add(void * p)266 void WorkerQueue::Add(void *p) {
267 DBG("WorkerQueue::Add E:");
268 pthread_mutex_lock(&wqt_->mutex_);
269 struct Record *r = obtain_record(p, 0);
270 q_.push_back(r);
271 if (q_.size() == 1) {
272 pthread_cond_signal(&wqt_->cond_);
273 }
274 pthread_mutex_unlock(&wqt_->mutex_);
275 DBG("WorkerQueue::Add X:");
276 }
277
AddDelayed(void * p,int delay_in_ms)278 void WorkerQueue::AddDelayed(void *p, int delay_in_ms) {
279 DBG("WorkerQueue::AddDelayed E:");
280 if (delay_in_ms <= 0) {
281 Add(p);
282 } else {
283 pthread_mutex_lock(&wqt_->mutex_);
284 struct Record *r = obtain_record(p, delay_in_ms);
285 delayed_q_.push(r);
286 #ifdef WORKER_DEBUG
287 int64_t now = android::elapsedRealtime();
288 DBG("WorkerQueue::AddDelayed"
289 " p=%p delay_in_ms=%d now=%lldms top->p=%p"
290 " top->time=%lldms diff=%lldms",
291 p, delay_in_ms, now, delayed_q_.top()->p,
292 delayed_q_.top()->time, delayed_q_.top()->time - now);
293 #endif
294 if ((q_.size() == 0) && (delayed_q_.top() == r)) {
295 // q_ is empty and the new record is at delayed_q_.top
296 // so we signal the waiting thread so it can readjust
297 // the wait time.
298 DBG("WorkerQueue::AddDelayed signal");
299 pthread_cond_signal(&wqt_->cond_);
300 }
301 pthread_mutex_unlock(&wqt_->mutex_);
302 }
303 DBG("WorkerQueue::AddDelayed X:");
304 }
305
306
307 class TestWorkerQueue : public WorkerQueue {
Process(void * p)308 virtual void Process(void *p) {
309 LOGD("TestWorkerQueue::Process: EX p=%p", p);
310 }
311 };
312
313 class TesterThread : public WorkerThread {
314 public:
Worker(void * param)315 void * Worker(void *param)
316 {
317 LOGD("TesterThread::Worker E param=%p", param);
318 WorkerQueue *wq = (WorkerQueue *)param;
319
320 // Test AddDelayed
321 wq->AddDelayed((void *)1000, 1000);
322 wq->Add((void *)0);
323 wq->Add((void *)0);
324 wq->Add((void *)0);
325 wq->Add((void *)0);
326 wq->AddDelayed((void *)100, 100);
327 wq->AddDelayed((void *)2000, 2000);
328
329 for (int i = 1; isRunning(); i++) {
330 LOGD("TesterThread: looping %d", i);
331 wq->Add((void *)i);
332 wq->Add((void *)i);
333 wq->Add((void *)i);
334 wq->Add((void *)i);
335 sleep(1);
336 }
337
338 LOGD("TesterThread::Worker X param=%p", param);
339
340 return NULL;
341 }
342 };
343
testWorker()344 void testWorker() {
345 LOGD("testWorker E: ********");
346
347 // Test we can create a thread and delete it
348 TesterThread *tester = new TesterThread();
349 delete tester;
350
351 TestWorkerQueue *wq = new TestWorkerQueue();
352 if (wq->Run() == STATUS_OK) {
353 LOGD("testWorker WorkerQueue %p running", wq);
354
355 // Test we can run a thread, stop it then delete it
356 tester = new TesterThread();
357 tester->Run(wq);
358 LOGD("testWorker tester %p running", tester);
359 sleep(10);
360 LOGD("testWorker tester %p stopping", tester);
361 tester->Stop();
362 LOGD("testWorker tester %p stopped", tester);
363 wq->Stop();
364 LOGD("testWorker wq %p stopped", wq);
365 }
366 LOGD("testWorker X: ********\n");
367 }
368