1 /*
2 * Copyright (C) 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #define LOG_TAG "BufferPoolManager"
17 //#define LOG_NDEBUG 0
18
19 #include <bufferpool/ClientManager.h>
20 #include <hidl/HidlTransportSupport.h>
21 #include <sys/types.h>
22 #include <time.h>
23 #include <unistd.h>
24 #include <utils/Log.h>
25 #include "BufferPoolClient.h"
26 #include "Observer.h"
27 #include "Accessor.h"
28
29 namespace android {
30 namespace hardware {
31 namespace media {
32 namespace bufferpool {
33 namespace V2_0 {
34 namespace implementation {
35
36 static constexpr int64_t kRegisterTimeoutUs = 500000; // 0.5 sec
37 static constexpr int64_t kCleanUpDurationUs = 1000000; // TODO: 1 sec tune
38 static constexpr int64_t kClientTimeoutUs = 5000000; // TODO: 5 secs tune
39
40 /**
41 * The holder of the cookie of remote IClientManager.
42 * The cookie is process locally unique for each IClientManager.
43 * (The cookie is used to notify death of clients to bufferpool process.)
44 */
45 class ClientManagerCookieHolder {
46 public:
47 /**
48 * Creates a cookie holder for remote IClientManager(s).
49 */
50 ClientManagerCookieHolder();
51
52 /**
53 * Gets a cookie for a remote IClientManager.
54 *
55 * @param manager the specified remote IClientManager.
56 * @param added true when the specified remote IClientManager is added
57 * newly, false otherwise.
58 *
59 * @return the process locally unique cookie for the specified IClientManager.
60 */
61 uint64_t getCookie(const sp<IClientManager> &manager, bool *added);
62
63 private:
64 uint64_t mSeqId;
65 std::mutex mLock;
66 std::list<std::pair<const wp<IClientManager>, uint64_t>> mManagers;
67 };
68
ClientManagerCookieHolder()69 ClientManagerCookieHolder::ClientManagerCookieHolder() : mSeqId(0){}
70
getCookie(const sp<IClientManager> & manager,bool * added)71 uint64_t ClientManagerCookieHolder::getCookie(
72 const sp<IClientManager> &manager,
73 bool *added) {
74 std::lock_guard<std::mutex> lock(mLock);
75 for (auto it = mManagers.begin(); it != mManagers.end();) {
76 const sp<IClientManager> key = it->first.promote();
77 if (key) {
78 if (interfacesEqual(key, manager)) {
79 *added = false;
80 return it->second;
81 }
82 ++it;
83 } else {
84 it = mManagers.erase(it);
85 }
86 }
87 uint64_t id = mSeqId++;
88 *added = true;
89 mManagers.push_back(std::make_pair(manager, id));
90 return id;
91 }
92
93 class ClientManager::Impl {
94 public:
95 Impl();
96
97 // BnRegisterSender
98 ResultStatus registerSender(const sp<IAccessor> &accessor,
99 ConnectionId *pConnectionId);
100
101 // BpRegisterSender
102 ResultStatus registerSender(const sp<IClientManager> &receiver,
103 ConnectionId senderId,
104 ConnectionId *receiverId);
105
106 ResultStatus create(const std::shared_ptr<BufferPoolAllocator> &allocator,
107 ConnectionId *pConnectionId);
108
109 ResultStatus close(ConnectionId connectionId);
110
111 ResultStatus flush(ConnectionId connectionId);
112
113 ResultStatus allocate(ConnectionId connectionId,
114 const std::vector<uint8_t> ¶ms,
115 native_handle_t **handle,
116 std::shared_ptr<BufferPoolData> *buffer);
117
118 ResultStatus receive(ConnectionId connectionId,
119 TransactionId transactionId,
120 BufferId bufferId,
121 int64_t timestampUs,
122 native_handle_t **handle,
123 std::shared_ptr<BufferPoolData> *buffer);
124
125 ResultStatus postSend(ConnectionId receiverId,
126 const std::shared_ptr<BufferPoolData> &buffer,
127 TransactionId *transactionId,
128 int64_t *timestampUs);
129
130 ResultStatus getAccessor(ConnectionId connectionId,
131 sp<IAccessor> *accessor);
132
133 void cleanUp(bool clearCache = false);
134
135 private:
136 // In order to prevent deadlock between multiple locks,
137 // always lock ClientCache.lock before locking ActiveClients.lock.
138 struct ClientCache {
139 // This lock is held for brief duration.
140 // Blocking operation is not performed while holding the lock.
141 std::mutex mMutex;
142 std::list<std::pair<const wp<IAccessor>, const std::weak_ptr<BufferPoolClient>>>
143 mClients;
144 std::condition_variable mConnectCv;
145 bool mConnecting;
146 int64_t mLastCleanUpUs;
147
ClientCacheandroid::hardware::media::bufferpool::V2_0::implementation::ClientManager::Impl::ClientCache148 ClientCache() : mConnecting(false), mLastCleanUpUs(getTimestampNow()) {}
149 } mCache;
150
151 // Active clients which can be retrieved via ConnectionId
152 struct ActiveClients {
153 // This lock is held for brief duration.
154 // Blocking operation is not performed holding the lock.
155 std::mutex mMutex;
156 std::map<ConnectionId, const std::shared_ptr<BufferPoolClient>>
157 mClients;
158 } mActive;
159
160 sp<Observer> mObserver;
161
162 ClientManagerCookieHolder mRemoteClientCookies;
163 };
164
Impl()165 ClientManager::Impl::Impl()
166 : mObserver(new Observer()) {}
167
registerSender(const sp<IAccessor> & accessor,ConnectionId * pConnectionId)168 ResultStatus ClientManager::Impl::registerSender(
169 const sp<IAccessor> &accessor, ConnectionId *pConnectionId) {
170 cleanUp();
171 int64_t timeoutUs = getTimestampNow() + kRegisterTimeoutUs;
172 do {
173 std::unique_lock<std::mutex> lock(mCache.mMutex);
174 for (auto it = mCache.mClients.begin(); it != mCache.mClients.end(); ++it) {
175 sp<IAccessor> sAccessor = it->first.promote();
176 if (sAccessor && interfacesEqual(sAccessor, accessor)) {
177 const std::shared_ptr<BufferPoolClient> client = it->second.lock();
178 if (client) {
179 std::lock_guard<std::mutex> lock(mActive.mMutex);
180 *pConnectionId = client->getConnectionId();
181 if (mActive.mClients.find(*pConnectionId) != mActive.mClients.end()) {
182 ALOGV("register existing connection %lld", (long long)*pConnectionId);
183 return ResultStatus::ALREADY_EXISTS;
184 }
185 }
186 mCache.mClients.erase(it);
187 break;
188 }
189 }
190 if (!mCache.mConnecting) {
191 mCache.mConnecting = true;
192 lock.unlock();
193 ResultStatus result = ResultStatus::OK;
194 const std::shared_ptr<BufferPoolClient> client =
195 std::make_shared<BufferPoolClient>(accessor, mObserver);
196 lock.lock();
197 if (!client) {
198 result = ResultStatus::NO_MEMORY;
199 } else if (!client->isValid()) {
200 result = ResultStatus::CRITICAL_ERROR;
201 }
202 if (result == ResultStatus::OK) {
203 // TODO: handle insert fail. (malloc fail)
204 const std::weak_ptr<BufferPoolClient> wclient = client;
205 mCache.mClients.push_back(std::make_pair(accessor, wclient));
206 ConnectionId conId = client->getConnectionId();
207 mObserver->addClient(conId, wclient);
208 {
209 std::lock_guard<std::mutex> lock(mActive.mMutex);
210 mActive.mClients.insert(std::make_pair(conId, client));
211 }
212 *pConnectionId = conId;
213 ALOGV("register new connection %lld", (long long)*pConnectionId);
214 }
215 mCache.mConnecting = false;
216 lock.unlock();
217 mCache.mConnectCv.notify_all();
218 return result;
219 }
220 mCache.mConnectCv.wait_for(
221 lock, std::chrono::microseconds(kRegisterTimeoutUs));
222 } while (getTimestampNow() < timeoutUs);
223 // TODO: return timeout error
224 return ResultStatus::CRITICAL_ERROR;
225 }
226
registerSender(const sp<IClientManager> & receiver,ConnectionId senderId,ConnectionId * receiverId)227 ResultStatus ClientManager::Impl::registerSender(
228 const sp<IClientManager> &receiver,
229 ConnectionId senderId,
230 ConnectionId *receiverId) {
231 sp<IAccessor> accessor;
232 bool local = false;
233 {
234 std::lock_guard<std::mutex> lock(mActive.mMutex);
235 auto it = mActive.mClients.find(senderId);
236 if (it == mActive.mClients.end()) {
237 return ResultStatus::NOT_FOUND;
238 }
239 it->second->getAccessor(&accessor);
240 local = it->second->isLocal();
241 }
242 ResultStatus rs = ResultStatus::CRITICAL_ERROR;
243 if (accessor) {
244 Return<void> transResult = receiver->registerSender(
245 accessor,
246 [&rs, receiverId](
247 ResultStatus status,
248 int64_t connectionId) {
249 rs = status;
250 *receiverId = connectionId;
251 });
252 if (!transResult.isOk()) {
253 return ResultStatus::CRITICAL_ERROR;
254 } else if (local && rs == ResultStatus::OK) {
255 sp<ConnectionDeathRecipient> recipient = Accessor::getConnectionDeathRecipient();
256 if (recipient) {
257 ALOGV("client death recipient registered %lld", (long long)*receiverId);
258 bool added;
259 uint64_t cookie = mRemoteClientCookies.getCookie(receiver, &added);
260 recipient->addCookieToConnection(cookie, *receiverId);
261 if (added) {
262 Return<bool> transResult = receiver->linkToDeath(recipient, cookie);
263 }
264 }
265 }
266 }
267 return rs;
268 }
269
create(const std::shared_ptr<BufferPoolAllocator> & allocator,ConnectionId * pConnectionId)270 ResultStatus ClientManager::Impl::create(
271 const std::shared_ptr<BufferPoolAllocator> &allocator,
272 ConnectionId *pConnectionId) {
273 const sp<Accessor> accessor = new Accessor(allocator);
274 if (!accessor || !accessor->isValid()) {
275 return ResultStatus::CRITICAL_ERROR;
276 }
277 // TODO: observer is local. use direct call instead of hidl call.
278 std::shared_ptr<BufferPoolClient> client =
279 std::make_shared<BufferPoolClient>(accessor, mObserver);
280 if (!client || !client->isValid()) {
281 return ResultStatus::CRITICAL_ERROR;
282 }
283 // Since a new bufferpool is created, evict memories which are used by
284 // existing bufferpools and clients.
285 cleanUp(true);
286 {
287 // TODO: handle insert fail. (malloc fail)
288 std::lock_guard<std::mutex> lock(mCache.mMutex);
289 const std::weak_ptr<BufferPoolClient> wclient = client;
290 mCache.mClients.push_back(std::make_pair(accessor, wclient));
291 ConnectionId conId = client->getConnectionId();
292 mObserver->addClient(conId, wclient);
293 {
294 std::lock_guard<std::mutex> lock(mActive.mMutex);
295 mActive.mClients.insert(std::make_pair(conId, client));
296 }
297 *pConnectionId = conId;
298 ALOGV("create new connection %lld", (long long)*pConnectionId);
299 }
300 return ResultStatus::OK;
301 }
302
close(ConnectionId connectionId)303 ResultStatus ClientManager::Impl::close(ConnectionId connectionId) {
304 std::unique_lock<std::mutex> lock1(mCache.mMutex);
305 std::unique_lock<std::mutex> lock2(mActive.mMutex);
306 auto it = mActive.mClients.find(connectionId);
307 if (it != mActive.mClients.end()) {
308 sp<IAccessor> accessor;
309 it->second->getAccessor(&accessor);
310 std::shared_ptr<BufferPoolClient> closing = it->second;
311 mActive.mClients.erase(connectionId);
312 for (auto cit = mCache.mClients.begin(); cit != mCache.mClients.end();) {
313 // clean up dead client caches
314 sp<IAccessor> cAccessor = cit->first.promote();
315 if (!cAccessor || (accessor && interfacesEqual(cAccessor, accessor))) {
316 cit = mCache.mClients.erase(cit);
317 } else {
318 cit++;
319 }
320 }
321 lock2.unlock();
322 lock1.unlock();
323 closing->flush();
324 return ResultStatus::OK;
325 }
326 return ResultStatus::NOT_FOUND;
327 }
328
flush(ConnectionId connectionId)329 ResultStatus ClientManager::Impl::flush(ConnectionId connectionId) {
330 std::shared_ptr<BufferPoolClient> client;
331 {
332 std::lock_guard<std::mutex> lock(mActive.mMutex);
333 auto it = mActive.mClients.find(connectionId);
334 if (it == mActive.mClients.end()) {
335 return ResultStatus::NOT_FOUND;
336 }
337 client = it->second;
338 }
339 return client->flush();
340 }
341
allocate(ConnectionId connectionId,const std::vector<uint8_t> & params,native_handle_t ** handle,std::shared_ptr<BufferPoolData> * buffer)342 ResultStatus ClientManager::Impl::allocate(
343 ConnectionId connectionId, const std::vector<uint8_t> ¶ms,
344 native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) {
345 std::shared_ptr<BufferPoolClient> client;
346 {
347 std::lock_guard<std::mutex> lock(mActive.mMutex);
348 auto it = mActive.mClients.find(connectionId);
349 if (it == mActive.mClients.end()) {
350 return ResultStatus::NOT_FOUND;
351 }
352 client = it->second;
353 }
354 return client->allocate(params, handle, buffer);
355 }
356
receive(ConnectionId connectionId,TransactionId transactionId,BufferId bufferId,int64_t timestampUs,native_handle_t ** handle,std::shared_ptr<BufferPoolData> * buffer)357 ResultStatus ClientManager::Impl::receive(
358 ConnectionId connectionId, TransactionId transactionId,
359 BufferId bufferId, int64_t timestampUs,
360 native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) {
361 std::shared_ptr<BufferPoolClient> client;
362 {
363 std::lock_guard<std::mutex> lock(mActive.mMutex);
364 auto it = mActive.mClients.find(connectionId);
365 if (it == mActive.mClients.end()) {
366 return ResultStatus::NOT_FOUND;
367 }
368 client = it->second;
369 }
370 return client->receive(transactionId, bufferId, timestampUs, handle, buffer);
371 }
372
postSend(ConnectionId receiverId,const std::shared_ptr<BufferPoolData> & buffer,TransactionId * transactionId,int64_t * timestampUs)373 ResultStatus ClientManager::Impl::postSend(
374 ConnectionId receiverId, const std::shared_ptr<BufferPoolData> &buffer,
375 TransactionId *transactionId, int64_t *timestampUs) {
376 ConnectionId connectionId = buffer->mConnectionId;
377 std::shared_ptr<BufferPoolClient> client;
378 {
379 std::lock_guard<std::mutex> lock(mActive.mMutex);
380 auto it = mActive.mClients.find(connectionId);
381 if (it == mActive.mClients.end()) {
382 return ResultStatus::NOT_FOUND;
383 }
384 client = it->second;
385 }
386 return client->postSend(receiverId, buffer, transactionId, timestampUs);
387 }
388
getAccessor(ConnectionId connectionId,sp<IAccessor> * accessor)389 ResultStatus ClientManager::Impl::getAccessor(
390 ConnectionId connectionId, sp<IAccessor> *accessor) {
391 std::shared_ptr<BufferPoolClient> client;
392 {
393 std::lock_guard<std::mutex> lock(mActive.mMutex);
394 auto it = mActive.mClients.find(connectionId);
395 if (it == mActive.mClients.end()) {
396 return ResultStatus::NOT_FOUND;
397 }
398 client = it->second;
399 }
400 return client->getAccessor(accessor);
401 }
402
cleanUp(bool clearCache)403 void ClientManager::Impl::cleanUp(bool clearCache) {
404 int64_t now = getTimestampNow();
405 int64_t lastTransactionUs;
406 std::lock_guard<std::mutex> lock1(mCache.mMutex);
407 if (clearCache || mCache.mLastCleanUpUs + kCleanUpDurationUs < now) {
408 std::lock_guard<std::mutex> lock2(mActive.mMutex);
409 int cleaned = 0;
410 for (auto it = mActive.mClients.begin(); it != mActive.mClients.end();) {
411 if (!it->second->isActive(&lastTransactionUs, clearCache)) {
412 if (lastTransactionUs + kClientTimeoutUs < now) {
413 sp<IAccessor> accessor;
414 it->second->getAccessor(&accessor);
415 it = mActive.mClients.erase(it);
416 ++cleaned;
417 continue;
418 }
419 }
420 ++it;
421 }
422 for (auto cit = mCache.mClients.begin(); cit != mCache.mClients.end();) {
423 // clean up dead client caches
424 sp<IAccessor> cAccessor = cit->first.promote();
425 if (!cAccessor) {
426 cit = mCache.mClients.erase(cit);
427 } else {
428 ++cit;
429 }
430 }
431 ALOGV("# of cleaned connections: %d", cleaned);
432 mCache.mLastCleanUpUs = now;
433 }
434 }
435
436 // Methods from ::android::hardware::media::bufferpool::V2_0::IClientManager follow.
registerSender(const sp<::android::hardware::media::bufferpool::V2_0::IAccessor> & bufferPool,registerSender_cb _hidl_cb)437 Return<void> ClientManager::registerSender(const sp<::android::hardware::media::bufferpool::V2_0::IAccessor>& bufferPool, registerSender_cb _hidl_cb) {
438 if (mImpl) {
439 ConnectionId connectionId = -1;
440 ResultStatus status = mImpl->registerSender(bufferPool, &connectionId);
441 _hidl_cb(status, connectionId);
442 } else {
443 _hidl_cb(ResultStatus::CRITICAL_ERROR, -1);
444 }
445 return Void();
446 }
447
448 // Methods for local use.
449 sp<ClientManager> ClientManager::sInstance;
450 std::mutex ClientManager::sInstanceLock;
451
getInstance()452 sp<ClientManager> ClientManager::getInstance() {
453 std::lock_guard<std::mutex> lock(sInstanceLock);
454 if (!sInstance) {
455 sInstance = new ClientManager();
456 }
457 Accessor::createInvalidator();
458 return sInstance;
459 }
460
ClientManager()461 ClientManager::ClientManager() : mImpl(new Impl()) {}
462
~ClientManager()463 ClientManager::~ClientManager() {
464 }
465
create(const std::shared_ptr<BufferPoolAllocator> & allocator,ConnectionId * pConnectionId)466 ResultStatus ClientManager::create(
467 const std::shared_ptr<BufferPoolAllocator> &allocator,
468 ConnectionId *pConnectionId) {
469 if (mImpl) {
470 return mImpl->create(allocator, pConnectionId);
471 }
472 return ResultStatus::CRITICAL_ERROR;
473 }
474
registerSender(const sp<IClientManager> & receiver,ConnectionId senderId,ConnectionId * receiverId)475 ResultStatus ClientManager::registerSender(
476 const sp<IClientManager> &receiver,
477 ConnectionId senderId,
478 ConnectionId *receiverId) {
479 if (mImpl) {
480 return mImpl->registerSender(receiver, senderId, receiverId);
481 }
482 return ResultStatus::CRITICAL_ERROR;
483 }
484
close(ConnectionId connectionId)485 ResultStatus ClientManager::close(ConnectionId connectionId) {
486 if (mImpl) {
487 return mImpl->close(connectionId);
488 }
489 return ResultStatus::CRITICAL_ERROR;
490 }
491
flush(ConnectionId connectionId)492 ResultStatus ClientManager::flush(ConnectionId connectionId) {
493 if (mImpl) {
494 return mImpl->flush(connectionId);
495 }
496 return ResultStatus::CRITICAL_ERROR;
497 }
498
allocate(ConnectionId connectionId,const std::vector<uint8_t> & params,native_handle_t ** handle,std::shared_ptr<BufferPoolData> * buffer)499 ResultStatus ClientManager::allocate(
500 ConnectionId connectionId, const std::vector<uint8_t> ¶ms,
501 native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) {
502 if (mImpl) {
503 return mImpl->allocate(connectionId, params, handle, buffer);
504 }
505 return ResultStatus::CRITICAL_ERROR;
506 }
507
receive(ConnectionId connectionId,TransactionId transactionId,BufferId bufferId,int64_t timestampUs,native_handle_t ** handle,std::shared_ptr<BufferPoolData> * buffer)508 ResultStatus ClientManager::receive(
509 ConnectionId connectionId, TransactionId transactionId,
510 BufferId bufferId, int64_t timestampUs,
511 native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) {
512 if (mImpl) {
513 return mImpl->receive(connectionId, transactionId, bufferId,
514 timestampUs, handle, buffer);
515 }
516 return ResultStatus::CRITICAL_ERROR;
517 }
518
postSend(ConnectionId receiverId,const std::shared_ptr<BufferPoolData> & buffer,TransactionId * transactionId,int64_t * timestampUs)519 ResultStatus ClientManager::postSend(
520 ConnectionId receiverId, const std::shared_ptr<BufferPoolData> &buffer,
521 TransactionId *transactionId, int64_t* timestampUs) {
522 if (mImpl && buffer) {
523 return mImpl->postSend(receiverId, buffer, transactionId, timestampUs);
524 }
525 return ResultStatus::CRITICAL_ERROR;
526 }
527
cleanUp()528 void ClientManager::cleanUp() {
529 if (mImpl) {
530 mImpl->cleanUp(true);
531 }
532 }
533
534 } // namespace implementation
535 } // namespace V2_0
536 } // namespace bufferpool
537 } // namespace media
538 } // namespace hardware
539 } // namespace android
540