• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3
4#
5# Copyright (c) 2025 Northeastern University
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19from collections import defaultdict
20from typing import Dict, Set, List, Optional, Tuple, Any
21
22from ohos.sbom.data.file_dependence import File
23from ohos.sbom.data.manifest import Project
24from ohos.sbom.data.opensource import OpenSource
25from ohos.sbom.data.project_dependence import ProjectDependence
26from ohos.sbom.extraction.local_resource_loader import LocalResourceLoader
27from ohos.sbom.sbom.metadata.sbom_meta_data import RelationshipType
28
29
class ProjectDependencyAnalyzer:
    """Roll file-level build information up into per-project dependency records.

    Every ``File`` that carries a ``source_target`` is mapped to that target's
    name, and every such target name is resolved to a manifest ``Project`` via
    ``manifest.find_project``.  From those two tables ``build()`` derives one
    ``ProjectDependence`` per project, holding:

    * ``GENERATES`` edges to the files attributed to the project,
    * ``DEPENDS_ON`` edges to other projects that its files depend on,
    * ``VARIANT_OF`` edges to the upstream packages that
      ``LocalResourceLoader.load_opensource`` returns for the project's path.

    All per-project state is keyed by ``project.name``.
    """

    # Raised verbatim by every accessor that requires a completed build().
    _NOT_BUILT_MESSAGE = (
        "The build() method must be called first to construct the dependency relationships"
    )

    def __init__(self):
        """Load the manifest and create the (initially empty) lookup tables."""
        self.manifest = LocalResourceLoader.load_manifest()
        # File -> name of the build target found in ``file.source_target``.
        self._file_to_target_name: Dict[File, str] = {}
        # Target name -> manifest project returned by ``manifest.find_project``.
        self._target_to_project: Dict[str, Project] = {}
        # Project name -> accumulated dependency record for that project.
        self._project_dependency: Dict[str, ProjectDependence] = {}
        # Project name -> upstream packages loaded for that project's path.
        self._project_to_upstream: Dict[str, List[OpenSource]] = {}
        # Only True once a build() call has run to completion.
        self._built = False

    def build(self, all_files: "List[File]") -> Dict[str, Dict[str, List[str]]]:
        """Rebuild all mappings from *all_files* and return a summary.

        Any state left by a previous call is discarded first, so the analyzer
        can be rebuilt with a new file list.

        Args:
            all_files: Files to analyse; files whose ``source_target`` is
                ``None`` are not attributed to any project.

        Returns:
            ``{project name: {relationship value: sorted list of names}}`` as
            produced by ``_format_result``.
        """
        self._reset()
        self._build_mappings(all_files)
        self._build_project_contains(all_files)
        self._build_project_dependencies(all_files)
        self._build_upstream_dependencies()
        # Set last so that a failure in any step leaves the analyzer "unbuilt".
        self._built = True
        return self._format_result()

    def get_file_project_mapping(self) -> Dict[str, str]:
        """Return ``{file.relative_path: project.name}`` for resolved files.

        Files whose target did not resolve to a project are omitted.  This
        accessor does not require ``build()``; before a build the underlying
        tables are empty and so is the result.
        """
        return {
            file.relative_path: project.name
            for file, project in self._resolved_files()
        }

    def get_project_files(self) -> Dict[str, List[str]]:
        """Return ``{project.name: [file.relative_path, ...]}`` for resolved files.

        Paths appear in the order the files were registered.  Like
        ``get_file_project_mapping`` this does not require ``build()``.
        """
        project_files = defaultdict(list)
        for file, project in self._resolved_files():
            project_files[project.name].append(file.relative_path)
        # Hand back a plain dict so lookups of unknown projects raise KeyError.
        return dict(project_files)

    def get_project_dependence(self) -> "Dict[str, ProjectDependence]":
        """Return the internal ``{project name: ProjectDependence}`` table.

        Raises:
            RuntimeError: If ``build()`` has not completed.
        """
        self._require_built()
        return self._project_dependency

    def to_dict(self) -> Dict[str, Any]:
        """Serialise the manifest summary and all derived mappings.

        Returns:
            A dict with the keys ``manifest``, ``file_project_mapping``,
            ``project_files``, ``project_dependencies`` and
            ``upstream_packages``.

        Raises:
            RuntimeError: If ``build()`` has not completed.
        """
        self._require_built()

        manifest_info = {
            "remotes": [{"name": r.name, "fetch": r.fetch} for r in self.manifest.remotes],
            "default": self.manifest.default,
            "projects": [p.name for p in self.manifest.projects]
        }

        return {
            "manifest": manifest_info,
            "file_project_mapping": self.get_file_project_mapping(),
            "project_files": self.get_project_files(),
            "project_dependencies": [pd.to_dict() for pd in self._project_dependency.values()],
            "upstream_packages": {
                project_name: [pkg.to_dict() for pkg in pkgs]
                for project_name, pkgs in self._project_to_upstream.items()
            }
        }

    def _require_built(self) -> None:
        """Raise ``RuntimeError`` unless a ``build()`` call has completed."""
        if not self._built:
            raise RuntimeError(self._NOT_BUILT_MESSAGE)

    def _resolved_files(self) -> "List[Tuple[File, Project]]":
        """Return ``(file, project)`` pairs for files whose target has a project."""
        pairs = []
        for file, target_name in self._file_to_target_name.items():
            project = self._target_to_project.get(target_name)
            if project:
                pairs.append((file, project))
        return pairs

    def _reset(self) -> None:
        """Clear every derived table and mark the analyzer as not built."""
        self._file_to_target_name.clear()
        self._target_to_project.clear()
        self._project_dependency.clear()
        self._project_to_upstream.clear()
        self._built = False

    def _build_mappings(self, all_files: "List[File]") -> None:
        """Fill the file -> target-name and target-name -> project tables."""
        for file in all_files:
            if file.source_target is not None:
                self._file_to_target_name[file] = file.source_target.target_name

        # Resolve each distinct target name once, however many files share it.
        for target_name in set(self._file_to_target_name.values()):
            project = self.manifest.find_project(target_name)
            if project is not None:
                self._target_to_project[target_name] = project

    def _get_or_create_dependency(self, project: "Project") -> "ProjectDependence":
        """Return the record for *project*, creating it on first use."""
        if project.name not in self._project_dependency:
            self._project_dependency[project.name] = ProjectDependence(project)
        return self._project_dependency[project.name]

    def _get_file_project(self, file: "File") -> "Optional[Project]":
        """Return the project owning *file*, or ``None`` if it has none."""
        target_name = self._file_to_target_name.get(file)
        if not target_name:
            return None
        return self._target_to_project.get(target_name)

    def _build_project_contains(self, all_files: "List[File]") -> None:
        """Add a ``GENERATES`` edge from each project to each of its files."""
        for file in all_files:
            project = self._get_file_project(file)
            if not project:
                continue
            pd = self._get_or_create_dependency(project)
            pd.add_dependency(RelationshipType.GENERATES, file)

    def _build_project_dependencies(self, all_files: "List[File]") -> None:
        """Derive project -> project ``DEPENDS_ON`` edges from file dependencies."""
        # Shared across all files so each (source, dependency) pair is added once.
        processed_deps: Set[Tuple[str, str]] = set()
        # Every relationship type except OTHER is scanned; computed once because
        # it does not depend on the file being processed.
        relation_types = [
            relation_type
            for relation_type in RelationshipType
            if relation_type != RelationshipType.OTHER
        ]
        for file in all_files:
            current_project = self._get_file_project(file)
            if not current_project:
                continue
            for relation_type in relation_types:
                self._process_dependencies_for_relation(file, relation_type, current_project, processed_deps)

    def _build_upstream_dependencies(self) -> None:
        """Attach upstream packages to each project as ``VARIANT_OF`` edges.

        ``_target_to_project`` has one entry per target, so a project reached
        through several targets shows up several times.  Each project name is
        handled once, in first-seen order, so its upstream packages are loaded
        and added a single time.
        """
        seen_projects: Set[str] = set()
        for project in self._target_to_project.values():
            if project.name in seen_projects:
                continue
            seen_projects.add(project.name)

            upstream_pkgs = LocalResourceLoader.load_opensource(project.path)
            if not upstream_pkgs:
                continue
            pd = self._get_or_create_dependency(project)
            pd.add_dependency_list(RelationshipType.VARIANT_OF, upstream_pkgs)
            self._project_to_upstream[project.name] = upstream_pkgs

    def _process_dependencies_for_relation(
        self,
        file: "File",
        relation_type: "RelationshipType",
        current_project: "Project",
        processed_deps: Set[Tuple[str, str]],
    ) -> None:
        """Record ``DEPENDS_ON`` edges implied by one relationship type of *file*.

        Args:
            file: File whose dependencies are inspected.
            relation_type: Relationship type passed to ``file.get_dependencies``.
            current_project: Project that owns *file*.
            processed_deps: ``(source name, dependency name)`` pairs already
                recorded; updated in place.
        """
        for dep_file in file.get_dependencies(relation_type):
            dep_project = self._get_file_project(dep_file)
            # Skip files without a project and dependencies inside one project.
            if not dep_project or dep_project.name == current_project.name:
                continue

            dep_key = (current_project.name, dep_project.name)
            if dep_key in processed_deps:
                continue

            pd_src = self._get_or_create_dependency(current_project)
            pd_src.add_dependency(RelationshipType.DEPENDS_ON, dep_project)
            processed_deps.add(dep_key)

    def _format_result(self) -> Dict[str, Dict[str, List[str]]]:
        """Summarise every project's dependencies as sorted, unique names.

        Relationship types with no entries are left out.  An entry is shown by
        its ``name`` attribute when it has one, otherwise by ``str(entry)``.
        """
        result = {}
        for name, pd in self._project_dependency.items():
            deps = {}
            for dep_type, objs in pd.get_dependencies().items():
                if objs:
                    key = dep_type.value
                    deps[key] = sorted({getattr(obj, "name", str(obj)) for obj in objs})
            result[name] = deps
        return result
172