/*
 * Copyright (C) 2018 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#define LOG_TAG "BufferPoolAccessor2.0"
//#define LOG_NDEBUG 0

#include <sys/types.h>
#include <stdint.h>
#include <time.h>
#include <unistd.h>
#include <utils/Log.h>
#include <thread>
#include "AccessorImpl.h"
#include "Connection.h"

namespace android {
namespace hardware {
namespace media {
namespace bufferpool {
namespace V2_0 {
namespace implementation {

namespace {
    static constexpr int64_t kCleanUpDurationUs = 500000; // TODO tune 0.5 sec
    static constexpr int64_t kLogDurationUs = 5000000; // 5 secs

    static constexpr size_t kMinAllocBytesForEviction = 1024*1024*15;
    static constexpr size_t kMinBufferCountForEviction = 25;
    static constexpr size_t kMaxUnusedBufferCount = 64;
    static constexpr size_t kUnusedBufferCountTarget = kMaxUnusedBufferCount - 16;

    static constexpr nsecs_t kEvictGranularityNs = 1000000000; // 1 sec
    static constexpr nsecs_t kEvictDurationNs = 5000000000; // 5 secs
}

// Buffer structure in bufferpool process
struct InternalBuffer {
    BufferId mId;
    size_t mOwnerCount;
    size_t mTransactionCount;
    const std::shared_ptr<BufferPoolAllocation> mAllocation;
    const size_t mAllocSize;
    const std::vector<uint8_t> mConfig;
    bool mInvalidated;

    InternalBuffer(
            BufferId id,
            const std::shared_ptr<BufferPoolAllocation> &alloc,
            const size_t allocSize,
            const std::vector<uint8_t> &allocConfig)
            : mId(id), mOwnerCount(0), mTransactionCount(0),
              mAllocation(alloc), mAllocSize(allocSize), mConfig(allocConfig),
              mInvalidated(false) {}

    const native_handle_t *handle() {
        return mAllocation->handle();
    }

    void invalidate() {
        mInvalidated = true;
    }
};

struct TransactionStatus {
    TransactionId mId;
    BufferId mBufferId;
    ConnectionId mSender;
    ConnectionId mReceiver;
    BufferStatus mStatus;
    int64_t mTimestampUs;
    bool mSenderValidated;

    TransactionStatus(const BufferStatusMessage &message, int64_t timestampUs) {
        mId = message.transactionId;
        mBufferId = message.bufferId;
        mStatus = message.newStatus;
        mTimestampUs = timestampUs;
        if (mStatus == BufferStatus::TRANSFER_TO) {
            mSender = message.connectionId;
            mReceiver = message.targetConnectionId;
            mSenderValidated = true;
        } else {
            mSender = -1LL;
            mReceiver = message.connectionId;
            mSenderValidated = false;
        }
    }
};

// Helper template methods for handling map of set.
template<class T, class U>
bool insert(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    auto iter = mapOfSet->find(key);
    if (iter == mapOfSet->end()) {
        std::set<U> valueSet{value};
        mapOfSet->insert(std::make_pair(key, valueSet));
        return true;
    } else if (iter->second.find(value) == iter->second.end()) {
        iter->second.insert(value);
        return true;
    }
    return false;
}

template<class T, class U>
bool erase(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    bool ret = false;
    auto iter = mapOfSet->find(key);
    if (iter != mapOfSet->end()) {
        if (iter->second.erase(value) > 0) {
            ret = true;
        }
        if (iter->second.size() == 0) {
            mapOfSet->erase(iter);
        }
    }
    return ret;
}

template<class T, class U>
bool contains(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    auto iter = mapOfSet->find(key);
    if (iter != mapOfSet->end()) {
        auto setIter = iter->second.find(value);
        return setIter != iter->second.end();
    }
    return false;
}

#ifdef __ANDROID_VNDK__
static constexpr uint32_t kSeqIdVndkBit = 1U << 31;
#else
static constexpr uint32_t kSeqIdVndkBit = 0;
#endif

static constexpr uint32_t kSeqIdMax = 0x7fffffff;
uint32_t Accessor::Impl::sSeqId = time(nullptr) & kSeqIdMax;

Accessor::Impl::Impl(
        const std::shared_ptr<BufferPoolAllocator> &allocator)
        : mAllocator(allocator), mScheduleEvictTs(0) {}

Accessor::Impl::~Impl() {
}

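// Creates a client connection to this buffer pool. Assigns a process-unique
// ConnectionId, opens a buffer status channel for the connection, and returns
// the descriptors the client needs for status and invalidation messages.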
ResultStatus Accessor::Impl::connect(
        const sp<Accessor> &accessor, const sp<IObserver> &observer,
        sp<Connection> *connection,
        ConnectionId *pConnectionId,
        uint32_t *pMsgId,
        const StatusDescriptor** statusDescPtr,
        const InvalidationDescriptor** invDescPtr) {
    sp<Connection> newConnection = new Connection();
    ResultStatus status = ResultStatus::CRITICAL_ERROR;
    {
        std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
        if (newConnection) {
            int32_t pid = getpid();
            ConnectionId id = (int64_t)pid << 32 | sSeqId | kSeqIdVndkBit;
            status = mBufferPool.mObserver.open(id, statusDescPtr);
            if (status == ResultStatus::OK) {
                newConnection->initialize(accessor, id);
                *connection = newConnection;
                *pConnectionId = id;
                *pMsgId = mBufferPool.mInvalidation.mInvalidationId;
                mBufferPool.mConnectionIds.insert(id);
                mBufferPool.mInvalidationChannel.getDesc(invDescPtr);
                mBufferPool.mInvalidation.onConnect(id, observer);
                if (sSeqId == kSeqIdMax) {
                    sSeqId = 0;
                } else {
                    ++sSeqId;
                }
            }
        }
        mBufferPool.processStatusMessages();
        mBufferPool.cleanUp();
        scheduleEvictIfNeeded();
    }
    return status;
}

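// Tears down a client connection: drains pending status messages, releases the
// buffers and transactions tied to the connection, and evicts unused buffers.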
ResultStatus Accessor::Impl::close(ConnectionId connectionId) {
    std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
    ALOGV("connection close %lld: %u", (long long)connectionId, mBufferPool.mInvalidation.mId);
    mBufferPool.processStatusMessages();
    mBufferPool.handleClose(connectionId);
    mBufferPool.mObserver.close(connectionId);
    mBufferPool.mInvalidation.onClose(connectionId);
    // Since close() is called after all work on this connection has finished,
    // it is OK to evict unused buffers.
    mBufferPool.cleanUp(true);
    scheduleEvictIfNeeded();
    return ResultStatus::OK;
}

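// Allocates a buffer for the connection. A compatible cached free buffer is
// recycled when available; otherwise a new allocation is made with the pool's
// allocator while the pool lock is temporarily released.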
ResultStatus Accessor::Impl::allocate(
        ConnectionId connectionId, const std::vector<uint8_t>& params,
        BufferId *bufferId, const native_handle_t** handle) {
    std::unique_lock<std::mutex> lock(mBufferPool.mMutex);
    mBufferPool.processStatusMessages();
    ResultStatus status = ResultStatus::OK;
    if (!mBufferPool.getFreeBuffer(mAllocator, params, bufferId, handle)) {
        lock.unlock();
        std::shared_ptr<BufferPoolAllocation> alloc;
        size_t allocSize;
        status = mAllocator->allocate(params, &alloc, &allocSize);
        lock.lock();
        if (status == ResultStatus::OK) {
            status = mBufferPool.addNewBuffer(alloc, allocSize, params, bufferId, handle);
        }
        ALOGV("create a buffer %d : %u %p",
              status == ResultStatus::OK, *bufferId, *handle);
    }
    if (status == ResultStatus::OK) {
        // TODO: handle ownBuffer failure
        mBufferPool.handleOwnBuffer(connectionId, *bufferId);
    }
    mBufferPool.cleanUp();
    scheduleEvictIfNeeded();
    return status;
}

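// Resolves a buffer handle for a pending transfer. The transaction must be
// known, validated by the sender, and in TRANSFER_FROM state for the given
// buffer; otherwise CRITICAL_ERROR is returned.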
ResultStatus Accessor::Impl::fetch(
        ConnectionId connectionId, TransactionId transactionId,
        BufferId bufferId, const native_handle_t** handle) {
    std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
    mBufferPool.processStatusMessages();
    auto found = mBufferPool.mTransactions.find(transactionId);
    if (found != mBufferPool.mTransactions.end() &&
            contains(&mBufferPool.mPendingTransactions,
                     connectionId, transactionId)) {
        if (found->second->mSenderValidated &&
                found->second->mStatus == BufferStatus::TRANSFER_FROM &&
                found->second->mBufferId == bufferId) {
            found->second->mStatus = BufferStatus::TRANSFER_FETCH;
            auto bufferIt = mBufferPool.mBuffers.find(bufferId);
            if (bufferIt != mBufferPool.mBuffers.end()) {
                mBufferPool.mStats.onBufferFetched();
                *handle = bufferIt->second->handle();
                return ResultStatus::OK;
            }
        }
    }
    mBufferPool.cleanUp();
    scheduleEvictIfNeeded();
    return ResultStatus::CRITICAL_ERROR;
}

void Accessor::Impl::cleanUp(bool clearCache) {
    // Transaction timeout and buffer caching TTL handling.
    std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
    mBufferPool.processStatusMessages();
    mBufferPool.cleanUp(clearCache);
}

void Accessor::Impl::flush() {
    std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
    mBufferPool.processStatusMessages();
    mBufferPool.flush(shared_from_this());
}

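// Collects connections whose invalidation acks are behind the latest
// invalidation id and notifies their observers outside of the pool lock.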
void Accessor::Impl::handleInvalidateAck() {
    std::map<ConnectionId, const sp<IObserver>> observers;
    uint32_t invalidationId;
    {
        std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
        mBufferPool.processStatusMessages();
        mBufferPool.mInvalidation.onHandleAck(&observers, &invalidationId);
    }
    // Do not hold the lock while sending invalidations to observers.
    size_t deadClients = 0;
    for (auto it = observers.begin(); it != observers.end(); ++it) {
        const sp<IObserver> observer = it->second;
        if (observer) {
            Return<void> transResult = observer->onMessage(it->first, invalidationId);
            if (!transResult.isOk()) {
                ++deadClients;
            }
        }
    }
    if (deadClients > 0) {
        ALOGD("During invalidation found %zu dead clients", deadClients);
    }
}

bool Accessor::Impl::isValid() {
    return mBufferPool.isValid();
}

Accessor::Impl::Impl::BufferPool::BufferPool()
    : mTimestampUs(getTimestampNow()),
      mLastCleanUpUs(mTimestampUs),
      mLastLogUs(mTimestampUs),
      mSeq(0),
      mStartSeq(0) {
    mValid = mInvalidationChannel.isValid();
}

// Statistics helper
template<typename T, typename S>
int percentage(T base, S total) {
    return int(total ? 0.5 + 100. * static_cast<S>(base) / total : 0);
}

std::atomic<std::uint32_t> Accessor::Impl::BufferPool::Invalidation::sInvSeqId(0);

Accessor::Impl::Impl::BufferPool::~BufferPool() {
    std::lock_guard<std::mutex> lock(mMutex);
    ALOGD("Destruction - bufferpool2 %p "
          "cached: %zu/%zuM, %zu/%d%% in use; "
          "allocs: %zu, %d%% recycled; "
          "transfers: %zu, %d%% unfetched",
          this, mStats.mBuffersCached, mStats.mSizeCached >> 20,
          mStats.mBuffersInUse, percentage(mStats.mBuffersInUse, mStats.mBuffersCached),
          mStats.mTotalAllocations, percentage(mStats.mTotalRecycles, mStats.mTotalAllocations),
          mStats.mTotalTransfers,
          percentage(mStats.mTotalTransfers - mStats.mTotalFetches, mStats.mTotalTransfers));
}

void Accessor::Impl::BufferPool::Invalidation::onConnect(
        ConnectionId conId, const sp<IObserver>& observer) {
    mAcks[conId] = mInvalidationId; // starts from current invalidationId
    mObservers.insert(std::make_pair(conId, observer));
}

void Accessor::Impl::BufferPool::Invalidation::onClose(ConnectionId conId) {
    mAcks.erase(conId);
    mObservers.erase(conId);
}

void Accessor::Impl::BufferPool::Invalidation::onAck(
        ConnectionId conId,
        uint32_t msgId) {
    auto it = mAcks.find(conId);
    if (it == mAcks.end()) {
        ALOGW("ACK from inconsistent connection! %lld", (long long)conId);
        return;
    }
    if (isMessageLater(msgId, it->second)) {
        mAcks[conId] = msgId;
    }
}

void Accessor::Impl::BufferPool::Invalidation::onBufferInvalidated(
        BufferId bufferId,
        BufferInvalidationChannel &channel) {
    for (auto it = mPendings.begin(); it != mPendings.end();) {
        if (it->isInvalidated(bufferId)) {
            uint32_t msgId = 0;
            if (it->mNeedsAck) {
                msgId = ++mInvalidationId;
                if (msgId == 0) {
                    // wrap happens
                    msgId = ++mInvalidationId;
                }
            }
            channel.postInvalidation(msgId, it->mFrom, it->mTo);
            it = mPendings.erase(it);
            continue;
        }
        ++it;
    }
}

void Accessor::Impl::BufferPool::Invalidation::onInvalidationRequest(
        bool needsAck,
        uint32_t from,
        uint32_t to,
        size_t left,
        BufferInvalidationChannel &channel,
        const std::shared_ptr<Accessor::Impl> &impl) {
    uint32_t msgId = 0;
    if (needsAck) {
        msgId = ++mInvalidationId;
        if (msgId == 0) {
            // wrap happens
            msgId = ++mInvalidationId;
        }
    }
    ALOGV("bufferpool2 invalidation requested and queued");
    if (left == 0) {
        channel.postInvalidation(msgId, from, to);
    } else {
        // TODO: sending hint message?
        ALOGV("bufferpool2 invalidation requested and pending");
        Pending pending(needsAck, from, to, left, impl);
        mPendings.push_back(pending);
    }
    sInvalidator->addAccessor(mId, impl);
}

void Accessor::Impl::BufferPool::Invalidation::onHandleAck(
        std::map<ConnectionId, const sp<IObserver>> *observers,
        uint32_t *invalidationId) {
    if (mInvalidationId != 0) {
        *invalidationId = mInvalidationId;
        std::set<ConnectionId> deads;
        for (auto it = mAcks.begin(); it != mAcks.end(); ++it) {
            if (it->second != mInvalidationId) {
                const sp<IObserver> observer = mObservers[it->first];
                if (observer) {
                    observers->emplace(it->first, observer);
                    ALOGV("connection %lld will call observer (%u: %u)",
                          (long long)it->first, it->second, mInvalidationId);
                    // N.B.: onMessage will be called later. Ignore the
                    // possibility of the onMessage() one-way call being lost.
                    it->second = mInvalidationId;
                } else {
                    ALOGV("bufferpool2 observer died %lld", (long long)it->first);
                    deads.insert(it->first);
                }
            }
        }
        if (deads.size() > 0) {
            for (auto it = deads.begin(); it != deads.end(); ++it) {
                onClose(*it);
            }
        }
    }
    if (mPendings.size() == 0) {
        // All invalidation ids are synced and there are no more pending
        // invalidations.
        sInvalidator->delAccessor(mId);
    }
}

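// Marks the buffer as owned by the connection and updates the buffer's owner
// count accordingly.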
bool Accessor::Impl::BufferPool::handleOwnBuffer(
        ConnectionId connectionId, BufferId bufferId) {

    bool added = insert(&mUsingBuffers, connectionId, bufferId);
    if (added) {
        auto iter = mBuffers.find(bufferId);
        iter->second->mOwnerCount++;
    }
    insert(&mUsingConnections, bufferId, connectionId);
    return added;
}

bool Accessor::Impl::BufferPool::handleReleaseBuffer(
        ConnectionId connectionId, BufferId bufferId) {
    bool deleted = erase(&mUsingBuffers, connectionId, bufferId);
    if (deleted) {
        auto iter = mBuffers.find(bufferId);
        iter->second->mOwnerCount--;
        if (iter->second->mOwnerCount == 0 &&
                iter->second->mTransactionCount == 0) {
            if (!iter->second->mInvalidated) {
                mStats.onBufferUnused(iter->second->mAllocSize);
                mFreeBuffers.insert(bufferId);
            } else {
                mStats.onBufferUnused(iter->second->mAllocSize);
                mStats.onBufferEvicted(iter->second->mAllocSize);
                mBuffers.erase(iter);
                mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
            }
        }
    }
    erase(&mUsingConnections, bufferId, connectionId);
    ALOGV("release buffer %u : %d", bufferId, deleted);
    return deleted;
}

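// Handles a TRANSFER_TO status message from the sending connection. Creates a
// transaction entry for the receiver unless the matching TRANSFER_FROM was
// received earlier or the transaction has already completed.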
bool Accessor::Impl::BufferPool::handleTransferTo(const BufferStatusMessage &message) {
    auto completed = mCompletedTransactions.find(
            message.transactionId);
    if (completed != mCompletedTransactions.end()) {
        // already completed
        mCompletedTransactions.erase(completed);
        return true;
    }
    // The buffer should exist and be owned.
    auto bufferIter = mBuffers.find(message.bufferId);
    if (bufferIter == mBuffers.end() ||
            !contains(&mUsingBuffers, message.connectionId, message.bufferId)) {
        return false;
    }
    auto found = mTransactions.find(message.transactionId);
    if (found != mTransactions.end()) {
        // transfer_from was received earlier.
        found->second->mSender = message.connectionId;
        found->second->mSenderValidated = true;
        return true;
    }
    if (mConnectionIds.find(message.targetConnectionId) == mConnectionIds.end()) {
        // N.B.: the message could be spurious, or the receiving connection may
        // already have been closed.
        ALOGD("bufferpool2 %p receiver connection %lld is no longer valid",
              this, (long long)message.targetConnectionId);
        return false;
    }
    mStats.onBufferSent();
    mTransactions.insert(std::make_pair(
            message.transactionId,
            std::make_unique<TransactionStatus>(message, mTimestampUs)));
    insert(&mPendingTransactions, message.targetConnectionId,
           message.transactionId);
    bufferIter->second->mTransactionCount++;
    return true;
}

bool Accessor::Impl::BufferPool::handleTransferFrom(const BufferStatusMessage &message) {
    auto found = mTransactions.find(message.transactionId);
    if (found == mTransactions.end()) {
        // TODO: is it feasible to check ownership here?
        mStats.onBufferSent();
        mTransactions.insert(std::make_pair(
                message.transactionId,
                std::make_unique<TransactionStatus>(message, mTimestampUs)));
        insert(&mPendingTransactions, message.connectionId,
               message.transactionId);
        auto bufferIter = mBuffers.find(message.bufferId);
        bufferIter->second->mTransactionCount++;
    } else {
        if (message.connectionId == found->second->mReceiver) {
            found->second->mStatus = BufferStatus::TRANSFER_FROM;
        }
    }
    return true;
}

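// Finalizes a transfer on TRANSFER_OK/TRANSFER_ERROR from the receiver. On
// success ownership moves to the receiver; a buffer left with no owner and no
// pending transaction returns to the free list, or is evicted if invalidated.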
bool Accessor::Impl::BufferPool::handleTransferResult(const BufferStatusMessage &message) {
    auto found = mTransactions.find(message.transactionId);
    if (found != mTransactions.end()) {
        bool deleted = erase(&mPendingTransactions, message.connectionId,
                             message.transactionId);
        if (deleted) {
            if (!found->second->mSenderValidated) {
                mCompletedTransactions.insert(message.transactionId);
            }
            auto bufferIter = mBuffers.find(message.bufferId);
            if (message.newStatus == BufferStatus::TRANSFER_OK) {
                handleOwnBuffer(message.connectionId, message.bufferId);
            }
            bufferIter->second->mTransactionCount--;
            if (bufferIter->second->mOwnerCount == 0
                    && bufferIter->second->mTransactionCount == 0) {
                if (!bufferIter->second->mInvalidated) {
                    mStats.onBufferUnused(bufferIter->second->mAllocSize);
                    mFreeBuffers.insert(message.bufferId);
                } else {
                    mStats.onBufferUnused(bufferIter->second->mAllocSize);
                    mStats.onBufferEvicted(bufferIter->second->mAllocSize);
                    mBuffers.erase(bufferIter);
                    mInvalidation.onBufferInvalidated(message.bufferId, mInvalidationChannel);
                }
            }
            mTransactions.erase(found);
        }
        ALOGV("transfer finished %llu %u - %d", (unsigned long long)message.transactionId,
              message.bufferId, deleted);
        return deleted;
    }
    ALOGV("transfer not found %llu %u", (unsigned long long)message.transactionId,
          message.bufferId);
    return false;
}

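// Drains queued buffer status messages from the status channel and dispatches
// them to the corresponding handlers.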
void Accessor::Impl::BufferPool::processStatusMessages() {
    std::vector<BufferStatusMessage> messages;
    mObserver.getBufferStatusChanges(messages);
    mTimestampUs = getTimestampNow();
    for (BufferStatusMessage& message: messages) {
        bool ret = false;
        switch (message.newStatus) {
            case BufferStatus::NOT_USED:
                ret = handleReleaseBuffer(
                        message.connectionId, message.bufferId);
                break;
            case BufferStatus::USED:
                // not happening
                break;
            case BufferStatus::TRANSFER_TO:
                ret = handleTransferTo(message);
                break;
            case BufferStatus::TRANSFER_FROM:
                ret = handleTransferFrom(message);
                break;
            case BufferStatus::TRANSFER_TIMEOUT:
                // TODO
                break;
            case BufferStatus::TRANSFER_LOST:
                // TODO
                break;
            case BufferStatus::TRANSFER_FETCH:
                // not happening
                break;
            case BufferStatus::TRANSFER_OK:
            case BufferStatus::TRANSFER_ERROR:
                ret = handleTransferResult(message);
                break;
            case BufferStatus::INVALIDATION_ACK:
                mInvalidation.onAck(message.connectionId, message.bufferId);
                ret = true;
                break;
        }
        if (ret == false) {
            ALOGW("buffer status message processing failure - message : %d connection : %lld",
                  message.newStatus, (long long)message.connectionId);
        }
    }
    messages.clear();
}

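// Releases all buffers and pending transactions associated with a closing
// connection, returning now-unreferenced buffers to the free list or evicting
// them if they were invalidated.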
bool Accessor::Impl::BufferPool::handleClose(ConnectionId connectionId) {
    // Cleaning buffers
    auto buffers = mUsingBuffers.find(connectionId);
    if (buffers != mUsingBuffers.end()) {
        for (const BufferId& bufferId : buffers->second) {
            bool deleted = erase(&mUsingConnections, bufferId, connectionId);
            if (deleted) {
                auto bufferIter = mBuffers.find(bufferId);
                bufferIter->second->mOwnerCount--;
                if (bufferIter->second->mOwnerCount == 0 &&
                        bufferIter->second->mTransactionCount == 0) {
                    // TODO: handle freebuffer insert fail
                    if (!bufferIter->second->mInvalidated) {
                        mStats.onBufferUnused(bufferIter->second->mAllocSize);
                        mFreeBuffers.insert(bufferId);
                    } else {
                        mStats.onBufferUnused(bufferIter->second->mAllocSize);
                        mStats.onBufferEvicted(bufferIter->second->mAllocSize);
                        mBuffers.erase(bufferIter);
                        mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
                    }
                }
            }
        }
        mUsingBuffers.erase(buffers);
    }

    // Cleaning transactions
    auto pending = mPendingTransactions.find(connectionId);
    if (pending != mPendingTransactions.end()) {
        for (const TransactionId& transactionId : pending->second) {
            auto iter = mTransactions.find(transactionId);
            if (iter != mTransactions.end()) {
                if (!iter->second->mSenderValidated) {
                    mCompletedTransactions.insert(transactionId);
                }
                BufferId bufferId = iter->second->mBufferId;
                auto bufferIter = mBuffers.find(bufferId);
                bufferIter->second->mTransactionCount--;
                if (bufferIter->second->mOwnerCount == 0 &&
                        bufferIter->second->mTransactionCount == 0) {
                    // TODO: handle freebuffer insert fail
                    if (!bufferIter->second->mInvalidated) {
                        mStats.onBufferUnused(bufferIter->second->mAllocSize);
                        mFreeBuffers.insert(bufferId);
                    } else {
                        mStats.onBufferUnused(bufferIter->second->mAllocSize);
                        mStats.onBufferEvicted(bufferIter->second->mAllocSize);
                        mBuffers.erase(bufferIter);
                        mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
                    }
                }
                mTransactions.erase(iter);
            }
        }
    }
    mConnectionIds.erase(connectionId);
    return true;
}

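// Looks for a cached free buffer whose config is compatible with the requested
// params; on a hit the buffer is removed from the free list and recycled.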
bool Accessor::Impl::BufferPool::getFreeBuffer(
        const std::shared_ptr<BufferPoolAllocator> &allocator,
        const std::vector<uint8_t> &params, BufferId *pId,
        const native_handle_t** handle) {
    auto bufferIt = mFreeBuffers.begin();
    for (; bufferIt != mFreeBuffers.end(); ++bufferIt) {
        BufferId bufferId = *bufferIt;
        if (allocator->compatible(params, mBuffers[bufferId]->mConfig)) {
            break;
        }
    }
    if (bufferIt != mFreeBuffers.end()) {
        BufferId id = *bufferIt;
        mFreeBuffers.erase(bufferIt);
        mStats.onBufferRecycled(mBuffers[id]->mAllocSize);
        *handle = mBuffers[id]->handle();
        *pId = id;
        ALOGV("recycle a buffer %u %p", id, *handle);
        return true;
    }
    return false;
}

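// Registers a newly allocated buffer with the pool and assigns it the next
// buffer id.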
ResultStatus Accessor::Impl::BufferPool::addNewBuffer(
        const std::shared_ptr<BufferPoolAllocation> &alloc,
        const size_t allocSize,
        const std::vector<uint8_t> &params,
        BufferId *pId,
        const native_handle_t** handle) {

    BufferId bufferId = mSeq++;
    if (mSeq == Connection::SYNC_BUFFERID) {
        mSeq = 0;
    }
    std::unique_ptr<InternalBuffer> buffer =
            std::make_unique<InternalBuffer>(
                    bufferId, alloc, allocSize, params);
    if (buffer) {
        auto res = mBuffers.insert(std::make_pair(
                bufferId, std::move(buffer)));
        if (res.second) {
            mStats.onBufferAllocated(allocSize);
            *handle = alloc->handle();
            *pId = bufferId;
            return ResultStatus::OK;
        }
    }
    return ResultStatus::NO_MEMORY;
}

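// Evicts cached free buffers periodically (or immediately when clearCache is
// set) once the pool exceeds the configured size/count thresholds.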
void Accessor::Impl::BufferPool::cleanUp(bool clearCache) {
    if (clearCache || mTimestampUs > mLastCleanUpUs + kCleanUpDurationUs ||
            mStats.buffersNotInUse() > kMaxUnusedBufferCount) {
        mLastCleanUpUs = mTimestampUs;
        if (mTimestampUs > mLastLogUs + kLogDurationUs ||
                mStats.buffersNotInUse() > kMaxUnusedBufferCount) {
            mLastLogUs = mTimestampUs;
            ALOGD("bufferpool2 %p : %zu(%zu size) total buffers - "
                  "%zu(%zu size) used buffers - %zu/%zu (recycle/alloc) - "
                  "%zu/%zu (fetch/transfer)",
                  this, mStats.mBuffersCached, mStats.mSizeCached,
                  mStats.mBuffersInUse, mStats.mSizeInUse,
                  mStats.mTotalRecycles, mStats.mTotalAllocations,
                  mStats.mTotalFetches, mStats.mTotalTransfers);
        }
        for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) {
            if (!clearCache && mStats.buffersNotInUse() <= kUnusedBufferCountTarget &&
                    (mStats.mSizeCached < kMinAllocBytesForEviction ||
                     mBuffers.size() < kMinBufferCountForEviction)) {
                break;
            }
            auto it = mBuffers.find(*freeIt);
            if (it != mBuffers.end() &&
                    it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) {
                mStats.onBufferEvicted(it->second->mAllocSize);
                mBuffers.erase(it);
                freeIt = mFreeBuffers.erase(freeIt);
            } else {
                ++freeIt;
                ALOGW("bufferpool2 inconsistent!");
            }
        }
    }
}

void Accessor::Impl::BufferPool::invalidate(
        bool needsAck, BufferId from, BufferId to,
        const std::shared_ptr<Accessor::Impl> &impl) {
    for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) {
        if (isBufferInRange(from, to, *freeIt)) {
            auto it = mBuffers.find(*freeIt);
            if (it != mBuffers.end() &&
                    it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) {
                mStats.onBufferEvicted(it->second->mAllocSize);
                mBuffers.erase(it);
                freeIt = mFreeBuffers.erase(freeIt);
                continue;
            } else {
                ALOGW("bufferpool2 inconsistent!");
            }
        }
        ++freeIt;
    }

    size_t left = 0;
    for (auto it = mBuffers.begin(); it != mBuffers.end(); ++it) {
        if (isBufferInRange(from, to, it->first)) {
            it->second->invalidate();
            ++left;
        }
    }
    mInvalidation.onInvalidationRequest(needsAck, from, to, left, mInvalidationChannel, impl);
}

void Accessor::Impl::BufferPool::flush(const std::shared_ptr<Accessor::Impl> &impl) {
    BufferId from = mStartSeq;
    BufferId to = mSeq;
    mStartSeq = mSeq;
    // TODO: needsAck params
    ALOGV("buffer invalidation request bp:%u %u %u", mInvalidation.mId, from, to);
    if (from != to) {
        invalidate(true, from, to, impl);
    }
}

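// Worker loop shared by all accessors: polls registered accessors for
// invalidation acks, backing off its sleep interval while spinning and waiting
// on the condition variable when no accessor is registered.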
void Accessor::Impl::invalidatorThread(
        std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> &accessors,
        std::mutex &mutex,
        std::condition_variable &cv,
        bool &ready) {
    constexpr uint32_t NUM_SPIN_TO_INCREASE_SLEEP = 1024;
    constexpr uint32_t NUM_SPIN_TO_LOG = 1024*8;
    constexpr useconds_t MAX_SLEEP_US = 10000;
    uint32_t numSpin = 0;
    useconds_t sleepUs = 1;

    while (true) {
        std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> copied;
        {
            std::unique_lock<std::mutex> lock(mutex);
            if (!ready) {
                numSpin = 0;
                sleepUs = 1;
                cv.wait(lock);
            }
            copied.insert(accessors.begin(), accessors.end());
        }
        std::list<ConnectionId> erased;
        for (auto it = copied.begin(); it != copied.end(); ++it) {
            const std::shared_ptr<Accessor::Impl> impl = it->second.lock();
            if (!impl) {
                erased.push_back(it->first);
            } else {
                impl->handleInvalidateAck();
            }
        }
        {
            std::unique_lock<std::mutex> lock(mutex);
            for (auto it = erased.begin(); it != erased.end(); ++it) {
                accessors.erase(*it);
            }
            if (accessors.size() == 0) {
                ready = false;
            } else {
                // TODO: Use an efficient way to wait over FMQ.
                // N.B. Since there is no efficient way to wait on the FMQ,
                // polling the FMQ with a backoff is the current way to avoid
                // draining the CPU.
                lock.unlock();
                ++numSpin;
                if (numSpin % NUM_SPIN_TO_INCREASE_SLEEP == 0 &&
                        sleepUs < MAX_SLEEP_US) {
                    sleepUs *= 10;
                }
                if (numSpin % NUM_SPIN_TO_LOG == 0) {
                    ALOGW("invalidator thread spinning");
                }
                ::usleep(sleepUs);
            }
        }
    }
}

Accessor::Impl::AccessorInvalidator::AccessorInvalidator() : mReady(false) {
    std::thread invalidator(
            invalidatorThread,
            std::ref(mAccessors),
            std::ref(mMutex),
            std::ref(mCv),
            std::ref(mReady));
    invalidator.detach();
}

void Accessor::Impl::AccessorInvalidator::addAccessor(
        uint32_t accessorId, const std::weak_ptr<Accessor::Impl> &impl) {
    bool notify = false;
    std::unique_lock<std::mutex> lock(mMutex);
    if (mAccessors.find(accessorId) == mAccessors.end()) {
        if (!mReady) {
            mReady = true;
            notify = true;
        }
        mAccessors.insert(std::make_pair(accessorId, impl));
        ALOGV("buffer invalidation added bp:%u %d", accessorId, notify);
    }
    lock.unlock();
    if (notify) {
        mCv.notify_one();
    }
}

void Accessor::Impl::AccessorInvalidator::delAccessor(uint32_t accessorId) {
    std::lock_guard<std::mutex> lock(mMutex);
    mAccessors.erase(accessorId);
    ALOGV("buffer invalidation deleted bp:%u", accessorId);
    if (mAccessors.size() == 0) {
        mReady = false;
    }
}

std::unique_ptr<Accessor::Impl::AccessorInvalidator> Accessor::Impl::sInvalidator;

void Accessor::Impl::createInvalidator() {
    if (!sInvalidator) {
        sInvalidator = std::make_unique<Accessor::Impl::AccessorInvalidator>();
    }
}

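// Worker loop that periodically scans registered accessors and forces a cache
// clear on those that have been idle longer than kEvictDurationNs.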
void Accessor::Impl::evictorThread(
        std::map<const std::weak_ptr<Accessor::Impl>, nsecs_t, std::owner_less<>> &accessors,
        std::mutex &mutex,
        std::condition_variable &cv) {
    std::list<const std::weak_ptr<Accessor::Impl>> evictList;
    while (true) {
        int expired = 0;
        int evicted = 0;
        {
            nsecs_t now = systemTime();
            std::unique_lock<std::mutex> lock(mutex);
            if (accessors.size() == 0) {
                cv.wait(lock);
            }
            auto it = accessors.begin();
            while (it != accessors.end()) {
                if (now > (it->second + kEvictDurationNs)) {
                    ++expired;
                    evictList.push_back(it->first);
                    it = accessors.erase(it);
                } else {
                    ++it;
                }
            }
        }
        // Evict idle accessors.
        for (auto it = evictList.begin(); it != evictList.end(); ++it) {
            const std::shared_ptr<Accessor::Impl> accessor = it->lock();
            if (accessor) {
                accessor->cleanUp(true);
                ++evicted;
            }
        }
        if (expired > 0) {
            ALOGD("evictor expired: %d, evicted: %d", expired, evicted);
        }
        evictList.clear();
        ::usleep(kEvictGranularityNs / 1000);
    }
}

Accessor::Impl::AccessorEvictor::AccessorEvictor() {
    std::thread evictor(
            evictorThread,
            std::ref(mAccessors),
            std::ref(mMutex),
            std::ref(mCv));
    evictor.detach();
}

void Accessor::Impl::AccessorEvictor::addAccessor(
        const std::weak_ptr<Accessor::Impl> &impl, nsecs_t ts) {
    std::lock_guard<std::mutex> lock(mMutex);
    bool notify = mAccessors.empty();
    auto it = mAccessors.find(impl);
    if (it == mAccessors.end()) {
        mAccessors.emplace(impl, ts);
    } else {
        it->second = ts;
    }
    if (notify) {
        mCv.notify_one();
    }
}

std::unique_ptr<Accessor::Impl::AccessorEvictor> Accessor::Impl::sEvictor;

void Accessor::Impl::createEvictor() {
    if (!sEvictor) {
        sEvictor = std::make_unique<Accessor::Impl::AccessorEvictor>();
    }
}

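// Re-registers this accessor with the evictor, rate-limited to once per
// kEvictGranularityNs.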
void Accessor::Impl::scheduleEvictIfNeeded() {
    nsecs_t now = systemTime();

    if (now > (mScheduleEvictTs + kEvictGranularityNs)) {
        mScheduleEvictTs = now;
        sEvictor->addAccessor(shared_from_this(), now);
    }
}

} // namespace implementation
} // namespace V2_0
} // namespace bufferpool
} // namespace media
} // namespace hardware
} // namespace android