#!/usr/bin/env python3
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
15"""Helper to upload Jenkins test results to BQ"""

from __future__ import print_function

import os
import sys
import time
import uuid

import six

# Make the shared big_query_utils helper importable before importing it.
gcp_utils_dir = os.path.abspath(
    os.path.join(os.path.dirname(__file__), "../../gcp/utils")
)
sys.path.append(gcp_utils_dir)
import big_query_utils

_DATASET_ID = "jenkins_test_results"
_DESCRIPTION = "Test results from master job run on Jenkins"
# 365 days in milliseconds
_EXPIRATION_MS = 365 * 24 * 60 * 60 * 1000
_PARTITION_TYPE = "DAY"
_PROJECT_ID = "grpc-testing"
_RESULTS_SCHEMA = [
    ("job_name", "STRING", "Name of Jenkins job"),
    ("build_id", "INTEGER", "Build ID of Jenkins job"),
    ("build_url", "STRING", "URL of Jenkins job"),
    ("test_name", "STRING", "Individual test name"),
    ("language", "STRING", "Language of test"),
    ("platform", "STRING", "Platform used for test"),
    ("config", "STRING", "Config used for test"),
    ("compiler", "STRING", "Compiler used for test"),
    ("iomgr_platform", "STRING", "Iomgr used for test"),
    ("result", "STRING", "Test result: PASSED, TIMEOUT, FAILED, or SKIPPED"),
    ("timestamp", "TIMESTAMP", "Timestamp of test run"),
    ("elapsed_time", "FLOAT", "How long test took to run"),
    ("cpu_estimated", "FLOAT", "Estimated CPU usage of test"),
    ("cpu_measured", "FLOAT", "Actual CPU usage of test"),
    ("return_code", "INTEGER", "Exit code of test"),
]
_INTEROP_RESULTS_SCHEMA = [
    ("job_name", "STRING", "Name of Jenkins/Kokoro job"),
    ("build_id", "INTEGER", "Build ID of Jenkins/Kokoro job"),
    ("build_url", "STRING", "URL of Jenkins/Kokoro job"),
    (
        "test_name",
        "STRING",
        "Unique test name combining client, server, and test_name",
    ),
    (
        "suite",
        "STRING",
        "Test suite: cloud_to_cloud, cloud_to_prod, or cloud_to_prod_auth",
    ),
    ("client", "STRING", "Client language"),
    ("server", "STRING", "Server host name"),
    ("test_case", "STRING", "Name of test case"),
    ("result", "STRING", "Test result: PASSED, TIMEOUT, FAILED, or SKIPPED"),
    ("timestamp", "TIMESTAMP", "Timestamp of test run"),
    ("elapsed_time", "FLOAT", "How long test took to run"),
]
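
# Each schema entry above is a (name, type, description) tuple; both schemas
# are handed to big_query_utils.create_partitioned_table below when the
# destination tables are created.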


def _get_build_metadata(test_results):
    """Add Kokoro build metadata to test_results based on environment
    variables set by Kokoro.
    """
    build_id = os.getenv("KOKORO_BUILD_NUMBER")
    kokoro_build_id = os.getenv("KOKORO_BUILD_ID")
    # Only synthesize a results URL when KOKORO_BUILD_ID is actually set;
    # otherwise the formatted string would be truthy even without a build ID.
    build_url = (
        "https://source.cloud.google.com/results/invocations/%s"
        % kokoro_build_id
        if kokoro_build_id
        else None
    )
    job_name = os.getenv("KOKORO_JOB_NAME")

    if build_id:
        test_results["build_id"] = build_id
    if build_url:
        test_results["build_url"] = build_url
    if job_name:
        test_results["job_name"] = job_name
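
# A minimal sketch of the resulting metadata, assuming hypothetical Kokoro
# environment values:
#
#   os.environ["KOKORO_BUILD_NUMBER"] = "1234"
#   os.environ["KOKORO_BUILD_ID"] = "abc123"
#   os.environ["KOKORO_JOB_NAME"] = "grpc/core/master/linux"
#   metadata = {}
#   _get_build_metadata(metadata)
#   # metadata == {
#   #     "build_id": "1234",
#   #     "build_url": "https://source.cloud.google.com/results/invocations/abc123",
#   #     "job_name": "grpc/core/master/linux",
#   # }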


def _insert_rows_with_retries(bq, bq_table, bq_rows):
    """Insert rows into a BQ table, retrying each batch on error."""
    # BigQuery sometimes fails with large uploads, so insert at most
    # 1,000 rows per request.
    batch_size = 1000
    num_batches = (len(bq_rows) + batch_size - 1) // batch_size
    for i in range(num_batches):
        max_retries = 3
        for attempt in range(max_retries):
            if big_query_utils.insert_rows(
                bq,
                _PROJECT_ID,
                _DATASET_ID,
                bq_table,
                bq_rows[i * batch_size : (i + 1) * batch_size],
            ):
                break
            else:
                if attempt < max_retries - 1:
                    print("Error uploading result to BigQuery, will retry.")
                else:
                    print(
                        "Error uploading result to BigQuery, all attempts"
                        " failed."
                    )
                    sys.exit(1)
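
# Illustrative arithmetic with hypothetical sizes: 2,500 rows yield three
# batches covering bq_rows[0:1000], bq_rows[1000:2000], and bq_rows[2000:3000]
# (the last slice simply stops at row 2,500); each batch is retried up to
# max_retries times before the script gives up.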


def upload_results_to_bq(resultset, bq_table, extra_fields):
    """Upload test results to a BQ table.

    Args:
        resultset: dictionary generated by jobset.run
        bq_table: string name of table to create/upload results to in BQ
        extra_fields: dict with extra values that will be uploaded along with the results
    """
    bq = big_query_utils.create_big_query()
    big_query_utils.create_partitioned_table(
        bq,
        _PROJECT_ID,
        _DATASET_ID,
        bq_table,
        _RESULTS_SCHEMA,
        _DESCRIPTION,
        partition_type=_PARTITION_TYPE,
        expiration_ms=_EXPIRATION_MS,
    )

    bq_rows = []
    for shortname, results in six.iteritems(resultset):
        for result in results:
            test_results = {}
            _get_build_metadata(test_results)
            test_results["cpu_estimated"] = result.cpu_estimated
            test_results["cpu_measured"] = result.cpu_measured
            test_results["elapsed_time"] = "%.2f" % result.elapsed_time
            test_results["result"] = result.state
            test_results["return_code"] = result.returncode
            test_results["test_name"] = shortname
            test_results["timestamp"] = time.strftime("%Y-%m-%d %H:%M:%S")
            for field_name, field_value in six.iteritems(extra_fields):
                test_results[field_name] = field_value
            row = big_query_utils.make_row(str(uuid.uuid4()), test_results)
            bq_rows.append(row)
    _insert_rows_with_retries(bq, bq_table, bq_rows)
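
# A minimal usage sketch, assuming a jobset-style resultset; the table name
# and extra fields here are hypothetical:
#
#   resultset = jobset.run(...)  # maps shortname -> list of job results
#   upload_results_to_bq(
#       resultset,
#       "test_results_sample",
#       {"language": "python", "platform": "linux"},
#   )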


def upload_interop_results_to_bq(resultset, bq_table):
    """Upload interop test results to a BQ table.

    Args:
        resultset: dictionary generated by jobset.run
        bq_table: string name of table to create/upload results to in BQ
    """
    bq = big_query_utils.create_big_query()
    big_query_utils.create_partitioned_table(
        bq,
        _PROJECT_ID,
        _DATASET_ID,
        bq_table,
        _INTEROP_RESULTS_SCHEMA,
        _DESCRIPTION,
        partition_type=_PARTITION_TYPE,
        expiration_ms=_EXPIRATION_MS,
    )

    bq_rows = []
    for shortname, results in six.iteritems(resultset):
        for result in results:
            test_results = {}
            _get_build_metadata(test_results)
            test_results["elapsed_time"] = "%.2f" % result.elapsed_time
            test_results["result"] = result.state
            test_results["test_name"] = shortname
            # shortname is expected to look like "suite:client:server:test_case".
            suite, client, server, test_case = shortname.split(":")[:4]
            test_results["suite"] = suite
            test_results["client"] = client
            test_results["server"] = server
            test_results["test_case"] = test_case
            test_results["timestamp"] = time.strftime("%Y-%m-%d %H:%M:%S")
            row = big_query_utils.make_row(str(uuid.uuid4()), test_results)
            bq_rows.append(row)
    _insert_rows_with_retries(bq, bq_table, bq_rows)
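
# A minimal sketch of the shortname format assumed above, with hypothetical
# values:
#
#   shortname = "cloud_to_prod:python:grpc-test.example.com:empty_unary"
#   # suite == "cloud_to_prod", client == "python",
#   # server == "grpc-test.example.com", test_case == "empty_unary"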