• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3
4#
5# Copyright (c) 2025 Northeastern University
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20from collections import defaultdict
21from pathlib import Path
22from typing import List, Optional, Dict
23
24from ohos.sbom.analysis.depend_graph import DependGraphAnalyzer
25from ohos.sbom.data.file_dependence import File, FileType
26from ohos.sbom.data.target import Target
27from ohos.sbom.sbom.metadata.sbom_meta_data import RelationshipType
28
29
30class FileDependencyAnalyzer:
31    def __init__(self, all_target_depend: DependGraphAnalyzer):
32        self._depend_graph = all_target_depend
33        self._file_dependencies: Dict[str, File] = {}
34        self._target_name_map_file = defaultdict(list)
35
36    def build_start(self, target_name: str):
37        self._depend_graph.dfs_downstream(
38            start=target_name,
39            max_depth=None,
40            pre_visit=self._pre_visit_callback,
41            post_visit=self._post_visit_callback
42        )
43
    def get_file_dependencies(self) -> Dict[str, File]:
        """Return the path-to-``File`` index built so far.

        The internal dictionary itself is returned (not a copy), so later
        traversals are reflected in it and caller mutations affect the analyzer.
        """
        return self._file_dependencies
46
    def get_target_name_map_file(self) -> Dict[str, List[File]]:
        """Return the target-name index filled in by the pre-visit callback.

        Keys are target names; each value is the list of ``File`` objects that
        were created for that target's outputs. The internal mapping itself is
        returned (not a copy).
        """
        return self._target_name_map_file
49
50    def build_all_install_deps_optimized(self, install_targets: List[str]):
51        virtual_root = "__ALL_INSTALL_ROOT__"
52
53        try:
54
55            self._depend_graph.add_virtual_root(virtual_root, install_targets)
56
57            print(f"Starting one-time traversal of dependencies for {len(install_targets)} modules...")
58            self._depend_graph.dfs_downstream(
59                start=virtual_root,
60                max_depth=None,
61                pre_visit=self._pre_visit_callback,
62                post_visit=self._post_visit_callback
63            )
64            print(f"Completed! Collected {len(self._file_dependencies)} files in total")
65
66        finally:
67            self._depend_graph.remove_virtual_root(virtual_root)
68
69    def extract_outputs_and_source_outputs(self, target: Target) -> list:
70        raw_outputs = getattr(target, 'outputs', None)
71        raw_source_outputs = getattr(target, 'source_outputs', None)
72
73        outputs = raw_outputs if isinstance(raw_outputs, (list, tuple)) else []
74        source_outputs = raw_source_outputs if isinstance(raw_source_outputs, dict) else {}
75
76        result = []
77
78        for out in outputs:
79            if out and 'unstripped' not in out:
80                result.append(out)
81
82        for output_list in source_outputs.values():
83            if isinstance(output_list, (list, tuple)) and len(output_list) > 0:
84                first_output = output_list[0]
85                if first_output and 'unstripped' not in first_output:
86                    result.append(first_output)
87
88        return result
89
90    def process_source_output_dependencies(self, target: Target, outputs: list, source_list: list):
91        output_by_stem = {Path(out).stem: out for out in outputs}
92        matched_outputs = set()
93
94        for source in source_list:
95            stem = Path(source).stem
96            if stem in output_by_stem:
97                output_file = self._file_dependencies.setdefault(
98                    output_by_stem[stem], File(output_by_stem[stem], target)
99                )
100                source_file = self._file_dependencies.setdefault(source, File(source, None))
101                output_file.add_dependency(RelationshipType.GENERATED_FROM, source_file)
102                matched_outputs.add(output_by_stem[stem])
103
104        remaining_outputs = [out for out in outputs if out not in matched_outputs]
105        for out in remaining_outputs:
106            output_file = self._file_dependencies.setdefault(out, File(out, target))
107            for source in source_list:
108                source_file = self._file_dependencies.setdefault(source, File(source, None))
109                output_file.add_dependency(RelationshipType.GENERATED_FROM, source_file)
110
111    def process_target(self, target: Target, all_outputs: list):
112        try:
113
114            if not all_outputs:
115                return
116
117            source_list = self.get_source_list(target)
118
119            self.process_source_output_dependencies(target, all_outputs, source_list)
120
121        except Exception as e:
122            print(f"Error processing target '{getattr(target, 'target_name', 'unknown')}': {e}")
123
124    def process_libs_dependencies(self, target: Target, outputs: list):
125        """
126        Process library dependencies from the target's 'libs' field and link them to output files.
127        Handles both static and dynamic libraries with appropriate relationship types.
128        """
129        try:
130            # Extract and normalize the 'libs' list from the target
131            lib_list = getattr(target, 'libs', None)
132            if not lib_list:
133                return
134            if isinstance(lib_list, str):
135                lib_list = [lib_list]
136            elif not isinstance(lib_list, (list, tuple)):
137                return
138
139            # Clean up library names (strip whitespace and remove empty entries)
140            libs = [lib.strip() for lib in lib_list if isinstance(lib, str) and lib.strip()]
141
142            # Parse dependencies into static and dynamic libraries
143            dep_result = self.extract_libs_dependencies(libs)
144
145            # Process each output file
146            for out in outputs:
147                output_file = self._file_dependencies.get(out)
148                if not output_file:
149                    continue  # Skip if output file is not tracked
150
151                # Handle static library dependencies
152                for static_lib in dep_result.get('static', []):
153                    lib_file = self._file_dependencies.setdefault(
154                        static_lib,
155                        File(static_lib, None, file_type=FileType.STATIC_LIBRARY)
156                    )
157                    output_file.add_dependency_by_file_type(lib_file)
158
159                # Handle dynamic library dependencies
160                for dynamic_lib in dep_result.get('dynamic', []):
161                    lib_file = self._file_dependencies.setdefault(
162                        dynamic_lib,
163                        File(dynamic_lib, None, file_type=FileType.SHARED_LIBRARY)
164                    )
165                    output_file.add_dependency_by_file_type(lib_file)
166
167        except Exception as e:
168            print(f"Error processing libs for target '{getattr(target, 'target_name', 'unknown')}': {e}")
169
170    def process_ldflags_dependencies(self, target: Target, outputs: list):
171        """
172        Process library dependencies extracted from the target's 'ldflags'.
173        Resolves static and dynamic libraries specified via -l, .a/.so paths, or linker flags.
174        Links them to output files with appropriate relationship types.
175        """
176        try:
177            # Extract and normalize ldflags from target
178            ldflags_list = getattr(target, 'ldflags', None)
179            if not ldflags_list:
180                return
181            if isinstance(ldflags_list, str):
182                ldflags_list = [ldflags_list]
183            elif not isinstance(ldflags_list, (list, tuple)):
184                return
185
186            # Clean up flags: strip and filter valid strings
187            ldflags = [flag.strip() for flag in ldflags_list if isinstance(flag, str) and flag.strip()]
188
189            # Parse dependencies from ldflags
190            dep_result = self.extract_ldflags_dependencies(ldflags)
191
192            # Process each output file
193            for out in outputs:
194                output_file = self._file_dependencies.get(out)
195                if not output_file:
196                    continue  # Skip if output is not tracked
197
198                # Handle static library dependencies
199                for static_lib in dep_result.get('static', []):
200                    lib_file = self._file_dependencies.setdefault(
201                        static_lib,
202                        File(static_lib, None, file_type=FileType.STATIC_LIBRARY)
203                    )
204                    output_file.add_dependency_by_file_type(lib_file)
205
206                # Handle dynamic library dependencies
207                for dynamic_lib in dep_result.get('dynamic', []):
208                    lib_file = self._file_dependencies.setdefault(
209                        dynamic_lib,
210                        File(dynamic_lib, None, file_type=FileType.SHARED_LIBRARY)
211                    )
212                    output_file.add_dependency_by_file_type(lib_file)
213
214        except Exception as e:
215            print(f"Error processing libs for executable '{getattr(target, 'target_name', 'unknown')}': {e}")
216
217    def extract_deps(self, target: Target, outputs):
218        """
219        Extract and process dependencies from the target's 'deps' field.
220        For each dependency:
221          - Resolve the target in the dependency graph
222          - Skip metadata generator targets
223          - Link output files using appropriate file types
224        All errors are logged but do not block processing of other deps.
225        """
226        try:
227            # Extract and normalize the 'deps' list
228            dep_list = getattr(target, 'deps', None)
229            if not dep_list:
230                return
231            if isinstance(dep_list, str):
232                dep_list = [dep_list]
233            elif not isinstance(dep_list, (list, tuple)):
234                return
235
236            # Process each dependency
237            for dep in dep_list:
238                self._process_single_dep(dep, outputs, target)
239
240        except Exception as e:
241            print(f"Error processing deps for executable '{getattr(target, 'target_name', 'unknown')}': {e}")
242
243    def extract_libs_dependencies(self, libs: List[str]) -> Dict[str, List[str]]:
244        static_libs = []
245        dynamic_libs = []
246
247        for lib in libs:
248            if not isinstance(lib, str) or not lib.strip():
249                continue
250            lib = lib.strip()
251            if lib.endswith('.a'):
252                basename = os.path.basename(lib)
253                static_libs.append(basename)
254            else:
255                if lib.startswith('lib') and (lib.endswith('.so') or '.so.' in lib):
256                    so_name = lib.split('.so')[0] + '.so'
257                    dynamic_libs.append(so_name)
258                else:
259                    dyn_name = f"lib{lib}.so" if not lib.startswith('lib') else f"{lib}.so"
260                    dynamic_libs.append(dyn_name)
261
262        def unique(lst):
263            seen = set()
264            result = []
265            for x in lst:
266                if x not in seen:
267                    seen.add(x)
268                    result.append(x)
269            return result
270
271        return {
272            'static': unique(static_libs),
273            'dynamic': unique(dynamic_libs)
274        }
275
276    def extract_ldflags_dependencies(self, ldflags: List[str]) -> Dict[str, List[str]]:
277        static_libs = []
278        dynamic_libs = []
279        is_static_mode = False
280
281        i = 0
282        while i < len(ldflags):
283            flag = ldflags[i].strip()
284            if not flag:
285                i += 1
286                continue
287
288            if flag in ("-Wl,-Bstatic", "-static"):
289                is_static_mode = True
290            elif flag in ("-Wl,-Bdynamic", "-shared"):
291                is_static_mode = False
292
293            elif flag.startswith("-l"):
294                self._handle_library_flag(flag[2:], is_static_mode, static_libs, dynamic_libs)
295
296            elif flag == "-l" and i + 1 < len(ldflags):
297                lib_name = ldflags[i + 1].strip()
298                if lib_name:
299                    self._handle_library_flag(lib_name, is_static_mode, static_libs, dynamic_libs)
300                i += 1
301
302            elif flag.startswith('-Wl,--exclude-libs='):
303                self._handle_exclude_libs(flag, static_libs)
304
305            elif self._is_library_path(flag):
306                basename = self._normalize_library_path(flag)
307                if basename.endswith('.a'):
308                    static_libs.append(basename)
309                else:
310                    dynamic_libs.append(basename)
311
312            elif flag.startswith("-stdlib="):
313                lib_name = flag.split("=", 1)[1]
314                self._add_lib(lib_name, is_static_mode, static_libs, dynamic_libs)
315
316            elif flag.startswith("-rtlib="):
317                lib_name = flag.split("=", 1)[1]
318                self._add_lib(f"{lib_name}_rt", is_static_mode, static_libs, dynamic_libs)
319
320            i += 1
321
322        return {
323            "static": self._unique(static_libs),
324            "dynamic": self._unique(dynamic_libs)
325        }
326
327    def get_source_list(self, target: Target) -> list:
328        source_list = getattr(target, 'sources', None) or getattr(target, 'source', None)
329
330        if not source_list:
331            return []
332
333        if isinstance(source_list, str):
334            source_list = [source_list]
335        elif not isinstance(source_list, (list, tuple)):
336            source_list = []
337
338        return [src.strip() for src in source_list if isinstance(src, str) and src.strip()]
339
340    def get_remaining_outputs(self, target: Target, outputs: list) -> list:
341        source_list = self.get_source_list(target)
342        source_stems = {Path(src).stem for src in source_list if src.strip()}
343        return [
344            out for out in outputs
345            if isinstance(out, str) and out.strip() and Path(out).stem not in source_stems
346        ]
347
348    def _post_visit_callback(self, node: str, depth: int, parent: Optional[str]) -> None:
349
350        target = self._depend_graph.get_target(node)
351        if self._is_metadata_generator_target(target.target_name):
352            return
353        outputs = self.extract_outputs_and_source_outputs(target)
354        target_type = target.type
355        if target_type == 'copy':
356            self._handle_copy(target, outputs)
357        elif target_type == 'group':
358            return
359        elif target_type == 'source_set':
360            self._handle_source_set(target, outputs)
361        elif target_type == 'executable':
362            self._handle_executable(target, outputs)
363        elif target_type == 'shared_library':
364            self._handle_shared_library(target, outputs)
365        elif target_type == 'action':
366            self._handle_action(target, outputs)
367        elif target_type == 'action_foreach':
368            self._handle_action_foreach(target, outputs)
369        elif target_type == 'generated_file':
370            self._handle_executable(target, outputs)
371        elif target_type == 'rust_library':
372            self._handle_rust_library(target, outputs)
373        elif target_type == 'rust_proc_macro':
374            self._handle_rust_proc_macro(target, outputs)
375        elif target_type == 'static_library':
376            self._handle_static_library(target, outputs)
377        elif target_type == 'virtual_root':
378            return
379        else:
380            print(f"Error: unknown target type '{target_type}' for target '{target.target_name}'")
381            return
382
383    def _pre_visit_callback(self, node: str, depth: int, parent: Optional[str]) -> bool:
384        target = self._depend_graph.get_target(node)
385
386        if target.target_name in self._target_name_map_file:
387            return True
388
389        if self._is_metadata_generator_target(target.target_name):
390            return False
391
392        self._target_name_map_file[target.target_name] = []
393
394        outputs = getattr(target, 'outputs', None) or []
395        created_any = False
396
397        for output in outputs:
398            if not output:
399                continue
400            if output in self._file_dependencies or 'unstripped' in output:
401                continue
402            output_file = File(output, target)
403            self._file_dependencies[output] = output_file
404            self._target_name_map_file[target.target_name].append(output_file)
405            created_any = True
406
407        if not created_any and hasattr(target, 'source_outputs') and target.source_outputs:
408            for output_list in target.source_outputs.values():
409                if not output_list:
410                    continue
411                primary_output = output_list[0]
412                if not primary_output or primary_output in self._file_dependencies or 'unstripped' in primary_output:
413                    continue
414                output_file = File(primary_output, target)
415                self._file_dependencies[primary_output] = output_file
416                self._target_name_map_file[target.target_name].append(output_file)
417                created_any = True
418
419        return True
420
421    def _link_dependency_outputs(
422            self,
423            outputs: list,
424            dep_out_file_list: list,
425            target: Target
426    ):
427        """Link dependency output files to current target's output files."""
428        for out in outputs or []:
429            if out not in self._file_dependencies:
430                self._file_dependencies[out] = File(out, target)
431            file_out = self._file_dependencies[out]
432            file_out.add_dependency_list_by_file_type(dep_out_file_list)
433
434    def _process_single_dep(self, dep, outputs, target):
435        """Helper to process one dependency, avoids deep nesting."""
436        try:
437            dep_target = self._depend_graph.get_target(dep)
438            if not dep_target:
439                return
440
441            if self._is_metadata_generator_target(dep_target.target_name):
442                return
443
444            dep_out_file_list = self._target_name_map_file.get(dep_target.target_name, [])
445            if not dep_out_file_list:
446                return
447
448            self._link_dependency_outputs(outputs, dep_out_file_list, target)
449
450        except Exception as e:
451            print(f"Error processing dep '{dep}': {e}")
452
453    def _is_metadata_generator_target(self, target_name: str) -> bool:
454        core_name = target_name.split('(', 1)[0]
455
456        metadata_suffixes = (
457            '__notice',
458            '__check',
459            '_info',
460            'notice.txt',
461            '_notice'
462        )
463
464        return core_name.endswith(metadata_suffixes)
465
466    def _get_or_create_file(self, relative_path):
467        if relative_path in self._file_dependencies:
468            file = self._file_dependencies[relative_path]
469        else:
470            file = File(relative_path, None)
471            self._file_dependencies[relative_path] = file
472        return file
473
474    def _handle_library_flag(self, lib_name: str, is_static_mode: bool, static_libs: list, dynamic_libs: list):
475        base = lib_name if lib_name.startswith('lib') else f"lib{lib_name}"
476        if is_static_mode:
477            static_libs.append(f"{base}.a")
478        else:
479            dynamic_libs.append(f"{base}.so")
480
481    def _handle_exclude_libs(self, flag: str, static_libs: list):
482        parts = flag.split('=', 2)
483        if len(parts) >= 3:
484            lib_path = parts[2]
485            basename = os.path.basename(lib_path)
486            if basename.endswith('.a'):
487                static_libs.append(basename)
488
489    def _is_library_path(self, flag: str) -> bool:
490        return flag.endswith('.a') or flag.endswith('.so') or '.so.' in flag
491
492    def _normalize_library_path(self, flag: str) -> str:
493        basename = os.path.basename(flag)
494        if '.so.' in basename:
495            stem = basename.split('.so.')[0]
496            return f"{stem}.so"
497        return basename
498
499    def _add_lib(self, lib_name: str, is_static_mode: bool, static_libs: list, dynamic_libs: list):
500        base = lib_name if lib_name.startswith('lib') else f"lib{lib_name}"
501        if is_static_mode:
502            static_libs.append(f"{base}.a")
503        else:
504            dynamic_libs.append(f"{base}.so")
505
506    def _unique(self, lst: list) -> list:
507        seen = set()
508        result = []
509        for x in lst:
510            if x not in seen:
511                seen.add(x)
512                result.append(x)
513        return result
514
515    def _handle_copy(self, target: Target, outputs):
516        source_list = getattr(target, 'sources', None) or getattr(target, 'source', None)
517        if not source_list:
518            return
519
520        outputs = outputs or getattr(target, 'outputs', None)
521        if not outputs:
522            return
523
524        for source, output_path in zip(source_list, outputs):
525            if 'unstripped' in source or 'unstripped' in output_path:
526                continue
527
528            source_file = self._get_or_create_file(source)
529
530            if output_path not in self._file_dependencies:
531                self._file_dependencies[output_path] = File(output_path, target)
532
533            self._file_dependencies[output_path].add_dependency(RelationshipType.COPY_OF, source_file)
534
535    def _handle_source_set(self, target: Target, outputs):
536        self.process_target(target, outputs)
537        remain_outputs = self.get_remaining_outputs(target, outputs)
538        self.process_libs_dependencies(target, remain_outputs)
539        self.process_ldflags_dependencies(target, remain_outputs)
540        self.extract_deps(target, remain_outputs)
541
542    def _handle_executable(self, target: Target, outputs):
543        self.process_target(target, outputs)
544        remain_outputs = self.get_remaining_outputs(target, outputs)
545        self.process_libs_dependencies(target, remain_outputs)
546        self.process_ldflags_dependencies(target, remain_outputs)
547        self.extract_deps(target, remain_outputs)
548
549    def _handle_shared_library(self, target: Target, outputs):
550        self.process_target(target, outputs)
551        remain_outputs = self.get_remaining_outputs(target, outputs)
552        self.process_libs_dependencies(target, remain_outputs)
553        self.process_ldflags_dependencies(target, remain_outputs)
554        self.extract_deps(target, remain_outputs)
555
556    def _handle_static_library(self, target: Target, outputs):
557        self.process_target(target, outputs)
558        remain_outputs = self.get_remaining_outputs(target, outputs)
559        self.process_libs_dependencies(target, remain_outputs)
560        self.process_ldflags_dependencies(target, remain_outputs)
561        self.extract_deps(target, remain_outputs)
562
563    def _handle_action(self, target: Target, outputs):
564        self.process_target(target, outputs)
565        remain_outputs = self.get_remaining_outputs(target, outputs)
566        self.process_libs_dependencies(target, remain_outputs)
567        self.process_ldflags_dependencies(target, remain_outputs)
568        self.extract_deps(target, remain_outputs)
569
570    def _handle_action_foreach(self, target: Target, outputs):
571        self.process_target(target, outputs)
572        remain_outputs = self.get_remaining_outputs(target, outputs)
573        self.process_libs_dependencies(target, remain_outputs)
574        self.process_ldflags_dependencies(target, remain_outputs)
575        self.extract_deps(target, remain_outputs)
576
577    def _handle_rust_library(self, target: Target, outputs):
578        self.process_target(target, outputs)
579        remain_outputs = self.get_remaining_outputs(target, outputs)
580        self.process_libs_dependencies(target, remain_outputs)
581        self.process_ldflags_dependencies(target, remain_outputs)
582        self.extract_deps(target, remain_outputs)
583
584    def _handle_rust_proc_macro(self, target, outputs):
585        self.process_target(target, outputs)
586        remain_outputs = self.get_remaining_outputs(target, outputs)
587        self.process_libs_dependencies(target, remain_outputs)
588        self.process_ldflags_dependencies(target, remain_outputs)
589        self.extract_deps(target, remain_outputs)
590