1 /*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "SimpleC2Component"
19 #include <log/log.h>
20
21 #include <cutils/properties.h>
22 #include <media/stagefright/foundation/AMessage.h>
23
24 #include <inttypes.h>
25
26 #include <C2Config.h>
27 #include <C2Debug.h>
28 #include <C2PlatformSupport.h>
29 #include <SimpleC2Component.h>
30
31 namespace android {
32
pop_front()33 std::unique_ptr<C2Work> SimpleC2Component::WorkQueue::pop_front() {
34 std::unique_ptr<C2Work> work = std::move(mQueue.front().work);
35 mQueue.pop_front();
36 return work;
37 }
38
push_back(std::unique_ptr<C2Work> work)39 void SimpleC2Component::WorkQueue::push_back(std::unique_ptr<C2Work> work) {
40 mQueue.push_back({ std::move(work), NO_DRAIN });
41 }
42
empty() const43 bool SimpleC2Component::WorkQueue::empty() const {
44 return mQueue.empty();
45 }
46
clear()47 void SimpleC2Component::WorkQueue::clear() {
48 mQueue.clear();
49 }
50
drainMode() const51 uint32_t SimpleC2Component::WorkQueue::drainMode() const {
52 return mQueue.front().drainMode;
53 }
54
markDrain(uint32_t drainMode)55 void SimpleC2Component::WorkQueue::markDrain(uint32_t drainMode) {
56 mQueue.push_back({ nullptr, drainMode });
57 }
58
59 ////////////////////////////////////////////////////////////////////////////////
60
WorkHandler()61 SimpleC2Component::WorkHandler::WorkHandler() : mRunning(false) {}
62
setComponent(const std::shared_ptr<SimpleC2Component> & thiz)63 void SimpleC2Component::WorkHandler::setComponent(
64 const std::shared_ptr<SimpleC2Component> &thiz) {
65 mThiz = thiz;
66 }
67
Reply(const sp<AMessage> & msg,int32_t * err=nullptr)68 static void Reply(const sp<AMessage> &msg, int32_t *err = nullptr) {
69 sp<AReplyToken> replyId;
70 CHECK(msg->senderAwaitsResponse(&replyId));
71 sp<AMessage> reply = new AMessage;
72 if (err) {
73 reply->setInt32("err", *err);
74 }
75 reply->postReply(replyId);
76 }
77
onMessageReceived(const sp<AMessage> & msg)78 void SimpleC2Component::WorkHandler::onMessageReceived(const sp<AMessage> &msg) {
79 std::shared_ptr<SimpleC2Component> thiz = mThiz.lock();
80 if (!thiz) {
81 ALOGD("component not yet set; msg = %s", msg->debugString().c_str());
82 sp<AReplyToken> replyId;
83 if (msg->senderAwaitsResponse(&replyId)) {
84 sp<AMessage> reply = new AMessage;
85 reply->setInt32("err", C2_CORRUPTED);
86 reply->postReply(replyId);
87 }
88 return;
89 }
90
91 switch (msg->what()) {
92 case kWhatProcess: {
93 if (mRunning) {
94 if (thiz->processQueue()) {
95 (new AMessage(kWhatProcess, this))->post();
96 }
97 } else {
98 ALOGV("Ignore process message as we're not running");
99 }
100 break;
101 }
102 case kWhatInit: {
103 int32_t err = thiz->onInit();
104 Reply(msg, &err);
105 [[fallthrough]];
106 }
107 case kWhatStart: {
108 mRunning = true;
109 break;
110 }
111 case kWhatStop: {
112 int32_t err = thiz->onStop();
113 Reply(msg, &err);
114 break;
115 }
116 case kWhatReset: {
117 thiz->onReset();
118 mRunning = false;
119 Reply(msg);
120 break;
121 }
122 case kWhatRelease: {
123 thiz->onRelease();
124 mRunning = false;
125 Reply(msg);
126 break;
127 }
128 default: {
129 ALOGD("Unrecognized msg: %d", msg->what());
130 break;
131 }
132 }
133 }
134
135 class SimpleC2Component::BlockingBlockPool : public C2BlockPool {
136 public:
BlockingBlockPool(const std::shared_ptr<C2BlockPool> & base)137 BlockingBlockPool(const std::shared_ptr<C2BlockPool>& base): mBase{base} {}
138
getLocalId() const139 virtual local_id_t getLocalId() const override {
140 return mBase->getLocalId();
141 }
142
getAllocatorId() const143 virtual C2Allocator::id_t getAllocatorId() const override {
144 return mBase->getAllocatorId();
145 }
146
fetchLinearBlock(uint32_t capacity,C2MemoryUsage usage,std::shared_ptr<C2LinearBlock> * block)147 virtual c2_status_t fetchLinearBlock(
148 uint32_t capacity,
149 C2MemoryUsage usage,
150 std::shared_ptr<C2LinearBlock>* block) {
151 c2_status_t status;
152 do {
153 status = mBase->fetchLinearBlock(capacity, usage, block);
154 } while (status == C2_BLOCKING);
155 return status;
156 }
157
fetchCircularBlock(uint32_t capacity,C2MemoryUsage usage,std::shared_ptr<C2CircularBlock> * block)158 virtual c2_status_t fetchCircularBlock(
159 uint32_t capacity,
160 C2MemoryUsage usage,
161 std::shared_ptr<C2CircularBlock>* block) {
162 c2_status_t status;
163 do {
164 status = mBase->fetchCircularBlock(capacity, usage, block);
165 } while (status == C2_BLOCKING);
166 return status;
167 }
168
fetchGraphicBlock(uint32_t width,uint32_t height,uint32_t format,C2MemoryUsage usage,std::shared_ptr<C2GraphicBlock> * block)169 virtual c2_status_t fetchGraphicBlock(
170 uint32_t width, uint32_t height, uint32_t format,
171 C2MemoryUsage usage,
172 std::shared_ptr<C2GraphicBlock>* block) {
173 c2_status_t status;
174 do {
175 status = mBase->fetchGraphicBlock(width, height, format, usage,
176 block);
177 } while (status == C2_BLOCKING);
178 return status;
179 }
180
181 private:
182 std::shared_ptr<C2BlockPool> mBase;
183 };
184
185 ////////////////////////////////////////////////////////////////////////////////
186
187 namespace {
188
189 struct DummyReadView : public C2ReadView {
DummyReadViewandroid::__anon32a9a5ee0111::DummyReadView190 DummyReadView() : C2ReadView(C2_NO_INIT) {}
191 };
192
193 } // namespace
194
SimpleC2Component(const std::shared_ptr<C2ComponentInterface> & intf)195 SimpleC2Component::SimpleC2Component(
196 const std::shared_ptr<C2ComponentInterface> &intf)
197 : mDummyReadView(DummyReadView()),
198 mIntf(intf),
199 mLooper(new ALooper),
200 mHandler(new WorkHandler) {
201 mLooper->setName(intf->getName().c_str());
202 (void)mLooper->registerHandler(mHandler);
203 mLooper->start(false, false, ANDROID_PRIORITY_VIDEO);
204 }
205
~SimpleC2Component()206 SimpleC2Component::~SimpleC2Component() {
207 mLooper->unregisterHandler(mHandler->id());
208 (void)mLooper->stop();
209 }
210
setListener_vb(const std::shared_ptr<C2Component::Listener> & listener,c2_blocking_t mayBlock)211 c2_status_t SimpleC2Component::setListener_vb(
212 const std::shared_ptr<C2Component::Listener> &listener, c2_blocking_t mayBlock) {
213 mHandler->setComponent(shared_from_this());
214
215 Mutexed<ExecState>::Locked state(mExecState);
216 if (state->mState == RUNNING) {
217 if (listener) {
218 return C2_BAD_STATE;
219 } else if (!mayBlock) {
220 return C2_BLOCKING;
221 }
222 }
223 state->mListener = listener;
224 // TODO: wait for listener change to have taken place before returning
225 // (e.g. if there is an ongoing listener callback)
226 return C2_OK;
227 }
228
queue_nb(std::list<std::unique_ptr<C2Work>> * const items)229 c2_status_t SimpleC2Component::queue_nb(std::list<std::unique_ptr<C2Work>> * const items) {
230 {
231 Mutexed<ExecState>::Locked state(mExecState);
232 if (state->mState != RUNNING) {
233 return C2_BAD_STATE;
234 }
235 }
236 bool queueWasEmpty = false;
237 {
238 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
239 queueWasEmpty = queue->empty();
240 while (!items->empty()) {
241 queue->push_back(std::move(items->front()));
242 items->pop_front();
243 }
244 }
245 if (queueWasEmpty) {
246 (new AMessage(WorkHandler::kWhatProcess, mHandler))->post();
247 }
248 return C2_OK;
249 }
250
announce_nb(const std::vector<C2WorkOutline> & items)251 c2_status_t SimpleC2Component::announce_nb(const std::vector<C2WorkOutline> &items) {
252 (void)items;
253 return C2_OMITTED;
254 }
255
flush_sm(flush_mode_t flushMode,std::list<std::unique_ptr<C2Work>> * const flushedWork)256 c2_status_t SimpleC2Component::flush_sm(
257 flush_mode_t flushMode, std::list<std::unique_ptr<C2Work>>* const flushedWork) {
258 (void)flushMode;
259 {
260 Mutexed<ExecState>::Locked state(mExecState);
261 if (state->mState != RUNNING) {
262 return C2_BAD_STATE;
263 }
264 }
265 {
266 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
267 queue->incGeneration();
268 // TODO: queue->splicedBy(flushedWork, flushedWork->end());
269 while (!queue->empty()) {
270 std::unique_ptr<C2Work> work = queue->pop_front();
271 if (work) {
272 flushedWork->push_back(std::move(work));
273 }
274 }
275 while (!queue->pending().empty()) {
276 flushedWork->push_back(std::move(queue->pending().begin()->second));
277 queue->pending().erase(queue->pending().begin());
278 }
279 }
280
281 return C2_OK;
282 }
283
drain_nb(drain_mode_t drainMode)284 c2_status_t SimpleC2Component::drain_nb(drain_mode_t drainMode) {
285 if (drainMode == DRAIN_CHAIN) {
286 return C2_OMITTED;
287 }
288 {
289 Mutexed<ExecState>::Locked state(mExecState);
290 if (state->mState != RUNNING) {
291 return C2_BAD_STATE;
292 }
293 }
294 bool queueWasEmpty = false;
295 {
296 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
297 queueWasEmpty = queue->empty();
298 queue->markDrain(drainMode);
299 }
300 if (queueWasEmpty) {
301 (new AMessage(WorkHandler::kWhatProcess, mHandler))->post();
302 }
303
304 return C2_OK;
305 }
306
start()307 c2_status_t SimpleC2Component::start() {
308 Mutexed<ExecState>::Locked state(mExecState);
309 if (state->mState == RUNNING) {
310 return C2_BAD_STATE;
311 }
312 bool needsInit = (state->mState == UNINITIALIZED);
313 state.unlock();
314 if (needsInit) {
315 sp<AMessage> reply;
316 (new AMessage(WorkHandler::kWhatInit, mHandler))->postAndAwaitResponse(&reply);
317 int32_t err;
318 CHECK(reply->findInt32("err", &err));
319 if (err != C2_OK) {
320 return (c2_status_t)err;
321 }
322 } else {
323 (new AMessage(WorkHandler::kWhatStart, mHandler))->post();
324 }
325 state.lock();
326 state->mState = RUNNING;
327 return C2_OK;
328 }
329
stop()330 c2_status_t SimpleC2Component::stop() {
331 ALOGV("stop");
332 {
333 Mutexed<ExecState>::Locked state(mExecState);
334 if (state->mState != RUNNING) {
335 return C2_BAD_STATE;
336 }
337 state->mState = STOPPED;
338 }
339 {
340 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
341 queue->clear();
342 queue->pending().clear();
343 }
344 sp<AMessage> reply;
345 (new AMessage(WorkHandler::kWhatStop, mHandler))->postAndAwaitResponse(&reply);
346 int32_t err;
347 CHECK(reply->findInt32("err", &err));
348 if (err != C2_OK) {
349 return (c2_status_t)err;
350 }
351 return C2_OK;
352 }
353
reset()354 c2_status_t SimpleC2Component::reset() {
355 ALOGV("reset");
356 {
357 Mutexed<ExecState>::Locked state(mExecState);
358 state->mState = UNINITIALIZED;
359 }
360 {
361 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
362 queue->clear();
363 queue->pending().clear();
364 }
365 sp<AMessage> reply;
366 (new AMessage(WorkHandler::kWhatReset, mHandler))->postAndAwaitResponse(&reply);
367 return C2_OK;
368 }
369
release()370 c2_status_t SimpleC2Component::release() {
371 ALOGV("release");
372 sp<AMessage> reply;
373 (new AMessage(WorkHandler::kWhatRelease, mHandler))->postAndAwaitResponse(&reply);
374 return C2_OK;
375 }
376
intf()377 std::shared_ptr<C2ComponentInterface> SimpleC2Component::intf() {
378 return mIntf;
379 }
380
381 namespace {
382
vec(std::unique_ptr<C2Work> & work)383 std::list<std::unique_ptr<C2Work>> vec(std::unique_ptr<C2Work> &work) {
384 std::list<std::unique_ptr<C2Work>> ret;
385 ret.push_back(std::move(work));
386 return ret;
387 }
388
389 } // namespace
390
finish(uint64_t frameIndex,std::function<void (const std::unique_ptr<C2Work> &)> fillWork)391 void SimpleC2Component::finish(
392 uint64_t frameIndex, std::function<void(const std::unique_ptr<C2Work> &)> fillWork) {
393 std::unique_ptr<C2Work> work;
394 {
395 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
396 if (queue->pending().count(frameIndex) == 0) {
397 ALOGW("unknown frame index: %" PRIu64, frameIndex);
398 return;
399 }
400 work = std::move(queue->pending().at(frameIndex));
401 queue->pending().erase(frameIndex);
402 }
403 if (work) {
404 fillWork(work);
405 std::shared_ptr<C2Component::Listener> listener = mExecState.lock()->mListener;
406 listener->onWorkDone_nb(shared_from_this(), vec(work));
407 ALOGV("returning pending work");
408 }
409 }
410
cloneAndSend(uint64_t frameIndex,const std::unique_ptr<C2Work> & currentWork,std::function<void (const std::unique_ptr<C2Work> &)> fillWork)411 void SimpleC2Component::cloneAndSend(
412 uint64_t frameIndex,
413 const std::unique_ptr<C2Work> ¤tWork,
414 std::function<void(const std::unique_ptr<C2Work> &)> fillWork) {
415 std::unique_ptr<C2Work> work(new C2Work);
416 if (currentWork->input.ordinal.frameIndex == frameIndex) {
417 work->input.flags = currentWork->input.flags;
418 work->input.ordinal = currentWork->input.ordinal;
419 } else {
420 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
421 if (queue->pending().count(frameIndex) == 0) {
422 ALOGW("unknown frame index: %" PRIu64, frameIndex);
423 return;
424 }
425 work->input.flags = queue->pending().at(frameIndex)->input.flags;
426 work->input.ordinal = queue->pending().at(frameIndex)->input.ordinal;
427 }
428 work->worklets.emplace_back(new C2Worklet);
429 if (work) {
430 fillWork(work);
431 std::shared_ptr<C2Component::Listener> listener = mExecState.lock()->mListener;
432 listener->onWorkDone_nb(shared_from_this(), vec(work));
433 ALOGV("cloned and sending work");
434 }
435 }
436
processQueue()437 bool SimpleC2Component::processQueue() {
438 std::unique_ptr<C2Work> work;
439 uint64_t generation;
440 int32_t drainMode;
441 bool isFlushPending = false;
442 bool hasQueuedWork = false;
443 {
444 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
445 if (queue->empty()) {
446 return false;
447 }
448
449 generation = queue->generation();
450 drainMode = queue->drainMode();
451 isFlushPending = queue->popPendingFlush();
452 work = queue->pop_front();
453 hasQueuedWork = !queue->empty();
454 }
455 if (isFlushPending) {
456 ALOGV("processing pending flush");
457 c2_status_t err = onFlush_sm();
458 if (err != C2_OK) {
459 ALOGD("flush err: %d", err);
460 // TODO: error
461 }
462 }
463
464 if (!mOutputBlockPool) {
465 c2_status_t err = [this] {
466 // TODO: don't use query_vb
467 C2StreamBufferTypeSetting::output outputFormat(0u);
468 std::vector<std::unique_ptr<C2Param>> params;
469 c2_status_t err = intf()->query_vb(
470 { &outputFormat },
471 { C2PortBlockPoolsTuning::output::PARAM_TYPE },
472 C2_DONT_BLOCK,
473 ¶ms);
474 if (err != C2_OK && err != C2_BAD_INDEX) {
475 ALOGD("query err = %d", err);
476 return err;
477 }
478 C2BlockPool::local_id_t poolId =
479 outputFormat.value == C2BufferData::GRAPHIC
480 ? C2BlockPool::BASIC_GRAPHIC
481 : C2BlockPool::BASIC_LINEAR;
482 if (params.size()) {
483 C2PortBlockPoolsTuning::output *outputPools =
484 C2PortBlockPoolsTuning::output::From(params[0].get());
485 if (outputPools && outputPools->flexCount() >= 1) {
486 poolId = outputPools->m.values[0];
487 }
488 }
489
490 std::shared_ptr<C2BlockPool> blockPool;
491 err = GetCodec2BlockPool(poolId, shared_from_this(), &blockPool);
492 ALOGD("Using output block pool with poolID %llu => got %llu - %d",
493 (unsigned long long)poolId,
494 (unsigned long long)(
495 blockPool ? blockPool->getLocalId() : 111000111),
496 err);
497 if (err == C2_OK) {
498 mOutputBlockPool = std::make_shared<BlockingBlockPool>(blockPool);
499 }
500 return err;
501 }();
502 if (err != C2_OK) {
503 Mutexed<ExecState>::Locked state(mExecState);
504 std::shared_ptr<C2Component::Listener> listener = state->mListener;
505 state.unlock();
506 listener->onError_nb(shared_from_this(), err);
507 return hasQueuedWork;
508 }
509 }
510
511 if (!work) {
512 c2_status_t err = drain(drainMode, mOutputBlockPool);
513 if (err != C2_OK) {
514 Mutexed<ExecState>::Locked state(mExecState);
515 std::shared_ptr<C2Component::Listener> listener = state->mListener;
516 state.unlock();
517 listener->onError_nb(shared_from_this(), err);
518 }
519 return hasQueuedWork;
520 }
521
522 {
523 std::vector<C2Param *> updates;
524 for (const std::unique_ptr<C2Param> ¶m: work->input.configUpdate) {
525 if (param) {
526 updates.emplace_back(param.get());
527 }
528 }
529 if (!updates.empty()) {
530 std::vector<std::unique_ptr<C2SettingResult>> failures;
531 c2_status_t err = intf()->config_vb(updates, C2_MAY_BLOCK, &failures);
532 ALOGD("applied %zu configUpdates => %s (%d)", updates.size(), asString(err), err);
533 }
534 }
535
536 ALOGV("start processing frame #%" PRIu64, work->input.ordinal.frameIndex.peeku());
537 // If input buffer list is not empty, it means we have some input to process on.
538 // However, input could be a null buffer. In such case, clear the buffer list
539 // before making call to process().
540 if (!work->input.buffers.empty() && !work->input.buffers[0]) {
541 ALOGD("Encountered null input buffer. Clearing the input buffer");
542 work->input.buffers.clear();
543 }
544 process(work, mOutputBlockPool);
545 ALOGV("processed frame #%" PRIu64, work->input.ordinal.frameIndex.peeku());
546 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
547 if (queue->generation() != generation) {
548 ALOGD("work form old generation: was %" PRIu64 " now %" PRIu64,
549 queue->generation(), generation);
550 work->result = C2_NOT_FOUND;
551 queue.unlock();
552
553 Mutexed<ExecState>::Locked state(mExecState);
554 std::shared_ptr<C2Component::Listener> listener = state->mListener;
555 state.unlock();
556 listener->onWorkDone_nb(shared_from_this(), vec(work));
557 return hasQueuedWork;
558 }
559 if (work->workletsProcessed != 0u) {
560 queue.unlock();
561 Mutexed<ExecState>::Locked state(mExecState);
562 ALOGV("returning this work");
563 std::shared_ptr<C2Component::Listener> listener = state->mListener;
564 state.unlock();
565 listener->onWorkDone_nb(shared_from_this(), vec(work));
566 } else {
567 ALOGV("queue pending work");
568 work->input.buffers.clear();
569 std::unique_ptr<C2Work> unexpected;
570
571 uint64_t frameIndex = work->input.ordinal.frameIndex.peeku();
572 if (queue->pending().count(frameIndex) != 0) {
573 unexpected = std::move(queue->pending().at(frameIndex));
574 queue->pending().erase(frameIndex);
575 }
576 (void)queue->pending().insert({ frameIndex, std::move(work) });
577
578 queue.unlock();
579 if (unexpected) {
580 ALOGD("unexpected pending work");
581 unexpected->result = C2_CORRUPTED;
582 Mutexed<ExecState>::Locked state(mExecState);
583 std::shared_ptr<C2Component::Listener> listener = state->mListener;
584 state.unlock();
585 listener->onWorkDone_nb(shared_from_this(), vec(unexpected));
586 }
587 }
588 return hasQueuedWork;
589 }
590
createLinearBuffer(const std::shared_ptr<C2LinearBlock> & block)591 std::shared_ptr<C2Buffer> SimpleC2Component::createLinearBuffer(
592 const std::shared_ptr<C2LinearBlock> &block) {
593 return createLinearBuffer(block, block->offset(), block->size());
594 }
595
createLinearBuffer(const std::shared_ptr<C2LinearBlock> & block,size_t offset,size_t size)596 std::shared_ptr<C2Buffer> SimpleC2Component::createLinearBuffer(
597 const std::shared_ptr<C2LinearBlock> &block, size_t offset, size_t size) {
598 return C2Buffer::CreateLinearBuffer(block->share(offset, size, ::C2Fence()));
599 }
600
createGraphicBuffer(const std::shared_ptr<C2GraphicBlock> & block)601 std::shared_ptr<C2Buffer> SimpleC2Component::createGraphicBuffer(
602 const std::shared_ptr<C2GraphicBlock> &block) {
603 return createGraphicBuffer(block, C2Rect(block->width(), block->height()));
604 }
605
createGraphicBuffer(const std::shared_ptr<C2GraphicBlock> & block,const C2Rect & crop)606 std::shared_ptr<C2Buffer> SimpleC2Component::createGraphicBuffer(
607 const std::shared_ptr<C2GraphicBlock> &block, const C2Rect &crop) {
608 return C2Buffer::CreateGraphicBuffer(block->share(crop, ::C2Fence()));
609 }
610
611 } // namespace android
612