#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Copyright (c) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Compare the APL values read from a device against a whitelist.

Usage: pass the device serial number as the first command-line argument.
When it is omitted, the first line printed by ``hdc list targets`` is used.
"""
import time
import sys
import os

# Make the sibling helper modules importable regardless of the current
# working directory.
sys.path.append(os.path.dirname(os.path.realpath(__file__)) + os.sep)
from read_device import *
from read_excel import *
from apl_config import *


def whitelist_check(apl, value, fields_from_whitelist):
    """Return True when ``value`` is whitelisted with exactly this APL.

    Args:
        apl: APL value read from the device; compared as ``str(apl)``.
        value: Bundle or process name used as the whitelist key.
        fields_from_whitelist: Mapping of name -> allowed APL (as a string).

    Returns:
        True if ``value`` is a key of the whitelist and its entry equals
        ``str(apl)``; False otherwise.
    """
    return (value in fields_from_whitelist
            and str(apl) == fields_from_whitelist[value])


def _collect_apl_violations(fields_from_device, fields_from_whitelist,
                            threshold, log_tag, label):
    """Check every device entry whose APL exceeds ``threshold``.

    Entries at or below the threshold are ignored. Every other entry is
    logged; the ones that fail ``whitelist_check`` are also collected.

    Returns:
        ``(records, all_pass)`` where ``records`` holds the value returned by
        ``apl_set_log_content`` for each failing entry and ``all_pass`` is
        False as soon as one entry fails.
    """
    records = []
    all_pass = True
    for name, fields in fields_from_device.items():
        apl = fields[0]
        if apl <= threshold:
            continue
        info = '{} = {} apl = {}'.format(label, name, str(apl))
        if whitelist_check(apl, name, fields_from_whitelist):
            apl_set_log_content(LogLevel(2).name, log_tag, info)
        else:
            all_pass = False
            records.append(
                apl_set_log_content(LogLevel(1).name, log_tag, info))
    return records, all_pass


def compare_hap_apl(fields_from_device, fields_from_whitelist):
    """Check HAP bundles whose APL is greater than 1 against the whitelist.

    Args:
        fields_from_device: Mapping of bundle name -> sequence whose first
            element is the APL value.
        fields_from_whitelist: Mapping of name -> allowed APL (as a string).

    Returns:
        ``(records, hap_check)``: the log entries of the failing bundles and
        a flag that is True only when no bundle failed.
    """
    return _collect_apl_violations(
        fields_from_device, fields_from_whitelist,
        1, 'compare_hap_apl', 'bundleName')


def compare_native_apl(fields_from_device, fields_from_whitelist):
    """Check native processes whose APL is greater than 2 against the whitelist.

    Args:
        fields_from_device: Mapping of process name -> sequence whose first
            element is the APL value.
        fields_from_whitelist: Mapping of name -> allowed APL (as a string).

    Returns:
        ``(records, native_check)``: the log entries of the failing processes
        and a flag that is True only when no process failed.
    """
    return _collect_apl_violations(
        fields_from_device, fields_from_whitelist,
        2, 'compare_native_apl', 'processName')


def fields_compare_write_once(fields_from_device, fields_from_excel):
    """Compare device fields with the baseline and collect mismatches.

    Args:
        fields_from_device: Mapping of bundle name -> field sequence.
        fields_from_excel: Mapping of bundle name -> baseline field sequence.

    Returns:
        A list of ``(bundle_name, error_name)`` tuples: ``ErrorType(1).name``
        for bundles missing from the baseline, ``ErrorType(2).name`` for
        bundles rejected by ``isInvalid``.
    """
    records = []
    for bundle_name, fields in fields_from_device.items():
        if bundle_name not in fields_from_excel:
            records.append((bundle_name, ErrorType(1).name))
            continue

        standard_fields = fields_from_excel[bundle_name]
        if not isInvalid(fields, standard_fields):
            records.append((bundle_name, ErrorType(2).name))
    print('Compare successful!')
    return records


def isInvalid(fields, standard_fields):
    """Return True when no field exceeds its baseline counterpart.

    NOTE: despite its name, this function returns True for *valid* input;
    the name is kept so existing callers continue to work.

    Args:
        fields: Sequence of values read from the device.
        standard_fields: Sequence of baseline values, compared position by
            position.

    Returns:
        True if every field is less than or equal to the baseline value at
        the same position, False otherwise.
    """
    if len(fields) == 1:
        return fields[0] <= standard_fields[0]

    # Pair the elements up. Iterating over the tuple
    # ``(fields, standard_fields)`` would unpack each whole sequence instead
    # of comparing matching positions. zip() stops at the shorter sequence.
    return all(field <= standard_field
               for field, standard_field in zip(fields, standard_fields))


def write_record(name, error):
    """Append one error record to ``APL_RECORD_PATH``.

    Failures are logged and swallowed so that a write problem does not abort
    the caller.
    """
    log_tag = 'write_record'
    try:
        # ``with`` guarantees the file is closed even if the write fails.
        with open(APL_RECORD_PATH, 'a') as record_file:
            record_file.write(set_error_record(name, error))
    except Exception as e:
        # Same three-argument form as every other call site in this file.
        log_content = apl_set_log_content(LogLevel(1).name, log_tag, str(e))
        # NOTE(review): apl_log is expected to come from one of the star
        # imports above - confirm it is defined there.
        apl_log(log_content)


def write_record_once(err_records, is_overwrite):
    """Write all error records to ``APL_RECORD_PATH`` in a single open.

    Args:
        err_records: Iterable of records; elements 0 and 1 of each record are
            passed to ``set_error_record``.
        is_overwrite: File mode handed to ``open`` (for example ``'w'`` or
            ``'a'``).

    Failures are logged and swallowed.
    """
    log_tag = 'write_record_once'
    try:
        with open(APL_RECORD_PATH, is_overwrite) as record_file:
            # NOTE(review): compare_hap_apl/compare_native_apl return log
            # entries, not (name, error) tuples - verify that indexing
            # record[0] / record[1] is what is intended for those callers.
            for record in err_records:
                record_file.write(set_error_record(record[0], record[1]))
    except Exception as e:
        log_content = apl_set_log_content(LogLevel(1).name, log_tag, str(e))
        # NOTE(review): apl_log is expected to come from one of the star
        # imports above - confirm it is defined there.
        apl_log(log_content)


def excel_thread():
    """Load the APL whitelist from ``PATH + 'temp.json'``.

    An earlier variant fetched an Excel baseline through SVN; that path is
    disabled and the whitelist is read from JSON instead.

    Returns:
        The value returned by ``read_json``, or None when loading fails (the
        failure is logged).
    """
    # Bound before ``try`` so the handler can always reference it.
    log_tag = 'excel_thread'
    try:
        path = PATH + 'temp.json'
        return read_json(path)
    except Exception as e:
        # Format the exception itself: ``e.args[0]`` fails on exceptions
        # without arguments and is only the errno for OSError.
        apl_set_log_content(LogLevel(1).name, log_tag,
                            'excel_thread catch error: {}'.format(e))
        return None


def sql_thread(sn, sn2):
    """Download the database from the device and query both APL tables.

    Args:
        sn: Device serial number, substituted into ``DOWNLOAD_DB``.
        sn2: Unused; kept so existing callers keep working.

    Returns:
        ``(hap_apl_map, native_apl_map)`` on success, ``(None, None)`` when
        the download or one of the queries fails (the failure is logged).
    """
    log_tag = 'sql_thread'
    try:
        download_cmd = DOWNLOAD_DB.format(sn)
        print(download_cmd + ' ' + SQL_SRC + ' ' + SQL_DES)
        print()
        sql_file = download_from_device(download_cmd, SQL_SRC, SQL_DES)
        if sql_file is None:
            apl_set_log_content(LogLevel(1).name, log_tag,
                                'download_from_device failed')
            return None, None

        # The two queries are independent, so run them concurrently.
        query_hap_apl_thread = AplCompareThread(
            query_hap_apl, (sql_file, QUERY_HAP_APL))
        query_native_apl_thread = AplCompareThread(
            query_native_apl, (sql_file, QUERY_NATIVE_APL))

        query_hap_apl_thread.start()
        query_native_apl_thread.start()

        query_hap_apl_thread.join()
        query_native_apl_thread.join()

        hap_apl_map = query_hap_apl_thread.get_result()
        native_apl_map = query_native_apl_thread.get_result()

        return hap_apl_map, native_apl_map
    except Exception as e:
        # ``except Exception`` (not a bare ``except``) lets
        # KeyboardInterrupt/SystemExit propagate.
        apl_set_log_content(LogLevel(1).name, log_tag,
                            'sql_thread catch error: {}'.format(e))
        return None, None


def apl_check_main(sn):
    """Run the whole APL check for the device with serial number ``sn``.

    Loads the whitelist and the device data concurrently, compares HAP and
    native APL values against the whitelist, writes the failing entries to
    the record file and logs the overall result. Errors are logged, not
    raised.
    """
    # Bound before ``try`` so the handler can always reference it.
    log_tag = 'Main'
    try:
        apl_set_log_content(LogLevel(2).name, log_tag,
                            '--------APL Check Begin!--------')
        excel_thr = AplCompareThread(excel_thread)
        sql_thr = AplCompareThread(sql_thread, (sn, sn))

        excel_thr.start()
        sql_thr.start()

        excel_thr.join()
        sql_thr.join()

        apl_from_excel = excel_thr.get_result()
        hap_apl_map, native_apl_map = sql_thr.get_result()

        if (apl_from_excel is None or hap_apl_map is None
                or native_apl_map is None):
            # An explicit exception; a bare ``raise`` here would only
            # produce "No active exception to reraise".
            raise RuntimeError(
                'failed to load the APL whitelist or the device APL data')

        hap_results, hap_check = compare_hap_apl(hap_apl_map, apl_from_excel)
        native_results, native_check = compare_native_apl(
            native_apl_map, apl_from_excel)
        # The first write uses the configured mode; the second always
        # appends so it does not discard the HAP results.
        write_record_once(hap_results, IS_OVERWRITE)
        write_record_once(native_results, 'a')
        if not native_check or not hap_check:
            apl_set_log_content(
                LogLevel(1).name, log_tag,
                '--------APL Check failed![hap = {}, native = {}] --------'.format(hap_check, native_check))
        apl_set_log_content(LogLevel(2).name, log_tag,
                            '--------APL Check End! --------')
    except Exception as e:
        apl_set_log_content(
            LogLevel(1).name, log_tag,
            '--------APL Check failed![hap = False, native = False] --------')
        apl_set_log_content(LogLevel(1).name, log_tag, "{}".format(e))


def _first_connected_device():
    """Return the first non-blank line of ``hdc list targets``, or None."""
    # NOTE(review): hdc may print a placeholder line rather than nothing when
    # no device is attached - confirm and filter it here if so.
    with os.popen('hdc list targets') as pipe:
        output = pipe.read()
    for line in output.splitlines():
        candidate = line.strip()
        if candidate:
            return candidate
    return None


if __name__ == '__main__':
    if len(sys.argv) > 1:
        sn = sys.argv[1]
    else:
        sn = _first_connected_device()
        if sn is None:
            sys.exit('No serial number given and "hdc list targets" '
                     'reported no device')
    apl_check_main(sn)