1 /*
2 * Copyright (C) 2022 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #define LOG_TAG "AidlBufferPool"
18 //#define LOG_NDEBUG 0
19
20 #include <sys/types.h>
21 #include <stdint.h>
22 #include <time.h>
23 #include <unistd.h>
24 #include <utils/Log.h>
25 #include <thread>
26 #include "Accessor.h"
27 #include "BufferPool.h"
28 #include "Connection.h"
29 #include "DataHelper.h"
30
31 namespace aidl::android::hardware::media::bufferpool2::implementation {
32
namespace {
// Minimum interval between two cache clean-up passes (see cleanUp()).
constexpr int64_t kCleanUpDurationMs = 500;  // 0.5 sec
// Minimum interval between two periodic statistics log lines.
constexpr int64_t kLogDurationMs = 5000;  // 5 secs

// Eviction of free buffers only continues below the unused-count target when
// the cache is at least this large, both in bytes and in buffer count.
constexpr size_t kMinAllocBytesForEviction = 15 * 1024 * 1024;
constexpr size_t kMinBufferCountForEviction = 25;
// Exceeding this many unused buffers forces a clean-up pass immediately.
constexpr size_t kMaxUnusedBufferCount = 64;
// Unused-buffer count at or below which cleanUp() may stop evicting.
constexpr size_t kUnusedBufferCountTarget = kMaxUnusedBufferCount - 16;
}  // namespace
42
// Constructs an empty pool. All three timestamps (current, last clean-up,
// last log) start at the current elapsed-realtime reading, and both buffer
// sequence counters start at 0.
BufferPool::BufferPool()
    : mTimestampMs(::android::elapsedRealtime()),
      mLastCleanUpMs(mTimestampMs),
      mLastLogMs(mTimestampMs),
      mSeq(0),
      mStartSeq(0) {
    // The pool is valid only if its invalidation channel reports itself valid.
    mValid = mInvalidationChannel.isValid();
}
51
52
// Statistics helper.
// Returns base / total as a percentage rounded to the nearest integer,
// or 0 when total is zero. base is converted to S before the division.
template<typename T, typename S>
int percentage(T base, S total) {
    if (!total) {
        return 0;
    }
    return int(0.5 + 100. * static_cast<S>(base) / total);
}
58
// Definition of the class-wide atomic sequence counter, starting at 0.
// NOTE(review): presumably used to assign Invalidation::mId — confirm in the header.
std::atomic<std::uint32_t> BufferPool::Invalidation::sInvSeqId(0);
60
~BufferPool()61 BufferPool::~BufferPool() {
62 std::lock_guard<std::mutex> lock(mMutex);
63 ALOGD("Destruction - bufferpool2 %p "
64 "cached: %zu/%zuM, %zu/%d%% in use; "
65 "allocs: %zu, %d%% recycled; "
66 "transfers: %zu, %d%% unfetched",
67 this, mStats.mBuffersCached, mStats.mSizeCached >> 20,
68 mStats.mBuffersInUse, percentage(mStats.mBuffersInUse, mStats.mBuffersCached),
69 mStats.mTotalAllocations, percentage(mStats.mTotalRecycles, mStats.mTotalAllocations),
70 mStats.mTotalTransfers,
71 percentage(mStats.mTotalTransfers - mStats.mTotalFetches, mStats.mTotalTransfers));
72 }
73
onConnect(ConnectionId conId,const std::shared_ptr<IObserver> & observer)74 void BufferPool::Invalidation::onConnect(
75 ConnectionId conId, const std::shared_ptr<IObserver>& observer) {
76 mAcks[conId] = mInvalidationId; // starts from current invalidationId
77 mObservers.insert(std::make_pair(conId, observer));
78 }
79
// Drops all invalidation bookkeeping for a connection: both its last
// acknowledged message id and its registered observer are removed.
void BufferPool::Invalidation::onClose(ConnectionId conId) {
    mAcks.erase(conId);
    mObservers.erase(conId);
}
84
onAck(ConnectionId conId,uint32_t msgId)85 void BufferPool::Invalidation::onAck(
86 ConnectionId conId,
87 uint32_t msgId) {
88 auto it = mAcks.find(conId);
89 if (it == mAcks.end()) {
90 ALOGW("ACK from inconsistent connection! %lld", (long long)conId);
91 return;
92 }
93 if (isMessageLater(msgId, it->second)) {
94 mAcks[conId] = msgId;
95 }
96 }
97
onBufferInvalidated(BufferId bufferId,BufferInvalidationChannel & channel)98 void BufferPool::Invalidation::onBufferInvalidated(
99 BufferId bufferId,
100 BufferInvalidationChannel &channel) {
101 for (auto it = mPendings.begin(); it != mPendings.end();) {
102 if (it->isInvalidated(bufferId)) {
103 uint32_t msgId = 0;
104 if (it->mNeedsAck) {
105 if (mInvalidationId == UINT_MAX) {
106 // wrap happens;
107 mInvalidationId = 0;
108 }
109 msgId = ++mInvalidationId;
110 }
111 channel.postInvalidation(msgId, it->mFrom, it->mTo);
112 it = mPendings.erase(it);
113 continue;
114 }
115 ++it;
116 }
117 }
118
onInvalidationRequest(bool needsAck,uint32_t from,uint32_t to,size_t left,BufferInvalidationChannel & channel,const std::shared_ptr<Accessor> & impl)119 void BufferPool::Invalidation::onInvalidationRequest(
120 bool needsAck,
121 uint32_t from,
122 uint32_t to,
123 size_t left,
124 BufferInvalidationChannel &channel,
125 const std::shared_ptr<Accessor> &impl) {
126 uint32_t msgId = 0;
127 if (needsAck) {
128 if (mInvalidationId == UINT_MAX) {
129 //wrap happens
130 mInvalidationId = 0;
131 }
132 msgId = ++mInvalidationId;
133 }
134 ALOGV("bufferpool2 invalidation requested and queued");
135 if (left == 0) {
136 channel.postInvalidation(msgId, from, to);
137 } else {
138 ALOGV("bufferpoo2 invalidation requested and pending");
139 Pending pending(needsAck, from, to, left, impl);
140 mPendings.push_back(pending);
141 }
142 Accessor::sInvalidator->addAccessor(mId, impl);
143 }
144
onHandleAck(std::map<ConnectionId,const std::shared_ptr<IObserver>> * observers,uint32_t * invalidationId)145 void BufferPool::Invalidation::onHandleAck(
146 std::map<ConnectionId, const std::shared_ptr<IObserver>> *observers,
147 uint32_t *invalidationId) {
148 if (mInvalidationId != 0) {
149 *invalidationId = mInvalidationId;
150 std::set<int> deads;
151 for (auto it = mAcks.begin(); it != mAcks.end(); ++it) {
152 if (it->second != mInvalidationId) {
153 const std::shared_ptr<IObserver> observer = mObservers[it->first];
154 if (observer) {
155 observers->emplace(it->first, observer);
156 ALOGV("connection %lld will call observer (%u: %u)",
157 (long long)it->first, it->second, mInvalidationId);
158 // N.B: onMessage will be called later. ignore possibility of
159 // onMessage# oneway call being lost.
160 it->second = mInvalidationId;
161 } else {
162 ALOGV("bufferpool2 observer died %lld", (long long)it->first);
163 deads.insert(it->first);
164 }
165 }
166 }
167 if (deads.size() > 0) {
168 for (auto it = deads.begin(); it != deads.end(); ++it) {
169 onClose(*it);
170 }
171 }
172 }
173 if (mPendings.size() == 0) {
174 // All invalidation Ids are synced and no more pending invalidations.
175 Accessor::sInvalidator->delAccessor(mId);
176 }
177 }
178
handleOwnBuffer(ConnectionId connectionId,BufferId bufferId)179 bool BufferPool::handleOwnBuffer(
180 ConnectionId connectionId, BufferId bufferId) {
181
182 bool added = insert(&mUsingBuffers, connectionId, bufferId);
183 if (added) {
184 auto iter = mBuffers.find(bufferId);
185 iter->second->mOwnerCount++;
186 }
187 insert(&mUsingConnections, bufferId, connectionId);
188 return added;
189 }
190
handleReleaseBuffer(ConnectionId connectionId,BufferId bufferId)191 bool BufferPool::handleReleaseBuffer(
192 ConnectionId connectionId, BufferId bufferId) {
193 bool deleted = erase(&mUsingBuffers, connectionId, bufferId);
194 if (deleted) {
195 auto iter = mBuffers.find(bufferId);
196 iter->second->mOwnerCount--;
197 if (iter->second->mOwnerCount == 0 &&
198 iter->second->mTransactionCount == 0) {
199 if (!iter->second->mInvalidated) {
200 mStats.onBufferUnused(iter->second->mAllocSize);
201 mFreeBuffers.insert(bufferId);
202 } else {
203 mStats.onBufferUnused(iter->second->mAllocSize);
204 mStats.onBufferEvicted(iter->second->mAllocSize);
205 mBuffers.erase(iter);
206 mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
207 }
208 }
209 }
210 erase(&mUsingConnections, bufferId, connectionId);
211 ALOGV("release buffer %u : %d", bufferId, deleted);
212 return deleted;
213 }
214
handleTransferTo(const BufferStatusMessage & message)215 bool BufferPool::handleTransferTo(const BufferStatusMessage &message) {
216 auto completed = mCompletedTransactions.find(
217 message.transactionId);
218 if (completed != mCompletedTransactions.end()) {
219 // already completed
220 mCompletedTransactions.erase(completed);
221 return true;
222 }
223 // the buffer should exist and be owned.
224 auto bufferIter = mBuffers.find(message.bufferId);
225 if (bufferIter == mBuffers.end() ||
226 !contains(&mUsingBuffers, message.connectionId, FromAidl(message.bufferId))) {
227 return false;
228 }
229 auto found = mTransactions.find(message.transactionId);
230 if (found != mTransactions.end()) {
231 // transfer_from was received earlier.
232 found->second->mSender = message.connectionId;
233 found->second->mSenderValidated = true;
234 return true;
235 }
236 if (mConnectionIds.find(message.targetConnectionId) == mConnectionIds.end()) {
237 // N.B: it could be fake or receive connection already closed.
238 ALOGD("bufferpool2 %p receiver connection %lld is no longer valid",
239 this, (long long)message.targetConnectionId);
240 return false;
241 }
242 mStats.onBufferSent();
243 mTransactions.insert(std::make_pair(
244 message.transactionId,
245 std::make_unique<TransactionStatus>(message, mTimestampMs)));
246 insert(&mPendingTransactions, message.targetConnectionId,
247 FromAidl(message.transactionId));
248 bufferIter->second->mTransactionCount++;
249 return true;
250 }
251
handleTransferFrom(const BufferStatusMessage & message)252 bool BufferPool::handleTransferFrom(const BufferStatusMessage &message) {
253 auto found = mTransactions.find(message.transactionId);
254 if (found == mTransactions.end()) {
255 // TODO: is it feasible to check ownership here?
256 mStats.onBufferSent();
257 mTransactions.insert(std::make_pair(
258 message.transactionId,
259 std::make_unique<TransactionStatus>(message, mTimestampMs)));
260 insert(&mPendingTransactions, message.connectionId,
261 FromAidl(message.transactionId));
262 auto bufferIter = mBuffers.find(message.bufferId);
263 bufferIter->second->mTransactionCount++;
264 } else {
265 if (message.connectionId == found->second->mReceiver) {
266 found->second->mStatus = BufferStatus::TRANSFER_FROM;
267 }
268 }
269 return true;
270 }
271
handleTransferResult(const BufferStatusMessage & message)272 bool BufferPool::handleTransferResult(const BufferStatusMessage &message) {
273 auto found = mTransactions.find(message.transactionId);
274 if (found != mTransactions.end()) {
275 bool deleted = erase(&mPendingTransactions, message.connectionId,
276 FromAidl(message.transactionId));
277 if (deleted) {
278 if (!found->second->mSenderValidated) {
279 mCompletedTransactions.insert(message.transactionId);
280 }
281 auto bufferIter = mBuffers.find(message.bufferId);
282 if (message.status == BufferStatus::TRANSFER_OK) {
283 handleOwnBuffer(message.connectionId, message.bufferId);
284 }
285 bufferIter->second->mTransactionCount--;
286 if (bufferIter->second->mOwnerCount == 0
287 && bufferIter->second->mTransactionCount == 0) {
288 if (!bufferIter->second->mInvalidated) {
289 mStats.onBufferUnused(bufferIter->second->mAllocSize);
290 mFreeBuffers.insert(message.bufferId);
291 } else {
292 mStats.onBufferUnused(bufferIter->second->mAllocSize);
293 mStats.onBufferEvicted(bufferIter->second->mAllocSize);
294 mBuffers.erase(bufferIter);
295 mInvalidation.onBufferInvalidated(message.bufferId, mInvalidationChannel);
296 }
297 }
298 mTransactions.erase(found);
299 }
300 ALOGV("transfer finished %llu %u - %d", (unsigned long long)message.transactionId,
301 message.bufferId, deleted);
302 return deleted;
303 }
304 ALOGV("transfer not found %llu %u", (unsigned long long)message.transactionId,
305 message.bufferId);
306 return false;
307 }
308
processStatusMessages()309 void BufferPool::processStatusMessages() {
310 std::vector<BufferStatusMessage> messages;
311 mObserver.getBufferStatusChanges(messages);
312 mTimestampMs = ::android::elapsedRealtime();
313 for (BufferStatusMessage& message: messages) {
314 bool ret = false;
315 switch (message.status) {
316 case BufferStatus::NOT_USED:
317 ret = handleReleaseBuffer(
318 message.connectionId, message.bufferId);
319 break;
320 case BufferStatus::USED:
321 // not happening
322 break;
323 case BufferStatus::TRANSFER_TO:
324 ret = handleTransferTo(message);
325 break;
326 case BufferStatus::TRANSFER_FROM:
327 ret = handleTransferFrom(message);
328 break;
329 case BufferStatus::TRANSFER_TIMEOUT:
330 // TODO
331 break;
332 case BufferStatus::TRANSFER_LOST:
333 // TODO
334 break;
335 case BufferStatus::TRANSFER_FETCH:
336 // not happening
337 break;
338 case BufferStatus::TRANSFER_OK:
339 case BufferStatus::TRANSFER_ERROR:
340 ret = handleTransferResult(message);
341 break;
342 case BufferStatus::INVALIDATION_ACK:
343 mInvalidation.onAck(message.connectionId, message.bufferId);
344 ret = true;
345 break;
346 }
347 if (ret == false) {
348 ALOGW("buffer status message processing failure - message : %d connection : %lld",
349 message.status, (long long)message.connectionId);
350 }
351 }
352 messages.clear();
353 }
354
handleClose(ConnectionId connectionId)355 bool BufferPool::handleClose(ConnectionId connectionId) {
356 // Cleaning buffers
357 auto buffers = mUsingBuffers.find(connectionId);
358 if (buffers != mUsingBuffers.end()) {
359 for (const BufferId& bufferId : buffers->second) {
360 bool deleted = erase(&mUsingConnections, bufferId, connectionId);
361 if (deleted) {
362 auto bufferIter = mBuffers.find(bufferId);
363 bufferIter->second->mOwnerCount--;
364 if (bufferIter->second->mOwnerCount == 0 &&
365 bufferIter->second->mTransactionCount == 0) {
366 // TODO: handle freebuffer insert fail
367 if (!bufferIter->second->mInvalidated) {
368 mStats.onBufferUnused(bufferIter->second->mAllocSize);
369 mFreeBuffers.insert(bufferId);
370 } else {
371 mStats.onBufferUnused(bufferIter->second->mAllocSize);
372 mStats.onBufferEvicted(bufferIter->second->mAllocSize);
373 mBuffers.erase(bufferIter);
374 mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
375 }
376 }
377 }
378 }
379 mUsingBuffers.erase(buffers);
380 }
381
382 // Cleaning transactions
383 auto pending = mPendingTransactions.find(connectionId);
384 if (pending != mPendingTransactions.end()) {
385 for (const TransactionId& transactionId : pending->second) {
386 auto iter = mTransactions.find(transactionId);
387 if (iter != mTransactions.end()) {
388 if (!iter->second->mSenderValidated) {
389 mCompletedTransactions.insert(transactionId);
390 }
391 BufferId bufferId = iter->second->mBufferId;
392 auto bufferIter = mBuffers.find(bufferId);
393 bufferIter->second->mTransactionCount--;
394 if (bufferIter->second->mOwnerCount == 0 &&
395 bufferIter->second->mTransactionCount == 0) {
396 // TODO: handle freebuffer insert fail
397 if (!bufferIter->second->mInvalidated) {
398 mStats.onBufferUnused(bufferIter->second->mAllocSize);
399 mFreeBuffers.insert(bufferId);
400 } else {
401 mStats.onBufferUnused(bufferIter->second->mAllocSize);
402 mStats.onBufferEvicted(bufferIter->second->mAllocSize);
403 mBuffers.erase(bufferIter);
404 mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
405 }
406 }
407 mTransactions.erase(iter);
408 }
409 }
410 }
411 mConnectionIds.erase(connectionId);
412 return true;
413 }
414
getFreeBuffer(const std::shared_ptr<BufferPoolAllocator> & allocator,const std::vector<uint8_t> & params,BufferId * pId,const native_handle_t ** handle)415 bool BufferPool::getFreeBuffer(
416 const std::shared_ptr<BufferPoolAllocator> &allocator,
417 const std::vector<uint8_t> ¶ms, BufferId *pId,
418 const native_handle_t** handle) {
419 auto bufferIt = mFreeBuffers.begin();
420 for (;bufferIt != mFreeBuffers.end(); ++bufferIt) {
421 BufferId bufferId = *bufferIt;
422 if (allocator->compatible(params, mBuffers[bufferId]->mConfig)) {
423 break;
424 }
425 }
426 if (bufferIt != mFreeBuffers.end()) {
427 BufferId id = *bufferIt;
428 mFreeBuffers.erase(bufferIt);
429 mStats.onBufferRecycled(mBuffers[id]->mAllocSize);
430 *handle = mBuffers[id]->handle();
431 *pId = id;
432 ALOGV("recycle a buffer %u %p", id, *handle);
433 return true;
434 }
435 return false;
436 }
437
addNewBuffer(const std::shared_ptr<BufferPoolAllocation> & alloc,const size_t allocSize,const std::vector<uint8_t> & params,BufferId * pId,const native_handle_t ** handle)438 BufferPoolStatus BufferPool::addNewBuffer(
439 const std::shared_ptr<BufferPoolAllocation> &alloc,
440 const size_t allocSize,
441 const std::vector<uint8_t> ¶ms,
442 BufferId *pId,
443 const native_handle_t** handle) {
444
445 BufferId bufferId = mSeq++;
446 if (mSeq == Connection::SYNC_BUFFERID) {
447 mSeq = 0;
448 }
449 std::unique_ptr<InternalBuffer> buffer =
450 std::make_unique<InternalBuffer>(
451 bufferId, alloc, allocSize, params);
452 if (buffer) {
453 auto res = mBuffers.insert(std::make_pair(
454 bufferId, std::move(buffer)));
455 if (res.second) {
456 mStats.onBufferAllocated(allocSize);
457 *handle = alloc->handle();
458 *pId = bufferId;
459 return ResultStatus::OK;
460 }
461 }
462 return ResultStatus::NO_MEMORY;
463 }
464
cleanUp(bool clearCache)465 void BufferPool::cleanUp(bool clearCache) {
466 if (clearCache || mTimestampMs > mLastCleanUpMs + kCleanUpDurationMs ||
467 mStats.buffersNotInUse() > kMaxUnusedBufferCount) {
468 mLastCleanUpMs = mTimestampMs;
469 if (mTimestampMs > mLastLogMs + kLogDurationMs ||
470 mStats.buffersNotInUse() > kMaxUnusedBufferCount) {
471 mLastLogMs = mTimestampMs;
472 ALOGD("bufferpool2 %p : %zu(%zu size) total buffers - "
473 "%zu(%zu size) used buffers - %zu/%zu (recycle/alloc) - "
474 "%zu/%zu (fetch/transfer)",
475 this, mStats.mBuffersCached, mStats.mSizeCached,
476 mStats.mBuffersInUse, mStats.mSizeInUse,
477 mStats.mTotalRecycles, mStats.mTotalAllocations,
478 mStats.mTotalFetches, mStats.mTotalTransfers);
479 }
480 for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) {
481 if (!clearCache && mStats.buffersNotInUse() <= kUnusedBufferCountTarget &&
482 (mStats.mSizeCached < kMinAllocBytesForEviction ||
483 mBuffers.size() < kMinBufferCountForEviction)) {
484 break;
485 }
486 auto it = mBuffers.find(*freeIt);
487 if (it != mBuffers.end() &&
488 it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) {
489 mStats.onBufferEvicted(it->second->mAllocSize);
490 mBuffers.erase(it);
491 freeIt = mFreeBuffers.erase(freeIt);
492 } else {
493 ++freeIt;
494 ALOGW("bufferpool2 inconsistent!");
495 }
496 }
497 }
498 }
499
invalidate(bool needsAck,BufferId from,BufferId to,const std::shared_ptr<Accessor> & impl)500 void BufferPool::invalidate(
501 bool needsAck, BufferId from, BufferId to,
502 const std::shared_ptr<Accessor> &impl) {
503 for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) {
504 if (isBufferInRange(from, to, *freeIt)) {
505 auto it = mBuffers.find(*freeIt);
506 if (it != mBuffers.end() &&
507 it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) {
508 mStats.onBufferEvicted(it->second->mAllocSize);
509 mBuffers.erase(it);
510 freeIt = mFreeBuffers.erase(freeIt);
511 continue;
512 } else {
513 ALOGW("bufferpool2 inconsistent!");
514 }
515 }
516 ++freeIt;
517 }
518
519 size_t left = 0;
520 for (auto it = mBuffers.begin(); it != mBuffers.end(); ++it) {
521 if (isBufferInRange(from, to, it->first)) {
522 it->second->invalidate();
523 ++left;
524 }
525 }
526 mInvalidation.onInvalidationRequest(needsAck, from, to, left, mInvalidationChannel, impl);
527 }
528
flush(const std::shared_ptr<Accessor> & impl)529 void BufferPool::flush(const std::shared_ptr<Accessor> &impl) {
530 BufferId from = mStartSeq;
531 BufferId to = mSeq;
532 mStartSeq = mSeq;
533 // TODO: needsAck params
534 ALOGV("buffer invalidation request bp:%u %u %u", mInvalidation.mId, from, to);
535 if (from != to) {
536 invalidate(true, from, to, impl);
537 }
538 }
539
540 } // namespace aidl::android::hardware::media::bufferpool2::implementation
541