# Copyright 2021 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Common Utils."""

# pylint: disable=g-importing-member
from dataclasses import dataclass
import io
from pathlib import Path
from pathlib import PurePath
import sys
from typing import List
from typing import Optional
from typing import Set

# pylint: disable=g-import-not-at-top
try:
  from git import Blob
  from git import Commit
  from git import Tree
except ModuleNotFoundError:
  print(
      'ERROR: Please install GitPython by `pip3 install GitPython`.',
      file=sys.stderr)
  # sys.exit() is always available; the bare exit() builtin is injected by the
  # `site` module and is missing when the interpreter runs without it.
  sys.exit(1)

THIS_DIR = Path(__file__).resolve().parent
LIBCORE_DIR = THIS_DIR.parent.parent.resolve()

# Directories in an upstream OpenJDK commit that contain library sources.
UPSTREAM_CLASS_PATHS = [
    'jdk/src/share/classes/',
    'src/java.base/share/classes/',
    'src/java.base/linux/classes/',
    'src/java.base/unix/classes/',
    'src/java.sql/share/classes/',
    'src/java.logging/share/classes/',
    'src/java.prefs/share/classes/',
    'src/java.net/share/classes/',
]

# Directories in an upstream OpenJDK commit that contain tests.
UPSTREAM_TEST_PATHS = [
    'jdk/test/',
    'test/jdk/',
]

UPSTREAM_SEARCH_PATHS = UPSTREAM_CLASS_PATHS + UPSTREAM_TEST_PATHS

# NOTE: OJLUNI_TEST_PATH is a prefix of OJLUNI_JAVA_BASE_PATH, so code that
# matches a path against both must try OJLUNI_JAVA_BASE_PATH first.
OJLUNI_JAVA_BASE_PATH = 'ojluni/src/main/java/'
OJLUNI_TEST_PATH = 'ojluni/src/'


@dataclass
class ExpectedUpstreamEntry:
  """A map entry in the EXPECTED_UPSTREAM file."""
  dst_path: str  # destination path
  git_ref: str  # a git reference to an upstream commit
  src_path: str  # source path in the commit pointed by the git_ref
  comment_lines: str = ''  # The comment lines above the entry line

  def __eq__(self, other):
    """Returns True iff other is an entry with identical field values."""
    if not isinstance(other, ExpectedUpstreamEntry):
      return False

    return (self.dst_path == other.dst_path and
            self.git_ref == other.git_ref and
            self.src_path == other.src_path and
            self.comment_lines == other.comment_lines)


class ExpectedUpstreamFile:
  """A file object representing the EXPECTED_UPSTREAM file."""

  def __init__(self, file_or_bytes=LIBCORE_DIR / 'EXPECTED_UPSTREAM'):
    """Creates the file object from a path or from in-memory content.

    Args:
      file_or_bytes: a Path to the file, or the UTF-8 encoded file content as
        bytes. With bytes, every open creates a fresh in-memory buffer from the
        original content, so anything written is discarded.

    Raises:
      NotImplementedError: if file_or_bytes is neither a Path nor bytes.
    """
    if isinstance(file_or_bytes, Path):
      path = Path(file_or_bytes)
      # pylint: disable=unnecessary-lambda
      self.openable = lambda mode: path.open(mode)
    elif isinstance(file_or_bytes, bytes):
      self.openable = lambda mode: io.StringIO(file_or_bytes.decode('utf-8'))
    else:
      raise NotImplementedError('Only support bytes or Path type')

  def read_all_entries(self) -> List[ExpectedUpstreamEntry]:
    """Read all entries from the file.

    Blank lines and lines starting with '#' are accumulated and attached to
    the next entry as its comment_lines. Such lines after the last entry are
    not attached to anything and are therefore not returned.

    Returns:
      The entries in file order.

    Raises:
      ValueError: if an entry line doesn't have exactly 3 comma-separated
        items.
    """
    result: List[ExpectedUpstreamEntry] = []
    with self.openable('r') as file:
      pending_comments: List[str] = []  # Comment lines for the next entry
      for line in file:
        stripped = line.strip()
        # Empty lines and comments starting with '#' belong to the next entry
        if not stripped or stripped.startswith('#'):
          pending_comments.append(line)
          continue

        result.append(self.parse_line(stripped, ''.join(pending_comments)))
        pending_comments = []

    return result

  def write_all_entries(self, entries: List[ExpectedUpstreamEntry]) -> None:
    """Write all entries into the file, replacing its previous content."""
    with self.openable('w') as file:
      for e in entries:
        file.write(e.comment_lines)
        file.write(','.join([e.dst_path, e.git_ref, e.src_path]))
        file.write('\n')

  def write_new_entry(
      self,
      entry: ExpectedUpstreamEntry,
      entries: Optional[List[ExpectedUpstreamEntry]] = None) -> None:
    """Adds an entry and rewrites the file sorted by destination path.

    Args:
      entry: the entry to add
      entries: the existing entries. They are read from the file if omitted.
        The given list is modified in place (appended to and sorted).
    """
    if entries is None:
      entries = self.read_all_entries()

    entries.append(entry)
    self.sort_and_write_all_entries(entries)

  def sort_and_write_all_entries(self,
                                 entries: List[ExpectedUpstreamEntry]) -> None:
    """Sorts the entries by dst_path in place and writes them into the file.

    The comment lines of the first entry before sorting are treated as the
    file header and stay at the top of the file after sorting.

    Args:
      entries: the entries to sort and write. An empty list writes an empty
        file.
    """
    if entries:
      header = entries[0].comment_lines
      entries[0].comment_lines = ''
      entries.sort(key=lambda e: e.dst_path)
      # Keep the header above the first entry
      entries[0].comment_lines = header + entries[0].comment_lines
    self.write_all_entries(entries)

  def get_new_or_modified_entries(
      self, other: 'ExpectedUpstreamFile') -> List[ExpectedUpstreamEntry]:
    r"""Return a list of modified and added entries from the other file.

    Args:
      other: the other file

    Returns:
      A list of modified and added entries, i.e. the entries of the other file
      which have no equal entry with the same dst_path in this file.
    """
    this_map = {e.dst_path: e for e in self.read_all_entries()}

    result: List[ExpectedUpstreamEntry] = []
    for e in other.read_all_entries():
      value = this_map.get(e.dst_path)
      if value is None or value != e:
        result.append(e)

    return result

  @staticmethod
  def parse_line(line: str, comment_lines: str) -> ExpectedUpstreamEntry:
    """Parses a "dst_path,git_ref,src_path" line into an entry.

    Args:
      line: the entry line without surrounding whitespace
      comment_lines: the comment lines to attach to the entry

    Returns:
      The parsed entry.

    Raises:
      ValueError: if the line doesn't have exactly 3 comma-separated items.
    """
    items = line.split(',')
    size = len(items)
    if size != 3:
      raise ValueError(
          f"The size must be 3, but is {size}. The line is '{line}'")

    return ExpectedUpstreamEntry(items[0], items[1], items[2], comment_lines)


class OjluniFinder:
  """Finder for java classes or ojluni/ paths."""

  def __init__(self, existing_paths: List[str]):
    """Stores the list of known file paths that the prefix matching uses."""
    self.existing_paths = existing_paths

  @staticmethod
  def translate_ojluni_path_to_class_name(path: str) -> Optional[str]:
    r"""Translate an Ojluni file path to full class name.

    Args:
      path: ojluni path

    Returns:
      class name or None if class name isn't found.
    """
    if not path.endswith('.java'):
      return None

    # OJLUNI_JAVA_BASE_PATH must be tried first because OJLUNI_TEST_PATH is a
    # prefix of it.
    for base_path in (OJLUNI_JAVA_BASE_PATH, OJLUNI_TEST_PATH):
      if path.startswith(base_path):
        # Drop the base directory and the '.java' suffix
        return path[len(base_path):-len('.java')].replace('/', '.')

    return None

  @staticmethod
  def translate_from_class_name_to_ojluni_path(class_or_path: str) -> str:
    """Returns a ojluni path from a class name.

    Args:
      class_or_path: a class name, or a path which is returned unchanged.
        Class names starting with 'test.' are placed under OJLUNI_TEST_PATH,
        all others under OJLUNI_JAVA_BASE_PATH.
    """
    # if it contains '/', then it's a path
    if '/' in class_or_path:
      return class_or_path

    if class_or_path.startswith('test.'):
      base_path = OJLUNI_TEST_PATH
    else:
      base_path = OJLUNI_JAVA_BASE_PATH

    relative_path = class_or_path.replace('.', '/')
    return f'{base_path}{relative_path}.java'

  def match_path_prefix(self, input_path: str) -> Set[str]:
    """Returns a set of existing file paths matching the given partial path.

    Only one path level is completed at a time: a match located deeper than
    the directory being completed is reported as its first child directory
    with a trailing '/'.
    """
    path_matches = [
        path for path in self.existing_paths if path.startswith(input_path)
    ]
    # if it's found, just return the result
    if input_path in path_matches:
      return {input_path}

    # the input ends with '/', the autocompletion result contain the children
    # instead of the matching the prefix in its parent directory
    input_path_parent_or_self = PurePath(input_path)
    if not input_path.endswith('/'):
      input_path_parent_or_self = input_path_parent_or_self.parent
    n_parts = len(input_path_parent_or_self.parts)

    result_set: Set[str] = set()
    for match in path_matches:
      # parts[n_parts] should not exceed the index and should be a valid child
      # path because input_path_parent_or_self must be a valid directory
      child = PurePath(match).parts[n_parts]
      result = (input_path_parent_or_self / child).as_posix()
      # if result is not exact, the result represents a directory.
      if result != match:
        result += '/'
      result_set.add(result)

    return result_set

  def match_classname_prefix(self, input_class_name: str) -> List[str]:
    """Returns a list of package / class names given the partial class name."""
    # If '/' exists, it's probably a path, not a partial class name
    if '/' in input_class_name:
      return []

    result_list = []
    partial_relative_path = input_class_name.replace('.', '/')
    for base_path in [OJLUNI_JAVA_BASE_PATH, OJLUNI_TEST_PATH]:
      result_paths = self.match_path_prefix(base_path + partial_relative_path)
      result_list.extend(
          convert_path_to_java_class_name(path, base_path)
          for path in result_paths)

    return result_list


class OpenjdkFinder:
  """Finder for java classes or paths in a upstream OpenJDK commit."""

  def __init__(self, commit: Commit):
    """Stores the upstream commit whose tree is searched."""
    self.commit = commit

  @staticmethod
  def translate_src_path_to_ojluni_path(src_path: str) -> Optional[str]:
    """Returns None if src_path isn't in a known source directory."""
    # Test directories are checked before the class directories.
    candidates = (
        (UPSTREAM_TEST_PATHS, f'{OJLUNI_TEST_PATH}test/'),
        (UPSTREAM_CLASS_PATHS, OJLUNI_JAVA_BASE_PATH),
    )
    for upstream_base_paths, ojluni_base_path in candidates:
      for base_path in upstream_base_paths:
        if src_path.startswith(base_path):
          relative_path = src_path[len(base_path):]
          # A path that is just the base directory itself has no file part
          if relative_path:
            return f'{ojluni_base_path}{relative_path}'
          break

    return None

  def find_src_path_from_classname(self, class_or_path: str) -> Optional[str]:
    """Finds a valid source path given a valid class name or path.

    Args:
      class_or_path: a class name, or a path in the commit

    Returns:
      The given path if it exists in the commit, or the first existing
      "<search path><class as path>.java". None if nothing is found.
    """
    # if it contains '/', then it's a path
    if '/' in class_or_path:
      return class_or_path if self.has_file(class_or_path) else None

    relative_path = class_or_path.replace('.', '/')
    for base_path in UPSTREAM_SEARCH_PATHS:
      full_path = f'{base_path}{relative_path}.java'
      if self.has_file(full_path):
        return full_path

    return None

  def get_search_paths(self) -> List[str]:
    """Returns the upstream directories searched by this finder."""
    return UPSTREAM_SEARCH_PATHS

  def find_src_path_from_ojluni_path(self, ojluni_path: str) -> Optional[str]:
    """Returns a source path that guessed from the ojluni_path.

    Returns None if the ojluni_path isn't under a known ojluni directory or if
    no matching file exists in the commit.
    """
    test_path = OJLUNI_TEST_PATH + 'test/'
    if ojluni_path.startswith(OJLUNI_JAVA_BASE_PATH):
      base_paths = UPSTREAM_CLASS_PATHS
      relative_path = ojluni_path[len(OJLUNI_JAVA_BASE_PATH):]
    elif ojluni_path.startswith(test_path):
      base_paths = UPSTREAM_TEST_PATHS
      relative_path = ojluni_path[len(test_path):]
    else:
      return None

    for base_path in base_paths:
      full_path = base_path + relative_path
      if self.has_file(full_path):
        return full_path

    return None

  def _find_object(self, path: str):
    """Returns the object at the path in the commit tree, or None if absent."""
    try:
      return self.commit.tree[path]
    except KeyError:
      return None

  def match_path_prefix(self, input_path: str) -> List[str]:
    """Returns a list of source paths matching the given partial string.

    Args:
      input_path: a partial path. A trailing '/' means that the children of
        that directory are wanted.

    Returns:
      The input itself if it is an existing file. Otherwise the matching
      children of the searched directory, directories first, where directories
      carry a trailing '/'. Empty if the searched directory doesn't exist.
    """
    path_obj = PurePath(input_path)
    exact_obj = self._find_object(path_obj.as_posix())
    if isinstance(exact_obj, Blob):
      # an exact file path
      return [input_path]

    search_tree = self.commit.tree
    search_word = ''
    if input_path.endswith('/'):
      # an exact directory path and can't be a prefix directory name.
      if not isinstance(exact_obj, Tree):
        # Such directory doesn't exist, and thus returns empty list
        return []
      search_tree = exact_obj
    elif len(path_obj.parts) >= 2:
      parent_obj = self._find_object(path_obj.parent.as_posix())
      if not isinstance(parent_obj, Tree):
        # Return empty list because no such directory is found. This includes
        # a parent which is a file and thus has no children to search.
        return []
      search_tree = parent_obj
      search_word = path_obj.name
    else:
      search_word = input_path

    result_list = []
    for tree in search_tree.trees:
      if PurePath(tree.path).name.startswith(search_word):
        # Append '/' to indicate directory type. If the result has this item
        # only, shell should auto-fill the input, and thus next tabbing in
        # shell should fall into the above trailing-'/' condition and will
        # search in the child tree.
        result_list.append(tree.path + '/')

    for blob in search_tree.blobs:
      if PurePath(blob.path).name.startswith(search_word):
        result_list.append(blob.path)

    return result_list

  def match_classname_prefix(self, input_class_name: str) -> List[str]:
    """Return a list of package / class names from given commit and input."""
    # If '/' exists, it's probably a path, not a class name.
    if '/' in input_class_name:
      return []

    partial_relative_path = input_class_name.replace('.', '/')
    result_list = []
    for base_path in UPSTREAM_SEARCH_PATHS:
      for path in self.match_path_prefix(base_path + partial_relative_path):
        result_list.append(convert_path_to_java_class_name(path, base_path))

    return result_list

  def has_file(self, path: str) -> bool:
    """Returns True if the directory / file exists in the tree."""
    return has_file_in_tree(path, self.commit.tree)


def convert_path_to_java_class_name(path: str, base_path: str) -> str:
  """Converts a path under base_path into a dotted class or package name.

  The first len(base_path) characters and a trailing '.java' are dropped, and
  every '/' becomes '.'. A directory path ending with '/' therefore yields a
  name ending with '.'.
  """
  result = path[len(base_path):]
  if result.endswith('.java'):
    result = result[:-len('.java')]
  return result.replace('/', '.')


def has_file_in_tree(path: str, tree: Tree) -> bool:
  """Returns True if the directory / file exists in the tree."""
  try:
    # pylint: disable=pointless-statement
    tree[path]
    return True
  except KeyError:
    return False