#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Filter and run pytest testcases grouped by environment type.

Reads an env/net YAML config and a case-to-env mapping YAML, greps the case
tree for matching case types, collects testcases via ``pytest --collect-only``,
then executes them either one-per-device ("1P") or as whole-board ("8P") runs,
writing a per-case summary to stdout and ``run_cmds.csv``.
"""
import os
import sys
import json
import time
import argparse
import random
import string
import threading
import multiprocessing
from multiprocessing.sharedctypes import Value
from copy import deepcopy
from subprocess import getstatusoutput
import yaml

# Make sibling modules (e.g. psr_module_attr.py) importable from this script's dir.
script_path = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, script_path)

# command line parameters
parser = argparse.ArgumentParser()
parser.add_argument("--env_config_file", dest="env_config_file", required=True,
                    help='env net config file', type=str)
parser.add_argument("--case_env_config_path", dest="case_env_config_path",
                    required=True,
                    help='case and env mapping configuration file', type=str)
parser.add_argument("--case_root", dest="case_root", required=True,
                    help='case root path', type=str)
parser.add_argument("--filter_keyword", dest="filter_keyword", required=True,
                    help='filter key word, decide which cases to run', type=str)
parser.add_argument("--env_type", dest="env_type", required=True,
                    help='env type', type=str)


def opener(path, flags):
    """os.open opener that forces mode 0o644 (for use as `open(..., opener=...)`)."""
    # NOTE(review): this helper is not referenced anywhere in this file.
    return os.open(path, flags, 0o644)


class FileClass:
    """Context-manager wrapper around a raw os.open file descriptor.

    Used via `fopen(...)` so writes go through os.write with an explicit
    creation mode; the descriptor is closed on context exit.
    """

    def __init__(self, fname, flags, mode):
        # Raw fd; no buffering layer is involved.
        self.fd = os.open(fname, flags, mode)

    def __enter__(self):
        return self

    def __exit__(self, etype, evalue, etrace):
        # Close unconditionally; exceptions from the body propagate (returns None).
        os.close(self.fd)

    def write(self, text):
        # Encodes with the default (UTF-8) codec before writing to the fd.
        os.write(self.fd, str.encode(text))


def fopen(file_name, file_flags, file_mode):
    """Open `file_name` with explicit os.open flags/mode, returning a FileClass."""
    return FileClass(file_name, file_flags, file_mode)


class RhineThread(threading.Thread):
    """Thread subclass that captures the target callable's return value.

    threading.Thread discards the target's result; this keeps it in
    `self.result` so CommonThread.start() can collect it after join().
    """

    def __init__(self, func, args_in, key_args_in):
        super(RhineThread, self).__init__()
        self.args = args_in
        self.key_args = key_args_in
        self.func = func
        self.result = None

    def run(self):
        self.result = self.func(*self.args, **self.key_args)

    def get_result(self):
        # Only meaningful after join(); None if the thread has not run.
        return self.result


class CommonThread:
    """Small fan-out helper: register callables, run them all, gather results."""

    def __init__(self):
        self.thread_list = []

    def register(self, func, args, key_args=None, is_daemon=True):
        """Queue `func(*args, **key_args)` to run on its own thread at start()."""
        if key_args is None:
            key_args = {}
        t_thread = RhineThread(func, args_in=args, key_args_in=key_args)
        t_thread.daemon = is_daemon
        self.thread_list.append(t_thread)

    def start(self):
        """Start all registered threads, join them, and return their results.

        Returns None if nothing was registered; otherwise a list of return
        values in registration order. The registration list is cleared so
        the instance can be reused.
        """
        if not self.thread_list:
            return None

        for t_thread in self.thread_list:
            t_thread.start()

        for t_thread in self.thread_list:
            t_thread.join()

        result_list = [t_thread.get_result() for t_thread in self.thread_list]

        self.thread_list.clear()
        return result_list


class CaseRunner:
    """Drives the filter -> collect -> group -> execute pipeline for testcases."""

    def __init__(self, env_config_file, case_env_config_path, case_root,
                 filter_keyword, env_type):
        self.env_config_file = env_config_file
        self.case_env_config_path = case_env_config_path
        self.case_root = case_root
        self.filter_keyword = filter_keyword
        self.env_type = env_type
        # case_env_config maps case types to the env types they may run on.
        with open(case_env_config_path) as f:
            self.case_env_config = yaml.safe_load(f)

        self.script_path = script_path
        self.pytest_ini = os.path.join(self.script_path, "config", "pytest.ini")
        # Scratch files shared between the filter and parse stages.
        self.tmp_case_txt = "/tmp/tmp_case_info.txt"
        self.tmp_attr_case_json = "/tmp/tmp_attr_case.json"

        # Populated/validated by check_env_config() below.
        self.python_path = None
        self.pytest_path = None
        self.extend_envs = None
        self.run_path = None
        self.log_path = None
        self.device_ids = None
        self.overall_networks = False
        self.check_env_config()

        # Filled by group_cases_by_run_mode(): 1P vs 8P case lists.
        self.one_card_txt_info = None
        self.single_txt_info = None

        # testcase output log format (column widths for the summary table)
        self.stty_col = 200
        self.env_type_width = 15
        self.result_width = 15
        self.run_time_width = 15
        self.test_case_width = 90
        self.case_path_width = 50
        self.progress_width = 15

    @staticmethod
    def get_filter_file_dirs(file_list):
        """Group testcase file paths by directory.

        Returns {dir_name: "file1.py file2.py ..."} — a space-joined string of
        basenames per directory, suitable for a single pytest invocation.
        """
        dir_info = {}
        for file in file_list:
            dir_name = os.path.dirname(file)
            case_file = file.split("/")[-1]
            if not dir_info.get(dir_name):
                dir_info[dir_name] = case_file
            else:
                dir_info[dir_name] += " {0}".format(case_file)

        print("get_filter_file_dirs finished: {0}".format(json.dumps(dir_info, indent=2)))
        return dir_info

    @staticmethod
    def write_header_to_csv():
        """Truncate run_cmds.csv and write the column header row."""
        with fopen("run_cmds.csv", os.O_WRONLY|os.O_CREAT|os.O_TRUNC, 0o644) as fo:
            fo.write("device_id, begin_time, end_time, num_executed, num_total, run_result, run_cmd\n")

    @staticmethod
    def write_result_to_csv(result_record):
        """Append one execution record (7-tuple, see header) to run_cmds.csv."""
        dev_id, beg_t, end_t, num_executed, num_total, run_result, run_cmd = result_record
        with fopen("run_cmds.csv", os.O_WRONLY|os.O_CREAT|os.O_APPEND, 0o644) as fo:
            fo.write('%d, %f, %f, %d, %d, %s, %s\n'
                     % (dev_id, beg_t, end_t, num_executed, num_total, run_result, run_cmd))

    def create_init_py(self, filepath):
        """Recursively ensure every directory under `filepath` has an __init__.py.

        Needed so pytest collection treats the case tree as importable packages.
        Hidden directories (leading '.') are skipped.
        """
        init_file = os.path.join(filepath, "__init__.py")
        # already has file `__init__.py`
        if not os.path.exists(init_file):
            with fopen(init_file, os.O_WRONLY|os.O_CREAT, 0o644) as fo:
                fo.write("")
        files = os.listdir(filepath)
        for fi in files:
            if str(fi).startswith("."):
                continue
            fi_d = os.path.join(filepath, fi)
            if os.path.isdir(fi_d):
                self.create_init_py(fi_d)

    def check_env_config(self):
        """Load and validate the env YAML; populate paths, envs and device ids.

        Raises ValueError on any invalid path or device_ids entry. All file
        tests are done by shelling out (`test -f/-x/-e/-d`) rather than os.path.
        """
        def is_executable(path):
            # True when `path` is a regular file with the execute bit set.
            ret, _ = getstatusoutput("test -f {0} && test -x {0}".format(path))
            return ret == 0

        def search_executable(cmd):
            # (status, resolved_path) from `command -v`, i.e. a PATH lookup.
            return getstatusoutput("command -v {0}".format(cmd))

        def get_cfg_executable(cmd, path):
            # Use the configured path when valid; a path starting with "XXX"
            # is treated as an unset placeholder and triggers a PATH lookup.
            # NOTE(review): "XXX" placeholder convention assumed from usage here
            # and in the extend_envs check below — confirm against config docs.
            if is_executable(path):
                return path
            if not path.startswith("XXX"):
                raise ValueError("Path `{0}` is not executable file".format(path))
            ret, path = search_executable(cmd)
            if ret == 0:
                return path
            raise ValueError("Can not find executable cmd: {0}".format(cmd))

        def is_existed(path):
            ret, _ = getstatusoutput("test -e {0}".format(path))
            return ret == 0

        def is_directory(path):
            ret, _ = getstatusoutput("test -d {0} && test -x {0}".format(path))
            return ret == 0

        def get_cfg_directory(path):
            # Accept an existing traversable directory, or create it; reject a
            # path that exists but is not a directory.
            if is_directory(path):
                return path
            if is_existed(path):
                raise ValueError("Path `{0}` already exists, but is not a directory".format(path))
            ret, _ = getstatusoutput("mkdir -p {0}".format(path))
            if ret != 0:
                raise ValueError("Try to make path `{0}` failed".format(path))
            return path

        with open(self.env_config_file) as f:
            env_net_config = yaml.safe_load(f)
        self.python_path = get_cfg_executable('python', env_net_config['python_path'])
        self.pytest_path = get_cfg_executable('pytest', env_net_config['pytest_path'])
        self.run_path = get_cfg_directory(env_net_config['run_path'])
        self.log_path = get_cfg_directory(env_net_config['log_path'])
        self.extend_envs = env_net_config['extend_envs']
        # Empty or placeholder extend_envs: derive PATH/PYTHONPATH exports from
        # the pytest binary's directory and the case root.
        if str(self.extend_envs).strip() == "" or self.extend_envs.startswith("XXX"):
            _, exec_path = getstatusoutput("dirname {0}".format(self.pytest_path))
            self.extend_envs = "export PATH=%s:${PATH}; export PYTHONPATH=%s:${PYTHONPATH}" \
                               % (exec_path, self.case_root)
        # check and set device_ids
        device_ids = env_net_config['virtualenv']['device_ids']
        if not device_ids or len(device_ids) > 8:
            raise ValueError("Invalid config device_ids: {0}".format(device_ids))
        dev_id_list = []
        for e in device_ids:
            # if device_ids contains duplicated elements, report error
            if e in range(0, 8) and (not e in dev_id_list):
                dev_id_list.append(e)
            else:
                raise ValueError("Invalid config device_ids: {0}".format(device_ids))
        self.device_ids = device_ids
        # Whole-board (8P) runs are only enabled when all 8 devices are
        # configured AND the config explicitly opts in.
        if len(device_ids) == 8 and env_net_config['virtualenv']['overall_networks']:
            self.overall_networks = True
        else:
            self.overall_networks = False

    def clear_last_tmp_file(self):
        """Remove scratch files left over from a previous run."""
        clear_tmp_file_cmd = """rm -f %s; rm -f %s""" % (
            self.tmp_case_txt, self.tmp_attr_case_json)
        os.system(clear_tmp_file_cmd)
    def get_ready_for_filter(self):
        """Prepare for collection: ensure package __init__.py files, clear scratch."""
        # create `__init__.py` if not exist
        self.create_init_py(self.case_root)
        # clear temp files
        self.clear_last_tmp_file()
        return True

    def get_suitable_file(self, search_case_type):
        """ filter testcase files which satisfy condition (e.g. level0 and env type)

        Greps the case tree for marker names in `search_case_type` (a
        '|'-separated alternation). Returns (True, [file paths]) on success,
        (False, "cases result is null") when grep matched nothing, or
        (False, None) on pipeline failure.
        """
        get_cases_info_cmd = """ grep -rE "%s" %s/* --include="*.py" """ % (search_case_type, self.case_root)
        status, output = getstatusoutput(get_cases_info_cmd)
        # grep exits 1 with empty output when no line matched — not an error,
        # just "no cases for this env"; record an empty result set.
        if int(status) == 1 and not output:
            print("get_singel_env_case_filter :: no suitable cases found for search_case_type:{0}"
                  .format(search_case_type))
            # write blank to result json file
            with fopen(self.tmp_attr_case_json, os.O_WRONLY|os.O_CREAT, 0o644) as fo:
                fo.write(json.dumps([]))
            return False, "cases result is null"

        # Re-run the grep and reduce matches to unique file paths.
        filter_case_cmd = """%s | awk -F ':' '{print $1}' | uniq""" % (get_cases_info_cmd)
        status, output = getstatusoutput(filter_case_cmd)
        if int(status) != 0 or not output:
            print("get_suitable_file failed for get case filter, status:{0},output:{1},filter_case_cmd:{2}" \
                  .format(status, output, filter_case_cmd))
            return False, None

        return True, output.split("\n")

    def filter_case_with_one_dir(self, case_path, case_file, extend_envs, pytest_path, search_case_type):
        """ filter testcases from single directory

        Runs `pytest --collect-only` restricted to `self.filter_keyword` AND
        any of the case types, writing collection output to a per-directory
        temp file. Returns a dict {ret_val, output_file, case_path}; pytest
        exit code 5 (no tests collected) is treated as success.
        """
        tmp_file = os.path.join(os.path.dirname(self.tmp_case_txt), "{0}.txt".format(str(case_path).replace("/", "_")))
        # Build a pytest -m expression "(t1 or t2 or ...)" from the alternation.
        case_type_filter = ""
        for case_type in search_case_type.split("|"):
            if not case_type_filter:
                case_type_filter += "({0} ".format(case_type)
            else:
                case_type_filter += " or {0}".format(case_type)
        case_type_filter += ")"

        # NOTE(review): `2>&1 >{7}` redirects only stdout to the temp file and
        # stderr to the original stdout; if stderr was meant to be captured in
        # the file too, the order should be `>{7} 2>&1` — verify intent.
        filter_case_cmd = """{0}; export PYTHONPATH={1}:$PYTHONPATH; """ \
                          """cd {1} && {2} --collect-only -m '{3} and {4}' -c {5} {6} 2>&1 >{7}""". \
            format(extend_envs, case_path, pytest_path, self.filter_keyword, case_type_filter,
                   self.pytest_ini, case_file, tmp_file)

        print("filter_case_with_one_dir filter_case_cmd is : {0}".format(filter_case_cmd))
        status, _ = getstatusoutput(filter_case_cmd)
        if int(status) == 5:
            # pytest exit code 5: collection ran but found no matching tests.
            print("no any gate test case to be found in {0}".format(case_path))
        elif int(status) != 0:
            return {"ret_val": False, "output_file": tmp_file, "case_path": case_path}

        return {"ret_val": True, "output_file": tmp_file, "case_path": case_path}

    def filer_case_with_files(self, file_list, search_case_type):
        """Collect testcases from all candidate files, one thread per directory.

        Per-directory collection outputs are concatenated into
        `self.tmp_case_txt`. Returns False (after printing each failing
        directory's output) if any directory's collection failed.
        """
        extend_envs = """source /etc/profile; {0}""".format(self.extend_envs)

        # create one thread for per directory to filter testcases
        t_thread_list = CommonThread()
        dir_info = self.get_filter_file_dirs(file_list)
        for case_path, case_files in dir_info.items():
            t_thread_list.register(self.filter_case_with_one_dir, args=(
                case_path, case_files, extend_envs, self.pytest_path, search_case_type))

        result_list = t_thread_list.start()

        fail_filter_list = []
        for result in result_list:
            output_path = result["output_file"]
            if not result["ret_val"]:
                fail_filter_list.append({"file_path": result["case_path"], "output_path": output_path})
                continue

            # filter successfully, write result to temp file for later use
            fuse_cmd = """ [ -f {0} ] && cat {0} >> {1} ; rm -f {0}""".format(output_path, self.tmp_case_txt)
            os.system(fuse_cmd)

        # write detailed message for failed thread
        if fail_filter_list:
            for fail_filter in fail_filter_list:
                file_path = fail_filter["file_path"]
                output_path = fail_filter["output_path"]
                print("filter failed for {0}".format(file_path))
                print_fail_cmd = "[ -f {0} ] && cat {0} ; rm -f {0}".format(output_path)
                _, output = getstatusoutput(print_fail_cmd)
                print(output)
            return False

        return True

    def get_env_case_filter(self):
        """Run the full filter stage for this env type.

        Builds a '|'-joined alternation of case types mapped to this env
        (excluding "ALL"), greps for candidate files, then collects testcases.
        Returns (True, "filter success"), (False, "cases result is null"), or
        (False, None) on failure.
        """
        print(f'get_env_case_filter start\ncase_type: {self.case_env_config["case_type"]}')
        search_case_type = ""
        for case_type, case_env in dict(
                self.case_env_config["case_type"]).items():
            if str(case_env).__contains__(self.env_type) and case_type != "ALL":
                if not search_case_type:
                    search_case_type = case_type
                else:
                    search_case_type += "|{0}".format(case_type)
        if not search_case_type:
            print("get_singel_env_case_filter, no suitable case_env_config element found for env : {0}".format(
                self.env_type))
            return False, None

        # filter testcase files first
        ret_val, search_file_list = self.get_suitable_file(search_case_type)
        # execute success but not found any testcase
        if not ret_val and isinstance(search_file_list, (str,)) and search_file_list == "cases result is null":
            return False, "cases result is null"
        if not ret_val:
            return False, None

        # filter testcases from files
        if not self.get_ready_for_filter():
            return False, None

        # filter testcases by file
        ret_val = self.filer_case_with_files(search_file_list, search_case_type)
        if not ret_val:
            print("filer_case_with_files fail")
            return False, None

        return True, "filter success"

    def parser_case_module(self):
        """ parse testcase module info from pytest collect-only result

        Streams `self.tmp_case_txt` (concatenated `pytest --collect-only`
        output) and reconstructs a list of dicts with keys among
        {"package", "module", "class", "function_list"} by tracking the
        indentation-prefixed <Package>/<Module>/<Class>/<Function> lines.

        NOTE(review): the two `<Function` elif branches below carry identical
        prefixes as received; the original file most likely distinguished
        class-nested vs module-level functions by leading-space count inside
        the string literals, which was lost in whitespace mangling — as
        written the second branch is unreachable. Verify the literal prefixes
        against real collect-only output before relying on this parser.
        """
        print("parser_case_module start")
        case_content = []
        tmp_dict = {}

        file_handle = open(self.tmp_case_txt, "r")
        last_package = ""
        last_class = ""
        last_module = ""
        case_root = ""
        while True:
            # Read in bounded chunks (~2000 bytes worth of lines) to avoid
            # loading the whole collection dump at once.
            lines = file_handle.readlines(2000)
            if not lines:
                # Flush the in-progress entry at EOF if it holds real content.
                if tmp_dict and tmp_dict not in case_content and (
                        tmp_dict.get("function_list") or tmp_dict.get("class")):
                    case_content.append(tmp_dict)
                break
            max_len = len(lines)
            for index, line in enumerate(lines):
                if line.startswith("rootdir: "):
                    # Remember pytest's rootdir for packages reported relatively.
                    case_root = line.split(':')[1].split(',')[0].strip().replace(' ', '')
                elif line.startswith("<Package "):
                    if tmp_dict:
                        case_content.append(tmp_dict)
                        tmp_dict = {}

                    psr_pkg = line.replace("<Package ", "").split(">")[0]
                    # pytest result absolute path
                    if psr_pkg.startswith(os.sep):
                        last_package = psr_pkg
                    # pytest result relative path
                    else:
                        last_package = case_root

                    tmp_dict["package"] = last_package
                elif line.startswith(" <Module "):
                    # Module nested under a Package.
                    module = line.replace(" <Module ", "").split(">")[0]
                    if last_module and module != last_module:
                        if tmp_dict.get("function_list") or tmp_dict.get("class"):
                            case_content.append(tmp_dict)
                        tmp_dict = {"package": last_package, "module": module}
                    tmp_dict["module"] = module
                    last_module = module
                elif line.startswith("<Module "):
                    # Top-level Module (no enclosing Package line).
                    module = line.replace("<Module ", "").split(">")[0]
                    if last_module and module != last_module:
                        if tmp_dict.get("function_list") or tmp_dict.get("class"):
                            case_content.append(tmp_dict)
                        tmp_dict = {"package": last_package, "module": module}

                    tmp_dict["module"] = module
                    last_module = module
                elif line.startswith(" <Function "):
                    # Function entry (see NOTE(review) in the docstring about
                    # the lost leading-space distinction vs the branch below).
                    if not tmp_dict.get("function_list", None):
                        tmp_dict["function_list"] = []
                    tmp_dict.get("function_list").append(line.replace(" <Function ", "").split(">")[0])
                    if index < max_len - 1 and str(lines[index + 1]).__contains__("<Module"):
                        case_content.append(tmp_dict)
                        tmp_dict = {"package": last_package, "module": last_module, "class": last_class}
                        continue
                    if index < max_len - 1 and not lines[index + 1].startswith(" <Function "):
                        case_content.append(tmp_dict)
                        tmp_dict = {}

                elif line.startswith(" <Function "):
                    # NOTE(review): unreachable as received — identical prefix
                    # to the branch above; originally likely a different indent.
                    if not tmp_dict.get("function_list", None):
                        tmp_dict["function_list"] = []
                    tmp_dict.get("function_list").append(line.replace(" <Function ", "").split(">")[0])
                    if index < max_len - 1 and str(lines[index + 1]).__contains__("<Module"):
                        case_content.append(tmp_dict)
                        tmp_dict = {"package": last_package, "module": last_module}
                        continue

                    if index < max_len - 1 and not lines[index + 1].startswith(" <Function "):
                        case_content.append(tmp_dict)
                        tmp_dict = {}
                elif line.startswith(" <Class "):
                    class_info = line.replace(" <Class ", "").split(">")[0]
                    if last_class and class_info != last_class:
                        if tmp_dict.get("function_list") or tmp_dict.get("class"):
                            case_content.append(tmp_dict)
                        tmp_dict = {"package": last_package, "module": last_module, "class": class_info}
                    tmp_dict["class"] = class_info
                    last_class = class_info
                else:
                    continue
        file_handle.close()
        return case_content

    def parser_module_attr(self, module_list):
        """Run psr_module_attr.py on `module_list`; return its parsed JSON result.

        The helper script writes its output to a uniquely-named temp JSON file
        (timestamp + random suffix) next to this script, which is read and
        removed here. Returns [] on any failure.
        """
        script_root = self.script_path
        random_str = "".join(random.sample(string.ascii_letters + string.digits, 8))
        file_name = str(time.time()) + "_" + random_str + ".json"
        result_file = os.path.join(os.path.dirname(__file__), file_name)
        json_ret = list()

        get_cmd = "cd {0}; {1} psr_module_attr.py --case_root {2} --module_list '{3}' --result_file {4}" \
                  "".format(script_root, sys.executable, self.case_root, json.dumps(module_list), result_file)
        status, output = getstatusoutput(get_cmd)
        print("parser_module_attr get_cmd : {0}, output : {1}".format(get_cmd, output))
        if int(status) != 0 or not output:
            print("parser_module_attr failed for get_cmd : {0}".format(get_cmd))
            return json_ret

        if os.path.exists(result_file):
            with open(result_file) as f_h:
                json_ret = json.load(f_h)
            getstatusoutput("rm -f {}".format(result_file))
        else:
            print("parser_module_attr result_file not found")

        return json_ret

    def mul_parser_case_attr(self, module_content):
        """
        parse attr info of testcase from module info

        Splits `module_content` into chunks of 16 and fans them out to a
        16-worker process pool running parser_module_attr. Returns the
        concatenated attr list, or None on pool failure / partial failure.
        """
        print("mul_parser_case_attr start")
        core_nums = 16

        split_list = []
        for index in range(0, len(module_content), core_nums):
            split_list.append(module_content[index:index + core_nums])

        # use process pool to parse mark attr info from module info
        process_pool = multiprocessing.Pool(16)
        pool_map_result = process_pool.map_async(self.parser_module_attr,
                                                 split_list)
        pool_map_result.wait()
        if pool_map_result.ready() and pool_map_result.successful():
            result_list = pool_map_result.get()
            cases_attr_info = []
            for result in result_list:
                cases_attr_info += result
            process_pool.close()
        else:
            # NOTE(review): duplicated print as received; also the pool is not
            # closed on this failure path — verify whether that is intentional.
            message = "mul_parser_case_attr failed for process_pool run"
            print(message)
            print(message)
            return None

        print("mul_parser_case_attr ret_val_list is : {0}".format(
            cases_attr_info))

        if None in cases_attr_info:
            message = "mul_parser_case_attr :: some case module parser failed"
            print(message)
            return None

        return cases_attr_info

    def filter_main(self):
        """Top-level filter stage: collect cases, parse attrs, persist to JSON.

        Returns True on success OR when there are simply no cases to run;
        False only on a real failure.
        """
        filer_ret, msg = self.get_env_case_filter()
        if not filer_ret:
            if not msg:
                print("get_env_case_filter failed")
                return False
            if msg == "cases result is null":
                print("get_env_case_filter ::cases result is null")
                return True

        module_content = self.parser_case_module()
        if not module_content:
            # No collected modules is treated as "nothing to run", not an error.
            print("filter_main parser_case_module failed, no available case for {0}".format(self.filter_keyword))
            return True

        # get attribute info of testcases
        cases_attr_info = self.mul_parser_case_attr(module_content)
        if not cases_attr_info:
            print("filter_main parser_case_attr failed, no available case attr for {0}".format(self.filter_keyword))
            return False

        # write attribute info to file
        with fopen(self.tmp_attr_case_json, os.O_WRONLY|os.O_CREAT, 0o644) as fo:
            fo.write(json.dumps(cases_attr_info, indent=2))

        return True

    def group_cases_by_run_mode(self):
        """ group testcases by run mode: env_onecard/env_single

        Reads the attr JSON produced by filter_main() and splits entries into
        `self.one_card_txt_info` (run_case_mod env_card/env_onecard, one
        device per case) and `self.single_txt_info` (everything else —
        whole-board "8P" cases).
        """
        self.one_card_txt_info = []
        self.single_txt_info = []
        with open(self.tmp_attr_case_json, 'r') as f:
            cases_attr_info = json.load(f)

        def group_one_obj(pkg_name, case_name, obj):
            # Normalize one function- or class-level entry and route it to the
            # matching run-mode bucket.
            new_obj = {}
            new_obj["package"] = pkg_name
            new_obj["case_name"] = case_name
            new_obj["obj_name"] = obj["class_name"] if "class_name" in obj else obj["function"]
            run_case_mod = obj["run_case_mod"]
            new_obj["run_case_mod"] = run_case_mod
            new_obj["env_type"] = deepcopy(obj["env_type"])
            if run_case_mod in ["env_card", "env_onecard"]:
                self.one_card_txt_info.append(new_obj)
            else:
                self.single_txt_info.append(new_obj)

        for case_attr in cases_attr_info:
            package = case_attr["package"]
            case_name = case_attr["case_name"]
            function_list = case_attr["function_list"]
            class_info = case_attr["class"]
            # test object is based function
            if not class_info:
                for func in function_list:
                    group_one_obj(package, case_name, func)
            # test object is based class
            else:
                tmp_class_info = deepcopy(class_info)
                if isinstance(tmp_class_info, (dict,)):
                    group_one_obj(package, case_name, tmp_class_info)

        print(f"\nSummary of testcases 1P: {len(self.one_card_txt_info)}, 8P: {len(self.single_txt_info)}\n" + \
              f"Devices used: {self.device_ids}")

    def get_one_card_testcases(self):
        """ get one card testcases

        NOTE(review): this method appears dead and broken as received:
        `mp` is never imported (the module imports `multiprocessing`), so
        `mp.Queue()` would raise NameError; it also iterates
        `dict(self.one_card_txt_info)` although one_card_txt_info is a list,
        and reads keys ('number'/'package'/'function') that
        group_cases_by_run_mode never produces. run_one_card_testcases builds
        its own queue and does not call this. Confirm and remove or repair.
        """
        testcases = mp.Queue()
        if self.one_card_txt_info:
            for _, v in dict(self.one_card_txt_info).items():
                for testcase in v["param"]["testcase"]:
                    idx = testcases.qsize()
                    case_file = testcase['number']
                    case_path = testcase['package']
                    case_func = testcase['function']
                    test_case = "{0}/{1}.py::{2}".format(case_path, case_file, case_func)
                    testcases.put((idx, test_case, case_path))
        return testcases

    def get_rel_path(self, path):
        """Return `path` relative to self.case_root (strip root + separator)."""
        root_len = len(self.case_root) if self.case_root[-1] == '/' else len(self.case_root) + 1
        return path[root_len:]

    def run_multi_card_testcases(self, num_all_st):
        """Run all 8P (whole-board) testcases sequentially.

        num_all_st: grand total used for the "[x / y]" progress column.
        Returns (num_pass, num_fail, elapsed_seconds). Each case's output goes
        to {log_path}/{case_idx}.log; a record is appended to run_cmds.csv
        with device_id -1 (no single device owns an 8P run).
        """
        num_pass = 0
        num_fail = 0
        time_beg = time.time()
        for case_idx, testcase in enumerate(self.single_txt_info):
            case_file = testcase['case_name']
            case_path = testcase['package']
            case_func = testcase['obj_name']
            test_case = "{0}/{1}.py::{2}".format(case_path, case_file, case_func)

            run_cmd = f'cd {case_path}; {self.extend_envs}; ' + \
                      f'python -u -m pytest -s -v {test_case} &> {self.log_path}/{case_idx}.log'
            beg_t = time.time()
            retval, _ = getstatusoutput(run_cmd)
            end_t = time.time()

            case_type = "8P"
            run_time = float(end_t - beg_t)
            run_time = "%.3f" % (run_time)
            # ANSI green PASS / red FAIL for the console summary table.
            result = '\033[32mPASS\033[0m' if retval == 0 else '\033[31mFAIL\033[0m'
            if retval == 0:
                num_pass += 1
            else:
                num_fail += 1
            progress = "[%4d / %4d]" % (case_idx + 1, num_all_st)
            case_name = "{0}.py::{1}".format(case_file, case_func)
            test_result = (progress, case_name, case_type, result, run_time, self.get_rel_path(case_path))
            self.write_result_to_csv((-1, beg_t, end_t, case_idx + 1, num_all_st, result, run_cmd))
            self.out_summary_result(test_result)
        time_end = time.time()
        return (num_pass, num_fail, time_end - time_beg)

    def run_one_card_testcases(self, num_all_st, num_8p=None):
        """Run all 1P testcases, one worker process per configured device.

        Workers pull (idx, case) tuples from a shared queue; each case runs
        with DEVICE_ID set to the worker's device. `num_8p`, when given, seeds
        the shared executed-counter so progress numbering continues after an
        8P phase. Returns (num_pass, num_fail, elapsed_seconds).
        """
        time_beg = time.time()
        # one card work function
        def one_card_task(print_lock, device_id, testcases, num_total, num_executed, num_failed):
            # NOTE(review): `num_total` is unused — progress uses the enclosing
            # num_all_st instead; confirm whether that was intended.
            while not testcases.empty():
                case_idx, testcase = testcases.get()
                case_file = testcase['case_name']
                case_path = testcase['package']
                case_func = testcase['obj_name']
                test_case = "{0}/{1}.py::{2}".format(case_path, case_file, case_func)

                run_cmd = f'cd {self.run_path}; export DEVICE_ID={device_id}; {self.extend_envs}; ' + \
                          f'python -u -m pytest -s -v {test_case} &> {self.log_path}/{case_idx}.log'
                beg_t = time.time()
                retval, _ = getstatusoutput(run_cmd)
                end_t = time.time()

                # NOTE(review): `Value.value += 1` across processes is not an
                # atomic read-modify-write even with the Value's internal lock;
                # counts could race under contention — verify acceptability.
                num_executed.value += 1
                if retval != 0:
                    num_failed.value += 1

                case_type = "1P"
                run_time = float(end_t - beg_t)
                run_time = "%.3f" % (run_time)
                result = '\033[32mPASS\033[0m' if retval == 0 else '\033[31mFAIL\033[0m'
                progress = "[%4d / %4d]" % (num_executed.value, num_all_st)
                case_name = "{0}.py::{1}".format(case_file, case_func)
                test_result = (progress, case_name, case_type, result, run_time, self.get_rel_path(case_path))

                # acquire print lock
                with print_lock:
                    self.write_result_to_csv((device_id, beg_t, end_t, num_executed.value, num_all_st,
                                              result, run_cmd))

                    if retval == 0:
                        # NOTE(review): logs are written to {self.log_path} above
                        # but removed from relative 'logs/' here — paths only
                        # match if log_path is ./logs; verify.
                        os.system(f'rm -f logs/{case_idx}.log')

                    # output summary of testcase
                    self.out_summary_result(test_result)


        testcases = multiprocessing.Queue()
        for idx, testcase in enumerate(self.one_card_txt_info):
            testcases.put((idx, testcase))
        lock = multiprocessing.Lock()
        jobs = []
        num_executed = Value('i', 0 if num_8p is None else num_8p)
        num_failed = Value('i', 0)
        num_total = testcases.qsize()
        for dev_id in self.device_ids:
            p = multiprocessing.Process(target=one_card_task, args=(lock, dev_id, testcases, num_total, num_executed,
                                                                    num_failed))
            jobs.append(p)
            p.start()

        for job in jobs:
            job.join()

        time_end = time.time()
        return (num_total - num_failed.value, num_failed.value, time_end - time_beg)

    def out_summary_head_info(self):
        """Print the summary table banner and column headers."""
        print("Start Run Case\n")
        print("[Case Result Info]\n")
        print("-" * self.stty_col, flush=True)

        print(format("Progress", f"<{self.progress_width}"),
              format("Testcase", f"<{self.test_case_width}"),
              format("EnvType", f"<{self.env_type_width}"),
              format("Result", f"<{self.result_width}"),
              format("RunTime", f"<{self.run_time_width}"),
              format("CasePath", f"<{self.case_path_width}"),
              flush=True)
        print(format("------------", f"<{self.progress_width}"),
              format("------------", f"<{self.test_case_width}"),
              format("------------", f"<{self.env_type_width}"),
              format("------", f"<{self.result_width}"),
              format("------", f"<{self.run_time_width}"),
              format("------------", f"<{self.case_path_width}"),
              flush=True)

    def out_summary_result(self, result):
        """output result info of one testcase

        `result` is (progress, case_name, case_type, result, run_time,
        case_path). The Result column is widened by 9 to compensate for the
        invisible ANSI color escape characters in the PASS/FAIL string.
        """
        progress, case_name, case_type, result, run_time, case_path = result

        print(format(progress, f"<{self.progress_width}"),
              format(case_name, f"<{self.test_case_width}"),
              format(case_type, f"<{self.env_type_width}"),
              format(result, f"<{self.result_width + 9}"),
              format(run_time, f"<{self.run_time_width}"),
              format(case_path, f"<{self.case_path_width}"),
              flush=True)

    def out_summary_tail_info(self, result_1p, result_8p=None):
        """Print overall totals and per-env-type breakdowns.

        result_1p / result_8p are (num_pass, num_fail, run_time) tuples;
        result_8p may be None when no whole-board phase ran.
        """
        print("-" * self.stty_col, flush=True)
        # result_1p --> (num_pass, num_fail, run_time)
        def print_resut_by_env(env_type, result):
            if result is None:
                return
            num_pass, num_fail, run_time = result
            print(f"\n\n[EnvType ] {env_type}", flush=True)
            print(f"[CaseNumbers] {num_pass + num_fail}", flush=True)
            print(f"[RunTime ] %.3f" % run_time, flush=True)


        num_pass_total, num_fail_total, _ = result_1p
        if result_8p is not None:
            num_pass_total += result_8p[0]
            num_fail_total += result_8p[1]
        print("Total Tests : %d" % (num_pass_total + num_fail_total), flush=True)
        print("Total Failures: %d" % num_fail_total, flush=True)
        print("Total Success : %d" % num_pass_total, flush=True)
        print("\n\nEnd Run Case", flush=True)
        print_resut_by_env("8P", result_8p)
        print_resut_by_env("1P", result_1p)


if __name__ == '__main__':
    cmd_args = parser.parse_args()
    input_args = (
        cmd_args.env_config_file, cmd_args.case_env_config_path, cmd_args.case_root,
        cmd_args.filter_keyword, cmd_args.env_type)
    runner_handle = CaseRunner(*input_args)

    # Stage 1: filter + collect + attr-parse; exit 1 only on real failure.
    if not runner_handle.filter_main():
        print("case_filter failed !")
        sys.exit(1)

    # Stage 2: split cases into 1P / 8P buckets.
    runner_handle.group_cases_by_run_mode()

    runner_handle.out_summary_head_info()
    runner_handle.write_header_to_csv()

    # Stage 3: execute. With overall_networks, 8P cases run first and the 1P
    # progress counter continues from the 8P count.
    number_all = len(runner_handle.one_card_txt_info)
    if runner_handle.overall_networks:
        number_8p = len(runner_handle.single_txt_info)
        number_all += number_8p
        result_multi_card = runner_handle.run_multi_card_testcases(number_all)
        result_one_card = runner_handle.run_one_card_testcases(number_all, number_8p)
        runner_handle.out_summary_tail_info(result_one_card, result_multi_card)
    else:
        result_one_card = runner_handle.run_one_card_testcases(number_all)
        runner_handle.out_summary_tail_info(result_one_card)

    print("\nExecute testcases finished!")
    sys.exit(0)