• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'''
2#  Copyright (c) 2023 Huawei Device Co., Ltd.
3#  Licensed under the Apache License, Version 2.0 (the "License");
4#  you may not use this file except in compliance with the License.
5#  You may obtain a copy of the License at
6
7#      http://www.apache.org/licenses/LICENSE-2.0
8
9#  Unless required by applicable law or agreed to in writing, software
10#  distributed under the License is distributed on an "AS IS" BASIS,
11#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12#  See the License for the specific language governing permissions and
13#  limitations under the License.
14'''
15
16import datetime
17import json
18import os
19import re
20import shutil
21import stat
22import subprocess
23import sys
24import time
25
26import paramiko
27
28import LinuxContains
29
30
31install_fail_dict = {}
32install_success_dict = {}
33total_hap_num = 0
34hap_install_success = 0
35hap_install_fail = 0
36special_num = 0
37special_hap_list = []
38depend_config_map = dict
39
40CONFIG_PATH='D:\\window_manager_config.xml'
41FINGER_PRINT = '8E93863FC32EE238060BF69A9B37E2608FFFB21F93C862DD511CBAC9F30024B5'
42repeat_time = 0
43snapshot = 1
44
45
46def exec_cmd(cmd):
47    f = os.popen("hdc shell " + "\"" + cmd + "\"")
48    # print(cmd)
49    text = f.read()
50    f.close()
51
52
53    return text
54
55
56def exec_cmd_path(cmd, path):
57    f = os.popen("hdc shell " + "\"" + cmd + "\" >> " + path)
58    # print(cmd)
59    text = f.read()
60    f.close()
61    return text
62
63
64def exec_cmd_simple(cmd):
65    f = os.popen(cmd)
66    # print(cmd)
67    text = f.read()
68    f.close()
69    return text
70
71def exists(file_path):
72    return os.path.exists(file_path)
73
74
75def get_haps(local_dir):
76    files = os.listdir(local_dir)
77    hap_list = []
78    for file in files:
79        if "_signed" in file:
80            hap_list.append(file)
81    return hap_list
82
83
84def install_hap(hap, hap_path, base_dir):
85    # print("install " + hap_path + "\\" + hap)
86    if not exists(os.path.join(hap_path, hap)):
87        return
88    install_res = exec_cmd_simple("hdc install -r {}".format(hap_path + "/" + hap))
89    # print(install_res)
90    if not install_res.__contains__("msg:install bundle successfully."):
91        install_res = install_res.replace("\nAppMod finish\n\n", "")
92        install_fail_dict[hap] = install_res
93    else:
94        exec_cmd_simple("echo install {0} success! >> {1}".format(hap, "{0}/auto_test.log".format(base_dir)))
95        install_success_dict[hap] = install_res
96    time.sleep(2)
97
98
99def install_depend_haps(curr_haps, hap_path, base_dir):
100    for hap in curr_haps:
101        if not depend_config_map.get(hap):
102            continue
103        depend_haps = depend_config_map.get(hap)
104        for depend_hap in depend_haps:
105            install_hap(depend_hap, hap_path, base_dir)
106        install_depend_haps(depend_haps, hap_path, base_dir)
107
108
109def get_test_bundle(begin_bundles):
110    end_bundles = get_all_bundles()
111    return list(set(end_bundles) - set(begin_bundles))
112
113
114def install_haps(local_hap, hap_path, base_dir):
115    for hap in local_hap:
116        install_hap(hap, hap_path, base_dir)
117
118
119def start_log():
120    cmd_clean = "rm -r /data/log/hilog/*"
121    cmd_start = "hilog -w start -l 1M"
122    exec_cmd(cmd_clean)
123    exec_cmd(cmd_start)
124
125
126def stop_log():
127    cmd_stop = "hilog -w stop"
128    exec_cmd(cmd_stop)
129
130
131def recv_log(local_log):
132    # 输出日志结果
133    file_path = local_log
134    cmd = "hdc file recv /data/log/hilog/ {}".format(file_path)
135    exec_cmd_simple(cmd)
136
137
138def test_install_hap_with_error_snapshot(uninstall_bundles, base_dir):
139    with open(LinuxContains.FA_MODAL_AND_LOWER_CASE_LIST,'r',encoding='utf-8') as fp:
140        FA_Python_Obj=json.load(fp)
141        is_stage_model=0
142        for key,value in FA_Python_Obj.items():
143            for bundle_name in uninstall_bundles:
144                # print("key:  "+key)
145                bundle_name=bundle_name.strip()
146                if bundle_name==key:
147                    is_stage_model=is_stage_model+1
148            if is_stage_model == 0:
149                cmd = r"hdc shell aa test -b {} -p com.example.entry_test -m entry_test -s unittest /ets/TestRunner/OpenHarmonyTestRunner -s timeout 30000000 "
150            else:
151                cmd=  r"hdc shell aa test -b {} -p com.example.entry_test -m entry_test -s unittest OpenHarmonyTestRunner -s timeout 30000000 "
152    path = "{0}/auto_test.log".format(base_dir)
153    if len(uninstall_bundles) != 1:
154        exec_cmd_simple("echo uninstall_bundles:{0}, >> {1}".format(uninstall_bundles, path))
155    for bundle in uninstall_bundles:
156        bundle = bundle.strip()
157        exec_cmd_simple("echo ================start {0} ui test================{1} >> {2}".format(bundle,
158                                                                                                  datetime.datetime.now().strftime(
159                                                                                                      '%Y-%m-%d %H:%M:%S'),
160                                                                                                  path))
161        # print("test " + bundle)
162        tmp_cmd = cmd.format(bundle)
163        # print(tmp_cmd)
164        p = subprocess.Popen(tmp_cmd, shell=True, close_fds=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
165                             stderr=subprocess.PIPE, encoding="utf-8")
166        current = "-1"
167        auto_log = open("{0}/auto_test.log".format(base_dir), mode='a')
168        while True:
169            line = p.stdout.readline()
170            auto_log.writelines(line)
171            if line and "current" in line:
172                nums = re.findall(r"\d+", line)
173                if (len(nums) == 0):
174                    current = "0"
175                else:
176                    current = nums[0]
177            if line and "stack" in line:
178                exec_cmd_simple(
179                    "hdc shell snapshot_display -f /data/snapshot/{0}_{1}.jpeg".format(bundle, current))
180            if line == '' and p.poll() != None:
181                break
182        exec_cmd_simple("hdc file recv /data/snapshot/. {}/snapshot".format(base_dir))
183        exec_cmd_simple("hdc shell rm -rf /data/snapshot/*")
184        auto_log.flush()
185        auto_log.close()
186        time.sleep(5)
187
188
189def test_install_hap(test_bundles, base_dir):
190    cmd = r"aa test -b {} -p com.example.entry_test -m entry_test -s unittest OpenHarmonyTestRunner -s timeout 30000000"
191    path = "{0}/auto_test.log".format(base_dir)
192    for bundle in test_bundles:
193        bundle = bundle.strip()
194        if bundle == 'ohos.samples.launcher':
195            cmd_launcher = 'hdc shell aa start -b ohos.samples.launcher -a MainAbility'
196            os.system(cmd_launcher)
197        exec_cmd_simple("echo ================start {0} ui test================{1} >> {2}".format(bundle,
198                                                                                                  datetime.datetime.now().strftime(
199                                                                                                      '%Y-%m-%d %H:%M:%S'),
200                                                                                                  path))
201        # print("test " + bundle)
202        tmp_cmd = cmd.format(bundle)
203        exec_cmd_path(tmp_cmd, path)
204        time.sleep(5)
205
206
207def uninstall_bundles(install_bundles):
208    for bundle in install_bundles:
209        # print("uninstall " + bundle)
210        uninstall_res = exec_cmd_simple("hdc uninstall {}".format(bundle))
211
212
213def clear_dir(dir_path):
214    if os.path.exists(dir_path):
215        shutil.rmtree(dir_path)
216    os.makedirs(dir_path)
217
218
219def clear_file(file_path):
220    if os.path.exists(file_path):
221        os.remove(file_path)
222    os.system(r"type nul>{}".format(file_path))
223
224
225def get_all_bundles():
226    bundles = exec_cmd("bm dump -a")
227    bundles = bundles.splitlines()
228    del bundles[0]
229    return bundles
230
231
232def batch_install(haps, base_dir):
233    start_log()
234    exec_cmd_simple("hdc shell power-shell setmode 602")
235    # exec_cmd_simple("hdc shell setenforce 0")
236    exec_cmd_simple("hdc shell param set persist.ace.testmode.enabled 1")
237    exec_cmd_simple("hdc shell mkdir /data/snapshot")
238    limit = 2
239    count = 0
240    time = 0
241    cur_batch_hap = []
242    special_haps = LinuxContains.SPECIAL_HAP.split(";")
243    target_paths = LinuxContains.TARGET_PATH.split(";")
244    begin_bundles = get_all_bundles()
245    while time <= repeat_time:
246        for hap in haps:
247
248            isTargetPath = False
249            for target_path in target_paths:
250                if hap.startswith(target_path):
251                    isTargetPath = True
252            if not isTargetPath:
253                continue
254
255            isSpecialSkip = False
256            for special_hap in special_haps:
257                if special_hap in hap:
258                    global special_num
259                    isSpecialSkip = True
260                    special_hap_list.append(hap)
261                    special_num = special_num + 1
262                    break
263            if isSpecialSkip:
264                continue
265
266            cur_batch_hap.append(hap)
267            count = count + 1
268            if count == limit:
269                install_haps(cur_batch_hap, LinuxContains.SIGN_HAP_PATH, base_dir)
270                test_bundles = get_test_bundle(begin_bundles)
271                install_depend_haps(cur_batch_hap, LinuxContains.SIGN_HAP_PATH, base_dir)
272                all_install_bundles = get_test_bundle(begin_bundles)
273                if snapshot == 0:
274                    test_install_hap(test_bundles, base_dir)
275                else:
276                    test_install_hap_with_error_snapshot(test_bundles, base_dir)
277                count = 0
278                uninstall_bundles(all_install_bundles)
279                cur_batch_hap.clear()
280        if len(cur_batch_hap) != 0:
281            install_haps(cur_batch_hap, LinuxContains.SIGN_HAP_PATH, base_dir)
282            test_bundles = get_test_bundle(begin_bundles)
283            install_depend_haps(cur_batch_hap, LinuxContains.SIGN_HAP_PATH, base_dir)
284            all_install_bundles = get_test_bundle(begin_bundles)
285            if snapshot == 0:
286                test_install_hap(test_bundles, base_dir)
287            else:
288                test_install_hap_with_error_snapshot(test_bundles, base_dir)
289            uninstall_bundles(all_install_bundles)
290            cur_batch_hap.clear()
291        time += 1
292    stop_log()
293    recv_log(base_dir + "/")
294
295
296def handle_test_log(base_dir):
297    file = open("{0}/auto_test.log".format(base_dir), encoding='utf-8', errors='ignore')
298    p_num = 0
299    fail_num = 0
300    success_num = 0
301    test_exp_num = 0
302    test_pass_num = 0
303    test_fail_num = 0
304    test_error_num = 0
305    died_num = 0
306    fail_dict = []
307    died_dict = []
308    curr_name = ""
309    has_result = 1
310    for x in file:
311        if x.startswith(r"================start"):
312            if (has_result == 0):
313                fail_num = fail_num + 1
314                fail_dict.append(curr_name)
315            else:
316                has_result = 0
317            p_num = p_num + 1
318            curr_name = x.split(" ")[1]
319        if x.startswith(r"OHOS_REPORT_RESULT"):
320            has_result = 1
321            nums = re.findall(r"\d+", x)
322            if len(nums) == 4 or len(nums) == 5:
323                if nums[0] == nums[3] and int(nums[0]) != 0:
324                    success_num = success_num + 1
325                else:
326                    fail_num = fail_num + 1
327                    fail_dict.append(curr_name)
328                test_exp_num = test_exp_num + int(nums[0])
329                test_fail_num = test_fail_num + int(nums[1])
330                test_error_num = test_error_num + int(nums[2])
331                test_pass_num = test_pass_num + int(nums[3])
332            else:
333                fail_num = fail_num + 1
334                fail_dict.append(curr_name)
335        elif x.__contains__("App died"):
336            has_result = 1
337            died_num = died_num + 1
338            died_dict.append(curr_name)
339
340    file.close()
341
342    error_log = open("{0}/auto_test.log".format(base_dir), mode='a')
343    error_log.writelines(
344        "共完成测试项目 {0}个,成功{1}个,失败{2}个,异常中止(App died){3}个,特殊应用跳过{4}个\n".format(
345            int(p_num + special_num / 2), success_num, fail_num, died_num, int(special_num / 2)))
346
347    error_log.writelines(
348        "共完成测试用例 {0}个,成功{1}个,失败{2}个,错误{3}个\n".format(test_exp_num, test_pass_num, test_fail_num,
349                                                                      test_error_num))
350    print("successNum:{0} failNum:{1} diedNum:{2} specialNum:{3} testNum:{4} testSuccessNum:{5} testFailNum:{6} testErrorNum:{7}".format(success_num,
351        fail_num, died_num, int(special_num / 2), test_exp_num, test_pass_num, test_fail_num, test_error_num))
352    if len(fail_dict) > 0:
353        error_log.writelines("失败工程BundleName如下:\n")
354        for x in fail_dict:
355            error_log.writelines("     " + x + "\n")
356    if len(died_dict) > 0:
357        error_log.writelines("异常中止(App died)工程BundleName如下:\n")
358        for x in died_dict:
359            error_log.writelines("     " + x + "\n")
360    error_log.writelines("安装失败项目数量:{0}\n".format(len(install_fail_dict)))
361    for i in install_fail_dict:
362        error_log.writelines('{0} : {1}'.format(i, install_fail_dict[i]))
363    if len(special_hap_list) > 0:
364        error_log.writelines("特殊安装跳过Hap数量:{0}\n".format(special_num))
365        for i in special_hap_list:
366            error_log.writelines(i + "\n")
367    error_log.flush()
368    error_log.close()
369    if fail_num != 0 or died_num != 0 or len(install_fail_dict) != 0:
370        print ('test failed !!')
371    else:
372        return ('test success !!')
373
374def init_out():
375    base_dir = sys.path[0]
376    out_path = "{0}/ui_test_out/{1}".format(base_dir, datetime.datetime.now().strftime('%Y%m%d'))
377    print(out_path)
378    clear_dir(out_path)
379    clear_dir("{0}/errorLog".format(out_path))
380    clear_dir("{0}/successLog".format(out_path))
381    clear_dir("{0}/SampleSignHap".format(out_path))
382    clear_file("{0}/auto_test.log".format(out_path))
383    clear_dir("{0}/hilog".format(out_path))
384    clear_dir("{0}/snapshot".format(out_path))
385    return out_path
386
387
388def add_permission(bundle_name, finger_print):
389    KEY_NAME = 'install_list'
390    f = open(LinuxContains.INSTALL_LIST_CAPABILITY, 'r')
391    if bundle_name in f.read():
392        with open(LinuxContains.INSTALL_LIST_CAPABILITY, "r", encoding="utf-8") as f:
393            old_data = json.load(f)
394            # print(bundle_name + " 已存在,需要修改权限")
395            for i in old_data[KEY_NAME]:
396                if i['bundleName'] == bundle_name:
397                    i['app_signature'][0] = finger_print
398        with open(LinuxContains.INSTALL_LIST_CAPABILITY, "w", encoding="utf-8") as f:
399            json.dump(old_data, f)
400
401    else:
402        # 追加权限
403        permission_fields = {
404            "bundleName": bundle_name,
405            "app_signature": [finger_print],
406            "allowAppUsePrivilegeExtension": True
407        }
408        with open(LinuxContains.INSTALL_LIST_CAPABILITY, "r", encoding="utf-8") as f:
409            old_data = json.load(f)
410            # 这里就是那个列表List
411            old_data[KEY_NAME].append(permission_fields)
412            # print(bundle_name + " 不存在,需要追加权限")
413        with open(LinuxContains.INSTALL_LIST_CAPABILITY, "w", encoding="utf-8") as f:
414            json.dump(old_data, f)
415
416
417def pull_list():
418    cmd_pull = "hdc shell mount -o rw,remount / & hdc file recv /system/etc/app/install_list_capability.json"
419    os.system(cmd_pull)
420
421
422def push_list():
423    cmd_push = "hdc shell mount -o rw,remount /  & hdc file send install_list_capability.json /system/etc/app/install_list_capability.json & hdc shell chmod 777 /system/etc/app/install_list_capability.json & hdc shell reboot"
424    # print(cmd_push)
425    os.system(cmd_push)
426    time.sleep(40)  # 等待重启
427    cmd_unlock = 'hdc shell power-shell wakeup & hdc shell uinput -T -m 100 200 100 900 600 & hdc shell power-shell setmode 602'  # 亮屏并解锁屏幕
428    # print("设置屏幕常亮")
429    # print(cmd_unlock)
430    os.system(cmd_unlock)
431
432
433def load_config_to_dict(config_file):
434    with open(config_file, encoding='utf-8') as a:
435        # 读取文件
436        global depend_config_map
437        depend_config_map = json.load(a)
438        # print(type (depend_config_map))
439
440
441def replace_enable(file,old_str,new_str):
442    with open(file, "r", encoding="utf-8") as f1,open("%s.bak" % file, "w", encoding="utf-8") as f2:
443        for line in f1:
444            if old_str in line:
445                line = line.replace(old_str, new_str)
446            f2.write(line)
447    os.remove(file)
448    os.rename("%s.bak" % file, file)
449
450
451def pull_config():
452    cmd='hdc shell mount -o rw,remount / & hdc file recv system/etc/window/resources/window_manager_config.xml {0}'.format(LinuxContains.SAVE_XML_PATH)
453    os.system(cmd)
454
455
456def push_config():
457    cmd='hdc file send {0}/window_manager_config.xml system/etc/window/resources/window_manager_config.xml'.format(LinuxContains.SAVE_XML_PATH)
458    os.system(cmd)
459
460
461if __name__ == '__main__':
462    load_config_to_dict(LinuxContains.COMBIN_CONFIG)
463    # print(depend_config_map)
464
465    # 创建ui_test_out文件夹及子文件夹
466    out_base = init_out()
467    # pull_config()
468    # replace_enable(CONFIG_PATH,'decor enable="false"','decor enable="true"')
469    # push_config()
470
471    # 拉install_list_capability.json
472    pull_list()
473
474    # 特殊安装应用,添加权限
475    with open(LinuxContains.SPECIAL_LIST,'r',encoding='utf-8') as fp:
476        python_obj=json.load(fp)
477        for key,value in python_obj.items():
478            add_permission(value[0],FINGER_PRINT)
479
480    # 推install_list_capability.json,重启解锁
481    push_list()
482    # sftp_from_remote("{0}\\SampleSignHap".format(out_base), "{0}\\errorLog".format(out_base),
483    #                  "{0}\\successLog".format(out_base))
484    haps = get_haps(LinuxContains.SIGN_HAP_PATH)
485    total_hap_num = len(haps) / 2
486    batch_install(haps, out_base)
487    handle_test_log(out_base)