1 #include <thread>
2
3 #include <log/log.h>
4 #include <private/dvr/bufferhub_rpc.h>
5 #include <private/dvr/consumer_channel.h>
6 #include <private/dvr/producer_channel.h>
7 #include <utils/Trace.h>
8
9 using android::pdx::BorrowedHandle;
10 using android::pdx::Channel;
11 using android::pdx::ErrorStatus;
12 using android::pdx::Message;
13 using android::pdx::Status;
14 using android::pdx::rpc::DispatchRemoteMethod;
15
16 namespace android {
17 namespace dvr {
18
ConsumerChannel(BufferHubService * service,int buffer_id,int channel_id,uint32_t client_state_mask,const std::shared_ptr<Channel> producer)19 ConsumerChannel::ConsumerChannel(BufferHubService* service, int buffer_id,
20 int channel_id, uint32_t client_state_mask,
21 const std::shared_ptr<Channel> producer)
22 : BufferHubChannel(service, buffer_id, channel_id, kConsumerType),
23 client_state_mask_(client_state_mask),
24 producer_(producer) {
25 GetProducer()->AddConsumer(this);
26 }
27
~ConsumerChannel()28 ConsumerChannel::~ConsumerChannel() {
29 ALOGD_IF(TRACE,
30 "ConsumerChannel::~ConsumerChannel: channel_id=%d buffer_id=%d",
31 channel_id(), buffer_id());
32
33 if (auto producer = GetProducer()) {
34 producer->RemoveConsumer(this);
35 }
36 }
37
GetBufferInfo() const38 BufferHubChannel::BufferInfo ConsumerChannel::GetBufferInfo() const {
39 BufferHubChannel::BufferInfo info;
40 if (auto producer = GetProducer()) {
41 // If producer has not hung up, copy most buffer info from the producer.
42 info = producer->GetBufferInfo();
43 } else {
44 info.signaled_mask = client_state_mask();
45 }
46 info.id = buffer_id();
47 return info;
48 }
49
GetProducer() const50 std::shared_ptr<ProducerChannel> ConsumerChannel::GetProducer() const {
51 return std::static_pointer_cast<ProducerChannel>(producer_.lock());
52 }
53
HandleImpulse(Message & message)54 void ConsumerChannel::HandleImpulse(Message& message) {
55 ATRACE_NAME("ConsumerChannel::HandleImpulse");
56 switch (message.GetOp()) {
57 case BufferHubRPC::ConsumerAcquire::Opcode:
58 OnConsumerAcquire(message);
59 break;
60 case BufferHubRPC::ConsumerRelease::Opcode:
61 OnConsumerRelease(message, {});
62 break;
63 }
64 }
65
HandleMessage(Message & message)66 bool ConsumerChannel::HandleMessage(Message& message) {
67 ATRACE_NAME("ConsumerChannel::HandleMessage");
68 auto producer = GetProducer();
69 if (!producer)
70 REPLY_ERROR_RETURN(message, EPIPE, true);
71
72 switch (message.GetOp()) {
73 case BufferHubRPC::GetBuffer::Opcode:
74 DispatchRemoteMethod<BufferHubRPC::GetBuffer>(
75 *this, &ConsumerChannel::OnGetBuffer, message);
76 return true;
77
78 case BufferHubRPC::NewConsumer::Opcode:
79 DispatchRemoteMethod<BufferHubRPC::NewConsumer>(
80 *producer, &ProducerChannel::OnNewConsumer, message);
81 return true;
82
83 case BufferHubRPC::ConsumerAcquire::Opcode:
84 DispatchRemoteMethod<BufferHubRPC::ConsumerAcquire>(
85 *this, &ConsumerChannel::OnConsumerAcquire, message);
86 return true;
87
88 case BufferHubRPC::ConsumerRelease::Opcode:
89 DispatchRemoteMethod<BufferHubRPC::ConsumerRelease>(
90 *this, &ConsumerChannel::OnConsumerRelease, message);
91 return true;
92
93 default:
94 return false;
95 }
96 }
97
OnGetBuffer(Message &)98 Status<BufferDescription<BorrowedHandle>> ConsumerChannel::OnGetBuffer(
99 Message& /*message*/) {
100 ATRACE_NAME("ConsumerChannel::OnGetBuffer");
101 ALOGD_IF(TRACE, "ConsumerChannel::OnGetBuffer: buffer=%d", buffer_id());
102 if (auto producer = GetProducer()) {
103 return {producer->GetBuffer(client_state_mask_)};
104 } else {
105 return ErrorStatus(EPIPE);
106 }
107 }
108
OnConsumerAcquire(Message & message)109 Status<LocalFence> ConsumerChannel::OnConsumerAcquire(Message& message) {
110 ATRACE_NAME("ConsumerChannel::OnConsumerAcquire");
111 auto producer = GetProducer();
112 if (!producer)
113 return ErrorStatus(EPIPE);
114
115 if (acquired_ || released_) {
116 ALOGE(
117 "ConsumerChannel::OnConsumerAcquire: Acquire when not posted: "
118 "acquired=%d released=%d channel_id=%d buffer_id=%d",
119 acquired_, released_, message.GetChannelId(), producer->buffer_id());
120 return ErrorStatus(EBUSY);
121 } else {
122 auto status = producer->OnConsumerAcquire(message);
123 if (status) {
124 ClearAvailable();
125 acquired_ = true;
126 }
127 return status;
128 }
129 }
130
OnConsumerRelease(Message & message,LocalFence release_fence)131 Status<void> ConsumerChannel::OnConsumerRelease(Message& message,
132 LocalFence release_fence) {
133 ATRACE_NAME("ConsumerChannel::OnConsumerRelease");
134 auto producer = GetProducer();
135 if (!producer)
136 return ErrorStatus(EPIPE);
137
138 if (!acquired_ || released_) {
139 ALOGE(
140 "ConsumerChannel::OnConsumerRelease: Release when not acquired: "
141 "acquired=%d released=%d channel_id=%d buffer_id=%d",
142 acquired_, released_, message.GetChannelId(), producer->buffer_id());
143 return ErrorStatus(EBUSY);
144 } else {
145 auto status =
146 producer->OnConsumerRelease(message, std::move(release_fence));
147 if (status) {
148 ClearAvailable();
149 acquired_ = false;
150 released_ = true;
151 }
152 return status;
153 }
154 }
155
OnProducerGained()156 void ConsumerChannel::OnProducerGained() {
157 // Clear the signal if exist. There is a possiblity that the signal still
158 // exist in consumer client when producer gains the buffer, e.g. newly added
159 // consumer fail to acquire the previous posted buffer in time. Then, when
160 // producer gains back the buffer, posts the buffer again and signal the
161 // consumer later, there won't be an signal change in eventfd, and thus,
162 // consumer will miss the posted buffer later. Thus, we need to clear the
163 // signal in consumer clients if the signal exist.
164 ClearAvailable();
165 }
166
OnProducerPosted()167 void ConsumerChannel::OnProducerPosted() {
168 acquired_ = false;
169 released_ = false;
170 SignalAvailable();
171 }
172
OnProducerClosed()173 void ConsumerChannel::OnProducerClosed() {
174 producer_.reset();
175 Hangup();
176 }
177
178 } // namespace dvr
179 } // namespace android
180