1 /*
2 * Copyright (C) 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "StreamBufferCacheManager"
19 #define ATRACE_TAG ATRACE_TAG_CAMERA
20
21 #include <cutils/native_handle.h>
22 #include <log/log.h>
23 #include <sync/sync.h>
24 #include <utils/Trace.h>
25
26 #include <chrono>
27
28 #include "stream_buffer_cache_manager.h"
29 #include "utils.h"
30
31 using namespace std::chrono_literals;
32
33 namespace android {
34 namespace google_camera_hal {
35
// Upper bound on how long StreamBufferCache::GetBuffer() waits for an empty
// cache to be refilled before it falls back to the dummy buffer.
//
// Tuning notes:
// - CTS testCameraDeviceCaptureFailure holds on to image buffers, so the HAL
//   hits this refill timeout. A large value also makes the close-session time
//   in that test exceed 5 seconds.
// - A buffer request to the provider (e.g. the framework) typically takes
//   1~2 ms. A small value may cause more frame drops in certain cases.
// - A large value can lead to extra long delays in the traffic (in both
//   directions) between the framework and the layer below HWL.
//
// NOTE: despite the "Sec" suffix in its name, the value is in milliseconds.
static constexpr std::chrono::milliseconds kBufferWaitingTimeOutSec{400};
44
StreamBufferCacheManager()45 StreamBufferCacheManager::StreamBufferCacheManager() {
46 workload_thread_ = std::thread([this] { this->WorkloadThreadLoop(); });
47 if (utils::SupportRealtimeThread()) {
48 status_t res = utils::SetRealtimeThread(workload_thread_.native_handle());
49 if (res != OK) {
50 ALOGE("%s: SetRealtimeThread fail", __FUNCTION__);
51 } else {
52 ALOGI("%s: SetRealtimeThread OK", __FUNCTION__);
53 }
54 }
55 }
56
~StreamBufferCacheManager()57 StreamBufferCacheManager::~StreamBufferCacheManager() {
58 ALOGI("%s: Destroying stream buffer cache manager.", __FUNCTION__);
59 {
60 std::lock_guard<std::mutex> lock(workload_mutex_);
61 workload_thread_exiting_ = true;
62 }
63 workload_cv_.notify_one();
64 workload_thread_.join();
65 }
66
Create()67 std::unique_ptr<StreamBufferCacheManager> StreamBufferCacheManager::Create() {
68 ATRACE_CALL();
69
70 auto manager =
71 std::unique_ptr<StreamBufferCacheManager>(new StreamBufferCacheManager());
72 if (manager == nullptr) {
73 ALOGE("%s: Failed to create stream buffer cache manager.", __FUNCTION__);
74 return nullptr;
75 }
76
77 manager->dummy_buffer_allocator_ = GrallocBufferAllocator::Create();
78 if (manager->dummy_buffer_allocator_ == nullptr) {
79 ALOGE("%s: Failed to create gralloc buffer allocator", __FUNCTION__);
80 return nullptr;
81 }
82
83 ALOGI("%s: Created StreamBufferCacheManager.", __FUNCTION__);
84
85 return manager;
86 }
87
RegisterStream(const StreamBufferCacheRegInfo & reg_info)88 status_t StreamBufferCacheManager::RegisterStream(
89 const StreamBufferCacheRegInfo& reg_info) {
90 ATRACE_CALL();
91 if (reg_info.request_func == nullptr || reg_info.return_func == nullptr) {
92 ALOGE("%s: Can't register stream, request or return function is nullptr.",
93 __FUNCTION__);
94 return BAD_VALUE;
95 }
96
97 if (reg_info.num_buffers_to_cache != 1) {
98 ALOGE("%s: Only support caching one buffer.", __FUNCTION__);
99 return BAD_VALUE;
100 }
101
102 std::lock_guard<std::mutex> lock(caches_map_mutex_);
103 if (stream_buffer_caches_.find(reg_info.stream_id) !=
104 stream_buffer_caches_.end()) {
105 ALOGE("%s: Stream %d has been registered.", __FUNCTION__,
106 reg_info.stream_id);
107 return INVALID_OPERATION;
108 }
109
110 status_t res = AddStreamBufferCacheLocked(reg_info);
111 if (res != OK) {
112 ALOGE("%s: Failed to add stream buffer cache.", __FUNCTION__);
113 return UNKNOWN_ERROR;
114 }
115 return OK;
116 }
117
GetStreamBuffer(int32_t stream_id,StreamBufferRequestResult * res)118 status_t StreamBufferCacheManager::GetStreamBuffer(
119 int32_t stream_id, StreamBufferRequestResult* res) {
120 ATRACE_CALL();
121
122 StreamBufferCache* stream_buffer_cache = nullptr;
123 status_t result = GetStreamBufferCache(stream_id, &stream_buffer_cache);
124 if (result != OK) {
125 ALOGE("%s: Querying stream buffer cache failed.", __FUNCTION__);
126 return result;
127 }
128
129 result = stream_buffer_cache->GetBuffer(res);
130 if (result != OK) {
131 ALOGE("%s: Get buffer for stream %d failed.", __FUNCTION__, stream_id);
132 return UNKNOWN_ERROR;
133 }
134
135 {
136 int fence_status = 0;
137 if (res->buffer.acquire_fence != nullptr) {
138 native_handle_t* fence_handle =
139 const_cast<native_handle_t*>(res->buffer.acquire_fence);
140 if (fence_handle->numFds == 1) {
141 fence_status = sync_wait(fence_handle->data[0], kSyncWaitTimeMs);
142 }
143 if (0 != fence_status) {
144 ALOGE("%s: Fence check failed.", __FUNCTION__);
145 }
146 native_handle_close(fence_handle);
147 native_handle_delete(fence_handle);
148 res->buffer.acquire_fence = nullptr;
149 }
150 }
151
152 NotifyThreadWorkload();
153 return OK;
154 }
155
NotifyProviderReadiness(int32_t stream_id)156 status_t StreamBufferCacheManager::NotifyProviderReadiness(int32_t stream_id) {
157 StreamBufferCache* stream_buffer_cache = nullptr;
158 status_t res = GetStreamBufferCache(stream_id, &stream_buffer_cache);
159 if (res != OK) {
160 ALOGE("%s: Querying stream buffer cache failed.", __FUNCTION__);
161 return res;
162 }
163
164 stream_buffer_cache->SetManagerState(/*active=*/true);
165
166 NotifyThreadWorkload();
167 return OK;
168 }
169
NotifyFlushingAll()170 status_t StreamBufferCacheManager::NotifyFlushingAll() {
171 // Mark all StreamBufferCache as need to be flushed
172 std::vector<StreamBufferCache*> stream_buffer_caches;
173 {
174 std::lock_guard<std::mutex> map_lock(caches_map_mutex_);
175 for (auto& [stream_id, stream_buffer_cache] : stream_buffer_caches_) {
176 stream_buffer_caches.push_back(stream_buffer_cache.get());
177 }
178 }
179
180 {
181 std::unique_lock<std::mutex> flush_lock(flush_mutex_);
182 for (auto& stream_buffer_cache : stream_buffer_caches) {
183 stream_buffer_cache->SetManagerState(/*active=*/false);
184 }
185 }
186
187 NotifyThreadWorkload();
188 return OK;
189 }
190
IsStreamActive(int32_t stream_id,bool * is_active)191 status_t StreamBufferCacheManager::IsStreamActive(int32_t stream_id,
192 bool* is_active) {
193 StreamBufferCache* stream_buffer_cache = nullptr;
194 status_t res = GetStreamBufferCache(stream_id, &stream_buffer_cache);
195 if (res != OK) {
196 ALOGE("%s: Querying stream buffer cache failed.", __FUNCTION__);
197 return res;
198 }
199
200 *is_active = !stream_buffer_cache->IsStreamDeactivated();
201 return OK;
202 }
203
AddStreamBufferCacheLocked(const StreamBufferCacheRegInfo & reg_info)204 status_t StreamBufferCacheManager::AddStreamBufferCacheLocked(
205 const StreamBufferCacheRegInfo& reg_info) {
206 auto stream_buffer_cache = StreamBufferCacheManager::StreamBufferCache::Create(
207 reg_info, [this] { this->NotifyThreadWorkload(); },
208 dummy_buffer_allocator_.get());
209 if (stream_buffer_cache == nullptr) {
210 ALOGE("%s: Failed to create StreamBufferCache for stream %d", __FUNCTION__,
211 reg_info.stream_id);
212 return UNKNOWN_ERROR;
213 }
214
215 stream_buffer_caches_[reg_info.stream_id] = std::move(stream_buffer_cache);
216 return OK;
217 }
218
WorkloadThreadLoop()219 void StreamBufferCacheManager::WorkloadThreadLoop() {
220 while (1) {
221 bool exiting = false;
222 {
223 std::unique_lock<std::mutex> thread_lock(workload_mutex_);
224 workload_cv_.wait(thread_lock, [this] {
225 return has_new_workload_ || workload_thread_exiting_;
226 });
227 has_new_workload_ = false;
228 exiting = workload_thread_exiting_;
229 }
230
231 std::vector<StreamBufferCacheManager::StreamBufferCache*> stream_buffer_caches;
232 {
233 std::unique_lock<std::mutex> map_lock(caches_map_mutex_);
234 for (auto& [stream_id, cache] : stream_buffer_caches_) {
235 stream_buffer_caches.push_back(cache.get());
236 }
237 }
238
239 {
240 std::unique_lock<std::mutex> flush_lock(flush_mutex_);
241 for (auto& stream_buffer_cache : stream_buffer_caches) {
242 status_t res = stream_buffer_cache->UpdateCache(exiting);
243 if (res != OK) {
244 ALOGE("%s: Updating(flush/refill) cache failed.", __FUNCTION__);
245 }
246 }
247 }
248
249 if (exiting) {
250 ALOGI("%s: Exiting stream buffer cache manager workload thread.",
251 __FUNCTION__);
252 return;
253 }
254 }
255 }
256
NotifyThreadWorkload()257 void StreamBufferCacheManager::NotifyThreadWorkload() {
258 {
259 std::lock_guard<std::mutex> lock(workload_mutex_);
260 has_new_workload_ = true;
261 }
262 workload_cv_.notify_one();
263 }
264
265 std::unique_ptr<StreamBufferCacheManager::StreamBufferCache>
Create(const StreamBufferCacheRegInfo & reg_info,NotifyManagerThreadWorkloadFunc notify,IHalBufferAllocator * dummy_buffer_allocator)266 StreamBufferCacheManager::StreamBufferCache::Create(
267 const StreamBufferCacheRegInfo& reg_info,
268 NotifyManagerThreadWorkloadFunc notify,
269 IHalBufferAllocator* dummy_buffer_allocator) {
270 if (notify == nullptr || dummy_buffer_allocator == nullptr) {
271 ALOGE("%s: notify is nullptr or dummy_buffer_allocator is nullptr.",
272 __FUNCTION__);
273 return nullptr;
274 }
275
276 auto cache = std::unique_ptr<StreamBufferCacheManager::StreamBufferCache>(
277 new StreamBufferCacheManager::StreamBufferCache(reg_info, notify,
278 dummy_buffer_allocator));
279 if (cache == nullptr) {
280 ALOGE("%s: Failed to create stream buffer cache.", __FUNCTION__);
281 return nullptr;
282 }
283
284 return cache;
285 }
286
StreamBufferCache(const StreamBufferCacheRegInfo & reg_info,NotifyManagerThreadWorkloadFunc notify,IHalBufferAllocator * dummy_buffer_allocator)287 StreamBufferCacheManager::StreamBufferCache::StreamBufferCache(
288 const StreamBufferCacheRegInfo& reg_info,
289 NotifyManagerThreadWorkloadFunc notify,
290 IHalBufferAllocator* dummy_buffer_allocator)
291 : cache_info_(reg_info) {
292 std::lock_guard<std::mutex> lock(cache_access_mutex_);
293 notify_for_workload_ = notify;
294 dummy_buffer_allocator_ = dummy_buffer_allocator;
295 }
296
UpdateCache(bool forced_flushing)297 status_t StreamBufferCacheManager::StreamBufferCache::UpdateCache(
298 bool forced_flushing) {
299 status_t res = OK;
300 std::unique_lock<std::mutex> cache_lock(cache_access_mutex_);
301 if (forced_flushing || !is_active_) {
302 res = FlushLocked(forced_flushing);
303 if (res != OK) {
304 ALOGE("%s: Failed to flush stream buffer cache for stream %d",
305 __FUNCTION__, cache_info_.stream_id);
306 return res;
307 }
308 } else if (RefillableLocked()) {
309 cache_lock.unlock();
310 res = Refill();
311 if (res != OK) {
312 ALOGE("%s: Failed to refill stream buffer cache for stream %d",
313 __FUNCTION__, cache_info_.stream_id);
314 return res;
315 }
316 }
317 return OK;
318 }
319
GetBuffer(StreamBufferRequestResult * res)320 status_t StreamBufferCacheManager::StreamBufferCache::GetBuffer(
321 StreamBufferRequestResult* res) {
322 std::unique_lock<std::mutex> cache_lock(cache_access_mutex_);
323
324 // 0. the buffer cache must be active
325 if (!is_active_) {
326 ALOGW("%s: The buffer cache for stream %d is not active.", __FUNCTION__,
327 cache_info_.stream_id);
328 return INVALID_OPERATION;
329 }
330
331 // 1. check if the cache is deactived
332 if (stream_deactived_) {
333 res->is_dummy_buffer = true;
334 res->buffer = dummy_buffer_;
335 return OK;
336 }
337
338 // 2. check if there is any buffer available in the cache. If not, try
339 // to wait for a short period and check again. In case of timeout, use the
340 // dummy buffer instead.
341 if (cached_buffers_.empty()) {
342 // In case the GetStreamBufer is called after NotifyFlushingAll, this will
343 // be the first event that should trigger the dedicated thread to restart
344 // and refill the caches. An extra notification of thread workload is
345 // harmless and will be bypassed very quickly.
346 cache_lock.unlock();
347 notify_for_workload_();
348 cache_lock.lock();
349 // Need to check this again since the state may change after the lock is
350 // acquired for the second time.
351 if (cached_buffers_.empty()) {
352 // Wait for a certain amount of time for the cache to be refilled
353 if (cache_access_cv_.wait_for(cache_lock, kBufferWaitingTimeOutSec) ==
354 std::cv_status::timeout) {
355 ALOGW("%s: StreamBufferCache for stream %d waiting for refill timeout.",
356 __FUNCTION__, cache_info_.stream_id);
357 }
358 }
359 }
360
361 // 3. use dummy buffer if the cache is still empty
362 if (cached_buffers_.empty()) {
363 // Only allocate dummy buffer for the first time
364 if (dummy_buffer_.buffer == nullptr) {
365 status_t result = AllocateDummyBufferLocked();
366 if (result != OK) {
367 ALOGE("%s: Allocate dummy buffer failed.", __FUNCTION__);
368 return UNKNOWN_ERROR;
369 }
370 }
371 res->is_dummy_buffer = true;
372 res->buffer = dummy_buffer_;
373 return OK;
374 } else {
375 res->is_dummy_buffer = false;
376 res->buffer = cached_buffers_.back();
377 cached_buffers_.pop_back();
378 }
379
380 return OK;
381 }
382
IsStreamDeactivated()383 bool StreamBufferCacheManager::StreamBufferCache::IsStreamDeactivated() {
384 std::unique_lock<std::mutex> lock(cache_access_mutex_);
385 return stream_deactived_;
386 }
387
SetManagerState(bool active)388 void StreamBufferCacheManager::StreamBufferCache::SetManagerState(bool active) {
389 std::unique_lock<std::mutex> lock(cache_access_mutex_);
390 is_active_ = active;
391 }
392
FlushLocked(bool forced_flushing)393 status_t StreamBufferCacheManager::StreamBufferCache::FlushLocked(
394 bool forced_flushing) {
395 if (is_active_ && !forced_flushing) {
396 ALOGI("%s: Active stream buffer cache is not notified for forced flushing.",
397 __FUNCTION__);
398 return INVALID_OPERATION;
399 }
400
401 if (cache_info_.return_func == nullptr) {
402 ALOGE("%s: return_func is nullptr.", __FUNCTION__);
403 return UNKNOWN_ERROR;
404 }
405
406 if (cached_buffers_.empty()) {
407 ALOGV("%s: Stream buffer cache is already empty.", __FUNCTION__);
408 ReleaseDummyBufferLocked();
409 return OK;
410 }
411
412 status_t res = cache_info_.return_func(cached_buffers_);
413 if (res != OK) {
414 ALOGE("%s: Failed to return buffers.", __FUNCTION__);
415 return res;
416 }
417
418 cached_buffers_.clear();
419 ReleaseDummyBufferLocked();
420
421 return OK;
422 }
423
Refill()424 status_t StreamBufferCacheManager::StreamBufferCache::Refill() {
425 int32_t num_buffers_to_acquire = 0;
426 {
427 std::unique_lock<std::mutex> cache_lock(cache_access_mutex_);
428 if (cache_info_.request_func == nullptr) {
429 ALOGE("%s: request_func is nullptr.", __FUNCTION__);
430 return UNKNOWN_ERROR;
431 }
432
433 if (!is_active_) {
434 ALOGI("%s: Buffer cache is not active.", __FUNCTION__);
435 return UNKNOWN_ERROR;
436 }
437
438 if (stream_deactived_) {
439 ALOGI("%s: Stream already deactived.", __FUNCTION__);
440 return OK;
441 }
442
443 if (cached_buffers_.size() >= cache_info_.num_buffers_to_cache) {
444 ALOGV("%s: Stream buffer cache is already full.", __FUNCTION__);
445 return INVALID_OPERATION;
446 }
447
448 num_buffers_to_acquire =
449 cache_info_.num_buffers_to_cache - cached_buffers_.size();
450 }
451
452 // Requesting buffer from the provider can take long(e.g. even > 1sec),
453 // consumer should not be blocked by this procedure and can get dummy buffer
454 // to unblock other pipelines. Thus, cache_access_mutex_ doesn't need to be
455 // locked here.
456 std::vector<StreamBuffer> buffers;
457 StreamBufferRequestError req_status = StreamBufferRequestError::kOk;
458 status_t res =
459 cache_info_.request_func(num_buffers_to_acquire, &buffers, &req_status);
460
461 std::unique_lock<std::mutex> cache_lock(cache_access_mutex_);
462 if (res != OK) {
463 status_t result = AllocateDummyBufferLocked();
464 if (result != OK) {
465 ALOGE("%s: Allocate dummy buffer failed.", __FUNCTION__);
466 return UNKNOWN_ERROR;
467 }
468 }
469
470 if (buffers.empty() || res != OK) {
471 ALOGW("%s: Failed to acquire buffer for stream %d, error %d", __FUNCTION__,
472 cache_info_.stream_id, req_status);
473 switch (req_status) {
474 case StreamBufferRequestError::kNoBufferAvailable:
475 case StreamBufferRequestError::kMaxBufferExceeded:
476 ALOGI(
477 "%s: No buffer available or max buffer exceeded for stream %d. "
478 "Will retry for next request or when refilling other streams.",
479 __FUNCTION__, cache_info_.stream_id);
480 break;
481 case StreamBufferRequestError::kStreamDisconnected:
482 case StreamBufferRequestError::kUnknownError:
483 ALOGW(
484 "%s: Stream %d is disconnected or unknown error observed."
485 "This stream is marked as inactive.",
486 __FUNCTION__, cache_info_.stream_id);
487 ALOGI("%s: Stream %d begin to use dummy buffer.", __FUNCTION__,
488 cache_info_.stream_id);
489 stream_deactived_ = true;
490 break;
491 default:
492 ALOGE("%s: Unknown error code: %d", __FUNCTION__, req_status);
493 break;
494 }
495 } else {
496 for (auto& buffer : buffers) {
497 cached_buffers_.push_back(buffer);
498 }
499 }
500
501 cache_access_cv_.notify_one();
502
503 return OK;
504 }
505
RefillableLocked() const506 bool StreamBufferCacheManager::StreamBufferCache::RefillableLocked() const {
507 // No need to refill if the buffer cache is not active
508 if (!is_active_) {
509 return false;
510 }
511
512 // Need to refill if the cache is not full
513 return cached_buffers_.size() < cache_info_.num_buffers_to_cache;
514 }
515
AllocateDummyBufferLocked()516 status_t StreamBufferCacheManager::StreamBufferCache::AllocateDummyBufferLocked() {
517 if (dummy_buffer_.buffer != nullptr) {
518 ALOGW("%s: Dummy buffer has already been allocated.", __FUNCTION__);
519 return OK;
520 }
521
522 HalBufferDescriptor hal_buffer_descriptor{
523 .stream_id = cache_info_.stream_id,
524 .width = cache_info_.width,
525 .height = cache_info_.height,
526 .format = cache_info_.format,
527 .producer_flags = cache_info_.producer_flags,
528 .consumer_flags = cache_info_.consumer_flags,
529 .immediate_num_buffers = 1,
530 .max_num_buffers = 1,
531 };
532 std::vector<buffer_handle_t> buffers;
533
534 status_t res =
535 dummy_buffer_allocator_->AllocateBuffers(hal_buffer_descriptor, &buffers);
536 if (res != OK) {
537 ALOGE("%s: Dummy buffer allocator AllocateBuffers failed.", __FUNCTION__);
538 return res;
539 }
540
541 if (buffers.size() != hal_buffer_descriptor.immediate_num_buffers) {
542 ALOGE("%s: Not enough buffers allocated.", __FUNCTION__);
543 return NO_MEMORY;
544 }
545 dummy_buffer_.stream_id = cache_info_.stream_id;
546 dummy_buffer_.buffer = buffers[0];
547 ALOGI("%s: [sbc] Dummy buffer allocated: strm %d buffer %p", __FUNCTION__,
548 dummy_buffer_.stream_id, dummy_buffer_.buffer);
549
550 return OK;
551 }
552
ReleaseDummyBufferLocked()553 void StreamBufferCacheManager::StreamBufferCache::ReleaseDummyBufferLocked() {
554 // Release dummy buffer if ever acquired from the dummy_buffer_allocator_.
555 if (dummy_buffer_.buffer != nullptr) {
556 std::vector<buffer_handle_t> buffers(1, dummy_buffer_.buffer);
557 dummy_buffer_allocator_->FreeBuffers(&buffers);
558 dummy_buffer_.buffer = nullptr;
559 }
560 }
561
GetStreamBufferCache(int32_t stream_id,StreamBufferCache ** stream_buffer_cache)562 status_t StreamBufferCacheManager::GetStreamBufferCache(
563 int32_t stream_id, StreamBufferCache** stream_buffer_cache) {
564 std::unique_lock<std::mutex> map_lock(caches_map_mutex_);
565 if (stream_buffer_caches_.find(stream_id) == stream_buffer_caches_.end()) {
566 ALOGE("%s: Sream %d can not be found.", __FUNCTION__, stream_id);
567 return BAD_VALUE;
568 }
569
570 *stream_buffer_cache = stream_buffer_caches_[stream_id].get();
571 if (*stream_buffer_cache == nullptr) {
572 ALOGE("%s: Get null cache pointer.", __FUNCTION__);
573 return UNKNOWN_ERROR;
574 }
575 return OK;
576 }
577
578 } // namespace google_camera_hal
579 } // namespace android
580