• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "include/private/dvr/buffer_hub_queue_client.h"
2 
3 #include <inttypes.h>
4 #include <log/log.h>
5 #include <poll.h>
6 #include <sys/epoll.h>
7 
8 #include <array>
9 
10 #include <pdx/default_transport/client_channel.h>
11 #include <pdx/default_transport/client_channel_factory.h>
12 #include <pdx/file_handle.h>
13 #include <private/dvr/bufferhub_rpc.h>
14 
15 #define RETRY_EINTR(fnc_call)                 \
16   ([&]() -> decltype(fnc_call) {              \
17     decltype(fnc_call) result;                \
18     do {                                      \
19       result = (fnc_call);                    \
20     } while (result == -1 && errno == EINTR); \
21     return result;                            \
22   })()
23 
24 using android::pdx::ErrorStatus;
25 using android::pdx::LocalChannelHandle;
26 using android::pdx::Status;
27 
28 namespace android {
29 namespace dvr {
30 
BufferHubQueue(LocalChannelHandle channel_handle)31 BufferHubQueue::BufferHubQueue(LocalChannelHandle channel_handle)
32     : Client{pdx::default_transport::ClientChannel::Create(
33           std::move(channel_handle))},
34       meta_size_(0),
35       buffers_(BufferHubQueue::kMaxQueueCapacity),
36       epollhup_pending_(BufferHubQueue::kMaxQueueCapacity, false),
37       available_buffers_(BufferHubQueue::kMaxQueueCapacity),
38       fences_(BufferHubQueue::kMaxQueueCapacity),
39       capacity_(0),
40       id_(-1) {
41   Initialize();
42 }
43 
BufferHubQueue(const std::string & endpoint_path)44 BufferHubQueue::BufferHubQueue(const std::string& endpoint_path)
45     : Client{pdx::default_transport::ClientChannelFactory::Create(
46           endpoint_path)},
47       meta_size_(0),
48       buffers_(BufferHubQueue::kMaxQueueCapacity),
49       epollhup_pending_(BufferHubQueue::kMaxQueueCapacity, false),
50       available_buffers_(BufferHubQueue::kMaxQueueCapacity),
51       fences_(BufferHubQueue::kMaxQueueCapacity),
52       capacity_(0),
53       id_(-1) {
54   Initialize();
55 }
56 
Initialize()57 void BufferHubQueue::Initialize() {
58   int ret = epoll_fd_.Create();
59   if (ret < 0) {
60     ALOGE("BufferHubQueue::BufferHubQueue: Failed to create epoll fd: %s",
61           strerror(-ret));
62     return;
63   }
64 
65   epoll_event event = {.events = EPOLLIN | EPOLLET,
66                        .data = {.u64 = static_cast<uint64_t>(
67                                     BufferHubQueue::kEpollQueueEventIndex)}};
68   ret = epoll_fd_.Control(EPOLL_CTL_ADD, event_fd(), &event);
69   if (ret < 0) {
70     ALOGE("BufferHubQueue::Initialize: Failed to add event fd to epoll set: %s",
71           strerror(-ret));
72   }
73 }
74 
ImportQueue()75 Status<void> BufferHubQueue::ImportQueue() {
76   auto status = InvokeRemoteMethod<BufferHubRPC::GetQueueInfo>();
77   if (!status) {
78     ALOGE("BufferHubQueue::ImportQueue: Failed to import queue: %s",
79           status.GetErrorMessage().c_str());
80     return ErrorStatus(status.error());
81   } else {
82     SetupQueue(status.get().meta_size_bytes, status.get().id);
83     return {};
84   }
85 }
86 
SetupQueue(size_t meta_size_bytes,int id)87 void BufferHubQueue::SetupQueue(size_t meta_size_bytes, int id) {
88   meta_size_ = meta_size_bytes;
89   id_ = id;
90   meta_buffer_tmp_.reset(meta_size_ > 0 ? new uint8_t[meta_size_] : nullptr);
91 }
92 
CreateConsumerQueue()93 std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateConsumerQueue() {
94   if (auto status = CreateConsumerQueueHandle())
95     return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take()));
96   else
97     return nullptr;
98 }
99 
CreateSilentConsumerQueue()100 std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateSilentConsumerQueue() {
101   if (auto status = CreateConsumerQueueHandle())
102     return std::unique_ptr<ConsumerQueue>(
103         new ConsumerQueue(status.take(), true));
104   else
105     return nullptr;
106 }
107 
CreateConsumerQueueHandle()108 Status<LocalChannelHandle> BufferHubQueue::CreateConsumerQueueHandle() {
109   auto status = InvokeRemoteMethod<BufferHubRPC::CreateConsumerQueue>();
110   if (!status) {
111     ALOGE(
112         "BufferHubQueue::CreateConsumerQueue: Failed to create consumer queue: "
113         "%s",
114         status.GetErrorMessage().c_str());
115     return ErrorStatus(status.error());
116   }
117 
118   return status;
119 }
120 
WaitForBuffers(int timeout)121 bool BufferHubQueue::WaitForBuffers(int timeout) {
122   std::array<epoll_event, kMaxEvents> events;
123 
124   // Loop at least once to check for hangups.
125   do {
126     ALOGD_IF(
127         TRACE,
128         "BufferHubQueue::WaitForBuffers: queue_id=%d count=%zu capacity=%zu",
129         id(), count(), capacity());
130 
131     // If there is already a buffer then just check for hangup without waiting.
132     const int ret = epoll_fd_.Wait(events.data(), events.size(),
133                                    count() == 0 ? timeout : 0);
134 
135     if (ret == 0) {
136       ALOGI_IF(TRACE,
137                "BufferHubQueue::WaitForBuffers: No events before timeout: "
138                "queue_id=%d",
139                id());
140       return count() != 0;
141     }
142 
143     if (ret < 0 && ret != -EINTR) {
144       ALOGE("BufferHubQueue::WaitForBuffers: Failed to wait for buffers: %s",
145             strerror(-ret));
146       return false;
147     }
148 
149     const int num_events = ret;
150 
151     // A BufferQueue's epoll fd tracks N+1 events, where there are N events,
152     // one for each buffer, in the queue and one extra event for the queue
153     // client itself.
154     for (int i = 0; i < num_events; i++) {
155       int64_t index = static_cast<int64_t>(events[i].data.u64);
156 
157       ALOGD_IF(TRACE,
158                "BufferHubQueue::WaitForBuffers: event %d: index=%" PRId64, i,
159                index);
160 
161       if (is_buffer_event_index(index)) {
162         HandleBufferEvent(static_cast<size_t>(index), events[i].events);
163       } else if (is_queue_event_index(index)) {
164         HandleQueueEvent(events[i].events);
165       } else {
166         ALOGW("BufferHubQueue::WaitForBuffers: Unknown event index: %" PRId64,
167               index);
168       }
169     }
170   } while (count() == 0 && capacity() > 0 && !hung_up());
171 
172   return count() != 0;
173 }
174 
HandleBufferEvent(size_t slot,int poll_events)175 void BufferHubQueue::HandleBufferEvent(size_t slot, int poll_events) {
176   auto buffer = buffers_[slot];
177   if (!buffer) {
178     ALOGW("BufferHubQueue::HandleBufferEvent: Invalid buffer slot: %zu", slot);
179     return;
180   }
181 
182   auto status = buffer->GetEventMask(poll_events);
183   if (!status) {
184     ALOGW("BufferHubQueue::HandleBufferEvent: Failed to get event mask: %s",
185           status.GetErrorMessage().c_str());
186     return;
187   }
188 
189   const int events = status.get();
190   if (events & EPOLLIN) {
191     const int ret = OnBufferReady(buffer, &fences_[slot]);
192     if (ret == 0 || ret == -EALREADY || ret == -EBUSY) {
193       // Only enqueue the buffer if it moves to or is already in the state
194       // requested in OnBufferReady(). If the buffer is busy this means that the
195       // buffer moved from released to posted when a new consumer was created
196       // before the ProducerQueue had a chance to regain it. This is a valid
197       // transition that we have to handle because edge triggered poll events
198       // latch the ready state even if it is later de-asserted -- don't enqueue
199       // or print an error log in this case.
200       if (ret != -EBUSY)
201         Enqueue(buffer, slot);
202     } else {
203       ALOGE(
204           "BufferHubQueue::HandleBufferEvent: Failed to set buffer ready, "
205           "queue_id=%d buffer_id=%d: %s",
206           id(), buffer->id(), strerror(-ret));
207     }
208   } else if (events & EPOLLHUP) {
209     // This might be caused by producer replacing an existing buffer slot, or
210     // when BufferHubQueue is shutting down. For the first case, currently the
211     // epoll FD is cleaned up when the replacement consumer client is imported,
212     // we shouldn't detach again if |epollhub_pending_[slot]| is set.
213     ALOGW(
214         "BufferHubQueue::HandleBufferEvent: Received EPOLLHUP at slot: %zu, "
215         "buffer event fd: %d, EPOLLHUP pending: %d",
216         slot, buffer->event_fd(), int{epollhup_pending_[slot]});
217     if (epollhup_pending_[slot]) {
218       epollhup_pending_[slot] = false;
219     } else {
220       DetachBuffer(slot);
221     }
222   } else {
223     ALOGW(
224         "BufferHubQueue::HandleBufferEvent: Unknown event, slot=%zu, epoll "
225         "events=%d",
226         slot, events);
227   }
228 }
229 
HandleQueueEvent(int poll_event)230 void BufferHubQueue::HandleQueueEvent(int poll_event) {
231   auto status = GetEventMask(poll_event);
232   if (!status) {
233     ALOGW("BufferHubQueue::HandleQueueEvent: Failed to get event mask: %s",
234           status.GetErrorMessage().c_str());
235     return;
236   }
237 
238   const int events = status.get();
239   if (events & EPOLLIN) {
240     // Note that after buffer imports, if |count()| still returns 0, epoll
241     // wait will be tried again to acquire the newly imported buffer.
242     auto buffer_status = OnBufferAllocated();
243     if (!buffer_status) {
244       ALOGE("BufferHubQueue::HandleQueueEvent: Failed to import buffer: %s",
245             buffer_status.GetErrorMessage().c_str());
246     }
247   } else if (events & EPOLLHUP) {
248     ALOGD_IF(TRACE, "BufferHubQueue::HandleQueueEvent: hang up event!");
249     hung_up_ = true;
250   } else {
251     ALOGW("BufferHubQueue::HandleQueueEvent: Unknown epoll events=%x", events);
252   }
253 }
254 
AddBuffer(const std::shared_ptr<BufferHubBuffer> & buf,size_t slot)255 int BufferHubQueue::AddBuffer(const std::shared_ptr<BufferHubBuffer>& buf,
256                               size_t slot) {
257   if (is_full()) {
258     // TODO(jwcai) Move the check into Producer's AllocateBuffer and consumer's
259     // import buffer.
260     ALOGE("BufferHubQueue::AddBuffer queue is at maximum capacity: %zu",
261           capacity_);
262     return -E2BIG;
263   }
264 
265   if (buffers_[slot] != nullptr) {
266     // Replace the buffer if the slot is preoccupied. This could happen when the
267     // producer side replaced the slot with a newly allocated buffer. Detach the
268     // buffer before setting up with the new one.
269     DetachBuffer(slot);
270     epollhup_pending_[slot] = true;
271   }
272 
273   epoll_event event = {.events = EPOLLIN | EPOLLET, .data = {.u64 = slot}};
274   const int ret = epoll_fd_.Control(EPOLL_CTL_ADD, buf->event_fd(), &event);
275   if (ret < 0) {
276     ALOGE("BufferHubQueue::AddBuffer: Failed to add buffer to epoll set: %s",
277           strerror(-ret));
278     return ret;
279   }
280 
281   buffers_[slot] = buf;
282   capacity_++;
283   return 0;
284 }
285 
DetachBuffer(size_t slot)286 int BufferHubQueue::DetachBuffer(size_t slot) {
287   auto& buf = buffers_[slot];
288   if (buf == nullptr) {
289     ALOGE("BufferHubQueue::DetachBuffer: Invalid slot: %zu", slot);
290     return -EINVAL;
291   }
292 
293   const int ret = epoll_fd_.Control(EPOLL_CTL_DEL, buf->event_fd(), nullptr);
294   if (ret < 0) {
295     ALOGE(
296         "BufferHubQueue::DetachBuffer: Failed to detach buffer from epoll set: "
297         "%s",
298         strerror(-ret));
299     return ret;
300   }
301 
302   buffers_[slot] = nullptr;
303   capacity_--;
304   return 0;
305 }
306 
Enqueue(const std::shared_ptr<BufferHubBuffer> & buf,size_t slot)307 void BufferHubQueue::Enqueue(const std::shared_ptr<BufferHubBuffer>& buf,
308                              size_t slot) {
309   if (count() == capacity_) {
310     ALOGE("BufferHubQueue::Enqueue: Buffer queue is full!");
311     return;
312   }
313 
314   // Set slot buffer back to vector.
315   // TODO(jwcai) Here have to dynamically allocate BufferInfo::metadata due to
316   // the limitation of the RingBuffer we are using. Would be better to refactor
317   // that.
318   BufferInfo buffer_info(slot, meta_size_);
319   buffer_info.buffer = buf;
320   // Swap metadata loaded during onBufferReady into vector.
321   std::swap(buffer_info.metadata, meta_buffer_tmp_);
322 
323   available_buffers_.Append(std::move(buffer_info));
324 }
325 
Dequeue(int timeout,size_t * slot,void * meta,LocalHandle * fence)326 Status<std::shared_ptr<BufferHubBuffer>> BufferHubQueue::Dequeue(
327     int timeout, size_t* slot, void* meta, LocalHandle* fence) {
328   ALOGD_IF(TRACE, "Dequeue: count=%zu, timeout=%d", count(), timeout);
329 
330   if (!WaitForBuffers(timeout))
331     return ErrorStatus(ETIMEDOUT);
332 
333   std::shared_ptr<BufferHubBuffer> buf;
334   BufferInfo& buffer_info = available_buffers_.Front();
335 
336   *fence = std::move(fences_[buffer_info.slot]);
337 
338   // Report current pos as the output slot.
339   std::swap(buffer_info.slot, *slot);
340   // Swap buffer from vector to be returned later.
341   std::swap(buffer_info.buffer, buf);
342   // Swap metadata from vector into tmp so that we can write out to |meta|.
343   std::swap(buffer_info.metadata, meta_buffer_tmp_);
344 
345   available_buffers_.PopFront();
346 
347   if (!buf) {
348     ALOGE("BufferHubQueue::Dequeue: Buffer to be dequeued is nullptr");
349     return ErrorStatus(ENOBUFS);
350   }
351 
352   if (meta) {
353     std::copy(meta_buffer_tmp_.get(), meta_buffer_tmp_.get() + meta_size_,
354               reinterpret_cast<uint8_t*>(meta));
355   }
356 
357   return {std::move(buf)};
358 }
359 
ProducerQueue(size_t meta_size)360 ProducerQueue::ProducerQueue(size_t meta_size)
361     : ProducerQueue(meta_size, 0, 0, 0, 0) {}
362 
ProducerQueue(LocalChannelHandle handle)363 ProducerQueue::ProducerQueue(LocalChannelHandle handle)
364     : BASE(std::move(handle)) {
365   auto status = ImportQueue();
366   if (!status) {
367     ALOGE("ProducerQueue::ProducerQueue: Failed to import queue: %s",
368           status.GetErrorMessage().c_str());
369     Close(-status.error());
370   }
371 }
372 
ProducerQueue(size_t meta_size,uint64_t usage_set_mask,uint64_t usage_clear_mask,uint64_t usage_deny_set_mask,uint64_t usage_deny_clear_mask)373 ProducerQueue::ProducerQueue(size_t meta_size, uint64_t usage_set_mask,
374                              uint64_t usage_clear_mask,
375                              uint64_t usage_deny_set_mask,
376                              uint64_t usage_deny_clear_mask)
377     : BASE(BufferHubRPC::kClientPath) {
378   auto status = InvokeRemoteMethod<BufferHubRPC::CreateProducerQueue>(
379       meta_size, UsagePolicy{usage_set_mask, usage_clear_mask,
380                              usage_deny_set_mask, usage_deny_clear_mask});
381   if (!status) {
382     ALOGE("ProducerQueue::ProducerQueue: Failed to create producer queue: %s",
383           status.GetErrorMessage().c_str());
384     Close(-status.error());
385     return;
386   }
387 
388   SetupQueue(status.get().meta_size_bytes, status.get().id);
389 }
390 
AllocateBuffer(uint32_t width,uint32_t height,uint32_t layer_count,uint32_t format,uint64_t usage,size_t * out_slot)391 int ProducerQueue::AllocateBuffer(uint32_t width, uint32_t height,
392                                   uint32_t layer_count, uint32_t format,
393                                   uint64_t usage, size_t* out_slot) {
394   if (out_slot == nullptr) {
395     ALOGE("ProducerQueue::AllocateBuffer: Parameter out_slot cannot be null.");
396     return -EINVAL;
397   }
398 
399   if (is_full()) {
400     ALOGE("ProducerQueue::AllocateBuffer queue is at maximum capacity: %zu",
401           capacity());
402     return -E2BIG;
403   }
404 
405   const size_t kBufferCount = 1U;
406   Status<std::vector<std::pair<LocalChannelHandle, size_t>>> status =
407       InvokeRemoteMethod<BufferHubRPC::ProducerQueueAllocateBuffers>(
408           width, height, layer_count, format, usage, kBufferCount);
409   if (!status) {
410     ALOGE("ProducerQueue::AllocateBuffer failed to create producer buffer: %s",
411           status.GetErrorMessage().c_str());
412     return -status.error();
413   }
414 
415   auto buffer_handle_slots = status.take();
416   LOG_ALWAYS_FATAL_IF(buffer_handle_slots.size() != kBufferCount,
417                       "BufferHubRPC::ProducerQueueAllocateBuffers should "
418                       "return one and only one buffer handle.");
419 
420   // We only allocate one buffer at a time.
421   auto& buffer_handle = buffer_handle_slots[0].first;
422   size_t buffer_slot = buffer_handle_slots[0].second;
423   ALOGD_IF(TRACE,
424            "ProducerQueue::AllocateBuffer, new buffer, channel_handle: %d",
425            buffer_handle.value());
426 
427   *out_slot = buffer_slot;
428   return AddBuffer(BufferProducer::Import(std::move(buffer_handle)),
429                    buffer_slot);
430 }
431 
AddBuffer(const std::shared_ptr<BufferProducer> & buf,size_t slot)432 int ProducerQueue::AddBuffer(const std::shared_ptr<BufferProducer>& buf,
433                              size_t slot) {
434   ALOGD_IF(TRACE, "ProducerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu",
435            id(), buf->id(), slot);
436   // For producer buffer, we need to enqueue the newly added buffer
437   // immediately. Producer queue starts with all buffers in available state.
438   const int ret = BufferHubQueue::AddBuffer(buf, slot);
439   if (ret < 0)
440     return ret;
441 
442   Enqueue(buf, slot);
443   return 0;
444 }
445 
DetachBuffer(size_t slot)446 int ProducerQueue::DetachBuffer(size_t slot) {
447   auto status =
448       InvokeRemoteMethod<BufferHubRPC::ProducerQueueDetachBuffer>(slot);
449   if (!status) {
450     ALOGE("ProducerQueue::DetachBuffer: Failed to detach producer buffer: %s",
451           status.GetErrorMessage().c_str());
452     return -status.error();
453   }
454 
455   return BufferHubQueue::DetachBuffer(slot);
456 }
457 
Dequeue(int timeout,size_t * slot,LocalHandle * release_fence)458 Status<std::shared_ptr<BufferProducer>> ProducerQueue::Dequeue(
459     int timeout, size_t* slot, LocalHandle* release_fence) {
460   if (slot == nullptr || release_fence == nullptr) {
461     ALOGE("ProducerQueue::Dequeue: Invalid parameter: slot=%p release_fence=%p",
462           slot, release_fence);
463     return ErrorStatus(EINVAL);
464   }
465 
466   auto buffer_status =
467       BufferHubQueue::Dequeue(timeout, slot, nullptr, release_fence);
468   if (!buffer_status)
469     return buffer_status.error_status();
470 
471   return {std::static_pointer_cast<BufferProducer>(buffer_status.take())};
472 }
473 
OnBufferReady(const std::shared_ptr<BufferHubBuffer> & buf,LocalHandle * release_fence)474 int ProducerQueue::OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
475                                  LocalHandle* release_fence) {
476   ALOGD_IF(TRACE, "ProducerQueue::OnBufferReady: queue_id=%d buffer_id=%d",
477            id(), buf->id());
478   auto buffer = std::static_pointer_cast<BufferProducer>(buf);
479   return buffer->Gain(release_fence);
480 }
481 
ConsumerQueue(LocalChannelHandle handle,bool ignore_on_import)482 ConsumerQueue::ConsumerQueue(LocalChannelHandle handle, bool ignore_on_import)
483     : BufferHubQueue(std::move(handle)), ignore_on_import_(ignore_on_import) {
484   auto status = ImportQueue();
485   if (!status) {
486     ALOGE("ConsumerQueue::ConsumerQueue: Failed to import queue: %s",
487           status.GetErrorMessage().c_str());
488     Close(-status.error());
489   }
490 
491   auto import_status = ImportBuffers();
492   if (import_status) {
493     ALOGI("ConsumerQueue::ConsumerQueue: Imported %zu buffers.",
494           import_status.get());
495   } else {
496     ALOGE("ConsumerQueue::ConsumerQueue: Failed to import buffers: %s",
497           import_status.GetErrorMessage().c_str());
498   }
499 }
500 
ImportBuffers()501 Status<size_t> ConsumerQueue::ImportBuffers() {
502   auto status = InvokeRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>();
503   if (!status) {
504     ALOGE("ConsumerQueue::ImportBuffers: Failed to import consumer buffer: %s",
505           status.GetErrorMessage().c_str());
506     return ErrorStatus(status.error());
507   }
508 
509   int ret;
510   int last_error = 0;
511   int imported_buffers = 0;
512 
513   auto buffer_handle_slots = status.take();
514   for (auto& buffer_handle_slot : buffer_handle_slots) {
515     ALOGD_IF(TRACE, "ConsumerQueue::ImportBuffers: buffer_handle=%d",
516              buffer_handle_slot.first.value());
517 
518     std::unique_ptr<BufferConsumer> buffer_consumer =
519         BufferConsumer::Import(std::move(buffer_handle_slot.first));
520 
521     // Setup ignore state before adding buffer to the queue.
522     if (ignore_on_import_) {
523       ALOGD_IF(TRACE,
524                "ConsumerQueue::ImportBuffers: Setting buffer to ignored state: "
525                "buffer_id=%d",
526                buffer_consumer->id());
527       ret = buffer_consumer->SetIgnore(true);
528       if (ret < 0) {
529         ALOGE(
530             "ConsumerQueue::ImportBuffers: Failed to set ignored state on "
531             "imported buffer buffer_id=%d: %s",
532             buffer_consumer->id(), strerror(-ret));
533         last_error = ret;
534       }
535     }
536 
537     ret = AddBuffer(std::move(buffer_consumer), buffer_handle_slot.second);
538     if (ret < 0) {
539       ALOGE("ConsumerQueue::ImportBuffers: Failed to add buffer: %s",
540             strerror(-ret));
541       last_error = ret;
542       continue;
543     } else {
544       imported_buffers++;
545     }
546   }
547 
548   if (imported_buffers > 0)
549     return {imported_buffers};
550   else
551     return ErrorStatus(-last_error);
552 }
553 
AddBuffer(const std::shared_ptr<BufferConsumer> & buf,size_t slot)554 int ConsumerQueue::AddBuffer(const std::shared_ptr<BufferConsumer>& buf,
555                              size_t slot) {
556   ALOGD_IF(TRACE, "ConsumerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu",
557            id(), buf->id(), slot);
558   const int ret = BufferHubQueue::AddBuffer(buf, slot);
559   if (ret < 0)
560     return ret;
561 
562   // Check to see if the buffer is already signaled. This is necessary to catch
563   // cases where buffers are already available; epoll edge triggered mode does
564   // not fire until and edge transition when adding new buffers to the epoll
565   // set.
566   const int kTimeoutMs = 0;
567   pollfd pfd{buf->event_fd(), POLLIN, 0};
568   const int count = RETRY_EINTR(poll(&pfd, 1, kTimeoutMs));
569   if (count < 0) {
570     const int error = errno;
571     ALOGE("ConsumerQueue::AddBuffer: Failed to poll consumer buffer: %s",
572           strerror(errno));
573     return -error;
574   }
575 
576   if (count == 1)
577     HandleBufferEvent(slot, pfd.revents);
578 
579   return 0;
580 }
581 
Dequeue(int timeout,size_t * slot,void * meta,size_t meta_size,LocalHandle * acquire_fence)582 Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue(
583     int timeout, size_t* slot, void* meta, size_t meta_size,
584     LocalHandle* acquire_fence) {
585   if (meta_size != meta_size_) {
586     ALOGE(
587         "ConsumerQueue::Dequeue: Metadata size (%zu) for the dequeuing buffer "
588         "does not match metadata size (%zu) for the queue.",
589         meta_size, meta_size_);
590     return ErrorStatus(EINVAL);
591   }
592 
593   if (slot == nullptr || acquire_fence == nullptr) {
594     ALOGE(
595         "ConsumerQueue::Dequeue: Invalid parameter: slot=%p meta=%p "
596         "acquire_fence=%p",
597         slot, meta, acquire_fence);
598     return ErrorStatus(EINVAL);
599   }
600 
601   auto buffer_status =
602       BufferHubQueue::Dequeue(timeout, slot, meta, acquire_fence);
603   if (!buffer_status)
604     return buffer_status.error_status();
605 
606   return {std::static_pointer_cast<BufferConsumer>(buffer_status.take())};
607 }
608 
OnBufferReady(const std::shared_ptr<BufferHubBuffer> & buf,LocalHandle * acquire_fence)609 int ConsumerQueue::OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
610                                  LocalHandle* acquire_fence) {
611   ALOGD_IF(TRACE, "ConsumerQueue::OnBufferReady: queue_id=%d buffer_id=%d",
612            id(), buf->id());
613   auto buffer = std::static_pointer_cast<BufferConsumer>(buf);
614   return buffer->Acquire(acquire_fence, meta_buffer_tmp_.get(), meta_size_);
615 }
616 
OnBufferAllocated()617 Status<void> ConsumerQueue::OnBufferAllocated() {
618   auto status = ImportBuffers();
619   if (!status) {
620     ALOGE("ConsumerQueue::OnBufferAllocated: Failed to import buffers: %s",
621           status.GetErrorMessage().c_str());
622     return ErrorStatus(status.error());
623   } else if (status.get() == 0) {
624     ALOGW("ConsumerQueue::OnBufferAllocated: No new buffers allocated!");
625     return ErrorStatus(ENOBUFS);
626   } else {
627     ALOGD_IF(TRACE,
628              "ConsumerQueue::OnBufferAllocated: Imported %zu consumer buffers.",
629              status.get());
630     return {};
631   }
632 }
633 
634 }  // namespace dvr
635 }  // namespace android
636