1 #include "consumer_queue_channel.h"
2
3 #include <pdx/channel_handle.h>
4
5 #include "producer_channel.h"
6
7 using android::pdx::ErrorStatus;
8 using android::pdx::RemoteChannelHandle;
9 using android::pdx::Status;
10 using android::pdx::rpc::DispatchRemoteMethod;
11 using android::pdx::rpc::RemoteMethodError;
12
13 namespace android {
14 namespace dvr {
15
ConsumerQueueChannel(BufferHubService * service,int buffer_id,int channel_id,const std::shared_ptr<Channel> & producer)16 ConsumerQueueChannel::ConsumerQueueChannel(
17 BufferHubService* service, int buffer_id, int channel_id,
18 const std::shared_ptr<Channel>& producer)
19 : BufferHubChannel(service, buffer_id, channel_id, kConsumerQueueType),
20 producer_(producer),
21 capacity_(0) {
22 GetProducer()->AddConsumer(this);
23 }
24
~ConsumerQueueChannel()25 ConsumerQueueChannel::~ConsumerQueueChannel() {
26 ALOGD_IF(TRACE, "ConsumerQueueChannel::~ConsumerQueueChannel: channel_id=%d",
27 channel_id());
28
29 if (auto producer = GetProducer()) {
30 producer->RemoveConsumer(this);
31 }
32 }
33
HandleMessage(Message & message)34 bool ConsumerQueueChannel::HandleMessage(Message& message) {
35 ATRACE_NAME("ConsumerQueueChannel::HandleMessage");
36 auto producer = GetProducer();
37 if (!producer) {
38 RemoteMethodError(message, EPIPE);
39 return true;
40 }
41
42 switch (message.GetOp()) {
43 case BufferHubRPC::CreateConsumerQueue::Opcode:
44 DispatchRemoteMethod<BufferHubRPC::CreateConsumerQueue>(
45 *producer, &ProducerQueueChannel::OnCreateConsumerQueue, message);
46 return true;
47
48 case BufferHubRPC::GetQueueInfo::Opcode:
49 DispatchRemoteMethod<BufferHubRPC::GetQueueInfo>(
50 *producer, &ProducerQueueChannel::OnGetQueueInfo, message);
51 return true;
52
53 case BufferHubRPC::ConsumerQueueImportBuffers::Opcode:
54 DispatchRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>(
55 *this, &ConsumerQueueChannel::OnConsumerQueueImportBuffers, message);
56 return true;
57
58 default:
59 return false;
60 }
61 }
62
GetProducer() const63 std::shared_ptr<ProducerQueueChannel> ConsumerQueueChannel::GetProducer()
64 const {
65 return std::static_pointer_cast<ProducerQueueChannel>(producer_.lock());
66 }
67
HandleImpulse(Message &)68 void ConsumerQueueChannel::HandleImpulse(Message& /* message */) {
69 ATRACE_NAME("ConsumerQueueChannel::HandleImpulse");
70 }
71
GetBufferInfo() const72 BufferHubChannel::BufferInfo ConsumerQueueChannel::GetBufferInfo() const {
73 BufferHubChannel::BufferInfo info;
74 if (auto producer = GetProducer()) {
75 // If producer has not hung up, copy most buffer info from the producer.
76 info = producer->GetBufferInfo();
77 }
78 info.id = buffer_id();
79 info.capacity = capacity_;
80 return info;
81 }
82
RegisterNewBuffer(const std::shared_ptr<ProducerChannel> & producer_channel,size_t slot)83 void ConsumerQueueChannel::RegisterNewBuffer(
84 const std::shared_ptr<ProducerChannel>& producer_channel, size_t slot) {
85 ALOGD_IF(TRACE,
86 "ConsumerQueueChannel::RegisterNewBuffer: buffer_id=%d slot=%zu",
87 producer_channel->buffer_id(), slot);
88 pending_buffer_slots_.emplace(producer_channel, slot);
89
90 // Signal the client that there is new buffer available throught POLLIN.
91 SignalAvailable();
92 }
93
94 Status<std::vector<std::pair<RemoteChannelHandle, size_t>>>
OnConsumerQueueImportBuffers(Message & message)95 ConsumerQueueChannel::OnConsumerQueueImportBuffers(Message& message) {
96 std::vector<std::pair<RemoteChannelHandle, size_t>> buffer_handles;
97 ATRACE_NAME("ConsumerQueueChannel::OnConsumerQueueImportBuffers");
98 ALOGD_IF(
99 TRACE,
100 "ConsumerQueueChannel::OnConsumerQueueImportBuffers number of buffers to "
101 "import: %zu",
102 pending_buffer_slots_.size());
103
104 while (!pending_buffer_slots_.empty()) {
105 auto producer_channel = pending_buffer_slots_.front().first.lock();
106 size_t producer_slot = pending_buffer_slots_.front().second;
107 pending_buffer_slots_.pop();
108
109 // It's possible that the producer channel has expired. When this occurs,
110 // ignore the producer channel.
111 if (producer_channel == nullptr) {
112 ALOGW(
113 "ConsumerQueueChannel::OnConsumerQueueImportBuffers: producer "
114 "channel has already been expired.");
115 continue;
116 }
117
118 auto status = producer_channel->CreateConsumer(message);
119
120 // If no buffers are imported successfully, clear available and return an
121 // error. Otherwise, return all consumer handles already imported
122 // successfully, but keep available bits on, so that the client can retry
123 // importing remaining consumer buffers.
124 if (!status) {
125 ALOGE(
126 "ConsumerQueueChannel::OnConsumerQueueImportBuffers: Failed create "
127 "consumer: %s",
128 status.GetErrorMessage().c_str());
129 if (buffer_handles.empty()) {
130 ClearAvailable();
131 return status.error_status();
132 } else {
133 return {std::move(buffer_handles)};
134 }
135 }
136
137 buffer_handles.emplace_back(status.take(), producer_slot);
138 }
139
140 ClearAvailable();
141 return {std::move(buffer_handles)};
142 }
143
OnProducerClosed()144 void ConsumerQueueChannel::OnProducerClosed() {
145 ALOGD_IF(TRACE, "ConsumerQueueChannel::OnProducerClosed: queue_id=%d",
146 buffer_id());
147 producer_.reset();
148 Hangup();
149 }
150
151 } // namespace dvr
152 } // namespace android
153