• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2023 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import queue
20import time
21import uuid
22import copy
23from abc import ABC
24from abc import abstractmethod
25from typing import List
26
27from _core.error import ErrorMessage
28from _core.exception import ExecuteTerminate
29from _core.exception import LiteDeviceError
30from _core.exception import DeviceError
31from _core.context.abs import Sub
32from _core.executor.concurrent import QueueMonitorThread
33from _core.logger import platform_logger
34from _core.constants import ModeType
35
36from _core.utils import convert_mac
37from _core.interface import LifeCycle
38from _core.constants import ConfigConst
39from _core.plugin import get_plugin
40from _core.exception import ParamError
41from _core.plugin import Plugin
42from _core.constants import ListenerType
43from _core.context.result import ExecuteFinished
44from _core.constants import TestExecType
45from _core.context.center import Context
46from _core.context.handler import report_not_executed
47from _core.context.life_stage import ILifeStageListener
48from _core.context.life_stage import StageEvent
49
50LOG = platform_logger("Impl")
51
52__all__ = ["BaseScheduler"]
53
54
55class BaseScheduler(Sub, ABC):
56    _auto_retry = -1
57    _queue_monitor_thread = None
58    _channel = Context.command_queue()
59    test_number = 0
60    _stage_listeners: List[ILifeStageListener] = []
61
62    @classmethod
63    def add_life_stage_listener(cls, listener: ILifeStageListener):
64        cls._stage_listeners.append(listener)
65
66    @classmethod
67    def notify_stage(cls, stage_event: StageEvent):
68        for listener in cls._stage_listeners:
69            listener.__on_event__(stage_event)
70
71    @classmethod
72    def remove_life_stage_listener(cls):
73        cls._stage_listeners.clear()
74
75    @classmethod
76    def __max_command_size__(cls) -> int:
77        return 50
78
79    @classmethod
80    def _start_auto_retry(cls):
81        if not cls.is_need_auto_retry():
82            cls._auto_retry = -1
83            LOG.debug("No need auto retry")
84            return
85        if cls._auto_retry > 0:
86            cls._auto_retry -= 1
87            if cls._auto_retry == 0:
88                cls._auto_retry = -1
89            from _core.command.console import Console
90            console = Console()
91            console.command_parser("run --retry")
92
93    @classmethod
94    def _check_auto_retry(cls, options):
95        if cls._auto_retry < 0 and \
96                int(getattr(options, ConfigConst.auto_retry, 0)) > 0:
97            value = int(getattr(options, ConfigConst.auto_retry, 0))
98            cls._auto_retry = value if value <= 10 else 10
99
100    @classmethod
101    def _handler_repeat(cls, task) -> list:
102        drivers_list = list()
103        for index in range(1, task.config.repeat + 1):
104            repeat_list = cls._construct_repeat_list(task, index)
105            if repeat_list:
106                drivers_list.extend(repeat_list)
107        return drivers_list
108
109    @classmethod
110    def _construct_repeat_list(cls, task, index):
111        repeat_list = list()
112        for driver_index, _ in enumerate(task.test_drivers):
113            cur_test_driver = copy.deepcopy(task.test_drivers[driver_index])
114            desc = cur_test_driver[1]
115            desc.unique_id = '{}_{}'.format(desc.unique_id, index)
116            repeat_list.append(cur_test_driver)
117        return repeat_list
118
119    def __execute__(self, task):
120        if not self._channel.is_empty():
121            task_id = str(uuid.uuid1()).split("-")[0]
122            LOG.debug("Run command: {}".format(convert_mac(self._channel.get_last())))
123            run_command = self._channel.pop()
124            self._channel.append((task_id, run_command, task.config.report_path))
125            if self._channel.size() > self.__max_command_size__():
126                self._channel.pop(0)
127
128        unavailable = 0
129        err_msg = ""
130        try:
131            unavailable, err_msg = self._check_task(task)
132            if unavailable:
133                error_message = ErrorMessage.Common.Code_0101014.format(err_msg)
134                LOG.error("Exec task error: {}".format(error_message))
135                raise ParamError(error_message)
136            self._prepare_environment(task)
137            repeat = getattr(task.config, ConfigConst.repeat, 1)
138            if repeat > 1:
139                self.set_repeat_index(repeat)
140                task.test_drivers = self._handler_repeat(task)
141            else:
142                self.set_repeat_index(1)
143            self.test_number = len(task.test_drivers)
144            self._do_execute_(task)
145        except (ParamError, ValueError, TypeError, SyntaxError, AttributeError,
146                DeviceError, LiteDeviceError, ExecuteTerminate) as exception:
147            error_no = getattr(exception, "error_no", "")
148            err_msg = "%s[%s]" % (str(exception), error_no) if error_no else str(exception)
149            error_no = error_no if error_no else "00000"
150            LOG.exception(exception, exc_info=True, error_no=error_no)
151        finally:
152            task_info = self.generate_task_report(task)
153            listeners = self.__create_listeners__(task)
154            for listener in listeners:
155                listener.__ended__(LifeCycle.TestTask, task_info,
156                                   test_type=task_info.test_type, task=task)
157            finished = ExecuteFinished(unavailable, err_msg)
158            self._on_execute_finished_(task, finished)
159
160    def run_in_loop(self, task, run_func, loop_finally=None):
161        try:
162            current_driver_threads = {}
163            test_drivers = task.test_drivers
164            message_queue = queue.Queue()
165            # execute test drivers
166            params = message_queue, test_drivers, current_driver_threads
167            self._queue_monitor_thread = self._start_queue_monitor(*params)
168            while test_drivers:
169                if len(current_driver_threads) > 5:
170                    time.sleep(3)
171                    continue
172                # clear remaining test drivers when scheduler is terminated
173                if not self.is_executing():
174                    LOG.info("Clear test drivers")
175                    self._clear_not_executed(task, test_drivers)
176                    break
177                # 处理监控线程
178                # get test driver and device
179                self._run(run_func, task, *params)
180                self.peek_monitor(*params)
181                test_drivers.pop(0)
182            while True:
183                if not self._queue_monitor_thread.is_alive():
184                    break
185                time.sleep(3)
186        finally:
187            if callable(loop_finally):
188                loop_finally()
189
190    def is_monitor_alive(self):
191        return self._queue_monitor_thread and self._queue_monitor_thread.is_alive()
192
193    def peek_monitor(self, message_queue, test_drivers, current_driver_threads):
194        if self.is_monitor_alive():
195            return
196        self._start_queue_monitor(message_queue, test_drivers, current_driver_threads)
197
198    @classmethod
199    def _clear_not_executed(cls, task, test_drivers):
200        if Context.session().mode != ModeType.decc:
201            # clear all
202            test_drivers.clear()
203            return
204        # The result is reported only in DECC mode, and also clear all.
205        LOG.error("Case no run: task execution terminated!", error_no="00300")
206        error_message = "Execute Terminate[00300]"
207        report_not_executed(task.config.report_path, test_drivers, error_message)
208        test_drivers.clear()
209
210    def _run(self, run_func, task, message_queue, test_drivers, current_driver_threads):
211        if callable(run_func):
212            run_func(task, test_drivers, current_driver_threads, message_queue)
213
214    @staticmethod
215    def _start_queue_monitor(message_queue, test_drivers,
216                             current_driver_threads):
217        queue_monitor_thread = QueueMonitorThread(message_queue,
218                                                  current_driver_threads,
219                                                  test_drivers)
220        queue_monitor_thread.daemon = True
221        queue_monitor_thread.start()
222        return queue_monitor_thread
223
224    def _on_task_prepare_(self, options):
225        self._check_auto_retry(options)
226
227    def _on_task_finished_(self):
228        from _core.context.log import RuntimeLogs
229        self._start_auto_retry()
230        RuntimeLogs.stop_task_logcat()
231        RuntimeLogs.stop_encrypt_log()
232
233    @classmethod
234    def __create_listeners__(cls, task) -> list:
235        listeners = []
236        # append log listeners
237        log_listeners = get_plugin(Plugin.LISTENER, ListenerType.log)
238        for log_listener in log_listeners:
239            log_listener_instance = log_listener.__class__()
240            listeners.append(log_listener_instance)
241        # append report listeners
242        report_listeners = get_plugin(Plugin.LISTENER, ListenerType.report)
243        for report_listener in report_listeners:
244            report_listener_instance = report_listener.__class__()
245            setattr(report_listener_instance, "report_path",
246                    task.config.report_path)
247            listeners.append(report_listener_instance)
248        # append upload listeners
249        upload_listeners = get_plugin(Plugin.LISTENER, ListenerType.upload)
250        for upload_listener in upload_listeners:
251            upload_listener_instance = upload_listener.__class__()
252            listeners.append(upload_listener_instance)
253        return listeners
254
255    @classmethod
256    def _exec_type_(cls) -> list:
257        return [TestExecType.device_test, TestExecType.host_test, TestExecType.host_driven_test]
258
259    @classmethod
260    def _check_task(cls, task):
261        error_items = []
262        unavailable = 0
263        for des in task.root.children:
264            if des.error:
265                error_items.append(des.error.error_msg)
266                unavailable += 1
267        return unavailable, ";".join(error_items)
268
269    def _prepare_environment(self, task):
270        if getattr(task.config, ConfigConst.test_environment, ""):
271            self._reset_environment(task.config.get(
272                ConfigConst.test_environment, ""))
273        elif getattr(task.config, ConfigConst.configfile, ""):
274            self._reset_environment(config_file=task.config.get(
275                ConfigConst.configfile, ""))
276
277    @classmethod
278    def _call_terminate(cls):
279        cls.set_repeat_index(0)
280        cls._auto_retry = 0
281        return cls._terminate()
282
283    @classmethod
284    @abstractmethod
285    def _terminate(cls):
286        pass
287