• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021 The Chromium Authors
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import os
6
7from pyfakefs import fake_filesystem_unittest  # pylint: disable=import-error
8from typing import List, Tuple, Iterable
9
10from flake_suppressor_common import common_typing as ct
11from flake_suppressor_common import expectations as expectations_module
12from flake_suppressor_common import queries
13from flake_suppressor_common import results as results_module
14from flake_suppressor_common import tag_utils
15
16
# Resolved path two directory levels above the directory containing this file.
CHROMIUM_SRC_DIR = os.path.realpath(
    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir))
# Expectation file directory, expressed relative to CHROMIUM_SRC_DIR.
RELATIVE_EXPECTATION_FILE_DIRECTORY = os.path.join(
    'content', 'test', 'gpu', 'gpu_tests', 'test_expectations')
# The same directory, anchored at CHROMIUM_SRC_DIR.
ABSOLUTE_EXPECTATION_FILE_DIRECTORY = os.path.join(
    CHROMIUM_SRC_DIR, RELATIVE_EXPECTATION_FILE_DIRECTORY)
24
# Comment-style header text declaring an OS tag set, a browser tag set and the
# accepted result types (Failure, RetryOnFailure, Skip, Slow).
# NOTE(review): presumably prepended to fake expectation file contents by the
# tests that import this module -- confirm against callers.
TAG_HEADER = """\
# OS
# tags: [ android android-lollipop android-marshmallow android-nougat
#             android-pie android-r android-s android-t
#         chromeos
#         fuchsia
#         linux ubuntu
#         mac highsierra mojave catalina bigsur monterey
#         win win8 win10 ]
# Browser
# tags: [ android-chromium android-webview-instrumentation
#         debug debug-x64
#         release release-x64
#         fuchsia-chrome web-engine-shell ]
# results: [ Failure RetryOnFailure Skip Slow ]
"""
41
42
def CreateFile(test: fake_filesystem_unittest.TestCase, *args,
               **kwargs) -> None:
  """Creates a file through the fake filesystem attached to |test|.

  Calls |test.fs.create_file| when that attribute exists and falls back to
  |test.fs.CreateFile| otherwise. All positional and keyword arguments are
  forwarded unchanged to whichever method is used.

  Args:
    test: The test case whose |fs| attribute is the fake filesystem.
    *args: Positional arguments forwarded to the creation method.
    **kwargs: Keyword arguments forwarded to the creation method.
  """
  # TODO(crbug.com/1156806): Remove this and just use fs.create_file() when
  # Catapult is updated to a newer version of pyfakefs that is compatible with
  # Chromium's version.
  fake_fs = test.fs
  try:
    create = fake_fs.create_file
  except AttributeError:
    # No snake_case method on this filesystem object; use the CamelCase one.
    create = fake_fs.CreateFile
  create(*args, **kwargs)
52
53
class FakeProcess:
  """Minimal object exposing only a |stdout| attribute."""

  def __init__(self, stdout: str):
    # Any falsy value (e.g. None or '') is stored as the empty string.
    self.stdout = stdout if stdout else ''
57
58
class UnitTest_BigQueryQuerier(queries.BigQueryQuerier):
  """BigQueryQuerier subclass whose query getters return fixed strings.

  Both CI getters return the same string, and both try getters return the
  same string, which begins with the token "submitted_builds".
  """

  def GetResultCountCIQuery(self) -> str:
    """Returns the fixed CI result count query string."""
    return 'SELECT * FROM foo'

  def GetResultCountTryQuery(self) -> str:
    """Returns the fixed try result count query string."""
    return 'submitted_builds SELECT * FROM bar'

  def GetFlakyOrFailingCiQuery(self) -> str:
    """Returns the fixed CI flaky/failing query string."""
    return 'SELECT * FROM foo'

  def GetFlakyOrFailingTryQuery(self) -> str:
    """Returns the fixed try flaky/failing query string."""
    return 'submitted_builds SELECT * FROM bar'

  def GetFailingBuildCulpritFromCiQuery(self) -> str:
    """Not provided by this test double; always raises NotImplementedError."""
    raise NotImplementedError
74
75
class UnitTestResultProcessor(results_module.ResultProcessor):
  """ResultProcessor subclass that parses dot-separated result names."""

  def GetTestSuiteAndNameFromResultDbName(self, result_db_name: str
                                          ) -> Tuple[str, str]:
    """Extracts the suite and test name from |result_db_name|.

    The name is split on its first three '.' characters. The second piece is
    the suite and everything after the third '.' is the test name.

    Args:
      result_db_name: A string containing at least three '.' characters.

    Returns:
      A (suite, test_name) tuple.

    Raises:
      ValueError: If |result_db_name| contains fewer than three '.'.
    """
    _first, suite, _third, test_name = result_db_name.split('.', maxsplit=3)
    return (suite, test_name)
81
82
class UnitTestTagUtils(tag_utils.BaseTagUtils):
  """BaseTagUtils subclass that treats only 'win-laptop' as ignored."""

  def RemoveIgnoredTags(self, tags: Iterable[str]) -> ct.TagTupleType:
    """Returns |tags| deduplicated, minus 'win-laptop', as a sorted tuple.

    Args:
      tags: The tag strings to filter.

    Returns:
      A tuple of the remaining unique tags in ascending sorted order.
    """
    return tuple(sorted(set(tags) - {'win-laptop'}))
88
89
90# pylint: disable=unused-argument
class UnitTestExpectationProcessor(expectations_module.ExpectationProcessor):
  """ExpectationProcessor subclass with simple, fixed behavior for tests."""

  def GetExpectationFileForSuite(self, suite: str,
                                 typ_tags: ct.TagTupleType) -> str:
    """Returns the expectation file path derived from |suite|.

    Every occurrence of 'integration_test' in |suite| is replaced with
    'expectations.txt', and the result is joined onto
    ABSOLUTE_EXPECTATION_FILE_DIRECTORY. |typ_tags| is not used.
    """
    return os.path.join(
        ABSOLUTE_EXPECTATION_FILE_DIRECTORY,
        suite.replace('integration_test', 'expectations.txt'))

  def IsSuiteUnsupported(self, suite) -> bool:
    """Always returns False, regardless of |suite|."""
    return False

  def GetExpectedResult(self, fraction: float, flaky_threshold: float) -> str:
    """Returns 'RetryOnFailure' below |flaky_threshold|, else 'Failure'."""
    below_threshold = fraction < flaky_threshold
    return 'RetryOnFailure' if below_threshold else 'Failure'

  def ListLocalCheckoutExpectationFiles(self) -> List[str]:
    """Not provided by this test double; always raises NotImplementedError."""
    raise NotImplementedError

  def ListOriginExpectationFiles(self) -> List[str]:
    """Not provided by this test double; always raises NotImplementedError."""
    raise NotImplementedError
110
111# pylint: enable=unused-argument
112