• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "SimpleC2Component"
19 #include <log/log.h>
20 
21 #include <cutils/properties.h>
22 #include <media/stagefright/foundation/AMessage.h>
23 
24 #include <inttypes.h>
25 
26 #include <C2Config.h>
27 #include <C2Debug.h>
28 #include <C2PlatformSupport.h>
29 #include <SimpleC2Component.h>
30 
31 namespace android {
32 
pop_front()33 std::unique_ptr<C2Work> SimpleC2Component::WorkQueue::pop_front() {
34     std::unique_ptr<C2Work> work = std::move(mQueue.front().work);
35     mQueue.pop_front();
36     return work;
37 }
38 
push_back(std::unique_ptr<C2Work> work)39 void SimpleC2Component::WorkQueue::push_back(std::unique_ptr<C2Work> work) {
40     mQueue.push_back({ std::move(work), NO_DRAIN });
41 }
42 
empty() const43 bool SimpleC2Component::WorkQueue::empty() const {
44     return mQueue.empty();
45 }
46 
clear()47 void SimpleC2Component::WorkQueue::clear() {
48     mQueue.clear();
49 }
50 
drainMode() const51 uint32_t SimpleC2Component::WorkQueue::drainMode() const {
52     return mQueue.front().drainMode;
53 }
54 
markDrain(uint32_t drainMode)55 void SimpleC2Component::WorkQueue::markDrain(uint32_t drainMode) {
56     mQueue.push_back({ nullptr, drainMode });
57 }
58 
59 ////////////////////////////////////////////////////////////////////////////////
60 
WorkHandler()61 SimpleC2Component::WorkHandler::WorkHandler() : mRunning(false) {}
62 
setComponent(const std::shared_ptr<SimpleC2Component> & thiz)63 void SimpleC2Component::WorkHandler::setComponent(
64         const std::shared_ptr<SimpleC2Component> &thiz) {
65     mThiz = thiz;
66 }
67 
Reply(const sp<AMessage> & msg,int32_t * err=nullptr)68 static void Reply(const sp<AMessage> &msg, int32_t *err = nullptr) {
69     sp<AReplyToken> replyId;
70     CHECK(msg->senderAwaitsResponse(&replyId));
71     sp<AMessage> reply = new AMessage;
72     if (err) {
73         reply->setInt32("err", *err);
74     }
75     reply->postReply(replyId);
76 }
77 
onMessageReceived(const sp<AMessage> & msg)78 void SimpleC2Component::WorkHandler::onMessageReceived(const sp<AMessage> &msg) {
79     std::shared_ptr<SimpleC2Component> thiz = mThiz.lock();
80     if (!thiz) {
81         ALOGD("component not yet set; msg = %s", msg->debugString().c_str());
82         sp<AReplyToken> replyId;
83         if (msg->senderAwaitsResponse(&replyId)) {
84             sp<AMessage> reply = new AMessage;
85             reply->setInt32("err", C2_CORRUPTED);
86             reply->postReply(replyId);
87         }
88         return;
89     }
90 
91     switch (msg->what()) {
92         case kWhatProcess: {
93             if (mRunning) {
94                 if (thiz->processQueue()) {
95                     (new AMessage(kWhatProcess, this))->post();
96                 }
97             } else {
98                 ALOGV("Ignore process message as we're not running");
99             }
100             break;
101         }
102         case kWhatInit: {
103             int32_t err = thiz->onInit();
104             Reply(msg, &err);
105             [[fallthrough]];
106         }
107         case kWhatStart: {
108             mRunning = true;
109             break;
110         }
111         case kWhatStop: {
112             int32_t err = thiz->onStop();
113             thiz->mOutputBlockPool.reset();
114             Reply(msg, &err);
115             break;
116         }
117         case kWhatReset: {
118             thiz->onReset();
119             thiz->mOutputBlockPool.reset();
120             mRunning = false;
121             Reply(msg);
122             break;
123         }
124         case kWhatRelease: {
125             thiz->onRelease();
126             thiz->mOutputBlockPool.reset();
127             mRunning = false;
128             Reply(msg);
129             break;
130         }
131         default: {
132             ALOGD("Unrecognized msg: %d", msg->what());
133             break;
134         }
135     }
136 }
137 
138 class SimpleC2Component::BlockingBlockPool : public C2BlockPool {
139 public:
BlockingBlockPool(const std::shared_ptr<C2BlockPool> & base)140     BlockingBlockPool(const std::shared_ptr<C2BlockPool>& base): mBase{base} {}
141 
getLocalId() const142     virtual local_id_t getLocalId() const override {
143         return mBase->getLocalId();
144     }
145 
getAllocatorId() const146     virtual C2Allocator::id_t getAllocatorId() const override {
147         return mBase->getAllocatorId();
148     }
149 
fetchLinearBlock(uint32_t capacity,C2MemoryUsage usage,std::shared_ptr<C2LinearBlock> * block)150     virtual c2_status_t fetchLinearBlock(
151             uint32_t capacity,
152             C2MemoryUsage usage,
153             std::shared_ptr<C2LinearBlock>* block) {
154         c2_status_t status;
155         do {
156             status = mBase->fetchLinearBlock(capacity, usage, block);
157         } while (status == C2_BLOCKING);
158         return status;
159     }
160 
fetchCircularBlock(uint32_t capacity,C2MemoryUsage usage,std::shared_ptr<C2CircularBlock> * block)161     virtual c2_status_t fetchCircularBlock(
162             uint32_t capacity,
163             C2MemoryUsage usage,
164             std::shared_ptr<C2CircularBlock>* block) {
165         c2_status_t status;
166         do {
167             status = mBase->fetchCircularBlock(capacity, usage, block);
168         } while (status == C2_BLOCKING);
169         return status;
170     }
171 
fetchGraphicBlock(uint32_t width,uint32_t height,uint32_t format,C2MemoryUsage usage,std::shared_ptr<C2GraphicBlock> * block)172     virtual c2_status_t fetchGraphicBlock(
173             uint32_t width, uint32_t height, uint32_t format,
174             C2MemoryUsage usage,
175             std::shared_ptr<C2GraphicBlock>* block) {
176         c2_status_t status;
177         do {
178             status = mBase->fetchGraphicBlock(width, height, format, usage,
179                                               block);
180         } while (status == C2_BLOCKING);
181         return status;
182     }
183 
184 private:
185     std::shared_ptr<C2BlockPool> mBase;
186 };
187 
188 ////////////////////////////////////////////////////////////////////////////////
189 
190 namespace {
191 
192 struct DummyReadView : public C2ReadView {
DummyReadViewandroid::__anon43dcb38c0111::DummyReadView193     DummyReadView() : C2ReadView(C2_NO_INIT) {}
194 };
195 
196 }  // namespace
197 
SimpleC2Component(const std::shared_ptr<C2ComponentInterface> & intf)198 SimpleC2Component::SimpleC2Component(
199         const std::shared_ptr<C2ComponentInterface> &intf)
200     : mDummyReadView(DummyReadView()),
201       mIntf(intf),
202       mLooper(new ALooper),
203       mHandler(new WorkHandler) {
204     mLooper->setName(intf->getName().c_str());
205     (void)mLooper->registerHandler(mHandler);
206     mLooper->start(false, false, ANDROID_PRIORITY_VIDEO);
207 }
208 
~SimpleC2Component()209 SimpleC2Component::~SimpleC2Component() {
210     mLooper->unregisterHandler(mHandler->id());
211     (void)mLooper->stop();
212 }
213 
setListener_vb(const std::shared_ptr<C2Component::Listener> & listener,c2_blocking_t mayBlock)214 c2_status_t SimpleC2Component::setListener_vb(
215         const std::shared_ptr<C2Component::Listener> &listener, c2_blocking_t mayBlock) {
216     mHandler->setComponent(shared_from_this());
217 
218     Mutexed<ExecState>::Locked state(mExecState);
219     if (state->mState == RUNNING) {
220         if (listener) {
221             return C2_BAD_STATE;
222         } else if (!mayBlock) {
223             return C2_BLOCKING;
224         }
225     }
226     state->mListener = listener;
227     // TODO: wait for listener change to have taken place before returning
228     // (e.g. if there is an ongoing listener callback)
229     return C2_OK;
230 }
231 
queue_nb(std::list<std::unique_ptr<C2Work>> * const items)232 c2_status_t SimpleC2Component::queue_nb(std::list<std::unique_ptr<C2Work>> * const items) {
233     {
234         Mutexed<ExecState>::Locked state(mExecState);
235         if (state->mState != RUNNING) {
236             return C2_BAD_STATE;
237         }
238     }
239     bool queueWasEmpty = false;
240     {
241         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
242         queueWasEmpty = queue->empty();
243         while (!items->empty()) {
244             queue->push_back(std::move(items->front()));
245             items->pop_front();
246         }
247     }
248     if (queueWasEmpty) {
249         (new AMessage(WorkHandler::kWhatProcess, mHandler))->post();
250     }
251     return C2_OK;
252 }
253 
announce_nb(const std::vector<C2WorkOutline> & items)254 c2_status_t SimpleC2Component::announce_nb(const std::vector<C2WorkOutline> &items) {
255     (void)items;
256     return C2_OMITTED;
257 }
258 
flush_sm(flush_mode_t flushMode,std::list<std::unique_ptr<C2Work>> * const flushedWork)259 c2_status_t SimpleC2Component::flush_sm(
260         flush_mode_t flushMode, std::list<std::unique_ptr<C2Work>>* const flushedWork) {
261     (void)flushMode;
262     {
263         Mutexed<ExecState>::Locked state(mExecState);
264         if (state->mState != RUNNING) {
265             return C2_BAD_STATE;
266         }
267     }
268     {
269         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
270         queue->incGeneration();
271         // TODO: queue->splicedBy(flushedWork, flushedWork->end());
272         while (!queue->empty()) {
273             std::unique_ptr<C2Work> work = queue->pop_front();
274             if (work) {
275                 flushedWork->push_back(std::move(work));
276             }
277         }
278         while (!queue->pending().empty()) {
279             flushedWork->push_back(std::move(queue->pending().begin()->second));
280             queue->pending().erase(queue->pending().begin());
281         }
282     }
283 
284     return C2_OK;
285 }
286 
drain_nb(drain_mode_t drainMode)287 c2_status_t SimpleC2Component::drain_nb(drain_mode_t drainMode) {
288     if (drainMode == DRAIN_CHAIN) {
289         return C2_OMITTED;
290     }
291     {
292         Mutexed<ExecState>::Locked state(mExecState);
293         if (state->mState != RUNNING) {
294             return C2_BAD_STATE;
295         }
296     }
297     bool queueWasEmpty = false;
298     {
299         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
300         queueWasEmpty = queue->empty();
301         queue->markDrain(drainMode);
302     }
303     if (queueWasEmpty) {
304         (new AMessage(WorkHandler::kWhatProcess, mHandler))->post();
305     }
306 
307     return C2_OK;
308 }
309 
start()310 c2_status_t SimpleC2Component::start() {
311     Mutexed<ExecState>::Locked state(mExecState);
312     if (state->mState == RUNNING) {
313         return C2_BAD_STATE;
314     }
315     bool needsInit = (state->mState == UNINITIALIZED);
316     state.unlock();
317     if (needsInit) {
318         sp<AMessage> reply;
319         (new AMessage(WorkHandler::kWhatInit, mHandler))->postAndAwaitResponse(&reply);
320         int32_t err;
321         CHECK(reply->findInt32("err", &err));
322         if (err != C2_OK) {
323             return (c2_status_t)err;
324         }
325     } else {
326         (new AMessage(WorkHandler::kWhatStart, mHandler))->post();
327     }
328     state.lock();
329     state->mState = RUNNING;
330     return C2_OK;
331 }
332 
stop()333 c2_status_t SimpleC2Component::stop() {
334     ALOGV("stop");
335     {
336         Mutexed<ExecState>::Locked state(mExecState);
337         if (state->mState != RUNNING) {
338             return C2_BAD_STATE;
339         }
340         state->mState = STOPPED;
341     }
342     {
343         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
344         queue->clear();
345         queue->pending().clear();
346     }
347     sp<AMessage> reply;
348     (new AMessage(WorkHandler::kWhatStop, mHandler))->postAndAwaitResponse(&reply);
349     int32_t err;
350     CHECK(reply->findInt32("err", &err));
351     if (err != C2_OK) {
352         return (c2_status_t)err;
353     }
354     return C2_OK;
355 }
356 
reset()357 c2_status_t SimpleC2Component::reset() {
358     ALOGV("reset");
359     {
360         Mutexed<ExecState>::Locked state(mExecState);
361         state->mState = UNINITIALIZED;
362     }
363     {
364         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
365         queue->clear();
366         queue->pending().clear();
367     }
368     sp<AMessage> reply;
369     (new AMessage(WorkHandler::kWhatReset, mHandler))->postAndAwaitResponse(&reply);
370     return C2_OK;
371 }
372 
release()373 c2_status_t SimpleC2Component::release() {
374     ALOGV("release");
375     sp<AMessage> reply;
376     (new AMessage(WorkHandler::kWhatRelease, mHandler))->postAndAwaitResponse(&reply);
377     return C2_OK;
378 }
379 
intf()380 std::shared_ptr<C2ComponentInterface> SimpleC2Component::intf() {
381     return mIntf;
382 }
383 
384 namespace {
385 
vec(std::unique_ptr<C2Work> & work)386 std::list<std::unique_ptr<C2Work>> vec(std::unique_ptr<C2Work> &work) {
387     std::list<std::unique_ptr<C2Work>> ret;
388     ret.push_back(std::move(work));
389     return ret;
390 }
391 
392 }  // namespace
393 
finish(uint64_t frameIndex,std::function<void (const std::unique_ptr<C2Work> &)> fillWork)394 void SimpleC2Component::finish(
395         uint64_t frameIndex, std::function<void(const std::unique_ptr<C2Work> &)> fillWork) {
396     std::unique_ptr<C2Work> work;
397     {
398         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
399         if (queue->pending().count(frameIndex) == 0) {
400             ALOGW("unknown frame index: %" PRIu64, frameIndex);
401             return;
402         }
403         work = std::move(queue->pending().at(frameIndex));
404         queue->pending().erase(frameIndex);
405     }
406     if (work) {
407         fillWork(work);
408         std::shared_ptr<C2Component::Listener> listener = mExecState.lock()->mListener;
409         listener->onWorkDone_nb(shared_from_this(), vec(work));
410         ALOGV("returning pending work");
411     }
412 }
413 
cloneAndSend(uint64_t frameIndex,const std::unique_ptr<C2Work> & currentWork,std::function<void (const std::unique_ptr<C2Work> &)> fillWork)414 void SimpleC2Component::cloneAndSend(
415         uint64_t frameIndex,
416         const std::unique_ptr<C2Work> &currentWork,
417         std::function<void(const std::unique_ptr<C2Work> &)> fillWork) {
418     std::unique_ptr<C2Work> work(new C2Work);
419     if (currentWork->input.ordinal.frameIndex == frameIndex) {
420         work->input.flags = currentWork->input.flags;
421         work->input.ordinal = currentWork->input.ordinal;
422     } else {
423         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
424         if (queue->pending().count(frameIndex) == 0) {
425             ALOGW("unknown frame index: %" PRIu64, frameIndex);
426             return;
427         }
428         work->input.flags = queue->pending().at(frameIndex)->input.flags;
429         work->input.ordinal = queue->pending().at(frameIndex)->input.ordinal;
430     }
431     work->worklets.emplace_back(new C2Worklet);
432     if (work) {
433         fillWork(work);
434         std::shared_ptr<C2Component::Listener> listener = mExecState.lock()->mListener;
435         listener->onWorkDone_nb(shared_from_this(), vec(work));
436         ALOGV("cloned and sending work");
437     }
438 }
439 
processQueue()440 bool SimpleC2Component::processQueue() {
441     std::unique_ptr<C2Work> work;
442     uint64_t generation;
443     int32_t drainMode;
444     bool isFlushPending = false;
445     bool hasQueuedWork = false;
446     {
447         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
448         if (queue->empty()) {
449             return false;
450         }
451 
452         generation = queue->generation();
453         drainMode = queue->drainMode();
454         isFlushPending = queue->popPendingFlush();
455         work = queue->pop_front();
456         hasQueuedWork = !queue->empty();
457     }
458     if (isFlushPending) {
459         ALOGV("processing pending flush");
460         c2_status_t err = onFlush_sm();
461         if (err != C2_OK) {
462             ALOGD("flush err: %d", err);
463             // TODO: error
464         }
465     }
466 
467     if (!mOutputBlockPool) {
468         c2_status_t err = [this] {
469             // TODO: don't use query_vb
470             C2StreamBufferTypeSetting::output outputFormat(0u);
471             std::vector<std::unique_ptr<C2Param>> params;
472             c2_status_t err = intf()->query_vb(
473                     { &outputFormat },
474                     { C2PortBlockPoolsTuning::output::PARAM_TYPE },
475                     C2_DONT_BLOCK,
476                     &params);
477             if (err != C2_OK && err != C2_BAD_INDEX) {
478                 ALOGD("query err = %d", err);
479                 return err;
480             }
481             C2BlockPool::local_id_t poolId =
482                 outputFormat.value == C2BufferData::GRAPHIC
483                         ? C2BlockPool::BASIC_GRAPHIC
484                         : C2BlockPool::BASIC_LINEAR;
485             if (params.size()) {
486                 C2PortBlockPoolsTuning::output *outputPools =
487                     C2PortBlockPoolsTuning::output::From(params[0].get());
488                 if (outputPools && outputPools->flexCount() >= 1) {
489                     poolId = outputPools->m.values[0];
490                 }
491             }
492 
493             std::shared_ptr<C2BlockPool> blockPool;
494             err = GetCodec2BlockPool(poolId, shared_from_this(), &blockPool);
495             ALOGD("Using output block pool with poolID %llu => got %llu - %d",
496                     (unsigned long long)poolId,
497                     (unsigned long long)(
498                             blockPool ? blockPool->getLocalId() : 111000111),
499                     err);
500             if (err == C2_OK) {
501                 mOutputBlockPool = std::make_shared<BlockingBlockPool>(blockPool);
502             }
503             return err;
504         }();
505         if (err != C2_OK) {
506             Mutexed<ExecState>::Locked state(mExecState);
507             std::shared_ptr<C2Component::Listener> listener = state->mListener;
508             state.unlock();
509             listener->onError_nb(shared_from_this(), err);
510             return hasQueuedWork;
511         }
512     }
513 
514     if (!work) {
515         c2_status_t err = drain(drainMode, mOutputBlockPool);
516         if (err != C2_OK) {
517             Mutexed<ExecState>::Locked state(mExecState);
518             std::shared_ptr<C2Component::Listener> listener = state->mListener;
519             state.unlock();
520             listener->onError_nb(shared_from_this(), err);
521         }
522         return hasQueuedWork;
523     }
524 
525     {
526         std::vector<C2Param *> updates;
527         for (const std::unique_ptr<C2Param> &param: work->input.configUpdate) {
528             if (param) {
529                 updates.emplace_back(param.get());
530             }
531         }
532         if (!updates.empty()) {
533             std::vector<std::unique_ptr<C2SettingResult>> failures;
534             c2_status_t err = intf()->config_vb(updates, C2_MAY_BLOCK, &failures);
535             ALOGD("applied %zu configUpdates => %s (%d)", updates.size(), asString(err), err);
536         }
537     }
538 
539     ALOGV("start processing frame #%" PRIu64, work->input.ordinal.frameIndex.peeku());
540     // If input buffer list is not empty, it means we have some input to process on.
541     // However, input could be a null buffer. In such case, clear the buffer list
542     // before making call to process().
543     if (!work->input.buffers.empty() && !work->input.buffers[0]) {
544         ALOGD("Encountered null input buffer. Clearing the input buffer");
545         work->input.buffers.clear();
546     }
547     process(work, mOutputBlockPool);
548     ALOGV("processed frame #%" PRIu64, work->input.ordinal.frameIndex.peeku());
549     Mutexed<WorkQueue>::Locked queue(mWorkQueue);
550     if (queue->generation() != generation) {
551         ALOGD("work form old generation: was %" PRIu64 " now %" PRIu64,
552                 queue->generation(), generation);
553         work->result = C2_NOT_FOUND;
554         queue.unlock();
555 
556         Mutexed<ExecState>::Locked state(mExecState);
557         std::shared_ptr<C2Component::Listener> listener = state->mListener;
558         state.unlock();
559         listener->onWorkDone_nb(shared_from_this(), vec(work));
560         return hasQueuedWork;
561     }
562     if (work->workletsProcessed != 0u) {
563         queue.unlock();
564         Mutexed<ExecState>::Locked state(mExecState);
565         ALOGV("returning this work");
566         std::shared_ptr<C2Component::Listener> listener = state->mListener;
567         state.unlock();
568         listener->onWorkDone_nb(shared_from_this(), vec(work));
569     } else {
570         ALOGV("queue pending work");
571         work->input.buffers.clear();
572         std::unique_ptr<C2Work> unexpected;
573 
574         uint64_t frameIndex = work->input.ordinal.frameIndex.peeku();
575         if (queue->pending().count(frameIndex) != 0) {
576             unexpected = std::move(queue->pending().at(frameIndex));
577             queue->pending().erase(frameIndex);
578         }
579         (void)queue->pending().insert({ frameIndex, std::move(work) });
580 
581         queue.unlock();
582         if (unexpected) {
583             ALOGD("unexpected pending work");
584             unexpected->result = C2_CORRUPTED;
585             Mutexed<ExecState>::Locked state(mExecState);
586             std::shared_ptr<C2Component::Listener> listener = state->mListener;
587             state.unlock();
588             listener->onWorkDone_nb(shared_from_this(), vec(unexpected));
589         }
590     }
591     return hasQueuedWork;
592 }
593 
createLinearBuffer(const std::shared_ptr<C2LinearBlock> & block,size_t offset,size_t size)594 std::shared_ptr<C2Buffer> SimpleC2Component::createLinearBuffer(
595         const std::shared_ptr<C2LinearBlock> &block, size_t offset, size_t size) {
596     return C2Buffer::CreateLinearBuffer(block->share(offset, size, ::C2Fence()));
597 }
598 
createGraphicBuffer(const std::shared_ptr<C2GraphicBlock> & block,const C2Rect & crop)599 std::shared_ptr<C2Buffer> SimpleC2Component::createGraphicBuffer(
600         const std::shared_ptr<C2GraphicBlock> &block, const C2Rect &crop) {
601     return C2Buffer::CreateGraphicBuffer(block->share(crop, ::C2Fence()));
602 }
603 
604 } // namespace android
605