#!/usr/bin/env python3
# Copyright 2016 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Uploads performance benchmark result file to bigquery.

import argparse
import calendar
import json
import os
import sys
import time
import uuid

gcp_utils_dir = os.path.abspath(
    os.path.join(os.path.dirname(__file__), "../../gcp/utils")
)
sys.path.append(gcp_utils_dir)
import big_query_utils

_DEFAULT_PROJECT_ID = "grpc-testing"


def _upload_netperf_latency_csv_to_bigquery(
    project_id, dataset_id, table_id, result_file
):
    """Upload a netperf TCP_RR latency CSV (p50,p90,p99 in ms) to BigQuery.

    The CSV values are converted to microseconds (x1000) before upload.
    Exits the process with status 1 on upload failure.
    """
    with open(result_file, "r") as f:
        (col1, col2, col3) = f.read().split(",")
        latency50 = float(col1.strip()) * 1000
        latency90 = float(col2.strip()) * 1000
        latency99 = float(col3.strip()) * 1000

        scenario_result = {
            "scenario": {"name": "netperf_tcp_rr"},
            "summary": {
                "latency50": latency50,
                "latency90": latency90,
                "latency99": latency99,
            },
        }

    bq = big_query_utils.create_big_query()
    _create_results_table(bq, project_id, dataset_id, table_id)

    # Netperf results don't have the nested scenario structure, so no
    # flattening is needed (or possible).
    if not _insert_result(
        bq, project_id, dataset_id, table_id, scenario_result, flatten=False
    ):
        print("Error uploading result to bigquery.")
        sys.exit(1)


def _upload_scenario_result_to_bigquery(
    project_id,
    dataset_id,
    table_id,
    result_file,
    metadata_file,
    node_info_file,
    prometheus_query_results_file,
):
    """Upload a JSON scenario result (plus metadata files) to BigQuery.

    Exits the process with status 1 on upload failure.
    """
    with open(result_file, "r") as f:
        scenario_result = json.loads(f.read())

    bq = big_query_utils.create_big_query()
    _create_results_table(bq, project_id, dataset_id, table_id)

    if not _insert_scenario_result(
        bq,
        project_id,
        dataset_id,
        table_id,
        scenario_result,
        metadata_file,
        node_info_file,
        prometheus_query_results_file,
    ):
        print("Error uploading result to bigquery.")
        sys.exit(1)


def _insert_result(
    bq, project_id, dataset_id, table_id, scenario_result, flatten=True
):
    """Insert one scenario result row; metadata comes from environment vars.

    Returns True on success (see big_query_utils.insert_rows).
    """
    if flatten:
        _flatten_result_inplace(scenario_result)
    _populate_metadata_inplace(scenario_result)
    row = big_query_utils.make_row(str(uuid.uuid4()), scenario_result)
    return big_query_utils.insert_rows(
        bq, project_id, dataset_id, table_id, [row]
    )


def _insert_scenario_result(
    bq,
    project_id,
    dataset_id,
    table_id,
    scenario_result,
    test_metadata_file,
    node_info_file,
    prometheus_query_results_file,
    flatten=True,
):
    """Insert one scenario result row; metadata comes from the given files.

    Returns True on success (see big_query_utils.insert_rows).
    """
    if flatten:
        _flatten_result_inplace(scenario_result)
    _populate_metadata_from_file(scenario_result, test_metadata_file)
    _populate_node_metadata_from_file(scenario_result, node_info_file)
    _populate_prometheus_query_results_from_file(
        scenario_result, prometheus_query_results_file
    )
    row = big_query_utils.make_row(str(uuid.uuid4()), scenario_result)
    return big_query_utils.insert_rows(
        bq, project_id, dataset_id, table_id, [row]
    )


def _create_results_table(bq, project_id, dataset_id, table_id):
    """Create (if needed) the results table from the JSON schema file."""
    schema_path = os.path.join(
        os.path.dirname(__file__), "scenario_result_schema.json"
    )
    with open(schema_path, "r") as f:
        table_schema = json.loads(f.read())
    desc = "Results of performance benchmarks."
    return big_query_utils.create_table2(
        bq, project_id, dataset_id, table_id, table_schema, desc
    )


def _flatten_result_inplace(scenario_result):
    """Bigquery is not really great for handling deeply nested data
    and repeated fields. To maintain values of some fields while keeping
    the schema relatively simple, we artificially leave some of the fields
    as JSON strings.
    """
    scenario_result["scenario"]["clientConfig"] = json.dumps(
        scenario_result["scenario"]["clientConfig"]
    )
    scenario_result["scenario"]["serverConfig"] = json.dumps(
        scenario_result["scenario"]["serverConfig"]
    )
    scenario_result["latencies"] = json.dumps(scenario_result["latencies"])
    # Keep only the CPU-time fields of each server's stats, as a parallel
    # list of small dicts; pop() so they don't remain in serverStats too.
    scenario_result["serverCpuStats"] = []
    for stats in scenario_result["serverStats"]:
        scenario_result["serverCpuStats"].append(dict())
        scenario_result["serverCpuStats"][-1]["totalCpuTime"] = stats.pop(
            "totalCpuTime", None
        )
        scenario_result["serverCpuStats"][-1]["idleCpuTime"] = stats.pop(
            "idleCpuTime", None
        )
    for stats in scenario_result["clientStats"]:
        stats["latencies"] = json.dumps(stats["latencies"])
        stats.pop("requestResults", None)
    scenario_result["serverCores"] = json.dumps(scenario_result["serverCores"])
    scenario_result["clientSuccess"] = json.dumps(
        scenario_result["clientSuccess"]
    )
    scenario_result["serverSuccess"] = json.dumps(
        scenario_result["serverSuccess"]
    )
    scenario_result["requestResults"] = json.dumps(
        scenario_result.get("requestResults", [])
    )
    scenario_result["serverCpuUsage"] = scenario_result["summary"].pop(
        "serverCpuUsage", None
    )
    scenario_result["summary"].pop("successfulRequestsPerSecond", None)
    scenario_result["summary"].pop("failedRequestsPerSecond", None)


def _populate_metadata_inplace(scenario_result):
    """Populates metadata based on environment variables set by Jenkins."""
    # NOTE: Grabbing the Kokoro environment variables will only work if the
    # driver is running locally on the same machine where Kokoro has started
    # the job. For our setup, this is currently the case, so just assume that.
    build_number = os.getenv("KOKORO_BUILD_NUMBER")
    build_url = (
        "https://source.cloud.google.com/results/invocations/%s"
        % os.getenv("KOKORO_BUILD_ID")
    )
    job_name = os.getenv("KOKORO_JOB_NAME")
    git_commit = os.getenv("KOKORO_GIT_COMMIT")
    # actual commit is the actual head of PR that is getting tested
    # TODO(jtattermusch): unclear how to obtain on Kokoro
    git_actual_commit = os.getenv("ghprbActualCommit")

    utc_timestamp = str(calendar.timegm(time.gmtime()))
    metadata = {"created": utc_timestamp}

    # Only record fields whose environment variable was actually set.
    if build_number:
        metadata["buildNumber"] = build_number
    if build_url:
        metadata["buildUrl"] = build_url
    if job_name:
        metadata["jobName"] = job_name
    if git_commit:
        metadata["gitCommit"] = git_commit
    if git_actual_commit:
        metadata["gitActualCommit"] = git_actual_commit

    scenario_result["metadata"] = metadata


def _populate_metadata_from_file(scenario_result, test_metadata_file):
    """Populate metadata from a Kubernetes test-metadata JSON file.

    Selected "ci_*" annotations are mapped to BigQuery metadata keys; the
    full (cleaned) metadata is stored as a JSON string in "testMetadata".
    If the file is unreadable, only the creation timestamp is recorded.
    """
    utc_timestamp = str(calendar.timegm(time.gmtime()))
    metadata = {"created": utc_timestamp}

    _annotation_to_bq_metadata_key_map = {
        "ci_" + key: key
        for key in (
            "buildNumber",
            "buildUrl",
            "jobName",
            "gitCommit",
            "gitActualCommit",
        )
    }

    if os.access(test_metadata_file, os.R_OK):
        with open(test_metadata_file, "r") as f:
            test_metadata = json.loads(f.read())

        # eliminate managedFields from metadata set
        if "managedFields" in test_metadata:
            del test_metadata["managedFields"]

        annotations = test_metadata.get("annotations", {})

        # if use kubectl apply ..., kubectl will append current configuration to
        # annotation, the field is deleted since it includes a lot of irrelevant
        # information
        if "kubectl.kubernetes.io/last-applied-configuration" in annotations:
            del annotations["kubectl.kubernetes.io/last-applied-configuration"]

        # dump all metadata as JSON to testMetadata field
        scenario_result["testMetadata"] = json.dumps(test_metadata)
        for key, value in _annotation_to_bq_metadata_key_map.items():
            if key in annotations:
                metadata[value] = annotations[key]

    scenario_result["metadata"] = metadata


def _populate_node_metadata_from_file(scenario_result, node_info_file):
    """Populate driver/client/server node info from a node-info JSON file.

    If the file is unreadable, empty node metadata is recorded.
    """
    node_metadata = {"driver": {}, "servers": [], "clients": []}
    _node_info_to_bq_node_metadata_key_map = {
        "Name": "name",
        "PodIP": "podIP",
        "NodeName": "nodeName",
    }

    if os.access(node_info_file, os.R_OK):
        with open(node_info_file, "r") as f:
            file_metadata = json.loads(f.read())
        for key, value in _node_info_to_bq_node_metadata_key_map.items():
            node_metadata["driver"][value] = file_metadata["Driver"][key]
        for clientNodeInfo in file_metadata["Clients"]:
            node_metadata["clients"].append(
                {
                    value: clientNodeInfo[key]
                    for key, value in _node_info_to_bq_node_metadata_key_map.items()
                }
            )
        for serverNodeInfo in file_metadata["Servers"]:
            node_metadata["servers"].append(
                {
                    value: serverNodeInfo[key]
                    for key, value in _node_info_to_bq_node_metadata_key_map.items()
                }
            )

    scenario_result["nodeMetadata"] = node_metadata


def _parse_prometheus_entries(entries):
    """Convert one clients/servers mapping from the Prometheus query result
    ({name: {container: {cpuSeconds, memoryMean}}}) into the repeated-record
    list form used by the BigQuery schema.
    """
    parsed = []
    for entry_name, entry_data in entries.items():
        containers = []
        for container_name, container_data in entry_data.items():
            containers.append(
                {
                    "name": container_name,
                    "cpuSeconds": container_data["cpuSeconds"],
                    "memoryMean": container_data["memoryMean"],
                }
            )
        parsed.append({"name": entry_name, "containers": containers})
    return parsed


def _populate_prometheus_query_results_from_file(
    scenario_result, prometheus_query_result_file
):
    """Populate the results from Prometheus query to Bigquery table"""
    if os.access(prometheus_query_result_file, os.R_OK):
        with open(prometheus_query_result_file, "r", encoding="utf8") as f:
            file_query_results = json.loads(f.read())

            scenario_result["testDurationSeconds"] = file_query_results[
                "testDurationSeconds"
            ]
            # Clients and servers share the same shape; parse each with the
            # common helper instead of two copy-pasted loops.
            if "clients" in file_query_results:
                scenario_result[
                    "clientsPrometheusData"
                ] = _parse_prometheus_entries(file_query_results["clients"])
            if "servers" in file_query_results:
                scenario_result[
                    "serversPrometheusData"
                ] = _parse_prometheus_entries(file_query_results["servers"])


argp = argparse.ArgumentParser(description="Upload result to big query.")
argp.add_argument(
    "--bq_result_table",
    required=True,
    type=str,
    help=(
        'Bigquery "dataset.table" or "project.dataset.table" to upload'
        ' results to. The default project is "grpc-testing".'
    ),
)
argp.add_argument(
    "--file_to_upload",
    default="scenario_result.json",
    type=str,
    help="Report file to upload.",
)
argp.add_argument(
    "--metadata_file_to_upload",
    default="metadata.json",
    type=str,
    help="Metadata file to upload.",
)
argp.add_argument(
    "--node_info_file_to_upload",
    default="node_info.json",
    type=str,
    help="Node information file to upload.",
)
argp.add_argument(
    "--prometheus_query_results_to_upload",
    default="prometheus_query_result.json",
    type=str,
    help="Prometheus query result file to upload.",
)
argp.add_argument(
    "--file_format",
    choices=["scenario_result", "netperf_latency_csv"],
    default="scenario_result",
    help="Format of the file to upload.",
)

args = argp.parse_args()

# Accept either "dataset.table" (default project) or "project.dataset.table".
bq_words = args.bq_result_table.split(".", 2)
if len(bq_words) == 3:
    project_id, dataset_id, table_id = bq_words
elif len(bq_words) == 2:
    project_id = _DEFAULT_PROJECT_ID
    dataset_id, table_id = bq_words
else:
    print(
        "BigQuery table must be in the format dataset.table or project.dataset.table."
    )
    sys.exit(1)

if args.file_format == "netperf_latency_csv":
    _upload_netperf_latency_csv_to_bigquery(
        project_id, dataset_id, table_id, args.file_to_upload
    )
else:
    _upload_scenario_result_to_bigquery(
        project_id,
        dataset_id,
        table_id,
        args.file_to_upload,
        args.metadata_file_to_upload,
        args.node_info_file_to_upload,
        args.prometheus_query_results_to_upload,
    )
print(
    "Successfully uploaded %s, %s, %s and %s to BigQuery.\n"
    % (
        args.file_to_upload,
        args.metadata_file_to_upload,
        args.node_info_file_to_upload,
        args.prometheus_query_results_to_upload,
    )
)