#!/usr/bin/env python3

# Copyright (C) 2024 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Resultstore client for Mobly tests."""

import datetime
import enum
import importlib.metadata
import logging
import pathlib
import urllib.parse
import uuid

from google.auth import credentials
import google_auth_httplib2
from googleapiclient import discovery
import httplib2

_DEFAULT_CONFIGURATION = 'default'
_RESULTSTORE_BASE_LINK = 'https://btx.cloud.google.com'

_PACKAGE_NAME = 'results_uploader'


class Status(enum.Enum):
    """Aggregate status of the Resultstore invocation and target."""
    PASSED = 'PASSED'
    FAILED = 'FAILED'
    SKIPPED = 'SKIPPED'
    FLAKY = 'FLAKY'
    UNKNOWN = 'UNKNOWN'


class StatusCode(enum.IntEnum):
    """Test case statuses and their associated code in Resultstore.

    Used to toggle the visibility of test cases with a particular status.
    """
    ERRORED = 1
    TIMED_OUT = 2
    FAILED = 3
    FLAKY = 4
    PASSED = 5


class ResultstoreClient:
    """Resultstore client for Mobly tests.

    The client tracks a single invocation and a single target at a time.
    The IDs it generates are kept on the instance until
    `finalize_invocation` clears them.
    """

    def __init__(
            self,
            service: discovery.Resource,
            creds: credentials.Credentials,
            project_id: str,
    ):
        """Creates a ResultstoreClient.

        Args:
          service: discovery.Resource object for interacting with the API.
          creds: credentials to add to HTTP request.
          project_id: GCP project ID for Resultstore.
        """
        self._service = service
        self._http = google_auth_httplib2.AuthorizedHttp(
            creds, http=httplib2.Http(timeout=30)
        )
        self._project_id = project_id

        # Per-invocation state; populated by the create_* methods and
        # cleared again by finalize_invocation().
        self._request_id = ''
        self._invocation_id = ''
        self._authorization_token = ''
        self._target_id = ''
        self._encoded_target_id = ''

        self._status = Status.UNKNOWN

    @property
    def _invocation_name(self) -> str:
        """The resource name for the invocation, or '' if none exists."""
        if not self._invocation_id:
            return ''
        return f'invocations/{self._invocation_id}'

    @property
    def _target_name(self) -> str:
        """The resource name for the target, or '' if it is incomplete."""
        # Both components are required: a name built from only one of them
        # (e.g. 'invocations/<id>/targets/') is not a usable resource name.
        if not (self._invocation_name and self._encoded_target_id):
            return ''
        return f'{self._invocation_name}/targets/{self._encoded_target_id}'

    @property
    def _configured_target_name(self) -> str:
        """The resource name for the configured target, or '' if unset."""
        if not self._target_name:
            # Return '' rather than None, matching the sibling properties.
            return ''
        return f'{self._target_name}/configuredTargets/{_DEFAULT_CONFIGURATION}'

    def set_status(self, status: Status) -> None:
        """Sets the overall test run status."""
        self._status = status

    def create_invocation(self, labels: list[str]) -> str:
        """Creates an invocation.

        If this client already holds an invocation ID, no request is sent
        and the existing ID is returned.

        Args:
          labels: A list of labels to attach to the invocation, as
            `invocation.invocationAttributes.labels`.

        Returns:
          The invocation ID.
        """
        logging.debug('creating invocation...')
        if self._invocation_id:
            logging.warning(
                'invocation %s already exists, skipping creation...',
                self._invocation_id,
            )
            return self._invocation_id
        # datetime.utcnow() is deprecated; take an aware UTC timestamp and
        # drop the tzinfo so the serialized form stays '<naive ISO 8601>Z'.
        start_time = datetime.datetime.now(datetime.timezone.utc).replace(
            tzinfo=None
        )
        invocation = {
            'timing': {
                'startTime': start_time.isoformat() + 'Z'
            },
            'invocationAttributes': {
                'projectId': self._project_id,
                'labels': labels,
            },
            'properties': [
                {
                    'key': _PACKAGE_NAME,
                    'value': importlib.metadata.version(_PACKAGE_NAME)
                }
            ]
        }
        self._request_id = str(uuid.uuid4())
        self._invocation_id = str(uuid.uuid4())
        self._authorization_token = str(uuid.uuid4())
        request = self._service.invocations().create(
            body=invocation,
            requestId=self._request_id,
            invocationId=self._invocation_id,
            authorizationToken=self._authorization_token,
        )
        res = request.execute(http=self._http)
        logging.debug('invocations.create: %s', res)
        return self._invocation_id

    def create_default_configuration(self) -> None:
        """Creates a default configuration under the current invocation."""
        logging.debug('creating default configuration...')
        configuration = {
            'id': {
                'invocationId': self._invocation_id,
                'configurationId': _DEFAULT_CONFIGURATION,
            }
        }
        request = (
            self._service.invocations()
            .configs()
            .create(
                body=configuration,
                parent=self._invocation_name,
                configId=_DEFAULT_CONFIGURATION,
                authorizationToken=self._authorization_token,
            )
        )
        res = request.execute(http=self._http)
        logging.debug('invocations.configs.create: %s', res)

    def create_target(self, target_id: str | None = None) -> str:
        """Creates a target.

        If this client already holds a target ID, no request is sent and
        the existing ID is returned.

        Args:
          target_id: An optional custom target ID. A random UUID is used
            when omitted or empty.

        Returns:
          The target ID.
        """
        logging.debug('creating target in %s...', self._invocation_name)
        if self._target_id:
            logging.warning(
                'target %s already exists, skipping creation...',
                self._target_id,
            )
            return self._target_id
        self._target_id = target_id or str(uuid.uuid4())
        # Percent-encode every reserved character (including '/') so the ID
        # can be embedded as a single segment of a resource name.
        self._encoded_target_id = urllib.parse.quote(self._target_id, safe='')
        target = {
            'id': {
                'invocationId': self._invocation_id,
                'targetId': self._target_id,
            },
            'targetAttributes': {'type': 'TEST', 'language': 'PY'},
            'visible': True,
        }
        request = (
            self._service.invocations()
            .targets()
            .create(
                body=target,
                parent=self._invocation_name,
                targetId=self._target_id,
                authorizationToken=self._authorization_token,
            )
        )
        res = request.execute(http=self._http)
        logging.debug('invocations.targets.create: %s', res)
        return self._target_id

    def create_configured_target(self) -> None:
        """Creates a configured target using the default configuration."""
        logging.debug('creating configured target in %s...', self._target_name)
        configured_target = {
            'id': {
                'invocationId': self._invocation_id,
                'targetId': self._target_id,
                'configurationId': _DEFAULT_CONFIGURATION,
            },
        }
        request = (
            self._service.invocations()
            .targets()
            .configuredTargets()
            .create(
                body=configured_target,
                parent=self._target_name,
                configId=_DEFAULT_CONFIGURATION,
                authorizationToken=self._authorization_token,
            )
        )
        res = request.execute(http=self._http)
        logging.debug('invocations.targets.configuredTargets.create: %s', res)

    def create_action(
            self, gcs_bucket: str, gcs_base_dir: str, artifacts: list[str]
    ) -> str:
        """Creates an action.

        Args:
          gcs_bucket: The bucket in GCS where artifacts are stored.
          gcs_base_dir: Base directory of the artifacts in the GCS bucket.
          artifacts: List of paths (relative to gcs_bucket) to the test
            artifacts.

        Returns:
          The action ID.

        Raises:
          ValueError: If an artifact path is not located under gcs_base_dir.
        """
        logging.debug('creating action in %s...', self._configured_target_name)
        action_id = str(uuid.uuid4())

        # Each file is identified by its path relative to the base directory
        # and located by its full gs:// URI.
        files = [
            {
                'uid': str(
                    pathlib.PurePosixPath(path).relative_to(gcs_base_dir)
                ),
                'uri': f'gs://{gcs_bucket}/{path}',
            }
            for path in artifacts
        ]
        action = {
            'id': {
                'invocationId': self._invocation_id,
                'targetId': self._target_id,
                'configurationId': _DEFAULT_CONFIGURATION,
                'actionId': action_id,
            },
            'testAction': {},
            'files': files,
        }
        request = (
            self._service.invocations()
            .targets()
            .configuredTargets()
            .actions()
            .create(
                body=action,
                parent=self._configured_target_name,
                actionId=action_id,
                authorizationToken=self._authorization_token,
            )
        )
        res = request.execute(http=self._http)
        logging.debug(
            'invocations.targets.configuredTargets.actions.create: %s', res
        )
        return action_id

    def merge_configured_target(self) -> None:
        """Merges the current status into the configured target."""
        logging.debug('merging configured target %s...',
                      self._configured_target_name)
        merge_request = {
            'configuredTarget': {
                'statusAttributes': {'status': self._status.value},
            },
            'authorizationToken': self._authorization_token,
            'updateMask': 'statusAttributes',
        }
        request = (
            self._service.invocations()
            .targets()
            .configuredTargets()
            .merge(
                body=merge_request,
                name=self._configured_target_name,
            )
        )
        res = request.execute(http=self._http)
        logging.debug('invocations.targets.configuredTargets.merge: %s', res)

    def finalize_configured_target(self) -> None:
        """Finalizes a configured target."""
        logging.debug('finalizing configured target %s...',
                      self._configured_target_name)
        finalize_request = {
            'authorizationToken': self._authorization_token,
        }
        request = (
            self._service.invocations()
            .targets()
            .configuredTargets()
            .finalize(
                body=finalize_request,
                name=self._configured_target_name,
            )
        )
        res = request.execute(http=self._http)
        logging.debug('invocations.targets.configuredTargets.finalize: %s', res)

    def merge_target(self) -> None:
        """Merges the current status into the target."""
        logging.debug('merging target %s...', self._target_name)
        merge_request = {
            'target': {
                'statusAttributes': {'status': self._status.value},
            },
            'authorizationToken': self._authorization_token,
            'updateMask': 'statusAttributes',
        }
        request = (
            self._service.invocations()
            .targets()
            .merge(
                body=merge_request,
                name=self._target_name,
            )
        )
        res = request.execute(http=self._http)
        logging.debug('invocations.targets.merge: %s', res)

    def finalize_target(self) -> None:
        """Finalizes a target."""
        logging.debug('finalizing target %s...', self._target_name)
        finalize_request = {
            'authorizationToken': self._authorization_token,
        }
        request = (
            self._service.invocations()
            .targets()
            .finalize(
                body=finalize_request,
                name=self._target_name,
            )
        )
        res = request.execute(http=self._http)
        logging.debug('invocations.targets.finalize: %s', res)

    def merge_invocation(self) -> None:
        """Merges the current status into the invocation."""
        logging.debug('merging invocation %s...', self._invocation_name)
        merge_request = {
            'invocation': {'statusAttributes': {'status': self._status.value}},
            'updateMask': 'statusAttributes',
            'authorizationToken': self._authorization_token,
        }
        request = self._service.invocations().merge(body=merge_request,
                                                    name=self._invocation_name)
        res = request.execute(http=self._http)
        logging.debug('invocations.merge: %s', res)

    def finalize_invocation(self) -> None:
        """Finalizes an invocation.

        After the finalize request, prints a link to the results and clears
        the stored request, invocation, token and target IDs.
        """
        logging.debug('finalizing invocation %s...', self._invocation_name)
        finalize_request = {
            'authorizationToken': self._authorization_token,
        }
        request = self._service.invocations().finalize(
            body=finalize_request, name=self._invocation_name
        )
        res = request.execute(http=self._http)
        logging.debug('invocations.finalize: %s', res)
        print('-' * 50)
        # Make the URL show test cases regardless of status by default.
        # Use the numeric `.value`: before Python 3.11, str() of an IntEnum
        # member yields 'StatusCode.NAME' instead of the integer code.
        show_statuses = 'showStatuses=' + ','.join(
            str(status_code.value) for status_code in StatusCode
        )
        print(
            f'See results in {_RESULTSTORE_BASE_LINK}/'
            f'{self._target_name};config={_DEFAULT_CONFIGURATION}/tests;'
            f'{show_statuses}'
        )
        self._request_id = ''
        self._invocation_id = ''
        self._authorization_token = ''
        self._target_id = ''
        self._encoded_target_id = ''