1 /*
2 * Copyright 2021 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "android.hardware.tv.tuner-service.example-Filter"
19
20 #include <BufferAllocator/BufferAllocator.h>
21 #include <aidl/android/hardware/tv/tuner/DemuxFilterMonitorEventType.h>
22 #include <aidl/android/hardware/tv/tuner/DemuxQueueNotifyBits.h>
23 #include <aidl/android/hardware/tv/tuner/Result.h>
24 #include <aidlcommonsupport/NativeHandle.h>
25 #include <inttypes.h>
26 #include <utils/Log.h>
27
28 #include "Filter.h"
29
30 namespace aidl {
31 namespace android {
32 namespace hardware {
33 namespace tv {
34 namespace tuner {
35
36 #define WAIT_TIMEOUT 3000000000
37
FilterCallbackScheduler(const std::shared_ptr<IFilterCallback> & cb)38 FilterCallbackScheduler::FilterCallbackScheduler(const std::shared_ptr<IFilterCallback>& cb)
39 : mCallback(cb),
40 mIsConditionMet(false),
41 mDataLength(0),
42 mTimeDelayInMs(0),
43 mDataSizeDelayInBytes(0) {
44 start();
45 }
46
~FilterCallbackScheduler()47 FilterCallbackScheduler::~FilterCallbackScheduler() {
48 stop();
49 }
50
onFilterEvent(DemuxFilterEvent && event)51 void FilterCallbackScheduler::onFilterEvent(DemuxFilterEvent&& event) {
52 std::unique_lock<std::mutex> lock(mLock);
53 mCallbackBuffer.push_back(std::move(event));
54 mDataLength += getDemuxFilterEventDataLength(event);
55
56 if (isDataSizeDelayConditionMetLocked()) {
57 mIsConditionMet = true;
58 // unlock, so thread is not immediately blocked when it is notified.
59 lock.unlock();
60 mCv.notify_all();
61 }
62 }
63
onFilterStatus(const DemuxFilterStatus & status)64 void FilterCallbackScheduler::onFilterStatus(const DemuxFilterStatus& status) {
65 if (mCallback) {
66 mCallback->onFilterStatus(status);
67 }
68 }
69
flushEvents()70 void FilterCallbackScheduler::flushEvents() {
71 std::unique_lock<std::mutex> lock(mLock);
72 mCallbackBuffer.clear();
73 mDataLength = 0;
74 }
75
setTimeDelayHint(int timeDelay)76 void FilterCallbackScheduler::setTimeDelayHint(int timeDelay) {
77 std::unique_lock<std::mutex> lock(mLock);
78 mTimeDelayInMs = timeDelay;
79 // always notify condition variable to update timeout
80 mIsConditionMet = true;
81 lock.unlock();
82 mCv.notify_all();
83 }
84
setDataSizeDelayHint(int dataSizeDelay)85 void FilterCallbackScheduler::setDataSizeDelayHint(int dataSizeDelay) {
86 std::unique_lock<std::mutex> lock(mLock);
87 mDataSizeDelayInBytes = dataSizeDelay;
88 if (isDataSizeDelayConditionMetLocked()) {
89 mIsConditionMet = true;
90 lock.unlock();
91 mCv.notify_all();
92 }
93 }
94
hasCallbackRegistered() const95 bool FilterCallbackScheduler::hasCallbackRegistered() const {
96 return mCallback != nullptr;
97 }
98
start()99 void FilterCallbackScheduler::start() {
100 mIsRunning = true;
101 mCallbackThread = std::thread(&FilterCallbackScheduler::threadLoop, this);
102 }
103
stop()104 void FilterCallbackScheduler::stop() {
105 mIsRunning = false;
106 if (mCallbackThread.joinable()) {
107 {
108 std::lock_guard<std::mutex> lock(mLock);
109 mIsConditionMet = true;
110 }
111 mCv.notify_all();
112 mCallbackThread.join();
113 }
114 }
115
threadLoop()116 void FilterCallbackScheduler::threadLoop() {
117 while (mIsRunning) {
118 threadLoopOnce();
119 }
120 }
121
threadLoopOnce()122 void FilterCallbackScheduler::threadLoopOnce() {
123 std::unique_lock<std::mutex> lock(mLock);
124 if (mTimeDelayInMs > 0) {
125 // Note: predicate protects from lost and spurious wakeups
126 mCv.wait_for(lock, std::chrono::milliseconds(mTimeDelayInMs),
127 [this] { return mIsConditionMet; });
128 } else {
129 // Note: predicate protects from lost and spurious wakeups
130 mCv.wait(lock, [this] { return mIsConditionMet; });
131 }
132 mIsConditionMet = false;
133
134 // condition_variable wait locks mutex on timeout / notify
135 // Note: if stop() has been called in the meantime, do not send more filter
136 // events.
137 if (mIsRunning && !mCallbackBuffer.empty()) {
138 if (mCallback) {
139 mCallback->onFilterEvent(mCallbackBuffer);
140 }
141 mCallbackBuffer.clear();
142 mDataLength = 0;
143 }
144 }
145
146 // mLock needs to be held to call this function
isDataSizeDelayConditionMetLocked()147 bool FilterCallbackScheduler::isDataSizeDelayConditionMetLocked() {
148 if (mDataSizeDelayInBytes == 0) {
149 // Data size delay is disabled.
150 if (mTimeDelayInMs == 0) {
151 // Events should only be sent immediately if time delay is disabled
152 // as well.
153 return true;
154 }
155 return false;
156 }
157
158 // Data size delay is enabled.
159 return mDataLength >= mDataSizeDelayInBytes;
160 }
161
getDemuxFilterEventDataLength(const DemuxFilterEvent & event)162 int FilterCallbackScheduler::getDemuxFilterEventDataLength(const DemuxFilterEvent& event) {
163 // there is a risk that dataLength could be a negative value, but it
164 // *should* be safe to assume that it is always positive.
165 switch (event.getTag()) {
166 case DemuxFilterEvent::Tag::section:
167 return event.get<DemuxFilterEvent::Tag::section>().dataLength;
168 case DemuxFilterEvent::Tag::media:
169 return event.get<DemuxFilterEvent::Tag::media>().dataLength;
170 case DemuxFilterEvent::Tag::pes:
171 return event.get<DemuxFilterEvent::Tag::pes>().dataLength;
172 case DemuxFilterEvent::Tag::download:
173 return event.get<DemuxFilterEvent::Tag::download>().dataLength;
174 case DemuxFilterEvent::Tag::ipPayload:
175 return event.get<DemuxFilterEvent::Tag::ipPayload>().dataLength;
176
177 case DemuxFilterEvent::Tag::tsRecord:
178 case DemuxFilterEvent::Tag::mmtpRecord:
179 case DemuxFilterEvent::Tag::temi:
180 case DemuxFilterEvent::Tag::monitorEvent:
181 case DemuxFilterEvent::Tag::startId:
182 // these events do not include a payload and should therefore return
183 // 0.
184 // do not add a default option, so this will not compile when new types
185 // are added.
186 return 0;
187 }
188 }
189
Filter(DemuxFilterType type,int64_t filterId,uint32_t bufferSize,const std::shared_ptr<IFilterCallback> & cb,std::shared_ptr<Demux> demux)190 Filter::Filter(DemuxFilterType type, int64_t filterId, uint32_t bufferSize,
191 const std::shared_ptr<IFilterCallback>& cb, std::shared_ptr<Demux> demux)
192 : mDemux(demux),
193 mCallbackScheduler(cb),
194 mFilterId(filterId),
195 mBufferSize(bufferSize),
196 mType(type) {
197 switch (mType.mainType) {
198 case DemuxFilterMainType::TS:
199 if (mType.subType.get<DemuxFilterSubType::Tag::tsFilterType>() ==
200 DemuxTsFilterType::AUDIO ||
201 mType.subType.get<DemuxFilterSubType::Tag::tsFilterType>() ==
202 DemuxTsFilterType::VIDEO) {
203 mIsMediaFilter = true;
204 }
205 if (mType.subType.get<DemuxFilterSubType::Tag::tsFilterType>() ==
206 DemuxTsFilterType::PCR) {
207 mIsPcrFilter = true;
208 }
209 if (mType.subType.get<DemuxFilterSubType::Tag::tsFilterType>() ==
210 DemuxTsFilterType::RECORD) {
211 mIsRecordFilter = true;
212 }
213 break;
214 case DemuxFilterMainType::MMTP:
215 if (mType.subType.get<DemuxFilterSubType::Tag::mmtpFilterType>() ==
216 DemuxMmtpFilterType::AUDIO ||
217 mType.subType.get<DemuxFilterSubType::Tag::mmtpFilterType>() ==
218 DemuxMmtpFilterType::VIDEO) {
219 mIsMediaFilter = true;
220 }
221 if (mType.subType.get<DemuxFilterSubType::Tag::mmtpFilterType>() ==
222 DemuxMmtpFilterType::RECORD) {
223 mIsRecordFilter = true;
224 }
225 break;
226 case DemuxFilterMainType::IP:
227 break;
228 case DemuxFilterMainType::TLV:
229 break;
230 case DemuxFilterMainType::ALP:
231 break;
232 default:
233 break;
234 }
235 }
236
~Filter()237 Filter::~Filter() {
238 close();
239 }
240
getId64Bit(int64_t * _aidl_return)241 ::ndk::ScopedAStatus Filter::getId64Bit(int64_t* _aidl_return) {
242 ALOGV("%s", __FUNCTION__);
243
244 *_aidl_return = mFilterId;
245 return ::ndk::ScopedAStatus::ok();
246 }
247
getId(int32_t * _aidl_return)248 ::ndk::ScopedAStatus Filter::getId(int32_t* _aidl_return) {
249 ALOGV("%s", __FUNCTION__);
250
251 *_aidl_return = static_cast<int32_t>(mFilterId);
252 return ::ndk::ScopedAStatus::ok();
253 }
254
setDataSource(const std::shared_ptr<IFilter> & in_filter)255 ::ndk::ScopedAStatus Filter::setDataSource(const std::shared_ptr<IFilter>& in_filter) {
256 ALOGV("%s", __FUNCTION__);
257
258 mDataSource = in_filter;
259 mIsDataSourceDemux = false;
260
261 return ::ndk::ScopedAStatus::ok();
262 }
263
setDelayHint(const FilterDelayHint & in_hint)264 ::ndk::ScopedAStatus Filter::setDelayHint(const FilterDelayHint& in_hint) {
265 if (mIsMediaFilter) {
266 // delay hint is not supported for media filters
267 return ::ndk::ScopedAStatus::fromServiceSpecificError(
268 static_cast<int32_t>(Result::UNAVAILABLE));
269 }
270
271 ALOGV("%s", __FUNCTION__);
272 if (in_hint.hintValue < 0) {
273 return ::ndk::ScopedAStatus::fromServiceSpecificError(
274 static_cast<int32_t>(Result::INVALID_ARGUMENT));
275 }
276
277 switch (in_hint.hintType) {
278 case FilterDelayHintType::TIME_DELAY_IN_MS:
279 mCallbackScheduler.setTimeDelayHint(in_hint.hintValue);
280 break;
281 case FilterDelayHintType::DATA_SIZE_DELAY_IN_BYTES:
282 mCallbackScheduler.setDataSizeDelayHint(in_hint.hintValue);
283 break;
284 default:
285 return ::ndk::ScopedAStatus::fromServiceSpecificError(
286 static_cast<int32_t>(Result::INVALID_ARGUMENT));
287 }
288
289 return ::ndk::ScopedAStatus::ok();
290 }
291
getQueueDesc(MQDescriptor<int8_t,SynchronizedReadWrite> * out_queue)292 ::ndk::ScopedAStatus Filter::getQueueDesc(MQDescriptor<int8_t, SynchronizedReadWrite>* out_queue) {
293 ALOGV("%s", __FUNCTION__);
294
295 mIsUsingFMQ = mIsRecordFilter ? false : true;
296
297 *out_queue = mFilterMQ->dupeDesc();
298 return ::ndk::ScopedAStatus::ok();
299 }
300
configure(const DemuxFilterSettings & in_settings)301 ::ndk::ScopedAStatus Filter::configure(const DemuxFilterSettings& in_settings) {
302 ALOGV("%s", __FUNCTION__);
303
304 mFilterSettings = in_settings;
305 switch (mType.mainType) {
306 case DemuxFilterMainType::TS:
307 mTpid = in_settings.get<DemuxFilterSettings::Tag::ts>().tpid;
308 break;
309 case DemuxFilterMainType::MMTP:
310 break;
311 case DemuxFilterMainType::IP:
312 break;
313 case DemuxFilterMainType::TLV:
314 break;
315 case DemuxFilterMainType::ALP:
316 break;
317 default:
318 break;
319 }
320
321 mConfigured = true;
322 return ::ndk::ScopedAStatus::ok();
323 }
324
start()325 ::ndk::ScopedAStatus Filter::start() {
326 ALOGV("%s", __FUNCTION__);
327 mFilterThreadRunning = true;
328 std::vector<DemuxFilterEvent> events;
329 // All the filter event callbacks in start are for testing purpose.
330 switch (mType.mainType) {
331 case DemuxFilterMainType::TS:
332 createMediaEvent(events, false);
333 createMediaEvent(events, true);
334 createTsRecordEvent(events);
335 createTemiEvent(events);
336 break;
337 case DemuxFilterMainType::MMTP:
338 createDownloadEvent(events);
339 createMmtpRecordEvent(events);
340 break;
341 case DemuxFilterMainType::IP:
342 createSectionEvent(events);
343 createIpPayloadEvent(events);
344 break;
345 case DemuxFilterMainType::TLV:
346 createMonitorEvent(events);
347 break;
348 case DemuxFilterMainType::ALP:
349 createMonitorEvent(events);
350 break;
351 default:
352 break;
353 }
354
355 for (auto&& event : events) {
356 mCallbackScheduler.onFilterEvent(std::move(event));
357 }
358
359 return startFilterLoop();
360 }
361
stop()362 ::ndk::ScopedAStatus Filter::stop() {
363 ALOGV("%s", __FUNCTION__);
364
365 mFilterThreadRunning = false;
366 if (mFilterThread.joinable()) {
367 mFilterThread.join();
368 }
369
370 mCallbackScheduler.flushEvents();
371
372 return ::ndk::ScopedAStatus::ok();
373 }
374
flush()375 ::ndk::ScopedAStatus Filter::flush() {
376 ALOGV("%s", __FUNCTION__);
377
378 // temp implementation to flush the FMQ
379 int size = mFilterMQ->availableToRead();
380 int8_t* buffer = new int8_t[size];
381 mFilterMQ->read(buffer, size);
382 delete[] buffer;
383 mFilterStatus = DemuxFilterStatus::DATA_READY;
384
385 return ::ndk::ScopedAStatus::ok();
386 }
387
releaseAvHandle(const NativeHandle & in_avMemory,int64_t in_avDataId)388 ::ndk::ScopedAStatus Filter::releaseAvHandle(const NativeHandle& in_avMemory, int64_t in_avDataId) {
389 ALOGV("%s", __FUNCTION__);
390
391 if ((mSharedAvMemHandle != nullptr) && (in_avMemory.fds.size() > 0) &&
392 (sameFile(in_avMemory.fds[0].get(), mSharedAvMemHandle->data[0]))) {
393 freeSharedAvHandle();
394 return ::ndk::ScopedAStatus::ok();
395 }
396
397 if (mDataId2Avfd.find(in_avDataId) == mDataId2Avfd.end()) {
398 return ::ndk::ScopedAStatus::fromServiceSpecificError(
399 static_cast<int32_t>(Result::INVALID_ARGUMENT));
400 }
401
402 ::close(mDataId2Avfd[in_avDataId]);
403 return ::ndk::ScopedAStatus::ok();
404 }
405
close()406 ::ndk::ScopedAStatus Filter::close() {
407 ALOGV("%s", __FUNCTION__);
408
409 stop();
410
411 return mDemux->removeFilter(mFilterId);
412 }
413
configureIpCid(int32_t in_ipCid)414 ::ndk::ScopedAStatus Filter::configureIpCid(int32_t in_ipCid) {
415 ALOGV("%s", __FUNCTION__);
416
417 if (mType.mainType != DemuxFilterMainType::IP) {
418 return ::ndk::ScopedAStatus::fromServiceSpecificError(
419 static_cast<int32_t>(Result::INVALID_STATE));
420 }
421
422 mCid = in_ipCid;
423 return ::ndk::ScopedAStatus::ok();
424 }
425
getAvSharedHandle(NativeHandle * out_avMemory,int64_t * _aidl_return)426 ::ndk::ScopedAStatus Filter::getAvSharedHandle(NativeHandle* out_avMemory, int64_t* _aidl_return) {
427 ALOGV("%s", __FUNCTION__);
428
429 if (!mIsMediaFilter) {
430 return ::ndk::ScopedAStatus::fromServiceSpecificError(
431 static_cast<int32_t>(Result::INVALID_STATE));
432 }
433
434 if (mSharedAvMemHandle != nullptr) {
435 *out_avMemory = ::android::dupToAidl(mSharedAvMemHandle);
436 *_aidl_return = BUFFER_SIZE;
437 mUsingSharedAvMem = true;
438 return ::ndk::ScopedAStatus::ok();
439 }
440
441 int av_fd = createAvIonFd(BUFFER_SIZE);
442 if (av_fd < 0) {
443 return ::ndk::ScopedAStatus::fromServiceSpecificError(
444 static_cast<int32_t>(Result::OUT_OF_MEMORY));
445 }
446
447 mSharedAvMemHandle = createNativeHandle(av_fd);
448 if (mSharedAvMemHandle == nullptr) {
449 ::close(av_fd);
450 *_aidl_return = 0;
451 return ::ndk::ScopedAStatus::fromServiceSpecificError(
452 static_cast<int32_t>(Result::UNKNOWN_ERROR));
453 }
454 ::close(av_fd);
455 mUsingSharedAvMem = true;
456
457 *out_avMemory = ::android::dupToAidl(mSharedAvMemHandle);
458 *_aidl_return = BUFFER_SIZE;
459 return ::ndk::ScopedAStatus::ok();
460 }
461
configureAvStreamType(const AvStreamType & in_avStreamType)462 ::ndk::ScopedAStatus Filter::configureAvStreamType(const AvStreamType& in_avStreamType) {
463 ALOGV("%s", __FUNCTION__);
464
465 if (!mIsMediaFilter) {
466 return ::ndk::ScopedAStatus::fromServiceSpecificError(
467 static_cast<int32_t>(Result::UNAVAILABLE));
468 }
469
470 switch (in_avStreamType.getTag()) {
471 case AvStreamType::Tag::audio:
472 mAudioStreamType =
473 static_cast<uint32_t>(in_avStreamType.get<AvStreamType::Tag::audio>());
474 break;
475 case AvStreamType::Tag::video:
476 mVideoStreamType =
477 static_cast<uint32_t>(in_avStreamType.get<AvStreamType::Tag::video>());
478 break;
479 default:
480 break;
481 }
482
483 return ::ndk::ScopedAStatus::ok();
484 }
485
configureMonitorEvent(int in_monitorEventTypes)486 ::ndk::ScopedAStatus Filter::configureMonitorEvent(int in_monitorEventTypes) {
487 ALOGV("%s", __FUNCTION__);
488
489 int32_t newScramblingStatus =
490 in_monitorEventTypes &
491 static_cast<int32_t>(DemuxFilterMonitorEventType::SCRAMBLING_STATUS);
492 int32_t newIpCid =
493 in_monitorEventTypes & static_cast<int32_t>(DemuxFilterMonitorEventType::IP_CID_CHANGE);
494
495 // if scrambling status monitoring flipped, record the new state and send msg on enabling
496 if (newScramblingStatus ^ mScramblingStatusMonitored) {
497 mScramblingStatusMonitored = newScramblingStatus;
498 if (mScramblingStatusMonitored) {
499 if (mCallbackScheduler.hasCallbackRegistered()) {
500 // Assuming current status is always NOT_SCRAMBLED
501 auto monitorEvent = DemuxFilterMonitorEvent::make<
502 DemuxFilterMonitorEvent::Tag::scramblingStatus>(
503 ScramblingStatus::NOT_SCRAMBLED);
504 auto event =
505 DemuxFilterEvent::make<DemuxFilterEvent::Tag::monitorEvent>(monitorEvent);
506 mCallbackScheduler.onFilterEvent(std::move(event));
507 } else {
508 return ::ndk::ScopedAStatus::fromServiceSpecificError(
509 static_cast<int32_t>(Result::INVALID_STATE));
510 }
511 }
512 }
513
514 // if ip cid monitoring flipped, record the new state and send msg on enabling
515 if (newIpCid ^ mIpCidMonitored) {
516 mIpCidMonitored = newIpCid;
517 if (mIpCidMonitored) {
518 if (mCallbackScheduler.hasCallbackRegistered()) {
519 // Return random cid
520 auto monitorEvent =
521 DemuxFilterMonitorEvent::make<DemuxFilterMonitorEvent::Tag::cid>(1);
522 auto event =
523 DemuxFilterEvent::make<DemuxFilterEvent::Tag::monitorEvent>(monitorEvent);
524 mCallbackScheduler.onFilterEvent(std::move(event));
525 } else {
526 return ::ndk::ScopedAStatus::fromServiceSpecificError(
527 static_cast<int32_t>(Result::INVALID_STATE));
528 }
529 }
530 }
531
532 return ::ndk::ScopedAStatus::ok();
533 }
534
createFilterMQ()535 bool Filter::createFilterMQ() {
536 ALOGV("%s", __FUNCTION__);
537
538 // Create a synchronized FMQ that supports blocking read/write
539 std::unique_ptr<FilterMQ> tmpFilterMQ =
540 std::unique_ptr<FilterMQ>(new (std::nothrow) FilterMQ(mBufferSize, true));
541 if (!tmpFilterMQ->isValid()) {
542 ALOGW("[Filter] Failed to create FMQ of filter with id: %" PRIu64, mFilterId);
543 return false;
544 }
545
546 mFilterMQ = std::move(tmpFilterMQ);
547
548 if (EventFlag::createEventFlag(mFilterMQ->getEventFlagWord(), &mFilterEventsFlag) !=
549 ::android::OK) {
550 return false;
551 }
552
553 return true;
554 }
555
startFilterLoop()556 ::ndk::ScopedAStatus Filter::startFilterLoop() {
557 mFilterThread = std::thread(&Filter::filterThreadLoop, this);
558 return ::ndk::ScopedAStatus::ok();
559 }
560
filterThreadLoop()561 void Filter::filterThreadLoop() {
562 if (!mFilterThreadRunning) {
563 return;
564 }
565
566 ALOGD("[Filter] filter %" PRIu64 " threadLoop start.", mFilterId);
567
568 // For the first time of filter output, implementation needs to send the filter
569 // Event Callback without waiting for the DATA_CONSUMED to init the process.
570 while (mFilterThreadRunning) {
571 std::unique_lock<std::mutex> lock(mFilterEventsLock);
572 if (mFilterEvents.size() == 0) {
573 lock.unlock();
574 if (DEBUG_FILTER) {
575 ALOGD("[Filter] wait for filter data output.");
576 }
577 usleep(1000 * 1000);
578 continue;
579 }
580
581 // After successfully write, send a callback and wait for the read to be done
582 if (mCallbackScheduler.hasCallbackRegistered()) {
583 if (mConfigured) {
584 auto startEvent =
585 DemuxFilterEvent::make<DemuxFilterEvent::Tag::startId>(mStartId++);
586 mCallbackScheduler.onFilterEvent(std::move(startEvent));
587 mConfigured = false;
588 }
589
590 // lock is still being held
591 for (auto&& event : mFilterEvents) {
592 mCallbackScheduler.onFilterEvent(std::move(event));
593 }
594 } else {
595 ALOGD("[Filter] filter callback is not configured yet.");
596 mFilterThreadRunning = false;
597 return;
598 }
599
600 mFilterEvents.clear();
601 mFilterStatus = DemuxFilterStatus::DATA_READY;
602 mCallbackScheduler.onFilterStatus(mFilterStatus);
603 break;
604 }
605
606 while (mFilterThreadRunning) {
607 uint32_t efState = 0;
608 // We do not wait for the last round of written data to be read to finish the thread
609 // because the VTS can verify the reading itself.
610 for (int i = 0; i < SECTION_WRITE_COUNT; i++) {
611 if (!mFilterThreadRunning) {
612 break;
613 }
614 while (mFilterThreadRunning && mIsUsingFMQ) {
615 ::android::status_t status = mFilterEventsFlag->wait(
616 static_cast<uint32_t>(DemuxQueueNotifyBits::DATA_CONSUMED), &efState,
617 WAIT_TIMEOUT, true /* retry on spurious wake */);
618 if (status != ::android::OK) {
619 ALOGD("[Filter] wait for data consumed");
620 continue;
621 }
622 break;
623 }
624
625 maySendFilterStatusCallback();
626
627 while (mFilterThreadRunning) {
628 std::lock_guard<std::mutex> lock(mFilterEventsLock);
629 if (mFilterEvents.size() == 0) {
630 continue;
631 }
632 // After successfully write, send a callback and wait for the read to be done
633 for (auto&& event : mFilterEvents) {
634 mCallbackScheduler.onFilterEvent(std::move(event));
635 }
636 mFilterEvents.clear();
637 break;
638 }
639 // We do not wait for the last read to be done
640 // VTS can verify the read result itself.
641 if (i == SECTION_WRITE_COUNT - 1) {
642 ALOGD("[Filter] filter %" PRIu64 " writing done. Ending thread", mFilterId);
643 break;
644 }
645 }
646 break;
647 }
648 ALOGD("[Filter] filter thread ended.");
649 }
650
freeSharedAvHandle()651 void Filter::freeSharedAvHandle() {
652 if (!mIsMediaFilter) {
653 return;
654 }
655 native_handle_close(mSharedAvMemHandle);
656 native_handle_delete(mSharedAvMemHandle);
657 mSharedAvMemHandle = nullptr;
658 }
659
dump(int fd,const char **,uint32_t)660 binder_status_t Filter::dump(int fd, const char** /* args */, uint32_t /* numArgs */) {
661 dprintf(fd, " Filter %" PRIu64 ":\n", mFilterId);
662 dprintf(fd, " Main type: %d\n", mType.mainType);
663 dprintf(fd, " mIsMediaFilter: %d\n", mIsMediaFilter);
664 dprintf(fd, " mIsPcrFilter: %d\n", mIsPcrFilter);
665 dprintf(fd, " mIsRecordFilter: %d\n", mIsRecordFilter);
666 dprintf(fd, " mIsUsingFMQ: %d\n", mIsUsingFMQ);
667 dprintf(fd, " mFilterThreadRunning: %d\n", (bool)mFilterThreadRunning);
668 return STATUS_OK;
669 }
670
maySendFilterStatusCallback()671 void Filter::maySendFilterStatusCallback() {
672 if (!mIsUsingFMQ) {
673 return;
674 }
675 std::lock_guard<std::mutex> lock(mFilterStatusLock);
676 int availableToRead = mFilterMQ->availableToRead();
677 int availableToWrite = mFilterMQ->availableToWrite();
678 int fmqSize = mFilterMQ->getQuantumCount();
679
680 DemuxFilterStatus newStatus = checkFilterStatusChange(
681 availableToWrite, availableToRead, ceil(fmqSize * 0.75), ceil(fmqSize * 0.25));
682 if (mFilterStatus != newStatus) {
683 mCallbackScheduler.onFilterStatus(newStatus);
684 mFilterStatus = newStatus;
685 }
686 }
687
checkFilterStatusChange(uint32_t availableToWrite,uint32_t availableToRead,uint32_t highThreshold,uint32_t lowThreshold)688 DemuxFilterStatus Filter::checkFilterStatusChange(uint32_t availableToWrite,
689 uint32_t availableToRead, uint32_t highThreshold,
690 uint32_t lowThreshold) {
691 if (availableToWrite == 0) {
692 return DemuxFilterStatus::OVERFLOW;
693 } else if (availableToRead > highThreshold) {
694 return DemuxFilterStatus::HIGH_WATER;
695 } else if (availableToRead == 0) {
696 return DemuxFilterStatus::NO_DATA;
697 } else if (availableToRead < lowThreshold) {
698 return DemuxFilterStatus::LOW_WATER;
699 }
700 return mFilterStatus;
701 }
702
getTpid()703 uint16_t Filter::getTpid() {
704 return mTpid;
705 }
706
updateFilterOutput(vector<int8_t> & data)707 void Filter::updateFilterOutput(vector<int8_t>& data) {
708 std::lock_guard<std::mutex> lock(mFilterOutputLock);
709 mFilterOutput.insert(mFilterOutput.end(), data.begin(), data.end());
710 }
711
updatePts(uint64_t pts)712 void Filter::updatePts(uint64_t pts) {
713 std::lock_guard<std::mutex> lock(mFilterOutputLock);
714 mPts = pts;
715 }
716
updateRecordOutput(vector<int8_t> & data)717 void Filter::updateRecordOutput(vector<int8_t>& data) {
718 std::lock_guard<std::mutex> lock(mRecordFilterOutputLock);
719 mRecordFilterOutput.insert(mRecordFilterOutput.end(), data.begin(), data.end());
720 }
721
startFilterHandler()722 ::ndk::ScopedAStatus Filter::startFilterHandler() {
723 std::lock_guard<std::mutex> lock(mFilterOutputLock);
724 switch (mType.mainType) {
725 case DemuxFilterMainType::TS:
726 switch (mType.subType.get<DemuxFilterSubType::Tag::tsFilterType>()) {
727 case DemuxTsFilterType::UNDEFINED:
728 break;
729 case DemuxTsFilterType::SECTION:
730 startSectionFilterHandler();
731 break;
732 case DemuxTsFilterType::PES:
733 startPesFilterHandler();
734 break;
735 case DemuxTsFilterType::TS:
736 startTsFilterHandler();
737 break;
738 case DemuxTsFilterType::AUDIO:
739 case DemuxTsFilterType::VIDEO:
740 startMediaFilterHandler();
741 break;
742 case DemuxTsFilterType::PCR:
743 startPcrFilterHandler();
744 break;
745 case DemuxTsFilterType::TEMI:
746 startTemiFilterHandler();
747 break;
748 default:
749 break;
750 }
751 break;
752 case DemuxFilterMainType::MMTP:
753 /*mmtpSettings*/
754 break;
755 case DemuxFilterMainType::IP:
756 /*ipSettings*/
757 break;
758 case DemuxFilterMainType::TLV:
759 /*tlvSettings*/
760 break;
761 case DemuxFilterMainType::ALP:
762 /*alpSettings*/
763 break;
764 default:
765 break;
766 }
767 return ::ndk::ScopedAStatus::ok();
768 }
769
startSectionFilterHandler()770 ::ndk::ScopedAStatus Filter::startSectionFilterHandler() {
771 if (mFilterOutput.empty()) {
772 return ::ndk::ScopedAStatus::ok();
773 }
774 if (!writeSectionsAndCreateEvent(mFilterOutput)) {
775 ALOGD("[Filter] filter %" PRIu64 " fails to write into FMQ. Ending thread", mFilterId);
776 return ::ndk::ScopedAStatus::fromServiceSpecificError(
777 static_cast<int32_t>(Result::UNKNOWN_ERROR));
778 }
779
780 mFilterOutput.clear();
781
782 return ::ndk::ScopedAStatus::ok();
783 }
784
startPesFilterHandler()785 ::ndk::ScopedAStatus Filter::startPesFilterHandler() {
786 if (mFilterOutput.empty()) {
787 return ::ndk::ScopedAStatus::ok();
788 }
789
790 for (int i = 0; i < mFilterOutput.size(); i += 188) {
791 if (mPesSizeLeft == 0) {
792 uint32_t prefix = (mFilterOutput[i + 4] << 16) | (mFilterOutput[i + 5] << 8) |
793 mFilterOutput[i + 6];
794 if (DEBUG_FILTER) {
795 ALOGD("[Filter] prefix %d", prefix);
796 }
797 if (prefix == 0x000001) {
798 // TODO handle mulptiple Pes filters
799 mPesSizeLeft = (static_cast<uint8_t>(mFilterOutput[i + 8]) << 8) |
800 static_cast<uint8_t>(mFilterOutput[i + 9]);
801 mPesSizeLeft += 6;
802 if (DEBUG_FILTER) {
803 ALOGD("[Filter] pes data length %d", mPesSizeLeft);
804 }
805 } else {
806 continue;
807 }
808 }
809
810 uint32_t endPoint = min(184u, mPesSizeLeft);
811 // append data and check size
812 vector<int8_t>::const_iterator first = mFilterOutput.begin() + i + 4;
813 vector<int8_t>::const_iterator last = mFilterOutput.begin() + i + 4 + endPoint;
814 mPesOutput.insert(mPesOutput.end(), first, last);
815 // size does not match then continue
816 mPesSizeLeft -= endPoint;
817 if (DEBUG_FILTER) {
818 ALOGD("[Filter] pes data left %d", mPesSizeLeft);
819 }
820 if (mPesSizeLeft > 0) {
821 continue;
822 }
823 // size match then create event
824 if (!writeDataToFilterMQ(mPesOutput)) {
825 ALOGD("[Filter] pes data write failed");
826 mFilterOutput.clear();
827 return ::ndk::ScopedAStatus::fromServiceSpecificError(
828 static_cast<int32_t>(Result::INVALID_ARGUMENT));
829 }
830 maySendFilterStatusCallback();
831 DemuxFilterPesEvent pesEvent;
832 pesEvent = {
833 // temp dump meta data
834 .streamId = static_cast<int32_t>(mPesOutput[3]),
835 .dataLength = static_cast<int32_t>(mPesOutput.size()),
836 };
837 if (DEBUG_FILTER) {
838 ALOGD("[Filter] assembled pes data length %d", pesEvent.dataLength);
839 }
840
841 {
842 std::lock_guard<std::mutex> lock(mFilterEventsLock);
843 mFilterEvents.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::pes>(pesEvent));
844 }
845
846 mPesOutput.clear();
847 }
848
849 mFilterOutput.clear();
850
851 return ::ndk::ScopedAStatus::ok();
852 }
853
startTsFilterHandler()854 ::ndk::ScopedAStatus Filter::startTsFilterHandler() {
855 // TODO handle starting TS filter
856 return ::ndk::ScopedAStatus::ok();
857 }
858
859 // Read PES (Packetized Elementary Stream) Packets from TransportStreams
860 // as defined in ISO/IEC 13818-1 Section 2.4.3.6. Create MediaEvents
861 // containing only their data without TS or PES headers.
startMediaFilterHandler()862 ::ndk::ScopedAStatus Filter::startMediaFilterHandler() {
863 if (mFilterOutput.empty()) {
864 return ::ndk::ScopedAStatus::ok();
865 }
866
867 // mPts being set before our MediaFilterHandler begins indicates that all
868 // metadata has already been handled. We can therefore create an event
869 // with the existing data. This method is used when processing ES files.
870 ::ndk::ScopedAStatus result;
871 if (mPts) {
872 result = createMediaFilterEventWithIon(mFilterOutput);
873 if (result.isOk()) {
874 mFilterOutput.clear();
875 }
876 return result;
877 }
878
879 for (int i = 0; i < mFilterOutput.size(); i += 188) {
880 // Every packet has a 4 Byte TS Header preceding it
881 uint32_t headerSize = 4;
882
883 if (mPesSizeLeft == 0) {
884 // Packet Start Code Prefix is defined as the first 3 bytes of
885 // the PES Header and should always have the value 0x000001
886 uint32_t prefix = (static_cast<uint8_t>(mFilterOutput[i + 4]) << 16) |
887 (static_cast<uint8_t>(mFilterOutput[i + 5]) << 8) |
888 static_cast<uint8_t>(mFilterOutput[i + 6]);
889 if (DEBUG_FILTER) {
890 ALOGD("[Filter] prefix %d", prefix);
891 }
892 if (prefix == 0x000001) {
893 // TODO handle multiple Pes filters
894 // Location of PES fields from ISO/IEC 13818-1 Section 2.4.3.6
895 mPesSizeLeft = (static_cast<uint8_t>(mFilterOutput[i + 8]) << 8) |
896 static_cast<uint8_t>(mFilterOutput[i + 9]);
897 bool hasPts = static_cast<uint8_t>(mFilterOutput[i + 11]) & 0x80;
898 uint8_t optionalFieldsLength = static_cast<uint8_t>(mFilterOutput[i + 12]);
899 headerSize += 9 + optionalFieldsLength;
900
901 if (hasPts) {
902 // Pts is a 33-bit field which is stored across 5 bytes, with
903 // bits in between as reserved fields which must be ignored
904 mPts = 0;
905 mPts |= (static_cast<uint8_t>(mFilterOutput[i + 13]) & 0x0e) << 29;
906 mPts |= (static_cast<uint8_t>(mFilterOutput[i + 14]) & 0xff) << 22;
907 mPts |= (static_cast<uint8_t>(mFilterOutput[i + 15]) & 0xfe) << 14;
908 mPts |= (static_cast<uint8_t>(mFilterOutput[i + 16]) & 0xff) << 7;
909 mPts |= (static_cast<uint8_t>(mFilterOutput[i + 17]) & 0xfe) >> 1;
910 }
911
912 if (DEBUG_FILTER) {
913 ALOGD("[Filter] pes data length %d", mPesSizeLeft);
914 }
915 } else {
916 continue;
917 }
918 }
919
920 uint32_t endPoint = min(188u - headerSize, mPesSizeLeft);
921 // append data and check size
922 vector<int8_t>::const_iterator first = mFilterOutput.begin() + i + headerSize;
923 vector<int8_t>::const_iterator last = mFilterOutput.begin() + i + headerSize + endPoint;
924 mPesOutput.insert(mPesOutput.end(), first, last);
925 // size does not match then continue
926 mPesSizeLeft -= endPoint;
927 if (DEBUG_FILTER) {
928 ALOGD("[Filter] pes data left %d", mPesSizeLeft);
929 }
930 if (mPesSizeLeft > 0 || mAvBufferCopyCount++ < 10) {
931 continue;
932 }
933
934 result = createMediaFilterEventWithIon(mPesOutput);
935 if (!result.isOk()) {
936 mFilterOutput.clear();
937 return result;
938 }
939 }
940
941 mFilterOutput.clear();
942
943 return ::ndk::ScopedAStatus::ok();
944 }
945
createMediaFilterEventWithIon(vector<int8_t> & output)946 ::ndk::ScopedAStatus Filter::createMediaFilterEventWithIon(vector<int8_t>& output) {
947 if (mUsingSharedAvMem) {
948 if (mSharedAvMemHandle == nullptr) {
949 return ::ndk::ScopedAStatus::fromServiceSpecificError(
950 static_cast<int32_t>(Result::UNKNOWN_ERROR));
951 }
952 return createShareMemMediaEvents(output);
953 }
954
955 return createIndependentMediaEvents(output);
956 }
957
startRecordFilterHandler()958 ::ndk::ScopedAStatus Filter::startRecordFilterHandler() {
959 std::lock_guard<std::mutex> lock(mRecordFilterOutputLock);
960 if (mRecordFilterOutput.empty()) {
961 return ::ndk::ScopedAStatus::ok();
962 }
963
964 if (mDvr == nullptr || !mDvr->writeRecordFMQ(mRecordFilterOutput)) {
965 ALOGD("[Filter] dvr fails to write into record FMQ.");
966 return ::ndk::ScopedAStatus::fromServiceSpecificError(
967 static_cast<int32_t>(Result::UNKNOWN_ERROR));
968 }
969
970 DemuxFilterTsRecordEvent recordEvent;
971 recordEvent = {
972 .byteNumber = static_cast<int64_t>(mRecordFilterOutput.size()),
973 .pts = (mPts == 0) ? static_cast<int64_t>(time(NULL)) * 900000 : mPts,
974 .firstMbInSlice = 0, // random address
975 };
976
977 {
978 std::lock_guard<std::mutex> lock(mFilterEventsLock);
979 mFilterEvents.push_back(
980 DemuxFilterEvent::make<DemuxFilterEvent::Tag::tsRecord>(recordEvent));
981 }
982
983 mRecordFilterOutput.clear();
984 return ::ndk::ScopedAStatus::ok();
985 }
986
startPcrFilterHandler()987 ::ndk::ScopedAStatus Filter::startPcrFilterHandler() {
988 // TODO handle starting PCR filter
989 return ::ndk::ScopedAStatus::ok();
990 }
991
startTemiFilterHandler()992 ::ndk::ScopedAStatus Filter::startTemiFilterHandler() {
993 // TODO handle starting TEMI filter
994 return ::ndk::ScopedAStatus::ok();
995 }
996
997 // Read PSI (Program Specific Information) Sections from TransportStreams
998 // as defined in ISO/IEC 13818-1 Section 2.4.4
writeSectionsAndCreateEvent(vector<int8_t> & data)999 bool Filter::writeSectionsAndCreateEvent(vector<int8_t>& data) {
1000 // TODO check how many sections has been read
1001 ALOGD("[Filter] section handler");
1002
1003 // Transport Stream Packets are 188 bytes long, as defined in the
1004 // Introduction of ISO/IEC 13818-1
1005 for (int i = 0; i < data.size(); i += 188) {
1006 if (mSectionSizeLeft == 0) {
1007 // Location for sectionSize as defined by Section 2.4.4
1008 // Note that the first 4 bytes skipped are the TsHeader
1009 mSectionSizeLeft = ((static_cast<uint8_t>(data[i + 5]) & 0x0f) << 8) |
1010 static_cast<uint8_t>(data[i + 6]);
1011 mSectionSizeLeft += 3;
1012 if (DEBUG_FILTER) {
1013 ALOGD("[Filter] section data length %d", mSectionSizeLeft);
1014 }
1015 }
1016
1017 // 184 bytes per packet is derived by subtracting the 4 byte length of
1018 // the TsHeader from its 188 byte packet size
1019 uint32_t endPoint = min(184u, mSectionSizeLeft);
1020 // append data and check size
1021 vector<int8_t>::const_iterator first = data.begin() + i + 4;
1022 vector<int8_t>::const_iterator last = data.begin() + i + 4 + endPoint;
1023 mSectionOutput.insert(mSectionOutput.end(), first, last);
1024 // size does not match then continue
1025 mSectionSizeLeft -= endPoint;
1026 if (DEBUG_FILTER) {
1027 ALOGD("[Filter] section data left %d", mSectionSizeLeft);
1028 }
1029 if (mSectionSizeLeft > 0) {
1030 continue;
1031 }
1032
1033 if (!writeDataToFilterMQ(mSectionOutput)) {
1034 mSectionOutput.clear();
1035 return false;
1036 }
1037
1038 DemuxFilterSectionEvent secEvent;
1039 secEvent = {
1040 // temp dump meta data
1041 .tableId = 0,
1042 .version = 1,
1043 .sectionNum = 1,
1044 .dataLength = static_cast<int32_t>(mSectionOutput.size()),
1045 };
1046 if (DEBUG_FILTER) {
1047 ALOGD("[Filter] assembled section data length %" PRIu64, secEvent.dataLength);
1048 }
1049
1050 {
1051 std::lock_guard<std::mutex> lock(mFilterEventsLock);
1052 mFilterEvents.push_back(
1053 DemuxFilterEvent::make<DemuxFilterEvent::Tag::section>(secEvent));
1054 }
1055 mSectionOutput.clear();
1056 }
1057
1058 return true;
1059 }
1060
writeDataToFilterMQ(const std::vector<int8_t> & data)1061 bool Filter::writeDataToFilterMQ(const std::vector<int8_t>& data) {
1062 std::lock_guard<std::mutex> lock(mWriteLock);
1063 if (mFilterMQ->write(data.data(), data.size())) {
1064 return true;
1065 }
1066 return false;
1067 }
1068
attachFilterToRecord(const std::shared_ptr<Dvr> dvr)1069 void Filter::attachFilterToRecord(const std::shared_ptr<Dvr> dvr) {
1070 mDvr = dvr;
1071 }
1072
detachFilterFromRecord()1073 void Filter::detachFilterFromRecord() {
1074 mDvr = nullptr;
1075 }
1076
createAvIonFd(int size)1077 int Filter::createAvIonFd(int size) {
1078 // Create an DMA-BUF fd and allocate an av fd mapped to a buffer to it.
1079 auto buffer_allocator = std::make_unique<BufferAllocator>();
1080 if (!buffer_allocator) {
1081 ALOGE("[Filter] Unable to create BufferAllocator object");
1082 return -1;
1083 }
1084 int av_fd = -1;
1085 av_fd = buffer_allocator->Alloc("system-uncached", size);
1086 if (av_fd < 0) {
1087 ALOGE("[Filter] Failed to create av fd %d", errno);
1088 return -1;
1089 }
1090 return av_fd;
1091 }
1092
getIonBuffer(int fd,int size)1093 uint8_t* Filter::getIonBuffer(int fd, int size) {
1094 uint8_t* avBuf = static_cast<uint8_t*>(
1095 mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0 /*offset*/));
1096 if (avBuf == MAP_FAILED) {
1097 ALOGE("[Filter] fail to allocate buffer %d", errno);
1098 return NULL;
1099 }
1100 return avBuf;
1101 }
1102
createNativeHandle(int fd)1103 native_handle_t* Filter::createNativeHandle(int fd) {
1104 native_handle_t* nativeHandle;
1105 if (fd < 0) {
1106 nativeHandle = native_handle_create(/*numFd*/ 0, 0);
1107 } else {
1108 // Create a native handle to pass the av fd via the callback event.
1109 nativeHandle = native_handle_create(/*numFd*/ 1, 0);
1110 }
1111 if (nativeHandle == NULL) {
1112 ALOGE("[Filter] Failed to create native_handle %d", errno);
1113 return NULL;
1114 }
1115 if (nativeHandle->numFds > 0) {
1116 nativeHandle->data[0] = dup(fd);
1117 }
1118 return nativeHandle;
1119 }
1120
createIndependentMediaEvents(vector<int8_t> & output)1121 ::ndk::ScopedAStatus Filter::createIndependentMediaEvents(vector<int8_t>& output) {
1122 int av_fd = createAvIonFd(output.size());
1123 if (av_fd == -1) {
1124 return ::ndk::ScopedAStatus::fromServiceSpecificError(
1125 static_cast<int32_t>(Result::UNKNOWN_ERROR));
1126 }
1127 // copy the filtered data to the buffer
1128 uint8_t* avBuffer = getIonBuffer(av_fd, output.size());
1129 if (avBuffer == NULL) {
1130 return ::ndk::ScopedAStatus::fromServiceSpecificError(
1131 static_cast<int32_t>(Result::UNKNOWN_ERROR));
1132 }
1133 memcpy(avBuffer, output.data(), output.size() * sizeof(uint8_t));
1134
1135 native_handle_t* nativeHandle = createNativeHandle(av_fd);
1136 if (nativeHandle == NULL) {
1137 return ::ndk::ScopedAStatus::fromServiceSpecificError(
1138 static_cast<int32_t>(Result::UNKNOWN_ERROR));
1139 }
1140
1141 // Create a dataId and add a <dataId, av_fd> pair into the dataId2Avfd map
1142 uint64_t dataId = mLastUsedDataId++ /*createdUID*/;
1143 mDataId2Avfd[dataId] = dup(av_fd);
1144
1145 // Create mediaEvent and send callback
1146 auto event = DemuxFilterEvent::make<DemuxFilterEvent::Tag::media>();
1147 auto& mediaEvent = event.get<DemuxFilterEvent::Tag::media>();
1148 mediaEvent.avMemory = ::android::dupToAidl(nativeHandle);
1149 mediaEvent.dataLength = static_cast<int64_t>(output.size());
1150 mediaEvent.avDataId = static_cast<int64_t>(dataId);
1151 if (mPts) {
1152 mediaEvent.pts = mPts;
1153 mPts = 0;
1154 }
1155
1156 {
1157 std::lock_guard<std::mutex> lock(mFilterEventsLock);
1158 mFilterEvents.push_back(std::move(event));
1159 }
1160
1161 // Clear and log
1162 native_handle_close(nativeHandle);
1163 native_handle_delete(nativeHandle);
1164 output.clear();
1165 mAvBufferCopyCount = 0;
1166 if (DEBUG_FILTER) {
1167 ALOGD("[Filter] av data length %d", static_cast<int32_t>(output.size()));
1168 }
1169 return ::ndk::ScopedAStatus::ok();
1170 }
1171
createShareMemMediaEvents(vector<int8_t> & output)1172 ::ndk::ScopedAStatus Filter::createShareMemMediaEvents(vector<int8_t>& output) {
1173 // copy the filtered data to the shared buffer
1174 uint8_t* sharedAvBuffer =
1175 getIonBuffer(mSharedAvMemHandle->data[0], output.size() + mSharedAvMemOffset);
1176 if (sharedAvBuffer == NULL) {
1177 return ::ndk::ScopedAStatus::fromServiceSpecificError(
1178 static_cast<int32_t>(Result::UNKNOWN_ERROR));
1179 }
1180 memcpy(sharedAvBuffer + mSharedAvMemOffset, output.data(), output.size() * sizeof(uint8_t));
1181
1182 // Create a memory handle with numFds == 0
1183 native_handle_t* nativeHandle = createNativeHandle(-1);
1184 if (nativeHandle == NULL) {
1185 return ::ndk::ScopedAStatus::fromServiceSpecificError(
1186 static_cast<int32_t>(Result::UNKNOWN_ERROR));
1187 }
1188
1189 // Create mediaEvent and send callback
1190 auto event = DemuxFilterEvent::make<DemuxFilterEvent::Tag::media>();
1191 auto& mediaEvent = event.get<DemuxFilterEvent::Tag::media>();
1192 mediaEvent.avMemory = ::android::dupToAidl(nativeHandle);
1193 mediaEvent.offset = mSharedAvMemOffset;
1194 mediaEvent.dataLength = static_cast<int64_t>(output.size());
1195 if (mPts) {
1196 mediaEvent.pts = mPts;
1197 mPts = 0;
1198 }
1199
1200 {
1201 std::lock_guard<std::mutex> lock(mFilterEventsLock);
1202 mFilterEvents.push_back(std::move(event));
1203 }
1204
1205 mSharedAvMemOffset += output.size();
1206
1207 // Clear and log
1208 native_handle_close(nativeHandle);
1209 native_handle_delete(nativeHandle);
1210 output.clear();
1211 if (DEBUG_FILTER) {
1212 ALOGD("[Filter] shared av data length %d", static_cast<int32_t>(output.size()));
1213 }
1214 return ::ndk::ScopedAStatus::ok();
1215 }
1216
sameFile(int fd1,int fd2)1217 bool Filter::sameFile(int fd1, int fd2) {
1218 struct stat stat1, stat2;
1219 if (fstat(fd1, &stat1) < 0 || fstat(fd2, &stat2) < 0) {
1220 return false;
1221 }
1222 return (stat1.st_dev == stat2.st_dev) && (stat1.st_ino == stat2.st_ino);
1223 }
1224
createMediaEvent(vector<DemuxFilterEvent> & events,bool isAudioPresentation)1225 void Filter::createMediaEvent(vector<DemuxFilterEvent>& events, bool isAudioPresentation) {
1226 DemuxFilterMediaEvent mediaEvent;
1227 mediaEvent.streamId = 1;
1228 mediaEvent.isPtsPresent = true;
1229 mediaEvent.isDtsPresent = false;
1230 mediaEvent.dataLength = 3;
1231 mediaEvent.offset = 4;
1232 mediaEvent.isSecureMemory = true;
1233 mediaEvent.mpuSequenceNumber = 6;
1234 mediaEvent.isPesPrivateData = true;
1235
1236 if (isAudioPresentation) {
1237 AudioPresentation audioPresentation0{
1238 .preselection.preselectionId = 0,
1239 .preselection.labels = {{"en", "Commentator"}, {"es", "Comentarista"}},
1240 .preselection.language = "en",
1241 .preselection.renderingIndication =
1242 AudioPreselectionRenderingIndicationType::THREE_DIMENSIONAL,
1243 .preselection.hasAudioDescription = false,
1244 .preselection.hasSpokenSubtitles = false,
1245 .preselection.hasDialogueEnhancement = true,
1246 .ac4ShortProgramId = 42};
1247 AudioPresentation audioPresentation1{
1248 .preselection.preselectionId = 1,
1249 .preselection.labels = {{"en", "Crowd"}, {"es", "Multitud"}},
1250 .preselection.language = "en",
1251 .preselection.renderingIndication =
1252 AudioPreselectionRenderingIndicationType::THREE_DIMENSIONAL,
1253 .preselection.hasAudioDescription = false,
1254 .preselection.hasSpokenSubtitles = false,
1255 .preselection.hasDialogueEnhancement = false,
1256 .ac4ShortProgramId = 42};
1257 vector<AudioPresentation> audioPresentations;
1258 audioPresentations.push_back(audioPresentation0);
1259 audioPresentations.push_back(audioPresentation1);
1260 mediaEvent.extraMetaData.set<DemuxFilterMediaEventExtraMetaData::Tag::audioPresentations>(
1261 audioPresentations);
1262 } else {
1263 AudioExtraMetaData audio;
1264 audio.adFade = 1;
1265 audio.adPan = 2;
1266 audio.versionTextTag = 3;
1267 audio.adGainCenter = 4;
1268 audio.adGainFront = 5;
1269 audio.adGainSurround = 6;
1270 mediaEvent.extraMetaData.set<DemuxFilterMediaEventExtraMetaData::Tag::audio>(audio);
1271 }
1272
1273 int av_fd = createAvIonFd(BUFFER_SIZE);
1274 if (av_fd == -1) {
1275 return;
1276 }
1277
1278 native_handle_t* nativeHandle = createNativeHandle(av_fd);
1279 if (nativeHandle == nullptr) {
1280 ::close(av_fd);
1281 ALOGE("[Filter] Failed to create native_handle %d", errno);
1282 return;
1283 }
1284
1285 // Create a dataId and add a <dataId, av_fd> pair into the dataId2Avfd map
1286 uint64_t dataId = mLastUsedDataId++ /*createdUID*/;
1287 mDataId2Avfd[dataId] = dup(av_fd);
1288
1289 mediaEvent.avDataId = static_cast<int64_t>(dataId);
1290 mediaEvent.avMemory = ::android::dupToAidl(nativeHandle);
1291
1292 events.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::media>(std::move(mediaEvent)));
1293
1294 native_handle_close(nativeHandle);
1295 native_handle_delete(nativeHandle);
1296 }
1297
createTsRecordEvent(vector<DemuxFilterEvent> & events)1298 void Filter::createTsRecordEvent(vector<DemuxFilterEvent>& events) {
1299 DemuxPid pid;
1300 DemuxFilterScIndexMask mask;
1301 DemuxFilterTsRecordEvent tsRecord1;
1302 pid.set<DemuxPid::Tag::tPid>(1);
1303 mask.set<DemuxFilterScIndexMask::Tag::scIndex>(1);
1304 tsRecord1.pid = pid;
1305 tsRecord1.tsIndexMask = 1;
1306 tsRecord1.scIndexMask = mask;
1307 tsRecord1.byteNumber = 2;
1308
1309 DemuxFilterTsRecordEvent tsRecord2;
1310 tsRecord2.pts = 1;
1311 tsRecord2.firstMbInSlice = 2; // random address
1312
1313 events.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::tsRecord>(std::move(tsRecord1)));
1314 events.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::tsRecord>(std::move(tsRecord2)));
1315 }
1316
createMmtpRecordEvent(vector<DemuxFilterEvent> & events)1317 void Filter::createMmtpRecordEvent(vector<DemuxFilterEvent>& events) {
1318 DemuxFilterMmtpRecordEvent mmtpRecord1;
1319 mmtpRecord1.scHevcIndexMask = 1;
1320 mmtpRecord1.byteNumber = 2;
1321
1322 DemuxFilterMmtpRecordEvent mmtpRecord2;
1323 mmtpRecord2.pts = 1;
1324 mmtpRecord2.mpuSequenceNumber = 2;
1325 mmtpRecord2.firstMbInSlice = 3;
1326 mmtpRecord2.tsIndexMask = 4;
1327
1328 events.push_back(
1329 DemuxFilterEvent::make<DemuxFilterEvent::Tag::mmtpRecord>(std::move(mmtpRecord1)));
1330 events.push_back(
1331 DemuxFilterEvent::make<DemuxFilterEvent::Tag::mmtpRecord>(std::move(mmtpRecord2)));
1332 }
1333
createSectionEvent(vector<DemuxFilterEvent> & events)1334 void Filter::createSectionEvent(vector<DemuxFilterEvent>& events) {
1335 DemuxFilterSectionEvent section;
1336 section.tableId = 1;
1337 section.version = 2;
1338 section.sectionNum = 3;
1339 section.dataLength = 0;
1340
1341 events.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::section>(std::move(section)));
1342 }
1343
createPesEvent(vector<DemuxFilterEvent> & events)1344 void Filter::createPesEvent(vector<DemuxFilterEvent>& events) {
1345 DemuxFilterPesEvent pes;
1346 pes.streamId = 1;
1347 pes.dataLength = 1;
1348 pes.mpuSequenceNumber = 2;
1349
1350 events.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::pes>(std::move(pes)));
1351 }
1352
createDownloadEvent(vector<DemuxFilterEvent> & events)1353 void Filter::createDownloadEvent(vector<DemuxFilterEvent>& events) {
1354 DemuxFilterDownloadEvent download;
1355 download.itemId = 1;
1356 download.downloadId = 1;
1357 download.mpuSequenceNumber = 2;
1358 download.itemFragmentIndex = 3;
1359 download.lastItemFragmentIndex = 4;
1360 download.dataLength = 0;
1361
1362 events.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::download>(std::move(download)));
1363 }
1364
createIpPayloadEvent(vector<DemuxFilterEvent> & events)1365 void Filter::createIpPayloadEvent(vector<DemuxFilterEvent>& events) {
1366 DemuxFilterIpPayloadEvent ipPayload;
1367 ipPayload.dataLength = 0;
1368
1369 events.push_back(
1370 DemuxFilterEvent::make<DemuxFilterEvent::Tag::ipPayload>(std::move(ipPayload)));
1371 }
1372
createTemiEvent(vector<DemuxFilterEvent> & events)1373 void Filter::createTemiEvent(vector<DemuxFilterEvent>& events) {
1374 DemuxFilterTemiEvent temi;
1375 temi.pts = 1;
1376 temi.descrTag = 2;
1377 temi.descrData = {3};
1378
1379 events.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::temi>(std::move(temi)));
1380 }
1381
createMonitorEvent(vector<DemuxFilterEvent> & events)1382 void Filter::createMonitorEvent(vector<DemuxFilterEvent>& events) {
1383 DemuxFilterMonitorEvent monitor;
1384 monitor.set<DemuxFilterMonitorEvent::Tag::scramblingStatus>(ScramblingStatus::SCRAMBLED);
1385
1386 events.push_back(
1387 DemuxFilterEvent::make<DemuxFilterEvent::Tag::monitorEvent>(std::move(monitor)));
1388 }
1389
createRestartEvent(vector<DemuxFilterEvent> & events)1390 void Filter::createRestartEvent(vector<DemuxFilterEvent>& events) {
1391 events.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::startId>(1));
1392 }
1393
1394 } // namespace tuner
1395 } // namespace tv
1396 } // namespace hardware
1397 } // namespace android
1398 } // namespace aidl
1399