1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2020-2023 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19from _core.logger import platform_logger 20 21from _core.context.single import SingleType 22from _core.context.service import ContextImpl 23from _core.context.channel import CommandQueue 24 25__all__ = ['Context'] 26 27LOG = platform_logger("Center") 28 29 30class CompatibleSession: 31 upload_address: str = "" 32 task_type: str = "" 33 task_name: str = "" 34 mode: str = "" # 当前模式 35 proxy = None # 外部网络 36 device_labels = [] 37 38 39def check_params(param): 40 attrs = ["upload_address", "task_type", "task_name", "mode", "device_labels"] 41 session = None 42 for attr in attrs: 43 result = getattr(param, attr, None) 44 if not result: 45 continue 46 session = CompatibleSession() 47 session.upload_address = param.upload_address 48 session.task_type = param.task_type 49 session.task_name = param.task_name 50 session.mode = param.mode 51 session.proxy = param.proxy 52 session.device_labels = param.device_labels 53 break 54 return session 55 56 57class Context(metaclass=SingleType): 58 _session = None 59 60 @staticmethod 61 def session(): 62 if Context._session: 63 return Context._session 64 from xdevice import Scheduler 65 result = check_params(Scheduler) 66 if result: 67 Context._session = result 68 else: 69 Context._session = ContextImpl().get_session_info() 70 return Context._session 71 72 @staticmethod 73 def command_queue(): 74 return CommandQueue() 75 76 @staticmethod 77 def is_executing(): 78 if not ContextImpl().has_attr("xDevice"): 79 return True 80 return ContextImpl().read_attr("xDevice") 81 82 @staticmethod 83 def set_execute_status(status: bool): 84 ContextImpl().write_attr("xDevice", status) 85 86 @staticmethod 87 def is_task_executing(): 88 if Context.get_scheduler(): 89 return Context.get_scheduler().is_executing() 90 return False 91 92 @staticmethod 93 def get_scheduler(): 94 if ContextImpl().get_active_state(): 95 scheduler = ContextImpl().get_active_state().get_active_scheduler() 96 if scheduler: 97 return scheduler 98 LOG.warning("Scheduler unloaded") 99 return None 100 101 @staticmethod 102 def terminate_cmd_exec(): 103 ContextImpl().write_attr("xDevice", False) 104 if Context.get_scheduler(): 105 return Context.get_scheduler().terminate_cmd_exec 106 return None 107