• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021 The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Common Utils."""
16
17# pylint: disable=g-importing-member
18from dataclasses import dataclass
19import io
20from pathlib import Path
21from pathlib import PurePath
22import sys
23from typing import List
24from typing import Set
25
26# pylint: disable=g-import-not-at-top
try:
  from git import Blob
  from git import Commit
  from git import Tree
except ModuleNotFoundError:
  # GitPython is a third-party package; tell the user how to install it
  # instead of surfacing a raw traceback.
  print(
      'ERROR: Please install GitPython by `pip3 install GitPython`.',
      file=sys.stderr)
  # sys.exit() is always available, whereas the bare exit() builtin is only
  # injected by the site module (absent with `python -S` or when embedded).
  sys.exit(1)
36
# Directory containing this file, with symlinks resolved.
THIS_DIR = Path(__file__).resolve().parent
# Two directory levels above THIS_DIR; the default EXPECTED_UPSTREAM file is
# looked up directly under it.
# NOTE(review): presumably the libcore repository root -- confirm.
LIBCORE_DIR = THIS_DIR.parent.parent.resolve()

# Directory prefixes, relative to the root of an upstream commit, under which
# class sources are searched. Lookups stop at the first prefix that matches,
# so the order of this list matters.
UPSTREAM_CLASS_PATHS = [
    'jdk/src/share/classes/',
    'src/java.base/share/classes/',
    'src/java.base/linux/classes/',
    'src/java.base/unix/classes/',
    'src/java.sql/share/classes/',
    'src/java.logging/share/classes/',
    'src/java.prefs/share/classes/',
    'src/java.net/share/classes/',
]

# Directory prefixes, relative to the root of an upstream commit, under which
# test sources are searched.
UPSTREAM_TEST_PATHS = [
    'jdk/test/',
    'test/jdk/',
]

# All upstream prefixes; class paths are tried before test paths.
UPSTREAM_SEARCH_PATHS = UPSTREAM_CLASS_PATHS + UPSTREAM_TEST_PATHS

# Prefix of ojluni paths that hold the main java sources.
OJLUNI_JAVA_BASE_PATH = 'ojluni/src/main/java/'
# Prefix used for test classes. It is also a prefix of OJLUNI_JAVA_BASE_PATH,
# so code that tests both must check OJLUNI_JAVA_BASE_PATH first.
OJLUNI_TEST_PATH = 'ojluni/src/'
60
61
@dataclass
class ExpectedUpstreamEntry:
  """A map entry in the EXPECTED_UPSTREAM file.

  Two entries compare equal only when all four fields are equal, including
  comment_lines. Because __eq__ is defined without __hash__, instances are
  not hashable.
  """
  dst_path: str  # destination path
  git_ref: str  # a git reference to an upstream commit
  src_path: str  # source path in the commit pointed by the git_ref
  comment_lines: str = ''  # The comment lines above the entry line

  def __eq__(self, other):
    """Compares every field with another ExpectedUpstreamEntry.

    Args:
      other: the object to compare with

    Returns:
      True / False for another entry, or NotImplemented for any other type
      so that Python can try the reflected comparison. If that also declines,
      `==` evaluates to False and `!=` to True.
    """
    if not isinstance(other, ExpectedUpstreamEntry):
      return NotImplemented

    return (
        (self.dst_path, self.git_ref, self.src_path, self.comment_lines) ==
        (other.dst_path, other.git_ref, other.src_path, other.comment_lines))
78
79
class ExpectedUpstreamFile:
  """A file object representing the EXPECTED_UPSTREAM file.

  Every entry line has the form `<dst_path>,<git_ref>,<src_path>`. Blank
  lines and lines starting with '#' are kept as the comment_lines of the
  entry that follows them.
  """

  def __init__(self, file_or_bytes=LIBCORE_DIR / 'EXPECTED_UPSTREAM'):
    """Wraps either a file on disk or an in-memory copy of its content.

    Args:
      file_or_bytes: a Path to the file, or the UTF-8 encoded file content
        as bytes. Defaults to the EXPECTED_UPSTREAM file under LIBCORE_DIR.

    Raises:
      NotImplementedError: if file_or_bytes is neither a Path nor bytes.
    """
    if isinstance(file_or_bytes, Path):
      path = Path(file_or_bytes)
      # Always use UTF-8 for text modes so that the result does not depend
      # on the locale and agrees with the decoding of the bytes input below.
      # Binary modes must not be given an encoding.
      self.openable = lambda mode: path.open(
          mode, encoding=None if 'b' in mode else 'utf-8')
    elif isinstance(file_or_bytes, bytes):
      # The mode is ignored: each call returns a fresh in-memory buffer, so
      # anything written to it is discarded.
      self.openable = lambda mode: io.StringIO(file_or_bytes.decode('utf-8'))
    else:
      raise NotImplementedError('Only support bytes or Path type')

  def read_all_entries(self) -> List[ExpectedUpstreamEntry]:
    """Read all entries from the file.

    Returns:
      The entries in file order. Comment or blank lines that come after the
      last entry line are not attached to any entry and are not returned.

    Raises:
      ValueError: if an entry line doesn't have exactly 3 comma-separated
        items.
    """
    result: List[ExpectedUpstreamEntry] = []
    with self.openable('r') as file:
      comment_lines = ''  # Store the comment lines in the next entry
      for line in file:
        stripped = line.strip()
        # Empty lines and comments starting with '#' are accumulated verbatim
        # (including their line endings) for the next entry.
        if not stripped or stripped.startswith('#'):
          comment_lines += line
          continue

        result.append(self.parse_line(stripped, comment_lines))
        comment_lines = ''

    return result

  def write_all_entries(self, entries: List[ExpectedUpstreamEntry]) -> None:
    """Write all entries into the file, replacing its previous content.

    Args:
      entries: the entries to write, in the given order
    """
    with self.openable('w') as file:
      for e in entries:
        file.write(e.comment_lines)
        file.write(','.join([e.dst_path, e.git_ref, e.src_path]))
        file.write('\n')

  def write_new_entry(self, entry: ExpectedUpstreamEntry,
                      entries: List[ExpectedUpstreamEntry] = None) -> None:
    """Adds an entry and rewrites the file sorted by destination path.

    Args:
      entry: the entry to add
      entries: the current entries; read from the file when None. The list
        is modified in place (the new entry is appended and it is sorted).
    """
    if entries is None:
      entries = self.read_all_entries()

    entries.append(entry)
    self.sort_and_write_all_entries(entries)

  def sort_and_write_all_entries(self,
                                 entries: List[ExpectedUpstreamEntry]) -> None:
    """Sorts the entries by dst_path in place and writes them to the file.

    The comment lines of the entry that is first before sorting are treated
    as the file header and are moved above whichever entry is first after
    sorting.

    Args:
      entries: the entries to sort and write. An empty list writes an empty
        file.
    """
    # An empty list has no header to carry over; indexing would fail.
    if entries:
      header = entries[0].comment_lines
      entries[0].comment_lines = ''
      entries.sort(key=lambda e: e.dst_path)
      # Keep the header above the first entry
      entries[0].comment_lines = header + entries[0].comment_lines
    self.write_all_entries(entries)

  def get_new_or_modified_entries(
      self, other: 'ExpectedUpstreamFile') -> List[ExpectedUpstreamEntry]:
    """Return a list of modified and added entries from the other file.

    Args:
      other: the other file

    Returns:
      The entries of the other file whose dst_path is absent from this file,
      or whose entry in this file differs in any field (comment lines
      included).
    """
    # When a dst_path is repeated in this file, the last occurrence wins.
    this_map = {e.dst_path: e for e in self.read_all_entries()}

    result: List[ExpectedUpstreamEntry] = []
    for e in other.read_all_entries():
      value = this_map.get(e.dst_path)
      if value is None or value != e:
        result.append(e)

    return result

  @staticmethod
  def parse_line(line: str, comment_lines: str) -> ExpectedUpstreamEntry:
    """Parses one `<dst_path>,<git_ref>,<src_path>` line into an entry.

    Args:
      line: the entry line, without surrounding whitespace
      comment_lines: the comment lines to attach to the entry

    Returns:
      The parsed entry.

    Raises:
      ValueError: if the line doesn't have exactly 3 comma-separated items.
    """
    items = line.split(',')
    size = len(items)
    if size != 3:
      raise ValueError(
          f"The size must be 3, but is {size}. The line is '{line}'")

    return ExpectedUpstreamEntry(items[0], items[1], items[2], comment_lines)
169
170
class OjluniFinder:
  """Looks up java class names and ojluni/ paths among known file paths."""

  def __init__(self, existing_paths: List[str]):
    """Keeps the list of known paths that the match_* methods search."""
    self.existing_paths = existing_paths

  @staticmethod
  def translate_ojluni_path_to_class_name(path: str) -> str:
    """Converts an ojluni .java file path into a dotted class name.

    Args:
      path: ojluni path

    Returns:
      The class name, or None if the path doesn't end with '.java' or isn't
      under a known ojluni base directory.
    """
    if not path.endswith('.java'):
      return None

    # OJLUNI_JAVA_BASE_PATH has to be tried before OJLUNI_TEST_PATH.
    for prefix in (OJLUNI_JAVA_BASE_PATH, OJLUNI_TEST_PATH):
      if path.startswith(prefix):
        return path[len(prefix):-5].replace('/', '.')

    return None

  @staticmethod
  def translate_from_class_name_to_ojluni_path(class_or_path: str) -> str:
    """Builds the ojluni .java path for a class name.

    Args:
      class_or_path: a dotted class name, or a path

    Returns:
      The input unchanged when it contains '/', otherwise the path of the
      class under the test base (names starting with 'test.') or the main
      java base.
    """
    # Anything containing '/' is already a path.
    if '/' in class_or_path:
      return class_or_path

    if class_or_path.startswith('test.'):
      base_path = OJLUNI_TEST_PATH
    else:
      base_path = OJLUNI_JAVA_BASE_PATH

    return base_path + class_or_path.replace('.', '/') + '.java'

  def match_path_prefix(self, input_path: str) -> Set[str]:
    """Completes a partial path by one path component.

    Args:
      input_path: a partial path

    Returns:
      A set holding only input_path when it is one of the existing paths.
      Otherwise, the set of next-level completions of the existing paths that
      start with input_path; a completion that is a directory ends with '/'.
    """
    candidates = [
        known for known in self.existing_paths if known.startswith(input_path)
    ]
    # An exact hit needs no completion.
    if input_path in candidates:
      return {input_path}

    # With a trailing '/', complete the children of the input directory;
    # otherwise complete the last component inside its parent directory.
    anchor = PurePath(input_path)
    if not input_path.endswith('/'):
      anchor = anchor.parent
    depth = len(anchor.parts)

    completions: Set[str] = set()
    for candidate in candidates:
      # Every candidate starts with input_path, so it is expected to have a
      # component right below the anchor directory.
      child = PurePath(candidate).parts[depth]
      completion = (anchor / child).as_posix()
      # A completion shorter than the candidate is a directory.
      if completion != candidate:
        completion += '/'
      completions.add(completion)

    return completions

  def match_classname_prefix(self, input_class_name: str) -> List[str]:
    """Completes a partial class name into package / class names.

    Args:
      input_class_name: a partial dotted class name

    Returns:
      The matching names under the main java base followed by those under
      the test base; an empty list if the input contains '/'.
    """
    # A '/' means the input is probably a path rather than a class name.
    if '/' in input_class_name:
      return []

    partial_relative_path = input_class_name.replace('.', '/')
    names = []
    for base_path in (OJLUNI_JAVA_BASE_PATH, OJLUNI_TEST_PATH):
      for matched in self.match_path_prefix(base_path + partial_relative_path):
        names.append(convert_path_to_java_class_name(matched, base_path))

    return names
261
262
class OpenjdkFinder:
  """Finder for java classes or paths in a upstream OpenJDK commit."""

  def __init__(self, commit: Commit):
    """Keeps the commit whose tree is searched by the other methods."""
    self.commit = commit

  @staticmethod
  def translate_src_path_to_ojluni_path(src_path: str) -> str:
    """Maps an upstream source path onto the matching ojluni path.

    Args:
      src_path: a path in the upstream commit

    Returns:
      The ojluni path, or None if src_path isn't a file path below one of
      the known upstream test or class directories.
    """
    # Test directories are tried before class directories.
    test_relative = next(
        (src_path[len(base):] for base in UPSTREAM_TEST_PATHS
         if src_path.startswith(base)), None)
    if test_relative:
      return f'{OJLUNI_TEST_PATH}test/{test_relative}'

    class_relative = next(
        (src_path[len(base):] for base in UPSTREAM_CLASS_PATHS
         if src_path.startswith(base)), None)
    if class_relative:
      return f'{OJLUNI_JAVA_BASE_PATH}{class_relative}'

    return None

  def find_src_path_from_classname(self, class_or_path: str) -> str:
    """Finds a valid source path given a valid class name or path.

    Args:
      class_or_path: a dotted class name, or a path in the commit

    Returns:
      The path of an existing file in the commit, or None if there is none.
    """
    # Anything containing '/' is treated as a path and only verified.
    if '/' in class_or_path:
      return class_or_path if self.has_file(class_or_path) else None

    relative_path = class_or_path.replace('.', '/')
    # The first search path that holds the class wins.
    for base_path in UPSTREAM_SEARCH_PATHS:
      candidate = f'{base_path}{relative_path}.java'
      if self.has_file(candidate):
        return candidate

    return None

  def get_search_paths(self) -> List[str]:
    """Returns the module-level list of upstream search path prefixes."""
    return UPSTREAM_SEARCH_PATHS

  def find_src_path_from_ojluni_path(self, ojluni_path: str) -> str:
    """Guesses the upstream source path from an ojluni path.

    Args:
      ojluni_path: a path under the ojluni main java or test directory

    Returns:
      The first existing upstream path with the same relative path, or None
      if the ojluni path is not under a known directory or no such upstream
      file exists.
    """
    ojluni_test_dir = OJLUNI_TEST_PATH + 'test/'
    if ojluni_path.startswith(OJLUNI_JAVA_BASE_PATH):
      base_paths, prefix = UPSTREAM_CLASS_PATHS, OJLUNI_JAVA_BASE_PATH
    elif ojluni_path.startswith(ojluni_test_dir):
      base_paths, prefix = UPSTREAM_TEST_PATHS, ojluni_test_dir
    else:
      return None

    relative_path = ojluni_path[len(prefix):]
    for base_path in base_paths:
      candidate = base_path + relative_path
      if self.has_file(candidate):
        return candidate

    return None

  def match_path_prefix(self, input_path: str) -> List[str]:
    """Returns a list of source paths matching the given partial string.

    Args:
      input_path: a partial path in the commit

    Returns:
      A list holding only input_path when it names an existing file.
      Otherwise the directories (with a trailing '/') followed by the files
      of the searched directory whose names start with the typed name. An
      empty list when the directory to search doesn't exist.
    """
    root_tree = self.commit.tree
    path_obj = PurePath(input_path)
    posix_path = path_obj.as_posix()
    is_exact = self.has_file(posix_path)
    exact_obj = root_tree[posix_path] if is_exact else None

    # An exact file path completes to itself.
    if is_exact and isinstance(exact_obj, Blob):
      return [input_path]

    if input_path.endswith('/'):
      # A trailing '/' names a directory, never a partial name, so list
      # everything in it -- or nothing when it doesn't exist.
      if not is_exact:
        return []
      search_tree, search_word = exact_obj, ''
    elif len(path_obj.parts) >= 2:
      # Complete the last component inside its parent directory.
      parent_path = path_obj.parent.as_posix()
      if not self.has_file(parent_path):
        return []
      search_tree, search_word = root_tree[parent_path], path_obj.name
    else:
      # A single component is completed against the root of the commit.
      search_tree, search_word = root_tree, input_path

    # Directories get a trailing '/'. When such an item is the only result,
    # the shell should auto-fill it, and the next completion request then
    # takes the trailing-'/' branch above and lists its children.
    matches = [
        subtree.path + '/'
        for subtree in search_tree.trees
        if PurePath(subtree.path).name.startswith(search_word)
    ]
    matches.extend(
        blob.path
        for blob in search_tree.blobs
        if PurePath(blob.path).name.startswith(search_word))
    return matches

  def match_classname_prefix(self, input_class_name: str) -> List[str]:
    """Return a list of package / class names from given commit and input.

    Args:
      input_class_name: a partial dotted class name

    Returns:
      The matching names found under every upstream search path, in search
      path order; an empty list if the input contains '/'.
    """
    # A '/' means the input is probably a path rather than a class name.
    if '/' in input_class_name:
      return []

    partial_path = input_class_name.replace('.', '/')
    names = []
    for base_path in UPSTREAM_SEARCH_PATHS:
      for matched in self.match_path_prefix(base_path + partial_path):
        relative_path = matched[len(base_path):]
        if relative_path.endswith('.java'):
          relative_path = relative_path[:-5]
        names.append(relative_path.replace('/', '.'))

    return names

  def has_file(self, path: str) -> bool:
    """Returns True if the directory / file exists in the commit's tree."""
    return has_file_in_tree(path, self.commit.tree)
411
412
def convert_path_to_java_class_name(path: str, base_path: str) -> str:
  """Turns a path below base_path into a dotted java class / package name.

  Args:
    path: a path; its first len(base_path) characters are dropped without
      checking that they actually equal base_path
    base_path: the base directory prefix to drop

  Returns:
    The remaining path with a trailing '.java' removed and every '/'
    replaced by '.'. A directory path ending with '/' yields a name ending
    with '.'.
  """
  relative = path[len(base_path):]
  if relative.endswith('.java'):
    relative = relative[:-5]
  return relative.replace('/', '.')
420
421
def has_file_in_tree(path: str, tree: Tree) -> bool:
  """Tells whether the directory / file exists in the tree.

  Args:
    path: the path to look up
    tree: the tree to index with the path

  Returns:
    True if `tree[path]` succeeds, False if it raises KeyError.
  """
  try:
    # Only the lookup itself matters; its value is not used.
    # pylint: disable=pointless-statement
    tree[path]
  except KeyError:
    return False
  return True
430