• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2022 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "TestWakeupClientServiceImpl.h"
18 
19 #include "ApPowerControl.h"
20 
21 #include <android-base/stringprintf.h>
22 #include <inttypes.h>
23 #include <utils/Looper.h>
24 #include <utils/SystemClock.h>
25 #include <chrono>
26 #include <thread>
27 
28 namespace android {
29 namespace hardware {
30 namespace automotive {
31 namespace remoteaccess {
32 
33 namespace {
34 
35 using ::android::uptimeMillis;
36 using ::android::base::ScopedLockAssertion;
37 using ::android::base::StringPrintf;
38 using ::grpc::ServerContext;
39 using ::grpc::ServerWriter;
40 using ::grpc::Status;
41 
42 constexpr int kTaskIntervalInMs = 5'000;
43 constexpr int64_t KTaskTimeoutInMs = 20'000;
44 
45 }  // namespace
46 
generateTask()47 GetRemoteTasksResponse FakeTaskGenerator::generateTask() {
48     int clientId = mCurrentClientId++;
49     GetRemoteTasksResponse response;
50     response.set_data(std::string(reinterpret_cast<const char*>(DATA), sizeof(DATA)));
51     std::string clientIdStr = StringPrintf("%d", clientId);
52     response.set_clientid(clientIdStr);
53     return response;
54 }
55 
TaskTimeoutMessageHandler(TaskQueue * taskQueue)56 TaskTimeoutMessageHandler::TaskTimeoutMessageHandler(TaskQueue* taskQueue)
57     : mTaskQueue(taskQueue) {}
58 
handleMessage(const android::Message & message)59 void TaskTimeoutMessageHandler::handleMessage(const android::Message& message) {
60     mTaskQueue->handleTaskTimeout();
61 }
62 
TaskQueue()63 TaskQueue::TaskQueue() {
64     mTaskTimeoutMessageHandler = android::sp<TaskTimeoutMessageHandler>::make(this);
65     mLooper = Looper::prepare(/*opts=*/0);
66     mCheckTaskTimeoutThread = std::thread([this] { checkForTestTimeoutLoop(); });
67 }
68 
~TaskQueue()69 TaskQueue::~TaskQueue() {
70     {
71         std::lock_guard<std::mutex> lockGuard(mLock);
72         mStopped = true;
73     }
74     while (true) {
75         // Remove all pending timeout handlers from queue.
76         if (!maybePopOne().has_value()) {
77             break;
78         }
79     }
80     if (mCheckTaskTimeoutThread.joinable()) {
81         mCheckTaskTimeoutThread.join();
82     }
83 }
84 
maybePopOne()85 std::optional<GetRemoteTasksResponse> TaskQueue::maybePopOne() {
86     std::lock_guard<std::mutex> lockGuard(mLock);
87     if (mTasks.size() == 0) {
88         return std::nullopt;
89     }
90     TaskInfo response = std::move(mTasks.top());
91     mTasks.pop();
92     mLooper->removeMessages(mTaskTimeoutMessageHandler, response.taskId);
93     return std::move(response.taskData);
94 }
95 
add(const GetRemoteTasksResponse & task)96 void TaskQueue::add(const GetRemoteTasksResponse& task) {
97     std::lock_guard<std::mutex> lockGuard(mLock);
98     if (mStopped) {
99         return;
100     }
101     int taskId = mTaskIdCounter++;
102     mTasks.push(TaskInfo{
103             .taskId = taskId,
104             .timestampInMs = uptimeMillis(),
105             .taskData = task,
106     });
107     android::Message message(taskId);
108     mLooper->sendMessageDelayed(KTaskTimeoutInMs * 1000, mTaskTimeoutMessageHandler, message);
109     mTasksNotEmptyCv.notify_all();
110 }
111 
waitForTask()112 void TaskQueue::waitForTask() {
113     std::unique_lock<std::mutex> lock(mLock);
114     waitForTaskWithLock(lock);
115 }
116 
waitForTaskWithLock(std::unique_lock<std::mutex> & lock)117 void TaskQueue::waitForTaskWithLock(std::unique_lock<std::mutex>& lock) {
118     mTasksNotEmptyCv.wait(lock, [this] {
119         ScopedLockAssertion lockAssertion(mLock);
120         return mTasks.size() > 0 || mStopped;
121     });
122 }
123 
stopWait()124 void TaskQueue::stopWait() {
125     std::lock_guard<std::mutex> lockGuard(mLock);
126     mStopped = true;
127     mTasksNotEmptyCv.notify_all();
128 }
129 
isEmpty()130 bool TaskQueue::isEmpty() {
131     std::lock_guard<std::mutex> lockGuard(mLock);
132     return mTasks.size() == 0 || mStopped;
133 }
134 
checkForTestTimeoutLoop()135 void TaskQueue::checkForTestTimeoutLoop() {
136     Looper::setForThread(mLooper);
137 
138     while (true) {
139         {
140             std::unique_lock<std::mutex> lock(mLock);
141             if (mStopped) {
142                 return;
143             }
144         }
145 
146         mLooper->pollAll(/*timeoutMillis=*/-1);
147     }
148 }
149 
handleTaskTimeout()150 void TaskQueue::handleTaskTimeout() {
151     // We know which task timed-out from the taskId in the message. However, there is no easy way
152     // to remove a specific task with the task ID from the priority_queue, so we just check from
153     // the top of the queue (which have the oldest tasks).
154     std::lock_guard<std::mutex> lockGuard(mLock);
155     int64_t now = uptimeMillis();
156     while (mTasks.size() > 0) {
157         const TaskInfo& taskInfo = mTasks.top();
158         if (taskInfo.timestampInMs + KTaskTimeoutInMs > now) {
159             break;
160         }
161         // In real implementation, this should report task failure to remote wakeup server.
162         printf("Task for client ID: %s timed-out, added at %" PRId64 " ms, now %" PRId64 " ms",
163                taskInfo.taskData.clientid().c_str(), taskInfo.timestampInMs, now);
164         mTasks.pop();
165     }
166 }
167 
TestWakeupClientServiceImpl()168 TestWakeupClientServiceImpl::TestWakeupClientServiceImpl() {
169     mThread = std::thread([this] { fakeTaskGenerateLoop(); });
170 }
171 
~TestWakeupClientServiceImpl()172 TestWakeupClientServiceImpl::~TestWakeupClientServiceImpl() {
173     {
174         std::lock_guard<std::mutex> lockGuard(mLock);
175         mServerStopped = true;
176         mServerStoppedCv.notify_all();
177     }
178     mTaskQueue.stopWait();
179     if (mThread.joinable()) {
180         mThread.join();
181     }
182 }
183 
fakeTaskGenerateLoop()184 void TestWakeupClientServiceImpl::fakeTaskGenerateLoop() {
185     // In actual implementation, this should communicate with the remote server and receives tasks
186     // from it. Here we simulate receiving one remote task every {kTaskIntervalInMs}ms.
187     while (true) {
188         mTaskQueue.add(mFakeTaskGenerator.generateTask());
189         printf("Received a new task\n");
190         if (mWakeupRequired) {
191             wakeupApplicationProcessor();
192         }
193 
194         printf("Sleeping for %d seconds until next task\n", kTaskIntervalInMs);
195 
196         std::unique_lock lk(mLock);
197         if (mServerStoppedCv.wait_for(lk, std::chrono::milliseconds(kTaskIntervalInMs), [this] {
198                 ScopedLockAssertion lockAssertion(mLock);
199                 return mServerStopped;
200             })) {
201             // If the stopped flag is set, we are quitting, exit the loop.
202             return;
203         }
204     }
205 }
206 
GetRemoteTasks(ServerContext * context,const GetRemoteTasksRequest * request,ServerWriter<GetRemoteTasksResponse> * writer)207 Status TestWakeupClientServiceImpl::GetRemoteTasks(ServerContext* context,
208                                                    const GetRemoteTasksRequest* request,
209                                                    ServerWriter<GetRemoteTasksResponse>* writer) {
210     printf("GetRemoteTasks called\n");
211     while (true) {
212         mTaskQueue.waitForTask();
213 
214         while (true) {
215             auto maybeTask = mTaskQueue.maybePopOne();
216             if (!maybeTask.has_value()) {
217                 // No task left, loop again and wait for another task(s).
218                 break;
219             }
220             // Loop through all the task in the queue but obtain lock for each element so we don't
221             // hold lock while writing the response.
222             const GetRemoteTasksResponse& response = maybeTask.value();
223             if (!writer->Write(response)) {
224                 // Broken stream, maybe the client is shutting down.
225                 printf("Failed to deliver remote task to remote access HAL\n");
226                 // The task failed to be sent, add it back to the queue. The order might change, but
227                 // it is okay.
228                 mTaskQueue.add(response);
229                 return Status::CANCELLED;
230             }
231         }
232     }
233     return Status::OK;
234 }
235 
NotifyWakeupRequired(ServerContext * context,const NotifyWakeupRequiredRequest * request,NotifyWakeupRequiredResponse * response)236 Status TestWakeupClientServiceImpl::NotifyWakeupRequired(ServerContext* context,
237                                                          const NotifyWakeupRequiredRequest* request,
238                                                          NotifyWakeupRequiredResponse* response) {
239     if (request->iswakeuprequired() && !mWakeupRequired && !mTaskQueue.isEmpty()) {
240         // If wakeup is now required and previously not required, this means we have finished
241         // shutting down the device. If there are still pending tasks, try waking up AP again
242         // to finish executing those tasks.
243         wakeupApplicationProcessor();
244     }
245     mWakeupRequired = request->iswakeuprequired();
246     return Status::OK;
247 }
248 
wakeupApplicationProcessor()249 void TestWakeupClientServiceImpl::wakeupApplicationProcessor() {
250     wakeupAp();
251 }
252 
253 }  // namespace remoteaccess
254 }  // namespace automotive
255 }  // namespace hardware
256 }  // namespace android
257