• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
# -*- coding: utf-8 -*-
# Copyright (c) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#!/usr/bin/python3
15import time
16import sys
17import os
18sys.path.append(os.path.dirname(os.path.realpath(__file__)) + os.sep)
19from read_device import *
20from read_excel import *
21from apl_config import *
22
def whitelist_check(apl, value, fields_from_whitelist):
    """Tell whether ``value`` is whitelisted with exactly this APL.

    Returns True only when ``value`` is a key of ``fields_from_whitelist``
    and the entry stored for it equals ``str(apl)``; otherwise False.
    """
    # Not in the whitelist at all: nothing to compare against.
    if value not in fields_from_whitelist:
        return False
    # The whitelist entry is compared with the string form of the APL.
    return str(apl) == fields_from_whitelist[value]
30
def compare_hap_apl(fields_from_device, fields_from_whitelist):
    """Check the APL of each HAP read from the device against the whitelist.

    Only entries whose APL (element 0 of the device record) is greater
    than 1 are examined. Each examined entry is passed to whitelist_check;
    a failing entry is logged with LogLevel(1) and its log content is
    collected, a passing entry is logged with LogLevel(2).

    Returns:
        (records, hap_check): the collected log contents of the failing
        entries, and False if at least one entry failed, else True.
    """
    log_tag = 'compare_hap_apl'
    failures = []
    all_passed = True
    for bundle_name, device_fields in fields_from_device.items():
        apl = device_fields[0]
        if not (apl > 1):
            continue
        info = 'bundleName = {} apl = {}'.format(bundle_name, str(apl))
        if whitelist_check(apl, bundle_name, fields_from_whitelist):
            apl_set_log_content(LogLevel(2).name, log_tag, info)
            continue
        all_passed = False
        failures.append(apl_set_log_content(LogLevel(1).name, log_tag, info))
    return failures, all_passed
49
def compare_native_apl(fields_from_device, fields_from_whitelist):
    """Check the APL of each native process read from the device.

    Only entries whose APL (element 0 of the device record) is greater
    than 2 are examined. Each examined entry is passed to whitelist_check;
    a failing entry is logged with LogLevel(1) and its log content is
    collected, a passing entry is logged with LogLevel(2).

    Returns:
        (records, native_check): the collected log contents of the failing
        entries, and False if at least one entry failed, else True.
    """
    log_tag = 'compare_native_apl'
    failures = []
    all_passed = True
    for process_name, device_fields in fields_from_device.items():
        apl = device_fields[0]
        if not (apl > 2):
            continue
        info = 'processName = {} apl = {}'.format(process_name, str(apl))
        if whitelist_check(apl, process_name, fields_from_whitelist):
            apl_set_log_content(LogLevel(2).name, log_tag, info)
            continue
        all_passed = False
        failures.append(apl_set_log_content(LogLevel(1).name, log_tag, info))
    return failures, all_passed
66
def fields_compare_write_once(fields_from_device, fields_from_excel):
    """Collect (bundle_name, error_name) tuples for device entries that fail.

    A device entry yields ErrorType(1).name when its key is absent from
    ``fields_from_excel`` and ErrorType(2).name when isInvalid(...) returns
    a falsy value for its fields versus the baseline fields.

    Returns:
        The list of collected tuples, in device iteration order.
    """
    records = []
    for bundle_name, fields in fields_from_device.items():
        if bundle_name not in fields_from_excel:
            # No baseline entry for this name.
            records.append((bundle_name, ErrorType(1).name))
        elif not isInvalid(fields, fields_from_excel[bundle_name]):
            # Baseline entry exists but the comparison did not pass.
            records.append((bundle_name, ErrorType(2).name))
    print('Compare successful!')
    return records
82
83
def isInvalid(fields, standard_fields):
    """Return True when no field exceeds its baseline counterpart.

    NOTE: despite the name, True means the fields are within the baseline;
    False means at least one field is greater than the baseline value at
    the same position.

    Args:
        fields: sequence of comparable values read from the device.
        standard_fields: sequence of baseline values, compared position by
            position with ``fields``.

    Returns:
        For a single-element ``fields``: ``fields[0] <= standard_fields[0]``.
        Otherwise True unless some ``fields[i] > standard_fields[i]``.
        Positions beyond the shorter sequence are not compared.
    """
    if len(fields) == 1:
        return fields[0] <= standard_fields[0]

    # zip() pairs the values position by position. Iterating over the bare
    # tuple ``fields, standard_fields`` would instead try to unpack each
    # whole sequence into two names, comparing the wrong values or raising.
    return not any(
        field > standard_field
        for field, standard_field in zip(fields, standard_fields)
    )
92
def write_record(name, error):
    """Append one error record to the file at APL_RECORD_PATH.

    The record text is produced by set_error_record(name, error). Any
    exception raised while opening or writing is caught and reported via
    apl_set_log_content / apl_log instead of propagating.
    """
    try:
        # 'with' guarantees the file is closed even when building or
        # writing the record raises.
        with open(APL_RECORD_PATH, 'a') as record_file:
            record_file.write(set_error_record(name, error))
    except Exception as e:
        # NOTE(review): every other call in this file passes three arguments
        # (level, tag, message) to apl_set_log_content; confirm that this
        # one-argument call matches its signature.
        log_content = apl_set_log_content(str(e))
        apl_log(log_content)
102
def write_record_once(err_records, is_overwrite):
    """Write a batch of error records to the file at APL_RECORD_PATH.

    Args:
        err_records: iterable of records; elements 0 and 1 of each record
            are passed to set_error_record to build the text to write.
        is_overwrite: passed unchanged to open() as the file mode (callers
            in this file pass IS_OVERWRITE or 'a').

    Any exception raised while opening or writing is caught and reported
    via apl_set_log_content / apl_log instead of propagating.
    """
    try:
        # 'with' guarantees the file is closed even when a record cannot be
        # built or written part-way through the batch.
        with open(APL_RECORD_PATH, is_overwrite) as record_file:
            for record in err_records:
                record_file.write(set_error_record(record[0], record[1]))
    except Exception as e:
        # NOTE(review): every other call in this file passes three arguments
        # (level, tag, message) to apl_set_log_content; confirm that this
        # one-argument call matches its signature.
        log_content = apl_set_log_content(str(e))
        apl_log(log_content)
113
def excel_thread():
    """Load the APL baseline from the JSON file ``PATH + 'temp.json'``.

    The earlier SVN-checkout / read_excel loading path that was left here
    as commented-out code has been dropped; only the JSON path is live.

    Returns:
        Whatever read_json returns for that file, or None if loading raises
        (the error is logged with LogLevel(1)).
    """
    # Bound before the try so the except handler can always refer to it.
    log_tag = 'excel_thread'
    try:
        path = PATH + 'temp.json'
        return read_json(path)
    except Exception as e:
        # Format the exception itself rather than e.args[0]: args may be
        # empty (IndexError inside the handler), and for OSError args[0] is
        # only the errno number.
        apl_set_log_content(LogLevel(1).name, log_tag, 'excel_thread catch error: {}'.format(e))
        return None
135
def sql_thread(sn, sn2):
    """Download the database from a device and query HAP and native APLs.

    Args:
        sn: value substituted into the DOWNLOAD_DB command template
            (the script entry point passes the target device id).
        sn2: unused; kept so existing callers keep working.

    Returns:
        (hap_apl_map, native_apl_map) as produced by the two query threads,
        or (None, None) when the download yields None or any step raises.
    """
    log_tag = 'sql_thread'
    try:
        download_cmd = DOWNLOAD_DB.format(sn)
        print(download_cmd + ' ' + SQL_SRC + ' ' + SQL_DES)
        print()
        sql_file = download_from_device(download_cmd, SQL_SRC, SQL_DES)
        if sql_file is None:
            # Report the failed download directly; a bare ``raise`` with no
            # active exception only produces an unrelated RuntimeError.
            apl_set_log_content(LogLevel(1).name, log_tag, 'download_from_device failed')
            return None, None

        # Run both queries against the downloaded file concurrently.
        query_hap_apl_thread = AplCompareThread(query_hap_apl, (sql_file, QUERY_HAP_APL))
        query_native_apl_thread = AplCompareThread(query_native_apl, (sql_file, QUERY_NATIVE_APL))

        query_hap_apl_thread.start()
        query_native_apl_thread.start()

        query_hap_apl_thread.join()
        query_native_apl_thread.join()

        hap_apl_map = query_hap_apl_thread.get_result()
        native_apl_map = query_native_apl_thread.get_result()

        return hap_apl_map, native_apl_map
    except Exception as e:
        # Catch Exception (not everything) so KeyboardInterrupt/SystemExit
        # still propagate, and include the actual cause in the log.
        apl_set_log_content(LogLevel(1).name, log_tag, 'sql_thread catch error: {}'.format(e))
        return None, None
160
def apl_check_main(sn):
    """Run the full APL check for one device and write the failure records.

    Loads the baseline (excel_thread) and the device data (sql_thread) in
    parallel, compares HAP and native APLs against the baseline, writes the
    failing records to the record file and logs the overall outcome.

    Args:
        sn: device id handed to sql_thread (passed for both of its
            parameters).

    Any exception is caught and logged with LogLevel(1); nothing is raised
    to the caller and nothing is returned.
    """
    # Bound before the try so the except handler can always refer to it.
    log_tag = 'Main'
    try:
        apl_set_log_content(LogLevel(2).name, log_tag, '--------APL Check Begin!--------')
        excel_thr = AplCompareThread(excel_thread)
        sql_thr = AplCompareThread(sql_thread, (sn, sn))

        excel_thr.start()
        sql_thr.start()

        excel_thr.join()
        sql_thr.join()

        apl_from_excel = excel_thr.get_result()
        hap_apl_map, native_apl_map = sql_thr.get_result()

        if apl_from_excel is None or hap_apl_map is None or native_apl_map is None:
            # Raise a real exception with a usable message; a bare ``raise``
            # here would log "No active exception to reraise".
            raise RuntimeError(
                'APL input data missing [baseline loaded = {}, hap loaded = {}, native loaded = {}]'.format(
                    apl_from_excel is not None,
                    hap_apl_map is not None,
                    native_apl_map is not None,
                )
            )
        hap_results, hap_check = compare_hap_apl(hap_apl_map, apl_from_excel)
        native_results, native_check = compare_native_apl(native_apl_map, apl_from_excel)
        # HAP results are written with the configured mode, native results
        # are then appended to the same file.
        write_record_once(hap_results, IS_OVERWRITE)
        write_record_once(native_results, 'a')
        if not native_check or not hap_check:
            apl_set_log_content(LogLevel(1).name, log_tag, '--------APL Check failed![hap = {}, native = {}] --------'.format(hap_check, native_check))
        apl_set_log_content(LogLevel(2).name, log_tag, '--------APL Check End! --------')
    except Exception as e:
        apl_set_log_content(LogLevel(1).name, log_tag, '--------APL Check failed![hap = False, native = False] --------')
        # Format the exception itself; e.args may be empty, in which case
        # e.args[0] would raise inside this handler.
        apl_set_log_content(LogLevel(1).name, log_tag, "{}".format(e))
189
190
if __name__ == '__main__':
    # Device id comes from the first command-line argument when given;
    # otherwise the first line printed by 'hdc list targets' is used.
    if len(sys.argv) > 1:
        sn = sys.argv[1]
    else:
        # The context manager closes the pipe once the output is read.
        with os.popen('hdc list targets') as result:
            sn_list = result.read().splitlines()
        if not sn_list:
            # Fail with an explicit message instead of an IndexError.
            raise SystemExit('no device id given and "hdc list targets" printed nothing')
        sn = sn_list[0]
    apl_check_main(sn)
202