• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021 Huawei Technologies Co., Ltd
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# ============================================================================
15"""The parser for parsing hccl files."""
16import csv
17import json
18import os
19import stat
20from enum import Enum
21import numpy as np
22
23from mindspore.profiler.common.exceptions.exceptions import \
24    ProfilerPathErrorException, ProfilerFileNotFoundException, \
25    ProfilerDirNotFoundException, ProfilerRawFileException
26from mindspore import log as logger
27from mindspore.profiler.common.validator.validate_path import \
28    validate_and_normalize_path
29
30
31class CommunicationInfo(Enum):
32    """
33    Communication related enumeration types.
34
35    Enum:
36        RDMA: Communication link between servers in cluster training.
37        SDMA: Communication link inside server in cluster training.
38        LOCAL: The operation of this card has no transmission process.
39        RDMASEND:Communication operator of RDMA link.
40        REDUCE_INLINE:Communication operator of SDMA link.
41        MEMCPY:Communication operator of SDMA link.
42        NOTIFY_RECORD: Communication operator of SDMA link.
43        NOTIFY_WAIT: operator of LOCAL.
44    """
45    RDMA = 'RDMA'
46    SDMA = 'SDMA'
47    LOCAL = 'LOCAL'
48    RDMASEND = 'RDMASend'
49    REDUCE_INLINE = 'Reduce Inline'
50    MEMCPY = 'Memcpy'
51    NOTIFY_RECORD = 'Notify Record'
52    NOTIFY_WAIT = 'Notify Wait'
53
54
55class HcclParser:
56    """
57    The parser for parsing hccl file.
58
59    Args:
60        source_dir (str): The hccl source dir.
61        device_id (str): The device ID.
62        rank_id (str): The rank ID.
63        output_path (str): The directory of the parsed file. Default: `./`.
64
65    Raises:
66        ProfilerPathErrorException: If the hccl file path or the output path is invalid.
67        ProfilerFileNotFoundException: If the hccl file or the output dir does not exist.
68    """
69    _parsed_hccl_file_name = 'hccl_raw_{}.csv'
70    _col_names = ['step_num', 'communication_cost', 'wait_cost', 'link_info', 'communication_operator_cost']
71
72    def __init__(self, source_dir, device_id, rank_id, output_path):
73        self._dev_id = device_id
74        self._rank_id = rank_id
75        self._source_dir = source_dir
76        self._save_path = self._get_save_path(output_path)
77        self._step_trace_info = self._get_step_trace_info(output_path)
78        self._communication_operator_name_mapping_info = self._get_communication_operator_name_mapping_info()
79
80    def parse(self):
81        """Parse communication info."""
82        self._parse_and_save(self._source_dir)
83
84    def _parse_communication_cost(self, operators_cost_info, info, operators_dict):
85        """Parse communication cost."""
86        for key, value in operators_cost_info.items():
87            for item in value:
88                # index0:step_num
89                if info[0] == item[0]:
90                    operators_dict[key] = item
91
92    def _parse_and_save(self, dir_path):
93        """Parse and save communication info."""
94        communication_info_cache = list()
95        operators_cost_info = self._get_communication_operators_cost_info(dir_path)
96        for key, value in operators_cost_info.items():
97            for item in value:
98                communication_info_cache.append(item)
99        communication_info_cache = self._merge_communication_info_by_step_num(communication_info_cache)
100        for info in communication_info_cache:
101            operators_dict = dict()
102            self._parse_communication_cost(operators_cost_info, info, operators_dict)
103            info.append(operators_dict)
104        # Calculate device communication average.
105        device_communication_average_value = self._calculate_communication_average_value(communication_info_cache)
106        # Calculate operator communication average.
107        operators_average_value = dict()
108        for key, value in operators_cost_info.items():
109            average_value = self._calculate_communication_average_value(value)
110            # The symbol '-' is used to indicate that the line is average information.
111            average_value.insert(0, '-')
112            operators_average_value[key] = average_value
113        device_communication_average_value.append(operators_average_value)
114        # The symbol '-' is used to indicate that the line is average information.
115        device_communication_average_value.insert(0, '-')
116        with open(self._save_path, 'w', newline='') as save_file:
117            csv_writer = csv.writer(save_file)
118            csv_writer.writerow(self._col_names)
119            for item in communication_info_cache:
120                # item[3]:link_info which is a dictionary that needs to be encoded before it is written to a CSV file.
121                # item[4]:it is a dictionary that needs to be encoded before it is written to a CSV file.
122                item[3] = json.dumps(item[3])
123                item[4] = json.dumps(item[4])
124                csv_writer.writerow(item)
125            # device_communication_average_value[3]: average value for link info
126            # device_communication_average_value[4]: average value for operator info
127            device_communication_average_value[3] = json.dumps(device_communication_average_value[3])
128            device_communication_average_value[4] = json.dumps(device_communication_average_value[4])
129
130            csv_writer.writerow(device_communication_average_value)
131        os.chmod(self._save_path, stat.S_IREAD | stat.S_IWRITE)
132
133    def _get_save_path(self, output_path):
134        """
135        Get the save path.
136
137        Args:
138            output_path (str): The output dir.
139
140        Returns:
141            str, the save path.
142        """
143        output_path = self._validate_dir_path(output_path)
144        return os.path.join(
145            output_path, self._parsed_hccl_file_name.format(self._rank_id)
146        )
147
148    def _get_step_trace_info(self, source_dir):
149        """Get the start and end timestamps in a step and communication operators names."""
150        file_path = os.path.join(
151            source_dir,
152            f'step_trace_raw_{self._rank_id}_detail_time.csv'
153        )
154        try:
155            file_path = validate_and_normalize_path(file_path)
156        except RuntimeError:
157            logger.warning('file path is invalid.')
158            raise ProfilerPathErrorException('file path is invalid.')
159        if not os.path.isfile(file_path):
160            logger.warning('The step trace file <%s> not found.', file_path)
161            raise ProfilerFileNotFoundException(file_path)
162
163        with open(file_path, 'r') as src_file:
164            csv_reader = csv.reader(src_file)
165            # The first row of step trace file is like: step_num, start_point,...,communication_operator_name.
166            # The position number of the first communication operator name is 9.
167            communication_operators_names = next(csv_reader)[9:]
168
169            # index_0:step_num, index_1:start_point, index_2:end_point
170            # The unit of time stamp is 10ns. To convert it to μs, you need to divide it by 100.
171            step_timestamps_info = [[info[0], float(info[1]) / 100, float(info[2]) / 100]
172                                    for info in csv_reader if info[0].isdigit()]
173
174        return [communication_operators_names, step_timestamps_info]
175
176    def _get_communication_operator_name_mapping_info(self):
177        """Get the name of communication operators mapping between hccl and step trace."""
178        dir_path = self._validate_dir_path(self._source_dir)
179        # The name of the operator in hccl is like:operatorName_{Ordered_number}_xx_xx.
180        operators_names_in_hccl = [entry.name for entry in os.scandir(dir_path) if entry.is_dir()
181                                   and entry.name.endswith(self._dev_id)]
182        operators_names_in_hccl_set = set({i.split('_')[0] for i in operators_names_in_hccl})
183        op_names_in_hccl_dic = dict()
184        for item in operators_names_in_hccl_set:
185            op_names_in_hccl_dic[item] = sorted([i for i in operators_names_in_hccl if i.split('_')[0] == item],
186                                                key=lambda x: int(x.split('_')[1]))
187
188        # The op_info in step trace is like: [op_name,op_name_start_point,op_name_end_point]
189        # The name of the operator in step trace can be obtained every three.
190        # The name of the operator in step trace is like: stream_xx_xx_operatorName-opxx.
191        operators_names_in_step_trace = [self._step_trace_info[0][i]
192                                         for i in range(0, len(self._step_trace_info[0]), 3)]
193        op_names_in_step_trace_set = set({i.split('_')[3].split('-')[0] for i in operators_names_in_step_trace})
194        op_names_in_step_trace_dic = dict()
195        for item in op_names_in_step_trace_set:
196            op_names_in_step_trace_dic[item] = [i for i in operators_names_in_step_trace
197                                                if i.split('_')[3].split('-')[0] == item]
198
199        communication_operator_mapping_info = dict()
200        for hccl_key, hccl_value in op_names_in_hccl_dic.items():
201            for step_trace_key, step_trace_value in op_names_in_step_trace_dic.items():
202                if hccl_key.lower() == step_trace_key.lower():
203                    communication_operator_mapping_info[hccl_key] = list(zip(hccl_value, step_trace_value))
204
205        logger.info("Communication operator name mapping info is %s", communication_operator_mapping_info)
206
207        return communication_operator_mapping_info
208
209    def _calculate_the_step_by_timestamp(self, timestamp):
210        """Calculate the step according to the timestamp."""
211        # index0:communication_operator_name, index1:step_timestamps_info
212        step_timestamps_info = self._step_trace_info[1]
213        step_timestamps_len = len(step_timestamps_info)
214        # index_0:step_num, index_1:start_point, index_2:end_point
215        if timestamp < step_timestamps_info[0][1]:
216            step_num = "1"
217        elif step_timestamps_info[step_timestamps_len - 1][2] < timestamp:
218            step_num = step_timestamps_info[step_timestamps_len - 1][0]
219        else:
220            for item in step_timestamps_info:
221                if item[1] <= timestamp < item[2]:
222                    step_num = item[0]
223        return step_num
224
225    def _get_communication_operators_cost_info(self, dir_path):
226        """Obtain time-consuming information of all communication operators."""
227        operators_cost_info = dict()
228        dir_path = self._validate_dir_path(dir_path)
229        operators_dir = [entry.name for entry in os.scandir(dir_path) if entry.is_dir()
230                         and entry.name.endswith(self._dev_id)]
231        operator_dir_path = [os.path.join(dir_path, operator_dir) for operator_dir in operators_dir]
232        for operator_dir in operator_dir_path:
233            operator_cost = self._calculate_communication_operator_cost(operator_dir)
234            operator_name = os.path.basename(operator_dir)
235            op_mapping_info = self._communication_operator_name_mapping_info.get(operator_name.split('_')[0], [])
236            # index1: operator name in step trace.
237            op_mapping_name = [item[1] for item in op_mapping_info if item[0] == operator_name]
238            if not op_mapping_name:
239                logger.warning("The mapping relationship between op name in hccl and op name in step trace "
240                               "cannot be found. Use op name in hccl to show the name of the communication operator.")
241            else:
242                operator_name = op_mapping_name[0]
243            operators_cost_info[operator_name] = operator_cost
244        return operators_cost_info
245
246    def _calculate_communication_operator_cost(self, dir_path):
247        """Calculate communication operator cost. Such as allReduce_1,allReduce_2."""
248        dir_path = self._validate_dir_path(dir_path)
249        files = [entry.name for entry in os.scandir(dir_path) if entry.is_file()]
250        files_path = [os.path.join(dir_path, file) for file in files]
251        operator_cost = list(map(self._calculate_communication_operator_iter_cost, files_path))
252        # Add the same step_num merge.
253        steps_operator_cost = self._merge_communication_info_by_step_num(operator_cost)
254        return steps_operator_cost
255
256    def _merge_communication_info_by_step_num(self, communication_info: list):
257        """According to step num to merge communication info."""
258        steps_communication_info = list()
259        info_set = set()
260        for item in communication_info:
261            # index0:step_num,index1:communication_cost,index2:communication_wait_cost,index3:link_info
262            if item[0].isdigit():
263                info_set.add(int(item[0]))
264        info_set = sorted(info_set)
265        for item in info_set:
266            item = str(item)
267            step_communication_info = [info for info in communication_info if info[0] == item]
268            step_communication_cost = sum([i[1] for i in step_communication_info])
269            step_communication_wait_cost = sum([i[2] for i in step_communication_info])
270            step_communication_link = self._calculate_link_value([i[3] for i in step_communication_info], "total")
271            steps_communication_info.append([item, step_communication_cost,
272                                             step_communication_wait_cost, step_communication_link])
273        return steps_communication_info
274
275    def _calculate_communication_operator_iter_cost(self, file_path):
276        """Calculate the time-consuming of communication operator in one execution round."""
277
278        def _inner_calculate_communication_operator_iter_cost(events):
279            total_notify_wait = HcclParser._calculate_notify_wait_time(events)
280            # Divide information by src dst rank_id.
281            src_dst_dict = self._divide_communication_info_by_src_dst_rank(events)
282            src_dst_link_info = self._calculate_src_dst_link_info(src_dst_dict)
283            communication_cost, communication_wait = self._calculate_device_communication_cost(src_dst_link_info)
284            total_notify_wait -= communication_wait
285            return [communication_cost, total_notify_wait, src_dst_link_info]
286
287        file_path = self._validate_file_path(file_path)
288        with open(file_path, 'r') as src_file:
289            try:
290                operator_info = json.load(src_file)
291            except (json.JSONDecodeError, TypeError) as err:
292                logger.warning(err)
293                raise ProfilerRawFileException('Fail to parse operator file.')
294        trace_events = operator_info.get("traceEvents")
295        operator_timestamp = trace_events[0].get("ts", 0)
296        step_id = self._calculate_the_step_by_timestamp(operator_timestamp)
297        # Statistics of communication operators in all streams.
298        total_communication_operator_iter_cost = \
299            _inner_calculate_communication_operator_iter_cost(trace_events)
300        # Statistics of communication operators in mainstream.
301        threads_dict = self._divide_communication_info_by_thread(trace_events)
302        # The largest value is mainstream.
303        major_thread = sorted(threads_dict, reverse=True)[0]
304        major_thread_trace_events = threads_dict.get(major_thread)
305        mainstream_communication_operator_iter_cost = \
306            _inner_calculate_communication_operator_iter_cost(major_thread_trace_events)
307        # index0:communication_cost,index1:communication_wait_cost,index2:link_info
308        return [step_id, mainstream_communication_operator_iter_cost[0],
309                mainstream_communication_operator_iter_cost[1],
310                total_communication_operator_iter_cost[2]]
311
312    @staticmethod
313    def _divide_communication_info_by_thread(trace_events: list):
314        """Divide information by thread."""
315        threads_dict = dict()
316        for item in trace_events:
317            thread_id = item.get("tid")
318            if thread_id not in threads_dict.keys():
319                threads_dict[thread_id] = [item]
320            else:
321                threads_dict[thread_id].append(item)
322        return threads_dict
323
324    def _divide_communication_info_by_src_dst_rank(self, trace_event: list):
325        """Divide information by src rank id and dst rank id"""
326        src_dst_dict = dict()
327        for item in trace_event:
328            src_rank = item.get("args").get("src rank")
329            dst_rank = item.get("args").get("dst rank")
330            if src_rank is None or dst_rank is None:
331                continue
332
333            # When the SDMA operation is in the card,
334            # the source card or destination card is 0xffffffff, and it needs to be converted to localrank.
335            if int(src_rank) == int('0xffffffff', 16):
336                src_rank = dst_rank
337
338            if int(dst_rank) == int('0xffffffff', 16):
339                dst_rank = src_rank
340
341            if item.get("args").get("transport type") == CommunicationInfo.LOCAL.value:
342                item["args"]["src rank"] = dst_rank
343                item["args"]["dst rank"] = src_rank
344                src_dst_key = str(dst_rank) + '-' + str(src_rank)
345            else:
346                src_dst_key = str(src_rank) + '-' + str(dst_rank)
347
348            if src_dst_key not in src_dst_dict.keys():
349                src_dst_dict[src_dst_key] = [item]
350            else:
351                src_dst_dict[src_dst_key].append(item)
352        return src_dst_dict
353
354    def _divide_communication_info_by_link_type(self, trace_event: list):
355        """Divide information by link type."""
356        link_type_dict = dict()
357        for item in trace_event:
358            link_type_key = item.get("args").get("transport type")
359            if link_type_key is None:
360                continue
361            if link_type_key in (CommunicationInfo.RDMA.value, CommunicationInfo.SDMA.value):
362                task_type = item.get("args").get("task type")
363                # Filter out the Notify Record operator in SDMA, because it does not transmit the actual amount of data.
364                if task_type == CommunicationInfo.NOTIFY_RECORD.value:
365                    continue
366                if link_type_dict.get(link_type_key):
367                    link_type_dict[link_type_key].append(item)
368                else:
369                    link_type_dict[link_type_key] = [item]
370            if link_type_key == CommunicationInfo.LOCAL.value:
371                if link_type_dict.get(CommunicationInfo.RDMA.value):
372                    link_type_dict[CommunicationInfo.RDMA.value].append(item)
373        return link_type_dict
374
375    def _calculate_device_communication_cost(self, src_dst_link_info: dict):
376        """Calculate notify wait time."""
377        total_communication_time = 0
378        total_wait_time = 0
379        for src_dst_value in src_dst_link_info.values():
380            for link_type_value in src_dst_value.values():
381                # time_cost:0,size_cost:1,brand_width:2,wait_time:3
382                total_communication_time += link_type_value[0]
383                if len(link_type_value) > 3:
384                    total_wait_time += link_type_value[3]
385        return total_communication_time, total_wait_time
386
387    def _parse_link_cost(self, result_dict, key, link_type_dict):
388        """Parse link cost."""
389        for link_type_key, link_type_value in link_type_dict.items():
390            if link_type_key == CommunicationInfo.RDMA.value:
391                # Divide information by thread.
392                rdma_infos = []
393                threads_dict = self._divide_communication_info_by_thread(link_type_value)
394                for thread_value in threads_dict.values():
395                    rdma_info = self._calculate_adma_link_info(thread_value)
396                    rdma_infos.append(rdma_info)
397                rdma_total_cost = np.sum(rdma_infos, axis=0).tolist()
398                result_dict[key][link_type_key] = rdma_total_cost
399            if link_type_key == CommunicationInfo.SDMA.value:
400                sdma_total_cost = self._calculate_sdma_link_info(link_type_value)
401                result_dict[key][link_type_key] = sdma_total_cost
402
403    def _calculate_src_dst_link_info(self, src_dst_dict: dict):
404        """Calculate src dst link info."""
405        result_dict = dict()
406        for key, value in src_dst_dict.items():
407            # Divide information by link type.
408            link_type_dict = self._divide_communication_info_by_link_type(value)
409            if not link_type_dict:
410                continue
411            result_dict[key] = dict()
412            self._parse_link_cost(result_dict, key, link_type_dict)
413        return result_dict
414
415    @staticmethod
416    def _calculate_adma_link_info(trace_event: list):
417        """
418        Calculate RDMA link info.
419
420        When the link is RDMA,it is necessary to match three consecutive operators RDMASend, RDMASend \
421        and Notify Wait,and take the sum of the time of the three operators as one communication time.
422        """
423        rdma_communication_time = 0
424        rdma_communication_size = 0
425        rdma_communication_wait_time = 0
426        start_index = 0
427        end_index = len(trace_event) - 1
428        while start_index < end_index:
429            first_task_type = trace_event[start_index].get("args").get("task type")
430            if first_task_type == CommunicationInfo.RDMASEND.value and start_index < end_index - 1:
431                second_task_type = trace_event[start_index + 1].get("args").get("task type")
432                third_task_type = trace_event[start_index + 2].get("args").get("task type")
433                if second_task_type == CommunicationInfo.RDMASEND.value and \
434                        third_task_type == CommunicationInfo.NOTIFY_WAIT.value:
435                    rdma_send_cost = trace_event[start_index].get("dur", 0)
436                    notify_record_cost = trace_event[start_index + 1].get("dur", 0)
437                    notify_wait_cost = trace_event[start_index + 2].get("dur", 0)
438                    rdma_communication_time += rdma_send_cost + notify_record_cost + notify_wait_cost
439                    rdma_communication_wait_time += notify_wait_cost
440                    rdma_size = trace_event[start_index].get("args").get("size")
441                    rdma_size = int(rdma_size, 16) if rdma_size else 0
442                    notify_record_size = trace_event[start_index + 1].get("args").get("size")
443                    notify_record_size = int(notify_record_size, 16) if notify_record_size else 0
444                    rdma_communication_size += rdma_size + notify_record_size
445                    start_index += 2
446            start_index += 1
447
448        # The unit of rdma_communication_wait_time is ms.
449        # The unit of rdma_bandwidth is KB/s.
450        # The unit of rdma_communication_size is k_byte and The unit of rdma_communication_time is ms.
451        rdma_communication_wait_time = rdma_communication_wait_time / 1e3
452        rdma_communication_size = rdma_communication_size / 1e3
453        rdma_communication_time = rdma_communication_time / 1e3
454        rdma_bandwidth = rdma_communication_size / (rdma_communication_time / 1e3) \
455            if rdma_communication_size else 0
456
457        return [rdma_communication_time, rdma_communication_size, rdma_bandwidth, rdma_communication_wait_time]
458
459    def _calculate_sdma_link_info(self, trace_event: list):
460        """
461        Calculate SDMA link info.
462
463        When the link is SDMA, the communication time of the primary link is the sum of the execution time\
464        of Reduce inline and Memcpy operators.
465        """
466        sdma_communication_time = 0
467        sdma_communication_size = 0
468
469        for item in trace_event:
470            task_type = item.get("args").get("task type")
471            if task_type in (CommunicationInfo.REDUCE_INLINE.value, CommunicationInfo.MEMCPY.value):
472                sdma_communication_time += item.get("dur", 0)
473                sdma_size = int(item.get("args").get("size"), 16) if item.get("args").get("size") else 0
474                sdma_communication_size += sdma_size
475
476        # The unit of sdma_bandwidth is KB/s.
477        # The unit of sdma_communication_size is k_byte and The unit of sdma_communication_time is ms.
478        sdma_communication_time = sdma_communication_time / 1e3
479        sdma_communication_size = sdma_communication_size / 1e3
480        sdma_bandwidth = sdma_communication_size / (sdma_communication_time / 1e3) \
481            if sdma_communication_size else 0
482        return [sdma_communication_time, sdma_communication_size, sdma_bandwidth]
483
484    @staticmethod
485    def _calculate_notify_wait_time(trace_event: list):
486        """Calculate notify wait time."""
487        total_notify_wait_time = 0
488        for item in trace_event:
489            task_type = item.get("args").get("task type")
490            if task_type == CommunicationInfo.NOTIFY_WAIT.value:
491                total_notify_wait_time += item.get("dur", 0)
492        # The unit of total_notify_wait_time is ms.
493        total_notify_wait_time = total_notify_wait_time / 1e3
494        return total_notify_wait_time
495
496    def _calculate_communication_average_value(self, communication_info: list):
497        """Calculate communication average value."""
498        communication_info_size = len(communication_info)
499        if communication_info_size == 0:
500            return []
501        # index1: communication_cost,index2:wait_cost,index3:link_info
502        communication_cost_average = sum([i[1] for i in communication_info]) / communication_info_size
503        wait_cost_average = sum([i[2] for i in communication_info]) / communication_info_size
504        link_info = [i[3] for i in communication_info]
505        calculate_type = 'average'
506        link_average_info = HcclParser._calculate_link_value(link_info, calculate_type)
507        return [communication_cost_average, wait_cost_average, link_average_info]
508
509    @staticmethod
510    def _parser_link_dict(result_dict, src_dst_key, src_dst_value):
511        """Parser link info to dict."""
512        if src_dst_key not in result_dict.keys():
513            result_dict[src_dst_key] = dict()
514        for link_key, link_value in src_dst_value.items():
515            if link_key not in result_dict[src_dst_key].keys():
516                result_dict[src_dst_key][link_key] = list()
517            result_dict[src_dst_key][link_key].append(link_value)
518
519    @staticmethod
520    def _calculate_link_value(link_info: list, calculate_type):
521        """Calculate link average or total value."""
522        result_dict = dict()
523        for item in link_info:
524            for src_dst_key, src_dst_value in item.items():
525                HcclParser._parser_link_dict(result_dict, src_dst_key, src_dst_value)
526        for src_dst_key, src_dst_value in result_dict.items():
527            for link_key, _ in src_dst_value.items():
528                if calculate_type == 'average':
529                    result_dict[src_dst_key][link_key] = np.mean(result_dict[src_dst_key][link_key], axis=0).tolist()
530                if calculate_type == 'total':
531                    result_dict[src_dst_key][link_key] = np.sum(result_dict[src_dst_key][link_key], axis=0).tolist()
532
533        return result_dict
534
535    def _validate_file_path(self, file_path):
536        """Validate file path."""
537        try:
538            file_path = validate_and_normalize_path(file_path)
539        except RuntimeError:
540            logger.warning('file path is invalid.')
541            raise ProfilerPathErrorException('file path is invalid.')
542        if not os.path.isfile(file_path):
543            logger.warning('The file <%s> not found.', file_path)
544            raise ProfilerFileNotFoundException(file_path)
545        return file_path
546
547    def _validate_dir_path(self, dir_path):
548        """Validate dir path."""
549        try:
550            dir_path = validate_and_normalize_path(dir_path)
551        except RuntimeError:
552            logger.warning('dir path is invalid.')
553            raise ProfilerPathErrorException('dir path is invalid.')
554        if not os.path.isdir(dir_path):
555            logger.warning('The  dir <%s> not found.', dir_path)
556            raise ProfilerDirNotFoundException(dir_path)
557        return dir_path
558