/**
 * Copyright 2021-2022 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "minddata/dataset/engine/perf/auto_tune.h"

#include <algorithm>
#include <functional>
#include <memory>
#include <numeric>  // std::accumulate, used in AutoTune::Mean
#include <utility>
#include <vector>
#include <string>
#include <sstream>
#include <iomanip>
#ifndef ENABLE_ANDROID
#include "minddata/dataset/engine/datasetops/source/nonmappable_leaf_op.h"
#include "minddata/dataset/engine/serdes.h"
#endif
#include "minddata/dataset/util/task_manager.h"

namespace mindspore {
namespace dataset {
AutoTune::AutoTune(TreeAdapter *tree_adap, ProfilingManager *profiling_mgr)
    : tree_adapter_(tree_adap),
      profiling_manager_(profiling_mgr),
      tree_modifier_(std::make_unique<TreeModifier>(tree_adapter_)),
      leaf_op_id_(-1),
      cur_epoch_running_(1),
      last_epoch_autotuned_(0),
      cur_step_running_(1),
      last_step_autotuned_(0),
      mode_(0),
      step_gap_(GlobalContext::config_manager()->autotune_interval()),
      skip_flag_(true),
      AT_phase_(AutoTunePhase::kAutoTunePhaseTime),
      AT_change_(false),
      phase_1_best_time_(-1),
      phase_1_no_improve_count_(0),
      count_down_(0),
      phase_3_state_(AutoTuneMemPhase::kAutoTuneMemInit),
      phase_3_ID_(0),
      avg_batch_time(0.0),
      phase_3_prev_avg_(0.0),
      save_autoconfig_(GlobalContext::config_manager()->save_autoconfig()) {
  max_workers_ = GlobalContext::config_manager()->num_cpu_threads();
  autotune_json_filepath_ = GlobalContext::config_manager()->get_autotune_json_filepath();
}

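// Entry point of the AutoTune thread. Chooses step- or epoch-based tuning from the configured
// autotune interval, warns when offloaded nodes prevent the configuration from being serialized,
// runs the main tuning loop, stops profiling and, if saving is enabled, writes the final tuned
// pipeline configuration to a rank-specific JSON file.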
Status AutoTune::Main() {
  TaskManager::FindMe()->Post();
  MS_LOG(INFO) << "Dataset AutoTune thread has started.";
  if (step_gap_ != 0) {
    mode_ = AutoTuneMode::kAutoTuneModeStep;
  } else {
    mode_ = AutoTuneMode::kAutoTuneModeEpoch;
  }
  const bool nodes_offloaded = !tree_adapter_->GetOffloadJson().empty();
  if (nodes_offloaded) {
    // When nodes are offloaded they are removed from the optimized IR tree.
    // Serializing the optimized IR Tree and then deserializing will not work.
    MS_LOG(WARNING) << "Some nodes have been offloaded. AutoTune is unable to write the autotune configuration to "
                       "disk. Disable offload to prevent this from happening.";
  }
  bool output_final_config = save_autoconfig_ && !nodes_offloaded;
  bool output_intermediate_config = save_intermediate_autoconfig_ && output_final_config;
  RETURN_IF_NOT_OK(ATMainLoop(output_intermediate_config));
  RETURN_IF_NOT_OK(profiling_manager_->Stop());
  PostMainLogging();
#ifndef ENABLE_ANDROID
  if (output_final_config &&
      (SaveAutotuneConfig(autotune_json_filepath_ + "_" + profiling_manager_->GetRankID() + ".json").IsError())) {
    MS_LOG(WARNING) << "Failed to write the final autotune configuration to disk";
  }
#endif
  return Status::OK();
}

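// Core tuning loop. Wakes up every monitor_sampling_interval milliseconds, refreshes the running
// epoch/step counters and, once the warm-up phase is over, runs one tuning iteration per epoch or
// per step interval. Optionally saves an intermediate configuration snapshot after each iteration
// that advanced the epoch or step counter, and exits when the tuning phases are complete or the
// tree has finished.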
Status AutoTune::ATMainLoop(bool output_intermediate_config) {
  std::unique_lock<std::mutex> _lock(mux_);
  int loop_cnt = 0;
  Status rc;
  while (!this_thread::is_interrupted() && !(tree_adapter_->tree_->isFinished())) {
#ifndef ENABLE_ANDROID
    auto last_epoch = cur_epoch_running_;
    auto last_step = cur_step_running_;
#endif
    RETURN_IF_NOT_OK(UpdateCurrentRunInfo());
    if (!WarmupSkipCheck()) {
      // Warm-up is complete - run AutoTune normally
      if (mode_ == AutoTuneMode::kAutoTuneModeEpoch) {
        rc = RunIterationEpoch();
      } else if (mode_ == AutoTuneMode::kAutoTuneModeStep) {
        rc = RunIterationStep();
      }
      if (rc.IsError()) {
        if (rc.StatusCode() != StatusCode::kMDInterrupted) {
          MS_LOG(ERROR) << "Dataset AutoTune failed and will exit with the following error: " << rc;
        }
        RETURN_IF_NOT_OK(profiling_manager_->Stop());
        break;
      }
#ifndef ENABLE_ANDROID
      if (last_epoch != cur_epoch_running_ || last_step != cur_step_running_) {
        if (output_intermediate_config &&
            (SaveAutotuneConfig(autotune_json_filepath_ + "_" + profiling_manager_->GetRankID() + "_" +
                                std::to_string(loop_cnt) + ".json")
               .IsError())) {
          MS_LOG(WARNING) << "Failed to write the current iteration autotune configuration to disk";
        }
        ++loop_cnt;
      }
#endif
      if (AT_phase_ == AutoTunePhase::kAutoTuneEnd) {
        MS_LOG(INFO) << "Dataset AutoTune stop, optimization complete.";
        break;
      }
    }
    rc = cv_.WaitFor(&_lock, GlobalContext::config_manager()->monitor_sampling_interval());
    // The thread may be interrupted for tree termination when waiting (we should not report an error in this case)
    if (rc.IsError() && rc != StatusCode::kMDInterrupted) {
      return rc;
    }
  }
  return Status::OK();
}

#ifndef ENABLE_ANDROID
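// Serializes the current (AutoTuned) pipeline configuration to `file_name`. The JSON output
// contains a human-readable summary of each op's num_parallel_workers and prefetch_size, the full
// serialized IR tree, and a remark stating whether AutoTune actually changed the configuration.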
Status AutoTune::SaveAutotuneConfig(const std::string &file_name) {
  Path jsonpath(file_name);

  std::string parent_dir = jsonpath.ParentPath();
  if (access(parent_dir.c_str(), R_OK) == -1) {
    std::string err_msg = "AutoTune has no access to specified path: " + parent_dir + ", check permission.";
    LOG_AND_RETURN_STATUS_SYNTAX_ERROR(err_msg);
  }

  if (jsonpath.Exists()) {
    std::string err_msg = "File: <" + file_name +
                          "> already exists. File will be overwritten with the AutoTuned data pipeline configuration.";
    MS_LOG(WARNING) << err_msg;
  }

  RETURN_IF_NOT_OK(SetAutotuneConfigJson());
  // The Execution Tree is built by visiting the optimized IR Tree in DFS order.
  // So we visit the optimized IR tree in DFS order and try to match each IR node with its corresponding dataset op.
  RETURN_IF_NOT_OK(Serdes::UpdateOptimizedIRTreeJSON(&autotune_config_json_, ops_));
  std::vector<std::string> summary;
  RETURN_IF_NOT_OK(SummarizeTreeConfiguration(&summary));
  nlohmann::json out_json;
  out_json["summary"] = summary;
  out_json["tree"] = autotune_config_json_;
  std::string remark_value = "The following file has been auto-generated by the Dataset AutoTune.";
  if (tree_modifier_->GetRequestsCount() == 0) {
    remark_value += " Dataset Pipeline is not the bottleneck. No configuration changes were made by Dataset AutoTune.";
  }
  out_json["remark"] = remark_value;
  RETURN_IF_NOT_OK(Serdes::SaveJSONToFile(out_json, file_name, true));
  return Status::OK();
}

Status AutoTune::SetAutotuneConfigJson() {
  if (autotune_config_json_.empty()) {
    nlohmann::json out_json;
    RETURN_IF_NOT_OK(Serdes::SaveToJSON(tree_adapter_->RootIRNode(), "", &out_json));
    // We do not want to serialize DataQueueNode/DataQueueOp
    if (out_json["op_type"] == kTransferNode) {
      CHECK_FAIL_RETURN_UNEXPECTED(
        out_json["children"].size() == 1,
        "Expected Transfer node to have exactly 1 child but it has " + std::to_string(out_json["children"].size()));
      out_json = out_json["children"][0];
    }
    autotune_config_json_ = std::move(out_json);
  }
  return Status::OK();
}
#endif

Status AutoTune::SummarizeTreeConfiguration(std::vector<std::string> *out) {
  constexpr int op_name_width = 20;
  constexpr int val_width = 2;
  for (int i = static_cast<int>(ops_.size()) - 1; i >= 0; --i) {
    const auto op = ops_[i];
    if (!op->inlined() && op->Name() != "DataQueueOp") {
      std::stringstream s;
      s << std::left << std::setw(op_name_width) << op->NameWithID() << "(num_parallel_workers:" << std::right
        << std::setw(val_width) << (op->NumWorkers() == 0 ? "NA" : std::to_string(op->NumWorkers()))
        << ", prefetch_size:" << std::setw(val_width) << op->ConnectorCapacity() << ")";
      (void)out->emplace_back(s.str());
    }
  }
  return Status::OK();
}

void AutoTune::PostMainLogging() const {
  MS_LOG(INFO) << "Dataset AutoTune thread is finished.";
  MS_LOG(INFO) << "Printing the final tree configuration";
  PrintTreeConfiguration();
  // Print the suggestion in logs only if autotune requested some changes
  if (tree_modifier_->GetRequestsCount() > 0) {
    MS_LOG(INFO) << "Suggestion: set a suitable num_parallel_workers for each Operation, or use the global setting "
                 << "API: mindspore.dataset.config.set_num_parallel_workers";
    MS_LOG(INFO) << "Suggestion: choose the maximum prefetch_size from the tuned result and set it with the global "
                 << "setting API: mindspore.dataset.config.set_prefetch_size";
  }
}

void AutoTune::PrintTreeConfiguration() const {
  ExecutionTree const *tree = tree_adapter_->tree_.get();
  for (auto itr = tree->begin(); itr != tree->end(); (void)itr++) {
    if (!itr->inlined() && itr->Name() != "DataQueueOp") {
      MS_LOG(INFO) << itr->NameWithID() << " num_parallel_workers: " << itr->NumWorkers()
                   << " prefetch_size: " << itr->ConnectorCapacity();
    }
  }
}

Status AutoTune::LaunchThread() {
  MS_LOG(INFO) << "Launching Dataset AutoTune thread";
  Status rc = CollectOpsInfo();
  if (rc.IsError()) {
    if (rc.StatusCode() != StatusCode::kMDInterrupted) {
      MS_LOG(ERROR) << "Dataset AutoTune failed and will exit with the following error: " << rc;
    }
    RETURN_IF_NOT_OK(profiling_manager_->Stop());
    return Status::OK();
  }
  RETURN_IF_NOT_OK(cv_.Register(tree_adapter_->AllTasks()->GetIntrpService()));
  RETURN_IF_NOT_OK(tree_adapter_->AllTasks()->CreateAsyncTask("AutoTune Thread", std::bind(&AutoTune::Main, this)));
  return Status::OK();
}

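// Walks the execution tree once and caches every dataset op by id in ops_, records the ids of all
// parallel ops (num_workers > 0) in parallel_ops_ids_ sorted bottom-up, and remembers the leaf op id.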
Status AutoTune::CollectOpsInfo() {
  ExecutionTree const *tree = tree_adapter_->tree_.get();
  RETURN_UNEXPECTED_IF_NULL(tree);
  for (auto itr = tree->begin(); itr != tree->end(); ++itr) {
    ops_[itr->id()] = itr.get();
    // Collect all parallel ops (num_workers > 0); ops that cannot be tuned are filtered later via SkipOpsCheck()
    if (itr->NumWorkers() > 0) {
      parallel_ops_ids_.push_back(itr->id());
    }
  }
  // Sort parallel ops in reverse order of IDs (i.e., bottommost op is first)
  std::sort(parallel_ops_ids_.begin(), parallel_ops_ids_.end(), std::greater<>());
  leaf_op_id_ = static_cast<int32_t>(ops_.size()) - 1;
  return Status::OK();
}

Status AutoTune::GetOpConnectorCapacity(int32_t op_id, int64_t *capacity) {
  auto item = ops_.find(op_id);
  CHECK_FAIL_RETURN_UNEXPECTED(item != ops_.end(), "Invalid Operator ID.");
  *capacity = item->second->ConnectorCapacity();
  return Status::OK();
}

Status AutoTune::GetOpsCpuUtil(std::map<int32_t, double> *ops_cpu_util) {
  // Loop over all ops and compute the average CPU utilization (system + user) of each one
  for (auto itr = ops_.begin(); itr != ops_.end(); ++itr) {
    std::vector<uint16_t> sys_util;
    std::vector<uint16_t> user_util;
#ifndef ENABLE_ANDROID
    if (mode_ == AutoTuneMode::kAutoTuneModeEpoch) {
      RETURN_IF_NOT_OK(profiling_manager_->GetSysCpuUtilByEpoch(itr->first, cur_epoch_running_, &sys_util));
      RETURN_IF_NOT_OK(profiling_manager_->GetUserCpuUtilByEpoch(itr->first, cur_epoch_running_, &user_util));
    } else if (mode_ == AutoTuneMode::kAutoTuneModeStep) {
      RETURN_IF_NOT_OK(
        profiling_manager_->GetSysCpuUtilByStep(itr->first, last_step_autotuned_, cur_step_running_ - 1, &sys_util));
      RETURN_IF_NOT_OK(
        profiling_manager_->GetUserCpuUtilByStep(itr->first, last_step_autotuned_, cur_step_running_ - 1, &user_util));
    }
#endif
    double sys_cpu_util = Mean(sys_util);
    double user_cpu_util = Mean(user_util);
    (*ops_cpu_util)[itr->first] = sys_cpu_util + user_cpu_util;
  }
  return Status::OK();
}

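// Computes per-op connector (queue) utilization. For every non-inlined op the output queue
// utilization is the average connector size divided by its capacity; the input queue utilization of
// an op is the output queue utilization of its child (walking the tree bottom-up), with the leaf op
// assumed to have a fully utilized input queue. Inlined ops are marked with -1.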
Status AutoTune::GetOpsQueueUtil(std::map<int32_t, double> *out_ops_queue_util,
                                 std::map<int32_t, double> *in_ops_queue_util) {
  // Loop over all ops in ops_ and compute the output queue utilization of each one
  for (auto itr = ops_.begin(); itr != ops_.end(); ++itr) {
    if (itr->second->inlined()) {
      (*out_ops_queue_util)[itr->first] = -1;
      continue;
    }
    std::vector<int32_t> sizes;
    if (mode_ == AutoTuneMode::kAutoTuneModeEpoch) {
      RETURN_IF_NOT_OK(profiling_manager_->GetConnectorSizeByEpoch(itr->first, cur_epoch_running_, &sizes));
    } else if (mode_ == AutoTuneMode::kAutoTuneModeStep) {
      RETURN_IF_NOT_OK(
        profiling_manager_->GetConnectorSizeByStep(itr->first, last_step_autotuned_, cur_step_running_ - 1, &sizes));
    }
    double avg_size = Mean(sizes);
    int64_t capacity = itr->second->ConnectorCapacity();
    CHECK_FAIL_RETURN_UNEXPECTED(capacity != 0, "Capacity of connector should not be 0");
    (*out_ops_queue_util)[itr->first] = avg_size / capacity;
  }
  for (auto itr = ops_.rbegin(); itr != ops_.rend(); ++itr) {
    // Assume that the leaf op has 100% input queue utilization
    if (itr->first + 1 == ops_.size()) {
      (*in_ops_queue_util)[itr->first] = 1;
      continue;
    }
    // The input queue of an op is the output queue of its child
    (*in_ops_queue_util)[itr->first] = (*out_ops_queue_util)[itr->first + 1];
    // If the child is an inlined op, use the previously known utilization
    if ((*in_ops_queue_util)[itr->first] == -1.0) {
      (*in_ops_queue_util)[itr->first] = (*in_ops_queue_util)[itr->first + 1];
    }
  }
  for (const auto &op : ops_) {
    if (op.second->inlined()) {
      (*in_ops_queue_util)[op.first] = -1;
    }
  }
  return Status::OK();
}

Status AutoTune::GetOpsNumWorker(std::map<int32_t, int32_t> *ops_num_workers) {
  for (const auto &op : ops_) {
    (*ops_num_workers)[op.first] = op.second->NumWorkers();
  }
  return Status::OK();
}

bool AutoTune::IsSink() const {
  std::shared_ptr<Tracing> node;
  return profiling_manager_->GetTracingNode(kDeviceQueueTracingName, &node).IsOk();
}

template <typename T>
double AutoTune::Mean(const std::vector<T> &items) const {
  if (items.size() == 0) {
    return 0;
  }
  return std::accumulate(items.begin(), items.end(), 0.0) / static_cast<double>(items.size());
}

Status AutoTune::UpdateCurrentRunInfo() {
  // Get the current running epoch
  cur_epoch_running_ = profiling_manager_->GetNumOfProfiledEpochs();
  // Get the current running step
  int32_t step_temp = 0;
  RETURN_IF_NOT_OK(profiling_manager_->GetNumberOfProfiledSteps(&step_temp));
  cur_step_running_ = step_temp;
  return Status::OK();
}

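// Returns true while AutoTune should still skip tuning because the pipeline is warming up.
// In epoch mode tuning starts after EPOCH_WARMUP epochs; in step mode it starts once the running
// step count exceeds max(STEP_WARMUP, step_gap_), at which point the last-autotuned step is seeded.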
bool AutoTune::WarmupSkipCheck() {
  if (skip_flag_ == false) {
    return false;
  }
  if (mode_ == AutoTuneMode::kAutoTuneModeEpoch) {
    if (cur_epoch_running_ > EPOCH_WARMUP) {
      skip_flag_ = false;
      return false;
    }
  } else if (mode_ == AutoTuneMode::kAutoTuneModeStep) {
    int64_t skip_value = std::max(STEP_WARMUP, step_gap_);
    if (cur_step_running_ > skip_value) {
      last_step_autotuned_ = std::min(STEP_WARMUP, step_gap_);
      skip_flag_ = false;
      return false;
    }
  }
  return true;
}

Status AutoTune::RunIterationEpoch() {
  // Run every epoch
  if (last_epoch_autotuned_ < cur_epoch_running_ - 1) {
    MS_LOG(INFO) << "Run Dataset AutoTune at epoch # " << cur_epoch_running_;
    RETURN_IF_NOT_OK(RunIteration());
    last_epoch_autotuned_ = cur_epoch_running_ - 1;
  }
  return Status::OK();
}

Status AutoTune::RunIterationStep() {
  // Run at autotune step interval
  if (cur_step_running_ - last_step_autotuned_ >= step_gap_) {
    MS_LOG(INFO) << "Run AutoTune at step # " << cur_step_running_;
    RETURN_IF_NOT_OK(RunIteration());
    last_step_autotuned_ = cur_step_running_;
  }
  return Status::OK();
}

Status AutoTune::RegisterWorkersQueue() {
  ExecutionTree *tree = tree_adapter_->tree_.get();
  for (auto itr = tree->begin(); itr != tree->end(); (void)itr++) {
    if (!itr->inlined() && itr->Name() != "DataQueueOp") {
      (void)phase_1_best_workers.push_back(itr->NumWorkers());
      (void)phase_1_best_queue.push_back(itr->ConnectorCapacity());
    }
  }
  return Status::OK();
}

Status AutoTune::ResetWorkersQueue() {
  if (phase_1_best_workers.size() == 0 || phase_1_best_queue.size() == 0) {
    return Status::OK();
  }
  ExecutionTree *tree = tree_adapter_->tree_.get();
  int counter = 0;
  for (auto itr = tree->begin(); itr != tree->end(); (void)itr++) {
    if (!itr->inlined() && itr->Name() != "DataQueueOp") {
      int32_t target_workers = phase_1_best_workers[counter];
      int32_t target_queue = phase_1_best_queue[counter];
      RETURN_IF_NOT_OK(RequestNumWorkerChange(itr->id(), -1, &target_workers));
      RETURN_IF_NOT_OK(RequestConnectorCapacityChange(itr->id(), -1, target_queue));
      counter++;
    }
  }
  return Status::OK();
}

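// Records the average pipeline and batch times for the interval that was just profiled. During the
// time phase (phase 1) it also tracks whether the latest change improved the best observed batch
// time: improvements snapshot the current workers/queue sizes via RegisterWorkersQueue(), while
// repeated non-improvements (beyond the early-stop threshold) restore the best configuration via
// ResetWorkersQueue() and advance AutoTune to the memory phase.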
Status AutoTune::TrackPipelineTime() {
  std::vector<int32_t> pipeline_times;
  std::vector<int32_t> batch_times;
  // Select the early-stop threshold based on the running mode
  int early_stop_threshold_mode;
  if (mode_ == AutoTuneMode::kAutoTuneModeEpoch) {
    RETURN_IF_NOT_OK(profiling_manager_->GetPipelineTimeByEpoch(cur_epoch_running_, &pipeline_times));
    RETURN_IF_NOT_OK(profiling_manager_->GetBatchTimeByEpoch(cur_epoch_running_ - 1, &batch_times));
    early_stop_threshold_mode = EARLY_STOP_TRIAL_THRESHOLD_EPOCH;
  } else if (mode_ == AutoTuneMode::kAutoTuneModeStep) {
    RETURN_IF_NOT_OK(
      profiling_manager_->GetPipelineTimeByStep(last_step_autotuned_, cur_step_running_ - 1, &pipeline_times));
    RETURN_IF_NOT_OK(profiling_manager_->GetBatchTimeByStep(last_step_autotuned_, cur_step_running_ - 1, &batch_times));
    early_stop_threshold_mode = EARLY_STOP_TRIAL_THRESHOLD_STEP;
  }
  double avg_time_pipeline = Mean(pipeline_times);
  double avg_time_batch = Mean(batch_times);
  (void)avg_pipeline_times_.push_back(avg_time_pipeline);
  MS_LOG(INFO) << "Average Pipeline time is " << avg_time_pipeline << " ms. The avg pipeline time for all epochs is "
               << Mean(avg_pipeline_times_) << " ms";
  // Time phase (phase 1) improvement tracking
  if (AT_phase_ == AutoTunePhase::kAutoTunePhaseTime) {
    if (phase_1_best_time_ < 0) {
      phase_1_best_time_ = avg_time_batch;  // set the first value
    } else if (avg_time_batch < phase_1_best_time_) {
      phase_1_no_improve_count_ = 0;
      phase_1_best_time_ = avg_time_batch;
      // Trigger the save process
      if (AT_change_) {
        AT_change_ = false;  // Reset for the next analysis run
        RETURN_IF_NOT_OK(RegisterWorkersQueue());
      }
    } else {
      phase_1_no_improve_count_++;
    }
    if (phase_1_no_improve_count_ > early_stop_threshold_mode) {
      // Restore the best configuration found and move on to the memory phase
      AT_phase_ = AutoTunePhase::kAutoTunePhaseMemory;
      RETURN_IF_NOT_OK(ResetWorkersQueue());
    }
  }
  return Status::OK();
}

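// One tuning iteration: refresh the timing statistics, then run the analysis that matches the
// current phase (time-based worker/queue growth first, memory-based queue reduction afterwards).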
Status AutoTune::RunIteration() {
  RETURN_IF_NOT_OK(TrackPipelineTime());
  if (AT_phase_ == AutoTunePhase::kAutoTunePhaseTime) {
    RETURN_IF_NOT_OK(AnalyseTime());
  } else if (AT_phase_ == AutoTunePhase::kAutoTunePhaseMemory) {
    RETURN_IF_NOT_OK(AnalyseMemory());
  }
  return Status::OK();
}

Status AutoTune::GetConnectorSize(std::vector<int32_t> *sizes) const {
  if (mode_ == AutoTuneMode::kAutoTuneModeEpoch) {
    RETURN_IF_NOT_OK(profiling_manager_->GetConnectorSizeByEpoch(cur_epoch_running_, sizes));
  } else if (mode_ == AutoTuneMode::kAutoTuneModeStep) {
    RETURN_IF_NOT_OK(profiling_manager_->GetConnectorSizeByStep(last_step_autotuned_, cur_step_running_ - 1, sizes));
  }
  return Status::OK();
}

Status AutoTune::GetConnectorCapacity(std::vector<int32_t> *capacities) const {
  if (mode_ == AutoTuneMode::kAutoTuneModeEpoch) {
    RETURN_IF_NOT_OK(profiling_manager_->GetConnectorCapacityByEpoch(cur_epoch_running_, capacities));
  } else if (mode_ == AutoTuneMode::kAutoTuneModeStep) {
    RETURN_IF_NOT_OK(
      profiling_manager_->GetConnectorCapacityByStep(last_step_autotuned_, cur_step_running_ - 1, capacities));
  }
  return Status::OK();
}

Status AutoTune::GetConnectorUtil(double *usage_avg_last, double *avg_size, double *avg_capacity) {
  std::vector<int32_t> sizes;
  RETURN_IF_NOT_OK(GetConnectorSize(&sizes));
  *avg_size = Mean(sizes);
  std::vector<int32_t> capacities;
  RETURN_IF_NOT_OK(GetConnectorCapacity(&capacities));
  *avg_capacity = Mean(capacities);
  CHECK_FAIL_RETURN_UNEXPECTED(*avg_capacity != 0.0, "Capacities of connectors should not be 0.0");
  // Utilization is the average occupied size of the queue relative to its capacity
  *usage_avg_last = (*avg_size / *avg_capacity);
  return Status::OK();
}

Status AutoTune::GetEmptyQueueFrequency(float *empty_freq) const {
  if (mode_ == AutoTuneMode::kAutoTuneModeEpoch) {
    RETURN_IF_NOT_OK(profiling_manager_->GetEmptyQueueFrequencyByEpoch(cur_epoch_running_, empty_freq));
  } else if (mode_ == AutoTuneMode::kAutoTuneModeStep) {
    RETURN_IF_NOT_OK(
      profiling_manager_->GetEmptyQueueFrequencyByStep(last_step_autotuned_, cur_step_running_ - 1, empty_freq));
  }
  return Status::OK();
}

Status AutoTune::IsDSaBottleneck(bool *isBottleneck) {
  double usage_avg_last, avg_size, avg_capacity;
  RETURN_IF_NOT_OK(GetConnectorUtil(&usage_avg_last, &avg_size, &avg_capacity));
  float empty_freq = 0;
  RETURN_IF_NOT_OK(GetEmptyQueueFrequency(&empty_freq));
  if (mode_ == AutoTuneMode::kAutoTuneModeStep) {
    MS_LOG(INFO) << "Step # " << cur_step_running_ << ". Status:";
  } else {
    MS_LOG(INFO) << "Epoch # " << cur_epoch_running_ << ". Status:";
  }
  // Reporting values
  MS_LOG(INFO) << "Device Connector Size: " << avg_size << ", Connector Capacity: " << avg_capacity
               << ", Utilization: " << (usage_avg_last * TO_PERCENT) << "%"
               << ", Empty Freq: " << (empty_freq * TO_PERCENT) << "% ";
  // Decision
  if (usage_avg_last < DEVICE_CONNECTOR_UTIL_THRESHOLD) {
    MS_LOG(INFO) << "Utilization: " << (usage_avg_last * TO_PERCENT) << "% < "
                 << (DEVICE_CONNECTOR_UTIL_THRESHOLD * TO_PERCENT)
                 << "% threshold, dataset pipeline performance may benefit from tuning.";
    *isBottleneck = true;
  } else {
    MS_LOG(INFO) << "Utilization: " << (usage_avg_last * TO_PERCENT) << "% > "
                 << (DEVICE_CONNECTOR_UTIL_THRESHOLD * TO_PERCENT)
                 << "% threshold, dataset pipeline performance is OK.";
    *isBottleneck = false;
  }
  return Status::OK();
}

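// Submits a change request to the tree modifier to adjust an op's number of parallel workers. The
// requested value is clamped to [MIN_NUM_WORKERS, max_workers_] and written back through
// num_workers_requested so callers see the value that was actually applied.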
Status AutoTune::RequestNumWorkerChange(int32_t op_id, int32_t old_workers, int32_t *num_workers_requested) {
  AT_change_ = true;
  int new_workers = std::min(*num_workers_requested, max_workers_);
  new_workers = std::max(new_workers, MIN_NUM_WORKERS);
  RETURN_IF_NOT_OK(tree_modifier_->AddChangeRequest(op_id, std::make_shared<ChangeNumWorkersRequest>(new_workers)));
  if (old_workers == -1) {
    MS_LOG(INFO) << "Added request to change \"num_parallel_workers\" of Operator: " << ops_[op_id]->NameWithID()
                 << " to value: [" << new_workers << "].";
  } else {
    MS_LOG(INFO) << "Added request to change \"num_parallel_workers\" of Operator: " << ops_[op_id]->NameWithID()
                 << " from old value: [" << old_workers << "] to new value: [" << new_workers << "].";
  }
  *num_workers_requested = new_workers;
  return Status::OK();
}

Status AutoTune::RequestConnectorCapacityChange(int32_t op_id, int32_t old_size, int32_t new_size) {
  AT_change_ = true;
  new_size = std::min(new_size, MAX_QUEUE_SIZE);
  new_size = std::max(new_size, MIN_QUEUE_SIZE);
  RETURN_IF_NOT_OK(tree_modifier_->AddChangeRequest(op_id, std::make_shared<ResizeConnectorRequest>(new_size)));
  if (old_size == -1) {
    MS_LOG(INFO) << "Added request to change \"prefetch_size\" of Operator: " << ops_[op_id]->NameWithID()
                 << " to value: [" << new_size << "].";
  } else {
    MS_LOG(INFO) << "Added request to change \"prefetch_size\" of Operator: " << ops_[op_id]->NameWithID()
                 << " from old value: [" << old_size << "] to new value: [" << new_size << "].";
  }
  return Status::OK();
}

bool AutoTune::SkipOpsCheck(int op_id) {
  // Skip Generator op
  if (ops_[op_id]->Name() == "GeneratorOp") {
    return true;
  }
  // NonMappableDataset is not supported in AutoTune
#ifndef ENABLE_ANDROID
  if (std::dynamic_pointer_cast<NonMappableLeafOp>(ops_[op_id]) != nullptr) {
    return true;
  }
#endif
  return false;
}

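// Time phase (phase 1) analysis. If the device connector utilization indicates the dataset pipeline
// is the bottleneck, each tunable parallel op is examined: a large gap between input and output
// queue utilization or a high per-worker CPU utilization triggers a request for more workers, while
// a low per-worker CPU utilization together with a low input queue utilization triggers a request
// for a larger connector (prefetch) capacity.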
Status AutoTune::AnalyseTime() {
  // Check for a connector queue bottleneck
  bool isBottleneck = false;
  RETURN_IF_NOT_OK(IsDSaBottleneck(&isBottleneck));
  if (!isBottleneck) {
    return Status::OK();
  }
  // Collect stats
  std::map<int32_t, int32_t> ops_num_workers;
  RETURN_IF_NOT_OK(GetOpsNumWorker(&ops_num_workers));
  std::map<int32_t, double> out_ops_queue_util;
  std::map<int32_t, double> in_ops_queue_util;
  RETURN_IF_NOT_OK(GetOpsQueueUtil(&out_ops_queue_util, &in_ops_queue_util));
  std::map<int32_t, double> ops_cpu_util;
  RETURN_IF_NOT_OK(GetOpsCpuUtil(&ops_cpu_util));
  // Check parallel ops in a loop
  for (const auto &op_id : parallel_ops_ids_) {
    if (SkipOpsCheck(op_id)) {
      continue;
    }
    // Op specifics
    double output_queue_util = out_ops_queue_util[op_id];
    double input_queue_util = in_ops_queue_util[op_id];
    double cpu_util = ops_cpu_util[op_id];
    int32_t num_workers = ops_num_workers[op_id];
    CHECK_FAIL_RETURN_UNEXPECTED(num_workers != 0, "ParallelOp with num_workers=0");
    // Derived metrics
    double queue_diff = input_queue_util - output_queue_util;
    int64_t queue_capacity;
    RETURN_IF_NOT_OK(GetOpConnectorCapacity(op_id, &queue_capacity));
    int64_t new_queue_capacity = queue_capacity;
    int32_t requested_workers = 0;
    MS_LOG(DEBUG) << "Op (" << ops_[op_id]->NameWithID() << ") CPU=" << cpu_util / num_workers
                  << ", in=" << input_queue_util << ", out=" << output_queue_util;
    // Map decisions - queue
    if (queue_diff > INPUT_OUTPUT_QUEUE_DIFF_THRESHOLD) {
      MS_LOG(INFO) << "Op (" << ops_[op_id]->NameWithID()
                   << ") is slow, input connector utilization=" << input_queue_util
                   << ", output connector utilization=" << output_queue_util << ", diff=" << queue_diff << " > "
                   << INPUT_OUTPUT_QUEUE_DIFF_THRESHOLD << " threshold.";
      requested_workers = num_workers + INCREMENT_WORKER;
      RETURN_IF_NOT_OK(RequestNumWorkerChange(op_id, num_workers, &requested_workers));
    } else if ((cpu_util / num_workers) > MAP_OP_WORKER_HIGH_THRESHOLD) {
      MS_LOG(INFO) << "Op (" << ops_[op_id]->NameWithID() << ") has a high average worker CPU utilization "
                   << (cpu_util / num_workers) << "% > " << MAP_OP_WORKER_HIGH_THRESHOLD << "% threshold.";
      requested_workers = num_workers + INCREMENT_WORKER;
      RETURN_IF_NOT_OK(RequestNumWorkerChange(op_id, num_workers, &requested_workers));
    }
    if ((cpu_util / num_workers) < MAP_OP_WORKER_LOW_THRESHOLD &&
        ((input_queue_util < INPUT_QUEUE_LOW) || (-1 * queue_diff > INPUT_OUTPUT_QUEUE_DIFF_THRESHOLD))) {
      MS_LOG(INFO) << "Op (" << ops_[op_id]->NameWithID() << ") has a low average worker CPU utilization "
                   << (cpu_util / num_workers) << "% < " << MAP_OP_WORKER_LOW_THRESHOLD << "% threshold.";
      new_queue_capacity = queue_capacity + INCREMENT_QUEUE_SIZE;
      if (requested_workers == 0) {
        requested_workers = num_workers;
      }
      new_queue_capacity = std::max(new_queue_capacity, static_cast<int64_t>(requested_workers));
      RETURN_IF_NOT_OK(RequestConnectorCapacityChange(op_id, queue_capacity, new_queue_capacity));
    }
  }
  return Status::OK();
}

bool AutoTune::MemoryPhaseCompareMetric(double prev_avg, double cur_avg) {
  double lower_bound = prev_avg - (prev_avg * MEMORY_COMPARISON_LOWER_BOUND_PERCENT);
  // If cur_avg is worse than the lower bound, the change had a negative impact on performance.
  // The lower bound accounts for expected fluctuations in the utilization numbers.
  if (cur_avg < lower_bound) {
    return false;
  } else {
    return true;
  }
}

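// Memory phase (phase 3) analysis. Runs a small state machine over the tunable parallel ops:
// in the "set" state it shrinks one op's connector capacity by the configured reduction percentage
// (bounded below by the op's worker count), and in the "compare" state it checks whether the device
// connector utilization degraded; if it did, the previous capacity is restored and that op is not
// shrunk again. The phase ends once every candidate op has been exhausted.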
Status AutoTune::AnalyseMemory() {
  double prev_avg = 0;
  double cur_avg = 0;
  bool comp_flag = true;
  double connector_avg_size;
  double connector_avg_capacity;
  int total = parallel_ops_ids_.size();
  if (total == 0) {
    AT_phase_ = AutoTunePhase::kAutoTuneEnd;
    return Status::OK();
  }
  std::map<int32_t, int32_t> ops_num_workers;
  RETURN_IF_NOT_OK(GetOpsNumWorker(&ops_num_workers));
  double reduce_percent_mode;
  // Decrease queue sizes faster in epoch mode
  if (mode_ == AutoTuneMode::kAutoTuneModeEpoch) {
    reduce_percent_mode = QUEUE_REDUCTION_PERCENTAGE_EPOCH;
  } else {
    reduce_percent_mode = QUEUE_REDUCTION_PERCENTAGE_STEP;
  }
  // Init state on the first call
  if (phase_3_state_ == AutoTuneMemPhase::kAutoTuneMemInit) {
    count_down_ = 0;
    for (auto op_id : parallel_ops_ids_) {
      if ((SkipOpsCheck(op_id)) || (ops_[op_id]->Name() == "DataQueueOp")) {
        // Op not supported - ignore throughout AutoTune
        (void)OP_values.push_back(-1);
        continue;
      }
      // Op supported - attempt memory reduction on it
      (void)OP_values.push_back(0);
      count_down_++;
    }
    phase_3_state_ = AutoTuneMemPhase::kAutoTuneMemSet;
    phase_3_ID_ = 0;
  }

  // Exit when all viable ops have been tested, or if none were found
  if (count_down_ == 0) {
    AT_phase_ = AutoTunePhase::kAutoTuneEnd;
    return Status::OK();
  }

  if (phase_3_state_ == AutoTuneMemPhase::kAutoTuneMemSet) {
    RETURN_IF_NOT_OK(GetConnectorUtil(&phase_3_prev_avg_, &connector_avg_size, &connector_avg_capacity));
    // Search for the next viable op that can be tested
    while (OP_values[phase_3_ID_] == -1) {
      phase_3_ID_ = ((phase_3_ID_ + 1) % total);
    }
    int64_t current_queue_size;
    RETURN_IF_NOT_OK(GetOpConnectorCapacity(parallel_ops_ids_[phase_3_ID_], &current_queue_size));
    int op_workers = ops_num_workers[parallel_ops_ids_[phase_3_ID_]];
    int target_mem = current_queue_size * reduce_percent_mode;
    if (std::max(target_mem, op_workers) == op_workers) {
      // Lower bound on the range of queue sizes to test
      OP_values[phase_3_ID_] = -1;
      count_down_--;
      target_mem = op_workers;
    } else {
      // Save the current queue size for possible recovery and switch to compare mode
      OP_values[phase_3_ID_] = current_queue_size;
      phase_3_state_ = AutoTuneMemPhase::kAutotTuneMemCompare;
    }
    RETURN_IF_NOT_OK(RequestConnectorCapacityChange(parallel_ops_ids_[phase_3_ID_], -1, target_mem));
  } else if (phase_3_state_ == AutoTuneMemPhase::kAutotTuneMemCompare) {
    // Analyse the impact of the previous change on performance
    RETURN_IF_NOT_OK(GetConnectorUtil(&cur_avg, &connector_avg_size, &connector_avg_capacity));
    prev_avg = phase_3_prev_avg_;
    comp_flag = MemoryPhaseCompareMetric(prev_avg, cur_avg);
    // Compare the current average against the pre-change average
    if (comp_flag == false) {
      int reset_val = OP_values[phase_3_ID_];
      RETURN_IF_NOT_OK(RequestConnectorCapacityChange(parallel_ops_ids_[phase_3_ID_], -1, reset_val));
      // The queue-size reduction caused a performance decrease - do not shrink this op again
      OP_values[phase_3_ID_] = -1;
      count_down_--;
    }
    phase_3_state_ = AutoTuneMemPhase::kAutoTuneMemSet;
  }
  return Status::OK();
}
}  // namespace dataset
}  // namespace mindspore