• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "include/private/dvr/buffer_hub_queue_client.h"
2 
3 #include <inttypes.h>
4 #include <log/log.h>
5 #include <poll.h>
6 #include <sys/epoll.h>
7 
8 #include <array>
9 
10 #include <pdx/default_transport/client_channel.h>
11 #include <pdx/default_transport/client_channel_factory.h>
12 #include <pdx/file_handle.h>
13 #include <pdx/trace.h>
14 
15 #define RETRY_EINTR(fnc_call)                 \
16   ([&]() -> decltype(fnc_call) {              \
17     decltype(fnc_call) result;                \
18     do {                                      \
19       result = (fnc_call);                    \
20     } while (result == -1 && errno == EINTR); \
21     return result;                            \
22   })()
23 
24 using android::pdx::ErrorStatus;
25 using android::pdx::LocalChannelHandle;
26 using android::pdx::LocalHandle;
27 using android::pdx::Status;
28 
29 namespace android {
30 namespace dvr {
31 
32 namespace {
33 
34 // Polls an fd for the given events.
PollEvents(int fd,short events)35 Status<int> PollEvents(int fd, short events) {
36   const int kTimeoutMs = 0;
37   pollfd pfd{fd, events, 0};
38   const int count = RETRY_EINTR(poll(&pfd, 1, kTimeoutMs));
39   if (count < 0) {
40     return ErrorStatus(errno);
41   } else if (count == 0) {
42     return ErrorStatus(ETIMEDOUT);
43   } else {
44     return {pfd.revents};
45   }
46 }
47 
Unstuff(uint64_t value)48 std::pair<int32_t, int32_t> Unstuff(uint64_t value) {
49   return {static_cast<int32_t>(value >> 32),
50           static_cast<int32_t>(value & ((1ull << 32) - 1))};
51 }
52 
Stuff(int32_t a,int32_t b)53 uint64_t Stuff(int32_t a, int32_t b) {
54   const uint32_t ua = static_cast<uint32_t>(a);
55   const uint32_t ub = static_cast<uint32_t>(b);
56   return (static_cast<uint64_t>(ua) << 32) | static_cast<uint64_t>(ub);
57 }
58 
59 }  // anonymous namespace
60 
BufferHubQueue(LocalChannelHandle channel_handle)61 BufferHubQueue::BufferHubQueue(LocalChannelHandle channel_handle)
62     : Client{pdx::default_transport::ClientChannel::Create(
63           std::move(channel_handle))} {
64   Initialize();
65 }
66 
BufferHubQueue(const std::string & endpoint_path)67 BufferHubQueue::BufferHubQueue(const std::string& endpoint_path)
68     : Client{
69           pdx::default_transport::ClientChannelFactory::Create(endpoint_path)} {
70   Initialize();
71 }
72 
Initialize()73 void BufferHubQueue::Initialize() {
74   int ret = epoll_fd_.Create();
75   if (ret < 0) {
76     ALOGE("BufferHubQueue::BufferHubQueue: Failed to create epoll fd: %s",
77           strerror(-ret));
78     return;
79   }
80 
81   epoll_event event = {
82       .events = EPOLLIN | EPOLLET,
83       .data = {.u64 = Stuff(-1, BufferHubQueue::kEpollQueueEventIndex)}};
84   ret = epoll_fd_.Control(EPOLL_CTL_ADD, event_fd(), &event);
85   if (ret < 0) {
86     ALOGE("BufferHubQueue::Initialize: Failed to add event fd to epoll set: %s",
87           strerror(-ret));
88   }
89 }
90 
ImportQueue()91 Status<void> BufferHubQueue::ImportQueue() {
92   auto status = InvokeRemoteMethod<BufferHubRPC::GetQueueInfo>();
93   if (!status) {
94     ALOGE("BufferHubQueue::ImportQueue: Failed to import queue: %s",
95           status.GetErrorMessage().c_str());
96     return ErrorStatus(status.error());
97   } else {
98     SetupQueue(status.get());
99     return {};
100   }
101 }
102 
SetupQueue(const QueueInfo & queue_info)103 void BufferHubQueue::SetupQueue(const QueueInfo& queue_info) {
104   is_async_ = queue_info.producer_config.is_async;
105   default_width_ = queue_info.producer_config.default_width;
106   default_height_ = queue_info.producer_config.default_height;
107   default_format_ = queue_info.producer_config.default_format;
108   user_metadata_size_ = queue_info.producer_config.user_metadata_size;
109   id_ = queue_info.id;
110 }
111 
CreateConsumerQueue()112 std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateConsumerQueue() {
113   if (auto status = CreateConsumerQueueHandle(/*silent*/ false))
114     return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take()));
115   else
116     return nullptr;
117 }
118 
CreateSilentConsumerQueue()119 std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateSilentConsumerQueue() {
120   if (auto status = CreateConsumerQueueHandle(/*silent*/ true))
121     return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take()));
122   else
123     return nullptr;
124 }
125 
CreateConsumerQueueHandle(bool silent)126 Status<LocalChannelHandle> BufferHubQueue::CreateConsumerQueueHandle(
127     bool silent) {
128   auto status = InvokeRemoteMethod<BufferHubRPC::CreateConsumerQueue>(silent);
129   if (!status) {
130     ALOGE(
131         "BufferHubQueue::CreateConsumerQueue: Failed to create consumer queue: "
132         "%s",
133         status.GetErrorMessage().c_str());
134     return ErrorStatus(status.error());
135   }
136 
137   return status;
138 }
139 
WaitForBuffers(int timeout)140 bool BufferHubQueue::WaitForBuffers(int timeout) {
141   ATRACE_NAME("BufferHubQueue::WaitForBuffers");
142   std::array<epoll_event, kMaxEvents> events;
143 
144   // Loop at least once to check for hangups.
145   do {
146     ALOGD_IF(
147         TRACE,
148         "BufferHubQueue::WaitForBuffers: queue_id=%d count=%zu capacity=%zu",
149         id(), count(), capacity());
150 
151     // If there is already a buffer then just check for hangup without waiting.
152     const int ret = epoll_fd_.Wait(events.data(), events.size(),
153                                    count() == 0 ? timeout : 0);
154 
155     if (ret == 0) {
156       ALOGI_IF(TRACE,
157                "BufferHubQueue::WaitForBuffers: No events before timeout: "
158                "queue_id=%d",
159                id());
160       return count() != 0;
161     }
162 
163     if (ret < 0 && ret != -EINTR) {
164       ALOGE("BufferHubQueue::WaitForBuffers: Failed to wait for buffers: %s",
165             strerror(-ret));
166       return false;
167     }
168 
169     const int num_events = ret;
170 
171     // A BufferQueue's epoll fd tracks N+1 events, where there are N events,
172     // one for each buffer in the queue, and one extra event for the queue
173     // client itself.
174     for (int i = 0; i < num_events; i++) {
175       int32_t event_fd;
176       int32_t index;
177       std::tie(event_fd, index) = Unstuff(events[i].data.u64);
178 
179       PDX_TRACE_FORMAT(
180           "epoll_event|queue_id=%d;num_events=%d;event_index=%d;event_fd=%d;"
181           "slot=%d|",
182           id(), num_events, i, event_fd, index);
183 
184       ALOGD_IF(TRACE,
185                "BufferHubQueue::WaitForBuffers: event %d: event_fd=%d index=%d",
186                i, event_fd, index);
187 
188       if (is_buffer_event_index(index)) {
189         HandleBufferEvent(static_cast<size_t>(index), event_fd,
190                           events[i].events);
191       } else if (is_queue_event_index(index)) {
192         HandleQueueEvent(events[i].events);
193       } else {
194         ALOGW(
195             "BufferHubQueue::WaitForBuffers: Unknown event type event_fd=%d "
196             "index=%d",
197             event_fd, index);
198       }
199     }
200   } while (count() == 0 && capacity() > 0 && !hung_up());
201 
202   return count() != 0;
203 }
204 
HandleBufferEvent(size_t slot,int event_fd,int poll_events)205 Status<void> BufferHubQueue::HandleBufferEvent(size_t slot, int event_fd,
206                                                int poll_events) {
207   ATRACE_NAME("BufferHubQueue::HandleBufferEvent");
208   if (!buffers_[slot]) {
209     ALOGW("BufferHubQueue::HandleBufferEvent: Invalid buffer slot: %zu", slot);
210     return ErrorStatus(ENOENT);
211   }
212 
213   auto status = buffers_[slot]->GetEventMask(poll_events);
214   if (!status) {
215     ALOGW("BufferHubQueue::HandleBufferEvent: Failed to get event mask: %s",
216           status.GetErrorMessage().c_str());
217     return status.error_status();
218   }
219 
220   const int events = status.get();
221   PDX_TRACE_FORMAT(
222       "buffer|queue_id=%d;buffer_id=%d;slot=%zu;event_fd=%d;poll_events=%x;"
223       "events=%d|",
224       id(), buffers_[slot]->id(), slot, event_fd, poll_events, events);
225 
226   if (events & EPOLLIN) {
227     return Enqueue({buffers_[slot], slot, buffers_[slot]->GetQueueIndex()});
228   } else if (events & EPOLLHUP) {
229     ALOGW(
230         "BufferHubQueue::HandleBufferEvent: Received EPOLLHUP event: slot=%zu "
231         "event_fd=%d buffer_id=%d",
232         slot, buffers_[slot]->event_fd(), buffers_[slot]->id());
233     return RemoveBuffer(slot);
234   } else {
235     ALOGW(
236         "BufferHubQueue::HandleBufferEvent: Unknown event, slot=%zu, epoll "
237         "events=%d",
238         slot, events);
239   }
240 
241   return {};
242 }
243 
HandleQueueEvent(int poll_event)244 Status<void> BufferHubQueue::HandleQueueEvent(int poll_event) {
245   ATRACE_NAME("BufferHubQueue::HandleQueueEvent");
246   auto status = GetEventMask(poll_event);
247   if (!status) {
248     ALOGW("BufferHubQueue::HandleQueueEvent: Failed to get event mask: %s",
249           status.GetErrorMessage().c_str());
250     return status.error_status();
251   }
252 
253   const int events = status.get();
254   if (events & EPOLLIN) {
255     // Note that after buffer imports, if |count()| still returns 0, epoll
256     // wait will be tried again to acquire the newly imported buffer.
257     auto buffer_status = OnBufferAllocated();
258     if (!buffer_status) {
259       ALOGE("BufferHubQueue::HandleQueueEvent: Failed to import buffer: %s",
260             buffer_status.GetErrorMessage().c_str());
261     }
262   } else if (events & EPOLLHUP) {
263     ALOGD_IF(TRACE, "BufferHubQueue::HandleQueueEvent: hang up event!");
264     hung_up_ = true;
265   } else {
266     ALOGW("BufferHubQueue::HandleQueueEvent: Unknown epoll events=%x", events);
267   }
268 
269   return {};
270 }
271 
AddBuffer(const std::shared_ptr<BufferHubBuffer> & buffer,size_t slot)272 Status<void> BufferHubQueue::AddBuffer(
273     const std::shared_ptr<BufferHubBuffer>& buffer, size_t slot) {
274   ALOGD_IF(TRACE, "BufferHubQueue::AddBuffer: buffer_id=%d slot=%zu",
275            buffer->id(), slot);
276 
277   if (is_full()) {
278     ALOGE("BufferHubQueue::AddBuffer queue is at maximum capacity: %zu",
279           capacity_);
280     return ErrorStatus(E2BIG);
281   }
282 
283   if (buffers_[slot]) {
284     // Replace the buffer if the slot is occupied. This could happen when the
285     // producer side replaced the slot with a newly allocated buffer. Remove the
286     // buffer before setting up with the new one.
287     auto remove_status = RemoveBuffer(slot);
288     if (!remove_status)
289       return remove_status.error_status();
290   }
291 
292   for (const auto& event_source : buffer->GetEventSources()) {
293     epoll_event event = {.events = event_source.event_mask | EPOLLET,
294                          .data = {.u64 = Stuff(buffer->event_fd(), slot)}};
295     const int ret =
296         epoll_fd_.Control(EPOLL_CTL_ADD, event_source.event_fd, &event);
297     if (ret < 0) {
298       ALOGE("BufferHubQueue::AddBuffer: Failed to add buffer to epoll set: %s",
299             strerror(-ret));
300       return ErrorStatus(-ret);
301     }
302   }
303 
304   buffers_[slot] = buffer;
305   capacity_++;
306   return {};
307 }
308 
RemoveBuffer(size_t slot)309 Status<void> BufferHubQueue::RemoveBuffer(size_t slot) {
310   ALOGD_IF(TRACE, "BufferHubQueue::RemoveBuffer: slot=%zu", slot);
311 
312   if (buffers_[slot]) {
313     for (const auto& event_source : buffers_[slot]->GetEventSources()) {
314       const int ret =
315           epoll_fd_.Control(EPOLL_CTL_DEL, event_source.event_fd, nullptr);
316       if (ret < 0) {
317         ALOGE(
318             "BufferHubQueue::RemoveBuffer: Failed to remove buffer from epoll "
319             "set: %s",
320             strerror(-ret));
321         return ErrorStatus(-ret);
322       }
323     }
324 
325     // Trigger OnBufferRemoved callback if registered.
326     if (on_buffer_removed_)
327       on_buffer_removed_(buffers_[slot]);
328 
329     buffers_[slot] = nullptr;
330     capacity_--;
331   }
332 
333   return {};
334 }
335 
Enqueue(Entry entry)336 Status<void> BufferHubQueue::Enqueue(Entry entry) {
337   if (!is_full()) {
338     available_buffers_.push(std::move(entry));
339 
340     // Trigger OnBufferAvailable callback if registered.
341     if (on_buffer_available_)
342       on_buffer_available_();
343 
344     return {};
345   } else {
346     ALOGE("BufferHubQueue::Enqueue: Buffer queue is full!");
347     return ErrorStatus(E2BIG);
348   }
349 }
350 
Dequeue(int timeout,size_t * slot)351 Status<std::shared_ptr<BufferHubBuffer>> BufferHubQueue::Dequeue(int timeout,
352                                                                  size_t* slot) {
353   ALOGD_IF(TRACE, "BufferHubQueue::Dequeue: count=%zu, timeout=%d", count(),
354            timeout);
355 
356   PDX_TRACE_FORMAT("BufferHubQueue::Dequeue|count=%zu|", count());
357 
358   if (count() == 0) {
359     if (!WaitForBuffers(timeout))
360       return ErrorStatus(ETIMEDOUT);
361   }
362 
363   auto& entry = available_buffers_.top();
364   PDX_TRACE_FORMAT("buffer|buffer_id=%d;slot=%zu|", entry.buffer->id(),
365                    entry.slot);
366 
367   std::shared_ptr<BufferHubBuffer> buffer = std::move(entry.buffer);
368   *slot = entry.slot;
369 
370   available_buffers_.pop();
371 
372   return {std::move(buffer)};
373 }
374 
SetBufferAvailableCallback(BufferAvailableCallback callback)375 void BufferHubQueue::SetBufferAvailableCallback(
376     BufferAvailableCallback callback) {
377   on_buffer_available_ = callback;
378 }
379 
SetBufferRemovedCallback(BufferRemovedCallback callback)380 void BufferHubQueue::SetBufferRemovedCallback(BufferRemovedCallback callback) {
381   on_buffer_removed_ = callback;
382 }
383 
FreeAllBuffers()384 pdx::Status<void> BufferHubQueue::FreeAllBuffers() {
385   // Clear all available buffers.
386   while (!available_buffers_.empty())
387     available_buffers_.pop();
388 
389   pdx::Status<void> last_error;  // No error.
390   // Clear all buffers this producer queue is tracking.
391   for (size_t slot = 0; slot < BufferHubQueue::kMaxQueueCapacity; slot++) {
392     if (buffers_[slot] != nullptr) {
393       auto status = RemoveBuffer(slot);
394       if (!status) {
395         ALOGE(
396             "ProducerQueue::FreeAllBuffers: Failed to remove buffer at "
397             "slot=%zu.",
398             slot);
399         last_error = status.error_status();
400       }
401     }
402   }
403 
404   return last_error;
405 }
406 
ProducerQueue(LocalChannelHandle handle)407 ProducerQueue::ProducerQueue(LocalChannelHandle handle)
408     : BASE(std::move(handle)) {
409   auto status = ImportQueue();
410   if (!status) {
411     ALOGE("ProducerQueue::ProducerQueue: Failed to import queue: %s",
412           status.GetErrorMessage().c_str());
413     Close(-status.error());
414   }
415 }
416 
ProducerQueue(const ProducerQueueConfig & config,const UsagePolicy & usage)417 ProducerQueue::ProducerQueue(const ProducerQueueConfig& config,
418                              const UsagePolicy& usage)
419     : BASE(BufferHubRPC::kClientPath) {
420   auto status =
421       InvokeRemoteMethod<BufferHubRPC::CreateProducerQueue>(config, usage);
422   if (!status) {
423     ALOGE("ProducerQueue::ProducerQueue: Failed to create producer queue: %s",
424           status.GetErrorMessage().c_str());
425     Close(-status.error());
426     return;
427   }
428 
429   SetupQueue(status.get());
430 }
431 
AllocateBuffers(uint32_t width,uint32_t height,uint32_t layer_count,uint32_t format,uint64_t usage,size_t buffer_count)432 Status<std::vector<size_t>> ProducerQueue::AllocateBuffers(
433     uint32_t width, uint32_t height, uint32_t layer_count, uint32_t format,
434     uint64_t usage, size_t buffer_count) {
435   if (capacity() + buffer_count > kMaxQueueCapacity) {
436     ALOGE(
437         "ProducerQueue::AllocateBuffers: queue is at capacity: %zu, cannot "
438         "allocate %zu more buffer(s).",
439         capacity(), buffer_count);
440     return ErrorStatus(E2BIG);
441   }
442 
443   Status<std::vector<std::pair<LocalChannelHandle, size_t>>> status =
444       InvokeRemoteMethod<BufferHubRPC::ProducerQueueAllocateBuffers>(
445           width, height, layer_count, format, usage, buffer_count);
446   if (!status) {
447     ALOGE("ProducerQueue::AllocateBuffers: failed to allocate buffers: %s",
448           status.GetErrorMessage().c_str());
449     return status.error_status();
450   }
451 
452   auto buffer_handle_slots = status.take();
453   LOG_ALWAYS_FATAL_IF(buffer_handle_slots.size() != buffer_count,
454                       "BufferHubRPC::ProducerQueueAllocateBuffers should "
455                       "return %zu buffer handle(s), but returned %zu instead.",
456                       buffer_count, buffer_handle_slots.size());
457 
458   std::vector<size_t> buffer_slots;
459   buffer_slots.reserve(buffer_count);
460 
461   // Bookkeeping for each buffer.
462   for (auto& hs : buffer_handle_slots) {
463     auto& buffer_handle = hs.first;
464     size_t buffer_slot = hs.second;
465 
466     // Note that import might (though very unlikely) fail. If so, buffer_handle
467     // will be closed and included in returned buffer_slots.
468     if (AddBuffer(BufferProducer::Import(std::move(buffer_handle)),
469                   buffer_slot)) {
470       ALOGD_IF(TRACE, "ProducerQueue::AllocateBuffers: new buffer at slot: %zu",
471                buffer_slot);
472       buffer_slots.push_back(buffer_slot);
473     }
474   }
475 
476   if (buffer_slots.size() == 0) {
477     // Error out if no buffer is allocated and improted.
478     ALOGE_IF(TRACE, "ProducerQueue::AllocateBuffers: no buffer allocated.");
479     ErrorStatus(ENOMEM);
480   }
481 
482   return {std::move(buffer_slots)};
483 }
484 
AllocateBuffer(uint32_t width,uint32_t height,uint32_t layer_count,uint32_t format,uint64_t usage)485 Status<size_t> ProducerQueue::AllocateBuffer(uint32_t width, uint32_t height,
486                                              uint32_t layer_count,
487                                              uint32_t format, uint64_t usage) {
488   // We only allocate one buffer at a time.
489   constexpr size_t buffer_count = 1;
490   auto status =
491       AllocateBuffers(width, height, layer_count, format, usage, buffer_count);
492   if (!status) {
493     ALOGE("ProducerQueue::AllocateBuffer: Failed to allocate buffer: %s",
494           status.GetErrorMessage().c_str());
495     return status.error_status();
496   }
497 
498   if (status.get().size() == 0) {
499     ALOGE_IF(TRACE, "ProducerQueue::AllocateBuffer: no buffer allocated.");
500     ErrorStatus(ENOMEM);
501   }
502 
503   return {status.get()[0]};
504 }
505 
AddBuffer(const std::shared_ptr<BufferProducer> & buffer,size_t slot)506 Status<void> ProducerQueue::AddBuffer(
507     const std::shared_ptr<BufferProducer>& buffer, size_t slot) {
508   ALOGD_IF(TRACE, "ProducerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu",
509            id(), buffer->id(), slot);
510   // For producer buffer, we need to enqueue the newly added buffer
511   // immediately. Producer queue starts with all buffers in available state.
512   auto status = BufferHubQueue::AddBuffer(buffer, slot);
513   if (!status)
514     return status;
515 
516   return BufferHubQueue::Enqueue({buffer, slot, 0ULL});
517 }
518 
RemoveBuffer(size_t slot)519 Status<void> ProducerQueue::RemoveBuffer(size_t slot) {
520   auto status =
521       InvokeRemoteMethod<BufferHubRPC::ProducerQueueRemoveBuffer>(slot);
522   if (!status) {
523     ALOGE("ProducerQueue::RemoveBuffer: Failed to remove producer buffer: %s",
524           status.GetErrorMessage().c_str());
525     return status.error_status();
526   }
527 
528   return BufferHubQueue::RemoveBuffer(slot);
529 }
530 
Dequeue(int timeout,size_t * slot,LocalHandle * release_fence)531 Status<std::shared_ptr<BufferProducer>> ProducerQueue::Dequeue(
532     int timeout, size_t* slot, LocalHandle* release_fence) {
533   DvrNativeBufferMetadata canonical_meta;
534   return Dequeue(timeout, slot, &canonical_meta, release_fence);
535 }
536 
Dequeue(int timeout,size_t * slot,DvrNativeBufferMetadata * out_meta,pdx::LocalHandle * release_fence)537 pdx::Status<std::shared_ptr<BufferProducer>> ProducerQueue::Dequeue(
538     int timeout, size_t* slot, DvrNativeBufferMetadata* out_meta,
539     pdx::LocalHandle* release_fence) {
540   ATRACE_NAME("ProducerQueue::Dequeue");
541   if (slot == nullptr || out_meta == nullptr || release_fence == nullptr) {
542     ALOGE("ProducerQueue::Dequeue: Invalid parameter.");
543     return ErrorStatus(EINVAL);
544   }
545 
546   auto status = BufferHubQueue::Dequeue(timeout, slot);
547   if (!status)
548     return status.error_status();
549 
550   auto buffer = std::static_pointer_cast<BufferProducer>(status.take());
551   const int ret = buffer->GainAsync(out_meta, release_fence);
552   if (ret < 0 && ret != -EALREADY)
553     return ErrorStatus(-ret);
554 
555   return {std::move(buffer)};
556 }
557 
ConsumerQueue(LocalChannelHandle handle)558 ConsumerQueue::ConsumerQueue(LocalChannelHandle handle)
559     : BufferHubQueue(std::move(handle)) {
560   auto status = ImportQueue();
561   if (!status) {
562     ALOGE("ConsumerQueue::ConsumerQueue: Failed to import queue: %s",
563           status.GetErrorMessage().c_str());
564     Close(-status.error());
565   }
566 
567   auto import_status = ImportBuffers();
568   if (import_status) {
569     ALOGI("ConsumerQueue::ConsumerQueue: Imported %zu buffers.",
570           import_status.get());
571   } else {
572     ALOGE("ConsumerQueue::ConsumerQueue: Failed to import buffers: %s",
573           import_status.GetErrorMessage().c_str());
574   }
575 }
576 
ImportBuffers()577 Status<size_t> ConsumerQueue::ImportBuffers() {
578   auto status = InvokeRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>();
579   if (!status) {
580     if (status.error() == EBADR) {
581       ALOGI(
582           "ConsumerQueue::ImportBuffers: Queue is silent, no buffers "
583           "imported.");
584       return {0};
585     } else {
586       ALOGE(
587           "ConsumerQueue::ImportBuffers: Failed to import consumer buffer: %s",
588           status.GetErrorMessage().c_str());
589       return status.error_status();
590     }
591   }
592 
593   int ret;
594   Status<void> last_error;
595   size_t imported_buffers_count = 0;
596 
597   auto buffer_handle_slots = status.take();
598   for (auto& buffer_handle_slot : buffer_handle_slots) {
599     ALOGD_IF(TRACE, "ConsumerQueue::ImportBuffers: buffer_handle=%d",
600              buffer_handle_slot.first.value());
601 
602     std::unique_ptr<BufferConsumer> buffer_consumer =
603         BufferConsumer::Import(std::move(buffer_handle_slot.first));
604     if (!buffer_consumer) {
605       ALOGE("ConsumerQueue::ImportBuffers: Failed to import buffer: slot=%zu",
606             buffer_handle_slot.second);
607       last_error = ErrorStatus(EPIPE);
608       continue;
609     }
610 
611     auto add_status =
612         AddBuffer(std::move(buffer_consumer), buffer_handle_slot.second);
613     if (!add_status) {
614       ALOGE("ConsumerQueue::ImportBuffers: Failed to add buffer: %s",
615             add_status.GetErrorMessage().c_str());
616       last_error = add_status;
617     } else {
618       imported_buffers_count++;
619     }
620   }
621 
622   if (imported_buffers_count > 0)
623     return {imported_buffers_count};
624   else
625     return last_error.error_status();
626 }
627 
AddBuffer(const std::shared_ptr<BufferConsumer> & buffer,size_t slot)628 Status<void> ConsumerQueue::AddBuffer(
629     const std::shared_ptr<BufferConsumer>& buffer, size_t slot) {
630   ALOGD_IF(TRACE, "ConsumerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu",
631            id(), buffer->id(), slot);
632   auto status = BufferHubQueue::AddBuffer(buffer, slot);
633   if (!status)
634     return status;
635 
636   // Check to see if the buffer is already signaled. This is necessary to catch
637   // cases where buffers are already available; epoll edge triggered mode does
638   // not fire until an edge transition when adding new buffers to the epoll
639   // set. Note that we only poll the fd events because HandleBufferEvent() takes
640   // care of checking the translated buffer events.
641   auto poll_status = PollEvents(buffer->event_fd(), POLLIN);
642   if (!poll_status && poll_status.error() != ETIMEDOUT) {
643     ALOGE("ConsumerQueue::AddBuffer: Failed to poll consumer buffer: %s",
644           poll_status.GetErrorMessage().c_str());
645     return poll_status.error_status();
646   }
647 
648   // Update accounting if the buffer is available.
649   if (poll_status)
650     return HandleBufferEvent(slot, buffer->event_fd(), poll_status.get());
651   else
652     return {};
653 }
654 
Dequeue(int timeout,size_t * slot,void * meta,size_t user_metadata_size,LocalHandle * acquire_fence)655 Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue(
656     int timeout, size_t* slot, void* meta, size_t user_metadata_size,
657     LocalHandle* acquire_fence) {
658   if (user_metadata_size != user_metadata_size_) {
659     ALOGE(
660         "ConsumerQueue::Dequeue: Metadata size (%zu) for the dequeuing buffer "
661         "does not match metadata size (%zu) for the queue.",
662         user_metadata_size, user_metadata_size_);
663     return ErrorStatus(EINVAL);
664   }
665 
666   DvrNativeBufferMetadata canonical_meta;
667   auto status = Dequeue(timeout, slot, &canonical_meta, acquire_fence);
668   if (!status)
669     return status.error_status();
670 
671   if (meta && user_metadata_size) {
672     void* metadata_src =
673         reinterpret_cast<void*>(canonical_meta.user_metadata_ptr);
674     if (metadata_src) {
675       memcpy(meta, metadata_src, user_metadata_size);
676     } else {
677       ALOGW("ConsumerQueue::Dequeue: no user-defined metadata.");
678     }
679   }
680 
681   return status;
682 }
683 
Dequeue(int timeout,size_t * slot,DvrNativeBufferMetadata * out_meta,pdx::LocalHandle * acquire_fence)684 Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue(
685     int timeout, size_t* slot, DvrNativeBufferMetadata* out_meta,
686     pdx::LocalHandle* acquire_fence) {
687   ATRACE_NAME("ConsumerQueue::Dequeue");
688   if (slot == nullptr || out_meta == nullptr || acquire_fence == nullptr) {
689     ALOGE("ConsumerQueue::Dequeue: Invalid parameter.");
690     return ErrorStatus(EINVAL);
691   }
692 
693   auto status = BufferHubQueue::Dequeue(timeout, slot);
694   if (!status)
695     return status.error_status();
696 
697   auto buffer = std::static_pointer_cast<BufferConsumer>(status.take());
698   const int ret = buffer->AcquireAsync(out_meta, acquire_fence);
699   if (ret < 0)
700     return ErrorStatus(-ret);
701 
702   return {std::move(buffer)};
703 }
704 
OnBufferAllocated()705 Status<void> ConsumerQueue::OnBufferAllocated() {
706   ALOGD_IF(TRACE, "ConsumerQueue::OnBufferAllocated: queue_id=%d", id());
707 
708   auto status = ImportBuffers();
709   if (!status) {
710     ALOGE("ConsumerQueue::OnBufferAllocated: Failed to import buffers: %s",
711           status.GetErrorMessage().c_str());
712     return ErrorStatus(status.error());
713   } else if (status.get() == 0) {
714     ALOGW("ConsumerQueue::OnBufferAllocated: No new buffers allocated!");
715     return ErrorStatus(ENOBUFS);
716   } else {
717     ALOGD_IF(TRACE,
718              "ConsumerQueue::OnBufferAllocated: Imported %zu consumer buffers.",
719              status.get());
720     return {};
721   }
722 }
723 
724 }  // namespace dvr
725 }  // namespace android
726