• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2021 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import copy
20import os
21import shutil
22import threading
23import time
24from concurrent.futures import ThreadPoolExecutor
25from concurrent.futures import wait
26
27from _core.constants import ModeType
28from _core.constants import ConfigConst
29from _core.executor.request import Request
30from _core.logger import platform_logger
31from _core.plugin import Config
32from _core.utils import get_instance_name
33from _core.utils import get_filename_extension
34from _core.utils import check_mode
35from _core.exception import ParamError
36from _core.exception import ExecuteTerminate
37from _core.exception import DeviceError
38from _core.exception import LiteDeviceError
39from _core.report.reporter_helper import VisionHelper
40from _core.report.reporter_helper import ReportConstant
41from _core.report.reporter_helper import DataHelper
42from _core.report.reporter_helper import Suite
43from _core.report.reporter_helper import Case
44
45LOG = platform_logger("Concurrent")
46
47
48class Concurrent:
49    @classmethod
50    def executor_callback(cls, worker):
51        worker_exception = worker.exception()
52        if worker_exception:
53            LOG.error("Worker return exception: {}".format(worker_exception))
54
55    @classmethod
56    def concurrent_execute(cls, func, params_list, max_size=8):
57        """
58        Provider the ability to execute target function concurrently
59        :param func: target function name
60        :param params_list: the list of params in these target functions
61        :param max_size:  the max size of thread  you wanted  in thread pool
62        :return:
63        """
64        with ThreadPoolExecutor(max_size) as executor:
65            future_params = dict()
66            for params in params_list:
67                future = executor.submit(func, *params)
68                future_params.update({future: params})
69                future.add_done_callback(cls.executor_callback)
70            wait(future_params)  # wait all function complete
71            result_list = []
72            for future in future_params:
73                result_list.append((future.result(), future_params[future]))
74            return result_list
75
76
77class DriversThread(threading.Thread):
78    def __init__(self, test_driver, task, environment, message_queue):
79        threading.Thread.__init__(self)
80        self.test_driver = test_driver
81        self.listeners = None
82        self.task = task
83        self.environment = environment
84        self.message_queue = message_queue
85        self.thread_id = None
86        self.error_message = ""
87
88    def set_listeners(self, listeners):
89        self.listeners = listeners
90        if self.environment is None:
91            return
92
93        for listener in listeners:
94            listener.device_sn = self.environment.devices[0].device_sn
95
96    def set_thread_id(self, thread_id):
97        self.thread_id = thread_id
98
99    def run(self):
100        from xdevice import Scheduler
101        LOG.debug("thread id: %s start" % self.thread_id)
102        start_time = time.time()
103        execute_message = ExecuteMessage('', self.environment,
104                                         self.test_driver, self.thread_id)
105        driver, test = None, None
106        try:
107            if self.test_driver and Scheduler.is_execute:
108                # construct params
109                driver, test = self.test_driver
110                driver_request = self._get_driver_request(test,
111                                                          execute_message)
112                if driver_request is None:
113                    return
114
115                # setup device
116                self._do_task_setup(driver_request)
117
118                # driver execute
119                self.reset_device(driver_request.config)
120                driver.__execute__(driver_request)
121
122        except Exception as exception:
123            error_no = getattr(exception, "error_no", "00000")
124            if self.environment is None:
125                LOG.exception("exception: %s", exception, exc_info=False,
126                              error_no=error_no)
127            else:
128                LOG.exception(
129                    "device: %s, exception: %s" % (
130                        self.environment.__get_serial__(), exception),
131                    exc_info=False, error_no=error_no)
132            self.error_message = "{}: {}".format(
133                get_instance_name(exception), str(exception))
134
135        finally:
136            self._handle_finally(driver, execute_message, start_time, test)
137
138    @staticmethod
139    def reset_device(config):
140        if getattr(config, "reboot_per_module", False):
141            for device in config.environment.devices:
142                device.reboot()
143
144    def _handle_finally(self, driver, execute_message, start_time, test):
145        from xdevice import Scheduler
146        # output execute time
147        end_time = time.time()
148        execute_time = VisionHelper.get_execute_time(int(
149            end_time - start_time))
150        source_content = self.test_driver[1].source.source_file or \
151                         self.test_driver[1].source.source_string
152        LOG.info("Executed: %s, Execution Time: %s" % (
153            source_content, execute_time))
154
155        # inherit history report under retry mode
156        if driver and test:
157            execute_result = driver.__result__()
158            LOG.debug("execute_result:%s" % execute_result)
159            if getattr(self.task.config, "history_report_path", ""):
160                execute_result = self._inherit_execute_result(
161                    execute_result, test)
162            execute_message.set_result(execute_result)
163
164        # set execute state
165        if self.error_message:
166            execute_message.set_state(ExecuteMessage.DEVICE_ERROR)
167        else:
168            execute_message.set_state(ExecuteMessage.DEVICE_FINISH)
169
170        # free environment
171        if self.environment:
172            LOG.debug("thread %s free environment",
173                      execute_message.get_thread_id())
174            Scheduler.__free_environment__(execute_message.get_environment())
175
176        LOG.debug("put thread %s result", self.thread_id)
177        self.message_queue.put(execute_message)
178        LOG.info("")
179
180    def _do_task_setup(self, driver_request):
181        if check_mode(ModeType.decc) or getattr(
182                driver_request.config, ConfigConst.check_device, False):
183            return
184
185        if self.environment is None:
186            return
187
188        from xdevice import Scheduler
189        for device in self.environment.devices:
190            if not getattr(device, ConfigConst.need_kit_setup, True):
191                LOG.debug("device %s need kit setup is false" % device)
192                continue
193
194            # do task setup for device
195            kits_copy = copy.deepcopy(self.task.config.kits)
196            setattr(device, ConfigConst.task_kits, kits_copy)
197            for kit in getattr(device, ConfigConst.task_kits, []):
198                if not Scheduler.is_execute:
199                    break
200                try:
201                    kit.__setup__(device, request=driver_request)
202                except (ParamError, ExecuteTerminate, DeviceError,
203                        LiteDeviceError, ValueError, TypeError,
204                        SyntaxError, AttributeError) as exception:
205                    error_no = getattr(exception, "error_no", "00000")
206                    LOG.exception(
207                        "task setup device: %s, exception: %s" % (
208                            self.environment.__get_serial__(),
209                            exception), exc_info=False, error_no=error_no)
210            LOG.debug("set device %s need kit setup to false" % device)
211            setattr(device, ConfigConst.need_kit_setup, False)
212
213        # set product_info to self.task
214        if getattr(driver_request, ConfigConst.product_info, "") and not \
215                getattr(self.task, ConfigConst.product_info, ""):
216            product_info = getattr(driver_request, ConfigConst.product_info)
217            if not isinstance(product_info, dict):
218                LOG.warning("product info should be dict, %s",
219                            product_info)
220                return
221            setattr(self.task, ConfigConst.product_info, product_info)
222
223    def _get_driver_request(self, root_desc, execute_message):
224        config = Config()
225        config.update(copy.deepcopy(self.task.config).__dict__)
226        config.environment = self.environment
227        if getattr(config, "history_report_path", ""):
228            # modify config.testargs
229            history_report_path = getattr(config, "history_report_path", "")
230            module_name = root_desc.source.module_name
231            unpassed_test_params = self._get_unpassed_test_params(
232                history_report_path, module_name)
233            if not unpassed_test_params:
234                LOG.info("%s all test cases are passed, no need retry",
235                         module_name)
236                driver_request = Request(self.thread_id, root_desc,
237                                         self.listeners, config)
238                execute_message.set_request(driver_request)
239                return None
240            if unpassed_test_params[0] != module_name and \
241                    unpassed_test_params[0] != str(module_name).split(".")[0]:
242                test_args = getattr(config, "testargs", {})
243                test_params = []
244                for unpassed_test_param in unpassed_test_params:
245                    if unpassed_test_param not in test_params:
246                        test_params.append(unpassed_test_param)
247                test_args["test"] = test_params
248                if "class" in test_args.keys():
249                    test_args.pop("class")
250                setattr(config, "testargs", test_args)
251
252        for listener in self.listeners:
253            LOG.debug("thread id %s, listener %s" % (self.thread_id, listener))
254        driver_request = Request(self.thread_id, root_desc, self.listeners,
255                                 config)
256        execute_message.set_request(driver_request)
257        return driver_request
258
259    @classmethod
260    def _get_unpassed_test_params(cls, history_report_path, module_name):
261        unpassed_test_params = []
262        from _core.report.result_reporter import ResultReporter
263        params = ResultReporter.get_task_info_params(history_report_path)
264        if not params:
265            return unpassed_test_params
266
267        failed_list = params[3].get(module_name, [])
268        if not failed_list:
269            failed_list = params[3].get(str(module_name).split(".")[0], [])
270        unpassed_test_params.extend(failed_list)
271        LOG.debug("get unpassed test params %s", unpassed_test_params)
272        return unpassed_test_params
273
274    @classmethod
275    def _append_unpassed_test_param(cls, history_report_file,
276                                    unpassed_test_params):
277
278        testsuites_element = DataHelper.parse_data_report(history_report_file)
279        for testsuite_element in testsuites_element:
280            suite_name = testsuite_element.get("name", "")
281            suite = Suite()
282            suite.set_cases(testsuite_element)
283            for case in suite.cases:
284                if case.is_passed():
285                    continue
286                unpassed_test_param = "{}#{}#{}".format(
287                    suite_name, case.classname, case.name)
288                unpassed_test_params.append(unpassed_test_param)
289
290    def _inherit_execute_result(self, execute_result, root_desc):
291        module_name = root_desc.source.module_name
292        execute_result_name = "%s.xml" % module_name
293        history_execute_result = self._get_history_execute_result(
294            execute_result_name)
295        if not history_execute_result:
296            LOG.warning("%s no history execute result exists",
297                        execute_result_name)
298            return execute_result
299
300        if not check_mode(ModeType.decc):
301            if not os.path.exists(execute_result):
302                result_dir = \
303                    os.path.join(self.task.config.report_path, "result")
304                os.makedirs(result_dir, exist_ok=True)
305                target_execute_result = os.path.join(result_dir,
306                                                     execute_result_name)
307                shutil.copyfile(history_execute_result, target_execute_result)
308                LOG.info("copy %s to %s" % (history_execute_result,
309                                            target_execute_result))
310                return target_execute_result
311
312        real_execute_result = self._get_real_execute_result(execute_result)
313
314        # inherit history execute result
315        testsuites_element = DataHelper.parse_data_report(real_execute_result)
316        if self._is_empty_report(testsuites_element):
317            if check_mode(ModeType.decc):
318                LOG.info("empty report no need to inherit history execute"
319                         " result")
320            else:
321                LOG.info("empty report '%s' no need to inherit history execute"
322                         " result", history_execute_result)
323            return execute_result
324
325        real_history_execute_result = self._get_real_history_execute_result(
326            history_execute_result, module_name)
327
328        history_testsuites_element = DataHelper.parse_data_report(
329            real_history_execute_result)
330        if self._is_empty_report(history_testsuites_element):
331            LOG.info("history report '%s' is empty", history_execute_result)
332            return execute_result
333        if check_mode(ModeType.decc):
334            LOG.info("inherit history execute result")
335        else:
336            LOG.info("inherit history execute result: %s",
337                     history_execute_result)
338        self._inherit_element(history_testsuites_element, testsuites_element)
339
340        if check_mode(ModeType.decc):
341            from xdevice import SuiteReporter
342            SuiteReporter.append_report_result(
343                (execute_result, DataHelper.to_string(testsuites_element)))
344        else:
345            # generate inherit execute result
346            DataHelper.generate_report(testsuites_element, execute_result)
347        return execute_result
348
349    def _inherit_element(self, history_testsuites_element, testsuites_element):
350        for history_testsuite_element in history_testsuites_element:
351            history_testsuite_name = history_testsuite_element.get("name", "")
352            target_testsuite_element = None
353            for testsuite_element in testsuites_element:
354                if history_testsuite_name == testsuite_element.get("name", ""):
355                    target_testsuite_element = testsuite_element
356                    break
357
358            if target_testsuite_element is None:
359                testsuites_element.append(history_testsuite_element)
360                inherited_test = int(testsuites_element.get(
361                    ReportConstant.tests, 0)) + int(
362                    history_testsuite_element.get(ReportConstant.tests, 0))
363                testsuites_element.set(ReportConstant.tests,
364                                       str(inherited_test))
365                continue
366
367            pass_num = 0
368            for history_testcase_element in history_testsuite_element:
369                if self._check_testcase_pass(history_testcase_element):
370                    target_testsuite_element.append(history_testcase_element)
371                    pass_num += 1
372
373            inherited_test = int(target_testsuite_element.get(
374                ReportConstant.tests, 0)) + pass_num
375            target_testsuite_element.set(ReportConstant.tests,
376                                         str(inherited_test))
377            inherited_test = int(testsuites_element.get(
378                ReportConstant.tests, 0)) + pass_num
379            testsuites_element.set(ReportConstant.tests, str(inherited_test))
380
381    def _get_history_execute_result(self, execute_result_name):
382        if execute_result_name.endswith(".xml"):
383            execute_result_name = execute_result_name[:-4]
384        history_execute_result = \
385            self._get_data_report_from_record(execute_result_name)
386        if history_execute_result:
387            return history_execute_result
388        for root_dir, _, files in os.walk(
389                self.task.config.history_report_path):
390            for result_file in files:
391                if result_file.endswith(execute_result_name):
392                    history_execute_result = os.path.abspath(
393                        os.path.join(root_dir, result_file))
394        return history_execute_result
395
396    @classmethod
397    def _check_testcase_pass(cls, history_testcase_element):
398        case = Case()
399        case.result = history_testcase_element.get(ReportConstant.result, "")
400        case.status = history_testcase_element.get(ReportConstant.status, "")
401        case.message = history_testcase_element.get(ReportConstant.message, "")
402        if len(history_testcase_element) > 0:
403            if not case.result:
404                case.result = ReportConstant.false
405            case.message = history_testcase_element[0].get(
406                ReportConstant.message)
407
408        return case.is_passed()
409
410    @classmethod
411    def _is_empty_report(cls, testsuites_element):
412        if len(testsuites_element) < 1:
413            return True
414        if len(testsuites_element) >= 2:
415            return False
416
417        if int(testsuites_element[0].get(ReportConstant.unavailable, 0)) > 0:
418            return True
419        return False
420
421    def _get_data_report_from_record(self, execute_result_name):
422        history_report_path = \
423            getattr(self.task.config, "history_report_path", "")
424        if history_report_path:
425            from _core.report.result_reporter import ResultReporter
426            params = ResultReporter.get_task_info_params(history_report_path)
427            if params:
428                report_data_dict = dict(params[4])
429                if execute_result_name in report_data_dict.keys():
430                    return report_data_dict.get(execute_result_name)
431                elif execute_result_name.split(".")[0] in \
432                        report_data_dict.keys():
433                    return report_data_dict.get(
434                        execute_result_name.split(".")[0])
435        return ""
436
437    @classmethod
438    def _get_real_execute_result(cls, execute_result):
439        from xdevice import SuiteReporter
440        LOG.debug("get_real_execute_result length is: %s" %
441                  len(SuiteReporter.get_report_result()))
442        if check_mode(ModeType.decc):
443            for suite_report, report_result in \
444                    SuiteReporter.get_report_result():
445                if os.path.splitext(suite_report)[0] == \
446                        os.path.splitext(execute_result)[0]:
447                    return report_result
448            return ""
449        else:
450            return execute_result
451
452    @classmethod
453    def _get_real_history_execute_result(cls, history_execute_result,
454                                         module_name):
455        from xdevice import SuiteReporter
456        LOG.debug("get_real_history_execute_result: %s" %
457                  SuiteReporter.history_report_result)
458        if check_mode(ModeType.decc):
459            virtual_report_path, report_result = SuiteReporter. \
460                get_history_result_by_module(module_name)
461            return report_result
462        else:
463            return history_execute_result
464
465
466class QueueMonitorThread(threading.Thread):
467
468    def __init__(self, message_queue, current_driver_threads, test_drivers):
469        threading.Thread.__init__(self)
470        self.message_queue = message_queue
471        self.current_driver_threads = current_driver_threads
472        self.test_drivers = test_drivers
473
474    def run(self):
475        from xdevice import Scheduler
476        LOG.debug("queue monitor thread start")
477        while self.test_drivers or self.current_driver_threads:
478            if not self.current_driver_threads:
479                time.sleep(3)
480                continue
481            execute_message = self.message_queue.get()
482
483            self.current_driver_threads.pop(execute_message.get_thread_id())
484
485            if execute_message.get_state() == ExecuteMessage.DEVICE_FINISH:
486                LOG.debug("thread id: %s execute finished" %
487                          execute_message.get_thread_id())
488            elif execute_message.get_state() == ExecuteMessage.DEVICE_ERROR:
489                LOG.debug("thread id: %s execute error" %
490                          execute_message.get_thread_id())
491
492            if Scheduler.upload_address:
493                Scheduler.upload_module_result(execute_message)
494
495        LOG.debug("queue monitor thread end")
496        if not Scheduler.is_execute:
497            LOG.info("terminate success")
498            Scheduler.terminate_result.put("terminate success")
499
500
501class ExecuteMessage:
502    DEVICE_RUN = 'device_run'
503    DEVICE_FINISH = 'device_finish'
504    DEVICE_ERROR = 'device_error'
505
506    def __init__(self, state, environment, drivers, thread_id):
507        self.state = state
508        self.environment = environment
509        self.drivers = drivers
510        self.thread_id = thread_id
511        self.request = None
512        self.result = None
513
514    def set_state(self, state):
515        self.state = state
516
517    def get_state(self):
518        return self.state
519
520    def set_request(self, request):
521        self.request = request
522
523    def get_request(self):
524        return self.request
525
526    def set_result(self, result):
527        self.result = result
528
529    def get_result(self):
530        return self.result
531
532    def get_environment(self):
533        return self.environment
534
535    def get_thread_id(self):
536        return self.thread_id
537
538    def get_drivers(self):
539        return self.drivers
540