1 /*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "SimpleC2Component"
19 #include <log/log.h>
20
21 #include <cutils/properties.h>
22 #include <media/stagefright/foundation/AMessage.h>
23
24 #include <inttypes.h>
25
26 #include <C2Config.h>
27 #include <C2Debug.h>
28 #include <C2PlatformSupport.h>
29 #include <SimpleC2Component.h>
30
31 namespace android {
32
pop_front()33 std::unique_ptr<C2Work> SimpleC2Component::WorkQueue::pop_front() {
34 std::unique_ptr<C2Work> work = std::move(mQueue.front().work);
35 mQueue.pop_front();
36 return work;
37 }
38
push_back(std::unique_ptr<C2Work> work)39 void SimpleC2Component::WorkQueue::push_back(std::unique_ptr<C2Work> work) {
40 mQueue.push_back({ std::move(work), NO_DRAIN });
41 }
42
empty() const43 bool SimpleC2Component::WorkQueue::empty() const {
44 return mQueue.empty();
45 }
46
clear()47 void SimpleC2Component::WorkQueue::clear() {
48 mQueue.clear();
49 }
50
drainMode() const51 uint32_t SimpleC2Component::WorkQueue::drainMode() const {
52 return mQueue.front().drainMode;
53 }
54
markDrain(uint32_t drainMode)55 void SimpleC2Component::WorkQueue::markDrain(uint32_t drainMode) {
56 mQueue.push_back({ nullptr, drainMode });
57 }
58
59 ////////////////////////////////////////////////////////////////////////////////
60
WorkHandler()61 SimpleC2Component::WorkHandler::WorkHandler() : mRunning(false) {}
62
setComponent(const std::shared_ptr<SimpleC2Component> & thiz)63 void SimpleC2Component::WorkHandler::setComponent(
64 const std::shared_ptr<SimpleC2Component> &thiz) {
65 mThiz = thiz;
66 }
67
Reply(const sp<AMessage> & msg,int32_t * err=nullptr)68 static void Reply(const sp<AMessage> &msg, int32_t *err = nullptr) {
69 sp<AReplyToken> replyId;
70 CHECK(msg->senderAwaitsResponse(&replyId));
71 sp<AMessage> reply = new AMessage;
72 if (err) {
73 reply->setInt32("err", *err);
74 }
75 reply->postReply(replyId);
76 }
77
onMessageReceived(const sp<AMessage> & msg)78 void SimpleC2Component::WorkHandler::onMessageReceived(const sp<AMessage> &msg) {
79 std::shared_ptr<SimpleC2Component> thiz = mThiz.lock();
80 if (!thiz) {
81 ALOGD("component not yet set; msg = %s", msg->debugString().c_str());
82 sp<AReplyToken> replyId;
83 if (msg->senderAwaitsResponse(&replyId)) {
84 sp<AMessage> reply = new AMessage;
85 reply->setInt32("err", C2_CORRUPTED);
86 reply->postReply(replyId);
87 }
88 return;
89 }
90
91 switch (msg->what()) {
92 case kWhatProcess: {
93 if (mRunning) {
94 if (thiz->processQueue()) {
95 (new AMessage(kWhatProcess, this))->post();
96 }
97 } else {
98 ALOGV("Ignore process message as we're not running");
99 }
100 break;
101 }
102 case kWhatInit: {
103 int32_t err = thiz->onInit();
104 Reply(msg, &err);
105 [[fallthrough]];
106 }
107 case kWhatStart: {
108 mRunning = true;
109 break;
110 }
111 case kWhatStop: {
112 int32_t err = thiz->onStop();
113 thiz->mOutputBlockPool.reset();
114 Reply(msg, &err);
115 break;
116 }
117 case kWhatReset: {
118 thiz->onReset();
119 thiz->mOutputBlockPool.reset();
120 mRunning = false;
121 Reply(msg);
122 break;
123 }
124 case kWhatRelease: {
125 thiz->onRelease();
126 thiz->mOutputBlockPool.reset();
127 mRunning = false;
128 Reply(msg);
129 break;
130 }
131 default: {
132 ALOGD("Unrecognized msg: %d", msg->what());
133 break;
134 }
135 }
136 }
137
138 class SimpleC2Component::BlockingBlockPool : public C2BlockPool {
139 public:
BlockingBlockPool(const std::shared_ptr<C2BlockPool> & base)140 BlockingBlockPool(const std::shared_ptr<C2BlockPool>& base): mBase{base} {}
141
getLocalId() const142 virtual local_id_t getLocalId() const override {
143 return mBase->getLocalId();
144 }
145
getAllocatorId() const146 virtual C2Allocator::id_t getAllocatorId() const override {
147 return mBase->getAllocatorId();
148 }
149
fetchLinearBlock(uint32_t capacity,C2MemoryUsage usage,std::shared_ptr<C2LinearBlock> * block)150 virtual c2_status_t fetchLinearBlock(
151 uint32_t capacity,
152 C2MemoryUsage usage,
153 std::shared_ptr<C2LinearBlock>* block) {
154 c2_status_t status;
155 do {
156 status = mBase->fetchLinearBlock(capacity, usage, block);
157 } while (status == C2_BLOCKING);
158 return status;
159 }
160
fetchCircularBlock(uint32_t capacity,C2MemoryUsage usage,std::shared_ptr<C2CircularBlock> * block)161 virtual c2_status_t fetchCircularBlock(
162 uint32_t capacity,
163 C2MemoryUsage usage,
164 std::shared_ptr<C2CircularBlock>* block) {
165 c2_status_t status;
166 do {
167 status = mBase->fetchCircularBlock(capacity, usage, block);
168 } while (status == C2_BLOCKING);
169 return status;
170 }
171
fetchGraphicBlock(uint32_t width,uint32_t height,uint32_t format,C2MemoryUsage usage,std::shared_ptr<C2GraphicBlock> * block)172 virtual c2_status_t fetchGraphicBlock(
173 uint32_t width, uint32_t height, uint32_t format,
174 C2MemoryUsage usage,
175 std::shared_ptr<C2GraphicBlock>* block) {
176 c2_status_t status;
177 do {
178 status = mBase->fetchGraphicBlock(width, height, format, usage,
179 block);
180 } while (status == C2_BLOCKING);
181 return status;
182 }
183
184 private:
185 std::shared_ptr<C2BlockPool> mBase;
186 };
187
188 ////////////////////////////////////////////////////////////////////////////////
189
190 namespace {
191
192 struct DummyReadView : public C2ReadView {
DummyReadViewandroid::__anon43dcb38c0111::DummyReadView193 DummyReadView() : C2ReadView(C2_NO_INIT) {}
194 };
195
196 } // namespace
197
SimpleC2Component(const std::shared_ptr<C2ComponentInterface> & intf)198 SimpleC2Component::SimpleC2Component(
199 const std::shared_ptr<C2ComponentInterface> &intf)
200 : mDummyReadView(DummyReadView()),
201 mIntf(intf),
202 mLooper(new ALooper),
203 mHandler(new WorkHandler) {
204 mLooper->setName(intf->getName().c_str());
205 (void)mLooper->registerHandler(mHandler);
206 mLooper->start(false, false, ANDROID_PRIORITY_VIDEO);
207 }
208
~SimpleC2Component()209 SimpleC2Component::~SimpleC2Component() {
210 mLooper->unregisterHandler(mHandler->id());
211 (void)mLooper->stop();
212 }
213
setListener_vb(const std::shared_ptr<C2Component::Listener> & listener,c2_blocking_t mayBlock)214 c2_status_t SimpleC2Component::setListener_vb(
215 const std::shared_ptr<C2Component::Listener> &listener, c2_blocking_t mayBlock) {
216 mHandler->setComponent(shared_from_this());
217
218 Mutexed<ExecState>::Locked state(mExecState);
219 if (state->mState == RUNNING) {
220 if (listener) {
221 return C2_BAD_STATE;
222 } else if (!mayBlock) {
223 return C2_BLOCKING;
224 }
225 }
226 state->mListener = listener;
227 // TODO: wait for listener change to have taken place before returning
228 // (e.g. if there is an ongoing listener callback)
229 return C2_OK;
230 }
231
queue_nb(std::list<std::unique_ptr<C2Work>> * const items)232 c2_status_t SimpleC2Component::queue_nb(std::list<std::unique_ptr<C2Work>> * const items) {
233 {
234 Mutexed<ExecState>::Locked state(mExecState);
235 if (state->mState != RUNNING) {
236 return C2_BAD_STATE;
237 }
238 }
239 bool queueWasEmpty = false;
240 {
241 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
242 queueWasEmpty = queue->empty();
243 while (!items->empty()) {
244 queue->push_back(std::move(items->front()));
245 items->pop_front();
246 }
247 }
248 if (queueWasEmpty) {
249 (new AMessage(WorkHandler::kWhatProcess, mHandler))->post();
250 }
251 return C2_OK;
252 }
253
announce_nb(const std::vector<C2WorkOutline> & items)254 c2_status_t SimpleC2Component::announce_nb(const std::vector<C2WorkOutline> &items) {
255 (void)items;
256 return C2_OMITTED;
257 }
258
flush_sm(flush_mode_t flushMode,std::list<std::unique_ptr<C2Work>> * const flushedWork)259 c2_status_t SimpleC2Component::flush_sm(
260 flush_mode_t flushMode, std::list<std::unique_ptr<C2Work>>* const flushedWork) {
261 (void)flushMode;
262 {
263 Mutexed<ExecState>::Locked state(mExecState);
264 if (state->mState != RUNNING) {
265 return C2_BAD_STATE;
266 }
267 }
268 {
269 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
270 queue->incGeneration();
271 // TODO: queue->splicedBy(flushedWork, flushedWork->end());
272 while (!queue->empty()) {
273 std::unique_ptr<C2Work> work = queue->pop_front();
274 if (work) {
275 flushedWork->push_back(std::move(work));
276 }
277 }
278 while (!queue->pending().empty()) {
279 flushedWork->push_back(std::move(queue->pending().begin()->second));
280 queue->pending().erase(queue->pending().begin());
281 }
282 }
283
284 return C2_OK;
285 }
286
drain_nb(drain_mode_t drainMode)287 c2_status_t SimpleC2Component::drain_nb(drain_mode_t drainMode) {
288 if (drainMode == DRAIN_CHAIN) {
289 return C2_OMITTED;
290 }
291 {
292 Mutexed<ExecState>::Locked state(mExecState);
293 if (state->mState != RUNNING) {
294 return C2_BAD_STATE;
295 }
296 }
297 bool queueWasEmpty = false;
298 {
299 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
300 queueWasEmpty = queue->empty();
301 queue->markDrain(drainMode);
302 }
303 if (queueWasEmpty) {
304 (new AMessage(WorkHandler::kWhatProcess, mHandler))->post();
305 }
306
307 return C2_OK;
308 }
309
start()310 c2_status_t SimpleC2Component::start() {
311 Mutexed<ExecState>::Locked state(mExecState);
312 if (state->mState == RUNNING) {
313 return C2_BAD_STATE;
314 }
315 bool needsInit = (state->mState == UNINITIALIZED);
316 state.unlock();
317 if (needsInit) {
318 sp<AMessage> reply;
319 (new AMessage(WorkHandler::kWhatInit, mHandler))->postAndAwaitResponse(&reply);
320 int32_t err;
321 CHECK(reply->findInt32("err", &err));
322 if (err != C2_OK) {
323 return (c2_status_t)err;
324 }
325 } else {
326 (new AMessage(WorkHandler::kWhatStart, mHandler))->post();
327 }
328 state.lock();
329 state->mState = RUNNING;
330 return C2_OK;
331 }
332
stop()333 c2_status_t SimpleC2Component::stop() {
334 ALOGV("stop");
335 {
336 Mutexed<ExecState>::Locked state(mExecState);
337 if (state->mState != RUNNING) {
338 return C2_BAD_STATE;
339 }
340 state->mState = STOPPED;
341 }
342 {
343 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
344 queue->clear();
345 queue->pending().clear();
346 }
347 sp<AMessage> reply;
348 (new AMessage(WorkHandler::kWhatStop, mHandler))->postAndAwaitResponse(&reply);
349 int32_t err;
350 CHECK(reply->findInt32("err", &err));
351 if (err != C2_OK) {
352 return (c2_status_t)err;
353 }
354 return C2_OK;
355 }
356
reset()357 c2_status_t SimpleC2Component::reset() {
358 ALOGV("reset");
359 {
360 Mutexed<ExecState>::Locked state(mExecState);
361 state->mState = UNINITIALIZED;
362 }
363 {
364 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
365 queue->clear();
366 queue->pending().clear();
367 }
368 sp<AMessage> reply;
369 (new AMessage(WorkHandler::kWhatReset, mHandler))->postAndAwaitResponse(&reply);
370 return C2_OK;
371 }
372
release()373 c2_status_t SimpleC2Component::release() {
374 ALOGV("release");
375 sp<AMessage> reply;
376 (new AMessage(WorkHandler::kWhatRelease, mHandler))->postAndAwaitResponse(&reply);
377 return C2_OK;
378 }
379
intf()380 std::shared_ptr<C2ComponentInterface> SimpleC2Component::intf() {
381 return mIntf;
382 }
383
384 namespace {
385
vec(std::unique_ptr<C2Work> & work)386 std::list<std::unique_ptr<C2Work>> vec(std::unique_ptr<C2Work> &work) {
387 std::list<std::unique_ptr<C2Work>> ret;
388 ret.push_back(std::move(work));
389 return ret;
390 }
391
392 } // namespace
393
finish(uint64_t frameIndex,std::function<void (const std::unique_ptr<C2Work> &)> fillWork)394 void SimpleC2Component::finish(
395 uint64_t frameIndex, std::function<void(const std::unique_ptr<C2Work> &)> fillWork) {
396 std::unique_ptr<C2Work> work;
397 {
398 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
399 if (queue->pending().count(frameIndex) == 0) {
400 ALOGW("unknown frame index: %" PRIu64, frameIndex);
401 return;
402 }
403 work = std::move(queue->pending().at(frameIndex));
404 queue->pending().erase(frameIndex);
405 }
406 if (work) {
407 fillWork(work);
408 std::shared_ptr<C2Component::Listener> listener = mExecState.lock()->mListener;
409 listener->onWorkDone_nb(shared_from_this(), vec(work));
410 ALOGV("returning pending work");
411 }
412 }
413
cloneAndSend(uint64_t frameIndex,const std::unique_ptr<C2Work> & currentWork,std::function<void (const std::unique_ptr<C2Work> &)> fillWork)414 void SimpleC2Component::cloneAndSend(
415 uint64_t frameIndex,
416 const std::unique_ptr<C2Work> ¤tWork,
417 std::function<void(const std::unique_ptr<C2Work> &)> fillWork) {
418 std::unique_ptr<C2Work> work(new C2Work);
419 if (currentWork->input.ordinal.frameIndex == frameIndex) {
420 work->input.flags = currentWork->input.flags;
421 work->input.ordinal = currentWork->input.ordinal;
422 } else {
423 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
424 if (queue->pending().count(frameIndex) == 0) {
425 ALOGW("unknown frame index: %" PRIu64, frameIndex);
426 return;
427 }
428 work->input.flags = queue->pending().at(frameIndex)->input.flags;
429 work->input.ordinal = queue->pending().at(frameIndex)->input.ordinal;
430 }
431 work->worklets.emplace_back(new C2Worklet);
432 if (work) {
433 fillWork(work);
434 std::shared_ptr<C2Component::Listener> listener = mExecState.lock()->mListener;
435 listener->onWorkDone_nb(shared_from_this(), vec(work));
436 ALOGV("cloned and sending work");
437 }
438 }
439
processQueue()440 bool SimpleC2Component::processQueue() {
441 std::unique_ptr<C2Work> work;
442 uint64_t generation;
443 int32_t drainMode;
444 bool isFlushPending = false;
445 bool hasQueuedWork = false;
446 {
447 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
448 if (queue->empty()) {
449 return false;
450 }
451
452 generation = queue->generation();
453 drainMode = queue->drainMode();
454 isFlushPending = queue->popPendingFlush();
455 work = queue->pop_front();
456 hasQueuedWork = !queue->empty();
457 }
458 if (isFlushPending) {
459 ALOGV("processing pending flush");
460 c2_status_t err = onFlush_sm();
461 if (err != C2_OK) {
462 ALOGD("flush err: %d", err);
463 // TODO: error
464 }
465 }
466
467 if (!mOutputBlockPool) {
468 c2_status_t err = [this] {
469 // TODO: don't use query_vb
470 C2StreamBufferTypeSetting::output outputFormat(0u);
471 std::vector<std::unique_ptr<C2Param>> params;
472 c2_status_t err = intf()->query_vb(
473 { &outputFormat },
474 { C2PortBlockPoolsTuning::output::PARAM_TYPE },
475 C2_DONT_BLOCK,
476 ¶ms);
477 if (err != C2_OK && err != C2_BAD_INDEX) {
478 ALOGD("query err = %d", err);
479 return err;
480 }
481 C2BlockPool::local_id_t poolId =
482 outputFormat.value == C2BufferData::GRAPHIC
483 ? C2BlockPool::BASIC_GRAPHIC
484 : C2BlockPool::BASIC_LINEAR;
485 if (params.size()) {
486 C2PortBlockPoolsTuning::output *outputPools =
487 C2PortBlockPoolsTuning::output::From(params[0].get());
488 if (outputPools && outputPools->flexCount() >= 1) {
489 poolId = outputPools->m.values[0];
490 }
491 }
492
493 std::shared_ptr<C2BlockPool> blockPool;
494 err = GetCodec2BlockPool(poolId, shared_from_this(), &blockPool);
495 ALOGD("Using output block pool with poolID %llu => got %llu - %d",
496 (unsigned long long)poolId,
497 (unsigned long long)(
498 blockPool ? blockPool->getLocalId() : 111000111),
499 err);
500 if (err == C2_OK) {
501 mOutputBlockPool = std::make_shared<BlockingBlockPool>(blockPool);
502 }
503 return err;
504 }();
505 if (err != C2_OK) {
506 Mutexed<ExecState>::Locked state(mExecState);
507 std::shared_ptr<C2Component::Listener> listener = state->mListener;
508 state.unlock();
509 listener->onError_nb(shared_from_this(), err);
510 return hasQueuedWork;
511 }
512 }
513
514 if (!work) {
515 c2_status_t err = drain(drainMode, mOutputBlockPool);
516 if (err != C2_OK) {
517 Mutexed<ExecState>::Locked state(mExecState);
518 std::shared_ptr<C2Component::Listener> listener = state->mListener;
519 state.unlock();
520 listener->onError_nb(shared_from_this(), err);
521 }
522 return hasQueuedWork;
523 }
524
525 {
526 std::vector<C2Param *> updates;
527 for (const std::unique_ptr<C2Param> ¶m: work->input.configUpdate) {
528 if (param) {
529 updates.emplace_back(param.get());
530 }
531 }
532 if (!updates.empty()) {
533 std::vector<std::unique_ptr<C2SettingResult>> failures;
534 c2_status_t err = intf()->config_vb(updates, C2_MAY_BLOCK, &failures);
535 ALOGD("applied %zu configUpdates => %s (%d)", updates.size(), asString(err), err);
536 }
537 }
538
539 ALOGV("start processing frame #%" PRIu64, work->input.ordinal.frameIndex.peeku());
540 // If input buffer list is not empty, it means we have some input to process on.
541 // However, input could be a null buffer. In such case, clear the buffer list
542 // before making call to process().
543 if (!work->input.buffers.empty() && !work->input.buffers[0]) {
544 ALOGD("Encountered null input buffer. Clearing the input buffer");
545 work->input.buffers.clear();
546 }
547 process(work, mOutputBlockPool);
548 ALOGV("processed frame #%" PRIu64, work->input.ordinal.frameIndex.peeku());
549 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
550 if (queue->generation() != generation) {
551 ALOGD("work form old generation: was %" PRIu64 " now %" PRIu64,
552 queue->generation(), generation);
553 work->result = C2_NOT_FOUND;
554 queue.unlock();
555
556 Mutexed<ExecState>::Locked state(mExecState);
557 std::shared_ptr<C2Component::Listener> listener = state->mListener;
558 state.unlock();
559 listener->onWorkDone_nb(shared_from_this(), vec(work));
560 return hasQueuedWork;
561 }
562 if (work->workletsProcessed != 0u) {
563 queue.unlock();
564 Mutexed<ExecState>::Locked state(mExecState);
565 ALOGV("returning this work");
566 std::shared_ptr<C2Component::Listener> listener = state->mListener;
567 state.unlock();
568 listener->onWorkDone_nb(shared_from_this(), vec(work));
569 } else {
570 ALOGV("queue pending work");
571 work->input.buffers.clear();
572 std::unique_ptr<C2Work> unexpected;
573
574 uint64_t frameIndex = work->input.ordinal.frameIndex.peeku();
575 if (queue->pending().count(frameIndex) != 0) {
576 unexpected = std::move(queue->pending().at(frameIndex));
577 queue->pending().erase(frameIndex);
578 }
579 (void)queue->pending().insert({ frameIndex, std::move(work) });
580
581 queue.unlock();
582 if (unexpected) {
583 ALOGD("unexpected pending work");
584 unexpected->result = C2_CORRUPTED;
585 Mutexed<ExecState>::Locked state(mExecState);
586 std::shared_ptr<C2Component::Listener> listener = state->mListener;
587 state.unlock();
588 listener->onWorkDone_nb(shared_from_this(), vec(unexpected));
589 }
590 }
591 return hasQueuedWork;
592 }
593
createLinearBuffer(const std::shared_ptr<C2LinearBlock> & block,size_t offset,size_t size)594 std::shared_ptr<C2Buffer> SimpleC2Component::createLinearBuffer(
595 const std::shared_ptr<C2LinearBlock> &block, size_t offset, size_t size) {
596 return C2Buffer::CreateLinearBuffer(block->share(offset, size, ::C2Fence()));
597 }
598
createGraphicBuffer(const std::shared_ptr<C2GraphicBlock> & block,const C2Rect & crop)599 std::shared_ptr<C2Buffer> SimpleC2Component::createGraphicBuffer(
600 const std::shared_ptr<C2GraphicBlock> &block, const C2Rect &crop) {
601 return C2Buffer::CreateGraphicBuffer(block->share(crop, ::C2Fence()));
602 }
603
604 } // namespace android
605