• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "consumer_queue_channel.h"
2 
3 #include <pdx/channel_handle.h>
4 
5 #include "producer_channel.h"
6 
7 using android::pdx::ErrorStatus;
8 using android::pdx::RemoteChannelHandle;
9 using android::pdx::Status;
10 using android::pdx::rpc::DispatchRemoteMethod;
11 using android::pdx::rpc::RemoteMethodError;
12 
13 namespace android {
14 namespace dvr {
15 
ConsumerQueueChannel(BufferHubService * service,int buffer_id,int channel_id,const std::shared_ptr<Channel> & producer)16 ConsumerQueueChannel::ConsumerQueueChannel(
17     BufferHubService* service, int buffer_id, int channel_id,
18     const std::shared_ptr<Channel>& producer)
19     : BufferHubChannel(service, buffer_id, channel_id, kConsumerQueueType),
20       producer_(producer),
21       capacity_(0) {
22   GetProducer()->AddConsumer(this);
23 }
24 
~ConsumerQueueChannel()25 ConsumerQueueChannel::~ConsumerQueueChannel() {
26   ALOGD_IF(TRACE, "ConsumerQueueChannel::~ConsumerQueueChannel: channel_id=%d",
27            channel_id());
28 
29   if (auto producer = GetProducer()) {
30     producer->RemoveConsumer(this);
31   }
32 }
33 
HandleMessage(Message & message)34 bool ConsumerQueueChannel::HandleMessage(Message& message) {
35   ATRACE_NAME("ConsumerQueueChannel::HandleMessage");
36   auto producer = GetProducer();
37   if (!producer) {
38     RemoteMethodError(message, EPIPE);
39     return true;
40   }
41 
42   switch (message.GetOp()) {
43     case BufferHubRPC::CreateConsumerQueue::Opcode:
44       DispatchRemoteMethod<BufferHubRPC::CreateConsumerQueue>(
45           *producer, &ProducerQueueChannel::OnCreateConsumerQueue, message);
46       return true;
47 
48     case BufferHubRPC::GetQueueInfo::Opcode:
49       DispatchRemoteMethod<BufferHubRPC::GetQueueInfo>(
50           *producer, &ProducerQueueChannel::OnGetQueueInfo, message);
51       return true;
52 
53     case BufferHubRPC::ConsumerQueueImportBuffers::Opcode:
54       DispatchRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>(
55           *this, &ConsumerQueueChannel::OnConsumerQueueImportBuffers, message);
56       return true;
57 
58     default:
59       return false;
60   }
61 }
62 
GetProducer() const63 std::shared_ptr<ProducerQueueChannel> ConsumerQueueChannel::GetProducer()
64     const {
65   return std::static_pointer_cast<ProducerQueueChannel>(producer_.lock());
66 }
67 
HandleImpulse(Message &)68 void ConsumerQueueChannel::HandleImpulse(Message& /* message */) {
69   ATRACE_NAME("ConsumerQueueChannel::HandleImpulse");
70 }
71 
GetBufferInfo() const72 BufferHubChannel::BufferInfo ConsumerQueueChannel::GetBufferInfo() const {
73   BufferHubChannel::BufferInfo info;
74   if (auto producer = GetProducer()) {
75     // If producer has not hung up, copy most buffer info from the producer.
76     info = producer->GetBufferInfo();
77   }
78   info.id = buffer_id();
79   info.capacity = capacity_;
80   return info;
81 }
82 
RegisterNewBuffer(const std::shared_ptr<ProducerChannel> & producer_channel,size_t slot)83 void ConsumerQueueChannel::RegisterNewBuffer(
84     const std::shared_ptr<ProducerChannel>& producer_channel, size_t slot) {
85   ALOGD_IF(TRACE,
86            "ConsumerQueueChannel::RegisterNewBuffer: buffer_id=%d slot=%zu",
87            producer_channel->buffer_id(), slot);
88   pending_buffer_slots_.emplace(producer_channel, slot);
89 
90   // Signal the client that there is new buffer available throught POLLIN.
91   SignalAvailable();
92 }
93 
94 Status<std::vector<std::pair<RemoteChannelHandle, size_t>>>
OnConsumerQueueImportBuffers(Message & message)95 ConsumerQueueChannel::OnConsumerQueueImportBuffers(Message& message) {
96   std::vector<std::pair<RemoteChannelHandle, size_t>> buffer_handles;
97   ATRACE_NAME("ConsumerQueueChannel::OnConsumerQueueImportBuffers");
98   ALOGD_IF(
99       TRACE,
100       "ConsumerQueueChannel::OnConsumerQueueImportBuffers number of buffers to "
101       "import: %zu",
102       pending_buffer_slots_.size());
103 
104   while (!pending_buffer_slots_.empty()) {
105     auto producer_channel = pending_buffer_slots_.front().first.lock();
106     size_t producer_slot = pending_buffer_slots_.front().second;
107     pending_buffer_slots_.pop();
108 
109     // It's possible that the producer channel has expired. When this occurs,
110     // ignore the producer channel.
111     if (producer_channel == nullptr) {
112       ALOGW(
113           "ConsumerQueueChannel::OnConsumerQueueImportBuffers: producer "
114           "channel has already been expired.");
115       continue;
116     }
117 
118     auto status = producer_channel->CreateConsumer(message);
119 
120     // If no buffers are imported successfully, clear available and return an
121     // error. Otherwise, return all consumer handles already imported
122     // successfully, but keep available bits on, so that the client can retry
123     // importing remaining consumer buffers.
124     if (!status) {
125       ALOGE(
126           "ConsumerQueueChannel::OnConsumerQueueImportBuffers: Failed create "
127           "consumer: %s",
128           status.GetErrorMessage().c_str());
129       if (buffer_handles.empty()) {
130         ClearAvailable();
131         return status.error_status();
132       } else {
133         return {std::move(buffer_handles)};
134       }
135     }
136 
137     buffer_handles.emplace_back(status.take(), producer_slot);
138   }
139 
140   ClearAvailable();
141   return {std::move(buffer_handles)};
142 }
143 
OnProducerClosed()144 void ConsumerQueueChannel::OnProducerClosed() {
145   ALOGD_IF(TRACE, "ConsumerQueueChannel::OnProducerClosed: queue_id=%d",
146            buffer_id());
147   producer_.reset();
148   Hangup();
149 }
150 
151 }  // namespace dvr
152 }  // namespace android
153