• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import os
4import sys
5import json
6import time
7import argparse
8import random
9import string
10import threading
11import multiprocessing
12from multiprocessing.sharedctypes import Value
13from copy import deepcopy
14from subprocess import getstatusoutput
15import yaml
16
# Directory holding this script; it is put at the front of the module search path.
script_path = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, script_path)

# command line parameters: every option is a mandatory string value
parser = argparse.ArgumentParser()
for _option_name, _option_help in (
        ("env_config_file", 'env net config file'),
        ("case_env_config_path", 'case and env mapping configuration file'),
        ("case_root", 'case root path'),
        ("filter_keyword", 'filter key word, decide which cases to run'),
        ("env_type", 'env type'),
):
    parser.add_argument("--" + _option_name, dest=_option_name, required=True,
                        help=_option_help, type=str)
33
34
def opener(path, flags):
    """Open *path* with the OS-level *flags* and return the raw file descriptor.

    The permission mode passed to ``os.open`` is fixed at ``0o644``.
    """
    file_descriptor = os.open(path, flags, mode=0o644)
    return file_descriptor
37
38
class FileClass:
    """Small write-only wrapper around a raw OS file descriptor.

    Works as a context manager: the descriptor is opened in the constructor
    and closed when the ``with`` block is left.
    """

    def __init__(self, fname, flags, mode):
        # opened eagerly, so a bad path/flag combination fails at construction
        self.fd = os.open(fname, flags, mode)

    def __enter__(self):
        return self

    def __exit__(self, etype, evalue, etrace):
        # returns None, so an exception raised inside the block still propagates
        os.close(self.fd)

    def write(self, text):
        """Encode the string *text* and write the bytes to the descriptor."""
        payload = str.encode(text)
        os.write(self.fd, payload)
51
52
def fopen(file_name, file_flags, file_mode):
    """Return a ``FileClass`` for *file_name* opened with the given flags and mode."""
    return FileClass(fname=file_name, flags=file_flags, mode=file_mode)
55
56
class RhineThread(threading.Thread):
    """Thread that calls ``func(*args_in, **key_args_in)`` and keeps the return value."""

    def __init__(self, func, args_in, key_args_in):
        super().__init__()
        self.func = func
        self.args = args_in
        self.key_args = key_args_in
        # stays None until run() has completed
        self.result = None

    def run(self):
        positional, keyword = self.args, self.key_args
        self.result = self.func(*positional, **keyword)

    def get_result(self):
        """Return what the wrapped callable returned (``None`` before ``run`` finished)."""
        return self.result
70
71
class CommonThread:
    """Collects callables, runs each in its own ``RhineThread`` and gathers the results."""

    def __init__(self):
        self.thread_list = []

    def register(self, func, args, key_args=None, is_daemon=True):
        """Queue ``func(*args, **key_args)`` for execution by the next ``start()`` call."""
        keyword_args = {} if key_args is None else key_args
        worker = RhineThread(func, args_in=args, key_args_in=keyword_args)
        worker.daemon = is_daemon
        self.thread_list.append(worker)

    def start(self):
        """Start every registered thread, wait for all of them and return their results.

        Returns:
            list | None: results in registration order, or ``None`` when nothing
            was registered. The registration list is emptied afterwards.
        """
        workers = self.thread_list
        if not workers:
            return None

        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

        result_list = []
        for worker in workers:
            result_list.append(worker.get_result())

        workers.clear()
        return result_list
97
98
99class CaseRunner:
100    def __init__(self, env_config_file, case_env_config_path, case_root,
101                 filter_keyword, env_type):
102        self.env_config_file = env_config_file
103        self.case_env_config_path = case_env_config_path
104        self.case_root = case_root
105        self.filter_keyword = filter_keyword
106        self.env_type = env_type
107        with open(case_env_config_path) as f:
108            self.case_env_config = yaml.safe_load(f)
109
110        self.script_path = script_path
111        self.pytest_ini = os.path.join(self.script_path, "config", "pytest.ini")
112        self.tmp_case_txt = "/tmp/tmp_case_info.txt"
113        self.tmp_attr_case_json = "/tmp/tmp_attr_case.json"
114
115        self.python_path = None
116        self.pytest_path = None
117        self.extend_envs = None
118        self.run_path = None
119        self.log_path = None
120        self.device_ids = None
121        self.overall_networks = False
122        self.check_env_config()
123
124        self.one_card_txt_info = None
125        self.single_txt_info = None
126
127        # testcase output log format
128        self.stty_col = 200
129        self.env_type_width = 15
130        self.result_width = 15
131        self.run_time_width = 15
132        self.test_case_width = 90
133        self.case_path_width = 50
134        self.progress_width = 15
135
136    @staticmethod
137    def get_filter_file_dirs(file_list):
138        """ group all testcase files by directory """
139        dir_info = {}
140        for file in file_list:
141            dir_name = os.path.dirname(file)
142            case_file = file.split("/")[-1]
143            if not dir_info.get(dir_name):
144                dir_info[dir_name] = case_file
145            else:
146                dir_info[dir_name] += " {0}".format(case_file)
147
148        print("get_filter_file_dirs finished: {0}".format(json.dumps(dir_info, indent=2)))
149        return dir_info
150
151    @staticmethod
152    def write_header_to_csv():
153        with fopen("run_cmds.csv", os.O_WRONLY|os.O_CREAT|os.O_TRUNC, 0o644) as fo:
154            fo.write("device_id, begin_time, end_time, num_executed, num_total, run_result, run_cmd\n")
155
156    @staticmethod
157    def write_result_to_csv(result_record):
158        dev_id, beg_t, end_t, num_executed, num_total, run_result, run_cmd = result_record
159        with fopen("run_cmds.csv", os.O_WRONLY|os.O_CREAT|os.O_APPEND, 0o644) as fo:
160            fo.write('%d, %f, %f, %d, %d, %s, %s\n'
161                     % (dev_id, beg_t, end_t, num_executed, num_total, run_result, run_cmd))
162
163    def create_init_py(self, filepath):
164        init_file = os.path.join(filepath, "__init__.py")
165        # already has file `__init__.py`
166        if not os.path.exists(init_file):
167            with fopen(init_file, os.O_WRONLY|os.O_CREAT, 0o644) as fo:
168                fo.write("")
169        files = os.listdir(filepath)
170        for fi in files:
171            if str(fi).startswith("."):
172                continue
173            fi_d = os.path.join(filepath, fi)
174            if os.path.isdir(fi_d):
175                self.create_init_py(fi_d)
176
177    def check_env_config(self):
178        def is_executable(path):
179            ret, _ = getstatusoutput("test -f {0} && test -x {0}".format(path))
180            return ret == 0
181
182        def search_executable(cmd):
183            return getstatusoutput("command -v {0}".format(cmd))
184
185        def get_cfg_executable(cmd, path):
186            if is_executable(path):
187                return path
188            if not path.startswith("XXX"):
189                raise ValueError("Path `{0}` is not executable file".format(path))
190            ret, path = search_executable(cmd)
191            if ret == 0:
192                return path
193            raise ValueError("Can not find executable cmd: {0}".format(cmd))
194
195        def is_existed(path):
196            ret, _ = getstatusoutput("test -e {0}".format(path))
197            return ret == 0
198
199        def is_directory(path):
200            ret, _ = getstatusoutput("test -d {0} && test -x {0}".format(path))
201            return ret == 0
202
203        def get_cfg_directory(path):
204            if is_directory(path):
205                return path
206            if is_existed(path):
207                raise ValueError("Path `{0}` already exists, but is not a directory".format(path))
208            ret, _ = getstatusoutput("mkdir -p {0}".format(path))
209            if ret != 0:
210                raise ValueError("Try to make path `{0}` failed".format(path))
211            return path
212
213        with open(self.env_config_file) as f:
214            env_net_config = yaml.safe_load(f)
215        self.python_path = get_cfg_executable('python', env_net_config['python_path'])
216        self.pytest_path = get_cfg_executable('pytest', env_net_config['pytest_path'])
217        self.run_path = get_cfg_directory(env_net_config['run_path'])
218        self.log_path = get_cfg_directory(env_net_config['log_path'])
219        self.extend_envs = env_net_config['extend_envs']
220        if str(self.extend_envs).strip() == "" or self.extend_envs.startswith("XXX"):
221            _, exec_path = getstatusoutput("dirname {0}".format(self.pytest_path))
222            self.extend_envs = "export PATH=%s:${PATH}; export PYTHONPATH=%s:${PYTHONPATH}" \
223                               % (exec_path, self.case_root)
224        # check and set device_ids
225        device_ids = env_net_config['virtualenv']['device_ids']
226        if not device_ids or len(device_ids) > 8:
227            raise ValueError("Invalid config device_ids: {0}".format(device_ids))
228        dev_id_list = []
229        for e in device_ids:
230            # if device_ids contains duplicated elements, report error
231            if e in range(0, 8) and (not e in dev_id_list):
232                dev_id_list.append(e)
233            else:
234                raise ValueError("Invalid config device_ids: {0}".format(device_ids))
235        self.device_ids = device_ids
236        if len(device_ids) == 8 and env_net_config['virtualenv']['overall_networks']:
237            self.overall_networks = True
238        else:
239            self.overall_networks = False
240
241    def clear_last_tmp_file(self):
242        clear_tmp_file_cmd = """rm -f %s; rm -f %s""" % (
243            self.tmp_case_txt, self.tmp_attr_case_json)
244        os.system(clear_tmp_file_cmd)
245
246    def get_ready_for_filter(self):
247        # create `__init__.py` if not exist
248        self.create_init_py(self.case_root)
249        # clear temp files
250        self.clear_last_tmp_file()
251        return True
252
253    def get_suitable_file(self, search_case_type):
254        """ filter testcase files which satisfy condition (e.g. level0 and env type)"""
255        get_cases_info_cmd = """  grep -rE  "%s" %s/* --include="*.py" """ % (search_case_type, self.case_root)
256        status, output = getstatusoutput(get_cases_info_cmd)
257        if int(status) == 1 and not output:
258            print("get_singel_env_case_filter :: no suitable cases found for search_case_type:{0}"
259                  .format(search_case_type))
260            # write blank to result json file
261            with fopen(self.tmp_attr_case_json, os.O_WRONLY|os.O_CREAT, 0o644) as fo:
262                fo.write(json.dumps([]))
263            return False, "cases result is null"
264
265        filter_case_cmd = """%s | awk -F ':' '{print $1}' | uniq""" % (get_cases_info_cmd)
266        status, output = getstatusoutput(filter_case_cmd)
267        if int(status) != 0 or not output:
268            print("get_suitable_file failed for get case filter, status:{0},output:{1},filter_case_cmd:{2}" \
269                  .format(status, output, filter_case_cmd))
270            return False, None
271
272        return True, output.split("\n")
273
274    def filter_case_with_one_dir(self, case_path, case_file, extend_envs, pytest_path, search_case_type):
275        """ filter testcases from single directory """
276        tmp_file = os.path.join(os.path.dirname(self.tmp_case_txt), "{0}.txt".format(str(case_path).replace("/", "_")))
277        case_type_filter = ""
278        for case_type in search_case_type.split("|"):
279            if not case_type_filter:
280                case_type_filter += "({0} ".format(case_type)
281            else:
282                case_type_filter += " or {0}".format(case_type)
283            case_type_filter += ")"
284
285        filter_case_cmd = """{0}; export PYTHONPATH={1}:$PYTHONPATH; """ \
286                          """cd {1} &&  {2} --collect-only -m '{3} and {4}'  -c {5} {6} 2>&1 >{7}""". \
287            format(extend_envs, case_path, pytest_path, self.filter_keyword, case_type_filter,
288                   self.pytest_ini, case_file, tmp_file)
289
290        print("filter_case_with_one_dir filter_case_cmd is : {0}".format(filter_case_cmd))
291        status, _ = getstatusoutput(filter_case_cmd)
292        if int(status) == 5:
293            print("no any gate test case to be found in {0}".format(case_path))
294        elif int(status) != 0:
295            return {"ret_val": False, "output_file": tmp_file, "case_path": case_path}
296
297        return {"ret_val": True, "output_file": tmp_file, "case_path": case_path}
298
299    def filer_case_with_files(self, file_list, search_case_type):
300        extend_envs = """source /etc/profile; {0}""".format(self.extend_envs)
301
302        # create one thread for per directory to filter testcases
303        t_thread_list = CommonThread()
304        dir_info = self.get_filter_file_dirs(file_list)
305        for case_path, case_files in dir_info.items():
306            t_thread_list.register(self.filter_case_with_one_dir, args=(
307                case_path, case_files, extend_envs, self.pytest_path, search_case_type))
308
309        result_list = t_thread_list.start()
310
311        fail_filter_list = []
312        for result in result_list:
313            output_path = result["output_file"]
314            if not result["ret_val"]:
315                fail_filter_list.append({"file_path": result["case_path"], "output_path": output_path})
316                continue
317
318            # filter successfully, write result to temp file for later use
319            fuse_cmd = """ [ -f {0} ] && cat {0} >> {1} ; rm -f {0}""".format(output_path, self.tmp_case_txt)
320            os.system(fuse_cmd)
321
322        # write detailed message for failed thread
323        if fail_filter_list:
324            for fail_filter in fail_filter_list:
325                file_path = fail_filter["file_path"]
326                output_path = fail_filter["output_path"]
327                print("filter failed for {0}".format(file_path))
328                print_fail_cmd = "[ -f {0} ] && cat {0} ; rm -f {0}".format(output_path)
329                _, output = getstatusoutput(print_fail_cmd)
330                print(output)
331            return False
332
333        return True
334
335    def get_env_case_filter(self):
336        print(f'get_env_case_filter start\ncase_type: {self.case_env_config["case_type"]}')
337        search_case_type = ""
338        for case_type, case_env in dict(
339                self.case_env_config["case_type"]).items():
340            if str(case_env).__contains__(self.env_type) and case_type != "ALL":
341                if not search_case_type:
342                    search_case_type = case_type
343                else:
344                    search_case_type += "|{0}".format(case_type)
345        if not search_case_type:
346            print("get_singel_env_case_filter, no suitable case_env_config element found for env : {0}".format(
347                self.env_type))
348            return False, None
349
350        # filter testcase files first
351        ret_val, search_file_list = self.get_suitable_file(search_case_type)
352        # execute success but not found any testcase
353        if not ret_val and isinstance(search_file_list, (str,)) and search_file_list == "cases result is null":
354            return False, "cases result is null"
355        if not ret_val:
356            return False, None
357
358        # filter testcases from files
359        if not self.get_ready_for_filter():
360            return False, None
361
362        # filter testcases by file
363        ret_val = self.filer_case_with_files(search_file_list, search_case_type)
364        if not ret_val:
365            print("filer_case_with_files fail")
366            return False, None
367
368        return True, "filter success"
369
370    def parser_case_module(self):
371        """ parse testcase module info from pytest collect-only result """
372        print("parser_case_module start")
373        case_content = []
374        tmp_dict = {}
375
376        file_handle = open(self.tmp_case_txt, "r")
377        last_package = ""
378        last_class = ""
379        last_module = ""
380        case_root = ""
381        while True:
382            lines = file_handle.readlines(2000)
383            if not lines:
384                if tmp_dict and tmp_dict not in case_content and (
385                        tmp_dict.get("function_list") or tmp_dict.get("class")):
386                    case_content.append(tmp_dict)
387                break
388            max_len = len(lines)
389            for index, line in enumerate(lines):
390                if line.startswith("rootdir: "):
391                    case_root = line.split(':')[1].split(',')[0].strip().replace(' ', '')
392                elif line.startswith("<Package "):
393                    if tmp_dict:
394                        case_content.append(tmp_dict)
395                        tmp_dict = {}
396
397                    psr_pkg = line.replace("<Package ", "").split(">")[0]
398                    # pytest result absolute path
399                    if psr_pkg.startswith(os.sep):
400                        last_package = psr_pkg
401                    # pytest result relative path
402                    else:
403                        last_package = case_root
404
405                    tmp_dict["package"] = last_package
406                elif line.startswith("  <Module "):
407                    module = line.replace("  <Module ", "").split(">")[0]
408                    if last_module and module != last_module:
409                        if tmp_dict.get("function_list") or tmp_dict.get("class"):
410                            case_content.append(tmp_dict)
411                        tmp_dict = {"package": last_package, "module": module}
412                    tmp_dict["module"] = module
413                    last_module = module
414                elif line.startswith("<Module "):
415                    module = line.replace("<Module ", "").split(">")[0]
416                    if last_module and module != last_module:
417                        if tmp_dict.get("function_list") or tmp_dict.get("class"):
418                            case_content.append(tmp_dict)
419                        tmp_dict = {"package": last_package, "module": module}
420
421                    tmp_dict["module"] = module
422                    last_module = module
423                elif line.startswith("        <Function "):
424                    if not tmp_dict.get("function_list", None):
425                        tmp_dict["function_list"] = []
426                    tmp_dict.get("function_list").append(line.replace("        <Function ", "").split(">")[0])
427                    if index < max_len - 1 and str(lines[index + 1]).__contains__("<Module"):
428                        case_content.append(tmp_dict)
429                        tmp_dict = {"package": last_package, "module": last_module, "class": last_class}
430                        continue
431                    if index < max_len - 1 and not lines[index + 1].startswith("        <Function "):
432                        case_content.append(tmp_dict)
433                        tmp_dict = {}
434
435                elif line.startswith("    <Function "):
436                    if not tmp_dict.get("function_list", None):
437                        tmp_dict["function_list"] = []
438                    tmp_dict.get("function_list").append(line.replace("    <Function ", "").split(">")[0])
439                    if index < max_len - 1 and str(lines[index + 1]).__contains__("<Module"):
440                        case_content.append(tmp_dict)
441                        tmp_dict = {"package": last_package, "module": last_module}
442                        continue
443
444                    if index < max_len - 1 and not lines[index + 1].startswith("    <Function "):
445                        case_content.append(tmp_dict)
446                        tmp_dict = {}
447                elif line.startswith("    <Class "):
448                    class_info = line.replace("    <Class ", "").split(">")[0]
449                    if last_class and class_info != last_class:
450                        if tmp_dict.get("function_list") or tmp_dict.get("class"):
451                            case_content.append(tmp_dict)
452                        tmp_dict = {"package": last_package, "module": last_module, "class": class_info}
453                    tmp_dict["class"] = class_info
454                    last_class = class_info
455                else:
456                    continue
457        file_handle.close()
458        return case_content
459
460    def parser_module_attr(self, module_list):
461        script_root = self.script_path
462        random_str = "".join(random.sample(string.ascii_letters + string.digits, 8))
463        file_name = str(time.time()) + "_" + random_str + ".json"
464        result_file = os.path.join(os.path.dirname(__file__), file_name)
465        json_ret = list()
466
467        get_cmd = "cd {0}; {1} psr_module_attr.py --case_root {2} --module_list '{3}' --result_file {4}" \
468                  "".format(script_root, sys.executable, self.case_root, json.dumps(module_list), result_file)
469        status, output = getstatusoutput(get_cmd)
470        print("parser_module_attr get_cmd : {0}, output : {1}".format(get_cmd, output))
471        if int(status) != 0 or not output:
472            print("parser_module_attr failed for get_cmd : {0}".format(get_cmd))
473            return json_ret
474
475        if os.path.exists(result_file):
476            with open(result_file) as f_h:
477                json_ret = json.load(f_h)
478            getstatusoutput("rm -f {}".format(result_file))
479        else:
480            print("parser_module_attr result_file not found")
481
482        return json_ret
483
484    def mul_parser_case_attr(self, module_content):
485        """
486        parse attr info of testcase from module info
487        """
488        print("mul_parser_case_attr start")
489        core_nums = 16
490
491        split_list = []
492        for index in range(0, len(module_content), core_nums):
493            split_list.append(module_content[index:index + core_nums])
494
495        # use process pool to parse mark attr info from module info
496        process_pool = multiprocessing.Pool(16)
497        pool_map_result = process_pool.map_async(self.parser_module_attr,
498                                                 split_list)
499        pool_map_result.wait()
500        if pool_map_result.ready() and pool_map_result.successful():
501            result_list = pool_map_result.get()
502            cases_attr_info = []
503            for result in result_list:
504                cases_attr_info += result
505            process_pool.close()
506        else:
507            message = "mul_parser_case_attr failed for process_pool run"
508            print(message)
509            print(message)
510            return None
511
512        print("mul_parser_case_attr ret_val_list is : {0}".format(
513            cases_attr_info))
514
515        if None in cases_attr_info:
516            message = "mul_parser_case_attr :: some case module parser failed"
517            print(message)
518            return None
519
520        return cases_attr_info
521
522    def filter_main(self):
523        filer_ret, msg = self.get_env_case_filter()
524        if not filer_ret:
525            if not msg:
526                print("get_env_case_filter failed")
527                return False
528            if msg == "cases result is null":
529                print("get_env_case_filter ::cases result is null")
530                return True
531
532        module_content = self.parser_case_module()
533        if not module_content:
534            print("filter_main parser_case_module failed, no available case for {0}".format(self.filter_keyword))
535            return True
536
537        # get attribute info of testcases
538        cases_attr_info = self.mul_parser_case_attr(module_content)
539        if not cases_attr_info:
540            print("filter_main parser_case_attr failed, no available case attr for {0}".format(self.filter_keyword))
541            return False
542
543        # write attribute info to file
544        with fopen(self.tmp_attr_case_json, os.O_WRONLY|os.O_CREAT, 0o644) as fo:
545            fo.write(json.dumps(cases_attr_info, indent=2))
546
547        return True
548
549    def group_cases_by_run_mode(self):
550        """ group testcases by run mode: env_onecard/env_single """
551        self.one_card_txt_info = []
552        self.single_txt_info = []
553        with open(self.tmp_attr_case_json, 'r') as f:
554            cases_attr_info = json.load(f)
555
556        def group_one_obj(pkg_name, case_name, obj):
557            new_obj = {}
558            new_obj["package"] = pkg_name
559            new_obj["case_name"] = case_name
560            new_obj["obj_name"] = obj["class_name"] if "class_name" in obj else obj["function"]
561            run_case_mod = obj["run_case_mod"]
562            new_obj["run_case_mod"] = run_case_mod
563            new_obj["env_type"] = deepcopy(obj["env_type"])
564            if run_case_mod in ["env_card", "env_onecard"]:
565                self.one_card_txt_info.append(new_obj)
566            else:
567                self.single_txt_info.append(new_obj)
568
569        for case_attr in cases_attr_info:
570            package = case_attr["package"]
571            case_name = case_attr["case_name"]
572            function_list = case_attr["function_list"]
573            class_info = case_attr["class"]
574            # test object is based function
575            if not class_info:
576                for func in function_list:
577                    group_one_obj(package, case_name, func)
578            # test object is based class
579            else:
580                tmp_class_info = deepcopy(class_info)
581                if isinstance(tmp_class_info, (dict,)):
582                    group_one_obj(package, case_name, tmp_class_info)
583
584        print(f"\nSummary of testcases 1P: {len(self.one_card_txt_info)}, 8P: {len(self.single_txt_info)}\n" + \
585              f"Devices used: {self.device_ids}")
586
587    def get_one_card_testcases(self):
588        """ get one card testcases """
589        testcases = mp.Queue()
590        if self.one_card_txt_info:
591            for _, v in dict(self.one_card_txt_info).items():
592                for testcase in v["param"]["testcase"]:
593                    idx = testcases.qsize()
594                    case_file = testcase['number']
595                    case_path = testcase['package']
596                    case_func = testcase['function']
597                    test_case = "{0}/{1}.py::{2}".format(case_path, case_file, case_func)
598                    testcases.put((idx, test_case, case_path))
599        return testcases
600
601    def get_rel_path(self, path):
602        root_len = len(self.case_root) if self.case_root[-1] == '/' else len(self.case_root) + 1
603        return path[root_len:]
604
605    def run_multi_card_testcases(self, num_all_st):
606        num_pass = 0
607        num_fail = 0
608        time_beg = time.time()
609        for case_idx, testcase in enumerate(self.single_txt_info):
610            case_file = testcase['case_name']
611            case_path = testcase['package']
612            case_func = testcase['obj_name']
613            test_case = "{0}/{1}.py::{2}".format(case_path, case_file, case_func)
614
615            run_cmd = f'cd {case_path}; {self.extend_envs}; ' + \
616                        f'python -u -m pytest -s -v {test_case} &> {self.log_path}/{case_idx}.log'
617            beg_t = time.time()
618            retval, _ = getstatusoutput(run_cmd)
619            end_t = time.time()
620
621            case_type = "8P"
622            run_time = float(end_t - beg_t)
623            run_time = "%.3f" % (run_time)
624            result = '\033[32mPASS\033[0m' if retval == 0 else '\033[31mFAIL\033[0m'
625            if retval == 0:
626                num_pass += 1
627            else:
628                num_fail += 1
629            progress = "[%4d / %4d]" % (case_idx + 1, num_all_st)
630            case_name = "{0}.py::{1}".format(case_file, case_func)
631            test_result = (progress, case_name, case_type, result, run_time, self.get_rel_path(case_path))
632            self.write_result_to_csv((-1, beg_t, end_t, case_idx + 1, num_all_st, result, run_cmd))
633            self.out_summary_result(test_result)
634        time_end = time.time()
635        return (num_pass, num_fail, time_end - time_beg)
636
637    def run_one_card_testcases(self, num_all_st, num_8p=None):
638        time_beg = time.time()
639        # one card  work function
640        def one_card_task(print_lock, device_id, testcases, num_total, num_executed, num_failed):
641            while not testcases.empty():
642                case_idx, testcase = testcases.get()
643                case_file = testcase['case_name']
644                case_path = testcase['package']
645                case_func = testcase['obj_name']
646                test_case = "{0}/{1}.py::{2}".format(case_path, case_file, case_func)
647
648                run_cmd = f'cd {self.run_path}; export DEVICE_ID={device_id}; {self.extend_envs}; ' + \
649                          f'python -u -m pytest -s -v {test_case} &> {self.log_path}/{case_idx}.log'
650                beg_t = time.time()
651                retval, _ = getstatusoutput(run_cmd)
652                end_t = time.time()
653
654                num_executed.value += 1
655                if retval != 0:
656                    num_failed.value += 1
657
658                case_type = "1P"
659                run_time = float(end_t - beg_t)
660                run_time = "%.3f" % (run_time)
661                result = '\033[32mPASS\033[0m' if retval == 0 else '\033[31mFAIL\033[0m'
662                progress = "[%4d / %4d]" % (num_executed.value, num_all_st)
663                case_name = "{0}.py::{1}".format(case_file, case_func)
664                test_result = (progress, case_name, case_type, result, run_time, self.get_rel_path(case_path))
665
666                # acquire print lock
667                with print_lock:
668                    self.write_result_to_csv((device_id, beg_t, end_t, num_executed.value, num_all_st,
669                                              result, run_cmd))
670
671                    if retval == 0:
672                        os.system(f'rm -f logs/{case_idx}.log')
673
674                    # output summary of testcase
675                    self.out_summary_result(test_result)
676
677
678        testcases = multiprocessing.Queue()
679        for idx, testcase in enumerate(self.one_card_txt_info):
680            testcases.put((idx, testcase))
681        lock = multiprocessing.Lock()
682        jobs = []
683        num_executed = Value('i', 0 if num_8p is None else num_8p)
684        num_failed = Value('i', 0)
685        num_total = testcases.qsize()
686        for dev_id in self.device_ids:
687            p = multiprocessing.Process(target=one_card_task, args=(lock, dev_id, testcases, num_total, num_executed,
688                                                                    num_failed))
689            jobs.append(p)
690            p.start()
691
692        for job in jobs:
693            job.join()
694
695        time_end = time.time()
696        return (num_total - num_failed.value, num_failed.value, time_end - time_beg)
697
698    def out_summary_head_info(self):
699        print("Start Run Case\n")
700        print("[Case Result Info]\n")
701        print("-" * self.stty_col, flush=True)
702
703        print(format("Progress", f"<{self.progress_width}"),
704              format("Testcase", f"<{self.test_case_width}"),
705              format("EnvType", f"<{self.env_type_width}"),
706              format("Result", f"<{self.result_width}"),
707              format("RunTime", f"<{self.run_time_width}"),
708              format("CasePath", f"<{self.case_path_width}"),
709              flush=True)
710        print(format("------------", f"<{self.progress_width}"),
711              format("------------", f"<{self.test_case_width}"),
712              format("------------", f"<{self.env_type_width}"),
713              format("------", f"<{self.result_width}"),
714              format("------", f"<{self.run_time_width}"),
715              format("------------", f"<{self.case_path_width}"),
716              flush=True)
717
718    def out_summary_result(self, result):
719        """output result info of one testcase"""
720        progress, case_name, case_type, result, run_time, case_path = result
721
722        print(format(progress, f"<{self.progress_width}"),
723              format(case_name, f"<{self.test_case_width}"),
724              format(case_type, f"<{self.env_type_width}"),
725              format(result, f"<{self.result_width + 9}"),
726              format(run_time, f"<{self.run_time_width}"),
727              format(case_path, f"<{self.case_path_width}"),
728              flush=True)
729
730    def out_summary_tail_info(self, result_1p, result_8p=None):
731        print("-" * self.stty_col, flush=True)
732        # result_1p --> (num_pass, num_fail, run_time)
733        def print_resut_by_env(env_type, result):
734            if result is None:
735                return
736            num_pass, num_fail, run_time = result
737            print(f"\n\n[EnvType    ] {env_type}", flush=True)
738            print(f"[CaseNumbers] {num_pass + num_fail}", flush=True)
739            print(f"[RunTime    ] %.3f" % run_time, flush=True)
740
741
742        num_pass_total, num_fail_total, _ = result_1p
743        if result_8p is not None:
744            num_pass_total += result_8p[0]
745            num_fail_total += result_8p[1]
746        print("Total Tests   : %d" % (num_pass_total + num_fail_total), flush=True)
747        print("Total Failures: %d" % num_fail_total, flush=True)
748        print("Total Success : %d" % num_pass_total, flush=True)
749        print("\n\nEnd Run Case", flush=True)
750        print_resut_by_env("8P", result_8p)
751        print_resut_by_env("1P", result_1p)
752
753
if __name__ == '__main__':
    # Entry point: filter the cases, group them by run mode, run them and
    # print the summary table.
    cmd_args = parser.parse_args()
    runner_handle = CaseRunner(
        cmd_args.env_config_file,
        cmd_args.case_env_config_path,
        cmd_args.case_root,
        cmd_args.filter_keyword,
        cmd_args.env_type,
    )

    if not runner_handle.filter_main():
        print("case_filter failed !")
        sys.exit(1)

    runner_handle.group_cases_by_run_mode()

    runner_handle.out_summary_head_info()
    runner_handle.write_header_to_csv()

    number_all = len(runner_handle.one_card_txt_info)
    number_8p = None
    result_multi_card = None
    if runner_handle.overall_networks:
        # the multi-card cases run first and count towards the overall progress
        number_8p = len(runner_handle.single_txt_info)
        number_all += number_8p
        result_multi_card = runner_handle.run_multi_card_testcases(number_all)
    result_one_card = runner_handle.run_one_card_testcases(number_all, number_8p)
    runner_handle.out_summary_tail_info(result_one_card, result_multi_card)

    print("\nExecute testcases finished!")
    sys.exit(0)
783