• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# -*- coding: utf-8 -*-
2# Copyright (c) 2025 Huawei Device Co., Ltd.
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15
16import os
17import re
18import json
19from functools import lru_cache
20from collections import defaultdict
21
class BuildProcessor:
    """Map ArkUI unit-test targets to their build inputs and report which
    targets are impacted by a change set.

    Pipeline (see :meth:`execute`):
      1. Parse ``ace_unittest.gni`` for per-``type`` dependency blocks.
      2. Walk ``root_dir``, parsing every BUILD.gn for unittest and group
         targets, plus the top-level unittest BUILD.gn for source sets.
      3. Expand each target's deps and collect the headers its inputs
         ``#include``.
      4. Intersect those inputs with the files listed in
         ``change_info.json`` and print the impacted test targets.

    GN files are parsed with regular expressions rather than a real GN
    parser, so only the simple declaration shapes used in this repository
    are recognized.
    """

    def __init__(self, root_dir, ace_root):
        """
        Args:
            root_dir: directory tree that is walked for BUILD.gn files.
            ace_root: replacement value for the ``$ace_root`` GN variable.
        """
        self.root_dir = root_dir
        self.ace_root = ace_root
        # Per-target metadata accumulator.
        # NOTE(review): never written to by any method in this file; kept so
        # any external users of the attribute keep working.
        self.build_info = defaultdict(lambda: {
            "name": "",
            "source_list": [],
            "deps_list": [],
            "include_list": [],
            "config_list": [],
        })
        self.data_json = []   # one dict per discovered test target
        self.group_json = []  # one dict per discovered group() target
        self.type_deps = {}   # ace_unittest "type" -> extra deps list
        self.heads = {}       # file path -> set of included *.h headers
        # Content cache for _read_file. Replaces the former @lru_cache on the
        # method: lru_cache on an instance method keys on `self` and keeps
        # the instance alive for the cache's lifetime (ruff B019).
        self._file_cache = {}

        # ace_unittest("name")/ohos_unittest("name") blocks:
        # group(1)=prefix, group(2)=name, group(3)=body up to the first '}'
        # (target bodies are assumed not to contain nested braces).
        self.unittest_pattern = re.compile(
            r'(ace|ohos)_unittest\("([^"]*)"\)\s*\{(.*?)(?=\})',
            re.DOTALL | re.MULTILINE
        )
        # group("name") blocks: group(1)=name, group(2)=body.
        self.group_pattern = re.compile(
            r'group\("([^"]*)"\)\s*\{(.*?)(?=\})',
            re.DOTALL | re.MULTILINE
        )
        # ohos_source_set("name") blocks: group(1)=name, group(2)=body.
        self.source_set_pattern = re.compile(
            r'ohos_source_set\("([^"]*)"\)\s*\{(.*?)(?=\})',
            re.DOTALL | re.MULTILINE
        )
        # Sentinel only: this PCRE-style recursive pattern ((?R)) cannot be
        # compiled by the `re` module and is never compiled here --
        # parse_build_gn compares against it by equality to select the
        # .gni parsing path.
        self.type_get_pattern = r'''
            if\s*\(type\s*==\s*"([^"]+)"\)
            \s*(\{
                (?:
                    [^{}]*
                    | (?R)
                )*
            \})
        '''

        # Assignment lists inside a target body, e.g. ``sources += [ ... ]``.
        self.sources_pattern = re.compile(r'sources\s*[+]?=\s*\[(.*?)\]', re.DOTALL)
        self.deps_pattern = re.compile(r'deps\s*[+]?=\s*\[(.*?)\]', re.DOTALL)
        self.includes_pattern = re.compile(r'include_dirs\s*[+]?=\s*\[(.*?)\]', re.DOTALL)
        self.configs_pattern = re.compile(r'configs\s*[+]?=\s*\[(.*?)\]', re.DOTALL)
        self.type_pattern = re.compile(r'type\s*=\s*"([^"]+)"')

    def execute(self):
        """Run the full pipeline and print the result to stdout."""
        self.parse_build_gn(
            "foundation/arkui/ace_engine/test/unittest/ace_unittest.gni",
            self.type_get_pattern)

        for root, _, files in os.walk(self.root_dir):
            if "BUILD.gn" in files:
                path = os.path.join(root, "BUILD.gn")
                self.parse_build_gn(path, self.unittest_pattern)
                self.parse_groups(path)

        self.parse_build_gn(
            "foundation/arkui/ace_engine/test/unittest/BUILD.gn",
            self.source_set_pattern)

        # Second pass: expand deps (dep sources + dep BUILD.gn files) and
        # gather the headers included by those expanded files.
        for target in self.data_json:
            target["deps_list"] = self._get_deps_list(target)
            target["dep_h"] = [h for d in target["deps_list"]
                               for h in self.process_file(d)]

        change_files, oh_fields = self.process_changes()

        # Precise impact analysis only applies when the change set touches
        # arkui_ace_engine exclusively; anything else falls back to the
        # whole-suite marker.
        if len(oh_fields) == 1 and oh_fields[0] == "arkui_ace_engine":
            print(" ".join(self.analyze_impact(change_files)))
        else:
            # Fixed: was an f-string with no placeholders.
            print("TDDarkui_ace_engine")
        self.generate_output()

    def parse_build_gn(self, file_path, pattern):
        """Parse one GN file with `pattern` and record the targets found.

        The ``type_get_pattern`` sentinel selects the .gni path, which pulls
        the per-type dependency blocks out of the *raw* content
        (extract_gn_block skips comments itself); every other pattern runs
        against comment-stripped content.
        """
        content = self._read_file(file_path)
        processed_content = "\n".join(line.split("#")[0].rstrip()
                                      for line in content.splitlines())
        if pattern == self.type_get_pattern:
            for type_name in ("components", "new", "pipeline"):
                block_content = self.extract_gn_block(
                    content, 'if (type == "{}")'.format(type_name))
                self.set_type(type_name, block_content, file_path)
        else:
            for match in pattern.finditer(processed_content):
                self._process_unittest(match, file_path)

    def extract_gn_block(self, content, start_condition):
        """Return the text inside the braces that follow `start_condition`.

        Scans character by character, tracking string literals, escapes and
        '#' comments so braces inside them do not affect nesting depth.
        Returns "" when the condition is absent or braces are unbalanced.
        """
        pattern = re.compile(
            r'{}\s*{{'.format(re.escape(start_condition)),
            re.DOTALL
        )
        match = pattern.search(content)
        if not match:
            return ""

        start_pos = match.end()
        brace_count = 1  # the opening brace was consumed by the regex
        index = start_pos
        in_string = False
        in_comment = False
        quote_char = None
        escape = False

        while index < len(content) and brace_count > 0:
            char = content[index]

            if in_comment:
                # Comments run to end of line.
                if char == '\n':
                    in_comment = False
                index += 1
                continue
            if in_string:
                if escape:
                    escape = False
                elif char == '\\':
                    escape = True
                elif char == quote_char:
                    in_string = False
            else:
                if char == '#':
                    in_comment = True
                elif char in ('"', "'"):
                    in_string = True
                    quote_char = char
                elif char == '{':
                    brace_count += 1
                elif char == '}':
                    brace_count -= 1

            index += 1
        if brace_count != 0:
            return ""
        # `index` is one past the closing brace; exclude the brace itself.
        return content[start_pos:index - 1].strip()

    def set_type(self, type_name, content, file_path):
        """Record the deps declared inside one ace_unittest `type` block."""
        # Parameter renamed (was `type`) to stop shadowing the builtin.
        base_path = os.path.dirname(file_path)
        deps = self._get_gn_content(self.deps_pattern, content, base_path)
        self.type_deps[type_name] = deps

    def process_file(self, file_path):
        """Return (and memoize) the set of .h headers included by a file."""
        # Membership test, not .get(): a cached *empty* set is a valid
        # result and must not trigger a re-read (the old truthiness check
        # re-parsed such files on every call).
        if file_path in self.heads:
            return self.heads[file_path]
        content = self._read_file(file_path)
        self.heads[file_path] = {header for line in content.split('\n')
                                 if (header := self._process_includes(line))}
        return self.heads[file_path]

    def parse_groups(self, file_path):
        """Record every group("...") target declared in a BUILD.gn file."""
        content = self._read_file(file_path)
        processed_content = "\n".join(line.split("#")[0].rstrip()
                                      for line in content.splitlines())

        for match in self.group_pattern.finditer(processed_content):
            self._process_group(match, file_path)

    def process_changes(self):
        """Read change_info.json and flatten it into changed file paths.

        Returns:
            tuple: (change_files, openharmony_fields) -- the changed paths
            prefixed with ``ace_root``, and the repository names present in
            the change info.
        """
        change_info = self._read_json("change_info.json")
        openharmony_fields = [v["name"] for v in change_info.values() if "name" in v]

        change_files = []
        # "rename" entries are [old, new] pairs; both sides count as changed.
        file_operations = {
            "added": lambda x: x,
            "rename": lambda x: [item for pair in x for item in pair],
            "modified": lambda x: x,
            "deleted": lambda x: x
        }

        for value in change_info.values():
            changed_files = value.get("changed_file_list", {})
            for op, processor in file_operations.items():
                if op in changed_files:
                    change_files.extend(processor(changed_files[op]))

        return (
            [os.path.join(self.ace_root, f) for f in change_files],
            openharmony_fields
        )

    def generate_output(self):
        """Dump the collected targets and groups as JSON files."""
        # Explicit encoding: the default is locale-dependent.
        with open("test_targets.json", "w", encoding="utf-8") as f:
            json.dump(self.data_json, f, indent=2)

        with open("groups.json", "w", encoding="utf-8") as f:
            json.dump(self.group_json, f, indent=2)

    def analyze_impact(self, change_files):
        """Return the test targets whose inputs intersect `change_files`.

        Targets listed as still-adapting in the gated-check-in config are
        excluded from the result.
        """
        tdd_data = self._read_json(
            "developtools/integration_verification/tools/gated_check_in/ace_engine.json")
        adapting_targets = set(tdd_data.get("adapting_test_targets", []))

        change_set = set(change_files)
        impacted = []

        # Every per-target key whose value is a list of file paths to match
        # against the change set.
        input_keys = ("source_list", "deps_list", "includes_list",
                      "configs_list", "source_h", "dep_h",
                      "includes_h", "configs_h")

        for target in self.data_json:
            if any(change_set.intersection(target[key]) for key in input_keys):
                if target["test_target"] not in adapting_targets:
                    impacted.append(target["test_target"])

        return self.ret_build_target(impacted, change_files)

    def _read_file(self, file_path):
        """Read a text file, caching contents; missing/unreadable -> ""."""
        if file_path in self._file_cache:
            return self._file_cache[file_path]
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
        except (OSError, UnicodeDecodeError):
            # Best effort: absent or unreadable files contribute nothing.
            content = ""
        self._file_cache[file_path] = content
        return content

    def ret_build_target(self, impacted, change_files):
        """Apply fallbacks when no target matched the change set.

        A header-only change with no matching target forces the full TDD
        suite; otherwise a cheap smoke-test target is returned.
        """
        if impacted:
            return impacted
        if any(file.endswith(".h") for file in change_files):
            return ["TDDarkui_ace_engine"]
        return ["foundation/arkui/ace_engine/test/unittest/adapter/ohos/entrance:container_test"]

    def _get_deps_list(self, target):
        """Expand deps into dep source files plus each dep's BUILD.gn.

        Accumulates into a set once; the original rebuilt ``list(set(...))``
        on every iteration (quadratic), and ordering was already
        unspecified because of that set() round-trip.
        """
        expanded = set()
        for dep in target["deps_list"]:
            expanded.update(self._get_source_list(dep))
            # GN label "path:target" -> path/BUILD.gn
            expanded.add(os.path.join(dep.split(":", 1)[0], "BUILD.gn"))
        return list(expanded)

    def _get_source_list(self, dep):
        """Sources of the already-parsed target labelled `dep`, or []."""
        for target in self.data_json:
            if dep == target["test_target"]:
                return target["source_list"]
        return []

    def _process_includes(self, line):
        """Return the .h path named by an #include line, else None."""
        for pattern in (r'#include\s*"(.*?)"', r'#include\s*<(.*?)>'):
            match = re.match(pattern, line)
            if match and (header := match.group(1)).endswith('.h'):
                return header
        return None

    def _process_unittest(self, match, file_path):
        """Record one unittest/source-set target match as a data_json entry."""
        base_path = os.path.dirname(file_path)
        # The top-level unittest BUILD.gn is parsed with the two-group
        # source-set pattern (name, body); everything else uses the
        # three-group (prefix, name, body) unittest pattern.
        if base_path == "foundation/arkui/ace_engine/test/unittest":
            target_name = match.group(1)
            target_content = match.group(2)
        else:
            target_name = match.group(2)
            target_content = match.group(3)

        sources = self._get_gn_content(self.sources_pattern, target_content, base_path)
        sources.append(file_path)  # the BUILD.gn itself counts as an input
        deps = self._get_gn_content(self.deps_pattern, target_content, base_path)
        includes = self._get_include_files(
            self._get_gn_content(self.includes_pattern, target_content, base_path))
        configs = self._get_gn_content(self.configs_pattern, target_content, base_path)

        source_h = {h for s in sources for h in self.process_file(s)}
        dep_h = {h for d in deps for h in self.process_file(d)}
        include_h = {h for s in includes for h in self.process_file(s)}
        config_h = {h for d in configs for h in self.process_file(d)}
        if match.group(1) == "ace":
            # ace_unittest targets pull extra deps based on their `type`.
            # .get() guards against types absent from ace_unittest.gni
            # (the old direct index raised KeyError).
            for type_match in self.type_pattern.finditer(target_content):
                deps += self.type_deps.get(type_match.group(1), [])
        build_target = f"{os.path.dirname(file_path)}:{target_name}"
        self.data_json.append({
            "test_target": build_target,
            "source_list": sources,
            "deps_list": deps,
            "includes_list": includes,
            "configs_list": configs,
            "source_h": list(source_h),
            "dep_h": list(dep_h),
            "includes_h": list(include_h),
            "configs_h": list(config_h)
        })

    def _get_include_files(self, includes_list):
        """Recursively list every file under each include directory."""
        all_files = []
        for path in includes_list:
            for root, _, files in os.walk(path):
                for file_name in files:
                    all_files.append(os.path.join(root, file_name))
        return all_files

    def _process_group(self, match, file_path):
        """Record one group() target and its normalized deps."""
        group_name = match.group(1)
        group_content = match.group(2)
        base_path = os.path.dirname(file_path)

        # "/:"" -> ":" collapses labels like "foo/:bar" produced by joining
        # a directory path with a bare ":target" reference.
        deps = [self._normalize_path(d, base_path).replace("/:", ":")
                for d in self._get_gn_content(self.deps_pattern, group_content, "")]

        self.group_json.append({
            "group_name": f"{base_path}:{group_name}",
            "deps_list": deps
        })

    def _get_gn_content(self, pattern, content, base_path):
        """Extract and normalize every list item matched by `pattern`.

        Note: items are split on ',', so paths containing commas are not
        supported (none occur in this repository's GN files).
        """
        sources = []
        for match in pattern.finditer(content):
            matched_content = match.group(1)
            sources.extend([
                self._normalize_path(s, base_path)
                for s in matched_content.split(',') if s.strip()
            ])
        return sources

    def _normalize_path(self, s, base_path):
        """Strip quotes, anchor bare filenames at `base_path`, and expand
        the ``$ace_root`` GN variable."""
        s = s.strip().strip('"')
        if '/' not in s:
            return os.path.join(base_path, s)
        return s.replace('$ace_root', self.ace_root)

    def _read_json(self, path):
        """Load a JSON file; missing/unreadable/invalid -> {}."""
        try:
            with open(path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except (OSError, ValueError):
            # json.JSONDecodeError is a ValueError subclass.
            return {}
341
342
if __name__ == "__main__":
    # Both the walk root and the $ace_root substitution point at the same
    # ace_engine checkout.
    ACE_ENGINE_DIR = "foundation/arkui/ace_engine"
    BuildProcessor(root_dir=ACE_ENGINE_DIR, ace_root=ACE_ENGINE_DIR).execute()
349