1 /*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "SimpleC2Component"
19 #include <log/log.h>
20
21 #include <cutils/properties.h>
22 #include <media/stagefright/foundation/AMessage.h>
23
24 #include <inttypes.h>
25
26 #include <C2Config.h>
27 #include <C2Debug.h>
28 #include <C2PlatformSupport.h>
29 #include <SimpleC2Component.h>
30
// Local trace switch for this file. With DEBUG set to a non-zero value,
// DDD(...) forwards its arguments to ALOGD; with DEBUG at 0 it expands to a
// no-op expression, so its arguments are not evaluated.
#define DEBUG 0
#if DEBUG
#define DDD(...) ALOGD(__VA_ARGS__)
#else
#define DDD(...) ((void)0)
#endif
37
38 namespace android {
39
pop_front()40 std::unique_ptr<C2Work> SimpleC2Component::WorkQueue::pop_front() {
41 std::unique_ptr<C2Work> work = std::move(mQueue.front().work);
42 mQueue.pop_front();
43 return work;
44 }
45
push_back(std::unique_ptr<C2Work> work)46 void SimpleC2Component::WorkQueue::push_back(std::unique_ptr<C2Work> work) {
47 mQueue.push_back({std::move(work), NO_DRAIN});
48 }
49
empty() const50 bool SimpleC2Component::WorkQueue::empty() const { return mQueue.empty(); }
51
clear()52 void SimpleC2Component::WorkQueue::clear() { mQueue.clear(); }
53
drainMode() const54 uint32_t SimpleC2Component::WorkQueue::drainMode() const {
55 return mQueue.front().drainMode;
56 }
57
markDrain(uint32_t drainMode)58 void SimpleC2Component::WorkQueue::markDrain(uint32_t drainMode) {
59 mQueue.push_back({nullptr, drainMode});
60 }
61
62 ////////////////////////////////////////////////////////////////////////////////
63
WorkHandler()64 SimpleC2Component::WorkHandler::WorkHandler() : mRunning(false) {}
65
setComponent(const std::shared_ptr<SimpleC2Component> & thiz)66 void SimpleC2Component::WorkHandler::setComponent(
67 const std::shared_ptr<SimpleC2Component> &thiz) {
68 mThiz = thiz;
69 }
70
Reply(const sp<AMessage> & msg,int32_t * err=nullptr)71 static void Reply(const sp<AMessage> &msg, int32_t *err = nullptr) {
72 sp<AReplyToken> replyId;
73 CHECK(msg->senderAwaitsResponse(&replyId));
74 sp<AMessage> reply = new AMessage;
75 if (err) {
76 reply->setInt32("err", *err);
77 }
78 reply->postReply(replyId);
79 }
80
onMessageReceived(const sp<AMessage> & msg)81 void SimpleC2Component::WorkHandler::onMessageReceived(
82 const sp<AMessage> &msg) {
83 std::shared_ptr<SimpleC2Component> thiz = mThiz.lock();
84 if (!thiz) {
85 ALOGD("component not yet set; msg = %s", msg->debugString().c_str());
86 sp<AReplyToken> replyId;
87 if (msg->senderAwaitsResponse(&replyId)) {
88 sp<AMessage> reply = new AMessage;
89 reply->setInt32("err", C2_CORRUPTED);
90 reply->postReply(replyId);
91 }
92 return;
93 }
94
95 switch (msg->what()) {
96 case kWhatProcess: {
97 if (mRunning) {
98 if (thiz->processQueue()) {
99 (new AMessage(kWhatProcess, this))->post();
100 }
101 } else {
102 DDD("Ignore process message as we're not running");
103 }
104 break;
105 }
106 case kWhatInit: {
107 int32_t err = thiz->onInit();
108 Reply(msg, &err);
109 [[fallthrough]];
110 }
111 case kWhatStart: {
112 mRunning = true;
113 break;
114 }
115 case kWhatStop: {
116 int32_t err = thiz->onStop();
117 Reply(msg, &err);
118 break;
119 }
120 case kWhatReset: {
121 thiz->onReset();
122 mRunning = false;
123 Reply(msg);
124 break;
125 }
126 case kWhatRelease: {
127 thiz->onRelease();
128 mRunning = false;
129 Reply(msg);
130 break;
131 }
132 default: {
133 ALOGD("Unrecognized msg: %d", msg->what());
134 break;
135 }
136 }
137 }
138
139 class SimpleC2Component::BlockingBlockPool : public C2BlockPool {
140 public:
BlockingBlockPool(const std::shared_ptr<C2BlockPool> & base)141 BlockingBlockPool(const std::shared_ptr<C2BlockPool> &base) : mBase{base} {}
142
getLocalId() const143 virtual local_id_t getLocalId() const override {
144 return mBase->getLocalId();
145 }
146
getAllocatorId() const147 virtual C2Allocator::id_t getAllocatorId() const override {
148 return mBase->getAllocatorId();
149 }
150
151 virtual c2_status_t
fetchLinearBlock(uint32_t capacity,C2MemoryUsage usage,std::shared_ptr<C2LinearBlock> * block)152 fetchLinearBlock(uint32_t capacity, C2MemoryUsage usage,
153 std::shared_ptr<C2LinearBlock> *block) {
154 c2_status_t status;
155 do {
156 status = mBase->fetchLinearBlock(capacity, usage, block);
157 } while (status == C2_BLOCKING);
158 return status;
159 }
160
161 virtual c2_status_t
fetchCircularBlock(uint32_t capacity,C2MemoryUsage usage,std::shared_ptr<C2CircularBlock> * block)162 fetchCircularBlock(uint32_t capacity, C2MemoryUsage usage,
163 std::shared_ptr<C2CircularBlock> *block) {
164 c2_status_t status;
165 do {
166 status = mBase->fetchCircularBlock(capacity, usage, block);
167 } while (status == C2_BLOCKING);
168 return status;
169 }
170
171 virtual c2_status_t
fetchGraphicBlock(uint32_t width,uint32_t height,uint32_t format,C2MemoryUsage usage,std::shared_ptr<C2GraphicBlock> * block)172 fetchGraphicBlock(uint32_t width, uint32_t height, uint32_t format,
173 C2MemoryUsage usage,
174 std::shared_ptr<C2GraphicBlock> *block) {
175 c2_status_t status;
176 do {
177 status =
178 mBase->fetchGraphicBlock(width, height, format, usage, block);
179 } while (status == C2_BLOCKING);
180 return status;
181 }
182
183 private:
184 std::shared_ptr<C2BlockPool> mBase;
185 };
186
187 ////////////////////////////////////////////////////////////////////////////////
188
189 namespace {
190
191 struct DummyReadView : public C2ReadView {
DummyReadViewandroid::__anone0837dbf0111::DummyReadView192 DummyReadView() : C2ReadView(C2_NO_INIT) {}
193 };
194
195 } // namespace
196
SimpleC2Component(const std::shared_ptr<C2ComponentInterface> & intf)197 SimpleC2Component::SimpleC2Component(
198 const std::shared_ptr<C2ComponentInterface> &intf)
199 : mDummyReadView(DummyReadView()), mIntf(intf), mLooper(new ALooper),
200 mHandler(new WorkHandler) {
201 mLooper->setName(intf->getName().c_str());
202 (void)mLooper->registerHandler(mHandler);
203 mLooper->start(false, false, ANDROID_PRIORITY_VIDEO);
204 }
205
~SimpleC2Component()206 SimpleC2Component::~SimpleC2Component() {
207 mLooper->unregisterHandler(mHandler->id());
208 (void)mLooper->stop();
209 }
210
setListener_vb(const std::shared_ptr<C2Component::Listener> & listener,c2_blocking_t mayBlock)211 c2_status_t SimpleC2Component::setListener_vb(
212 const std::shared_ptr<C2Component::Listener> &listener,
213 c2_blocking_t mayBlock) {
214 mHandler->setComponent(shared_from_this());
215
216 Mutexed<ExecState>::Locked state(mExecState);
217 if (state->mState == RUNNING) {
218 if (listener) {
219 return C2_BAD_STATE;
220 } else if (!mayBlock) {
221 return C2_BLOCKING;
222 }
223 }
224 state->mListener = listener;
225 // TODO: wait for listener change to have taken place before returning
226 // (e.g. if there is an ongoing listener callback)
227 return C2_OK;
228 }
229
230 c2_status_t
queue_nb(std::list<std::unique_ptr<C2Work>> * const items)231 SimpleC2Component::queue_nb(std::list<std::unique_ptr<C2Work>> *const items) {
232 {
233 Mutexed<ExecState>::Locked state(mExecState);
234 if (state->mState != RUNNING) {
235 return C2_BAD_STATE;
236 }
237 }
238 bool queueWasEmpty = false;
239 {
240 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
241 queueWasEmpty = queue->empty();
242 while (!items->empty()) {
243 queue->push_back(std::move(items->front()));
244 items->pop_front();
245 }
246 }
247 if (queueWasEmpty) {
248 (new AMessage(WorkHandler::kWhatProcess, mHandler))->post();
249 }
250 return C2_OK;
251 }
252
253 c2_status_t
announce_nb(const std::vector<C2WorkOutline> & items)254 SimpleC2Component::announce_nb(const std::vector<C2WorkOutline> &items) {
255 (void)items;
256 return C2_OMITTED;
257 }
258
flush_sm(flush_mode_t flushMode,std::list<std::unique_ptr<C2Work>> * const flushedWork)259 c2_status_t SimpleC2Component::flush_sm(
260 flush_mode_t flushMode,
261 std::list<std::unique_ptr<C2Work>> *const flushedWork) {
262 (void)flushMode;
263 {
264 Mutexed<ExecState>::Locked state(mExecState);
265 if (state->mState != RUNNING) {
266 return C2_BAD_STATE;
267 }
268 }
269 {
270 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
271 queue->incGeneration();
272 // TODO: queue->splicedBy(flushedWork, flushedWork->end());
273 while (!queue->empty()) {
274 std::unique_ptr<C2Work> work = queue->pop_front();
275 if (work) {
276 flushedWork->push_back(std::move(work));
277 }
278 }
279 while (!queue->pending().empty()) {
280 flushedWork->push_back(std::move(queue->pending().begin()->second));
281 queue->pending().erase(queue->pending().begin());
282 }
283 }
284
285 return C2_OK;
286 }
287
drain_nb(drain_mode_t drainMode)288 c2_status_t SimpleC2Component::drain_nb(drain_mode_t drainMode) {
289 if (drainMode == DRAIN_CHAIN) {
290 return C2_OMITTED;
291 }
292 {
293 Mutexed<ExecState>::Locked state(mExecState);
294 if (state->mState != RUNNING) {
295 return C2_BAD_STATE;
296 }
297 }
298 bool queueWasEmpty = false;
299 {
300 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
301 queueWasEmpty = queue->empty();
302 queue->markDrain(drainMode);
303 }
304 if (queueWasEmpty) {
305 (new AMessage(WorkHandler::kWhatProcess, mHandler))->post();
306 }
307
308 return C2_OK;
309 }
310
start()311 c2_status_t SimpleC2Component::start() {
312 Mutexed<ExecState>::Locked state(mExecState);
313 if (state->mState == RUNNING) {
314 return C2_BAD_STATE;
315 }
316 bool needsInit = (state->mState == UNINITIALIZED);
317 state.unlock();
318 if (needsInit) {
319 sp<AMessage> reply;
320 (new AMessage(WorkHandler::kWhatInit, mHandler))
321 ->postAndAwaitResponse(&reply);
322 int32_t err;
323 CHECK(reply->findInt32("err", &err));
324 if (err != C2_OK) {
325 return (c2_status_t)err;
326 }
327 } else {
328 (new AMessage(WorkHandler::kWhatStart, mHandler))->post();
329 }
330 state.lock();
331 state->mState = RUNNING;
332 return C2_OK;
333 }
334
stop()335 c2_status_t SimpleC2Component::stop() {
336 DDD("stop");
337 {
338 Mutexed<ExecState>::Locked state(mExecState);
339 if (state->mState != RUNNING) {
340 return C2_BAD_STATE;
341 }
342 state->mState = STOPPED;
343 }
344 {
345 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
346 queue->clear();
347 queue->pending().clear();
348 }
349 sp<AMessage> reply;
350 (new AMessage(WorkHandler::kWhatStop, mHandler))
351 ->postAndAwaitResponse(&reply);
352 int32_t err;
353 CHECK(reply->findInt32("err", &err));
354 if (err != C2_OK) {
355 return (c2_status_t)err;
356 }
357 return C2_OK;
358 }
359
reset()360 c2_status_t SimpleC2Component::reset() {
361 DDD("reset");
362 {
363 Mutexed<ExecState>::Locked state(mExecState);
364 state->mState = UNINITIALIZED;
365 }
366 {
367 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
368 queue->clear();
369 queue->pending().clear();
370 }
371 sp<AMessage> reply;
372 (new AMessage(WorkHandler::kWhatReset, mHandler))
373 ->postAndAwaitResponse(&reply);
374 return C2_OK;
375 }
376
release()377 c2_status_t SimpleC2Component::release() {
378 DDD("release");
379 sp<AMessage> reply;
380 (new AMessage(WorkHandler::kWhatRelease, mHandler))
381 ->postAndAwaitResponse(&reply);
382 return C2_OK;
383 }
384
intf()385 std::shared_ptr<C2ComponentInterface> SimpleC2Component::intf() {
386 return mIntf;
387 }
388
389 namespace {
390
vec(std::unique_ptr<C2Work> & work)391 std::list<std::unique_ptr<C2Work>> vec(std::unique_ptr<C2Work> &work) {
392 std::list<std::unique_ptr<C2Work>> ret;
393 ret.push_back(std::move(work));
394 return ret;
395 }
396
397 } // namespace
398
finish(uint64_t frameIndex,std::function<void (const std::unique_ptr<C2Work> &)> fillWork)399 void SimpleC2Component::finish(
400 uint64_t frameIndex,
401 std::function<void(const std::unique_ptr<C2Work> &)> fillWork) {
402 std::unique_ptr<C2Work> work;
403 {
404 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
405 if (queue->pending().count(frameIndex) == 0) {
406 ALOGW("unknown frame index: %" PRIu64, frameIndex);
407 return;
408 }
409 work = std::move(queue->pending().at(frameIndex));
410 queue->pending().erase(frameIndex);
411 }
412 if (work) {
413 fillWork(work);
414 std::shared_ptr<C2Component::Listener> listener =
415 mExecState.lock()->mListener;
416 listener->onWorkDone_nb(shared_from_this(), vec(work));
417 DDD("returning pending work");
418 }
419 }
420
cloneAndSend(uint64_t frameIndex,const std::unique_ptr<C2Work> & currentWork,std::function<void (const std::unique_ptr<C2Work> &)> fillWork)421 void SimpleC2Component::cloneAndSend(
422 uint64_t frameIndex, const std::unique_ptr<C2Work> ¤tWork,
423 std::function<void(const std::unique_ptr<C2Work> &)> fillWork) {
424 std::unique_ptr<C2Work> work(new C2Work);
425 if (currentWork->input.ordinal.frameIndex == frameIndex) {
426 work->input.flags = currentWork->input.flags;
427 work->input.ordinal = currentWork->input.ordinal;
428 } else {
429 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
430 if (queue->pending().count(frameIndex) == 0) {
431 ALOGW("unknown frame index: %" PRIu64, frameIndex);
432 return;
433 }
434 work->input.flags = queue->pending().at(frameIndex)->input.flags;
435 work->input.ordinal = queue->pending().at(frameIndex)->input.ordinal;
436 }
437 work->worklets.emplace_back(new C2Worklet);
438 if (work) {
439 fillWork(work);
440 std::shared_ptr<C2Component::Listener> listener =
441 mExecState.lock()->mListener;
442 listener->onWorkDone_nb(shared_from_this(), vec(work));
443 DDD("cloned and sending work");
444 }
445 }
446
processQueue()447 bool SimpleC2Component::processQueue() {
448 std::unique_ptr<C2Work> work;
449 uint64_t generation;
450 int32_t drainMode;
451 bool isFlushPending = false;
452 bool hasQueuedWork = false;
453 {
454 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
455 if (queue->empty()) {
456 return false;
457 }
458
459 generation = queue->generation();
460 drainMode = queue->drainMode();
461 isFlushPending = queue->popPendingFlush();
462 work = queue->pop_front();
463 hasQueuedWork = !queue->empty();
464 }
465 if (isFlushPending) {
466 DDD("processing pending flush");
467 c2_status_t err = onFlush_sm();
468 if (err != C2_OK) {
469 ALOGD("flush err: %d", err);
470 // TODO: error
471 }
472 }
473
474 if (!mOutputBlockPool) {
475 c2_status_t err = [this] {
476 // TODO: don't use query_vb
477 C2StreamBufferTypeSetting::output outputFormat(0u);
478 std::vector<std::unique_ptr<C2Param>> params;
479 c2_status_t err = intf()->query_vb(
480 {&outputFormat}, {C2PortBlockPoolsTuning::output::PARAM_TYPE},
481 C2_DONT_BLOCK, ¶ms);
482 if (err != C2_OK && err != C2_BAD_INDEX) {
483 ALOGD("query err = %d", err);
484 return err;
485 }
486 C2BlockPool::local_id_t poolId =
487 outputFormat.value == C2BufferData::GRAPHIC
488 ? C2BlockPool::BASIC_GRAPHIC
489 : C2BlockPool::BASIC_LINEAR;
490 if (params.size()) {
491 C2PortBlockPoolsTuning::output *outputPools =
492 C2PortBlockPoolsTuning::output::From(params[0].get());
493 if (outputPools && outputPools->flexCount() >= 1) {
494 poolId = outputPools->m.values[0];
495 }
496 }
497
498 std::shared_ptr<C2BlockPool> blockPool;
499 err = GetCodec2BlockPool(poolId, shared_from_this(), &blockPool);
500 ALOGD("Using output block pool with poolID %llu => got %llu - %d",
501 (unsigned long long)poolId,
502 (unsigned long long)(blockPool ? blockPool->getLocalId()
503 : 111000111),
504 err);
505 if (err == C2_OK) {
506 mOutputBlockPool =
507 std::make_shared<BlockingBlockPool>(blockPool);
508 }
509 return err;
510 }();
511 if (err != C2_OK) {
512 Mutexed<ExecState>::Locked state(mExecState);
513 std::shared_ptr<C2Component::Listener> listener = state->mListener;
514 state.unlock();
515 listener->onError_nb(shared_from_this(), err);
516 return hasQueuedWork;
517 }
518 }
519
520 if (!work) {
521 c2_status_t err = drain(drainMode, mOutputBlockPool);
522 if (err != C2_OK) {
523 Mutexed<ExecState>::Locked state(mExecState);
524 std::shared_ptr<C2Component::Listener> listener = state->mListener;
525 state.unlock();
526 listener->onError_nb(shared_from_this(), err);
527 }
528 return hasQueuedWork;
529 }
530
531 {
532 std::vector<C2Param *> updates;
533 for (const std::unique_ptr<C2Param> ¶m : work->input.configUpdate) {
534 if (param) {
535 updates.emplace_back(param.get());
536 }
537 }
538 if (!updates.empty()) {
539 std::vector<std::unique_ptr<C2SettingResult>> failures;
540 c2_status_t err =
541 intf()->config_vb(updates, C2_MAY_BLOCK, &failures);
542 ALOGD("applied %zu configUpdates => %s (%d)", updates.size(),
543 asString(err), err);
544 }
545 }
546
547 DDD("start processing frame #%" PRIu64,
548 work->input.ordinal.frameIndex.peeku());
549 // If input buffer list is not empty, it means we have some input to process
550 // on. However, input could be a null buffer. In such case, clear the buffer
551 // list before making call to process().
552 if (!work->input.buffers.empty() && !work->input.buffers[0]) {
553 ALOGD("Encountered null input buffer. Clearing the input buffer");
554 work->input.buffers.clear();
555 }
556 process(work, mOutputBlockPool);
557 DDD("processed frame #%" PRIu64, work->input.ordinal.frameIndex.peeku());
558 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
559 if (queue->generation() != generation) {
560 ALOGD("work form old generation: was %" PRIu64 " now %" PRIu64,
561 queue->generation(), generation);
562 work->result = C2_NOT_FOUND;
563 queue.unlock();
564
565 Mutexed<ExecState>::Locked state(mExecState);
566 std::shared_ptr<C2Component::Listener> listener = state->mListener;
567 state.unlock();
568 listener->onWorkDone_nb(shared_from_this(), vec(work));
569 return hasQueuedWork;
570 }
571 if (work->workletsProcessed != 0u) {
572 queue.unlock();
573 Mutexed<ExecState>::Locked state(mExecState);
574 DDD("returning this work");
575 std::shared_ptr<C2Component::Listener> listener = state->mListener;
576 state.unlock();
577 listener->onWorkDone_nb(shared_from_this(), vec(work));
578 } else {
579 work->input.buffers.clear();
580 std::unique_ptr<C2Work> unexpected;
581
582 uint64_t frameIndex = work->input.ordinal.frameIndex.peeku();
583 DDD("queue pending work %" PRIu64, frameIndex);
584 if (queue->pending().count(frameIndex) != 0) {
585 unexpected = std::move(queue->pending().at(frameIndex));
586 queue->pending().erase(frameIndex);
587 }
588 (void)queue->pending().insert({frameIndex, std::move(work)});
589
590 queue.unlock();
591 if (unexpected) {
592 ALOGD("unexpected pending work");
593 unexpected->result = C2_CORRUPTED;
594 Mutexed<ExecState>::Locked state(mExecState);
595 std::shared_ptr<C2Component::Listener> listener = state->mListener;
596 state.unlock();
597 listener->onWorkDone_nb(shared_from_this(), vec(unexpected));
598 }
599 }
600 return hasQueuedWork;
601 }
602
createLinearBuffer(const std::shared_ptr<C2LinearBlock> & block)603 std::shared_ptr<C2Buffer> SimpleC2Component::createLinearBuffer(
604 const std::shared_ptr<C2LinearBlock> &block) {
605 return createLinearBuffer(block, block->offset(), block->size());
606 }
607
createLinearBuffer(const std::shared_ptr<C2LinearBlock> & block,size_t offset,size_t size)608 std::shared_ptr<C2Buffer> SimpleC2Component::createLinearBuffer(
609 const std::shared_ptr<C2LinearBlock> &block, size_t offset, size_t size) {
610 return C2Buffer::CreateLinearBuffer(
611 block->share(offset, size, ::C2Fence()));
612 }
613
createGraphicBuffer(const std::shared_ptr<C2GraphicBlock> & block)614 std::shared_ptr<C2Buffer> SimpleC2Component::createGraphicBuffer(
615 const std::shared_ptr<C2GraphicBlock> &block) {
616 return createGraphicBuffer(block, C2Rect(block->width(), block->height()));
617 }
618
createGraphicBuffer(const std::shared_ptr<C2GraphicBlock> & block,const C2Rect & crop)619 std::shared_ptr<C2Buffer> SimpleC2Component::createGraphicBuffer(
620 const std::shared_ptr<C2GraphicBlock> &block, const C2Rect &crop) {
621 return C2Buffer::CreateGraphicBuffer(block->share(crop, ::C2Fence()));
622 }
623
624 } // namespace android
625