• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define LOG_TAG "BufferPoolAccessor2.0"
18 //#define LOG_NDEBUG 0
19 
20 #include <sys/types.h>
21 #include <stdint.h>
22 #include <time.h>
23 #include <unistd.h>
24 #include <utils/Log.h>
25 #include <thread>
26 #include "AccessorImpl.h"
27 #include "Connection.h"
28 
29 namespace android {
30 namespace hardware {
31 namespace media {
32 namespace bufferpool {
33 namespace V2_0 {
34 namespace implementation {
35 
36 namespace {
37     static constexpr int64_t kCleanUpDurationUs = 500000; // TODO tune 0.5 sec
38     static constexpr int64_t kLogDurationUs = 5000000; // 5 secs
39 
40     static constexpr size_t kMinAllocBytesForEviction = 1024*1024*15;
41     static constexpr size_t kMinBufferCountForEviction = 25;
42     static constexpr size_t kMaxUnusedBufferCount = 64;
43     static constexpr size_t kUnusedBufferCountTarget = kMaxUnusedBufferCount - 16;
44 
45     static constexpr nsecs_t kEvictGranularityNs = 1000000000; // 1 sec
46     static constexpr nsecs_t kEvictDurationNs = 5000000000; // 5 secs
47 }
48 
49 // Buffer structure in bufferpool process
50 struct InternalBuffer {
51     BufferId mId;
52     size_t mOwnerCount;
53     size_t mTransactionCount;
54     const std::shared_ptr<BufferPoolAllocation> mAllocation;
55     const size_t mAllocSize;
56     const std::vector<uint8_t> mConfig;
57     bool mInvalidated;
58 
InternalBufferandroid::hardware::media::bufferpool::V2_0::implementation::InternalBuffer59     InternalBuffer(
60             BufferId id,
61             const std::shared_ptr<BufferPoolAllocation> &alloc,
62             const size_t allocSize,
63             const std::vector<uint8_t> &allocConfig)
64             : mId(id), mOwnerCount(0), mTransactionCount(0),
65             mAllocation(alloc), mAllocSize(allocSize), mConfig(allocConfig),
66             mInvalidated(false) {}
67 
handleandroid::hardware::media::bufferpool::V2_0::implementation::InternalBuffer68     const native_handle_t *handle() {
69         return mAllocation->handle();
70     }
71 
invalidateandroid::hardware::media::bufferpool::V2_0::implementation::InternalBuffer72     void invalidate() {
73         mInvalidated = true;
74     }
75 };
76 
77 struct TransactionStatus {
78     TransactionId mId;
79     BufferId mBufferId;
80     ConnectionId mSender;
81     ConnectionId mReceiver;
82     BufferStatus mStatus;
83     int64_t mTimestampUs;
84     bool mSenderValidated;
85 
TransactionStatusandroid::hardware::media::bufferpool::V2_0::implementation::TransactionStatus86     TransactionStatus(const BufferStatusMessage &message, int64_t timestampUs) {
87         mId = message.transactionId;
88         mBufferId = message.bufferId;
89         mStatus = message.newStatus;
90         mTimestampUs = timestampUs;
91         if (mStatus == BufferStatus::TRANSFER_TO) {
92             mSender = message.connectionId;
93             mReceiver = message.targetConnectionId;
94             mSenderValidated = true;
95         } else {
96             mSender = -1LL;
97             mReceiver = message.connectionId;
98             mSenderValidated = false;
99         }
100     }
101 };
102 
103 // Helper template methods for handling map of set.
// Adds `value` to the set stored under `key`, creating the set when needed.
// Returns true if `value` was newly added, false if it was already present.
template<class T, class U>
bool insert(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    // operator[] yields an empty set for a new key; set::insert() reports via
    // `.second` whether the element was actually inserted.
    return (*mapOfSet)[key].insert(value).second;
}
117 
// Removes `value` from the set stored under `key`; drops the key once its set
// is empty. Returns true if `value` was present and removed.
template<class T, class U>
bool erase(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    const auto entry = mapOfSet->find(key);
    if (entry == mapOfSet->end()) {
        return false;
    }
    const bool removed = entry->second.erase(value) != 0;
    // An empty set is removed even when `value` itself was not found in it.
    if (entry->second.empty()) {
        mapOfSet->erase(entry);
    }
    return removed;
}
132 
// Returns true if `value` is in the set stored under `key`.
template<class T, class U>
bool contains(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    const auto entry = mapOfSet->find(key);
    if (entry == mapOfSet->end()) {
        return false;
    }
    return entry->second.count(value) != 0;
}
142 
143 #ifdef __ANDROID_VNDK__
144 static constexpr uint32_t kSeqIdVndkBit = 1U << 31;
145 #else
146 static constexpr uint32_t kSeqIdVndkBit = 0;
147 #endif
148 
149 static constexpr uint32_t kSeqIdMax = 0x7fffffff;
150 uint32_t Accessor::Impl::sSeqId = time(nullptr) & kSeqIdMax;
151 
Impl(const std::shared_ptr<BufferPoolAllocator> & allocator)152 Accessor::Impl::Impl(
153         const std::shared_ptr<BufferPoolAllocator> &allocator)
154         : mAllocator(allocator), mScheduleEvictTs(0) {}
155 
~Impl()156 Accessor::Impl::~Impl() {
157 }
158 
connect(const sp<Accessor> & accessor,const sp<IObserver> & observer,sp<Connection> * connection,ConnectionId * pConnectionId,uint32_t * pMsgId,const StatusDescriptor ** statusDescPtr,const InvalidationDescriptor ** invDescPtr)159 ResultStatus Accessor::Impl::connect(
160         const sp<Accessor> &accessor, const sp<IObserver> &observer,
161         sp<Connection> *connection,
162         ConnectionId *pConnectionId,
163         uint32_t *pMsgId,
164         const StatusDescriptor** statusDescPtr,
165         const InvalidationDescriptor** invDescPtr) {
166     sp<Connection> newConnection = new Connection();
167     ResultStatus status = ResultStatus::CRITICAL_ERROR;
168     {
169         std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
170         if (newConnection) {
171             int32_t pid = getpid();
172             ConnectionId id = (int64_t)pid << 32 | sSeqId | kSeqIdVndkBit;
173             status = mBufferPool.mObserver.open(id, statusDescPtr);
174             if (status == ResultStatus::OK) {
175                 newConnection->initialize(accessor, id);
176                 *connection = newConnection;
177                 *pConnectionId = id;
178                 *pMsgId = mBufferPool.mInvalidation.mInvalidationId;
179                 mBufferPool.mConnectionIds.insert(id);
180                 mBufferPool.mInvalidationChannel.getDesc(invDescPtr);
181                 mBufferPool.mInvalidation.onConnect(id, observer);
182                 if (sSeqId == kSeqIdMax) {
183                    sSeqId = 0;
184                 } else {
185                     ++sSeqId;
186                 }
187             }
188 
189         }
190         mBufferPool.processStatusMessages();
191         mBufferPool.cleanUp();
192         scheduleEvictIfNeeded();
193     }
194     return status;
195 }
196 
close(ConnectionId connectionId)197 ResultStatus Accessor::Impl::close(ConnectionId connectionId) {
198     std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
199     ALOGV("connection close %lld: %u", (long long)connectionId, mBufferPool.mInvalidation.mId);
200     mBufferPool.processStatusMessages();
201     mBufferPool.handleClose(connectionId);
202     mBufferPool.mObserver.close(connectionId);
203     mBufferPool.mInvalidation.onClose(connectionId);
204     // Since close# will be called after all works are finished, it is OK to
205     // evict unused buffers.
206     mBufferPool.cleanUp(true);
207     scheduleEvictIfNeeded();
208     return ResultStatus::OK;
209 }
210 
allocate(ConnectionId connectionId,const std::vector<uint8_t> & params,BufferId * bufferId,const native_handle_t ** handle)211 ResultStatus Accessor::Impl::allocate(
212         ConnectionId connectionId, const std::vector<uint8_t>& params,
213         BufferId *bufferId, const native_handle_t** handle) {
214     std::unique_lock<std::mutex> lock(mBufferPool.mMutex);
215     mBufferPool.processStatusMessages();
216     ResultStatus status = ResultStatus::OK;
217     if (!mBufferPool.getFreeBuffer(mAllocator, params, bufferId, handle)) {
218         lock.unlock();
219         std::shared_ptr<BufferPoolAllocation> alloc;
220         size_t allocSize;
221         status = mAllocator->allocate(params, &alloc, &allocSize);
222         lock.lock();
223         if (status == ResultStatus::OK) {
224             status = mBufferPool.addNewBuffer(alloc, allocSize, params, bufferId, handle);
225         }
226         ALOGV("create a buffer %d : %u %p",
227               status == ResultStatus::OK, *bufferId, *handle);
228     }
229     if (status == ResultStatus::OK) {
230         // TODO: handle ownBuffer failure
231         mBufferPool.handleOwnBuffer(connectionId, *bufferId);
232     }
233     mBufferPool.cleanUp();
234     scheduleEvictIfNeeded();
235     return status;
236 }
237 
fetch(ConnectionId connectionId,TransactionId transactionId,BufferId bufferId,const native_handle_t ** handle)238 ResultStatus Accessor::Impl::fetch(
239         ConnectionId connectionId, TransactionId transactionId,
240         BufferId bufferId, const native_handle_t** handle) {
241     std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
242     mBufferPool.processStatusMessages();
243     auto found = mBufferPool.mTransactions.find(transactionId);
244     if (found != mBufferPool.mTransactions.end() &&
245             contains(&mBufferPool.mPendingTransactions,
246                      connectionId, transactionId)) {
247         if (found->second->mSenderValidated &&
248                 found->second->mStatus == BufferStatus::TRANSFER_FROM &&
249                 found->second->mBufferId == bufferId) {
250             found->second->mStatus = BufferStatus::TRANSFER_FETCH;
251             auto bufferIt = mBufferPool.mBuffers.find(bufferId);
252             if (bufferIt != mBufferPool.mBuffers.end()) {
253                 mBufferPool.mStats.onBufferFetched();
254                 *handle = bufferIt->second->handle();
255                 return ResultStatus::OK;
256             }
257         }
258     }
259     mBufferPool.cleanUp();
260     scheduleEvictIfNeeded();
261     return ResultStatus::CRITICAL_ERROR;
262 }
263 
cleanUp(bool clearCache)264 void Accessor::Impl::cleanUp(bool clearCache) {
265     // transaction timeout, buffer cacheing TTL handling
266     std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
267     mBufferPool.processStatusMessages();
268     mBufferPool.cleanUp(clearCache);
269 }
270 
flush()271 void Accessor::Impl::flush() {
272     std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
273     mBufferPool.processStatusMessages();
274     mBufferPool.flush(shared_from_this());
275 }
276 
handleInvalidateAck()277 void Accessor::Impl::handleInvalidateAck() {
278     std::map<ConnectionId, const sp<IObserver>> observers;
279     uint32_t invalidationId;
280     {
281         std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
282         mBufferPool.processStatusMessages();
283         mBufferPool.mInvalidation.onHandleAck(&observers, &invalidationId);
284     }
285     // Do not hold lock for send invalidations
286     size_t deadClients = 0;
287     for (auto it = observers.begin(); it != observers.end(); ++it) {
288         const sp<IObserver> observer = it->second;
289         if (observer) {
290             Return<void> transResult = observer->onMessage(it->first, invalidationId);
291             if (!transResult.isOk()) {
292                 ++deadClients;
293             }
294         }
295     }
296     if (deadClients > 0) {
297         ALOGD("During invalidation found %zu dead clients", deadClients);
298     }
299 }
300 
isValid()301 bool Accessor::Impl::isValid() {
302     return mBufferPool.isValid();
303 }
304 
BufferPool()305 Accessor::Impl::Impl::BufferPool::BufferPool()
306     : mTimestampUs(getTimestampNow()),
307       mLastCleanUpUs(mTimestampUs),
308       mLastLogUs(mTimestampUs),
309       mSeq(0),
310       mStartSeq(0) {
311     mValid = mInvalidationChannel.isValid();
312 }
313 
314 
// Statistics helper: `base` as a percentage of `total`, with 0.5 added before
// the conversion to int. Yields 0 when `total` is zero.
template<typename T, typename S>
int percentage(T base, S total) {
    if (!total) {
        return 0;
    }
    const auto scaled = 100. * static_cast<S>(base) / total;
    return int(0.5 + scaled);
}
320 
321 std::atomic<std::uint32_t> Accessor::Impl::BufferPool::Invalidation::sInvSeqId(0);
322 
~BufferPool()323 Accessor::Impl::Impl::BufferPool::~BufferPool() {
324     std::lock_guard<std::mutex> lock(mMutex);
325     ALOGD("Destruction - bufferpool2 %p "
326           "cached: %zu/%zuM, %zu/%d%% in use; "
327           "allocs: %zu, %d%% recycled; "
328           "transfers: %zu, %d%% unfetched",
329           this, mStats.mBuffersCached, mStats.mSizeCached >> 20,
330           mStats.mBuffersInUse, percentage(mStats.mBuffersInUse, mStats.mBuffersCached),
331           mStats.mTotalAllocations, percentage(mStats.mTotalRecycles, mStats.mTotalAllocations),
332           mStats.mTotalTransfers,
333           percentage(mStats.mTotalTransfers - mStats.mTotalFetches, mStats.mTotalTransfers));
334 }
335 
onConnect(ConnectionId conId,const sp<IObserver> & observer)336 void Accessor::Impl::BufferPool::Invalidation::onConnect(
337         ConnectionId conId, const sp<IObserver>& observer) {
338     mAcks[conId] = mInvalidationId; // starts from current invalidationId
339     mObservers.insert(std::make_pair(conId, observer));
340 }
341 
onClose(ConnectionId conId)342 void Accessor::Impl::BufferPool::Invalidation::onClose(ConnectionId conId) {
343     mAcks.erase(conId);
344     mObservers.erase(conId);
345 }
346 
onAck(ConnectionId conId,uint32_t msgId)347 void Accessor::Impl::BufferPool::Invalidation::onAck(
348         ConnectionId conId,
349         uint32_t msgId) {
350     auto it = mAcks.find(conId);
351     if (it == mAcks.end()) {
352         ALOGW("ACK from inconsistent connection! %lld", (long long)conId);
353         return;
354     }
355     if (isMessageLater(msgId, it->second)) {
356         mAcks[conId] = msgId;
357     }
358 }
359 
onBufferInvalidated(BufferId bufferId,BufferInvalidationChannel & channel)360 void Accessor::Impl::BufferPool::Invalidation::onBufferInvalidated(
361         BufferId bufferId,
362         BufferInvalidationChannel &channel) {
363     for (auto it = mPendings.begin(); it != mPendings.end();) {
364         if (it->isInvalidated(bufferId)) {
365             uint32_t msgId = 0;
366             if (it->mNeedsAck) {
367                 msgId = ++mInvalidationId;
368                 if (msgId == 0) {
369                     // wrap happens
370                     msgId = ++mInvalidationId;
371                 }
372             }
373             channel.postInvalidation(msgId, it->mFrom, it->mTo);
374             it = mPendings.erase(it);
375             continue;
376         }
377         ++it;
378     }
379 }
380 
onInvalidationRequest(bool needsAck,uint32_t from,uint32_t to,size_t left,BufferInvalidationChannel & channel,const std::shared_ptr<Accessor::Impl> & impl)381 void Accessor::Impl::BufferPool::Invalidation::onInvalidationRequest(
382         bool needsAck,
383         uint32_t from,
384         uint32_t to,
385         size_t left,
386         BufferInvalidationChannel &channel,
387         const std::shared_ptr<Accessor::Impl> &impl) {
388         uint32_t msgId = 0;
389     if (needsAck) {
390         msgId = ++mInvalidationId;
391         if (msgId == 0) {
392             // wrap happens
393             msgId = ++mInvalidationId;
394         }
395     }
396     ALOGV("bufferpool2 invalidation requested and queued");
397     if (left == 0) {
398         channel.postInvalidation(msgId, from, to);
399     } else {
400         // TODO: sending hint message?
401         ALOGV("bufferpoo2 invalidation requested and pending");
402         Pending pending(needsAck, from, to, left, impl);
403         mPendings.push_back(pending);
404     }
405     sInvalidator->addAccessor(mId, impl);
406 }
407 
onHandleAck(std::map<ConnectionId,const sp<IObserver>> * observers,uint32_t * invalidationId)408 void Accessor::Impl::BufferPool::Invalidation::onHandleAck(
409         std::map<ConnectionId, const sp<IObserver>> *observers,
410         uint32_t *invalidationId) {
411     if (mInvalidationId != 0) {
412         *invalidationId = mInvalidationId;
413         std::set<int> deads;
414         for (auto it = mAcks.begin(); it != mAcks.end(); ++it) {
415             if (it->second != mInvalidationId) {
416                 const sp<IObserver> observer = mObservers[it->first];
417                 if (observer) {
418                     observers->emplace(it->first, observer);
419                     ALOGV("connection %lld will call observer (%u: %u)",
420                           (long long)it->first, it->second, mInvalidationId);
421                     // N.B: onMessage will be called later. ignore possibility of
422                     // onMessage# oneway call being lost.
423                     it->second = mInvalidationId;
424                 } else {
425                     ALOGV("bufferpool2 observer died %lld", (long long)it->first);
426                     deads.insert(it->first);
427                 }
428             }
429         }
430         if (deads.size() > 0) {
431             for (auto it = deads.begin(); it != deads.end(); ++it) {
432                 onClose(*it);
433             }
434         }
435     }
436     if (mPendings.size() == 0) {
437         // All invalidation Ids are synced and no more pending invalidations.
438         sInvalidator->delAccessor(mId);
439     }
440 }
441 
handleOwnBuffer(ConnectionId connectionId,BufferId bufferId)442 bool Accessor::Impl::BufferPool::handleOwnBuffer(
443         ConnectionId connectionId, BufferId bufferId) {
444 
445     bool added = insert(&mUsingBuffers, connectionId, bufferId);
446     if (added) {
447         auto iter = mBuffers.find(bufferId);
448         iter->second->mOwnerCount++;
449     }
450     insert(&mUsingConnections, bufferId, connectionId);
451     return added;
452 }
453 
handleReleaseBuffer(ConnectionId connectionId,BufferId bufferId)454 bool Accessor::Impl::BufferPool::handleReleaseBuffer(
455         ConnectionId connectionId, BufferId bufferId) {
456     bool deleted = erase(&mUsingBuffers, connectionId, bufferId);
457     if (deleted) {
458         auto iter = mBuffers.find(bufferId);
459         iter->second->mOwnerCount--;
460         if (iter->second->mOwnerCount == 0 &&
461                 iter->second->mTransactionCount == 0) {
462             if (!iter->second->mInvalidated) {
463                 mStats.onBufferUnused(iter->second->mAllocSize);
464                 mFreeBuffers.insert(bufferId);
465             } else {
466                 mStats.onBufferUnused(iter->second->mAllocSize);
467                 mStats.onBufferEvicted(iter->second->mAllocSize);
468                 mBuffers.erase(iter);
469                 mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
470             }
471         }
472     }
473     erase(&mUsingConnections, bufferId, connectionId);
474     ALOGV("release buffer %u : %d", bufferId, deleted);
475     return deleted;
476 }
477 
handleTransferTo(const BufferStatusMessage & message)478 bool Accessor::Impl::BufferPool::handleTransferTo(const BufferStatusMessage &message) {
479     auto completed = mCompletedTransactions.find(
480             message.transactionId);
481     if (completed != mCompletedTransactions.end()) {
482         // already completed
483         mCompletedTransactions.erase(completed);
484         return true;
485     }
486     // the buffer should exist and be owned.
487     auto bufferIter = mBuffers.find(message.bufferId);
488     if (bufferIter == mBuffers.end() ||
489             !contains(&mUsingBuffers, message.connectionId, message.bufferId)) {
490         return false;
491     }
492     auto found = mTransactions.find(message.transactionId);
493     if (found != mTransactions.end()) {
494         // transfer_from was received earlier.
495         found->second->mSender = message.connectionId;
496         found->second->mSenderValidated = true;
497         return true;
498     }
499     if (mConnectionIds.find(message.targetConnectionId) == mConnectionIds.end()) {
500         // N.B: it could be fake or receive connection already closed.
501         ALOGD("bufferpool2 %p receiver connection %lld is no longer valid",
502               this, (long long)message.targetConnectionId);
503         return false;
504     }
505     mStats.onBufferSent();
506     mTransactions.insert(std::make_pair(
507             message.transactionId,
508             std::make_unique<TransactionStatus>(message, mTimestampUs)));
509     insert(&mPendingTransactions, message.targetConnectionId,
510            message.transactionId);
511     bufferIter->second->mTransactionCount++;
512     return true;
513 }
514 
handleTransferFrom(const BufferStatusMessage & message)515 bool Accessor::Impl::BufferPool::handleTransferFrom(const BufferStatusMessage &message) {
516     auto found = mTransactions.find(message.transactionId);
517     if (found == mTransactions.end()) {
518         // TODO: is it feasible to check ownership here?
519         mStats.onBufferSent();
520         mTransactions.insert(std::make_pair(
521                 message.transactionId,
522                 std::make_unique<TransactionStatus>(message, mTimestampUs)));
523         insert(&mPendingTransactions, message.connectionId,
524                message.transactionId);
525         auto bufferIter = mBuffers.find(message.bufferId);
526         bufferIter->second->mTransactionCount++;
527     } else {
528         if (message.connectionId == found->second->mReceiver) {
529             found->second->mStatus = BufferStatus::TRANSFER_FROM;
530         }
531     }
532     return true;
533 }
534 
handleTransferResult(const BufferStatusMessage & message)535 bool Accessor::Impl::BufferPool::handleTransferResult(const BufferStatusMessage &message) {
536     auto found = mTransactions.find(message.transactionId);
537     if (found != mTransactions.end()) {
538         bool deleted = erase(&mPendingTransactions, message.connectionId,
539                              message.transactionId);
540         if (deleted) {
541             if (!found->second->mSenderValidated) {
542                 mCompletedTransactions.insert(message.transactionId);
543             }
544             auto bufferIter = mBuffers.find(message.bufferId);
545             if (message.newStatus == BufferStatus::TRANSFER_OK) {
546                 handleOwnBuffer(message.connectionId, message.bufferId);
547             }
548             bufferIter->second->mTransactionCount--;
549             if (bufferIter->second->mOwnerCount == 0
550                 && bufferIter->second->mTransactionCount == 0) {
551                 if (!bufferIter->second->mInvalidated) {
552                     mStats.onBufferUnused(bufferIter->second->mAllocSize);
553                     mFreeBuffers.insert(message.bufferId);
554                 } else {
555                     mStats.onBufferUnused(bufferIter->second->mAllocSize);
556                     mStats.onBufferEvicted(bufferIter->second->mAllocSize);
557                     mBuffers.erase(bufferIter);
558                     mInvalidation.onBufferInvalidated(message.bufferId, mInvalidationChannel);
559                 }
560             }
561             mTransactions.erase(found);
562         }
563         ALOGV("transfer finished %llu %u - %d", (unsigned long long)message.transactionId,
564               message.bufferId, deleted);
565         return deleted;
566     }
567     ALOGV("transfer not found %llu %u", (unsigned long long)message.transactionId,
568           message.bufferId);
569     return false;
570 }
571 
processStatusMessages()572 void Accessor::Impl::BufferPool::processStatusMessages() {
573     std::vector<BufferStatusMessage> messages;
574     mObserver.getBufferStatusChanges(messages);
575     mTimestampUs = getTimestampNow();
576     for (BufferStatusMessage& message: messages) {
577         bool ret = false;
578         switch (message.newStatus) {
579             case BufferStatus::NOT_USED:
580                 ret = handleReleaseBuffer(
581                         message.connectionId, message.bufferId);
582                 break;
583             case BufferStatus::USED:
584                 // not happening
585                 break;
586             case BufferStatus::TRANSFER_TO:
587                 ret = handleTransferTo(message);
588                 break;
589             case BufferStatus::TRANSFER_FROM:
590                 ret = handleTransferFrom(message);
591                 break;
592             case BufferStatus::TRANSFER_TIMEOUT:
593                 // TODO
594                 break;
595             case BufferStatus::TRANSFER_LOST:
596                 // TODO
597                 break;
598             case BufferStatus::TRANSFER_FETCH:
599                 // not happening
600                 break;
601             case BufferStatus::TRANSFER_OK:
602             case BufferStatus::TRANSFER_ERROR:
603                 ret = handleTransferResult(message);
604                 break;
605             case BufferStatus::INVALIDATION_ACK:
606                 mInvalidation.onAck(message.connectionId, message.bufferId);
607                 ret = true;
608                 break;
609         }
610         if (ret == false) {
611             ALOGW("buffer status message processing failure - message : %d connection : %lld",
612                   message.newStatus, (long long)message.connectionId);
613         }
614     }
615     messages.clear();
616 }
617 
handleClose(ConnectionId connectionId)618 bool Accessor::Impl::BufferPool::handleClose(ConnectionId connectionId) {
619     // Cleaning buffers
620     auto buffers = mUsingBuffers.find(connectionId);
621     if (buffers != mUsingBuffers.end()) {
622         for (const BufferId& bufferId : buffers->second) {
623             bool deleted = erase(&mUsingConnections, bufferId, connectionId);
624             if (deleted) {
625                 auto bufferIter = mBuffers.find(bufferId);
626                 bufferIter->second->mOwnerCount--;
627                 if (bufferIter->second->mOwnerCount == 0 &&
628                         bufferIter->second->mTransactionCount == 0) {
629                     // TODO: handle freebuffer insert fail
630                     if (!bufferIter->second->mInvalidated) {
631                         mStats.onBufferUnused(bufferIter->second->mAllocSize);
632                         mFreeBuffers.insert(bufferId);
633                     } else {
634                         mStats.onBufferUnused(bufferIter->second->mAllocSize);
635                         mStats.onBufferEvicted(bufferIter->second->mAllocSize);
636                         mBuffers.erase(bufferIter);
637                         mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
638                     }
639                 }
640             }
641         }
642         mUsingBuffers.erase(buffers);
643     }
644 
645     // Cleaning transactions
646     auto pending = mPendingTransactions.find(connectionId);
647     if (pending != mPendingTransactions.end()) {
648         for (const TransactionId& transactionId : pending->second) {
649             auto iter = mTransactions.find(transactionId);
650             if (iter != mTransactions.end()) {
651                 if (!iter->second->mSenderValidated) {
652                     mCompletedTransactions.insert(transactionId);
653                 }
654                 BufferId bufferId = iter->second->mBufferId;
655                 auto bufferIter = mBuffers.find(bufferId);
656                 bufferIter->second->mTransactionCount--;
657                 if (bufferIter->second->mOwnerCount == 0 &&
658                     bufferIter->second->mTransactionCount == 0) {
659                     // TODO: handle freebuffer insert fail
660                     if (!bufferIter->second->mInvalidated) {
661                         mStats.onBufferUnused(bufferIter->second->mAllocSize);
662                         mFreeBuffers.insert(bufferId);
663                     } else {
664                         mStats.onBufferUnused(bufferIter->second->mAllocSize);
665                         mStats.onBufferEvicted(bufferIter->second->mAllocSize);
666                         mBuffers.erase(bufferIter);
667                         mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
668                     }
669                 }
670                 mTransactions.erase(iter);
671             }
672         }
673     }
674     mConnectionIds.erase(connectionId);
675     return true;
676 }
677 
getFreeBuffer(const std::shared_ptr<BufferPoolAllocator> & allocator,const std::vector<uint8_t> & params,BufferId * pId,const native_handle_t ** handle)678 bool Accessor::Impl::BufferPool::getFreeBuffer(
679         const std::shared_ptr<BufferPoolAllocator> &allocator,
680         const std::vector<uint8_t> &params, BufferId *pId,
681         const native_handle_t** handle) {
682     auto bufferIt = mFreeBuffers.begin();
683     for (;bufferIt != mFreeBuffers.end(); ++bufferIt) {
684         BufferId bufferId = *bufferIt;
685         if (allocator->compatible(params, mBuffers[bufferId]->mConfig)) {
686             break;
687         }
688     }
689     if (bufferIt != mFreeBuffers.end()) {
690         BufferId id = *bufferIt;
691         mFreeBuffers.erase(bufferIt);
692         mStats.onBufferRecycled(mBuffers[id]->mAllocSize);
693         *handle = mBuffers[id]->handle();
694         *pId = id;
695         ALOGV("recycle a buffer %u %p", id, *handle);
696         return true;
697     }
698     return false;
699 }
700 
addNewBuffer(const std::shared_ptr<BufferPoolAllocation> & alloc,const size_t allocSize,const std::vector<uint8_t> & params,BufferId * pId,const native_handle_t ** handle)701 ResultStatus Accessor::Impl::BufferPool::addNewBuffer(
702         const std::shared_ptr<BufferPoolAllocation> &alloc,
703         const size_t allocSize,
704         const std::vector<uint8_t> &params,
705         BufferId *pId,
706         const native_handle_t** handle) {
707 
708     BufferId bufferId = mSeq++;
709     if (mSeq == Connection::SYNC_BUFFERID) {
710         mSeq = 0;
711     }
712     std::unique_ptr<InternalBuffer> buffer =
713             std::make_unique<InternalBuffer>(
714                     bufferId, alloc, allocSize, params);
715     if (buffer) {
716         auto res = mBuffers.insert(std::make_pair(
717                 bufferId, std::move(buffer)));
718         if (res.second) {
719             mStats.onBufferAllocated(allocSize);
720             *handle = alloc->handle();
721             *pId = bufferId;
722             return ResultStatus::OK;
723         }
724     }
725     return ResultStatus::NO_MEMORY;
726 }
727 
cleanUp(bool clearCache)728 void Accessor::Impl::BufferPool::cleanUp(bool clearCache) {
729     if (clearCache || mTimestampUs > mLastCleanUpUs + kCleanUpDurationUs ||
730             mStats.buffersNotInUse() > kMaxUnusedBufferCount) {
731         mLastCleanUpUs = mTimestampUs;
732         if (mTimestampUs > mLastLogUs + kLogDurationUs ||
733                 mStats.buffersNotInUse() > kMaxUnusedBufferCount) {
734             mLastLogUs = mTimestampUs;
735             ALOGD("bufferpool2 %p : %zu(%zu size) total buffers - "
736                   "%zu(%zu size) used buffers - %zu/%zu (recycle/alloc) - "
737                   "%zu/%zu (fetch/transfer)",
738                   this, mStats.mBuffersCached, mStats.mSizeCached,
739                   mStats.mBuffersInUse, mStats.mSizeInUse,
740                   mStats.mTotalRecycles, mStats.mTotalAllocations,
741                   mStats.mTotalFetches, mStats.mTotalTransfers);
742         }
743         for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) {
744             if (!clearCache && mStats.buffersNotInUse() <= kUnusedBufferCountTarget &&
745                     (mStats.mSizeCached < kMinAllocBytesForEviction ||
746                      mBuffers.size() < kMinBufferCountForEviction)) {
747                 break;
748             }
749             auto it = mBuffers.find(*freeIt);
750             if (it != mBuffers.end() &&
751                     it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) {
752                 mStats.onBufferEvicted(it->second->mAllocSize);
753                 mBuffers.erase(it);
754                 freeIt = mFreeBuffers.erase(freeIt);
755             } else {
756                 ++freeIt;
757                 ALOGW("bufferpool2 inconsistent!");
758             }
759         }
760     }
761 }
762 
invalidate(bool needsAck,BufferId from,BufferId to,const std::shared_ptr<Accessor::Impl> & impl)763 void Accessor::Impl::BufferPool::invalidate(
764         bool needsAck, BufferId from, BufferId to,
765         const std::shared_ptr<Accessor::Impl> &impl) {
766     for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) {
767         if (isBufferInRange(from, to, *freeIt)) {
768             auto it = mBuffers.find(*freeIt);
769             if (it != mBuffers.end() &&
770                 it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) {
771                 mStats.onBufferEvicted(it->second->mAllocSize);
772                 mBuffers.erase(it);
773                 freeIt = mFreeBuffers.erase(freeIt);
774                 continue;
775             } else {
776                 ALOGW("bufferpool2 inconsistent!");
777             }
778         }
779         ++freeIt;
780     }
781 
782     size_t left = 0;
783     for (auto it = mBuffers.begin(); it != mBuffers.end(); ++it) {
784         if (isBufferInRange(from, to, it->first)) {
785             it->second->invalidate();
786             ++left;
787         }
788     }
789     mInvalidation.onInvalidationRequest(needsAck, from, to, left, mInvalidationChannel, impl);
790 }
791 
flush(const std::shared_ptr<Accessor::Impl> & impl)792 void Accessor::Impl::BufferPool::flush(const std::shared_ptr<Accessor::Impl> &impl) {
793     BufferId from = mStartSeq;
794     BufferId to = mSeq;
795     mStartSeq = mSeq;
796     // TODO: needsAck params
797     ALOGV("buffer invalidation request bp:%u %u %u", mInvalidation.mId, from, to);
798     if (from != to) {
799         invalidate(true, from, to, impl);
800     }
801 }
802 
invalidatorThread(std::map<uint32_t,const std::weak_ptr<Accessor::Impl>> & accessors,std::mutex & mutex,std::condition_variable & cv,bool & ready)803 void Accessor::Impl::invalidatorThread(
804             std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> &accessors,
805             std::mutex &mutex,
806             std::condition_variable &cv,
807             bool &ready) {
808     constexpr uint32_t NUM_SPIN_TO_INCREASE_SLEEP = 1024;
809     constexpr uint32_t NUM_SPIN_TO_LOG = 1024*8;
810     constexpr useconds_t MAX_SLEEP_US = 10000;
811     uint32_t numSpin = 0;
812     useconds_t sleepUs = 1;
813 
814     while(true) {
815         std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> copied;
816         {
817             std::unique_lock<std::mutex> lock(mutex);
818             if (!ready) {
819                 numSpin = 0;
820                 sleepUs = 1;
821                 cv.wait(lock);
822             }
823             copied.insert(accessors.begin(), accessors.end());
824         }
825         std::list<ConnectionId> erased;
826         for (auto it = copied.begin(); it != copied.end(); ++it) {
827             const std::shared_ptr<Accessor::Impl> impl = it->second.lock();
828             if (!impl) {
829                 erased.push_back(it->first);
830             } else {
831                 impl->handleInvalidateAck();
832             }
833         }
834         {
835             std::unique_lock<std::mutex> lock(mutex);
836             for (auto it = erased.begin(); it != erased.end(); ++it) {
837                 accessors.erase(*it);
838             }
839             if (accessors.size() == 0) {
840                 ready = false;
841             } else {
842                 // TODO Use an efficient way to wait over FMQ.
843                 // N.B. Since there is not a efficient way to wait over FMQ,
844                 // polling over the FMQ is the current way to prevent draining
845                 // CPU.
846                 lock.unlock();
847                 ++numSpin;
848                 if (numSpin % NUM_SPIN_TO_INCREASE_SLEEP == 0 &&
849                     sleepUs < MAX_SLEEP_US) {
850                     sleepUs *= 10;
851                 }
852                 if (numSpin % NUM_SPIN_TO_LOG == 0) {
853                     ALOGW("invalidator thread spinning");
854                 }
855                 ::usleep(sleepUs);
856             }
857         }
858     }
859 }
860 
AccessorInvalidator()861 Accessor::Impl::AccessorInvalidator::AccessorInvalidator() : mReady(false) {
862     std::thread invalidator(
863             invalidatorThread,
864             std::ref(mAccessors),
865             std::ref(mMutex),
866             std::ref(mCv),
867             std::ref(mReady));
868     invalidator.detach();
869 }
870 
addAccessor(uint32_t accessorId,const std::weak_ptr<Accessor::Impl> & impl)871 void Accessor::Impl::AccessorInvalidator::addAccessor(
872         uint32_t accessorId, const std::weak_ptr<Accessor::Impl> &impl) {
873     bool notify = false;
874     std::unique_lock<std::mutex> lock(mMutex);
875     if (mAccessors.find(accessorId) == mAccessors.end()) {
876         if (!mReady) {
877             mReady = true;
878             notify = true;
879         }
880         mAccessors.insert(std::make_pair(accessorId, impl));
881         ALOGV("buffer invalidation added bp:%u %d", accessorId, notify);
882     }
883     lock.unlock();
884     if (notify) {
885         mCv.notify_one();
886     }
887 }
888 
delAccessor(uint32_t accessorId)889 void Accessor::Impl::AccessorInvalidator::delAccessor(uint32_t accessorId) {
890     std::lock_guard<std::mutex> lock(mMutex);
891     mAccessors.erase(accessorId);
892     ALOGV("buffer invalidation deleted bp:%u", accessorId);
893     if (mAccessors.size() == 0) {
894         mReady = false;
895     }
896 }
897 
898 std::unique_ptr<Accessor::Impl::AccessorInvalidator> Accessor::Impl::sInvalidator;
899 
createInvalidator()900 void Accessor::Impl::createInvalidator() {
901     if (!sInvalidator) {
902         sInvalidator = std::make_unique<Accessor::Impl::AccessorInvalidator>();
903     }
904 }
905 
evictorThread(std::map<const std::weak_ptr<Accessor::Impl>,nsecs_t,std::owner_less<>> & accessors,std::mutex & mutex,std::condition_variable & cv)906 void Accessor::Impl::evictorThread(
907         std::map<const std::weak_ptr<Accessor::Impl>, nsecs_t, std::owner_less<>> &accessors,
908         std::mutex &mutex,
909         std::condition_variable &cv) {
910     std::list<const std::weak_ptr<Accessor::Impl>> evictList;
911     while (true) {
912         int expired = 0;
913         int evicted = 0;
914         {
915             nsecs_t now = systemTime();
916             std::unique_lock<std::mutex> lock(mutex);
917             if (accessors.size() == 0) {
918                 cv.wait(lock);
919             }
920             auto it = accessors.begin();
921             while (it != accessors.end()) {
922                 if (now > (it->second + kEvictDurationNs)) {
923                     ++expired;
924                     evictList.push_back(it->first);
925                     it = accessors.erase(it);
926                 } else {
927                     ++it;
928                 }
929             }
930         }
931         // evict idle accessors;
932         for (auto it = evictList.begin(); it != evictList.end(); ++it) {
933             const std::shared_ptr<Accessor::Impl> accessor = it->lock();
934             if (accessor) {
935                 accessor->cleanUp(true);
936                 ++evicted;
937             }
938         }
939         if (expired > 0) {
940             ALOGD("evictor expired: %d, evicted: %d", expired, evicted);
941         }
942         evictList.clear();
943         ::usleep(kEvictGranularityNs / 1000);
944     }
945 }
946 
AccessorEvictor()947 Accessor::Impl::AccessorEvictor::AccessorEvictor() {
948     std::thread evictor(
949             evictorThread,
950             std::ref(mAccessors),
951             std::ref(mMutex),
952             std::ref(mCv));
953     evictor.detach();
954 }
955 
addAccessor(const std::weak_ptr<Accessor::Impl> & impl,nsecs_t ts)956 void Accessor::Impl::AccessorEvictor::addAccessor(
957         const std::weak_ptr<Accessor::Impl> &impl, nsecs_t ts) {
958     std::lock_guard<std::mutex> lock(mMutex);
959     bool notify = mAccessors.empty();
960     auto it = mAccessors.find(impl);
961     if (it == mAccessors.end()) {
962         mAccessors.emplace(impl, ts);
963     } else {
964         it->second = ts;
965     }
966     if (notify) {
967         mCv.notify_one();
968     }
969 }
970 
971 std::unique_ptr<Accessor::Impl::AccessorEvictor> Accessor::Impl::sEvictor;
972 
createEvictor()973 void Accessor::Impl::createEvictor() {
974     if (!sEvictor) {
975         sEvictor = std::make_unique<Accessor::Impl::AccessorEvictor>();
976     }
977 }
978 
scheduleEvictIfNeeded()979 void Accessor::Impl::scheduleEvictIfNeeded() {
980     nsecs_t now = systemTime();
981 
982     if (now > (mScheduleEvictTs + kEvictGranularityNs)) {
983         mScheduleEvictTs = now;
984         sEvictor->addAccessor(shared_from_this(), now);
985     }
986 }
987 
988 }  // namespace implementation
989 }  // namespace V2_0
990 }  // namespace bufferpool
991 }  // namespace media
992 }  // namespace hardware
993 }  // namespace android
994