#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
import os
import sys
import time
import threading
import queue
from logging.handlers import RotatingFileHandler

from _core.constants import LogMode
from _core.constants import LogType
from _core.plugin import Plugin
from _core.plugin import get_plugin
from _core.exception import ParamError


__all__ = ["Log", "platform_logger", "device_logger", "shutdown",
           "add_task_file_handler", "remove_task_file_handler",
           "change_logger_level",
           "add_encrypt_file_handler", "remove_encrypt_file_handler",
           "redirect_driver_log_begin", "redirect_driver_log_end"]

_HANDLERS = []
_LOGGERS = []
MAX_LOG_LENGTH = 20 * 1024 * 1024
MAX_ENCRYPT_LOG_LENGTH = 5 * 1024 * 1024
MAX_LOG_NUMS = 1000
MAX_LOG_CACHE_SIZE = 10


def _new_file_handler(log_file, log_level, mode="a"):
    from xdevice import Variables
    handler = RotatingFileHandler(log_file, mode=mode, maxBytes=MAX_LOG_LENGTH,
                                  backupCount=MAX_LOG_NUMS, encoding="utf-8")
    handler.setFormatter(logging.Formatter(Variables.report_vars.log_format))
    handler.setLevel(log_level)
    return handler


def _query_log_level():
    log_level = getattr(sys, "log_level", logging.INFO) if hasattr(
        sys, "log_level") else logging.DEBUG
    return log_level


class DriverLogFilter(logging.Filter):

    def __init__(self, thread_id):
        super().__init__()
        self.thread_id = thread_id

    def filter(self, record):
        return record.thread == self.thread_id


class SchedulerLogFilter(logging.Filter):

    def __init__(self):
        super().__init__()
        self.driver_thread_ids = []

    def filter(self, record):
        return record.thread not in self.driver_thread_ids

    def add_driver_thread_id(self, thread_id):
        self.driver_thread_ids.append(thread_id)

    def del_driver_thread_id(self, thread_id):
        self.driver_thread_ids.remove(thread_id)


class Log:
    task_file_handler = None

    def __init__(self):
        self.level = logging.INFO
        self.handlers = []
        self.loggers = {}
        self.task_file_filter = None
        self.task_file_handler = None
        self.encrypt_file_handler = None
        self.driver_log_handler = {}
        self._lock = threading.Lock()

    def __initial__(self, log_handler_flag, log_file=None, level=None,
                    log_format=None):
        if _LOGGERS:
            return

        self.handlers = []
        if log_file and "console" in log_handler_flag:
            file_handler = RotatingFileHandler(
                log_file, mode="a", maxBytes=MAX_LOG_LENGTH,
                backupCount=MAX_LOG_NUMS, encoding="UTF-8")
            file_handler.setFormatter(logging.Formatter(log_format))
            self.handlers.append(file_handler)
        if "console" in log_handler_flag \
                and getattr(sys, LogMode.name, LogMode.default) != LogMode.no_console:
            stream_handler = logging.StreamHandler(sys.stdout)
            stream_handler.setFormatter(logging.Formatter(log_format))
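            # keep the console handler so every FrameworkLog created later
            # gets it attached as a platform handler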
            self.handlers.append(stream_handler)

        if level:
            self.level = level
        self.loggers = {}
        self.task_file_handler = None
        _HANDLERS.extend(self.handlers)

    def set_level(self, level):
        self.level = level

    def __logger__(self, name=None):
        if not name:
            return _init_global_logger(name)
        elif name in self.loggers:
            return self.loggers.get(name)
        else:
            log = self.loggers.setdefault(name, FrameworkLog(name))
            _LOGGERS.append(log)
            log.add_platform_level(self.level)
            for handler in self.handlers:
                log.add_platform_handler(handler)
            if self.task_file_handler:
                log.add_task_log(self.task_file_handler)
            return log

    def add_driver_log_handler(self, thread_id, log_file):
        log_dir = os.path.dirname(log_file)
        if not os.path.exists(log_dir):
            os.makedirs(log_dir)
        try:
            self._lock.acquire()
            # 1. write only the logs of the current driver thread to the file
            log_level = _query_log_level()
            handler = _new_file_handler(log_file, log_level)
            handler.addFilter(DriverLogFilter(thread_id))
            # 2. register the driver thread's handler so that newly created
            #    logger objects can use it
            self.handlers.append(handler)
            # 3. attach the driver thread's handler to the existing loggers
            for _, log in self.loggers.items():
                log.add_task_log(handler)
            # 4. record the current thread id in the scheduler log filter so
            #    its records are kept out of the scheduler log
            if self.task_file_filter is not None \
                    and isinstance(self.task_file_filter, SchedulerLogFilter):
                self.task_file_filter.add_driver_thread_id(thread_id)
            self.driver_log_handler.setdefault(thread_id, handler)
        finally:
            self._lock.release()

    def del_driver_log_handler(self, thread_id):
        try:
            self._lock.acquire()
            # drop the current thread id from the scheduler log filter
            if self.task_file_filter is not None \
                    and isinstance(self.task_file_filter, SchedulerLogFilter):
                self.task_file_filter.del_driver_thread_id(thread_id)
            if thread_id in self.driver_log_handler.keys():
                # close the driver thread's log handler
                handler = self.driver_log_handler.pop(thread_id)
                handler.close()
                # remove the driver thread's log handler
                self.handlers.remove(handler)
                # detach the driver thread's handler from the existing loggers
                for _, log in self.loggers.items():
                    log.remove_task_log(handler, False)
        finally:
            self._lock.release()

    def add_task_file_handler(self, log_file):
        self.task_file_filter = SchedulerLogFilter()
        log_level = _query_log_level()
        self.task_file_handler = _new_file_handler(log_file, log_level)
        self.task_file_handler.addFilter(self.task_file_filter)
        for _, log in self.loggers.items():
            log.add_task_log(self.task_file_handler)

    def remove_task_file_handler(self):
        if self.task_file_handler is None:
            return
        for _, log in self.loggers.items():
            log.remove_task_log(self.task_file_handler, True)
        self.task_file_handler.close()
        self.task_file_handler = None

    def add_encrypt_file_handler(self, log_file):
        from xdevice import Variables

        file_handler = \
            EncryptFileHandler(log_file, mode="ab",
                               max_bytes=MAX_ENCRYPT_LOG_LENGTH,
                               backup_count=MAX_LOG_NUMS, encoding="utf-8")
        file_handler.setFormatter(logging.Formatter(
            Variables.report_vars.log_format))
        self.encrypt_file_handler = file_handler
        for _, log in self.loggers.items():
            log.add_encrypt_log(self.encrypt_file_handler)

    def remove_encrypt_file_handler(self):
        if self.encrypt_file_handler is None:
            return
        for _, log in self.loggers.items():
            log.remove_encrypt_log(self.encrypt_file_handler)
        self.encrypt_file_handler.close()
        self.encrypt_file_handler = None


class FrameworkLog:

    def __init__(self, name):
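        # a FrameworkLog fans each record out to up to three logging.Logger
        # instances: the platform log (console/framework file), an optional
        # per-task log and an optional encrypted log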
        self.name = name
        self.platform_log = logging.Logger(name)
        self.task_log = None
        self.encrypt_log = None
        self.driver_log = None

    def set_level(self, level):
        # used to change the logger level dynamically; only the level of
        # the platform log is changed
        cache = getattr(self.platform_log, "_cache", None)
        if cache and isinstance(cache, dict):
            cache.clear()
        self.platform_log.setLevel(level)

    def add_platform_handler(self, handler):
        self.platform_log.addHandler(handler)

    def del_platform_handler(self, handler):
        self.platform_log.removeHandler(handler)

    def add_platform_level(self, level):
        self.platform_log.setLevel(level)

    def add_task_log(self, handler):
        if not self.task_log:
            self.task_log = logging.Logger(self.name)
            log_level = _query_log_level()
            self.task_log.setLevel(log_level)
        self.task_log.addHandler(handler)

    def remove_task_log(self, handler, is_destroy=False):
        if not self.task_log:
            return
        self.task_log.removeHandler(handler)
        if is_destroy:
            self.task_log = None

    def add_encrypt_log(self, handler):
        if self.encrypt_log:
            return
        self.encrypt_log = logging.Logger(self.name)
        log_level = _query_log_level()
        self.encrypt_log.setLevel(log_level)
        self.encrypt_log.addHandler(handler)

    def remove_encrypt_log(self, handler):
        if not self.encrypt_log:
            return
        self.encrypt_log.removeHandler(handler)
        self.encrypt_log = None

    def info(self, msg, *args, **kwargs):
        additional_output = self._get_additional_output(**kwargs)
        updated_msg = self._update_msg(additional_output, msg)
        self.platform_log.info(updated_msg, *args)
        if self.task_log:
            self.task_log.info(updated_msg, *args)
        if self.encrypt_log:
            self.encrypt_log.info(updated_msg, *args)

    def debug(self, msg, *args, **kwargs):
        additional_output = self._get_additional_output(**kwargs)
        updated_msg = self._update_msg(additional_output, msg)
        from _core.report.encrypt import check_pub_key_exist
        if not check_pub_key_exist():
            self.platform_log.debug(updated_msg, *args)
            if self.task_log:
                self.task_log.debug(updated_msg, *args)
        else:
            if self.encrypt_log:
                self.encrypt_log.debug(updated_msg, *args)

    def error(self, msg, *args, **kwargs):
        error_no = kwargs.get("error_no", "00000")
        additional_output = self._get_additional_output(error_no, **kwargs)
        updated_msg = self._update_msg(additional_output, msg)

        self.platform_log.error(updated_msg, *args)
        if self.task_log:
            self.task_log.error(updated_msg, *args)
        if self.encrypt_log:
            self.encrypt_log.error(updated_msg, *args)

    def warning(self, msg, *args, **kwargs):
        additional_output = self._get_additional_output(**kwargs)
        updated_msg = self._update_msg(additional_output, msg)

        self.platform_log.warning(updated_msg, *args)
        if self.task_log:
            self.task_log.warning(updated_msg, *args)
        if self.encrypt_log:
            self.encrypt_log.warning(updated_msg, *args)

    def exception(self, msg, *args, **kwargs):
        error_no = kwargs.get("error_no", "00000")
        exc_info = kwargs.get("exc_info", True)
        if exc_info is not True and exc_info is not False:
            exc_info = True
        additional_output = self._get_additional_output(error_no, **kwargs)
        updated_msg = self._update_msg(additional_output, msg)

        self.platform_log.exception(updated_msg, *args, exc_info=exc_info)
        if self.task_log:
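            # mirror the traceback into the per-task log when one is attached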
            self.task_log.exception(updated_msg, *args, exc_info=exc_info)
        if self.encrypt_log:
            self.encrypt_log.exception(updated_msg, *args, exc_info=exc_info)

    @classmethod
    def _update_msg(cls, additional_output, msg):
        msg = "[{}]".format(msg) if msg else msg
        if msg and additional_output:
            msg = "{} [{}]".format(msg, additional_output)
        return msg

    def _get_additional_output(self, error_number=None, **kwargs):
        dict_str = self._get_dict_str(**kwargs)
        if error_number:
            additional_output = "ErrorNo=%s" % error_number
        else:
            return dict_str

        if dict_str:
            additional_output = "%s, %s" % (additional_output, dict_str)
        return additional_output

    @classmethod
    def _get_dict_str(cls, **kwargs):
        dict_str = ""
        for key, value in kwargs.items():
            if key in ["error_no", "exc_info"]:
                continue
            dict_str = "%s%s=%s, " % (dict_str, key, value)
        if dict_str:
            dict_str = dict_str[:-2]
        return dict_str


def platform_logger(name=None):
    plugins = get_plugin(Plugin.LOG, LogType.tool)
    for log_plugin in plugins:
        if log_plugin.get_plugin_config().enabled:
            return log_plugin.__logger__(name)
    return _init_global_logger(name)


def device_logger(name=None):
    plugins = get_plugin(Plugin.LOG, LogType.device)
    for log_plugin in plugins:
        if log_plugin.get_plugin_config().enabled:
            return log_plugin.__logger__(name)
    return _init_global_logger(name)


def shutdown():
    # logging shuts down automatically when the program exits;
    # this function exists for testing.
    for log in _LOGGERS:
        for handler in log.handlers:
            log.removeHandler(handler)
    for handler in _HANDLERS:
        handler.close()
    _HANDLERS.clear()
    _LOGGERS.clear()


def redirect_driver_log_begin(thread_id, log_file):
    plugins = get_plugin(Plugin.LOG, LogType.tool)
    for log_plugin in plugins:
        if log_plugin.get_plugin_config().enabled:
            log_plugin.add_driver_log_handler(thread_id, log_file)


def redirect_driver_log_end(thread_id):
    plugins = get_plugin(Plugin.LOG, LogType.tool)
    for log_plugin in plugins:
        if log_plugin.get_plugin_config().enabled:
            log_plugin.del_driver_log_handler(thread_id)


def add_task_file_handler(log_file=None):
    if log_file is None:
        return
    plugins = get_plugin(Plugin.LOG, LogType.tool)
    for log_plugin in plugins:
        if log_plugin.get_plugin_config().enabled:
            log_plugin.add_task_file_handler(log_file)


def remove_task_file_handler():
    plugins = get_plugin(Plugin.LOG, LogType.tool)
    for log_plugin in plugins:
        if log_plugin.get_plugin_config().enabled:
            log_plugin.remove_task_file_handler()


def add_encrypt_file_handler(log_file=None):
    if log_file is None:
        return
    plugins = get_plugin(Plugin.LOG, LogType.tool)
    for log_plugin in plugins:
        if log_plugin.get_plugin_config().enabled:
            log_plugin.add_encrypt_file_handler(log_file)


def remove_encrypt_file_handler():
    plugins = get_plugin(Plugin.LOG, LogType.tool)
    for log_plugin in plugins:
        if log_plugin.get_plugin_config().enabled:
            log_plugin.remove_encrypt_file_handler()


def _init_global_logger(name=None):
    handler = logging.StreamHandler(sys.stdout)
    log_format = \
        "[%(asctime)s] [%(thread)d] [%(name)s] [%(levelname)s] [%(message)s]"
    handler.setFormatter(logging.Formatter(log_format))
    log = FrameworkLog(name)
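    # fallback logger used when no log plugin is enabled: stdout only, INFO level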
    log.platform_log.setLevel(logging.INFO)
    log.platform_log.addHandler(handler)
    return log


def change_logger_level(level_dict):
    level_map = {"debug": logging.DEBUG, "info": logging.INFO}
    if "console" in level_dict.keys():
        level = level_dict["console"]
        if not level:
            return
        if str(level).lower() in level_map.keys():
            logger_level = level_map.get(str(level).lower(), logging.INFO)

            # change the level of loggers that are yet to be instantiated.
            # Actually, this changes the level attribute in ToolLog, which
            # is used when instantiating the FrameworkLog object.
            plugins = get_plugin(Plugin.LOG, LogType.tool)
            for log_plugin in plugins:
                log_plugin.set_level(logger_level)
            # change the level of loggers that have been instantiated
            for logger in _LOGGERS:
                if getattr(logger, "setLevel", None):
                    logger.setLevel(logger_level)
                elif getattr(logger, "set_level", None):
                    logger.set_level(logger_level)

    if "file" in level_dict.keys():
        level = level_dict["file"]
        if not level:
            return
        if str(level).lower() in level_map.keys():
            logger_level = level_map.get(str(level).lower(), logging.INFO)
            setattr(sys, "log_level", logger_level)


class EncryptFileHandler(RotatingFileHandler):

    def __init__(self, filename, mode='ab', max_bytes=0, backup_count=0,
                 encoding=None, delay=False):
        super().__init__(filename, mode, max_bytes, backup_count, encoding,
                         delay)
        self.mode = mode
        self.encrypt_error = None

    def _open(self):
        if self.mode != "ab":
            self.mode = "ab"

        # baseFilename is an attribute of FileHandler
        base_file_name = getattr(self, "baseFilename", None)
        # the encrypted log is written as bytes, so the file is opened in
        # binary mode without an encoding argument
        return open(base_file_name, self.mode)

    def emit(self, record):
        try:
            if not self._encrypt_valid():
                return

            # shouldRollover and doRollover are methods of RotatingFileHandler
            should_rollover = getattr(self, "shouldRollover", None)
            if callable(should_rollover) and should_rollover(record):
                self.doRollover()

            # stream is an attribute of StreamHandler
            if not getattr(self, "stream", None):
                setattr(self, "stream", self._open())
            msg = self.format(record)
            stream = getattr(self, "stream", self._open())
            stream.write(msg)
            self.flush()
        except RecursionError as error:  # pylint:disable=undefined-variable
            raise error

    def _encrypt_valid(self):
        from _core.report.encrypt import check_pub_key_exist
        if check_pub_key_exist() and not self.encrypt_error:
            return True
        return False

    def format(self, record):
        """
        Customized implementation. If the log format of the framework
        changes, update the return value of this method in a timely manner.
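        The formatted record is RSA-encrypted via do_rsa_encrypt and the
        ciphertext is returned as bytes.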
        :param record: logging.LogRecord
        :return: bytes
        """
        from _core.report.encrypt import do_rsa_encrypt
        create_time = "{},{}".format(
            time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(record.created)),
            "{:0>3d}".format(int("%d" % record.msecs)))
        name = record.name
        level_name = record.levelname
        msg = record.msg
        if msg and "%s" in msg:
            msg = msg % record.args
        info = "[%s] [%s] [%s] [%s] %s%s" \
               % (create_time, threading.current_thread().ident, name,
                  level_name, msg, "\n")

        try:
            return do_rsa_encrypt(info)
        except ParamError as error:
            error_no_str = \
                "ErrorNo={}".format(getattr(error, "error_no", "00113"))
            info = "[%s] [%s] [%s] [%s] [%s] [%s]\n" % (
                create_time, threading.current_thread().ident,
                name, "ERROR", error, error_no_str)
            self.encrypt_error = bytes(info, "utf-8")
            return self.encrypt_error


class LogQueue:
    log = None
    max_size = 0
    queue_debug = None
    queue_info = None

    def __init__(self, log, max_size=MAX_LOG_CACHE_SIZE):
        self.log = log
        self.max_size = max_size
        self.queue_info = queue.Queue(maxsize=self.max_size)
        self.queue_debug = queue.Queue(maxsize=self.max_size)
        self.queue_error = queue.Queue(maxsize=self.max_size)

    def _put(self, log_queue, log_data, clear):
        is_print = False
        result_data = ""
        if log_queue.full() or clear:
            # make sure the last cached entry is printed
            if log_queue.qsize() > 0:
                is_print = True
                result_data = "{}\n".format(log_queue.get())
            else:
                result_data = ""
            if log_data != "":
                log_queue.put(log_data)
            while not log_queue.empty():
                is_print = True
                result_data = "{} [{}] {}\n".format(
                    result_data, threading.current_thread().ident,
                    log_queue.get())
        else:
            if log_data != "":
                log_queue.put(log_data)
        return is_print, result_data

    def info(self, log_data, clear=False):
        is_print, result_data = self._put(self.queue_info, log_data, clear)
        if is_print:
            self.log.info(result_data)

    def debug(self, log_data, clear=False):
        is_print, result_data = self._put(self.queue_debug, log_data, clear)
        if is_print:
            self.log.debug(result_data)

    def error(self, log_data, clear=False):
        is_print, result_data = self._put(self.queue_error, log_data, clear)
        if is_print:
            self.log.error(result_data)

    def clear(self):
        self.info(log_data="", clear=True)
        self.debug(log_data="", clear=True)
        self.error(log_data="", clear=True)
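
# A minimal usage sketch for LogQueue (the names below are illustrative only,
# not part of this module):
#
#   log = platform_logger("DeviceOutput")
#   cache = LogQueue(log, max_size=MAX_LOG_CACHE_SIZE)
#   for line in read_device_lines():   # hypothetical source of log lines
#       cache.info(line)               # batched; flushed once the queue fills
#   cache.clear()                      # flush whatever is still buffered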