1 /*
2 * Copyright (C) 2022 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #define LOG_TAG "AidlBufferPoolAcc"
17 //#define LOG_NDEBUG 0
18
19 #include <sys/types.h>
20 #include <stdint.h>
21 #include <time.h>
22 #include <unistd.h>
23 #include <utils/Log.h>
24 #include <thread>
25
26 #include "Accessor.h"
27 #include "Connection.h"
28 #include "DataHelper.h"
29
30 namespace aidl::android::hardware::media::bufferpool2::implementation {
31
32 namespace {
33 static constexpr nsecs_t kEvictGranularityNs = 1000000000; // 1 sec
34 static constexpr nsecs_t kEvictDurationNs = 5000000000; // 5 secs
35 }
36
37 #ifdef __ANDROID_VNDK__
38 static constexpr uint32_t kSeqIdVndkBit = 1U << 31;
39 #else
40 static constexpr uint32_t kSeqIdVndkBit = 0;
41 #endif
42
43 static constexpr uint32_t kSeqIdMax = 0x7fffffff;
44 uint32_t Accessor::sSeqId = time(nullptr) & kSeqIdMax;
45
46 namespace {
47 // anonymous namespace
48 static std::shared_ptr<ConnectionDeathRecipient> sConnectionDeathRecipient =
49 std::make_shared<ConnectionDeathRecipient>();
50
serviceDied(void * cookie)51 void serviceDied(void *cookie) {
52 if (sConnectionDeathRecipient) {
53 sConnectionDeathRecipient->onDead(cookie);
54 }
55 }
56 }
57
getConnectionDeathRecipient()58 std::shared_ptr<ConnectionDeathRecipient> Accessor::getConnectionDeathRecipient() {
59 return sConnectionDeathRecipient;
60 }
61
ConnectionDeathRecipient()62 ConnectionDeathRecipient::ConnectionDeathRecipient() {
63 mDeathRecipient = ndk::ScopedAIBinder_DeathRecipient(
64 AIBinder_DeathRecipient_new(serviceDied));
65 }
66
add(int64_t connectionId,const std::shared_ptr<Accessor> & accessor)67 void ConnectionDeathRecipient::add(
68 int64_t connectionId,
69 const std::shared_ptr<Accessor> &accessor) {
70 std::lock_guard<std::mutex> lock(mLock);
71 if (mAccessors.find(connectionId) == mAccessors.end()) {
72 mAccessors.insert(std::make_pair(connectionId, accessor));
73 }
74 }
75
remove(int64_t connectionId)76 void ConnectionDeathRecipient::remove(int64_t connectionId) {
77 std::lock_guard<std::mutex> lock(mLock);
78 mAccessors.erase(connectionId);
79 auto it = mConnectionToCookie.find(connectionId);
80 if (it != mConnectionToCookie.end()) {
81 void * cookie = it->second;
82 mConnectionToCookie.erase(it);
83 auto cit = mCookieToConnections.find(cookie);
84 if (cit != mCookieToConnections.end()) {
85 cit->second.erase(connectionId);
86 if (cit->second.size() == 0) {
87 mCookieToConnections.erase(cit);
88 }
89 }
90 }
91 }
92
addCookieToConnection(void * cookie,int64_t connectionId)93 void ConnectionDeathRecipient::addCookieToConnection(
94 void *cookie,
95 int64_t connectionId) {
96 std::lock_guard<std::mutex> lock(mLock);
97 if (mAccessors.find(connectionId) == mAccessors.end()) {
98 return;
99 }
100 mConnectionToCookie.insert(std::make_pair(connectionId, cookie));
101 auto it = mCookieToConnections.find(cookie);
102 if (it != mCookieToConnections.end()) {
103 it->second.insert(connectionId);
104 } else {
105 mCookieToConnections.insert(std::make_pair(
106 cookie, std::set<int64_t>{connectionId}));
107 }
108 }
109
onDead(void * cookie)110 void ConnectionDeathRecipient::onDead(void *cookie) {
111 std::map<int64_t, const std::weak_ptr<Accessor>> connectionsToClose;
112 {
113 std::lock_guard<std::mutex> lock(mLock);
114
115 auto it = mCookieToConnections.find(cookie);
116 if (it != mCookieToConnections.end()) {
117 for (auto conIt = it->second.begin(); conIt != it->second.end(); ++conIt) {
118 auto accessorIt = mAccessors.find(*conIt);
119 if (accessorIt != mAccessors.end()) {
120 connectionsToClose.insert(std::make_pair(*conIt, accessorIt->second));
121 mAccessors.erase(accessorIt);
122 }
123 mConnectionToCookie.erase(*conIt);
124 }
125 mCookieToConnections.erase(it);
126 }
127 }
128
129 if (connectionsToClose.size() > 0) {
130 std::shared_ptr<Accessor> accessor;
131 for (auto it = connectionsToClose.begin(); it != connectionsToClose.end(); ++it) {
132 accessor = it->second.lock();
133
134 if (accessor) {
135 accessor->close(it->first);
136 ALOGD("connection %lld closed on death", (long long)it->first);
137 }
138 }
139 }
140 }
141
getRecipient()142 AIBinder_DeathRecipient *ConnectionDeathRecipient::getRecipient() {
143 return mDeathRecipient.get();
144 }
145
connect(const std::shared_ptr<::aidl::android::hardware::media::bufferpool2::IObserver> & in_observer,::aidl::android::hardware::media::bufferpool2::IAccessor::ConnectionInfo * _aidl_return)146 ::ndk::ScopedAStatus Accessor::connect(const std::shared_ptr<::aidl::android::hardware::media::bufferpool2::IObserver>& in_observer, ::aidl::android::hardware::media::bufferpool2::IAccessor::ConnectionInfo* _aidl_return) {
147 std::shared_ptr<Connection> connection;
148 ConnectionId connectionId;
149 uint32_t msgId;
150 StatusDescriptor statusDesc;
151 InvalidationDescriptor invDesc;
152 BufferPoolStatus status = connect(
153 in_observer, false, &connection, &connectionId, &msgId, &statusDesc, &invDesc);
154 if (status == ResultStatus::OK) {
155 _aidl_return->connection = connection;
156 _aidl_return->connectionId = connectionId;
157 _aidl_return->msgId = msgId;
158 _aidl_return->toFmqDesc = std::move(statusDesc);
159 _aidl_return->fromFmqDesc = std::move(invDesc);
160 return ::ndk::ScopedAStatus::ok();
161 }
162 return ::ndk::ScopedAStatus::fromServiceSpecificError(status);
163 }
164
Accessor(const std::shared_ptr<BufferPoolAllocator> & allocator)165 Accessor::Accessor(const std::shared_ptr<BufferPoolAllocator> &allocator)
166 : mAllocator(allocator), mScheduleEvictTs(0) {}
167
~Accessor()168 Accessor::~Accessor() {
169 }
170
isValid()171 bool Accessor::isValid() {
172 return mBufferPool.isValid();
173 }
174
flush()175 BufferPoolStatus Accessor::flush() {
176 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
177 mBufferPool.processStatusMessages();
178 mBufferPool.flush(ref<Accessor>());
179 return ResultStatus::OK;
180 }
181
allocate(ConnectionId connectionId,const std::vector<uint8_t> & params,BufferId * bufferId,const native_handle_t ** handle)182 BufferPoolStatus Accessor::allocate(
183 ConnectionId connectionId,
184 const std::vector<uint8_t> ¶ms,
185 BufferId *bufferId, const native_handle_t** handle) {
186 std::unique_lock<std::mutex> lock(mBufferPool.mMutex);
187 mBufferPool.processStatusMessages();
188 BufferPoolStatus status = ResultStatus::OK;
189 if (!mBufferPool.getFreeBuffer(mAllocator, params, bufferId, handle)) {
190 lock.unlock();
191 std::shared_ptr<BufferPoolAllocation> alloc;
192 size_t allocSize;
193 status = mAllocator->allocate(params, &alloc, &allocSize);
194 lock.lock();
195 if (status == ResultStatus::OK) {
196 status = mBufferPool.addNewBuffer(alloc, allocSize, params, bufferId, handle);
197 }
198 ALOGV("create a buffer %d : %u %p",
199 status == ResultStatus::OK, *bufferId, *handle);
200 }
201 if (status == ResultStatus::OK) {
202 // TODO: handle ownBuffer failure
203 mBufferPool.handleOwnBuffer(connectionId, *bufferId);
204 }
205 mBufferPool.cleanUp();
206 scheduleEvictIfNeeded();
207 return status;
208 }
209
fetch(ConnectionId connectionId,TransactionId transactionId,BufferId bufferId,const native_handle_t ** handle)210 BufferPoolStatus Accessor::fetch(
211 ConnectionId connectionId, TransactionId transactionId,
212 BufferId bufferId, const native_handle_t** handle) {
213 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
214 mBufferPool.processStatusMessages();
215 auto found = mBufferPool.mTransactions.find(transactionId);
216 if (found != mBufferPool.mTransactions.end() &&
217 contains(&mBufferPool.mPendingTransactions,
218 connectionId, transactionId)) {
219 if (found->second->mSenderValidated &&
220 found->second->mStatus == BufferStatus::TRANSFER_FROM &&
221 found->second->mBufferId == bufferId) {
222 found->second->mStatus = BufferStatus::TRANSFER_FETCH;
223 auto bufferIt = mBufferPool.mBuffers.find(bufferId);
224 if (bufferIt != mBufferPool.mBuffers.end()) {
225 mBufferPool.mStats.onBufferFetched();
226 *handle = bufferIt->second->handle();
227 return ResultStatus::OK;
228 }
229 }
230 }
231 mBufferPool.cleanUp();
232 scheduleEvictIfNeeded();
233 return ResultStatus::CRITICAL_ERROR;
234 }
235
connect(const std::shared_ptr<IObserver> & observer,bool local,std::shared_ptr<Connection> * connection,ConnectionId * pConnectionId,uint32_t * pMsgId,StatusDescriptor * statusDescPtr,InvalidationDescriptor * invDescPtr)236 BufferPoolStatus Accessor::connect(
237 const std::shared_ptr<IObserver> &observer, bool local,
238 std::shared_ptr<Connection> *connection, ConnectionId *pConnectionId,
239 uint32_t *pMsgId,
240 StatusDescriptor* statusDescPtr,
241 InvalidationDescriptor* invDescPtr) {
242 std::shared_ptr<Connection> newConnection = ::ndk::SharedRefBase::make<Connection>();
243 BufferPoolStatus status = ResultStatus::CRITICAL_ERROR;
244 {
245 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
246 if (newConnection) {
247 int32_t pid = getpid();
248 ConnectionId id = (int64_t)pid << 32 | sSeqId | kSeqIdVndkBit;
249 status = mBufferPool.mObserver.open(id, statusDescPtr);
250 if (status == ResultStatus::OK) {
251 newConnection->initialize(ref<Accessor>(), id);
252 *connection = newConnection;
253 *pConnectionId = id;
254 *pMsgId = mBufferPool.mInvalidation.mInvalidationId;
255 mBufferPool.mConnectionIds.insert(id);
256 mBufferPool.mInvalidationChannel.getDesc(invDescPtr);
257 mBufferPool.mInvalidation.onConnect(id, observer);
258 if (sSeqId == kSeqIdMax) {
259 sSeqId = 0;
260 } else {
261 ++sSeqId;
262 }
263 }
264
265 }
266 mBufferPool.processStatusMessages();
267 mBufferPool.cleanUp();
268 scheduleEvictIfNeeded();
269 }
270 if (!local && status == ResultStatus::OK) {
271 std::shared_ptr<Accessor> accessor(ref<Accessor>());
272 sConnectionDeathRecipient->add(*pConnectionId, accessor);
273 }
274 return status;
275 }
276
close(ConnectionId connectionId)277 BufferPoolStatus Accessor::close(ConnectionId connectionId) {
278 {
279 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
280 ALOGV("connection close %lld: %u", (long long)connectionId, mBufferPool.mInvalidation.mId);
281 mBufferPool.processStatusMessages();
282 mBufferPool.handleClose(connectionId);
283 mBufferPool.mObserver.close(connectionId);
284 mBufferPool.mInvalidation.onClose(connectionId);
285 // Since close# will be called after all works are finished, it is OK to
286 // evict unused buffers.
287 mBufferPool.cleanUp(true);
288 scheduleEvictIfNeeded();
289 }
290 sConnectionDeathRecipient->remove(connectionId);
291 return ResultStatus::OK;
292 }
293
cleanUp(bool clearCache)294 void Accessor::cleanUp(bool clearCache) {
295 // transaction timeout, buffer caching TTL handling
296 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
297 mBufferPool.processStatusMessages();
298 mBufferPool.cleanUp(clearCache);
299 }
300
handleInvalidateAck()301 void Accessor::handleInvalidateAck() {
302 std::map<ConnectionId, const std::shared_ptr<IObserver>> observers;
303 uint32_t invalidationId;
304 {
305 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
306 mBufferPool.processStatusMessages();
307 mBufferPool.mInvalidation.onHandleAck(&observers, &invalidationId);
308 }
309 // Do not hold lock for send invalidations
310 size_t deadClients = 0;
311 for (auto it = observers.begin(); it != observers.end(); ++it) {
312 const std::shared_ptr<IObserver> observer = it->second;
313 if (observer) {
314 ::ndk::ScopedAStatus status = observer->onMessage(it->first, invalidationId);
315 if (!status.isOk()) {
316 ++deadClients;
317 }
318 }
319 }
320 if (deadClients > 0) {
321 ALOGD("During invalidation found %zu dead clients", deadClients);
322 }
323 }
324
invalidatorThread(std::map<uint32_t,const std::weak_ptr<Accessor>> & accessors,std::mutex & mutex,std::condition_variable & cv,bool & ready)325 void Accessor::invalidatorThread(
326 std::map<uint32_t, const std::weak_ptr<Accessor>> &accessors,
327 std::mutex &mutex,
328 std::condition_variable &cv,
329 bool &ready) {
330 constexpr uint32_t NUM_SPIN_TO_INCREASE_SLEEP = 1024;
331 constexpr uint32_t NUM_SPIN_TO_LOG = 1024*8;
332 constexpr useconds_t MAX_SLEEP_US = 10000;
333 uint32_t numSpin = 0;
334 useconds_t sleepUs = 1;
335
336 while(true) {
337 std::map<uint32_t, const std::weak_ptr<Accessor>> copied;
338 {
339 std::unique_lock<std::mutex> lock(mutex);
340 while (!ready) {
341 numSpin = 0;
342 sleepUs = 1;
343 cv.wait(lock);
344 }
345 copied.insert(accessors.begin(), accessors.end());
346 }
347 std::list<ConnectionId> erased;
348 for (auto it = copied.begin(); it != copied.end(); ++it) {
349 const std::shared_ptr<Accessor> acc = it->second.lock();
350 if (!acc) {
351 erased.push_back(it->first);
352 } else {
353 acc->handleInvalidateAck();
354 }
355 }
356 {
357 std::unique_lock<std::mutex> lock(mutex);
358 for (auto it = erased.begin(); it != erased.end(); ++it) {
359 accessors.erase(*it);
360 }
361 if (accessors.size() == 0) {
362 ready = false;
363 } else {
364 // N.B. Since there is not a efficient way to wait over FMQ,
365 // polling over the FMQ is the current way to prevent draining
366 // CPU.
367 lock.unlock();
368 ++numSpin;
369 if (numSpin % NUM_SPIN_TO_INCREASE_SLEEP == 0 &&
370 sleepUs < MAX_SLEEP_US) {
371 sleepUs *= 10;
372 }
373 if (numSpin % NUM_SPIN_TO_LOG == 0) {
374 ALOGW("invalidator thread spinning");
375 }
376 ::usleep(sleepUs);
377 }
378 }
379 }
380 }
381
AccessorInvalidator()382 Accessor::AccessorInvalidator::AccessorInvalidator() : mReady(false) {
383 std::thread invalidator(
384 invalidatorThread,
385 std::ref(mAccessors),
386 std::ref(mMutex),
387 std::ref(mCv),
388 std::ref(mReady));
389 invalidator.detach();
390 }
391
addAccessor(uint32_t accessorId,const std::weak_ptr<Accessor> & accessor)392 void Accessor::AccessorInvalidator::addAccessor(
393 uint32_t accessorId, const std::weak_ptr<Accessor> &accessor) {
394 bool notify = false;
395 std::unique_lock<std::mutex> lock(mMutex);
396 if (mAccessors.find(accessorId) == mAccessors.end()) {
397 if (!mReady) {
398 mReady = true;
399 notify = true;
400 }
401 mAccessors.emplace(accessorId, accessor);
402 ALOGV("buffer invalidation added bp:%u %d", accessorId, notify);
403 }
404 lock.unlock();
405 if (notify) {
406 mCv.notify_one();
407 }
408 }
409
delAccessor(uint32_t accessorId)410 void Accessor::AccessorInvalidator::delAccessor(uint32_t accessorId) {
411 std::lock_guard<std::mutex> lock(mMutex);
412 mAccessors.erase(accessorId);
413 ALOGV("buffer invalidation deleted bp:%u", accessorId);
414 if (mAccessors.size() == 0) {
415 mReady = false;
416 }
417 }
418
419 std::unique_ptr<Accessor::AccessorInvalidator> Accessor::sInvalidator;
420
createInvalidator()421 void Accessor::createInvalidator() {
422 if (!sInvalidator) {
423 sInvalidator = std::make_unique<Accessor::AccessorInvalidator>();
424 }
425 }
426
evictorThread(std::map<const std::weak_ptr<Accessor>,nsecs_t,std::owner_less<>> & accessors,std::mutex & mutex,std::condition_variable & cv)427 void Accessor::evictorThread(
428 std::map<const std::weak_ptr<Accessor>, nsecs_t, std::owner_less<>> &accessors,
429 std::mutex &mutex,
430 std::condition_variable &cv) {
431 std::list<const std::weak_ptr<Accessor>> evictList;
432 while (true) {
433 int expired = 0;
434 int evicted = 0;
435 {
436 nsecs_t now = systemTime();
437 std::unique_lock<std::mutex> lock(mutex);
438 while (accessors.size() == 0) {
439 cv.wait(lock);
440 }
441 auto it = accessors.begin();
442 while (it != accessors.end()) {
443 if (now > (it->second + kEvictDurationNs)) {
444 ++expired;
445 evictList.push_back(it->first);
446 it = accessors.erase(it);
447 } else {
448 ++it;
449 }
450 }
451 }
452 // evict idle accessors;
453 for (auto it = evictList.begin(); it != evictList.end(); ++it) {
454 const std::shared_ptr<Accessor> accessor = it->lock();
455 if (accessor) {
456 accessor->cleanUp(true);
457 ++evicted;
458 }
459 }
460 if (expired > 0) {
461 ALOGD("evictor expired: %d, evicted: %d", expired, evicted);
462 }
463 evictList.clear();
464 ::usleep(kEvictGranularityNs / 1000);
465 }
466 }
467
AccessorEvictor()468 Accessor::AccessorEvictor::AccessorEvictor() {
469 std::thread evictor(
470 evictorThread,
471 std::ref(mAccessors),
472 std::ref(mMutex),
473 std::ref(mCv));
474 evictor.detach();
475 }
476
addAccessor(const std::weak_ptr<Accessor> & accessor,nsecs_t ts)477 void Accessor::AccessorEvictor::addAccessor(
478 const std::weak_ptr<Accessor> &accessor, nsecs_t ts) {
479 std::lock_guard<std::mutex> lock(mMutex);
480 bool notify = mAccessors.empty();
481 auto it = mAccessors.find(accessor);
482 if (it == mAccessors.end()) {
483 mAccessors.emplace(accessor, ts);
484 } else {
485 it->second = ts;
486 }
487 if (notify) {
488 mCv.notify_one();
489 }
490 }
491
492 std::unique_ptr<Accessor::AccessorEvictor> Accessor::sEvictor;
493
createEvictor()494 void Accessor::createEvictor() {
495 if (!sEvictor) {
496 sEvictor = std::make_unique<Accessor::AccessorEvictor>();
497 }
498 }
499
scheduleEvictIfNeeded()500 void Accessor::scheduleEvictIfNeeded() {
501 nsecs_t now = systemTime();
502
503 if (now > (mScheduleEvictTs + kEvictGranularityNs)) {
504 mScheduleEvictTs = now;
505 sEvictor->addAccessor(ref<Accessor>(), now);
506 }
507 }
508
}  // namespace aidl::android::hardware::media::bufferpool2::implementation
510