• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding: utf-8
3
4"""
5Copyright (c) 2023 Huawei Device Co., Ltd.
6Licensed under the Apache License, Version 2.0 (the "License");
7you may not use this file except in compliance with the License.
8You may obtain a copy of the License at
9
10    http://www.apache.org/licenses/LICENSE-2.0
11
12Unless required by applicable law or agreed to in writing, software
13distributed under the License is distributed on an "AS IS" BASIS,
14WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15See the License for the specific language governing permissions and
16limitations under the License.
17
18Description: execute test tasks
19"""
20
21import logging
22import os
23import re
24import shutil
25import signal
26import subprocess
27import time
28import zipfile
29
30import json5
31
32import options
33import utils
34
35
class IncrementalTest:
    """Incremental compilation test cases.

    Each test perturbs the project sources (or leaves them untouched),
    triggers a rebuild, validates the produced hap/app, and checks that only
    the cache files belonging to the modified sources were regenerated.
    All methods are static; per-test state lives on the task object.
    """

    @staticmethod
    def _get_new_file_patch(task_type):
        """Return (file_name, file_content) of the configured patch file:
        js projects get the js patch, everything else the ets patch."""
        patch_key = 'patch_new_file_js' if 'js' in task_type else 'patch_new_file_ets'
        patch_content = options.configs.get('patch_content').get(patch_key)
        return patch_content.get('name'), patch_content.get('content')

    @staticmethod
    def validate_module_name_change(task, inc_task, is_debug, stdout, stderr, new_module_name):
        """Validate the hap produced after renaming the module and verify the
        new module name shows up in the disassembled function names."""
        output_file = get_compile_output_file_path(task, is_debug)
        output_dir = os.path.dirname(output_file)
        output_file_name = os.path.basename(output_file)
        output_file_name_items = output_file_name.split(
            '-')  # hap name format: entry-default.hap
        output_file_name_items[0] = new_module_name
        output_file_name = '-'.join(output_file_name_items)
        new_module_name_output_file = os.path.join(
            output_dir, output_file_name)

        logging.debug(f"new module hap file: {new_module_name_output_file}")

        passed = validate(inc_task, task, is_debug, stdout,
                          stderr, 'incremental_compile_change_module_name',
                          new_module_name_output_file)
        logging.debug(f"validate new module hap file, passed {passed}")
        if not passed:
            return

        inc_info = inc_task.debug_info if is_debug else inc_task.release_info
        uncompressed_output_file = new_module_name_output_file + '.uncompressed'
        with zipfile.ZipFile(new_module_name_output_file, 'r') as zip_ref:
            zip_ref.extractall(uncompressed_output_file)

        try:
            abc_path = os.path.join(uncompressed_output_file, 'ets')
            modules_abc_path = os.path.join(abc_path, 'modules.abc')
            modules_pa = disasm_abc(task, modules_abc_path)
            if not modules_pa or not os.path.exists(modules_pa):
                inc_info.result = options.TaskResult.failed
                inc_info.error_message = 'ark_disasm failed, module name change not verified'
                return

            # take the first '.function' line of the disassembly; the module
            # name is expected to be one of its dot-separated components
            func_str = ''
            with open(modules_pa, 'r', encoding='utf-8') as pa:
                line = pa.readline()
                while line:
                    if '.function' in line.strip():
                        func_str = line.strip()
                        break
                    line = pa.readline()

            func_define_items = func_str.split('.')
            if new_module_name not in func_define_items:
                inc_info.result = options.TaskResult.failed
                inc_info.error_message = (
                    f'expected entry name {new_module_name} in function name, '
                    f'actual function name: {func_str}')
        finally:
            # clean up the extracted hap even when verification fails early
            shutil.rmtree(uncompressed_output_file)

    @staticmethod
    def is_file_in_modified_files(task_type, backup_file_relative_path, modified_cache_files):
        """Return True if the given cache file (path relative to the cache
        root) corresponds to one of the modified source files."""
        if 'stage' in task_type:
            return backup_file_relative_path in modified_cache_files
        else:
            # fa/js caches live below a 'temporary' directory; compare the
            # path relative to that directory instead
            non_temporary_path = backup_file_relative_path.split("temporary")[
                1].lstrip(os.path.sep)
            logging.debug(f"non_temporary_path: {non_temporary_path}")
            for file in modified_cache_files:
                logging.debug(f"modified_cache_files file: {file}")
                if non_temporary_path in file:
                    return True
        return False

    @staticmethod
    def validate_compile_incremental_file(task, inc_task, is_debug, modified_files):
        """Walk the compile cache and verify that only cache files belonging
        to modified_files have a timestamp differing from the backup.

        Marks inc_task's debug/release info failed on any unexpected change.
        """
        cache_extension = ''
        if 'stage' in task.type:
            cache_extension = '.protoBin'
        elif 'fa' in task.type or 'compatible8' in task.type:
            cache_extension = '.temp.abc'
        elif 'js' in task.type:
            cache_extension = '.abc'

        # modified_files holds paths relative to .../debug|release; map each
        # source file to the cache file the compiler should regenerate
        modified_cache_files = []
        for file in modified_files:
            name = os.path.splitext(file)[0]
            modified_cache_files.append(name + cache_extension)

        logging.debug(f"modified_cache_files: {modified_cache_files}")

        if is_debug:
            cache_path = os.path.join(
                task.path, *(task.build_path), *(task.cache_path), 'debug')
            backup_path = task.backup_info.cache_debug
            inc_info = inc_task.debug_info
        else:
            cache_path = os.path.join(
                task.path, *(task.build_path), *(task.cache_path), 'release')
            backup_path = task.backup_info.cache_release
            inc_info = inc_task.release_info

        for root, dirs, files in os.walk(cache_path):
            for file in files:
                if not file.endswith(cache_extension):
                    continue
                file_absolute_path = os.path.join(root, file)
                file_relative_path = os.path.relpath(
                    file_absolute_path, cache_path)
                backup_file = os.path.join(backup_path, file_relative_path)

                if not os.path.exists(backup_file):
                    logging.debug(f"backup file does not exist: {backup_file}")
                    continue

                if utils.is_file_timestamps_same(file_absolute_path, backup_file):
                    continue

                logging.debug(f"found file {file_relative_path} changed")
                is_file_in_list = IncrementalTest.is_file_in_modified_files(
                    task.type, file_relative_path, modified_cache_files)
                logging.debug(f"is file in list: {is_file_in_list}")
                if not is_file_in_list:
                    logging.debug(f"Unexpected file modified: {file_relative_path}")
                    inc_info.result = options.TaskResult.failed
                    inc_info.error_message = (
                        'Incremental compile found unexpected file timestamp changed. '
                        f'Changed file: {file_relative_path}')
                    return

    @staticmethod
    def prepare_incremental_task(task, test_name):
        """Get or create the IncCompilationInfo record for test_name."""
        if test_name in task.incre_compilation_info:
            inc_task = task.incre_compilation_info[test_name]
        else:
            inc_task = options.IncCompilationInfo()
            inc_task.name = test_name
            task.incre_compilation_info[test_name] = inc_task
        return inc_task

    @staticmethod
    def compile_incremental_no_modify(task, is_debug):
        """Recompile without any source change; no cache file may change."""
        test_name = 'no_change'
        inc_task = IncrementalTest.prepare_incremental_task(task, test_name)

        logging.info(f"==========> Running {test_name} for task: {task.name}")
        [stdout, stderr] = compile_project(task, is_debug)
        passed = validate(inc_task, task, is_debug, stdout, stderr, 'incremental_compile_no_change')
        if passed:
            IncrementalTest.validate_compile_incremental_file(
                task, inc_task, is_debug, [])

    @staticmethod
    def compile_incremental_add_oneline(task, is_debug):
        """Append one line to the designated source file and recompile;
        only that file's cache entry may change."""
        test_name = 'add_oneline'
        inc_task = IncrementalTest.prepare_incremental_task(task, test_name)

        logging.info(f"==========> Running {test_name} for task: {task.name}")
        modify_file_item = task.inc_modify_file
        modify_file = os.path.join(task.path, *modify_file_item)
        modify_file_backup = modify_file + ".bak"
        shutil.copyfile(modify_file, modify_file_backup)

        with open(modify_file, 'a', encoding='utf-8') as file:
            file.write(options.configs.get('patch_content').get(
                'patch_lines_2').get('tail'))

        [stdout, stderr] = compile_project(task, is_debug)
        passed = validate(inc_task, task, is_debug, stdout, stderr, 'incremental_compile_add_oneline')
        if passed:
            modified_files = [os.path.join(*modify_file_item)]
            IncrementalTest.validate_compile_incremental_file(
                task, inc_task, is_debug, modified_files)

        # restore the original source file
        shutil.move(modify_file_backup, modify_file)

    @staticmethod
    def compile_incremental_add_file(task, is_debug):
        """Add a new source file, import it from the modify file, recompile,
        then restore the project."""
        test_name = 'add_file'
        inc_task = IncrementalTest.prepare_incremental_task(task, test_name)

        logging.info(f"==========> Running {test_name} for task: {task.name}")
        modify_file_item = task.inc_modify_file
        modify_file = os.path.join(task.path, *modify_file_item)
        modify_file_backup = modify_file + ".bak"
        shutil.copyfile(modify_file, modify_file_backup)

        modify_dir = os.path.dirname(modify_file)
        new_file_name, new_file_content = IncrementalTest._get_new_file_patch(task.type)
        new_file = os.path.join(modify_dir, new_file_name)

        with open(new_file, 'w', encoding='utf-8') as file:
            file.writelines(new_file_content)

        # wrap the original content with the patch's head/tail lines
        with open(modify_file, 'r+', encoding='utf-8') as file:
            old_content = file.read()
            file.seek(0)
            patch_lines = options.configs.get(
                'patch_content').get('patch_lines_1')
            file.write(patch_lines.get('head'))
            file.write(old_content)
            file.write(patch_lines.get('tail'))

        [stdout, stderr] = compile_project(task, is_debug)
        passed = validate(inc_task, task, is_debug, stdout, stderr, 'incremental_compile_add_file')
        if passed:
            modified_files = [os.path.join(*modify_file_item)]
            IncrementalTest.validate_compile_incremental_file(
                task, inc_task, is_debug, modified_files)

        shutil.move(modify_file_backup, modify_file)
        os.remove(new_file)

    @staticmethod
    def compile_incremental_add_nonexistent_file(task, is_debug):
        """Compile with a patch that imports a not-yet-existing file (expected
        to fail), then create the file and verify the recompile succeeds."""
        test_name = 'add_nonexistent_file'
        inc_task = IncrementalTest.prepare_incremental_task(task, test_name)

        logging.info(f"==========> Running {test_name} for task: {task.name}")

        modify_file_item = task.inc_modify_file
        modify_file = os.path.join(task.path, *modify_file_item)
        modify_file_backup = modify_file + ".bak"
        shutil.copyfile(modify_file, modify_file_backup)

        with open(modify_file, 'r+', encoding='utf-8') as file:
            old_content = file.read()
            file.seek(0)
            patch_lines = options.configs.get(
                'patch_content').get('patch_lines_1')
            file.write(patch_lines.get('head'))
            file.write(old_content)
            file.write(patch_lines.get('tail'))

        [stdout, stderr] = compile_project(task, is_debug)
        passed = validate(inc_task, task, is_debug, stdout, stderr, 'incremental_compile_add_nonexistent_file')
        if not passed:
            logging.info("The first compilation file does not exist. The compilation fails as expected")

            modify_dir = os.path.dirname(modify_file)
            new_file_name, new_file_content = IncrementalTest._get_new_file_patch(task.type)
            new_file = os.path.join(modify_dir, new_file_name)

            with open(new_file, 'w', encoding='utf-8') as file:
                file.writelines(new_file_content)

            [stdout, stderr] = compile_project(task, is_debug)
            passed = validate(inc_task, task, is_debug, stdout, stderr, 'incremental_compile_add_nonexistent_file')
            if passed:
                modified_files = [os.path.join(*modify_file_item)]
                IncrementalTest.validate_compile_incremental_file(
                    task, inc_task, is_debug, modified_files)

            os.remove(new_file)

        # restore the original source even if the first compile unexpectedly
        # passed; previously the backup was only restored on the failure path
        shutil.move(modify_file_backup, modify_file)

    @staticmethod
    def compile_incremental_delete_file(task, is_debug):
        """Recompile after a file deletion.

        This test runs after 'add_file', which already removed the added
        file, so a plain recompile is all that is needed here."""
        test_name = 'delete_file'
        inc_task = IncrementalTest.prepare_incremental_task(task, test_name)

        logging.info(f"==========> Running {test_name} for task: {task.name}")
        [stdout, stderr] = compile_project(task, is_debug)
        passed = validate(inc_task, task, is_debug, stdout, stderr, 'incremental_compile_delete_file')
        if passed:
            modify_file_item = task.inc_modify_file
            modified_files = [os.path.join(*modify_file_item)]
            IncrementalTest.validate_compile_incremental_file(
                task, inc_task, is_debug, modified_files)

    @staticmethod
    def compile_incremental_reverse_hap_mode(task, is_debug):
        """Recompile with the opposite hap mode (debug <-> release)."""
        test_name = 'reverse_hap_mode'
        inc_task = IncrementalTest.prepare_incremental_task(task, test_name)

        logging.info(f"==========> Running {test_name} for task: {task.name}")
        hap_mode = not is_debug
        [stdout, stderr] = compile_project(task, hap_mode)
        validate(inc_task, task, hap_mode, stdout, stderr, 'incremental_compile_reverse_hap_mode')

    @staticmethod
    def compile_incremental_modify_module_name(task, is_debug):
        """Rename the hap module in build-profile.json5 and module.json5,
        recompile, and verify the renamed output (stage mode only)."""
        if 'stage' not in task.type:
            return

        test_name = 'change_module_name'
        inc_task = IncrementalTest.prepare_incremental_task(task, test_name)

        logging.info(f"==========> Running {test_name} for task: {task.name}")
        # modify build-profile.json5
        profile_file = os.path.join(task.path, 'build-profile.json5')
        profile_file_backup = profile_file + ".bak"
        shutil.copyfile(profile_file, profile_file_backup)

        with open(profile_file, 'r', encoding='utf-8') as file:
            profile_data = json5.load(file)
        new_module_name = "new_entry"
        logging.debug(f"profile_data is: {profile_data}")
        for module in profile_data['modules']:
            if module['name'] == task.hap_module:
                module['name'] = new_module_name
                break
        with open(profile_file, 'w', encoding='utf-8') as file:
            json5.dump(profile_data, file)

        # modify module.json5 for stage mode
        entry_item = task.build_path[:-2]  # to entry path
        config_file_dir = os.path.join(task.path, *entry_item, 'src', 'main')
        config_file = os.path.join(config_file_dir, 'module.json5')
        config_file_backup = config_file + ".bak"
        shutil.copyfile(config_file, config_file_backup)

        with open(config_file, 'r', encoding='utf-8') as file:
            config_data = json5.load(file)
        config_data['module']['name'] = new_module_name
        with open(config_file, 'w', encoding='utf-8') as file:
            json5.dump(config_data, file)

        try:
            cmd = get_hvigor_compile_cmd(task, is_debug, new_module_name)
            [stdout, stderr] = compile_project(task, is_debug, cmd)
            IncrementalTest.validate_module_name_change(
                task, inc_task, is_debug, stdout, stderr, new_module_name)
        except Exception as e:
            logging.exception(e)
        finally:
            # always restore both config files
            shutil.move(profile_file_backup, profile_file)
            shutil.move(config_file_backup, config_file)
379
380
class OtherTest:
    """Miscellaneous compile test cases: binary consistency, interrupted
    builds, expected compile errors, path-length limits, and ohosTest."""

    @staticmethod
    def is_abc_same_in_haps(hap_1, hap_2):
        """Return True if the two hap archives contain the same set of .abc
        entries with byte-identical contents."""
        hap_1_abc_files = []
        hap_2_abc_files = []
        with zipfile.ZipFile(hap_1) as zf1, zipfile.ZipFile(hap_2) as zf2:
            for file in zf1.namelist():
                if file.endswith('.abc'):
                    hap_1_abc_files.append(file)
            for file in zf2.namelist():
                if file.endswith('.abc'):
                    hap_2_abc_files.append(file)

            hap_1_abc_files.sort()
            hap_2_abc_files.sort()

            if len(hap_1_abc_files) != len(hap_2_abc_files):
                return False

            # compare entries pairwise after sorting by name
            for idx, abc_file in enumerate(hap_1_abc_files):
                with zf1.open(abc_file) as f1, zf2.open(hap_2_abc_files[idx]) as f2:
                    data1 = f1.read()
                    data2 = f2.read()
                    if data1 != data2:
                        return False

        return True

    @staticmethod
    def verify_binary_consistency(task):
        """Compile twice in each requested mode and verify both builds
        produce byte-identical .abc files."""
        test_name = 'binary_consistency'
        test_info = options.CompilationInfo()
        task.other_tests[test_name] = test_info
        debug_consistency = True
        release_consistency = True

        logging.info(f"==========> Running {test_name} for task: {task.name}")
        if options.arguments.hap_mode in ['all', 'release']:
            # will have at least 1 output from full compile
            if len(task.backup_info.output_release) == 1:
                compile_project(task, False)
                backup_compile_output(task, False)

            if len(task.backup_info.output_release) == 2:
                release_consistency = OtherTest.is_abc_same_in_haps(task.backup_info.output_release[0],
                                                                    task.backup_info.output_release[1])
            else:
                release_consistency = False
            logging.debug(f"release consistency: {release_consistency}")

        if options.arguments.hap_mode in ['all', 'debug']:
            if len(task.backup_info.output_debug) == 1:
                compile_project(task, True)
                backup_compile_output(task, True)

            if len(task.backup_info.output_debug) == 2:
                debug_consistency = OtherTest.is_abc_same_in_haps(task.backup_info.output_debug[0],
                                                                  task.backup_info.output_debug[1])
            else:
                debug_consistency = False
            logging.debug(f"debug consistency: {debug_consistency}")

        if debug_consistency and release_consistency:
            test_info.result = options.TaskResult.passed
        else:
            test_info.result = options.TaskResult.failed

    @staticmethod
    def execute_break_compile(task, is_debug):
        """Kill the build as soon as the ArkTS compile step starts, then
        verify a follow-up compile recovers and produces valid output."""
        test_name = 'break_continue_compile'
        test_info = options.CompilationInfo()
        task.other_tests[test_name] = test_info

        logging.info(f"==========> Running {test_name} for task: {task.name}")
        clean_compile(task)
        cmd = get_hvigor_compile_cmd(task, is_debug)
        logging.debug(f'cmd: {cmd}')
        logging.debug(f"cmd execution path {task.path}")
        process = subprocess.Popen(cmd, shell=False, cwd=task.path,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT)

        # interrupt the build the moment the ArkTS compile stage is reported
        for line in iter(process.stdout.readline, b''):
            if b'CompileArkTS' in line:
                logging.debug("terminate signal sent")
                process.send_signal(signal.SIGTERM)
                break

        [stdout, stderr] = process.communicate(
            timeout=options.arguments.compile_timeout)

        logging.debug("first compile: stdout: %s",
                      stdout.decode('utf-8', errors="ignore"))

        logging.debug("another compile")
        [stdout, stderr] = compile_project(task, is_debug)

        [is_success, time_string] = is_compile_success(stdout)
        if not is_success:
            test_info.result = options.TaskResult.failed
            test_info.error_message = stderr
        else:
            passed = validate_compile_output(test_info, task, is_debug)
            if passed:
                test_info.result = options.TaskResult.passed
        if options.arguments.run_haps:
            run_compile_output(test_info, task, True, 'other_tests_break_continue_compile')

    @staticmethod
    def compile_full_with_error(task, is_debug):
        """Append a known-bad patch and verify the compiler reports one of
        the expected error messages."""
        test_name = 'compile_with_error'
        test_info = options.CompilationInfo()
        task.other_tests[test_name] = test_info

        logging.info(f"==========> Running {test_name} for task: {task.name}")
        modify_file_item = task.inc_modify_file
        modify_file = os.path.join(task.path, *modify_file_item)
        modify_file_backup = modify_file + ".bak"
        shutil.copyfile(modify_file, modify_file_backup)

        patch_lines_error = options.configs.get(
            'patch_content').get('patch_lines_error')
        with open(modify_file, 'a', encoding='utf-8') as file:
            file.write(patch_lines_error.get('tail'))

        [stdout, stderr] = compile_project(task, is_debug)
        expected_errors = patch_lines_error.get('expected_error')

        # any one of the configured error strings is accepted
        passed = any(expected_error in stderr
                     for expected_error in expected_errors)

        if passed:
            test_info.result = options.TaskResult.passed
        else:
            test_info.result = options.TaskResult.failed
            test_info.error_message = f"expected error message: {expected_errors}, but got {stderr}"

        shutil.move(modify_file_backup, modify_file)

    @staticmethod
    def compile_with_exceed_length(task, is_debug):
        """Set an over-long target name and verify the expected path-length
        error on Windows (other platforms should still compile cleanly)."""
        test_name = 'compile_with_exceed_length'
        test_info = options.CompilationInfo()
        task.other_tests[test_name] = test_info

        logging.info(f"==========> Running {test_name} for task: {task.name}")
        # get build-profile.json5
        entry_item = task.build_path[:-2]  # to entry path
        profile_file = os.path.join(
            task.path, *entry_item, 'build-profile.json5')
        profile_file_backup = profile_file + ".bak"
        shutil.copyfile(profile_file, profile_file_backup)

        with open(profile_file, 'r', encoding='utf-8') as file:
            profile_data = json5.load(file)

        long_str = 'default1234567890123456789012345678901234567890123456789012345678901234567890123456789' + \
                   '012345678901234567890123456789'
        logging.debug("long_str: %s", long_str)
        profile_data['targets'][0]['name'] = long_str

        with open(profile_file, 'w', encoding='utf-8') as file:
            json5.dump(profile_data, file)

        cmd = get_hvigor_compile_cmd(task, is_debug, task.hap_module, long_str)
        [stdout, stderr] = compile_project(task, is_debug, cmd)
        # Only the Windows platform has a length limit
        if utils.is_windows():
            expected_error_message = 'The length of path exceeds the maximum length: 259'

            if expected_error_message in stderr:
                test_info.result = options.TaskResult.passed
            else:
                test_info.result = options.TaskResult.failed
                test_info.error_message = f"expected error message: {expected_error_message}, but got {stderr}"
        else:
            [is_success, time_string] = is_compile_success(stdout)
            if not is_success:
                test_info.result = options.TaskResult.failed
                test_info.error_message = stderr
            else:
                passed = validate_compile_output(test_info, task, is_debug)
                if passed:
                    test_info.result = options.TaskResult.passed

        shutil.move(profile_file_backup, profile_file)

    @staticmethod
    def compile_ohos_test(task):
        """Build the ohosTest module and validate its hap output."""
        test_name = 'ohos_test'
        test_info = options.CompilationInfo()
        task.other_tests[test_name] = test_info

        logging.info(f"==========> Running {test_name} for task: {task.name}")
        # ohosTest has only debug mode
        cmd = [get_hvigor_path(task.path), '--mode', 'module',
               '-p', 'module=entry@ohosTest', 'assembleHap']
        [stdout, stderr] = compile_project(task, True, cmd)
        [is_success, time_string] = is_compile_success(stdout)
        if not is_success:
            test_info.result = options.TaskResult.failed
            test_info.error_message = stderr
        else:
            output_file = get_compile_output_file_path(task, True)
            output_dir = os.path.dirname(output_file)
            output_file_name = os.path.basename(output_file)

            ohos_test_str = 'ohosTest'
            output_file_name_items = output_file_name.split(
                '-')  # hap name format: entry-default-signed.hap
            # ohosTest hap format: entry-ohosTest-signed.hap
            output_file_name_items[-2] = ohos_test_str
            output_file_name = '-'.join(output_file_name_items)

            output_dir_items = output_dir.split(os.path.sep)
            output_dir_items[-1] = ohos_test_str
            if utils.is_windows():
                # for windows, need to add an empty string to mark between disk identifier and path
                output_dir_items.insert(1, os.path.sep)
            elif utils.is_mac():
                output_dir_items.insert(0, os.path.sep)
            ohos_test_output_file = os.path.join(
                *output_dir_items, output_file_name)

            passed = validate_compile_output(
                test_info, task, True, ohos_test_output_file)
            if passed:
                test_info.result = options.TaskResult.passed
612
613
def disasm_abc(task, abc_file):
    """Disassemble abc_file with the ark_disasm tool.

    Returns the path of the generated .pa file, or '' when the disassembler
    binary is missing. Callers must still check that the .pa file exists,
    since a failed disassembly may produce no output.
    """
    if not os.path.exists(task.ark_disasm_path):
        logging.error("ark_disasm executable not found")
        return ''

    pa_file = abc_file + '.pa'
    cmd = [task.ark_disasm_path, '--verbose', abc_file, pa_file]
    logging.debug(f'cmd: {cmd}')
    process = subprocess.Popen(
        cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    [stdout, stderr] = process.communicate(
        timeout=options.arguments.compile_timeout)

    # log labels corrected from 'stdcout'/'stdcerr'
    logging.debug("disasm stdout: %s",
                  stdout.decode('utf-8', errors="ignore"))
    logging.warning("disasm: stderr: %s",
                    stderr.decode('utf-8', errors="ignore"))

    return pa_file
633
634
def is_abc_debug_info_correct(task, abc_file, is_debug):
    """Check that the abc file's debug info matches the compile mode.

    Debug builds must contain a LOCAL_VARIABLE_TABLE block in the
    disassembled output; release builds must not.
    """
    pa_file = disasm_abc(task, abc_file)
    if not os.path.exists(pa_file):
        logging.error(f"pa file not exist: {pa_file}")
        return False

    # scan the disassembly for the debug-info marker
    with open(pa_file, 'r', encoding='utf-8') as pa:
        has_debug_info_block = any(
            'LOCAL_VARIABLE_TABLE' in line.strip() for line in pa)

    return has_debug_info_block if is_debug else not has_debug_info_block
655
656
def validate_output_for_jsbundle(info, task, uncompressed_output_path, is_debug):
    """Validate js-bundle compile output.

    Every .abc under uncompressed_output_path must be non-empty and carry
    debug info matching the compile mode; in debug mode each .abc also needs
    a sibling .js.map sourcemap. On failure the reason is recorded in info
    and False is returned; otherwise info.abc_size is set and True returned.
    """
    abc_files = []
    for root, dirs, files in os.walk(uncompressed_output_path):
        for file in files:
            if file.endswith('.abc'):
                abc_files.append(os.path.join(root, file))

    total_size = 0
    for file in abc_files:
        # paths from os.walk are already rooted at uncompressed_output_path,
        # so no extra join is needed
        total_size += os.path.getsize(file)
        if 'compatible8' not in task.type and not is_abc_debug_info_correct(task, file, is_debug):
            # skip compatible8 outputs as disasm may failed
            info.result = options.TaskResult.failed
            info.error_message = f"{file} debug info not correct"
            return False

    if total_size == 0:
        info.result = options.TaskResult.failed
        info.error_message = "abc not found or abc size is 0"
        return False
    else:
        info.abc_size = total_size

    if is_debug:
        for file in abc_files:
            sourcemap_file = file.replace('.abc', '.js.map')
            if not os.path.exists(sourcemap_file):
                info.result = options.TaskResult.failed
                info.error_message = "sourcemap not found"
                return False

    return True
690
691
def _validate_abc_file(info, task, abc_path, is_debug):
    """Check one abc file exists, is non-empty, and has debug info matching
    the compile mode; return its size, or -1 after recording the failure."""
    file_name = os.path.basename(abc_path)
    if not os.path.exists(abc_path):
        info.result = options.TaskResult.failed
        info.error_message = f"{file_name} not found"
        return -1

    abc_size = os.path.getsize(abc_path)
    if abc_size <= 0:
        info.result = options.TaskResult.failed
        info.error_message = f"{file_name} size is 0"
        return -1
    if not is_abc_debug_info_correct(task, abc_path, is_debug):
        info.result = options.TaskResult.failed
        info.error_message = f"{file_name} debug info not correct"
        return -1
    return abc_size


def validate_output_for_esmodule(info, task, uncompressed_output_path, is_debug):
    """Validate esmodule compile output.

    modules.abc (and widgets.abc for widget tasks) must exist, be non-empty,
    and carry debug info matching the compile mode; a sourceMaps.map file
    must also be present. On failure the reason is recorded in info and
    False is returned; otherwise info.abc_size is set and True returned.
    """
    abc_generated_path = os.path.join(uncompressed_output_path, 'ets')

    modules_abc_size = _validate_abc_file(
        info, task, os.path.join(abc_generated_path, 'modules.abc'), is_debug)
    if modules_abc_size < 0:
        return False
    info.abc_size = modules_abc_size

    if 'widget' in task.type:
        widgets_abc_size = _validate_abc_file(
            info, task, os.path.join(abc_generated_path, 'widgets.abc'), is_debug)
        if widgets_abc_size < 0:
            return False
        info.abc_size += widgets_abc_size

    # debug sourcemaps sit next to the abc output; release ones in the cache
    if is_debug:
        sourcemap_path = abc_generated_path
    else:
        sourcemap_path = os.path.join(
            task.path, *(task.build_path), *(task.cache_path), 'release')
    sourcemap_file = os.path.join(sourcemap_path, 'sourceMaps.map')
    if not os.path.exists(sourcemap_file):
        info.result = options.TaskResult.failed
        info.error_message = "sourcemap not found"
        return False

    return True
742
743
def collect_compile_time(info, time_string):
    """Parse a hvigor duration string (e.g. "1 min 2 s 300 ms") and store
    the total duration in seconds, rounded to 3 decimals, on info.time."""
    # In the whitespace-split string each unit token ('min'/'s'/'ms') is
    # preceded by its numeric value.
    seconds_by_unit = {'min': 0.0, 's': 0.0, 'ms': 0.0}
    tokens = time_string.split()
    for idx, token in enumerate(tokens):
        if token == 'min':
            seconds_by_unit['min'] = float(tokens[idx - 1]) * 60
        elif token == 's':
            seconds_by_unit['s'] = float(tokens[idx - 1])
        elif token == 'ms':
            seconds_by_unit['ms'] = round(float(tokens[idx - 1]) / 1000, 3)

    info.time = round(sum(seconds_by_unit.values()), 3)
759
760
def get_compile_output_file_path(task, is_debug):
    """Return the build product path: the hap (debug) or app (release)."""
    product_tail = task.output_hap_path if is_debug else task.output_app_path
    return os.path.join(task.path, *(task.build_path), *product_tail)
772
773
def validate_compile_output(info, task, is_debug, output_file=''):
    """Unzip the build product and validate its contents.

    When `output_file` is empty the default product path for the task/mode
    is used. Failures are recorded on `info`; returns True when the
    extracted product passes the per-module-type checks.
    """
    if output_file == '':
        output_file = get_compile_output_file_path(task, is_debug)
    extract_dir = output_file + '.uncompressed'

    if not os.path.exists(output_file):
        logging.error("output file for task %s not exists: %s",
                      task.name, output_file)
        info.result = options.TaskResult.failed
        info.error_message = "Hap not found"
        return False

    try:
        with zipfile.ZipFile(output_file, 'r') as zip_ref:
            zip_ref.extractall(extract_dir)
    except Exception as e:
        logging.error(f"unzip exception: {e}")
        logging.error(
            f"uncompressed output file for task {task.name} failed. output file: {output_file}")
        info.result = options.TaskResult.failed
        info.error_message = "Hap uncompressed failed, cannot exam build products"
        return False

    # Dispatch on module kind; both validators record details on `info`.
    if utils.is_esmodule(task.type):
        passed = validate_output_for_esmodule(info, task, extract_dir, is_debug)
    else:
        passed = validate_output_for_jsbundle(info, task, extract_dir, is_debug)

    shutil.rmtree(extract_dir)
    return passed
812
813
def run_compile_output(info, task, is_debug, picture_name):
    """Run the built product, screenshot it and compare against the
    reference image; record the runtime verdict on `info`."""
    suffix = 'debug' if is_debug else 'release'
    picture_name = f'{picture_name}_{suffix}'
    utils.get_running_screenshot(task, picture_name)
    time.sleep(2)
    passed = utils.verify_runtime(task, picture_name)
    if passed:
        info.runtime_result = options.TaskResult.passed
    else:
        logging.error(f'The runtime of the {task.name} is inconsistent with the reference screenshot,'
                      f' when running {picture_name}')
        info.runtime_result = options.TaskResult.failed
        info.error_message = "The runtime result is inconsistent with the reference"
    return passed
831
832
def is_compile_success(compile_stdout):
    """Search stdout for hvigor's success banner.

    Returns [ok, matched_text]; matched_text is the
    "BUILD SUCCESSFUL in ..." fragment (later fed to the time parser),
    or '' when the build did not succeed.
    """
    match = re.search(
        r"BUILD SUCCESSFUL in (\d+ min )?(\d+ s )?(\d+ ms)?", compile_stdout)
    return [True, match.group(0)] if match else [False, '']
840
841
def validate(compilation_info, task, is_debug, stdout, stderr, picture_name, output_file=''):
    """Validate one compile run: success banner, build products and
    (optionally) runtime behavior. Returns True when the products pass."""
    info = compilation_info.debug_info if is_debug else compilation_info.release_info

    # hvigor's return code can be 1 whenever stderr is non-empty, so the
    # "BUILD SUCCESSFUL" banner in stdout is the reliable success signal.
    [succeeded, time_string] = is_compile_success(stdout)
    if not succeeded:
        info.result = options.TaskResult.failed
        info.error_message = stderr
        return False

    passed = validate_compile_output(info, task, is_debug, output_file)

    if options.arguments.run_haps:
        # The runtime verdict is recorded on `info` but does not gate
        # the returned `passed` value.
        run_compile_output(info, task, is_debug, picture_name)

    if passed:
        collect_compile_time(info, time_string)
        info.result = options.TaskResult.passed

    return passed
866
867
def get_hvigor_path(project_path):
    """Return the project's hvigor wrapper script path, ensuring it is
    executable on non-Windows hosts."""
    if utils.is_windows():
        return os.path.join(project_path, 'hvigorw.bat')
    hvigor = os.path.join(project_path, 'hvigorw')
    utils.add_executable_permission(hvigor)
    return hvigor
876
877
def get_hvigor_compile_cmd(task, is_debug, module_name='', module_target='default'):
    """Build the hvigor assembleHap command line for the given module and
    build mode; falls back to the task's hap module when no name given."""
    mode = 'debug' if is_debug else 'release'
    target_module = module_name or task.hap_module
    return [
        get_hvigor_path(task.path),
        '--mode', 'module',
        '-p', 'product=default',
        '-p', f'module={target_module}@{module_target}',
        '-p', f'buildMode={mode}',
        'assembleHap',
        '--info', '--verbose-analyze', '--parallel', '--incremental', '--daemon',
    ]
886
887
def compile_project(task, is_debug, cmd=''):
    """Run a hvigor compile in the task's project directory.

    Args:
        task: the test task (provides the project path and default command).
        is_debug: build in debug mode when True, release otherwise.
        cmd: optional explicit command list; built from the task when empty.

    Returns:
        [stdout, stderr] decoded as utf-8 (undecodable bytes ignored).

    Raises:
        subprocess.TimeoutExpired: when the compile exceeds the configured
            timeout. The child process is killed before re-raising.
    """
    if not cmd:
        cmd = get_hvigor_compile_cmd(task, is_debug)

    logging.debug(f'cmd: {cmd}')
    logging.debug(f"cmd execution path {task.path}")
    process = subprocess.Popen(cmd, shell=False, cwd=task.path,
                               stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    try:
        stdout, stderr = process.communicate(
            timeout=options.arguments.compile_timeout)
    except subprocess.TimeoutExpired:
        # Fix: a timeout previously left the hvigor child running with open
        # pipes. Kill it and drain the pipes before propagating, as the
        # subprocess documentation recommends.
        process.kill()
        process.communicate()
        raise

    stdout_utf8 = stdout.decode("utf-8", errors="ignore")
    stderr_utf8 = stderr.decode("utf-8", errors="ignore")
    logging.debug(f"cmd stdout: {stdout_utf8}")
    logging.debug(f"cmd stderr: {stderr_utf8}")

    return [stdout_utf8, stderr_utf8]
904
905
def clean_compile(task):
    """Run `hvigorw clean` in the task's project directory.

    Output is discarded; the command is bounded by the configured compile
    timeout.

    Raises:
        subprocess.TimeoutExpired: when the clean exceeds the timeout.
            The child process is killed before re-raising.
    """
    cmd = [get_hvigor_path(task.path), 'clean']
    logging.debug(f'cmd: {cmd}')
    logging.debug(f"cmd execution path {task.path}")
    process = subprocess.Popen(cmd, shell=False, cwd=task.path,
                               stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    try:
        process.communicate(timeout=options.arguments.compile_timeout)
    except subprocess.TimeoutExpired:
        # Fix: kill the child on timeout instead of leaking the process.
        process.kill()
        process.communicate()
        raise
913
914
def compile_incremental(task, is_debug):
    """Run the incremental-compilation scenario suite for one task.

    Does a first build, optionally validates it, backs up the output and
    cache as the baseline, then replays every incremental-change scenario.
    """
    logging.info(
        f"==========> Running task: {task.name} in incremental compilation")
    [stdout, stderr] = compile_project(task, is_debug)

    [first_build_ok, time_string] = is_compile_success(stdout)
    if not first_build_ok:
        logging.error(
            "Incremental compile failed due to first compile failed!")
        return

    if options.arguments.compile_mode == 'incremental':
        if not validate(task.full_compilation_info,
                        task, is_debug, stdout, stderr, 'incremental_compile_first'):
            logging.error(
                "Incremental compile failed due to first compile failed!")
            return

    backup_compile_output(task, is_debug)
    backup_compile_cache(task, is_debug)

    # Replay each incremental scenario against the backed-up baseline.
    incremental_scenarios = (
        IncrementalTest.compile_incremental_no_modify,
        IncrementalTest.compile_incremental_add_oneline,
        IncrementalTest.compile_incremental_add_file,
        IncrementalTest.compile_incremental_add_nonexistent_file,
        IncrementalTest.compile_incremental_delete_file,
        IncrementalTest.compile_incremental_reverse_hap_mode,
        IncrementalTest.compile_incremental_modify_module_name,
    )
    for scenario in incremental_scenarios:
        scenario(task, is_debug)
944
945
def backup_compile_output(task, is_debug):
    """Copy the current build product into the task's backup area with a
    timestamped name, keeping at most two backups per mode."""
    backup_root = task.backup_info.cache_path
    if not os.path.exists(backup_root):
        os.mkdir(backup_root)

    mode = 'debug' if is_debug else 'release'
    backups = (task.backup_info.output_debug if is_debug
               else task.backup_info.output_release)
    # Two backups (e.g. first and incremental build) are all we compare.
    if len(backups) == 2:
        return

    backup_output_path = os.path.join(backup_root, 'output', mode)
    if not os.path.exists(backup_output_path):
        os.makedirs(backup_output_path)

    output_file = get_compile_output_file_path(task, is_debug)
    shutil.copy(output_file, backup_output_path)
    copied_file = os.path.join(
        backup_output_path, os.path.basename(output_file))
    # Rename with a timestamp so successive backups don't collide.
    stamped_file = f"{copied_file}-{utils.get_time_string()}"
    shutil.move(copied_file, stamped_file)

    backups.append(stamped_file)
978
979
def backup_compile_cache(task, is_debug):
    """Snapshot the build cache for the given mode into the backup area.

    Only the first snapshot per mode is kept; later calls are no-ops.
    """
    backup_root = task.backup_info.cache_path
    if not os.path.exists(backup_root):
        os.mkdir(backup_root)

    backup_cache_root = os.path.join(backup_root, 'cache')
    if not os.path.exists(backup_cache_root):
        os.mkdir(backup_cache_root)

    mode = 'debug' if is_debug else 'release'
    cache_attr = 'cache_debug' if is_debug else 'cache_release'
    # Already snapshotted for this mode — keep the first baseline.
    if getattr(task.backup_info, cache_attr) != '':
        return

    cache_src = os.path.join(
        task.path, *(task.build_path), *(task.cache_path), mode)
    cache_dst = os.path.join(backup_cache_root, mode)
    shutil.copytree(cache_src, cache_dst)
    setattr(task.backup_info, cache_attr, cache_dst)
1007
1008
def execute_full_compile(task):
    """Run clean full compiles for the requested hap modes (release first,
    then debug) and back up each passing product.

    Returns the verdict of the last compile performed (with hap_mode 'all'
    the debug result wins), matching the original call order.
    """
    logging.info(f"==========> Running task: {task.name} in full compilation")
    clean_compile(task)
    passed = False
    for is_debug, accepted_modes in ((False, ('all', 'release')),
                                     (True, ('all', 'debug'))):
        if options.arguments.hap_mode not in accepted_modes:
            continue
        [stdout, stderr] = compile_project(task, is_debug)
        passed = validate(task.full_compilation_info,
                          task, is_debug, stdout, stderr, 'full_compile')
        if passed:
            backup_compile_output(task, is_debug)
        clean_compile(task)

    return passed
1029
1030
def execute_incremental_compile(task):
    """Run incremental-compilation suites for the requested hap modes,
    release first, cleaning up after each."""
    logging.info(
        f"==========> Running task: {task.name} in incremental compilation")
    for is_debug in (False, True):
        accepted_modes = ('all', 'debug') if is_debug else ('all', 'release')
        if options.arguments.hap_mode in accepted_modes:
            compile_incremental(task, is_debug)
            clean_compile(task)
1040
1041
def clean_backup(task):
    """Delete the task's backup directory tree if it exists."""
    if os.path.exists(task.backup_info.cache_path):
        shutil.rmtree(task.backup_info.cache_path)
1046
1047
def execute(test_tasks):
    """Run the whole test pipeline for every task.

    For each task: full compiles (skipping the rest of the pipeline on
    failure), incremental scenarios, then the one-off extra tests.
    Exceptions are logged and execution continues with the next task;
    backups are always cleaned up.
    """
    for task in test_tasks:
        try:
            logging.info(f"======> Running task: {task.name}")
            compile_mode = options.arguments.compile_mode

            if compile_mode in ['all', 'full'] and not execute_full_compile(task):
                logging.info("Full compile failed, skip other tests!")
                continue

            if compile_mode in ['all', 'incremental']:
                execute_incremental_compile(task)

            OtherTest.verify_binary_consistency(task)

            # One hap mode is enough for the remaining one-off tests.
            is_debug = options.arguments.hap_mode == 'debug'
            OtherTest.execute_break_compile(task, is_debug)
            if 'error' in task.type:
                OtherTest.compile_full_with_error(task, is_debug)
            if 'exceed_length_error' in task.type:
                OtherTest.compile_with_exceed_length(task, is_debug)
            if 'ohosTest' in task.type:
                OtherTest.compile_ohos_test(task)

            logging.info(f"======> Running task: {task.name} finised")
        except Exception as e:
            logging.exception(e)
        finally:
            clean_backup(task)
1079