• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/**
 * Copyright (C) 2022 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
#include <StreamScheduler.h>
#include <ImsMediaTrace.h>
#include <stdint.h>
#include <algorithm>
#include <chrono>
#include <mutex>
#include <thread>
23 
using namespace std::chrono;

// Timeout, in milliseconds, passed to mConditionMain.wait_timeout() between two
// passes of the scheduler loop in run().
#define RUN_WAIT_TIMEOUT_MS 1
// Timeout, in milliseconds, passed to mConditionExit.wait_timeout() by Stop()
// while it waits for run() to signal that it has left its loop.
#define STOP_WAIT_TIMEOUT_MS 1000
28 
StreamScheduler()29 StreamScheduler::StreamScheduler() {}
30 
~StreamScheduler()31 StreamScheduler::~StreamScheduler()
32 {
33     Stop();
34 }
35 
RegisterNode(BaseNode * pNode)36 void StreamScheduler::RegisterNode(BaseNode* pNode)
37 {
38     if (pNode == nullptr)
39     {
40         return;
41     }
42 
43     IMLOGD2("[RegisterNode] [%p], node[%s]", this, pNode->GetNodeName());
44     std::lock_guard<std::mutex> guard(mMutex);
45     mlistRegisteredNode.push_back(pNode);
46 }
47 
DeRegisterNode(BaseNode * pNode)48 void StreamScheduler::DeRegisterNode(BaseNode* pNode)
49 {
50     if (pNode == nullptr)
51     {
52         return;
53     }
54 
55     IMLOGD2("[DeRegisterNode] [%p], node[%s]", this, pNode->GetNodeName());
56     std::lock_guard<std::mutex> guard(mMutex);
57     mlistRegisteredNode.remove(pNode);
58 }
59 
Start()60 void StreamScheduler::Start()
61 {
62     IMLOGD1("[Start] [%p] enter", this);
63 
64     for (auto& node : mlistRegisteredNode)
65     {
66         if (node != nullptr)
67         {
68             IMLOGD2("[Start] [%p] registered node [%s]", this, node->GetNodeName());
69         }
70     }
71 
72     if (!mlistRegisteredNode.empty())
73     {
74         IMLOGD1("[Start] [%p] Start thread", this);
75         StartThread();
76     }
77 
78     IMLOGD1("[Start] [%p] exit", this);
79 }
80 
Stop()81 void StreamScheduler::Stop()
82 {
83     IMLOGD1("[Stop] [%p] enter", this);
84 
85     if (!IsThreadStopped())
86     {
87         StopThread();
88         Awake();
89         mConditionExit.wait_timeout(STOP_WAIT_TIMEOUT_MS);
90     }
91 
92     IMLOGD1("[Stop] [%p] exit", this);
93 }
94 
Awake()95 void StreamScheduler::Awake()
96 {
97     mConditionMain.signal();
98 }
99 
RunRegisteredNode()100 void StreamScheduler::RunRegisteredNode()
101 {
102     // the list to contain non-source type node
103     std::list<BaseNode*> listNodesToRun;
104 
105     for (auto& node : mlistRegisteredNode)
106     {
107         if (node != nullptr && node->GetState() == kNodeStateRunning && !node->IsRunTime())
108         {
109             if (node->IsSourceNode())  // process the source node
110             {
111                 node->ProcessData();
112             }
113             else if (node->GetDataCount() > 0)
114             {
115                 listNodesToRun.push_back(node);  // store node to run
116             }
117         }
118     }
119 
120     while (!listNodesToRun.empty())
121     {
122         std::list<BaseNode*>::iterator maxNode =
123                 std::max_element(listNodesToRun.begin(), listNodesToRun.end(),
124                         [=](BaseNode* a, BaseNode* b)
125                         {
126                             return a->GetDataCount() < b->GetDataCount();
127                         });
128 
129         if (maxNode == listNodesToRun.end())
130         {
131             break;
132         }
133 
134         (*maxNode)->ProcessData();  // process the non runtime node
135 
136         if (IsThreadStopped())
137         {
138             break;
139         }
140 
141         listNodesToRun.remove(*maxNode);
142     };
143 }
144 
run()145 void* StreamScheduler::run()
146 {
147     IMLOGD1("[run] [%p] enter", this);
148 
149     // start nodes
150     mMutex.lock();
151 
152     for (auto& node : mlistRegisteredNode)
153     {
154         if (node != nullptr && !node->IsRunTimeStart())
155         {
156             if (node->GetState() == kNodeStateStopped && node->ProcessStart() != RESULT_SUCCESS)
157             {
158                 // TODO: report error
159                 IMLOGE0("[run] error");
160             }
161         }
162     }
163 
164     mMutex.unlock();
165 
166     while (!IsThreadStopped())
167     {
168         mMutex.lock();
169         RunRegisteredNode();
170         mMutex.unlock();
171 
172         if (IsThreadStopped())
173         {
174             break;
175         }
176 
177         mConditionMain.wait_timeout(RUN_WAIT_TIMEOUT_MS);
178     }
179 
180     mConditionExit.signal();
181     IMLOGD1("[run] [%p] exit", this);
182     return nullptr;
183 }