#!/usr/bin/env python
# -*- coding: utf-8 -*-

#
# Copyright (c) 2025 Northeastern University
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
from collections import defaultdict
from pathlib import Path
from typing import List, Optional, Dict

from ohos.sbom.analysis.depend_graph import DependGraphAnalyzer
from ohos.sbom.data.file_dependence import File, FileType
from ohos.sbom.data.target import Target
from ohos.sbom.sbom.metadata.sbom_meta_data import RelationshipType


class FileDependencyAnalyzer:
    """Build file-level dependency records by walking a target dependency graph.

    The analyzer keeps two indexes that are filled while the graph is traversed:

    * ``_file_dependencies`` maps a file path to its ``File`` record.
    * ``_target_name_map_file`` maps a target name to the ``File`` records
      registered for that target's outputs.
    """

    # Target-name suffixes (checked before any "(" in the name) that mark a
    # target as a metadata generator; such targets are skipped everywhere.
    _METADATA_SUFFIXES = (
        '__notice',
        '__check',
        '_info',
        'notice.txt',
        '_notice',
    )

    # Target types that are visited but contribute no file relationships.
    _IGNORED_TARGET_TYPES = frozenset({'group', 'virtual_root'})

    # Target type -> name of the handler method invoked in the post-visit
    # callback. 'generated_file' deliberately reuses the executable handler,
    # as the original if/elif chain did.
    _TARGET_HANDLERS = {
        'copy': '_handle_copy',
        'source_set': '_handle_source_set',
        'executable': '_handle_executable',
        'shared_library': '_handle_shared_library',
        'action': '_handle_action',
        'action_foreach': '_handle_action_foreach',
        'generated_file': '_handle_executable',
        'rust_library': '_handle_rust_library',
        'rust_proc_macro': '_handle_rust_proc_macro',
        'static_library': '_handle_static_library',
    }

    def __init__(self, all_target_depend: DependGraphAnalyzer):
        """Store the dependency graph and create the empty result indexes."""
        self._depend_graph = all_target_depend
        self._file_dependencies: Dict[str, File] = {}
        self._target_name_map_file = defaultdict(list)

    # ------------------------------------------------------------------
    # Entry points
    # ------------------------------------------------------------------

    def build_start(self, target_name: str):
        """Traverse the graph downstream from ``target_name`` with no depth limit.

        File records are collected through the pre/post visit callbacks.
        """
        self._depend_graph.dfs_downstream(
            start=target_name,
            max_depth=None,
            pre_visit=self._pre_visit_callback,
            post_visit=self._post_visit_callback
        )

    def get_file_dependencies(self) -> Dict[str, File]:
        """Return the path -> ``File`` index (the live dict, not a copy)."""
        return self._file_dependencies

    def get_target_name_map_file(self) -> Dict[str, List[File]]:
        """Return the target name -> output ``File`` list index (live, not a copy)."""
        return self._target_name_map_file

    def build_all_install_deps_optimized(self, install_targets: List[str]):
        """Collect dependencies of all ``install_targets`` in a single traversal.

        A temporary virtual root is attached to the graph, used as the start
        of one downstream traversal, and removed again in ``finally`` so it is
        detached even when the traversal raises.
        """
        virtual_root = "__ALL_INSTALL_ROOT__"

        try:
            self._depend_graph.add_virtual_root(virtual_root, install_targets)

            print(f"Starting one-time traversal of dependencies for {len(install_targets)} modules...")
            self._depend_graph.dfs_downstream(
                start=virtual_root,
                max_depth=None,
                pre_visit=self._pre_visit_callback,
                post_visit=self._post_visit_callback
            )
            print(f"Completed! Collected {len(self._file_dependencies)} files in total")

        finally:
            self._depend_graph.remove_virtual_root(virtual_root)

    # ------------------------------------------------------------------
    # Output / source extraction
    # ------------------------------------------------------------------

    def extract_outputs_and_source_outputs(self, target: Target) -> list:
        """Return the target's outputs followed by the first output of each source.

        ``outputs`` is used only when it is a list/tuple and ``source_outputs``
        only when it is a dict; anything else is treated as empty. Empty,
        non-string and ``'unstripped'`` entries are dropped.
        """
        raw_outputs = getattr(target, 'outputs', None)
        raw_source_outputs = getattr(target, 'source_outputs', None)

        outputs = raw_outputs if isinstance(raw_outputs, (list, tuple)) else []
        source_outputs = raw_source_outputs if isinstance(raw_source_outputs, dict) else {}

        result = [out for out in outputs if self._is_usable_output(out)]

        for output_list in source_outputs.values():
            if isinstance(output_list, (list, tuple)) and output_list:
                # Only the first output of every source is considered.
                first_output = output_list[0]
                if self._is_usable_output(first_output):
                    result.append(first_output)

        return result

    def process_source_output_dependencies(self, target: Target, outputs: list, source_list: list):
        """Record GENERATED_FROM edges between ``outputs`` and ``source_list``.

        Outputs whose file stem equals a source's stem are linked to that one
        source. Every output left unmatched is linked to *all* sources.
        """
        # When several outputs share a stem the last one wins, as before.
        output_by_stem = {Path(out).stem: out for out in outputs}
        matched_outputs = set()

        for source in source_list:
            stem = Path(source).stem
            if stem not in output_by_stem:
                continue
            matched = output_by_stem[stem]
            output_file = self._get_or_create_file(matched, target)
            source_file = self._get_or_create_file(source)
            output_file.add_dependency(RelationshipType.GENERATED_FROM, source_file)
            matched_outputs.add(matched)

        for out in outputs:
            if out in matched_outputs:
                continue
            output_file = self._get_or_create_file(out, target)
            for source in source_list:
                source_file = self._get_or_create_file(source)
                output_file.add_dependency(RelationshipType.GENERATED_FROM, source_file)

    def process_target(self, target: Target, all_outputs: list):
        """Link ``all_outputs`` to the target's sources; a no-op without outputs.

        Best effort: any exception is printed and swallowed so that one bad
        target does not abort the traversal.
        """
        try:
            if not all_outputs:
                return

            source_list = self.get_source_list(target)
            self.process_source_output_dependencies(target, all_outputs, source_list)

        except Exception as e:
            print(f"Error processing target '{getattr(target, 'target_name', 'unknown')}': {e}")

    # ------------------------------------------------------------------
    # libs / ldflags / deps
    # ------------------------------------------------------------------

    def process_libs_dependencies(self, target: Target, outputs: list):
        """Link libraries named in the target's ``libs`` field to ``outputs``.

        Only outputs already present in the file index are linked. Errors are
        printed and swallowed (best effort).
        """
        try:
            libs = self._clean_string_list(getattr(target, 'libs', None))
            if not libs:
                return
            self._link_library_dependencies(outputs, self.extract_libs_dependencies(libs))

        except Exception as e:
            print(f"Error processing libs for target '{getattr(target, 'target_name', 'unknown')}': {e}")

    def process_ldflags_dependencies(self, target: Target, outputs: list):
        """Link libraries referenced by the target's ``ldflags`` to ``outputs``.

        Only outputs already present in the file index are linked. Errors are
        printed and swallowed (best effort).
        """
        try:
            ldflags = self._clean_string_list(getattr(target, 'ldflags', None))
            if not ldflags:
                return
            self._link_library_dependencies(outputs, self.extract_ldflags_dependencies(ldflags))

        except Exception as e:
            print(f"Error processing ldflags for target '{getattr(target, 'target_name', 'unknown')}': {e}")

    def extract_deps(self, target: Target, outputs):
        """Link ``outputs`` to the output files of every entry in ``target.deps``.

        ``deps`` may be a single string or a list/tuple; other types are
        ignored. Each dependency is handled by ``_process_single_dep``, which
        isolates its own errors.
        """
        try:
            dep_list = getattr(target, 'deps', None)
            if not dep_list:
                return
            if isinstance(dep_list, str):
                dep_list = [dep_list]
            elif not isinstance(dep_list, (list, tuple)):
                return

            for dep in dep_list:
                self._process_single_dep(dep, outputs, target)

        except Exception as e:
            print(f"Error processing deps for target '{getattr(target, 'target_name', 'unknown')}': {e}")

    def extract_libs_dependencies(self, libs: List[str]) -> Dict[str, List[str]]:
        """Classify ``libs`` entries into static and dynamic library file names.

        * ``*.a`` entries            -> static, reduced to the basename
        * ``*.so`` / ``*.so.N``      -> dynamic, basename with the version cut off
        * anything else (bare name)  -> dynamic, ``lib<name>.so`` (the ``lib``
          prefix is not duplicated)

        Returns ``{'static': [...], 'dynamic': [...]}`` with duplicates removed
        and first-seen order preserved.
        """
        static_libs = []
        dynamic_libs = []

        for lib in libs:
            if not isinstance(lib, str):
                continue
            lib = lib.strip()
            if not lib:
                continue

            if lib.endswith('.a'):
                static_libs.append(os.path.basename(lib))
            elif self._is_library_path(os.path.basename(lib)):
                # Shared object given as a file name or path: keep only the
                # file name so "/usr/lib/libfoo.so.1" becomes "libfoo.so".
                dynamic_libs.append(self._normalize_library_path(lib))
            elif lib.startswith('lib'):
                dynamic_libs.append(f"{lib}.so")
            else:
                dynamic_libs.append(f"lib{lib}.so")

        return {
            'static': self._unique(static_libs),
            'dynamic': self._unique(dynamic_libs)
        }

    def extract_ldflags_dependencies(self, ldflags: List[str]) -> Dict[str, List[str]]:
        """Derive static/dynamic library file names from linker flags.

        Recognised flags:

        * ``-Wl,-Bstatic`` / ``-static``   switch following ``-l`` flags to static
        * ``-Wl,-Bdynamic`` / ``-shared``  switch back to dynamic
        * ``-l <name>`` and ``-l<name>``   library by name (``-l:file`` by file name)
        * ``-Wl,--exclude-libs=...``       archives listed there are recorded as static
        * paths ending in ``.a`` / ``.so`` / containing ``.so.``
        * ``-stdlib=<name>`` and ``-rtlib=<name>`` (the latter as ``<name>_rt``)

        Returns ``{'static': [...], 'dynamic': [...]}`` with duplicates removed
        and first-seen order preserved.
        """
        static_libs = []
        dynamic_libs = []
        is_static_mode = False

        flags = iter(ldflags)
        for raw_flag in flags:
            flag = raw_flag.strip()
            if not flag:
                continue

            if flag in ("-Wl,-Bstatic", "-static"):
                is_static_mode = True
            elif flag in ("-Wl,-Bdynamic", "-shared"):
                is_static_mode = False

            elif flag == "-l":
                # Separated form: the library name is the next argument. This
                # must be tested before the "-l<name>" prefix check, otherwise
                # the bare "-l" is swallowed by it.
                lib_name = next(flags, "").strip()
                if lib_name:
                    self._handle_library_flag(lib_name, is_static_mode, static_libs, dynamic_libs)

            elif flag.startswith("-l"):
                self._handle_library_flag(flag[2:], is_static_mode, static_libs, dynamic_libs)

            elif flag.startswith('-Wl,--exclude-libs='):
                self._handle_exclude_libs(flag, static_libs)

            elif self._is_library_path(flag):
                basename = self._normalize_library_path(flag)
                if basename.endswith('.a'):
                    static_libs.append(basename)
                else:
                    dynamic_libs.append(basename)

            elif flag.startswith("-stdlib="):
                lib_name = flag.split("=", 1)[1]
                self._add_lib(lib_name, is_static_mode, static_libs, dynamic_libs)

            elif flag.startswith("-rtlib="):
                lib_name = flag.split("=", 1)[1]
                self._add_lib(f"{lib_name}_rt", is_static_mode, static_libs, dynamic_libs)

        return {
            "static": self._unique(static_libs),
            "dynamic": self._unique(dynamic_libs)
        }

    def get_source_list(self, target: Target) -> list:
        """Return the target's sources as a list of stripped, non-empty strings.

        Reads ``target.sources`` and falls back to ``target.source``. A single
        string is wrapped in a list; unsupported types yield ``[]``.
        """
        raw_sources = getattr(target, 'sources', None) or getattr(target, 'source', None)
        return self._clean_string_list(raw_sources)

    def get_remaining_outputs(self, target: Target, outputs: list) -> list:
        """Return the outputs whose file stem matches none of the target's sources.

        Non-string and blank entries are dropped.
        """
        source_stems = {Path(src).stem for src in self.get_source_list(target)}
        return [
            out for out in outputs
            if isinstance(out, str) and out.strip() and Path(out).stem not in source_stems
        ]

    # ------------------------------------------------------------------
    # Traversal callbacks
    # ------------------------------------------------------------------

    def _post_visit_callback(self, node: str, depth: int, parent: Optional[str]) -> None:
        """Record file relationships for ``node`` according to its target type.

        Metadata generator targets and the types in ``_IGNORED_TARGET_TYPES``
        are skipped; unknown types are reported via ``print``. ``depth`` and
        ``parent`` are accepted for the callback signature but not used.
        """
        target = self._depend_graph.get_target(node)
        if self._is_metadata_generator_target(target.target_name):
            return

        target_type = target.type
        if target_type in self._IGNORED_TARGET_TYPES:
            return

        handler_name = self._TARGET_HANDLERS.get(target_type)
        if handler_name is None:
            print(f"Error: unknown target type '{target_type}' for target '{target.target_name}'")
            return

        outputs = self.extract_outputs_and_source_outputs(target)
        getattr(self, handler_name)(target, outputs)

    def _pre_visit_callback(self, node: str, depth: int, parent: Optional[str]) -> bool:
        """Register ``File`` records for the outputs of ``node``.

        Returns ``False`` for metadata generator targets and ``True``
        otherwise, including for targets that were already registered.
        NOTE(review): the boolean is presumably how ``dfs_downstream`` decides
        whether to descend into the node - confirm against the graph code.

        ``source_outputs`` is consulted only when ``outputs`` produced no new
        record. ``depth`` and ``parent`` are not used.
        """
        target = self._depend_graph.get_target(node)
        target_name = target.target_name

        if target_name in self._target_name_map_file:
            return True

        if self._is_metadata_generator_target(target_name):
            return False

        registered = []
        self._target_name_map_file[target_name] = registered

        for output in getattr(target, 'outputs', None) or []:
            self._register_output(output, target, registered)

        if not registered:
            source_outputs = getattr(target, 'source_outputs', None)
            # Same dict guard as extract_outputs_and_source_outputs.
            if isinstance(source_outputs, dict):
                for output_list in source_outputs.values():
                    if output_list:
                        self._register_output(output_list[0], target, registered)

        return True

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _is_usable_output(out) -> bool:
        """Return True for a non-empty string path that is not an 'unstripped' artifact."""
        return isinstance(out, str) and bool(out) and 'unstripped' not in out

    @staticmethod
    def _clean_string_list(value) -> List[str]:
        """Normalise a str / list / tuple attribute to stripped, non-empty strings."""
        if not value:
            return []
        if isinstance(value, str):
            value = [value]
        elif not isinstance(value, (list, tuple)):
            return []
        return [item.strip() for item in value if isinstance(item, str) and item.strip()]

    def _register_output(self, output, target: Target, registered: list) -> None:
        """Create and index a ``File`` for ``output`` unless it is empty, 'unstripped' or known."""
        if not output or 'unstripped' in output or output in self._file_dependencies:
            return
        output_file = File(output, target)
        self._file_dependencies[output] = output_file
        registered.append(output_file)

    def _link_library_dependencies(self, outputs: list, dep_result: Dict[str, List[str]]) -> None:
        """Attach the static, then dynamic, libraries in ``dep_result`` to each tracked output."""
        typed_libs = [(name, FileType.STATIC_LIBRARY) for name in dep_result.get('static', [])]
        typed_libs += [(name, FileType.SHARED_LIBRARY) for name in dep_result.get('dynamic', [])]
        if not typed_libs:
            return

        for out in outputs:
            output_file = self._file_dependencies.get(out)
            if not output_file:
                continue  # Skip outputs that are not tracked in the file index.
            for lib_name, lib_type in typed_libs:
                lib_file = self._get_or_create_file(lib_name, file_type=lib_type)
                output_file.add_dependency_by_file_type(lib_file)

    def _link_dependency_outputs(
            self,
            outputs: list,
            dep_out_file_list: list,
            target: Target
    ):
        """Link dependency output files to current target's output files."""
        for out in outputs or []:
            file_out = self._get_or_create_file(out, target)
            file_out.add_dependency_list_by_file_type(dep_out_file_list)

    def _process_single_dep(self, dep, outputs, target):
        """Link ``outputs`` to the registered output files of one dependency.

        Does nothing when the dependency cannot be resolved, is a metadata
        generator target, or has no registered output files. Errors are
        printed and swallowed so the remaining deps are still processed.
        """
        try:
            dep_target = self._depend_graph.get_target(dep)
            if not dep_target:
                return

            if self._is_metadata_generator_target(dep_target.target_name):
                return

            # .get() avoids inserting an empty entry into the defaultdict.
            dep_out_file_list = self._target_name_map_file.get(dep_target.target_name, [])
            if not dep_out_file_list:
                return

            self._link_dependency_outputs(outputs, dep_out_file_list, target)

        except Exception as e:
            print(f"Error processing dep '{dep}': {e}")

    def _is_metadata_generator_target(self, target_name: str) -> bool:
        """Return True when the name (up to the first '(') ends with a metadata suffix."""
        core_name = target_name.split('(', 1)[0]
        return core_name.endswith(self._METADATA_SUFFIXES)

    def _get_or_create_file(self, relative_path, target=None, file_type=None):
        """Return the indexed ``File`` for ``relative_path``, creating it on first use.

        ``target`` and ``file_type`` are only used when a new record is
        created; ``file_type`` is passed to ``File`` only when given so the
        constructor's own default applies otherwise.
        """
        existing = self._file_dependencies.get(relative_path)
        if existing is not None:
            return existing

        if file_type is None:
            created = File(relative_path, target)
        else:
            created = File(relative_path, target, file_type=file_type)
        self._file_dependencies[relative_path] = created
        return created

    def _handle_library_flag(self, lib_name: str, is_static_mode: bool, static_libs: list, dynamic_libs: list):
        """Append the file name for a ``-l`` style library reference.

        ``name`` / ``libname`` become ``libname.a`` in static mode and
        ``libname.so`` otherwise. The ``:filename`` form names the file
        directly and is classified by its own extension. Empty names are
        ignored.
        """
        if not lib_name:
            return

        if lib_name.startswith(':'):
            filename = self._normalize_library_path(lib_name[1:])
            if filename:
                bucket = static_libs if filename.endswith('.a') else dynamic_libs
                bucket.append(filename)
            return

        base = lib_name if lib_name.startswith('lib') else f"lib{lib_name}"
        if is_static_mode:
            static_libs.append(f"{base}.a")
        else:
            dynamic_libs.append(f"{base}.so")

    def _handle_exclude_libs(self, flag: str, static_libs: list):
        """Record the ``.a`` archives listed in a ``--exclude-libs=`` flag as static libs.

        The value after the first ``=`` may hold several names separated by
        commas or colons; entries that are not ``.a`` files (e.g. ``ALL``)
        are ignored.
        """
        _, separator, value = flag.partition('=')
        if not separator:
            return
        for entry in value.replace(':', ',').split(','):
            basename = os.path.basename(entry.strip())
            if basename.endswith('.a'):
                static_libs.append(basename)

    def _is_library_path(self, flag: str) -> bool:
        """Return True when ``flag`` ends in ``.a``/``.so`` or contains ``.so.``."""
        return flag.endswith(('.a', '.so')) or '.so.' in flag

    def _normalize_library_path(self, flag: str) -> str:
        """Return the basename of ``flag`` with any ``.so.<version>`` tail cut to ``.so``."""
        basename = os.path.basename(flag)
        if '.so.' in basename:
            stem = basename.split('.so.')[0]
            return f"{stem}.so"
        return basename

    def _add_lib(self, lib_name: str, is_static_mode: bool, static_libs: list, dynamic_libs: list):
        """Append ``lib<name>.a`` or ``lib<name>.so`` depending on the link mode."""
        base = lib_name if lib_name.startswith('lib') else f"lib{lib_name}"
        if is_static_mode:
            static_libs.append(f"{base}.a")
        else:
            dynamic_libs.append(f"{base}.so")

    def _unique(self, lst: list) -> list:
        """Return ``lst`` without duplicates, keeping first-seen order."""
        return list(dict.fromkeys(lst))

    # ------------------------------------------------------------------
    # Per-type handlers
    # ------------------------------------------------------------------

    def _handle_copy(self, target: Target, outputs):
        """Record COPY_OF edges pairing each source with the output at the same position.

        Falls back to ``target.outputs`` when ``outputs`` is empty. Pairs in
        which either path contains 'unstripped' are skipped. Pairing stops at
        the shorter of the two lists.
        """
        # get_source_list wraps a single string source in a list, so zip()
        # below never iterates over the characters of a path.
        source_list = self.get_source_list(target)
        if not source_list:
            return

        outputs = outputs or getattr(target, 'outputs', None)
        if not outputs:
            return

        for source, output_path in zip(source_list, outputs):
            if 'unstripped' in source or 'unstripped' in output_path:
                continue

            source_file = self._get_or_create_file(source)
            output_file = self._get_or_create_file(output_path, target)
            output_file.add_dependency(RelationshipType.COPY_OF, source_file)

    def _handle_built_target(self, target: Target, outputs):
        """Shared handling for every target type that is built from sources.

        Links outputs to sources first, then links the outputs that did not
        match a source stem to the target's libs, ldflags libraries and deps.
        """
        self.process_target(target, outputs)
        remain_outputs = self.get_remaining_outputs(target, outputs)
        self.process_libs_dependencies(target, remain_outputs)
        self.process_ldflags_dependencies(target, remain_outputs)
        self.extract_deps(target, remain_outputs)

    def _handle_source_set(self, target: Target, outputs):
        """Handle a 'source_set' target."""
        self._handle_built_target(target, outputs)

    def _handle_executable(self, target: Target, outputs):
        """Handle an 'executable' target (also used for 'generated_file')."""
        self._handle_built_target(target, outputs)

    def _handle_shared_library(self, target: Target, outputs):
        """Handle a 'shared_library' target."""
        self._handle_built_target(target, outputs)

    def _handle_static_library(self, target: Target, outputs):
        """Handle a 'static_library' target."""
        self._handle_built_target(target, outputs)

    def _handle_action(self, target: Target, outputs):
        """Handle an 'action' target."""
        self._handle_built_target(target, outputs)

    def _handle_action_foreach(self, target: Target, outputs):
        """Handle an 'action_foreach' target."""
        self._handle_built_target(target, outputs)

    def _handle_rust_library(self, target: Target, outputs):
        """Handle a 'rust_library' target."""
        self._handle_built_target(target, outputs)

    def _handle_rust_proc_macro(self, target, outputs):
        """Handle a 'rust_proc_macro' target."""
        self._handle_built_target(target, outputs)