• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #define LOG_TAG "BufferPoolManager"
17 //#define LOG_NDEBUG 0
18 
19 #include <bufferpool/ClientManager.h>
20 #include <hidl/HidlTransportSupport.h>
21 #include <sys/types.h>
22 #include <time.h>
23 #include <unistd.h>
24 #include <utils/Log.h>
25 #include "BufferPoolClient.h"
26 #include "Observer.h"
27 #include "Accessor.h"
28 
29 namespace android {
30 namespace hardware {
31 namespace media {
32 namespace bufferpool {
33 namespace V2_0 {
34 namespace implementation {
35 
// Longest time registerSender() keeps retrying while another connection
// attempt is in flight (0.5 sec).
static constexpr int64_t kRegisterTimeoutUs = 500 * 1000;
// Minimum interval between two periodic clean-up passes (1 sec). TODO: tune
static constexpr int64_t kCleanUpDurationUs = 1000 * 1000;
// Idle time after which an inactive client is evicted (5 secs). TODO: tune
static constexpr int64_t kClientTimeoutUs = 5 * 1000 * 1000;
39 
40 /**
41  * The holder of the cookie of remote IClientManager.
42  * The cookie is process locally unique for each IClientManager.
43  * (The cookie is used to notify death of clients to bufferpool process.)
44  */
45 class ClientManagerCookieHolder {
46 public:
47     /**
48      * Creates a cookie holder for remote IClientManager(s).
49      */
50     ClientManagerCookieHolder();
51 
52     /**
53      * Gets a cookie for a remote IClientManager.
54      *
55      * @param manager   the specified remote IClientManager.
56      * @param added     true when the specified remote IClientManager is added
57      *                  newly, false otherwise.
58      *
59      * @return the process locally unique cookie for the specified IClientManager.
60      */
61     uint64_t getCookie(const sp<IClientManager> &manager, bool *added);
62 
63 private:
64     uint64_t mSeqId;
65     std::mutex mLock;
66     std::list<std::pair<const wp<IClientManager>, uint64_t>> mManagers;
67 };
68 
ClientManagerCookieHolder()69 ClientManagerCookieHolder::ClientManagerCookieHolder() : mSeqId(0){}
70 
getCookie(const sp<IClientManager> & manager,bool * added)71 uint64_t ClientManagerCookieHolder::getCookie(
72         const sp<IClientManager> &manager,
73         bool *added) {
74     std::lock_guard<std::mutex> lock(mLock);
75     for (auto it = mManagers.begin(); it != mManagers.end();) {
76         const sp<IClientManager> key = it->first.promote();
77         if (key) {
78             if (interfacesEqual(key, manager)) {
79                 *added = false;
80                 return it->second;
81             }
82             ++it;
83         } else {
84             it = mManagers.erase(it);
85         }
86     }
87     uint64_t id = mSeqId++;
88     *added = true;
89     mManagers.push_back(std::make_pair(manager, id));
90     return id;
91 }
92 
93 class ClientManager::Impl {
94 public:
95     Impl();
96 
97     // BnRegisterSender
98     ResultStatus registerSender(const sp<IAccessor> &accessor,
99                                 ConnectionId *pConnectionId);
100 
101     // BpRegisterSender
102     ResultStatus registerSender(const sp<IClientManager> &receiver,
103                                 ConnectionId senderId,
104                                 ConnectionId *receiverId);
105 
106     ResultStatus create(const std::shared_ptr<BufferPoolAllocator> &allocator,
107                         ConnectionId *pConnectionId);
108 
109     ResultStatus close(ConnectionId connectionId);
110 
111     ResultStatus flush(ConnectionId connectionId);
112 
113     ResultStatus allocate(ConnectionId connectionId,
114                           const std::vector<uint8_t> &params,
115                           native_handle_t **handle,
116                           std::shared_ptr<BufferPoolData> *buffer);
117 
118     ResultStatus receive(ConnectionId connectionId,
119                          TransactionId transactionId,
120                          BufferId bufferId,
121                          int64_t timestampUs,
122                          native_handle_t **handle,
123                          std::shared_ptr<BufferPoolData> *buffer);
124 
125     ResultStatus postSend(ConnectionId receiverId,
126                           const std::shared_ptr<BufferPoolData> &buffer,
127                           TransactionId *transactionId,
128                           int64_t *timestampUs);
129 
130     ResultStatus getAccessor(ConnectionId connectionId,
131                              sp<IAccessor> *accessor);
132 
133     void cleanUp(bool clearCache = false);
134 
135 private:
136     // In order to prevent deadlock between multiple locks,
137     // always lock ClientCache.lock before locking ActiveClients.lock.
138     struct ClientCache {
139         // This lock is held for brief duration.
140         // Blocking operation is not performed while holding the lock.
141         std::mutex mMutex;
142         std::list<std::pair<const wp<IAccessor>, const std::weak_ptr<BufferPoolClient>>>
143                 mClients;
144         std::condition_variable mConnectCv;
145         bool mConnecting;
146         int64_t mLastCleanUpUs;
147 
ClientCacheandroid::hardware::media::bufferpool::V2_0::implementation::ClientManager::Impl::ClientCache148         ClientCache() : mConnecting(false), mLastCleanUpUs(getTimestampNow()) {}
149     } mCache;
150 
151     // Active clients which can be retrieved via ConnectionId
152     struct ActiveClients {
153         // This lock is held for brief duration.
154         // Blocking operation is not performed holding the lock.
155         std::mutex mMutex;
156         std::map<ConnectionId, const std::shared_ptr<BufferPoolClient>>
157                 mClients;
158     } mActive;
159 
160     sp<Observer> mObserver;
161 
162     ClientManagerCookieHolder mRemoteClientCookies;
163 };
164 
Impl()165 ClientManager::Impl::Impl()
166     : mObserver(new Observer()) {}
167 
registerSender(const sp<IAccessor> & accessor,ConnectionId * pConnectionId)168 ResultStatus ClientManager::Impl::registerSender(
169         const sp<IAccessor> &accessor, ConnectionId *pConnectionId) {
170     cleanUp();
171     int64_t timeoutUs = getTimestampNow() + kRegisterTimeoutUs;
172     do {
173         std::unique_lock<std::mutex> lock(mCache.mMutex);
174         for (auto it = mCache.mClients.begin(); it != mCache.mClients.end(); ++it) {
175             sp<IAccessor> sAccessor = it->first.promote();
176             if (sAccessor && interfacesEqual(sAccessor, accessor)) {
177                 const std::shared_ptr<BufferPoolClient> client = it->second.lock();
178                 if (client) {
179                     std::lock_guard<std::mutex> lock(mActive.mMutex);
180                     *pConnectionId = client->getConnectionId();
181                     if (mActive.mClients.find(*pConnectionId) != mActive.mClients.end()) {
182                         ALOGV("register existing connection %lld", (long long)*pConnectionId);
183                         return ResultStatus::ALREADY_EXISTS;
184                     }
185                 }
186                 mCache.mClients.erase(it);
187                 break;
188             }
189         }
190         if (!mCache.mConnecting) {
191             mCache.mConnecting = true;
192             lock.unlock();
193             ResultStatus result = ResultStatus::OK;
194             const std::shared_ptr<BufferPoolClient> client =
195                     std::make_shared<BufferPoolClient>(accessor, mObserver);
196             lock.lock();
197             if (!client) {
198                 result = ResultStatus::NO_MEMORY;
199             } else if (!client->isValid()) {
200                 result = ResultStatus::CRITICAL_ERROR;
201             }
202             if (result == ResultStatus::OK) {
203                 // TODO: handle insert fail. (malloc fail)
204                 const std::weak_ptr<BufferPoolClient> wclient = client;
205                 mCache.mClients.push_back(std::make_pair(accessor, wclient));
206                 ConnectionId conId = client->getConnectionId();
207                 mObserver->addClient(conId, wclient);
208                 {
209                     std::lock_guard<std::mutex> lock(mActive.mMutex);
210                     mActive.mClients.insert(std::make_pair(conId, client));
211                 }
212                 *pConnectionId = conId;
213                 ALOGV("register new connection %lld", (long long)*pConnectionId);
214             }
215             mCache.mConnecting = false;
216             lock.unlock();
217             mCache.mConnectCv.notify_all();
218             return result;
219         }
220         mCache.mConnectCv.wait_for(
221                 lock, std::chrono::microseconds(kRegisterTimeoutUs));
222     } while (getTimestampNow() < timeoutUs);
223     // TODO: return timeout error
224     return ResultStatus::CRITICAL_ERROR;
225 }
226 
registerSender(const sp<IClientManager> & receiver,ConnectionId senderId,ConnectionId * receiverId)227 ResultStatus ClientManager::Impl::registerSender(
228         const sp<IClientManager> &receiver,
229         ConnectionId senderId,
230         ConnectionId *receiverId) {
231     sp<IAccessor> accessor;
232     bool local = false;
233     {
234         std::lock_guard<std::mutex> lock(mActive.mMutex);
235         auto it = mActive.mClients.find(senderId);
236         if (it == mActive.mClients.end()) {
237             return ResultStatus::NOT_FOUND;
238         }
239         it->second->getAccessor(&accessor);
240         local = it->second->isLocal();
241     }
242     ResultStatus rs = ResultStatus::CRITICAL_ERROR;
243     if (accessor) {
244        Return<void> transResult = receiver->registerSender(
245                 accessor,
246                 [&rs, receiverId](
247                         ResultStatus status,
248                         int64_t connectionId) {
249                     rs = status;
250                     *receiverId = connectionId;
251                 });
252         if (!transResult.isOk()) {
253             return ResultStatus::CRITICAL_ERROR;
254         } else if (local && rs == ResultStatus::OK) {
255             sp<ConnectionDeathRecipient> recipient = Accessor::getConnectionDeathRecipient();
256             if (recipient)  {
257                 ALOGV("client death recipient registered %lld", (long long)*receiverId);
258                 bool added;
259                 uint64_t cookie = mRemoteClientCookies.getCookie(receiver, &added);
260                 recipient->addCookieToConnection(cookie, *receiverId);
261                 if (added) {
262                     Return<bool> transResult = receiver->linkToDeath(recipient, cookie);
263                 }
264             }
265         }
266     }
267     return rs;
268 }
269 
create(const std::shared_ptr<BufferPoolAllocator> & allocator,ConnectionId * pConnectionId)270 ResultStatus ClientManager::Impl::create(
271         const std::shared_ptr<BufferPoolAllocator> &allocator,
272         ConnectionId *pConnectionId) {
273     const sp<Accessor> accessor = new Accessor(allocator);
274     if (!accessor || !accessor->isValid()) {
275         return ResultStatus::CRITICAL_ERROR;
276     }
277     // TODO: observer is local. use direct call instead of hidl call.
278     std::shared_ptr<BufferPoolClient> client =
279             std::make_shared<BufferPoolClient>(accessor, mObserver);
280     if (!client || !client->isValid()) {
281         return ResultStatus::CRITICAL_ERROR;
282     }
283     // Since a new bufferpool is created, evict memories which are used by
284     // existing bufferpools and clients.
285     cleanUp(true);
286     {
287         // TODO: handle insert fail. (malloc fail)
288         std::lock_guard<std::mutex> lock(mCache.mMutex);
289         const std::weak_ptr<BufferPoolClient> wclient = client;
290         mCache.mClients.push_back(std::make_pair(accessor, wclient));
291         ConnectionId conId = client->getConnectionId();
292         mObserver->addClient(conId, wclient);
293         {
294             std::lock_guard<std::mutex> lock(mActive.mMutex);
295             mActive.mClients.insert(std::make_pair(conId, client));
296         }
297         *pConnectionId = conId;
298         ALOGV("create new connection %lld", (long long)*pConnectionId);
299     }
300     return ResultStatus::OK;
301 }
302 
close(ConnectionId connectionId)303 ResultStatus ClientManager::Impl::close(ConnectionId connectionId) {
304     std::unique_lock<std::mutex> lock1(mCache.mMutex);
305     std::unique_lock<std::mutex> lock2(mActive.mMutex);
306     auto it = mActive.mClients.find(connectionId);
307     if (it != mActive.mClients.end()) {
308         sp<IAccessor> accessor;
309         it->second->getAccessor(&accessor);
310         std::shared_ptr<BufferPoolClient> closing = it->second;
311         mActive.mClients.erase(connectionId);
312         for (auto cit = mCache.mClients.begin(); cit != mCache.mClients.end();) {
313             // clean up dead client caches
314             sp<IAccessor> cAccessor = cit->first.promote();
315             if (!cAccessor || (accessor && interfacesEqual(cAccessor, accessor))) {
316                 cit = mCache.mClients.erase(cit);
317             } else {
318                 cit++;
319             }
320         }
321         lock2.unlock();
322         lock1.unlock();
323         closing->flush();
324         return ResultStatus::OK;
325     }
326     return ResultStatus::NOT_FOUND;
327 }
328 
flush(ConnectionId connectionId)329 ResultStatus ClientManager::Impl::flush(ConnectionId connectionId) {
330     std::shared_ptr<BufferPoolClient> client;
331     {
332         std::lock_guard<std::mutex> lock(mActive.mMutex);
333         auto it = mActive.mClients.find(connectionId);
334         if (it == mActive.mClients.end()) {
335             return ResultStatus::NOT_FOUND;
336         }
337         client = it->second;
338     }
339     return client->flush();
340 }
341 
allocate(ConnectionId connectionId,const std::vector<uint8_t> & params,native_handle_t ** handle,std::shared_ptr<BufferPoolData> * buffer)342 ResultStatus ClientManager::Impl::allocate(
343         ConnectionId connectionId, const std::vector<uint8_t> &params,
344         native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) {
345     std::shared_ptr<BufferPoolClient> client;
346     {
347         std::lock_guard<std::mutex> lock(mActive.mMutex);
348         auto it = mActive.mClients.find(connectionId);
349         if (it == mActive.mClients.end()) {
350             return ResultStatus::NOT_FOUND;
351         }
352         client = it->second;
353     }
354     return client->allocate(params, handle, buffer);
355 }
356 
receive(ConnectionId connectionId,TransactionId transactionId,BufferId bufferId,int64_t timestampUs,native_handle_t ** handle,std::shared_ptr<BufferPoolData> * buffer)357 ResultStatus ClientManager::Impl::receive(
358         ConnectionId connectionId, TransactionId transactionId,
359         BufferId bufferId, int64_t timestampUs,
360         native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) {
361     std::shared_ptr<BufferPoolClient> client;
362     {
363         std::lock_guard<std::mutex> lock(mActive.mMutex);
364         auto it = mActive.mClients.find(connectionId);
365         if (it == mActive.mClients.end()) {
366             return ResultStatus::NOT_FOUND;
367         }
368         client = it->second;
369     }
370     return client->receive(transactionId, bufferId, timestampUs, handle, buffer);
371 }
372 
postSend(ConnectionId receiverId,const std::shared_ptr<BufferPoolData> & buffer,TransactionId * transactionId,int64_t * timestampUs)373 ResultStatus ClientManager::Impl::postSend(
374         ConnectionId receiverId, const std::shared_ptr<BufferPoolData> &buffer,
375         TransactionId *transactionId, int64_t *timestampUs) {
376     ConnectionId connectionId = buffer->mConnectionId;
377     std::shared_ptr<BufferPoolClient> client;
378     {
379         std::lock_guard<std::mutex> lock(mActive.mMutex);
380         auto it = mActive.mClients.find(connectionId);
381         if (it == mActive.mClients.end()) {
382             return ResultStatus::NOT_FOUND;
383         }
384         client = it->second;
385     }
386     return client->postSend(receiverId, buffer, transactionId, timestampUs);
387 }
388 
getAccessor(ConnectionId connectionId,sp<IAccessor> * accessor)389 ResultStatus ClientManager::Impl::getAccessor(
390         ConnectionId connectionId, sp<IAccessor> *accessor) {
391     std::shared_ptr<BufferPoolClient> client;
392     {
393         std::lock_guard<std::mutex> lock(mActive.mMutex);
394         auto it = mActive.mClients.find(connectionId);
395         if (it == mActive.mClients.end()) {
396             return ResultStatus::NOT_FOUND;
397         }
398         client = it->second;
399     }
400     return client->getAccessor(accessor);
401 }
402 
cleanUp(bool clearCache)403 void ClientManager::Impl::cleanUp(bool clearCache) {
404     int64_t now = getTimestampNow();
405     int64_t lastTransactionUs;
406     std::lock_guard<std::mutex> lock1(mCache.mMutex);
407     if (clearCache || mCache.mLastCleanUpUs + kCleanUpDurationUs < now) {
408         std::lock_guard<std::mutex> lock2(mActive.mMutex);
409         int cleaned = 0;
410         for (auto it = mActive.mClients.begin(); it != mActive.mClients.end();) {
411             if (!it->second->isActive(&lastTransactionUs, clearCache)) {
412                 if (lastTransactionUs + kClientTimeoutUs < now) {
413                     sp<IAccessor> accessor;
414                     it->second->getAccessor(&accessor);
415                     it = mActive.mClients.erase(it);
416                     ++cleaned;
417                     continue;
418                 }
419             }
420             ++it;
421         }
422         for (auto cit = mCache.mClients.begin(); cit != mCache.mClients.end();) {
423             // clean up dead client caches
424             sp<IAccessor> cAccessor = cit->first.promote();
425             if (!cAccessor) {
426                 cit = mCache.mClients.erase(cit);
427             } else {
428                 ++cit;
429             }
430         }
431         ALOGV("# of cleaned connections: %d", cleaned);
432         mCache.mLastCleanUpUs = now;
433     }
434 }
435 
436 // Methods from ::android::hardware::media::bufferpool::V2_0::IClientManager follow.
registerSender(const sp<::android::hardware::media::bufferpool::V2_0::IAccessor> & bufferPool,registerSender_cb _hidl_cb)437 Return<void> ClientManager::registerSender(const sp<::android::hardware::media::bufferpool::V2_0::IAccessor>& bufferPool, registerSender_cb _hidl_cb) {
438     if (mImpl) {
439         ConnectionId connectionId = -1;
440         ResultStatus status = mImpl->registerSender(bufferPool, &connectionId);
441         _hidl_cb(status, connectionId);
442     } else {
443         _hidl_cb(ResultStatus::CRITICAL_ERROR, -1);
444     }
445     return Void();
446 }
447 
448 // Methods for local use.
449 sp<ClientManager> ClientManager::sInstance;
450 std::mutex ClientManager::sInstanceLock;
451 
getInstance()452 sp<ClientManager> ClientManager::getInstance() {
453     std::lock_guard<std::mutex> lock(sInstanceLock);
454     if (!sInstance) {
455         sInstance = new ClientManager();
456     }
457     Accessor::createInvalidator();
458     return sInstance;
459 }
460 
ClientManager()461 ClientManager::ClientManager() : mImpl(new Impl()) {}
462 
~ClientManager()463 ClientManager::~ClientManager() {
464 }
465 
create(const std::shared_ptr<BufferPoolAllocator> & allocator,ConnectionId * pConnectionId)466 ResultStatus ClientManager::create(
467         const std::shared_ptr<BufferPoolAllocator> &allocator,
468         ConnectionId *pConnectionId) {
469     if (mImpl) {
470         return mImpl->create(allocator, pConnectionId);
471     }
472     return ResultStatus::CRITICAL_ERROR;
473 }
474 
registerSender(const sp<IClientManager> & receiver,ConnectionId senderId,ConnectionId * receiverId)475 ResultStatus ClientManager::registerSender(
476         const sp<IClientManager> &receiver,
477         ConnectionId senderId,
478         ConnectionId *receiverId) {
479     if (mImpl) {
480         return mImpl->registerSender(receiver, senderId, receiverId);
481     }
482     return ResultStatus::CRITICAL_ERROR;
483 }
484 
close(ConnectionId connectionId)485 ResultStatus ClientManager::close(ConnectionId connectionId) {
486     if (mImpl) {
487         return mImpl->close(connectionId);
488     }
489     return ResultStatus::CRITICAL_ERROR;
490 }
491 
flush(ConnectionId connectionId)492 ResultStatus ClientManager::flush(ConnectionId connectionId) {
493     if (mImpl) {
494         return mImpl->flush(connectionId);
495     }
496     return ResultStatus::CRITICAL_ERROR;
497 }
498 
allocate(ConnectionId connectionId,const std::vector<uint8_t> & params,native_handle_t ** handle,std::shared_ptr<BufferPoolData> * buffer)499 ResultStatus ClientManager::allocate(
500         ConnectionId connectionId, const std::vector<uint8_t> &params,
501         native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) {
502     if (mImpl) {
503         return mImpl->allocate(connectionId, params, handle, buffer);
504     }
505     return ResultStatus::CRITICAL_ERROR;
506 }
507 
receive(ConnectionId connectionId,TransactionId transactionId,BufferId bufferId,int64_t timestampUs,native_handle_t ** handle,std::shared_ptr<BufferPoolData> * buffer)508 ResultStatus ClientManager::receive(
509         ConnectionId connectionId, TransactionId transactionId,
510         BufferId bufferId, int64_t timestampUs,
511         native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) {
512     if (mImpl) {
513         return mImpl->receive(connectionId, transactionId, bufferId,
514                               timestampUs, handle, buffer);
515     }
516     return ResultStatus::CRITICAL_ERROR;
517 }
518 
postSend(ConnectionId receiverId,const std::shared_ptr<BufferPoolData> & buffer,TransactionId * transactionId,int64_t * timestampUs)519 ResultStatus ClientManager::postSend(
520         ConnectionId receiverId, const std::shared_ptr<BufferPoolData> &buffer,
521         TransactionId *transactionId, int64_t* timestampUs) {
522     if (mImpl && buffer) {
523         return mImpl->postSend(receiverId, buffer, transactionId, timestampUs);
524     }
525     return ResultStatus::CRITICAL_ERROR;
526 }
527 
cleanUp()528 void ClientManager::cleanUp() {
529     if (mImpl) {
530         mImpl->cleanUp(true);
531     }
532 }
533 
534 }  // namespace implementation
535 }  // namespace V2_0
536 }  // namespace bufferpool
537 }  // namespace media
538 }  // namespace hardware
539 }  // namespace android
540