1 /*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "SimpleC2Component"
19 #include <log/log.h>
20
21 #include <cutils/properties.h>
22 #include <media/stagefright/foundation/AMessage.h>
23
24 #include <inttypes.h>
25
26 #include <C2Config.h>
27 #include <C2Debug.h>
28 #include <C2PlatformSupport.h>
29 #include <SimpleC2Component.h>
30
31 namespace android {
32
pop_front()33 std::unique_ptr<C2Work> SimpleC2Component::WorkQueue::pop_front() {
34 std::unique_ptr<C2Work> work = std::move(mQueue.front().work);
35 mQueue.pop_front();
36 return work;
37 }
38
push_back(std::unique_ptr<C2Work> work)39 void SimpleC2Component::WorkQueue::push_back(std::unique_ptr<C2Work> work) {
40 mQueue.push_back({ std::move(work), NO_DRAIN });
41 }
42
empty() const43 bool SimpleC2Component::WorkQueue::empty() const {
44 return mQueue.empty();
45 }
46
clear()47 void SimpleC2Component::WorkQueue::clear() {
48 mQueue.clear();
49 }
50
drainMode() const51 uint32_t SimpleC2Component::WorkQueue::drainMode() const {
52 return mQueue.front().drainMode;
53 }
54
markDrain(uint32_t drainMode)55 void SimpleC2Component::WorkQueue::markDrain(uint32_t drainMode) {
56 mQueue.push_back({ nullptr, drainMode });
57 }
58
59 ////////////////////////////////////////////////////////////////////////////////
60
WorkHandler()61 SimpleC2Component::WorkHandler::WorkHandler() : mRunning(false) {}
62
setComponent(const std::shared_ptr<SimpleC2Component> & thiz)63 void SimpleC2Component::WorkHandler::setComponent(
64 const std::shared_ptr<SimpleC2Component> &thiz) {
65 mThiz = thiz;
66 }
67
Reply(const sp<AMessage> & msg,int32_t * err=nullptr)68 static void Reply(const sp<AMessage> &msg, int32_t *err = nullptr) {
69 sp<AReplyToken> replyId;
70 CHECK(msg->senderAwaitsResponse(&replyId));
71 sp<AMessage> reply = new AMessage;
72 if (err) {
73 reply->setInt32("err", *err);
74 }
75 reply->postReply(replyId);
76 }
77
onMessageReceived(const sp<AMessage> & msg)78 void SimpleC2Component::WorkHandler::onMessageReceived(const sp<AMessage> &msg) {
79 std::shared_ptr<SimpleC2Component> thiz = mThiz.lock();
80 if (!thiz) {
81 ALOGD("component not yet set; msg = %s", msg->debugString().c_str());
82 sp<AReplyToken> replyId;
83 if (msg->senderAwaitsResponse(&replyId)) {
84 sp<AMessage> reply = new AMessage;
85 reply->setInt32("err", C2_CORRUPTED);
86 reply->postReply(replyId);
87 }
88 return;
89 }
90
91 switch (msg->what()) {
92 case kWhatProcess: {
93 if (mRunning) {
94 if (thiz->processQueue()) {
95 (new AMessage(kWhatProcess, this))->post();
96 }
97 } else {
98 ALOGV("Ignore process message as we're not running");
99 }
100 break;
101 }
102 case kWhatInit: {
103 int32_t err = thiz->onInit();
104 Reply(msg, &err);
105 [[fallthrough]];
106 }
107 case kWhatStart: {
108 mRunning = true;
109 break;
110 }
111 case kWhatStop: {
112 int32_t err = thiz->onStop();
113 Reply(msg, &err);
114 break;
115 }
116 case kWhatReset: {
117 thiz->onReset();
118 mRunning = false;
119 Reply(msg);
120 break;
121 }
122 case kWhatRelease: {
123 thiz->onRelease();
124 mRunning = false;
125 Reply(msg);
126 break;
127 }
128 default: {
129 ALOGD("Unrecognized msg: %d", msg->what());
130 break;
131 }
132 }
133 }
134
135 ////////////////////////////////////////////////////////////////////////////////
136
137 namespace {
138
139 struct DummyReadView : public C2ReadView {
DummyReadViewandroid::__anon2b9191930111::DummyReadView140 DummyReadView() : C2ReadView(C2_NO_INIT) {}
141 };
142
143 } // namespace
144
SimpleC2Component(const std::shared_ptr<C2ComponentInterface> & intf)145 SimpleC2Component::SimpleC2Component(
146 const std::shared_ptr<C2ComponentInterface> &intf)
147 : mDummyReadView(DummyReadView()),
148 mIntf(intf),
149 mLooper(new ALooper),
150 mHandler(new WorkHandler) {
151 mLooper->setName(intf->getName().c_str());
152 (void)mLooper->registerHandler(mHandler);
153 mLooper->start(false, false, ANDROID_PRIORITY_VIDEO);
154 }
155
~SimpleC2Component()156 SimpleC2Component::~SimpleC2Component() {
157 mLooper->unregisterHandler(mHandler->id());
158 (void)mLooper->stop();
159 }
160
setListener_vb(const std::shared_ptr<C2Component::Listener> & listener,c2_blocking_t mayBlock)161 c2_status_t SimpleC2Component::setListener_vb(
162 const std::shared_ptr<C2Component::Listener> &listener, c2_blocking_t mayBlock) {
163 mHandler->setComponent(shared_from_this());
164
165 Mutexed<ExecState>::Locked state(mExecState);
166 if (state->mState == RUNNING) {
167 if (listener) {
168 return C2_BAD_STATE;
169 } else if (!mayBlock) {
170 return C2_BLOCKING;
171 }
172 }
173 state->mListener = listener;
174 // TODO: wait for listener change to have taken place before returning
175 // (e.g. if there is an ongoing listener callback)
176 return C2_OK;
177 }
178
queue_nb(std::list<std::unique_ptr<C2Work>> * const items)179 c2_status_t SimpleC2Component::queue_nb(std::list<std::unique_ptr<C2Work>> * const items) {
180 {
181 Mutexed<ExecState>::Locked state(mExecState);
182 if (state->mState != RUNNING) {
183 return C2_BAD_STATE;
184 }
185 }
186 bool queueWasEmpty = false;
187 {
188 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
189 queueWasEmpty = queue->empty();
190 while (!items->empty()) {
191 queue->push_back(std::move(items->front()));
192 items->pop_front();
193 }
194 }
195 if (queueWasEmpty) {
196 (new AMessage(WorkHandler::kWhatProcess, mHandler))->post();
197 }
198 return C2_OK;
199 }
200
announce_nb(const std::vector<C2WorkOutline> & items)201 c2_status_t SimpleC2Component::announce_nb(const std::vector<C2WorkOutline> &items) {
202 (void)items;
203 return C2_OMITTED;
204 }
205
flush_sm(flush_mode_t flushMode,std::list<std::unique_ptr<C2Work>> * const flushedWork)206 c2_status_t SimpleC2Component::flush_sm(
207 flush_mode_t flushMode, std::list<std::unique_ptr<C2Work>>* const flushedWork) {
208 (void)flushMode;
209 {
210 Mutexed<ExecState>::Locked state(mExecState);
211 if (state->mState != RUNNING) {
212 return C2_BAD_STATE;
213 }
214 }
215 {
216 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
217 queue->incGeneration();
218 // TODO: queue->splicedBy(flushedWork, flushedWork->end());
219 while (!queue->empty()) {
220 std::unique_ptr<C2Work> work = queue->pop_front();
221 if (work) {
222 flushedWork->push_back(std::move(work));
223 }
224 }
225 }
226 {
227 Mutexed<PendingWork>::Locked pending(mPendingWork);
228 while (!pending->empty()) {
229 flushedWork->push_back(std::move(pending->begin()->second));
230 pending->erase(pending->begin());
231 }
232 }
233
234 return C2_OK;
235 }
236
drain_nb(drain_mode_t drainMode)237 c2_status_t SimpleC2Component::drain_nb(drain_mode_t drainMode) {
238 if (drainMode == DRAIN_CHAIN) {
239 return C2_OMITTED;
240 }
241 {
242 Mutexed<ExecState>::Locked state(mExecState);
243 if (state->mState != RUNNING) {
244 return C2_BAD_STATE;
245 }
246 }
247 bool queueWasEmpty = false;
248 {
249 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
250 queueWasEmpty = queue->empty();
251 queue->markDrain(drainMode);
252 }
253 if (queueWasEmpty) {
254 (new AMessage(WorkHandler::kWhatProcess, mHandler))->post();
255 }
256
257 return C2_OK;
258 }
259
start()260 c2_status_t SimpleC2Component::start() {
261 Mutexed<ExecState>::Locked state(mExecState);
262 if (state->mState == RUNNING) {
263 return C2_BAD_STATE;
264 }
265 bool needsInit = (state->mState == UNINITIALIZED);
266 state.unlock();
267 if (needsInit) {
268 sp<AMessage> reply;
269 (new AMessage(WorkHandler::kWhatInit, mHandler))->postAndAwaitResponse(&reply);
270 int32_t err;
271 CHECK(reply->findInt32("err", &err));
272 if (err != C2_OK) {
273 return (c2_status_t)err;
274 }
275 } else {
276 (new AMessage(WorkHandler::kWhatStart, mHandler))->post();
277 }
278 state.lock();
279 state->mState = RUNNING;
280 return C2_OK;
281 }
282
stop()283 c2_status_t SimpleC2Component::stop() {
284 ALOGV("stop");
285 {
286 Mutexed<ExecState>::Locked state(mExecState);
287 if (state->mState != RUNNING) {
288 return C2_BAD_STATE;
289 }
290 state->mState = STOPPED;
291 }
292 {
293 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
294 queue->clear();
295 }
296 {
297 Mutexed<PendingWork>::Locked pending(mPendingWork);
298 pending->clear();
299 }
300 sp<AMessage> reply;
301 (new AMessage(WorkHandler::kWhatStop, mHandler))->postAndAwaitResponse(&reply);
302 int32_t err;
303 CHECK(reply->findInt32("err", &err));
304 if (err != C2_OK) {
305 return (c2_status_t)err;
306 }
307 return C2_OK;
308 }
309
reset()310 c2_status_t SimpleC2Component::reset() {
311 ALOGV("reset");
312 {
313 Mutexed<ExecState>::Locked state(mExecState);
314 state->mState = UNINITIALIZED;
315 }
316 {
317 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
318 queue->clear();
319 }
320 {
321 Mutexed<PendingWork>::Locked pending(mPendingWork);
322 pending->clear();
323 }
324 sp<AMessage> reply;
325 (new AMessage(WorkHandler::kWhatReset, mHandler))->postAndAwaitResponse(&reply);
326 return C2_OK;
327 }
328
release()329 c2_status_t SimpleC2Component::release() {
330 ALOGV("release");
331 sp<AMessage> reply;
332 (new AMessage(WorkHandler::kWhatRelease, mHandler))->postAndAwaitResponse(&reply);
333 return C2_OK;
334 }
335
intf()336 std::shared_ptr<C2ComponentInterface> SimpleC2Component::intf() {
337 return mIntf;
338 }
339
340 namespace {
341
vec(std::unique_ptr<C2Work> & work)342 std::list<std::unique_ptr<C2Work>> vec(std::unique_ptr<C2Work> &work) {
343 std::list<std::unique_ptr<C2Work>> ret;
344 ret.push_back(std::move(work));
345 return ret;
346 }
347
348 } // namespace
349
finish(uint64_t frameIndex,std::function<void (const std::unique_ptr<C2Work> &)> fillWork)350 void SimpleC2Component::finish(
351 uint64_t frameIndex, std::function<void(const std::unique_ptr<C2Work> &)> fillWork) {
352 std::unique_ptr<C2Work> work;
353 {
354 Mutexed<PendingWork>::Locked pending(mPendingWork);
355 if (pending->count(frameIndex) == 0) {
356 ALOGW("unknown frame index: %" PRIu64, frameIndex);
357 return;
358 }
359 work = std::move(pending->at(frameIndex));
360 pending->erase(frameIndex);
361 }
362 if (work) {
363 fillWork(work);
364 std::shared_ptr<C2Component::Listener> listener = mExecState.lock()->mListener;
365 listener->onWorkDone_nb(shared_from_this(), vec(work));
366 ALOGV("returning pending work");
367 }
368 }
369
cloneAndSend(uint64_t frameIndex,const std::unique_ptr<C2Work> & currentWork,std::function<void (const std::unique_ptr<C2Work> &)> fillWork)370 void SimpleC2Component::cloneAndSend(
371 uint64_t frameIndex,
372 const std::unique_ptr<C2Work> ¤tWork,
373 std::function<void(const std::unique_ptr<C2Work> &)> fillWork) {
374 std::unique_ptr<C2Work> work(new C2Work);
375 if (currentWork->input.ordinal.frameIndex == frameIndex) {
376 work->input.flags = currentWork->input.flags;
377 work->input.ordinal = currentWork->input.ordinal;
378 } else {
379 Mutexed<PendingWork>::Locked pending(mPendingWork);
380 if (pending->count(frameIndex) == 0) {
381 ALOGW("unknown frame index: %" PRIu64, frameIndex);
382 return;
383 }
384 work->input.flags = pending->at(frameIndex)->input.flags;
385 work->input.ordinal = pending->at(frameIndex)->input.ordinal;
386 }
387 work->worklets.emplace_back(new C2Worklet);
388 if (work) {
389 fillWork(work);
390 std::shared_ptr<C2Component::Listener> listener = mExecState.lock()->mListener;
391 listener->onWorkDone_nb(shared_from_this(), vec(work));
392 ALOGV("cloned and sending work");
393 }
394 }
395
processQueue()396 bool SimpleC2Component::processQueue() {
397 std::unique_ptr<C2Work> work;
398 uint64_t generation;
399 int32_t drainMode;
400 bool isFlushPending = false;
401 bool hasQueuedWork = false;
402 {
403 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
404 if (queue->empty()) {
405 return false;
406 }
407
408 generation = queue->generation();
409 drainMode = queue->drainMode();
410 isFlushPending = queue->popPendingFlush();
411 work = queue->pop_front();
412 hasQueuedWork = !queue->empty();
413 }
414 if (isFlushPending) {
415 ALOGV("processing pending flush");
416 c2_status_t err = onFlush_sm();
417 if (err != C2_OK) {
418 ALOGD("flush err: %d", err);
419 // TODO: error
420 }
421 }
422
423 if (!mOutputBlockPool) {
424 c2_status_t err = [this] {
425 // TODO: don't use query_vb
426 C2StreamFormatConfig::output outputFormat(0u);
427 std::vector<std::unique_ptr<C2Param>> params;
428 c2_status_t err = intf()->query_vb(
429 { &outputFormat },
430 { C2PortBlockPoolsTuning::output::PARAM_TYPE },
431 C2_DONT_BLOCK,
432 ¶ms);
433 if (err != C2_OK && err != C2_BAD_INDEX) {
434 ALOGD("query err = %d", err);
435 return err;
436 }
437 C2BlockPool::local_id_t poolId =
438 outputFormat.value == C2FormatVideo
439 ? C2BlockPool::BASIC_GRAPHIC
440 : C2BlockPool::BASIC_LINEAR;
441 if (params.size()) {
442 C2PortBlockPoolsTuning::output *outputPools =
443 C2PortBlockPoolsTuning::output::From(params[0].get());
444 if (outputPools && outputPools->flexCount() >= 1) {
445 poolId = outputPools->m.values[0];
446 }
447 }
448
449 err = GetCodec2BlockPool(poolId, shared_from_this(), &mOutputBlockPool);
450 ALOGD("Using output block pool with poolID %llu => got %llu - %d",
451 (unsigned long long)poolId,
452 (unsigned long long)(
453 mOutputBlockPool ? mOutputBlockPool->getLocalId() : 111000111),
454 err);
455 return err;
456 }();
457 if (err != C2_OK) {
458 Mutexed<ExecState>::Locked state(mExecState);
459 std::shared_ptr<C2Component::Listener> listener = state->mListener;
460 state.unlock();
461 listener->onError_nb(shared_from_this(), err);
462 return hasQueuedWork;
463 }
464 }
465
466 if (!work) {
467 c2_status_t err = drain(drainMode, mOutputBlockPool);
468 if (err != C2_OK) {
469 Mutexed<ExecState>::Locked state(mExecState);
470 std::shared_ptr<C2Component::Listener> listener = state->mListener;
471 state.unlock();
472 listener->onError_nb(shared_from_this(), err);
473 }
474 return hasQueuedWork;
475 }
476
477 {
478 std::vector<C2Param *> updates;
479 for (const std::unique_ptr<C2Param> ¶m: work->input.configUpdate) {
480 if (param) {
481 updates.emplace_back(param.get());
482 }
483 }
484 if (!updates.empty()) {
485 std::vector<std::unique_ptr<C2SettingResult>> failures;
486 c2_status_t err = intf()->config_vb(updates, C2_MAY_BLOCK, &failures);
487 ALOGD("applied %zu configUpdates => %s (%d)", updates.size(), asString(err), err);
488 }
489 }
490
491 ALOGV("start processing frame #%" PRIu64, work->input.ordinal.frameIndex.peeku());
492 // If input buffer list is not empty, it means we have some input to process on.
493 // However, input could be a null buffer. In such case, clear the buffer list
494 // before making call to process().
495 if (!work->input.buffers.empty() && !work->input.buffers[0]) {
496 ALOGD("Encountered null input buffer. Clearing the input buffer");
497 work->input.buffers.clear();
498 }
499 process(work, mOutputBlockPool);
500 ALOGV("processed frame #%" PRIu64, work->input.ordinal.frameIndex.peeku());
501 {
502 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
503 if (queue->generation() != generation) {
504 ALOGD("work form old generation: was %" PRIu64 " now %" PRIu64,
505 queue->generation(), generation);
506 work->result = C2_NOT_FOUND;
507 queue.unlock();
508 {
509 Mutexed<ExecState>::Locked state(mExecState);
510 std::shared_ptr<C2Component::Listener> listener = state->mListener;
511 state.unlock();
512 listener->onWorkDone_nb(shared_from_this(), vec(work));
513 }
514 queue.lock();
515 return hasQueuedWork;
516 }
517 }
518 if (work->workletsProcessed != 0u) {
519 Mutexed<ExecState>::Locked state(mExecState);
520 ALOGV("returning this work");
521 std::shared_ptr<C2Component::Listener> listener = state->mListener;
522 state.unlock();
523 listener->onWorkDone_nb(shared_from_this(), vec(work));
524 } else {
525 ALOGV("queue pending work");
526 work->input.buffers.clear();
527 std::unique_ptr<C2Work> unexpected;
528 {
529 Mutexed<PendingWork>::Locked pending(mPendingWork);
530 uint64_t frameIndex = work->input.ordinal.frameIndex.peeku();
531 if (pending->count(frameIndex) != 0) {
532 unexpected = std::move(pending->at(frameIndex));
533 pending->erase(frameIndex);
534 }
535 (void)pending->insert({ frameIndex, std::move(work) });
536 }
537 if (unexpected) {
538 ALOGD("unexpected pending work");
539 unexpected->result = C2_CORRUPTED;
540 Mutexed<ExecState>::Locked state(mExecState);
541 std::shared_ptr<C2Component::Listener> listener = state->mListener;
542 state.unlock();
543 listener->onWorkDone_nb(shared_from_this(), vec(unexpected));
544 }
545 }
546 return hasQueuedWork;
547 }
548
createLinearBuffer(const std::shared_ptr<C2LinearBlock> & block)549 std::shared_ptr<C2Buffer> SimpleC2Component::createLinearBuffer(
550 const std::shared_ptr<C2LinearBlock> &block) {
551 return createLinearBuffer(block, block->offset(), block->size());
552 }
553
createLinearBuffer(const std::shared_ptr<C2LinearBlock> & block,size_t offset,size_t size)554 std::shared_ptr<C2Buffer> SimpleC2Component::createLinearBuffer(
555 const std::shared_ptr<C2LinearBlock> &block, size_t offset, size_t size) {
556 return C2Buffer::CreateLinearBuffer(block->share(offset, size, ::C2Fence()));
557 }
558
createGraphicBuffer(const std::shared_ptr<C2GraphicBlock> & block)559 std::shared_ptr<C2Buffer> SimpleC2Component::createGraphicBuffer(
560 const std::shared_ptr<C2GraphicBlock> &block) {
561 return createGraphicBuffer(block, C2Rect(block->width(), block->height()));
562 }
563
createGraphicBuffer(const std::shared_ptr<C2GraphicBlock> & block,const C2Rect & crop)564 std::shared_ptr<C2Buffer> SimpleC2Component::createGraphicBuffer(
565 const std::shared_ptr<C2GraphicBlock> &block, const C2Rect &crop) {
566 return C2Buffer::CreateGraphicBuffer(block->share(crop, ::C2Fence()));
567 }
568
569 } // namespace android
570