• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "pipeline.h"
16 #include "audit.h"
17 #include "file_util.h"
18 #include "logger.h"
19 #include "thread_util.h"
20 #include "time_util.h"
21 namespace OHOS {
22 namespace HiviewDFX {
23 DEFINE_LOG_TAG("HiView-Pipeline");
OnRepack()24 void PipelineEvent::OnRepack()
25 {
26     startDeliver_ = false;
27     ResetTimestamp();
28 }
29 
OnPending()30 void PipelineEvent::OnPending()
31 {
32     hasPending_ = true;
33 }
34 
OnContinue()35 bool PipelineEvent::OnContinue()
36 {
37     if ((!hasFinish_) && processors_.empty()) {
38         return OnFinish();
39     }
40 
41     // once the event start delivering
42     // the call OnContinue means one has done the processing of the event
43     // this may be called by upstream event processor or the framework
44     if (Audit::IsEnabled() && startDeliver_) {
45         Audit::WriteAuditEvent(Audit::StatsEvent::PIPELINE_EVENT_HANDLE_OUT,
46             createTime_, std::to_string(Thread::GetTid()));
47     }
48 
49     // the framework will call OnContinue when the event is assigned to a pipeline
50     if (!startDeliver_) {
51         startDeliver_ = true;
52     }
53 
54     std::weak_ptr<Plugin> plugin = processors_.front();
55     processors_.pop_front();
56     if (auto pluginPtr = plugin.lock()) {
57         if (!pluginPtr->CanProcessMoreEvents()) {
58             if (handler_ != nullptr) {
59                 handler_->PauseDispatch(plugin);
60             }
61         }
62 
63         if (Audit::IsEnabled()) {
64             Audit::WriteAuditEvent(Audit::StatsEvent::PIPELINE_EVENT_HANDLE_IN, createTime_,
65                                    pluginPtr->GetHandlerInfo());
66         }
67 
68         if (!pluginPtr->IsInterestedPipelineEvent(shared_from_this())) {
69             if ((!HasFinish() && !HasPending())) {
70                 return OnContinue();
71             }
72             return true;
73         }
74 
75         if (auto workLoop = pluginPtr->GetWorkLoop()) {
76             workLoop->AddEvent(pluginPtr, shared_from_this());
77         } else {
78             pluginPtr->OnEventProxy(shared_from_this());
79         }
80     } else {
81         return OnContinue();
82     }
83     return true;
84 }
85 
OnFinish()86 bool PipelineEvent::OnFinish()
87 {
88     processTime_ = TimeUtil::GenerateTimestamp() - createTime_;
89     if (handler_ != nullptr) {
90         handler_->Recycle(this);
91     }
92 
93     hasFinish_ = true;
94     if (Audit::IsEnabled()) {
95         Audit::WriteAuditEvent(Audit::StatsEvent::PIPELINE_EVENT_HANDLE_OUT,
96             createTime_, std::to_string(Thread::GetTid()));
97         Audit::WriteAuditEvent(Audit::StatsEvent::PIPELINE_EVENT_DONE, createTime_, pipelineName_);
98     }
99     return true;
100 }
101 
GetPendingProcessorSize()102 uint32_t PipelineEvent::GetPendingProcessorSize()
103 {
104     return processors_.size();
105 }
106 
SetPipelineInfo(const std::string & pipelineName,std::list<std::weak_ptr<Plugin>> & processors)107 void PipelineEvent::SetPipelineInfo(const std::string& pipelineName, std::list<std::weak_ptr<Plugin>>& processors)
108 {
109     pipelineName_ = pipelineName;
110     processors_ = processors;
111 }
112 
GetPipelineInfo()113 std::string PipelineEvent::GetPipelineInfo()
114 {
115     return pipelineName_;
116 }
117 
FillPipelineInfo(std::shared_ptr<Plugin> caller,const std::string & pipelineName,std::shared_ptr<PipelineEvent> event,bool deliverFromCurrent)118 void PipelineEvent::FillPipelineInfo(std::shared_ptr<Plugin> caller, const std::string& pipelineName,
119                                      std::shared_ptr<PipelineEvent> event, bool deliverFromCurrent)
120 {
121     if (caller == nullptr || event == nullptr || caller->GetHiviewContext() == nullptr) {
122         return;
123     }
124 
125     auto seq = caller->GetHiviewContext()->GetPipelineSequenceByName(pipelineName);
126     if (deliverFromCurrent) {
127         while (!seq.empty()) {
128             auto& plugin = seq.front();
129             if (auto pluginPtr = plugin.lock()) {
130                 if (pluginPtr->GetName() == caller->GetName()) {
131                     break;
132                 }
133             }
134             seq.pop_front();
135         }
136     }
137     event->SetPipelineInfo(pipelineName, seq);
138 }
139 
CanProcessEvent(std::shared_ptr<PipelineEvent> event)140 bool Pipeline::CanProcessEvent(std::shared_ptr<PipelineEvent> event)
141 {
142     if (processors_.empty()) {
143         HIVIEW_LOGI("no processor in this pipeline.");
144         return false;
145     }
146 
147     std::weak_ptr<Plugin> plugin = processors_.front();
148     if (auto pluginPtr = plugin.lock()) {
149         return pluginPtr->CanProcessEvent(std::dynamic_pointer_cast<Event>(event));
150     }
151     return false;
152 }
153 
ProcessEvent(std::shared_ptr<PipelineEvent> event)154 void Pipeline::ProcessEvent(std::shared_ptr<PipelineEvent> event)
155 {
156     event->SetPipelineInfo(name_, processors_);
157     event->OnContinue();
158 }
159 
AppendProcessor(std::weak_ptr<Plugin> plugin)160 void Pipeline::AppendProcessor(std::weak_ptr<Plugin> plugin)
161 {
162     processors_.push_back(plugin);
163 }
164 
RemoveProcessor(std::weak_ptr<Plugin> plugin)165 void Pipeline::RemoveProcessor(std::weak_ptr<Plugin> plugin)
166 {
167     processors_.remove_if([plugin](std::weak_ptr<Plugin> wp) {
168         std::shared_ptr<Plugin> cur = plugin.lock();
169         std::shared_ptr<Plugin> sp = wp.lock();
170         if (cur != nullptr && sp != nullptr) {
171             return cur == sp;
172         }
173         return false;
174     });
175 }
176 } // namespace HiviewDFX
177 } // namespace OHOS
178