#!/usr/bin/env python
# -*- coding: utf-8 -*-

#
# Copyright (c) 2025 Northeastern University
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import json
import os
import re
from pathlib import Path
from typing import Optional, List, Union, Dict, Any

from ohos.sbom.common.utils import read_json, is_text_file
from ohos.sbom.data.manifest import Manifest
from ohos.sbom.data.ninja_json import NinjaJson
from ohos.sbom.data.opensource import OpenSource


class LocalResourceLoader:
    """
    Local resource loader for OpenHarmony codebase.

    Provides methods to load various resource files from the OpenHarmony source tree
    with caching mechanism to avoid repeated parsing of the same files.

    All state (source root, out root, cache) is stored on the class itself, so
    every method is a classmethod and the configuration is shared process-wide.
    """

    # Class variables for paths (None until the corresponding setter is called)
    _source_root: Optional[str] = None
    _out_root: Optional[str] = None

    # Resource cache: identifier -> parsed object instance.
    # Deliberately shared at class level; use clear_cache() to reset it.
    _resource_cache: Dict[str, object] = {}

    @classmethod
    def set_source_root(cls, source_root: Union[str, Path]) -> None:
        """
        Set the project source root directory path.

        Args:
            source_root: Path to project source root

        Raises:
            ValueError: If the path does not exist or is not a directory
        """
        src_path = cls._validate_directory(source_root, "Source root path")
        cls._source_root = str(src_path)

    @classmethod
    def set_out_root(cls, out_root: Union[str, Path]) -> None:
        """
        Set the build output directory path.

        Cached resources are read from the out root but stored under fixed
        keys, so the cache is cleared when the out root changes to a different
        directory; otherwise objects parsed from the previous directory would
        keep being returned.

        Args:
            out_root: Path to build output directory

        Raises:
            ValueError: If the path does not exist or is not a directory
        """
        out_path = str(cls._validate_directory(out_root, "Output directory path"))
        if cls._out_root is not None and cls._out_root != out_path:
            cls._resource_cache.clear()
        cls._out_root = out_path

    @classmethod
    def to_local_path(cls, relation_path: Union[str, Path]) -> str:
        """
        Convert a logical path to an absolute filesystem path.

        Resolution order:
          1. An empty/None path yields the configured source root as-is.
          2. An absolute path that exists on disk is returned unchanged
             (after os.path.abspath).
          3. Anything else is treated as relative to the source root; a leading
             "//" or leading "/" characters are removed before joining.

        Args:
            relation_path: Logical path (e.g. "//kernel/entry.c")

        Returns:
            Corresponding absolute local path

        Raises:
            RuntimeError: If the path has to be resolved against the source
                root but set_source_root() has not been called
        """
        source_root = cls._source_root
        if not relation_path:
            return source_root

        path_str = str(relation_path).strip()

        if os.path.isabs(path_str) and os.path.exists(path_str):
            return os.path.abspath(path_str)

        # Without this guard os.path.join(None, ...) fails with an opaque TypeError.
        if source_root is None:
            raise RuntimeError("Source root directory not set. Call set_source_root() first.")

        if path_str.startswith("//"):
            relative_part = path_str[2:]
        else:
            relative_part = path_str.lstrip("/")

        return os.path.normpath(os.path.join(source_root, relative_part))

    @classmethod
    def load_ninja_json(cls) -> NinjaJson:
        """
        Load and parse the Ninja build configuration file (JSON format) with caching.

        The file is read from <out_root>/sbom/gn_gen.json. The parsed
        configuration is cached to avoid repeated file reads and parsing.

        Returns:
            NinjaJson: Parsed build configuration object

        Raises:
            RuntimeError: If the out root has not been set
            FileNotFoundError: If gn_gen.json does not exist
            json.JSONDecodeError: If the file is not valid JSON
            IOError: If the file cannot be read
            ValueError: If the JSON data cannot be converted to NinjaJson
        """
        # Check cache first
        cache_key = "ninja_json"
        cached = cls._get_cache_obj(cache_key)
        if cached is not None:
            return cached

        # The file lives under the out root, so that is the directory that
        # must be configured (the source root is not read by this method).
        if not cls._out_root:
            raise RuntimeError("Out root directory not set. Call set_out_root() first.")

        # Construct full path to the build configuration file
        gn_gen_path = Path(cls._out_root) / "sbom" / "gn_gen.json"

        # Validate file exists with helpful error message
        if not gn_gen_path.exists():
            raise FileNotFoundError(
                f"Ninja build configuration file not found at: {gn_gen_path.absolute()}\n"
                "To generate this file, add these arguments to build.sh command:\n"
                "  --gn-flags=--ide=json\n"
                "  --gn-flags=--json-file-name=sbom/gn_gen.json\n"
            )

        try:
            # Read and parse the JSON file
            data = json.loads(gn_gen_path.read_text(encoding="utf-8"))
        except json.JSONDecodeError as e:
            raise json.JSONDecodeError(
                f"Invalid JSON in build configuration at {gn_gen_path}: {str(e)}",
                e.doc, e.pos
            ) from e
        except IOError as e:
            raise IOError(
                f"Failed to read build configuration from {gn_gen_path}: {str(e)}"
            ) from e

        try:
            ninja_json = NinjaJson.from_dict(data)
        except (TypeError, ValueError) as e:
            raise ValueError(
                f"Failed to convert build configuration data from {gn_gen_path}: {str(e)}"
            ) from e

        # Cache only after a successful conversion
        cls._add_cache_obj(cache_key, ninja_json)
        return ninja_json

    @classmethod
    def load_manifest(cls) -> Manifest:
        """
        Load and cache the latest manifest configuration file.

        Returns:
            Manifest: Parsed manifest object

        Raises:
            RuntimeError: If the out root has not been set
            FileNotFoundError: If no matching manifest file is found
            ValueError: If the manifest file cannot be parsed
        """
        # Define cache key (consistent with other methods)
        cache_key = "manifest"

        # Check cache first
        cached = cls._get_cache_obj(cache_key)
        if cached is not None:
            return cached

        # Find latest manifest file
        manifest_path = cls._find_latest_manifest()
        if not manifest_path:
            raise FileNotFoundError(
                f"No valid manifest file found in directory: "
                f"{Path(cls._out_root) / 'sbom' / 'manifests'}"
            )

        try:
            # Parse manifest file
            manifest = Manifest.from_file(str(manifest_path))
        except Exception as e:
            # Any parser failure is reported uniformly as ValueError, with the
            # original exception attached as the cause.
            error_msg = (f"Failed to parse manifest file {manifest_path}. "
                         f"Reason: {str(e)}")
            raise ValueError(error_msg) from e

        # Add to cache before returning
        cls._add_cache_obj(cache_key, manifest)

        print(f"Successfully loaded and cached manifest from: {manifest_path}")
        return manifest

    @classmethod
    def load_opensource(cls, package_path: str) -> Optional[List[OpenSource]]:
        """
        Load OpenSource metadata for a package.

        Reads the "README.OpenSource" file located directly inside the package
        directory. The result is not cached.

        Args:
            package_path: Logical path to package (e.g. "//third_party/openssl")

        Returns:
            List of OpenSource objects, or None if not found/invalid
        """
        local_path = cls.to_local_path(package_path)
        opensource_file = os.path.join(local_path, "README.OpenSource")

        if not os.path.isfile(opensource_file):
            return None

        data = read_json(opensource_file)
        if not data:
            return None

        # The file may hold a single entry (object) or several (array).
        if isinstance(data, dict):
            return [OpenSource.from_dict(data)]
        if isinstance(data, list):
            # Non-dict items in the array are skipped
            return [OpenSource.from_dict(item) for item in data if isinstance(item, dict)]
        return None

    @classmethod
    def load_text_file(cls, path: str, max_bytes: Optional[int] = None) -> str:
        """
        Safely read text file content.

        Args:
            path: Path to file (logical or absolute)
            max_bytes: Maximum amount to read (optional). The file is opened in
                text mode, so the limit is counted in decoded characters.
                None or 0 means no limit.

        Returns:
            File content as string, or empty string for non-text files and for
            files that cannot be read or decoded as UTF-8

        Raises:
            FileNotFoundError: If the resolved path is not an existing file
        """
        abs_path = cls.to_local_path(path)

        if not os.path.isfile(abs_path):
            raise FileNotFoundError(f"File not found: {abs_path}")

        if not is_text_file(abs_path):
            return ""

        try:
            with open(abs_path, "r", encoding="utf-8") as f:
                return f.read(max_bytes) if max_bytes else f.read()
        except (OSError, ValueError):
            # Best-effort read: I/O failures and decoding errors
            # (UnicodeDecodeError is a ValueError) yield an empty result.
            return ""

    @classmethod
    def clear_cache(cls) -> None:
        """Clear all cached resources."""
        cls._resource_cache.clear()

    @classmethod
    def get_cache_size(cls) -> int:
        """Get current number of cached resources."""
        return len(cls._resource_cache)

    @classmethod
    def _get_cache_obj(cls, obj_name: str) -> Any:
        """Internal method to get cached object by name (None if absent)."""
        return cls._resource_cache.get(obj_name, None)

    @classmethod
    def _add_cache_obj(cls, obj_name: str, obj: Any) -> None:
        """Internal method to add cached object by name."""
        cls._resource_cache[obj_name] = obj

    @classmethod
    def _find_latest_manifest(cls) -> Optional[Path]:
        """
        Find the latest manifest file in the <out_root>/sbom/manifests directory.

        Looks for files matching pattern: manifest_tag_YYYYMMDD_HHMMSS.xml
        and picks the one whose embedded timestamp is the largest.

        Returns:
            Path to latest manifest file, or None if none found

        Raises:
            RuntimeError: If the out root has not been set
        """
        if not cls._out_root:
            raise RuntimeError("Out root directory not set. Call set_out_root() first.")

        tag_dir = Path(cls._out_root) / "sbom" / "manifests"

        if not tag_dir.exists() or not tag_dir.is_dir():
            print(f"Warning: Manifest tag directory not found: {tag_dir}")
            return None

        pattern = re.compile(r"manifest_tag_(\d{8})_(\d{6})\.xml$")
        manifest_files = []

        for file_path in tag_dir.iterdir():
            if not file_path.is_file():
                continue
            match = pattern.match(file_path.name)
            if match:
                # Date and time digits concatenated form one sortable integer
                timestamp = int(match.group(1) + match.group(2))
                manifest_files.append((timestamp, file_path))

        if not manifest_files:
            print(f"No manifest files found in {tag_dir}")
            return None

        return max(manifest_files, key=lambda x: x[0])[1]

    @classmethod
    def _validate_directory(cls, path: Union[str, Path], description: str) -> Path:
        """
        Validate that a path exists and is a directory.

        Args:
            path: Path to validate (str or Path)
            description: Description of path for error messages

        Returns:
            Resolved absolute Path object

        Raises:
            ValueError: If the path does not exist or is not a directory
        """
        resolved_path = Path(path).resolve()

        if not resolved_path.exists():
            raise ValueError(f"{description} does not exist: {resolved_path}")
        if not resolved_path.is_dir():
            raise ValueError(f"{description} is not a directory: {resolved_path}")

        return resolved_path