/**
 * Copyright 2022 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "src/litert/parallel_lite_actor.h"
#include <utility>
#include <algorithm>
#include "src/litert/lite_mindrt.h"
#include "mindrt/include/mindrt.hpp"
#include "src/litert/kernel_exec_util.h"
#include "src/common/tensor_util.h"
#include "src/common/common.h"
#include "src/litert/inner_allocator.h"
#include "src/litert/kernel/cpu/base/partial_fusion.h"

namespace mindspore::lite {
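// Receives one piece of input data for this subgraph. Once all inputs have arrived, the
// per-run state is (re)initialized and the kernels actors corresponding to the graph
// inputs are started asynchronously. Falls back to the serial LiteOpActor path when the
// subgraph could not be split into parallel units.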
void ParallelLiteActor::RunOpData(OpData<lite::Tensor> *inputs, mindspore::OpContext<lite::Tensor> *context) {
  if (call_actor_) {
    LiteOpActor::RunOpData(inputs, context);
    return;
  }
  auto op_uuid = context->sequential_num_;
  input_op_datas_[op_uuid].push_back(inputs);
  inputs_data_[inputs->index_] = inputs->data_;
  if (input_op_datas_[op_uuid].size() < kernel_->in_tensors().size()) {
    return;
  }
  auto ret = InitInputData();
  if (ret != RET_OK) {
    MS_LOG(ERROR) << "InitInputData failed, name: " << kernel_->name();
    context->SetFailed(ret);
    return;
  }
  SetOpContext(context);
  auto subgraph_kernel = reinterpret_cast<kernel::SubGraphKernel *>(kernel_);
  if (MS_UNLIKELY(subgraph_kernel->GetGraphChanged())) {
    // The graph structure changed since the last run, so the kernels actors must be rebuilt.
    if (PostInit() != RET_OK) {
      MS_LOG(ERROR) << "PostInit failed, name: " << kernel_->name();
      context->SetFailed(RET_ERROR);
      return;
    }
  }
  if (MS_UNLIKELY(!finish_)) {
    // Clear the ready counters uniformly so that residue left by a failed previous run
    // cannot affect this run.
    for (auto &kernels_actor : kernels_actors_) {
      kernels_actor->ClearReady();
    }
  }
  finish_ = false;
  output_data_count_ = 0;

  for (size_t i = 0; i < begin_readly_indexs_.size(); i++) {
    Async(kernels_actors_[begin_readly_indexs_[i]]->GetAID(), get_actor_mgr(), &mindspore::lite::KernelsActor::Run);
  }
  return;
}

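// Called each time one graph output has been delivered downstream. When every output
// arrow has been served, the bookkeeping for this run is cleared and the actor is
// marked as finished.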
void ParallelLiteActor::AddOutputDataCount() {
  auto last_count = output_data_count_.fetch_add(1);
  if (static_cast<size_t>(last_count + 1) == output_data_arrows_.size()) {
    input_op_datas_.erase(op_context_->sequential_num_);
    output_data_count_ = 0;
    finish_ = true;
  }
}

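// Terminates and releases all kernels actors owned by this parallel actor.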
void ParallelLiteActor::DelKernelsActors() {
  MS_LOG(INFO) << "start del KernelsActors.";
  for (const auto &actor : kernels_actors_) {
    mindspore::Terminate(actor->GetAID(), get_actor_mgr());
  }
  kernels_actors_.clear();
  MS_LOG(INFO) << "end del KernelsActors.";
}

ParallelLiteActor::~ParallelLiteActor() { DelKernelsActors(); }

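// Splits the subgraph into units that can run in parallel and spawns one KernelsActor per
// unit. Each actor that produces a graph output is wired to the corresponding output data
// arrow and result index so it can publish its results directly.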
int ParallelLiteActor::KernelActorInit() {
  if (results_tensor_index_.size() != results_index_.size()) {
    MS_LOG(ERROR) << "results_tensor_index_ size " << results_tensor_index_.size()
                  << " not equal to results_index_ size " << results_index_.size();
    return RET_ERROR;
  }
  size_t max_tensor_index = kernel_->out_tensors().size();
  MS_CHECK_TRUE_MSG(std::find_if(results_tensor_index_.begin(), results_tensor_index_.end(),
                                 [max_tensor_index](const size_t index) { return index >= max_tensor_index; }) ==
                      results_tensor_index_.end(),
                    RET_ERROR, "results_tensor_index_ invalid.");

  auto subgraph_kernel = reinterpret_cast<kernel::SubGraphKernel *>(kernel_);
  kernel::KernelsArray split_kernels;
  auto ret = subgraph_kernel->SubGraphSplitByOperator(&split_kernels);
  if (ret != RET_OK) {
    MS_LOG(ERROR) << "SubGraphSplitByOperator failed.";
    return ret;
  }
  subgraph_kernel->SetGraphChanged(false);
  size_t units_size = split_kernels.units.size();
  if (units_size == 0) {
    // Nothing to parallelize; fall back to the serial LiteOpActor path.
    MS_LOG(DEBUG) << "split_kernels size is 0.";
    call_actor_ = true;
    return RET_OK;
  }

  if (output_data_arrows_.size() == 0) {
    MS_LOG(ERROR) << "output_data_arrows_ size is 0.";
    return RET_ERROR;
  }
  std::vector<lite::Tensor *> graph_output_tensor;
  for (size_t i = 0; i < output_data_arrows_.size(); i++) {
    auto &arrow = output_data_arrows_[i];
    if (static_cast<size_t>(arrow->from_output_index_) >= max_tensor_index) {
      MS_LOG(ERROR) << "arrow->from_output_index_ " << arrow->from_output_index_ << " is out of range, tensor count "
                    << max_tensor_index;
      return RET_ERROR;
    }
    graph_output_tensor.push_back(kernel_->out_tensors().at(arrow->from_output_index_));
  }
  auto thread_pool = reinterpret_cast<ActorThreadPool *>(ctx_->thread_pool_);
  size_t graph_output_size = graph_output_tensor.size();

  int kernels_actor_num = 0;
  DelKernelsActors();
  std::string kernels_actor_name = "_" + std::to_string(split_kernels.units.size()) + "_" + GetAID().Name();
  for (auto &unit : split_kernels.units) {
    if (unit.kernels.size() == 0) {
      MS_LOG(ERROR) << "kernels size is 0.";
      ret = RET_ERROR;
      break;
    }
    auto kernels_actor = std::make_shared<KernelsActor>(
      this, std::to_string(kernels_actor_num++) + kernels_actor_name + unit.kernels.front()->name(), unit.kernels);
    if (kernels_actor == nullptr) {
      MS_LOG(ERROR) << "new kernels_actor failed.";
      ret = RET_ERROR;
      break;
    }
    // Wire up every kernel whose output tensor is also a graph output, so that the owning
    // KernelsActor can forward the data and set the matching result index itself.
    for (auto &kernel : unit.kernels) {
      for (auto &tensor : kernel->out_tensors()) {
        for (size_t i = 0; i < graph_output_size; i++) {
          if (graph_output_tensor[i] == tensor) {
            kernels_actor->SetHaveOutput(true);
            kernels_actor->AddOutputDataArrows(output_data_arrows_[i]);
            kernels_actor->AddOutputData(outputs_data_[i]);
            for (size_t j = 0; j < results_tensor_index_.size(); j++) {
              if (results_tensor_index_[j] == static_cast<size_t>(output_data_arrows_[i]->from_output_index_)) {
                kernels_actor->AddResultsIndex(results_index_[j]);
              }
            }
          }
        }
      }
    }
    kernels_actor->SetInActorIndexs(unit.input_indexs);
    kernels_actor->SetOutActorIndexs(unit.output_indexs);
    kernels_actor->SetIsSignleIn(unit.input_indexs.size() <= 1);
    kernels_actor->set_thread_pool(thread_pool);
    kernels_actor->set_actor_mgr(get_actor_mgr());
    this->AddKernelsActor(kernels_actor);
    (void)mindspore::Spawn(kernels_actor);
  }
  if (ret != RET_OK) {
    kernels_actors_.clear();
    this->SetBeginReadlyIndexs({});
  } else {
    this->SetBeginReadlyIndexs(split_kernels.graph_input);
  }

  return ret;
}

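// Re-runs output preparation and kernels-actor construction; invoked on the first run and
// again whenever the underlying subgraph has changed.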
int ParallelLiteActor::PostInit() {
  auto ret = PrepareOutputData();
  if (ret != RET_OK) {
    MS_LOG(ERROR) << "run PrepareOutputData failed, name: " << kernel_->name();
    return ret;
  }
  ret = KernelActorInit();
  if (ret != RET_OK) {
    MS_LOG(ERROR) << "run KernelActorInit failed, name: " << kernel_->name();
    return ret;
  }
  return RET_OK;
}

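// Schedules every downstream kernels actor whose inputs are now all ready.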
void ParallelLiteActor::CheckReadyActors(const std::vector<size_t> &indices) {
  for (size_t i = 0; i < indices.size(); i++) {
    if (kernels_actors_[indices[i]]->GetReady()) {
      Async(kernels_actors_[indices[i]]->GetAID(), get_actor_mgr(), &mindspore::lite::KernelsActor::Run);
    }
  }
}

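// Executes this unit's kernels in order, then wakes up downstream actors. If the unit
// produces graph outputs, the data is forwarded along the output arrows and the matching
// result indices are marked as successful.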
void KernelsActor::Run() {
  mindspore::OpContext<lite::Tensor> *context = parallel_lite_actor_->OpContext();
  const KernelCallBack &before = *reinterpret_cast<const KernelCallBack *>(context->kernel_call_back_before_);
  const KernelCallBack &after = *reinterpret_cast<const KernelCallBack *>(context->kernel_call_back_after_);

  for (auto &kernel : nodes_) {
    MS_ASSERT(kernel != nullptr);
    auto ret = kernel->Execute(before, after);
    if (MS_UNLIKELY(ret != RET_OK)) {
      MS_LOG(ERROR) << "run kernel failed, name: " << kernel->name();
      context->SetFailed(ret);
      return;
    }
  }
  parallel_lite_actor_->CheckReadyActors(out_actors_indexs_);
  if (have_output_) {
    auto output_size = output_data_arrows_.size();
    for (size_t i = 0; i < output_size; ++i) {
      auto data = outputs_data_[i];
      Async(output_data_arrows_[i]->to_op_id_, get_actor_mgr(), &mindspore::OpActor<Tensor>::RunOpData, data.get(),
            context);
      parallel_lite_actor_->AddOutputDataCount();
    }
    for (auto &index : results_index_) {
      context->SetResult(index, RET_OK);
    }
  }
}
}  // namespace mindspore::lite