/**
 * Copyright 2021-2022 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "minddata/dataset/engine/perf/auto_tune.h"

#include <algorithm>
#include <functional>
#include <memory>
#include <utility>
#include <vector>
#include <string>
#include <sstream>
#include <iomanip>
#include <numeric>  // std::accumulate, used in AutoTune::Mean
#ifndef ENABLE_ANDROID
#include "minddata/dataset/engine/datasetops/source/nonmappable_leaf_op.h"
#include "minddata/dataset/engine/serdes.h"
#endif
#include "minddata/dataset/util/task_manager.h"

namespace mindspore {
namespace dataset {
AutoTune::AutoTune(TreeAdapter *tree_adap, ProfilingManager *profiling_mgr)
    : tree_adapter_(tree_adap),
      profiling_manager_(profiling_mgr),
      tree_modifier_(std::make_unique<TreeModifier>(tree_adapter_)),
      leaf_op_id_(-1),
      cur_epoch_running_(1),
      last_epoch_autotuned_(0),
      cur_step_running_(1),
      last_step_autotuned_(0),
      mode_(0),
      step_gap_(GlobalContext::config_manager()->autotune_interval()),
      skip_flag_(true),
      AT_phase_(AutoTunePhase::kAutoTunePhaseTime),
      AT_change_(false),
      phase_1_best_time_(-1),
      phase_1_no_improve_count_(0),
      count_down_(0),
      phase_3_state_(AutoTuneMemPhase::kAutoTuneMemInit),
      phase_3_ID_(0),
      avg_batch_time(0.0),
      phase_3_prev_avg_(0.0),
      save_autoconfig_(GlobalContext::config_manager()->save_autoconfig()) {
  max_workers_ = GlobalContext::config_manager()->num_cpu_threads();
  autotune_json_filepath_ = GlobalContext::config_manager()->get_autotune_json_filepath();
}

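// Entry point of the AutoTune thread: selects step- or epoch-based tuning, runs the main loop,
// and writes the final configuration to disk when saving is enabled and no nodes were offloaded.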
Status AutoTune::Main() {
  TaskManager::FindMe()->Post();
  MS_LOG(INFO) << "Dataset AutoTune thread has started.";
  if (step_gap_ != 0) {
    mode_ = AutoTuneMode::kAutoTuneModeStep;
  } else {
    mode_ = AutoTuneMode::kAutoTuneModeEpoch;
  }
  const bool nodes_offloaded = !tree_adapter_->GetOffloadJson().empty();
  if (nodes_offloaded) {
    // When nodes are offloaded they are removed from the optimized IR tree.
    // Serializing the optimized IR Tree and then deserializing will not work.
    MS_LOG(WARNING) << "Some nodes have been offloaded. AutoTune is unable to write the autotune configuration to "
                       "disk. Disable offload to prevent this from happening.";
  }
  bool output_final_config = save_autoconfig_ && !nodes_offloaded;
  bool output_intermediate_config = save_intermediate_autoconfig_ && output_final_config;
  RETURN_IF_NOT_OK(ATMainLoop(output_intermediate_config));
  RETURN_IF_NOT_OK(profiling_manager_->Stop());
  PostMainLogging();
#ifndef ENABLE_ANDROID
  if (output_final_config &&
      (SaveAutotuneConfig(autotune_json_filepath_ + "_" + profiling_manager_->GetRankID() + ".json").IsError())) {
    MS_LOG(WARNING) << "Failed to write the final autotune configuration to disk";
  }
#endif
  return Status::OK();
}

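// Main tuning loop: wakes up at the profiling sampling interval, updates the epoch/step counters,
// runs one tuning iteration once the warm-up period has passed, and optionally saves an
// intermediate configuration after every tuned epoch/step.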
Status AutoTune::ATMainLoop(bool output_intermediate_config) {
  std::unique_lock<std::mutex> _lock(mux_);
  int loop_cnt = 0;
  Status rc;
  while (!this_thread::is_interrupted() && !(tree_adapter_->tree_->isFinished())) {
#ifndef ENABLE_ANDROID
    auto last_epoch = cur_epoch_running_;
    auto last_step = cur_step_running_;
#endif
    RETURN_IF_NOT_OK(UpdateCurrentRunInfo());
    if (!WarmupSkipCheck()) {
      // Warm-up complete - run AutoTune normally
      if (mode_ == AutoTuneMode::kAutoTuneModeEpoch) {
        rc = RunIterationEpoch();
      } else if (mode_ == AutoTuneMode::kAutoTuneModeStep) {
        rc = RunIterationStep();
      }
      if (rc.IsError()) {
        if (rc.StatusCode() != StatusCode::kMDInterrupted) {
          MS_LOG(ERROR) << "Dataset AutoTune failed and will exit with the following error: " << rc;
        }
        RETURN_IF_NOT_OK(profiling_manager_->Stop());
        break;
      }
#ifndef ENABLE_ANDROID
      if (last_epoch != cur_epoch_running_ || last_step != cur_step_running_) {
        if (output_intermediate_config &&
            (SaveAutotuneConfig(autotune_json_filepath_ + "_" + profiling_manager_->GetRankID() + "_" +
                                std::to_string(loop_cnt) + ".json")
               .IsError())) {
          MS_LOG(WARNING) << "Failed to write the current iteration autotune configuration to disk";
        }
        ++loop_cnt;
      }
#endif
      if (AT_phase_ == AutoTunePhase::kAutoTuneEnd) {
        MS_LOG(INFO) << "Dataset AutoTune has stopped; optimization is complete.";
        break;
      }
    }
    rc = cv_.WaitFor(&_lock, GlobalContext::config_manager()->monitor_sampling_interval());
    // The thread may be interrupted for tree termination while waiting (we should not report an error in this case)
    if (rc.IsError() && rc != StatusCode::kMDInterrupted) {
      return rc;
    }
  }
  return Status::OK();
}

#ifndef ENABLE_ANDROID
Status AutoTune::SaveAutotuneConfig(const std::string &file_name) {
  Path jsonpath(file_name);

  std::string parent_dir = jsonpath.ParentPath();
  if (access(parent_dir.c_str(), R_OK) == -1) {
    std::string err_msg = "AutoTune has no access to the specified path: " + parent_dir + ", check permission.";
    LOG_AND_RETURN_STATUS_SYNTAX_ERROR(err_msg);
  }

  if (jsonpath.Exists()) {
    std::string err_msg = "File: <" + file_name +
                          "> already exists. File will be overwritten with the AutoTuned data pipeline configuration.";
    MS_LOG(WARNING) << err_msg;
  }

  RETURN_IF_NOT_OK(SetAutotuneConfigJson());
  // The Execution Tree is built by visiting the optimized IR Tree in DFS order.
  // So we visit the optimized IR tree in DFS order and try to match each IR node with its corresponding dataset op.
  RETURN_IF_NOT_OK(Serdes::UpdateOptimizedIRTreeJSON(&autotune_config_json_, ops_));
  std::vector<std::string> summary;
  RETURN_IF_NOT_OK(SummarizeTreeConfiguration(&summary));
  nlohmann::json out_json;
  out_json["summary"] = summary;
  out_json["tree"] = autotune_config_json_;
  std::string remark_value = "The following file has been auto-generated by the Dataset AutoTune.";
  if (tree_modifier_->GetRequestsCount() == 0) {
    remark_value += " Dataset Pipeline is not the bottleneck. No configuration changes were made by Dataset AutoTune.";
  }
  out_json["remark"] = remark_value;
  RETURN_IF_NOT_OK(Serdes::SaveJSONToFile(out_json, file_name, true));
  return Status::OK();
}

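// Serialize the optimized IR tree to JSON once and cache it. If the root is a DataQueueNode
// (kTransferNode), it is dropped so that only the user-visible pipeline is written out.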
Status AutoTune::SetAutotuneConfigJson() {
  if (autotune_config_json_.empty()) {
    nlohmann::json out_json;
    RETURN_IF_NOT_OK(Serdes::SaveToJSON(tree_adapter_->RootIRNode(), "", &out_json));
    // We do not want to serialize DataQueueNode/DataQueueOp
    if (out_json["op_type"] == kTransferNode) {
      CHECK_FAIL_RETURN_UNEXPECTED(
        out_json["children"].size() == 1,
        "Expected Transfer node to have exactly 1 child but it has " + std::to_string(out_json["children"].size()));
      out_json = out_json["children"][0];
    }
    autotune_config_json_ = std::move(out_json);
  }
  return Status::OK();
}
#endif

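// Build a human-readable summary of the tuned tree, one line per non-inlined op (DataQueueOp excluded).
// Illustrative output line (format only): "MapOp(ID:3)          (num_parallel_workers: 8, prefetch_size:16)"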
Status AutoTune::SummarizeTreeConfiguration(std::vector<std::string> *out) {
  constexpr int op_name_width = 20;
  constexpr int val_width = 2;
  for (int i = static_cast<int>(ops_.size()) - 1; i >= 0; --i) {
    const auto op = ops_[i];
    if (!op->inlined() && op->Name() != "DataQueueOp") {
      std::stringstream s;
      s << std::left << std::setw(op_name_width) << op->NameWithID() << "(num_parallel_workers:" << std::right
        << std::setw(val_width) << (op->NumWorkers() == 0 ? "NA" : std::to_string(op->NumWorkers()))
        << ", prefetch_size:" << std::setw(val_width) << op->ConnectorCapacity() << ")";
      (void)out->emplace_back(s.str());
    }
  }
  return Status::OK();
}

void AutoTune::PostMainLogging() const {
  MS_LOG(INFO) << "Dataset AutoTune thread is finished.";
  MS_LOG(INFO) << "Printing the final tree configuration";
  PrintTreeConfiguration();
  // Print the suggestions in the logs only if AutoTune requested some changes
  if (tree_modifier_->GetRequestsCount() > 0) {
    MS_LOG(INFO) << "Suggestion: set a suitable num_parallel_workers for each Operation, or use the global setting "
                 << "API: mindspore.dataset.config.set_num_parallel_workers";
    MS_LOG(INFO) << "Suggestion: choose the maximum prefetch_size from the tuned result and set it with the global "
                 << "setting API: mindspore.dataset.config.set_prefetch_size";
  }
}

void AutoTune::PrintTreeConfiguration() const {
  ExecutionTree const *tree = tree_adapter_->tree_.get();
  for (auto itr = tree->begin(); itr != tree->end(); (void)itr++) {
    if (!itr->inlined() && itr->Name() != "DataQueueOp") {
      MS_LOG(INFO) << itr->NameWithID() << " num_parallel_workers: " << itr->NumWorkers()
                   << " prefetch_size: " << itr->ConnectorCapacity();
    }
  }
}

Status AutoTune::LaunchThread() {
  MS_LOG(INFO) << "Launching Dataset AutoTune thread";
  Status rc = CollectOpsInfo();
  if (rc.IsError()) {
    if (rc.StatusCode() != StatusCode::kMDInterrupted) {
      MS_LOG(ERROR) << "Dataset AutoTune failed and will exit with the following error: " << rc;
    }
    RETURN_IF_NOT_OK(profiling_manager_->Stop());
    return Status::OK();
  }
  RETURN_IF_NOT_OK(cv_.Register(tree_adapter_->AllTasks()->GetIntrpService()));
  RETURN_IF_NOT_OK(tree_adapter_->AllTasks()->CreateAsyncTask("AutoTune Thread", std::bind(&AutoTune::Main, this)));
  return Status::OK();
}

Status AutoTune::CollectOpsInfo() {
  ExecutionTree const *tree = tree_adapter_->tree_.get();
  RETURN_UNEXPECTED_IF_NULL(tree);
  for (auto itr = tree->begin(); itr != tree->end(); ++itr) {
    ops_[itr->id()] = itr.get();
    // Get all parallel ops (num_workers>0) except leaf nodes (no children)
    if (itr->NumWorkers() > 0) {
      parallel_ops_ids_.push_back(itr->id());
    }
  }
  // Sort parallel ops in reverse order of IDs (i.e., bottommost op is first)
  std::sort(parallel_ops_ids_.begin(), parallel_ops_ids_.end(), std::greater<>());
  leaf_op_id_ = static_cast<int32_t>(ops_.size()) - 1;
  return Status::OK();
}

Status AutoTune::GetOpConnectorCapacity(int32_t op_id, int64_t *capacity) {
  auto item = ops_.find(op_id);
  CHECK_FAIL_RETURN_UNEXPECTED(item != ops_.end(), "Invalid Operator ID.");
  *capacity = item->second->ConnectorCapacity();
  return Status::OK();
}

Status AutoTune::GetOpsCpuUtil(std::map<int32_t, double> *ops_cpu_util) {
  // Loop over all ops and record the average CPU usage (system + user) per op
  for (auto itr = ops_.begin(); itr != ops_.end(); ++itr) {
    std::vector<uint16_t> sys_util;
    std::vector<uint16_t> user_util;
#ifndef ENABLE_ANDROID
    if (mode_ == AutoTuneMode::kAutoTuneModeEpoch) {
      RETURN_IF_NOT_OK(profiling_manager_->GetSysCpuUtilByEpoch(itr->first, cur_epoch_running_, &sys_util));
      RETURN_IF_NOT_OK(profiling_manager_->GetUserCpuUtilByEpoch(itr->first, cur_epoch_running_, &user_util));
    } else if (mode_ == AutoTuneMode::kAutoTuneModeStep) {
      RETURN_IF_NOT_OK(
        profiling_manager_->GetSysCpuUtilByStep(itr->first, last_step_autotuned_, cur_step_running_ - 1, &sys_util));
      RETURN_IF_NOT_OK(
        profiling_manager_->GetUserCpuUtilByStep(itr->first, last_step_autotuned_, cur_step_running_ - 1, &user_util));
    }
#endif
    double sys_cpu_util = Mean(sys_util);
    double user_cpu_util = Mean(user_util);
    (*ops_cpu_util)[itr->first] = sys_cpu_util + user_cpu_util;
  }
  return Status::OK();
}

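// Compute output- and input-queue utilization for every op. Inlined ops have no connector, so their
// utilization is recorded as -1; an op's input utilization is taken from its child's output queue,
// falling back to the previous known value when the child is inlined. The leaf op's input queue is
// assumed to be 100% utilized.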
Status AutoTune::GetOpsQueueUtil(std::map<int32_t, double> *out_ops_queue_util,
                                 std::map<int32_t, double> *in_ops_queue_util) {
  // Loop over all ops in ops_ and get the output queue usage
  for (auto itr = ops_.begin(); itr != ops_.end(); ++itr) {
    if (itr->second->inlined()) {
      (*out_ops_queue_util)[itr->first] = -1;
      continue;
    }
    std::vector<int32_t> sizes;
    if (mode_ == AutoTuneMode::kAutoTuneModeEpoch) {
      RETURN_IF_NOT_OK(profiling_manager_->GetConnectorSizeByEpoch(itr->first, cur_epoch_running_, &sizes));
    } else if (mode_ == AutoTuneMode::kAutoTuneModeStep) {
      RETURN_IF_NOT_OK(
        profiling_manager_->GetConnectorSizeByStep(itr->first, last_step_autotuned_, cur_step_running_ - 1, &sizes));
    }
    double avg_size = Mean(sizes);
    int64_t capacity = itr->second->ConnectorCapacity();
    CHECK_FAIL_RETURN_UNEXPECTED(capacity != 0, "Capacity of connector should not be 0");
    (*out_ops_queue_util)[itr->first] = avg_size / capacity;
  }
  for (auto itr = ops_.rbegin(); itr != ops_.rend(); ++itr) {
    // Assume that the leaf op has 100% input queue utilization
    if (itr->first + 1 == ops_.size()) {
      (*in_ops_queue_util)[itr->first] = 1;
      continue;
    }
    // The input queue is the output queue of the child
    (*in_ops_queue_util)[itr->first] = (*out_ops_queue_util)[itr->first + 1];
    // If the child is an inlined op, use the previously known utilization
    if ((*in_ops_queue_util)[itr->first] == -1.0) {
      (*in_ops_queue_util)[itr->first] = (*in_ops_queue_util)[itr->first + 1];
    }
  }
  for (const auto &op : ops_) {
    if (op.second->inlined()) {
      (*in_ops_queue_util)[op.first] = -1;
    }
  }
  return Status::OK();
}

Status AutoTune::GetOpsNumWorker(std::map<int32_t, int32_t> *ops_num_workers) {
  for (const auto &op : ops_) {
    (*ops_num_workers)[op.first] = op.second->NumWorkers();
  }
  return Status::OK();
}

bool AutoTune::IsSink() const {
  std::shared_ptr<Tracing> node;
  return profiling_manager_->GetTracingNode(kDeviceQueueTracingName, &node).IsOk();
}

template <typename T>
double AutoTune::Mean(const std::vector<T> &items) const {
  if (items.empty()) {
    return 0;
  }
  return std::accumulate(items.begin(), items.end(), 0.0) / static_cast<double>(items.size());
}

Status AutoTune::UpdateCurrentRunInfo() {
  // Get current running epoch
  cur_epoch_running_ = profiling_manager_->GetNumOfProfiledEpochs();
  // Get current running step
  int32_t step_temp = 0;
  RETURN_IF_NOT_OK(profiling_manager_->GetNumberOfProfiledSteps(&step_temp));
  cur_step_running_ = step_temp;
  return Status::OK();
}

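// Returns true while AutoTune is still in its warm-up window (and tuning should be skipped).
// In epoch mode the warm-up lasts EPOCH_WARMUP epochs; in step mode it lasts
// max(STEP_WARMUP, step_gap_) steps. Once the window has passed, skip_flag_ is cleared so the
// check short-circuits on later calls.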
bool AutoTune::WarmupSkipCheck() {
  if (!skip_flag_) {
    return false;
  }
  if (mode_ == AutoTuneMode::kAutoTuneModeEpoch) {
    if (cur_epoch_running_ > EPOCH_WARMUP) {
      skip_flag_ = false;
      return false;
    }
  } else if (mode_ == AutoTuneMode::kAutoTuneModeStep) {
    int64_t skip_value = std::max(STEP_WARMUP, step_gap_);
    if (cur_step_running_ > skip_value) {
      last_step_autotuned_ = std::min(STEP_WARMUP, step_gap_);
      skip_flag_ = false;
      return false;
    }
  }
  return true;
}

Status AutoTune::RunIterationEpoch() {
  // Run every epoch
  if (last_epoch_autotuned_ < cur_epoch_running_ - 1) {
    MS_LOG(INFO) << "Run Dataset AutoTune at epoch # " << cur_epoch_running_;
    RETURN_IF_NOT_OK(RunIteration());
    last_epoch_autotuned_ = cur_epoch_running_ - 1;
  }
  return Status::OK();
}

Status AutoTune::RunIterationStep() {
  // Run at the configured AutoTune step interval
  if (cur_step_running_ - last_step_autotuned_ >= step_gap_) {
    MS_LOG(INFO) << "Run Dataset AutoTune at step # " << cur_step_running_;
    RETURN_IF_NOT_OK(RunIteration());
    last_step_autotuned_ = cur_step_running_;
  }
  return Status::OK();
}

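// Phase-1 bookkeeping: RegisterWorkersQueue() snapshots the current num_workers and connector
// capacity of every non-inlined op as the best configuration seen so far, and ResetWorkersQueue()
// restores that snapshot once the time phase stops improving.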
Status AutoTune::RegisterWorkersQueue() {
  ExecutionTree *tree = tree_adapter_->tree_.get();
  for (auto itr = tree->begin(); itr != tree->end(); (void)itr++) {
    if (!itr->inlined() && itr->Name() != "DataQueueOp") {
      (void)phase_1_best_workers.push_back(itr->NumWorkers());
      (void)phase_1_best_queue.push_back(itr->ConnectorCapacity());
    }
  }
  return Status::OK();
}

Status AutoTune::ResetWorkersQueue() {
  if (phase_1_best_workers.empty() || phase_1_best_queue.empty()) {
    return Status::OK();
  }
  ExecutionTree *tree = tree_adapter_->tree_.get();
  int counter = 0;
  for (auto itr = tree->begin(); itr != tree->end(); (void)itr++) {
    if (!itr->inlined() && itr->Name() != "DataQueueOp") {
      int32_t target_workers = phase_1_best_workers[counter];
      int32_t target_queue = phase_1_best_queue[counter];
      RETURN_IF_NOT_OK(RequestNumWorkerChange(itr->id(), -1, &target_workers));
      RETURN_IF_NOT_OK(RequestConnectorCapacityChange(itr->id(), -1, target_queue));
      counter++;
    }
  }
  return Status::OK();
}

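// Record average pipeline and batch times for the current window and drive the phase-1
// early-stop logic: a lower average batch time becomes the new best (and the configuration is
// snapshotted if AutoTune changed anything since the last improvement); after
// EARLY_STOP_TRIAL_THRESHOLD_{EPOCH,STEP} windows without improvement the best configuration is
// restored and AutoTune moves on to the memory phase.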
Status AutoTune::TrackPipelineTime() {
  std::vector<int32_t> pipeline_times;
  std::vector<int32_t> batch_times;
  // Select the early-stop threshold based on the running mode
  int early_stop_threshold_mode = 0;
  if (mode_ == AutoTuneMode::kAutoTuneModeEpoch) {
    RETURN_IF_NOT_OK(profiling_manager_->GetPipelineTimeByEpoch(cur_epoch_running_, &pipeline_times));
    RETURN_IF_NOT_OK(profiling_manager_->GetBatchTimeByEpoch(cur_epoch_running_ - 1, &batch_times));
    early_stop_threshold_mode = EARLY_STOP_TRIAL_THRESHOLD_EPOCH;
  } else if (mode_ == AutoTuneMode::kAutoTuneModeStep) {
    RETURN_IF_NOT_OK(
      profiling_manager_->GetPipelineTimeByStep(last_step_autotuned_, cur_step_running_ - 1, &pipeline_times));
    RETURN_IF_NOT_OK(profiling_manager_->GetBatchTimeByStep(last_step_autotuned_, cur_step_running_ - 1, &batch_times));
    early_stop_threshold_mode = EARLY_STOP_TRIAL_THRESHOLD_STEP;
  }
  double avg_time_pipeline = Mean(pipeline_times);
  double avg_time_batch = Mean(batch_times);
  (void)avg_pipeline_times_.push_back(avg_time_pipeline);
  MS_LOG(INFO) << "Average pipeline time is " << avg_time_pipeline << " ms. The average pipeline time over all "
               << "sampled intervals is " << Mean(avg_pipeline_times_) << " ms";
  // Time phase (phase 1) improvement tracking
  if (AT_phase_ == AutoTunePhase::kAutoTunePhaseTime) {
    if (phase_1_best_time_ < 0) {
      phase_1_best_time_ = avg_time_batch;  // set the first value
    } else if (avg_time_batch < phase_1_best_time_) {
      phase_1_no_improve_count_ = 0;
      phase_1_best_time_ = avg_time_batch;
      // Trigger the save process
      if (AT_change_) {
        AT_change_ = false;  // Reset for the next analysis run
        RETURN_IF_NOT_OK(RegisterWorkersQueue());
      }
    } else {
      phase_1_no_improve_count_++;
    }
    if (phase_1_no_improve_count_ > early_stop_threshold_mode) {
      // Restore the best configuration found and move to the memory phase
      AT_phase_ = AutoTunePhase::kAutoTunePhaseMemory;
      RETURN_IF_NOT_OK(ResetWorkersQueue());
    }
  }
  return Status::OK();
}

Status AutoTune::RunIteration() {
  RETURN_IF_NOT_OK(TrackPipelineTime());
  if (AT_phase_ == AutoTunePhase::kAutoTunePhaseTime) {
    RETURN_IF_NOT_OK(AnalyseTime());
  } else if (AT_phase_ == AutoTunePhase::kAutoTunePhaseMemory) {
    RETURN_IF_NOT_OK(AnalyseMemory());
  }
  return Status::OK();
}

Status AutoTune::GetConnectorSize(std::vector<int32_t> *sizes) const {
  if (mode_ == AutoTuneMode::kAutoTuneModeEpoch) {
    RETURN_IF_NOT_OK(profiling_manager_->GetConnectorSizeByEpoch(cur_epoch_running_, sizes));
  } else if (mode_ == AutoTuneMode::kAutoTuneModeStep) {
    RETURN_IF_NOT_OK(profiling_manager_->GetConnectorSizeByStep(last_step_autotuned_, cur_step_running_ - 1, sizes));
  }
  return Status::OK();
}

Status AutoTune::GetConnectorCapacity(std::vector<int32_t> *capacities) const {
  if (mode_ == AutoTuneMode::kAutoTuneModeEpoch) {
    RETURN_IF_NOT_OK(profiling_manager_->GetConnectorCapacityByEpoch(cur_epoch_running_, capacities));
  } else if (mode_ == AutoTuneMode::kAutoTuneModeStep) {
    RETURN_IF_NOT_OK(
      profiling_manager_->GetConnectorCapacityByStep(last_step_autotuned_, cur_step_running_ - 1, capacities));
  }
  return Status::OK();
}

Status AutoTune::GetConnectorUtil(double *usage_avg_last, double *avg_size, double *avg_capacity) {
  std::vector<int32_t> sizes;
  RETURN_IF_NOT_OK(GetConnectorSize(&sizes));
  *avg_size = Mean(sizes);
  std::vector<int32_t> capacities;
  RETURN_IF_NOT_OK(GetConnectorCapacity(&capacities));
  *avg_capacity = Mean(capacities);
  CHECK_FAIL_RETURN_UNEXPECTED(*avg_capacity != 0.0, "Capacities of connectors should not be 0.0");
  // Utilization is the average occupancy of the queue divided by its average capacity
  *usage_avg_last = (*avg_size / *avg_capacity);
  return Status::OK();
}

Status AutoTune::GetEmptyQueueFrequency(float *empty_freq) const {
  if (mode_ == AutoTuneMode::kAutoTuneModeEpoch) {
    RETURN_IF_NOT_OK(profiling_manager_->GetEmptyQueueFrequencyByEpoch(cur_epoch_running_, empty_freq));
  } else if (mode_ == AutoTuneMode::kAutoTuneModeStep) {
    RETURN_IF_NOT_OK(
      profiling_manager_->GetEmptyQueueFrequencyByStep(last_step_autotuned_, cur_step_running_ - 1, empty_freq));
  }
  return Status::OK();
}

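// The dataset pipeline is considered the bottleneck when the device connector (the queue feeding
// the device) is, on average, utilized below DEVICE_CONNECTOR_UTIL_THRESHOLD; in that case the
// time phase will try to add workers or enlarge connector queues.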
Status AutoTune::IsDSaBottleneck(bool *isBottleneck) {
  double usage_avg_last, avg_size, avg_capacity;
  RETURN_IF_NOT_OK(GetConnectorUtil(&usage_avg_last, &avg_size, &avg_capacity));
  float empty_freq = 0;
  RETURN_IF_NOT_OK(GetEmptyQueueFrequency(&empty_freq));
  if (mode_ == AutoTuneMode::kAutoTuneModeStep) {
    MS_LOG(INFO) << "Step # " << cur_step_running_ << ". Status:";
  } else {
    MS_LOG(INFO) << "Epoch # " << cur_epoch_running_ << ". Status:";
  }
  // Reporting values
  MS_LOG(INFO) << "Device Connector Size: " << avg_size << ", Connector Capacity: " << avg_capacity
               << ", Utilization: " << (usage_avg_last * TO_PERCENT) << "%"
               << ", Empty Freq: " << (empty_freq * TO_PERCENT) << "% ";
  // Decision
  if (usage_avg_last < DEVICE_CONNECTOR_UTIL_THRESHOLD) {
    MS_LOG(INFO) << "Utilization: " << (usage_avg_last * TO_PERCENT) << "% < "
                 << (DEVICE_CONNECTOR_UTIL_THRESHOLD * TO_PERCENT)
                 << "% threshold, dataset pipeline performance may benefit from tuning.";
    *isBottleneck = true;
  } else {
    MS_LOG(INFO) << "Utilization: " << (usage_avg_last * TO_PERCENT) << "% >= "
                 << (DEVICE_CONNECTOR_UTIL_THRESHOLD * TO_PERCENT)
                 << "% threshold, dataset pipeline performance is OK.";
    *isBottleneck = false;
  }
  return Status::OK();
}

Status AutoTune::RequestNumWorkerChange(int32_t op_id, int32_t old_workers, int32_t *num_workers_requested) {
  AT_change_ = true;
  int new_workers = std::min(*num_workers_requested, max_workers_);
  new_workers = std::max(new_workers, MIN_NUM_WORKERS);
  RETURN_IF_NOT_OK(tree_modifier_->AddChangeRequest(op_id, std::make_shared<ChangeNumWorkersRequest>(new_workers)));
  if (old_workers == -1) {
    MS_LOG(INFO) << "Added request to change \"num_parallel_workers\" of Operator: " << ops_[op_id]->NameWithID()
                 << " to value: [" << new_workers << "].";
  } else {
    MS_LOG(INFO) << "Added request to change \"num_parallel_workers\" of Operator: " << ops_[op_id]->NameWithID()
                 << " from old value: [" << old_workers << "] to new value: [" << new_workers << "].";
  }
  *num_workers_requested = new_workers;
  return Status::OK();
}

Status AutoTune::RequestConnectorCapacityChange(int32_t op_id, int32_t old_size, int32_t new_size) {
  AT_change_ = true;
  new_size = std::min(new_size, MAX_QUEUE_SIZE);
  new_size = std::max(new_size, MIN_QUEUE_SIZE);
  RETURN_IF_NOT_OK(tree_modifier_->AddChangeRequest(op_id, std::make_shared<ResizeConnectorRequest>(new_size)));
  if (old_size == -1) {
    MS_LOG(INFO) << "Added request to change \"prefetch_size\" of Operator: " << ops_[op_id]->NameWithID()
                 << " to value: [" << new_size << "].";
  } else {
    MS_LOG(INFO) << "Added request to change \"prefetch_size\" of Operator: " << ops_[op_id]->NameWithID()
                 << " from old value: [" << old_size << "] to new value: [" << new_size << "].";
  }
  return Status::OK();
}

bool AutoTune::SkipOpsCheck(int op_id) {
  // Skip the Generator op
  if (ops_[op_id]->Name() == "GeneratorOp") {
    return true;
  }
  // NonMappableDataset is not supported in AutoTune
#ifndef ENABLE_ANDROID
  if (std::dynamic_pointer_cast<NonMappableLeafOp>(ops_[op_id]) != nullptr) {
    return true;
  }
#endif
  return false;
}

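// Phase-1 (time) analysis. If the device connector shows the pipeline as the bottleneck, walk the
// parallel ops from the bottom of the tree up and, per op: add workers when its input queue is much
// fuller than its output queue or its per-worker CPU utilization is high, and enlarge its own
// connector capacity when per-worker CPU utilization is low and the input queue is under-utilized.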
Status AutoTune::AnalyseTime() {
  // Check for a connector queue bottleneck
  bool isBottleneck = false;
  RETURN_IF_NOT_OK(IsDSaBottleneck(&isBottleneck));
  if (!isBottleneck) {
    return Status::OK();
  }
  // Collect stats
  std::map<int32_t, int32_t> ops_num_workers;
  RETURN_IF_NOT_OK(GetOpsNumWorker(&ops_num_workers));
  std::map<int32_t, double> out_ops_queue_util;
  std::map<int32_t, double> in_ops_queue_util;
  RETURN_IF_NOT_OK(GetOpsQueueUtil(&out_ops_queue_util, &in_ops_queue_util));
  std::map<int32_t, double> ops_cpu_util;
  RETURN_IF_NOT_OK(GetOpsCpuUtil(&ops_cpu_util));
  // Check the parallel ops in a loop
  for (const auto &op_id : parallel_ops_ids_) {
    if (SkipOpsCheck(op_id)) {
      continue;
    }
    // Op specifics
    double output_queue_util = out_ops_queue_util[op_id];
    double input_queue_util = in_ops_queue_util[op_id];
    double cpu_util = ops_cpu_util[op_id];
    int32_t num_workers = ops_num_workers[op_id];
    CHECK_FAIL_RETURN_UNEXPECTED(num_workers != 0, "ParallelOp with num_workers=0");
    // Derived metrics
    double queue_diff = input_queue_util - output_queue_util;
    int64_t queue_capacity;
    RETURN_IF_NOT_OK(GetOpConnectorCapacity(op_id, &queue_capacity));
    int64_t new_queue_capacity = queue_capacity;
    int32_t requested_workers = 0;
    MS_LOG(DEBUG) << "Op (" << ops_[op_id]->NameWithID() << ") CPU=" << cpu_util / num_workers
                  << ", in=" << input_queue_util << ", out=" << output_queue_util;
    // Map decisions - queue
    if (queue_diff > INPUT_OUTPUT_QUEUE_DIFF_THRESHOLD) {
      MS_LOG(INFO) << "Op (" << ops_[op_id]->NameWithID()
                   << ") is slow, input connector utilization=" << input_queue_util
                   << ", output connector utilization=" << output_queue_util << ", diff=" << queue_diff << " > "
                   << INPUT_OUTPUT_QUEUE_DIFF_THRESHOLD << " threshold.";
      requested_workers = num_workers + INCREMENT_WORKER;
      RETURN_IF_NOT_OK(RequestNumWorkerChange(op_id, num_workers, &requested_workers));
    } else if ((cpu_util / num_workers) > MAP_OP_WORKER_HIGH_THRESHOLD) {
      MS_LOG(INFO) << "Op (" << ops_[op_id]->NameWithID() << ") has high average worker CPU utilization "
                   << (cpu_util / num_workers) << "% > " << MAP_OP_WORKER_HIGH_THRESHOLD << "% threshold.";
      requested_workers = num_workers + INCREMENT_WORKER;
      RETURN_IF_NOT_OK(RequestNumWorkerChange(op_id, num_workers, &requested_workers));
    }
    if ((cpu_util / num_workers) < MAP_OP_WORKER_LOW_THRESHOLD &&
        ((input_queue_util < INPUT_QUEUE_LOW) || (-1 * queue_diff > INPUT_OUTPUT_QUEUE_DIFF_THRESHOLD))) {
      MS_LOG(INFO) << "Op (" << ops_[op_id]->NameWithID() << ") has low average worker CPU utilization "
                   << (cpu_util / num_workers) << "% < " << MAP_OP_WORKER_LOW_THRESHOLD << "% threshold.";
      new_queue_capacity = queue_capacity + INCREMENT_QUEUE_SIZE;
      if (requested_workers == 0) {
        requested_workers = num_workers;
      }
      new_queue_capacity = std::max(new_queue_capacity, static_cast<int64_t>(requested_workers));
      RETURN_IF_NOT_OK(RequestConnectorCapacityChange(op_id, queue_capacity, new_queue_capacity));
    }
  }
  return Status::OK();
}

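// Returns true when the post-change connector utilization (cur_avg) has not dropped more than
// MEMORY_COMPARISON_LOWER_BOUND_PERCENT below the pre-change average, i.e. the queue-size
// reduction did not noticeably hurt throughput.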
bool AutoTune::MemoryPhaseCompareMetric(double prev_avg, double cur_avg) {
  double lower_bound = prev_avg - (prev_avg * MEMORY_COMPARISON_LOWER_BOUND_PERCENT);
  // If cur_avg is worse than the lower bound, the change had a negative impact on performance.
  // The lower bound accounts for expected fluctuations in the utilization numbers.
  return cur_avg >= lower_bound;
}

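// Phase-3 (memory) analysis, implemented as a small state machine driven by phase_3_state_:
//  - kAutoTuneMemInit: mark unsupported ops with -1 in OP_values and count the remaining candidates.
//  - kAutoTuneMemSet: pick the next candidate, remember the pre-change connector utilization and
//    queue size, and request a reduced queue size (bounded below by the op's worker count).
//  - kAutotTuneMemCompare: compare utilization after the change; if it degraded, restore the saved
//    queue size and retire the op. AutoTune ends once every candidate has been tried (count_down_ == 0).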
Status AutoTune::AnalyseMemory() {
  double prev_avg = 0;
  double cur_avg = 0;
  bool comp_flag = true;
  double connector_avg_size;
  double connector_avg_capacity;
  int total = parallel_ops_ids_.size();
  if (total == 0) {
    AT_phase_ = AutoTunePhase::kAutoTuneEnd;
    return Status::OK();
  }
  std::map<int32_t, int32_t> ops_num_workers;
  RETURN_IF_NOT_OK(GetOpsNumWorker(&ops_num_workers));
  double reduce_percent_mode = QUEUE_REDUCTION_PERCENTAGE_STEP;
  // Decrease queue sizes faster in epoch mode
  if (mode_ == AutoTuneMode::kAutoTuneModeEpoch) {
    reduce_percent_mode = QUEUE_REDUCTION_PERCENTAGE_EPOCH;
  } else if (mode_ == AutoTuneMode::kAutoTuneModeStep) {
    reduce_percent_mode = QUEUE_REDUCTION_PERCENTAGE_STEP;
  }
  // Init state on the first call
  if (phase_3_state_ == AutoTuneMemPhase::kAutoTuneMemInit) {
    count_down_ = 0;
    for (auto op_id : parallel_ops_ids_) {
      if ((SkipOpsCheck(op_id)) || (ops_[op_id]->Name() == "DataQueueOp")) {
        // Op not supported - ignore it throughout AutoTune
        (void)OP_values.push_back(-1);
        continue;
      }
      // Op supported - attempt memory reduction on it
      (void)OP_values.push_back(0);
      count_down_++;
    }
    phase_3_state_ = AutoTuneMemPhase::kAutoTuneMemSet;
    phase_3_ID_ = 0;
  }

  // Exit when all viable ops have been tested, or if none were found
  if (count_down_ == 0) {
    AT_phase_ = AutoTunePhase::kAutoTuneEnd;
    return Status::OK();
  }

  if (phase_3_state_ == AutoTuneMemPhase::kAutoTuneMemSet) {
    RETURN_IF_NOT_OK(GetConnectorUtil(&phase_3_prev_avg_, &connector_avg_size, &connector_avg_capacity));
    // Search for the next viable op that can be tested
    while (OP_values[phase_3_ID_] == -1) {
      phase_3_ID_ = ((phase_3_ID_ + 1) % total);
    }
    int64_t current_queue_size;
    RETURN_IF_NOT_OK(GetOpConnectorCapacity(parallel_ops_ids_[phase_3_ID_], &current_queue_size));
    int op_workers = ops_num_workers[parallel_ops_ids_[phase_3_ID_]];
    int target_mem = current_queue_size * reduce_percent_mode;
    if (std::max(target_mem, op_workers) == op_workers) {
      // Lower bound on the range of queue sizes to test
      OP_values[phase_3_ID_] = -1;
      count_down_--;
      target_mem = op_workers;
    } else {
      // Save the current queue size for possible recovery and switch to compare mode
      OP_values[phase_3_ID_] = current_queue_size;
      phase_3_state_ = AutoTuneMemPhase::kAutotTuneMemCompare;
    }
    RETURN_IF_NOT_OK(RequestConnectorCapacityChange(parallel_ops_ids_[phase_3_ID_], -1, target_mem));
  } else if (phase_3_state_ == AutoTuneMemPhase::kAutotTuneMemCompare) {
    // Analyse the impact of the previous change on the model
    RETURN_IF_NOT_OK(GetConnectorUtil(&cur_avg, &connector_avg_size, &connector_avg_capacity));
    prev_avg = phase_3_prev_avg_;
    comp_flag = MemoryPhaseCompareMetric(prev_avg, cur_avg);
    // Compare the current average against the pre-change average
    if (!comp_flag) {
      int reset_val = OP_values[phase_3_ID_];
      RETURN_IF_NOT_OK(RequestConnectorCapacityChange(parallel_ops_ids_[phase_3_ID_], -1, reset_val));
      // The queue size reduction on this op caused a performance decrease - ignore it from now on
      OP_values[phase_3_ID_] = -1;
      count_down_--;
    }
    phase_3_state_ = AutoTuneMemPhase::kAutoTuneMemSet;
  }
  return Status::OK();
}
}  // namespace dataset
}  // namespace mindspore