1import json 2import math 3import os.path 4import sqlite3 5import logging 6 7import pytest 8 9 10class Test: 11 12 @pytest.mark.parametrize('setup_teardown', [None], indirect=True) 13 def test(self, setup_teardown, device): 14 return 15 check_list_file = os.path.join(device.resource_path, 'apl_check_list.json') 16 logging.info('reading {}'.format(check_list_file)) 17 json_data = json.load(open(check_list_file, 'r')) 18 whitelist_dict = {} 19 for data in json_data: 20 whitelist_dict.update({data.get('bundle&processName'): int(data.get('apl'))}) 21 assert whitelist_dict, 'check list is empty' 22 23 logging.info('exporting access_token.db') 24 device.hdc_file_recv('/data/service/el1/public/access_token/access_token.db') 25 device.hdc_file_recv('/data/service/el1/public/access_token/access_token.db-wal') 26 device.hdc_file_recv('/data/service/el1/public/access_token/access_token.db-shm') 27 db_file = os.path.join(device.report_path, 'access_token.db') 28 assert os.path.exists(db_file), '{} not exist'.format(db_file) 29 30 logging.info('querying from hap_token_info_table') 31 hap_apl_result = self.query_records(db_file, 'select bundle_name,apl from hap_token_info_table') 32 assert hap_apl_result, 'hap_token_info_table is empty' 33 logging.info('querying native_token_info_table') 34 native_apl_result = self.query_records(db_file, 'select process_name,apl from native_token_info_table') 35 assert native_apl_result, 'native_token_info_table is empty' 36 37 logging.info('hap apl checking') 38 hap_check_rst = self.compare_db_with_whitelist(hap_apl_result, whitelist_dict, 1) 39 logging.info('native apl checking') 40 native_check_rst = self.compare_db_with_whitelist(native_apl_result, whitelist_dict, 2) 41 assert hap_check_rst, 'hap apl check failed' 42 assert native_check_rst, 'native apl check failed' 43 44 @staticmethod 45 def query_records(db_file, sql): 46 conn = sqlite3.connect(db_file) 47 assert conn, 'sqlit database connect failed' 48 cursor = conn.cursor() 49 cursor.execute(sql) 50 results = cursor.fetchall() 51 conn.close() 52 if not results: 53 return 54 result_dict = {} 55 for line in results: 56 key = line[0] 57 value = 0 if math.isnan(line[1]) else line[1] 58 result_dict.update({key: value}) 59 return result_dict 60 61 @staticmethod 62 def compare_db_with_whitelist(db_data: dict, whitelist_dict: dict, basic_value): 63 """ 64 数据库和白名单对比 65 :param db_data: 66 :param whitelist_dict: 67 :param basic_value: 68 :return: 69 """ 70 check_rst = True 71 for key, apl in db_data.items(): 72 if key not in whitelist_dict.keys(): 73 continue 74 if apl <= basic_value: 75 continue 76 is_pass = whitelist_dict[key] == apl 77 if not is_pass: 78 logging.info('bundleName/processName = {} apl = {} | check failed'.format(key, apl)) 79 check_rst = False 80 else: 81 logging.info('bundleName/processName = {} apl = {} | check pass'.format(key, apl)) 82 return check_rst 83