1 #include <pdx/channel_handle.h>
2 #include <private/dvr/consumer_queue_channel.h>
3 #include <private/dvr/producer_channel.h>
4
5 using android::pdx::ErrorStatus;
6 using android::pdx::RemoteChannelHandle;
7 using android::pdx::Status;
8 using android::pdx::rpc::DispatchRemoteMethod;
9 using android::pdx::rpc::RemoteMethodError;
10
11 namespace android {
12 namespace dvr {
13
ConsumerQueueChannel(BufferHubService * service,int buffer_id,int channel_id,const std::shared_ptr<Channel> & producer,bool silent)14 ConsumerQueueChannel::ConsumerQueueChannel(
15 BufferHubService* service, int buffer_id, int channel_id,
16 const std::shared_ptr<Channel>& producer, bool silent)
17 : BufferHubChannel(service, buffer_id, channel_id, kConsumerQueueType),
18 producer_(producer),
19 capacity_(0),
20 silent_(silent) {
21 GetProducer()->AddConsumer(this);
22 }
23
~ConsumerQueueChannel()24 ConsumerQueueChannel::~ConsumerQueueChannel() {
25 ALOGD_IF(TRACE, "ConsumerQueueChannel::~ConsumerQueueChannel: channel_id=%d",
26 channel_id());
27
28 if (auto producer = GetProducer()) {
29 producer->RemoveConsumer(this);
30 }
31 }
32
HandleMessage(Message & message)33 bool ConsumerQueueChannel::HandleMessage(Message& message) {
34 ATRACE_NAME("ConsumerQueueChannel::HandleMessage");
35 auto producer = GetProducer();
36 if (!producer) {
37 RemoteMethodError(message, EPIPE);
38 return true;
39 }
40
41 switch (message.GetOp()) {
42 case BufferHubRPC::CreateConsumerQueue::Opcode:
43 DispatchRemoteMethod<BufferHubRPC::CreateConsumerQueue>(
44 *producer, &ProducerQueueChannel::OnCreateConsumerQueue, message);
45 return true;
46
47 case BufferHubRPC::GetQueueInfo::Opcode:
48 DispatchRemoteMethod<BufferHubRPC::GetQueueInfo>(
49 *producer, &ProducerQueueChannel::OnGetQueueInfo, message);
50 return true;
51
52 case BufferHubRPC::ConsumerQueueImportBuffers::Opcode:
53 DispatchRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>(
54 *this, &ConsumerQueueChannel::OnConsumerQueueImportBuffers, message);
55 return true;
56
57 default:
58 return false;
59 }
60 }
61
GetProducer() const62 std::shared_ptr<ProducerQueueChannel> ConsumerQueueChannel::GetProducer()
63 const {
64 return std::static_pointer_cast<ProducerQueueChannel>(producer_.lock());
65 }
66
HandleImpulse(Message &)67 void ConsumerQueueChannel::HandleImpulse(Message& /* message */) {
68 ATRACE_NAME("ConsumerQueueChannel::HandleImpulse");
69 }
70
GetBufferInfo() const71 BufferHubChannel::BufferInfo ConsumerQueueChannel::GetBufferInfo() const {
72 BufferHubChannel::BufferInfo info;
73 if (auto producer = GetProducer()) {
74 // If producer has not hung up, copy most buffer info from the producer.
75 info = producer->GetBufferInfo();
76 }
77 info.id = buffer_id();
78 info.capacity = capacity_;
79 return info;
80 }
81
RegisterNewBuffer(const std::shared_ptr<ProducerChannel> & producer_channel,size_t producer_slot)82 void ConsumerQueueChannel::RegisterNewBuffer(
83 const std::shared_ptr<ProducerChannel>& producer_channel,
84 size_t producer_slot) {
85 ALOGD_IF(TRACE, "%s: queue_id=%d buffer_id=%d slot=%zu silent=%d",
86 __FUNCTION__, buffer_id(), producer_channel->buffer_id(),
87 producer_slot, silent_);
88 // Only register buffers if the queue is not silent.
89 if (silent_) {
90 return;
91 }
92
93 auto status = producer_channel->CreateConsumerStateMask();
94 if (!status.ok()) {
95 ALOGE("%s: Failed to create consumer state mask: %s", __FUNCTION__,
96 status.GetErrorMessage().c_str());
97 return;
98 }
99 uint64_t consumer_state_mask = status.get();
100
101 pending_buffer_slots_.emplace(producer_channel, producer_slot,
102 consumer_state_mask);
103 // Signal the client that there is new buffer available.
104 SignalAvailable();
105 }
106
107 Status<std::vector<std::pair<RemoteChannelHandle, size_t>>>
OnConsumerQueueImportBuffers(Message & message)108 ConsumerQueueChannel::OnConsumerQueueImportBuffers(Message& message) {
109 std::vector<std::pair<RemoteChannelHandle, size_t>> buffer_handles;
110 ATRACE_NAME(__FUNCTION__);
111 ALOGD_IF(TRACE, "%s: pending_buffer_slots=%zu", __FUNCTION__,
112 pending_buffer_slots_.size());
113
114 // Indicate this is a silent queue that will not import buffers.
115 if (silent_)
116 return ErrorStatus(EBADR);
117
118 while (!pending_buffer_slots_.empty()) {
119 auto producer_channel =
120 pending_buffer_slots_.front().producer_channel.lock();
121 size_t producer_slot = pending_buffer_slots_.front().producer_slot;
122 uint64_t consumer_state_mask =
123 pending_buffer_slots_.front().consumer_state_mask;
124 pending_buffer_slots_.pop();
125
126 // It's possible that the producer channel has expired. When this occurs,
127 // ignore the producer channel.
128 if (producer_channel == nullptr) {
129 ALOGW("%s: producer channel has already been expired.", __FUNCTION__);
130 continue;
131 }
132
133 auto status =
134 producer_channel->CreateConsumer(message, consumer_state_mask);
135
136 // If no buffers are imported successfully, clear available and return an
137 // error. Otherwise, return all consumer handles already imported
138 // successfully, but keep available bits on, so that the client can retry
139 // importing remaining consumer buffers.
140 if (!status) {
141 ALOGE("%s: Failed create consumer: %s", __FUNCTION__,
142 status.GetErrorMessage().c_str());
143 if (buffer_handles.empty()) {
144 ClearAvailable();
145 return status.error_status();
146 } else {
147 return {std::move(buffer_handles)};
148 }
149 }
150
151 buffer_handles.emplace_back(status.take(), producer_slot);
152 }
153
154 ClearAvailable();
155 return {std::move(buffer_handles)};
156 }
157
OnProducerClosed()158 void ConsumerQueueChannel::OnProducerClosed() {
159 ALOGD_IF(TRACE, "ConsumerQueueChannel::OnProducerClosed: queue_id=%d",
160 buffer_id());
161 producer_.reset();
162 Hangup();
163 }
164
165 } // namespace dvr
166 } // namespace android
167