• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright (C) 2010 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "logging.h"
18 #include "status.h"
19 #include "worker.h"
20 
21 #include <time.h>
22 
// Uncomment the following line to route DBG() tracing to LOGD().
//#define WORKER_DEBUG

#if defined(WORKER_DEBUG)
#define DBG(...) LOGD(__VA_ARGS__)
#else
// Tracing disabled: DBG() expands to nothing.
#define DBG(...)
#endif
33 
Work(void * param)34 void * WorkerThread::Work(void *param) {
35     WorkerThread *t = (WorkerThread *)param;
36     android_atomic_acquire_store(STATE_RUNNING, &t->state_);
37     void * v = t->Worker(t->workerParam_);
38     android_atomic_acquire_store(STATE_STOPPED, &t->state_);
39     return v;
40 }
41 
isRunning()42 bool WorkerThread::isRunning() {
43     DBG("WorkerThread::isRunning E");
44     bool ret_value = android_atomic_acquire_load(&state_) == STATE_RUNNING;
45     DBG("WorkerThread::isRunning X ret_value=%d", ret_value);
46     return ret_value;
47 }
48 
WorkerThread()49 WorkerThread::WorkerThread() {
50     DBG("WorkerThread::WorkerThread E");
51     state_ = STATE_INITIALIZED;
52     pthread_mutex_init(&mutex_, NULL);
53     pthread_cond_init(&cond_, NULL);
54     DBG("WorkerThread::WorkerThread X");
55 }
56 
~WorkerThread()57 WorkerThread::~WorkerThread() {
58     DBG("WorkerThread::~WorkerThread E");
59     Stop();
60     pthread_mutex_destroy(&mutex_);
61     DBG("WorkerThread::~WorkerThread X");
62 }
63 
64 // Return true if changed from STATE_RUNNING to STATE_STOPPING
BeginStopping()65 bool WorkerThread::BeginStopping() {
66     DBG("WorkerThread::BeginStopping E");
67     bool ret_value = (android_atomic_acquire_cas(STATE_RUNNING, STATE_STOPPING, &state_) == 0);
68     DBG("WorkerThread::BeginStopping X ret_value=%d", ret_value);
69     return ret_value;
70 }
71 
72 // Wait until state is not STATE_STOPPING
WaitUntilStopped()73 void WorkerThread::WaitUntilStopped() {
74     DBG("WorkerThread::WaitUntilStopped E");
75     pthread_cond_signal(&cond_);
76     while(android_atomic_release_load(&state_) == STATE_STOPPING) {
77         usleep(200000);
78     }
79     DBG("WorkerThread::WaitUntilStopped X");
80 }
81 
Stop()82 void WorkerThread::Stop() {
83     DBG("WorkerThread::Stop E");
84     if (BeginStopping()) {
85         WaitUntilStopped();
86     }
87     DBG("WorkerThread::Stop X");
88 }
89 
Run(void * workerParam)90 int WorkerThread::Run(void *workerParam) {
91     DBG("WorkerThread::Run E workerParam=%p", workerParam);
92     int status;
93     int ret;
94 
95     workerParam_ = workerParam;
96 
97     ret = pthread_attr_init(&attr_);
98     if (ret != 0) {
99         LOGE("RIL_Init X: pthread_attr_init failed err=%s", strerror(ret));
100         return STATUS_ERR;
101     }
102     ret = pthread_attr_setdetachstate(&attr_, PTHREAD_CREATE_DETACHED);
103     if (ret != 0) {
104         LOGE("RIL_Init X: pthread_attr_setdetachstate failed err=%s",
105                 strerror(ret));
106         return STATUS_ERR;
107     }
108     ret = pthread_create(&tid_, &attr_,
109                 (void * (*)(void *))&WorkerThread::Work, this);
110     if (ret != 0) {
111         LOGE("RIL_Init X: pthread_create failed err=%s", strerror(ret));
112         return STATUS_ERR;
113     }
114 
115     // Wait until worker is running
116     while (android_atomic_acquire_load(&state_) == STATE_INITIALIZED) {
117         usleep(200000);
118     }
119 
120     DBG("WorkerThread::Run X workerParam=%p", workerParam);
121     return STATUS_OK;
122 }
123 
124 
125 class WorkerQueueThread : public WorkerThread {
126   private:
127     friend class WorkerQueue;
128 
129   public:
WorkerQueueThread()130     WorkerQueueThread() {
131     }
132 
~WorkerQueueThread()133     virtual ~WorkerQueueThread() {
134         Stop();
135     }
136 
Worker(void * param)137     void * Worker(void *param) {
138         DBG("WorkerQueueThread::Worker E");
139         WorkerQueue *wq = (WorkerQueue *)param;
140 
141         // Do the work until we're told to stop
142         while (isRunning()) {
143             pthread_mutex_lock(&mutex_);
144             while (isRunning() && wq->q_.size() == 0) {
145                 if (wq->delayed_q_.size() == 0) {
146                     // Both queue's are empty so wait
147                     pthread_cond_wait(&cond_, &mutex_);
148                 } else {
149                     // delayed_q_ is not empty, move any
150                     // timed out records to q_.
151                     int64_t now = android::elapsedRealtime();
152                     while((wq->delayed_q_.size() != 0) &&
153                             ((wq->delayed_q_.top()->time - now) <= 0)) {
154                         struct WorkerQueue::Record *r = wq->delayed_q_.top();
155                         DBG("WorkerQueueThread::Worker move p=%p time=%lldms",
156                                 r->p, r->time);
157                         wq->delayed_q_.pop();
158                         wq->q_.push_back(r);
159                     }
160 
161                     if ((wq->q_.size() == 0) && (wq->delayed_q_.size() != 0)) {
162                         // We need to do a timed wait
163                         struct timeval tv;
164                         struct timespec ts;
165                         struct WorkerQueue::Record *r = wq->delayed_q_.top();
166                         int64_t delay_ms = r->time - now;
167                         DBG("WorkerQueueThread::Worker wait"
168                             " p=%p time=%lldms delay_ms=%lldms",
169                                 r->p, r->time, delay_ms);
170                         gettimeofday(&tv, NULL);
171                         ts.tv_sec = tv.tv_sec + (delay_ms / 1000);
172                         ts.tv_nsec = (tv.tv_usec +
173                                         ((delay_ms % 1000) * 1000)) * 1000;
174                         pthread_cond_timedwait(&cond_, &mutex_, &ts);
175                     }
176                 }
177             }
178             if (isRunning()) {
179                 struct WorkerQueue::Record *r = wq->q_.front();
180                 wq->q_.pop_front();
181                 void *p = r->p;
182                 wq->release_record(r);
183                 pthread_mutex_unlock(&mutex_);
184                 wq->Process(r->p);
185             } else {
186                 pthread_mutex_unlock(&mutex_);
187             }
188         }
189         DBG("WorkerQueueThread::Worker X");
190         return NULL;
191     }
192 };
193 
WorkerQueue()194 WorkerQueue::WorkerQueue() {
195     DBG("WorkerQueue::WorkerQueue E");
196     wqt_ = new WorkerQueueThread();
197     DBG("WorkerQueue::WorkerQueue X");
198 }
199 
~WorkerQueue()200 WorkerQueue::~WorkerQueue() {
201     DBG("WorkerQueue::~WorkerQueue E");
202     Stop();
203 
204     Record *r;
205     pthread_mutex_lock(&wqt_->mutex_);
206     while(free_list_.size() != 0) {
207         r = free_list_.front();
208         free_list_.pop_front();
209         DBG("WorkerQueue::~WorkerQueue delete free_list_ r=%p", r);
210         delete r;
211     }
212     while(delayed_q_.size() != 0) {
213         r = delayed_q_.top();
214         delayed_q_.pop();
215         DBG("WorkerQueue::~WorkerQueue delete delayed_q_ r=%p", r);
216         delete r;
217     }
218     pthread_mutex_unlock(&wqt_->mutex_);
219 
220     delete wqt_;
221     DBG("WorkerQueue::~WorkerQueue X");
222 }
223 
Run()224 int WorkerQueue::Run() {
225     return wqt_->Run(this);
226 }
227 
Stop()228 void WorkerQueue::Stop() {
229     wqt_->Stop();
230 }
231 
232 /**
233  * Obtain a record from free_list if it is not empty, fill in the record with provided
234  * information: *p and delay_in_ms
235  */
obtain_record(void * p,int delay_in_ms)236 struct WorkerQueue::Record *WorkerQueue::obtain_record(void *p, int delay_in_ms) {
237     struct Record *r;
238     if (free_list_.size() == 0) {
239         r = new Record();
240         DBG("WorkerQueue::obtain_record new r=%p", r);
241     } else {
242         r = free_list_.front();
243         DBG("WorkerQueue::obtain_record reuse r=%p", r);
244         free_list_.pop_front();
245     }
246     r->p = p;
247     if (delay_in_ms != 0) {
248         r->time = android::elapsedRealtime() + delay_in_ms;
249     } else {
250         r->time = 0;
251     }
252     return r;
253 }
254 
255 /**
256  * release a record and insert into the front of the free_list
257  */
release_record(struct Record * r)258 void WorkerQueue::release_record(struct Record *r) {
259     DBG("WorkerQueue::release_record r=%p", r);
260     free_list_.push_front(r);
261 }
262 
263 /**
264  * Add a record to processing queue q_
265  */
Add(void * p)266 void WorkerQueue::Add(void *p) {
267     DBG("WorkerQueue::Add E:");
268     pthread_mutex_lock(&wqt_->mutex_);
269     struct Record *r = obtain_record(p, 0);
270     q_.push_back(r);
271     if (q_.size() == 1) {
272         pthread_cond_signal(&wqt_->cond_);
273     }
274     pthread_mutex_unlock(&wqt_->mutex_);
275     DBG("WorkerQueue::Add X:");
276 }
277 
AddDelayed(void * p,int delay_in_ms)278 void WorkerQueue::AddDelayed(void *p, int delay_in_ms) {
279     DBG("WorkerQueue::AddDelayed E:");
280     if (delay_in_ms <= 0) {
281         Add(p);
282     } else {
283         pthread_mutex_lock(&wqt_->mutex_);
284         struct Record *r = obtain_record(p, delay_in_ms);
285         delayed_q_.push(r);
286 #ifdef WORKER_DEBUG
287         int64_t now = android::elapsedRealtime();
288         DBG("WorkerQueue::AddDelayed"
289             " p=%p delay_in_ms=%d now=%lldms top->p=%p"
290             " top->time=%lldms diff=%lldms",
291                 p, delay_in_ms, now, delayed_q_.top()->p,
292                 delayed_q_.top()->time, delayed_q_.top()->time - now);
293 #endif
294         if ((q_.size() == 0) && (delayed_q_.top() == r)) {
295             // q_ is empty and the new record is at delayed_q_.top
296             // so we signal the waiting thread so it can readjust
297             // the wait time.
298             DBG("WorkerQueue::AddDelayed signal");
299             pthread_cond_signal(&wqt_->cond_);
300         }
301         pthread_mutex_unlock(&wqt_->mutex_);
302     }
303     DBG("WorkerQueue::AddDelayed X:");
304 }
305 
306 
307 class TestWorkerQueue : public WorkerQueue {
Process(void * p)308     virtual void Process(void *p) {
309         LOGD("TestWorkerQueue::Process: EX p=%p", p);
310     }
311 };
312 
313 class TesterThread : public WorkerThread {
314   public:
Worker(void * param)315     void * Worker(void *param)
316     {
317         LOGD("TesterThread::Worker E param=%p", param);
318         WorkerQueue *wq = (WorkerQueue *)param;
319 
320         // Test AddDelayed
321         wq->AddDelayed((void *)1000, 1000);
322         wq->Add((void *)0);
323         wq->Add((void *)0);
324         wq->Add((void *)0);
325         wq->Add((void *)0);
326         wq->AddDelayed((void *)100, 100);
327         wq->AddDelayed((void *)2000, 2000);
328 
329         for (int i = 1; isRunning(); i++) {
330             LOGD("TesterThread: looping %d", i);
331             wq->Add((void *)i);
332             wq->Add((void *)i);
333             wq->Add((void *)i);
334             wq->Add((void *)i);
335             sleep(1);
336         }
337 
338         LOGD("TesterThread::Worker X param=%p", param);
339 
340         return NULL;
341     }
342 };
343 
testWorker()344 void testWorker() {
345     LOGD("testWorker E: ********");
346 
347     // Test we can create a thread and delete it
348     TesterThread *tester = new TesterThread();
349     delete tester;
350 
351     TestWorkerQueue *wq = new TestWorkerQueue();
352     if (wq->Run() == STATUS_OK) {
353         LOGD("testWorker WorkerQueue %p running", wq);
354 
355         // Test we can run a thread, stop it then delete it
356         tester = new TesterThread();
357         tester->Run(wq);
358         LOGD("testWorker tester %p running", tester);
359         sleep(10);
360         LOGD("testWorker tester %p stopping", tester);
361         tester->Stop();
362         LOGD("testWorker tester %p stopped", tester);
363         wq->Stop();
364         LOGD("testWorker wq %p stopped", wq);
365     }
366     LOGD("testWorker X: ********\n");
367 }
368