• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import json
2import math
3import os.path
4import sqlite3
5import logging
6
7import pytest
8
9
10class Test:
11
12    @pytest.mark.parametrize('setup_teardown', [None], indirect=True)
13    def test(self, setup_teardown, device):
14        #return
15        check_list_file = os.path.join(device.resource_path, 'apl_check_list.json')
16        logging.info('reading {}'.format(check_list_file))
17        json_data = json.load(open(check_list_file, 'r'))
18        whitelist_dict = {}
19        for data in json_data:
20            whitelist_dict.update({data.get('bundle&processName'): int(data.get('apl'))})
21        assert whitelist_dict, 'check list is empty'
22
23        logging.info('exporting access_token.db')
24        device.hdc_file_recv('/data/service/el1/public/access_token/access_token.db')
25        device.hdc_file_recv('/data/service/el1/public/access_token/access_token.db-wal')
26        device.hdc_file_recv('/data/service/el1/public/access_token/access_token.db-shm')
27        db_file = os.path.join(device.report_path, 'access_token.db')
28        assert os.path.exists(db_file), '{} not exist'.format(db_file)
29
30        logging.info('exporting nativetoken.json')
31        SA_INFO_FILE = "/data/service/el0/access_token/nativetoken.json"
32        device.hdc_file_recv(SA_INFO_FILE)
33        sa_info_file = os.path.join(device.report_path, 'nativetoken.json')
34        assert os.path.exists(sa_info_file), '{} not exist'.format(sa_info_file)
35
36        logging.info('querying from hap_token_info_table')
37        hap_apl_result = self.query_records(db_file, 'select bundle_name,apl from hap_token_info_table')
38        assert hap_apl_result, 'hap_token_info_table is empty'
39        logging.info('querying native_token_info_table')
40        native_apl_result = self.get_sa_info(sa_info_file)
41        assert native_apl_result, 'native_token_info_table is empty'
42
43        logging.info('hap apl checking')
44        hap_check_rst = self.compare_db_with_whitelist(hap_apl_result, whitelist_dict, 1)
45        logging.info('native apl checking')
46        native_check_rst = self.compare_db_with_whitelist(native_apl_result, whitelist_dict, 2)
47        assert hap_check_rst, 'hap apl check failed'
48        assert native_check_rst, 'native apl check failed'
49
50    @staticmethod
51    def get_sa_info(sa_info_file):
52        result_dict = {}
53        with open(sa_info_file, 'r') as file:
54            data = json.load(file)
55            for item in data:
56                processName = item.get('processName')
57                APL = item.get('APL')
58                result_dict.update({processName: APL})
59        return result_dict
60
61    @staticmethod
62    def query_records(db_file, sql):
63        conn = sqlite3.connect(db_file)
64        assert conn, 'sqlit database connect failed'
65        cursor = conn.cursor()
66        cursor.execute(sql)
67        results = cursor.fetchall()
68        conn.close()
69        if not results:
70            return
71        result_dict = {}
72        for line in results:
73            key = line[0]
74            value = 0 if math.isnan(line[1]) else line[1]
75            result_dict.update({key: value})
76        return result_dict
77
78    @staticmethod
79    def compare_db_with_whitelist(db_data: dict, whitelist_dict: dict, basic_value):
80        """
81        数据库和白名单对比
82        :param db_data:
83        :param whitelist_dict:
84        :param basic_value:
85        :return:
86        """
87        check_rst = True
88        for key, apl in db_data.items():
89            if key not in whitelist_dict.keys():
90                continue
91            if apl <= basic_value:
92                continue
93            is_pass = whitelist_dict[key] == apl
94            if not is_pass:
95                logging.info('bundleName/processName = {} apl = {} | check failed'.format(key, apl))
96                check_rst = False
97            else:
98                logging.info('bundleName/processName = {} apl = {} | check pass'.format(key, apl))
99        return check_rst
100