1 /**
2  * Copyright 2022 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/litert/parallel_lite_actor.h"
18 #include <utility>
19 #include <algorithm>
20 #include "src/litert/lite_mindrt.h"
21 #include "mindrt/include/mindrt.hpp"
22 #include "src/litert/kernel_exec_util.h"
23 #include "src/common/tensor_util.h"
24 #include "src/common/common.h"
25 #include "src/litert/inner_allocator.h"
26 #include "src/litert/kernel/cpu/base/partial_fusion.h"
27 
28 namespace mindspore::lite {
// Receives one input tensor from an upstream actor. Inputs are accumulated
// per run (keyed by the context's sequential_num_) until every input tensor
// of the subgraph has arrived; only then is the parallel pipeline launched.
// If the graph could not be split into parallel units (call_actor_), the
// sequential LiteOpActor path is used instead.
void ParallelLiteActor::RunOpData(OpData<lite::Tensor> *inputs, mindspore::OpContext<lite::Tensor> *context) {
  if (call_actor_) {
    // No parallel split available — fall back to sequential execution.
    LiteOpActor::RunOpData(inputs, context);
    return;
  }
  auto op_uuid = context->sequential_num_;
  input_op_datas_[op_uuid].push_back(inputs);
  inputs_data_[inputs->index_] = inputs->data_;
  if (input_op_datas_[op_uuid].size() < kernel_->in_tensors().size()) {
    // Not all inputs have arrived yet; wait for further RunOpData calls.
    return;
  }
  auto ret = InitInputData();
  if (ret != RET_OK) {
    MS_LOG(ERROR) << "run kernel failed, name: " << kernel_->name();
    context->SetFailed(ret);
    return;
  }
  SetOpContext(context);
  auto subgraph_kernel = reinterpret_cast<kernel::SubGraphKernel *>(kernel_);
  if (MS_UNLIKELY(subgraph_kernel->GetGraphChanged())) {
    // The subgraph was re-split/re-built since the last run; rebuild the
    // output-data arrows and the kernels-actor pipeline before executing.
    if (PostInit() != RET_OK) {
      MS_LOG(ERROR) << "KernelActorInit failed, name: " << kernel_->name();
      context->SetFailed(RET_ERROR);
      return;
    }
  }
  if (MS_UNLIKELY(!finish_)) {
    // It is uniformly cleared to prevent the residual count caused by the failure of the last run from affecting this
    // run
    for (auto &kernels_actor : kernels_actors_) {
      kernels_actor->ClearReady();
    }
  }
  finish_ = false;
  output_data_count_ = 0;

  // Kick off every entry-point actor (units that consume only graph inputs);
  // downstream units are triggered via CheckReadyActors as they become ready.
  for (size_t i = 0; i < begin_readly_indexs_.size(); i++) {
    Async(kernels_actors_[begin_readly_indexs_[i]]->GetAID(), get_actor_mgr(), &mindspore::lite::KernelsActor::Run);
  }
  return;
}
70 
AddOutputDataCount()71 void ParallelLiteActor::AddOutputDataCount() {
72   auto last_count = output_data_count_.fetch_add(1);
73   if (static_cast<size_t>(last_count + 1) == output_data_arrows_.size()) {
74     input_op_datas_.erase(op_context_->sequential_num_);
75     output_data_count_ = 0;
76     finish_ = true;
77   }
78 }
79 
DelKernelsActors()80 void ParallelLiteActor::DelKernelsActors() {
81   MS_LOG(INFO) << "start del KernelsActors.";
82   for (const auto &actor : kernels_actors_) {
83     mindspore::Terminate(actor->GetAID(), get_actor_mgr());
84   }
85   kernels_actors_.clear();
86   MS_LOG(INFO) << "end del KernelsActors.";
87 }
88 
~ParallelLiteActor()89 ParallelLiteActor::~ParallelLiteActor() { DelKernelsActors(); }
90 
// Splits the subgraph kernel into independently-runnable units and spawns one
// KernelsActor per unit, wiring each actor's output arrows, output data and
// result indices so graph outputs are forwarded directly from the unit that
// produces them.
// Returns RET_OK on success; RET_ERROR (or the split's error code) otherwise.
// On failure all partially-created actors are discarded.
int ParallelLiteActor::KernelActorInit() {
  // results_tensor_index_[j] maps results_index_[j] to an output tensor slot,
  // so both vectors must pair up one-to-one.
  if (results_tensor_index_.size() != results_index_.size()) {
    MS_LOG(ERROR) << "results_tensor_index_ size " << results_tensor_index_.size()
                  << " not equal to results_index_ size" << results_index_.size();
    return RET_ERROR;
  }
  size_t max_tensor_index = kernel_->out_tensors().size();
  // Every recorded result slot must refer to an existing output tensor.
  MS_CHECK_TRUE_MSG(std::find_if(results_tensor_index_.begin(), results_tensor_index_.end(),
                                 [max_tensor_index](const size_t index) { return index >= max_tensor_index; }) ==
                      results_tensor_index_.end(),
                    RET_ERROR, "results_tensor_index_ invalid.");

  auto subgraph_kernel = reinterpret_cast<kernel::SubGraphKernel *>(kernel_);
  kernel::KernelsArray split_kernels;
  auto ret = subgraph_kernel->SubGraphSplitByOperator(&split_kernels);
  if (ret != RET_OK) {
    MS_LOG(ERROR) << "SubGraphSplitByOperator failed.";
    return ret;
  }
  subgraph_kernel->SetGraphChanged(false);
  size_t units_size = split_kernels.units.size();
  if (units_size == 0) {
    // Nothing to parallelize: from now on RunOpData delegates to LiteOpActor.
    MS_LOG(DEBUG) << "split_kernels size is 0.";
    call_actor_ = true;
    return RET_OK;
  }

  if (output_data_arrows_.size() == 0) {
    MS_LOG(ERROR) << "output_data_arrows_ size is 0.";
    return RET_ERROR;
  }
  // Resolve each output arrow to the concrete tensor it forwards, validating
  // the arrow's source index against the subgraph's output-tensor count.
  std::vector<lite::Tensor *> graph_output_tensor;
  for (size_t i = 0; i < output_data_arrows_.size(); i++) {
    auto &arrow = output_data_arrows_[i];
    if (static_cast<size_t>(arrow->from_output_index_) >= max_tensor_index) {
      MS_LOG(ERROR) << "arrow->from_output_index_ " << arrow->from_output_index_ << " greater than tensor maximum "
                    << max_tensor_index;
      return RET_ERROR;
    }
    graph_output_tensor.push_back(kernel_->out_tensors().at(arrow->from_output_index_));
  }
  auto thread_pool = reinterpret_cast<ActorThreadPool *>(ctx_->thread_pool_);
  size_t graph_output_size = graph_output_tensor.size();

  int kernels_actor_num = 0;
  // Drop any actors from a previous split before building the new set.
  DelKernelsActors();
  std::string kernels_actor_name = "_" + std::to_string(split_kernels.units.size()) + "_" + GetAID().Name();
  for (auto &unit : split_kernels.units) {
    if (unit.kernels.size() == 0) {
      MS_LOG(ERROR) << "kernels size is 0.";
      ret = RET_ERROR;
      break;
    }
    // Actor name: "<ordinal>_<unit count>_<parent AID>_<first kernel name>" —
    // unique per unit and traceable back to this parent actor.
    auto kernels_actor = std::make_shared<KernelsActor>(
      this, std::to_string(kernels_actor_num++) + kernels_actor_name + unit.kernels.front()->name(), unit.kernels);
    if (kernels_actor == nullptr) {
      MS_LOG(ERROR) << "new kernels_actor failed.";
      ret = RET_ERROR;
      break;
    }
    // If any kernel in this unit produces a graph output tensor, move the
    // corresponding output arrow/data (and result index, if any) onto this
    // unit's actor so it can forward the output as soon as it is computed.
    for (auto &kernel : unit.kernels) {
      for (auto &tensor : kernel->out_tensors()) {
        for (size_t i = 0; i < graph_output_size; i++) {
          if (graph_output_tensor[i] == tensor) {
            kernels_actor->SetHaveOutput(true);
            kernels_actor->AddOutputDataArrows(output_data_arrows_[i]);
            kernels_actor->AddOutputData(outputs_data_[i]);
            for (size_t j = 0; j < results_tensor_index_.size(); j++) {
              if (results_tensor_index_[j] == static_cast<size_t>(output_data_arrows_[i]->from_output_index_)) {
                kernels_actor->AddResultsIndex(results_index_[j]);
              }
            }
          }
        }
      }
    }
    kernels_actor->SetInActorIndexs(unit.input_indexs);
    kernels_actor->SetOutActorIndexs(unit.output_indexs);
    // A unit with at most one upstream unit never waits on sibling readiness.
    kernels_actor->SetIsSignleIn(unit.input_indexs.size() <= 1);
    kernels_actor->set_thread_pool(thread_pool);
    kernels_actor->set_actor_mgr(get_actor_mgr());
    this->AddKernelsActor(kernels_actor);
    (void)mindspore::Spawn(kernels_actor);
  }
  if (ret != RET_OK) {
    // Partial construction: discard everything so RunOpData cannot launch a
    // half-wired pipeline.
    kernels_actors_.clear();
    this->SetBeginReadlyIndexs({});
  } else {
    this->SetBeginReadlyIndexs(split_kernels.graph_input);
  }

  return ret;
}
184 
PostInit()185 int ParallelLiteActor::PostInit() {
186   auto ret = PrepareOutputData();
187   if (ret != RET_OK) {
188     MS_LOG(ERROR) << "run PrepareOutputData failed, name: " << kernel_->name();
189     return ret;
190   }
191   ret = KernelActorInit();
192   if (ret != RET_OK) {
193     MS_LOG(ERROR) << "run KernelActorInit failed, name: " << kernel_->name();
194     return ret;
195   }
196   return RET_OK;
197 }
198 
CheckReadyActors(const std::vector<size_t> & indices)199 void ParallelLiteActor::CheckReadyActors(const std::vector<size_t> &indices) {
200   for (size_t i = 0; i < indices.size(); i++) {
201     if (kernels_actors_[indices[i]]->GetReady()) {
202       Async(kernels_actors_[indices[i]]->GetAID(), get_actor_mgr(), &mindspore::lite::KernelsActor::Run);
203     }
204   }
205 }
206 
// Executes this unit's kernels in order, then notifies downstream units and
// forwards any graph outputs this unit owns.
// NOTE(review): kernel_call_back_before_/after_ are reinterpreted and
// dereferenced without a null check — this assumes the parent actor always
// populates both callback slots in the context; confirm against the caller.
void KernelsActor::Run() {
  mindspore::OpContext<lite::Tensor> *context = parallel_lite_actor_->OpContext();
  const KernelCallBack &before = *reinterpret_cast<const KernelCallBack *>(context->kernel_call_back_before_);
  const KernelCallBack &after = *reinterpret_cast<const KernelCallBack *>(context->kernel_call_back_after_);

  for (auto &kernel : nodes_) {
    MS_ASSERT(kernel != nullptr);
    auto ret = kernel->Execute(before, after);
    if (MS_UNLIKELY(ret != RET_OK)) {
      // Fail the whole run; downstream actors are not triggered.
      MS_LOG(ERROR) << "run kernel failed, name: " << kernel->name();
      context->SetFailed(ret);
      return;
    }
  }
  // Wake any downstream units whose inputs are now complete.
  parallel_lite_actor_->CheckReadyActors(out_actors_indexs_);
  if (have_output_) {
    // This unit produces graph outputs: push each one along its arrow and
    // count it toward the parent's completion tally.
    auto output_size = output_data_arrows_.size();
    for (size_t i = 0; i < output_size; ++i) {
      auto data = outputs_data_[i];
      Async(output_data_arrows_[i]->to_op_id_, get_actor_mgr(), &mindspore::OpActor<Tensor>::RunOpData, data.get(),
            context);
      parallel_lite_actor_->AddOutputDataCount();
    }
    for (auto &index : results_index_) {
      context->SetResult(index, RET_OK);
    }
  }
}
235 }  // namespace mindspore::lite
236