1 /**
2 * Copyright (C) 2022 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <StreamScheduler.h>
18 #include <ImsMediaTrace.h>
19 #include <stdint.h>
20 #include <chrono>
21 #include <thread>
22 #include <algorithm>
23
24 using namespace std::chrono;
25
26 #define RUN_WAIT_TIMEOUT_MS 1
27 #define STOP_WAIT_TIMEOUT_MS 1000
28
StreamScheduler()29 StreamScheduler::StreamScheduler() {}
30
~StreamScheduler()31 StreamScheduler::~StreamScheduler()
32 {
33 Stop();
34 }
35
RegisterNode(BaseNode * pNode)36 void StreamScheduler::RegisterNode(BaseNode* pNode)
37 {
38 if (pNode == nullptr)
39 {
40 return;
41 }
42
43 IMLOGD2("[RegisterNode] [%p], node[%s]", this, pNode->GetNodeName());
44 std::lock_guard<std::mutex> guard(mMutex);
45 mlistRegisteredNode.push_back(pNode);
46 }
47
DeRegisterNode(BaseNode * pNode)48 void StreamScheduler::DeRegisterNode(BaseNode* pNode)
49 {
50 if (pNode == nullptr)
51 {
52 return;
53 }
54
55 IMLOGD2("[DeRegisterNode] [%p], node[%s]", this, pNode->GetNodeName());
56 std::lock_guard<std::mutex> guard(mMutex);
57 mlistRegisteredNode.remove(pNode);
58 }
59
Start()60 void StreamScheduler::Start()
61 {
62 IMLOGD1("[Start] [%p] enter", this);
63
64 for (auto& node : mlistRegisteredNode)
65 {
66 if (node != nullptr)
67 {
68 IMLOGD2("[Start] [%p] registered node [%s]", this, node->GetNodeName());
69 }
70 }
71
72 if (!mlistRegisteredNode.empty())
73 {
74 IMLOGD1("[Start] [%p] Start thread", this);
75 StartThread();
76 }
77
78 IMLOGD1("[Start] [%p] exit", this);
79 }
80
Stop()81 void StreamScheduler::Stop()
82 {
83 IMLOGD1("[Stop] [%p] enter", this);
84
85 if (!IsThreadStopped())
86 {
87 StopThread();
88 Awake();
89 mConditionExit.wait_timeout(STOP_WAIT_TIMEOUT_MS);
90 }
91
92 IMLOGD1("[Stop] [%p] exit", this);
93 }
94
Awake()95 void StreamScheduler::Awake()
96 {
97 mConditionMain.signal();
98 }
99
RunRegisteredNode()100 void StreamScheduler::RunRegisteredNode()
101 {
102 // the list to contain non-source type node
103 std::list<BaseNode*> listNodesToRun;
104
105 for (auto& node : mlistRegisteredNode)
106 {
107 if (node != nullptr && node->GetState() == kNodeStateRunning && !node->IsRunTime())
108 {
109 if (node->IsSourceNode()) // process the source node
110 {
111 node->ProcessData();
112 }
113 else if (node->GetDataCount() > 0)
114 {
115 listNodesToRun.push_back(node); // store node to run
116 }
117 }
118 }
119
120 while (!listNodesToRun.empty())
121 {
122 std::list<BaseNode*>::iterator maxNode =
123 std::max_element(listNodesToRun.begin(), listNodesToRun.end(),
124 [=](BaseNode* a, BaseNode* b)
125 {
126 return a->GetDataCount() < b->GetDataCount();
127 });
128
129 if (maxNode == listNodesToRun.end())
130 {
131 break;
132 }
133
134 (*maxNode)->ProcessData(); // process the non runtime node
135
136 if (IsThreadStopped())
137 {
138 break;
139 }
140
141 listNodesToRun.remove(*maxNode);
142 };
143 }
144
run()145 void* StreamScheduler::run()
146 {
147 IMLOGD1("[run] [%p] enter", this);
148
149 // start nodes
150 mMutex.lock();
151
152 for (auto& node : mlistRegisteredNode)
153 {
154 if (node != nullptr && !node->IsRunTimeStart())
155 {
156 if (node->GetState() == kNodeStateStopped && node->ProcessStart() != RESULT_SUCCESS)
157 {
158 // TODO: report error
159 IMLOGE0("[run] error");
160 }
161 }
162 }
163
164 mMutex.unlock();
165
166 while (!IsThreadStopped())
167 {
168 mMutex.lock();
169 RunRegisteredNode();
170 mMutex.unlock();
171
172 if (IsThreadStopped())
173 {
174 break;
175 }
176
177 mConditionMain.wait_timeout(RUN_WAIT_TIMEOUT_MS);
178 }
179
180 mConditionExit.signal();
181 IMLOGD1("[run] [%p] exit", this);
182 return nullptr;
183 }