1 #include "include/private/dvr/buffer_hub_queue_client.h"
2
3 #include <inttypes.h>
4 #include <log/log.h>
5 #include <poll.h>
6 #include <sys/epoll.h>
7
8 #include <array>
9
10 #include <pdx/default_transport/client_channel.h>
11 #include <pdx/default_transport/client_channel_factory.h>
12 #include <pdx/file_handle.h>
13 #include <pdx/trace.h>
14
// Evaluates |fnc_call| and keeps re-evaluating it for as long as it returns -1
// with errno set to EINTR. Yields the first result that is not such a failure.
// The expression is run inside a lambda that captures by reference, so it may
// refer to locals of the scope where the macro is used.
#define RETRY_EINTR(fnc_call)                    \
  ([&]() -> decltype(fnc_call) {                 \
    for (;;) {                                   \
      const decltype(fnc_call) rc = (fnc_call);  \
      if (rc != -1 || errno != EINTR) return rc; \
    }                                            \
  })()
23
24 using android::pdx::ErrorStatus;
25 using android::pdx::LocalChannelHandle;
26 using android::pdx::LocalHandle;
27 using android::pdx::Status;
28
29 namespace android {
30 namespace dvr {
31
32 namespace {
33
34 // Polls an fd for the given events.
PollEvents(int fd,short events)35 Status<int> PollEvents(int fd, short events) {
36 const int kTimeoutMs = 0;
37 pollfd pfd{fd, events, 0};
38 const int count = RETRY_EINTR(poll(&pfd, 1, kTimeoutMs));
39 if (count < 0) {
40 return ErrorStatus(errno);
41 } else if (count == 0) {
42 return ErrorStatus(ETIMEDOUT);
43 } else {
44 return {pfd.revents};
45 }
46 }
47
// Splits a 64-bit value into two 32-bit halves: |first| is the upper 32 bits
// and |second| is the lower 32 bits. Inverse of Stuff().
std::pair<int32_t, int32_t> Unstuff(uint64_t value) {
  constexpr uint64_t kLowWordMask = (1ull << 32) - 1;
  const auto high_word = static_cast<int32_t>(value >> 32);
  const auto low_word = static_cast<int32_t>(value & kLowWordMask);
  return {high_word, low_word};
}
52
// Packs two 32-bit values into one 64-bit value: |a| occupies the upper 32
// bits and |b| the lower 32 bits. Each value goes through uint32_t first so a
// negative input does not sign-extend into the other half.
uint64_t Stuff(int32_t a, int32_t b) {
  const uint64_t high_word = static_cast<uint64_t>(static_cast<uint32_t>(a))
                             << 32;
  const uint64_t low_word = static_cast<uint64_t>(static_cast<uint32_t>(b));
  return high_word | low_word;
}
58
59 } // anonymous namespace
60
BufferHubQueue(LocalChannelHandle channel_handle)61 BufferHubQueue::BufferHubQueue(LocalChannelHandle channel_handle)
62 : Client{pdx::default_transport::ClientChannel::Create(
63 std::move(channel_handle))} {
64 Initialize();
65 }
66
BufferHubQueue(const std::string & endpoint_path)67 BufferHubQueue::BufferHubQueue(const std::string& endpoint_path)
68 : Client{
69 pdx::default_transport::ClientChannelFactory::Create(endpoint_path)} {
70 Initialize();
71 }
72
Initialize()73 void BufferHubQueue::Initialize() {
74 int ret = epoll_fd_.Create();
75 if (ret < 0) {
76 ALOGE("BufferHubQueue::BufferHubQueue: Failed to create epoll fd: %s",
77 strerror(-ret));
78 return;
79 }
80
81 epoll_event event = {
82 .events = EPOLLIN | EPOLLET,
83 .data = {.u64 = Stuff(-1, BufferHubQueue::kEpollQueueEventIndex)}};
84 ret = epoll_fd_.Control(EPOLL_CTL_ADD, event_fd(), &event);
85 if (ret < 0) {
86 ALOGE("BufferHubQueue::Initialize: Failed to add event fd to epoll set: %s",
87 strerror(-ret));
88 }
89 }
90
ImportQueue()91 Status<void> BufferHubQueue::ImportQueue() {
92 auto status = InvokeRemoteMethod<BufferHubRPC::GetQueueInfo>();
93 if (!status) {
94 ALOGE("BufferHubQueue::ImportQueue: Failed to import queue: %s",
95 status.GetErrorMessage().c_str());
96 return ErrorStatus(status.error());
97 } else {
98 SetupQueue(status.get());
99 return {};
100 }
101 }
102
SetupQueue(const QueueInfo & queue_info)103 void BufferHubQueue::SetupQueue(const QueueInfo& queue_info) {
104 is_async_ = queue_info.producer_config.is_async;
105 default_width_ = queue_info.producer_config.default_width;
106 default_height_ = queue_info.producer_config.default_height;
107 default_format_ = queue_info.producer_config.default_format;
108 user_metadata_size_ = queue_info.producer_config.user_metadata_size;
109 id_ = queue_info.id;
110 }
111
CreateConsumerQueue()112 std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateConsumerQueue() {
113 if (auto status = CreateConsumerQueueHandle(/*silent*/ false))
114 return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take()));
115 else
116 return nullptr;
117 }
118
CreateSilentConsumerQueue()119 std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateSilentConsumerQueue() {
120 if (auto status = CreateConsumerQueueHandle(/*silent*/ true))
121 return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take()));
122 else
123 return nullptr;
124 }
125
CreateConsumerQueueHandle(bool silent)126 Status<LocalChannelHandle> BufferHubQueue::CreateConsumerQueueHandle(
127 bool silent) {
128 auto status = InvokeRemoteMethod<BufferHubRPC::CreateConsumerQueue>(silent);
129 if (!status) {
130 ALOGE(
131 "BufferHubQueue::CreateConsumerQueue: Failed to create consumer queue: "
132 "%s",
133 status.GetErrorMessage().c_str());
134 return ErrorStatus(status.error());
135 }
136
137 return status;
138 }
139
WaitForBuffers(int timeout)140 bool BufferHubQueue::WaitForBuffers(int timeout) {
141 ATRACE_NAME("BufferHubQueue::WaitForBuffers");
142 std::array<epoll_event, kMaxEvents> events;
143
144 // Loop at least once to check for hangups.
145 do {
146 ALOGD_IF(
147 TRACE,
148 "BufferHubQueue::WaitForBuffers: queue_id=%d count=%zu capacity=%zu",
149 id(), count(), capacity());
150
151 // If there is already a buffer then just check for hangup without waiting.
152 const int ret = epoll_fd_.Wait(events.data(), events.size(),
153 count() == 0 ? timeout : 0);
154
155 if (ret == 0) {
156 ALOGI_IF(TRACE,
157 "BufferHubQueue::WaitForBuffers: No events before timeout: "
158 "queue_id=%d",
159 id());
160 return count() != 0;
161 }
162
163 if (ret < 0 && ret != -EINTR) {
164 ALOGE("BufferHubQueue::WaitForBuffers: Failed to wait for buffers: %s",
165 strerror(-ret));
166 return false;
167 }
168
169 const int num_events = ret;
170
171 // A BufferQueue's epoll fd tracks N+1 events, where there are N events,
172 // one for each buffer in the queue, and one extra event for the queue
173 // client itself.
174 for (int i = 0; i < num_events; i++) {
175 int32_t event_fd;
176 int32_t index;
177 std::tie(event_fd, index) = Unstuff(events[i].data.u64);
178
179 PDX_TRACE_FORMAT(
180 "epoll_event|queue_id=%d;num_events=%d;event_index=%d;event_fd=%d;"
181 "slot=%d|",
182 id(), num_events, i, event_fd, index);
183
184 ALOGD_IF(TRACE,
185 "BufferHubQueue::WaitForBuffers: event %d: event_fd=%d index=%d",
186 i, event_fd, index);
187
188 if (is_buffer_event_index(index)) {
189 HandleBufferEvent(static_cast<size_t>(index), event_fd,
190 events[i].events);
191 } else if (is_queue_event_index(index)) {
192 HandleQueueEvent(events[i].events);
193 } else {
194 ALOGW(
195 "BufferHubQueue::WaitForBuffers: Unknown event type event_fd=%d "
196 "index=%d",
197 event_fd, index);
198 }
199 }
200 } while (count() == 0 && capacity() > 0 && !hung_up());
201
202 return count() != 0;
203 }
204
HandleBufferEvent(size_t slot,int event_fd,int poll_events)205 Status<void> BufferHubQueue::HandleBufferEvent(size_t slot, int event_fd,
206 int poll_events) {
207 ATRACE_NAME("BufferHubQueue::HandleBufferEvent");
208 if (!buffers_[slot]) {
209 ALOGW("BufferHubQueue::HandleBufferEvent: Invalid buffer slot: %zu", slot);
210 return ErrorStatus(ENOENT);
211 }
212
213 auto status = buffers_[slot]->GetEventMask(poll_events);
214 if (!status) {
215 ALOGW("BufferHubQueue::HandleBufferEvent: Failed to get event mask: %s",
216 status.GetErrorMessage().c_str());
217 return status.error_status();
218 }
219
220 const int events = status.get();
221 PDX_TRACE_FORMAT(
222 "buffer|queue_id=%d;buffer_id=%d;slot=%zu;event_fd=%d;poll_events=%x;"
223 "events=%d|",
224 id(), buffers_[slot]->id(), slot, event_fd, poll_events, events);
225
226 if (events & EPOLLIN) {
227 return Enqueue({buffers_[slot], slot, buffers_[slot]->GetQueueIndex()});
228 } else if (events & EPOLLHUP) {
229 ALOGW(
230 "BufferHubQueue::HandleBufferEvent: Received EPOLLHUP event: slot=%zu "
231 "event_fd=%d buffer_id=%d",
232 slot, buffers_[slot]->event_fd(), buffers_[slot]->id());
233 return RemoveBuffer(slot);
234 } else {
235 ALOGW(
236 "BufferHubQueue::HandleBufferEvent: Unknown event, slot=%zu, epoll "
237 "events=%d",
238 slot, events);
239 }
240
241 return {};
242 }
243
HandleQueueEvent(int poll_event)244 Status<void> BufferHubQueue::HandleQueueEvent(int poll_event) {
245 ATRACE_NAME("BufferHubQueue::HandleQueueEvent");
246 auto status = GetEventMask(poll_event);
247 if (!status) {
248 ALOGW("BufferHubQueue::HandleQueueEvent: Failed to get event mask: %s",
249 status.GetErrorMessage().c_str());
250 return status.error_status();
251 }
252
253 const int events = status.get();
254 if (events & EPOLLIN) {
255 // Note that after buffer imports, if |count()| still returns 0, epoll
256 // wait will be tried again to acquire the newly imported buffer.
257 auto buffer_status = OnBufferAllocated();
258 if (!buffer_status) {
259 ALOGE("BufferHubQueue::HandleQueueEvent: Failed to import buffer: %s",
260 buffer_status.GetErrorMessage().c_str());
261 }
262 } else if (events & EPOLLHUP) {
263 ALOGD_IF(TRACE, "BufferHubQueue::HandleQueueEvent: hang up event!");
264 hung_up_ = true;
265 } else {
266 ALOGW("BufferHubQueue::HandleQueueEvent: Unknown epoll events=%x", events);
267 }
268
269 return {};
270 }
271
AddBuffer(const std::shared_ptr<BufferHubBuffer> & buffer,size_t slot)272 Status<void> BufferHubQueue::AddBuffer(
273 const std::shared_ptr<BufferHubBuffer>& buffer, size_t slot) {
274 ALOGD_IF(TRACE, "BufferHubQueue::AddBuffer: buffer_id=%d slot=%zu",
275 buffer->id(), slot);
276
277 if (is_full()) {
278 ALOGE("BufferHubQueue::AddBuffer queue is at maximum capacity: %zu",
279 capacity_);
280 return ErrorStatus(E2BIG);
281 }
282
283 if (buffers_[slot]) {
284 // Replace the buffer if the slot is occupied. This could happen when the
285 // producer side replaced the slot with a newly allocated buffer. Remove the
286 // buffer before setting up with the new one.
287 auto remove_status = RemoveBuffer(slot);
288 if (!remove_status)
289 return remove_status.error_status();
290 }
291
292 for (const auto& event_source : buffer->GetEventSources()) {
293 epoll_event event = {.events = event_source.event_mask | EPOLLET,
294 .data = {.u64 = Stuff(buffer->event_fd(), slot)}};
295 const int ret =
296 epoll_fd_.Control(EPOLL_CTL_ADD, event_source.event_fd, &event);
297 if (ret < 0) {
298 ALOGE("BufferHubQueue::AddBuffer: Failed to add buffer to epoll set: %s",
299 strerror(-ret));
300 return ErrorStatus(-ret);
301 }
302 }
303
304 buffers_[slot] = buffer;
305 capacity_++;
306 return {};
307 }
308
RemoveBuffer(size_t slot)309 Status<void> BufferHubQueue::RemoveBuffer(size_t slot) {
310 ALOGD_IF(TRACE, "BufferHubQueue::RemoveBuffer: slot=%zu", slot);
311
312 if (buffers_[slot]) {
313 for (const auto& event_source : buffers_[slot]->GetEventSources()) {
314 const int ret =
315 epoll_fd_.Control(EPOLL_CTL_DEL, event_source.event_fd, nullptr);
316 if (ret < 0) {
317 ALOGE(
318 "BufferHubQueue::RemoveBuffer: Failed to remove buffer from epoll "
319 "set: %s",
320 strerror(-ret));
321 return ErrorStatus(-ret);
322 }
323 }
324
325 // Trigger OnBufferRemoved callback if registered.
326 if (on_buffer_removed_)
327 on_buffer_removed_(buffers_[slot]);
328
329 buffers_[slot] = nullptr;
330 capacity_--;
331 }
332
333 return {};
334 }
335
Enqueue(Entry entry)336 Status<void> BufferHubQueue::Enqueue(Entry entry) {
337 if (!is_full()) {
338 available_buffers_.push(std::move(entry));
339
340 // Trigger OnBufferAvailable callback if registered.
341 if (on_buffer_available_)
342 on_buffer_available_();
343
344 return {};
345 } else {
346 ALOGE("BufferHubQueue::Enqueue: Buffer queue is full!");
347 return ErrorStatus(E2BIG);
348 }
349 }
350
Dequeue(int timeout,size_t * slot)351 Status<std::shared_ptr<BufferHubBuffer>> BufferHubQueue::Dequeue(int timeout,
352 size_t* slot) {
353 ALOGD_IF(TRACE, "BufferHubQueue::Dequeue: count=%zu, timeout=%d", count(),
354 timeout);
355
356 PDX_TRACE_FORMAT("BufferHubQueue::Dequeue|count=%zu|", count());
357
358 if (count() == 0) {
359 if (!WaitForBuffers(timeout))
360 return ErrorStatus(ETIMEDOUT);
361 }
362
363 auto& entry = available_buffers_.top();
364 PDX_TRACE_FORMAT("buffer|buffer_id=%d;slot=%zu|", entry.buffer->id(),
365 entry.slot);
366
367 std::shared_ptr<BufferHubBuffer> buffer = std::move(entry.buffer);
368 *slot = entry.slot;
369
370 available_buffers_.pop();
371
372 return {std::move(buffer)};
373 }
374
SetBufferAvailableCallback(BufferAvailableCallback callback)375 void BufferHubQueue::SetBufferAvailableCallback(
376 BufferAvailableCallback callback) {
377 on_buffer_available_ = callback;
378 }
379
SetBufferRemovedCallback(BufferRemovedCallback callback)380 void BufferHubQueue::SetBufferRemovedCallback(BufferRemovedCallback callback) {
381 on_buffer_removed_ = callback;
382 }
383
FreeAllBuffers()384 pdx::Status<void> BufferHubQueue::FreeAllBuffers() {
385 // Clear all available buffers.
386 while (!available_buffers_.empty())
387 available_buffers_.pop();
388
389 pdx::Status<void> last_error; // No error.
390 // Clear all buffers this producer queue is tracking.
391 for (size_t slot = 0; slot < BufferHubQueue::kMaxQueueCapacity; slot++) {
392 if (buffers_[slot] != nullptr) {
393 auto status = RemoveBuffer(slot);
394 if (!status) {
395 ALOGE(
396 "ProducerQueue::FreeAllBuffers: Failed to remove buffer at "
397 "slot=%zu.",
398 slot);
399 last_error = status.error_status();
400 }
401 }
402 }
403
404 return last_error;
405 }
406
ProducerQueue(LocalChannelHandle handle)407 ProducerQueue::ProducerQueue(LocalChannelHandle handle)
408 : BASE(std::move(handle)) {
409 auto status = ImportQueue();
410 if (!status) {
411 ALOGE("ProducerQueue::ProducerQueue: Failed to import queue: %s",
412 status.GetErrorMessage().c_str());
413 Close(-status.error());
414 }
415 }
416
ProducerQueue(const ProducerQueueConfig & config,const UsagePolicy & usage)417 ProducerQueue::ProducerQueue(const ProducerQueueConfig& config,
418 const UsagePolicy& usage)
419 : BASE(BufferHubRPC::kClientPath) {
420 auto status =
421 InvokeRemoteMethod<BufferHubRPC::CreateProducerQueue>(config, usage);
422 if (!status) {
423 ALOGE("ProducerQueue::ProducerQueue: Failed to create producer queue: %s",
424 status.GetErrorMessage().c_str());
425 Close(-status.error());
426 return;
427 }
428
429 SetupQueue(status.get());
430 }
431
AllocateBuffers(uint32_t width,uint32_t height,uint32_t layer_count,uint32_t format,uint64_t usage,size_t buffer_count)432 Status<std::vector<size_t>> ProducerQueue::AllocateBuffers(
433 uint32_t width, uint32_t height, uint32_t layer_count, uint32_t format,
434 uint64_t usage, size_t buffer_count) {
435 if (capacity() + buffer_count > kMaxQueueCapacity) {
436 ALOGE(
437 "ProducerQueue::AllocateBuffers: queue is at capacity: %zu, cannot "
438 "allocate %zu more buffer(s).",
439 capacity(), buffer_count);
440 return ErrorStatus(E2BIG);
441 }
442
443 Status<std::vector<std::pair<LocalChannelHandle, size_t>>> status =
444 InvokeRemoteMethod<BufferHubRPC::ProducerQueueAllocateBuffers>(
445 width, height, layer_count, format, usage, buffer_count);
446 if (!status) {
447 ALOGE("ProducerQueue::AllocateBuffers: failed to allocate buffers: %s",
448 status.GetErrorMessage().c_str());
449 return status.error_status();
450 }
451
452 auto buffer_handle_slots = status.take();
453 LOG_ALWAYS_FATAL_IF(buffer_handle_slots.size() != buffer_count,
454 "BufferHubRPC::ProducerQueueAllocateBuffers should "
455 "return %zu buffer handle(s), but returned %zu instead.",
456 buffer_count, buffer_handle_slots.size());
457
458 std::vector<size_t> buffer_slots;
459 buffer_slots.reserve(buffer_count);
460
461 // Bookkeeping for each buffer.
462 for (auto& hs : buffer_handle_slots) {
463 auto& buffer_handle = hs.first;
464 size_t buffer_slot = hs.second;
465
466 // Note that import might (though very unlikely) fail. If so, buffer_handle
467 // will be closed and included in returned buffer_slots.
468 if (AddBuffer(BufferProducer::Import(std::move(buffer_handle)),
469 buffer_slot)) {
470 ALOGD_IF(TRACE, "ProducerQueue::AllocateBuffers: new buffer at slot: %zu",
471 buffer_slot);
472 buffer_slots.push_back(buffer_slot);
473 }
474 }
475
476 if (buffer_slots.size() == 0) {
477 // Error out if no buffer is allocated and improted.
478 ALOGE_IF(TRACE, "ProducerQueue::AllocateBuffers: no buffer allocated.");
479 ErrorStatus(ENOMEM);
480 }
481
482 return {std::move(buffer_slots)};
483 }
484
AllocateBuffer(uint32_t width,uint32_t height,uint32_t layer_count,uint32_t format,uint64_t usage)485 Status<size_t> ProducerQueue::AllocateBuffer(uint32_t width, uint32_t height,
486 uint32_t layer_count,
487 uint32_t format, uint64_t usage) {
488 // We only allocate one buffer at a time.
489 constexpr size_t buffer_count = 1;
490 auto status =
491 AllocateBuffers(width, height, layer_count, format, usage, buffer_count);
492 if (!status) {
493 ALOGE("ProducerQueue::AllocateBuffer: Failed to allocate buffer: %s",
494 status.GetErrorMessage().c_str());
495 return status.error_status();
496 }
497
498 if (status.get().size() == 0) {
499 ALOGE_IF(TRACE, "ProducerQueue::AllocateBuffer: no buffer allocated.");
500 ErrorStatus(ENOMEM);
501 }
502
503 return {status.get()[0]};
504 }
505
AddBuffer(const std::shared_ptr<BufferProducer> & buffer,size_t slot)506 Status<void> ProducerQueue::AddBuffer(
507 const std::shared_ptr<BufferProducer>& buffer, size_t slot) {
508 ALOGD_IF(TRACE, "ProducerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu",
509 id(), buffer->id(), slot);
510 // For producer buffer, we need to enqueue the newly added buffer
511 // immediately. Producer queue starts with all buffers in available state.
512 auto status = BufferHubQueue::AddBuffer(buffer, slot);
513 if (!status)
514 return status;
515
516 return BufferHubQueue::Enqueue({buffer, slot, 0ULL});
517 }
518
RemoveBuffer(size_t slot)519 Status<void> ProducerQueue::RemoveBuffer(size_t slot) {
520 auto status =
521 InvokeRemoteMethod<BufferHubRPC::ProducerQueueRemoveBuffer>(slot);
522 if (!status) {
523 ALOGE("ProducerQueue::RemoveBuffer: Failed to remove producer buffer: %s",
524 status.GetErrorMessage().c_str());
525 return status.error_status();
526 }
527
528 return BufferHubQueue::RemoveBuffer(slot);
529 }
530
Dequeue(int timeout,size_t * slot,LocalHandle * release_fence)531 Status<std::shared_ptr<BufferProducer>> ProducerQueue::Dequeue(
532 int timeout, size_t* slot, LocalHandle* release_fence) {
533 DvrNativeBufferMetadata canonical_meta;
534 return Dequeue(timeout, slot, &canonical_meta, release_fence);
535 }
536
Dequeue(int timeout,size_t * slot,DvrNativeBufferMetadata * out_meta,pdx::LocalHandle * release_fence)537 pdx::Status<std::shared_ptr<BufferProducer>> ProducerQueue::Dequeue(
538 int timeout, size_t* slot, DvrNativeBufferMetadata* out_meta,
539 pdx::LocalHandle* release_fence) {
540 ATRACE_NAME("ProducerQueue::Dequeue");
541 if (slot == nullptr || out_meta == nullptr || release_fence == nullptr) {
542 ALOGE("ProducerQueue::Dequeue: Invalid parameter.");
543 return ErrorStatus(EINVAL);
544 }
545
546 auto status = BufferHubQueue::Dequeue(timeout, slot);
547 if (!status)
548 return status.error_status();
549
550 auto buffer = std::static_pointer_cast<BufferProducer>(status.take());
551 const int ret = buffer->GainAsync(out_meta, release_fence);
552 if (ret < 0 && ret != -EALREADY)
553 return ErrorStatus(-ret);
554
555 return {std::move(buffer)};
556 }
557
ConsumerQueue(LocalChannelHandle handle)558 ConsumerQueue::ConsumerQueue(LocalChannelHandle handle)
559 : BufferHubQueue(std::move(handle)) {
560 auto status = ImportQueue();
561 if (!status) {
562 ALOGE("ConsumerQueue::ConsumerQueue: Failed to import queue: %s",
563 status.GetErrorMessage().c_str());
564 Close(-status.error());
565 }
566
567 auto import_status = ImportBuffers();
568 if (import_status) {
569 ALOGI("ConsumerQueue::ConsumerQueue: Imported %zu buffers.",
570 import_status.get());
571 } else {
572 ALOGE("ConsumerQueue::ConsumerQueue: Failed to import buffers: %s",
573 import_status.GetErrorMessage().c_str());
574 }
575 }
576
ImportBuffers()577 Status<size_t> ConsumerQueue::ImportBuffers() {
578 auto status = InvokeRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>();
579 if (!status) {
580 if (status.error() == EBADR) {
581 ALOGI(
582 "ConsumerQueue::ImportBuffers: Queue is silent, no buffers "
583 "imported.");
584 return {0};
585 } else {
586 ALOGE(
587 "ConsumerQueue::ImportBuffers: Failed to import consumer buffer: %s",
588 status.GetErrorMessage().c_str());
589 return status.error_status();
590 }
591 }
592
593 int ret;
594 Status<void> last_error;
595 size_t imported_buffers_count = 0;
596
597 auto buffer_handle_slots = status.take();
598 for (auto& buffer_handle_slot : buffer_handle_slots) {
599 ALOGD_IF(TRACE, "ConsumerQueue::ImportBuffers: buffer_handle=%d",
600 buffer_handle_slot.first.value());
601
602 std::unique_ptr<BufferConsumer> buffer_consumer =
603 BufferConsumer::Import(std::move(buffer_handle_slot.first));
604 if (!buffer_consumer) {
605 ALOGE("ConsumerQueue::ImportBuffers: Failed to import buffer: slot=%zu",
606 buffer_handle_slot.second);
607 last_error = ErrorStatus(EPIPE);
608 continue;
609 }
610
611 auto add_status =
612 AddBuffer(std::move(buffer_consumer), buffer_handle_slot.second);
613 if (!add_status) {
614 ALOGE("ConsumerQueue::ImportBuffers: Failed to add buffer: %s",
615 add_status.GetErrorMessage().c_str());
616 last_error = add_status;
617 } else {
618 imported_buffers_count++;
619 }
620 }
621
622 if (imported_buffers_count > 0)
623 return {imported_buffers_count};
624 else
625 return last_error.error_status();
626 }
627
AddBuffer(const std::shared_ptr<BufferConsumer> & buffer,size_t slot)628 Status<void> ConsumerQueue::AddBuffer(
629 const std::shared_ptr<BufferConsumer>& buffer, size_t slot) {
630 ALOGD_IF(TRACE, "ConsumerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu",
631 id(), buffer->id(), slot);
632 auto status = BufferHubQueue::AddBuffer(buffer, slot);
633 if (!status)
634 return status;
635
636 // Check to see if the buffer is already signaled. This is necessary to catch
637 // cases where buffers are already available; epoll edge triggered mode does
638 // not fire until an edge transition when adding new buffers to the epoll
639 // set. Note that we only poll the fd events because HandleBufferEvent() takes
640 // care of checking the translated buffer events.
641 auto poll_status = PollEvents(buffer->event_fd(), POLLIN);
642 if (!poll_status && poll_status.error() != ETIMEDOUT) {
643 ALOGE("ConsumerQueue::AddBuffer: Failed to poll consumer buffer: %s",
644 poll_status.GetErrorMessage().c_str());
645 return poll_status.error_status();
646 }
647
648 // Update accounting if the buffer is available.
649 if (poll_status)
650 return HandleBufferEvent(slot, buffer->event_fd(), poll_status.get());
651 else
652 return {};
653 }
654
Dequeue(int timeout,size_t * slot,void * meta,size_t user_metadata_size,LocalHandle * acquire_fence)655 Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue(
656 int timeout, size_t* slot, void* meta, size_t user_metadata_size,
657 LocalHandle* acquire_fence) {
658 if (user_metadata_size != user_metadata_size_) {
659 ALOGE(
660 "ConsumerQueue::Dequeue: Metadata size (%zu) for the dequeuing buffer "
661 "does not match metadata size (%zu) for the queue.",
662 user_metadata_size, user_metadata_size_);
663 return ErrorStatus(EINVAL);
664 }
665
666 DvrNativeBufferMetadata canonical_meta;
667 auto status = Dequeue(timeout, slot, &canonical_meta, acquire_fence);
668 if (!status)
669 return status.error_status();
670
671 if (meta && user_metadata_size) {
672 void* metadata_src =
673 reinterpret_cast<void*>(canonical_meta.user_metadata_ptr);
674 if (metadata_src) {
675 memcpy(meta, metadata_src, user_metadata_size);
676 } else {
677 ALOGW("ConsumerQueue::Dequeue: no user-defined metadata.");
678 }
679 }
680
681 return status;
682 }
683
Dequeue(int timeout,size_t * slot,DvrNativeBufferMetadata * out_meta,pdx::LocalHandle * acquire_fence)684 Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue(
685 int timeout, size_t* slot, DvrNativeBufferMetadata* out_meta,
686 pdx::LocalHandle* acquire_fence) {
687 ATRACE_NAME("ConsumerQueue::Dequeue");
688 if (slot == nullptr || out_meta == nullptr || acquire_fence == nullptr) {
689 ALOGE("ConsumerQueue::Dequeue: Invalid parameter.");
690 return ErrorStatus(EINVAL);
691 }
692
693 auto status = BufferHubQueue::Dequeue(timeout, slot);
694 if (!status)
695 return status.error_status();
696
697 auto buffer = std::static_pointer_cast<BufferConsumer>(status.take());
698 const int ret = buffer->AcquireAsync(out_meta, acquire_fence);
699 if (ret < 0)
700 return ErrorStatus(-ret);
701
702 return {std::move(buffer)};
703 }
704
OnBufferAllocated()705 Status<void> ConsumerQueue::OnBufferAllocated() {
706 ALOGD_IF(TRACE, "ConsumerQueue::OnBufferAllocated: queue_id=%d", id());
707
708 auto status = ImportBuffers();
709 if (!status) {
710 ALOGE("ConsumerQueue::OnBufferAllocated: Failed to import buffers: %s",
711 status.GetErrorMessage().c_str());
712 return ErrorStatus(status.error());
713 } else if (status.get() == 0) {
714 ALOGW("ConsumerQueue::OnBufferAllocated: No new buffers allocated!");
715 return ErrorStatus(ENOBUFS);
716 } else {
717 ALOGD_IF(TRACE,
718 "ConsumerQueue::OnBufferAllocated: Imported %zu consumer buffers.",
719 status.get());
720 return {};
721 }
722 }
723
724 } // namespace dvr
725 } // namespace android
726