# Copyright 2021 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
15"""
16Module to provide conversion capabalities from .timestamp async dump files to .npy.
17It's an internal module for debugger backend but not exposed to users.
18"""
import os
import glob
import stat
import sys
from pathlib import Path
from importlib import import_module
from collections import namedtuple

import numpy as np


class ConvertToolLoader:
    """
    Load the CANN conversion tool modules from the Ascend toolkit.
    """

    def __init__(self):
        self.utils = None
        self.common = None
        self.dump_data_parser = None
        self.format_conversion = None
        self.progress = None
        self.log = None
        self.compare_none_error = None
        self.compare_exception = None
        self.toolkit_path = self.find_toolkit_path()
        self.load_convert_tool()

    @staticmethod
    def find_toolkit_path():
        """
        Find the path to the Ascend toolkit.
        """
        ascend_toolkit_path = os.getenv("ASCEND_TOOLKIT_PATH")
        if not ascend_toolkit_path:
            ascend_toolkit_path = "/usr/local/Ascend"
        if not os.path.exists(ascend_toolkit_path):
            raise ValueError(
                "Path {} does not exist. Please install the Ascend run packages "
                "and set the environment variable $ASCEND_TOOLKIT_PATH correctly.".format(ascend_toolkit_path))
        toolkit_search_path = Path(ascend_toolkit_path).resolve()
        msaccucmp_file_list = list(toolkit_search_path.rglob('msaccucmp.py*'))
        if not msaccucmp_file_list:
            toolkit_search_path = toolkit_search_path / 'tools'
            msaccucmp_file_list = list(toolkit_search_path.rglob('msaccucmp.py*'))
        if not msaccucmp_file_list:
            raise ValueError("Failed to find the msaccucmp.py or msaccucmp.pyc file under {}. "
                             "Please install the Ascend toolkit.".format(ascend_toolkit_path))
        return msaccucmp_file_list[0].parent

    def load_convert_tool(self):
        """
        Load the CANN conversion tool modules from the toolkit path.
        """
        # add the toolkit path to the module search path
        if str(self.toolkit_path) not in sys.path:
            sys.path.insert(0, str(self.toolkit_path))
        try:
            self.utils = import_module('utils')
            self.common = import_module('common')
            self.dump_data_parser = import_module(
                'dump_data_parser').DumpDataParser
            self.format_conversion = import_module(
                'shape_conversion').FormatConversionMain
        except ModuleNotFoundError:
            self.reset_system_path()
            raise ModuleNotFoundError(
                "Failed to load the CANN conversion tools under {}. Please make sure the Ascend "
                "toolkit has been installed properly.".format(self.toolkit_path))

        try:
            self.progress = import_module("progress").Progress
        except (ModuleNotFoundError, AttributeError):
            self.progress = self.utils.Progress
        try:
            self.log = import_module("log")
            if not hasattr(self.log, "print_error_log"):
                raise ModuleNotFoundError
        except ModuleNotFoundError:
            self.log = self.utils
        try:
            compare_error = import_module("compare_error")
            self.compare_none_error = compare_error.CompareError.MSACCUCMP_NONE_ERROR
            self.compare_exception = compare_error.CompareError
        except ModuleNotFoundError:
            self.compare_none_error = self.utils.VECTOR_COMPARISON_NONE_ERROR
            self.compare_exception = self.utils.CompareError

    def reset_system_path(self):
        """
        Restore the module search path.
        """
        if str(self.toolkit_path) in sys.path:
            sys.path.remove(str(self.toolkit_path))

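
# A minimal sketch of how the loader is used (see AsyncDumpConverter below); the
# imported modules are only available on hosts with the Ascend toolkit installed:
#
#     loader = ConvertToolLoader()
#     try:
#         parser = loader.dump_data_parser(parse_args(file_list, output_path))
#     finally:
#         loader.reset_system_path()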

def parse_args(file_list, output_path):
    """
    Helper function to build the input arguments for the conversion configuration.
    """
    args_dict = dict()
    args_dict['dump_version'] = '2.0'
    args_dict['format'] = 'NCHW'
    args_dict['output_file_type'] = 'npy'
    args_dict['dump_path'] = output_path
    args_dict['output_path'] = output_path
    args_dict['file_list'] = file_list
    args_dict['input'] = None
    args_dict['output'] = None
    args_dict['shape'] = None
    args_dict['custom_script_path'] = None
    args_parser = namedtuple("args_parser", args_dict.keys())
    return args_parser(**args_dict)

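# For reference, given the defaults above, parse_args(file_list, output_path)
# returns a namedtuple equivalent to:
#   args_parser(dump_version='2.0', format='NCHW', output_file_type='npy',
#               dump_path=output_path, output_path=output_path, file_list=file_list,
#               input=None, output=None, shape=None, custom_script_path=None)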


class AsyncDumpConverter:
    """
    Convert the target async dump data into npy files.
    """

    def __init__(self, file_list, output_path):
        # resolve the input and output paths to absolute real paths
        file_list = [os.path.realpath(file_item) for file_item in file_list]
        output_path = os.path.realpath(output_path)

        self.convert_tool = ConvertToolLoader()
        self.args = parse_args(file_list, output_path)
        self.files_to_convert = self.args.file_list
        self.output_path = self.args.output_path
        self.failed_file_path = os.path.join(
            self.output_path, 'convert_failed_file_list.txt')
        self.clear_failed_list_file()

    def clear_failed_list_file(self):
        """
        Remove the existing failed-file list if there is one.
        """
        if self.failed_file_path and os.path.exists(self.failed_file_path):
            os.remove(self.failed_file_path)

    def convert_files(self):
        """
        Main entry of the converter to convert async dump files into npy format.
        """
        self.convert_tool.log.print_info_log('Start to convert async dump files.')
        try:
            if self.args.format is not None:
                convert = self.convert_tool.format_conversion(self.args)
            else:
                convert = self.convert_tool.dump_data_parser(self.args)
            # 1. check if the arguments are valid
            convert.check_arguments_valid()
            # 2. convert the format of the dump data
            ret_code = self.handle_multi_process(convert, self.files_to_convert)
            self._rename_generated_npy_files()
            if ret_code != self.convert_tool.compare_none_error:
                if os.path.exists(self.failed_file_path):
                    self.convert_failed_tensors()
        finally:
            # clean up sys.path whether or not the conversion succeeded,
            # to avoid polluting the module search path
            self.convert_tool.reset_system_path()
        self.convert_tool.log.print_info_log('Finished converting async dump files.')

    def convert_failed_tensors(self):
        """
        Convert the failed tensors recorded in the failed-file list.
        """
        self.convert_tool.log.print_info_log(
            'Start to convert failed tensors recorded in ' + self.failed_file_path + '.')
        with open(self.failed_file_path) as failed_lines:
            for failed_line in failed_lines:
                try:
                    failed_line_list = failed_line.rstrip().split(',')
                    self.convert_one_failed_tensor(failed_line_list)
                except (ValueError, OSError, AttributeError, self.convert_tool.compare_exception) as err:
                    self.convert_tool.log.print_error_log(
                        'Failed to convert ' + failed_line + ' to Host format: ' + str(err))

    def convert_one_failed_tensor(self, failed_tensor):
        """
        Convert the tensors of one failed record.

        Each record has the form [file_path, 'tensor_type:index', ...].
        """
        if len(failed_tensor) <= 1:
            raise ValueError(
                "Invalid tensor info in convert_failed_file_list.txt")
        file_path = failed_tensor[0]
        type_index = failed_tensor[1:]
        op_data = self.convert_tool.utils.parse_dump_file(
            file_path, self.args.dump_version)
        for type_index_item in type_index:
            tensor_type, index = type_index_item.split(':')
            index = int(index)
            tensor = getattr(op_data, tensor_type)[index]
            dump_data_array = self.convert_tool.utils.deserialize_dump_data_to_array(tensor)
            array = dump_data_array.reshape(tensor.shape.dim)
            out_path = self._generate_path(file_path, tensor_type, index, tensor.format)
            self._save_tensor_to_npy_file(out_path, array)

    def handle_multi_process(self, convert_obj, files):
        """
        Convert async dump files to npy format using multiple processes.
        """
        return_code = self.convert_tool.compare_none_error
        # attach the progress tracker in a way that is compatible with different toolkit versions
        progress = self.convert_tool.progress(len(files))
        if hasattr(convert_obj, 'multi_process'):
            setattr(convert_obj.multi_process, '_progress', progress)
        else:
            setattr(convert_obj, 'progress', progress)
        multi_process_file_list, big_file_list = self._get_file_list(files, convert_obj)
        if multi_process_file_list:
            if hasattr(convert_obj, 'multi_process'):
                ret_mp = getattr(convert_obj.multi_process, '_do_multi_process')(multi_process_file_list)
            else:
                ret_mp = getattr(convert_obj, '_do_multi_process')(multi_process_file_list)
            if ret_mp != self.convert_tool.compare_none_error:
                return_code = ret_mp
        if big_file_list:
            ret_bf = self._process_big_file(big_file_list, convert_obj)
            if ret_bf != self.convert_tool.compare_none_error:
                return_code = ret_bf
        if return_code != self.convert_tool.compare_none_error:
            if os.path.exists(self.failed_file_path):
                self.convert_tool.log.print_info_log(
                    'The list of files that failed to convert has been written to "'
                    + self.failed_file_path + '".')
        return return_code

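    # Note on the hasattr checks in handle_multi_process and the helpers below: depending on
    # the version of the msaccucmp tool shipped with the toolkit, the multi-process helpers are
    # exposed either on a 'multi_process' member of the conversion object or on the conversion
    # object itself, so both layouts are supported.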
    def _get_file_list(self, files, convert_obj):
        """
        Split the files into a regular list and a big-file list for multi-process conversion.
        """
        multi_process_file_list = []
        big_file_list = []
        if hasattr(convert_obj, 'multi_process'):
            max_file_size = getattr(convert_obj.multi_process, 'get_max_file_size')()
        else:
            max_file_size = getattr(convert_obj, '_get_max_file_size')()
        for cur_path in files:
            if os.path.isfile(cur_path):
                if os.path.getsize(cur_path) > max_file_size:
                    big_file_list.append(cur_path)
                else:
                    multi_process_file_list.append(cur_path)
        return multi_process_file_list, big_file_list

    def _process_big_file(self, big_file_list, convert_obj):
        """
        Convert big files one at a time instead of in the multi-process pool.
        """
        return_code = self.convert_tool.compare_none_error
        for big_file in big_file_list:
            if hasattr(convert_obj, '_convert_format_for_one_file'):
                ret_bf, _ = getattr(convert_obj, '_convert_format_for_one_file')(big_file)
            else:
                ret_bf, _ = getattr(convert_obj, 'convert_format_for_one_file')(big_file)
            if hasattr(convert_obj, 'multi_process'):
                getattr(convert_obj.multi_process, '_handle_result_callback')([ret_bf, big_file])
            else:
                getattr(convert_obj, '_handle_result_callback')([ret_bf, big_file])
            if ret_bf != self.convert_tool.compare_none_error:
                return_code = ret_bf
        return return_code

    @staticmethod
    def _save_tensor_to_npy_file(out_path, dump_data_array):
        """
        Save the tensor data into an npy file and make it read-only for the owner.
        """
        np.save(out_path, dump_data_array)
        os.chmod(out_path, stat.S_IRUSR)

    def _generate_path(self, file_path, tensor_type, idx, tensor_format):
        """
        Generate the path and file name of the target npy file.
        """
        file_name = os.path.basename(file_path)
        name_splits = file_name.split('.')
        # drop the scope prefix from the op name, keeping only the last '_' separated part
        name_splits[1] = name_splits[1].split('_')[-1]
        file_name_no_scope = '.'.join(name_splits)
        out_file_name = "%s.%s.%d.%s.npy" % (
            file_name_no_scope,
            tensor_type,
            idx,
            self.convert_tool.common.get_format_string(tensor_format)
        )
        return os.path.join(self.output_path, out_file_name)

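    # A hypothetical example of the naming convention used above and below: a dump file named
    # "Conv2D.Default_network_Conv2D-op1.0.1623124800000" with tensor_type "output", index 0 and
    # a format string of "NCHW" would produce "Conv2D.Conv2D-op1.0.1623124800000.output.0.NCHW.npy".
    # The exact fields of the original dump file name depend on the CANN/MindSpore version.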
    def _rename_generated_npy_files(self):
        """
        Rename the npy files generated by the CANN conversion tool to follow the dump naming convention.
        """
        target_file_list = []
        for in_file in self.files_to_convert:
            target_file_list.extend(glob.glob(in_file + "*.npy"))
        for target_file in target_file_list:
            old_filename = os.path.basename(target_file)
            name_splits = old_filename.split('.')
            name_splits[1] = name_splits[1].split('_')[-1]
            name_splits[-2] = self.args.format
            new_file_name = '.'.join(name_splits)
            out_path = os.path.join(self.output_path, new_file_name)
            os.rename(target_file, out_path)
            os.chmod(out_path, stat.S_IRUSR)
            self.convert_tool.log.print_info_log("Renamed file " + target_file + " to " + out_path)
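

# A minimal usage sketch (the paths below are hypothetical and an installed Ascend toolkit is
# required; the debugger backend normally drives this module itself):
#
#     converter = AsyncDumpConverter(
#         file_list=['/path/to/async_dump/OpType.op_name.task_id.timestamp'],
#         output_path='/path/to/output_dir')
#     converter.convert_files()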