• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2016 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef ANDROID_QUEUE_WORKER_H_
18 #define ANDROID_QUEUE_WORKER_H_
19 
20 #include "worker.h"
21 
22 #include <queue>
23 
24 namespace android {
25 
26 template <typename T>
27 class QueueWorker : public Worker {
28  public:
29   static const size_t kDefaultMaxQueueSize = 2;
30   static const int64_t kTimeoutDisabled = -1;
31 
QueueWorker(const char * name,int priority)32   QueueWorker(const char *name, int priority)
33       : Worker(name, priority),
34         max_queue_size_(kDefaultMaxQueueSize),
35         queue_timeout_ms_(kTimeoutDisabled),
36         idle_timeout_ms_(kTimeoutDisabled),
37         idled_out_(false) {
38   }
39 
40   int QueueWork(std::unique_ptr<T> workitem);
41 
IsWorkPending()42   bool IsWorkPending() const {
43     return !queue_.empty();
44   }
idle()45   bool idle() const {
46     return idled_out_;
47   }
48 
idle_timeout()49   int64_t idle_timeout() {
50     return idle_timeout_ms_;
51   }
set_idle_timeout(int64_t timeout_ms)52   void set_idle_timeout(int64_t timeout_ms) {
53     idle_timeout_ms_ = timeout_ms;
54   }
55 
queue_timeout()56   int64_t queue_timeout() {
57     return queue_timeout_ms_;
58   }
set_queue_timeout(int64_t timeout_ms)59   void set_queue_timeout(int64_t timeout_ms) {
60     queue_timeout_ms_ = timeout_ms;
61   }
62 
max_queue_size()63   size_t max_queue_size() const {
64     return max_queue_size_;
65   }
set_max_queue_size(size_t size)66   void set_max_queue_size(size_t size) {
67     max_queue_size_ = size;
68   }
69 
70  protected:
71   virtual void ProcessWork(std::unique_ptr<T> workitem) = 0;
ProcessIdle()72   virtual void ProcessIdle(){}
73   virtual void Routine();
74 
75   template <typename Predicate>
76   int WaitCond(std::unique_lock<std::mutex> &lock, Predicate pred,
77                int64_t max_msecs);
78 
79  private:
80   std::queue<std::unique_ptr<T>> queue_;
81   size_t max_queue_size_;
82   int64_t queue_timeout_ms_;
83   int64_t idle_timeout_ms_;
84   bool idled_out_;
85 };
86 
87 template <typename T>
88 template <typename Predicate>
WaitCond(std::unique_lock<std::mutex> & lock,Predicate pred,int64_t max_msecs)89 int QueueWorker<T>::WaitCond(std::unique_lock<std::mutex> &lock, Predicate pred,
90                              int64_t max_msecs) {
91   bool ret = true;
92   auto wait_func = [&] { return pred() || should_exit(); };
93 
94   if (max_msecs < 0) {
95     cond_.wait(lock, wait_func);
96   } else {
97     auto timeout = std::chrono::milliseconds(max_msecs);
98     ret = cond_.wait_for(lock, timeout, wait_func);
99   }
100 
101   if (!ret)
102     return -ETIMEDOUT;
103   else if (should_exit())
104     return -EINTR;
105 
106   return 0;
107 }
108 
109 template <typename T>
Routine()110 void QueueWorker<T>::Routine() {
111   std::unique_lock<std::mutex> lk(mutex_);
112   std::unique_ptr<T> workitem;
113 
114   auto wait_func = [&] { return !queue_.empty(); };
115   int ret =
116       WaitCond(lk, wait_func, idled_out_ ? kTimeoutDisabled : idle_timeout_ms_);
117   switch (ret) {
118     case 0:
119       break;
120     case -ETIMEDOUT:
121       ProcessIdle();
122       idled_out_ = true;
123       return;
124     case -EINTR:
125     default:
126       return;
127   }
128 
129   if (!queue_.empty()) {
130     workitem = std::move(queue_.front());
131     queue_.pop();
132   }
133   lk.unlock();
134   cond_.notify_all();
135 
136   idled_out_ = false;
137   ProcessWork(std::move(workitem));
138 }
139 
140 template <typename T>
QueueWork(std::unique_ptr<T> workitem)141 int QueueWorker<T>::QueueWork(std::unique_ptr<T> workitem) {
142   std::unique_lock<std::mutex> lk(mutex_);
143 
144   auto wait_func = [&] { return queue_.size() < max_queue_size_; };
145   int ret = WaitCond(lk, wait_func, queue_timeout_ms_);
146   if (ret)
147     return ret;
148 
149   queue_.push(std::move(workitem));
150   lk.unlock();
151 
152   cond_.notify_one();
153 
154   return 0;
155 }
156 };
157 #endif
158