1 /*
2 * Copyright (c) 2021-2025 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "pipeline.h"
16 #include "file_util.h"
17 #include "hiview_logger.h"
18 #include "thread_util.h"
19 #include "time_util.h"
20 namespace OHOS {
21 namespace HiviewDFX {
22 DEFINE_LOG_TAG("HiView-Pipeline");
OnRepack()23 void PipelineEvent::OnRepack()
24 {
25 startDeliver_ = false;
26 ResetTimestamp();
27 }
28
OnPending()29 void PipelineEvent::OnPending()
30 {
31 hasPending_ = true;
32 }
33
OnContinue()34 bool PipelineEvent::OnContinue()
35 {
36 if ((!hasFinish_) && processors_.empty()) {
37 return OnFinish();
38 }
39
40 // the framework will call OnContinue when the event is assigned to a pipeline
41 if (!startDeliver_) {
42 startDeliver_ = true;
43 }
44
45 std::weak_ptr<Plugin> plugin = processors_.front();
46 processors_.pop_front();
47 if (auto pluginPtr = plugin.lock()) {
48 if (!pluginPtr->CanProcessMoreEvents()) {
49 if (handler_ != nullptr) {
50 handler_->PauseDispatch(plugin);
51 }
52 }
53
54 if (!pluginPtr->IsInterestedPipelineEvent(shared_from_this())) {
55 if ((!HasFinish() && !HasPending())) {
56 return OnContinue();
57 }
58 return true;
59 }
60
61 if (auto workLoop = pluginPtr->GetWorkLoop()) {
62 workLoop->AddEvent(pluginPtr, shared_from_this());
63 } else {
64 pluginPtr->OnEventProxy(shared_from_this());
65 }
66 } else {
67 return OnContinue();
68 }
69 return true;
70 }
71
OnFinish()72 bool PipelineEvent::OnFinish()
73 {
74 {
75 uint64_t nowTime = TimeUtil::GenerateTimestamp();
76 processTime_ = nowTime > createTime_ ? (nowTime - createTime_) : 0;
77 }
78 if (handler_ != nullptr) {
79 handler_->Recycle(this);
80 }
81
82 hasFinish_ = true;
83 return true;
84 }
85
GetPendingProcessorSize()86 uint32_t PipelineEvent::GetPendingProcessorSize()
87 {
88 return processors_.size();
89 }
90
SetPipelineInfo(const std::string & pipelineName,std::list<std::weak_ptr<Plugin>> & processors)91 void PipelineEvent::SetPipelineInfo(const std::string& pipelineName, std::list<std::weak_ptr<Plugin>>& processors)
92 {
93 pipelineName_ = pipelineName;
94 processors_ = processors;
95 }
96
GetPipelineInfo()97 std::string PipelineEvent::GetPipelineInfo()
98 {
99 return pipelineName_;
100 }
101
FillPipelineInfo(std::shared_ptr<Plugin> caller,const std::string & pipelineName,std::shared_ptr<PipelineEvent> event,bool deliverFromCurrent)102 void PipelineEvent::FillPipelineInfo(std::shared_ptr<Plugin> caller, const std::string& pipelineName,
103 std::shared_ptr<PipelineEvent> event, bool deliverFromCurrent)
104 {
105 if (caller == nullptr || event == nullptr || caller->GetHiviewContext() == nullptr) {
106 return;
107 }
108
109 auto seq = caller->GetHiviewContext()->GetPipelineSequenceByName(pipelineName);
110 if (deliverFromCurrent) {
111 while (!seq.empty()) {
112 auto& plugin = seq.front();
113 if (auto pluginPtr = plugin.lock()) {
114 if (pluginPtr->GetName() == caller->GetName()) {
115 break;
116 }
117 }
118 seq.pop_front();
119 }
120 }
121 event->SetPipelineInfo(pipelineName, seq);
122 }
123
CanProcessEvent(std::shared_ptr<PipelineEvent> event)124 bool Pipeline::CanProcessEvent(std::shared_ptr<PipelineEvent> event)
125 {
126 if (processors_.empty()) {
127 HIVIEW_LOGI("no processor in this pipeline.");
128 return false;
129 }
130
131 std::weak_ptr<Plugin> plugin = processors_.front();
132 if (auto pluginPtr = plugin.lock()) {
133 return pluginPtr->CanProcessEvent(std::dynamic_pointer_cast<Event>(event));
134 }
135 return false;
136 }
137
ProcessEvent(std::shared_ptr<PipelineEvent> event)138 void Pipeline::ProcessEvent(std::shared_ptr<PipelineEvent> event)
139 {
140 event->SetPipelineInfo(name_, processors_);
141 event->OnContinue();
142 }
143
AppendProcessor(std::weak_ptr<Plugin> plugin)144 void Pipeline::AppendProcessor(std::weak_ptr<Plugin> plugin)
145 {
146 processors_.push_back(plugin);
147 }
148
RemoveProcessor(std::weak_ptr<Plugin> plugin)149 void Pipeline::RemoveProcessor(std::weak_ptr<Plugin> plugin)
150 {
151 processors_.remove_if([plugin](std::weak_ptr<Plugin> wp) {
152 std::shared_ptr<Plugin> cur = plugin.lock();
153 std::shared_ptr<Plugin> sp = wp.lock();
154 if (cur != nullptr && sp != nullptr) {
155 return cur == sp;
156 }
157 return false;
158 });
159 }
160 } // namespace HiviewDFX
161 } // namespace OHOS
162