#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2020-2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import queue
import time
import uuid
import copy
from abc import ABC
from abc import abstractmethod
from typing import List

from _core.error import ErrorMessage
from _core.exception import ExecuteTerminate
from _core.exception import LiteDeviceError
from _core.exception import DeviceError
from _core.context.abs import Sub
from _core.executor.concurrent import QueueMonitorThread
from _core.logger import platform_logger
from _core.constants import ModeType

from _core.utils import convert_mac
from _core.interface import LifeCycle
from _core.constants import ConfigConst
from _core.plugin import get_plugin
from _core.exception import ParamError
from _core.plugin import Plugin
from _core.constants import ListenerType
from _core.context.result import ExecuteFinished
from _core.constants import TestExecType
from _core.context.center import Context
from _core.context.handler import report_not_executed
from _core.context.life_stage import ILifeStageListener
from _core.context.life_stage import StageEvent

LOG = platform_logger("Impl")

__all__ = ["BaseScheduler"]


class BaseScheduler(Sub, ABC):
    """Abstract base for schedulers that execute the test drivers of a task.

    Concrete subclasses must implement ``_terminate``. Helpers that are used
    here but not defined in this class (``is_need_auto_retry``,
    ``is_executing``, ``set_repeat_index``, ``generate_task_report``,
    ``_do_execute_``, ``_on_execute_finished_``, ``_reset_environment``) are
    presumably provided by ``Sub`` or by subclasses -- confirm there.
    """

    # Remaining automatic retries; a negative value means "not configured".
    _auto_retry = -1
    # Most recently started queue monitor thread, or None before the first run.
    _queue_monitor_thread = None
    # Command queue obtained from the global context.
    _channel = Context.command_queue()
    # Number of test drivers of the task being executed.
    test_number = 0
    # Life-stage listeners; stored on the class, so shared by all schedulers.
    _stage_listeners: List[ILifeStageListener] = []

    @classmethod
    def add_life_stage_listener(cls, listener: ILifeStageListener):
        """Register a listener that will receive life-stage events."""
        cls._stage_listeners.append(listener)

    @classmethod
    def notify_stage(cls, stage_event: StageEvent):
        """Deliver ``stage_event`` to every registered life-stage listener."""
        for listener in cls._stage_listeners:
            listener.__on_event__(stage_event)

    @classmethod
    def remove_life_stage_listener(cls):
        """Remove all registered life-stage listeners."""
        cls._stage_listeners.clear()

    @classmethod
    def __max_command_size__(cls) -> int:
        """Return the maximum number of entries kept in the command queue."""
        return 50

    @classmethod
    def _start_auto_retry(cls):
        """Consume one automatic retry and re-run the failed cases if allowed.

        When no retry is needed the counter is reset to -1. Otherwise, while
        the counter is positive, it is decremented (and reset to -1 once it
        reaches zero) and a ``run --retry`` command is issued.
        """
        if not cls.is_need_auto_retry():
            cls._auto_retry = -1
            LOG.debug("No need auto retry")
            return
        if cls._auto_retry > 0:
            cls._auto_retry -= 1
            if cls._auto_retry == 0:
                cls._auto_retry = -1
            # Imported locally, as in the original code (presumably to avoid
            # a circular import -- confirm).
            from _core.command.console import Console
            console = Console()
            console.command_parser("run --retry")

    @classmethod
    def _check_auto_retry(cls, options):
        """Initialise the retry counter from ``options`` if it is not set yet.

        The configured value is capped at 10 retries.
        """
        if cls._auto_retry >= 0:
            return
        value = int(getattr(options, ConfigConst.auto_retry, 0))
        if value > 0:
            cls._auto_retry = min(value, 10)

    @classmethod
    def _handler_repeat(cls, task) -> list:
        """Return the task's test drivers duplicated ``task.config.repeat`` times."""
        drivers_list = []
        for index in range(1, task.config.repeat + 1):
            drivers_list.extend(cls._construct_repeat_list(task, index))
        return drivers_list

    @classmethod
    def _construct_repeat_list(cls, task, index):
        """Deep-copy every test driver and suffix its unique id with ``index``.

        Each entry of ``task.test_drivers`` is indexed with ``[1]`` to reach
        its descriptor, whose ``unique_id`` becomes ``"<unique_id>_<index>"``.
        """
        repeat_list = []
        for test_driver in task.test_drivers:
            cur_test_driver = copy.deepcopy(test_driver)
            desc = cur_test_driver[1]
            desc.unique_id = '{}_{}'.format(desc.unique_id, index)
            repeat_list.append(cur_test_driver)
        return repeat_list

    def __execute__(self, task):
        """Run ``task`` and always notify listeners when it ends.

        The last queued command is re-stored together with a fresh task id
        and the report path. The listed exception types are logged rather
        than propagated; the ``finally`` part reports the task result to the
        listeners and calls ``_on_execute_finished_``.
        """
        if not self._channel.is_empty():
            task_id = str(uuid.uuid1()).split("-")[0]
            LOG.debug("Run command: {}".format(convert_mac(self._channel.get_last())))
            run_command = self._channel.pop()
            self._channel.append((task_id, run_command, task.config.report_path))
            # Keep the command history bounded by dropping the oldest entry.
            if self._channel.size() > self.__max_command_size__():
                self._channel.pop(0)

        unavailable = 0
        err_msg = ""
        try:
            unavailable, err_msg = self._check_task(task)
            if unavailable:
                error_message = ErrorMessage.Common.Code_0101014.format(err_msg)
                LOG.error("Exec task error: {}".format(error_message))
                raise ParamError(error_message)
            self._prepare_environment(task)
            repeat = getattr(task.config, ConfigConst.repeat, 1)
            if repeat > 1:
                self.set_repeat_index(repeat)
                task.test_drivers = self._handler_repeat(task)
            else:
                self.set_repeat_index(1)
            self.test_number = len(task.test_drivers)
            self._do_execute_(task)
        except (ParamError, ValueError, TypeError, SyntaxError, AttributeError,
                DeviceError, LiteDeviceError, ExecuteTerminate) as exception:
            error_no = getattr(exception, "error_no", "")
            err_msg = "%s[%s]" % (str(exception), error_no) if error_no else str(exception)
            error_no = error_no if error_no else "00000"
            LOG.exception(exception, exc_info=True, error_no=error_no)
        finally:
            task_info = self.generate_task_report(task)
            listeners = self.__create_listeners__(task)
            for listener in listeners:
                listener.__ended__(LifeCycle.TestTask, task_info,
                                   test_type=task_info.test_type, task=task)
            finished = ExecuteFinished(unavailable, err_msg)
            self._on_execute_finished_(task, finished)

    def run_in_loop(self, task, run_func, loop_finally=None):
        """Dispatch the task's test drivers one by one via ``run_func``.

        Args:
            task: task whose ``test_drivers`` list is consumed in place.
            run_func: callable invoked as ``run_func(task, test_drivers,
                current_driver_threads, message_queue)`` for each iteration.
            loop_finally: optional callable that is always invoked at the end.
        """
        try:
            current_driver_threads = {}
            test_drivers = task.test_drivers
            message_queue = queue.Queue()
            # execute test drivers
            params = message_queue, test_drivers, current_driver_threads
            self._queue_monitor_thread = self._start_queue_monitor(*params)
            while test_drivers:
                # Throttle: wait while more than 5 driver threads are tracked.
                if len(current_driver_threads) > 5:
                    time.sleep(3)
                    continue
                # clear remaining test drivers when scheduler is terminated
                if not self.is_executing():
                    LOG.info("Clear test drivers")
                    self._clear_not_executed(task, test_drivers)
                    break
                # get test driver and device
                self._run(run_func, task, *params)
                # handle the monitor thread: restart it if it has stopped
                self.peek_monitor(*params)
                test_drivers.pop(0)
            # Wait until the (current) monitor thread has finished.
            while self._queue_monitor_thread.is_alive():
                time.sleep(3)
        finally:
            if callable(loop_finally):
                loop_finally()

    def is_monitor_alive(self):
        """Return True if a queue monitor thread exists and is still running."""
        monitor = self._queue_monitor_thread
        return monitor is not None and monitor.is_alive()

    def peek_monitor(self, message_queue, test_drivers, current_driver_threads):
        """Restart the queue monitor thread if it is no longer alive.

        The new thread is stored on ``self._queue_monitor_thread`` so that
        later liveness checks and the final wait in ``run_in_loop`` observe
        the running monitor instead of the dead one; otherwise every call
        would start an additional monitor thread.
        """
        if self.is_monitor_alive():
            return
        self._queue_monitor_thread = self._start_queue_monitor(
            message_queue, test_drivers, current_driver_threads)

    @classmethod
    def _clear_not_executed(cls, task, test_drivers):
        """Drop all pending test drivers; in DECC mode report them first."""
        if Context.session().mode != ModeType.decc:
            # clear all
            test_drivers.clear()
            return
        # The result is reported only in DECC mode, and also clear all.
        LOG.error("Case no run: task execution terminated!", error_no="00300")
        error_message = "Execute Terminate[00300]"
        report_not_executed(task.config.report_path, test_drivers, error_message)
        test_drivers.clear()

    def _run(self, run_func, task, message_queue, test_drivers, current_driver_threads):
        """Invoke ``run_func`` with the loop state if it is callable."""
        if callable(run_func):
            run_func(task, test_drivers, current_driver_threads, message_queue)

    @staticmethod
    def _start_queue_monitor(message_queue, test_drivers,
                             current_driver_threads):
        """Create, start and return a daemon ``QueueMonitorThread``."""
        queue_monitor_thread = QueueMonitorThread(message_queue,
                                                  current_driver_threads,
                                                  test_drivers)
        queue_monitor_thread.daemon = True
        queue_monitor_thread.start()
        return queue_monitor_thread

    def _on_task_prepare_(self, options):
        """Hook run before a task starts: set up the auto-retry counter."""
        self._check_auto_retry(options)

    def _on_task_finished_(self):
        """Hook run after a task ends: trigger auto retry and stop runtime logs."""
        from _core.context.log import RuntimeLogs
        self._start_auto_retry()
        RuntimeLogs.stop_task_logcat()
        RuntimeLogs.stop_encrypt_log()

    @classmethod
    def __create_listeners__(cls, task) -> list:
        """Build fresh log, report and upload listener instances for ``task``.

        A new instance is created from the class of every registered plugin;
        report listeners additionally receive the task's report path.
        """
        listeners = []
        # append log listeners
        log_listeners = get_plugin(Plugin.LISTENER, ListenerType.log)
        for log_listener in log_listeners:
            log_listener_instance = log_listener.__class__()
            listeners.append(log_listener_instance)
        # append report listeners
        report_listeners = get_plugin(Plugin.LISTENER, ListenerType.report)
        for report_listener in report_listeners:
            report_listener_instance = report_listener.__class__()
            setattr(report_listener_instance, "report_path",
                    task.config.report_path)
            listeners.append(report_listener_instance)
        # append upload listeners
        upload_listeners = get_plugin(Plugin.LISTENER, ListenerType.upload)
        for upload_listener in upload_listeners:
            upload_listener_instance = upload_listener.__class__()
            listeners.append(upload_listener_instance)
        return listeners

    @classmethod
    def _exec_type_(cls) -> list:
        """Return the test execution types listed for this scheduler."""
        return [TestExecType.device_test, TestExecType.host_test, TestExecType.host_driven_test]

    @classmethod
    def _check_task(cls, task):
        """Collect errors of the task's root children.

        Returns:
            A tuple ``(unavailable, message)``: the number of children that
            carry an error and their error messages joined with ``";"``.
        """
        error_items = []
        unavailable = 0
        for des in task.root.children:
            if des.error:
                error_items.append(des.error.error_msg)
                unavailable += 1
        return unavailable, ";".join(error_items)

    def _prepare_environment(self, task):
        """Reset the environment from the task's test environment or config file.

        The test environment setting takes precedence over the config file.
        """
        if getattr(task.config, ConfigConst.test_environment, ""):
            self._reset_environment(task.config.get(
                ConfigConst.test_environment, ""))
        elif getattr(task.config, ConfigConst.configfile, ""):
            self._reset_environment(config_file=task.config.get(
                ConfigConst.configfile, ""))

    @classmethod
    def _call_terminate(cls):
        """Reset repeat and retry state, then delegate to ``_terminate``."""
        cls.set_repeat_index(0)
        # 0 (not -1) leaves the counter non-positive without marking it as
        # "unset", so _start_auto_retry issues no further retry.
        cls._auto_retry = 0
        return cls._terminate()

    @classmethod
    @abstractmethod
    def _terminate(cls):
        """Terminate the running execution; must be implemented by subclasses."""
        pass