/*
 * Copyright (C) 2022 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#define LOG_TAG "AidlBufferPoolAcc"
//#define LOG_NDEBUG 0

#include <sys/types.h>
#include <stdint.h>
#include <time.h>
#include <unistd.h>
#include <utils/Log.h>
#include <thread>

#include "Accessor.h"
#include "Connection.h"
#include "DataHelper.h"

namespace aidl::android::hardware::media::bufferpool2::implementation {

namespace {
    static constexpr nsecs_t kEvictGranularityNs = 1000000000; // 1 sec
    static constexpr nsecs_t kEvictDurationNs = 5000000000; // 5 secs
}

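// The top bit of the connection sequence id is set only when this library is
// built as part of the VNDK, presumably so that connection ids generated by
// the vendor (VNDK) copy of the library stay distinct from those generated by
// the system copy.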
#ifdef __ANDROID_VNDK__
static constexpr uint32_t kSeqIdVndkBit = 1U << 31;
#else
static constexpr uint32_t kSeqIdVndkBit = 0;
#endif

static constexpr uint32_t kSeqIdMax = 0x7fffffff;
uint32_t Accessor::sSeqId = time(nullptr) & kSeqIdMax;

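// Process-wide death recipient shared by all accessors. serviceDied() is the
// C callback handed to AIBinder_DeathRecipient_new() below; it forwards binder
// death notifications to the shared ConnectionDeathRecipient instance.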
namespace {
// anonymous namespace
static std::shared_ptr<ConnectionDeathRecipient> sConnectionDeathRecipient =
    std::make_shared<ConnectionDeathRecipient>();

void serviceDied(void *cookie) {
    if (sConnectionDeathRecipient) {
        sConnectionDeathRecipient->onDead(cookie);
    }
}
}

std::shared_ptr<ConnectionDeathRecipient> Accessor::getConnectionDeathRecipient() {
    return sConnectionDeathRecipient;
}

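// ConnectionDeathRecipient keeps three pieces of bookkeeping under mLock:
// connection id -> Accessor (mAccessors), connection id -> binder cookie
// (mConnectionToCookie), and binder cookie -> set of connection ids
// (mCookieToConnections). When a client binder dies, onDead() uses these maps
// to close every connection registered for that cookie.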
ConnectionDeathRecipient::ConnectionDeathRecipient() {
    mDeathRecipient = ndk::ScopedAIBinder_DeathRecipient(
            AIBinder_DeathRecipient_new(serviceDied));
}

void ConnectionDeathRecipient::add(
        int64_t connectionId,
        const std::shared_ptr<Accessor> &accessor) {
    std::lock_guard<std::mutex> lock(mLock);
    if (mAccessors.find(connectionId) == mAccessors.end()) {
        mAccessors.insert(std::make_pair(connectionId, accessor));
    }
}

void ConnectionDeathRecipient::remove(int64_t connectionId) {
    std::lock_guard<std::mutex> lock(mLock);
    mAccessors.erase(connectionId);
    auto it = mConnectionToCookie.find(connectionId);
    if (it != mConnectionToCookie.end()) {
        void * cookie = it->second;
        mConnectionToCookie.erase(it);
        auto cit = mCookieToConnections.find(cookie);
        if (cit != mCookieToConnections.end()) {
            cit->second.erase(connectionId);
            if (cit->second.size() == 0) {
                mCookieToConnections.erase(cit);
            }
        }
    }
}

void ConnectionDeathRecipient::addCookieToConnection(
        void *cookie,
        int64_t connectionId) {
    std::lock_guard<std::mutex> lock(mLock);
    if (mAccessors.find(connectionId) == mAccessors.end()) {
        return;
    }
    mConnectionToCookie.insert(std::make_pair(connectionId, cookie));
    auto it = mCookieToConnections.find(cookie);
    if (it != mCookieToConnections.end()) {
        it->second.insert(connectionId);
    } else {
        mCookieToConnections.insert(std::make_pair(
                cookie, std::set<int64_t>{connectionId}));
    }
}

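// Called (via serviceDied) when a client's binder dies. Collects the affected
// connections while holding mLock, then closes them on their accessors after
// the lock is released.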
void ConnectionDeathRecipient::onDead(void *cookie) {
    std::map<int64_t, const std::weak_ptr<Accessor>> connectionsToClose;
    {
        std::lock_guard<std::mutex> lock(mLock);

        auto it = mCookieToConnections.find(cookie);
        if (it != mCookieToConnections.end()) {
            for (auto conIt = it->second.begin(); conIt != it->second.end(); ++conIt) {
                auto accessorIt = mAccessors.find(*conIt);
                if (accessorIt != mAccessors.end()) {
                    connectionsToClose.insert(std::make_pair(*conIt, accessorIt->second));
                    mAccessors.erase(accessorIt);
                }
                mConnectionToCookie.erase(*conIt);
            }
            mCookieToConnections.erase(it);
        }
    }

    if (connectionsToClose.size() > 0) {
        std::shared_ptr<Accessor> accessor;
        for (auto it = connectionsToClose.begin(); it != connectionsToClose.end(); ++it) {
            accessor = it->second.lock();

            if (accessor) {
                accessor->close(it->first);
                ALOGD("connection %lld closed on death", (long long)it->first);
            }
        }
    }
}

AIBinder_DeathRecipient *ConnectionDeathRecipient::getRecipient() {
    return mDeathRecipient.get();
}

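// Binder-facing connect(): delegates to the internal connect() below with
// local = false and packages the results into the AIDL ConnectionInfo.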
::ndk::ScopedAStatus Accessor::connect(const std::shared_ptr<::aidl::android::hardware::media::bufferpool2::IObserver>& in_observer, ::aidl::android::hardware::media::bufferpool2::IAccessor::ConnectionInfo* _aidl_return) {
    std::shared_ptr<Connection> connection;
    ConnectionId connectionId;
    uint32_t msgId;
    StatusDescriptor statusDesc;
    InvalidationDescriptor invDesc;
    BufferPoolStatus status = connect(
            in_observer, false, &connection, &connectionId, &msgId, &statusDesc, &invDesc);
    if (status == ResultStatus::OK) {
        _aidl_return->connection = connection;
        _aidl_return->connectionId = connectionId;
        _aidl_return->msgId = msgId;
        _aidl_return->toFmqDesc = std::move(statusDesc);
        _aidl_return->fromFmqDesc = std::move(invDesc);
        return ::ndk::ScopedAStatus::ok();
    }
    return ::ndk::ScopedAStatus::fromServiceSpecificError(status);
}

Accessor::Accessor(const std::shared_ptr<BufferPoolAllocator> &allocator)
    : mAllocator(allocator), mScheduleEvictTs(0) {}

Accessor::~Accessor() {
}

bool Accessor::isValid() {
    return mBufferPool.isValid();
}

BufferPoolStatus Accessor::flush() {
    std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
    mBufferPool.processStatusMessages();
    mBufferPool.flush(ref<Accessor>());
    return ResultStatus::OK;
}

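// Hands out a buffer for connectionId: reuses a cached free buffer when one
// matches params, otherwise allocates a new one with mAllocator (outside the
// pool lock) and registers it, then records the connection as the owner.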
BufferPoolStatus Accessor::allocate(
        ConnectionId connectionId,
        const std::vector<uint8_t> &params,
        BufferId *bufferId, const native_handle_t** handle) {
    std::unique_lock<std::mutex> lock(mBufferPool.mMutex);
    mBufferPool.processStatusMessages();
    BufferPoolStatus status = ResultStatus::OK;
    if (!mBufferPool.getFreeBuffer(mAllocator, params, bufferId, handle)) {
        lock.unlock();
        std::shared_ptr<BufferPoolAllocation> alloc;
        size_t allocSize;
        status = mAllocator->allocate(params, &alloc, &allocSize);
        lock.lock();
        if (status == ResultStatus::OK) {
            status = mBufferPool.addNewBuffer(alloc, allocSize, params, bufferId, handle);
        }
        ALOGV("create a buffer %d : %u %p",
              status == ResultStatus::OK, *bufferId, *handle);
    }
    if (status == ResultStatus::OK) {
        // TODO: handle ownBuffer failure
        mBufferPool.handleOwnBuffer(connectionId, *bufferId);
    }
    mBufferPool.cleanUp();
    scheduleEvictIfNeeded();
    return status;
}

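// Resolves a pending TRANSFER_FROM transaction into a native handle for the
// receiving connection. Returns CRITICAL_ERROR if the transaction or buffer
// state does not match.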
BufferPoolStatus Accessor::fetch(
        ConnectionId connectionId, TransactionId transactionId,
        BufferId bufferId, const native_handle_t** handle) {
    std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
    mBufferPool.processStatusMessages();
    auto found = mBufferPool.mTransactions.find(transactionId);
    if (found != mBufferPool.mTransactions.end() &&
            contains(&mBufferPool.mPendingTransactions,
                     connectionId, transactionId)) {
        if (found->second->mSenderValidated &&
                found->second->mStatus == BufferStatus::TRANSFER_FROM &&
                found->second->mBufferId == bufferId) {
            found->second->mStatus = BufferStatus::TRANSFER_FETCH;
            auto bufferIt = mBufferPool.mBuffers.find(bufferId);
            if (bufferIt != mBufferPool.mBuffers.end()) {
                mBufferPool.mStats.onBufferFetched();
                *handle = bufferIt->second->handle();
                return ResultStatus::OK;
            }
        }
    }
    mBufferPool.cleanUp();
    scheduleEvictIfNeeded();
    return ResultStatus::CRITICAL_ERROR;
}

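// Creates a new Connection for observer. When local is false the connection is
// also registered with the process-wide death recipient so it gets closed if
// the remote client dies.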
BufferPoolStatus Accessor::connect(
        const std::shared_ptr<IObserver> &observer, bool local,
        std::shared_ptr<Connection> *connection, ConnectionId *pConnectionId,
        uint32_t *pMsgId,
        StatusDescriptor* statusDescPtr,
        InvalidationDescriptor* invDescPtr) {
    std::shared_ptr<Connection> newConnection = ::ndk::SharedRefBase::make<Connection>();
    BufferPoolStatus status = ResultStatus::CRITICAL_ERROR;
    {
        std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
        if (newConnection) {
            int32_t pid = getpid();
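            // Connection id layout: the current process's pid in the upper 32
            // bits, the per-process sequence number (plus the VNDK bit) in the
            // lower 32 bits.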
            ConnectionId id = (int64_t)pid << 32 | sSeqId | kSeqIdVndkBit;
            status = mBufferPool.mObserver.open(id, statusDescPtr);
            if (status == ResultStatus::OK) {
                newConnection->initialize(ref<Accessor>(), id);
                *connection = newConnection;
                *pConnectionId = id;
                *pMsgId = mBufferPool.mInvalidation.mInvalidationId;
                mBufferPool.mConnectionIds.insert(id);
                mBufferPool.mInvalidationChannel.getDesc(invDescPtr);
                mBufferPool.mInvalidation.onConnect(id, observer);
                if (sSeqId == kSeqIdMax) {
                    sSeqId = 0;
                } else {
                    ++sSeqId;
                }
            }

        }
        mBufferPool.processStatusMessages();
        mBufferPool.cleanUp();
        scheduleEvictIfNeeded();
    }
    if (!local && status == ResultStatus::OK) {
        std::shared_ptr<Accessor> accessor(ref<Accessor>());
        sConnectionDeathRecipient->add(*pConnectionId, accessor);
    }
    return status;
}

BufferPoolStatus Accessor::close(ConnectionId connectionId) {
    {
        std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
        ALOGV("connection close %lld: %u", (long long)connectionId, mBufferPool.mInvalidation.mId);
        mBufferPool.processStatusMessages();
        mBufferPool.handleClose(connectionId);
        mBufferPool.mObserver.close(connectionId);
        mBufferPool.mInvalidation.onClose(connectionId);
        // Since close() is called after all work on the connection has
        // finished, it is OK to evict unused buffers.
        mBufferPool.cleanUp(true);
        scheduleEvictIfNeeded();
    }
    sConnectionDeathRecipient->remove(connectionId);
    return ResultStatus::OK;
}

void Accessor::cleanUp(bool clearCache) {
    // transaction timeout, buffer caching TTL handling
    std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
    mBufferPool.processStatusMessages();
    mBufferPool.cleanUp(clearCache);
}

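// Gathers observers with pending invalidation acks while holding the pool
// lock, then sends onMessage() notifications outside the lock and logs how
// many clients appear to be dead.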
void Accessor::handleInvalidateAck() {
    std::map<ConnectionId, const std::shared_ptr<IObserver>> observers;
    uint32_t invalidationId;
    {
        std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
        mBufferPool.processStatusMessages();
        mBufferPool.mInvalidation.onHandleAck(&observers, &invalidationId);
    }
    // Do not hold the lock while sending invalidations.
    size_t deadClients = 0;
    for (auto it = observers.begin(); it != observers.end(); ++it) {
        const std::shared_ptr<IObserver> observer = it->second;
        if (observer) {
            ::ndk::ScopedAStatus status = observer->onMessage(it->first, invalidationId);
            if (!status.isOk()) {
                ++deadClients;
            }
        }
    }
    if (deadClients > 0) {
        ALOGD("During invalidation found %zu dead clients", deadClients);
    }
}

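// Worker loop for AccessorInvalidator. Snapshots the registered accessors,
// calls handleInvalidateAck() on each live one, prunes expired weak pointers,
// and polls with a sleep that grows by 10x (up to MAX_SLEEP_US) after every
// NUM_SPIN_TO_INCREASE_SLEEP iterations.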
void Accessor::invalidatorThread(
            std::map<uint32_t, const std::weak_ptr<Accessor>> &accessors,
            std::mutex &mutex,
            std::condition_variable &cv,
            bool &ready) {
    constexpr uint32_t NUM_SPIN_TO_INCREASE_SLEEP = 1024;
    constexpr uint32_t NUM_SPIN_TO_LOG = 1024*8;
    constexpr useconds_t MAX_SLEEP_US = 10000;
    uint32_t numSpin = 0;
    useconds_t sleepUs = 1;

    while (true) {
        std::map<uint32_t, const std::weak_ptr<Accessor>> copied;
        {
            std::unique_lock<std::mutex> lock(mutex);
            while (!ready) {
                numSpin = 0;
                sleepUs = 1;
                cv.wait(lock);
            }
            copied.insert(accessors.begin(), accessors.end());
        }
        std::list<ConnectionId> erased;
        for (auto it = copied.begin(); it != copied.end(); ++it) {
            const std::shared_ptr<Accessor> acc = it->second.lock();
            if (!acc) {
                erased.push_back(it->first);
            } else {
                acc->handleInvalidateAck();
            }
        }
        {
            std::unique_lock<std::mutex> lock(mutex);
            for (auto it = erased.begin(); it != erased.end(); ++it) {
                accessors.erase(*it);
            }
            if (accessors.size() == 0) {
                ready = false;
            } else {
                // N.B. Since there is no efficient way to wait on the FMQ,
                // polling with an increasing sleep is used here to avoid
                // draining the CPU.
                lock.unlock();
                ++numSpin;
                if (numSpin % NUM_SPIN_TO_INCREASE_SLEEP == 0 &&
                    sleepUs < MAX_SLEEP_US) {
                    sleepUs *= 10;
                }
                if (numSpin % NUM_SPIN_TO_LOG == 0) {
                    ALOGW("invalidator thread spinning");
                }
                ::usleep(sleepUs);
            }
        }
    }
}

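// Spawns the detached invalidator thread, handing it references to this
// object's accessor map, lock, condition variable, and ready flag.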
Accessor::AccessorInvalidator::AccessorInvalidator() : mReady(false) {
    std::thread invalidator(
            invalidatorThread,
            std::ref(mAccessors),
            std::ref(mMutex),
            std::ref(mCv),
            std::ref(mReady));
    invalidator.detach();
}

void Accessor::AccessorInvalidator::addAccessor(
        uint32_t accessorId, const std::weak_ptr<Accessor> &accessor) {
    bool notify = false;
    std::unique_lock<std::mutex> lock(mMutex);
    if (mAccessors.find(accessorId) == mAccessors.end()) {
        if (!mReady) {
            mReady = true;
            notify = true;
        }
        mAccessors.emplace(accessorId, accessor);
        ALOGV("buffer invalidation added bp:%u %d", accessorId, notify);
    }
    lock.unlock();
    if (notify) {
        mCv.notify_one();
    }
}

void Accessor::AccessorInvalidator::delAccessor(uint32_t accessorId) {
    std::lock_guard<std::mutex> lock(mMutex);
    mAccessors.erase(accessorId);
    ALOGV("buffer invalidation deleted bp:%u", accessorId);
    if (mAccessors.size() == 0) {
        mReady = false;
    }
}

std::unique_ptr<Accessor::AccessorInvalidator> Accessor::sInvalidator;

void Accessor::createInvalidator() {
    if (!sInvalidator) {
        sInvalidator = std::make_unique<Accessor::AccessorInvalidator>();
    }
}

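// Worker loop for AccessorEvictor. Roughly once per kEvictGranularityNs it
// drops accessors whose last scheduled timestamp is older than
// kEvictDurationNs and calls cleanUp(true) on them to evict their cached
// buffers.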
void Accessor::evictorThread(
        std::map<const std::weak_ptr<Accessor>, nsecs_t, std::owner_less<>> &accessors,
        std::mutex &mutex,
        std::condition_variable &cv) {
    std::list<const std::weak_ptr<Accessor>> evictList;
    while (true) {
        int expired = 0;
        int evicted = 0;
        {
            nsecs_t now = systemTime();
            std::unique_lock<std::mutex> lock(mutex);
            while (accessors.size() == 0) {
                cv.wait(lock);
            }
            auto it = accessors.begin();
            while (it != accessors.end()) {
                if (now > (it->second + kEvictDurationNs)) {
                    ++expired;
                    evictList.push_back(it->first);
                    it = accessors.erase(it);
                } else {
                    ++it;
                }
            }
        }
        // evict idle accessors
        for (auto it = evictList.begin(); it != evictList.end(); ++it) {
            const std::shared_ptr<Accessor> accessor = it->lock();
            if (accessor) {
                accessor->cleanUp(true);
                ++evicted;
            }
        }
        if (expired > 0) {
            ALOGD("evictor expired: %d, evicted: %d", expired, evicted);
        }
        evictList.clear();
        ::usleep(kEvictGranularityNs / 1000);
    }
}

Accessor::AccessorEvictor::AccessorEvictor() {
    std::thread evictor(
            evictorThread,
            std::ref(mAccessors),
            std::ref(mMutex),
            std::ref(mCv));
    evictor.detach();
}

void Accessor::AccessorEvictor::addAccessor(
        const std::weak_ptr<Accessor> &accessor, nsecs_t ts) {
    std::lock_guard<std::mutex> lock(mMutex);
    bool notify = mAccessors.empty();
    auto it = mAccessors.find(accessor);
    if (it == mAccessors.end()) {
        mAccessors.emplace(accessor, ts);
    } else {
        it->second = ts;
    }
    if (notify) {
        mCv.notify_one();
    }
}

std::unique_ptr<Accessor::AccessorEvictor> Accessor::sEvictor;

void Accessor::createEvictor() {
    if (!sEvictor) {
        sEvictor = std::make_unique<Accessor::AccessorEvictor>();
    }
}

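// Re-registers this accessor with the evictor, throttled to at most once per
// kEvictGranularityNs.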
void Accessor::scheduleEvictIfNeeded() {
    nsecs_t now = systemTime();

    if (now > (mScheduleEvictTs + kEvictGranularityNs)) {
        mScheduleEvictTs = now;
        sEvictor->addAccessor(ref<Accessor>(), now);
    }
}

}  // namespace aidl::android::hardware::media::bufferpool2::implementation