1#!/usr/bin/env python3 2# Copyright 2017 gRPC authors. 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15"""Helper to upload Jenkins test results to BQ""" 16 17from __future__ import print_function 18 19import os 20import sys 21import time 22import uuid 23 24import six 25 26gcp_utils_dir = os.path.abspath( 27 os.path.join(os.path.dirname(__file__), "../../gcp/utils") 28) 29sys.path.append(gcp_utils_dir) 30import big_query_utils 31 32_DATASET_ID = "jenkins_test_results" 33_DESCRIPTION = "Test results from master job run on Jenkins" 34# 365 days in milliseconds 35_EXPIRATION_MS = 365 * 24 * 60 * 60 * 1000 36_PARTITION_TYPE = "DAY" 37_PROJECT_ID = "grpc-testing" 38_RESULTS_SCHEMA = [ 39 ("job_name", "STRING", "Name of Jenkins job"), 40 ("build_id", "INTEGER", "Build ID of Jenkins job"), 41 ("build_url", "STRING", "URL of Jenkins job"), 42 ("test_name", "STRING", "Individual test name"), 43 ("language", "STRING", "Language of test"), 44 ("platform", "STRING", "Platform used for test"), 45 ("config", "STRING", "Config used for test"), 46 ("compiler", "STRING", "Compiler used for test"), 47 ("iomgr_platform", "STRING", "Iomgr used for test"), 48 ("result", "STRING", "Test result: PASSED, TIMEOUT, FAILED, or SKIPPED"), 49 ("timestamp", "TIMESTAMP", "Timestamp of test run"), 50 ("elapsed_time", "FLOAT", "How long test took to run"), 51 ("cpu_estimated", "FLOAT", "Estimated CPU usage of test"), 52 ("cpu_measured", "FLOAT", "Actual CPU usage of test"), 53 ("return_code", "INTEGER", "Exit code of test"), 54] 55_INTEROP_RESULTS_SCHEMA = [ 56 ("job_name", "STRING", "Name of Jenkins/Kokoro job"), 57 ("build_id", "INTEGER", "Build ID of Jenkins/Kokoro job"), 58 ("build_url", "STRING", "URL of Jenkins/Kokoro job"), 59 ( 60 "test_name", 61 "STRING", 62 "Unique test name combining client, server, and test_name", 63 ), 64 ( 65 "suite", 66 "STRING", 67 "Test suite: cloud_to_cloud, cloud_to_prod, or cloud_to_prod_auth", 68 ), 69 ("client", "STRING", "Client language"), 70 ("server", "STRING", "Server host name"), 71 ("test_case", "STRING", "Name of test case"), 72 ("result", "STRING", "Test result: PASSED, TIMEOUT, FAILED, or SKIPPED"), 73 ("timestamp", "TIMESTAMP", "Timestamp of test run"), 74 ("elapsed_time", "FLOAT", "How long test took to run"), 75] 76 77 78def _get_build_metadata(test_results): 79 """Add Kokoro build metadata to test_results based on environment 80 variables set by Kokoro. 81 """ 82 build_id = os.getenv("KOKORO_BUILD_NUMBER") 83 build_url = ( 84 "https://source.cloud.google.com/results/invocations/%s" 85 % os.getenv("KOKORO_BUILD_ID") 86 ) 87 job_name = os.getenv("KOKORO_JOB_NAME") 88 89 if build_id: 90 test_results["build_id"] = build_id 91 if build_url: 92 test_results["build_url"] = build_url 93 if job_name: 94 test_results["job_name"] = job_name 95 96 97def _insert_rows_with_retries(bq, bq_table, bq_rows): 98 """Insert rows to bq table. Retry on error.""" 99 # BigQuery sometimes fails with large uploads, so batch 1,000 rows at a time. 100 for i in range((len(bq_rows) // 1000) + 1): 101 max_retries = 3 102 for attempt in range(max_retries): 103 if big_query_utils.insert_rows( 104 bq, 105 _PROJECT_ID, 106 _DATASET_ID, 107 bq_table, 108 bq_rows[i * 1000 : (i + 1) * 1000], 109 ): 110 break 111 else: 112 if attempt < max_retries - 1: 113 print("Error uploading result to bigquery, will retry.") 114 else: 115 print( 116 "Error uploading result to bigquery, all attempts" 117 " failed." 118 ) 119 sys.exit(1) 120 121 122def upload_results_to_bq(resultset, bq_table, extra_fields): 123 """Upload test results to a BQ table. 124 125 Args: 126 resultset: dictionary generated by jobset.run 127 bq_table: string name of table to create/upload results to in BQ 128 extra_fields: dict with extra values that will be uploaded along with the results 129 """ 130 bq = big_query_utils.create_big_query() 131 big_query_utils.create_partitioned_table( 132 bq, 133 _PROJECT_ID, 134 _DATASET_ID, 135 bq_table, 136 _RESULTS_SCHEMA, 137 _DESCRIPTION, 138 partition_type=_PARTITION_TYPE, 139 expiration_ms=_EXPIRATION_MS, 140 ) 141 142 bq_rows = [] 143 for shortname, results in six.iteritems(resultset): 144 for result in results: 145 test_results = {} 146 _get_build_metadata(test_results) 147 test_results["cpu_estimated"] = result.cpu_estimated 148 test_results["cpu_measured"] = result.cpu_measured 149 test_results["elapsed_time"] = "%.2f" % result.elapsed_time 150 test_results["result"] = result.state 151 test_results["return_code"] = result.returncode 152 test_results["test_name"] = shortname 153 test_results["timestamp"] = time.strftime("%Y-%m-%d %H:%M:%S") 154 for field_name, field_value in six.iteritems(extra_fields): 155 test_results[field_name] = field_value 156 row = big_query_utils.make_row(str(uuid.uuid4()), test_results) 157 bq_rows.append(row) 158 _insert_rows_with_retries(bq, bq_table, bq_rows) 159 160 161def upload_interop_results_to_bq(resultset, bq_table): 162 """Upload interop test results to a BQ table. 163 164 Args: 165 resultset: dictionary generated by jobset.run 166 bq_table: string name of table to create/upload results to in BQ 167 """ 168 bq = big_query_utils.create_big_query() 169 big_query_utils.create_partitioned_table( 170 bq, 171 _PROJECT_ID, 172 _DATASET_ID, 173 bq_table, 174 _INTEROP_RESULTS_SCHEMA, 175 _DESCRIPTION, 176 partition_type=_PARTITION_TYPE, 177 expiration_ms=_EXPIRATION_MS, 178 ) 179 180 bq_rows = [] 181 for shortname, results in six.iteritems(resultset): 182 for result in results: 183 test_results = {} 184 _get_build_metadata(test_results) 185 test_results["elapsed_time"] = "%.2f" % result.elapsed_time 186 test_results["result"] = result.state 187 test_results["test_name"] = shortname 188 test_results["suite"] = shortname.split(":")[0] 189 test_results["client"] = shortname.split(":")[1] 190 test_results["server"] = shortname.split(":")[2] 191 test_results["test_case"] = shortname.split(":")[3] 192 test_results["timestamp"] = time.strftime("%Y-%m-%d %H:%M:%S") 193 row = big_query_utils.make_row(str(uuid.uuid4()), test_results) 194 bq_rows.append(row) 195 _insert_rows_with_retries(bq, bq_table, bq_rows) 196