• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 // #define LOG_NDEBUG 0
18 #define LOG_TAG "GCH_ResultDispatcher"
19 #define ATRACE_TAG ATRACE_TAG_CAMERA
20 
21 #include "result_dispatcher.h"
22 
23 #include <inttypes.h>
24 #include <log/log.h>
25 #include <sys/resource.h>
26 #include <utils/Trace.h>
27 
28 #include <string>
29 #include <string_view>
30 
31 #include "hal_types.h"
32 #include "utils.h"
33 
34 namespace android {
35 namespace google_camera_hal {
36 
Create(uint32_t partial_result_count,ProcessCaptureResultFunc process_capture_result,ProcessBatchCaptureResultFunc process_batch_capture_result,NotifyFunc notify,NotifyBatchFunc notify_batch,const StreamConfiguration & stream_config,std::string_view name)37 std::unique_ptr<ResultDispatcher> ResultDispatcher::Create(
38     uint32_t partial_result_count,
39     ProcessCaptureResultFunc process_capture_result,
40     ProcessBatchCaptureResultFunc process_batch_capture_result,
41     NotifyFunc notify, NotifyBatchFunc notify_batch,
42     const StreamConfiguration& stream_config, std::string_view name) {
43   ATRACE_CALL();
44   auto dispatcher = std::make_unique<ResultDispatcher>(
45       partial_result_count, std::move(process_capture_result),
46       std::move(process_batch_capture_result), std::move(notify),
47       std::move(notify_batch), stream_config, name);
48   if (dispatcher == nullptr) {
49     ALOGE("[%s] %s: Creating ResultDispatcher failed.",
50           std::string(name).c_str(), __FUNCTION__);
51     return nullptr;
52   }
53 
54   return dispatcher;
55 }
56 
ResultDispatcher(uint32_t partial_result_count,ProcessCaptureResultFunc process_capture_result,ProcessBatchCaptureResultFunc process_batch_capture_result,NotifyFunc notify,NotifyBatchFunc notify_batch,const StreamConfiguration & stream_config,std::string_view name)57 ResultDispatcher::ResultDispatcher(
58     uint32_t partial_result_count,
59     ProcessCaptureResultFunc process_capture_result,
60     ProcessBatchCaptureResultFunc process_batch_capture_result,
61     NotifyFunc notify, NotifyBatchFunc notify_batch,
62     const StreamConfiguration& stream_config, std::string_view name)
63     : kPartialResultCount(partial_result_count),
64       name_(name),
65       process_capture_result_(std::move(process_capture_result)),
66       process_batch_capture_result_(std::move(process_batch_capture_result)),
67       notify_(std::move(notify)),
68       notify_batch_(std::move(notify_batch)) {
69   ATRACE_CALL();
70   pending_shutters_ = DispatchQueue<PendingShutter>(name_, "shutter");
71   pending_early_metadata_ =
72       DispatchQueue<PendingResultMetadata>(name_, "early result metadata");
73   pending_final_metadata_ =
74       DispatchQueue<PendingResultMetadata>(name_, "final result metadata");
75 
76   notify_callback_thread_ =
77       std::thread([this] { this->NotifyCallbackThreadLoop(); });
78 
79   // Assign higher priority to reduce the preemption when CPU usage is high
80   //
81   // As from b/295977499, we need to make it realtime for priority inheritance
82   // to avoid CameraServer thread being the bottleneck
83   status_t res =
84       utils::SetRealtimeThread(notify_callback_thread_.native_handle());
85   if (res != OK) {
86     ALOGE("[%s] %s: SetRealtimeThread fail", name_.c_str(), __FUNCTION__);
87   } else {
88     ALOGI("[%s] %s: SetRealtimeThread OK", name_.c_str(), __FUNCTION__);
89   }
90   InitializeGroupStreamIdsMap(stream_config);
91 }
92 
~ResultDispatcher()93 ResultDispatcher::~ResultDispatcher() {
94   ATRACE_CALL();
95   {
96     std::unique_lock<std::mutex> lock(notify_callback_lock_);
97     notify_callback_thread_exiting_ = true;
98   }
99 
100   notify_callback_condition_.notify_one();
101   notify_callback_thread_.join();
102 }
103 
RemovePendingRequest(uint32_t frame_number)104 void ResultDispatcher::RemovePendingRequest(uint32_t frame_number) {
105   ATRACE_CALL();
106   std::lock_guard<std::mutex> lock(result_lock_);
107   RemovePendingRequestLocked(frame_number);
108 }
109 
AddPendingRequest(const CaptureRequest & pending_request)110 status_t ResultDispatcher::AddPendingRequest(
111     const CaptureRequest& pending_request) {
112   ATRACE_CALL();
113   std::lock_guard<std::mutex> lock(result_lock_);
114 
115   status_t res = AddPendingRequestLocked(pending_request);
116   if (res != OK) {
117     ALOGE("[%s] %s: Adding a pending request failed: %s(%d).", name_.c_str(),
118           __FUNCTION__, strerror(-res), res);
119     RemovePendingRequestLocked(pending_request.frame_number);
120     return res;
121   }
122 
123   return OK;
124 }
125 
AddPendingRequestLocked(const CaptureRequest & pending_request)126 status_t ResultDispatcher::AddPendingRequestLocked(
127     const CaptureRequest& pending_request) {
128   ATRACE_CALL();
129   uint32_t frame_number = pending_request.frame_number;
130   const RequestType request_type = pending_request.input_buffers.empty()
131                                        ? RequestType::kNormal
132                                        : RequestType::kReprocess;
133 
134   status_t res = pending_shutters_.AddRequest(frame_number, request_type);
135   if (res != OK) {
136     ALOGE("[%s] %s: Adding pending shutter for frame %u failed: %s(%d)",
137           name_.c_str(), __FUNCTION__, frame_number, strerror(-res), res);
138     return res;
139   }
140 
141   res = pending_early_metadata_.AddRequest(frame_number, request_type);
142   if (res != OK) {
143     ALOGE("[%s] %s: Adding pending early metadata for frame %u failed: %s(%d)",
144           name_.c_str(), __FUNCTION__, frame_number, strerror(-res), res);
145     return res;
146   }
147 
148   res = pending_final_metadata_.AddRequest(frame_number, request_type);
149   if (res != OK) {
150     ALOGE("[%s] %s: Adding pending final metadata for frame %u failed: %s(%d)",
151           name_.c_str(), __FUNCTION__, frame_number, strerror(-res), res);
152     return res;
153   }
154 
155   for (auto& buffer : pending_request.input_buffers) {
156     res = AddPendingBufferLocked(frame_number, buffer, request_type);
157     if (res != OK) {
158       ALOGE("[%s] %s: Adding pending input buffer for frame %u failed: %s(%d)",
159             name_.c_str(), __FUNCTION__, frame_number, strerror(-res), res);
160       return res;
161     }
162   }
163 
164   for (auto& buffer : pending_request.output_buffers) {
165     res = AddPendingBufferLocked(frame_number, buffer, request_type);
166     if (res != OK) {
167       ALOGE("[%s] %s: Adding pending output buffer for frame %u failed: %s(%d)",
168             name_.c_str(), __FUNCTION__, frame_number, strerror(-res), res);
169       return res;
170     }
171   }
172 
173   return OK;
174 }
175 
AddPendingBufferLocked(uint32_t frame_number,const StreamBuffer & buffer,RequestType request_type)176 status_t ResultDispatcher::AddPendingBufferLocked(uint32_t frame_number,
177                                                   const StreamBuffer& buffer,
178                                                   RequestType request_type) {
179   ATRACE_CALL();
180   StreamKey stream_key = CreateStreamKey(buffer.stream_id);
181   if (!stream_pending_buffers_map_.contains(stream_key)) {
182     stream_pending_buffers_map_[stream_key] = DispatchQueue<PendingBuffer>(
183         name_, "buffer of stream " + DumpStreamKey(stream_key));
184   }
185 
186   return stream_pending_buffers_map_[stream_key].AddRequest(frame_number,
187                                                             request_type);
188 }
189 
RemovePendingRequestLocked(uint32_t frame_number)190 void ResultDispatcher::RemovePendingRequestLocked(uint32_t frame_number) {
191   ATRACE_CALL();
192   pending_shutters_.RemoveRequest(frame_number);
193   pending_early_metadata_.RemoveRequest(frame_number);
194   pending_final_metadata_.RemoveRequest(frame_number);
195 
196   for (auto& pending_buffers : stream_pending_buffers_map_) {
197     pending_buffers.second.RemoveRequest(frame_number);
198   }
199 }
200 
AddResultImpl(std::unique_ptr<CaptureResult> result)201 status_t ResultDispatcher::AddResultImpl(std::unique_ptr<CaptureResult> result) {
202   status_t res;
203   bool failed = false;
204   uint32_t frame_number = result->frame_number;
205 
206   if (result->result_metadata != nullptr) {
207     res = AddResultMetadata(frame_number, std::move(result->result_metadata),
208                             std::move(result->physical_metadata),
209                             result->partial_result);
210     if (res != OK) {
211       ALOGE("[%s] %s: Adding result metadata failed: %s (%d)", name_.c_str(),
212             __FUNCTION__, strerror(-res), res);
213       failed = true;
214     }
215   }
216 
217   for (auto& buffer : result->output_buffers) {
218     res = AddBuffer(frame_number, buffer, /*is_input=*/false);
219     if (res != OK) {
220       ALOGE("[%s] %s: Adding an output buffer failed: %s (%d)", name_.c_str(),
221             __FUNCTION__, strerror(-res), res);
222       failed = true;
223     }
224   }
225 
226   for (auto& buffer : result->input_buffers) {
227     res = AddBuffer(frame_number, buffer, /*is_input=*/true);
228     if (res != OK) {
229       ALOGE("[%s] %s: Adding an input buffer failed: %s (%d)", name_.c_str(),
230             __FUNCTION__, strerror(-res), res);
231       failed = true;
232     }
233   }
234 
235   return failed ? UNKNOWN_ERROR : OK;
236 }
237 
AddResult(std::unique_ptr<CaptureResult> result)238 status_t ResultDispatcher::AddResult(std::unique_ptr<CaptureResult> result) {
239   ATRACE_CALL();
240   const status_t res = AddResultImpl(std::move(result));
241   {
242     std::unique_lock<std::mutex> lock(notify_callback_lock_);
243     is_result_shutter_updated_ = true;
244     notify_callback_condition_.notify_one();
245   }
246   return res;
247 }
248 
AddBatchResult(std::vector<std::unique_ptr<CaptureResult>> results)249 status_t ResultDispatcher::AddBatchResult(
250     std::vector<std::unique_ptr<CaptureResult>> results) {
251   std::optional<status_t> last_error;
252   for (auto& result : results) {
253     const status_t res = AddResultImpl(std::move(result));
254     if (res != OK) {
255       last_error = res;
256     }
257   }
258   {
259     std::unique_lock<std::mutex> lock(notify_callback_lock_);
260     is_result_shutter_updated_ = true;
261     notify_callback_condition_.notify_one();
262   }
263   return last_error.value_or(OK);
264 }
265 
AddShutterLocked(uint32_t frame_number,int64_t timestamp_ns,int64_t readout_timestamp_ns)266 status_t ResultDispatcher::AddShutterLocked(uint32_t frame_number,
267                                             int64_t timestamp_ns,
268                                             int64_t readout_timestamp_ns) {
269   status_t res = pending_shutters_.AddResult(
270       frame_number, PendingShutter{
271                         .timestamp_ns = timestamp_ns,
272                         .readout_timestamp_ns = readout_timestamp_ns,
273                         .ready = true,
274                     });
275   if (res != OK) {
276     ALOGE(
277         "[%s] %s: Failed to add shutter for frame %u , New timestamp "
278         "%" PRId64,
279         name_.c_str(), __FUNCTION__, frame_number, timestamp_ns);
280   }
281   return res;
282 }
283 
AddShutter(const ShutterMessage & shutter)284 status_t ResultDispatcher::AddShutter(const ShutterMessage& shutter) {
285   ATRACE_CALL();
286 
287   {
288     std::lock_guard lock(result_lock_);
289     if (status_t ret =
290             AddShutterLocked(shutter.frame_number, shutter.timestamp_ns,
291                              shutter.readout_timestamp_ns);
292         ret != OK) {
293       return ret;
294     }
295   }
296   {
297     std::unique_lock<std::mutex> lock(notify_callback_lock_);
298     is_result_shutter_updated_ = true;
299     notify_callback_condition_.notify_one();
300   }
301   return OK;
302 }
303 
AddBatchShutter(const std::vector<ShutterMessage> & shutters)304 status_t ResultDispatcher::AddBatchShutter(
305     const std::vector<ShutterMessage>& shutters) {
306   {
307     std::lock_guard lock(result_lock_);
308     for (const ShutterMessage& shutter : shutters) {
309       if (status_t ret =
310               AddShutterLocked(shutter.frame_number, shutter.timestamp_ns,
311                                shutter.readout_timestamp_ns);
312           ret != OK) {
313         return ret;
314       }
315     }
316   }
317   {
318     std::unique_lock<std::mutex> lock(notify_callback_lock_);
319     is_result_shutter_updated_ = true;
320     notify_callback_condition_.notify_one();
321   }
322   return OK;
323 }
324 
AddError(const ErrorMessage & error)325 status_t ResultDispatcher::AddError(const ErrorMessage& error) {
326   ATRACE_CALL();
327   std::lock_guard<std::mutex> lock(result_lock_);
328   uint32_t frame_number = error.frame_number;
329   // No need to deliver the shutter message on an error
330   if (error.error_code == ErrorCode::kErrorDevice ||
331       error.error_code == ErrorCode::kErrorResult ||
332       error.error_code == ErrorCode::kErrorRequest) {
333     pending_shutters_.RemoveRequest(frame_number);
334   }
335   // No need to deliver the result metadata on a result metadata error
336   if (error.error_code == ErrorCode::kErrorResult ||
337       error.error_code == ErrorCode::kErrorRequest) {
338     pending_early_metadata_.RemoveRequest(frame_number);
339     pending_final_metadata_.RemoveRequest(frame_number);
340   }
341 
342   NotifyMessage message = {.type = MessageType::kError, .message.error = error};
343   ALOGV("[%s] %s: Notify error %u for frame %u stream %d", name_.c_str(),
344         __FUNCTION__, error.error_code, frame_number, error.error_stream_id);
345   notify_(message);
346 
347   return OK;
348 }
349 
MakeResultMetadata(uint32_t frame_number,std::unique_ptr<HalCameraMetadata> metadata,std::vector<PhysicalCameraMetadata> physical_metadata,uint32_t partial_result)350 std::unique_ptr<CaptureResult> ResultDispatcher::MakeResultMetadata(
351     uint32_t frame_number, std::unique_ptr<HalCameraMetadata> metadata,
352     std::vector<PhysicalCameraMetadata> physical_metadata,
353     uint32_t partial_result) {
354   ATRACE_CALL();
355   auto result = std::make_unique<CaptureResult>(CaptureResult({}));
356   result->frame_number = frame_number;
357   result->result_metadata = std::move(metadata);
358   result->physical_metadata = std::move(physical_metadata);
359   result->partial_result = partial_result;
360   return result;
361 }
362 
AddResultMetadata(uint32_t frame_number,std::unique_ptr<HalCameraMetadata> metadata,std::vector<PhysicalCameraMetadata> physical_metadata,uint32_t partial_result)363 status_t ResultDispatcher::AddResultMetadata(
364     uint32_t frame_number, std::unique_ptr<HalCameraMetadata> metadata,
365     std::vector<PhysicalCameraMetadata> physical_metadata,
366     uint32_t partial_result) {
367   ATRACE_CALL();
368   if (metadata == nullptr) {
369     ALOGE("[%s] %s: metadata is nullptr.", name_.c_str(), __FUNCTION__);
370     return BAD_VALUE;
371   }
372 
373   if (partial_result > kPartialResultCount) {
374     ALOGE(
375         "[%s] %s: partial_result %u cannot be larger than partial result count "
376         "%u",
377         name_.c_str(), __FUNCTION__, partial_result, kPartialResultCount);
378     return BAD_VALUE;
379   }
380 
381   std::lock_guard<std::mutex> lock(result_lock_);
382   DispatchQueue<PendingResultMetadata>& queue =
383       partial_result < kPartialResultCount ? pending_early_metadata_
384                                            : pending_final_metadata_;
385   return queue.AddResult(frame_number,
386                          PendingResultMetadata{
387                              .metadata = std::move(metadata),
388                              .physical_metadata = std::move(physical_metadata),
389                              .partial_result_count = partial_result,
390                              .ready = true,
391                          });
392 }
393 
AddBuffer(uint32_t frame_number,StreamBuffer buffer,bool is_input)394 status_t ResultDispatcher::AddBuffer(uint32_t frame_number, StreamBuffer buffer,
395                                      bool is_input) {
396   ATRACE_CALL();
397   std::lock_guard<std::mutex> lock(result_lock_);
398 
399   StreamKey stream_key = CreateStreamKey(buffer.stream_id);
400   auto pending_buffers_it = stream_pending_buffers_map_.find(stream_key);
401   if (pending_buffers_it == stream_pending_buffers_map_.end()) {
402     ALOGE("[%s] %s: Cannot find the pending buffer for stream %s",
403           name_.c_str(), __FUNCTION__, DumpStreamKey(stream_key).c_str());
404     return NAME_NOT_FOUND;
405   }
406 
407   return pending_buffers_it->second.AddResult(frame_number,
408                                               PendingBuffer{
409                                                   .buffer = buffer,
410                                                   .is_input = is_input,
411                                                   .ready = true,
412                                               });
413 }
414 
NotifyCallbackThreadLoop()415 void ResultDispatcher::NotifyCallbackThreadLoop() {
416   // '\0' counts toward the 16-character restriction.
417   constexpr int kPthreadNameLenMinusOne = 16 - 1;
418   pthread_setname_np(
419       pthread_self(),
420       name_.substr(/*pos=*/0, /*count=*/kPthreadNameLenMinusOne).c_str());
421 
422   while (1) {
423     if (notify_batch_ == nullptr) {
424       NotifyShutters();
425     } else {
426       NotifyBatchShutters();
427     }
428     NotifyResultMetadata();
429     NotifyBuffers();
430 
431     std::unique_lock<std::mutex> lock(notify_callback_lock_);
432     if (notify_callback_thread_exiting_) {
433       ALOGV("[%s] %s: NotifyCallbackThreadLoop exits.", name_.c_str(),
434             __FUNCTION__);
435       return;
436     }
437     if (!is_result_shutter_updated_) {
438       if (notify_callback_condition_.wait_for(
439               lock, std::chrono::milliseconds(kCallbackThreadTimeoutMs)) ==
440           std::cv_status::timeout) {
441         PrintTimeoutMessages();
442       }
443     }
444     is_result_shutter_updated_ = false;
445   }
446 }
447 
PrintTimeoutMessages()448 void ResultDispatcher::PrintTimeoutMessages() {
449   std::lock_guard<std::mutex> lock(result_lock_);
450   pending_shutters_.PrintTimeoutMessages();
451   pending_early_metadata_.PrintTimeoutMessages();
452   pending_final_metadata_.PrintTimeoutMessages();
453 
454   for (auto& [stream_key, pending_buffers] : stream_pending_buffers_map_) {
455     pending_buffers.PrintTimeoutMessages();
456   }
457 }
458 
InitializeGroupStreamIdsMap(const StreamConfiguration & stream_config)459 void ResultDispatcher::InitializeGroupStreamIdsMap(
460     const StreamConfiguration& stream_config) {
461   std::lock_guard<std::mutex> lock(result_lock_);
462   for (const auto& stream : stream_config.streams) {
463     if (stream.group_id != -1) {
464       group_stream_map_[stream.id] = stream.group_id;
465     }
466   }
467 }
468 
CreateStreamKey(int32_t stream_id) const469 ResultDispatcher::StreamKey ResultDispatcher::CreateStreamKey(
470     int32_t stream_id) const {
471   if (group_stream_map_.count(stream_id) == 0) {
472     return StreamKey(stream_id, StreamKeyType::kSingleStream);
473   } else {
474     return StreamKey(group_stream_map_.at(stream_id),
475                      StreamKeyType::kGroupStream);
476   }
477 }
478 
DumpStreamKey(const StreamKey & stream_key) const479 std::string ResultDispatcher::DumpStreamKey(const StreamKey& stream_key) const {
480   switch (stream_key.second) {
481     case StreamKeyType::kSingleStream:
482       return std::to_string(stream_key.first);
483     case StreamKeyType::kGroupStream:
484       return "group " + std::to_string(stream_key.first);
485     default:
486       return "Invalid stream key type";
487   }
488 }
489 
GetPendingShutterNotificationLocked(NotifyMessage & message)490 status_t ResultDispatcher::GetPendingShutterNotificationLocked(
491     NotifyMessage& message) {
492   uint32_t frame_number = 0;
493   PendingShutter pending_shutter;
494   status_t ret = pending_shutters_.GetReadyData(frame_number, pending_shutter);
495   if (ret == OK) {
496     message.type = MessageType::kShutter;
497     message.message.shutter.frame_number = frame_number;
498     message.message.shutter.timestamp_ns = pending_shutter.timestamp_ns;
499     message.message.shutter.readout_timestamp_ns =
500         pending_shutter.readout_timestamp_ns;
501     ALOGV("[%s] %s: Notify shutter for frame %u timestamp %" PRIu64
502           " readout_timestamp %" PRIu64,
503           name_.c_str(), __FUNCTION__, message.message.shutter.frame_number,
504           message.message.shutter.timestamp_ns,
505           message.message.shutter.readout_timestamp_ns);
506   }
507   return ret;
508 }
509 
NotifyShutters()510 void ResultDispatcher::NotifyShutters() {
511   ATRACE_CALL();
512   NotifyMessage message = {};
513   // TODO: b/347771898 - Update to not depend on running faster than data is
514   // ready
515   while (true) {
516     std::lock_guard<std::mutex> lock(result_lock_);
517     if (GetPendingShutterNotificationLocked(message) != OK) {
518       break;
519     }
520     notify_(message);
521   }
522 }
523 
NotifyBatchShutters()524 void ResultDispatcher::NotifyBatchShutters() {
525   ATRACE_CALL();
526   std::vector<NotifyMessage> messages;
527   NotifyMessage message = {};
528   // TODO: b/347771898 - Update to not depend on running faster than data is
529   // ready
530   std::lock_guard<std::mutex> lock(result_lock_);
531   while (true) {
532     if (GetPendingShutterNotificationLocked(message) != OK) {
533       break;
534     }
535     messages.push_back(message);
536   }
537 
538   if (!messages.empty()) {
539     notify_batch_(messages);
540   }
541 }
542 
NotifyCaptureResults(std::vector<std::unique_ptr<CaptureResult>> results)543 void ResultDispatcher::NotifyCaptureResults(
544     std::vector<std::unique_ptr<CaptureResult>> results) {
545   ATRACE_CALL();
546   std::lock_guard<std::mutex> lock(process_capture_result_lock_);
547   if (process_batch_capture_result_ != nullptr) {
548     process_batch_capture_result_(std::move(results));
549   } else {
550     for (auto& result : results) {
551       process_capture_result_(std::move(result));
552     }
553   }
554 }
555 
NotifyResultMetadata()556 void ResultDispatcher::NotifyResultMetadata() {
557   ATRACE_CALL();
558   uint32_t frame_number = 0;
559   std::vector<std::unique_ptr<CaptureResult>> early_results;
560   std::vector<std::unique_ptr<CaptureResult>> final_results;
561   PendingResultMetadata early_result_metadata;
562   PendingResultMetadata final_result_metadata;
563   // TODO: b/347771898 - Assess if notify can hold the lock for less time
564   {
565     std::lock_guard<std::mutex> lock(result_lock_);
566     while (pending_early_metadata_.GetReadyData(frame_number,
567                                                 early_result_metadata) == OK) {
568       ALOGV("[%s] %s: Notify early metadata for frame %u", name_.c_str(),
569             __FUNCTION__, frame_number);
570       early_results.push_back(MakeResultMetadata(
571           frame_number, std::move(early_result_metadata.metadata),
572           std::move(early_result_metadata.physical_metadata),
573           early_result_metadata.partial_result_count));
574     }
575 
576     while (pending_final_metadata_.GetReadyData(frame_number,
577                                                 final_result_metadata) == OK) {
578       ALOGV("[%s] %s: Notify final metadata for frame %u", name_.c_str(),
579             __FUNCTION__, frame_number);
580       // Removes the pending early metadata if it exists, in case the HAL only
581       // sent the final metadata
582       pending_early_metadata_.RemoveRequest(frame_number);
583 
584       final_results.push_back(MakeResultMetadata(
585           frame_number, std::move(final_result_metadata.metadata),
586           std::move(final_result_metadata.physical_metadata),
587           final_result_metadata.partial_result_count));
588     }
589   }
590   if (!early_results.empty()) {
591     NotifyCaptureResults(std::move(early_results));
592   }
593   if (!final_results.empty()) {
594     NotifyCaptureResults(std::move(final_results));
595   }
596 }
597 
GetReadyBufferResult(std::unique_ptr<CaptureResult> * result)598 status_t ResultDispatcher::GetReadyBufferResult(
599     std::unique_ptr<CaptureResult>* result) {
600   ATRACE_CALL();
601   std::lock_guard<std::mutex> lock(result_lock_);
602   if (result == nullptr) {
603     ALOGE("[%s] %s: result is nullptr.", name_.c_str(), __FUNCTION__);
604     return BAD_VALUE;
605   }
606 
607   *result = nullptr;
608 
609   for (auto& pending_buffers : stream_pending_buffers_map_) {
610     uint32_t frame_number = 0;
611     PendingBuffer buffer_data;
612     if (pending_buffers.second.GetReadyData(frame_number, buffer_data) == OK) {
613       std::unique_ptr<CaptureResult> buffer_result =
614           std::make_unique<CaptureResult>(CaptureResult({}));
615 
616       buffer_result->frame_number = frame_number;
617       if (buffer_data.is_input) {
618         buffer_result->input_buffers.push_back(buffer_data.buffer);
619       } else {
620         buffer_result->output_buffers.push_back(buffer_data.buffer);
621       }
622       *result = std::move(buffer_result);
623       return OK;
624     }
625   }
626 
627   return NAME_NOT_FOUND;
628 }
629 
NotifyBuffers()630 void ResultDispatcher::NotifyBuffers() {
631   ATRACE_CALL();
632   std::vector<std::unique_ptr<CaptureResult>> results;
633   std::unique_ptr<CaptureResult> result;
634 
635   // TODO: b/347771898 - Update to not depend on running faster than data is
636   // ready
637   while (GetReadyBufferResult(&result) == OK) {
638     if (result == nullptr) {
639       ALOGE("[%s] %s: result is nullptr", name_.c_str(), __FUNCTION__);
640       return;
641     }
642     ALOGV("[%s] %s: Notify Buffer for frame %u", name_.c_str(), __FUNCTION__,
643           result->frame_number);
644     results.push_back(std::move(result));
645   }
646   if (!results.empty()) {
647     NotifyCaptureResults(std::move(results));
648   }
649 }
650 
651 template <typename FrameData>
DispatchQueue(std::string_view dispatcher_name,std::string_view data_name)652 ResultDispatcher::DispatchQueue<FrameData>::DispatchQueue(
653     std::string_view dispatcher_name, std::string_view data_name)
654     : dispatcher_name_(dispatcher_name), data_name_(data_name) {
655 }
656 
657 template <typename FrameData>
AddRequest(uint32_t frame_number,RequestType request_type)658 status_t ResultDispatcher::DispatchQueue<FrameData>::AddRequest(
659     uint32_t frame_number, RequestType request_type) {
660   if (normal_request_map_.contains(frame_number) ||
661       reprocess_request_map_.contains(frame_number)) {
662     ALOGE("[%s] %s: Pending %s for frame %u already exists.",
663           std::string(dispatcher_name_).c_str(), __FUNCTION__,
664           data_name_.c_str(), frame_number);
665     return ALREADY_EXISTS;
666   }
667   if (request_type == RequestType::kNormal) {
668     normal_request_map_[frame_number] = FrameData();
669   } else {
670     reprocess_request_map_[frame_number] = FrameData();
671   }
672   return OK;
673 }
674 
675 template <typename FrameData>
RemoveRequest(uint32_t frame_number)676 void ResultDispatcher::DispatchQueue<FrameData>::RemoveRequest(
677     uint32_t frame_number) {
678   normal_request_map_.erase(frame_number);
679   reprocess_request_map_.erase(frame_number);
680 }
681 
682 template <typename FrameData>
AddResult(uint32_t frame_number,FrameData result)683 status_t ResultDispatcher::DispatchQueue<FrameData>::AddResult(
684     uint32_t frame_number, FrameData result) {
685   auto it = normal_request_map_.find(frame_number);
686   if (it == normal_request_map_.end()) {
687     it = reprocess_request_map_.find(frame_number);
688     if (it == reprocess_request_map_.end()) {
689       ALOGE("[%s] %s: Cannot find the pending %s for frame %u",
690             std::string(dispatcher_name_).c_str(), __FUNCTION__,
691             data_name_.c_str(), frame_number);
692       return NAME_NOT_FOUND;
693     }
694   }
695 
696   if (it->second.ready) {
697     ALOGE("[%s] %s: Already received %s for frame %u",
698           std::string(dispatcher_name_).c_str(), __FUNCTION__,
699           data_name_.c_str(), frame_number);
700     return ALREADY_EXISTS;
701   }
702 
703   it->second = std::move(result);
704   return OK;
705 }
706 
707 template <typename FrameData>
GetReadyData(uint32_t & frame_number,FrameData & ready_data)708 status_t ResultDispatcher::DispatchQueue<FrameData>::GetReadyData(
709     uint32_t& frame_number, FrameData& ready_data) {
710   auto it = normal_request_map_.begin();
711   if (it != normal_request_map_.end() && it->second.ready) {
712     frame_number = it->first;
713     ready_data = std::move(it->second);
714     normal_request_map_.erase(it);
715     return OK;
716   }
717 
718   it = reprocess_request_map_.begin();
719   if (it != reprocess_request_map_.end() && it->second.ready) {
720     frame_number = it->first;
721     ready_data = std::move(it->second);
722     reprocess_request_map_.erase(it);
723     return OK;
724   }
725   // The first pending data is not ready
726   return NAME_NOT_FOUND;
727 }
728 
729 template <typename FrameData>
PrintTimeoutMessages()730 void ResultDispatcher::DispatchQueue<FrameData>::PrintTimeoutMessages() {
731   for (auto& [frame_number, pending_data] : normal_request_map_) {
732     ALOGW("[%s] %s: pending %s for frame %u ready %d",
733           std::string(dispatcher_name_).c_str(), __FUNCTION__,
734           data_name_.c_str(), frame_number, pending_data.ready);
735   }
736   for (auto& [frame_number, pending_data] : reprocess_request_map_) {
737     ALOGW("[%s] %s: pending %s for frame %u ready %d",
738           std::string(dispatcher_name_).c_str(), __FUNCTION__,
739           data_name_.c_str(), frame_number, pending_data.ready);
740   }
741 }
742 template class ResultDispatcher::DispatchQueue<ResultDispatcher::PendingShutter>;
743 template class ResultDispatcher::DispatchQueue<ResultDispatcher::PendingBuffer>;
744 template class ResultDispatcher::DispatchQueue<
745     ResultDispatcher::PendingResultMetadata>;
746 
747 }  // namespace google_camera_hal
748 }  // namespace android
749