1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 #ifndef HIVIEW_BASE_PIPELINE_H 16 #define HIVIEW_BASE_PIPELINE_H 17 #include <memory> 18 19 #include "event.h" 20 #include "event_loop.h" 21 #include "plugin.h" 22 23 namespace OHOS { 24 namespace HiviewDFX { 25 class Pipeline; 26 27 class PipelineEventProducer; 28 29 class PipelineEvent : public Event { 30 public: PipelineEvent(const std::string & sender,PipelineEventProducer * handler)31 PipelineEvent(const std::string& sender, PipelineEventProducer* handler) 32 : Event(sender), handler_(handler), startDeliver_(false), pipelineName_("") 33 { 34 isPipeline_ = true; 35 }; 36 PipelineEvent(const Event & event)37 PipelineEvent(const Event& event) : Event(event), handler_(nullptr), startDeliver_(false), pipelineName_("") 38 { 39 isPipeline_ = true; 40 }; 41 PipelineEvent(const PipelineEvent & obj)42 PipelineEvent(const PipelineEvent& obj) 43 : Event(obj), 44 inputLogfileParamList_(std::move(obj.inputLogfileParamList_)), 45 deleteLogfileParamList_(std::move(obj.deleteLogfileParamList_)), 46 handler_(obj.handler_), 47 startDeliver_(obj.startDeliver_), 48 pipelineName_(obj.pipelineName_), 49 processors_(std::move(obj.processors_)){}; 50 51 PipelineEvent& operator=(const PipelineEvent& obj) 52 { 53 if (&obj == this) { 54 return *this; 55 } 56 57 Event::operator=(obj); 58 inputLogfileParamList_ = std::move(obj.inputLogfileParamList_); 59 deleteLogfileParamList_ = std::move(obj.deleteLogfileParamList_); 60 handler_ = obj.handler_; 61 startDeliver_ = obj.startDeliver_; 62 pipelineName_ = obj.pipelineName_; 63 processors_ = std::move(obj.processors_); 64 return *this; 65 }; 66 ~PipelineEvent()67 virtual ~PipelineEvent(){}; 68 69 void OnRepack() override; 70 void OnPending() override; 71 bool OnContinue() override; 72 bool OnFinish() override; 73 uint32_t GetPendingProcessorSize() override; 74 void SetPipelineInfo(const std::string& pipelineName, std::list<std::weak_ptr<Plugin>>& processors); 75 std::string GetPipelineInfo(); 76 std::list<std::string> inputLogfileParamList_; 77 std::list<std::string> deleteLogfileParamList_; 78 79 // transform an event into pipeline event 80 // fill with pipeline info of given name 81 // the new event share no relationship with the old one 82 // EventType should be the correct type of the event object 83 // PipelineEventType should be the type of the target repack type 84 // copy construct will be invoked in the repack process 85 template <typename EventType, typename PipelineEventType> RepackPipelineEvent(std::shared_ptr<Plugin> caller,const std::string & pipelineName,std::shared_ptr<Event> & event,bool deliverFromCurrent)86 static std::shared_ptr<PipelineEventType> RepackPipelineEvent(std::shared_ptr<Plugin> caller, 87 const std::string& pipelineName, 88 std::shared_ptr<Event>& event, 89 bool deliverFromCurrent) 90 { 91 auto base = std::static_pointer_cast<EventType>(event); 92 auto derived = std::make_shared<PipelineEventType>(*(base.get())); 93 auto pipe = std::static_pointer_cast<PipelineEvent>(derived); 94 pipe->OnRepack(); 95 FillPipelineInfo(caller, pipelineName, pipe, deliverFromCurrent); 96 return derived; 97 }; 98 99 static void FillPipelineInfo(std::shared_ptr<Plugin> caller, const std::string& pipelineName, 100 std::shared_ptr<PipelineEvent> event, bool deliverFromCurrent); 101 102 private: 103 PipelineEventProducer* handler_; 104 bool startDeliver_; 105 std::string pipelineName_; 106 std::list<std::weak_ptr<Plugin>> processors_; 107 }; 108 109 class PipelineEventProducer { 110 public: ~PipelineEventProducer()111 virtual ~PipelineEventProducer(){}; 112 // Notify event producer that an event has finish it's delivery 113 virtual void Recycle(PipelineEvent* event) = 0; 114 // Pause dispatch and schedule resume dispatch 115 virtual void PauseDispatch(std::weak_ptr<Plugin> plugin) = 0; 116 }; 117 118 class Pipeline : public std::enable_shared_from_this<Pipeline> { 119 public: Pipeline(const std::string & name,std::list<std::weak_ptr<Plugin>> & processors)120 Pipeline(const std::string& name, std::list<std::weak_ptr<Plugin>>& processors) 121 : name_(name), processors_(std::move(processors)){}; 122 bool CanProcessEvent(std::shared_ptr<PipelineEvent> event); 123 void ProcessEvent(std::shared_ptr<PipelineEvent> event); ~Pipeline()124 ~Pipeline() {}; 125 GetName()126 const std::string& GetName() const 127 { 128 return name_; 129 }; 130 GetProcessSequence()131 std::list<std::weak_ptr<Plugin>> GetProcessSequence() 132 { 133 return processors_; 134 }; 135 void AppendProcessor(std::weak_ptr<Plugin> plugin); 136 void RemoveProcessor(std::weak_ptr<Plugin> plugin); 137 private: 138 std::string name_; 139 std::list<std::weak_ptr<Plugin>> processors_; 140 }; 141 } // namespace HiviewDFX 142 } // namespace OHOS 143 #endif // HIVIEW_BASE_PIPELINE_H