1 /*
2 * Copyright (C) 2022 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #define LOG_TAG "AidlBufferPoolAcc"
17 //#define LOG_NDEBUG 0
18
19 #include <android-base/no_destructor.h>
20
21 #include <sys/types.h>
22 #include <stdint.h>
23 #include <time.h>
24 #include <unistd.h>
25 #include <utils/Log.h>
26 #include <thread>
27
28 #include "Accessor.h"
29 #include "Connection.h"
30 #include "DataHelper.h"
31
32 namespace aidl::android::hardware::media::bufferpool2::implementation {
33
34 namespace {
35 static constexpr nsecs_t kEvictGranularityNs = 1000000000; // 1 sec
36 static constexpr nsecs_t kEvictDurationNs = 5000000000; // 5 secs
37 }
38
// Bit OR-ed into every generated connection id. It is the top bit of the
// 32-bit sequence field when built with __ANDROID_VNDK__, and zero otherwise.
static constexpr uint32_t kSeqIdVndkBit =
#ifdef __ANDROID_VNDK__
        UINT32_C(1) << 31;
#else
        UINT32_C(0);
#endif

// Largest sequence number handed out before the counter wraps back to zero;
// it leaves bit 31 free for kSeqIdVndkBit.
static constexpr uint32_t kSeqIdMax = UINT32_C(0x7fffffff);
46
ConnectionIdGenerator()47 Accessor::ConnectionIdGenerator::ConnectionIdGenerator() {
48 mSeqId = static_cast<uint32_t>(time(nullptr) & kSeqIdMax);
49 mPid = static_cast<int32_t>(getpid());
50 }
51
getConnectionId()52 ConnectionId Accessor::ConnectionIdGenerator::getConnectionId() {
53 uint32_t seq;
54 {
55 std::lock_guard<std::mutex> l(mLock);
56 seq = mSeqId;
57 if (mSeqId == kSeqIdMax) {
58 mSeqId = 0;
59 } else {
60 ++mSeqId;
61 }
62 }
63 return (int64_t)mPid << 32 | seq | kSeqIdVndkBit;
64 }
65
66 namespace {
67 // anonymous namespace
68 static std::shared_ptr<ConnectionDeathRecipient> sConnectionDeathRecipient =
69 std::make_shared<ConnectionDeathRecipient>();
70
serviceDied(void * cookie)71 void serviceDied(void *cookie) {
72 if (sConnectionDeathRecipient) {
73 sConnectionDeathRecipient->onDead(cookie);
74 }
75 }
76 }
77
getConnectionDeathRecipient()78 std::shared_ptr<ConnectionDeathRecipient> Accessor::getConnectionDeathRecipient() {
79 return sConnectionDeathRecipient;
80 }
81
ConnectionDeathRecipient()82 ConnectionDeathRecipient::ConnectionDeathRecipient() {
83 mDeathRecipient = ndk::ScopedAIBinder_DeathRecipient(
84 AIBinder_DeathRecipient_new(serviceDied));
85 }
86
add(int64_t connectionId,const std::shared_ptr<Accessor> & accessor)87 void ConnectionDeathRecipient::add(
88 int64_t connectionId,
89 const std::shared_ptr<Accessor> &accessor) {
90 std::lock_guard<std::mutex> lock(mLock);
91 if (mAccessors.find(connectionId) == mAccessors.end()) {
92 mAccessors.insert(std::make_pair(connectionId, accessor));
93 }
94 }
95
remove(int64_t connectionId)96 void ConnectionDeathRecipient::remove(int64_t connectionId) {
97 std::lock_guard<std::mutex> lock(mLock);
98 mAccessors.erase(connectionId);
99 auto it = mConnectionToCookie.find(connectionId);
100 if (it != mConnectionToCookie.end()) {
101 void * cookie = it->second;
102 mConnectionToCookie.erase(it);
103 auto cit = mCookieToConnections.find(cookie);
104 if (cit != mCookieToConnections.end()) {
105 cit->second.erase(connectionId);
106 if (cit->second.size() == 0) {
107 mCookieToConnections.erase(cit);
108 }
109 }
110 }
111 }
112
addCookieToConnection(void * cookie,int64_t connectionId)113 void ConnectionDeathRecipient::addCookieToConnection(
114 void *cookie,
115 int64_t connectionId) {
116 std::lock_guard<std::mutex> lock(mLock);
117 if (mAccessors.find(connectionId) == mAccessors.end()) {
118 return;
119 }
120 mConnectionToCookie.insert(std::make_pair(connectionId, cookie));
121 auto it = mCookieToConnections.find(cookie);
122 if (it != mCookieToConnections.end()) {
123 it->second.insert(connectionId);
124 } else {
125 mCookieToConnections.insert(std::make_pair(
126 cookie, std::set<int64_t>{connectionId}));
127 }
128 }
129
onDead(void * cookie)130 void ConnectionDeathRecipient::onDead(void *cookie) {
131 std::map<int64_t, const std::weak_ptr<Accessor>> connectionsToClose;
132 {
133 std::lock_guard<std::mutex> lock(mLock);
134
135 auto it = mCookieToConnections.find(cookie);
136 if (it != mCookieToConnections.end()) {
137 for (auto conIt = it->second.begin(); conIt != it->second.end(); ++conIt) {
138 auto accessorIt = mAccessors.find(*conIt);
139 if (accessorIt != mAccessors.end()) {
140 connectionsToClose.insert(std::make_pair(*conIt, accessorIt->second));
141 mAccessors.erase(accessorIt);
142 }
143 mConnectionToCookie.erase(*conIt);
144 }
145 mCookieToConnections.erase(it);
146 }
147 }
148
149 if (connectionsToClose.size() > 0) {
150 std::shared_ptr<Accessor> accessor;
151 for (auto it = connectionsToClose.begin(); it != connectionsToClose.end(); ++it) {
152 accessor = it->second.lock();
153
154 if (accessor) {
155 accessor->close(it->first);
156 ALOGD("connection %lld closed on death", (long long)it->first);
157 }
158 }
159 }
160 }
161
getRecipient()162 AIBinder_DeathRecipient *ConnectionDeathRecipient::getRecipient() {
163 return mDeathRecipient.get();
164 }
165
connect(const std::shared_ptr<::aidl::android::hardware::media::bufferpool2::IObserver> & in_observer,::aidl::android::hardware::media::bufferpool2::IAccessor::ConnectionInfo * _aidl_return)166 ::ndk::ScopedAStatus Accessor::connect(const std::shared_ptr<::aidl::android::hardware::media::bufferpool2::IObserver>& in_observer, ::aidl::android::hardware::media::bufferpool2::IAccessor::ConnectionInfo* _aidl_return) {
167 std::shared_ptr<Connection> connection;
168 ConnectionId connectionId;
169 uint32_t msgId;
170 StatusDescriptor statusDesc;
171 InvalidationDescriptor invDesc;
172 BufferPoolStatus status = connect(
173 in_observer, false, &connection, &connectionId, &msgId, &statusDesc, &invDesc);
174 if (status == ResultStatus::OK) {
175 _aidl_return->connection = connection;
176 _aidl_return->connectionId = connectionId;
177 _aidl_return->msgId = msgId;
178 _aidl_return->toFmqDesc = std::move(statusDesc);
179 _aidl_return->fromFmqDesc = std::move(invDesc);
180 return ::ndk::ScopedAStatus::ok();
181 }
182 return ::ndk::ScopedAStatus::fromServiceSpecificError(status);
183 }
184
Accessor(const std::shared_ptr<BufferPoolAllocator> & allocator)185 Accessor::Accessor(const std::shared_ptr<BufferPoolAllocator> &allocator)
186 : mAllocator(allocator), mScheduleEvictTs(0) {}
187
~Accessor()188 Accessor::~Accessor() {
189 }
190
isValid()191 bool Accessor::isValid() {
192 return mBufferPool.isValid();
193 }
194
flush()195 BufferPoolStatus Accessor::flush() {
196 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
197 mBufferPool.processStatusMessages();
198 mBufferPool.flush(ref<Accessor>());
199 return ResultStatus::OK;
200 }
201
allocate(ConnectionId connectionId,const std::vector<uint8_t> & params,BufferId * bufferId,const native_handle_t ** handle)202 BufferPoolStatus Accessor::allocate(
203 ConnectionId connectionId,
204 const std::vector<uint8_t> ¶ms,
205 BufferId *bufferId, const native_handle_t** handle) {
206 std::unique_lock<std::mutex> lock(mBufferPool.mMutex);
207 mBufferPool.processStatusMessages();
208 BufferPoolStatus status = ResultStatus::OK;
209 if (!mBufferPool.getFreeBuffer(mAllocator, params, bufferId, handle)) {
210 lock.unlock();
211 std::shared_ptr<BufferPoolAllocation> alloc;
212 size_t allocSize;
213 status = mAllocator->allocate(params, &alloc, &allocSize);
214 lock.lock();
215 if (status == ResultStatus::OK) {
216 status = mBufferPool.addNewBuffer(alloc, allocSize, params, bufferId, handle);
217 }
218 ALOGV("create a buffer %d : %u %p",
219 status == ResultStatus::OK, *bufferId, *handle);
220 }
221 if (status == ResultStatus::OK) {
222 // TODO: handle ownBuffer failure
223 mBufferPool.handleOwnBuffer(connectionId, *bufferId);
224 }
225 mBufferPool.cleanUp();
226 scheduleEvictIfNeeded();
227 return status;
228 }
229
fetch(ConnectionId connectionId,TransactionId transactionId,BufferId bufferId,const native_handle_t ** handle)230 BufferPoolStatus Accessor::fetch(
231 ConnectionId connectionId, TransactionId transactionId,
232 BufferId bufferId, const native_handle_t** handle) {
233 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
234 mBufferPool.processStatusMessages();
235 auto found = mBufferPool.mTransactions.find(transactionId);
236 if (found != mBufferPool.mTransactions.end() &&
237 contains(&mBufferPool.mPendingTransactions,
238 connectionId, transactionId)) {
239 if (found->second->mSenderValidated &&
240 found->second->mStatus == BufferStatus::TRANSFER_FROM &&
241 found->second->mBufferId == bufferId) {
242 found->second->mStatus = BufferStatus::TRANSFER_FETCH;
243 auto bufferIt = mBufferPool.mBuffers.find(bufferId);
244 if (bufferIt != mBufferPool.mBuffers.end()) {
245 mBufferPool.mStats.onBufferFetched();
246 *handle = bufferIt->second->handle();
247 return ResultStatus::OK;
248 }
249 }
250 }
251 mBufferPool.cleanUp();
252 scheduleEvictIfNeeded();
253 return ResultStatus::CRITICAL_ERROR;
254 }
255
connect(const std::shared_ptr<IObserver> & observer,bool local,std::shared_ptr<Connection> * connection,ConnectionId * pConnectionId,uint32_t * pMsgId,StatusDescriptor * statusDescPtr,InvalidationDescriptor * invDescPtr)256 BufferPoolStatus Accessor::connect(
257 const std::shared_ptr<IObserver> &observer, bool local,
258 std::shared_ptr<Connection> *connection, ConnectionId *pConnectionId,
259 uint32_t *pMsgId,
260 StatusDescriptor* statusDescPtr,
261 InvalidationDescriptor* invDescPtr) {
262 static ::android::base::NoDestructor<ConnectionIdGenerator> sConIdGenerator;
263 std::shared_ptr<Connection> newConnection = ::ndk::SharedRefBase::make<Connection>();
264 BufferPoolStatus status = ResultStatus::CRITICAL_ERROR;
265 {
266 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
267 if (newConnection) {
268 int32_t pid = getpid();
269 ConnectionId id = sConIdGenerator->getConnectionId();
270 status = mBufferPool.mObserver.open(id, statusDescPtr);
271 if (status == ResultStatus::OK) {
272 newConnection->initialize(ref<Accessor>(), id);
273 *connection = newConnection;
274 *pConnectionId = id;
275 *pMsgId = mBufferPool.mInvalidation.mInvalidationId;
276 mBufferPool.mConnectionIds.insert(id);
277 mBufferPool.mInvalidationChannel.getDesc(invDescPtr);
278 mBufferPool.mInvalidation.onConnect(id, observer);
279 }
280
281 }
282 mBufferPool.processStatusMessages();
283 mBufferPool.cleanUp();
284 scheduleEvictIfNeeded();
285 }
286 if (!local && status == ResultStatus::OK) {
287 std::shared_ptr<Accessor> accessor(ref<Accessor>());
288 sConnectionDeathRecipient->add(*pConnectionId, accessor);
289 }
290 return status;
291 }
292
close(ConnectionId connectionId)293 BufferPoolStatus Accessor::close(ConnectionId connectionId) {
294 {
295 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
296 ALOGV("connection close %lld: %u", (long long)connectionId, mBufferPool.mInvalidation.mId);
297 mBufferPool.processStatusMessages();
298 mBufferPool.handleClose(connectionId);
299 mBufferPool.mObserver.close(connectionId);
300 mBufferPool.mInvalidation.onClose(connectionId);
301 // Since close# will be called after all works are finished, it is OK to
302 // evict unused buffers.
303 mBufferPool.cleanUp(true);
304 scheduleEvictIfNeeded();
305 }
306 sConnectionDeathRecipient->remove(connectionId);
307 return ResultStatus::OK;
308 }
309
cleanUp(bool clearCache)310 void Accessor::cleanUp(bool clearCache) {
311 // transaction timeout, buffer caching TTL handling
312 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
313 mBufferPool.processStatusMessages();
314 mBufferPool.cleanUp(clearCache);
315 }
316
handleInvalidateAck()317 void Accessor::handleInvalidateAck() {
318 std::map<ConnectionId, const std::shared_ptr<IObserver>> observers;
319 uint32_t invalidationId;
320 {
321 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
322 mBufferPool.processStatusMessages();
323 mBufferPool.mInvalidation.onHandleAck(&observers, &invalidationId);
324 }
325 // Do not hold lock for send invalidations
326 size_t deadClients = 0;
327 for (auto it = observers.begin(); it != observers.end(); ++it) {
328 const std::shared_ptr<IObserver> observer = it->second;
329 if (observer) {
330 ::ndk::ScopedAStatus status = observer->onMessage(it->first, invalidationId);
331 if (!status.isOk()) {
332 ++deadClients;
333 }
334 }
335 }
336 if (deadClients > 0) {
337 ALOGD("During invalidation found %zu dead clients", deadClients);
338 }
339 }
340
invalidatorThread(std::map<uint32_t,const std::weak_ptr<Accessor>> & accessors,std::mutex & mutex,std::condition_variable & cv,bool & ready)341 void Accessor::invalidatorThread(
342 std::map<uint32_t, const std::weak_ptr<Accessor>> &accessors,
343 std::mutex &mutex,
344 std::condition_variable &cv,
345 bool &ready) {
346 constexpr uint32_t NUM_SPIN_TO_INCREASE_SLEEP = 1024;
347 constexpr uint32_t NUM_SPIN_TO_LOG = 1024*8;
348 constexpr useconds_t MAX_SLEEP_US = 10000;
349 uint32_t numSpin = 0;
350 useconds_t sleepUs = 1;
351
352 while(true) {
353 std::map<uint32_t, const std::weak_ptr<Accessor>> copied;
354 {
355 std::unique_lock<std::mutex> lock(mutex);
356 while (!ready) {
357 numSpin = 0;
358 sleepUs = 1;
359 cv.wait(lock);
360 }
361 copied.insert(accessors.begin(), accessors.end());
362 }
363 std::list<ConnectionId> erased;
364 for (auto it = copied.begin(); it != copied.end(); ++it) {
365 const std::shared_ptr<Accessor> acc = it->second.lock();
366 if (!acc) {
367 erased.push_back(it->first);
368 } else {
369 acc->handleInvalidateAck();
370 }
371 }
372 {
373 std::unique_lock<std::mutex> lock(mutex);
374 for (auto it = erased.begin(); it != erased.end(); ++it) {
375 accessors.erase(*it);
376 }
377 if (accessors.size() == 0) {
378 ready = false;
379 } else {
380 // N.B. Since there is not a efficient way to wait over FMQ,
381 // polling over the FMQ is the current way to prevent draining
382 // CPU.
383 lock.unlock();
384 ++numSpin;
385 if (numSpin % NUM_SPIN_TO_INCREASE_SLEEP == 0 &&
386 sleepUs < MAX_SLEEP_US) {
387 sleepUs *= 10;
388 }
389 if (numSpin % NUM_SPIN_TO_LOG == 0) {
390 ALOGW("invalidator thread spinning");
391 }
392 ::usleep(sleepUs);
393 }
394 }
395 }
396 }
397
AccessorInvalidator()398 Accessor::AccessorInvalidator::AccessorInvalidator() : mReady(false) {
399 std::thread invalidator(
400 invalidatorThread,
401 std::ref(mAccessors),
402 std::ref(mMutex),
403 std::ref(mCv),
404 std::ref(mReady));
405 invalidator.detach();
406 }
407
addAccessor(uint32_t accessorId,const std::weak_ptr<Accessor> & accessor)408 void Accessor::AccessorInvalidator::addAccessor(
409 uint32_t accessorId, const std::weak_ptr<Accessor> &accessor) {
410 bool notify = false;
411 std::unique_lock<std::mutex> lock(mMutex);
412 if (mAccessors.find(accessorId) == mAccessors.end()) {
413 if (!mReady) {
414 mReady = true;
415 notify = true;
416 }
417 mAccessors.emplace(accessorId, accessor);
418 ALOGV("buffer invalidation added bp:%u %d", accessorId, notify);
419 }
420 lock.unlock();
421 if (notify) {
422 mCv.notify_one();
423 }
424 }
425
delAccessor(uint32_t accessorId)426 void Accessor::AccessorInvalidator::delAccessor(uint32_t accessorId) {
427 std::lock_guard<std::mutex> lock(mMutex);
428 mAccessors.erase(accessorId);
429 ALOGV("buffer invalidation deleted bp:%u", accessorId);
430 if (mAccessors.size() == 0) {
431 mReady = false;
432 }
433 }
434
435 std::unique_ptr<Accessor::AccessorInvalidator> Accessor::sInvalidator;
436
createInvalidator()437 void Accessor::createInvalidator() {
438 if (!sInvalidator) {
439 sInvalidator = std::make_unique<Accessor::AccessorInvalidator>();
440 }
441 }
442
evictorThread(std::map<const std::weak_ptr<Accessor>,nsecs_t,std::owner_less<>> & accessors,std::mutex & mutex,std::condition_variable & cv)443 void Accessor::evictorThread(
444 std::map<const std::weak_ptr<Accessor>, nsecs_t, std::owner_less<>> &accessors,
445 std::mutex &mutex,
446 std::condition_variable &cv) {
447 std::list<const std::weak_ptr<Accessor>> evictList;
448 while (true) {
449 int expired = 0;
450 int evicted = 0;
451 {
452 nsecs_t now = systemTime();
453 std::unique_lock<std::mutex> lock(mutex);
454 while (accessors.size() == 0) {
455 cv.wait(lock);
456 }
457 auto it = accessors.begin();
458 while (it != accessors.end()) {
459 if (now > (it->second + kEvictDurationNs)) {
460 ++expired;
461 evictList.push_back(it->first);
462 it = accessors.erase(it);
463 } else {
464 ++it;
465 }
466 }
467 }
468 // evict idle accessors;
469 for (auto it = evictList.begin(); it != evictList.end(); ++it) {
470 const std::shared_ptr<Accessor> accessor = it->lock();
471 if (accessor) {
472 accessor->cleanUp(true);
473 ++evicted;
474 }
475 }
476 if (expired > 0) {
477 ALOGD("evictor expired: %d, evicted: %d", expired, evicted);
478 }
479 evictList.clear();
480 ::usleep(kEvictGranularityNs / 1000);
481 }
482 }
483
AccessorEvictor()484 Accessor::AccessorEvictor::AccessorEvictor() {
485 std::thread evictor(
486 evictorThread,
487 std::ref(mAccessors),
488 std::ref(mMutex),
489 std::ref(mCv));
490 evictor.detach();
491 }
492
addAccessor(const std::weak_ptr<Accessor> & accessor,nsecs_t ts)493 void Accessor::AccessorEvictor::addAccessor(
494 const std::weak_ptr<Accessor> &accessor, nsecs_t ts) {
495 std::lock_guard<std::mutex> lock(mMutex);
496 bool notify = mAccessors.empty();
497 auto it = mAccessors.find(accessor);
498 if (it == mAccessors.end()) {
499 mAccessors.emplace(accessor, ts);
500 } else {
501 it->second = ts;
502 }
503 if (notify) {
504 mCv.notify_one();
505 }
506 }
507
508 std::unique_ptr<Accessor::AccessorEvictor> Accessor::sEvictor;
509
createEvictor()510 void Accessor::createEvictor() {
511 if (!sEvictor) {
512 sEvictor = std::make_unique<Accessor::AccessorEvictor>();
513 }
514 }
515
scheduleEvictIfNeeded()516 void Accessor::scheduleEvictIfNeeded() {
517 nsecs_t now = systemTime();
518
519 if (now > (mScheduleEvictTs + kEvictGranularityNs)) {
520 mScheduleEvictTs = now;
521 sEvictor->addAccessor(ref<Accessor>(), now);
522 }
523 }
524
}  // namespace aidl::android::hardware::media::bufferpool2::implementation
526