1 /*
2 * Copyright (C) 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 // #define LOG_NDEBUG 0
18 #define LOG_TAG "GCH_ResultDispatcher"
19 #define ATRACE_TAG ATRACE_TAG_CAMERA
20
21 #include "result_dispatcher.h"
22
23 #include <inttypes.h>
24 #include <log/log.h>
25 #include <sys/resource.h>
26 #include <utils/Trace.h>
27
28 #include <string>
29 #include <string_view>
30
31 #include "hal_types.h"
32 #include "utils.h"
33
34 namespace android {
35 namespace google_camera_hal {
36
Create(uint32_t partial_result_count,ProcessCaptureResultFunc process_capture_result,ProcessBatchCaptureResultFunc process_batch_capture_result,NotifyFunc notify,NotifyBatchFunc notify_batch,const StreamConfiguration & stream_config,std::string_view name)37 std::unique_ptr<ResultDispatcher> ResultDispatcher::Create(
38 uint32_t partial_result_count,
39 ProcessCaptureResultFunc process_capture_result,
40 ProcessBatchCaptureResultFunc process_batch_capture_result,
41 NotifyFunc notify, NotifyBatchFunc notify_batch,
42 const StreamConfiguration& stream_config, std::string_view name) {
43 ATRACE_CALL();
44 auto dispatcher = std::make_unique<ResultDispatcher>(
45 partial_result_count, std::move(process_capture_result),
46 std::move(process_batch_capture_result), std::move(notify),
47 std::move(notify_batch), stream_config, name);
48 if (dispatcher == nullptr) {
49 ALOGE("[%s] %s: Creating ResultDispatcher failed.",
50 std::string(name).c_str(), __FUNCTION__);
51 return nullptr;
52 }
53
54 return dispatcher;
55 }
56
ResultDispatcher(uint32_t partial_result_count,ProcessCaptureResultFunc process_capture_result,ProcessBatchCaptureResultFunc process_batch_capture_result,NotifyFunc notify,NotifyBatchFunc notify_batch,const StreamConfiguration & stream_config,std::string_view name)57 ResultDispatcher::ResultDispatcher(
58 uint32_t partial_result_count,
59 ProcessCaptureResultFunc process_capture_result,
60 ProcessBatchCaptureResultFunc process_batch_capture_result,
61 NotifyFunc notify, NotifyBatchFunc notify_batch,
62 const StreamConfiguration& stream_config, std::string_view name)
63 : kPartialResultCount(partial_result_count),
64 name_(name),
65 process_capture_result_(std::move(process_capture_result)),
66 process_batch_capture_result_(std::move(process_batch_capture_result)),
67 notify_(std::move(notify)),
68 notify_batch_(std::move(notify_batch)) {
69 ATRACE_CALL();
70 pending_shutters_ = DispatchQueue<PendingShutter>(name_, "shutter");
71 pending_early_metadata_ =
72 DispatchQueue<PendingResultMetadata>(name_, "early result metadata");
73 pending_final_metadata_ =
74 DispatchQueue<PendingResultMetadata>(name_, "final result metadata");
75
76 notify_callback_thread_ =
77 std::thread([this] { this->NotifyCallbackThreadLoop(); });
78
79 // Assign higher priority to reduce the preemption when CPU usage is high
80 //
81 // As from b/295977499, we need to make it realtime for priority inheritance
82 // to avoid CameraServer thread being the bottleneck
83 status_t res =
84 utils::SetRealtimeThread(notify_callback_thread_.native_handle());
85 if (res != OK) {
86 ALOGE("[%s] %s: SetRealtimeThread fail", name_.c_str(), __FUNCTION__);
87 } else {
88 ALOGI("[%s] %s: SetRealtimeThread OK", name_.c_str(), __FUNCTION__);
89 }
90 InitializeGroupStreamIdsMap(stream_config);
91 }
92
~ResultDispatcher()93 ResultDispatcher::~ResultDispatcher() {
94 ATRACE_CALL();
95 {
96 std::unique_lock<std::mutex> lock(notify_callback_lock_);
97 notify_callback_thread_exiting_ = true;
98 }
99
100 notify_callback_condition_.notify_one();
101 notify_callback_thread_.join();
102 }
103
RemovePendingRequest(uint32_t frame_number)104 void ResultDispatcher::RemovePendingRequest(uint32_t frame_number) {
105 ATRACE_CALL();
106 std::lock_guard<std::mutex> lock(result_lock_);
107 RemovePendingRequestLocked(frame_number);
108 }
109
AddPendingRequest(const CaptureRequest & pending_request)110 status_t ResultDispatcher::AddPendingRequest(
111 const CaptureRequest& pending_request) {
112 ATRACE_CALL();
113 std::lock_guard<std::mutex> lock(result_lock_);
114
115 status_t res = AddPendingRequestLocked(pending_request);
116 if (res != OK) {
117 ALOGE("[%s] %s: Adding a pending request failed: %s(%d).", name_.c_str(),
118 __FUNCTION__, strerror(-res), res);
119 RemovePendingRequestLocked(pending_request.frame_number);
120 return res;
121 }
122
123 return OK;
124 }
125
AddPendingRequestLocked(const CaptureRequest & pending_request)126 status_t ResultDispatcher::AddPendingRequestLocked(
127 const CaptureRequest& pending_request) {
128 ATRACE_CALL();
129 uint32_t frame_number = pending_request.frame_number;
130 const RequestType request_type = pending_request.input_buffers.empty()
131 ? RequestType::kNormal
132 : RequestType::kReprocess;
133
134 status_t res = pending_shutters_.AddRequest(frame_number, request_type);
135 if (res != OK) {
136 ALOGE("[%s] %s: Adding pending shutter for frame %u failed: %s(%d)",
137 name_.c_str(), __FUNCTION__, frame_number, strerror(-res), res);
138 return res;
139 }
140
141 res = pending_early_metadata_.AddRequest(frame_number, request_type);
142 if (res != OK) {
143 ALOGE("[%s] %s: Adding pending early metadata for frame %u failed: %s(%d)",
144 name_.c_str(), __FUNCTION__, frame_number, strerror(-res), res);
145 return res;
146 }
147
148 res = pending_final_metadata_.AddRequest(frame_number, request_type);
149 if (res != OK) {
150 ALOGE("[%s] %s: Adding pending final metadata for frame %u failed: %s(%d)",
151 name_.c_str(), __FUNCTION__, frame_number, strerror(-res), res);
152 return res;
153 }
154
155 for (auto& buffer : pending_request.input_buffers) {
156 res = AddPendingBufferLocked(frame_number, buffer, request_type);
157 if (res != OK) {
158 ALOGE("[%s] %s: Adding pending input buffer for frame %u failed: %s(%d)",
159 name_.c_str(), __FUNCTION__, frame_number, strerror(-res), res);
160 return res;
161 }
162 }
163
164 for (auto& buffer : pending_request.output_buffers) {
165 res = AddPendingBufferLocked(frame_number, buffer, request_type);
166 if (res != OK) {
167 ALOGE("[%s] %s: Adding pending output buffer for frame %u failed: %s(%d)",
168 name_.c_str(), __FUNCTION__, frame_number, strerror(-res), res);
169 return res;
170 }
171 }
172
173 return OK;
174 }
175
AddPendingBufferLocked(uint32_t frame_number,const StreamBuffer & buffer,RequestType request_type)176 status_t ResultDispatcher::AddPendingBufferLocked(uint32_t frame_number,
177 const StreamBuffer& buffer,
178 RequestType request_type) {
179 ATRACE_CALL();
180 StreamKey stream_key = CreateStreamKey(buffer.stream_id);
181 if (!stream_pending_buffers_map_.contains(stream_key)) {
182 stream_pending_buffers_map_[stream_key] = DispatchQueue<PendingBuffer>(
183 name_, "buffer of stream " + DumpStreamKey(stream_key));
184 }
185
186 return stream_pending_buffers_map_[stream_key].AddRequest(frame_number,
187 request_type);
188 }
189
RemovePendingRequestLocked(uint32_t frame_number)190 void ResultDispatcher::RemovePendingRequestLocked(uint32_t frame_number) {
191 ATRACE_CALL();
192 pending_shutters_.RemoveRequest(frame_number);
193 pending_early_metadata_.RemoveRequest(frame_number);
194 pending_final_metadata_.RemoveRequest(frame_number);
195
196 for (auto& pending_buffers : stream_pending_buffers_map_) {
197 pending_buffers.second.RemoveRequest(frame_number);
198 }
199 }
200
AddResultImpl(std::unique_ptr<CaptureResult> result)201 status_t ResultDispatcher::AddResultImpl(std::unique_ptr<CaptureResult> result) {
202 status_t res;
203 bool failed = false;
204 uint32_t frame_number = result->frame_number;
205
206 if (result->result_metadata != nullptr) {
207 res = AddResultMetadata(frame_number, std::move(result->result_metadata),
208 std::move(result->physical_metadata),
209 result->partial_result);
210 if (res != OK) {
211 ALOGE("[%s] %s: Adding result metadata failed: %s (%d)", name_.c_str(),
212 __FUNCTION__, strerror(-res), res);
213 failed = true;
214 }
215 }
216
217 for (auto& buffer : result->output_buffers) {
218 res = AddBuffer(frame_number, buffer, /*is_input=*/false);
219 if (res != OK) {
220 ALOGE("[%s] %s: Adding an output buffer failed: %s (%d)", name_.c_str(),
221 __FUNCTION__, strerror(-res), res);
222 failed = true;
223 }
224 }
225
226 for (auto& buffer : result->input_buffers) {
227 res = AddBuffer(frame_number, buffer, /*is_input=*/true);
228 if (res != OK) {
229 ALOGE("[%s] %s: Adding an input buffer failed: %s (%d)", name_.c_str(),
230 __FUNCTION__, strerror(-res), res);
231 failed = true;
232 }
233 }
234
235 return failed ? UNKNOWN_ERROR : OK;
236 }
237
AddResult(std::unique_ptr<CaptureResult> result)238 status_t ResultDispatcher::AddResult(std::unique_ptr<CaptureResult> result) {
239 ATRACE_CALL();
240 const status_t res = AddResultImpl(std::move(result));
241 {
242 std::unique_lock<std::mutex> lock(notify_callback_lock_);
243 is_result_shutter_updated_ = true;
244 notify_callback_condition_.notify_one();
245 }
246 return res;
247 }
248
AddBatchResult(std::vector<std::unique_ptr<CaptureResult>> results)249 status_t ResultDispatcher::AddBatchResult(
250 std::vector<std::unique_ptr<CaptureResult>> results) {
251 std::optional<status_t> last_error;
252 for (auto& result : results) {
253 const status_t res = AddResultImpl(std::move(result));
254 if (res != OK) {
255 last_error = res;
256 }
257 }
258 {
259 std::unique_lock<std::mutex> lock(notify_callback_lock_);
260 is_result_shutter_updated_ = true;
261 notify_callback_condition_.notify_one();
262 }
263 return last_error.value_or(OK);
264 }
265
AddShutterLocked(uint32_t frame_number,int64_t timestamp_ns,int64_t readout_timestamp_ns)266 status_t ResultDispatcher::AddShutterLocked(uint32_t frame_number,
267 int64_t timestamp_ns,
268 int64_t readout_timestamp_ns) {
269 status_t res = pending_shutters_.AddResult(
270 frame_number, PendingShutter{
271 .timestamp_ns = timestamp_ns,
272 .readout_timestamp_ns = readout_timestamp_ns,
273 .ready = true,
274 });
275 if (res != OK) {
276 ALOGE(
277 "[%s] %s: Failed to add shutter for frame %u , New timestamp "
278 "%" PRId64,
279 name_.c_str(), __FUNCTION__, frame_number, timestamp_ns);
280 }
281 return res;
282 }
283
AddShutter(const ShutterMessage & shutter)284 status_t ResultDispatcher::AddShutter(const ShutterMessage& shutter) {
285 ATRACE_CALL();
286
287 {
288 std::lock_guard lock(result_lock_);
289 if (status_t ret =
290 AddShutterLocked(shutter.frame_number, shutter.timestamp_ns,
291 shutter.readout_timestamp_ns);
292 ret != OK) {
293 return ret;
294 }
295 }
296 {
297 std::unique_lock<std::mutex> lock(notify_callback_lock_);
298 is_result_shutter_updated_ = true;
299 notify_callback_condition_.notify_one();
300 }
301 return OK;
302 }
303
AddBatchShutter(const std::vector<ShutterMessage> & shutters)304 status_t ResultDispatcher::AddBatchShutter(
305 const std::vector<ShutterMessage>& shutters) {
306 {
307 std::lock_guard lock(result_lock_);
308 for (const ShutterMessage& shutter : shutters) {
309 if (status_t ret =
310 AddShutterLocked(shutter.frame_number, shutter.timestamp_ns,
311 shutter.readout_timestamp_ns);
312 ret != OK) {
313 return ret;
314 }
315 }
316 }
317 {
318 std::unique_lock<std::mutex> lock(notify_callback_lock_);
319 is_result_shutter_updated_ = true;
320 notify_callback_condition_.notify_one();
321 }
322 return OK;
323 }
324
AddError(const ErrorMessage & error)325 status_t ResultDispatcher::AddError(const ErrorMessage& error) {
326 ATRACE_CALL();
327 std::lock_guard<std::mutex> lock(result_lock_);
328 uint32_t frame_number = error.frame_number;
329 // No need to deliver the shutter message on an error
330 if (error.error_code == ErrorCode::kErrorDevice ||
331 error.error_code == ErrorCode::kErrorResult ||
332 error.error_code == ErrorCode::kErrorRequest) {
333 pending_shutters_.RemoveRequest(frame_number);
334 }
335 // No need to deliver the result metadata on a result metadata error
336 if (error.error_code == ErrorCode::kErrorResult ||
337 error.error_code == ErrorCode::kErrorRequest) {
338 pending_early_metadata_.RemoveRequest(frame_number);
339 pending_final_metadata_.RemoveRequest(frame_number);
340 }
341
342 NotifyMessage message = {.type = MessageType::kError, .message.error = error};
343 ALOGV("[%s] %s: Notify error %u for frame %u stream %d", name_.c_str(),
344 __FUNCTION__, error.error_code, frame_number, error.error_stream_id);
345 notify_(message);
346
347 return OK;
348 }
349
MakeResultMetadata(uint32_t frame_number,std::unique_ptr<HalCameraMetadata> metadata,std::vector<PhysicalCameraMetadata> physical_metadata,uint32_t partial_result)350 std::unique_ptr<CaptureResult> ResultDispatcher::MakeResultMetadata(
351 uint32_t frame_number, std::unique_ptr<HalCameraMetadata> metadata,
352 std::vector<PhysicalCameraMetadata> physical_metadata,
353 uint32_t partial_result) {
354 ATRACE_CALL();
355 auto result = std::make_unique<CaptureResult>(CaptureResult({}));
356 result->frame_number = frame_number;
357 result->result_metadata = std::move(metadata);
358 result->physical_metadata = std::move(physical_metadata);
359 result->partial_result = partial_result;
360 return result;
361 }
362
AddResultMetadata(uint32_t frame_number,std::unique_ptr<HalCameraMetadata> metadata,std::vector<PhysicalCameraMetadata> physical_metadata,uint32_t partial_result)363 status_t ResultDispatcher::AddResultMetadata(
364 uint32_t frame_number, std::unique_ptr<HalCameraMetadata> metadata,
365 std::vector<PhysicalCameraMetadata> physical_metadata,
366 uint32_t partial_result) {
367 ATRACE_CALL();
368 if (metadata == nullptr) {
369 ALOGE("[%s] %s: metadata is nullptr.", name_.c_str(), __FUNCTION__);
370 return BAD_VALUE;
371 }
372
373 if (partial_result > kPartialResultCount) {
374 ALOGE(
375 "[%s] %s: partial_result %u cannot be larger than partial result count "
376 "%u",
377 name_.c_str(), __FUNCTION__, partial_result, kPartialResultCount);
378 return BAD_VALUE;
379 }
380
381 std::lock_guard<std::mutex> lock(result_lock_);
382 DispatchQueue<PendingResultMetadata>& queue =
383 partial_result < kPartialResultCount ? pending_early_metadata_
384 : pending_final_metadata_;
385 return queue.AddResult(frame_number,
386 PendingResultMetadata{
387 .metadata = std::move(metadata),
388 .physical_metadata = std::move(physical_metadata),
389 .partial_result_count = partial_result,
390 .ready = true,
391 });
392 }
393
AddBuffer(uint32_t frame_number,StreamBuffer buffer,bool is_input)394 status_t ResultDispatcher::AddBuffer(uint32_t frame_number, StreamBuffer buffer,
395 bool is_input) {
396 ATRACE_CALL();
397 std::lock_guard<std::mutex> lock(result_lock_);
398
399 StreamKey stream_key = CreateStreamKey(buffer.stream_id);
400 auto pending_buffers_it = stream_pending_buffers_map_.find(stream_key);
401 if (pending_buffers_it == stream_pending_buffers_map_.end()) {
402 ALOGE("[%s] %s: Cannot find the pending buffer for stream %s",
403 name_.c_str(), __FUNCTION__, DumpStreamKey(stream_key).c_str());
404 return NAME_NOT_FOUND;
405 }
406
407 return pending_buffers_it->second.AddResult(frame_number,
408 PendingBuffer{
409 .buffer = buffer,
410 .is_input = is_input,
411 .ready = true,
412 });
413 }
414
NotifyCallbackThreadLoop()415 void ResultDispatcher::NotifyCallbackThreadLoop() {
416 // '\0' counts toward the 16-character restriction.
417 constexpr int kPthreadNameLenMinusOne = 16 - 1;
418 pthread_setname_np(
419 pthread_self(),
420 name_.substr(/*pos=*/0, /*count=*/kPthreadNameLenMinusOne).c_str());
421
422 while (1) {
423 if (notify_batch_ == nullptr) {
424 NotifyShutters();
425 } else {
426 NotifyBatchShutters();
427 }
428 NotifyResultMetadata();
429 NotifyBuffers();
430
431 std::unique_lock<std::mutex> lock(notify_callback_lock_);
432 if (notify_callback_thread_exiting_) {
433 ALOGV("[%s] %s: NotifyCallbackThreadLoop exits.", name_.c_str(),
434 __FUNCTION__);
435 return;
436 }
437 if (!is_result_shutter_updated_) {
438 if (notify_callback_condition_.wait_for(
439 lock, std::chrono::milliseconds(kCallbackThreadTimeoutMs)) ==
440 std::cv_status::timeout) {
441 PrintTimeoutMessages();
442 }
443 }
444 is_result_shutter_updated_ = false;
445 }
446 }
447
PrintTimeoutMessages()448 void ResultDispatcher::PrintTimeoutMessages() {
449 std::lock_guard<std::mutex> lock(result_lock_);
450 pending_shutters_.PrintTimeoutMessages();
451 pending_early_metadata_.PrintTimeoutMessages();
452 pending_final_metadata_.PrintTimeoutMessages();
453
454 for (auto& [stream_key, pending_buffers] : stream_pending_buffers_map_) {
455 pending_buffers.PrintTimeoutMessages();
456 }
457 }
458
InitializeGroupStreamIdsMap(const StreamConfiguration & stream_config)459 void ResultDispatcher::InitializeGroupStreamIdsMap(
460 const StreamConfiguration& stream_config) {
461 std::lock_guard<std::mutex> lock(result_lock_);
462 for (const auto& stream : stream_config.streams) {
463 if (stream.group_id != -1) {
464 group_stream_map_[stream.id] = stream.group_id;
465 }
466 }
467 }
468
CreateStreamKey(int32_t stream_id) const469 ResultDispatcher::StreamKey ResultDispatcher::CreateStreamKey(
470 int32_t stream_id) const {
471 if (group_stream_map_.count(stream_id) == 0) {
472 return StreamKey(stream_id, StreamKeyType::kSingleStream);
473 } else {
474 return StreamKey(group_stream_map_.at(stream_id),
475 StreamKeyType::kGroupStream);
476 }
477 }
478
DumpStreamKey(const StreamKey & stream_key) const479 std::string ResultDispatcher::DumpStreamKey(const StreamKey& stream_key) const {
480 switch (stream_key.second) {
481 case StreamKeyType::kSingleStream:
482 return std::to_string(stream_key.first);
483 case StreamKeyType::kGroupStream:
484 return "group " + std::to_string(stream_key.first);
485 default:
486 return "Invalid stream key type";
487 }
488 }
489
GetPendingShutterNotificationLocked(NotifyMessage & message)490 status_t ResultDispatcher::GetPendingShutterNotificationLocked(
491 NotifyMessage& message) {
492 uint32_t frame_number = 0;
493 PendingShutter pending_shutter;
494 status_t ret = pending_shutters_.GetReadyData(frame_number, pending_shutter);
495 if (ret == OK) {
496 message.type = MessageType::kShutter;
497 message.message.shutter.frame_number = frame_number;
498 message.message.shutter.timestamp_ns = pending_shutter.timestamp_ns;
499 message.message.shutter.readout_timestamp_ns =
500 pending_shutter.readout_timestamp_ns;
501 ALOGV("[%s] %s: Notify shutter for frame %u timestamp %" PRIu64
502 " readout_timestamp %" PRIu64,
503 name_.c_str(), __FUNCTION__, message.message.shutter.frame_number,
504 message.message.shutter.timestamp_ns,
505 message.message.shutter.readout_timestamp_ns);
506 }
507 return ret;
508 }
509
NotifyShutters()510 void ResultDispatcher::NotifyShutters() {
511 ATRACE_CALL();
512 NotifyMessage message = {};
513 // TODO: b/347771898 - Update to not depend on running faster than data is
514 // ready
515 while (true) {
516 std::lock_guard<std::mutex> lock(result_lock_);
517 if (GetPendingShutterNotificationLocked(message) != OK) {
518 break;
519 }
520 notify_(message);
521 }
522 }
523
NotifyBatchShutters()524 void ResultDispatcher::NotifyBatchShutters() {
525 ATRACE_CALL();
526 std::vector<NotifyMessage> messages;
527 NotifyMessage message = {};
528 // TODO: b/347771898 - Update to not depend on running faster than data is
529 // ready
530 std::lock_guard<std::mutex> lock(result_lock_);
531 while (true) {
532 if (GetPendingShutterNotificationLocked(message) != OK) {
533 break;
534 }
535 messages.push_back(message);
536 }
537
538 if (!messages.empty()) {
539 notify_batch_(messages);
540 }
541 }
542
NotifyCaptureResults(std::vector<std::unique_ptr<CaptureResult>> results)543 void ResultDispatcher::NotifyCaptureResults(
544 std::vector<std::unique_ptr<CaptureResult>> results) {
545 ATRACE_CALL();
546 std::lock_guard<std::mutex> lock(process_capture_result_lock_);
547 if (process_batch_capture_result_ != nullptr) {
548 process_batch_capture_result_(std::move(results));
549 } else {
550 for (auto& result : results) {
551 process_capture_result_(std::move(result));
552 }
553 }
554 }
555
NotifyResultMetadata()556 void ResultDispatcher::NotifyResultMetadata() {
557 ATRACE_CALL();
558 uint32_t frame_number = 0;
559 std::vector<std::unique_ptr<CaptureResult>> early_results;
560 std::vector<std::unique_ptr<CaptureResult>> final_results;
561 PendingResultMetadata early_result_metadata;
562 PendingResultMetadata final_result_metadata;
563 // TODO: b/347771898 - Assess if notify can hold the lock for less time
564 {
565 std::lock_guard<std::mutex> lock(result_lock_);
566 while (pending_early_metadata_.GetReadyData(frame_number,
567 early_result_metadata) == OK) {
568 ALOGV("[%s] %s: Notify early metadata for frame %u", name_.c_str(),
569 __FUNCTION__, frame_number);
570 early_results.push_back(MakeResultMetadata(
571 frame_number, std::move(early_result_metadata.metadata),
572 std::move(early_result_metadata.physical_metadata),
573 early_result_metadata.partial_result_count));
574 }
575
576 while (pending_final_metadata_.GetReadyData(frame_number,
577 final_result_metadata) == OK) {
578 ALOGV("[%s] %s: Notify final metadata for frame %u", name_.c_str(),
579 __FUNCTION__, frame_number);
580 // Removes the pending early metadata if it exists, in case the HAL only
581 // sent the final metadata
582 pending_early_metadata_.RemoveRequest(frame_number);
583
584 final_results.push_back(MakeResultMetadata(
585 frame_number, std::move(final_result_metadata.metadata),
586 std::move(final_result_metadata.physical_metadata),
587 final_result_metadata.partial_result_count));
588 }
589 }
590 if (!early_results.empty()) {
591 NotifyCaptureResults(std::move(early_results));
592 }
593 if (!final_results.empty()) {
594 NotifyCaptureResults(std::move(final_results));
595 }
596 }
597
GetReadyBufferResult(std::unique_ptr<CaptureResult> * result)598 status_t ResultDispatcher::GetReadyBufferResult(
599 std::unique_ptr<CaptureResult>* result) {
600 ATRACE_CALL();
601 std::lock_guard<std::mutex> lock(result_lock_);
602 if (result == nullptr) {
603 ALOGE("[%s] %s: result is nullptr.", name_.c_str(), __FUNCTION__);
604 return BAD_VALUE;
605 }
606
607 *result = nullptr;
608
609 for (auto& pending_buffers : stream_pending_buffers_map_) {
610 uint32_t frame_number = 0;
611 PendingBuffer buffer_data;
612 if (pending_buffers.second.GetReadyData(frame_number, buffer_data) == OK) {
613 std::unique_ptr<CaptureResult> buffer_result =
614 std::make_unique<CaptureResult>(CaptureResult({}));
615
616 buffer_result->frame_number = frame_number;
617 if (buffer_data.is_input) {
618 buffer_result->input_buffers.push_back(buffer_data.buffer);
619 } else {
620 buffer_result->output_buffers.push_back(buffer_data.buffer);
621 }
622 *result = std::move(buffer_result);
623 return OK;
624 }
625 }
626
627 return NAME_NOT_FOUND;
628 }
629
NotifyBuffers()630 void ResultDispatcher::NotifyBuffers() {
631 ATRACE_CALL();
632 std::vector<std::unique_ptr<CaptureResult>> results;
633 std::unique_ptr<CaptureResult> result;
634
635 // TODO: b/347771898 - Update to not depend on running faster than data is
636 // ready
637 while (GetReadyBufferResult(&result) == OK) {
638 if (result == nullptr) {
639 ALOGE("[%s] %s: result is nullptr", name_.c_str(), __FUNCTION__);
640 return;
641 }
642 ALOGV("[%s] %s: Notify Buffer for frame %u", name_.c_str(), __FUNCTION__,
643 result->frame_number);
644 results.push_back(std::move(result));
645 }
646 if (!results.empty()) {
647 NotifyCaptureResults(std::move(results));
648 }
649 }
650
651 template <typename FrameData>
DispatchQueue(std::string_view dispatcher_name,std::string_view data_name)652 ResultDispatcher::DispatchQueue<FrameData>::DispatchQueue(
653 std::string_view dispatcher_name, std::string_view data_name)
654 : dispatcher_name_(dispatcher_name), data_name_(data_name) {
655 }
656
657 template <typename FrameData>
AddRequest(uint32_t frame_number,RequestType request_type)658 status_t ResultDispatcher::DispatchQueue<FrameData>::AddRequest(
659 uint32_t frame_number, RequestType request_type) {
660 if (normal_request_map_.contains(frame_number) ||
661 reprocess_request_map_.contains(frame_number)) {
662 ALOGE("[%s] %s: Pending %s for frame %u already exists.",
663 std::string(dispatcher_name_).c_str(), __FUNCTION__,
664 data_name_.c_str(), frame_number);
665 return ALREADY_EXISTS;
666 }
667 if (request_type == RequestType::kNormal) {
668 normal_request_map_[frame_number] = FrameData();
669 } else {
670 reprocess_request_map_[frame_number] = FrameData();
671 }
672 return OK;
673 }
674
675 template <typename FrameData>
RemoveRequest(uint32_t frame_number)676 void ResultDispatcher::DispatchQueue<FrameData>::RemoveRequest(
677 uint32_t frame_number) {
678 normal_request_map_.erase(frame_number);
679 reprocess_request_map_.erase(frame_number);
680 }
681
682 template <typename FrameData>
AddResult(uint32_t frame_number,FrameData result)683 status_t ResultDispatcher::DispatchQueue<FrameData>::AddResult(
684 uint32_t frame_number, FrameData result) {
685 auto it = normal_request_map_.find(frame_number);
686 if (it == normal_request_map_.end()) {
687 it = reprocess_request_map_.find(frame_number);
688 if (it == reprocess_request_map_.end()) {
689 ALOGE("[%s] %s: Cannot find the pending %s for frame %u",
690 std::string(dispatcher_name_).c_str(), __FUNCTION__,
691 data_name_.c_str(), frame_number);
692 return NAME_NOT_FOUND;
693 }
694 }
695
696 if (it->second.ready) {
697 ALOGE("[%s] %s: Already received %s for frame %u",
698 std::string(dispatcher_name_).c_str(), __FUNCTION__,
699 data_name_.c_str(), frame_number);
700 return ALREADY_EXISTS;
701 }
702
703 it->second = std::move(result);
704 return OK;
705 }
706
707 template <typename FrameData>
GetReadyData(uint32_t & frame_number,FrameData & ready_data)708 status_t ResultDispatcher::DispatchQueue<FrameData>::GetReadyData(
709 uint32_t& frame_number, FrameData& ready_data) {
710 auto it = normal_request_map_.begin();
711 if (it != normal_request_map_.end() && it->second.ready) {
712 frame_number = it->first;
713 ready_data = std::move(it->second);
714 normal_request_map_.erase(it);
715 return OK;
716 }
717
718 it = reprocess_request_map_.begin();
719 if (it != reprocess_request_map_.end() && it->second.ready) {
720 frame_number = it->first;
721 ready_data = std::move(it->second);
722 reprocess_request_map_.erase(it);
723 return OK;
724 }
725 // The first pending data is not ready
726 return NAME_NOT_FOUND;
727 }
728
729 template <typename FrameData>
PrintTimeoutMessages()730 void ResultDispatcher::DispatchQueue<FrameData>::PrintTimeoutMessages() {
731 for (auto& [frame_number, pending_data] : normal_request_map_) {
732 ALOGW("[%s] %s: pending %s for frame %u ready %d",
733 std::string(dispatcher_name_).c_str(), __FUNCTION__,
734 data_name_.c_str(), frame_number, pending_data.ready);
735 }
736 for (auto& [frame_number, pending_data] : reprocess_request_map_) {
737 ALOGW("[%s] %s: pending %s for frame %u ready %d",
738 std::string(dispatcher_name_).c_str(), __FUNCTION__,
739 data_name_.c_str(), frame_number, pending_data.ready);
740 }
741 }
742 template class ResultDispatcher::DispatchQueue<ResultDispatcher::PendingShutter>;
743 template class ResultDispatcher::DispatchQueue<ResultDispatcher::PendingBuffer>;
744 template class ResultDispatcher::DispatchQueue<
745 ResultDispatcher::PendingResultMetadata>;
746
747 } // namespace google_camera_hal
748 } // namespace android
749