# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""The parser for step trace data."""
import csv
import json
import os
import stat
import struct
from abc import abstractmethod
from collections import namedtuple
from decimal import Decimal

from mindspore import log
from mindspore.profiler.common.exceptions.exceptions import ProfilerPathErrorException, \
    ProfilerIOException, ProfilerRawFileException
from mindspore.profiler.common.util import get_summary_for_step_trace
from mindspore.profiler.common.validator.validate_path import \
    validate_and_normalize_path

ProfilingHeadStruct = namedtuple(
    'ProfilingHeadStruct', ['mode', 'rptType', 'bufSize']
)

StepTraceStruct = namedtuple(
    'StepTraceStruct', ['timeStamp', 'index_id', 'model_id', 'stream_id', 'task_id', 'tag_id']
)


class BaseStepTraceParser:
    """
    The parser for step trace data.

    Args:
        input_dir (str): The directory that contains the original step trace data.
        output_file_path (str): The output file path.
        job_id (int): The job id used to define the start of a new step. Default: 0.
        skip_first_step (bool): Whether to skip the first step. Default: False.
        is_training_mode (bool): Whether the data was collected in training mode. Default: True.
        is_gpu_kernel_async_launch (bool): Whether the GPU kernels are launched asynchronously.
            Default: False.
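
    Examples:
        >>> # Illustrative usage only; the paths and the concrete subclass are
        >>> # placeholders, not values produced by this module.
        >>> parser = GpuStepTraceParser(input_dir='/path/to/profiler_dir',
        ...                             output_file_path='/path/to/step_trace_raw.csv')
        >>> parser.parse_and_save()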
52 """ 53 54 def __init__(self, input_dir, output_file_path, job_id=0, skip_first_step=False, 55 is_training_mode=True, is_gpu_kernel_async_launch=False): 56 self._input_dir = input_dir 57 self._output_path = output_file_path 58 self._job_id = job_id 59 self._skip_first_step = skip_first_step 60 self._result = [] 61 self._header = [] 62 self._step_num = 0 63 self._tag_map = {} 64 self._is_training_mode = is_training_mode 65 self._step_end_tag_id = 4 66 self._is_gpu_kernel_async_launch = is_gpu_kernel_async_launch 67 self._model_start_tag_id = 0 68 self._model_end_tag_id = 1 69 self._fp_tag_id = 2 70 self._bp_tag_id = 3 71 self._reduce_min_tag_id = 10000 72 self._reduce_max_tag_id = 20000 73 self._profiling_head_len = 4 74 self._profiling_head_pad_len = 4 75 self._st_data_len = 8 + 8 + 8 + 2 + 2 + 2 76 77 @property 78 def output_file(self): 79 """The property of step trace header.""" 80 file_name = self._output_path.rsplit('/', 2) 81 return file_name[-1] if len(file_name) == 3 else '' 82 83 def show(self): 84 """The property of step trace info.""" 85 summary_info = {} 86 if self._result: 87 summary_info = get_summary_for_step_trace(self._result[-1], self._header, self._is_training_mode) 88 summary_info['total_steps'] = len(self._result) - 1 89 print('\nStep trace summary info (unit: syscnt):') 90 print(summary_info) 91 print('\nThe step trace parse result saves under ${summary_dir}/profiler/%s' 92 % self.output_file) 93 94 def parse_and_save(self): 95 """Parse step trace files and save the result.""" 96 try: 97 source_files = self._get_step_trace_files() 98 if self._is_gpu_kernel_async_launch: 99 self._parse_async_launch(source_files) 100 else: 101 self._parse(source_files) 102 self._save() 103 except IOError as err: 104 log.warning(err) 105 raise ProfilerIOException() 106 else: 107 log.info("Finish to save intermediate result for step trace file.") 108 109 def record_point_info(self, point_info, output_path): 110 """ 111 Record point info into json. 112 113 Args: 114 point_info (dict): The point info about tag id and relative op name. 115 output_path (str): The output path for saving point info. 116 117 Returns: 118 dict, parsed point info. 119 """ 120 121 def update_tag_op_type_map(self, point_info): 122 """ 123 update the map from tag id to op type. 124 125 Args: 126 point_info (dict): The point info about tag id and relative op name. 127 """ 128 self._get_step_trace_files() 129 tag_map = {} 130 for tag, op_name in point_info.items(): 131 op_type = self._get_op_type(tag, op_name) 132 tag_map[tag] = op_type 133 log.info("Get tag types for step trace analysis: %s", tag_map) 134 self._tag_map = tag_map 135 136 def _get_op_type(self, tag, name): 137 """ 138 Get op type from tag and name. 139 140 Args: 141 tag (int): The tag id. 142 name (str): The op name. 143 144 Returns: 145 str, the op type or communication op name. 146 """ 147 tag_map = {self._fp_tag: 'fp', self._bp_tag: 'bp', self._step_end_tag_id: 'end'} 148 # get solid tag type 149 op_type = tag_map.get(tag, '') 150 if op_type: 151 return op_type 152 # check if the tag is step tag. 
        if tag == 0:
            return 'start'
        # analyze the reduce tag
        op_name = name.rsplit('/', 1)[-1]
        if not op_name:
            log.warning("Unexpected op name: %s", name)

        return op_name

    def _get_step_trace_files(self):
        """Get step trace files."""
        return self._input_dir

    @staticmethod
    def _search_file(input_dir):
        """Search for step trace files under the given input directory."""
        # validate input_dir
        if not os.path.isdir(input_dir):
            raise ProfilerPathErrorException(
                '{} does not exist or is not a dir'.format(input_dir)
            )
        # get step trace files
        files = os.listdir(input_dir)
        step_trace_files = list(
            filter(
                lambda file: file.startswith('ts_track.data') and not file.endswith('.done'),
                files
            )
        )
        # validate result
        if len(step_trace_files) > 1:
            # file names end with `slice_$number`; use the `$number` suffix
            # as the sort key
            try:
                step_trace_files.sort(key=lambda path: int(path.rsplit('_', 1)[-1]))
            except ValueError as err:
                log.warning("Unable to parse file names: %s. %s", step_trace_files, err)
                step_trace_files = []
        else:
            training_trace_files = list(
                filter(
                    lambda file: file.startswith('training_trace') and not file.endswith('.done'),
                    files
                )
            )
            if len(training_trace_files) >= 1:
                log.warning("The training_trace file structure has changed, please upgrade "
                            "mindspore and regenerate the profiling data.")

        file_paths = [os.path.join(input_dir, file) for file in step_trace_files]
        log.info("Find %d step trace files.", len(file_paths))
        return file_paths

    @abstractmethod
    def _parse(self, source_files):
        """Parse source step trace files."""

    def _get_next_step_trace(self, content, event_info):
        """
        Get next step trace info.

        Args:
            content (bytes): The input step trace info.
            event_info (dict): The event info.

        Returns:
            Generator, return the step trace one by one.
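
        Note:
            Each record begins with a 4-byte head packed as 'BBH'
            (mode, rptType, bufSize), followed by 4 pad bytes. A record whose
            rptType is 10 carries a 30-byte step trace payload packed as
            'QQQHHH' (timeStamp, index_id, model_id, stream_id, task_id, tag_id).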
221 """ 222 start_time = event_info.get('end', '-') 223 event_info['start'] = start_time 224 if 'reduce' not in event_info.keys(): 225 event_info['reduce'] = {} 226 227 i = 0 228 while i < len(content): 229 profiling_head_data = content[i:i + self._profiling_head_len] 230 parsed_head = struct.unpack('BBH', profiling_head_data) 231 profiling_head = ProfilingHeadStruct(*parsed_head) 232 if profiling_head.rptType == 10: 233 st_data = content[i + self._profiling_head_len + self._profiling_head_pad_len: 234 i + self._profiling_head_len + self._profiling_head_pad_len + self._st_data_len] 235 parsed_data = struct.unpack('QQQHHH', st_data) 236 next_event = StepTraceStruct(*parsed_data) 237 self._construct_event_info(next_event, event_info) 238 239 if event_info.get('end'): 240 yield event_info 241 start_time = event_info.get('end', '-') 242 event_info.clear() 243 event_info['start'] = start_time 244 event_info['reduce'] = {} 245 i = i + profiling_head.bufSize 246 247 def _construct_event_info(self, next_event, event_info): 248 """Construct event info according to next_event.""" 249 end_flag: bool = lambda tag: tag == self._step_end_tag_id 250 fp_flag: bool = lambda tag: tag == self._fp_tag_id 251 bp_flag: bool = lambda tag: tag == self._bp_tag_id 252 reduce_flag: bool = lambda tag: self._reduce_min_tag_id <= tag < self._reduce_max_tag_id 253 254 def _on_reduce_event(reduce_tag_id): 255 """Handle reduce event.""" 256 stream_id = next_event.stream_id 257 if event_info['reduce'].get(stream_id): 258 event_info['reduce'][stream_id].append((reduce_tag_id, time_stamp)) 259 else: 260 event_info['reduce'][stream_id] = [(reduce_tag_id, time_stamp)] 261 262 tag_id = next_event.tag_id 263 time_stamp = next_event.timeStamp 264 if end_flag(tag_id): 265 event_info['end'] = time_stamp 266 elif fp_flag(tag_id): 267 event_info['fp'] = time_stamp 268 elif bp_flag(tag_id): 269 event_info['bp'] = time_stamp 270 elif reduce_flag(tag_id): 271 _on_reduce_event(tag_id) 272 273 def _record_trace_event(self, step_trace): 274 """Record trace event.""" 275 self._step_num += 1 276 start_time = step_trace.get('start') 277 end_time = step_trace.get('end') 278 fp_time = step_trace.get('fp') 279 bp_time = step_trace.get('bp') 280 if not (start_time and end_time and fp_time and bp_time): 281 log.warning("The step %d lacks basic time.", self._step_num) 282 return 283 if start_time == '-': 284 start_time = fp_time 285 row_data = { 286 'step_num': self._step_num, 287 'start_point': start_time, 288 'end_point': end_time, 289 'total': end_time - start_time, 290 'fp_point': fp_time, 291 'bp_point': bp_time, 292 'iteration_interval': fp_time - start_time, 293 'fp_and_bp': bp_time - fp_time, 294 'tail': end_time - bp_time 295 } 296 # update reduce info 297 self._update_reduce_info(step_trace, row_data) 298 # save the row data 299 if not self._header: 300 self._header = list(row_data.keys()) 301 row_data_list = [row_data.get(header_name, 0) for header_name in self._header] 302 self._result.append(row_data_list) 303 304 def _update_reduce_info(self, step_trace, row_data): 305 """Extract reduce info.""" 306 reduce_time = step_trace.get('reduce', {}) 307 for stream_id, time_points in reduce_time.items(): 308 time_point_num = len(time_points) 309 if time_point_num % 2: 310 log.warning("Stream %d has %d reduce time points.", stream_id, time_point_num) 311 continue 312 for index, point_id in enumerate(range(0, time_point_num, 2)): 313 field_name = f'stream_{stream_id}_{index}' 314 reduce_info = self._get_single_reduce_event_info( 315 field_name, 
                row_data.update(reduce_info)

    def _get_single_reduce_event_info(self, field_name, start_point, end_point):
        """
        Get single reduce info.

        Args:
            field_name (str): The field name.
            start_point (Tuple[int, int]): Start point time info, including (tag_id, sys_count).
            end_point (Tuple[int, int]): End point time info, including (tag_id, sys_count).

        Returns:
            dict, reduce info.
        """
        ret_dict = {}
        return ret_dict

    def _record_average_info(self):
        """Calculate average info."""
        result_size = len(self._result)
        # calculate average data for each column in result data
        average_data = [0] * len(self._header)
        if result_size >= 2:
            for row_info in self._result[1:]:
                average_data = [
                    Decimal(i) + Decimal(j) for i, j in zip(row_info, average_data)
                ]
            average_data = [
                round((item / (result_size - 1))) for item in average_data
            ]
            # change step num info in average_data to None
            step_num_index = self._header.index('step_num')
            average_data[step_num_index] = '-'
        self._result.append(average_data)
        log.info("Finished adding average info for step trace.")

    def _save(self):
        """Save the step trace file."""
        bp_point, tail, fp_duration = 5, -1, -2
        log.info("Start to save step trace file.")
        if not self._header:
            return
        try:
            with open(self._output_path, 'w') as file_handle:
                csv_writer = csv.writer(file_handle)
                if not self._is_training_mode:
                    self._header[fp_duration] = 'fp'
                    self._header = self._header[:bp_point] + self._header[bp_point + 1:tail]
                csv_writer.writerow(self._header)
                for row_data in self._result:
                    if not self._is_training_mode:
                        row_data[fp_duration] += row_data[tail]
                        row_data = row_data[:bp_point] + row_data[bp_point + 1:tail]
                    csv_writer.writerow(row_data)
            os.chmod(self._output_path, stat.S_IREAD | stat.S_IWRITE)
        except (IOError, OSError) as err:
            log.warning('Failed to save step trace raw info. %s', err)
            raise ProfilerIOException()


class GpuStepTraceParser(BaseStepTraceParser):
    """The parser for gpu step trace data."""

    def get_fp_bp(self, f_obj, all_step_fp, all_step_bp):
        """Parse the fp and bp points."""
        fp_start, bp_end = 0, 1
        if self._is_gpu_kernel_async_launch:
            for line in f_obj:
                line = line.strip().split()
                all_step_fp.append(line[1].split(',')[0])
                all_step_bp.append(line[2].split(',')[0])
        else:
            lines = f_obj.readlines()
            all_step_fp.append(lines[fp_start].split()[0])
            all_step_bp.append(lines[bp_end].split()[0])

    def record_point_info(self, source_file, output_path):
        """
        Record point info into json.

        Args:
            source_file (str): The file path of step trace original data.
            output_path (str): The output path for saving point info.

        Returns:
            dict, parsed point info.
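
        Note:
            The saved JSON maps point names to op names, for example
            {'fp_start': ..., 'bp_end': ...}; a list with one dict per step
            is written when GPU kernels are launched asynchronously.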
402 """ 403 all_step_points = [] 404 all_step_fp = [] 405 all_step_bp = [] 406 try: 407 with open(source_file, 'r') as f_obj: 408 self.get_fp_bp(f_obj, all_step_fp, all_step_bp) 409 except (IOError, OSError) as err: 410 log.warning(f'Failed to read {source_file}', err) 411 raise ProfilerIOException 412 413 for fp_name, bp_name in zip(all_step_fp, all_step_bp): 414 if self._is_training_mode: 415 points = { 416 'fp_start': fp_name, 417 'bp_end': bp_name 418 } 419 else: 420 points = { 421 'fp_start': fp_name, 422 } 423 all_step_points.append(points) 424 425 try: 426 with open(output_path, 'w') as json_file: 427 if self._is_gpu_kernel_async_launch: 428 json.dump(all_step_points, json_file) 429 else: 430 json.dump(all_step_points[0], json_file) 431 os.chmod(output_path, stat.S_IREAD | stat.S_IWRITE) 432 except (IOError, OSError) as err: 433 log.warning('Failed to save point info. %s', err) 434 raise ProfilerIOException 435 436 return all_step_points[0] 437 438 def _get_step_trace_files(self): 439 """Get step trace files.""" 440 return self._input_dir 441 442 def _parse(self, source_file): 443 """Parse source step trace files.""" 444 log.info("Start to parse step trace file.") 445 fp_start, bp_end, iter_end, iter_start = 0, 1, 2, 3 446 reduce_start = 4 447 start_time, end_time = 0, 1 448 step_trace_point_count = 3 449 450 source_file = validate_and_normalize_path(source_file) 451 try: 452 with open(source_file, 'r') as f: 453 lines = f.readlines() 454 if len(lines) < step_trace_point_count: 455 raise ProfilerRawFileException( 456 f"Failed to parse {source_file} file. The FP_POINT/BP_POINT/ITER_END_POINT " 457 f"do not recognized correctly. Try to set the environment variable'PROFILING_FP_START' " 458 f"and 'PROFILING_BP_END' to solve this problem. For example, " 459 f"'export PROFILING_FP_START=Default/xxx/Conv2d-op1' ") 460 step_trace_info_all = [line.strip().split()[1:] for line in lines] 461 num_of_step = len(step_trace_info_all[0]) 462 for step_trace_point in step_trace_info_all: 463 if len(step_trace_point) != num_of_step: 464 raise ProfilerRawFileException( 465 f"Failed to parse {source_file} file. Due to the profiled " 466 f"step_num of FP/BP/ITER_END Point are not equal") 467 iter_start_info = [step_trace_info_all[fp_start][0]] + \ 468 step_trace_info_all[iter_end][:num_of_step] 469 step_trace_info_all.insert(iter_start, iter_start_info) 470 except (IOError, OSError) as err: 471 log.warning(f'Failed to read {source_file}', err) 472 raise ProfilerIOException 473 474 for step_num in range(num_of_step): 475 step_trace = { 476 'start': int(step_trace_info_all[iter_start][step_num].split(',')[start_time]), 477 'fp': int(step_trace_info_all[fp_start][step_num].split(',')[start_time]), 478 'bp': int(step_trace_info_all[bp_end][step_num].split(',')[end_time]), 479 'end': int(step_trace_info_all[iter_end][step_num].split(',')[end_time]), 480 'reduce': {} 481 } 482 num_of_step_point = len(step_trace_info_all) 483 if num_of_step_point > reduce_start: 484 reduce_info = {} 485 reduce_time_info = [] 486 for reduce_idx in range(reduce_start, num_of_step_point): 487 cur_reduce_time = step_trace_info_all[reduce_idx][step_num] 488 reduce_time_info += cur_reduce_time.split(',') 489 reduce_info['ops'] = reduce_time_info 490 step_trace['reduce'] = reduce_info 491 self._record_trace_event(step_trace) 492 self._record_average_info() 493 log.info("Finish to parse step trace file.") 494 495 def _parse_one_step(self, line): 496 """ 497 Parse step text line to dict obj. 

        Args:
            line (str): The step trace line text. It contains five parts, separated by spaces:
                part 1: start_op_name,start_op_time
                part 2: fp_op_name,fp_time
                part 3: bp_op_name,bp_time
                part 4: end_op_name,end_time
                part 5: reduce_op_name,reduce_start,reduce_end; there may be multiple
                    reduce parts, each separated by a space.
        """
        line = line.strip().split()
        start_time = int(line[0].split(',')[1][:-1])
        fp_time = int(line[1].split(',')[1][:-1])
        bp_time = int(line[2].split(',')[1][:-1])
        end_time = int(line[3].split(',')[1][:-1])
        reduce_info = {}
        reduce_time_info = []

        for reduce_item in line[4:]:
            # add communication op start and end time (time unit: 10 ns)
            reduce_time_info.append(reduce_item.split(',')[1][:-1])
            reduce_time_info.append(reduce_item.split(',')[2][:-1])
        step_trace = {
            'start': start_time,
            'fp': fp_time,
            'bp': bp_time,
            'end': end_time
        }
        if reduce_time_info:
            reduce_info['ops'] = reduce_time_info
            step_trace['reduce'] = reduce_info
        self._record_trace_event(step_trace)

    def _parse_async_launch(self, source_file):
        """Parse source step trace files generated by async launch kernels."""
        log.info("Start to parse step trace file.")
        source_file = validate_and_normalize_path(source_file)

        try:
            with open(source_file, 'r') as f_obj:
                for line in f_obj:
                    self._parse_one_step(line)

        except (IOError, OSError) as err:
            log.warning('Failed to read %s. %s', source_file, err)
            raise ProfilerIOException()

        self._record_average_info()
        log.info("Finished parsing the step trace file.")

    def _get_single_reduce_event_info(self, field_name, start_point, end_point):
        """
        Get single reduce info.

        Args:
            field_name (str): The field name.
            start_point (str): Start point time.
            end_point (str): End point time.

        Returns:
            dict, reduce info.
        """
        reduce_info = {}

        op_type = 'AllReduce'
        # append field name with op type.
        field_name += '_' + op_type
        reduce_info[field_name] = int(end_point) - int(start_point)
        reduce_info[field_name + '_start_point'] = start_point
        reduce_info[field_name + '_end_point'] = end_point

        return reduce_info


class AscendStepTraceParser(BaseStepTraceParser):
    """The parser for ascend step trace data."""
    _event_size = 20
    _fp_tag = 2
    _bp_tag = 3
    _step_trace_files = []

    def record_point_info(self, point_info, output_path):
        """
        Record point info into json.

        Args:
            point_info (dict): The point info about tag id and relative op name.
            output_path (str): The output path for saving point info.

        Returns:
            dict, parsed point info.
        """
        if self._is_training_mode:
            points = {
                'fp_start': point_info.get(self._fp_tag, ''),
                'bp_end': point_info.get(self._bp_tag, '')
            }
        else:
            points = {
                'fp_start': point_info.get(self._fp_tag, ''),
            }
        if os.path.exists(output_path):
            return points
        try:
            with open(output_path, 'w') as json_file:
                json.dump(points, json_file)
            os.chmod(output_path, stat.S_IREAD | stat.S_IWRITE)
        except (IOError, OSError) as err:
            log.warning('Failed to save point info. %s', err)
            raise ProfilerIOException()
        return points
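
    # NOTE: _step_trace_files is a class-level cache; the first call to
    # _get_step_trace_files() stores the discovered file list so later calls
    # skip the directory scan.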
    def _get_step_trace_files(self):
        """Get step trace files."""
        # step trace files may be under $profiler_dir or $profiler_dir/data
        if self._step_trace_files:
            return self._step_trace_files

        profiler_dir = self._input_dir
        step_trace_files = self._search_file(profiler_dir)
        if not step_trace_files:
            # try to find step trace files under $profiler_dir/data
            profiler_dir = os.path.join(profiler_dir, 'data')
            step_trace_files = self._search_file(profiler_dir)
        if not step_trace_files:
            raise ProfilerPathErrorException('Training trace file does not exist.')
        self._step_trace_files = step_trace_files

        return step_trace_files

    def _parse(self, source_files):
        """Parse source step trace files."""
        log.info("Start to parse step trace file.")
        event_info = {}

        for source_file in source_files:
            source_file = validate_and_normalize_path(source_file)
            try:
                with open(source_file, 'rb') as handler:
                    content = handler.read()
                    for step_trace in self._get_next_step_trace(content, event_info):
                        if self._skip_first_step:
                            self._skip_first_step = False
                            continue
                        self._record_trace_event(step_trace)
            except (IOError, OSError) as err:
                log.warning('Failed to read %s. %s', source_file, err)
                raise ProfilerIOException()

        self._record_average_info()
        log.info("Finished parsing the step trace file.")

    def _get_single_reduce_event_info(self, field_name, start_point, end_point):
        """
        Get single reduce info.

        Args:
            field_name (str): The field name.
            start_point (Tuple[int, int]): Start point time info, including (tag_id, sys_count).
            end_point (Tuple[int, int]): End point time info, including (tag_id, sys_count).

        Returns:
            dict, reduce info.
        """
        reduce_info = {}
        if end_point[0] - start_point[0] != 1 or start_point[0] % 2:
            log.warning("Unmatched reduce event <%s, %s>.", start_point, end_point)
            return reduce_info
        op_type = self._tag_map.get(start_point[0])
        # append field name with op type.
        if not op_type:
            log.warning("Can't recognize the inner type for point tag: %d.", start_point[0])
            field_name += '_parallel'
        else:
            field_name += '_' + op_type
        reduce_info[field_name] = end_point[1] - start_point[1]
        reduce_info[field_name + '_start_point'] = start_point[1]
        reduce_info[field_name + '_end_point'] = end_point[1]

        return reduce_info