• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "StreamBufferCacheManager"
19 #define ATRACE_TAG ATRACE_TAG_CAMERA
20 
21 #include <cutils/native_handle.h>
22 #include <log/log.h>
23 #include <sync/sync.h>
24 #include <utils/Trace.h>
25 
26 #include <chrono>
27 
28 #include "stream_buffer_cache_manager.h"
29 #include "utils.h"
30 
31 using namespace std::chrono_literals;
32 
33 namespace android {
34 namespace google_camera_hal {
35 
// Upper bound on how long GetBuffer() waits for an empty cache to be refilled
// before falling back to a dummy buffer. Despite the "Sec" suffix in the name,
// the value is expressed in milliseconds.
//
// Trade-offs behind the chosen value:
// - Too large: in CTS testCameraDeviceCaptureFailure the test holds image
//   buffers, so the HAL hits this refill timeout; a large value also makes the
//   session close time exceed 5 seconds in that test case, and it can add an
//   extra long delay to traffic (in both directions) between the framework and
//   the layer below HWL.
// - Too small: a buffer request to the provider (e.g. the framework) usually
//   takes 1~2 ms, and a short timeout may cause more frame drops in certain
//   cases.
static constexpr std::chrono::milliseconds kBufferWaitingTimeOutSec{400};
44 
StreamBufferCacheManager()45 StreamBufferCacheManager::StreamBufferCacheManager() {
46   workload_thread_ = std::thread([this] { this->WorkloadThreadLoop(); });
47   if (utils::SupportRealtimeThread()) {
48     status_t res = utils::SetRealtimeThread(workload_thread_.native_handle());
49     if (res != OK) {
50       ALOGE("%s: SetRealtimeThread fail", __FUNCTION__);
51     } else {
52       ALOGI("%s: SetRealtimeThread OK", __FUNCTION__);
53     }
54   }
55 }
56 
~StreamBufferCacheManager()57 StreamBufferCacheManager::~StreamBufferCacheManager() {
58   ALOGI("%s: Destroying stream buffer cache manager.", __FUNCTION__);
59   {
60     std::lock_guard<std::mutex> lock(workload_mutex_);
61     workload_thread_exiting_ = true;
62   }
63   workload_cv_.notify_one();
64   workload_thread_.join();
65 }
66 
Create()67 std::unique_ptr<StreamBufferCacheManager> StreamBufferCacheManager::Create() {
68   ATRACE_CALL();
69 
70   auto manager =
71       std::unique_ptr<StreamBufferCacheManager>(new StreamBufferCacheManager());
72   if (manager == nullptr) {
73     ALOGE("%s: Failed to create stream buffer cache manager.", __FUNCTION__);
74     return nullptr;
75   }
76 
77   manager->dummy_buffer_allocator_ = GrallocBufferAllocator::Create();
78   if (manager->dummy_buffer_allocator_ == nullptr) {
79     ALOGE("%s: Failed to create gralloc buffer allocator", __FUNCTION__);
80     return nullptr;
81   }
82 
83   ALOGI("%s: Created StreamBufferCacheManager.", __FUNCTION__);
84 
85   return manager;
86 }
87 
RegisterStream(const StreamBufferCacheRegInfo & reg_info)88 status_t StreamBufferCacheManager::RegisterStream(
89     const StreamBufferCacheRegInfo& reg_info) {
90   ATRACE_CALL();
91   if (reg_info.request_func == nullptr || reg_info.return_func == nullptr) {
92     ALOGE("%s: Can't register stream, request or return function is nullptr.",
93           __FUNCTION__);
94     return BAD_VALUE;
95   }
96 
97   if (reg_info.num_buffers_to_cache != 1) {
98     ALOGE("%s: Only support caching one buffer.", __FUNCTION__);
99     return BAD_VALUE;
100   }
101 
102   std::lock_guard<std::mutex> lock(caches_map_mutex_);
103   if (stream_buffer_caches_.find(reg_info.stream_id) !=
104       stream_buffer_caches_.end()) {
105     ALOGE("%s: Stream %d has been registered.", __FUNCTION__,
106           reg_info.stream_id);
107     return INVALID_OPERATION;
108   }
109 
110   status_t res = AddStreamBufferCacheLocked(reg_info);
111   if (res != OK) {
112     ALOGE("%s: Failed to add stream buffer cache.", __FUNCTION__);
113     return UNKNOWN_ERROR;
114   }
115   return OK;
116 }
117 
GetStreamBuffer(int32_t stream_id,StreamBufferRequestResult * res)118 status_t StreamBufferCacheManager::GetStreamBuffer(
119     int32_t stream_id, StreamBufferRequestResult* res) {
120   ATRACE_CALL();
121 
122   StreamBufferCache* stream_buffer_cache = nullptr;
123   status_t result = GetStreamBufferCache(stream_id, &stream_buffer_cache);
124   if (result != OK) {
125     ALOGE("%s: Querying stream buffer cache failed.", __FUNCTION__);
126     return result;
127   }
128 
129   result = stream_buffer_cache->GetBuffer(res);
130   if (result != OK) {
131     ALOGE("%s: Get buffer for stream %d failed.", __FUNCTION__, stream_id);
132     return UNKNOWN_ERROR;
133   }
134 
135   {
136     int fence_status = 0;
137     if (res->buffer.acquire_fence != nullptr) {
138       native_handle_t* fence_handle =
139           const_cast<native_handle_t*>(res->buffer.acquire_fence);
140       if (fence_handle->numFds == 1) {
141         fence_status = sync_wait(fence_handle->data[0], kSyncWaitTimeMs);
142       }
143       if (0 != fence_status) {
144         ALOGE("%s: Fence check failed.", __FUNCTION__);
145       }
146       native_handle_close(fence_handle);
147       native_handle_delete(fence_handle);
148       res->buffer.acquire_fence = nullptr;
149     }
150   }
151 
152   NotifyThreadWorkload();
153   return OK;
154 }
155 
NotifyProviderReadiness(int32_t stream_id)156 status_t StreamBufferCacheManager::NotifyProviderReadiness(int32_t stream_id) {
157   StreamBufferCache* stream_buffer_cache = nullptr;
158   status_t res = GetStreamBufferCache(stream_id, &stream_buffer_cache);
159   if (res != OK) {
160     ALOGE("%s: Querying stream buffer cache failed.", __FUNCTION__);
161     return res;
162   }
163 
164   stream_buffer_cache->NotifyProviderReadiness();
165 
166   NotifyThreadWorkload();
167   return OK;
168 }
169 
NotifyFlushingAll()170 status_t StreamBufferCacheManager::NotifyFlushingAll() {
171   // Mark all StreamBufferCache as need to be flushed
172   std::vector<StreamBufferCache*> stream_buffer_caches;
173   {
174     std::lock_guard<std::mutex> map_lock(caches_map_mutex_);
175     for (auto& [stream_id, stream_buffer_cache] : stream_buffer_caches_) {
176       stream_buffer_caches.push_back(stream_buffer_cache.get());
177     }
178   }
179 
180   {
181     std::unique_lock<std::mutex> flush_lock(flush_mutex_);
182     for (auto& stream_buffer_cache : stream_buffer_caches) {
183       stream_buffer_cache->NotifyFlushing();
184     }
185   }
186 
187   NotifyThreadWorkload();
188   return OK;
189 }
190 
IsStreamActive(int32_t stream_id,bool * is_active)191 status_t StreamBufferCacheManager::IsStreamActive(int32_t stream_id,
192                                                   bool* is_active) {
193   StreamBufferCache* stream_buffer_cache = nullptr;
194   status_t res = GetStreamBufferCache(stream_id, &stream_buffer_cache);
195   if (res != OK) {
196     ALOGE("%s: Querying stream buffer cache failed.", __FUNCTION__);
197     return res;
198   }
199 
200   *is_active = !stream_buffer_cache->IsStreamDeactivated();
201   return OK;
202 }
203 
AddStreamBufferCacheLocked(const StreamBufferCacheRegInfo & reg_info)204 status_t StreamBufferCacheManager::AddStreamBufferCacheLocked(
205     const StreamBufferCacheRegInfo& reg_info) {
206   auto stream_buffer_cache = StreamBufferCacheManager::StreamBufferCache::Create(
207       reg_info, [this] { this->NotifyThreadWorkload(); },
208       dummy_buffer_allocator_.get());
209   if (stream_buffer_cache == nullptr) {
210     ALOGE("%s: Failed to create StreamBufferCache for stream %d", __FUNCTION__,
211           reg_info.stream_id);
212     return UNKNOWN_ERROR;
213   }
214 
215   stream_buffer_caches_[reg_info.stream_id] = std::move(stream_buffer_cache);
216   return OK;
217 }
218 
WorkloadThreadLoop()219 void StreamBufferCacheManager::WorkloadThreadLoop() {
220   while (1) {
221     bool exiting = false;
222     {
223       std::unique_lock<std::mutex> thread_lock(workload_mutex_);
224       workload_cv_.wait(thread_lock, [this] {
225         return has_new_workload_ || workload_thread_exiting_;
226       });
227       has_new_workload_ = false;
228       exiting = workload_thread_exiting_;
229     }
230 
231     std::vector<StreamBufferCacheManager::StreamBufferCache*> stream_buffer_caches;
232     {
233       std::unique_lock<std::mutex> map_lock(caches_map_mutex_);
234       for (auto& [stream_id, cache] : stream_buffer_caches_) {
235         stream_buffer_caches.push_back(cache.get());
236       }
237     }
238 
239     {
240       std::unique_lock<std::mutex> flush_lock(flush_mutex_);
241       for (auto& stream_buffer_cache : stream_buffer_caches) {
242         status_t res = stream_buffer_cache->UpdateCache(exiting);
243         if (res != OK) {
244           ALOGE("%s: Updating(flush/refill) cache failed.", __FUNCTION__);
245         }
246       }
247     }
248 
249     if (exiting) {
250       ALOGI("%s: Exiting stream buffer cache manager workload thread.",
251             __FUNCTION__);
252       return;
253     }
254   }
255 }
256 
NotifyThreadWorkload()257 void StreamBufferCacheManager::NotifyThreadWorkload() {
258   {
259     std::lock_guard<std::mutex> lock(workload_mutex_);
260     has_new_workload_ = true;
261   }
262   workload_cv_.notify_one();
263 }
264 
265 std::unique_ptr<StreamBufferCacheManager::StreamBufferCache>
Create(const StreamBufferCacheRegInfo & reg_info,NotifyManagerThreadWorkloadFunc notify,IHalBufferAllocator * dummy_buffer_allocator)266 StreamBufferCacheManager::StreamBufferCache::Create(
267     const StreamBufferCacheRegInfo& reg_info,
268     NotifyManagerThreadWorkloadFunc notify,
269     IHalBufferAllocator* dummy_buffer_allocator) {
270   if (notify == nullptr || dummy_buffer_allocator == nullptr) {
271     ALOGE("%s: notify is nullptr or dummy_buffer_allocator is nullptr.",
272           __FUNCTION__);
273     return nullptr;
274   }
275 
276   auto cache = std::unique_ptr<StreamBufferCacheManager::StreamBufferCache>(
277       new StreamBufferCacheManager::StreamBufferCache(reg_info, notify,
278                                                       dummy_buffer_allocator));
279   if (cache == nullptr) {
280     ALOGE("%s: Failed to create stream buffer cache.", __FUNCTION__);
281     return nullptr;
282   }
283 
284   return cache;
285 }
286 
StreamBufferCache(const StreamBufferCacheRegInfo & reg_info,NotifyManagerThreadWorkloadFunc notify,IHalBufferAllocator * dummy_buffer_allocator)287 StreamBufferCacheManager::StreamBufferCache::StreamBufferCache(
288     const StreamBufferCacheRegInfo& reg_info,
289     NotifyManagerThreadWorkloadFunc notify,
290     IHalBufferAllocator* dummy_buffer_allocator)
291     : cache_info_(reg_info) {
292   std::lock_guard<std::mutex> lock(cache_access_mutex_);
293   notify_for_workload_ = notify;
294   dummy_buffer_allocator_ = dummy_buffer_allocator;
295 }
296 
UpdateCache(bool forced_flushing)297 status_t StreamBufferCacheManager::StreamBufferCache::UpdateCache(
298     bool forced_flushing) {
299   status_t res = OK;
300   std::unique_lock<std::mutex> cache_lock(cache_access_mutex_);
301   if (forced_flushing || notified_flushing_) {
302     res = FlushLocked(forced_flushing);
303     if (res != OK) {
304       ALOGE("%s: Failed to flush stream buffer cache for stream %d",
305             __FUNCTION__, cache_info_.stream_id);
306       return res;
307     }
308   } else if (RefillableLocked()) {
309     cache_lock.unlock();
310     res = Refill();
311     if (res != OK) {
312       ALOGE("%s: Failed to refill stream buffer cache for stream %d",
313             __FUNCTION__, cache_info_.stream_id);
314       return res;
315     }
316   }
317   return OK;
318 }
319 
GetBuffer(StreamBufferRequestResult * res)320 status_t StreamBufferCacheManager::StreamBufferCache::GetBuffer(
321     StreamBufferRequestResult* res) {
322   std::unique_lock<std::mutex> cache_lock(cache_access_mutex_);
323 
324   // 0. the provider of the stream for this cache must be ready
325   if (!notified_provider_readiness_) {
326     ALOGW("%s: The provider of stream %d is not ready.", __FUNCTION__,
327           cache_info_.stream_id);
328     return INVALID_OPERATION;
329   }
330 
331   // 1. check if the cache is deactived or the stream has been notified for
332   // flushing.
333   if (stream_deactived_ || notified_flushing_) {
334     res->is_dummy_buffer = true;
335     res->buffer = dummy_buffer_;
336     return OK;
337   }
338 
339   // 2. check if there is any buffer available in the cache. If not, try
340   // to wait for a short period and check again. In case of timeout, use the
341   // dummy buffer instead.
342   if (cached_buffers_.empty()) {
343     // In case the GetStreamBufer is called after NotifyFlushingAll, this will
344     // be the first event that should trigger the dedicated thread to restart
345     // and refill the caches. An extra notification of thread workload is
346     // harmless and will be bypassed very quickly.
347     cache_lock.unlock();
348     notify_for_workload_();
349     cache_lock.lock();
350     // Need to check this again since the state may change after the lock is
351     // acquired for the second time.
352     if (cached_buffers_.empty()) {
353       // Wait for a certain amount of time for the cache to be refilled
354       if (cache_access_cv_.wait_for(cache_lock, kBufferWaitingTimeOutSec) ==
355           std::cv_status::timeout) {
356         ALOGW("%s: StreamBufferCache for stream %d waiting for refill timeout.",
357               __FUNCTION__, cache_info_.stream_id);
358       }
359     }
360   }
361 
362   // 3. use dummy buffer if the cache is still empty
363   if (cached_buffers_.empty()) {
364     // Only allocate dummy buffer for the first time
365     if (dummy_buffer_.buffer == nullptr) {
366       status_t result = AllocateDummyBufferLocked();
367       if (result != OK) {
368         ALOGE("%s: Allocate dummy buffer failed.", __FUNCTION__);
369         return UNKNOWN_ERROR;
370       }
371     }
372     res->is_dummy_buffer = true;
373     res->buffer = dummy_buffer_;
374     return OK;
375   } else {
376     res->is_dummy_buffer = false;
377     res->buffer = cached_buffers_.back();
378     cached_buffers_.pop_back();
379   }
380 
381   return OK;
382 }
383 
IsStreamDeactivated()384 bool StreamBufferCacheManager::StreamBufferCache::IsStreamDeactivated() {
385   std::unique_lock<std::mutex> lock(cache_access_mutex_);
386   return stream_deactived_;
387 }
388 
NotifyProviderReadiness()389 void StreamBufferCacheManager::StreamBufferCache::NotifyProviderReadiness() {
390   std::unique_lock<std::mutex> lock(cache_access_mutex_);
391   notified_provider_readiness_ = true;
392 }
393 
NotifyFlushing()394 void StreamBufferCacheManager::StreamBufferCache::NotifyFlushing() {
395   std::unique_lock<std::mutex> lock(cache_access_mutex_);
396   notified_flushing_ = true;
397 }
398 
FlushLocked(bool forced_flushing)399 status_t StreamBufferCacheManager::StreamBufferCache::FlushLocked(
400     bool forced_flushing) {
401   if (notified_flushing_ != true && !forced_flushing) {
402     ALOGI("%s: Stream buffer cache is not notified for flushing.", __FUNCTION__);
403     return INVALID_OPERATION;
404   }
405 
406   notified_flushing_ = false;
407   if (cache_info_.return_func == nullptr) {
408     ALOGE("%s: return_func is nullptr.", __FUNCTION__);
409     return UNKNOWN_ERROR;
410   }
411 
412   if (cached_buffers_.empty()) {
413     ALOGV("%s: Stream buffer cache is already empty.", __FUNCTION__);
414     ReleaseDummyBufferLocked();
415     return OK;
416   }
417 
418   status_t res = cache_info_.return_func(cached_buffers_);
419   if (res != OK) {
420     ALOGE("%s: Failed to return buffers.", __FUNCTION__);
421     return res;
422   }
423 
424   cached_buffers_.clear();
425   ReleaseDummyBufferLocked();
426 
427   return OK;
428 }
429 
Refill()430 status_t StreamBufferCacheManager::StreamBufferCache::Refill() {
431   int32_t num_buffers_to_acquire = 0;
432   {
433     std::unique_lock<std::mutex> cache_lock(cache_access_mutex_);
434     if (cache_info_.request_func == nullptr) {
435       ALOGE("%s: request_func is nullptr.", __FUNCTION__);
436       return UNKNOWN_ERROR;
437     }
438 
439     if (!notified_provider_readiness_) {
440       ALOGI("%s: Provider is not ready.", __FUNCTION__);
441       return UNKNOWN_ERROR;
442     }
443 
444     if (stream_deactived_ || notified_flushing_) {
445       ALOGI("%s: Already notified for flushing or stream already deactived.",
446             __FUNCTION__);
447       return OK;
448     }
449 
450     if (cached_buffers_.size() >= cache_info_.num_buffers_to_cache) {
451       ALOGV("%s: Stream buffer cache is already full.", __FUNCTION__);
452       return INVALID_OPERATION;
453     }
454 
455     num_buffers_to_acquire =
456         cache_info_.num_buffers_to_cache - cached_buffers_.size();
457   }
458 
459   // Requesting buffer from the provider can take long(e.g. even > 1sec),
460   // consumer should not be blocked by this procedure and can get dummy buffer
461   // to unblock other pipelines. Thus, cache_access_mutex_ doesn't need to be
462   // locked here.
463   std::vector<StreamBuffer> buffers;
464   StreamBufferRequestError req_status = StreamBufferRequestError::kOk;
465   status_t res =
466       cache_info_.request_func(num_buffers_to_acquire, &buffers, &req_status);
467 
468   std::unique_lock<std::mutex> cache_lock(cache_access_mutex_);
469   if (res != OK) {
470     status_t result = AllocateDummyBufferLocked();
471     if (result != OK) {
472       ALOGE("%s: Allocate dummy buffer failed.", __FUNCTION__);
473       return UNKNOWN_ERROR;
474     }
475   }
476 
477   if (buffers.empty() || res != OK) {
478     ALOGW("%s: Failed to acquire buffer for stream %d, error %d", __FUNCTION__,
479           cache_info_.stream_id, req_status);
480     switch (req_status) {
481       case StreamBufferRequestError::kNoBufferAvailable:
482       case StreamBufferRequestError::kMaxBufferExceeded:
483         ALOGI(
484             "%s: No buffer available or max buffer exceeded for stream %d. "
485             "Will retry for next request or when refilling other streams.",
486             __FUNCTION__, cache_info_.stream_id);
487         break;
488       case StreamBufferRequestError::kStreamDisconnected:
489       case StreamBufferRequestError::kUnknownError:
490         ALOGW(
491             "%s: Stream %d is disconnected or unknown error observed."
492             "This stream is marked as inactive.",
493             __FUNCTION__, cache_info_.stream_id);
494         ALOGI("%s: Stream %d begin to use dummy buffer.", __FUNCTION__,
495               cache_info_.stream_id);
496         stream_deactived_ = true;
497         break;
498       default:
499         ALOGE("%s: Unknown error code: %d", __FUNCTION__, req_status);
500         break;
501     }
502   } else {
503     for (auto& buffer : buffers) {
504       cached_buffers_.push_back(buffer);
505     }
506   }
507 
508   cache_access_cv_.notify_one();
509 
510   return OK;
511 }
512 
RefillableLocked() const513 bool StreamBufferCacheManager::StreamBufferCache::RefillableLocked() const {
514   // No need to refill if the provider is not ready
515   if (!notified_provider_readiness_) {
516     return false;
517   }
518 
519   // No need to refill if the stream buffer cache is notified for flushing
520   if (notified_flushing_) {
521     return false;
522   }
523 
524   // Need to refill if the cache is not full
525   return cached_buffers_.size() < cache_info_.num_buffers_to_cache;
526 }
527 
AllocateDummyBufferLocked()528 status_t StreamBufferCacheManager::StreamBufferCache::AllocateDummyBufferLocked() {
529   if (dummy_buffer_.buffer != nullptr) {
530     ALOGW("%s: Dummy buffer has already been allocated.", __FUNCTION__);
531     return OK;
532   }
533 
534   HalBufferDescriptor hal_buffer_descriptor{
535       .stream_id = cache_info_.stream_id,
536       .width = cache_info_.width,
537       .height = cache_info_.height,
538       .format = cache_info_.format,
539       .producer_flags = cache_info_.producer_flags,
540       .consumer_flags = cache_info_.consumer_flags,
541       .immediate_num_buffers = 1,
542       .max_num_buffers = 1,
543   };
544   std::vector<buffer_handle_t> buffers;
545 
546   status_t res =
547       dummy_buffer_allocator_->AllocateBuffers(hal_buffer_descriptor, &buffers);
548   if (res != OK) {
549     ALOGE("%s: Dummy buffer allocator AllocateBuffers failed.", __FUNCTION__);
550     return res;
551   }
552 
553   if (buffers.size() != hal_buffer_descriptor.immediate_num_buffers) {
554     ALOGE("%s: Not enough buffers allocated.", __FUNCTION__);
555     return NO_MEMORY;
556   }
557   dummy_buffer_.stream_id = cache_info_.stream_id;
558   dummy_buffer_.buffer = buffers[0];
559   ALOGI("%s: [sbc] Dummy buffer allocated: strm %d buffer %p", __FUNCTION__,
560         dummy_buffer_.stream_id, dummy_buffer_.buffer);
561 
562   return OK;
563 }
564 
ReleaseDummyBufferLocked()565 void StreamBufferCacheManager::StreamBufferCache::ReleaseDummyBufferLocked() {
566   // Release dummy buffer if ever acquired from the dummy_buffer_allocator_.
567   if (dummy_buffer_.buffer != nullptr) {
568     std::vector<buffer_handle_t> buffers(1, dummy_buffer_.buffer);
569     dummy_buffer_allocator_->FreeBuffers(&buffers);
570     dummy_buffer_.buffer = nullptr;
571   }
572 }
573 
GetStreamBufferCache(int32_t stream_id,StreamBufferCache ** stream_buffer_cache)574 status_t StreamBufferCacheManager::GetStreamBufferCache(
575     int32_t stream_id, StreamBufferCache** stream_buffer_cache) {
576   std::unique_lock<std::mutex> map_lock(caches_map_mutex_);
577   if (stream_buffer_caches_.find(stream_id) == stream_buffer_caches_.end()) {
578     ALOGE("%s: Sream %d can not be found.", __FUNCTION__, stream_id);
579     return BAD_VALUE;
580   }
581 
582   *stream_buffer_cache = stream_buffer_caches_[stream_id].get();
583   if (*stream_buffer_cache == nullptr) {
584     ALOGE("%s: Get null cache pointer.", __FUNCTION__);
585     return UNKNOWN_ERROR;
586   }
587   return OK;
588 }
589 
590 }  // namespace google_camera_hal
591 }  // namespace android
592