1 #include <sys/epoll.h>
2 #include <sys/eventfd.h>
3 #include <sys/poll.h>
4
5 #include <algorithm>
6 #include <atomic>
7 #include <thread>
8
9 #include <log/log.h>
10 #include <private/dvr/bufferhub_rpc.h>
11 #include <private/dvr/consumer_channel.h>
12 #include <private/dvr/producer_channel.h>
13 #include <sync/sync.h>
14 #include <utils/Trace.h>
15
16 using android::pdx::BorrowedHandle;
17 using android::pdx::ErrorStatus;
18 using android::pdx::Message;
19 using android::pdx::RemoteChannelHandle;
20 using android::pdx::Status;
21 using android::pdx::rpc::BufferWrapper;
22 using android::pdx::rpc::DispatchRemoteMethod;
23 using android::pdx::rpc::WrapBuffer;
24
25 namespace android {
26 namespace dvr {
27
ProducerChannel(BufferHubService * service,int buffer_id,int channel_id,IonBuffer buffer,IonBuffer metadata_buffer,size_t user_metadata_size,int * error)28 ProducerChannel::ProducerChannel(BufferHubService* service, int buffer_id,
29 int channel_id, IonBuffer buffer,
30 IonBuffer metadata_buffer,
31 size_t user_metadata_size, int* error)
32 : BufferHubChannel(service, buffer_id, channel_id, kProducerType),
33 buffer_(std::move(buffer)),
34 metadata_buffer_(std::move(metadata_buffer)),
35 user_metadata_size_(user_metadata_size),
36 metadata_buf_size_(BufferHubDefs::kMetadataHeaderSize +
37 user_metadata_size) {
38 if (!buffer_.IsValid()) {
39 ALOGE("ProducerChannel::ProducerChannel: Invalid buffer.");
40 *error = -EINVAL;
41 return;
42 }
43 if (!metadata_buffer_.IsValid()) {
44 ALOGE("ProducerChannel::ProducerChannel: Invalid metadata buffer.");
45 *error = -EINVAL;
46 return;
47 }
48
49 *error = InitializeBuffer();
50 }
51
ProducerChannel(BufferHubService * service,int channel_id,uint32_t width,uint32_t height,uint32_t layer_count,uint32_t format,uint64_t usage,size_t user_metadata_size,int * error)52 ProducerChannel::ProducerChannel(BufferHubService* service, int channel_id,
53 uint32_t width, uint32_t height,
54 uint32_t layer_count, uint32_t format,
55 uint64_t usage, size_t user_metadata_size,
56 int* error)
57 : BufferHubChannel(service, channel_id, channel_id, kProducerType),
58 user_metadata_size_(user_metadata_size),
59 metadata_buf_size_(BufferHubDefs::kMetadataHeaderSize +
60 user_metadata_size) {
61 if (int ret = buffer_.Alloc(width, height, layer_count, format, usage)) {
62 ALOGE("ProducerChannel::ProducerChannel: Failed to allocate buffer: %s",
63 strerror(-ret));
64 *error = ret;
65 return;
66 }
67
68 if (int ret = metadata_buffer_.Alloc(metadata_buf_size_, /*height=*/1,
69 /*layer_count=*/1,
70 BufferHubDefs::kMetadataFormat,
71 BufferHubDefs::kMetadataUsage)) {
72 ALOGE("ProducerChannel::ProducerChannel: Failed to allocate metadata: %s",
73 strerror(-ret));
74 *error = ret;
75 return;
76 }
77
78 *error = InitializeBuffer();
79 }
80
InitializeBuffer()81 int ProducerChannel::InitializeBuffer() {
82 void* metadata_ptr = nullptr;
83 if (int ret = metadata_buffer_.Lock(BufferHubDefs::kMetadataUsage, /*x=*/0,
84 /*y=*/0, metadata_buf_size_,
85 /*height=*/1, &metadata_ptr)) {
86 ALOGE("ProducerChannel::ProducerChannel: Failed to lock metadata.");
87 return ret;
88 }
89 metadata_header_ =
90 reinterpret_cast<BufferHubDefs::MetadataHeader*>(metadata_ptr);
91
92 // Using placement new here to reuse shared memory instead of new allocation
93 // and also initialize the value to zero.
94 buffer_state_ = new (&metadata_header_->bufferState) std::atomic<uint32_t>(0);
95 fence_state_ = new (&metadata_header_->fenceState) std::atomic<uint32_t>(0);
96 active_clients_bit_mask_ =
97 new (&metadata_header_->activeClientsBitMask) std::atomic<uint32_t>(0);
98
99 // Producer channel is never created after consumer channel, and one buffer
100 // only have one fixed producer for now. Thus, it is correct to assume
101 // producer state bit is kFirstClientBitMask for now.
102 active_clients_bit_mask_->store(BufferHubDefs::kFirstClientBitMask,
103 std::memory_order_release);
104
105 acquire_fence_fd_.Reset(epoll_create1(EPOLL_CLOEXEC));
106 release_fence_fd_.Reset(epoll_create1(EPOLL_CLOEXEC));
107 if (!acquire_fence_fd_ || !release_fence_fd_) {
108 ALOGE("ProducerChannel::ProducerChannel: Failed to create shared fences.");
109 return -EIO;
110 }
111
112 dummy_fence_fd_.Reset(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK));
113 if (!dummy_fence_fd_) {
114 ALOGE("ProducerChannel::ProducerChannel: Failed to create dummy fences.");
115 return EIO;
116 }
117
118 epoll_event event;
119 event.events = 0;
120 event.data.u32 = 0U;
121 if (epoll_ctl(release_fence_fd_.Get(), EPOLL_CTL_ADD, dummy_fence_fd_.Get(),
122 &event) < 0) {
123 ALOGE(
124 "ProducerChannel::ProducerChannel: Failed to modify the shared "
125 "release fence to include the dummy fence: %s",
126 strerror(errno));
127 return -EIO;
128 }
129
130 // Success.
131 return 0;
132 }
133
Create(BufferHubService * service,int buffer_id,int channel_id,IonBuffer buffer,IonBuffer metadata_buffer,size_t user_metadata_size)134 std::unique_ptr<ProducerChannel> ProducerChannel::Create(
135 BufferHubService* service, int buffer_id, int channel_id, IonBuffer buffer,
136 IonBuffer metadata_buffer, size_t user_metadata_size) {
137 int error = 0;
138 std::unique_ptr<ProducerChannel> producer(new ProducerChannel(
139 service, buffer_id, channel_id, std::move(buffer),
140 std::move(metadata_buffer), user_metadata_size, &error));
141
142 if (error < 0)
143 return nullptr;
144 else
145 return producer;
146 }
147
Create(BufferHubService * service,int channel_id,uint32_t width,uint32_t height,uint32_t layer_count,uint32_t format,uint64_t usage,size_t user_metadata_size)148 Status<std::shared_ptr<ProducerChannel>> ProducerChannel::Create(
149 BufferHubService* service, int channel_id, uint32_t width, uint32_t height,
150 uint32_t layer_count, uint32_t format, uint64_t usage,
151 size_t user_metadata_size) {
152 int error;
153 std::shared_ptr<ProducerChannel> producer(
154 new ProducerChannel(service, channel_id, width, height, layer_count,
155 format, usage, user_metadata_size, &error));
156 if (error < 0)
157 return ErrorStatus(-error);
158 else
159 return {std::move(producer)};
160 }
161
~ProducerChannel()162 ProducerChannel::~ProducerChannel() {
163 ALOGD_IF(TRACE,
164 "ProducerChannel::~ProducerChannel: channel_id=%d buffer_id=%d "
165 "state=%" PRIx32 ".",
166 channel_id(), buffer_id(),
167 buffer_state_->load(std::memory_order_acquire));
168 for (auto consumer : consumer_channels_) {
169 consumer->OnProducerClosed();
170 }
171 Hangup();
172 }
173
GetBufferInfo() const174 BufferHubChannel::BufferInfo ProducerChannel::GetBufferInfo() const {
175 // Derive the mask of signaled buffers in this producer / consumer set.
176 uint32_t signaled_mask = signaled() ? BufferHubDefs::kFirstClientBitMask : 0;
177 for (const ConsumerChannel* consumer : consumer_channels_) {
178 signaled_mask |= consumer->signaled() ? consumer->client_state_mask() : 0;
179 }
180
181 return BufferInfo(buffer_id(), consumer_channels_.size(), buffer_.width(),
182 buffer_.height(), buffer_.layer_count(), buffer_.format(),
183 buffer_.usage(),
184 buffer_state_->load(std::memory_order_acquire),
185 signaled_mask, metadata_header_->queueIndex);
186 }
187
HandleImpulse(Message & message)188 void ProducerChannel::HandleImpulse(Message& message) {
189 ATRACE_NAME("ProducerChannel::HandleImpulse");
190 switch (message.GetOp()) {
191 case BufferHubRPC::ProducerGain::Opcode:
192 OnProducerGain(message);
193 break;
194 case BufferHubRPC::ProducerPost::Opcode:
195 OnProducerPost(message, {});
196 break;
197 }
198 }
199
HandleMessage(Message & message)200 bool ProducerChannel::HandleMessage(Message& message) {
201 ATRACE_NAME("ProducerChannel::HandleMessage");
202 switch (message.GetOp()) {
203 case BufferHubRPC::GetBuffer::Opcode:
204 DispatchRemoteMethod<BufferHubRPC::GetBuffer>(
205 *this, &ProducerChannel::OnGetBuffer, message);
206 return true;
207
208 case BufferHubRPC::NewConsumer::Opcode:
209 DispatchRemoteMethod<BufferHubRPC::NewConsumer>(
210 *this, &ProducerChannel::OnNewConsumer, message);
211 return true;
212
213 case BufferHubRPC::ProducerPost::Opcode:
214 DispatchRemoteMethod<BufferHubRPC::ProducerPost>(
215 *this, &ProducerChannel::OnProducerPost, message);
216 return true;
217
218 case BufferHubRPC::ProducerGain::Opcode:
219 DispatchRemoteMethod<BufferHubRPC::ProducerGain>(
220 *this, &ProducerChannel::OnProducerGain, message);
221 return true;
222
223 default:
224 return false;
225 }
226 }
227
GetBuffer(uint32_t client_state_mask)228 BufferDescription<BorrowedHandle> ProducerChannel::GetBuffer(
229 uint32_t client_state_mask) {
230 return {buffer_,
231 metadata_buffer_,
232 buffer_id(),
233 channel_id(),
234 client_state_mask,
235 acquire_fence_fd_.Borrow(),
236 release_fence_fd_.Borrow()};
237 }
238
OnGetBuffer(Message &)239 Status<BufferDescription<BorrowedHandle>> ProducerChannel::OnGetBuffer(
240 Message& /*message*/) {
241 ATRACE_NAME("ProducerChannel::OnGetBuffer");
242 ALOGD_IF(TRACE, "ProducerChannel::OnGetBuffer: buffer=%d, state=%" PRIx32 ".",
243 buffer_id(), buffer_state_->load(std::memory_order_acquire));
244 return {GetBuffer(BufferHubDefs::kFirstClientBitMask)};
245 }
246
CreateConsumerStateMask()247 Status<uint32_t> ProducerChannel::CreateConsumerStateMask() {
248 // Try find the next consumer state bit which has not been claimed by any
249 // consumer yet.
250 // memory_order_acquire is chosen here because all writes in other threads
251 // that release active_clients_bit_mask_ need to be visible here.
252 uint32_t current_active_clients_bit_mask =
253 active_clients_bit_mask_->load(std::memory_order_acquire);
254 uint32_t consumer_state_mask =
255 BufferHubDefs::findNextAvailableClientStateMask(
256 current_active_clients_bit_mask | orphaned_consumer_bit_mask_);
257 if (consumer_state_mask == 0U) {
258 ALOGE("%s: reached the maximum mumber of consumers per producer: 63.",
259 __FUNCTION__);
260 return ErrorStatus(E2BIG);
261 }
262 uint32_t updated_active_clients_bit_mask =
263 current_active_clients_bit_mask | consumer_state_mask;
264 // Set the updated value only if the current value stays the same as what was
265 // read before. If the comparison succeeds, update the value without
266 // reordering anything before or after this read-modify-write in the current
267 // thread, and the modification will be visible in other threads that acquire
268 // active_clients_bit_mask_. If the comparison fails, load the result of
269 // all writes from all threads to updated_active_clients_bit_mask.
270 // Keep on finding the next available slient state mask until succeed or out
271 // of memory.
272 while (!active_clients_bit_mask_->compare_exchange_weak(
273 current_active_clients_bit_mask, updated_active_clients_bit_mask,
274 std::memory_order_acq_rel, std::memory_order_acquire)) {
275 ALOGE("%s: Current active clients bit mask is changed to %" PRIx32
276 ", which was expected to be %" PRIx32
277 ". Trying to generate a new client state mask to resolve race "
278 "condition.",
279 __FUNCTION__, updated_active_clients_bit_mask,
280 current_active_clients_bit_mask);
281 consumer_state_mask = BufferHubDefs::findNextAvailableClientStateMask(
282 current_active_clients_bit_mask | orphaned_consumer_bit_mask_);
283 if (consumer_state_mask == 0U) {
284 ALOGE("%s: reached the maximum mumber of consumers per producer: %d.",
285 __FUNCTION__, (BufferHubDefs::kMaxNumberOfClients - 1));
286 return ErrorStatus(E2BIG);
287 }
288 updated_active_clients_bit_mask =
289 current_active_clients_bit_mask | consumer_state_mask;
290 }
291
292 return {consumer_state_mask};
293 }
294
RemoveConsumerClientMask(uint32_t consumer_state_mask)295 void ProducerChannel::RemoveConsumerClientMask(uint32_t consumer_state_mask) {
296 // Clear up the buffer state and fence state in case there is already
297 // something there due to possible race condition between producer post and
298 // consumer failed to create channel.
299 buffer_state_->fetch_and(~consumer_state_mask, std::memory_order_release);
300 fence_state_->fetch_and(~consumer_state_mask, std::memory_order_release);
301
302 // Restore the consumer state bit and make it visible in other threads that
303 // acquire the active_clients_bit_mask_.
304 active_clients_bit_mask_->fetch_and(~consumer_state_mask,
305 std::memory_order_release);
306 }
307
CreateConsumer(Message & message,uint32_t consumer_state_mask)308 Status<RemoteChannelHandle> ProducerChannel::CreateConsumer(
309 Message& message, uint32_t consumer_state_mask) {
310 ATRACE_NAME(__FUNCTION__);
311 ALOGD("%s: buffer_id=%d", __FUNCTION__, buffer_id());
312
313 int channel_id;
314 auto status = message.PushChannel(0, nullptr, &channel_id);
315 if (!status) {
316 ALOGE("%s: Failed to push consumer channel: %s", __FUNCTION__,
317 status.GetErrorMessage().c_str());
318 RemoveConsumerClientMask(consumer_state_mask);
319 return ErrorStatus(ENOMEM);
320 }
321
322 auto consumer = std::make_shared<ConsumerChannel>(
323 service(), buffer_id(), channel_id, consumer_state_mask,
324 shared_from_this());
325 const auto channel_status = service()->SetChannel(channel_id, consumer);
326 if (!channel_status) {
327 ALOGE("%s: failed to set new consumer channel: %s.", __FUNCTION__,
328 channel_status.GetErrorMessage().c_str());
329 RemoveConsumerClientMask(consumer_state_mask);
330 return ErrorStatus(ENOMEM);
331 }
332
333 uint32_t current_buffer_state =
334 buffer_state_->load(std::memory_order_acquire);
335 // Return the consumer channel handle without signal when adding the new
336 // consumer to a buffer that is available to producer (a.k.a a fully-released
337 // buffer) or a gained buffer.
338 if (current_buffer_state == 0U ||
339 BufferHubDefs::isAnyClientGained(current_buffer_state)) {
340 return {status.take()};
341 }
342
343 // Signal the new consumer when adding it to a posted producer.
344 bool update_buffer_state = true;
345 if (!BufferHubDefs::isClientPosted(current_buffer_state,
346 consumer_state_mask)) {
347 uint32_t updated_buffer_state =
348 current_buffer_state ^
349 (consumer_state_mask & BufferHubDefs::kHighBitsMask);
350 while (!buffer_state_->compare_exchange_weak(
351 current_buffer_state, updated_buffer_state, std::memory_order_acq_rel,
352 std::memory_order_acquire)) {
353 ALOGV(
354 "%s: Failed to post to the new consumer. "
355 "Current buffer state was changed to %" PRIx32
356 " when trying to acquire the buffer and modify the buffer state to "
357 "%" PRIx32
358 ". About to try again if the buffer is still not gained nor fully "
359 "released.",
360 __FUNCTION__, current_buffer_state, updated_buffer_state);
361 if (current_buffer_state == 0U ||
362 BufferHubDefs::isAnyClientGained(current_buffer_state)) {
363 ALOGI("%s: buffer is gained or fully released, state=%" PRIx32 ".",
364 __FUNCTION__, current_buffer_state);
365 update_buffer_state = false;
366 break;
367 }
368 updated_buffer_state =
369 current_buffer_state ^
370 (consumer_state_mask & BufferHubDefs::kHighBitsMask);
371 }
372 }
373 if (update_buffer_state || BufferHubDefs::isClientPosted(
374 buffer_state_->load(std::memory_order_acquire),
375 consumer_state_mask)) {
376 consumer->OnProducerPosted();
377 }
378
379 return {status.take()};
380 }
381
OnNewConsumer(Message & message)382 Status<RemoteChannelHandle> ProducerChannel::OnNewConsumer(Message& message) {
383 ATRACE_NAME("ProducerChannel::OnNewConsumer");
384 ALOGD_IF(TRACE, "ProducerChannel::OnNewConsumer: buffer_id=%d", buffer_id());
385 auto status = CreateConsumerStateMask();
386 if (!status.ok()) {
387 return status.error_status();
388 }
389 return CreateConsumer(message, /*consumer_state_mask=*/status.get());
390 }
391
OnProducerPost(Message &,LocalFence acquire_fence)392 Status<void> ProducerChannel::OnProducerPost(Message&,
393 LocalFence acquire_fence) {
394 ATRACE_NAME("ProducerChannel::OnProducerPost");
395 ALOGD_IF(TRACE, "%s: buffer_id=%d, state=0x%x", __FUNCTION__, buffer_id(),
396 buffer_state_->load(std::memory_order_acquire));
397
398 epoll_event event;
399 event.events = 0;
400 event.data.u32 = 0U;
401 int ret = epoll_ctl(release_fence_fd_.Get(), EPOLL_CTL_MOD,
402 dummy_fence_fd_.Get(), &event);
403 ALOGE_IF(ret < 0,
404 "ProducerChannel::OnProducerPost: Failed to modify the shared "
405 "release fence to include the dummy fence: %s",
406 strerror(errno));
407
408 eventfd_t dummy_fence_count = 0U;
409 if (eventfd_read(dummy_fence_fd_.Get(), &dummy_fence_count) < 0) {
410 const int error = errno;
411 if (error != EAGAIN) {
412 ALOGE(
413 "ProducerChannel::ProducerChannel: Failed to read dummy fence, "
414 "error: %s",
415 strerror(error));
416 return ErrorStatus(error);
417 }
418 }
419
420 ALOGW_IF(dummy_fence_count > 0,
421 "ProducerChannel::ProducerChannel: %" PRIu64
422 " dummy fence(s) was signaled during last release/gain cycle "
423 "buffer_id=%d.",
424 dummy_fence_count, buffer_id());
425
426 post_fence_ = std::move(acquire_fence);
427
428 // Signal any interested consumers. If there are none, the buffer will stay
429 // in posted state until a consumer comes online. This behavior guarantees
430 // that no frame is silently dropped.
431 for (auto& consumer : consumer_channels_) {
432 consumer->OnProducerPosted();
433 }
434
435 return {};
436 }
437
OnProducerGain(Message &)438 Status<LocalFence> ProducerChannel::OnProducerGain(Message& /*message*/) {
439 ATRACE_NAME("ProducerChannel::OnGain");
440 ALOGD_IF(TRACE, "%s: buffer_id=%d", __FUNCTION__, buffer_id());
441
442 ClearAvailable();
443 post_fence_.close();
444 for (auto& consumer : consumer_channels_) {
445 consumer->OnProducerGained();
446 }
447 return {std::move(returned_fence_)};
448 }
449
450 // TODO(b/112338294) Keep here for reference. Remove it after new logic is
451 // written.
452 /* Status<RemoteChannelHandle> ProducerChannel::OnProducerDetach(
453 Message& message) {
454 ATRACE_NAME("ProducerChannel::OnProducerDetach");
455 ALOGD_IF(TRACE, "ProducerChannel::OnProducerDetach: buffer_id=%d",
456 buffer_id());
457
458 uint32_t buffer_state = buffer_state_->load(std::memory_order_acquire);
459 if (!BufferHubDefs::isClientGained(
460 buffer_state, BufferHubDefs::kFirstClientStateMask)) {
461 // Can only detach a ProducerBuffer when it's in gained state.
462 ALOGW(
463 "ProducerChannel::OnProducerDetach: The buffer (id=%d, state=%"
464 PRIx32
465 ") is not in gained state.",
466 buffer_id(), buffer_state);
467 return {};
468 }
469
470 int channel_id;
471 auto status = message.PushChannel(0, nullptr, &channel_id);
472 if (!status) {
473 ALOGE(
474 "ProducerChannel::OnProducerDetach: Failed to push detached buffer "
475 "channel: %s",
476 status.GetErrorMessage().c_str());
477 return ErrorStatus(ENOMEM);
478 }
479
480 // Make sure we unlock the buffer.
481 if (int ret = metadata_buffer_.Unlock()) {
482 ALOGE("ProducerChannel::OnProducerDetach: Failed to unlock metadata.");
483 return ErrorStatus(-ret);
484 };
485
486 std::unique_ptr<BufferChannel> channel =
487 BufferChannel::Create(service(), buffer_id(), channel_id,
488 std::move(buffer_), user_metadata_size_);
489 if (!channel) {
490 ALOGE("ProducerChannel::OnProducerDetach: Invalid buffer.");
491 return ErrorStatus(EINVAL);
492 }
493
494 const auto channel_status =
495 service()->SetChannel(channel_id, std::move(channel));
496 if (!channel_status) {
497 // Technically, this should never fail, as we just pushed the channel.
498 // Note that LOG_FATAL will be stripped out in non-debug build.
499 LOG_FATAL(
500 "ProducerChannel::OnProducerDetach: Failed to set new detached "
501 "buffer channel: %s.", channel_status.GetErrorMessage().c_str());
502 }
503
504 return status;
505 } */
506
OnConsumerAcquire(Message &)507 Status<LocalFence> ProducerChannel::OnConsumerAcquire(Message& /*message*/) {
508 ATRACE_NAME("ProducerChannel::OnConsumerAcquire");
509 ALOGD_IF(TRACE, "ProducerChannel::OnConsumerAcquire: buffer_id=%d",
510 buffer_id());
511
512 // Return a borrowed fd to avoid unnecessary duplication of the underlying
513 // fd. Serialization just needs to read the handle.
514 return {std::move(post_fence_)};
515 }
516
OnConsumerRelease(Message &,LocalFence release_fence)517 Status<void> ProducerChannel::OnConsumerRelease(Message&,
518 LocalFence release_fence) {
519 ATRACE_NAME("ProducerChannel::OnConsumerRelease");
520 ALOGD_IF(TRACE, "ProducerChannel::OnConsumerRelease: buffer_id=%d",
521 buffer_id());
522
523 // Attempt to merge the fences if necessary.
524 if (release_fence) {
525 if (returned_fence_) {
526 LocalFence merged_fence(sync_merge("bufferhub_merged",
527 returned_fence_.get_fd(),
528 release_fence.get_fd()));
529 const int error = errno;
530 if (!merged_fence) {
531 ALOGE("ProducerChannel::OnConsumerRelease: Failed to merge fences: %s",
532 strerror(error));
533 return ErrorStatus(error);
534 }
535 returned_fence_ = std::move(merged_fence);
536 } else {
537 returned_fence_ = std::move(release_fence);
538 }
539 }
540
541 if (IsBufferReleasedByAllActiveClientsExceptForOrphans()) {
542 buffer_state_->store(0U);
543 SignalAvailable();
544 if (orphaned_consumer_bit_mask_) {
545 ALOGW(
546 "%s: orphaned buffer detected during the this acquire/release cycle: "
547 "id=%d orphaned=0x%" PRIx32 " queue_index=%" PRId64 ".",
548 __FUNCTION__, buffer_id(), orphaned_consumer_bit_mask_,
549 metadata_header_->queueIndex);
550 orphaned_consumer_bit_mask_ = 0;
551 }
552 }
553
554 return {};
555 }
556
OnConsumerOrphaned(const uint32_t & consumer_state_mask)557 void ProducerChannel::OnConsumerOrphaned(const uint32_t& consumer_state_mask) {
558 // Remember the ignored consumer so that newly added consumer won't be
559 // taking the same state mask as this orphaned consumer.
560 ALOGE_IF(orphaned_consumer_bit_mask_ & consumer_state_mask,
561 "%s: Consumer (consumer_state_mask=%" PRIx32
562 ") is already orphaned.",
563 __FUNCTION__, consumer_state_mask);
564 orphaned_consumer_bit_mask_ |= consumer_state_mask;
565
566 if (IsBufferReleasedByAllActiveClientsExceptForOrphans()) {
567 buffer_state_->store(0U);
568 SignalAvailable();
569 }
570
571 // Atomically clear the fence state bit as an orphaned consumer will never
572 // signal a release fence.
573 fence_state_->fetch_and(~consumer_state_mask, std::memory_order_release);
574
575 // Atomically set the buffer state of this consumer to released state.
576 buffer_state_->fetch_and(~consumer_state_mask, std::memory_order_release);
577
578 ALOGW(
579 "%s: detected new orphaned consumer buffer_id=%d "
580 "consumer_state_mask=%" PRIx32 " queue_index=%" PRId64
581 " buffer_state=%" PRIx32 " fence_state=%" PRIx32 ".",
582 __FUNCTION__, buffer_id(), consumer_state_mask,
583 metadata_header_->queueIndex,
584 buffer_state_->load(std::memory_order_acquire),
585 fence_state_->load(std::memory_order_acquire));
586 }
587
AddConsumer(ConsumerChannel * channel)588 void ProducerChannel::AddConsumer(ConsumerChannel* channel) {
589 consumer_channels_.push_back(channel);
590 }
591
RemoveConsumer(ConsumerChannel * channel)592 void ProducerChannel::RemoveConsumer(ConsumerChannel* channel) {
593 consumer_channels_.erase(
594 std::find(consumer_channels_.begin(), consumer_channels_.end(), channel));
595 // Restore the consumer state bit and make it visible in other threads that
596 // acquire the active_clients_bit_mask_.
597 uint32_t consumer_state_mask = channel->client_state_mask();
598 uint32_t current_active_clients_bit_mask =
599 active_clients_bit_mask_->load(std::memory_order_acquire);
600 uint32_t updated_active_clients_bit_mask =
601 current_active_clients_bit_mask & (~consumer_state_mask);
602 while (!active_clients_bit_mask_->compare_exchange_weak(
603 current_active_clients_bit_mask, updated_active_clients_bit_mask,
604 std::memory_order_acq_rel, std::memory_order_acquire)) {
605 ALOGI(
606 "%s: Failed to remove consumer state mask. Current active clients bit "
607 "mask is changed to %" PRIx32
608 " when trying to acquire and modify it to %" PRIx32
609 ". About to try again.",
610 __FUNCTION__, current_active_clients_bit_mask,
611 updated_active_clients_bit_mask);
612 updated_active_clients_bit_mask =
613 current_active_clients_bit_mask & (~consumer_state_mask);
614 }
615
616 const uint32_t current_buffer_state =
617 buffer_state_->load(std::memory_order_acquire);
618 if (BufferHubDefs::isClientPosted(current_buffer_state,
619 consumer_state_mask) ||
620 BufferHubDefs::isClientAcquired(current_buffer_state,
621 consumer_state_mask)) {
622 // The consumer client is being destoryed without releasing. This could
623 // happen in corner cases when the consumer crashes. Here we mark it
624 // orphaned before remove it from producer.
625 OnConsumerOrphaned(consumer_state_mask);
626 return;
627 }
628
629 if (BufferHubDefs::isClientReleased(current_buffer_state,
630 consumer_state_mask) ||
631 BufferHubDefs::isAnyClientGained(current_buffer_state)) {
632 // The consumer is being close while it is suppose to signal a release
633 // fence. Signal the dummy fence here.
634 if (fence_state_->load(std::memory_order_acquire) & consumer_state_mask) {
635 epoll_event event;
636 event.events = EPOLLIN;
637 event.data.u32 = consumer_state_mask;
638 if (epoll_ctl(release_fence_fd_.Get(), EPOLL_CTL_MOD,
639 dummy_fence_fd_.Get(), &event) < 0) {
640 ALOGE(
641 "%s: Failed to modify the shared release fence to include the "
642 "dummy fence: %s",
643 __FUNCTION__, strerror(errno));
644 return;
645 }
646 ALOGW("%s: signal dummy release fence buffer_id=%d", __FUNCTION__,
647 buffer_id());
648 eventfd_write(dummy_fence_fd_.Get(), 1);
649 }
650 }
651 }
652
653 // Returns true if the given parameters match the underlying buffer
654 // parameters.
CheckParameters(uint32_t width,uint32_t height,uint32_t layer_count,uint32_t format,uint64_t usage,size_t user_metadata_size) const655 bool ProducerChannel::CheckParameters(uint32_t width, uint32_t height,
656 uint32_t layer_count, uint32_t format,
657 uint64_t usage,
658 size_t user_metadata_size) const {
659 return user_metadata_size == user_metadata_size_ &&
660 buffer_.width() == width && buffer_.height() == height &&
661 buffer_.layer_count() == layer_count && buffer_.format() == format &&
662 buffer_.usage() == usage;
663 }
664
IsBufferReleasedByAllActiveClientsExceptForOrphans() const665 bool ProducerChannel::IsBufferReleasedByAllActiveClientsExceptForOrphans()
666 const {
667 return (buffer_state_->load(std::memory_order_acquire) &
668 ~orphaned_consumer_bit_mask_ &
669 active_clients_bit_mask_->load(std::memory_order_acquire)) == 0U;
670 }
671
672 } // namespace dvr
673 } // namespace android
674