# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""
The parser for AI CPU preprocess data.
"""
import os
import stat

from mindspore.profiler.common.util import fwrite_format, get_file_join_name
from mindspore import log as logger


class DataPreProcessParser:
    """
    The Parser for AI CPU preprocess data.

    The parser locates the AI CPU log under the profiling job path, extracts
    one record per "Node"/"Thread" line pair, writes the records to the output
    file and keeps them in memory for `query_aicpu_data`.

    Args:
         input_path(str): The profiling job path.
         output_filename(str): The output data path and name.

    """
    # File-name fragments used to look up the source log (old and current naming).
    _source_file_target_old = 'DATA_PREPROCESS.dev.AICPU.'
    _source_file_target = 'DATA_PREPROCESS.AICPU.'
    _dst_file_title = 'title:DATA_PREPROCESS AICPU'
    _dst_file_column_title = ['serial_number', 'node_type_name', 'total_time(ms)',
                              'dispatch_time(ms)', 'execution_time(ms)', 'run_start',
                              'run_end']
    # Divisor applied to raw time values before they are reported as "(ms)" columns.
    _ms_unit = 1000

    def __init__(self, input_path, output_filename):
        self._input_path = input_path
        self._output_filename = output_filename
        self._source_file_name = self._get_source_file()
        # Number of comma-separated fields that identifies each node line layout.
        self._ms_kernel_flag = 3
        self._other_kernel_flag = 6
        # Index of the run-end field for each node line layout.
        self._ms_kernel_run_end_index = 2
        self._other_kernel_run_end_index = 5
        # Indexes of the dispatch/total time fields in the thread line.
        self._dispatch_time_index = 5
        self._total_time_index = 6
        self._result_list = []
        self._min_cycle_counter = float('inf')

    def _get_source_file(self):
        """
        Get log file name, which was created by ada service.

        The current file-name pattern is tried before the old one, first in
        the input path itself and then in its "data" sub-directory.

        Returns:
            The value returned by `get_file_join_name` for the first lookup
            that yields a truthy result, otherwise the (falsy) result of the
            last lookup.
        """
        data_path = os.path.join(self._input_path, "data")
        candidates = (
            (self._input_path, self._source_file_target),
            (self._input_path, self._source_file_target_old),
            (data_path, self._source_file_target),
            (data_path, self._source_file_target_old),
        )
        file_name = None
        for search_path, target in candidates:
            file_name = get_file_join_name(search_path, target)
            if file_name:
                break
        return file_name

    def _get_kernel_result(self, number, node_list, thread_list):
        """
        Get the profiling data from different aicpu kernel.

        Args:
            number (int): Serial number assigned to this record.
            node_list (list[str]): Comma-separated fields of the node line.
            thread_list (list[str]): Comma-separated fields of the thread line.

        Returns:
            list, `[number, node_type_name, total_time, dispatch_time,
            exe_time, run_start_counter, run_end_counter]`, or None when the
            line layout is not supported or a field is missing or not numeric.
        """
        try:
            field_count = len(node_list)
            if field_count == self._ms_kernel_flag:
                node_type_name = node_list[0].split(':')[-1]
                run_end_index = self._ms_kernel_run_end_index
            elif field_count == self._other_kernel_flag:
                node_type_name = node_list[0].split(':')[-1].split('/')[-1].split('-')[0]
                run_end_index = self._other_kernel_run_end_index
            else:
                logger.warning("the data format can't support 'node_list':%s", str(node_list))
                return None

            us_unit = 100  # Convert 10ns to 1us.
            # The text after the last ':' holds two space-separated values:
            # the run time stamp first, the cycle counter second.
            start_fields = node_list[1].split(':')[-1].split(' ')
            end_fields = node_list[run_end_index].split(':')[-1].split(' ')
            run_start_counter = float(start_fields[1]) / us_unit
            run_end_counter = float(end_fields[1]) / us_unit
            exe_time = (float(end_fields[0]) - float(start_fields[0])) / self._ms_unit
            total_time = float(
                thread_list[self._total_time_index].split('=')[-1].split()[0]) / self._ms_unit
            dispatch_time = float(
                thread_list[self._dispatch_time_index].split('=')[-1].split()[0]) / self._ms_unit

            return [number, node_type_name, total_time, dispatch_time, exe_time,
                    run_start_counter, run_end_counter]
        except (IndexError, ValueError) as e:
            # A missing field (IndexError) or a non-numeric field (ValueError)
            # invalidates this record only; the caller skips it and goes on.
            logger.error(e)
            return None

    def execute(self):
        """
        Execute the parser, get result data, and write it to the output file.

        Nothing is written when the source file cannot be found or when no
        valid record is parsed. The parsed records are stored on the instance
        for `query_aicpu_data`.
        """
        # The lookup may have produced a falsy name; os.path.exists would
        # raise TypeError on None, so test for that first.
        if not self._source_file_name or not os.path.exists(self._source_file_name):
            logger.info("Did not find the aicpu profiling source file")
            return

        with open(self._source_file_name, 'rb') as ai_cpu_data:
            # Records are separated by NUL bytes (optionally preceded by a
            # newline). The bytes repr is used as text, hence the [2:-1] slice
            # that strips the leading b' and the trailing quote.
            ai_cpu_str = str(ai_cpu_data.read().replace(b'\n\x00', b' ___ ')
                             .replace(b'\x00', b' ___ '))[2:-1]
            ai_cpu_lines = ai_cpu_str.split(" ___ ")
        os.chmod(self._source_file_name, stat.S_IREAD | stat.S_IWRITE)

        result_list = []
        ai_cpu_total_time_summary = 0
        # Node serial number.
        serial_number = 1
        # Walk over adjacent line pairs: a node line followed by its thread line.
        for node_line, thread_line in zip(ai_cpu_lines, ai_cpu_lines[1:]):
            if "Node" not in node_line:
                continue

            if "Thread" not in thread_line:
                node_type_name = node_line.split(',')[0].split(':')[-1]
                logger.warning("The node type:%s cannot find thread data", node_type_name)
                continue

            # Get the node data from node_line
            result = self._get_kernel_result(
                serial_number,
                node_line.split(','),
                thread_line.split(',')
            )
            if result is None:
                continue

            result_list.append(result)
            # Calculate the total time.
            ai_cpu_total_time_summary += result[2]
            # Increase node serial number.
            serial_number += 1

        if result_list:
            ai_cpu_total_time = format(ai_cpu_total_time_summary, '.6f')
            result_list.append(["AI CPU Total Time(ms):", ai_cpu_total_time])
            fwrite_format(self._output_filename, " ".join(self._dst_file_column_title),
                          is_start=True, is_print=True)
            fwrite_format(self._output_filename, result_list, is_print=True)

        # For timeline display.
        self._result_list = result_list

    def query_aicpu_data(self):
        """
        Get execution time of AI CPU operator.

        Also updates the value exposed by `min_cycle_counter` with the
        smallest start time seen (infinity when there are no records).

        Returns:
            a dict, the metadata of AI CPU operator execution time.
        """
        stream_id = 0  # Default stream id for AI CPU.
        pid = 9000  # Default pid for AI CPU.
        total_time = 0
        min_cycle_counter = float('inf')
        aicpu_info = []
        # A set gives constant-time membership checks for distinct op names.
        op_names = set()
        for aicpu_item in self._result_list:
            # The summary row appended by `execute` carries the total time.
            if "AI CPU Total Time(ms):" in aicpu_item:
                total_time = aicpu_item[-1]
                continue

            op_name = aicpu_item[1]
            start_time = float(aicpu_item[5]) / self._ms_unit
            min_cycle_counter = min(min_cycle_counter, start_time)
            duration = aicpu_item[4]
            aicpu_info.append([op_name, stream_id, start_time, duration, pid])

            # Record the number of operator types.
            op_names.add(op_name)

        self._min_cycle_counter = min_cycle_counter
        aicpu_dict = {
            'info': aicpu_info,
            'total_time': float(total_time),
            'op_exe_times': len(aicpu_info),
            'num_of_ops': len(op_names),
            'num_of_streams': 1
        }

        return aicpu_dict

    @property
    def min_cycle_counter(self):
        """Get minimum cycle counter in AI CPU."""
        return self._min_cycle_counter