• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import copy
20import os
21import shutil
22import threading
23import time
24from concurrent.futures import ThreadPoolExecutor
25from concurrent.futures import wait
26
27from _core.constants import ModeType
28from _core.constants import ConfigConst
29from _core.constants import ReportConst
30from _core.executor.request import Request
31from _core.logger import platform_logger
32from _core.plugin import Config
33from _core.utils import get_instance_name
34from _core.utils import check_mode
35from _core.exception import ParamError
36from _core.exception import ExecuteTerminate
37from _core.exception import DeviceError
38from _core.exception import LiteDeviceError
39from _core.report.reporter_helper import VisionHelper
40from _core.report.reporter_helper import ReportConstant
41from _core.report.reporter_helper import DataHelper
42from _core.report.reporter_helper import Suite
43from _core.report.reporter_helper import Case
44
45LOG = platform_logger("Concurrent")
46
47
48class Concurrent:
49    @classmethod
50    def executor_callback(cls, worker):
51        worker_exception = worker.exception()
52        if worker_exception:
53            LOG.error("Worker return exception: {}".format(worker_exception))
54
55    @classmethod
56    def concurrent_execute(cls, func, params_list, max_size=8):
57        """
58        Provider the ability to execute target function concurrently
59        :param func: target function name
60        :param params_list: the list of params in these target functions
61        :param max_size:  the max size of thread  you wanted  in thread pool
62        :return:
63        """
64        with ThreadPoolExecutor(max_size) as executor:
65            future_params = dict()
66            for params in params_list:
67                future = executor.submit(func, *params)
68                future_params.update({future: params})
69                future.add_done_callback(cls.executor_callback)
70            wait(future_params)  # wait all function complete
71            result_list = []
72            for future in future_params:
73                result_list.append((future.result(), future_params[future]))
74            return result_list
75
76
77class DriversThread(threading.Thread):
78    def __init__(self, test_driver, task, environment, message_queue):
79        threading.Thread.__init__(self)
80        self.test_driver = test_driver
81        self.listeners = None
82        self.task = task
83        self.environment = environment
84        self.message_queue = message_queue
85        self.thread_id = None
86        self.error_message = ""
87
88    def set_listeners(self, listeners):
89        self.listeners = listeners
90        if self.environment is None:
91            return
92
93        for listener in listeners:
94            listener.device_sn = self.environment.devices[0].device_sn
95
96    def set_thread_id(self, thread_id):
97        self.thread_id = thread_id
98
99    def run(self):
100        from xdevice import Scheduler
101        LOG.debug("Thread id: %s start" % self.thread_id)
102        start_time = time.time()
103        execute_message = ExecuteMessage('', self.environment,
104                                         self.test_driver, self.thread_id)
105        driver, test = None, None
106        try:
107            if self.test_driver and Scheduler.is_execute:
108                # construct params
109                driver, test = self.test_driver
110                driver_request = self._get_driver_request(test,
111                                                          execute_message)
112                if driver_request is None:
113                    return
114
115                # setup device
116                self._do_task_setup(driver_request)
117
118                # driver execute
119                self.reset_device(driver_request.config)
120                driver.__execute__(driver_request)
121
122        except Exception as exception:
123            error_no = getattr(exception, "error_no", "00000")
124            if self.environment is None:
125                LOG.exception("Exception: %s", exception, exc_info=False,
126                              error_no=error_no)
127            else:
128                LOG.exception(
129                    "Device: %s, exception: %s" % (
130                        self.environment.__get_serial__(), exception),
131                    exc_info=False, error_no=error_no)
132            self.error_message = "{}: {}".format(
133                get_instance_name(exception), str(exception))
134
135        finally:
136            self._handle_finally(driver, execute_message, start_time, test)
137
138    @staticmethod
139    def reset_device(config):
140        if getattr(config, "reboot_per_module", False):
141            for device in config.environment.devices:
142                device.reboot()
143
144    def _handle_finally(self, driver, execute_message, start_time, test):
145        from xdevice import Scheduler
146        # output execute time
147        end_time = time.time()
148        execute_time = VisionHelper.get_execute_time(int(
149            end_time - start_time))
150        source_content = self.test_driver[1].source.source_file or \
151                         self.test_driver[1].source.source_string
152        LOG.info("Executed: %s, Execution Time: %s" % (
153            source_content, execute_time))
154
155        # inherit history report under retry mode
156        if driver and test:
157            execute_result = driver.__result__()
158            LOG.debug("Execute result:%s" % execute_result)
159            if getattr(self.task.config, "history_report_path", ""):
160                execute_result = self._inherit_execute_result(
161                    execute_result, test)
162            execute_message.set_result(execute_result)
163
164        # set execute state
165        if self.error_message:
166            execute_message.set_state(ExecuteMessage.DEVICE_ERROR)
167        else:
168            execute_message.set_state(ExecuteMessage.DEVICE_FINISH)
169
170        # free environment
171        if self.environment:
172            LOG.debug("Thread %s free environment",
173                      execute_message.get_thread_id())
174            Scheduler.__free_environment__(execute_message.get_environment())
175
176        LOG.debug("Put thread %s result", self.thread_id)
177        self.message_queue.put(execute_message)
178        LOG.info("")
179
180    def _do_task_setup(self, driver_request):
181        if check_mode(ModeType.decc) or getattr(
182                driver_request.config, ConfigConst.check_device, False):
183            return
184
185        if self.environment is None:
186            return
187
188        from xdevice import Scheduler
189        for device in self.environment.devices:
190            if not getattr(device, ConfigConst.need_kit_setup, True):
191                LOG.debug("Device %s need kit setup is false" % device)
192                continue
193
194            # do task setup for device
195            kits_copy = copy.deepcopy(self.task.config.kits)
196            setattr(device, ConfigConst.task_kits, kits_copy)
197            for kit in getattr(device, ConfigConst.task_kits, []):
198                if not Scheduler.is_execute:
199                    break
200                try:
201                    kit.__setup__(device, request=driver_request)
202                except (ParamError, ExecuteTerminate, DeviceError,
203                        LiteDeviceError, ValueError, TypeError,
204                        SyntaxError, AttributeError) as exception:
205                    error_no = getattr(exception, "error_no", "00000")
206                    LOG.exception(
207                        "Task setup device: %s, exception: %s" % (
208                            self.environment.__get_serial__(),
209                            exception), exc_info=False, error_no=error_no)
210            LOG.debug("Set device %s need kit setup to false" % device)
211            setattr(device, ConfigConst.need_kit_setup, False)
212
213        # set product_info to self.task
214        if getattr(driver_request, ConfigConst.product_info, "") and not \
215                getattr(self.task, ConfigConst.product_info, ""):
216            product_info = getattr(driver_request, ConfigConst.product_info)
217            if not isinstance(product_info, dict):
218                LOG.warning("Product info should be dict, %s",
219                            product_info)
220                return
221            setattr(self.task, ConfigConst.product_info, product_info)
222
223    def _get_driver_request(self, root_desc, execute_message):
224        config = Config()
225        config.update(copy.deepcopy(self.task.config).__dict__)
226        config.environment = self.environment
227        if getattr(config, "history_report_path", ""):
228            # modify config.testargs
229            history_report_path = getattr(config, "history_report_path", "")
230            module_name = root_desc.source.module_name
231            unpassed_test_params = self._get_unpassed_test_params(
232                history_report_path, module_name)
233            if not unpassed_test_params:
234                LOG.info("%s all test cases are passed, no need retry",
235                         module_name)
236                driver_request = Request(self.thread_id, root_desc,
237                                         self.listeners, config)
238                execute_message.set_request(driver_request)
239                return None
240            if unpassed_test_params[0] != module_name and \
241                    unpassed_test_params[0] != str(module_name).split(".")[0]:
242                test_args = getattr(config, "testargs", {})
243                test_params = []
244                for unpassed_test_param in unpassed_test_params:
245                    if unpassed_test_param not in test_params:
246                        test_params.append(unpassed_test_param)
247                test_args["test"] = test_params
248                if "class" in test_args.keys():
249                    test_args.pop("class")
250                setattr(config, "testargs", test_args)
251        if getattr(config, "tf_suite", ""):
252            if root_desc.source.module_name in config.tf_suite.keys():
253                config.tf_suite = config.tf_suite.get(
254                    root_desc.source.module_name)
255            else:
256                config.tf_suite = dict()
257        for listener in self.listeners:
258            LOG.debug("Thread id %s, listener %s" % (self.thread_id, listener))
259        driver_request = Request(self.thread_id, root_desc, self.listeners,
260                                 config)
261        execute_message.set_request(driver_request)
262        return driver_request
263
264    @classmethod
265    def _get_unpassed_test_params(cls, history_report_path, module_name):
266        unpassed_test_params = []
267        from _core.report.result_reporter import ResultReporter
268        params = ResultReporter.get_task_info_params(history_report_path)
269        if not params:
270            return unpassed_test_params
271        failed_list = []
272        try:
273            from devicetest.agent.decc import Handler
274            if Handler.DAV.retry_select:
275                for i in Handler.DAV.case_id_list:
276                    failed_list.append(i + "#" + i)
277            else:
278                failed_list = params[ReportConst.unsuccessful_params].get(module_name, [])
279        except Exception:
280            failed_list = params[ReportConst.unsuccessful_params].get(module_name, [])
281        if not failed_list:
282            failed_list = params[ReportConst.unsuccessful_params].get(str(module_name).split(".")[0], [])
283        unpassed_test_params.extend(failed_list)
284        LOG.debug("Get unpassed test params %s", unpassed_test_params)
285        return unpassed_test_params
286
287    @classmethod
288    def _append_unpassed_test_param(cls, history_report_file,
289                                    unpassed_test_params):
290
291        testsuites_element = DataHelper.parse_data_report(history_report_file)
292        for testsuite_element in testsuites_element:
293            suite_name = testsuite_element.get("name", "")
294            suite = Suite()
295            suite.set_cases(testsuite_element)
296            for case in suite.cases:
297                if case.is_passed():
298                    continue
299                unpassed_test_param = "{}#{}#{}".format(
300                    suite_name, case.classname, case.name)
301                unpassed_test_params.append(unpassed_test_param)
302
303    def _inherit_execute_result(self, execute_result, root_desc):
304        module_name = root_desc.source.module_name
305        execute_result_name = "%s.xml" % module_name
306        history_execute_result = self._get_history_execute_result(
307            execute_result_name)
308        if not history_execute_result:
309            LOG.warning("%s no history execute result exists",
310                        execute_result_name)
311            return execute_result
312
313        if not check_mode(ModeType.decc):
314            if not os.path.exists(execute_result):
315                result_dir = \
316                    os.path.join(self.task.config.report_path, "result")
317                os.makedirs(result_dir, exist_ok=True)
318                target_execute_result = os.path.join(result_dir,
319                                                     execute_result_name)
320                shutil.copyfile(history_execute_result, target_execute_result)
321                LOG.info("Copy %s to %s" % (history_execute_result,
322                                            target_execute_result))
323                return target_execute_result
324
325        real_execute_result = self._get_real_execute_result(execute_result)
326
327        # inherit history execute result
328        testsuites_element = DataHelper.parse_data_report(real_execute_result)
329        if self._is_empty_report(testsuites_element):
330            if check_mode(ModeType.decc):
331                LOG.info("Empty report no need to inherit history execute"
332                         " result")
333            else:
334                LOG.info("Empty report '%s' no need to inherit history execute"
335                         " result", history_execute_result)
336            return execute_result
337
338        real_history_execute_result = self._get_real_history_execute_result(
339            history_execute_result, module_name)
340
341        history_testsuites_element = DataHelper.parse_data_report(
342            real_history_execute_result)
343        if self._is_empty_report(history_testsuites_element):
344            LOG.info("History report '%s' is empty", history_execute_result)
345            return execute_result
346        if check_mode(ModeType.decc):
347            LOG.info("Inherit history execute result")
348        else:
349            LOG.info("Inherit history execute result: %s",
350                     history_execute_result)
351        self._inherit_element(history_testsuites_element, testsuites_element)
352
353        if check_mode(ModeType.decc):
354            from xdevice import SuiteReporter
355            SuiteReporter.append_report_result(
356                (execute_result, DataHelper.to_string(testsuites_element)))
357        else:
358            # generate inherit execute result
359            DataHelper.generate_report(testsuites_element, execute_result)
360        return execute_result
361
362    def _inherit_element(self, history_testsuites_element, testsuites_element):
363        for history_testsuite_element in history_testsuites_element:
364            history_testsuite_name = history_testsuite_element.get("name", "")
365            target_testsuite_element = None
366            for testsuite_element in testsuites_element:
367                if history_testsuite_name == testsuite_element.get("name", ""):
368                    target_testsuite_element = testsuite_element
369                    break
370
371            if target_testsuite_element is None:
372                testsuites_element.append(history_testsuite_element)
373                inherited_test = int(testsuites_element.get(
374                    ReportConstant.tests, 0)) + int(
375                    history_testsuite_element.get(ReportConstant.tests, 0))
376                testsuites_element.set(ReportConstant.tests,
377                                       str(inherited_test))
378                continue
379
380            pass_num = 0
381            for history_testcase_element in history_testsuite_element:
382                if self._check_testcase_pass(history_testcase_element):
383                    target_testsuite_element.append(history_testcase_element)
384                    pass_num += 1
385
386            inherited_test = int(target_testsuite_element.get(
387                ReportConstant.tests, 0)) + pass_num
388            target_testsuite_element.set(ReportConstant.tests,
389                                         str(inherited_test))
390            inherited_test = int(testsuites_element.get(
391                ReportConstant.tests, 0)) + pass_num
392            testsuites_element.set(ReportConstant.tests, str(inherited_test))
393
394    def _get_history_execute_result(self, execute_result_name):
395        if execute_result_name.endswith(".xml"):
396            execute_result_name = execute_result_name[:-4]
397        history_execute_result = \
398            self._get_data_report_from_record(execute_result_name)
399        if history_execute_result:
400            return history_execute_result
401        for root_dir, _, files in os.walk(
402                self.task.config.history_report_path):
403            for result_file in files:
404                if result_file.endswith(execute_result_name):
405                    history_execute_result = os.path.abspath(
406                        os.path.join(root_dir, result_file))
407        return history_execute_result
408
409    @classmethod
410    def _check_testcase_pass(cls, history_testcase_element):
411        case = Case()
412        case.result = history_testcase_element.get(ReportConstant.result, "")
413        case.status = history_testcase_element.get(ReportConstant.status, "")
414        case.message = history_testcase_element.get(ReportConstant.message, "")
415        if len(history_testcase_element) > 0:
416            if not case.result:
417                case.result = ReportConstant.false
418            case.message = history_testcase_element[0].get(
419                ReportConstant.message)
420
421        return case.is_passed()
422
423    @classmethod
424    def _is_empty_report(cls, testsuites_element):
425        if len(testsuites_element) < 1:
426            return True
427        if len(testsuites_element) >= 2:
428            return False
429
430        if int(testsuites_element[0].get(ReportConstant.unavailable, 0)) > 0:
431            return True
432        return False
433
434    def _get_data_report_from_record(self, execute_result_name):
435        history_report_path = \
436            getattr(self.task.config, "history_report_path", "")
437        if history_report_path:
438            from _core.report.result_reporter import ResultReporter
439            params = ResultReporter.get_task_info_params(history_report_path)
440            if params:
441                report_data_dict = dict(params[ReportConst.data_reports])
442                if execute_result_name in report_data_dict.keys():
443                    return report_data_dict.get(execute_result_name)
444                elif execute_result_name.split(".")[0] in \
445                        report_data_dict.keys():
446                    return report_data_dict.get(
447                        execute_result_name.split(".")[0])
448        return ""
449
450    @classmethod
451    def _get_real_execute_result(cls, execute_result):
452        from xdevice import SuiteReporter
453        LOG.debug("Get real execute result length is: %s" %
454                  len(SuiteReporter.get_report_result()))
455        if check_mode(ModeType.decc):
456            for suite_report, report_result in \
457                    SuiteReporter.get_report_result():
458                if os.path.splitext(suite_report)[0] == \
459                        os.path.splitext(execute_result)[0]:
460                    return report_result
461            return ""
462        else:
463            return execute_result
464
465    @classmethod
466    def _get_real_history_execute_result(cls, history_execute_result,
467                                         module_name):
468        from xdevice import SuiteReporter
469        LOG.debug("Get real history execute result: %s" %
470                  SuiteReporter.history_report_result)
471        if check_mode(ModeType.decc):
472            virtual_report_path, report_result = SuiteReporter. \
473                get_history_result_by_module(module_name)
474            return report_result
475        else:
476            return history_execute_result
477
478
479class DriversDryRunThread(threading.Thread):
480    def __init__(self, test_driver, task, environment, message_queue):
481        threading.Thread.__init__(self)
482        self.test_driver = test_driver
483        self.listeners = None
484        self.task = task
485        self.environment = environment
486        self.message_queue = message_queue
487        self.thread_id = None
488        self.error_message = ""
489
490    def set_thread_id(self, thread_id):
491        self.thread_id = thread_id
492
493    def run(self):
494        from xdevice import Scheduler
495        LOG.debug("Thread id: %s start" % self.thread_id)
496        start_time = time.time()
497        execute_message = ExecuteMessage('', self.environment,
498                                         self.test_driver, self.thread_id)
499        driver, test = None, None
500        try:
501            if self.test_driver and Scheduler.is_execute:
502                # construct params
503                driver, test = self.test_driver
504                driver_request = self._get_driver_request(test,
505                                                          execute_message)
506                if driver_request is None:
507                    return
508
509                # setup device
510                self._do_task_setup(driver_request)
511
512                # driver execute
513                self.reset_device(driver_request.config)
514                driver.__dry_run_execute__(driver_request)
515
516        except Exception as exception:
517            error_no = getattr(exception, "error_no", "00000")
518            if self.environment is None:
519                LOG.exception("Exception: %s", exception, exc_info=False,
520                              error_no=error_no)
521            else:
522                LOG.exception(
523                    "Device: %s, exception: %s" % (
524                        self.environment.__get_serial__(), exception),
525                    exc_info=False, error_no=error_no)
526            self.error_message = "{}: {}".format(
527                get_instance_name(exception), str(exception))
528
529        finally:
530            self._handle_finally(driver, execute_message, start_time, test)
531
532    @staticmethod
533    def reset_device(config):
534        if getattr(config, "reboot_per_module", False):
535            for device in config.environment.devices:
536                device.reboot()
537
538    def _handle_finally(self, driver, execute_message, start_time, test):
539        from xdevice import Scheduler
540        # output execute time
541        end_time = time.time()
542        execute_time = VisionHelper.get_execute_time(int(
543            end_time - start_time))
544        source_content = self.test_driver[1].source.source_file or \
545                         self.test_driver[1].source.source_string
546        LOG.info("Executed: %s, Execution Time: %s" % (
547            source_content, execute_time))
548
549        # set execute state
550        if self.error_message:
551            execute_message.set_state(ExecuteMessage.DEVICE_ERROR)
552        else:
553            execute_message.set_state(ExecuteMessage.DEVICE_FINISH)
554
555        # free environment
556        if self.environment:
557            LOG.debug("Thread %s free environment",
558                      execute_message.get_thread_id())
559            Scheduler.__free_environment__(execute_message.get_environment())
560
561        LOG.debug("Put thread %s result", self.thread_id)
562        self.message_queue.put(execute_message)
563
564    def _do_task_setup(self, driver_request):
565        if check_mode(ModeType.decc) or getattr(
566                driver_request.config, ConfigConst.check_device, False):
567            return
568
569        if self.environment is None:
570            return
571
572        from xdevice import Scheduler
573        for device in self.environment.devices:
574            if not getattr(device, ConfigConst.need_kit_setup, True):
575                LOG.debug("Device %s need kit setup is false" % device)
576                continue
577
578            # do task setup for device
579            kits_copy = copy.deepcopy(self.task.config.kits)
580            setattr(device, ConfigConst.task_kits, kits_copy)
581            for kit in getattr(device, ConfigConst.task_kits, []):
582                if not Scheduler.is_execute:
583                    break
584                try:
585                    kit.__setup__(device, request=driver_request)
586                except (ParamError, ExecuteTerminate, DeviceError,
587                        LiteDeviceError, ValueError, TypeError,
588                        SyntaxError, AttributeError) as exception:
589                    error_no = getattr(exception, "error_no", "00000")
590                    LOG.exception(
591                        "Task setup device: %s, exception: %s" % (
592                            self.environment.__get_serial__(),
593                            exception), exc_info=False, error_no=error_no)
594            LOG.debug("Set device %s need kit setup to false" % device)
595            setattr(device, ConfigConst.need_kit_setup, False)
596
597        # set product_info to self.task
598        if getattr(driver_request, ConfigConst.product_info, "") and not \
599                getattr(self.task, ConfigConst.product_info, ""):
600            product_info = getattr(driver_request, ConfigConst.product_info)
601            if not isinstance(product_info, dict):
602                LOG.warning("Product info should be dict, %s",
603                            product_info)
604                return
605            setattr(self.task, ConfigConst.product_info, product_info)
606
607    def _get_driver_request(self, root_desc, execute_message):
608        config = Config()
609        config.update(copy.deepcopy(self.task.config).__dict__)
610        config.environment = self.environment
611        if self.listeners:
612            for listener in self.listeners:
613                LOG.debug("Thread id %s, listener %s" % (self.thread_id, listener))
614        driver_request = Request(self.thread_id, root_desc, self.listeners,
615                                 config)
616        execute_message.set_request(driver_request)
617        return driver_request
618
619
620class QueueMonitorThread(threading.Thread):
621
622    def __init__(self, message_queue, current_driver_threads, test_drivers):
623        threading.Thread.__init__(self)
624        self.message_queue = message_queue
625        self.current_driver_threads = current_driver_threads
626        self.test_drivers = test_drivers
627
628    def run(self):
629        from xdevice import Scheduler
630        LOG.debug("Queue monitor thread start")
631        while self.test_drivers or self.current_driver_threads:
632            if not self.current_driver_threads:
633                time.sleep(3)
634                continue
635            execute_message = self.message_queue.get()
636
637            self.current_driver_threads.pop(execute_message.get_thread_id())
638
639            if execute_message.get_state() == ExecuteMessage.DEVICE_FINISH:
640                LOG.debug("Thread id: %s execute finished" %
641                          execute_message.get_thread_id())
642            elif execute_message.get_state() == ExecuteMessage.DEVICE_ERROR:
643                LOG.debug("Thread id: %s execute error" %
644                          execute_message.get_thread_id())
645
646            if Scheduler.upload_address:
647                Scheduler.upload_module_result(execute_message)
648
649        LOG.debug("Queue monitor thread end")
650        if not Scheduler.is_execute:
651            LOG.info("Terminate success")
652            Scheduler.terminate_result.put("terminate success")
653
654
655class ExecuteMessage:
656    DEVICE_RUN = 'device_run'
657    DEVICE_FINISH = 'device_finish'
658    DEVICE_ERROR = 'device_error'
659
660    def __init__(self, state, environment, drivers, thread_id):
661        self.state = state
662        self.environment = environment
663        self.drivers = drivers
664        self.thread_id = thread_id
665        self.request = None
666        self.result = None
667
668    def set_state(self, state):
669        self.state = state
670
671    def get_state(self):
672        return self.state
673
674    def set_request(self, request):
675        self.request = request
676
677    def get_request(self):
678        return self.request
679
680    def set_result(self, result):
681        self.result = result
682
683    def get_result(self):
684        return self.result
685
686    def get_environment(self):
687        return self.environment
688
689    def get_thread_id(self):
690        return self.thread_id
691
692    def get_drivers(self):
693        return self.drivers
694