• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "StreamBufferCacheManager"
19 #define ATRACE_TAG ATRACE_TAG_CAMERA
20 
21 #include <cutils/native_handle.h>
22 #include <log/log.h>
23 #include <sync/sync.h>
24 #include <utils/Trace.h>
25 
26 #include <chrono>
27 
28 #include "stream_buffer_cache_manager.h"
29 #include "utils.h"
30 
31 using namespace std::chrono_literals;
32 
33 namespace android {
34 namespace google_camera_hal {
35 
// How long GetBuffer() waits for the cache to be refilled before it falls back
// to a dummy buffer. NOTE: despite the "Sec" suffix in the name, the value is
// expressed in milliseconds.
//
// The value is a trade-off. A buffer request to the provider (e.g. the
// framework) typically takes 1~2 ms, so a timeout that is too small may cause
// more frame drops in certain cases. A timeout that is too large can lead to
// an extra long delay of traffic (in both directions) between the framework
// and the layer below HWL. In particular, CTS testCameraDeviceCaptureFailure
// holds on to image buffers so the HAL hits the refill timeout, and a large
// timeout makes closing the session take longer than 5 seconds in that test.
43 static constexpr auto kBufferWaitingTimeOutSec = 400ms;
44 
StreamBufferCacheManager()45 StreamBufferCacheManager::StreamBufferCacheManager() {
46   workload_thread_ = std::thread([this] { this->WorkloadThreadLoop(); });
47   if (utils::SupportRealtimeThread()) {
48     status_t res = utils::SetRealtimeThread(workload_thread_.native_handle());
49     if (res != OK) {
50       ALOGE("%s: SetRealtimeThread fail", __FUNCTION__);
51     } else {
52       ALOGI("%s: SetRealtimeThread OK", __FUNCTION__);
53     }
54   }
55 }
56 
~StreamBufferCacheManager()57 StreamBufferCacheManager::~StreamBufferCacheManager() {
58   ALOGI("%s: Destroying stream buffer cache manager.", __FUNCTION__);
59   {
60     std::lock_guard<std::mutex> lock(workload_mutex_);
61     workload_thread_exiting_ = true;
62   }
63   workload_cv_.notify_one();
64   workload_thread_.join();
65 }
66 
Create()67 std::unique_ptr<StreamBufferCacheManager> StreamBufferCacheManager::Create() {
68   ATRACE_CALL();
69 
70   auto manager =
71       std::unique_ptr<StreamBufferCacheManager>(new StreamBufferCacheManager());
72   if (manager == nullptr) {
73     ALOGE("%s: Failed to create stream buffer cache manager.", __FUNCTION__);
74     return nullptr;
75   }
76 
77   manager->dummy_buffer_allocator_ = GrallocBufferAllocator::Create();
78   if (manager->dummy_buffer_allocator_ == nullptr) {
79     ALOGE("%s: Failed to create gralloc buffer allocator", __FUNCTION__);
80     return nullptr;
81   }
82 
83   ALOGI("%s: Created StreamBufferCacheManager.", __FUNCTION__);
84 
85   return manager;
86 }
87 
RegisterStream(const StreamBufferCacheRegInfo & reg_info)88 status_t StreamBufferCacheManager::RegisterStream(
89     const StreamBufferCacheRegInfo& reg_info) {
90   ATRACE_CALL();
91   if (reg_info.request_func == nullptr || reg_info.return_func == nullptr) {
92     ALOGE("%s: Can't register stream, request or return function is nullptr.",
93           __FUNCTION__);
94     return BAD_VALUE;
95   }
96 
97   if (reg_info.num_buffers_to_cache != 1) {
98     ALOGE("%s: Only support caching one buffer.", __FUNCTION__);
99     return BAD_VALUE;
100   }
101 
102   std::lock_guard<std::mutex> lock(caches_map_mutex_);
103   if (stream_buffer_caches_.find(reg_info.stream_id) !=
104       stream_buffer_caches_.end()) {
105     ALOGE("%s: Stream %d has been registered.", __FUNCTION__,
106           reg_info.stream_id);
107     return INVALID_OPERATION;
108   }
109 
110   status_t res = AddStreamBufferCacheLocked(reg_info);
111   if (res != OK) {
112     ALOGE("%s: Failed to add stream buffer cache.", __FUNCTION__);
113     return UNKNOWN_ERROR;
114   }
115   return OK;
116 }
117 
GetStreamBuffer(int32_t stream_id,StreamBufferRequestResult * res)118 status_t StreamBufferCacheManager::GetStreamBuffer(
119     int32_t stream_id, StreamBufferRequestResult* res) {
120   ATRACE_CALL();
121 
122   StreamBufferCache* stream_buffer_cache = nullptr;
123   status_t result = GetStreamBufferCache(stream_id, &stream_buffer_cache);
124   if (result != OK) {
125     ALOGE("%s: Querying stream buffer cache failed.", __FUNCTION__);
126     return result;
127   }
128 
129   result = stream_buffer_cache->GetBuffer(res);
130   if (result != OK) {
131     ALOGE("%s: Get buffer for stream %d failed.", __FUNCTION__, stream_id);
132     return UNKNOWN_ERROR;
133   }
134 
135   {
136     int fence_status = 0;
137     if (res->buffer.acquire_fence != nullptr) {
138       native_handle_t* fence_handle =
139           const_cast<native_handle_t*>(res->buffer.acquire_fence);
140       if (fence_handle->numFds == 1) {
141         fence_status = sync_wait(fence_handle->data[0], kSyncWaitTimeMs);
142       }
143       if (0 != fence_status) {
144         ALOGE("%s: Fence check failed.", __FUNCTION__);
145       }
146       native_handle_close(fence_handle);
147       native_handle_delete(fence_handle);
148       res->buffer.acquire_fence = nullptr;
149     }
150   }
151 
152   NotifyThreadWorkload();
153   return OK;
154 }
155 
NotifyProviderReadiness(int32_t stream_id)156 status_t StreamBufferCacheManager::NotifyProviderReadiness(int32_t stream_id) {
157   StreamBufferCache* stream_buffer_cache = nullptr;
158   status_t res = GetStreamBufferCache(stream_id, &stream_buffer_cache);
159   if (res != OK) {
160     ALOGE("%s: Querying stream buffer cache failed.", __FUNCTION__);
161     return res;
162   }
163 
164   stream_buffer_cache->SetManagerState(/*active=*/true);
165 
166   NotifyThreadWorkload();
167   return OK;
168 }
169 
NotifyFlushingAll()170 status_t StreamBufferCacheManager::NotifyFlushingAll() {
171   // Mark all StreamBufferCache as need to be flushed
172   std::vector<StreamBufferCache*> stream_buffer_caches;
173   {
174     std::lock_guard<std::mutex> map_lock(caches_map_mutex_);
175     for (auto& [stream_id, stream_buffer_cache] : stream_buffer_caches_) {
176       stream_buffer_caches.push_back(stream_buffer_cache.get());
177     }
178   }
179 
180   {
181     std::unique_lock<std::mutex> flush_lock(flush_mutex_);
182     for (auto& stream_buffer_cache : stream_buffer_caches) {
183       stream_buffer_cache->SetManagerState(/*active=*/false);
184     }
185   }
186 
187   NotifyThreadWorkload();
188   return OK;
189 }
190 
IsStreamActive(int32_t stream_id,bool * is_active)191 status_t StreamBufferCacheManager::IsStreamActive(int32_t stream_id,
192                                                   bool* is_active) {
193   StreamBufferCache* stream_buffer_cache = nullptr;
194   status_t res = GetStreamBufferCache(stream_id, &stream_buffer_cache);
195   if (res != OK) {
196     ALOGE("%s: Querying stream buffer cache failed.", __FUNCTION__);
197     return res;
198   }
199 
200   *is_active = !stream_buffer_cache->IsStreamDeactivated();
201   return OK;
202 }
203 
AddStreamBufferCacheLocked(const StreamBufferCacheRegInfo & reg_info)204 status_t StreamBufferCacheManager::AddStreamBufferCacheLocked(
205     const StreamBufferCacheRegInfo& reg_info) {
206   auto stream_buffer_cache = StreamBufferCacheManager::StreamBufferCache::Create(
207       reg_info, [this] { this->NotifyThreadWorkload(); },
208       dummy_buffer_allocator_.get());
209   if (stream_buffer_cache == nullptr) {
210     ALOGE("%s: Failed to create StreamBufferCache for stream %d", __FUNCTION__,
211           reg_info.stream_id);
212     return UNKNOWN_ERROR;
213   }
214 
215   stream_buffer_caches_[reg_info.stream_id] = std::move(stream_buffer_cache);
216   return OK;
217 }
218 
WorkloadThreadLoop()219 void StreamBufferCacheManager::WorkloadThreadLoop() {
220   while (1) {
221     bool exiting = false;
222     {
223       std::unique_lock<std::mutex> thread_lock(workload_mutex_);
224       workload_cv_.wait(thread_lock, [this] {
225         return has_new_workload_ || workload_thread_exiting_;
226       });
227       has_new_workload_ = false;
228       exiting = workload_thread_exiting_;
229     }
230 
231     std::vector<StreamBufferCacheManager::StreamBufferCache*> stream_buffer_caches;
232     {
233       std::unique_lock<std::mutex> map_lock(caches_map_mutex_);
234       for (auto& [stream_id, cache] : stream_buffer_caches_) {
235         stream_buffer_caches.push_back(cache.get());
236       }
237     }
238 
239     {
240       std::unique_lock<std::mutex> flush_lock(flush_mutex_);
241       for (auto& stream_buffer_cache : stream_buffer_caches) {
242         status_t res = stream_buffer_cache->UpdateCache(exiting);
243         if (res != OK) {
244           ALOGE("%s: Updating(flush/refill) cache failed.", __FUNCTION__);
245         }
246       }
247     }
248 
249     if (exiting) {
250       ALOGI("%s: Exiting stream buffer cache manager workload thread.",
251             __FUNCTION__);
252       return;
253     }
254   }
255 }
256 
NotifyThreadWorkload()257 void StreamBufferCacheManager::NotifyThreadWorkload() {
258   {
259     std::lock_guard<std::mutex> lock(workload_mutex_);
260     has_new_workload_ = true;
261   }
262   workload_cv_.notify_one();
263 }
264 
265 std::unique_ptr<StreamBufferCacheManager::StreamBufferCache>
Create(const StreamBufferCacheRegInfo & reg_info,NotifyManagerThreadWorkloadFunc notify,IHalBufferAllocator * dummy_buffer_allocator)266 StreamBufferCacheManager::StreamBufferCache::Create(
267     const StreamBufferCacheRegInfo& reg_info,
268     NotifyManagerThreadWorkloadFunc notify,
269     IHalBufferAllocator* dummy_buffer_allocator) {
270   if (notify == nullptr || dummy_buffer_allocator == nullptr) {
271     ALOGE("%s: notify is nullptr or dummy_buffer_allocator is nullptr.",
272           __FUNCTION__);
273     return nullptr;
274   }
275 
276   auto cache = std::unique_ptr<StreamBufferCacheManager::StreamBufferCache>(
277       new StreamBufferCacheManager::StreamBufferCache(reg_info, notify,
278                                                       dummy_buffer_allocator));
279   if (cache == nullptr) {
280     ALOGE("%s: Failed to create stream buffer cache.", __FUNCTION__);
281     return nullptr;
282   }
283 
284   return cache;
285 }
286 
StreamBufferCache(const StreamBufferCacheRegInfo & reg_info,NotifyManagerThreadWorkloadFunc notify,IHalBufferAllocator * dummy_buffer_allocator)287 StreamBufferCacheManager::StreamBufferCache::StreamBufferCache(
288     const StreamBufferCacheRegInfo& reg_info,
289     NotifyManagerThreadWorkloadFunc notify,
290     IHalBufferAllocator* dummy_buffer_allocator)
291     : cache_info_(reg_info) {
292   std::lock_guard<std::mutex> lock(cache_access_mutex_);
293   notify_for_workload_ = notify;
294   dummy_buffer_allocator_ = dummy_buffer_allocator;
295 }
296 
UpdateCache(bool forced_flushing)297 status_t StreamBufferCacheManager::StreamBufferCache::UpdateCache(
298     bool forced_flushing) {
299   status_t res = OK;
300   std::unique_lock<std::mutex> cache_lock(cache_access_mutex_);
301   if (forced_flushing || !is_active_) {
302     res = FlushLocked(forced_flushing);
303     if (res != OK) {
304       ALOGE("%s: Failed to flush stream buffer cache for stream %d",
305             __FUNCTION__, cache_info_.stream_id);
306       return res;
307     }
308   } else if (RefillableLocked()) {
309     cache_lock.unlock();
310     res = Refill();
311     if (res != OK) {
312       ALOGE("%s: Failed to refill stream buffer cache for stream %d",
313             __FUNCTION__, cache_info_.stream_id);
314       return res;
315     }
316   }
317   return OK;
318 }
319 
GetBuffer(StreamBufferRequestResult * res)320 status_t StreamBufferCacheManager::StreamBufferCache::GetBuffer(
321     StreamBufferRequestResult* res) {
322   std::unique_lock<std::mutex> cache_lock(cache_access_mutex_);
323 
324   // 0. the buffer cache must be active
325   if (!is_active_) {
326     ALOGW("%s: The buffer cache for stream %d is not active.", __FUNCTION__,
327           cache_info_.stream_id);
328     return INVALID_OPERATION;
329   }
330 
331   // 1. check if the cache is deactived
332   if (stream_deactived_) {
333     res->is_dummy_buffer = true;
334     res->buffer = dummy_buffer_;
335     return OK;
336   }
337 
338   // 2. check if there is any buffer available in the cache. If not, try
339   // to wait for a short period and check again. In case of timeout, use the
340   // dummy buffer instead.
341   if (cached_buffers_.empty()) {
342     // In case the GetStreamBufer is called after NotifyFlushingAll, this will
343     // be the first event that should trigger the dedicated thread to restart
344     // and refill the caches. An extra notification of thread workload is
345     // harmless and will be bypassed very quickly.
346     cache_lock.unlock();
347     notify_for_workload_();
348     cache_lock.lock();
349     // Need to check this again since the state may change after the lock is
350     // acquired for the second time.
351     if (cached_buffers_.empty()) {
352       // Wait for a certain amount of time for the cache to be refilled
353       if (cache_access_cv_.wait_for(cache_lock, kBufferWaitingTimeOutSec) ==
354           std::cv_status::timeout) {
355         ALOGW("%s: StreamBufferCache for stream %d waiting for refill timeout.",
356               __FUNCTION__, cache_info_.stream_id);
357       }
358     }
359   }
360 
361   // 3. use dummy buffer if the cache is still empty
362   if (cached_buffers_.empty()) {
363     // Only allocate dummy buffer for the first time
364     if (dummy_buffer_.buffer == nullptr) {
365       status_t result = AllocateDummyBufferLocked();
366       if (result != OK) {
367         ALOGE("%s: Allocate dummy buffer failed.", __FUNCTION__);
368         return UNKNOWN_ERROR;
369       }
370     }
371     res->is_dummy_buffer = true;
372     res->buffer = dummy_buffer_;
373     return OK;
374   } else {
375     res->is_dummy_buffer = false;
376     res->buffer = cached_buffers_.back();
377     cached_buffers_.pop_back();
378   }
379 
380   return OK;
381 }
382 
IsStreamDeactivated()383 bool StreamBufferCacheManager::StreamBufferCache::IsStreamDeactivated() {
384   std::unique_lock<std::mutex> lock(cache_access_mutex_);
385   return stream_deactived_;
386 }
387 
SetManagerState(bool active)388 void StreamBufferCacheManager::StreamBufferCache::SetManagerState(bool active) {
389   std::unique_lock<std::mutex> lock(cache_access_mutex_);
390   is_active_ = active;
391 }
392 
FlushLocked(bool forced_flushing)393 status_t StreamBufferCacheManager::StreamBufferCache::FlushLocked(
394     bool forced_flushing) {
395   if (is_active_ && !forced_flushing) {
396     ALOGI("%s: Active stream buffer cache is not notified for forced flushing.",
397           __FUNCTION__);
398     return INVALID_OPERATION;
399   }
400 
401   if (cache_info_.return_func == nullptr) {
402     ALOGE("%s: return_func is nullptr.", __FUNCTION__);
403     return UNKNOWN_ERROR;
404   }
405 
406   if (cached_buffers_.empty()) {
407     ALOGV("%s: Stream buffer cache is already empty.", __FUNCTION__);
408     ReleaseDummyBufferLocked();
409     return OK;
410   }
411 
412   status_t res = cache_info_.return_func(cached_buffers_);
413   if (res != OK) {
414     ALOGE("%s: Failed to return buffers.", __FUNCTION__);
415     return res;
416   }
417 
418   cached_buffers_.clear();
419   ReleaseDummyBufferLocked();
420 
421   return OK;
422 }
423 
Refill()424 status_t StreamBufferCacheManager::StreamBufferCache::Refill() {
425   int32_t num_buffers_to_acquire = 0;
426   {
427     std::unique_lock<std::mutex> cache_lock(cache_access_mutex_);
428     if (cache_info_.request_func == nullptr) {
429       ALOGE("%s: request_func is nullptr.", __FUNCTION__);
430       return UNKNOWN_ERROR;
431     }
432 
433     if (!is_active_) {
434       ALOGI("%s: Buffer cache is not active.", __FUNCTION__);
435       return UNKNOWN_ERROR;
436     }
437 
438     if (stream_deactived_) {
439       ALOGI("%s: Stream already deactived.", __FUNCTION__);
440       return OK;
441     }
442 
443     if (cached_buffers_.size() >= cache_info_.num_buffers_to_cache) {
444       ALOGV("%s: Stream buffer cache is already full.", __FUNCTION__);
445       return INVALID_OPERATION;
446     }
447 
448     num_buffers_to_acquire =
449         cache_info_.num_buffers_to_cache - cached_buffers_.size();
450   }
451 
452   // Requesting buffer from the provider can take long(e.g. even > 1sec),
453   // consumer should not be blocked by this procedure and can get dummy buffer
454   // to unblock other pipelines. Thus, cache_access_mutex_ doesn't need to be
455   // locked here.
456   std::vector<StreamBuffer> buffers;
457   StreamBufferRequestError req_status = StreamBufferRequestError::kOk;
458   status_t res =
459       cache_info_.request_func(num_buffers_to_acquire, &buffers, &req_status);
460 
461   std::unique_lock<std::mutex> cache_lock(cache_access_mutex_);
462   if (res != OK) {
463     status_t result = AllocateDummyBufferLocked();
464     if (result != OK) {
465       ALOGE("%s: Allocate dummy buffer failed.", __FUNCTION__);
466       return UNKNOWN_ERROR;
467     }
468   }
469 
470   if (buffers.empty() || res != OK) {
471     ALOGW("%s: Failed to acquire buffer for stream %d, error %d", __FUNCTION__,
472           cache_info_.stream_id, req_status);
473     switch (req_status) {
474       case StreamBufferRequestError::kNoBufferAvailable:
475       case StreamBufferRequestError::kMaxBufferExceeded:
476         ALOGI(
477             "%s: No buffer available or max buffer exceeded for stream %d. "
478             "Will retry for next request or when refilling other streams.",
479             __FUNCTION__, cache_info_.stream_id);
480         break;
481       case StreamBufferRequestError::kStreamDisconnected:
482       case StreamBufferRequestError::kUnknownError:
483         ALOGW(
484             "%s: Stream %d is disconnected or unknown error observed."
485             "This stream is marked as inactive.",
486             __FUNCTION__, cache_info_.stream_id);
487         ALOGI("%s: Stream %d begin to use dummy buffer.", __FUNCTION__,
488               cache_info_.stream_id);
489         stream_deactived_ = true;
490         break;
491       default:
492         ALOGE("%s: Unknown error code: %d", __FUNCTION__, req_status);
493         break;
494     }
495   } else {
496     for (auto& buffer : buffers) {
497       cached_buffers_.push_back(buffer);
498     }
499   }
500 
501   cache_access_cv_.notify_one();
502 
503   return OK;
504 }
505 
RefillableLocked() const506 bool StreamBufferCacheManager::StreamBufferCache::RefillableLocked() const {
507   // No need to refill if the buffer cache is not active
508   if (!is_active_) {
509     return false;
510   }
511 
512   // Need to refill if the cache is not full
513   return cached_buffers_.size() < cache_info_.num_buffers_to_cache;
514 }
515 
AllocateDummyBufferLocked()516 status_t StreamBufferCacheManager::StreamBufferCache::AllocateDummyBufferLocked() {
517   if (dummy_buffer_.buffer != nullptr) {
518     ALOGW("%s: Dummy buffer has already been allocated.", __FUNCTION__);
519     return OK;
520   }
521 
522   HalBufferDescriptor hal_buffer_descriptor{
523       .stream_id = cache_info_.stream_id,
524       .width = cache_info_.width,
525       .height = cache_info_.height,
526       .format = cache_info_.format,
527       .producer_flags = cache_info_.producer_flags,
528       .consumer_flags = cache_info_.consumer_flags,
529       .immediate_num_buffers = 1,
530       .max_num_buffers = 1,
531   };
532   std::vector<buffer_handle_t> buffers;
533 
534   status_t res =
535       dummy_buffer_allocator_->AllocateBuffers(hal_buffer_descriptor, &buffers);
536   if (res != OK) {
537     ALOGE("%s: Dummy buffer allocator AllocateBuffers failed.", __FUNCTION__);
538     return res;
539   }
540 
541   if (buffers.size() != hal_buffer_descriptor.immediate_num_buffers) {
542     ALOGE("%s: Not enough buffers allocated.", __FUNCTION__);
543     return NO_MEMORY;
544   }
545   dummy_buffer_.stream_id = cache_info_.stream_id;
546   dummy_buffer_.buffer = buffers[0];
547   ALOGI("%s: [sbc] Dummy buffer allocated: strm %d buffer %p", __FUNCTION__,
548         dummy_buffer_.stream_id, dummy_buffer_.buffer);
549 
550   return OK;
551 }
552 
ReleaseDummyBufferLocked()553 void StreamBufferCacheManager::StreamBufferCache::ReleaseDummyBufferLocked() {
554   // Release dummy buffer if ever acquired from the dummy_buffer_allocator_.
555   if (dummy_buffer_.buffer != nullptr) {
556     std::vector<buffer_handle_t> buffers(1, dummy_buffer_.buffer);
557     dummy_buffer_allocator_->FreeBuffers(&buffers);
558     dummy_buffer_.buffer = nullptr;
559   }
560 }
561 
GetStreamBufferCache(int32_t stream_id,StreamBufferCache ** stream_buffer_cache)562 status_t StreamBufferCacheManager::GetStreamBufferCache(
563     int32_t stream_id, StreamBufferCache** stream_buffer_cache) {
564   std::unique_lock<std::mutex> map_lock(caches_map_mutex_);
565   if (stream_buffer_caches_.find(stream_id) == stream_buffer_caches_.end()) {
566     ALOGE("%s: Sream %d can not be found.", __FUNCTION__, stream_id);
567     return BAD_VALUE;
568   }
569 
570   *stream_buffer_cache = stream_buffer_caches_[stream_id].get();
571   if (*stream_buffer_cache == nullptr) {
572     ALOGE("%s: Get null cache pointer.", __FUNCTION__);
573     return UNKNOWN_ERROR;
574   }
575   return OK;
576 }
577 
578 }  // namespace google_camera_hal
579 }  // namespace android
580