# Copyright 2021 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""
Module to provide conversion capabilities from .timestamp async dump files to .npy.
It's an internal module for debugger backend but not exposed to users.
"""
import os
import glob
import stat
import sys
from pathlib import Path
from importlib import import_module
from collections import namedtuple

import numpy as np


class ConvertToolLoader:
    """
    Module to load CANN conversion tool.

    Locates the Ascend toolkit on disk and imports the msaccucmp helper
    modules (utils, common, dump_data_parser, shape_conversion, ...) from it.
    The attributes below are resolved lazily in load_convert_tool() because
    different toolkit package versions expose them under different modules.
    """

    def __init__(self):
        # Handles to the toolkit modules/classes; filled in by load_convert_tool().
        self.utils = None
        self.common = None
        self.dump_data_parser = None
        self.format_conversion = None
        self.progress = None
        self.log = None
        self.compare_none_error = None
        self.compare_exception = None
        self.toolkit_path = self.find_toolkit_path()
        self.load_convert_tool()

    @staticmethod
    def find_toolkit_path():
        """
        Find the path to Ascend toolkit.

        Returns:
            Path, directory that contains msaccucmp.py or msaccucmp.pyc.

        Raises:
            ValueError: if the toolkit root or the msaccucmp script cannot be found.
        """
        ascend_toolkit_path = os.getenv("ASCEND_TOOLKIT_PATH")
        if not ascend_toolkit_path:
            # Default install location of Ascend run packages.
            ascend_toolkit_path = "/usr/local/Ascend"
        if not os.path.exists(ascend_toolkit_path):
            raise ValueError(
                "Path {} does not exist. Please install Ascend run packages " \
                "and set the environment variable $ASCEND_TOOLKIT_PATH correctly.".format(ascend_toolkit_path))
        toolkit_search_path = Path(ascend_toolkit_path).resolve()
        msaccucmp_file_list = list(toolkit_search_path.rglob('msaccucmp.py*'))
        if not msaccucmp_file_list:
            # Some package layouts put the tools under a 'tools' subdirectory.
            toolkit_search_path = toolkit_search_path / 'tools'
            msaccucmp_file_list = list(toolkit_search_path.rglob('msaccucmp.py*'))
            if not msaccucmp_file_list:
                raise ValueError("Failed to find msaccucmp.py or msaccucmp.pyc file under {}. " \
                                 "Please install Ascend toolkit.".format(ascend_toolkit_path))
        return msaccucmp_file_list[0].parent

    def load_convert_tool(self):
        """
        Load CANN conversion tool from the toolkit path.

        Raises:
            ModuleNotFoundError: if the core conversion modules cannot be imported.
        """
        # add toolkit path to system searching module path
        if str(self.toolkit_path) not in sys.path:
            sys.path.insert(0, str(self.toolkit_path))
        try:
            self.utils = import_module('utils')
            self.common = import_module('common')
            self.dump_data_parser = import_module(
                'dump_data_parser').DumpDataParser
            self.format_conversion = import_module(
                'shape_conversion').FormatConversionMain
        except ModuleNotFoundError as err:
            self.reset_system_path()
            # Chain the original import error so the real cause stays visible.
            raise ModuleNotFoundError(
                "Failed to load CANN conversion tools under {}. Please make sure Ascend " \
                "toolkit has been installed properly.".format(self.toolkit_path)) from err

        # The symbols below moved between modules across toolkit versions;
        # fall back to the legacy locations in 'utils' when absent.
        try:
            self.progress = import_module("progress").Progress
        except (ModuleNotFoundError, AttributeError):
            self.progress = self.utils.Progress
        try:
            self.log = import_module("log")
            if not hasattr(self.log, "print_error_log"):
                raise ModuleNotFoundError
        except ModuleNotFoundError:
            self.log = self.utils
        try:
            compare_error = import_module("compare_error")
            self.compare_none_error = compare_error.CompareError.MSACCUCMP_NONE_ERROR
            self.compare_exception = compare_error.CompareError
        except ModuleNotFoundError:
            self.compare_none_error = self.utils.VECTOR_COMPARISON_NONE_ERROR
            self.compare_exception = self.utils.CompareError

    def reset_system_path(self):
        """
        Restore system searching module path
        """
        if str(self.toolkit_path) in sys.path:
            sys.path.remove(str(self.toolkit_path))


def parse_args(file_list, output_path):
    """
    Helper function to parse the input argument for the conversion configuration.

    Args:
        file_list (list[str]): absolute paths of the async dump files to convert.
        output_path (str): directory where converted .npy files are written.

    Returns:
        namedtuple, argument object mimicking the msaccucmp CLI arguments.
    """
    args_dict = dict()
    args_dict['dump_version'] = '2.0'
    args_dict['format'] = 'NCHW'
    args_dict['output_file_type'] = 'npy'
    args_dict['dump_path'] = output_path
    args_dict['output_path'] = output_path
    args_dict['file_list'] = file_list
    args_dict['input'] = None
    args_dict['output'] = None
    args_dict['shape'] = None
    args_dict['custom_script_path'] = None
    args_parser = namedtuple("args_parser", args_dict.keys())
    return args_parser(**args_dict)


class AsyncDumpConverter:
    """
    Convert the target async dump data into npy files.
    """

    def __init__(self, file_list, output_path):
        # check input path
        file_list = [os.path.realpath(file_item) for file_item in file_list]
        output_path = os.path.realpath(output_path)

        self.convert_tool = ConvertToolLoader()
        self.args = parse_args(file_list, output_path)
        self.files_to_convert = self.args.file_list
        self.output_path = self.args.output_path
        # The CANN tool records unconvertible tensors in this file.
        self.failed_file_path = os.path.join(
            self.output_path, 'convert_failed_file_list.txt')
        self.clear_failed_list_file()

    def clear_failed_list_file(self):
        """
        Remove existing failed txt file.
        """
        if self.failed_file_path and os.path.exists(self.failed_file_path):
            os.remove(self.failed_file_path)

    def convert_files(self):
        """
        Main entry of the converter to convert async dump files into npy format.
        """
        self.convert_tool.log.print_info_log('Start to convert async dump files.')
        try:
            if self.args.format is not None:
                convert = self.convert_tool.format_conversion(self.args)
            else:
                convert = self.convert_tool.dump_data_parser(self.args)
            # 1. check if arguments are valid
            convert.check_arguments_valid()
            # 2. convert format for dump data
            ret_code = self.handle_multi_process(convert, self.files_to_convert)
            self._rename_generated_npy_files()
            if ret_code != self.convert_tool.compare_none_error:
                # Retry the tensors that the batch conversion could not handle.
                if os.path.exists(self.failed_file_path):
                    self.convert_failed_tensors()
        finally:
            # clean up sys.path no matter conversion is successful or not to avoid pollution
            self.convert_tool.reset_system_path()
            self.convert_tool.log.print_info_log('Finish to convert async dump files.')

    def convert_failed_tensors(self):
        """
        Convert the failed tensor recorded in the failed txt file.
        """
        self.convert_tool.log.print_info_log(
            'Start to convert failed tensors recorded in ' + self.failed_file_path + '.')
        with open(self.failed_file_path) as failed_lines:
            for failed_line in failed_lines:
                try:
                    # Each line is "<file_path>,<tensor_type>:<index>,..."
                    failed_line_list = failed_line.rstrip().split(',')
                    self.convert_one_failed_tensor(failed_line_list)
                except (ValueError, OSError, AttributeError, self.convert_tool.compare_exception) as err:
                    # Best effort: log and continue with the remaining tensors.
                    self.convert_tool.log.print_error_log(
                        'Failed to convert ' + failed_line + ' to Host format: ' + str(err))

    def convert_one_failed_tensor(self, failed_tensor):
        """
        Convert failed operator one by one.

        Args:
            failed_tensor (list[str]): first element is the dump file path,
                the rest are "tensor_type:index" entries.

        Raises:
            ValueError: if the record has no tensor entries.
        """
        if len(failed_tensor) <= 1:
            raise ValueError(
                "Invalid tensor info in convert_failed_file_list.txt")
        file_path = failed_tensor[0]
        type_index = failed_tensor[1:]
        op_data = self.convert_tool.utils.parse_dump_file(
            file_path, self.args.dump_version)
        for type_index_item in type_index:
            tensor_type, index = type_index_item.split(':')
            index = int(index)
            tensor = getattr(op_data, tensor_type)[index]
            dump_data_array = self.convert_tool.utils.deserialize_dump_data_to_array(tensor)
            array = dump_data_array.reshape(tensor.shape.dim)
            out_path = self._generate_path(file_path, tensor_type, index, tensor.format)
            self._save_tensor_to_npy_file(out_path, array)

    def handle_multi_process(self, convert_obj, files):
        """
        Convert async format files to npy in a multithreaded manner.

        Returns:
            int, the toolkit's error code; compare_none_error on full success.
        """
        return_code = self.convert_tool.compare_none_error
        # try looking for function in compatibility with the toolkit package version.
        progress = self.convert_tool.progress(len(files))
        if hasattr(convert_obj, 'multi_process'):
            setattr(convert_obj.multi_process, '_progress', progress)
        else:
            setattr(convert_obj, 'progress', progress)
        multi_process_file_list, big_file_list = self._get_file_list(files, convert_obj)
        if multi_process_file_list:
            if hasattr(convert_obj, 'multi_process'):
                ret_mp = getattr(convert_obj.multi_process, '_do_multi_process')(multi_process_file_list)
            else:
                ret_mp = getattr(convert_obj, '_do_multi_process')(multi_process_file_list)
            if ret_mp != self.convert_tool.compare_none_error:
                return_code = ret_mp
        if big_file_list:
            # Files above the toolkit's size limit are converted one at a time.
            ret_bf = self._process_big_file(big_file_list, convert_obj)
            if ret_bf != self.convert_tool.compare_none_error:
                return_code = ret_bf
        if return_code != self.convert_tool.compare_none_error:
            if os.path.exists(self.failed_file_path):
                self.convert_tool.log.print_info_log(
                    'The list of file that failed to convert has been written to "'
                    + self.failed_file_path + '".')
        return return_code

    def _get_file_list(self, files, convert_obj):
        """
        Process to get file lists in multi_process.

        Splits files into those small enough for the multi-process path and
        "big" files that must be converted individually.
        """
        multi_process_file_list = []
        big_file_list = []
        if hasattr(convert_obj, 'multi_process'):
            max_file_size = getattr(convert_obj.multi_process, 'get_max_file_size')()
        else:
            max_file_size = getattr(convert_obj, '_get_max_file_size')()
        for cur_file in files:
            cur_path = cur_file
            if os.path.isfile(cur_path):
                if os.path.getsize(cur_path) > max_file_size:
                    big_file_list.append(cur_path)
                else:
                    multi_process_file_list.append(cur_path)
        return multi_process_file_list, big_file_list

    def _process_big_file(self, big_file_list, convert_obj):
        """
        Process big file in multi_process.

        Returns:
            int, the toolkit's error code; compare_none_error on full success.
        """
        return_code = self.convert_tool.compare_none_error
        for big_file in big_file_list:
            # Method names differ across toolkit versions; probe both spellings.
            if hasattr(convert_obj, '_convert_format_for_one_file'):
                ret_bf, _ = getattr(convert_obj, '_convert_format_for_one_file')(big_file)
            else:
                ret_bf, _ = getattr(convert_obj, 'convert_format_for_one_file')(big_file)
            if hasattr(convert_obj, 'multi_process'):
                getattr(convert_obj.multi_process, '_handle_result_callback')([ret_bf, big_file])
            else:
                getattr(convert_obj, '_handle_result_callback')([ret_bf, big_file])
            if ret_bf != self.convert_tool.compare_none_error:
                return_code = ret_bf
        return return_code

    @staticmethod
    def _save_tensor_to_npy_file(out_path, dump_data_array):
        """
        Save tensor file into npy format.
        """
        np.save(out_path, dump_data_array)
        # Read-only for the owner, consistent with the renamed files below.
        os.chmod(out_path, stat.S_IRUSR)

    def _generate_path(self, file_path, tensor_type, idx, tensor_format):
        """
        Generate path and filename to the target npy files
        """
        file_name = os.path.basename(file_path)
        name_splits = file_name.split('.')
        # Drop the scope prefix from the op name, keeping only its last segment.
        name_splits[1] = name_splits[1].split('_')[-1]
        file_name_no_scope = '.'.join(name_splits)
        out_file_name = "%s.%s.%d.%s.npy" % (
            file_name_no_scope,
            tensor_type,
            idx,
            self.convert_tool.common.get_format_string(tensor_format)
        )
        return os.path.join(self.output_path, out_file_name)

    def _rename_generated_npy_files(self):
        """
        In order to follow dump naming convention, rename npy files generated by CANN conversion tool.
        """
        target_file_list = []
        for in_file in self.files_to_convert:
            target_file_list.extend(glob.glob(in_file + "*.npy"))
        for target_file in target_file_list:
            old_filename = os.path.basename(target_file)
            name_splits = old_filename.split('.')
            name_splits[1] = name_splits[1].split('_')[-1]
            name_splits[-2] = self.args.format
            new_file_name = '.'.join(name_splits)
            out_path = os.path.join(self.output_path, new_file_name)
            os.rename(target_file, out_path)
            os.chmod(out_path, stat.S_IRUSR)
            self.convert_tool.log.print_info_log("Rename file " + target_file + " to " + out_path)