• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2021 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "android.hardware.tv.tuner-service.example-Filter"
19 
20 #include <BufferAllocator/BufferAllocator.h>
21 #include <aidl/android/hardware/tv/tuner/DemuxFilterMonitorEventType.h>
22 #include <aidl/android/hardware/tv/tuner/DemuxQueueNotifyBits.h>
23 #include <aidl/android/hardware/tv/tuner/Result.h>
24 #include <aidlcommonsupport/NativeHandle.h>
25 #include <inttypes.h>
26 #include <utils/Log.h>
27 
28 #include "Filter.h"
29 
30 namespace aidl {
31 namespace android {
32 namespace hardware {
33 namespace tv {
34 namespace tuner {
35 
36 #define WAIT_TIMEOUT 3000000000
37 
FilterCallbackScheduler(const std::shared_ptr<IFilterCallback> & cb)38 FilterCallbackScheduler::FilterCallbackScheduler(const std::shared_ptr<IFilterCallback>& cb)
39     : mCallback(cb),
40       mIsConditionMet(false),
41       mDataLength(0),
42       mTimeDelayInMs(0),
43       mDataSizeDelayInBytes(0) {
44     start();
45 }
46 
~FilterCallbackScheduler()47 FilterCallbackScheduler::~FilterCallbackScheduler() {
48     stop();
49 }
50 
onFilterEvent(DemuxFilterEvent && event)51 void FilterCallbackScheduler::onFilterEvent(DemuxFilterEvent&& event) {
52     std::unique_lock<std::mutex> lock(mLock);
53     mCallbackBuffer.push_back(std::move(event));
54     mDataLength += getDemuxFilterEventDataLength(event);
55 
56     if (isDataSizeDelayConditionMetLocked()) {
57         mIsConditionMet = true;
58         // unlock, so thread is not immediately blocked when it is notified.
59         lock.unlock();
60         mCv.notify_all();
61     }
62 }
63 
onFilterStatus(const DemuxFilterStatus & status)64 void FilterCallbackScheduler::onFilterStatus(const DemuxFilterStatus& status) {
65     if (mCallback) {
66         mCallback->onFilterStatus(status);
67     }
68 }
69 
flushEvents()70 void FilterCallbackScheduler::flushEvents() {
71     std::unique_lock<std::mutex> lock(mLock);
72     mCallbackBuffer.clear();
73     mDataLength = 0;
74 }
75 
setTimeDelayHint(int timeDelay)76 void FilterCallbackScheduler::setTimeDelayHint(int timeDelay) {
77     std::unique_lock<std::mutex> lock(mLock);
78     mTimeDelayInMs = timeDelay;
79     // always notify condition variable to update timeout
80     mIsConditionMet = true;
81     lock.unlock();
82     mCv.notify_all();
83 }
84 
setDataSizeDelayHint(int dataSizeDelay)85 void FilterCallbackScheduler::setDataSizeDelayHint(int dataSizeDelay) {
86     std::unique_lock<std::mutex> lock(mLock);
87     mDataSizeDelayInBytes = dataSizeDelay;
88     if (isDataSizeDelayConditionMetLocked()) {
89         mIsConditionMet = true;
90         lock.unlock();
91         mCv.notify_all();
92     }
93 }
94 
hasCallbackRegistered() const95 bool FilterCallbackScheduler::hasCallbackRegistered() const {
96     return mCallback != nullptr;
97 }
98 
start()99 void FilterCallbackScheduler::start() {
100     mIsRunning = true;
101     mCallbackThread = std::thread(&FilterCallbackScheduler::threadLoop, this);
102 }
103 
stop()104 void FilterCallbackScheduler::stop() {
105     mIsRunning = false;
106     if (mCallbackThread.joinable()) {
107         {
108             std::lock_guard<std::mutex> lock(mLock);
109             mIsConditionMet = true;
110         }
111         mCv.notify_all();
112         mCallbackThread.join();
113     }
114 }
115 
threadLoop()116 void FilterCallbackScheduler::threadLoop() {
117     while (mIsRunning) {
118         threadLoopOnce();
119     }
120 }
121 
threadLoopOnce()122 void FilterCallbackScheduler::threadLoopOnce() {
123     std::unique_lock<std::mutex> lock(mLock);
124     if (mTimeDelayInMs > 0) {
125         // Note: predicate protects from lost and spurious wakeups
126         mCv.wait_for(lock, std::chrono::milliseconds(mTimeDelayInMs),
127                      [this] { return mIsConditionMet; });
128     } else {
129         // Note: predicate protects from lost and spurious wakeups
130         mCv.wait(lock, [this] { return mIsConditionMet; });
131     }
132     mIsConditionMet = false;
133 
134     // condition_variable wait locks mutex on timeout / notify
135     // Note: if stop() has been called in the meantime, do not send more filter
136     // events.
137     if (mIsRunning && !mCallbackBuffer.empty()) {
138         if (mCallback) {
139             mCallback->onFilterEvent(mCallbackBuffer);
140         }
141         mCallbackBuffer.clear();
142         mDataLength = 0;
143     }
144 }
145 
146 // mLock needs to be held to call this function
isDataSizeDelayConditionMetLocked()147 bool FilterCallbackScheduler::isDataSizeDelayConditionMetLocked() {
148     if (mDataSizeDelayInBytes == 0) {
149         // Data size delay is disabled.
150         if (mTimeDelayInMs == 0) {
151             // Events should only be sent immediately if time delay is disabled
152             // as well.
153             return true;
154         }
155         return false;
156     }
157 
158     // Data size delay is enabled.
159     return mDataLength >= mDataSizeDelayInBytes;
160 }
161 
getDemuxFilterEventDataLength(const DemuxFilterEvent & event)162 int FilterCallbackScheduler::getDemuxFilterEventDataLength(const DemuxFilterEvent& event) {
163     // there is a risk that dataLength could be a negative value, but it
164     // *should* be safe to assume that it is always positive.
165     switch (event.getTag()) {
166         case DemuxFilterEvent::Tag::section:
167             return event.get<DemuxFilterEvent::Tag::section>().dataLength;
168         case DemuxFilterEvent::Tag::media:
169             return event.get<DemuxFilterEvent::Tag::media>().dataLength;
170         case DemuxFilterEvent::Tag::pes:
171             return event.get<DemuxFilterEvent::Tag::pes>().dataLength;
172         case DemuxFilterEvent::Tag::download:
173             return event.get<DemuxFilterEvent::Tag::download>().dataLength;
174         case DemuxFilterEvent::Tag::ipPayload:
175             return event.get<DemuxFilterEvent::Tag::ipPayload>().dataLength;
176 
177         case DemuxFilterEvent::Tag::tsRecord:
178         case DemuxFilterEvent::Tag::mmtpRecord:
179         case DemuxFilterEvent::Tag::temi:
180         case DemuxFilterEvent::Tag::monitorEvent:
181         case DemuxFilterEvent::Tag::startId:
182             // these events do not include a payload and should therefore return
183             // 0.
184             // do not add a default option, so this will not compile when new types
185             // are added.
186             return 0;
187     }
188 }
189 
Filter(DemuxFilterType type,int64_t filterId,uint32_t bufferSize,const std::shared_ptr<IFilterCallback> & cb,std::shared_ptr<Demux> demux)190 Filter::Filter(DemuxFilterType type, int64_t filterId, uint32_t bufferSize,
191                const std::shared_ptr<IFilterCallback>& cb, std::shared_ptr<Demux> demux)
192     : mDemux(demux),
193       mCallbackScheduler(cb),
194       mFilterId(filterId),
195       mBufferSize(bufferSize),
196       mType(type) {
197     switch (mType.mainType) {
198         case DemuxFilterMainType::TS:
199             if (mType.subType.get<DemuxFilterSubType::Tag::tsFilterType>() ==
200                         DemuxTsFilterType::AUDIO ||
201                 mType.subType.get<DemuxFilterSubType::Tag::tsFilterType>() ==
202                         DemuxTsFilterType::VIDEO) {
203                 mIsMediaFilter = true;
204             }
205             if (mType.subType.get<DemuxFilterSubType::Tag::tsFilterType>() ==
206                 DemuxTsFilterType::PCR) {
207                 mIsPcrFilter = true;
208             }
209             if (mType.subType.get<DemuxFilterSubType::Tag::tsFilterType>() ==
210                 DemuxTsFilterType::RECORD) {
211                 mIsRecordFilter = true;
212             }
213             break;
214         case DemuxFilterMainType::MMTP:
215             if (mType.subType.get<DemuxFilterSubType::Tag::mmtpFilterType>() ==
216                         DemuxMmtpFilterType::AUDIO ||
217                 mType.subType.get<DemuxFilterSubType::Tag::mmtpFilterType>() ==
218                         DemuxMmtpFilterType::VIDEO) {
219                 mIsMediaFilter = true;
220             }
221             if (mType.subType.get<DemuxFilterSubType::Tag::mmtpFilterType>() ==
222                 DemuxMmtpFilterType::RECORD) {
223                 mIsRecordFilter = true;
224             }
225             break;
226         case DemuxFilterMainType::IP:
227             break;
228         case DemuxFilterMainType::TLV:
229             break;
230         case DemuxFilterMainType::ALP:
231             break;
232         default:
233             break;
234     }
235 }
236 
~Filter()237 Filter::~Filter() {
238     close();
239 }
240 
getId64Bit(int64_t * _aidl_return)241 ::ndk::ScopedAStatus Filter::getId64Bit(int64_t* _aidl_return) {
242     ALOGV("%s", __FUNCTION__);
243 
244     *_aidl_return = mFilterId;
245     return ::ndk::ScopedAStatus::ok();
246 }
247 
getId(int32_t * _aidl_return)248 ::ndk::ScopedAStatus Filter::getId(int32_t* _aidl_return) {
249     ALOGV("%s", __FUNCTION__);
250 
251     *_aidl_return = static_cast<int32_t>(mFilterId);
252     return ::ndk::ScopedAStatus::ok();
253 }
254 
setDataSource(const std::shared_ptr<IFilter> & in_filter)255 ::ndk::ScopedAStatus Filter::setDataSource(const std::shared_ptr<IFilter>& in_filter) {
256     ALOGV("%s", __FUNCTION__);
257 
258     mDataSource = in_filter;
259     mIsDataSourceDemux = false;
260 
261     return ::ndk::ScopedAStatus::ok();
262 }
263 
setDelayHint(const FilterDelayHint & in_hint)264 ::ndk::ScopedAStatus Filter::setDelayHint(const FilterDelayHint& in_hint) {
265     if (mIsMediaFilter) {
266         // delay hint is not supported for media filters
267         return ::ndk::ScopedAStatus::fromServiceSpecificError(
268                 static_cast<int32_t>(Result::UNAVAILABLE));
269     }
270 
271     ALOGV("%s", __FUNCTION__);
272     if (in_hint.hintValue < 0) {
273         return ::ndk::ScopedAStatus::fromServiceSpecificError(
274                 static_cast<int32_t>(Result::INVALID_ARGUMENT));
275     }
276 
277     switch (in_hint.hintType) {
278         case FilterDelayHintType::TIME_DELAY_IN_MS:
279             mCallbackScheduler.setTimeDelayHint(in_hint.hintValue);
280             break;
281         case FilterDelayHintType::DATA_SIZE_DELAY_IN_BYTES:
282             mCallbackScheduler.setDataSizeDelayHint(in_hint.hintValue);
283             break;
284         default:
285             return ::ndk::ScopedAStatus::fromServiceSpecificError(
286                     static_cast<int32_t>(Result::INVALID_ARGUMENT));
287     }
288 
289     return ::ndk::ScopedAStatus::ok();
290 }
291 
getQueueDesc(MQDescriptor<int8_t,SynchronizedReadWrite> * out_queue)292 ::ndk::ScopedAStatus Filter::getQueueDesc(MQDescriptor<int8_t, SynchronizedReadWrite>* out_queue) {
293     ALOGV("%s", __FUNCTION__);
294 
295     mIsUsingFMQ = mIsRecordFilter ? false : true;
296 
297     *out_queue = mFilterMQ->dupeDesc();
298     return ::ndk::ScopedAStatus::ok();
299 }
300 
configure(const DemuxFilterSettings & in_settings)301 ::ndk::ScopedAStatus Filter::configure(const DemuxFilterSettings& in_settings) {
302     ALOGV("%s", __FUNCTION__);
303 
304     mFilterSettings = in_settings;
305     switch (mType.mainType) {
306         case DemuxFilterMainType::TS:
307             mTpid = in_settings.get<DemuxFilterSettings::Tag::ts>().tpid;
308             break;
309         case DemuxFilterMainType::MMTP:
310             break;
311         case DemuxFilterMainType::IP:
312             break;
313         case DemuxFilterMainType::TLV:
314             break;
315         case DemuxFilterMainType::ALP:
316             break;
317         default:
318             break;
319     }
320 
321     mConfigured = true;
322     return ::ndk::ScopedAStatus::ok();
323 }
324 
start()325 ::ndk::ScopedAStatus Filter::start() {
326     ALOGV("%s", __FUNCTION__);
327     mFilterThreadRunning = true;
328     std::vector<DemuxFilterEvent> events;
329     // All the filter event callbacks in start are for testing purpose.
330     switch (mType.mainType) {
331         case DemuxFilterMainType::TS:
332             createMediaEvent(events, false);
333             createMediaEvent(events, true);
334             createTsRecordEvent(events);
335             createTemiEvent(events);
336             break;
337         case DemuxFilterMainType::MMTP:
338             createDownloadEvent(events);
339             createMmtpRecordEvent(events);
340             break;
341         case DemuxFilterMainType::IP:
342             createSectionEvent(events);
343             createIpPayloadEvent(events);
344             break;
345         case DemuxFilterMainType::TLV:
346             createMonitorEvent(events);
347             break;
348         case DemuxFilterMainType::ALP:
349             createMonitorEvent(events);
350             break;
351         default:
352             break;
353     }
354 
355     for (auto&& event : events) {
356         mCallbackScheduler.onFilterEvent(std::move(event));
357     }
358 
359     return startFilterLoop();
360 }
361 
stop()362 ::ndk::ScopedAStatus Filter::stop() {
363     ALOGV("%s", __FUNCTION__);
364 
365     mFilterThreadRunning = false;
366     if (mFilterThread.joinable()) {
367         mFilterThread.join();
368     }
369 
370     mCallbackScheduler.flushEvents();
371 
372     return ::ndk::ScopedAStatus::ok();
373 }
374 
flush()375 ::ndk::ScopedAStatus Filter::flush() {
376     ALOGV("%s", __FUNCTION__);
377 
378     // temp implementation to flush the FMQ
379     int size = mFilterMQ->availableToRead();
380     int8_t* buffer = new int8_t[size];
381     mFilterMQ->read(buffer, size);
382     delete[] buffer;
383     mFilterStatus = DemuxFilterStatus::DATA_READY;
384 
385     return ::ndk::ScopedAStatus::ok();
386 }
387 
releaseAvHandle(const NativeHandle & in_avMemory,int64_t in_avDataId)388 ::ndk::ScopedAStatus Filter::releaseAvHandle(const NativeHandle& in_avMemory, int64_t in_avDataId) {
389     ALOGV("%s", __FUNCTION__);
390 
391     if ((mSharedAvMemHandle != nullptr) && (in_avMemory.fds.size() > 0) &&
392         (sameFile(in_avMemory.fds[0].get(), mSharedAvMemHandle->data[0]))) {
393         freeSharedAvHandle();
394         return ::ndk::ScopedAStatus::ok();
395     }
396 
397     if (mDataId2Avfd.find(in_avDataId) == mDataId2Avfd.end()) {
398         return ::ndk::ScopedAStatus::fromServiceSpecificError(
399                 static_cast<int32_t>(Result::INVALID_ARGUMENT));
400     }
401 
402     ::close(mDataId2Avfd[in_avDataId]);
403     return ::ndk::ScopedAStatus::ok();
404 }
405 
close()406 ::ndk::ScopedAStatus Filter::close() {
407     ALOGV("%s", __FUNCTION__);
408 
409     stop();
410 
411     return mDemux->removeFilter(mFilterId);
412 }
413 
configureIpCid(int32_t in_ipCid)414 ::ndk::ScopedAStatus Filter::configureIpCid(int32_t in_ipCid) {
415     ALOGV("%s", __FUNCTION__);
416 
417     if (mType.mainType != DemuxFilterMainType::IP) {
418         return ::ndk::ScopedAStatus::fromServiceSpecificError(
419                 static_cast<int32_t>(Result::INVALID_STATE));
420     }
421 
422     mCid = in_ipCid;
423     return ::ndk::ScopedAStatus::ok();
424 }
425 
getAvSharedHandle(NativeHandle * out_avMemory,int64_t * _aidl_return)426 ::ndk::ScopedAStatus Filter::getAvSharedHandle(NativeHandle* out_avMemory, int64_t* _aidl_return) {
427     ALOGV("%s", __FUNCTION__);
428 
429     if (!mIsMediaFilter) {
430         return ::ndk::ScopedAStatus::fromServiceSpecificError(
431                 static_cast<int32_t>(Result::INVALID_STATE));
432     }
433 
434     if (mSharedAvMemHandle != nullptr) {
435         *out_avMemory = ::android::dupToAidl(mSharedAvMemHandle);
436         *_aidl_return = BUFFER_SIZE;
437         mUsingSharedAvMem = true;
438         return ::ndk::ScopedAStatus::ok();
439     }
440 
441     int av_fd = createAvIonFd(BUFFER_SIZE);
442     if (av_fd < 0) {
443         return ::ndk::ScopedAStatus::fromServiceSpecificError(
444                 static_cast<int32_t>(Result::OUT_OF_MEMORY));
445     }
446 
447     mSharedAvMemHandle = createNativeHandle(av_fd);
448     if (mSharedAvMemHandle == nullptr) {
449         ::close(av_fd);
450         *_aidl_return = 0;
451         return ::ndk::ScopedAStatus::fromServiceSpecificError(
452                 static_cast<int32_t>(Result::UNKNOWN_ERROR));
453     }
454     ::close(av_fd);
455     mUsingSharedAvMem = true;
456 
457     *out_avMemory = ::android::dupToAidl(mSharedAvMemHandle);
458     *_aidl_return = BUFFER_SIZE;
459     return ::ndk::ScopedAStatus::ok();
460 }
461 
configureAvStreamType(const AvStreamType & in_avStreamType)462 ::ndk::ScopedAStatus Filter::configureAvStreamType(const AvStreamType& in_avStreamType) {
463     ALOGV("%s", __FUNCTION__);
464 
465     if (!mIsMediaFilter) {
466         return ::ndk::ScopedAStatus::fromServiceSpecificError(
467                 static_cast<int32_t>(Result::UNAVAILABLE));
468     }
469 
470     switch (in_avStreamType.getTag()) {
471         case AvStreamType::Tag::audio:
472             mAudioStreamType =
473                     static_cast<uint32_t>(in_avStreamType.get<AvStreamType::Tag::audio>());
474             break;
475         case AvStreamType::Tag::video:
476             mVideoStreamType =
477                     static_cast<uint32_t>(in_avStreamType.get<AvStreamType::Tag::video>());
478             break;
479         default:
480             break;
481     }
482 
483     return ::ndk::ScopedAStatus::ok();
484 }
485 
configureMonitorEvent(int in_monitorEventTypes)486 ::ndk::ScopedAStatus Filter::configureMonitorEvent(int in_monitorEventTypes) {
487     ALOGV("%s", __FUNCTION__);
488 
489     int32_t newScramblingStatus =
490             in_monitorEventTypes &
491             static_cast<int32_t>(DemuxFilterMonitorEventType::SCRAMBLING_STATUS);
492     int32_t newIpCid =
493             in_monitorEventTypes & static_cast<int32_t>(DemuxFilterMonitorEventType::IP_CID_CHANGE);
494 
495     // if scrambling status monitoring flipped, record the new state and send msg on enabling
496     if (newScramblingStatus ^ mScramblingStatusMonitored) {
497         mScramblingStatusMonitored = newScramblingStatus;
498         if (mScramblingStatusMonitored) {
499             if (mCallbackScheduler.hasCallbackRegistered()) {
500                 // Assuming current status is always NOT_SCRAMBLED
501                 auto monitorEvent = DemuxFilterMonitorEvent::make<
502                         DemuxFilterMonitorEvent::Tag::scramblingStatus>(
503                         ScramblingStatus::NOT_SCRAMBLED);
504                 auto event =
505                         DemuxFilterEvent::make<DemuxFilterEvent::Tag::monitorEvent>(monitorEvent);
506                 mCallbackScheduler.onFilterEvent(std::move(event));
507             } else {
508                 return ::ndk::ScopedAStatus::fromServiceSpecificError(
509                         static_cast<int32_t>(Result::INVALID_STATE));
510             }
511         }
512     }
513 
514     // if ip cid monitoring flipped, record the new state and send msg on enabling
515     if (newIpCid ^ mIpCidMonitored) {
516         mIpCidMonitored = newIpCid;
517         if (mIpCidMonitored) {
518             if (mCallbackScheduler.hasCallbackRegistered()) {
519                 // Return random cid
520                 auto monitorEvent =
521                         DemuxFilterMonitorEvent::make<DemuxFilterMonitorEvent::Tag::cid>(1);
522                 auto event =
523                         DemuxFilterEvent::make<DemuxFilterEvent::Tag::monitorEvent>(monitorEvent);
524                 mCallbackScheduler.onFilterEvent(std::move(event));
525             } else {
526                 return ::ndk::ScopedAStatus::fromServiceSpecificError(
527                         static_cast<int32_t>(Result::INVALID_STATE));
528             }
529         }
530     }
531 
532     return ::ndk::ScopedAStatus::ok();
533 }
534 
createFilterMQ()535 bool Filter::createFilterMQ() {
536     ALOGV("%s", __FUNCTION__);
537 
538     // Create a synchronized FMQ that supports blocking read/write
539     std::unique_ptr<FilterMQ> tmpFilterMQ =
540             std::unique_ptr<FilterMQ>(new (std::nothrow) FilterMQ(mBufferSize, true));
541     if (!tmpFilterMQ->isValid()) {
542         ALOGW("[Filter] Failed to create FMQ of filter with id: %" PRIu64, mFilterId);
543         return false;
544     }
545 
546     mFilterMQ = std::move(tmpFilterMQ);
547 
548     if (EventFlag::createEventFlag(mFilterMQ->getEventFlagWord(), &mFilterEventsFlag) !=
549         ::android::OK) {
550         return false;
551     }
552 
553     return true;
554 }
555 
startFilterLoop()556 ::ndk::ScopedAStatus Filter::startFilterLoop() {
557     mFilterThread = std::thread(&Filter::filterThreadLoop, this);
558     return ::ndk::ScopedAStatus::ok();
559 }
560 
filterThreadLoop()561 void Filter::filterThreadLoop() {
562     if (!mFilterThreadRunning) {
563         return;
564     }
565 
566     ALOGD("[Filter] filter %" PRIu64 " threadLoop start.", mFilterId);
567 
568     // For the first time of filter output, implementation needs to send the filter
569     // Event Callback without waiting for the DATA_CONSUMED to init the process.
570     while (mFilterThreadRunning) {
571         std::unique_lock<std::mutex> lock(mFilterEventsLock);
572         if (mFilterEvents.size() == 0) {
573             lock.unlock();
574             if (DEBUG_FILTER) {
575                 ALOGD("[Filter] wait for filter data output.");
576             }
577             usleep(1000 * 1000);
578             continue;
579         }
580 
581         // After successfully write, send a callback and wait for the read to be done
582         if (mCallbackScheduler.hasCallbackRegistered()) {
583             if (mConfigured) {
584                 auto startEvent =
585                         DemuxFilterEvent::make<DemuxFilterEvent::Tag::startId>(mStartId++);
586                 mCallbackScheduler.onFilterEvent(std::move(startEvent));
587                 mConfigured = false;
588             }
589 
590             // lock is still being held
591             for (auto&& event : mFilterEvents) {
592                 mCallbackScheduler.onFilterEvent(std::move(event));
593             }
594         } else {
595             ALOGD("[Filter] filter callback is not configured yet.");
596             mFilterThreadRunning = false;
597             return;
598         }
599 
600         mFilterEvents.clear();
601         mFilterStatus = DemuxFilterStatus::DATA_READY;
602         mCallbackScheduler.onFilterStatus(mFilterStatus);
603         break;
604     }
605 
606     while (mFilterThreadRunning) {
607         uint32_t efState = 0;
608         // We do not wait for the last round of written data to be read to finish the thread
609         // because the VTS can verify the reading itself.
610         for (int i = 0; i < SECTION_WRITE_COUNT; i++) {
611             if (!mFilterThreadRunning) {
612                 break;
613             }
614             while (mFilterThreadRunning && mIsUsingFMQ) {
615                 ::android::status_t status = mFilterEventsFlag->wait(
616                         static_cast<uint32_t>(DemuxQueueNotifyBits::DATA_CONSUMED), &efState,
617                         WAIT_TIMEOUT, true /* retry on spurious wake */);
618                 if (status != ::android::OK) {
619                     ALOGD("[Filter] wait for data consumed");
620                     continue;
621                 }
622                 break;
623             }
624 
625             maySendFilterStatusCallback();
626 
627             while (mFilterThreadRunning) {
628                 std::lock_guard<std::mutex> lock(mFilterEventsLock);
629                 if (mFilterEvents.size() == 0) {
630                     continue;
631                 }
632                 // After successfully write, send a callback and wait for the read to be done
633                 for (auto&& event : mFilterEvents) {
634                     mCallbackScheduler.onFilterEvent(std::move(event));
635                 }
636                 mFilterEvents.clear();
637                 break;
638             }
639             // We do not wait for the last read to be done
640             // VTS can verify the read result itself.
641             if (i == SECTION_WRITE_COUNT - 1) {
642                 ALOGD("[Filter] filter %" PRIu64 " writing done. Ending thread", mFilterId);
643                 break;
644             }
645         }
646         break;
647     }
648     ALOGD("[Filter] filter thread ended.");
649 }
650 
freeSharedAvHandle()651 void Filter::freeSharedAvHandle() {
652     if (!mIsMediaFilter) {
653         return;
654     }
655     native_handle_close(mSharedAvMemHandle);
656     native_handle_delete(mSharedAvMemHandle);
657     mSharedAvMemHandle = nullptr;
658 }
659 
dump(int fd,const char **,uint32_t)660 binder_status_t Filter::dump(int fd, const char** /* args */, uint32_t /* numArgs */) {
661     dprintf(fd, "    Filter %" PRIu64 ":\n", mFilterId);
662     dprintf(fd, "      Main type: %d\n", mType.mainType);
663     dprintf(fd, "      mIsMediaFilter: %d\n", mIsMediaFilter);
664     dprintf(fd, "      mIsPcrFilter: %d\n", mIsPcrFilter);
665     dprintf(fd, "      mIsRecordFilter: %d\n", mIsRecordFilter);
666     dprintf(fd, "      mIsUsingFMQ: %d\n", mIsUsingFMQ);
667     dprintf(fd, "      mFilterThreadRunning: %d\n", (bool)mFilterThreadRunning);
668     return STATUS_OK;
669 }
670 
maySendFilterStatusCallback()671 void Filter::maySendFilterStatusCallback() {
672     if (!mIsUsingFMQ) {
673         return;
674     }
675     std::lock_guard<std::mutex> lock(mFilterStatusLock);
676     int availableToRead = mFilterMQ->availableToRead();
677     int availableToWrite = mFilterMQ->availableToWrite();
678     int fmqSize = mFilterMQ->getQuantumCount();
679 
680     DemuxFilterStatus newStatus = checkFilterStatusChange(
681             availableToWrite, availableToRead, ceil(fmqSize * 0.75), ceil(fmqSize * 0.25));
682     if (mFilterStatus != newStatus) {
683         mCallbackScheduler.onFilterStatus(newStatus);
684         mFilterStatus = newStatus;
685     }
686 }
687 
checkFilterStatusChange(uint32_t availableToWrite,uint32_t availableToRead,uint32_t highThreshold,uint32_t lowThreshold)688 DemuxFilterStatus Filter::checkFilterStatusChange(uint32_t availableToWrite,
689                                                   uint32_t availableToRead, uint32_t highThreshold,
690                                                   uint32_t lowThreshold) {
691     if (availableToWrite == 0) {
692         return DemuxFilterStatus::OVERFLOW;
693     } else if (availableToRead > highThreshold) {
694         return DemuxFilterStatus::HIGH_WATER;
695     } else if (availableToRead == 0) {
696         return DemuxFilterStatus::NO_DATA;
697     } else if (availableToRead < lowThreshold) {
698         return DemuxFilterStatus::LOW_WATER;
699     }
700     return mFilterStatus;
701 }
702 
getTpid()703 uint16_t Filter::getTpid() {
704     return mTpid;
705 }
706 
updateFilterOutput(vector<int8_t> & data)707 void Filter::updateFilterOutput(vector<int8_t>& data) {
708     std::lock_guard<std::mutex> lock(mFilterOutputLock);
709     mFilterOutput.insert(mFilterOutput.end(), data.begin(), data.end());
710 }
711 
updatePts(uint64_t pts)712 void Filter::updatePts(uint64_t pts) {
713     std::lock_guard<std::mutex> lock(mFilterOutputLock);
714     mPts = pts;
715 }
716 
updateRecordOutput(vector<int8_t> & data)717 void Filter::updateRecordOutput(vector<int8_t>& data) {
718     std::lock_guard<std::mutex> lock(mRecordFilterOutputLock);
719     mRecordFilterOutput.insert(mRecordFilterOutput.end(), data.begin(), data.end());
720 }
721 
startFilterHandler()722 ::ndk::ScopedAStatus Filter::startFilterHandler() {
723     std::lock_guard<std::mutex> lock(mFilterOutputLock);
724     switch (mType.mainType) {
725         case DemuxFilterMainType::TS:
726             switch (mType.subType.get<DemuxFilterSubType::Tag::tsFilterType>()) {
727                 case DemuxTsFilterType::UNDEFINED:
728                     break;
729                 case DemuxTsFilterType::SECTION:
730                     startSectionFilterHandler();
731                     break;
732                 case DemuxTsFilterType::PES:
733                     startPesFilterHandler();
734                     break;
735                 case DemuxTsFilterType::TS:
736                     startTsFilterHandler();
737                     break;
738                 case DemuxTsFilterType::AUDIO:
739                 case DemuxTsFilterType::VIDEO:
740                     startMediaFilterHandler();
741                     break;
742                 case DemuxTsFilterType::PCR:
743                     startPcrFilterHandler();
744                     break;
745                 case DemuxTsFilterType::TEMI:
746                     startTemiFilterHandler();
747                     break;
748                 default:
749                     break;
750             }
751             break;
752         case DemuxFilterMainType::MMTP:
753             /*mmtpSettings*/
754             break;
755         case DemuxFilterMainType::IP:
756             /*ipSettings*/
757             break;
758         case DemuxFilterMainType::TLV:
759             /*tlvSettings*/
760             break;
761         case DemuxFilterMainType::ALP:
762             /*alpSettings*/
763             break;
764         default:
765             break;
766     }
767     return ::ndk::ScopedAStatus::ok();
768 }
769 
startSectionFilterHandler()770 ::ndk::ScopedAStatus Filter::startSectionFilterHandler() {
771     if (mFilterOutput.empty()) {
772         return ::ndk::ScopedAStatus::ok();
773     }
774     if (!writeSectionsAndCreateEvent(mFilterOutput)) {
775         ALOGD("[Filter] filter %" PRIu64 " fails to write into FMQ. Ending thread", mFilterId);
776         return ::ndk::ScopedAStatus::fromServiceSpecificError(
777                 static_cast<int32_t>(Result::UNKNOWN_ERROR));
778     }
779 
780     mFilterOutput.clear();
781 
782     return ::ndk::ScopedAStatus::ok();
783 }
784 
startPesFilterHandler()785 ::ndk::ScopedAStatus Filter::startPesFilterHandler() {
786     if (mFilterOutput.empty()) {
787         return ::ndk::ScopedAStatus::ok();
788     }
789 
790     for (int i = 0; i < mFilterOutput.size(); i += 188) {
791         if (mPesSizeLeft == 0) {
792             uint32_t prefix = (mFilterOutput[i + 4] << 16) | (mFilterOutput[i + 5] << 8) |
793                               mFilterOutput[i + 6];
794             if (DEBUG_FILTER) {
795                 ALOGD("[Filter] prefix %d", prefix);
796             }
797             if (prefix == 0x000001) {
798                 // TODO handle mulptiple Pes filters
799                 mPesSizeLeft = (static_cast<uint8_t>(mFilterOutput[i + 8]) << 8) |
800                                static_cast<uint8_t>(mFilterOutput[i + 9]);
801                 mPesSizeLeft += 6;
802                 if (DEBUG_FILTER) {
803                     ALOGD("[Filter] pes data length %d", mPesSizeLeft);
804                 }
805             } else {
806                 continue;
807             }
808         }
809 
810         uint32_t endPoint = min(184u, mPesSizeLeft);
811         // append data and check size
812         vector<int8_t>::const_iterator first = mFilterOutput.begin() + i + 4;
813         vector<int8_t>::const_iterator last = mFilterOutput.begin() + i + 4 + endPoint;
814         mPesOutput.insert(mPesOutput.end(), first, last);
815         // size does not match then continue
816         mPesSizeLeft -= endPoint;
817         if (DEBUG_FILTER) {
818             ALOGD("[Filter] pes data left %d", mPesSizeLeft);
819         }
820         if (mPesSizeLeft > 0) {
821             continue;
822         }
823         // size match then create event
824         if (!writeDataToFilterMQ(mPesOutput)) {
825             ALOGD("[Filter] pes data write failed");
826             mFilterOutput.clear();
827             return ::ndk::ScopedAStatus::fromServiceSpecificError(
828                     static_cast<int32_t>(Result::INVALID_ARGUMENT));
829         }
830         maySendFilterStatusCallback();
831         DemuxFilterPesEvent pesEvent;
832         pesEvent = {
833                 // temp dump meta data
834                 .streamId = static_cast<int32_t>(mPesOutput[3]),
835                 .dataLength = static_cast<int32_t>(mPesOutput.size()),
836         };
837         if (DEBUG_FILTER) {
838             ALOGD("[Filter] assembled pes data length %d", pesEvent.dataLength);
839         }
840 
841         {
842             std::lock_guard<std::mutex> lock(mFilterEventsLock);
843             mFilterEvents.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::pes>(pesEvent));
844         }
845 
846         mPesOutput.clear();
847     }
848 
849     mFilterOutput.clear();
850 
851     return ::ndk::ScopedAStatus::ok();
852 }
853 
startTsFilterHandler()854 ::ndk::ScopedAStatus Filter::startTsFilterHandler() {
855     // TODO handle starting TS filter
856     return ::ndk::ScopedAStatus::ok();
857 }
858 
859 // Read PES (Packetized Elementary Stream) Packets from TransportStreams
860 // as defined in ISO/IEC 13818-1 Section 2.4.3.6. Create MediaEvents
861 // containing only their data without TS or PES headers.
startMediaFilterHandler()862 ::ndk::ScopedAStatus Filter::startMediaFilterHandler() {
863     if (mFilterOutput.empty()) {
864         return ::ndk::ScopedAStatus::ok();
865     }
866 
867     // mPts being set before our MediaFilterHandler begins indicates that all
868     // metadata has already been handled. We can therefore create an event
869     // with the existing data. This method is used when processing ES files.
870     ::ndk::ScopedAStatus result;
871     if (mPts) {
872         result = createMediaFilterEventWithIon(mFilterOutput);
873         if (result.isOk()) {
874             mFilterOutput.clear();
875         }
876         return result;
877     }
878 
879     for (int i = 0; i < mFilterOutput.size(); i += 188) {
880         // Every packet has a 4 Byte TS Header preceding it
881         uint32_t headerSize = 4;
882 
883         if (mPesSizeLeft == 0) {
884             // Packet Start Code Prefix is defined as the first 3 bytes of
885             // the PES Header and should always have the value 0x000001
886             uint32_t prefix = (static_cast<uint8_t>(mFilterOutput[i + 4]) << 16) |
887                               (static_cast<uint8_t>(mFilterOutput[i + 5]) << 8) |
888                               static_cast<uint8_t>(mFilterOutput[i + 6]);
889             if (DEBUG_FILTER) {
890                 ALOGD("[Filter] prefix %d", prefix);
891             }
892             if (prefix == 0x000001) {
893                 // TODO handle multiple Pes filters
894                 // Location of PES fields from ISO/IEC 13818-1 Section 2.4.3.6
895                 mPesSizeLeft = (static_cast<uint8_t>(mFilterOutput[i + 8]) << 8) |
896                                static_cast<uint8_t>(mFilterOutput[i + 9]);
897                 bool hasPts = static_cast<uint8_t>(mFilterOutput[i + 11]) & 0x80;
898                 uint8_t optionalFieldsLength = static_cast<uint8_t>(mFilterOutput[i + 12]);
899                 headerSize += 9 + optionalFieldsLength;
900 
901                 if (hasPts) {
902                     // Pts is a 33-bit field which is stored across 5 bytes, with
903                     // bits in between as reserved fields which must be ignored
904                     mPts = 0;
905                     mPts |= (static_cast<uint8_t>(mFilterOutput[i + 13]) & 0x0e) << 29;
906                     mPts |= (static_cast<uint8_t>(mFilterOutput[i + 14]) & 0xff) << 22;
907                     mPts |= (static_cast<uint8_t>(mFilterOutput[i + 15]) & 0xfe) << 14;
908                     mPts |= (static_cast<uint8_t>(mFilterOutput[i + 16]) & 0xff) << 7;
909                     mPts |= (static_cast<uint8_t>(mFilterOutput[i + 17]) & 0xfe) >> 1;
910                 }
911 
912                 if (DEBUG_FILTER) {
913                     ALOGD("[Filter] pes data length %d", mPesSizeLeft);
914                 }
915             } else {
916                 continue;
917             }
918         }
919 
920         uint32_t endPoint = min(188u - headerSize, mPesSizeLeft);
921         // append data and check size
922         vector<int8_t>::const_iterator first = mFilterOutput.begin() + i + headerSize;
923         vector<int8_t>::const_iterator last = mFilterOutput.begin() + i + headerSize + endPoint;
924         mPesOutput.insert(mPesOutput.end(), first, last);
925         // size does not match then continue
926         mPesSizeLeft -= endPoint;
927         if (DEBUG_FILTER) {
928             ALOGD("[Filter] pes data left %d", mPesSizeLeft);
929         }
930         if (mPesSizeLeft > 0 || mAvBufferCopyCount++ < 10) {
931             continue;
932         }
933 
934         result = createMediaFilterEventWithIon(mPesOutput);
935         if (!result.isOk()) {
936             mFilterOutput.clear();
937             return result;
938         }
939     }
940 
941     mFilterOutput.clear();
942 
943     return ::ndk::ScopedAStatus::ok();
944 }
945 
createMediaFilterEventWithIon(vector<int8_t> & output)946 ::ndk::ScopedAStatus Filter::createMediaFilterEventWithIon(vector<int8_t>& output) {
947     if (mUsingSharedAvMem) {
948         if (mSharedAvMemHandle == nullptr) {
949             return ::ndk::ScopedAStatus::fromServiceSpecificError(
950                     static_cast<int32_t>(Result::UNKNOWN_ERROR));
951         }
952         return createShareMemMediaEvents(output);
953     }
954 
955     return createIndependentMediaEvents(output);
956 }
957 
startRecordFilterHandler()958 ::ndk::ScopedAStatus Filter::startRecordFilterHandler() {
959     std::lock_guard<std::mutex> lock(mRecordFilterOutputLock);
960     if (mRecordFilterOutput.empty()) {
961         return ::ndk::ScopedAStatus::ok();
962     }
963 
964     if (mDvr == nullptr || !mDvr->writeRecordFMQ(mRecordFilterOutput)) {
965         ALOGD("[Filter] dvr fails to write into record FMQ.");
966         return ::ndk::ScopedAStatus::fromServiceSpecificError(
967                 static_cast<int32_t>(Result::UNKNOWN_ERROR));
968     }
969 
970     DemuxFilterTsRecordEvent recordEvent;
971     recordEvent = {
972             .byteNumber = static_cast<int64_t>(mRecordFilterOutput.size()),
973             .pts = (mPts == 0) ? static_cast<int64_t>(time(NULL)) * 900000 : mPts,
974             .firstMbInSlice = 0,  // random address
975     };
976 
977     {
978         std::lock_guard<std::mutex> lock(mFilterEventsLock);
979         mFilterEvents.push_back(
980                 DemuxFilterEvent::make<DemuxFilterEvent::Tag::tsRecord>(recordEvent));
981     }
982 
983     mRecordFilterOutput.clear();
984     return ::ndk::ScopedAStatus::ok();
985 }
986 
startPcrFilterHandler()987 ::ndk::ScopedAStatus Filter::startPcrFilterHandler() {
988     // TODO handle starting PCR filter
989     return ::ndk::ScopedAStatus::ok();
990 }
991 
startTemiFilterHandler()992 ::ndk::ScopedAStatus Filter::startTemiFilterHandler() {
993     // TODO handle starting TEMI filter
994     return ::ndk::ScopedAStatus::ok();
995 }
996 
997 // Read PSI (Program Specific Information) Sections from TransportStreams
998 // as defined in ISO/IEC 13818-1 Section 2.4.4
writeSectionsAndCreateEvent(vector<int8_t> & data)999 bool Filter::writeSectionsAndCreateEvent(vector<int8_t>& data) {
1000     // TODO check how many sections has been read
1001     ALOGD("[Filter] section handler");
1002 
1003     // Transport Stream Packets are 188 bytes long, as defined in the
1004     // Introduction of ISO/IEC 13818-1
1005     for (int i = 0; i < data.size(); i += 188) {
1006         if (mSectionSizeLeft == 0) {
1007             // Location for sectionSize as defined by Section 2.4.4
1008             // Note that the first 4 bytes skipped are the TsHeader
1009             mSectionSizeLeft = ((static_cast<uint8_t>(data[i + 5]) & 0x0f) << 8) |
1010                                static_cast<uint8_t>(data[i + 6]);
1011             mSectionSizeLeft += 3;
1012             if (DEBUG_FILTER) {
1013                 ALOGD("[Filter] section data length %d", mSectionSizeLeft);
1014             }
1015         }
1016 
1017         // 184 bytes per packet is derived by subtracting the 4 byte length of
1018         // the TsHeader from its 188 byte packet size
1019         uint32_t endPoint = min(184u, mSectionSizeLeft);
1020         // append data and check size
1021         vector<int8_t>::const_iterator first = data.begin() + i + 4;
1022         vector<int8_t>::const_iterator last = data.begin() + i + 4 + endPoint;
1023         mSectionOutput.insert(mSectionOutput.end(), first, last);
1024         // size does not match then continue
1025         mSectionSizeLeft -= endPoint;
1026         if (DEBUG_FILTER) {
1027             ALOGD("[Filter] section data left %d", mSectionSizeLeft);
1028         }
1029         if (mSectionSizeLeft > 0) {
1030             continue;
1031         }
1032 
1033         if (!writeDataToFilterMQ(mSectionOutput)) {
1034             mSectionOutput.clear();
1035             return false;
1036         }
1037 
1038         DemuxFilterSectionEvent secEvent;
1039         secEvent = {
1040                 // temp dump meta data
1041                 .tableId = 0,
1042                 .version = 1,
1043                 .sectionNum = 1,
1044                 .dataLength = static_cast<int32_t>(mSectionOutput.size()),
1045         };
1046         if (DEBUG_FILTER) {
1047             ALOGD("[Filter] assembled section data length %" PRIu64, secEvent.dataLength);
1048         }
1049 
1050         {
1051             std::lock_guard<std::mutex> lock(mFilterEventsLock);
1052             mFilterEvents.push_back(
1053                     DemuxFilterEvent::make<DemuxFilterEvent::Tag::section>(secEvent));
1054         }
1055         mSectionOutput.clear();
1056     }
1057 
1058     return true;
1059 }
1060 
writeDataToFilterMQ(const std::vector<int8_t> & data)1061 bool Filter::writeDataToFilterMQ(const std::vector<int8_t>& data) {
1062     std::lock_guard<std::mutex> lock(mWriteLock);
1063     if (mFilterMQ->write(data.data(), data.size())) {
1064         return true;
1065     }
1066     return false;
1067 }
1068 
attachFilterToRecord(const std::shared_ptr<Dvr> dvr)1069 void Filter::attachFilterToRecord(const std::shared_ptr<Dvr> dvr) {
1070     mDvr = dvr;
1071 }
1072 
detachFilterFromRecord()1073 void Filter::detachFilterFromRecord() {
1074     mDvr = nullptr;
1075 }
1076 
createAvIonFd(int size)1077 int Filter::createAvIonFd(int size) {
1078     // Create an DMA-BUF fd and allocate an av fd mapped to a buffer to it.
1079     auto buffer_allocator = std::make_unique<BufferAllocator>();
1080     if (!buffer_allocator) {
1081         ALOGE("[Filter] Unable to create BufferAllocator object");
1082         return -1;
1083     }
1084     int av_fd = -1;
1085     av_fd = buffer_allocator->Alloc("system-uncached", size);
1086     if (av_fd < 0) {
1087         ALOGE("[Filter] Failed to create av fd %d", errno);
1088         return -1;
1089     }
1090     return av_fd;
1091 }
1092 
getIonBuffer(int fd,int size)1093 uint8_t* Filter::getIonBuffer(int fd, int size) {
1094     uint8_t* avBuf = static_cast<uint8_t*>(
1095             mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0 /*offset*/));
1096     if (avBuf == MAP_FAILED) {
1097         ALOGE("[Filter] fail to allocate buffer %d", errno);
1098         return NULL;
1099     }
1100     return avBuf;
1101 }
1102 
createNativeHandle(int fd)1103 native_handle_t* Filter::createNativeHandle(int fd) {
1104     native_handle_t* nativeHandle;
1105     if (fd < 0) {
1106         nativeHandle = native_handle_create(/*numFd*/ 0, 0);
1107     } else {
1108         // Create a native handle to pass the av fd via the callback event.
1109         nativeHandle = native_handle_create(/*numFd*/ 1, 0);
1110     }
1111     if (nativeHandle == NULL) {
1112         ALOGE("[Filter] Failed to create native_handle %d", errno);
1113         return NULL;
1114     }
1115     if (nativeHandle->numFds > 0) {
1116         nativeHandle->data[0] = dup(fd);
1117     }
1118     return nativeHandle;
1119 }
1120 
createIndependentMediaEvents(vector<int8_t> & output)1121 ::ndk::ScopedAStatus Filter::createIndependentMediaEvents(vector<int8_t>& output) {
1122     int av_fd = createAvIonFd(output.size());
1123     if (av_fd == -1) {
1124         return ::ndk::ScopedAStatus::fromServiceSpecificError(
1125                 static_cast<int32_t>(Result::UNKNOWN_ERROR));
1126     }
1127     // copy the filtered data to the buffer
1128     uint8_t* avBuffer = getIonBuffer(av_fd, output.size());
1129     if (avBuffer == NULL) {
1130         return ::ndk::ScopedAStatus::fromServiceSpecificError(
1131                 static_cast<int32_t>(Result::UNKNOWN_ERROR));
1132     }
1133     memcpy(avBuffer, output.data(), output.size() * sizeof(uint8_t));
1134 
1135     native_handle_t* nativeHandle = createNativeHandle(av_fd);
1136     if (nativeHandle == NULL) {
1137         return ::ndk::ScopedAStatus::fromServiceSpecificError(
1138                 static_cast<int32_t>(Result::UNKNOWN_ERROR));
1139     }
1140 
1141     // Create a dataId and add a <dataId, av_fd> pair into the dataId2Avfd map
1142     uint64_t dataId = mLastUsedDataId++ /*createdUID*/;
1143     mDataId2Avfd[dataId] = dup(av_fd);
1144 
1145     // Create mediaEvent and send callback
1146     auto event = DemuxFilterEvent::make<DemuxFilterEvent::Tag::media>();
1147     auto& mediaEvent = event.get<DemuxFilterEvent::Tag::media>();
1148     mediaEvent.avMemory = ::android::dupToAidl(nativeHandle);
1149     mediaEvent.dataLength = static_cast<int64_t>(output.size());
1150     mediaEvent.avDataId = static_cast<int64_t>(dataId);
1151     if (mPts) {
1152         mediaEvent.pts = mPts;
1153         mPts = 0;
1154     }
1155 
1156     {
1157         std::lock_guard<std::mutex> lock(mFilterEventsLock);
1158         mFilterEvents.push_back(std::move(event));
1159     }
1160 
1161     // Clear and log
1162     native_handle_close(nativeHandle);
1163     native_handle_delete(nativeHandle);
1164     output.clear();
1165     mAvBufferCopyCount = 0;
1166     if (DEBUG_FILTER) {
1167         ALOGD("[Filter] av data length %d", static_cast<int32_t>(output.size()));
1168     }
1169     return ::ndk::ScopedAStatus::ok();
1170 }
1171 
createShareMemMediaEvents(vector<int8_t> & output)1172 ::ndk::ScopedAStatus Filter::createShareMemMediaEvents(vector<int8_t>& output) {
1173     // copy the filtered data to the shared buffer
1174     uint8_t* sharedAvBuffer =
1175             getIonBuffer(mSharedAvMemHandle->data[0], output.size() + mSharedAvMemOffset);
1176     if (sharedAvBuffer == NULL) {
1177         return ::ndk::ScopedAStatus::fromServiceSpecificError(
1178                 static_cast<int32_t>(Result::UNKNOWN_ERROR));
1179     }
1180     memcpy(sharedAvBuffer + mSharedAvMemOffset, output.data(), output.size() * sizeof(uint8_t));
1181 
1182     // Create a memory handle with numFds == 0
1183     native_handle_t* nativeHandle = createNativeHandle(-1);
1184     if (nativeHandle == NULL) {
1185         return ::ndk::ScopedAStatus::fromServiceSpecificError(
1186                 static_cast<int32_t>(Result::UNKNOWN_ERROR));
1187     }
1188 
1189     // Create mediaEvent and send callback
1190     auto event = DemuxFilterEvent::make<DemuxFilterEvent::Tag::media>();
1191     auto& mediaEvent = event.get<DemuxFilterEvent::Tag::media>();
1192     mediaEvent.avMemory = ::android::dupToAidl(nativeHandle);
1193     mediaEvent.offset = mSharedAvMemOffset;
1194     mediaEvent.dataLength = static_cast<int64_t>(output.size());
1195     if (mPts) {
1196         mediaEvent.pts = mPts;
1197         mPts = 0;
1198     }
1199 
1200     {
1201         std::lock_guard<std::mutex> lock(mFilterEventsLock);
1202         mFilterEvents.push_back(std::move(event));
1203     }
1204 
1205     mSharedAvMemOffset += output.size();
1206 
1207     // Clear and log
1208     native_handle_close(nativeHandle);
1209     native_handle_delete(nativeHandle);
1210     output.clear();
1211     if (DEBUG_FILTER) {
1212         ALOGD("[Filter] shared av data length %d", static_cast<int32_t>(output.size()));
1213     }
1214     return ::ndk::ScopedAStatus::ok();
1215 }
1216 
sameFile(int fd1,int fd2)1217 bool Filter::sameFile(int fd1, int fd2) {
1218     struct stat stat1, stat2;
1219     if (fstat(fd1, &stat1) < 0 || fstat(fd2, &stat2) < 0) {
1220         return false;
1221     }
1222     return (stat1.st_dev == stat2.st_dev) && (stat1.st_ino == stat2.st_ino);
1223 }
1224 
createMediaEvent(vector<DemuxFilterEvent> & events,bool isAudioPresentation)1225 void Filter::createMediaEvent(vector<DemuxFilterEvent>& events, bool isAudioPresentation) {
1226     DemuxFilterMediaEvent mediaEvent;
1227     mediaEvent.streamId = 1;
1228     mediaEvent.isPtsPresent = true;
1229     mediaEvent.isDtsPresent = false;
1230     mediaEvent.dataLength = 3;
1231     mediaEvent.offset = 4;
1232     mediaEvent.isSecureMemory = true;
1233     mediaEvent.mpuSequenceNumber = 6;
1234     mediaEvent.isPesPrivateData = true;
1235 
1236     if (isAudioPresentation) {
1237         AudioPresentation audioPresentation0{
1238                 .preselection.preselectionId = 0,
1239                 .preselection.labels = {{"en", "Commentator"}, {"es", "Comentarista"}},
1240                 .preselection.language = "en",
1241                 .preselection.renderingIndication =
1242                         AudioPreselectionRenderingIndicationType::THREE_DIMENSIONAL,
1243                 .preselection.hasAudioDescription = false,
1244                 .preselection.hasSpokenSubtitles = false,
1245                 .preselection.hasDialogueEnhancement = true,
1246                 .ac4ShortProgramId = 42};
1247         AudioPresentation audioPresentation1{
1248                 .preselection.preselectionId = 1,
1249                 .preselection.labels = {{"en", "Crowd"}, {"es", "Multitud"}},
1250                 .preselection.language = "en",
1251                 .preselection.renderingIndication =
1252                         AudioPreselectionRenderingIndicationType::THREE_DIMENSIONAL,
1253                 .preselection.hasAudioDescription = false,
1254                 .preselection.hasSpokenSubtitles = false,
1255                 .preselection.hasDialogueEnhancement = false,
1256                 .ac4ShortProgramId = 42};
1257         vector<AudioPresentation> audioPresentations;
1258         audioPresentations.push_back(audioPresentation0);
1259         audioPresentations.push_back(audioPresentation1);
1260         mediaEvent.extraMetaData.set<DemuxFilterMediaEventExtraMetaData::Tag::audioPresentations>(
1261                 audioPresentations);
1262     } else {
1263         AudioExtraMetaData audio;
1264         audio.adFade = 1;
1265         audio.adPan = 2;
1266         audio.versionTextTag = 3;
1267         audio.adGainCenter = 4;
1268         audio.adGainFront = 5;
1269         audio.adGainSurround = 6;
1270         mediaEvent.extraMetaData.set<DemuxFilterMediaEventExtraMetaData::Tag::audio>(audio);
1271     }
1272 
1273     int av_fd = createAvIonFd(BUFFER_SIZE);
1274     if (av_fd == -1) {
1275         return;
1276     }
1277 
1278     native_handle_t* nativeHandle = createNativeHandle(av_fd);
1279     if (nativeHandle == nullptr) {
1280         ::close(av_fd);
1281         ALOGE("[Filter] Failed to create native_handle %d", errno);
1282         return;
1283     }
1284 
1285     // Create a dataId and add a <dataId, av_fd> pair into the dataId2Avfd map
1286     uint64_t dataId = mLastUsedDataId++ /*createdUID*/;
1287     mDataId2Avfd[dataId] = dup(av_fd);
1288 
1289     mediaEvent.avDataId = static_cast<int64_t>(dataId);
1290     mediaEvent.avMemory = ::android::dupToAidl(nativeHandle);
1291 
1292     events.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::media>(std::move(mediaEvent)));
1293 
1294     native_handle_close(nativeHandle);
1295     native_handle_delete(nativeHandle);
1296 }
1297 
createTsRecordEvent(vector<DemuxFilterEvent> & events)1298 void Filter::createTsRecordEvent(vector<DemuxFilterEvent>& events) {
1299     DemuxPid pid;
1300     DemuxFilterScIndexMask mask;
1301     DemuxFilterTsRecordEvent tsRecord1;
1302     pid.set<DemuxPid::Tag::tPid>(1);
1303     mask.set<DemuxFilterScIndexMask::Tag::scIndex>(1);
1304     tsRecord1.pid = pid;
1305     tsRecord1.tsIndexMask = 1;
1306     tsRecord1.scIndexMask = mask;
1307     tsRecord1.byteNumber = 2;
1308 
1309     DemuxFilterTsRecordEvent tsRecord2;
1310     tsRecord2.pts = 1;
1311     tsRecord2.firstMbInSlice = 2;  // random address
1312 
1313     events.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::tsRecord>(std::move(tsRecord1)));
1314     events.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::tsRecord>(std::move(tsRecord2)));
1315 }
1316 
createMmtpRecordEvent(vector<DemuxFilterEvent> & events)1317 void Filter::createMmtpRecordEvent(vector<DemuxFilterEvent>& events) {
1318     DemuxFilterMmtpRecordEvent mmtpRecord1;
1319     mmtpRecord1.scHevcIndexMask = 1;
1320     mmtpRecord1.byteNumber = 2;
1321 
1322     DemuxFilterMmtpRecordEvent mmtpRecord2;
1323     mmtpRecord2.pts = 1;
1324     mmtpRecord2.mpuSequenceNumber = 2;
1325     mmtpRecord2.firstMbInSlice = 3;
1326     mmtpRecord2.tsIndexMask = 4;
1327 
1328     events.push_back(
1329             DemuxFilterEvent::make<DemuxFilterEvent::Tag::mmtpRecord>(std::move(mmtpRecord1)));
1330     events.push_back(
1331             DemuxFilterEvent::make<DemuxFilterEvent::Tag::mmtpRecord>(std::move(mmtpRecord2)));
1332 }
1333 
createSectionEvent(vector<DemuxFilterEvent> & events)1334 void Filter::createSectionEvent(vector<DemuxFilterEvent>& events) {
1335     DemuxFilterSectionEvent section;
1336     section.tableId = 1;
1337     section.version = 2;
1338     section.sectionNum = 3;
1339     section.dataLength = 0;
1340 
1341     events.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::section>(std::move(section)));
1342 }
1343 
createPesEvent(vector<DemuxFilterEvent> & events)1344 void Filter::createPesEvent(vector<DemuxFilterEvent>& events) {
1345     DemuxFilterPesEvent pes;
1346     pes.streamId = 1;
1347     pes.dataLength = 1;
1348     pes.mpuSequenceNumber = 2;
1349 
1350     events.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::pes>(std::move(pes)));
1351 }
1352 
createDownloadEvent(vector<DemuxFilterEvent> & events)1353 void Filter::createDownloadEvent(vector<DemuxFilterEvent>& events) {
1354     DemuxFilterDownloadEvent download;
1355     download.itemId = 1;
1356     download.downloadId = 1;
1357     download.mpuSequenceNumber = 2;
1358     download.itemFragmentIndex = 3;
1359     download.lastItemFragmentIndex = 4;
1360     download.dataLength = 0;
1361 
1362     events.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::download>(std::move(download)));
1363 }
1364 
createIpPayloadEvent(vector<DemuxFilterEvent> & events)1365 void Filter::createIpPayloadEvent(vector<DemuxFilterEvent>& events) {
1366     DemuxFilterIpPayloadEvent ipPayload;
1367     ipPayload.dataLength = 0;
1368 
1369     events.push_back(
1370             DemuxFilterEvent::make<DemuxFilterEvent::Tag::ipPayload>(std::move(ipPayload)));
1371 }
1372 
createTemiEvent(vector<DemuxFilterEvent> & events)1373 void Filter::createTemiEvent(vector<DemuxFilterEvent>& events) {
1374     DemuxFilterTemiEvent temi;
1375     temi.pts = 1;
1376     temi.descrTag = 2;
1377     temi.descrData = {3};
1378 
1379     events.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::temi>(std::move(temi)));
1380 }
1381 
createMonitorEvent(vector<DemuxFilterEvent> & events)1382 void Filter::createMonitorEvent(vector<DemuxFilterEvent>& events) {
1383     DemuxFilterMonitorEvent monitor;
1384     monitor.set<DemuxFilterMonitorEvent::Tag::scramblingStatus>(ScramblingStatus::SCRAMBLED);
1385 
1386     events.push_back(
1387             DemuxFilterEvent::make<DemuxFilterEvent::Tag::monitorEvent>(std::move(monitor)));
1388 }
1389 
createRestartEvent(vector<DemuxFilterEvent> & events)1390 void Filter::createRestartEvent(vector<DemuxFilterEvent>& events) {
1391     events.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::startId>(1));
1392 }
1393 
1394 }  // namespace tuner
1395 }  // namespace tv
1396 }  // namespace hardware
1397 }  // namespace android
1398 }  // namespace aidl
1399