#!/usr/bin/env python
# -*- coding: utf-8 -*-

#
# Copyright (c) 2025 Northeastern University
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from collections import defaultdict
from typing import Dict, Set, List, Optional, Tuple, Any

from ohos.sbom.data.file_dependence import File
from ohos.sbom.data.manifest import Project
from ohos.sbom.data.opensource import OpenSource
from ohos.sbom.data.project_dependence import ProjectDependence
from ohos.sbom.extraction.local_resource_loader import LocalResourceLoader
from ohos.sbom.sbom.metadata.sbom_meta_data import RelationshipType


class ProjectDependencyAnalyzer:
    """Derive project-level relationships from file-level dependency data.

    Files are mapped to manifest projects through the name of their source
    target. From that mapping the analyzer records, per project:

    * a ``GENERATES`` relationship to every file that maps to the project,
    * a ``DEPENDS_ON`` relationship to every *other* project that one of its
      files depends on, and
    * a ``VARIANT_OF`` relationship to the packages returned by
      ``LocalResourceLoader.load_opensource`` for the project's path.

    Call :meth:`build` before using :meth:`get_project_dependence` or
    :meth:`to_dict`.
    """

    def __init__(self):
        """Load the manifest and initialise empty lookup tables."""
        self.manifest = LocalResourceLoader.load_manifest()
        # File -> name of the target recorded in ``file.source_target``.
        self._file_to_target_name: Dict[File, str] = {}
        # Target name -> project returned by ``manifest.find_project``.
        self._target_to_project: Dict[str, Project] = {}
        # Project name -> accumulated relationships for that project.
        self._project_dependency: Dict[str, ProjectDependence] = {}
        # Project name -> packages loaded via ``load_opensource``.
        self._project_to_upstream: Dict[str, List[OpenSource]] = {}
        self._built = False

    def build(self, all_files: List[File]) -> Dict[str, Dict[str, List[str]]]:
        """Rebuild all mappings and relationships from ``all_files``.

        Any state left over from a previous call is discarded first.

        Args:
            all_files: Files to analyse.

        Returns:
            ``{project name: {relationship type value: sorted names}}`` as
            produced by :meth:`_format_result`.
        """
        self._reset()
        self._build_mappings(all_files)
        self._build_project_contains(all_files)
        self._build_project_dependencies(all_files)
        self._build_upstream_dependencies()
        self._built = True
        return self._format_result()

    def get_file_project_mapping(self) -> Dict[str, str]:
        """Return ``{file.relative_path: project.name}`` for mapped files.

        Files whose target did not resolve to a project are omitted.
        """
        mapping = {}
        for file, target_name in self._file_to_target_name.items():
            project = self._target_to_project.get(target_name)
            if project:
                mapping[file.relative_path] = project.name
        return mapping

    def get_project_files(self) -> Dict[str, List[str]]:
        """Return ``{project.name: [file.relative_path, ...]}``.

        Files whose target did not resolve to a project are omitted.
        """
        project_files = defaultdict(list)
        for file, target_name in self._file_to_target_name.items():
            project = self._target_to_project.get(target_name)
            if project:
                project_files[project.name].append(file.relative_path)
        return dict(project_files)

    def get_project_dependence(self) -> Dict[str, ProjectDependence]:
        """Return the project-name -> ``ProjectDependence`` table.

        The returned dict is the analyzer's internal table, not a copy.

        Raises:
            RuntimeError: If :meth:`build` has not completed.
        """
        self._require_built()
        return self._project_dependency

    def to_dict(self) -> Dict[str, Any]:
        """Return the manifest summary and all analysis results as a dict.

        Raises:
            RuntimeError: If :meth:`build` has not completed.
        """
        self._require_built()

        manifest_info = {
            "remotes": [{"name": r.name, "fetch": r.fetch} for r in self.manifest.remotes],
            "default": self.manifest.default,
            "projects": [p.name for p in self.manifest.projects]
        }

        return {
            "manifest": manifest_info,
            "file_project_mapping": self.get_file_project_mapping(),
            "project_files": self.get_project_files(),
            "project_dependencies": [pd.to_dict() for pd in self._project_dependency.values()],
            "upstream_packages": {
                project_name: [pkg.to_dict() for pkg in pkgs]
                for project_name, pkgs in self._project_to_upstream.items()
            }
        }

    def _require_built(self):
        """Raise ``RuntimeError`` unless :meth:`build` has completed."""
        if not self._built:
            raise RuntimeError("The build() method must be called first to construct the dependency relationships")

    def _reset(self):
        """Clear every lookup table and mark the analyzer as not built."""
        self._file_to_target_name.clear()
        self._target_to_project.clear()
        self._project_dependency.clear()
        self._project_to_upstream.clear()
        self._built = False

    def _build_mappings(self, all_files: List[File]):
        """Fill the file -> target-name and target-name -> project tables."""
        for file in all_files:
            if file.source_target is not None:
                self._file_to_target_name[file] = file.source_target.target_name

        # Resolve each distinct target name once; unresolved names are left
        # out so later lookups simply return None for them.
        for target_name in set(self._file_to_target_name.values()):
            project = self.manifest.find_project(target_name)
            if project is not None:
                self._target_to_project[target_name] = project

    def _get_or_create_dependency(self, project: Project) -> ProjectDependence:
        """Return the ``ProjectDependence`` for ``project``, creating it on first use."""
        if project.name not in self._project_dependency:
            self._project_dependency[project.name] = ProjectDependence(project)
        return self._project_dependency[project.name]

    def _get_file_project(self, file: File) -> Optional[Project]:
        """Return the project ``file`` maps to, or ``None`` if it has none."""
        target_name = self._file_to_target_name.get(file)
        if not target_name:
            return None
        return self._target_to_project.get(target_name)

    def _build_project_contains(self, all_files: List[File]):
        """Record a ``GENERATES`` relationship from each project to its files."""
        for file in all_files:
            project = self._get_file_project(file)
            if not project:
                continue
            pd = self._get_or_create_dependency(project)
            pd.add_dependency(RelationshipType.GENERATES, file)

    def _build_project_dependencies(self, all_files: List[File]):
        """Record ``DEPENDS_ON`` relationships between distinct projects."""
        # (source project name, dependency project name) pairs already
        # recorded, shared across all files and relationship types.
        processed_deps: Set[Tuple[str, str]] = set()
        # Every relationship type except OTHER is followed; the filter does
        # not depend on the file, so compute it once.
        relation_types = [rt for rt in RelationshipType if rt != RelationshipType.OTHER]
        for file in all_files:
            current_project = self._get_file_project(file)
            if not current_project:
                continue
            for relation_type in relation_types:
                self._process_dependencies_for_relation(file, relation_type, current_project, processed_deps)

    def _build_upstream_dependencies(self):
        """Attach ``VARIANT_OF`` relationships to the loaded upstream packages.

        Several target names can resolve to the same project, so projects
        are de-duplicated by name: each one is loaded and recorded once
        instead of once per target.
        """
        seen_projects: Set[str] = set()
        for project in self._target_to_project.values():
            if project.name in seen_projects:
                continue
            seen_projects.add(project.name)

            upstream_pkgs = LocalResourceLoader.load_opensource(project.path)
            if not upstream_pkgs:
                continue
            pd = self._get_or_create_dependency(project)
            pd.add_dependency_list(RelationshipType.VARIANT_OF, upstream_pkgs)
            self._project_to_upstream[project.name] = upstream_pkgs

    def _process_dependencies_for_relation(self, file, relation_type, current_project, processed_deps):
        """Record cross-project ``DEPENDS_ON`` edges for one file and relation.

        Args:
            file: File whose dependencies are inspected.
            relation_type: Relationship type passed to ``file.get_dependencies``.
            current_project: Project that ``file`` maps to.
            processed_deps: Set of ``(source name, dependency name)`` pairs
                already recorded; updated in place.
        """
        for dep_file in file.get_dependencies(relation_type):
            dep_project = self._get_file_project(dep_file)
            # Skip unmapped files and dependencies inside the same project.
            if not dep_project or dep_project.name == current_project.name:
                continue

            dep_key = (current_project.name, dep_project.name)
            if dep_key in processed_deps:
                continue

            pd_src = self._get_or_create_dependency(current_project)
            pd_src.add_dependency(RelationshipType.DEPENDS_ON, dep_project)
            processed_deps.add(dep_key)

    def _format_result(self) -> Dict[str, Dict[str, List[str]]]:
        """Summarise relationships as sorted name lists per project.

        Relationship types with no objects are omitted. Each object is
        represented by its ``name`` attribute, or ``str(obj)`` if it has none.
        """
        result = {}
        for name, pd in self._project_dependency.items():
            deps = {}
            for dep_type, objs in pd.get_dependencies().items():
                if objs:
                    key = dep_type.value
                    deps[key] = sorted({getattr(obj, "name", str(obj)) for obj in objs})
            result[name] = deps
        return result