• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "SimpleC2Component"
19 #include <log/log.h>
20 
21 #include <cutils/properties.h>
22 #include <media/stagefright/foundation/AMessage.h>
23 
24 #include <inttypes.h>
25 
26 #include <C2Config.h>
27 #include <C2Debug.h>
28 #include <C2PlatformSupport.h>
29 #include <SimpleC2Component.h>
30 
31 #define DEBUG 0
32 #if DEBUG
33 #define DDD(...) ALOGD(__VA_ARGS__)
34 #else
35 #define DDD(...) ((void)0)
36 #endif
37 
38 namespace android {
39 
pop_front()40 std::unique_ptr<C2Work> SimpleC2Component::WorkQueue::pop_front() {
41     std::unique_ptr<C2Work> work = std::move(mQueue.front().work);
42     mQueue.pop_front();
43     return work;
44 }
45 
push_back(std::unique_ptr<C2Work> work)46 void SimpleC2Component::WorkQueue::push_back(std::unique_ptr<C2Work> work) {
47     mQueue.push_back({std::move(work), NO_DRAIN});
48 }
49 
empty() const50 bool SimpleC2Component::WorkQueue::empty() const { return mQueue.empty(); }
51 
clear()52 void SimpleC2Component::WorkQueue::clear() { mQueue.clear(); }
53 
drainMode() const54 uint32_t SimpleC2Component::WorkQueue::drainMode() const {
55     return mQueue.front().drainMode;
56 }
57 
markDrain(uint32_t drainMode)58 void SimpleC2Component::WorkQueue::markDrain(uint32_t drainMode) {
59     mQueue.push_back({nullptr, drainMode});
60 }
61 
62 ////////////////////////////////////////////////////////////////////////////////
63 
WorkHandler()64 SimpleC2Component::WorkHandler::WorkHandler() : mRunning(false) {}
65 
setComponent(const std::shared_ptr<SimpleC2Component> & thiz)66 void SimpleC2Component::WorkHandler::setComponent(
67     const std::shared_ptr<SimpleC2Component> &thiz) {
68     mThiz = thiz;
69 }
70 
Reply(const sp<AMessage> & msg,int32_t * err=nullptr)71 static void Reply(const sp<AMessage> &msg, int32_t *err = nullptr) {
72     sp<AReplyToken> replyId;
73     CHECK(msg->senderAwaitsResponse(&replyId));
74     sp<AMessage> reply = new AMessage;
75     if (err) {
76         reply->setInt32("err", *err);
77     }
78     reply->postReply(replyId);
79 }
80 
onMessageReceived(const sp<AMessage> & msg)81 void SimpleC2Component::WorkHandler::onMessageReceived(
82     const sp<AMessage> &msg) {
83     std::shared_ptr<SimpleC2Component> thiz = mThiz.lock();
84     if (!thiz) {
85         ALOGD("component not yet set; msg = %s", msg->debugString().c_str());
86         sp<AReplyToken> replyId;
87         if (msg->senderAwaitsResponse(&replyId)) {
88             sp<AMessage> reply = new AMessage;
89             reply->setInt32("err", C2_CORRUPTED);
90             reply->postReply(replyId);
91         }
92         return;
93     }
94 
95     switch (msg->what()) {
96     case kWhatProcess: {
97         if (mRunning) {
98             if (thiz->processQueue()) {
99                 (new AMessage(kWhatProcess, this))->post();
100             }
101         } else {
102             DDD("Ignore process message as we're not running");
103         }
104         break;
105     }
106     case kWhatInit: {
107         int32_t err = thiz->onInit();
108         Reply(msg, &err);
109         [[fallthrough]];
110     }
111     case kWhatStart: {
112         mRunning = true;
113         break;
114     }
115     case kWhatStop: {
116         int32_t err = thiz->onStop();
117         Reply(msg, &err);
118         break;
119     }
120     case kWhatReset: {
121         thiz->onReset();
122         mRunning = false;
123         Reply(msg);
124         break;
125     }
126     case kWhatRelease: {
127         thiz->onRelease();
128         mRunning = false;
129         Reply(msg);
130         break;
131     }
132     default: {
133         ALOGD("Unrecognized msg: %d", msg->what());
134         break;
135     }
136     }
137 }
138 
139 class SimpleC2Component::BlockingBlockPool : public C2BlockPool {
140   public:
BlockingBlockPool(const std::shared_ptr<C2BlockPool> & base)141     BlockingBlockPool(const std::shared_ptr<C2BlockPool> &base) : mBase{base} {}
142 
getLocalId() const143     virtual local_id_t getLocalId() const override {
144         return mBase->getLocalId();
145     }
146 
getAllocatorId() const147     virtual C2Allocator::id_t getAllocatorId() const override {
148         return mBase->getAllocatorId();
149     }
150 
151     virtual c2_status_t
fetchLinearBlock(uint32_t capacity,C2MemoryUsage usage,std::shared_ptr<C2LinearBlock> * block)152     fetchLinearBlock(uint32_t capacity, C2MemoryUsage usage,
153                      std::shared_ptr<C2LinearBlock> *block) {
154         c2_status_t status;
155         do {
156             status = mBase->fetchLinearBlock(capacity, usage, block);
157         } while (status == C2_BLOCKING);
158         return status;
159     }
160 
161     virtual c2_status_t
fetchCircularBlock(uint32_t capacity,C2MemoryUsage usage,std::shared_ptr<C2CircularBlock> * block)162     fetchCircularBlock(uint32_t capacity, C2MemoryUsage usage,
163                        std::shared_ptr<C2CircularBlock> *block) {
164         c2_status_t status;
165         do {
166             status = mBase->fetchCircularBlock(capacity, usage, block);
167         } while (status == C2_BLOCKING);
168         return status;
169     }
170 
171     virtual c2_status_t
fetchGraphicBlock(uint32_t width,uint32_t height,uint32_t format,C2MemoryUsage usage,std::shared_ptr<C2GraphicBlock> * block)172     fetchGraphicBlock(uint32_t width, uint32_t height, uint32_t format,
173                       C2MemoryUsage usage,
174                       std::shared_ptr<C2GraphicBlock> *block) {
175         c2_status_t status;
176         do {
177             status =
178                 mBase->fetchGraphicBlock(width, height, format, usage, block);
179         } while (status == C2_BLOCKING);
180         return status;
181     }
182 
183   private:
184     std::shared_ptr<C2BlockPool> mBase;
185 };
186 
187 ////////////////////////////////////////////////////////////////////////////////
188 
189 namespace {
190 
191 struct DummyReadView : public C2ReadView {
DummyReadViewandroid::__anone0837dbf0111::DummyReadView192     DummyReadView() : C2ReadView(C2_NO_INIT) {}
193 };
194 
195 } // namespace
196 
SimpleC2Component(const std::shared_ptr<C2ComponentInterface> & intf)197 SimpleC2Component::SimpleC2Component(
198     const std::shared_ptr<C2ComponentInterface> &intf)
199     : mDummyReadView(DummyReadView()), mIntf(intf), mLooper(new ALooper),
200       mHandler(new WorkHandler) {
201     mLooper->setName(intf->getName().c_str());
202     (void)mLooper->registerHandler(mHandler);
203     mLooper->start(false, false, ANDROID_PRIORITY_VIDEO);
204 }
205 
~SimpleC2Component()206 SimpleC2Component::~SimpleC2Component() {
207     mLooper->unregisterHandler(mHandler->id());
208     (void)mLooper->stop();
209 }
210 
setListener_vb(const std::shared_ptr<C2Component::Listener> & listener,c2_blocking_t mayBlock)211 c2_status_t SimpleC2Component::setListener_vb(
212     const std::shared_ptr<C2Component::Listener> &listener,
213     c2_blocking_t mayBlock) {
214     mHandler->setComponent(shared_from_this());
215 
216     Mutexed<ExecState>::Locked state(mExecState);
217     if (state->mState == RUNNING) {
218         if (listener) {
219             return C2_BAD_STATE;
220         } else if (!mayBlock) {
221             return C2_BLOCKING;
222         }
223     }
224     state->mListener = listener;
225     // TODO: wait for listener change to have taken place before returning
226     // (e.g. if there is an ongoing listener callback)
227     return C2_OK;
228 }
229 
230 c2_status_t
queue_nb(std::list<std::unique_ptr<C2Work>> * const items)231 SimpleC2Component::queue_nb(std::list<std::unique_ptr<C2Work>> *const items) {
232     {
233         Mutexed<ExecState>::Locked state(mExecState);
234         if (state->mState != RUNNING) {
235             return C2_BAD_STATE;
236         }
237     }
238     bool queueWasEmpty = false;
239     {
240         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
241         queueWasEmpty = queue->empty();
242         while (!items->empty()) {
243             queue->push_back(std::move(items->front()));
244             items->pop_front();
245         }
246     }
247     if (queueWasEmpty) {
248         (new AMessage(WorkHandler::kWhatProcess, mHandler))->post();
249     }
250     return C2_OK;
251 }
252 
253 c2_status_t
announce_nb(const std::vector<C2WorkOutline> & items)254 SimpleC2Component::announce_nb(const std::vector<C2WorkOutline> &items) {
255     (void)items;
256     return C2_OMITTED;
257 }
258 
flush_sm(flush_mode_t flushMode,std::list<std::unique_ptr<C2Work>> * const flushedWork)259 c2_status_t SimpleC2Component::flush_sm(
260     flush_mode_t flushMode,
261     std::list<std::unique_ptr<C2Work>> *const flushedWork) {
262     (void)flushMode;
263     {
264         Mutexed<ExecState>::Locked state(mExecState);
265         if (state->mState != RUNNING) {
266             return C2_BAD_STATE;
267         }
268     }
269     {
270         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
271         queue->incGeneration();
272         // TODO: queue->splicedBy(flushedWork, flushedWork->end());
273         while (!queue->empty()) {
274             std::unique_ptr<C2Work> work = queue->pop_front();
275             if (work) {
276                 flushedWork->push_back(std::move(work));
277             }
278         }
279         while (!queue->pending().empty()) {
280             flushedWork->push_back(std::move(queue->pending().begin()->second));
281             queue->pending().erase(queue->pending().begin());
282         }
283     }
284 
285     return C2_OK;
286 }
287 
drain_nb(drain_mode_t drainMode)288 c2_status_t SimpleC2Component::drain_nb(drain_mode_t drainMode) {
289     if (drainMode == DRAIN_CHAIN) {
290         return C2_OMITTED;
291     }
292     {
293         Mutexed<ExecState>::Locked state(mExecState);
294         if (state->mState != RUNNING) {
295             return C2_BAD_STATE;
296         }
297     }
298     bool queueWasEmpty = false;
299     {
300         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
301         queueWasEmpty = queue->empty();
302         queue->markDrain(drainMode);
303     }
304     if (queueWasEmpty) {
305         (new AMessage(WorkHandler::kWhatProcess, mHandler))->post();
306     }
307 
308     return C2_OK;
309 }
310 
start()311 c2_status_t SimpleC2Component::start() {
312     Mutexed<ExecState>::Locked state(mExecState);
313     if (state->mState == RUNNING) {
314         return C2_BAD_STATE;
315     }
316     bool needsInit = (state->mState == UNINITIALIZED);
317     state.unlock();
318     if (needsInit) {
319         sp<AMessage> reply;
320         (new AMessage(WorkHandler::kWhatInit, mHandler))
321             ->postAndAwaitResponse(&reply);
322         int32_t err;
323         CHECK(reply->findInt32("err", &err));
324         if (err != C2_OK) {
325             return (c2_status_t)err;
326         }
327     } else {
328         (new AMessage(WorkHandler::kWhatStart, mHandler))->post();
329     }
330     state.lock();
331     state->mState = RUNNING;
332     return C2_OK;
333 }
334 
stop()335 c2_status_t SimpleC2Component::stop() {
336     DDD("stop");
337     {
338         Mutexed<ExecState>::Locked state(mExecState);
339         if (state->mState != RUNNING) {
340             return C2_BAD_STATE;
341         }
342         state->mState = STOPPED;
343     }
344     {
345         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
346         queue->clear();
347         queue->pending().clear();
348     }
349     sp<AMessage> reply;
350     (new AMessage(WorkHandler::kWhatStop, mHandler))
351         ->postAndAwaitResponse(&reply);
352     int32_t err;
353     CHECK(reply->findInt32("err", &err));
354     if (err != C2_OK) {
355         return (c2_status_t)err;
356     }
357     return C2_OK;
358 }
359 
reset()360 c2_status_t SimpleC2Component::reset() {
361     DDD("reset");
362     {
363         Mutexed<ExecState>::Locked state(mExecState);
364         state->mState = UNINITIALIZED;
365     }
366     {
367         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
368         queue->clear();
369         queue->pending().clear();
370     }
371     sp<AMessage> reply;
372     (new AMessage(WorkHandler::kWhatReset, mHandler))
373         ->postAndAwaitResponse(&reply);
374     return C2_OK;
375 }
376 
release()377 c2_status_t SimpleC2Component::release() {
378     DDD("release");
379     sp<AMessage> reply;
380     (new AMessage(WorkHandler::kWhatRelease, mHandler))
381         ->postAndAwaitResponse(&reply);
382     return C2_OK;
383 }
384 
intf()385 std::shared_ptr<C2ComponentInterface> SimpleC2Component::intf() {
386     return mIntf;
387 }
388 
389 namespace {
390 
vec(std::unique_ptr<C2Work> & work)391 std::list<std::unique_ptr<C2Work>> vec(std::unique_ptr<C2Work> &work) {
392     std::list<std::unique_ptr<C2Work>> ret;
393     ret.push_back(std::move(work));
394     return ret;
395 }
396 
397 } // namespace
398 
finish(uint64_t frameIndex,std::function<void (const std::unique_ptr<C2Work> &)> fillWork)399 void SimpleC2Component::finish(
400     uint64_t frameIndex,
401     std::function<void(const std::unique_ptr<C2Work> &)> fillWork) {
402     std::unique_ptr<C2Work> work;
403     {
404         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
405         if (queue->pending().count(frameIndex) == 0) {
406             ALOGW("unknown frame index: %" PRIu64, frameIndex);
407             return;
408         }
409         work = std::move(queue->pending().at(frameIndex));
410         queue->pending().erase(frameIndex);
411     }
412     if (work) {
413         fillWork(work);
414         std::shared_ptr<C2Component::Listener> listener =
415             mExecState.lock()->mListener;
416         listener->onWorkDone_nb(shared_from_this(), vec(work));
417         DDD("returning pending work");
418     }
419 }
420 
cloneAndSend(uint64_t frameIndex,const std::unique_ptr<C2Work> & currentWork,std::function<void (const std::unique_ptr<C2Work> &)> fillWork)421 void SimpleC2Component::cloneAndSend(
422     uint64_t frameIndex, const std::unique_ptr<C2Work> &currentWork,
423     std::function<void(const std::unique_ptr<C2Work> &)> fillWork) {
424     std::unique_ptr<C2Work> work(new C2Work);
425     if (currentWork->input.ordinal.frameIndex == frameIndex) {
426         work->input.flags = currentWork->input.flags;
427         work->input.ordinal = currentWork->input.ordinal;
428     } else {
429         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
430         if (queue->pending().count(frameIndex) == 0) {
431             ALOGW("unknown frame index: %" PRIu64, frameIndex);
432             return;
433         }
434         work->input.flags = queue->pending().at(frameIndex)->input.flags;
435         work->input.ordinal = queue->pending().at(frameIndex)->input.ordinal;
436     }
437     work->worklets.emplace_back(new C2Worklet);
438     if (work) {
439         fillWork(work);
440         std::shared_ptr<C2Component::Listener> listener =
441             mExecState.lock()->mListener;
442         listener->onWorkDone_nb(shared_from_this(), vec(work));
443         DDD("cloned and sending work");
444     }
445 }
446 
processQueue()447 bool SimpleC2Component::processQueue() {
448     std::unique_ptr<C2Work> work;
449     uint64_t generation;
450     int32_t drainMode;
451     bool isFlushPending = false;
452     bool hasQueuedWork = false;
453     {
454         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
455         if (queue->empty()) {
456             return false;
457         }
458 
459         generation = queue->generation();
460         drainMode = queue->drainMode();
461         isFlushPending = queue->popPendingFlush();
462         work = queue->pop_front();
463         hasQueuedWork = !queue->empty();
464     }
465     if (isFlushPending) {
466         DDD("processing pending flush");
467         c2_status_t err = onFlush_sm();
468         if (err != C2_OK) {
469             ALOGD("flush err: %d", err);
470             // TODO: error
471         }
472     }
473 
474     if (!mOutputBlockPool) {
475         c2_status_t err = [this] {
476             // TODO: don't use query_vb
477             C2StreamBufferTypeSetting::output outputFormat(0u);
478             std::vector<std::unique_ptr<C2Param>> params;
479             c2_status_t err = intf()->query_vb(
480                 {&outputFormat}, {C2PortBlockPoolsTuning::output::PARAM_TYPE},
481                 C2_DONT_BLOCK, &params);
482             if (err != C2_OK && err != C2_BAD_INDEX) {
483                 ALOGD("query err = %d", err);
484                 return err;
485             }
486             C2BlockPool::local_id_t poolId =
487                 outputFormat.value == C2BufferData::GRAPHIC
488                     ? C2BlockPool::BASIC_GRAPHIC
489                     : C2BlockPool::BASIC_LINEAR;
490             if (params.size()) {
491                 C2PortBlockPoolsTuning::output *outputPools =
492                     C2PortBlockPoolsTuning::output::From(params[0].get());
493                 if (outputPools && outputPools->flexCount() >= 1) {
494                     poolId = outputPools->m.values[0];
495                 }
496             }
497 
498             std::shared_ptr<C2BlockPool> blockPool;
499             err = GetCodec2BlockPool(poolId, shared_from_this(), &blockPool);
500             ALOGD("Using output block pool with poolID %llu => got %llu - %d",
501                   (unsigned long long)poolId,
502                   (unsigned long long)(blockPool ? blockPool->getLocalId()
503                                                  : 111000111),
504                   err);
505             if (err == C2_OK) {
506                 mOutputBlockPool =
507                     std::make_shared<BlockingBlockPool>(blockPool);
508             }
509             return err;
510         }();
511         if (err != C2_OK) {
512             Mutexed<ExecState>::Locked state(mExecState);
513             std::shared_ptr<C2Component::Listener> listener = state->mListener;
514             state.unlock();
515             listener->onError_nb(shared_from_this(), err);
516             return hasQueuedWork;
517         }
518     }
519 
520     if (!work) {
521         c2_status_t err = drain(drainMode, mOutputBlockPool);
522         if (err != C2_OK) {
523             Mutexed<ExecState>::Locked state(mExecState);
524             std::shared_ptr<C2Component::Listener> listener = state->mListener;
525             state.unlock();
526             listener->onError_nb(shared_from_this(), err);
527         }
528         return hasQueuedWork;
529     }
530 
531     {
532         std::vector<C2Param *> updates;
533         for (const std::unique_ptr<C2Param> &param : work->input.configUpdate) {
534             if (param) {
535                 updates.emplace_back(param.get());
536             }
537         }
538         if (!updates.empty()) {
539             std::vector<std::unique_ptr<C2SettingResult>> failures;
540             c2_status_t err =
541                 intf()->config_vb(updates, C2_MAY_BLOCK, &failures);
542             ALOGD("applied %zu configUpdates => %s (%d)", updates.size(),
543                   asString(err), err);
544         }
545     }
546 
547     DDD("start processing frame #%" PRIu64,
548         work->input.ordinal.frameIndex.peeku());
549     // If input buffer list is not empty, it means we have some input to process
550     // on. However, input could be a null buffer. In such case, clear the buffer
551     // list before making call to process().
552     if (!work->input.buffers.empty() && !work->input.buffers[0]) {
553         ALOGD("Encountered null input buffer. Clearing the input buffer");
554         work->input.buffers.clear();
555     }
556     process(work, mOutputBlockPool);
557     DDD("processed frame #%" PRIu64, work->input.ordinal.frameIndex.peeku());
558     Mutexed<WorkQueue>::Locked queue(mWorkQueue);
559     if (queue->generation() != generation) {
560         ALOGD("work form old generation: was %" PRIu64 " now %" PRIu64,
561               queue->generation(), generation);
562         work->result = C2_NOT_FOUND;
563         queue.unlock();
564 
565         Mutexed<ExecState>::Locked state(mExecState);
566         std::shared_ptr<C2Component::Listener> listener = state->mListener;
567         state.unlock();
568         listener->onWorkDone_nb(shared_from_this(), vec(work));
569         return hasQueuedWork;
570     }
571     if (work->workletsProcessed != 0u) {
572         queue.unlock();
573         Mutexed<ExecState>::Locked state(mExecState);
574         DDD("returning this work");
575         std::shared_ptr<C2Component::Listener> listener = state->mListener;
576         state.unlock();
577         listener->onWorkDone_nb(shared_from_this(), vec(work));
578     } else {
579         work->input.buffers.clear();
580         std::unique_ptr<C2Work> unexpected;
581 
582         uint64_t frameIndex = work->input.ordinal.frameIndex.peeku();
583         DDD("queue pending work %" PRIu64, frameIndex);
584         if (queue->pending().count(frameIndex) != 0) {
585             unexpected = std::move(queue->pending().at(frameIndex));
586             queue->pending().erase(frameIndex);
587         }
588         (void)queue->pending().insert({frameIndex, std::move(work)});
589 
590         queue.unlock();
591         if (unexpected) {
592             ALOGD("unexpected pending work");
593             unexpected->result = C2_CORRUPTED;
594             Mutexed<ExecState>::Locked state(mExecState);
595             std::shared_ptr<C2Component::Listener> listener = state->mListener;
596             state.unlock();
597             listener->onWorkDone_nb(shared_from_this(), vec(unexpected));
598         }
599     }
600     return hasQueuedWork;
601 }
602 
createLinearBuffer(const std::shared_ptr<C2LinearBlock> & block)603 std::shared_ptr<C2Buffer> SimpleC2Component::createLinearBuffer(
604     const std::shared_ptr<C2LinearBlock> &block) {
605     return createLinearBuffer(block, block->offset(), block->size());
606 }
607 
createLinearBuffer(const std::shared_ptr<C2LinearBlock> & block,size_t offset,size_t size)608 std::shared_ptr<C2Buffer> SimpleC2Component::createLinearBuffer(
609     const std::shared_ptr<C2LinearBlock> &block, size_t offset, size_t size) {
610     return C2Buffer::CreateLinearBuffer(
611         block->share(offset, size, ::C2Fence()));
612 }
613 
createGraphicBuffer(const std::shared_ptr<C2GraphicBlock> & block)614 std::shared_ptr<C2Buffer> SimpleC2Component::createGraphicBuffer(
615     const std::shared_ptr<C2GraphicBlock> &block) {
616     return createGraphicBuffer(block, C2Rect(block->width(), block->height()));
617 }
618 
createGraphicBuffer(const std::shared_ptr<C2GraphicBlock> & block,const C2Rect & crop)619 std::shared_ptr<C2Buffer> SimpleC2Component::createGraphicBuffer(
620     const std::shared_ptr<C2GraphicBlock> &block, const C2Rect &crop) {
621     return C2Buffer::CreateGraphicBuffer(block->share(crop, ::C2Fence()));
622 }
623 
624 } // namespace android
625