# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""The parser for step trace data."""
import csv
import json
import os
import stat
import struct
from collections import namedtuple
from decimal import Decimal
from abc import abstractmethod

from mindspore.profiler.common.exceptions.exceptions import ProfilerPathErrorException, \
    ProfilerIOException, ProfilerRawFileException
from mindspore import log
from mindspore.profiler.common.util import get_summary_for_step_trace
from mindspore.profiler.common.validator.validate_path import \
    validate_and_normalize_path

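# Layout of the 4-byte record head, unpacked below with struct format 'BBH':
# mode (uint8), rptType (uint8), bufSize (uint16).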
ProfilingHeadStruct = namedtuple(
    'ProfilingHeadStruct', ['mode', 'rptType', 'bufSize']
)

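# Layout of the 30-byte step trace record, unpacked below with struct format
# 'QQQHHH': three uint64 fields followed by three uint16 fields.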
StepTraceStruct = namedtuple(
    'StepTraceStruct', ['timeStamp', 'index_id', 'model_id', 'stream_id', 'task_id', 'tag_id']
)


class BaseStepTraceParser:
    """
    The parser for step trace data.

    Args:
        input_dir (str): The directory that contains the original step trace data.
        output_file_path (str): The output file path.
        job_id (int): The job id used to define the start of a new step. Default: 0.
        skip_first_step (bool): Whether to skip the first step or not.
        is_training_mode (bool): Whether in training mode or not.
        is_gpu_kernel_async_launch (bool): Whether the gpu kernels are launched asynchronously or not.
    """

    def __init__(self, input_dir, output_file_path, job_id=0, skip_first_step=False,
                 is_training_mode=True, is_gpu_kernel_async_launch=False):
        self._input_dir = input_dir
        self._output_path = output_file_path
        self._job_id = job_id
        self._skip_first_step = skip_first_step
        self._result = []
        self._header = []
        self._step_num = 0
        self._tag_map = {}
        self._is_training_mode = is_training_mode
        self._step_end_tag_id = 4
        self._is_gpu_kernel_async_launch = is_gpu_kernel_async_launch
        self._model_start_tag_id = 0
        self._model_end_tag_id = 1
        self._fp_tag_id = 2
        self._bp_tag_id = 3
        self._reduce_min_tag_id = 10000
        self._reduce_max_tag_id = 20000
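        # Byte sizes of the binary record parts: the 4-byte head, a 4-byte pad,
        # and the 30-byte step trace payload (three uint64 plus three uint16 fields).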
        self._profiling_head_len = 4
        self._profiling_head_pad_len = 4
        self._st_data_len = 8 + 8 + 8 + 2 + 2 + 2

    @property
    def output_file(self):
        """The property of the output file name."""
        file_name = self._output_path.rsplit('/', 2)
        return file_name[-1] if len(file_name) == 3 else ''

    def show(self):
        """Show the step trace summary info."""
        summary_info = {}
        if self._result:
            summary_info = get_summary_for_step_trace(self._result[-1], self._header, self._is_training_mode)
            summary_info['total_steps'] = len(self._result) - 1
        print('\nStep trace summary info (unit: syscnt):')
        print(summary_info)
        print('\nThe step trace parse result is saved under ${summary_dir}/profiler/%s'
              % self.output_file)

    def parse_and_save(self):
        """Parse step trace files and save the result."""
        try:
            source_files = self._get_step_trace_files()
            if self._is_gpu_kernel_async_launch:
                self._parse_async_launch(source_files)
            else:
                self._parse(source_files)
            self._save()
        except IOError as err:
            log.warning(err)
            raise ProfilerIOException()
        else:
            log.info("Finished saving the intermediate result for the step trace file.")

    def record_point_info(self, point_info, output_path):
        """
        Record point info into json.

        Args:
            point_info (dict): The point info about tag id and the corresponding op name.
            output_path (str): The output path for saving point info.

        Returns:
            dict, parsed point info.
        """

    def update_tag_op_type_map(self, point_info):
        """
        Update the map from tag id to op type.

        Args:
            point_info (dict): The point info about tag id and the corresponding op name.
        """
        self._get_step_trace_files()
        tag_map = {}
        for tag, op_name in point_info.items():
            op_type = self._get_op_type(tag, op_name)
            tag_map[tag] = op_type
        log.info("Get tag types for step trace analysis: %s", tag_map)
        self._tag_map = tag_map

    def _get_op_type(self, tag, name):
        """
        Get op type from tag and name.

        Args:
            tag (int): The tag id.
            name (str): The op name.

        Returns:
            str, the op type or communication op name.
        """
        tag_map = {self._fp_tag_id: 'fp', self._bp_tag_id: 'bp', self._step_end_tag_id: 'end'}
        # get solid tag type
        op_type = tag_map.get(tag, '')
        if op_type:
            return op_type
        # check if the tag is the step start tag
        if tag == 0:
            return 'start'
        # analyze the reduce tag
        op_name = name.rsplit('/', 1)[-1]
        if not op_name:
            log.warning("Unexpected op name: %s", name)

        return op_name

    def _get_step_trace_files(self):
        """Get step trace files."""
        return self._input_dir

    @staticmethod
    def _search_file(input_dir):
        """Search for step trace files under the given input directory."""
        # validate input_dir
        if not os.path.isdir(input_dir):
            raise ProfilerPathErrorException(
                '{} does not exist or is not a dir'.format(input_dir)
            )
        # get step trace files
        files = os.listdir(input_dir)
        step_trace_files = list(
            filter(
                lambda file: file.startswith('ts_track.data') and not file.endswith('.done'),
                files
            )
        )
        # validate result
        if len(step_trace_files) > 1:
            # file names are expected to end with `slice_$number`;
            # use the $number suffix as the sort key
            try:
                step_trace_files.sort(key=lambda path: int(path.rsplit('_', 1)[-1]))
            except ValueError as err:
                log.warning("Unable to parse file names: %s. %s", step_trace_files, err)
                step_trace_files = []
        else:
            training_trace_files = list(
                filter(
                    lambda file: file.startswith('training_trace') and not file.endswith('.done'),
                    files
                )
            )
            if len(training_trace_files) >= 1:
                log.warning("The training_trace file structure has changed, please upgrade "
                            "mindspore and regenerate the profiling data")

        file_paths = [os.path.join(input_dir, file) for file in step_trace_files]
        log.info("Found %d step trace files.", len(file_paths))
        return file_paths

    @abstractmethod
    def _parse(self, source_files):
        """Parse source step trace files."""

    def _get_next_step_trace(self, content, event_info):
        """
        Get next step trace info.

        Args:
            content (bytes): The input step trace info.
            event_info (dict): The event info.

        Returns:
            Generator, yielding the step trace info one by one.
        """
        start_time = event_info.get('end', '-')
        event_info['start'] = start_time
        if 'reduce' not in event_info:
            event_info['reduce'] = {}

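        # Walk the buffer record by record: each record starts with a 4-byte head,
        # rptType 10 marks a step trace record, and bufSize gives the full record length.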
        i = 0
        while i < len(content):
            profiling_head_data = content[i:i + self._profiling_head_len]
            parsed_head = struct.unpack('BBH', profiling_head_data)
            profiling_head = ProfilingHeadStruct(*parsed_head)
            if profiling_head.rptType == 10:
                st_data = content[i + self._profiling_head_len + self._profiling_head_pad_len:
                                  i + self._profiling_head_len + self._profiling_head_pad_len + self._st_data_len]
                parsed_data = struct.unpack('QQQHHH', st_data)
                next_event = StepTraceStruct(*parsed_data)
                self._construct_event_info(next_event, event_info)

                if event_info.get('end'):
                    yield event_info
                    start_time = event_info.get('end', '-')
                    event_info.clear()
                    event_info['start'] = start_time
                    event_info['reduce'] = {}
            i += profiling_head.bufSize

    def _construct_event_info(self, next_event, event_info):
        """Construct event info according to next_event."""
        end_flag = lambda tag: tag == self._step_end_tag_id
        fp_flag = lambda tag: tag == self._fp_tag_id
        bp_flag = lambda tag: tag == self._bp_tag_id
        reduce_flag = lambda tag: self._reduce_min_tag_id <= tag < self._reduce_max_tag_id

        def _on_reduce_event(reduce_tag_id):
            """Handle reduce event."""
            stream_id = next_event.stream_id
            if event_info['reduce'].get(stream_id):
                event_info['reduce'][stream_id].append((reduce_tag_id, time_stamp))
            else:
                event_info['reduce'][stream_id] = [(reduce_tag_id, time_stamp)]

        tag_id = next_event.tag_id
        time_stamp = next_event.timeStamp
        if end_flag(tag_id):
            event_info['end'] = time_stamp
        elif fp_flag(tag_id):
            event_info['fp'] = time_stamp
        elif bp_flag(tag_id):
            event_info['bp'] = time_stamp
        elif reduce_flag(tag_id):
            _on_reduce_event(tag_id)

    def _record_trace_event(self, step_trace):
        """Record trace event."""
        self._step_num += 1
        start_time = step_trace.get('start')
        end_time = step_trace.get('end')
        fp_time = step_trace.get('fp')
        bp_time = step_trace.get('bp')
        if not (start_time and end_time and fp_time and bp_time):
            log.warning("Step %d lacks basic time information.", self._step_num)
            return
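        # The first step has no previous end point (the '-' placeholder), so fall back to its fp time.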
        if start_time == '-':
            start_time = fp_time
        row_data = {
            'step_num': self._step_num,
            'start_point': start_time,
            'end_point': end_time,
            'total': end_time - start_time,
            'fp_point': fp_time,
            'bp_point': bp_time,
            'iteration_interval': fp_time - start_time,
            'fp_and_bp': bp_time - fp_time,
            'tail': end_time - bp_time
        }
        # update reduce info
        self._update_reduce_info(step_trace, row_data)
        # save the row data
        if not self._header:
            self._header = list(row_data.keys())
        row_data_list = [row_data.get(header_name, 0) for header_name in self._header]
        self._result.append(row_data_list)

    def _update_reduce_info(self, step_trace, row_data):
        """Extract reduce info."""
        reduce_time = step_trace.get('reduce', {})
        for stream_id, time_points in reduce_time.items():
            time_point_num = len(time_points)
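            # Time points must pair up as (start, end); skip streams with an odd count.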
            if time_point_num % 2:
                log.warning("Stream %d has an odd number of reduce time points: %d.", stream_id, time_point_num)
                continue
            for index, point_id in enumerate(range(0, time_point_num, 2)):
                field_name = f'stream_{stream_id}_{index}'
                reduce_info = self._get_single_reduce_event_info(
                    field_name, time_points[point_id], time_points[point_id + 1])
                row_data.update(reduce_info)

    def _get_single_reduce_event_info(self, field_name, start_point, end_point):
        """
        Get single reduce info.

        Args:
            field_name (str): The field name.
            start_point (Tuple[int, int]): Start point time info, including (tag_id, sys_count).
            end_point (Tuple[int, int]): End point time info, including (tag_id, sys_count).

        Returns:
            dict, reduce info.
        """
        ret_dict = {}
        return ret_dict

    def _record_average_info(self):
        """Calculate average info."""
        result_size = len(self._result)
        # calculate average data for each column in result data
        average_data = [0] * len(self._header)
        if result_size >= 2:
            for row_info in self._result[1:]:
                average_data = [
                    Decimal(i) + Decimal(j) for i, j in zip(row_info, average_data)
                ]
            average_data = [
                round(item / (result_size - 1)) for item in average_data
            ]
            # replace the step num in average_data with the '-' placeholder
            step_num_index = self._header.index('step_num')
            average_data[step_num_index] = '-'
        self._result.append(average_data)
        log.info("Finished adding average info for step trace.")

    def _save(self):
        """Save step trace file."""
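        # Column indexes used to trim the header and rows in inference (non-training) mode,
        # where the tail is merged into the fp duration and the bp_point/tail columns are dropped.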
        bp_point, tail, fp_duration = 5, -1, -2
        log.info("Start to save step trace file.")
        if not self._header:
            return
        try:
            with open(self._output_path, 'w') as file_handle:
                csv_writer = csv.writer(file_handle)
                if not self._is_training_mode:
                    self._header[fp_duration] = 'fp'
                    self._header = self._header[:bp_point] + self._header[bp_point + 1:tail]
                csv_writer.writerow(self._header)
                for row_data in self._result:
                    if not self._is_training_mode:
                        row_data[fp_duration] += row_data[tail]
                        row_data = row_data[:bp_point] + row_data[bp_point + 1:tail]
                    csv_writer.writerow(row_data)
            os.chmod(self._output_path, stat.S_IREAD | stat.S_IWRITE)
        except (IOError, OSError) as err:
            log.warning('Failed to save step trace raw info. %s', err)
            raise ProfilerIOException


class GpuStepTraceParser(BaseStepTraceParser):
    """The parser for gpu step trace data."""

    def get_fp_bp(self, f_obj, all_step_fp, all_step_bp):
        """Parse the fp and bp op names."""
        fp_start, bp_end = 0, 1
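        # In async-launch mode every line carries one step (fp info in field 1, bp info in field 2);
        # otherwise the first line lists the fp op and the second line the bp op.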
        if self._is_gpu_kernel_async_launch:
            for line in f_obj:
                line = line.strip().split()
                all_step_fp.append(line[1].split(',')[0])
                all_step_bp.append(line[2].split(',')[0])
        else:
            lines = f_obj.readlines()
            all_step_fp.append(lines[fp_start].split()[0])
            all_step_bp.append(lines[bp_end].split()[0])

    def record_point_info(self, source_file, output_path):
        """
        Record point info into json.

        Args:
            source_file (str): The file path of the original step trace data.
            output_path (str): The output path for saving point info.

        Returns:
            dict, parsed point info.
        """
        all_step_points = []
        all_step_fp = []
        all_step_bp = []
        try:
            with open(source_file, 'r') as f_obj:
                self.get_fp_bp(f_obj, all_step_fp, all_step_bp)
        except (IOError, OSError) as err:
            log.warning('Failed to read %s. %s', source_file, err)
            raise ProfilerIOException

        for fp_name, bp_name in zip(all_step_fp, all_step_bp):
            if self._is_training_mode:
                points = {
                    'fp_start': fp_name,
                    'bp_end': bp_name
                }
            else:
                points = {
                    'fp_start': fp_name,
                }
            all_step_points.append(points)

        try:
            with open(output_path, 'w') as json_file:
                if self._is_gpu_kernel_async_launch:
                    json.dump(all_step_points, json_file)
                else:
                    json.dump(all_step_points[0], json_file)
            os.chmod(output_path, stat.S_IREAD | stat.S_IWRITE)
        except (IOError, OSError) as err:
            log.warning('Failed to save point info. %s', err)
            raise ProfilerIOException

        return all_step_points[0]

    def _get_step_trace_files(self):
        """Get step trace files."""
        return self._input_dir

    def _parse(self, source_file):
        """Parse the source step trace file."""
        log.info("Start to parse step trace file.")
        fp_start, bp_end, iter_end, iter_start = 0, 1, 2, 3
        reduce_start = 4
        start_time, end_time = 0, 1
        step_trace_point_count = 3
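        # Each line of the source file is '<tag> <step1_info> <step2_info> ...';
        # the first three rows hold the fp, bp and iteration-end points, later rows the reduce ops.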

        source_file = validate_and_normalize_path(source_file)
        try:
            with open(source_file, 'r') as f:
                lines = f.readlines()
                if len(lines) < step_trace_point_count:
                    raise ProfilerRawFileException(
                        f"Failed to parse the {source_file} file. The FP_POINT/BP_POINT/ITER_END_POINT "
                        f"could not be recognized correctly. Try setting the environment variables "
                        f"'PROFILING_FP_START' and 'PROFILING_BP_END' to solve this problem. For example, "
                        f"'export PROFILING_FP_START=Default/xxx/Conv2d-op1'.")
                step_trace_info_all = [line.strip().split()[1:] for line in lines]
                num_of_step = len(step_trace_info_all[0])
                for step_trace_point in step_trace_info_all:
                    if len(step_trace_point) != num_of_step:
                        raise ProfilerRawFileException(
                            f"Failed to parse the {source_file} file. The profiled "
                            f"step_num of the FP/BP/ITER_END points are not equal.")
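                # Build the iteration-start row: step 0 starts at its fp point and each
                # later step starts at the previous step's iteration end.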
                iter_start_info = [step_trace_info_all[fp_start][0]] + \
                                  step_trace_info_all[iter_end][:num_of_step]
                step_trace_info_all.insert(iter_start, iter_start_info)
        except (IOError, OSError) as err:
            log.warning('Failed to read %s. %s', source_file, err)
            raise ProfilerIOException

        for step_num in range(num_of_step):
            step_trace = {
                'start': int(step_trace_info_all[iter_start][step_num].split(',')[start_time]),
                'fp': int(step_trace_info_all[fp_start][step_num].split(',')[start_time]),
                'bp': int(step_trace_info_all[bp_end][step_num].split(',')[end_time]),
                'end': int(step_trace_info_all[iter_end][step_num].split(',')[end_time]),
                'reduce': {}
            }
            num_of_step_point = len(step_trace_info_all)
            if num_of_step_point > reduce_start:
                reduce_info = {}
                reduce_time_info = []
                for reduce_idx in range(reduce_start, num_of_step_point):
                    cur_reduce_time = step_trace_info_all[reduce_idx][step_num]
                    reduce_time_info += cur_reduce_time.split(',')
                reduce_info['ops'] = reduce_time_info
                step_trace['reduce'] = reduce_info
            self._record_trace_event(step_trace)
        self._record_average_info()
        log.info("Finished parsing the step trace file.")

    def _parse_one_step(self, line):
        """
        Parse a step text line into a dict object.

        Args:
            line (str): The step trace line text. It contains five parts, each separated by a space:
                part 1: start_op_name,start_op_time
                part 2: fp_op_name,fp_time
                part 3: bp_op_name,bp_time
                part 4: end_op_name,end_time
                part 5: [reduce_op_name,reduce_start,reduce_end], which may contain multiple
                    reduce ops, each separated by a space.
        """

        line = line.strip().split()
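        # Timestamps are recorded in ns; dropping the last digit converts them to 10ns units.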
        start_time = int(line[0].split(',')[1][:-1])
        fp_time = int(line[1].split(',')[1][:-1])
        bp_time = int(line[2].split(',')[1][:-1])
        end_time = int(line[3].split(',')[1][:-1])
        reduce_info = {}
        reduce_time_info = []

        for reduce_item in line[4:]:
            # add the communication op start and end time, converted from ns to 10ns units
            reduce_time_info.append(reduce_item.split(',')[1][:-1])
            reduce_time_info.append(reduce_item.split(',')[2][:-1])
        step_trace = {
            'start': start_time,
            'fp': fp_time,
            'bp': bp_time,
            'end': end_time
        }
        if reduce_time_info:
            reduce_info['ops'] = reduce_time_info
        step_trace['reduce'] = reduce_info
        self._record_trace_event(step_trace)

    def _parse_async_launch(self, source_file):
        """Parse source step trace files generated in async kernel launch mode."""
        log.info("Start to parse step trace file.")
        source_file = validate_and_normalize_path(source_file)

        try:
            with open(source_file, 'r') as f_obj:
                for line in f_obj:
                    self._parse_one_step(line)

        except (IOError, OSError) as err:
            log.warning('Failed to read %s. %s', source_file, err)
            raise ProfilerIOException

        self._record_average_info()
        log.info("Finished parsing the step trace file.")

    def _get_single_reduce_event_info(self, field_name, start_point, end_point):
        """
        Get single reduce info.

        Args:
            field_name (str): The field name.
            start_point (str): Start point time.
            end_point (str): End point time.

        Returns:
            dict, reduce info.
        """
        reduce_info = {}

        op_type = 'AllReduce'
        # append field name with op type.
        field_name += '_' + op_type
        reduce_info[field_name] = int(end_point) - int(start_point)
        reduce_info[field_name + '_start_point'] = start_point
        reduce_info[field_name + '_end_point'] = end_point

        return reduce_info


class AscendStepTraceParser(BaseStepTraceParser):
    """The parser for Ascend step trace data."""
    _event_size = 20
    _fp_tag = 2
    _bp_tag = 3
    _step_trace_files = []

    def record_point_info(self, point_info, output_path):
        """
        Record point info into json.

        Args:
            point_info (dict): The point info about tag id and the corresponding op name.
            output_path (str): The output path for saving point info.

        Returns:
            dict, parsed point info.
        """
        if self._is_training_mode:
            points = {
                'fp_start': point_info.get(self._fp_tag, ''),
                'bp_end': point_info.get(self._bp_tag, '')
            }
        else:
            points = {
                'fp_start': point_info.get(self._fp_tag, ''),
            }
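        # Keep an existing point info file instead of overwriting it.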
        if os.path.exists(output_path):
            return points
        try:
            with open(output_path, 'w') as json_file:
                json.dump(points, json_file)
            os.chmod(output_path, stat.S_IREAD | stat.S_IWRITE)
        except (IOError, OSError) as err:
            log.warning('Failed to save point info. %s', err)
            raise ProfilerIOException
        return points

    def _get_step_trace_files(self):
        """Get step trace files."""
        # step trace files may be under $profiler_dir or $profiler_dir/data
        if self._step_trace_files:
            return self._step_trace_files

        profiler_dir = self._input_dir
        step_trace_files = self._search_file(profiler_dir)
        if not step_trace_files:
            # try to find step trace files under $profiler_dir/data
            profiler_dir = os.path.join(profiler_dir, 'data')
            step_trace_files = self._search_file(profiler_dir)
        if not step_trace_files:
            raise ProfilerPathErrorException('Training trace file does not exist.')
        self._step_trace_files = step_trace_files

        return step_trace_files

    def _parse(self, source_files):
        """Parse source step trace files."""
        log.info("Start to parse step trace file.")
        event_info = {}

        for source_file in source_files:
            source_file = validate_and_normalize_path(source_file)
            try:
                with open(source_file, 'rb') as handler:
                    content = handler.read()
                    for step_trace in self._get_next_step_trace(content, event_info):
                        if self._skip_first_step:
                            self._skip_first_step = False
                            continue
                        self._record_trace_event(step_trace)
            except (IOError, OSError) as err:
                log.warning('Failed to read %s. %s', source_file, err)
                raise ProfilerIOException

        self._record_average_info()
        log.info("Finished parsing the step trace files.")

    def _get_single_reduce_event_info(self, field_name, start_point, end_point):
        """
        Get single reduce info.

        Args:
            field_name (str): The field name.
            start_point (Tuple[int, int]): Start point time info, including (tag_id, sys_count).
            end_point (Tuple[int, int]): End point time info, including (tag_id, sys_count).

        Returns:
            dict, reduce info.
        """
        reduce_info = {}
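        # A valid reduce pair is an even start tag immediately followed by its matching end tag (start + 1).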
        if end_point[0] - start_point[0] != 1 or start_point[0] % 2:
            log.warning("Unmatched reduce event <%s, %s>.", start_point, end_point)
            return reduce_info
        op_type = self._tag_map.get(start_point[0])
        # append field name with op type.
        if not op_type:
            log.warning("Can't recognize the inner type for point tag: %d.", start_point[0])
            field_name += '_parallel'
        else:
            field_name += '_' + op_type
        reduce_info[field_name] = end_point[1] - start_point[1]
        reduce_info[field_name + '_start_point'] = start_point[1]
        reduce_info[field_name + '_end_point'] = end_point[1]

        return reduce_info
678