# Copyright 2021 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""The parser for parsing hccl files."""
import csv
import json
import os
import stat
from enum import Enum
import numpy as np

from mindspore.profiler.common.exceptions.exceptions import \
    ProfilerPathErrorException, ProfilerFileNotFoundException, \
    ProfilerDirNotFoundException, ProfilerRawFileException
from mindspore import log as logger
from mindspore.profiler.common.validator.validate_path import \
    validate_and_normalize_path


class CommunicationInfo(Enum):
    """
    Communication related enumeration types.

    Enum:
        RDMA: Communication link between servers in cluster training.
        SDMA: Communication link inside server in cluster training.
        LOCAL: The operation of this card has no transmission process.
        RDMASEND: Communication operator of RDMA link.
        REDUCE_INLINE: Communication operator of SDMA link.
        MEMCPY: Communication operator of SDMA link.
        NOTIFY_RECORD: Communication operator of SDMA link.
        NOTIFY_WAIT: Operator of LOCAL.
    """
    RDMA = 'RDMA'
    SDMA = 'SDMA'
    LOCAL = 'LOCAL'
    RDMASEND = 'RDMASend'
    REDUCE_INLINE = 'Reduce Inline'
    MEMCPY = 'Memcpy'
    NOTIFY_RECORD = 'Notify Record'
    NOTIFY_WAIT = 'Notify Wait'


class HcclParser:
    """
    The parser for parsing hccl file.

    Args:
        source_dir (str): The hccl source dir.
        device_id (str): The device ID.
        rank_id (str): The rank ID.
        output_path (str): The directory of the parsed file. The step trace file
            `step_trace_raw_{rank_id}_detail_time.csv` is also read from this directory.

    Raises:
        ProfilerPathErrorException: If the hccl file path or the output path is invalid.
        ProfilerFileNotFoundException: If the hccl file or the step trace file does not exist.
        ProfilerDirNotFoundException: If the hccl source dir or the output dir does not exist.
    """
    _parsed_hccl_file_name = 'hccl_raw_{}.csv'
    _col_names = ['step_num', 'communication_cost', 'wait_cost', 'link_info', 'communication_operator_cost']
    # Rank value used in the raw data when the source or destination card is the local card.
    _local_rank_placeholder = 0xffffffff

    def __init__(self, source_dir, device_id, rank_id, output_path):
        self._dev_id = device_id
        self._rank_id = rank_id
        self._source_dir = source_dir
        # The following attributes depend on the ones above, so the order matters.
        self._save_path = self._get_save_path(output_path)
        self._step_trace_info = self._get_step_trace_info(output_path)
        self._communication_operator_name_mapping_info = self._get_communication_operator_name_mapping_info()

    def parse(self):
        """Parse communication info."""
        self._parse_and_save(self._source_dir)

    def _parse_communication_cost(self, operators_cost_info, info, operators_dict):
        """
        Collect, for one step, the cost record of every operator that ran in that step.

        Args:
            operators_cost_info (dict): Operator name -> list of per-step records.
            info (list): The device level record of one step, index 0 is the step num.
            operators_dict (dict): Output dict, filled with operator name -> record of that step.
        """
        for key, value in operators_cost_info.items():
            for item in value:
                # index0:step_num
                if info[0] == item[0]:
                    operators_dict[key] = item

    def _parse_and_save(self, dir_path):
        """
        Parse and save communication info.

        One row is written per step, followed by one row of average values whose first column is '-'.
        The average row is omitted when no communication info was found.

        Args:
            dir_path (str): The hccl source dir.
        """
        operators_cost_info = self._get_communication_operators_cost_info(dir_path)
        communication_info_cache = list()
        for value in operators_cost_info.values():
            communication_info_cache.extend(value)
        communication_info_cache = self._merge_communication_info_by_step_num(communication_info_cache)
        for info in communication_info_cache:
            operators_dict = dict()
            self._parse_communication_cost(operators_cost_info, info, operators_dict)
            info.append(operators_dict)
        # Calculate device communication average.
        device_communication_average_value = self._calculate_communication_average_value(communication_info_cache)
        # Calculate operator communication average.
        operators_average_value = dict()
        for key, value in operators_cost_info.items():
            average_value = self._calculate_communication_average_value(value)
            # The symbol '-' is used to indicate that the line is average information.
            average_value.insert(0, '-')
            operators_average_value[key] = average_value

        # The average is an empty list when there is no communication info at all. In that case
        # there is nothing to index into, so the average row is skipped instead of raising IndexError.
        has_average = bool(device_communication_average_value)
        if has_average:
            device_communication_average_value.append(operators_average_value)
            # The symbol '-' is used to indicate that the line is average information.
            device_communication_average_value.insert(0, '-')
            # device_communication_average_value[3]: average value for link info
            # device_communication_average_value[4]: average value for operator info
            device_communication_average_value[3] = json.dumps(device_communication_average_value[3])
            device_communication_average_value[4] = json.dumps(device_communication_average_value[4])
        else:
            logger.warning('No communication info is found, the average information is not written.')

        with open(self._save_path, 'w', newline='') as save_file:
            csv_writer = csv.writer(save_file)
            csv_writer.writerow(self._col_names)
            for item in communication_info_cache:
                # item[3]:link_info which is a dictionary that needs to be encoded before it is written to a CSV file.
                # item[4]:it is a dictionary that needs to be encoded before it is written to a CSV file.
                item[3] = json.dumps(item[3])
                item[4] = json.dumps(item[4])
                csv_writer.writerow(item)
            if has_average:
                csv_writer.writerow(device_communication_average_value)
        os.chmod(self._save_path, stat.S_IREAD | stat.S_IWRITE)

    def _get_save_path(self, output_path):
        """
        Get the save path.

        Args:
            output_path (str): The output dir.

        Returns:
            str, the save path.
        """
        output_path = self._validate_dir_path(output_path)
        return os.path.join(
            output_path, self._parsed_hccl_file_name.format(self._rank_id)
        )

    def _get_step_trace_info(self, source_dir):
        """
        Get the start and end timestamps in a step and communication operators names.

        Args:
            source_dir (str): The directory that contains the step trace file.

        Returns:
            list, `[communication_operators_names, step_timestamps_info]`, where each element of
            `step_timestamps_info` is `[step_num, start_point, end_point]`.

        Raises:
            ProfilerPathErrorException: If the step trace file path is invalid.
            ProfilerFileNotFoundException: If the step trace file does not exist.
        """
        file_path = os.path.join(
            source_dir,
            f'step_trace_raw_{self._rank_id}_detail_time.csv'
        )
        try:
            file_path = validate_and_normalize_path(file_path)
        except RuntimeError as err:
            logger.warning('file path is invalid.')
            raise ProfilerPathErrorException('file path is invalid.') from err
        if not os.path.isfile(file_path):
            logger.warning('The step trace file <%s> not found.', file_path)
            raise ProfilerFileNotFoundException(file_path)

        with open(file_path, 'r') as src_file:
            csv_reader = csv.reader(src_file)
            # The first row of step trace file is like: step_num, start_point,...,communication_operator_name.
            # The position number of the first communication operator name is 9.
            # An empty file yields no header row, which is treated as "no communication operators".
            communication_operators_names = next(csv_reader, [])[9:]

            # index_0:step_num, index_1:start_point, index_2:end_point
            # The unit of time stamp is 10ns. To convert it to μs, you need to divide it by 100.
            # Blank rows are skipped; rows whose first column is not a number (e.g. the average row) too.
            step_timestamps_info = [[info[0], float(info[1]) / 100, float(info[2]) / 100]
                                    for info in csv_reader if info and info[0].isdigit()]

        return [communication_operators_names, step_timestamps_info]

    def _get_communication_operator_name_mapping_info(self):
        """
        Get the name of communication operators mapping between hccl and step trace.

        Returns:
            dict, operator type in hccl -> list of `(name_in_hccl, name_in_step_trace)` pairs.
        """
        dir_path = self._validate_dir_path(self._source_dir)
        # The name of the operator in hccl is like:operatorName_{Ordered_number}_xx_xx.
        operators_names_in_hccl = [entry.name for entry in os.scandir(dir_path)
                                   if entry.is_dir() and entry.name.endswith(self._dev_id)]
        op_names_in_hccl_dic = dict()
        for name in operators_names_in_hccl:
            op_names_in_hccl_dic.setdefault(name.split('_')[0], []).append(name)
        for names in op_names_in_hccl_dic.values():
            # Order the operators of one type by their ordered number.
            names.sort(key=lambda x: int(x.split('_')[1]))

        # The op_info in step trace is like: [op_name,op_name_start_point,op_name_end_point]
        # The name of the operator in step trace can be obtained every three.
        # The name of the operator in step trace is like: stream_xx_xx_operatorName-opxx.
        operators_names_in_step_trace = self._step_trace_info[0][::3]
        op_names_in_step_trace_dic = dict()
        for name in operators_names_in_step_trace:
            op_type = name.split('_')[3].split('-')[0]
            op_names_in_step_trace_dic.setdefault(op_type, []).append(name)

        communication_operator_mapping_info = dict()
        for hccl_key, hccl_value in op_names_in_hccl_dic.items():
            for step_trace_key, step_trace_value in op_names_in_step_trace_dic.items():
                # The operator type is compared case-insensitively between the two sources.
                if hccl_key.lower() == step_trace_key.lower():
                    communication_operator_mapping_info[hccl_key] = list(zip(hccl_value, step_trace_value))

        logger.info("Communication operator name mapping info is %s", communication_operator_mapping_info)

        return communication_operator_mapping_info

    def _calculate_the_step_by_timestamp(self, timestamp):
        """
        Calculate the step according to the timestamp.

        Args:
            timestamp (float): The timestamp of the first event of a communication operator.

        Returns:
            str, the step num. "1" if the timestamp is earlier than the first step (or no step info is
            available), the last step if it is not earlier than the end of the last step, and the
            following step if it falls into a gap between two steps.
        """
        # index0:communication_operator_name, index1:step_timestamps_info
        step_timestamps_info = self._step_trace_info[1]
        if not step_timestamps_info:
            # Without step information everything is attributed to the first step.
            return "1"
        # index_0:step_num, index_1:start_point, index_2:end_point
        if timestamp < step_timestamps_info[0][1]:
            return "1"
        last_step = step_timestamps_info[-1]
        # '<=' so that a timestamp exactly at the end of the last step is still assigned a step.
        if last_step[2] <= timestamp:
            return last_step[0]

        step_num = None
        for item in step_timestamps_info:
            if item[1] <= timestamp < item[2]:
                step_num = item[0]
        if step_num is None:
            # The timestamp lies between the end of one step and the start of the next one.
            # Attribute it to the first step that ends after the timestamp.
            step_num = next((item[0] for item in step_timestamps_info if timestamp < item[2]), last_step[0])
        return step_num

    def _get_communication_operators_cost_info(self, dir_path):
        """
        Obtain time-consuming information of all communication operators.

        Args:
            dir_path (str): The hccl source dir, containing one sub directory per operator.

        Returns:
            dict, operator name -> list of per-step records of that operator.
        """
        operators_cost_info = dict()
        dir_path = self._validate_dir_path(dir_path)
        operators_dir = [entry.name for entry in os.scandir(dir_path)
                         if entry.is_dir() and entry.name.endswith(self._dev_id)]
        for operator_name in operators_dir:
            operator_cost = self._calculate_communication_operator_cost(os.path.join(dir_path, operator_name))
            op_mapping_info = self._communication_operator_name_mapping_info.get(operator_name.split('_')[0], [])
            # index1: operator name in step trace.
            op_mapping_name = [item[1] for item in op_mapping_info if item[0] == operator_name]
            if not op_mapping_name:
                logger.warning("The mapping relationship between op name in hccl and op name in step trace "
                               "cannot be found. Use op name in hccl to show the name of the communication operator.")
            else:
                operator_name = op_mapping_name[0]
            operators_cost_info[operator_name] = operator_cost
        return operators_cost_info

    def _calculate_communication_operator_cost(self, dir_path):
        """
        Calculate communication operator cost. Such as allReduce_1,allReduce_2.

        Args:
            dir_path (str): The directory of one operator, containing one file per execution round.

        Returns:
            list, the per-step records of the operator.
        """
        dir_path = self._validate_dir_path(dir_path)
        files_path = [os.path.join(dir_path, entry.name) for entry in os.scandir(dir_path) if entry.is_file()]
        operator_cost = [self._calculate_communication_operator_iter_cost(file_path) for file_path in files_path]
        # Add the same step_num merge.
        return self._merge_communication_info_by_step_num(operator_cost)

    def _merge_communication_info_by_step_num(self, communication_info: list):
        """
        According to step num to merge communication info.

        Args:
            communication_info (list): Records like `[step_num, communication_cost, wait_cost, link_info]`.

        Returns:
            list, one merged record per numeric step num, ordered by step num.
        """
        # Group the records by step num in one pass instead of rescanning the list for every step.
        info_by_step = dict()
        for info in communication_info:
            info_by_step.setdefault(info[0], []).append(info)
        step_nums = sorted({int(step) for step in info_by_step if step.isdigit()})

        steps_communication_info = list()
        for step in step_nums:
            step = str(step)
            # index0:step_num,index1:communication_cost,index2:communication_wait_cost,index3:link_info
            step_communication_info = info_by_step.get(step, [])
            step_communication_cost = sum(i[1] for i in step_communication_info)
            step_communication_wait_cost = sum(i[2] for i in step_communication_info)
            step_communication_link = self._calculate_link_value([i[3] for i in step_communication_info], "total")
            steps_communication_info.append([step, step_communication_cost,
                                             step_communication_wait_cost, step_communication_link])
        return steps_communication_info

    def _calculate_communication_operator_iter_cost(self, file_path):
        """
        Calculate the time-consuming of communication operator in one execution round.

        Args:
            file_path (str): The json file of one execution round of an operator.

        Returns:
            list, `[step_num, communication_cost, wait_cost, link_info]`. The costs are taken from the
            mainstream, the link info from all streams.

        Raises:
            ProfilerRawFileException: If the file is not valid json or contains no trace events.
        """

        def _inner_calculate_communication_operator_iter_cost(events):
            total_notify_wait = HcclParser._calculate_notify_wait_time(events)
            # Divide information by src dst rank_id.
            src_dst_dict = self._divide_communication_info_by_src_dst_rank(events)
            src_dst_link_info = self._calculate_src_dst_link_info(src_dst_dict)
            communication_cost, communication_wait = self._calculate_device_communication_cost(src_dst_link_info)
            # The wait time that is already counted as communication time is not counted as wait cost.
            total_notify_wait -= communication_wait
            return [communication_cost, total_notify_wait, src_dst_link_info]

        file_path = self._validate_file_path(file_path)
        with open(file_path, 'r') as src_file:
            try:
                operator_info = json.load(src_file)
            except (json.JSONDecodeError, TypeError) as err:
                logger.warning(err)
                raise ProfilerRawFileException('Fail to parse operator file.') from err
        trace_events = operator_info.get("traceEvents") if isinstance(operator_info, dict) else None
        if not trace_events:
            # Without events neither the step nor the mainstream can be determined.
            logger.warning('The operator file <%s> contains no trace events.', file_path)
            raise ProfilerRawFileException('Fail to parse operator file.')
        operator_timestamp = trace_events[0].get("ts", 0)
        step_id = self._calculate_the_step_by_timestamp(operator_timestamp)
        # Statistics of communication operators in all streams.
        total_communication_operator_iter_cost = \
            _inner_calculate_communication_operator_iter_cost(trace_events)
        # Statistics of communication operators in mainstream.
        threads_dict = self._divide_communication_info_by_thread(trace_events)
        # The largest value is mainstream.
        major_thread = max(threads_dict)
        mainstream_communication_operator_iter_cost = \
            _inner_calculate_communication_operator_iter_cost(threads_dict.get(major_thread))
        # index0:communication_cost,index1:communication_wait_cost,index2:link_info
        return [step_id, mainstream_communication_operator_iter_cost[0],
                mainstream_communication_operator_iter_cost[1],
                total_communication_operator_iter_cost[2]]

    @staticmethod
    def _get_event_args(event):
        """Return the `args` dict of a trace event, or an empty dict if the event has none."""
        return event.get("args") or {}

    @staticmethod
    def _get_event_size(event_args):
        """Return the `size` of a trace event, which is a hexadecimal string, as int. 0 if it is missing."""
        size = event_args.get("size")
        return int(size, 16) if size else 0

    @staticmethod
    def _divide_communication_info_by_thread(trace_events: list):
        """Divide information by thread, the key is the `tid` of the events."""
        threads_dict = dict()
        for item in trace_events:
            threads_dict.setdefault(item.get("tid"), []).append(item)
        return threads_dict

    def _divide_communication_info_by_src_dst_rank(self, trace_event: list):
        """
        Divide information by src rank id and dst rank id.

        Events without src rank or dst rank are ignored. The src rank and dst rank of LOCAL events
        are swapped in place.

        Returns:
            dict, `"{src_rank}-{dst_rank}"` -> list of events.
        """
        src_dst_dict = dict()
        for item in trace_event:
            args = self._get_event_args(item)
            src_rank = args.get("src rank")
            dst_rank = args.get("dst rank")
            if src_rank is None or dst_rank is None:
                continue

            # When the SDMA operation is in the card,
            # the source card or destination card is 0xffffffff, and it needs to be converted to localrank.
            if int(src_rank) == self._local_rank_placeholder:
                src_rank = dst_rank

            if int(dst_rank) == self._local_rank_placeholder:
                dst_rank = src_rank

            if args.get("transport type") == CommunicationInfo.LOCAL.value:
                args["src rank"] = dst_rank
                args["dst rank"] = src_rank
                src_dst_key = str(dst_rank) + '-' + str(src_rank)
            else:
                src_dst_key = str(src_rank) + '-' + str(dst_rank)

            src_dst_dict.setdefault(src_dst_key, []).append(item)
        return src_dst_dict

    def _divide_communication_info_by_link_type(self, trace_event: list):
        """
        Divide information by link type.

        Returns:
            dict, link type (RDMA or SDMA) -> list of events.
        """
        link_type_dict = dict()
        for item in trace_event:
            args = self._get_event_args(item)
            link_type_key = args.get("transport type")
            if link_type_key is None:
                continue
            if link_type_key in (CommunicationInfo.RDMA.value, CommunicationInfo.SDMA.value):
                # Filter out the Notify Record operator in SDMA, because it does not transmit the actual amount of data.
                if args.get("task type") == CommunicationInfo.NOTIFY_RECORD.value:
                    continue
                link_type_dict.setdefault(link_type_key, []).append(item)
            # LOCAL events are only attached to an RDMA link that has already been seen.
            if link_type_key == CommunicationInfo.LOCAL.value and link_type_dict.get(CommunicationInfo.RDMA.value):
                link_type_dict[CommunicationInfo.RDMA.value].append(item)
        return link_type_dict

    def _calculate_device_communication_cost(self, src_dst_link_info: dict):
        """
        Calculate the total communication time and the total wait time of all links.

        Args:
            src_dst_link_info (dict): `"{src}-{dst}"` -> link type -> link values.

        Returns:
            tuple, `(total_communication_time, total_wait_time)`.
        """
        total_communication_time = 0
        total_wait_time = 0
        for src_dst_value in src_dst_link_info.values():
            for link_type_value in src_dst_value.values():
                # time_cost:0,size_cost:1,brand_width:2,wait_time:3
                total_communication_time += link_type_value[0]
                # Only RDMA link values carry a wait time.
                if len(link_type_value) > 3:
                    total_wait_time += link_type_value[3]
        return total_communication_time, total_wait_time

    def _parse_link_cost(self, result_dict, key, link_type_dict):
        """
        Parse link cost.

        Args:
            result_dict (dict): Output dict, `result_dict[key]` must already exist.
            key (str): The `"{src}-{dst}"` key.
            link_type_dict (dict): Link type -> list of events.
        """
        for link_type_key, link_type_value in link_type_dict.items():
            if link_type_key == CommunicationInfo.RDMA.value:
                # Divide information by thread.
                threads_dict = self._divide_communication_info_by_thread(link_type_value)
                rdma_infos = [self._calculate_adma_link_info(thread_value)
                              for thread_value in threads_dict.values()]
                result_dict[key][link_type_key] = np.sum(rdma_infos, axis=0).tolist()
            if link_type_key == CommunicationInfo.SDMA.value:
                result_dict[key][link_type_key] = self._calculate_sdma_link_info(link_type_value)

    def _calculate_src_dst_link_info(self, src_dst_dict: dict):
        """
        Calculate src dst link info.

        Returns:
            dict, `"{src}-{dst}"` -> link type -> link values.
        """
        result_dict = dict()
        for key, value in src_dst_dict.items():
            # Divide information by link type.
            link_type_dict = self._divide_communication_info_by_link_type(value)
            if not link_type_dict:
                continue
            result_dict[key] = dict()
            self._parse_link_cost(result_dict, key, link_type_dict)
        return result_dict

    @staticmethod
    def _calculate_adma_link_info(trace_event: list):
        """
        Calculate RDMA link info.

        When the link is RDMA,it is necessary to match three consecutive operators RDMASend, RDMASend \
        and Notify Wait,and take the sum of the time of the three operators as one communication time.

        Returns:
            list, `[communication_time, communication_size, bandwidth, communication_wait_time]`.
        """
        rdma_communication_time = 0
        rdma_communication_size = 0
        rdma_communication_wait_time = 0
        start_index = 0
        end_index = len(trace_event) - 1
        while start_index < end_index:
            first_task_type = HcclParser._get_event_args(trace_event[start_index]).get("task type")
            if first_task_type == CommunicationInfo.RDMASEND.value and start_index < end_index - 1:
                first_event, second_event, third_event = trace_event[start_index:start_index + 3]
                first_args = HcclParser._get_event_args(first_event)
                second_args = HcclParser._get_event_args(second_event)
                third_args = HcclParser._get_event_args(third_event)
                if second_args.get("task type") == CommunicationInfo.RDMASEND.value and \
                        third_args.get("task type") == CommunicationInfo.NOTIFY_WAIT.value:
                    notify_wait_cost = third_event.get("dur", 0)
                    rdma_communication_time += first_event.get("dur", 0) + second_event.get("dur", 0) \
                        + notify_wait_cost
                    rdma_communication_wait_time += notify_wait_cost
                    rdma_communication_size += HcclParser._get_event_size(first_args) \
                        + HcclParser._get_event_size(second_args)
                    # Skip the two operators that have just been matched.
                    start_index += 2
            start_index += 1

        # The unit of rdma_communication_wait_time is ms.
        # The unit of rdma_bandwidth is KB/s.
        # The unit of rdma_communication_size is k_byte and The unit of rdma_communication_time is ms.
        rdma_communication_wait_time = rdma_communication_wait_time / 1e3
        rdma_communication_size = rdma_communication_size / 1e3
        rdma_communication_time = rdma_communication_time / 1e3
        # The bandwidth is reported as 0 when no data was sent or no duration was recorded.
        rdma_bandwidth = rdma_communication_size / (rdma_communication_time / 1e3) \
            if rdma_communication_size and rdma_communication_time else 0

        return [rdma_communication_time, rdma_communication_size, rdma_bandwidth, rdma_communication_wait_time]

    def _calculate_sdma_link_info(self, trace_event: list):
        """
        Calculate SDMA link info.

        When the link is SDMA, the communication time of the primary link is the sum of the execution time\
        of Reduce inline and Memcpy operators.

        Returns:
            list, `[communication_time, communication_size, bandwidth]`.
        """
        sdma_communication_time = 0
        sdma_communication_size = 0

        for item in trace_event:
            args = self._get_event_args(item)
            if args.get("task type") in (CommunicationInfo.REDUCE_INLINE.value, CommunicationInfo.MEMCPY.value):
                sdma_communication_time += item.get("dur", 0)
                sdma_communication_size += self._get_event_size(args)

        # The unit of sdma_bandwidth is KB/s.
        # The unit of sdma_communication_size is k_byte and The unit of sdma_communication_time is ms.
        sdma_communication_time = sdma_communication_time / 1e3
        sdma_communication_size = sdma_communication_size / 1e3
        # The bandwidth is reported as 0 when no data was sent or no duration was recorded.
        sdma_bandwidth = sdma_communication_size / (sdma_communication_time / 1e3) \
            if sdma_communication_size and sdma_communication_time else 0
        return [sdma_communication_time, sdma_communication_size, sdma_bandwidth]

    @staticmethod
    def _calculate_notify_wait_time(trace_event: list):
        """Calculate notify wait time, which is the sum of the duration of all Notify Wait operators."""
        total_notify_wait_time = 0
        for item in trace_event:
            if HcclParser._get_event_args(item).get("task type") == CommunicationInfo.NOTIFY_WAIT.value:
                total_notify_wait_time += item.get("dur", 0)
        # The unit of total_notify_wait_time is ms.
        return total_notify_wait_time / 1e3

    def _calculate_communication_average_value(self, communication_info: list):
        """
        Calculate communication average value.

        Args:
            communication_info (list): Records like `[step_num, communication_cost, wait_cost, link_info, ...]`.

        Returns:
            list, `[communication_cost_average, wait_cost_average, link_average_info]`, or an empty
            list if `communication_info` is empty.
        """
        communication_info_size = len(communication_info)
        if communication_info_size == 0:
            return []
        # index1: communication_cost,index2:wait_cost,index3:link_info
        communication_cost_average = sum(i[1] for i in communication_info) / communication_info_size
        wait_cost_average = sum(i[2] for i in communication_info) / communication_info_size
        link_info = [i[3] for i in communication_info]
        link_average_info = HcclParser._calculate_link_value(link_info, 'average')
        return [communication_cost_average, wait_cost_average, link_average_info]

    @staticmethod
    def _parser_link_dict(result_dict, src_dst_key, src_dst_value):
        """Parser link info to dict, the values of the same link are collected into one list."""
        links = result_dict.setdefault(src_dst_key, dict())
        for link_key, link_value in src_dst_value.items():
            links.setdefault(link_key, list()).append(link_value)

    @staticmethod
    def _calculate_link_value(link_info: list, calculate_type):
        """
        Calculate link average or total value.

        Args:
            link_info (list): A list of dicts like `"{src}-{dst}"` -> link type -> link values.
            calculate_type (str): 'average' or 'total'.

        Returns:
            dict, `"{src}-{dst}"` -> link type -> element-wise average or total of the link values.
        """
        result_dict = dict()
        for item in link_info:
            for src_dst_key, src_dst_value in item.items():
                HcclParser._parser_link_dict(result_dict, src_dst_key, src_dst_value)
        for src_dst_value in result_dict.values():
            for link_key in src_dst_value:
                if calculate_type == 'average':
                    src_dst_value[link_key] = np.mean(src_dst_value[link_key], axis=0).tolist()
                if calculate_type == 'total':
                    src_dst_value[link_key] = np.sum(src_dst_value[link_key], axis=0).tolist()

        return result_dict

    def _validate_file_path(self, file_path):
        """
        Validate file path.

        Returns:
            str, the normalized file path.

        Raises:
            ProfilerPathErrorException: If the file path is invalid.
            ProfilerFileNotFoundException: If the file does not exist.
        """
        try:
            file_path = validate_and_normalize_path(file_path)
        except RuntimeError as err:
            logger.warning('file path is invalid.')
            raise ProfilerPathErrorException('file path is invalid.') from err
        if not os.path.isfile(file_path):
            logger.warning('The file <%s> not found.', file_path)
            raise ProfilerFileNotFoundException(file_path)
        return file_path

    def _validate_dir_path(self, dir_path):
        """
        Validate dir path.

        Returns:
            str, the normalized dir path.

        Raises:
            ProfilerPathErrorException: If the dir path is invalid.
            ProfilerDirNotFoundException: If the dir does not exist.
        """
        try:
            dir_path = validate_and_normalize_path(dir_path)
        except RuntimeError as err:
            logger.warning('dir path is invalid.')
            raise ProfilerPathErrorException('dir path is invalid.') from err
        if not os.path.isdir(dir_path):
            logger.warning('The dir <%s> not found.', dir_path)
            raise ProfilerDirNotFoundException(dir_path)
        return dir_path