1 #include <inttypes.h>
2
3 #include <private/dvr/consumer_queue_channel.h>
4 #include <private/dvr/producer_channel.h>
5 #include <private/dvr/producer_queue_channel.h>
6
7 using android::pdx::ErrorStatus;
8 using android::pdx::Message;
9 using android::pdx::RemoteChannelHandle;
10 using android::pdx::Status;
11 using android::pdx::rpc::DispatchRemoteMethod;
12
13 namespace android {
14 namespace dvr {
15
ProducerQueueChannel(BufferHubService * service,int channel_id,const ProducerQueueConfig & config,const UsagePolicy & usage_policy,int * error)16 ProducerQueueChannel::ProducerQueueChannel(BufferHubService* service,
17 int channel_id,
18 const ProducerQueueConfig& config,
19 const UsagePolicy& usage_policy,
20 int* error)
21 : BufferHubChannel(service, channel_id, channel_id, kProducerQueueType),
22 config_(config),
23 usage_policy_(usage_policy),
24 capacity_(0) {
25 *error = 0;
26 }
27
~ProducerQueueChannel()28 ProducerQueueChannel::~ProducerQueueChannel() {
29 ALOGD_IF(TRACE, "ProducerQueueChannel::~ProducerQueueChannel: queue_id=%d",
30 buffer_id());
31 for (auto* consumer : consumer_channels_)
32 consumer->OnProducerClosed();
33 }
34
35 /* static */
Create(BufferHubService * service,int channel_id,const ProducerQueueConfig & config,const UsagePolicy & usage_policy)36 Status<std::shared_ptr<ProducerQueueChannel>> ProducerQueueChannel::Create(
37 BufferHubService* service, int channel_id,
38 const ProducerQueueConfig& config, const UsagePolicy& usage_policy) {
39 // Configuration between |usage_deny_set_mask| and |usage_deny_clear_mask|
40 // should be mutually exclusive.
41 if ((usage_policy.usage_deny_set_mask & usage_policy.usage_deny_clear_mask)) {
42 ALOGE(
43 "BufferHubService::OnCreateProducerQueue: illegal usage mask "
44 "configuration: usage_deny_set_mask=%" PRIx64
45 " usage_deny_clear_mask=%" PRIx64,
46 usage_policy.usage_deny_set_mask, usage_policy.usage_deny_clear_mask);
47 return ErrorStatus(EINVAL);
48 }
49
50 int error = 0;
51 std::shared_ptr<ProducerQueueChannel> producer(new ProducerQueueChannel(
52 service, channel_id, config, usage_policy, &error));
53 if (error < 0)
54 return ErrorStatus(-error);
55 else
56 return {std::move(producer)};
57 }
58
HandleMessage(Message & message)59 bool ProducerQueueChannel::HandleMessage(Message& message) {
60 ATRACE_NAME("ProducerQueueChannel::HandleMessage");
61 switch (message.GetOp()) {
62 case BufferHubRPC::CreateConsumerQueue::Opcode:
63 DispatchRemoteMethod<BufferHubRPC::CreateConsumerQueue>(
64 *this, &ProducerQueueChannel::OnCreateConsumerQueue, message);
65 return true;
66
67 case BufferHubRPC::GetQueueInfo::Opcode:
68 DispatchRemoteMethod<BufferHubRPC::GetQueueInfo>(
69 *this, &ProducerQueueChannel::OnGetQueueInfo, message);
70 return true;
71
72 case BufferHubRPC::ProducerQueueAllocateBuffers::Opcode:
73 DispatchRemoteMethod<BufferHubRPC::ProducerQueueAllocateBuffers>(
74 *this, &ProducerQueueChannel::OnProducerQueueAllocateBuffers,
75 message);
76 return true;
77
78 case BufferHubRPC::ProducerQueueInsertBuffer::Opcode:
79 DispatchRemoteMethod<BufferHubRPC::ProducerQueueInsertBuffer>(
80 *this, &ProducerQueueChannel::OnProducerQueueInsertBuffer, message);
81 return true;
82
83 case BufferHubRPC::ProducerQueueRemoveBuffer::Opcode:
84 DispatchRemoteMethod<BufferHubRPC::ProducerQueueRemoveBuffer>(
85 *this, &ProducerQueueChannel::OnProducerQueueRemoveBuffer, message);
86 return true;
87
88 default:
89 return false;
90 }
91 }
92
HandleImpulse(Message &)93 void ProducerQueueChannel::HandleImpulse(Message& /* message */) {
94 ATRACE_NAME("ProducerQueueChannel::HandleImpulse");
95 }
96
GetBufferInfo() const97 BufferHubChannel::BufferInfo ProducerQueueChannel::GetBufferInfo() const {
98 return BufferInfo(channel_id(), consumer_channels_.size(), capacity_,
99 usage_policy_);
100 }
101
OnCreateConsumerQueue(Message & message,bool silent)102 Status<RemoteChannelHandle> ProducerQueueChannel::OnCreateConsumerQueue(
103 Message& message, bool silent) {
104 ATRACE_NAME("ProducerQueueChannel::OnCreateConsumerQueue");
105 ALOGD_IF(
106 TRACE,
107 "ProducerQueueChannel::OnCreateConsumerQueue: channel_id=%d slient=%d",
108 channel_id(), silent);
109
110 int channel_id;
111 auto status = message.PushChannel(0, nullptr, &channel_id);
112 if (!status) {
113 ALOGE(
114 "ProducerQueueChannel::OnCreateConsumerQueue: failed to push consumer "
115 "channel: %s",
116 status.GetErrorMessage().c_str());
117 return ErrorStatus(ENOMEM);
118 }
119
120 auto consumer_queue_channel = std::make_shared<ConsumerQueueChannel>(
121 service(), buffer_id(), channel_id, shared_from_this(), silent);
122
123 // Register the existing buffers with the new consumer queue.
124 for (size_t slot = 0; slot < BufferHubRPC::kMaxQueueCapacity; slot++) {
125 if (auto buffer = buffers_[slot].lock())
126 consumer_queue_channel->RegisterNewBuffer(buffer, slot);
127 }
128
129 const auto channel_status =
130 service()->SetChannel(channel_id, consumer_queue_channel);
131 if (!channel_status) {
132 ALOGE(
133 "ProducerQueueChannel::OnCreateConsumerQueue: Failed to set channel: "
134 "%s",
135 channel_status.GetErrorMessage().c_str());
136 return ErrorStatus(ENOMEM);
137 }
138
139 return {status.take()};
140 }
141
OnGetQueueInfo(Message &)142 Status<QueueInfo> ProducerQueueChannel::OnGetQueueInfo(Message&) {
143 return {{config_, buffer_id()}};
144 }
145
146 Status<std::vector<std::pair<RemoteChannelHandle, size_t>>>
OnProducerQueueAllocateBuffers(Message & message,uint32_t width,uint32_t height,uint32_t layer_count,uint32_t format,uint64_t usage,size_t buffer_count)147 ProducerQueueChannel::OnProducerQueueAllocateBuffers(
148 Message& message, uint32_t width, uint32_t height, uint32_t layer_count,
149 uint32_t format, uint64_t usage, size_t buffer_count) {
150 ATRACE_NAME("ProducerQueueChannel::OnProducerQueueAllocateBuffers");
151 ALOGD_IF(TRACE,
152 "ProducerQueueChannel::OnProducerQueueAllocateBuffers: "
153 "producer_channel_id=%d",
154 channel_id());
155
156 std::vector<std::pair<RemoteChannelHandle, size_t>> buffer_handles;
157
158 // Deny buffer allocation violating preset rules.
159 if (usage & usage_policy_.usage_deny_set_mask) {
160 ALOGE(
161 "ProducerQueueChannel::OnProducerQueueAllocateBuffers: usage: %" PRIx64
162 " is not permitted. Violating usage_deny_set_mask, the following bits "
163 "shall not be set: %" PRIx64 ".",
164 usage, usage_policy_.usage_deny_set_mask);
165 return ErrorStatus(EINVAL);
166 }
167
168 if (~usage & usage_policy_.usage_deny_clear_mask) {
169 ALOGE(
170 "ProducerQueueChannel::OnProducerQueueAllocateBuffers: usage: %" PRIx64
171 " is not permitted. Violating usage_deny_clear_mask, the following "
172 "bits must be set: %" PRIx64 ".",
173 usage, usage_policy_.usage_deny_clear_mask);
174 return ErrorStatus(EINVAL);
175 }
176
177 // Force set mask and clear mask. Note that |usage_policy_.usage_set_mask_|
178 // takes precedence and will overwrite |usage_policy_.usage_clear_mask|.
179 uint64_t effective_usage =
180 (usage & ~usage_policy_.usage_clear_mask) | usage_policy_.usage_set_mask;
181
182 for (size_t i = 0; i < buffer_count; i++) {
183 auto status = AllocateBuffer(message, width, height, layer_count, format,
184 effective_usage);
185 if (!status) {
186 ALOGE(
187 "ProducerQueueChannel::OnProducerQueueAllocateBuffers: Failed to "
188 "allocate new buffer.");
189 return ErrorStatus(status.error());
190 }
191 buffer_handles.push_back(status.take());
192 }
193
194 return {std::move(buffer_handles)};
195 }
196
197 Status<std::pair<RemoteChannelHandle, size_t>>
AllocateBuffer(Message & message,uint32_t width,uint32_t height,uint32_t layer_count,uint32_t format,uint64_t usage)198 ProducerQueueChannel::AllocateBuffer(Message& message, uint32_t width,
199 uint32_t height, uint32_t layer_count,
200 uint32_t format, uint64_t usage) {
201 ATRACE_NAME("ProducerQueueChannel::AllocateBuffer");
202 ALOGD_IF(TRACE,
203 "ProducerQueueChannel::AllocateBuffer: producer_channel_id=%d",
204 channel_id());
205
206 if (capacity_ >= BufferHubRPC::kMaxQueueCapacity) {
207 ALOGE("ProducerQueueChannel::AllocateBuffer: reaches kMaxQueueCapacity.");
208 return ErrorStatus(E2BIG);
209 }
210
211 // Here we are creating a new BufferHubBuffer, initialize the producer
212 // channel, and returning its file handle back to the client.
213 // buffer_id is the id of the producer channel of BufferHubBuffer.
214 int buffer_id;
215 auto status = message.PushChannel(0, nullptr, &buffer_id);
216
217 if (!status) {
218 ALOGE("ProducerQueueChannel::AllocateBuffer: failed to push channel: %s",
219 status.GetErrorMessage().c_str());
220 return ErrorStatus(status.error());
221 }
222
223 ALOGD_IF(TRACE,
224 "ProducerQueueChannel::AllocateBuffer: buffer_id=%d width=%u "
225 "height=%u layer_count=%u format=%u usage=%" PRIx64,
226 buffer_id, width, height, layer_count, format, usage);
227 auto buffer_handle = status.take();
228
229 auto producer_channel_status =
230 ProducerChannel::Create(service(), buffer_id, width, height, layer_count,
231 format, usage, config_.user_metadata_size);
232 if (!producer_channel_status) {
233 ALOGE(
234 "ProducerQueueChannel::AllocateBuffer: Failed to create producer "
235 "buffer: %s",
236 producer_channel_status.GetErrorMessage().c_str());
237 return ErrorStatus(ENOMEM);
238 }
239 auto producer_channel = producer_channel_status.take();
240
241 ALOGD_IF(
242 TRACE,
243 "ProducerQueueChannel::AllocateBuffer: buffer_id=%d, buffer_handle=%d",
244 buffer_id, buffer_handle.value());
245
246 const auto channel_status =
247 service()->SetChannel(buffer_id, producer_channel);
248 if (!channel_status) {
249 ALOGE(
250 "ProducerQueueChannel::AllocateBuffer: failed to set producer channel "
251 "for new BufferHubBuffer: %s",
252 channel_status.GetErrorMessage().c_str());
253 return ErrorStatus(ENOMEM);
254 }
255
256 // Register the newly allocated buffer's channel_id into the first empty
257 // buffer slot.
258 size_t slot = 0;
259 for (; slot < BufferHubRPC::kMaxQueueCapacity; slot++) {
260 if (buffers_[slot].expired())
261 break;
262 }
263 if (slot == BufferHubRPC::kMaxQueueCapacity) {
264 ALOGE(
265 "ProducerQueueChannel::AllocateBuffer: Cannot find empty slot for new "
266 "buffer allocation.");
267 return ErrorStatus(E2BIG);
268 }
269
270 buffers_[slot] = producer_channel;
271 capacity_++;
272
273 // Notify each consumer channel about the new buffer.
274 for (auto* consumer_channel : consumer_channels_) {
275 ALOGD(
276 "ProducerQueueChannel::AllocateBuffer: Notified consumer with new "
277 "buffer, buffer_id=%d",
278 buffer_id);
279 consumer_channel->RegisterNewBuffer(producer_channel, slot);
280 }
281
282 return {{std::move(buffer_handle), slot}};
283 }
284
OnProducerQueueInsertBuffer(pdx::Message & message,int buffer_cid)285 Status<size_t> ProducerQueueChannel::OnProducerQueueInsertBuffer(
286 pdx::Message& message, int buffer_cid) {
287 ATRACE_NAME("ProducerQueueChannel::InsertBuffer");
288 ALOGD_IF(TRACE,
289 "ProducerQueueChannel::InsertBuffer: channel_id=%d, buffer_cid=%d",
290 channel_id(), buffer_cid);
291
292 if (capacity_ >= BufferHubRPC::kMaxQueueCapacity) {
293 ALOGE("ProducerQueueChannel::InsertBuffer: reaches kMaxQueueCapacity.");
294 return ErrorStatus(E2BIG);
295 }
296 auto producer_channel = std::static_pointer_cast<ProducerChannel>(
297 service()->GetChannel(buffer_cid));
298 if (producer_channel == nullptr ||
299 producer_channel->channel_type() != BufferHubChannel::kProducerType) {
300 // Rejects the request if the requested buffer channel is invalid and/or
301 // it's not a ProducerChannel.
302 ALOGE(
303 "ProducerQueueChannel::InsertBuffer: Invalid buffer_cid=%d, "
304 "producer_buffer=0x%p, channel_type=%d.",
305 buffer_cid, producer_channel.get(),
306 producer_channel == nullptr ? -1 : producer_channel->channel_type());
307 return ErrorStatus(EINVAL);
308 }
309 if (producer_channel->GetActiveProcessId() != message.GetProcessId()) {
310 // Rejects the request if the requested buffer channel is not currently
311 // connected to the caller this is IPC request. This effectively prevents
312 // fake buffer_cid from being injected.
313 ALOGE(
314 "ProducerQueueChannel::InsertBuffer: Requested buffer channel "
315 "(buffer_cid=%d) is not connected to the calling process (pid=%d). "
316 "It's connected to a different process (pid=%d).",
317 buffer_cid, message.GetProcessId(),
318 producer_channel->GetActiveProcessId());
319 return ErrorStatus(EINVAL);
320 }
321 uint64_t buffer_state = producer_channel->buffer_state();
322 // TODO(b/112007999) add an atomic variable in metadata header in shared
323 // memory to indicate which client is the last producer of the buffer.
324 // Currently, the first client is the only producer to the buffer.
325 // Thus, it checks whether the first client gains the buffer below.
326 if (!BufferHubDefs::isClientGained(buffer_state,
327 BufferHubDefs::kFirstClientBitMask)) {
328 // Rejects the request if the requested buffer is not in Gained state.
329 ALOGE(
330 "ProducerQueueChannel::InsertBuffer: The buffer (cid=%d, "
331 "state=0x%" PRIx64 ") is not in gained state.",
332 buffer_cid, buffer_state);
333 return ErrorStatus(EINVAL);
334 }
335
336 // Register the to-be-inserted buffer's channel_id into the first empty
337 // buffer slot.
338 size_t slot = 0;
339 for (; slot < BufferHubRPC::kMaxQueueCapacity; slot++) {
340 if (buffers_[slot].expired())
341 break;
342 }
343 if (slot == BufferHubRPC::kMaxQueueCapacity) {
344 ALOGE(
345 "ProducerQueueChannel::AllocateBuffer: Cannot find empty slot for new "
346 "buffer allocation.");
347 return ErrorStatus(E2BIG);
348 }
349
350 buffers_[slot] = producer_channel;
351 capacity_++;
352
353 // Notify each consumer channel about the new buffer.
354 for (auto* consumer_channel : consumer_channels_) {
355 ALOGD(
356 "ProducerQueueChannel::AllocateBuffer: Notified consumer with new "
357 "buffer, buffer_cid=%d",
358 buffer_cid);
359 consumer_channel->RegisterNewBuffer(producer_channel, slot);
360 }
361
362 return {slot};
363 }
364
OnProducerQueueRemoveBuffer(Message &,size_t slot)365 Status<void> ProducerQueueChannel::OnProducerQueueRemoveBuffer(
366 Message& /*message*/, size_t slot) {
367 if (buffers_[slot].expired()) {
368 ALOGE(
369 "ProducerQueueChannel::OnProducerQueueRemoveBuffer: trying to remove "
370 "an invalid buffer producer at slot %zu",
371 slot);
372 return ErrorStatus(EINVAL);
373 }
374
375 if (capacity_ == 0) {
376 ALOGE(
377 "ProducerQueueChannel::OnProducerQueueRemoveBuffer: trying to remove a "
378 "buffer producer while the queue's capacity is already zero.");
379 return ErrorStatus(EINVAL);
380 }
381
382 buffers_[slot].reset();
383 capacity_--;
384 return {};
385 }
386
AddConsumer(ConsumerQueueChannel * channel)387 void ProducerQueueChannel::AddConsumer(ConsumerQueueChannel* channel) {
388 consumer_channels_.push_back(channel);
389 }
390
RemoveConsumer(ConsumerQueueChannel * channel)391 void ProducerQueueChannel::RemoveConsumer(ConsumerQueueChannel* channel) {
392 consumer_channels_.erase(
393 std::find(consumer_channels_.begin(), consumer_channels_.end(), channel));
394 }
395
396 } // namespace dvr
397 } // namespace android
398