• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2021 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "PendingRequestPool.h"
18 
19 #include <VehicleHalTypes.h>
20 #include <VehicleUtils.h>
21 
22 #include <utils/Log.h>
23 #include <utils/SystemClock.h>
24 
25 #include <vector>
26 
27 namespace android {
28 namespace hardware {
29 namespace automotive {
30 namespace vehicle {
31 
32 namespace {
33 
34 using ::aidl::android::hardware::automotive::vehicle::StatusCode;
35 using ::android::base::Result;
36 
37 // At least check every 1s.
38 constexpr int64_t CHECK_TIME_IN_NANO = 1'000'000'000;
39 
40 }  // namespace
41 
PendingRequestPool(int64_t timeoutInNano)42 PendingRequestPool::PendingRequestPool(int64_t timeoutInNano)
43     : mTimeoutInNano(timeoutInNano), mThread([this] {
44           // [this] must be alive within this thread because destructor would wait for this thread
45           // to exit.
46           int64_t sleepTime = std::min(mTimeoutInNano, static_cast<int64_t>(CHECK_TIME_IN_NANO));
47           std::unique_lock<std::mutex> lk(mCvLock);
48           while (!mCv.wait_for(lk, std::chrono::nanoseconds(sleepTime),
49                                [this] { return mThreadStop.load(); })) {
50               checkTimeout();
51           }
52       }) {}
53 
~PendingRequestPool()54 PendingRequestPool::~PendingRequestPool() {
55     mThreadStop = true;
56     mCv.notify_all();
57     if (mThread.joinable()) {
58         mThread.join();
59     }
60 
61     // If this pool is being destructed, send out all pending requests as timeout.
62     {
63         std::scoped_lock<std::mutex> lockGuard(mLock);
64 
65         for (auto& [_, pendingRequests] : mPendingRequestsByClient) {
66             for (const auto& request : pendingRequests) {
67                 (*request.callback)(request.requestIds);
68             }
69         }
70         mPendingRequestsByClient.clear();
71     }
72 }
73 
addRequests(const void * clientId,const std::unordered_set<int64_t> & requestIds,std::shared_ptr<const TimeoutCallbackFunc> callback)74 VhalResult<void> PendingRequestPool::addRequests(
75         const void* clientId, const std::unordered_set<int64_t>& requestIds,
76         std::shared_ptr<const TimeoutCallbackFunc> callback) {
77     std::scoped_lock<std::mutex> lockGuard(mLock);
78     std::list<PendingRequest>* pendingRequests;
79     size_t pendingRequestCount = 0;
80     if (mPendingRequestsByClient.find(clientId) != mPendingRequestsByClient.end()) {
81         pendingRequests = &mPendingRequestsByClient[clientId];
82         for (const auto& pendingRequest : *pendingRequests) {
83             const auto& pendingRequestIds = pendingRequest.requestIds;
84             for (int64_t requestId : requestIds) {
85                 if (pendingRequestIds.find(requestId) != pendingRequestIds.end()) {
86                     return StatusError(StatusCode::INVALID_ARG)
87                            << "duplicate request ID: " << requestId;
88                 }
89             }
90             pendingRequestCount += pendingRequestIds.size();
91         }
92     } else {
93         // Create a new empty list for this client.
94         pendingRequests = &mPendingRequestsByClient[clientId];
95     }
96 
97     if (requestIds.size() > MAX_PENDING_REQUEST_PER_CLIENT - pendingRequestCount) {
98         return StatusError(StatusCode::TRY_AGAIN) << "too many pending requests";
99     }
100 
101     int64_t currentTime = elapsedRealtimeNano();
102     int64_t timeoutTimestamp = currentTime + mTimeoutInNano;
103 
104     pendingRequests->push_back({
105             .requestIds = std::unordered_set<int64_t>(requestIds.begin(), requestIds.end()),
106             .timeoutTimestamp = timeoutTimestamp,
107             .callback = callback,
108     });
109 
110     return {};
111 }
112 
isRequestPending(const void * clientId,int64_t requestId) const113 bool PendingRequestPool::isRequestPending(const void* clientId, int64_t requestId) const {
114     std::scoped_lock<std::mutex> lockGuard(mLock);
115 
116     return isRequestPendingLocked(clientId, requestId);
117 }
118 
countPendingRequests() const119 size_t PendingRequestPool::countPendingRequests() const {
120     std::scoped_lock<std::mutex> lockGuard(mLock);
121 
122     size_t count = 0;
123     for (const auto& [clientId, requests] : mPendingRequestsByClient) {
124         for (const auto& request : requests) {
125             count += request.requestIds.size();
126         }
127     }
128     return count;
129 }
130 
countPendingRequests(const void * clientId) const131 size_t PendingRequestPool::countPendingRequests(const void* clientId) const {
132     std::scoped_lock<std::mutex> lockGuard(mLock);
133 
134     auto it = mPendingRequestsByClient.find(clientId);
135     if (it == mPendingRequestsByClient.end()) {
136         return 0;
137     }
138 
139     size_t count = 0;
140     for (const auto& pendingRequest : it->second) {
141         count += pendingRequest.requestIds.size();
142     }
143 
144     return count;
145 }
146 
isRequestPendingLocked(const void * clientId,int64_t requestId) const147 bool PendingRequestPool::isRequestPendingLocked(const void* clientId, int64_t requestId) const {
148     auto it = mPendingRequestsByClient.find(clientId);
149     if (it == mPendingRequestsByClient.end()) {
150         return false;
151     }
152     for (const auto& pendingRequest : it->second) {
153         const auto& requestIds = pendingRequest.requestIds;
154         if (requestIds.find(requestId) != requestIds.end()) {
155             return true;
156         }
157     }
158     return false;
159 }
160 
checkTimeout()161 void PendingRequestPool::checkTimeout() {
162     std::vector<PendingRequest> timeoutRequests;
163     {
164         std::scoped_lock<std::mutex> lockGuard(mLock);
165 
166         int64_t currentTime = elapsedRealtimeNano();
167 
168         std::vector<const void*> clientsWithEmptyRequests;
169 
170         for (auto& [clientId, pendingRequests] : mPendingRequestsByClient) {
171             auto it = pendingRequests.begin();
172             while (it != pendingRequests.end()) {
173                 if (it->timeoutTimestamp >= currentTime) {
174                     break;
175                 }
176                 timeoutRequests.push_back(std::move(*it));
177                 it = pendingRequests.erase(it);
178             }
179 
180             if (pendingRequests.empty()) {
181                 clientsWithEmptyRequests.push_back(clientId);
182             }
183         }
184 
185         for (const void* clientId : clientsWithEmptyRequests) {
186             mPendingRequestsByClient.erase(clientId);
187         }
188     }
189 
190     // Call the callback outside the lock.
191     for (const auto& request : timeoutRequests) {
192         (*request.callback)(request.requestIds);
193     }
194 }
195 
tryFinishRequests(const void * clientId,const std::unordered_set<int64_t> & requestIds)196 std::unordered_set<int64_t> PendingRequestPool::tryFinishRequests(
197         const void* clientId, const std::unordered_set<int64_t>& requestIds) {
198     std::scoped_lock<std::mutex> lockGuard(mLock);
199 
200     std::unordered_set<int64_t> foundIds;
201 
202     if (mPendingRequestsByClient.find(clientId) == mPendingRequestsByClient.end()) {
203         return foundIds;
204     }
205 
206     auto& pendingRequests = mPendingRequestsByClient[clientId];
207     auto it = pendingRequests.begin();
208     while (it != pendingRequests.end()) {
209         auto& pendingRequestIds = it->requestIds;
210         for (int64_t requestId : requestIds) {
211             auto idIt = pendingRequestIds.find(requestId);
212             if (idIt == pendingRequestIds.end()) {
213                 continue;
214             }
215             pendingRequestIds.erase(idIt);
216             foundIds.insert(requestId);
217         }
218         if (pendingRequestIds.empty()) {
219             it = pendingRequests.erase(it);
220             continue;
221         }
222         it++;
223     }
224 
225     return foundIds;
226 }
227 
228 }  // namespace vehicle
229 }  // namespace automotive
230 }  // namespace hardware
231 }  // namespace android
232