• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import copy
20import os
21import shutil
22import threading
23import time
24from concurrent.futures import ThreadPoolExecutor
25from concurrent.futures import wait
26
27from _core.constants import ModeType
28from _core.constants import ConfigConst
29from _core.constants import ReportConst
30from _core.executor.request import Request
31from _core.logger import platform_logger
32from _core.plugin import Config
33from _core.utils import get_instance_name
34from _core.utils import check_mode
35from _core.exception import ParamError
36from _core.exception import ExecuteTerminate
37from _core.exception import DeviceError
38from _core.exception import LiteDeviceError
39from _core.report.reporter_helper import VisionHelper
40from _core.report.reporter_helper import ReportConstant
41from _core.report.reporter_helper import DataHelper
42from _core.report.reporter_helper import Suite
43from _core.report.reporter_helper import Case
44
45LOG = platform_logger("Concurrent")
46
47
class Concurrent:
    """Helpers for running one function over many argument tuples."""

    @classmethod
    def executor_callback(cls, worker):
        """Log the exception carried by a finished future, if there is one.

        :param worker: the completed future handed over by the executor
        """
        error = worker.exception()
        if not error:
            return
        LOG.error("Worker return exception: {}".format(error))

    @classmethod
    def concurrent_execute(cls, func, params_list, max_size=8):
        """
        Provide the ability to execute the target function concurrently
        :param func: target function
        :param params_list: list of positional-argument tuples, one per call
        :param max_size: the max number of threads in the thread pool
        :return: list of (result, params) tuples in submission order
        """
        with ThreadPoolExecutor(max_size) as executor:
            submitted = {}
            for params in params_list:
                future = executor.submit(func, *params)
                submitted[future] = params
                future.add_done_callback(cls.executor_callback)
            # block until every submitted call has completed
            wait(submitted)
            return [(future.result(), params)
                    for future, params in submitted.items()]
75
76
class DriversThread(threading.Thread):
    """Worker thread that executes one (driver, test) pair.

    When the run is over, an ExecuteMessage describing the outcome is put
    on ``message_queue``.
    """

    def __init__(self, test_driver, task, environment, message_queue):
        """
        :param test_driver: (driver, test descriptor) pair, may be falsy
        :param task: task object whose ``config`` is copied per request
        :param environment: environment holding ``devices``, may be None
        :param message_queue: queue that receives the ExecuteMessage
        """
        super().__init__()
        self.test_driver = test_driver
        self.listeners = None
        self.task = task
        self.environment = environment
        self.message_queue = message_queue
        self.thread_id = None
        self.error_message = ""

    def set_listeners(self, listeners):
        """Store the listeners and tag them with the first device's sn.

        The device sn is only assigned when an environment is present.
        """
        self.listeners = listeners
        if self.environment is None:
            return

        for listener in listeners:
            listener.device_sn = self.environment.devices[0].device_sn

    def set_thread_id(self, thread_id):
        """Record the identifier used for logging and result messages."""
        self.thread_id = thread_id

    def run(self):
        """Build the request, set up the devices and execute the driver.

        Any exception raised on the way is logged and kept in
        ``self.error_message``; ``_handle_finally`` runs in every case.
        """
        from xdevice import Scheduler
        LOG.debug("Thread id: %s start" % self.thread_id)
        start_time = time.time()
        execute_message = ExecuteMessage('', self.environment,
                                         self.test_driver, self.thread_id)
        driver, test = None, None
        try:
            if self.test_driver and Scheduler.is_execute:
                # construct params
                driver, test = self.test_driver
                driver_request = self._get_driver_request(test,
                                                          execute_message)
                if driver_request is None:
                    return

                # setup device
                self._do_task_setup(driver_request)

                # driver execute
                self.reset_device(driver_request.config)
                driver.__execute__(driver_request)

        except Exception as exception:
            error_no = getattr(exception, "error_no", "00000")
            if self.environment is None:
                LOG.exception("Exception: %s", exception, exc_info=False,
                              error_no=error_no)
            else:
                LOG.exception(
                    "Device: %s, exception: %s" % (
                        self.environment.__get_serial__(), exception),
                    exc_info=False, error_no=error_no)
            self.error_message = "{}: {}".format(
                get_instance_name(exception), str(exception))

        finally:
            self._handle_finally(driver, execute_message, start_time, test)

    @staticmethod
    def reset_device(config):
        """Reboot every device of ``config.environment`` when the config
        has a truthy ``reboot_per_module`` attribute."""
        if getattr(config, "reboot_per_module", False):
            for device in config.environment.devices:
                device.reboot()

    def _handle_finally(self, driver, execute_message, start_time, test):
        """Collect the result, set the state, free the environment and
        publish ``execute_message``.

        Freeing the environment and putting the message on the queue are
        guaranteed even if result collection fails, because the queue
        consumer removes this thread from its bookkeeping only after it
        receives the message.

        :param driver: executed driver, or None when nothing was executed
        :param execute_message: message object to fill in and enqueue
        :param start_time: ``time.time()`` value taken when run() started
        :param test: executed test descriptor, or None
        """
        from xdevice import Scheduler
        try:
            # output execute time
            execute_time = VisionHelper.get_execute_time(int(
                time.time() - start_time))
            source_content = ""
            # run() tolerates a falsy test_driver, so do not index it blindly
            if self.test_driver:
                source = self.test_driver[1].source
                source_content = source.source_file or source.source_string
            LOG.info("Executed: %s, Execution Time: %s" % (
                source_content, execute_time))

            # inherit history report under retry mode
            if driver and test:
                execute_result = driver.__result__()
                LOG.debug("Execute result:%s" % execute_result)
                if getattr(self.task.config, "history_report_path", ""):
                    execute_result = self._inherit_execute_result(
                        execute_result, test)
                execute_message.set_result(execute_result)
        except Exception as exception:
            error_no = getattr(exception, "error_no", "00000")
            LOG.exception(
                "Handle execute result exception: %s" % exception,
                exc_info=False, error_no=error_no)
            # keep the first error, it is the one that explains the failure
            if not self.error_message:
                self.error_message = "{}: {}".format(
                    get_instance_name(exception), str(exception))
        finally:
            # set execute state
            if self.error_message:
                execute_message.set_state(ExecuteMessage.DEVICE_ERROR)
            else:
                execute_message.set_state(ExecuteMessage.DEVICE_FINISH)

            try:
                # free environment
                if self.environment:
                    LOG.debug("Thread %s free environment",
                              execute_message.get_thread_id())
                    Scheduler.__free_environment__(
                        execute_message.get_environment())
            finally:
                LOG.debug("Put thread %s result", self.thread_id)
                self.message_queue.put(execute_message)
                LOG.info("")

    def _do_task_setup(self, driver_request):
        """Run the task-level kits on every device that still needs them.

        Nothing is done in decc mode, when the request config has a truthy
        check-device attribute, or when there is no environment. After the
        kits were attempted the device is flagged so they are not run again.
        """
        if check_mode(ModeType.decc) or getattr(
                driver_request.config, ConfigConst.check_device, False):
            return

        if self.environment is None:
            return

        from xdevice import Scheduler
        for device in self.environment.devices:
            if not getattr(device, ConfigConst.need_kit_setup, True):
                LOG.debug("Device %s need kit setup is false" % device)
                continue

            # do task setup for device, each device gets its own kit copies
            kits_copy = copy.deepcopy(self.task.config.kits)
            setattr(device, ConfigConst.task_kits, kits_copy)
            for kit in getattr(device, ConfigConst.task_kits, []):
                if not Scheduler.is_execute:
                    break
                try:
                    kit.__setup__(device, request=driver_request)
                except (ParamError, ExecuteTerminate, DeviceError,
                        LiteDeviceError, ValueError, TypeError,
                        SyntaxError, AttributeError) as exception:
                    # a failing kit is logged; the remaining kits still run
                    error_no = getattr(exception, "error_no", "00000")
                    LOG.exception(
                        "Task setup device: %s, exception: %s" % (
                            self.environment.__get_serial__(),
                            exception), exc_info=False, error_no=error_no)
            LOG.debug("Set device %s need kit setup to false" % device)
            setattr(device, ConfigConst.need_kit_setup, False)

        # set product_info to self.task, only if the task has none yet
        if getattr(driver_request, ConfigConst.product_info, "") and not \
                getattr(self.task, ConfigConst.product_info, ""):
            product_info = getattr(driver_request, ConfigConst.product_info)
            if not isinstance(product_info, dict):
                LOG.warning("Product info should be dict, %s",
                            product_info)
                return
            setattr(self.task, ConfigConst.product_info, product_info)

    def _get_driver_request(self, root_desc, execute_message):
        """Create the Request for ``root_desc`` and attach it to the message.

        When the config carries a ``history_report_path`` (retry), the
        ``testargs`` of the copied config are narrowed to the unpassed
        cases of the module.

        :param root_desc: test descriptor providing ``source.module_name``
        :param execute_message: message that receives the created request
        :return: the Request, or None when a retry has nothing left to run
        """
        config = Config()
        config.update(copy.deepcopy(self.task.config).__dict__)
        config.environment = self.environment
        history_report_path = getattr(config, "history_report_path", "")
        if history_report_path:
            # modify config.testargs
            module_name = root_desc.source.module_name
            unpassed_test_params = self._get_unpassed_test_params(
                history_report_path, module_name)
            if not unpassed_test_params:
                LOG.info("%s all test cases are passed, no need retry",
                         module_name)
                driver_request = Request(self.thread_id, root_desc,
                                         self.listeners, config)
                execute_message.set_request(driver_request)
                return None
            first_param = unpassed_test_params[0]
            if first_param != module_name and \
                    first_param != str(module_name).split(".")[0]:
                test_args = getattr(config, "testargs", {})
                # drop duplicates while keeping the original order
                test_args["test"] = list(
                    dict.fromkeys(unpassed_test_params))
                test_args.pop("class", None)
                setattr(config, "testargs", test_args)

        # listeners stay None until set_listeners() is called
        for listener in self.listeners or []:
            LOG.debug("Thread id %s, listener %s" % (self.thread_id, listener))
        driver_request = Request(self.thread_id, root_desc, self.listeners,
                                 config)
        execute_message.set_request(driver_request)
        return driver_request

    @classmethod
    def _get_unpassed_test_params(cls, history_report_path, module_name):
        """Return the unsuccessful test params recorded for ``module_name``.

        The params come from the task info of the history report. If the
        module name has no entry, the part before its first "." is tried.
        An empty list is returned when no task info params are available.
        """
        unpassed_test_params = []
        from _core.report.result_reporter import ResultReporter
        params = ResultReporter.get_task_info_params(history_report_path)
        if not params:
            return unpassed_test_params
        failed_list = []
        try:
            from devicetest.agent.decc import Handler
            if Handler.DAV.retry_select:
                for i in Handler.DAV.case_id_list:
                    failed_list.append(i + "#" + i)
            else:
                failed_list = params[ReportConst.unsuccessful_params].get(module_name, [])
        except Exception:
            # any failure above falls back to the recorded params
            failed_list = params[ReportConst.unsuccessful_params].get(module_name, [])
        if not failed_list:
            failed_list = params[ReportConst.unsuccessful_params].get(str(module_name).split(".")[0], [])
        unpassed_test_params.extend(failed_list)
        LOG.debug("Get unpassed test params %s", unpassed_test_params)
        return unpassed_test_params

    @classmethod
    def _append_unpassed_test_param(cls, history_report_file,
                                    unpassed_test_params):
        """Append "suite#classname#name" to ``unpassed_test_params`` for
        every case in ``history_report_file`` that is not passed."""

        testsuites_element = DataHelper.parse_data_report(history_report_file)
        for testsuite_element in testsuites_element:
            suite_name = testsuite_element.get("name", "")
            suite = Suite()
            suite.set_cases(testsuite_element)
            for case in suite.cases:
                if case.is_passed():
                    continue
                unpassed_test_param = "{}#{}#{}".format(
                    suite_name, case.classname, case.name)
                unpassed_test_params.append(unpassed_test_param)

    def _inherit_execute_result(self, execute_result, root_desc):
        """Merge the history result of the module into ``execute_result``.

        :param execute_result: result reference returned by the driver
        :param root_desc: test descriptor providing ``source.module_name``
        :return: ``execute_result``, or the path of the copied history
            report when not in decc mode and ``execute_result`` does not
            exist on disk
        """
        module_name = root_desc.source.module_name
        execute_result_name = "%s.xml" % module_name
        history_execute_result = self._get_history_execute_result(
            execute_result_name)
        if not history_execute_result:
            LOG.warning("%s no history execute result exists",
                        execute_result_name)
            return execute_result

        if not check_mode(ModeType.decc):
            if not os.path.exists(execute_result):
                # nothing was produced this time: reuse the history report
                result_dir = \
                    os.path.join(self.task.config.report_path, "result")
                os.makedirs(result_dir, exist_ok=True)
                target_execute_result = os.path.join(result_dir,
                                                     execute_result_name)
                shutil.copyfile(history_execute_result, target_execute_result)
                LOG.info("Copy %s to %s" % (history_execute_result,
                                            target_execute_result))
                return target_execute_result

        real_execute_result = self._get_real_execute_result(execute_result)

        # inherit history execute result
        testsuites_element = DataHelper.parse_data_report(real_execute_result)
        if self._is_empty_report(testsuites_element):
            if check_mode(ModeType.decc):
                LOG.info("Empty report no need to inherit history execute"
                         " result")
            else:
                LOG.info("Empty report '%s' no need to inherit history execute"
                         " result", history_execute_result)
            return execute_result

        real_history_execute_result = self._get_real_history_execute_result(
            history_execute_result, module_name)

        history_testsuites_element = DataHelper.parse_data_report(
            real_history_execute_result)
        if self._is_empty_report(history_testsuites_element):
            LOG.info("History report '%s' is empty", history_execute_result)
            return execute_result
        if check_mode(ModeType.decc):
            LOG.info("Inherit history execute result")
        else:
            LOG.info("Inherit history execute result: %s",
                     history_execute_result)
        self._inherit_element(history_testsuites_element, testsuites_element)

        if check_mode(ModeType.decc):
            from xdevice import SuiteReporter
            SuiteReporter.append_report_result(
                (execute_result, DataHelper.to_string(testsuites_element)))
        else:
            # generate inherit execute result
            DataHelper.generate_report(testsuites_element, execute_result)
        return execute_result

    def _inherit_element(self, history_testsuites_element, testsuites_element):
        """Merge history suites into ``testsuites_element`` in place.

        A history suite whose name is missing from the current report is
        appended as a whole. For a suite present in both, only the history
        cases accepted by ``_check_testcase_pass`` are appended. The
        ``tests`` counters are increased accordingly.
        """
        for history_testsuite_element in history_testsuites_element:
            history_testsuite_name = history_testsuite_element.get("name", "")
            target_testsuite_element = None
            for testsuite_element in testsuites_element:
                if history_testsuite_name == testsuite_element.get("name", ""):
                    target_testsuite_element = testsuite_element
                    break

            if target_testsuite_element is None:
                testsuites_element.append(history_testsuite_element)
                inherited_test = int(testsuites_element.get(
                    ReportConstant.tests, 0)) + int(
                    history_testsuite_element.get(ReportConstant.tests, 0))
                testsuites_element.set(ReportConstant.tests,
                                       str(inherited_test))
                continue

            pass_num = 0
            for history_testcase_element in history_testsuite_element:
                if self._check_testcase_pass(history_testcase_element):
                    target_testsuite_element.append(history_testcase_element)
                    pass_num += 1

            inherited_test = int(target_testsuite_element.get(
                ReportConstant.tests, 0)) + pass_num
            target_testsuite_element.set(ReportConstant.tests,
                                         str(inherited_test))
            inherited_test = int(testsuites_element.get(
                ReportConstant.tests, 0)) + pass_num
            testsuites_element.set(ReportConstant.tests, str(inherited_test))

    def _get_history_execute_result(self, execute_result_name):
        """Locate the history report that belongs to ``execute_result_name``.

        The recorded data reports are consulted first (keyed without the
        ".xml" suffix). If that yields nothing, the history report
        directory is walked for a file whose name ends with
        "<name>.xml"; the last match found wins.

        :param execute_result_name: report name, with or without ".xml"
        :return: path of the history report, or "" when none is found
        """
        record_name = execute_result_name
        if record_name.endswith(".xml"):
            record_name = record_name[:-4]
        history_execute_result = \
            self._get_data_report_from_record(record_name)
        if history_execute_result:
            return history_execute_result
        # files on disk keep the ".xml" suffix, so match against it
        xml_file_name = "%s.xml" % record_name
        for root_dir, _, files in os.walk(
                self.task.config.history_report_path):
            for result_file in files:
                if result_file.endswith(xml_file_name):
                    history_execute_result = os.path.abspath(
                        os.path.join(root_dir, result_file))
        return history_execute_result

    @classmethod
    def _check_testcase_pass(cls, history_testcase_element):
        """Build a Case from the element's attributes and return
        ``case.is_passed()``.

        If the element has children, an empty result becomes
        ``ReportConstant.false`` and the message is taken from the first
        child.
        """
        case = Case()
        case.result = history_testcase_element.get(ReportConstant.result, "")
        case.status = history_testcase_element.get(ReportConstant.status, "")
        case.message = history_testcase_element.get(ReportConstant.message, "")
        if len(history_testcase_element) > 0:
            if not case.result:
                case.result = ReportConstant.false
            case.message = history_testcase_element[0].get(
                ReportConstant.message)

        return case.is_passed()

    @classmethod
    def _is_empty_report(cls, testsuites_element):
        """Return True when the report has no suite, or exactly one suite
        whose ``unavailable`` count is greater than zero."""
        if len(testsuites_element) < 1:
            return True
        if len(testsuites_element) >= 2:
            return False

        if int(testsuites_element[0].get(ReportConstant.unavailable, 0)) > 0:
            return True
        return False

    def _get_data_report_from_record(self, execute_result_name):
        """Look up the data report recorded for ``execute_result_name``.

        The full name is tried first, then the part before its first ".".

        :return: the recorded value, or "" when nothing is recorded
        """
        history_report_path = \
            getattr(self.task.config, "history_report_path", "")
        if history_report_path:
            from _core.report.result_reporter import ResultReporter
            params = ResultReporter.get_task_info_params(history_report_path)
            if params:
                report_data_dict = dict(params[ReportConst.data_reports])
                if execute_result_name in report_data_dict.keys():
                    return report_data_dict.get(execute_result_name)
                elif execute_result_name.split(".")[0] in \
                        report_data_dict.keys():
                    return report_data_dict.get(
                        execute_result_name.split(".")[0])
        return ""

    @classmethod
    def _get_real_execute_result(cls, execute_result):
        """Return what should be parsed as the current result.

        In decc mode this is the report result stored in SuiteReporter
        under the same name (compared without extension), or "" when there
        is none. Otherwise ``execute_result`` is returned unchanged.
        """
        from xdevice import SuiteReporter
        LOG.debug("Get real execute result length is: %s" %
                  len(SuiteReporter.get_report_result()))
        if check_mode(ModeType.decc):
            for suite_report, report_result in \
                    SuiteReporter.get_report_result():
                if os.path.splitext(suite_report)[0] == \
                        os.path.splitext(execute_result)[0]:
                    return report_result
            return ""
        else:
            return execute_result

    @classmethod
    def _get_real_history_execute_result(cls, history_execute_result,
                                         module_name):
        """Return what should be parsed as the history result.

        In decc mode this is the result SuiteReporter holds for
        ``module_name``; otherwise ``history_execute_result`` unchanged.
        """
        from xdevice import SuiteReporter
        LOG.debug("Get real history execute result: %s" %
                  SuiteReporter.history_report_result)
        if check_mode(ModeType.decc):
            # the virtual report path of the pair is not needed here
            _, report_result = SuiteReporter. \
                get_history_result_by_module(module_name)
            return report_result
        else:
            return history_execute_result
472
473
class DriversDryRunThread(threading.Thread):
    """Worker thread that dry-runs one (driver, test) pair.

    When the run is over, an ExecuteMessage describing the outcome is put
    on ``message_queue``.
    """

    def __init__(self, test_driver, task, environment, message_queue):
        """
        :param test_driver: (driver, test descriptor) pair, may be falsy
        :param task: task object whose ``config`` is copied per request
        :param environment: environment holding ``devices``, may be None
        :param message_queue: queue that receives the ExecuteMessage
        """
        super().__init__()
        self.test_driver = test_driver
        self.listeners = None
        self.task = task
        self.environment = environment
        self.message_queue = message_queue
        self.thread_id = None
        self.error_message = ""

    def set_thread_id(self, thread_id):
        """Record the identifier used for logging and result messages."""
        self.thread_id = thread_id

    def run(self):
        """Build the request, set up the devices and dry-run the driver.

        Any exception raised on the way is logged and kept in
        ``self.error_message``; ``_handle_finally`` runs in every case.
        """
        from xdevice import Scheduler
        LOG.debug("Thread id: %s start" % self.thread_id)
        start_time = time.time()
        execute_message = ExecuteMessage('', self.environment,
                                         self.test_driver, self.thread_id)
        driver, test = None, None
        try:
            if self.test_driver and Scheduler.is_execute:
                # construct params
                driver, test = self.test_driver
                driver_request = self._get_driver_request(test,
                                                          execute_message)
                if driver_request is None:
                    return

                # setup device
                self._do_task_setup(driver_request)

                # driver execute
                self.reset_device(driver_request.config)
                driver.__dry_run_execute__(driver_request)

        except Exception as exception:
            error_no = getattr(exception, "error_no", "00000")
            if self.environment is None:
                LOG.exception("Exception: %s", exception, exc_info=False,
                              error_no=error_no)
            else:
                LOG.exception(
                    "Device: %s, exception: %s" % (
                        self.environment.__get_serial__(), exception),
                    exc_info=False, error_no=error_no)
            self.error_message = "{}: {}".format(
                get_instance_name(exception), str(exception))

        finally:
            self._handle_finally(driver, execute_message, start_time, test)

    @staticmethod
    def reset_device(config):
        """Reboot every device of ``config.environment`` when the config
        has a truthy ``reboot_per_module`` attribute."""
        if getattr(config, "reboot_per_module", False):
            for device in config.environment.devices:
                device.reboot()

    def _handle_finally(self, driver, execute_message, start_time, test):
        """Log the execution time, set the state, free the environment and
        publish ``execute_message``.

        Freeing the environment and putting the message on the queue are
        guaranteed even if the logging part fails, because the queue
        consumer removes this thread from its bookkeeping only after it
        receives the message.

        :param driver: dry-run driver, unused here
        :param execute_message: message object to fill in and enqueue
        :param start_time: ``time.time()`` value taken when run() started
        :param test: test descriptor, unused here
        """
        from xdevice import Scheduler
        try:
            # output execute time
            execute_time = VisionHelper.get_execute_time(int(
                time.time() - start_time))
            source_content = ""
            # run() tolerates a falsy test_driver, so do not index it blindly
            if self.test_driver:
                source = self.test_driver[1].source
                source_content = source.source_file or source.source_string
            LOG.info("Executed: %s, Execution Time: %s" % (
                source_content, execute_time))
        except Exception as exception:
            error_no = getattr(exception, "error_no", "00000")
            LOG.exception(
                "Handle execute result exception: %s" % exception,
                exc_info=False, error_no=error_no)
            # keep the first error, it is the one that explains the failure
            if not self.error_message:
                self.error_message = "{}: {}".format(
                    get_instance_name(exception), str(exception))
        finally:
            # set execute state
            if self.error_message:
                execute_message.set_state(ExecuteMessage.DEVICE_ERROR)
            else:
                execute_message.set_state(ExecuteMessage.DEVICE_FINISH)

            try:
                # free environment
                if self.environment:
                    LOG.debug("Thread %s free environment",
                              execute_message.get_thread_id())
                    Scheduler.__free_environment__(
                        execute_message.get_environment())
            finally:
                LOG.debug("Put thread %s result", self.thread_id)
                self.message_queue.put(execute_message)

    def _do_task_setup(self, driver_request):
        """Run the task-level kits on every device that still needs them.

        Nothing is done in decc mode, when the request config has a truthy
        check-device attribute, or when there is no environment. After the
        kits were attempted the device is flagged so they are not run again.
        """
        if check_mode(ModeType.decc) or getattr(
                driver_request.config, ConfigConst.check_device, False):
            return

        if self.environment is None:
            return

        from xdevice import Scheduler
        for device in self.environment.devices:
            if not getattr(device, ConfigConst.need_kit_setup, True):
                LOG.debug("Device %s need kit setup is false" % device)
                continue

            # do task setup for device, each device gets its own kit copies
            kits_copy = copy.deepcopy(self.task.config.kits)
            setattr(device, ConfigConst.task_kits, kits_copy)
            for kit in getattr(device, ConfigConst.task_kits, []):
                if not Scheduler.is_execute:
                    break
                try:
                    kit.__setup__(device, request=driver_request)
                except (ParamError, ExecuteTerminate, DeviceError,
                        LiteDeviceError, ValueError, TypeError,
                        SyntaxError, AttributeError) as exception:
                    # a failing kit is logged; the remaining kits still run
                    error_no = getattr(exception, "error_no", "00000")
                    LOG.exception(
                        "Task setup device: %s, exception: %s" % (
                            self.environment.__get_serial__(),
                            exception), exc_info=False, error_no=error_no)
            LOG.debug("Set device %s need kit setup to false" % device)
            setattr(device, ConfigConst.need_kit_setup, False)

        # set product_info to self.task, only if the task has none yet
        if getattr(driver_request, ConfigConst.product_info, "") and not \
                getattr(self.task, ConfigConst.product_info, ""):
            product_info = getattr(driver_request, ConfigConst.product_info)
            if not isinstance(product_info, dict):
                LOG.warning("Product info should be dict, %s",
                            product_info)
                return
            setattr(self.task, ConfigConst.product_info, product_info)

    def _get_driver_request(self, root_desc, execute_message):
        """Create the Request for ``root_desc`` from a copy of the task
        config and attach it to ``execute_message``.

        :return: the created Request
        """
        config = Config()
        config.update(copy.deepcopy(self.task.config).__dict__)
        config.environment = self.environment
        if self.listeners:
            for listener in self.listeners:
                LOG.debug("Thread id %s, listener %s" % (self.thread_id, listener))
        driver_request = Request(self.thread_id, root_desc, self.listeners,
                                 config)
        execute_message.set_request(driver_request)
        return driver_request
613
614
class QueueMonitorThread(threading.Thread):
    """Thread that consumes the execute messages of the driver threads."""

    def __init__(self, message_queue, current_driver_threads, test_drivers):
        """
        :param message_queue: queue the driver threads put messages on
        :param current_driver_threads: mapping of thread id to running
            driver thread; entries are popped as messages arrive
        :param test_drivers: collection of drivers still to be run
        """
        super().__init__()
        self.message_queue = message_queue
        self.current_driver_threads = current_driver_threads
        self.test_drivers = test_drivers

    def run(self):
        """Handle messages until no driver is pending and none is running."""
        from xdevice import Scheduler
        LOG.debug("Queue monitor thread start")
        while self.test_drivers or self.current_driver_threads:
            if self.current_driver_threads:
                execute_message = self.message_queue.get()
                finished_id = execute_message.get_thread_id()

                self.current_driver_threads.pop(finished_id)

                state = execute_message.get_state()
                if state == ExecuteMessage.DEVICE_FINISH:
                    LOG.debug("Thread id: %s execute finished" % finished_id)
                elif state == ExecuteMessage.DEVICE_ERROR:
                    LOG.debug("Thread id: %s execute error" % finished_id)

                if Scheduler.upload_address:
                    Scheduler.upload_module_result(execute_message)
            else:
                # drivers are pending but none is running yet
                time.sleep(3)

        LOG.debug("Queue monitor thread end")
        if Scheduler.is_execute:
            return
        LOG.info("Terminate success")
        Scheduler.terminate_result.put("terminate success")
648
649
class ExecuteMessage:
    """Outcome record a driver thread hands over through the queue."""

    # known values of the execute state
    DEVICE_RUN = 'device_run'
    DEVICE_FINISH = 'device_finish'
    DEVICE_ERROR = 'device_error'

    def __init__(self, state, environment, drivers, thread_id):
        """
        :param state: initial execute state
        :param environment: environment used by the thread
        :param drivers: driver(s) the thread was given
        :param thread_id: identifier of the reporting thread
        """
        self.state, self.environment = state, environment
        self.drivers, self.thread_id = drivers, thread_id
        # filled in later through the setters
        self.request = self.result = None

    # ---- mutable fields -------------------------------------------------

    def set_state(self, state):
        """Replace the execute state."""
        self.state = state

    def get_state(self):
        """Return the execute state."""
        return self.state

    def set_request(self, request):
        """Attach the request that was built for the run."""
        self.request = request

    def get_request(self):
        """Return the attached request, None if never set."""
        return self.request

    def set_result(self, result):
        """Attach the execute result."""
        self.result = result

    def get_result(self):
        """Return the attached result, None if never set."""
        return self.result

    # ---- fields fixed at construction ------------------------------------

    def get_environment(self):
        """Return the environment given at construction."""
        return self.environment

    def get_thread_id(self):
        """Return the thread id given at construction."""
        return self.thread_id

    def get_drivers(self):
        """Return the drivers given at construction."""
        return self.drivers
689