1 #include "include/private/dvr/buffer_hub_queue_client.h"
2
3 #include <inttypes.h>
4 #include <log/log.h>
5 #include <poll.h>
6 #include <sys/epoll.h>
7
8 #include <array>
9
10 #include <pdx/default_transport/client_channel.h>
11 #include <pdx/default_transport/client_channel_factory.h>
12 #include <pdx/file_handle.h>
13 #include <private/dvr/bufferhub_rpc.h>
14
// Evaluates |fnc_call| repeatedly for as long as it returns -1 with errno set
// to EINTR, and yields the value of the last attempt.
#define RETRY_EINTR(fnc_call)                               \
  ([&]() -> decltype(fnc_call) {                            \
    for (;;) {                                              \
      decltype(fnc_call) retry_eintr_rc_ = (fnc_call);      \
      if (retry_eintr_rc_ != -1 || errno != EINTR)          \
        return retry_eintr_rc_;                             \
    }                                                       \
  })()
23
24 using android::pdx::ErrorStatus;
25 using android::pdx::LocalChannelHandle;
26 using android::pdx::Status;
27
28 namespace android {
29 namespace dvr {
30
BufferHubQueue(LocalChannelHandle channel_handle)31 BufferHubQueue::BufferHubQueue(LocalChannelHandle channel_handle)
32 : Client{pdx::default_transport::ClientChannel::Create(
33 std::move(channel_handle))},
34 meta_size_(0),
35 buffers_(BufferHubQueue::kMaxQueueCapacity),
36 epollhup_pending_(BufferHubQueue::kMaxQueueCapacity, false),
37 available_buffers_(BufferHubQueue::kMaxQueueCapacity),
38 fences_(BufferHubQueue::kMaxQueueCapacity),
39 capacity_(0),
40 id_(-1) {
41 Initialize();
42 }
43
BufferHubQueue(const std::string & endpoint_path)44 BufferHubQueue::BufferHubQueue(const std::string& endpoint_path)
45 : Client{pdx::default_transport::ClientChannelFactory::Create(
46 endpoint_path)},
47 meta_size_(0),
48 buffers_(BufferHubQueue::kMaxQueueCapacity),
49 epollhup_pending_(BufferHubQueue::kMaxQueueCapacity, false),
50 available_buffers_(BufferHubQueue::kMaxQueueCapacity),
51 fences_(BufferHubQueue::kMaxQueueCapacity),
52 capacity_(0),
53 id_(-1) {
54 Initialize();
55 }
56
Initialize()57 void BufferHubQueue::Initialize() {
58 int ret = epoll_fd_.Create();
59 if (ret < 0) {
60 ALOGE("BufferHubQueue::BufferHubQueue: Failed to create epoll fd: %s",
61 strerror(-ret));
62 return;
63 }
64
65 epoll_event event = {.events = EPOLLIN | EPOLLET,
66 .data = {.u64 = static_cast<uint64_t>(
67 BufferHubQueue::kEpollQueueEventIndex)}};
68 ret = epoll_fd_.Control(EPOLL_CTL_ADD, event_fd(), &event);
69 if (ret < 0) {
70 ALOGE("BufferHubQueue::Initialize: Failed to add event fd to epoll set: %s",
71 strerror(-ret));
72 }
73 }
74
ImportQueue()75 Status<void> BufferHubQueue::ImportQueue() {
76 auto status = InvokeRemoteMethod<BufferHubRPC::GetQueueInfo>();
77 if (!status) {
78 ALOGE("BufferHubQueue::ImportQueue: Failed to import queue: %s",
79 status.GetErrorMessage().c_str());
80 return ErrorStatus(status.error());
81 } else {
82 SetupQueue(status.get().meta_size_bytes, status.get().id);
83 return {};
84 }
85 }
86
SetupQueue(size_t meta_size_bytes,int id)87 void BufferHubQueue::SetupQueue(size_t meta_size_bytes, int id) {
88 meta_size_ = meta_size_bytes;
89 id_ = id;
90 meta_buffer_tmp_.reset(meta_size_ > 0 ? new uint8_t[meta_size_] : nullptr);
91 }
92
CreateConsumerQueue()93 std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateConsumerQueue() {
94 if (auto status = CreateConsumerQueueHandle())
95 return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take()));
96 else
97 return nullptr;
98 }
99
CreateSilentConsumerQueue()100 std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateSilentConsumerQueue() {
101 if (auto status = CreateConsumerQueueHandle())
102 return std::unique_ptr<ConsumerQueue>(
103 new ConsumerQueue(status.take(), true));
104 else
105 return nullptr;
106 }
107
CreateConsumerQueueHandle()108 Status<LocalChannelHandle> BufferHubQueue::CreateConsumerQueueHandle() {
109 auto status = InvokeRemoteMethod<BufferHubRPC::CreateConsumerQueue>();
110 if (!status) {
111 ALOGE(
112 "BufferHubQueue::CreateConsumerQueue: Failed to create consumer queue: "
113 "%s",
114 status.GetErrorMessage().c_str());
115 return ErrorStatus(status.error());
116 }
117
118 return status;
119 }
120
WaitForBuffers(int timeout)121 bool BufferHubQueue::WaitForBuffers(int timeout) {
122 std::array<epoll_event, kMaxEvents> events;
123
124 // Loop at least once to check for hangups.
125 do {
126 ALOGD_IF(
127 TRACE,
128 "BufferHubQueue::WaitForBuffers: queue_id=%d count=%zu capacity=%zu",
129 id(), count(), capacity());
130
131 // If there is already a buffer then just check for hangup without waiting.
132 const int ret = epoll_fd_.Wait(events.data(), events.size(),
133 count() == 0 ? timeout : 0);
134
135 if (ret == 0) {
136 ALOGI_IF(TRACE,
137 "BufferHubQueue::WaitForBuffers: No events before timeout: "
138 "queue_id=%d",
139 id());
140 return count() != 0;
141 }
142
143 if (ret < 0 && ret != -EINTR) {
144 ALOGE("BufferHubQueue::WaitForBuffers: Failed to wait for buffers: %s",
145 strerror(-ret));
146 return false;
147 }
148
149 const int num_events = ret;
150
151 // A BufferQueue's epoll fd tracks N+1 events, where there are N events,
152 // one for each buffer, in the queue and one extra event for the queue
153 // client itself.
154 for (int i = 0; i < num_events; i++) {
155 int64_t index = static_cast<int64_t>(events[i].data.u64);
156
157 ALOGD_IF(TRACE,
158 "BufferHubQueue::WaitForBuffers: event %d: index=%" PRId64, i,
159 index);
160
161 if (is_buffer_event_index(index)) {
162 HandleBufferEvent(static_cast<size_t>(index), events[i].events);
163 } else if (is_queue_event_index(index)) {
164 HandleQueueEvent(events[i].events);
165 } else {
166 ALOGW("BufferHubQueue::WaitForBuffers: Unknown event index: %" PRId64,
167 index);
168 }
169 }
170 } while (count() == 0 && capacity() > 0 && !hung_up());
171
172 return count() != 0;
173 }
174
HandleBufferEvent(size_t slot,int poll_events)175 void BufferHubQueue::HandleBufferEvent(size_t slot, int poll_events) {
176 auto buffer = buffers_[slot];
177 if (!buffer) {
178 ALOGW("BufferHubQueue::HandleBufferEvent: Invalid buffer slot: %zu", slot);
179 return;
180 }
181
182 auto status = buffer->GetEventMask(poll_events);
183 if (!status) {
184 ALOGW("BufferHubQueue::HandleBufferEvent: Failed to get event mask: %s",
185 status.GetErrorMessage().c_str());
186 return;
187 }
188
189 const int events = status.get();
190 if (events & EPOLLIN) {
191 const int ret = OnBufferReady(buffer, &fences_[slot]);
192 if (ret == 0 || ret == -EALREADY || ret == -EBUSY) {
193 // Only enqueue the buffer if it moves to or is already in the state
194 // requested in OnBufferReady(). If the buffer is busy this means that the
195 // buffer moved from released to posted when a new consumer was created
196 // before the ProducerQueue had a chance to regain it. This is a valid
197 // transition that we have to handle because edge triggered poll events
198 // latch the ready state even if it is later de-asserted -- don't enqueue
199 // or print an error log in this case.
200 if (ret != -EBUSY)
201 Enqueue(buffer, slot);
202 } else {
203 ALOGE(
204 "BufferHubQueue::HandleBufferEvent: Failed to set buffer ready, "
205 "queue_id=%d buffer_id=%d: %s",
206 id(), buffer->id(), strerror(-ret));
207 }
208 } else if (events & EPOLLHUP) {
209 // This might be caused by producer replacing an existing buffer slot, or
210 // when BufferHubQueue is shutting down. For the first case, currently the
211 // epoll FD is cleaned up when the replacement consumer client is imported,
212 // we shouldn't detach again if |epollhub_pending_[slot]| is set.
213 ALOGW(
214 "BufferHubQueue::HandleBufferEvent: Received EPOLLHUP at slot: %zu, "
215 "buffer event fd: %d, EPOLLHUP pending: %d",
216 slot, buffer->event_fd(), int{epollhup_pending_[slot]});
217 if (epollhup_pending_[slot]) {
218 epollhup_pending_[slot] = false;
219 } else {
220 DetachBuffer(slot);
221 }
222 } else {
223 ALOGW(
224 "BufferHubQueue::HandleBufferEvent: Unknown event, slot=%zu, epoll "
225 "events=%d",
226 slot, events);
227 }
228 }
229
HandleQueueEvent(int poll_event)230 void BufferHubQueue::HandleQueueEvent(int poll_event) {
231 auto status = GetEventMask(poll_event);
232 if (!status) {
233 ALOGW("BufferHubQueue::HandleQueueEvent: Failed to get event mask: %s",
234 status.GetErrorMessage().c_str());
235 return;
236 }
237
238 const int events = status.get();
239 if (events & EPOLLIN) {
240 // Note that after buffer imports, if |count()| still returns 0, epoll
241 // wait will be tried again to acquire the newly imported buffer.
242 auto buffer_status = OnBufferAllocated();
243 if (!buffer_status) {
244 ALOGE("BufferHubQueue::HandleQueueEvent: Failed to import buffer: %s",
245 buffer_status.GetErrorMessage().c_str());
246 }
247 } else if (events & EPOLLHUP) {
248 ALOGD_IF(TRACE, "BufferHubQueue::HandleQueueEvent: hang up event!");
249 hung_up_ = true;
250 } else {
251 ALOGW("BufferHubQueue::HandleQueueEvent: Unknown epoll events=%x", events);
252 }
253 }
254
AddBuffer(const std::shared_ptr<BufferHubBuffer> & buf,size_t slot)255 int BufferHubQueue::AddBuffer(const std::shared_ptr<BufferHubBuffer>& buf,
256 size_t slot) {
257 if (is_full()) {
258 // TODO(jwcai) Move the check into Producer's AllocateBuffer and consumer's
259 // import buffer.
260 ALOGE("BufferHubQueue::AddBuffer queue is at maximum capacity: %zu",
261 capacity_);
262 return -E2BIG;
263 }
264
265 if (buffers_[slot] != nullptr) {
266 // Replace the buffer if the slot is preoccupied. This could happen when the
267 // producer side replaced the slot with a newly allocated buffer. Detach the
268 // buffer before setting up with the new one.
269 DetachBuffer(slot);
270 epollhup_pending_[slot] = true;
271 }
272
273 epoll_event event = {.events = EPOLLIN | EPOLLET, .data = {.u64 = slot}};
274 const int ret = epoll_fd_.Control(EPOLL_CTL_ADD, buf->event_fd(), &event);
275 if (ret < 0) {
276 ALOGE("BufferHubQueue::AddBuffer: Failed to add buffer to epoll set: %s",
277 strerror(-ret));
278 return ret;
279 }
280
281 buffers_[slot] = buf;
282 capacity_++;
283 return 0;
284 }
285
DetachBuffer(size_t slot)286 int BufferHubQueue::DetachBuffer(size_t slot) {
287 auto& buf = buffers_[slot];
288 if (buf == nullptr) {
289 ALOGE("BufferHubQueue::DetachBuffer: Invalid slot: %zu", slot);
290 return -EINVAL;
291 }
292
293 const int ret = epoll_fd_.Control(EPOLL_CTL_DEL, buf->event_fd(), nullptr);
294 if (ret < 0) {
295 ALOGE(
296 "BufferHubQueue::DetachBuffer: Failed to detach buffer from epoll set: "
297 "%s",
298 strerror(-ret));
299 return ret;
300 }
301
302 buffers_[slot] = nullptr;
303 capacity_--;
304 return 0;
305 }
306
Enqueue(const std::shared_ptr<BufferHubBuffer> & buf,size_t slot)307 void BufferHubQueue::Enqueue(const std::shared_ptr<BufferHubBuffer>& buf,
308 size_t slot) {
309 if (count() == capacity_) {
310 ALOGE("BufferHubQueue::Enqueue: Buffer queue is full!");
311 return;
312 }
313
314 // Set slot buffer back to vector.
315 // TODO(jwcai) Here have to dynamically allocate BufferInfo::metadata due to
316 // the limitation of the RingBuffer we are using. Would be better to refactor
317 // that.
318 BufferInfo buffer_info(slot, meta_size_);
319 buffer_info.buffer = buf;
320 // Swap metadata loaded during onBufferReady into vector.
321 std::swap(buffer_info.metadata, meta_buffer_tmp_);
322
323 available_buffers_.Append(std::move(buffer_info));
324 }
325
Dequeue(int timeout,size_t * slot,void * meta,LocalHandle * fence)326 Status<std::shared_ptr<BufferHubBuffer>> BufferHubQueue::Dequeue(
327 int timeout, size_t* slot, void* meta, LocalHandle* fence) {
328 ALOGD_IF(TRACE, "Dequeue: count=%zu, timeout=%d", count(), timeout);
329
330 if (!WaitForBuffers(timeout))
331 return ErrorStatus(ETIMEDOUT);
332
333 std::shared_ptr<BufferHubBuffer> buf;
334 BufferInfo& buffer_info = available_buffers_.Front();
335
336 *fence = std::move(fences_[buffer_info.slot]);
337
338 // Report current pos as the output slot.
339 std::swap(buffer_info.slot, *slot);
340 // Swap buffer from vector to be returned later.
341 std::swap(buffer_info.buffer, buf);
342 // Swap metadata from vector into tmp so that we can write out to |meta|.
343 std::swap(buffer_info.metadata, meta_buffer_tmp_);
344
345 available_buffers_.PopFront();
346
347 if (!buf) {
348 ALOGE("BufferHubQueue::Dequeue: Buffer to be dequeued is nullptr");
349 return ErrorStatus(ENOBUFS);
350 }
351
352 if (meta) {
353 std::copy(meta_buffer_tmp_.get(), meta_buffer_tmp_.get() + meta_size_,
354 reinterpret_cast<uint8_t*>(meta));
355 }
356
357 return {std::move(buf)};
358 }
359
ProducerQueue(size_t meta_size)360 ProducerQueue::ProducerQueue(size_t meta_size)
361 : ProducerQueue(meta_size, 0, 0, 0, 0) {}
362
ProducerQueue(LocalChannelHandle handle)363 ProducerQueue::ProducerQueue(LocalChannelHandle handle)
364 : BASE(std::move(handle)) {
365 auto status = ImportQueue();
366 if (!status) {
367 ALOGE("ProducerQueue::ProducerQueue: Failed to import queue: %s",
368 status.GetErrorMessage().c_str());
369 Close(-status.error());
370 }
371 }
372
ProducerQueue(size_t meta_size,uint64_t usage_set_mask,uint64_t usage_clear_mask,uint64_t usage_deny_set_mask,uint64_t usage_deny_clear_mask)373 ProducerQueue::ProducerQueue(size_t meta_size, uint64_t usage_set_mask,
374 uint64_t usage_clear_mask,
375 uint64_t usage_deny_set_mask,
376 uint64_t usage_deny_clear_mask)
377 : BASE(BufferHubRPC::kClientPath) {
378 auto status = InvokeRemoteMethod<BufferHubRPC::CreateProducerQueue>(
379 meta_size, UsagePolicy{usage_set_mask, usage_clear_mask,
380 usage_deny_set_mask, usage_deny_clear_mask});
381 if (!status) {
382 ALOGE("ProducerQueue::ProducerQueue: Failed to create producer queue: %s",
383 status.GetErrorMessage().c_str());
384 Close(-status.error());
385 return;
386 }
387
388 SetupQueue(status.get().meta_size_bytes, status.get().id);
389 }
390
AllocateBuffer(uint32_t width,uint32_t height,uint32_t layer_count,uint32_t format,uint64_t usage,size_t * out_slot)391 int ProducerQueue::AllocateBuffer(uint32_t width, uint32_t height,
392 uint32_t layer_count, uint32_t format,
393 uint64_t usage, size_t* out_slot) {
394 if (out_slot == nullptr) {
395 ALOGE("ProducerQueue::AllocateBuffer: Parameter out_slot cannot be null.");
396 return -EINVAL;
397 }
398
399 if (is_full()) {
400 ALOGE("ProducerQueue::AllocateBuffer queue is at maximum capacity: %zu",
401 capacity());
402 return -E2BIG;
403 }
404
405 const size_t kBufferCount = 1U;
406 Status<std::vector<std::pair<LocalChannelHandle, size_t>>> status =
407 InvokeRemoteMethod<BufferHubRPC::ProducerQueueAllocateBuffers>(
408 width, height, layer_count, format, usage, kBufferCount);
409 if (!status) {
410 ALOGE("ProducerQueue::AllocateBuffer failed to create producer buffer: %s",
411 status.GetErrorMessage().c_str());
412 return -status.error();
413 }
414
415 auto buffer_handle_slots = status.take();
416 LOG_ALWAYS_FATAL_IF(buffer_handle_slots.size() != kBufferCount,
417 "BufferHubRPC::ProducerQueueAllocateBuffers should "
418 "return one and only one buffer handle.");
419
420 // We only allocate one buffer at a time.
421 auto& buffer_handle = buffer_handle_slots[0].first;
422 size_t buffer_slot = buffer_handle_slots[0].second;
423 ALOGD_IF(TRACE,
424 "ProducerQueue::AllocateBuffer, new buffer, channel_handle: %d",
425 buffer_handle.value());
426
427 *out_slot = buffer_slot;
428 return AddBuffer(BufferProducer::Import(std::move(buffer_handle)),
429 buffer_slot);
430 }
431
AddBuffer(const std::shared_ptr<BufferProducer> & buf,size_t slot)432 int ProducerQueue::AddBuffer(const std::shared_ptr<BufferProducer>& buf,
433 size_t slot) {
434 ALOGD_IF(TRACE, "ProducerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu",
435 id(), buf->id(), slot);
436 // For producer buffer, we need to enqueue the newly added buffer
437 // immediately. Producer queue starts with all buffers in available state.
438 const int ret = BufferHubQueue::AddBuffer(buf, slot);
439 if (ret < 0)
440 return ret;
441
442 Enqueue(buf, slot);
443 return 0;
444 }
445
DetachBuffer(size_t slot)446 int ProducerQueue::DetachBuffer(size_t slot) {
447 auto status =
448 InvokeRemoteMethod<BufferHubRPC::ProducerQueueDetachBuffer>(slot);
449 if (!status) {
450 ALOGE("ProducerQueue::DetachBuffer: Failed to detach producer buffer: %s",
451 status.GetErrorMessage().c_str());
452 return -status.error();
453 }
454
455 return BufferHubQueue::DetachBuffer(slot);
456 }
457
Dequeue(int timeout,size_t * slot,LocalHandle * release_fence)458 Status<std::shared_ptr<BufferProducer>> ProducerQueue::Dequeue(
459 int timeout, size_t* slot, LocalHandle* release_fence) {
460 if (slot == nullptr || release_fence == nullptr) {
461 ALOGE("ProducerQueue::Dequeue: Invalid parameter: slot=%p release_fence=%p",
462 slot, release_fence);
463 return ErrorStatus(EINVAL);
464 }
465
466 auto buffer_status =
467 BufferHubQueue::Dequeue(timeout, slot, nullptr, release_fence);
468 if (!buffer_status)
469 return buffer_status.error_status();
470
471 return {std::static_pointer_cast<BufferProducer>(buffer_status.take())};
472 }
473
OnBufferReady(const std::shared_ptr<BufferHubBuffer> & buf,LocalHandle * release_fence)474 int ProducerQueue::OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
475 LocalHandle* release_fence) {
476 ALOGD_IF(TRACE, "ProducerQueue::OnBufferReady: queue_id=%d buffer_id=%d",
477 id(), buf->id());
478 auto buffer = std::static_pointer_cast<BufferProducer>(buf);
479 return buffer->Gain(release_fence);
480 }
481
ConsumerQueue(LocalChannelHandle handle,bool ignore_on_import)482 ConsumerQueue::ConsumerQueue(LocalChannelHandle handle, bool ignore_on_import)
483 : BufferHubQueue(std::move(handle)), ignore_on_import_(ignore_on_import) {
484 auto status = ImportQueue();
485 if (!status) {
486 ALOGE("ConsumerQueue::ConsumerQueue: Failed to import queue: %s",
487 status.GetErrorMessage().c_str());
488 Close(-status.error());
489 }
490
491 auto import_status = ImportBuffers();
492 if (import_status) {
493 ALOGI("ConsumerQueue::ConsumerQueue: Imported %zu buffers.",
494 import_status.get());
495 } else {
496 ALOGE("ConsumerQueue::ConsumerQueue: Failed to import buffers: %s",
497 import_status.GetErrorMessage().c_str());
498 }
499 }
500
ImportBuffers()501 Status<size_t> ConsumerQueue::ImportBuffers() {
502 auto status = InvokeRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>();
503 if (!status) {
504 ALOGE("ConsumerQueue::ImportBuffers: Failed to import consumer buffer: %s",
505 status.GetErrorMessage().c_str());
506 return ErrorStatus(status.error());
507 }
508
509 int ret;
510 int last_error = 0;
511 int imported_buffers = 0;
512
513 auto buffer_handle_slots = status.take();
514 for (auto& buffer_handle_slot : buffer_handle_slots) {
515 ALOGD_IF(TRACE, "ConsumerQueue::ImportBuffers: buffer_handle=%d",
516 buffer_handle_slot.first.value());
517
518 std::unique_ptr<BufferConsumer> buffer_consumer =
519 BufferConsumer::Import(std::move(buffer_handle_slot.first));
520
521 // Setup ignore state before adding buffer to the queue.
522 if (ignore_on_import_) {
523 ALOGD_IF(TRACE,
524 "ConsumerQueue::ImportBuffers: Setting buffer to ignored state: "
525 "buffer_id=%d",
526 buffer_consumer->id());
527 ret = buffer_consumer->SetIgnore(true);
528 if (ret < 0) {
529 ALOGE(
530 "ConsumerQueue::ImportBuffers: Failed to set ignored state on "
531 "imported buffer buffer_id=%d: %s",
532 buffer_consumer->id(), strerror(-ret));
533 last_error = ret;
534 }
535 }
536
537 ret = AddBuffer(std::move(buffer_consumer), buffer_handle_slot.second);
538 if (ret < 0) {
539 ALOGE("ConsumerQueue::ImportBuffers: Failed to add buffer: %s",
540 strerror(-ret));
541 last_error = ret;
542 continue;
543 } else {
544 imported_buffers++;
545 }
546 }
547
548 if (imported_buffers > 0)
549 return {imported_buffers};
550 else
551 return ErrorStatus(-last_error);
552 }
553
AddBuffer(const std::shared_ptr<BufferConsumer> & buf,size_t slot)554 int ConsumerQueue::AddBuffer(const std::shared_ptr<BufferConsumer>& buf,
555 size_t slot) {
556 ALOGD_IF(TRACE, "ConsumerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu",
557 id(), buf->id(), slot);
558 const int ret = BufferHubQueue::AddBuffer(buf, slot);
559 if (ret < 0)
560 return ret;
561
562 // Check to see if the buffer is already signaled. This is necessary to catch
563 // cases where buffers are already available; epoll edge triggered mode does
564 // not fire until and edge transition when adding new buffers to the epoll
565 // set.
566 const int kTimeoutMs = 0;
567 pollfd pfd{buf->event_fd(), POLLIN, 0};
568 const int count = RETRY_EINTR(poll(&pfd, 1, kTimeoutMs));
569 if (count < 0) {
570 const int error = errno;
571 ALOGE("ConsumerQueue::AddBuffer: Failed to poll consumer buffer: %s",
572 strerror(errno));
573 return -error;
574 }
575
576 if (count == 1)
577 HandleBufferEvent(slot, pfd.revents);
578
579 return 0;
580 }
581
Dequeue(int timeout,size_t * slot,void * meta,size_t meta_size,LocalHandle * acquire_fence)582 Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue(
583 int timeout, size_t* slot, void* meta, size_t meta_size,
584 LocalHandle* acquire_fence) {
585 if (meta_size != meta_size_) {
586 ALOGE(
587 "ConsumerQueue::Dequeue: Metadata size (%zu) for the dequeuing buffer "
588 "does not match metadata size (%zu) for the queue.",
589 meta_size, meta_size_);
590 return ErrorStatus(EINVAL);
591 }
592
593 if (slot == nullptr || acquire_fence == nullptr) {
594 ALOGE(
595 "ConsumerQueue::Dequeue: Invalid parameter: slot=%p meta=%p "
596 "acquire_fence=%p",
597 slot, meta, acquire_fence);
598 return ErrorStatus(EINVAL);
599 }
600
601 auto buffer_status =
602 BufferHubQueue::Dequeue(timeout, slot, meta, acquire_fence);
603 if (!buffer_status)
604 return buffer_status.error_status();
605
606 return {std::static_pointer_cast<BufferConsumer>(buffer_status.take())};
607 }
608
OnBufferReady(const std::shared_ptr<BufferHubBuffer> & buf,LocalHandle * acquire_fence)609 int ConsumerQueue::OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
610 LocalHandle* acquire_fence) {
611 ALOGD_IF(TRACE, "ConsumerQueue::OnBufferReady: queue_id=%d buffer_id=%d",
612 id(), buf->id());
613 auto buffer = std::static_pointer_cast<BufferConsumer>(buf);
614 return buffer->Acquire(acquire_fence, meta_buffer_tmp_.get(), meta_size_);
615 }
616
OnBufferAllocated()617 Status<void> ConsumerQueue::OnBufferAllocated() {
618 auto status = ImportBuffers();
619 if (!status) {
620 ALOGE("ConsumerQueue::OnBufferAllocated: Failed to import buffers: %s",
621 status.GetErrorMessage().c_str());
622 return ErrorStatus(status.error());
623 } else if (status.get() == 0) {
624 ALOGW("ConsumerQueue::OnBufferAllocated: No new buffers allocated!");
625 return ErrorStatus(ENOBUFS);
626 } else {
627 ALOGD_IF(TRACE,
628 "ConsumerQueue::OnBufferAllocated: Imported %zu consumer buffers.",
629 status.get());
630 return {};
631 }
632 }
633
634 } // namespace dvr
635 } // namespace android
636