#!/usr/bin/env python3
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Uploads RBE results to BigQuery"""

import argparse
import json
import os
import ssl
import sys
import urllib.error
import urllib.parse
import urllib.request
import uuid

gcp_utils_dir = os.path.abspath(
    os.path.join(os.path.dirname(__file__), "../../gcp/utils")
)
sys.path.append(gcp_utils_dir)
import big_query_utils

_DATASET_ID = "jenkins_test_results"
_DESCRIPTION = "Test results from master RBE builds on Kokoro"
# 365 days in milliseconds
_EXPIRATION_MS = 365 * 24 * 60 * 60 * 1000
_PARTITION_TYPE = "DAY"
_PROJECT_ID = "grpc-testing"
_RESULTS_SCHEMA = [
    ("job_name", "STRING", "Name of Kokoro job"),
    ("build_id", "INTEGER", "Build ID of Kokoro job"),
    ("build_url", "STRING", "URL of Kokoro build"),
    ("test_target", "STRING", "Bazel target path"),
    ("test_class_name", "STRING", "Name of test class"),
    ("test_case", "STRING", "Name of test case"),
    ("result", "STRING", "Test or build result"),
    ("timestamp", "TIMESTAMP", "Timestamp of test run"),
    ("duration", "FLOAT", "Duration of the test run"),
]
_TABLE_ID = "rbe_test_results"


def _get_api_key():
    """Returns string with API key to access ResultStore.
    Intended to be used in Kokoro environment."""
    api_key_directory = os.getenv("KOKORO_GFILE_DIR")
    api_key_file = os.path.join(api_key_directory, "resultstore_api_key")
    assert os.path.isfile(api_key_file), (
        "Must add --api_key arg if not on "
        "Kokoro or Kokoro environment is not set up properly."
    )
    with open(api_key_file, "r") as f:
        return f.read().replace("\n", "")


def _get_invocation_id():
    """Returns string of Bazel invocation ID. Intended to be used in
    Kokoro environment."""
    bazel_id_directory = os.getenv("KOKORO_ARTIFACTS_DIR")
    bazel_id_file = os.path.join(bazel_id_directory, "bazel_invocation_ids")
    assert os.path.isfile(bazel_id_file), (
        "bazel_invocation_ids file, written "
        "by RBE initialization script, expected but not found."
    )
    with open(bazel_id_file, "r") as f:
        return f.read().replace("\n", "")


def _parse_test_duration(duration_str):
    """Parse test duration string in '123.567s' format"""
    try:
        if duration_str.endswith("s"):
            duration_str = duration_str[:-1]
        return float(duration_str)
    except:
        return None


def _upload_results_to_bq(rows):
    """Upload test results to a BQ table.
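    Creates the partitioned results table via big_query_utils and retries the
    insert up to three times before exiting with an error.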

    Args:
        rows: A list of dictionaries containing data for each row to insert
    """
    bq = big_query_utils.create_big_query()
    big_query_utils.create_partitioned_table(
        bq,
        _PROJECT_ID,
        _DATASET_ID,
        _TABLE_ID,
        _RESULTS_SCHEMA,
        _DESCRIPTION,
        partition_type=_PARTITION_TYPE,
        expiration_ms=_EXPIRATION_MS,
    )

    max_retries = 3
    for attempt in range(max_retries):
        if big_query_utils.insert_rows(
            bq, _PROJECT_ID, _DATASET_ID, _TABLE_ID, rows
        ):
            break
        else:
            if attempt < max_retries - 1:
                print("Error uploading result to bigquery, will retry.")
            else:
                print(
                    "Error uploading result to bigquery, all attempts failed."
                )
                sys.exit(1)


def _get_resultstore_data(api_key, invocation_id):
    """Returns a list of test result actions by querying the ResultStore API.
    Args:
        api_key: String of ResultStore API key
        invocation_id: String of ResultStore invocation ID to get results from
    """
    all_actions = []
    page_token = ""
    # ResultStore's API returns data on a limited number of tests. When we exceed
    # that limit, the 'nextPageToken' field is included in the response; pass it
    # in the next request to get subsequent data, and keep requesting until the
    # 'nextPageToken' field is omitted.
    while True:
        req = urllib.request.Request(
            url="https://resultstore.googleapis.com/v2/invocations/%s/targets/-/configuredTargets/-/actions?key=%s&pageToken=%s&fields=next_page_token,actions.id,actions.status_attributes,actions.timing,actions.test_action"
            % (invocation_id, api_key, page_token),
            headers={"Content-Type": "application/json"},
        )
        ctx_dict = {}
        if os.getenv("PYTHONHTTPSVERIFY") == "0":
            ctx = ssl.create_default_context()
            ctx.check_hostname = False
            ctx.verify_mode = ssl.CERT_NONE
            ctx_dict = {"context": ctx}
        raw_resp = urllib.request.urlopen(req, **ctx_dict).read()
        decoded_resp = (
            raw_resp
            if isinstance(raw_resp, str)
            else raw_resp.decode("utf-8", "ignore")
        )
        results = json.loads(decoded_resp)
        all_actions.extend(results["actions"])
        if "nextPageToken" not in results:
            break
        page_token = results["nextPageToken"]
    return all_actions


if __name__ == "__main__":
    # Arguments are necessary if running in a non-Kokoro environment.
    argp = argparse.ArgumentParser(
        description=(
            "Fetches results for given RBE invocation and uploads them to"
            " BigQuery table."
        )
    )
    argp.add_argument(
        "--api_key",
        default="",
        type=str,
        help="The API key to read from ResultStore API",
    )
    argp.add_argument(
        "--invocation_id",
        default="",
        type=str,
        help="UUID of bazel invocation to fetch.",
    )
    argp.add_argument(
        "--bq_dump_file",
        default=None,
        type=str,
        help="Dump JSON data to file just before uploading",
    )
    argp.add_argument(
        "--resultstore_dump_file",
        default=None,
        type=str,
        help="Dump JSON data as received from ResultStore API",
    )
    argp.add_argument(
        "--skip_upload",
        default=False,
        action="store_const",
        const=True,
        help="Skip uploading to bigquery",
    )
    args = argp.parse_args()

    api_key = args.api_key or _get_api_key()
    invocation_id = args.invocation_id or _get_invocation_id()
    resultstore_actions = _get_resultstore_data(api_key, invocation_id)

    if args.resultstore_dump_file:
        with open(args.resultstore_dump_file, "w") as f:
            json.dump(resultstore_actions, f, indent=4, sort_keys=True)
        print(
            "Dumped resultstore data to file %s" % args.resultstore_dump_file
        )

    # google.devtools.resultstore.v2.Action schema:
    # https://github.com/googleapis/googleapis/blob/master/google/devtools/resultstore/v2/action.proto
    bq_rows = []
    for index, action in enumerate(resultstore_actions):
        # Filter out non-test related data, such as build results.
        if "testAction" not in action:
            continue
        # Some test results contain the fileProcessingErrors field, which indicates
        # an issue with parsing the results of individual test cases.
        if "fileProcessingErrors" in action:
            test_cases = [
                {
                    "testCase": {
                        "caseName": str(action["id"]["actionId"]),
                    }
                }
            ]
        # Test timeouts have a different dictionary structure compared to pass and
        # fail results.
        elif action["statusAttributes"]["status"] == "TIMED_OUT":
            test_cases = [
                {
                    "testCase": {
                        "caseName": str(action["id"]["actionId"]),
                        "timedOut": True,
                    }
                }
            ]
        # When RBE believes its infrastructure is failing, it will abort and
        # mark running tests as UNKNOWN. These infrastructure failures may be
        # related to our tests, so we should investigate if specific tests are
        # repeatedly being marked as UNKNOWN.
        elif action["statusAttributes"]["status"] == "UNKNOWN":
            test_cases = [
                {
                    "testCase": {
                        "caseName": str(action["id"]["actionId"]),
                        "unknown": True,
                    }
                }
            ]
            # Take the timestamp from the previous action, which should be
            # a close approximation.
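            # (Aborted UNKNOWN actions may not carry usable timing data of their own.)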
            action["timing"] = {
                "startTime": resultstore_actions[index - 1]["timing"][
                    "startTime"
                ]
            }
        elif "testSuite" not in action["testAction"]:
            continue
        elif "tests" not in action["testAction"]["testSuite"]:
            continue
        else:
            test_cases = []
            for tests_item in action["testAction"]["testSuite"]["tests"]:
                test_cases += tests_item["testSuite"]["tests"]
        for test_case in test_cases:
            if any(s in test_case["testCase"] for s in ["errors", "failures"]):
                result = "FAILED"
            elif "timedOut" in test_case["testCase"]:
                result = "TIMEOUT"
            elif "unknown" in test_case["testCase"]:
                result = "UNKNOWN"
            else:
                result = "PASSED"
            try:
                bq_rows.append(
                    {
                        "insertId": str(uuid.uuid4()),
                        "json": {
                            "job_name": os.getenv("KOKORO_JOB_NAME"),
                            "build_id": os.getenv("KOKORO_BUILD_NUMBER"),
                            "build_url": "https://source.cloud.google.com/results/invocations/%s"
                            % invocation_id,
                            "test_target": action["id"]["targetId"],
                            "test_class_name": test_case["testCase"].get(
                                "className", ""
                            ),
                            "test_case": test_case["testCase"]["caseName"],
                            "result": result,
                            "timestamp": action["timing"]["startTime"],
                            "duration": _parse_test_duration(
                                action["timing"]["duration"]
                            ),
                        },
                    }
                )
            except Exception as e:
                print("Failed to parse test result. Error: %s" % str(e))
                print(json.dumps(test_case, indent=4))
                bq_rows.append(
                    {
                        "insertId": str(uuid.uuid4()),
                        "json": {
                            "job_name": os.getenv("KOKORO_JOB_NAME"),
                            "build_id": os.getenv("KOKORO_BUILD_NUMBER"),
                            "build_url": "https://source.cloud.google.com/results/invocations/%s"
                            % invocation_id,
                            "test_target": action["id"]["targetId"],
                            "test_class_name": "N/A",
                            "test_case": "N/A",
                            "result": "UNPARSABLE",
                            "timestamp": "N/A",
                        },
                    }
                )

    if args.bq_dump_file:
        with open(args.bq_dump_file, "w") as f:
            json.dump(bq_rows, f, indent=4, sort_keys=True)
        print("Dumped BQ data to file %s" % args.bq_dump_file)

    if not args.skip_upload:
        # BigQuery sometimes fails with large uploads, so batch 1,000 rows at a time.
        MAX_ROWS = 1000
        for i in range(0, len(bq_rows), MAX_ROWS):
            _upload_results_to_bq(bq_rows[i : i + MAX_ROWS])
    else:
        print("Skipped upload to bigquery.")
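
# Example invocation outside of Kokoro (the key path and invocation UUID below
# are placeholders, not real values):
#
#   python3 upload_rbe_results.py \
#       --api_key="$(cat /path/to/resultstore_api_key)" \
#       --invocation_id=00000000-0000-0000-0000-000000000000 \
#       --resultstore_dump_file=/tmp/resultstore_actions.json \
#       --bq_dump_file=/tmp/bq_rows.json \
#       --skip_upload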