#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
// Enable/disable debug logging.
#define TRACE 0
namespace android {
namespace dvr {
using pdx::LocalChannelHandle;
using pdx::LocalHandle;
namespace {
constexpr uint32_t kBufferWidth = 100;
constexpr uint32_t kBufferHeight = 1;
constexpr uint32_t kBufferLayerCount = 1;
constexpr uint32_t kBufferFormat = HAL_PIXEL_FORMAT_BLOB;
constexpr uint64_t kBufferUsage = GRALLOC_USAGE_SW_READ_RARELY;
constexpr int kTimeoutMs = 100;
constexpr int kNoTimeout = 0;
class BufferHubQueueTest : public ::testing::Test {
public:
bool CreateProducerQueue(const ProducerQueueConfig& config,
const UsagePolicy& usage) {
producer_queue_ = ProducerQueue::Create(config, usage);
return producer_queue_ != nullptr;
}
bool CreateConsumerQueue() {
if (producer_queue_) {
consumer_queue_ = producer_queue_->CreateConsumerQueue();
return consumer_queue_ != nullptr;
} else {
return false;
}
}
bool CreateQueues(const ProducerQueueConfig& config,
const UsagePolicy& usage) {
return CreateProducerQueue(config, usage) && CreateConsumerQueue();
}
void AllocateBuffer(size_t* slot_out = nullptr) {
// Create producer buffer.
auto status = producer_queue_->AllocateBuffer(kBufferWidth, kBufferHeight,
kBufferLayerCount,
kBufferFormat, kBufferUsage);
ASSERT_TRUE(status.ok());
size_t slot = status.take();
if (slot_out)
*slot_out = slot;
}
bool WaitAndHandleOnce(BufferHubQueue* queue, int timeout_ms) {
pollfd pfd{queue->queue_fd(), POLLIN, 0};
int ret;
do {
ret = poll(&pfd, 1, timeout_ms);
} while (ret == -1 && errno == EINTR);
if (ret < 0) {
ALOGW("Failed to poll queue %d's event fd, error: %s.", queue->id(),
strerror(errno));
return false;
} else if (ret == 0) {
return false;
}
return queue->HandleQueueEvents();
}
protected:
ProducerQueueConfigBuilder config_builder_;
std::unique_ptr producer_queue_;
std::unique_ptr consumer_queue_;
};
TEST_F(BufferHubQueueTest, TestDequeue) {
const int64_t nb_dequeue_times = 16;
ASSERT_TRUE(CreateQueues(config_builder_.Build(), UsagePolicy{}));
// Allocate only one buffer.
AllocateBuffer();
// But dequeue multiple times.
for (int64_t i = 0; i < nb_dequeue_times; i++) {
size_t slot;
LocalHandle fence;
DvrNativeBufferMetadata mi, mo;
// Producer gains a buffer.
auto p1_status = producer_queue_->Dequeue(kTimeoutMs, &slot, &mo, &fence);
EXPECT_TRUE(p1_status.ok());
auto p1 = p1_status.take();
ASSERT_NE(p1, nullptr);
// Producer posts the buffer.
mi.index = i;
EXPECT_EQ(p1->PostAsync(&mi, LocalHandle()), 0);
// Consumer acquires a buffer.
auto c1_status = consumer_queue_->Dequeue(kTimeoutMs, &slot, &mo, &fence);
EXPECT_TRUE(c1_status.ok()) << c1_status.GetErrorMessage();
auto c1 = c1_status.take();
ASSERT_NE(c1, nullptr);
EXPECT_EQ(mi.index, i);
EXPECT_EQ(mo.index, i);
// Consumer releases the buffer.
EXPECT_EQ(c1->ReleaseAsync(&mi, LocalHandle()), 0);
}
}
TEST_F(BufferHubQueueTest,
TestDequeuePostedBufferIfNoAvailableReleasedBuffer_withConsumerBuffer) {
ASSERT_TRUE(CreateQueues(config_builder_.Build(), UsagePolicy{}));
// Allocate 3 buffers to use.
const size_t test_queue_capacity = 3;
for (int64_t i = 0; i < test_queue_capacity; i++) {
AllocateBuffer();
}
EXPECT_EQ(producer_queue_->capacity(), test_queue_capacity);
size_t producer_slot, consumer_slot;
LocalHandle fence;
DvrNativeBufferMetadata mi, mo;
// Producer posts 2 buffers and remember their posted sequence.
std::deque posted_slots;
for (int64_t i = 0; i < 2; i++) {
auto p1_status =
producer_queue_->Dequeue(kTimeoutMs, &producer_slot, &mo, &fence, true);
EXPECT_TRUE(p1_status.ok());
auto p1 = p1_status.take();
ASSERT_NE(p1, nullptr);
// Producer should not be gaining posted buffer when there are still
// available buffers to gain.
auto found_iter =
std::find(posted_slots.begin(), posted_slots.end(), producer_slot);
EXPECT_EQ(found_iter, posted_slots.end());
posted_slots.push_back(producer_slot);
// Producer posts the buffer.
mi.index = i;
EXPECT_EQ(0, p1->PostAsync(&mi, LocalHandle()));
}
// Consumer acquires one buffer.
auto c1_status =
consumer_queue_->Dequeue(kTimeoutMs, &consumer_slot, &mo, &fence);
EXPECT_TRUE(c1_status.ok());
auto c1 = c1_status.take();
ASSERT_NE(c1, nullptr);
// Consumer should get the oldest posted buffer. No checks here.
// posted_slots[0] should be in acquired state now.
EXPECT_EQ(mo.index, 0);
// Consumer releases the buffer.
EXPECT_EQ(c1->ReleaseAsync(&mi, LocalHandle()), 0);
// posted_slots[0] should be in released state now.
// Producer gain and post 2 buffers.
for (int64_t i = 0; i < 2; i++) {
auto p1_status =
producer_queue_->Dequeue(kTimeoutMs, &producer_slot, &mo, &fence, true);
EXPECT_TRUE(p1_status.ok());
auto p1 = p1_status.take();
ASSERT_NE(p1, nullptr);
// The gained buffer should be the one in released state or the one haven't
// been use.
EXPECT_NE(posted_slots[1], producer_slot);
mi.index = i + 2;
EXPECT_EQ(0, p1->PostAsync(&mi, LocalHandle()));
}
// Producer gains a buffer.
auto p1_status =
producer_queue_->Dequeue(kTimeoutMs, &producer_slot, &mo, &fence, true);
EXPECT_TRUE(p1_status.ok());
auto p1 = p1_status.take();
ASSERT_NE(p1, nullptr);
// The gained buffer should be the oldest posted buffer.
EXPECT_EQ(posted_slots[1], producer_slot);
// Producer posts the buffer.
mi.index = 4;
EXPECT_EQ(0, p1->PostAsync(&mi, LocalHandle()));
}
TEST_F(BufferHubQueueTest,
TestDequeuePostedBufferIfNoAvailableReleasedBuffer_noConsumerBuffer) {
ASSERT_TRUE(CreateQueues(config_builder_.Build(), UsagePolicy{}));
// Allocate 4 buffers to use.
const size_t test_queue_capacity = 4;
for (int64_t i = 0; i < test_queue_capacity; i++) {
AllocateBuffer();
}
EXPECT_EQ(producer_queue_->capacity(), test_queue_capacity);
// Post all allowed buffers and remember their posted sequence.
std::deque posted_slots;
for (int64_t i = 0; i < test_queue_capacity; i++) {
size_t slot;
LocalHandle fence;
DvrNativeBufferMetadata mi, mo;
// Producer gains a buffer.
auto p1_status =
producer_queue_->Dequeue(kTimeoutMs, &slot, &mo, &fence, true);
EXPECT_TRUE(p1_status.ok());
auto p1 = p1_status.take();
ASSERT_NE(p1, nullptr);
// Producer should not be gaining posted buffer when there are still
// available buffers to gain.
auto found_iter = std::find(posted_slots.begin(), posted_slots.end(), slot);
EXPECT_EQ(found_iter, posted_slots.end());
posted_slots.push_back(slot);
// Producer posts the buffer.
mi.index = i;
EXPECT_EQ(p1->PostAsync(&mi, LocalHandle()), 0);
}
// Gain posted buffers in sequence.
const int64_t nb_dequeue_all_times = 2;
for (int j = 0; j < nb_dequeue_all_times; ++j) {
for (int i = 0; i < test_queue_capacity; ++i) {
size_t slot;
LocalHandle fence;
DvrNativeBufferMetadata mi, mo;
// Producer gains a buffer.
auto p1_status =
producer_queue_->Dequeue(kTimeoutMs, &slot, &mo, &fence, true);
EXPECT_TRUE(p1_status.ok());
auto p1 = p1_status.take();
ASSERT_NE(p1, nullptr);
// The gained buffer should be the oldest posted buffer.
EXPECT_EQ(posted_slots[i], slot);
// Producer posts the buffer.
mi.index = i + test_queue_capacity * (j + 1);
EXPECT_EQ(p1->PostAsync(&mi, LocalHandle()), 0);
}
}
}
TEST_F(BufferHubQueueTest, TestProducerConsumer) {
const size_t kBufferCount = 16;
size_t slot;
DvrNativeBufferMetadata mi, mo;
LocalHandle fence;
ASSERT_TRUE(CreateQueues(config_builder_.Build(), UsagePolicy{}));
for (size_t i = 0; i < kBufferCount; i++) {
AllocateBuffer();
// Producer queue has all the available buffers on initialize.
ASSERT_EQ(producer_queue_->count(), i + 1);
ASSERT_EQ(producer_queue_->capacity(), i + 1);
// Consumer queue has no avaiable buffer on initialize.
ASSERT_EQ(consumer_queue_->count(), 0U);
// Consumer queue does not import buffers until a dequeue is issued.
ASSERT_EQ(consumer_queue_->capacity(), i);
// Dequeue returns timeout since no buffer is ready to consumer, but
// this implicitly triggers buffer import and bump up |capacity|.
auto status = consumer_queue_->Dequeue(kNoTimeout, &slot, &mo, &fence);
ASSERT_FALSE(status.ok());
ASSERT_EQ(ETIMEDOUT, status.error());
ASSERT_EQ(consumer_queue_->capacity(), i + 1);
}
// Use eventfd as a stand-in for a fence.
LocalHandle post_fence(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK));
for (size_t i = 0; i < kBufferCount; i++) {
// First time there is no buffer available to dequeue.
auto consumer_status =
consumer_queue_->Dequeue(kNoTimeout, &slot, &mo, &fence);
ASSERT_FALSE(consumer_status.ok());
ASSERT_EQ(consumer_status.error(), ETIMEDOUT);
// Make sure Producer buffer is POSTED so that it's ready to Accquire
// in the consumer's Dequeue() function.
auto producer_status =
producer_queue_->Dequeue(kTimeoutMs, &slot, &mo, &fence);
ASSERT_TRUE(producer_status.ok());
auto producer = producer_status.take();
ASSERT_NE(nullptr, producer);
mi.index = static_cast(i);
ASSERT_EQ(producer->PostAsync(&mi, post_fence), 0);
// Second time the just the POSTED buffer should be dequeued.
consumer_status = consumer_queue_->Dequeue(kTimeoutMs, &slot, &mo, &fence);
ASSERT_TRUE(consumer_status.ok());
EXPECT_TRUE(fence.IsValid());
auto consumer = consumer_status.take();
ASSERT_NE(nullptr, consumer);
ASSERT_EQ(mi.index, mo.index);
}
}
TEST_F(BufferHubQueueTest, TestInsertBuffer) {
ASSERT_TRUE(CreateProducerQueue(config_builder_.Build(), UsagePolicy{}));
consumer_queue_ = producer_queue_->CreateConsumerQueue();
ASSERT_TRUE(consumer_queue_ != nullptr);
EXPECT_EQ(producer_queue_->capacity(), 0);
EXPECT_EQ(consumer_queue_->capacity(), 0);
std::shared_ptr p1 = ProducerBuffer::Create(
kBufferWidth, kBufferHeight, kBufferFormat, kBufferUsage, 0);
ASSERT_TRUE(p1 != nullptr);
ASSERT_EQ(p1->GainAsync(), 0);
// Inserting a posted buffer will fail.
DvrNativeBufferMetadata meta;
EXPECT_EQ(p1->PostAsync(&meta, LocalHandle()), 0);
auto status_or_slot = producer_queue_->InsertBuffer(p1);
EXPECT_FALSE(status_or_slot.ok());
EXPECT_EQ(status_or_slot.error(), EINVAL);
// Inserting a gained buffer will succeed.
std::shared_ptr p2 = ProducerBuffer::Create(
kBufferWidth, kBufferHeight, kBufferFormat, kBufferUsage);
ASSERT_EQ(p2->GainAsync(), 0);
ASSERT_TRUE(p2 != nullptr);
status_or_slot = producer_queue_->InsertBuffer(p2);
EXPECT_TRUE(status_or_slot.ok()) << status_or_slot.GetErrorMessage();
// This is the first buffer inserted, should take slot 0.
size_t slot = status_or_slot.get();
EXPECT_EQ(slot, 0);
// Wait and expect the consumer to kick up the newly inserted buffer.
WaitAndHandleOnce(consumer_queue_.get(), kTimeoutMs);
EXPECT_EQ(consumer_queue_->capacity(), 1ULL);
}
TEST_F(BufferHubQueueTest, TestRemoveBuffer) {
ASSERT_TRUE(CreateProducerQueue(config_builder_.Build(), UsagePolicy{}));
DvrNativeBufferMetadata mo;
// Allocate buffers.
const size_t kBufferCount = 4u;
for (size_t i = 0; i < kBufferCount; i++) {
AllocateBuffer();
}
ASSERT_EQ(kBufferCount, producer_queue_->count());
ASSERT_EQ(kBufferCount, producer_queue_->capacity());
consumer_queue_ = producer_queue_->CreateConsumerQueue();
ASSERT_NE(nullptr, consumer_queue_);
// Check that buffers are correctly imported on construction.
EXPECT_EQ(kBufferCount, consumer_queue_->capacity());
EXPECT_EQ(0u, consumer_queue_->count());
// Dequeue all the buffers and keep track of them in an array. This prevents
// the producer queue ring buffer ref counts from interfering with the tests.
struct Entry {
std::shared_ptr buffer;
LocalHandle fence;
size_t slot;
};
std::array buffers;
for (size_t i = 0; i < kBufferCount; i++) {
Entry* entry = &buffers[i];
auto producer_status =
producer_queue_->Dequeue(kTimeoutMs, &entry->slot, &mo, &entry->fence);
ASSERT_TRUE(producer_status.ok());
entry->buffer = producer_status.take();
ASSERT_NE(nullptr, entry->buffer);
}
// Remove a buffer and make sure both queues reflect the change.
ASSERT_TRUE(producer_queue_->RemoveBuffer(buffers[0].slot));
EXPECT_EQ(kBufferCount - 1, producer_queue_->capacity());
// As long as the removed buffer is still alive the consumer queue won't know
// its gone.
EXPECT_EQ(kBufferCount, consumer_queue_->capacity());
EXPECT_FALSE(consumer_queue_->HandleQueueEvents());
EXPECT_EQ(kBufferCount, consumer_queue_->capacity());
// Release the removed buffer.
buffers[0].buffer = nullptr;
// Now the consumer queue should know it's gone.
EXPECT_FALSE(WaitAndHandleOnce(consumer_queue_.get(), kTimeoutMs));
ASSERT_EQ(kBufferCount - 1, consumer_queue_->capacity());
// Allocate a new buffer. This should take the first empty slot.
size_t slot;
AllocateBuffer(&slot);
ALOGE_IF(TRACE, "ALLOCATE %zu", slot);
EXPECT_EQ(buffers[0].slot, slot);
EXPECT_EQ(kBufferCount, producer_queue_->capacity());
// The consumer queue should pick up the new buffer.
EXPECT_EQ(kBufferCount - 1, consumer_queue_->capacity());
EXPECT_FALSE(consumer_queue_->HandleQueueEvents());
EXPECT_EQ(kBufferCount, consumer_queue_->capacity());
// Remove and allocate a buffer.
ASSERT_TRUE(producer_queue_->RemoveBuffer(buffers[1].slot));
EXPECT_EQ(kBufferCount - 1, producer_queue_->capacity());
buffers[1].buffer = nullptr;
AllocateBuffer(&slot);
ALOGE_IF(TRACE, "ALLOCATE %zu", slot);
EXPECT_EQ(buffers[1].slot, slot);
EXPECT_EQ(kBufferCount, producer_queue_->capacity());
// The consumer queue should pick up the new buffer but the count shouldn't
// change.
EXPECT_EQ(kBufferCount, consumer_queue_->capacity());
EXPECT_FALSE(consumer_queue_->HandleQueueEvents());
EXPECT_EQ(kBufferCount, consumer_queue_->capacity());
// Remove and allocate a buffer, but don't free the buffer right away.
ASSERT_TRUE(producer_queue_->RemoveBuffer(buffers[2].slot));
EXPECT_EQ(kBufferCount - 1, producer_queue_->capacity());
AllocateBuffer(&slot);
ALOGE_IF(TRACE, "ALLOCATE %zu", slot);
EXPECT_EQ(buffers[2].slot, slot);
EXPECT_EQ(kBufferCount, producer_queue_->capacity());
EXPECT_EQ(kBufferCount, consumer_queue_->capacity());
EXPECT_FALSE(consumer_queue_->HandleQueueEvents());
EXPECT_EQ(kBufferCount, consumer_queue_->capacity());
// Release the producer buffer to trigger a POLLHUP event for an already
// removed buffer.
buffers[2].buffer = nullptr;
EXPECT_EQ(kBufferCount, consumer_queue_->capacity());
EXPECT_FALSE(consumer_queue_->HandleQueueEvents());
EXPECT_EQ(kBufferCount, consumer_queue_->capacity());
}
TEST_F(BufferHubQueueTest, TestMultipleConsumers) {
// ProducerConfigureBuilder doesn't set Metadata{size}, which means there
// is no metadata associated with this BufferQueue's buffer.
ASSERT_TRUE(CreateProducerQueue(config_builder_.Build(), UsagePolicy{}));
// Allocate buffers.
const size_t kBufferCount = 4u;
for (size_t i = 0; i < kBufferCount; i++) {
AllocateBuffer();
}
ASSERT_EQ(kBufferCount, producer_queue_->count());
// Build a silent consumer queue to test multi-consumer queue features.
auto silent_queue = producer_queue_->CreateSilentConsumerQueue();
ASSERT_NE(nullptr, silent_queue);
// Check that silent queue doesn't import buffers on creation.
EXPECT_EQ(silent_queue->capacity(), 0U);
// Dequeue and post a buffer.
size_t slot;
LocalHandle fence;
DvrNativeBufferMetadata mi, mo;
auto producer_status =
producer_queue_->Dequeue(kTimeoutMs, &slot, &mo, &fence);
EXPECT_TRUE(producer_status.ok());
auto producer_buffer = producer_status.take();
ASSERT_NE(producer_buffer, nullptr);
EXPECT_EQ(producer_buffer->PostAsync(&mi, {}), 0);
// After post, check the number of remaining available buffers.
EXPECT_EQ(producer_queue_->count(), kBufferCount - 1);
// Currently we expect no buffer to be available prior to calling
// WaitForBuffers/HandleQueueEvents.
// TODO(eieio): Note this behavior may change in the future.
EXPECT_EQ(silent_queue->count(), 0U);
EXPECT_FALSE(silent_queue->HandleQueueEvents());
EXPECT_EQ(silent_queue->count(), 0U);
// Build a new consumer queue to test multi-consumer queue features.
consumer_queue_ = silent_queue->CreateConsumerQueue();
ASSERT_NE(consumer_queue_, nullptr);
// Check that buffers are correctly imported on construction.
EXPECT_EQ(consumer_queue_->capacity(), kBufferCount);
// Buffers are only imported, but their availability is not checked until
// first call to Dequeue().
EXPECT_EQ(consumer_queue_->count(), 0U);
// Reclaim released/ignored buffers.
EXPECT_EQ(producer_queue_->count(), kBufferCount - 1);
usleep(10000);
WaitAndHandleOnce(producer_queue_.get(), kTimeoutMs);
EXPECT_EQ(producer_queue_->count(), kBufferCount - 1);
// Post another buffer.
producer_status = producer_queue_->Dequeue(kTimeoutMs, &slot, &mo, &fence);
EXPECT_TRUE(producer_status.ok());
producer_buffer = producer_status.take();
ASSERT_NE(producer_buffer, nullptr);
EXPECT_EQ(producer_buffer->PostAsync(&mi, {}), 0);
// Verify that the consumer queue receives it.
size_t consumer_queue_count = consumer_queue_->count();
WaitAndHandleOnce(consumer_queue_.get(), kTimeoutMs);
EXPECT_GT(consumer_queue_->count(), consumer_queue_count);
// Save the current consumer queue buffer count to compare after the dequeue.
consumer_queue_count = consumer_queue_->count();
// Dequeue and acquire/release (discard) buffers on the consumer end.
auto consumer_status =
consumer_queue_->Dequeue(kTimeoutMs, &slot, &mo, &fence);
EXPECT_TRUE(consumer_status.ok());
auto consumer_buffer = consumer_status.take();
ASSERT_NE(consumer_buffer, nullptr);
consumer_buffer->Discard();
// Buffer should be returned to the producer queue without being handled by
// the silent consumer queue.
EXPECT_LT(consumer_queue_->count(), consumer_queue_count);
EXPECT_EQ(producer_queue_->count(), kBufferCount - 2);
WaitAndHandleOnce(producer_queue_.get(), kTimeoutMs);
EXPECT_EQ(producer_queue_->count(), kBufferCount - 1);
}
struct TestUserMetadata {
char a;
int32_t b;
int64_t c;
};
constexpr uint64_t kUserMetadataSize =
static_cast(sizeof(TestUserMetadata));
TEST_F(BufferHubQueueTest, TestUserMetadata) {
ASSERT_TRUE(CreateQueues(
config_builder_.SetMetadata().Build(), UsagePolicy{}));
AllocateBuffer();
std::vector user_metadata_list = {
{'0', 0, 0}, {'1', 10, 3333}, {'@', 123, 1000000000}};
for (auto user_metadata : user_metadata_list) {
size_t slot;
LocalHandle fence;
DvrNativeBufferMetadata mi, mo;
auto p1_status = producer_queue_->Dequeue(kTimeoutMs, &slot, &mo, &fence);
EXPECT_TRUE(p1_status.ok());
auto p1 = p1_status.take();
ASSERT_NE(p1, nullptr);
// TODO(b/69469185): Test against metadata from consumer once we implement
// release metadata properly.
// EXPECT_EQ(mo.user_metadata_ptr, 0U);
// EXPECT_EQ(mo.user_metadata_size, 0U);
mi.user_metadata_size = kUserMetadataSize;
mi.user_metadata_ptr = reinterpret_cast(&user_metadata);
EXPECT_EQ(p1->PostAsync(&mi, {}), 0);
auto c1_status = consumer_queue_->Dequeue(kTimeoutMs, &slot, &mo, &fence);
EXPECT_TRUE(c1_status.ok()) << c1_status.GetErrorMessage();
auto c1 = c1_status.take();
ASSERT_NE(c1, nullptr);
EXPECT_EQ(mo.user_metadata_size, kUserMetadataSize);
auto out_user_metadata =
reinterpret_cast(mo.user_metadata_ptr);
EXPECT_EQ(user_metadata.a, out_user_metadata->a);
EXPECT_EQ(user_metadata.b, out_user_metadata->b);
EXPECT_EQ(user_metadata.c, out_user_metadata->c);
// When release, empty metadata is also legit.
mi.user_metadata_size = 0U;
mi.user_metadata_ptr = 0U;
c1->ReleaseAsync(&mi, {});
}
}
TEST_F(BufferHubQueueTest, TestUserMetadataMismatch) {
ASSERT_TRUE(CreateQueues(
config_builder_.SetMetadata().Build(), UsagePolicy{}));
AllocateBuffer();
TestUserMetadata user_metadata;
size_t slot;
LocalHandle fence;
DvrNativeBufferMetadata mi, mo;
auto p1_status = producer_queue_->Dequeue(kTimeoutMs, &slot, &mo, &fence);
EXPECT_TRUE(p1_status.ok());
auto p1 = p1_status.take();
ASSERT_NE(p1, nullptr);
// Post with mismatched user metadata size will fail. But the producer buffer
// itself should stay untouched.
mi.user_metadata_ptr = reinterpret_cast(&user_metadata);
mi.user_metadata_size = kUserMetadataSize + 1;
EXPECT_EQ(p1->PostAsync(&mi, {}), -E2BIG);
// Post with the exact same user metdata size can success.
mi.user_metadata_ptr = reinterpret_cast(&user_metadata);
mi.user_metadata_size = kUserMetadataSize;
EXPECT_EQ(p1->PostAsync(&mi, {}), 0);
}
TEST_F(BufferHubQueueTest, TestEnqueue) {
ASSERT_TRUE(CreateQueues(config_builder_.SetMetadata().Build(),
UsagePolicy{}));
AllocateBuffer();
size_t slot;
LocalHandle fence;
DvrNativeBufferMetadata mo;
auto p1_status = producer_queue_->Dequeue(kTimeoutMs, &slot, &mo, &fence);
ASSERT_TRUE(p1_status.ok());
auto p1 = p1_status.take();
ASSERT_NE(nullptr, p1);
producer_queue_->Enqueue(p1, slot, 0ULL);
auto c1_status = consumer_queue_->Dequeue(kTimeoutMs, &slot, &mo, &fence);
ASSERT_FALSE(c1_status.ok());
}
TEST_F(BufferHubQueueTest, TestAllocateBuffer) {
ASSERT_TRUE(CreateQueues(config_builder_.Build(), UsagePolicy{}));
size_t ps1;
AllocateBuffer();
LocalHandle fence;
DvrNativeBufferMetadata mi, mo;
auto p1_status = producer_queue_->Dequeue(kTimeoutMs, &ps1, &mo, &fence);
ASSERT_TRUE(p1_status.ok());
auto p1 = p1_status.take();
ASSERT_NE(p1, nullptr);
// producer queue is exhausted
size_t ps2;
auto p2_status = producer_queue_->Dequeue(kTimeoutMs, &ps2, &mo, &fence);
ASSERT_FALSE(p2_status.ok());
ASSERT_EQ(ETIMEDOUT, p2_status.error());
// dynamically add buffer.
AllocateBuffer();
ASSERT_EQ(producer_queue_->count(), 1U);
ASSERT_EQ(producer_queue_->capacity(), 2U);
// now we can dequeue again
p2_status = producer_queue_->Dequeue(kTimeoutMs, &ps2, &mo, &fence);
ASSERT_TRUE(p2_status.ok());
auto p2 = p2_status.take();
ASSERT_NE(p2, nullptr);
ASSERT_EQ(producer_queue_->count(), 0U);
// p1 and p2 should have different slot number
ASSERT_NE(ps1, ps2);
// Consumer queue does not import buffers until |Dequeue| or |ImportBuffers|
// are called. So far consumer_queue_ should be empty.
ASSERT_EQ(consumer_queue_->count(), 0U);
int64_t seq = 1;
mi.index = seq;
ASSERT_EQ(p1->PostAsync(&mi, {}), 0);
size_t cs1, cs2;
auto c1_status = consumer_queue_->Dequeue(kTimeoutMs, &cs1, &mo, &fence);
ASSERT_TRUE(c1_status.ok()) << c1_status.GetErrorMessage();
auto c1 = c1_status.take();
ASSERT_NE(c1, nullptr);
ASSERT_EQ(consumer_queue_->count(), 0U);
ASSERT_EQ(consumer_queue_->capacity(), 2U);
ASSERT_EQ(cs1, ps1);
ASSERT_EQ(p2->PostAsync(&mi, {}), 0);
auto c2_status = consumer_queue_->Dequeue(kTimeoutMs, &cs2, &mo, &fence);
ASSERT_TRUE(c2_status.ok());
auto c2 = c2_status.take();
ASSERT_NE(c2, nullptr);
ASSERT_EQ(cs2, ps2);
}
TEST_F(BufferHubQueueTest, TestAllocateTwoBuffers) {
ASSERT_TRUE(CreateQueues(config_builder_.Build(), UsagePolicy{}));
ASSERT_EQ(producer_queue_->capacity(), 0);
auto status = producer_queue_->AllocateBuffers(
kBufferWidth, kBufferHeight, kBufferLayerCount, kBufferFormat,
kBufferUsage, /*buffer_count=*/2);
ASSERT_TRUE(status.ok());
std::vector buffer_slots = status.take();
ASSERT_EQ(buffer_slots.size(), 2);
ASSERT_EQ(producer_queue_->capacity(), 2);
}
TEST_F(BufferHubQueueTest, TestAllocateZeroBuffers) {
ASSERT_TRUE(CreateQueues(config_builder_.Build(), UsagePolicy{}));
ASSERT_EQ(producer_queue_->capacity(), 0);
auto status = producer_queue_->AllocateBuffers(
kBufferWidth, kBufferHeight, kBufferLayerCount, kBufferFormat,
kBufferUsage, /*buffer_count=*/0);
ASSERT_TRUE(status.ok());
std::vector buffer_slots = status.take();
ASSERT_EQ(buffer_slots.size(), 0);
ASSERT_EQ(producer_queue_->capacity(), 0);
}
TEST_F(BufferHubQueueTest, TestUsageSetMask) {
const uint32_t set_mask = GRALLOC_USAGE_SW_WRITE_OFTEN;
ASSERT_TRUE(
CreateQueues(config_builder_.Build(), UsagePolicy{set_mask, 0, 0, 0}));
// When allocation, leave out |set_mask| from usage bits on purpose.
auto status = producer_queue_->AllocateBuffer(
kBufferWidth, kBufferHeight, kBufferLayerCount, kBufferFormat,
kBufferUsage & ~set_mask);
ASSERT_TRUE(status.ok());
LocalHandle fence;
size_t slot;
DvrNativeBufferMetadata mo;
auto p1_status = producer_queue_->Dequeue(kTimeoutMs, &slot, &mo, &fence);
ASSERT_TRUE(p1_status.ok());
auto p1 = p1_status.take();
ASSERT_EQ(p1->usage() & set_mask, set_mask);
}
TEST_F(BufferHubQueueTest, TestUsageClearMask) {
const uint32_t clear_mask = GRALLOC_USAGE_SW_WRITE_OFTEN;
ASSERT_TRUE(
CreateQueues(config_builder_.Build(), UsagePolicy{0, clear_mask, 0, 0}));
// When allocation, add |clear_mask| into usage bits on purpose.
auto status = producer_queue_->AllocateBuffer(
kBufferWidth, kBufferHeight, kBufferLayerCount, kBufferFormat,
kBufferUsage | clear_mask);
ASSERT_TRUE(status.ok());
LocalHandle fence;
size_t slot;
DvrNativeBufferMetadata mo;
auto p1_status = producer_queue_->Dequeue(kTimeoutMs, &slot, &mo, &fence);
ASSERT_TRUE(p1_status.ok());
auto p1 = p1_status.take();
ASSERT_EQ(p1->usage() & clear_mask, 0U);
}
TEST_F(BufferHubQueueTest, TestUsageDenySetMask) {
const uint32_t deny_set_mask = GRALLOC_USAGE_SW_WRITE_OFTEN;
ASSERT_TRUE(CreateQueues(config_builder_.SetMetadata().Build(),
UsagePolicy{0, 0, deny_set_mask, 0}));
// Now that |deny_set_mask| is illegal, allocation without those bits should
// be able to succeed.
auto status = producer_queue_->AllocateBuffer(
kBufferWidth, kBufferHeight, kBufferLayerCount, kBufferFormat,
kBufferUsage & ~deny_set_mask);
ASSERT_TRUE(status.ok());
// While allocation with those bits should fail.
status = producer_queue_->AllocateBuffer(kBufferWidth, kBufferHeight,
kBufferLayerCount, kBufferFormat,
kBufferUsage | deny_set_mask);
ASSERT_FALSE(status.ok());
ASSERT_EQ(EINVAL, status.error());
}
TEST_F(BufferHubQueueTest, TestUsageDenyClearMask) {
const uint32_t deny_clear_mask = GRALLOC_USAGE_SW_WRITE_OFTEN;
ASSERT_TRUE(CreateQueues(config_builder_.SetMetadata().Build(),
UsagePolicy{0, 0, 0, deny_clear_mask}));
// Now that clearing |deny_clear_mask| is illegal (i.e. setting these bits are
// mandatory), allocation with those bits should be able to succeed.
auto status = producer_queue_->AllocateBuffer(
kBufferWidth, kBufferHeight, kBufferLayerCount, kBufferFormat,
kBufferUsage | deny_clear_mask);
ASSERT_TRUE(status.ok());
// While allocation without those bits should fail.
status = producer_queue_->AllocateBuffer(kBufferWidth, kBufferHeight,
kBufferLayerCount, kBufferFormat,
kBufferUsage & ~deny_clear_mask);
ASSERT_FALSE(status.ok());
ASSERT_EQ(EINVAL, status.error());
}
TEST_F(BufferHubQueueTest, TestQueueInfo) {
static const bool kIsAsync = true;
ASSERT_TRUE(CreateQueues(config_builder_.SetIsAsync(kIsAsync)
.SetDefaultWidth(kBufferWidth)
.SetDefaultHeight(kBufferHeight)
.SetDefaultFormat(kBufferFormat)
.Build(),
UsagePolicy{}));
EXPECT_EQ(producer_queue_->default_width(), kBufferWidth);
EXPECT_EQ(producer_queue_->default_height(), kBufferHeight);
EXPECT_EQ(producer_queue_->default_format(), kBufferFormat);
EXPECT_EQ(producer_queue_->is_async(), kIsAsync);
EXPECT_EQ(consumer_queue_->default_width(), kBufferWidth);
EXPECT_EQ(consumer_queue_->default_height(), kBufferHeight);
EXPECT_EQ(consumer_queue_->default_format(), kBufferFormat);
EXPECT_EQ(consumer_queue_->is_async(), kIsAsync);
}
TEST_F(BufferHubQueueTest, TestFreeAllBuffers) {
constexpr size_t kBufferCount = 2;
#define CHECK_NO_BUFFER_THEN_ALLOCATE(num_buffers) \
EXPECT_EQ(consumer_queue_->count(), 0U); \
EXPECT_EQ(consumer_queue_->capacity(), 0U); \
EXPECT_EQ(producer_queue_->count(), 0U); \
EXPECT_EQ(producer_queue_->capacity(), 0U); \
for (size_t i = 0; i < num_buffers; i++) { \
AllocateBuffer(); \
} \
EXPECT_EQ(producer_queue_->count(), num_buffers); \
EXPECT_EQ(producer_queue_->capacity(), num_buffers);
size_t slot;
LocalHandle fence;
pdx::Status status;
pdx::Status> consumer_status;
pdx::Status> producer_status;
std::shared_ptr consumer_buffer;
std::shared_ptr producer_buffer;
DvrNativeBufferMetadata mi, mo;
ASSERT_TRUE(CreateQueues(config_builder_.Build(), UsagePolicy{}));
// Free all buffers when buffers are avaible for dequeue.
CHECK_NO_BUFFER_THEN_ALLOCATE(kBufferCount);
status = producer_queue_->FreeAllBuffers();
EXPECT_TRUE(status.ok());
// Free all buffers when one buffer is dequeued.
CHECK_NO_BUFFER_THEN_ALLOCATE(kBufferCount);
producer_status = producer_queue_->Dequeue(kTimeoutMs, &slot, &mo, &fence);
ASSERT_TRUE(producer_status.ok());
status = producer_queue_->FreeAllBuffers();
EXPECT_TRUE(status.ok());
// Free all buffers when all buffers are dequeued.
CHECK_NO_BUFFER_THEN_ALLOCATE(kBufferCount);
for (size_t i = 0; i < kBufferCount; i++) {
producer_status = producer_queue_->Dequeue(kTimeoutMs, &slot, &mo, &fence);
ASSERT_TRUE(producer_status.ok());
}
status = producer_queue_->FreeAllBuffers();
EXPECT_TRUE(status.ok());
// Free all buffers when one buffer is posted.
CHECK_NO_BUFFER_THEN_ALLOCATE(kBufferCount);
producer_status = producer_queue_->Dequeue(kTimeoutMs, &slot, &mo, &fence);
ASSERT_TRUE(producer_status.ok());
producer_buffer = producer_status.take();
ASSERT_NE(nullptr, producer_buffer);
ASSERT_EQ(0, producer_buffer->PostAsync(&mi, fence));
status = producer_queue_->FreeAllBuffers();
EXPECT_TRUE(status.ok());
// Free all buffers when all buffers are posted.
CHECK_NO_BUFFER_THEN_ALLOCATE(kBufferCount);
for (size_t i = 0; i < kBufferCount; i++) {
producer_status = producer_queue_->Dequeue(kTimeoutMs, &slot, &mo, &fence);
ASSERT_TRUE(producer_status.ok());
producer_buffer = producer_status.take();
ASSERT_NE(producer_buffer, nullptr);
ASSERT_EQ(producer_buffer->PostAsync(&mi, fence), 0);
}
status = producer_queue_->FreeAllBuffers();
EXPECT_TRUE(status.ok());
// Free all buffers when all buffers are acquired.
CHECK_NO_BUFFER_THEN_ALLOCATE(kBufferCount);
for (size_t i = 0; i < kBufferCount; i++) {
producer_status = producer_queue_->Dequeue(kTimeoutMs, &slot, &mo, &fence);
ASSERT_TRUE(producer_status.ok());
producer_buffer = producer_status.take();
ASSERT_NE(producer_buffer, nullptr);
ASSERT_EQ(producer_buffer->PostAsync(&mi, fence), 0);
consumer_status = consumer_queue_->Dequeue(kTimeoutMs, &slot, &mo, &fence);
ASSERT_TRUE(consumer_status.ok()) << consumer_status.GetErrorMessage();
}
status = producer_queue_->FreeAllBuffers();
EXPECT_TRUE(status.ok());
// In addition to FreeAllBuffers() from the queue, it is also required to
// delete all references to the ProducerBuffer (i.e. the PDX client).
producer_buffer = nullptr;
// Crank consumer queue events to pickup EPOLLHUP events on the queue.
consumer_queue_->HandleQueueEvents();
// One last check.
CHECK_NO_BUFFER_THEN_ALLOCATE(kBufferCount);
#undef CHECK_NO_BUFFER_THEN_ALLOCATE
}
TEST_F(BufferHubQueueTest, TestProducerToParcelableNotEmpty) {
ASSERT_TRUE(CreateQueues(config_builder_.SetMetadata().Build(),
UsagePolicy{}));
// Allocate only one buffer.
AllocateBuffer();
// Export should fail as the queue is not empty.
auto status = producer_queue_->TakeAsParcelable();
EXPECT_FALSE(status.ok());
}
TEST_F(BufferHubQueueTest, TestProducerExportToParcelable) {
ASSERT_TRUE(CreateQueues(config_builder_.Build(), UsagePolicy{}));
auto s1 = producer_queue_->TakeAsParcelable();
EXPECT_TRUE(s1.ok());
ProducerQueueParcelable output_parcelable = s1.take();
EXPECT_TRUE(output_parcelable.IsValid());
Parcel parcel;
status_t res;
res = output_parcelable.writeToParcel(&parcel);
EXPECT_EQ(res, OK);
// After written into parcelable, the output_parcelable is still valid has
// keeps the producer channel alive.
EXPECT_TRUE(output_parcelable.IsValid());
// Creating producer buffer should fail.
auto s2 = producer_queue_->AllocateBuffer(kBufferWidth, kBufferHeight,
kBufferLayerCount, kBufferFormat,
kBufferUsage);
ASSERT_FALSE(s2.ok());
// Reset the data position so that we can read back from the same parcel
// without doing actually Binder IPC.
parcel.setDataPosition(0);
producer_queue_ = nullptr;
// Recreate the producer queue from the parcel.
ProducerQueueParcelable input_parcelable;
EXPECT_FALSE(input_parcelable.IsValid());
res = input_parcelable.readFromParcel(&parcel);
EXPECT_EQ(res, OK);
EXPECT_TRUE(input_parcelable.IsValid());
EXPECT_EQ(producer_queue_, nullptr);
producer_queue_ = ProducerQueue::Import(input_parcelable.TakeChannelHandle());
EXPECT_FALSE(input_parcelable.IsValid());
ASSERT_NE(producer_queue_, nullptr);
// Newly created queue from the parcel can allocate buffer, post buffer to
// consumer.
EXPECT_NO_FATAL_FAILURE(AllocateBuffer());
EXPECT_EQ(producer_queue_->count(), 1U);
EXPECT_EQ(producer_queue_->capacity(), 1U);
size_t slot;
DvrNativeBufferMetadata producer_meta;
DvrNativeBufferMetadata consumer_meta;
LocalHandle fence;
auto s3 = producer_queue_->Dequeue(0, &slot, &producer_meta, &fence);
EXPECT_TRUE(s3.ok());
std::shared_ptr p1 = s3.take();
ASSERT_NE(p1, nullptr);
producer_meta.timestamp = 42;
EXPECT_EQ(p1->PostAsync(&producer_meta, LocalHandle()), 0);
// Make sure the buffer can be dequeued from consumer side.
auto s4 = consumer_queue_->Dequeue(kTimeoutMs, &slot, &consumer_meta, &fence);
EXPECT_TRUE(s4.ok()) << s4.GetErrorMessage();
EXPECT_EQ(consumer_queue_->capacity(), 1U);
auto consumer = s4.take();
ASSERT_NE(consumer, nullptr);
EXPECT_EQ(producer_meta.timestamp, consumer_meta.timestamp);
}
TEST_F(BufferHubQueueTest, TestCreateConsumerParcelable) {
ASSERT_TRUE(CreateProducerQueue(config_builder_.Build(), UsagePolicy{}));
auto s1 = producer_queue_->CreateConsumerQueueParcelable();
EXPECT_TRUE(s1.ok());
ConsumerQueueParcelable output_parcelable = s1.take();
EXPECT_TRUE(output_parcelable.IsValid());
// Write to a Parcel new object.
Parcel parcel;
status_t res;
res = output_parcelable.writeToParcel(&parcel);
// Reset the data position so that we can read back from the same parcel
// without doing actually Binder IPC.
parcel.setDataPosition(0);
// No consumer queue created yet.
EXPECT_EQ(consumer_queue_, nullptr);
// If the parcel contains a consumer queue, read into a
// ProducerQueueParcelable should fail.
ProducerQueueParcelable wrongly_typed_parcelable;
EXPECT_FALSE(wrongly_typed_parcelable.IsValid());
res = wrongly_typed_parcelable.readFromParcel(&parcel);
EXPECT_EQ(res, -EINVAL);
parcel.setDataPosition(0);
// Create the consumer queue from the parcel.
ConsumerQueueParcelable input_parcelable;
EXPECT_FALSE(input_parcelable.IsValid());
res = input_parcelable.readFromParcel(&parcel);
EXPECT_EQ(res, OK);
EXPECT_TRUE(input_parcelable.IsValid());
consumer_queue_ = ConsumerQueue::Import(input_parcelable.TakeChannelHandle());
EXPECT_FALSE(input_parcelable.IsValid());
ASSERT_NE(consumer_queue_, nullptr);
EXPECT_NO_FATAL_FAILURE(AllocateBuffer());
EXPECT_EQ(producer_queue_->count(), 1U);
EXPECT_EQ(producer_queue_->capacity(), 1U);
size_t slot;
DvrNativeBufferMetadata producer_meta;
DvrNativeBufferMetadata consumer_meta;
LocalHandle fence;
auto s2 = producer_queue_->Dequeue(0, &slot, &producer_meta, &fence);
EXPECT_TRUE(s2.ok());
std::shared_ptr p1 = s2.take();
ASSERT_NE(p1, nullptr);
producer_meta.timestamp = 42;
EXPECT_EQ(p1->PostAsync(&producer_meta, LocalHandle()), 0);
// Make sure the buffer can be dequeued from consumer side.
auto s3 = consumer_queue_->Dequeue(kTimeoutMs, &slot, &consumer_meta, &fence);
EXPECT_TRUE(s3.ok()) << s3.GetErrorMessage();
EXPECT_EQ(consumer_queue_->capacity(), 1U);
auto consumer = s3.take();
ASSERT_NE(consumer, nullptr);
EXPECT_EQ(producer_meta.timestamp, consumer_meta.timestamp);
}
} // namespace
} // namespace dvr
} // namespace android