1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "pipeline.h"
16 #include "audit.h"
17 #include "file_util.h"
18 #include "logger.h"
19 #include "thread_util.h"
20 #include "time_util.h"
21 namespace OHOS {
22 namespace HiviewDFX {
23 DEFINE_LOG_TAG("HiView-Pipeline");
OnRepack()24 void PipelineEvent::OnRepack()
25 {
26 startDeliver_ = false;
27 ResetTimestamp();
28 }
29
OnPending()30 void PipelineEvent::OnPending()
31 {
32 hasPending_ = true;
33 }
34
OnContinue()35 bool PipelineEvent::OnContinue()
36 {
37 if ((!hasFinish_) && processors_.empty()) {
38 return OnFinish();
39 }
40
41 // once the event start delivering
42 // the call OnContinue means one has done the processing of the event
43 // this may be called by upstream event processor or the framework
44 if (Audit::IsEnabled() && startDeliver_) {
45 Audit::WriteAuditEvent(Audit::StatsEvent::PIPELINE_EVENT_HANDLE_OUT,
46 createTime_, std::to_string(Thread::GetTid()));
47 }
48
49 // the framework will call OnContinue when the event is assigned to a pipeline
50 if (!startDeliver_) {
51 startDeliver_ = true;
52 }
53
54 std::weak_ptr<Plugin> plugin = processors_.front();
55 processors_.pop_front();
56 if (auto pluginPtr = plugin.lock()) {
57 if (!pluginPtr->CanProcessMoreEvents()) {
58 if (handler_ != nullptr) {
59 handler_->PauseDispatch(plugin);
60 }
61 }
62
63 if (Audit::IsEnabled()) {
64 Audit::WriteAuditEvent(Audit::StatsEvent::PIPELINE_EVENT_HANDLE_IN, createTime_,
65 pluginPtr->GetHandlerInfo());
66 }
67
68 if (!pluginPtr->IsInterestedPipelineEvent(shared_from_this())) {
69 if ((!HasFinish() && !HasPending())) {
70 return OnContinue();
71 }
72 return true;
73 }
74
75 if (auto workLoop = pluginPtr->GetWorkLoop()) {
76 workLoop->AddEvent(pluginPtr, shared_from_this());
77 } else {
78 pluginPtr->OnEventProxy(shared_from_this());
79 }
80 } else {
81 return OnContinue();
82 }
83 return true;
84 }
85
OnFinish()86 bool PipelineEvent::OnFinish()
87 {
88 processTime_ = TimeUtil::GenerateTimestamp() - createTime_;
89 if (handler_ != nullptr) {
90 handler_->Recycle(this);
91 }
92
93 hasFinish_ = true;
94 if (Audit::IsEnabled()) {
95 Audit::WriteAuditEvent(Audit::StatsEvent::PIPELINE_EVENT_HANDLE_OUT,
96 createTime_, std::to_string(Thread::GetTid()));
97 Audit::WriteAuditEvent(Audit::StatsEvent::PIPELINE_EVENT_DONE, createTime_, pipelineName_);
98 }
99 return true;
100 }
101
GetPendingProcessorSize()102 uint32_t PipelineEvent::GetPendingProcessorSize()
103 {
104 return processors_.size();
105 }
106
SetPipelineInfo(const std::string & pipelineName,std::list<std::weak_ptr<Plugin>> & processors)107 void PipelineEvent::SetPipelineInfo(const std::string& pipelineName, std::list<std::weak_ptr<Plugin>>& processors)
108 {
109 pipelineName_ = pipelineName;
110 processors_ = processors;
111 }
112
GetPipelineInfo()113 std::string PipelineEvent::GetPipelineInfo()
114 {
115 return pipelineName_;
116 }
117
FillPipelineInfo(std::shared_ptr<Plugin> caller,const std::string & pipelineName,std::shared_ptr<PipelineEvent> event,bool deliverFromCurrent)118 void PipelineEvent::FillPipelineInfo(std::shared_ptr<Plugin> caller, const std::string& pipelineName,
119 std::shared_ptr<PipelineEvent> event, bool deliverFromCurrent)
120 {
121 if (caller == nullptr || event == nullptr || caller->GetHiviewContext() == nullptr) {
122 return;
123 }
124
125 auto seq = caller->GetHiviewContext()->GetPipelineSequenceByName(pipelineName);
126 if (deliverFromCurrent) {
127 while (!seq.empty()) {
128 auto& plugin = seq.front();
129 if (auto pluginPtr = plugin.lock()) {
130 if (pluginPtr->GetName() == caller->GetName()) {
131 break;
132 }
133 }
134 seq.pop_front();
135 }
136 }
137 event->SetPipelineInfo(pipelineName, seq);
138 }
139
CanProcessEvent(std::shared_ptr<PipelineEvent> event)140 bool Pipeline::CanProcessEvent(std::shared_ptr<PipelineEvent> event)
141 {
142 if (processors_.empty()) {
143 HIVIEW_LOGI("no processor in this pipeline.");
144 return false;
145 }
146
147 std::weak_ptr<Plugin> plugin = processors_.front();
148 if (auto pluginPtr = plugin.lock()) {
149 return pluginPtr->CanProcessEvent(std::dynamic_pointer_cast<Event>(event));
150 }
151 return false;
152 }
153
ProcessEvent(std::shared_ptr<PipelineEvent> event)154 void Pipeline::ProcessEvent(std::shared_ptr<PipelineEvent> event)
155 {
156 event->SetPipelineInfo(name_, processors_);
157 event->OnContinue();
158 }
159
AppendProcessor(std::weak_ptr<Plugin> plugin)160 void Pipeline::AppendProcessor(std::weak_ptr<Plugin> plugin)
161 {
162 processors_.push_back(plugin);
163 }
164
RemoveProcessor(std::weak_ptr<Plugin> plugin)165 void Pipeline::RemoveProcessor(std::weak_ptr<Plugin> plugin)
166 {
167 processors_.remove_if([plugin](std::weak_ptr<Plugin> wp) {
168 std::shared_ptr<Plugin> cur = plugin.lock();
169 std::shared_ptr<Plugin> sp = wp.lock();
170 if (cur != nullptr && sp != nullptr) {
171 return cur == sp;
172 }
173 return false;
174 });
175 }
176 } // namespace HiviewDFX
177 } // namespace OHOS
178