• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2
3#  Copyright (C) 2024 The Android Open Source Project
4#
5#  Licensed under the Apache License, Version 2.0 (the "License");
6#  you may not use this file except in compliance with the License.
7#  You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#  Unless required by applicable law or agreed to in writing, software
12#  distributed under the License is distributed on an "AS IS" BASIS,
13#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#  See the License for the specific language governing permissions and
15#  limitations under the License.
16
17"""Resultstore client for Mobly tests."""
18
19import datetime
20import enum
21import importlib.metadata
22import logging
23import pathlib
24import urllib.parse
25import uuid
26
27from google.auth import credentials
28import google_auth_httplib2
29from googleapiclient import discovery
30import httplib2
31
# ID of the one configuration this client creates per invocation; it is used
# as the `configId` of the configuration and of the configured target, and as
# the last segment of the configured target's resource name.
_DEFAULT_CONFIGURATION = 'default'
# Base of the results URL printed once an invocation has been finalized.
_RESULTSTORE_BASE_LINK = 'https://btx.cloud.google.com'

# Name passed to `importlib.metadata.version()`; the name and the resolved
# version are attached to every new invocation as a key/value property.
_PACKAGE_NAME = 'results_uploader'
36
37
class Status(enum.Enum):
    """Aggregate status of the Resultstore invocation and target.

    The member values are the strings that `ResultstoreClient` sends as
    `statusAttributes.status` when merging the configured target, the target
    and the invocation.
    """
    PASSED = 'PASSED'
    FAILED = 'FAILED'
    SKIPPED = 'SKIPPED'
    FLAKY = 'FLAKY'
    # Status a ResultstoreClient starts with until `set_status` is called.
    UNKNOWN = 'UNKNOWN'
45
46
class StatusCode(enum.IntEnum):
    """Test case statuses and their associated code in Resultstore.

    Used to toggle the visibility of test cases with a particular status:
    `ResultstoreClient.finalize_invocation` joins every code into the
    `showStatuses` parameter of the results URL it prints, so that test
    cases are shown regardless of their status.
    """
    ERRORED = 1
    TIMED_OUT = 2
    FAILED = 3
    FLAKY = 4
    PASSED = 5
57
58
class ResultstoreClient:
    """Resultstore client for Mobly tests.

    Thin wrapper around a Resultstore API `discovery.Resource`. The client
    remembers the IDs of at most one invocation and one target at a time,
    together with the authorization token generated for that invocation, and
    reuses them for every request it issues. `finalize_invocation` clears
    those IDs again so that a new invocation can be created afterwards.
    """

    def __init__(
            self,
            service: discovery.Resource,
            creds: credentials.Credentials,
            project_id: str,
    ):
        """Creates a ResultstoreClient.

        Args:
          service: discovery.Resource object for interacting with the API.
          creds: credentials to add to HTTP request.
          project_id: GCP project ID for Resultstore.
        """
        self._service = service
        # Every request is executed through this authorized HTTP object,
        # which is created with a timeout of 30.
        self._http = google_auth_httplib2.AuthorizedHttp(
            creds, http=httplib2.Http(timeout=30)
        )
        self._project_id = project_id

        # Per-invocation state; empty strings mean "nothing created yet".
        self._request_id = ''
        self._invocation_id = ''
        self._authorization_token = ''
        self._target_id = ''
        # URL-quoted form of `_target_id`, used inside resource names.
        self._encoded_target_id = ''

        self._status = Status.UNKNOWN

    @property
    def _invocation_name(self) -> str:
        """The resource name for the invocation, or '' if there is none."""
        if not self._invocation_id:
            return ''
        return f'invocations/{self._invocation_id}'

    @property
    def _target_name(self) -> str:
        """The resource name for the target, or '' if it is incomplete.

        Both an invocation and a target are required: a name built from
        only one of them (e.g. 'invocations/<id>/targets/') would not
        identify any resource.
        """
        if not (self._invocation_name and self._encoded_target_id):
            return ''
        return f'{self._invocation_name}/targets/{self._encoded_target_id}'

    @property
    def _configured_target_name(self) -> str:
        """The resource name for the configured target, or '' if unknown."""
        if not self._target_name:
            # Same "missing" marker as the other name properties.
            return ''
        return f'{self._target_name}/configuredTargets/{_DEFAULT_CONFIGURATION}'

    def set_status(self, status: Status) -> None:
        """Sets the overall test run status.

        The value is sent later by the `merge_*` methods.

        Args:
          status: The aggregate status to report.
        """
        self._status = status

    def create_invocation(self, labels: list[str]) -> str:
        """Creates an invocation.

        If this client already holds an invocation, nothing is created and
        the existing invocation ID is returned.

        Args:
            labels: A list of labels to attach to the invocation, as
              `invocation.invocationAttributes.labels`.

        Returns:
          The invocation ID.
        """
        logging.debug('creating invocation...')
        if self._invocation_id:
            logging.warning(
                'invocation %s already exists, skipping creation...',
                self._invocation_id,
            )
            return self._invocation_id
        # Aware "now" with the tzinfo stripped gives the same string as the
        # deprecated `utcnow()`: naive ISO-8601 to which 'Z' is appended.
        start_time = datetime.datetime.now(datetime.timezone.utc).replace(
            tzinfo=None
        )
        invocation = {
            'timing': {
                'startTime': start_time.isoformat() + 'Z'
            },
            'invocationAttributes': {
                'projectId': self._project_id,
                'labels': labels,
            },
            'properties': [
                {
                    'key': _PACKAGE_NAME,
                    'value': importlib.metadata.version(_PACKAGE_NAME)
                }
            ]
        }
        # All three identifiers are random UUIDs generated on the client.
        # The token is sent with every later request for this invocation.
        self._request_id = str(uuid.uuid4())
        self._invocation_id = str(uuid.uuid4())
        self._authorization_token = str(uuid.uuid4())
        request = self._service.invocations().create(
            body=invocation,
            requestId=self._request_id,
            invocationId=self._invocation_id,
            authorizationToken=self._authorization_token,
        )
        res = request.execute(http=self._http)
        logging.debug('invocations.create: %s', res)
        return self._invocation_id

    def create_default_configuration(self) -> None:
        """Creates a default configuration under the current invocation."""
        logging.debug('creating default configuration...')
        configuration = {
            'id': {
                'invocationId': self._invocation_id,
                'configurationId': _DEFAULT_CONFIGURATION,
            }
        }
        request = (
            self._service.invocations()
            .configs()
            .create(
                body=configuration,
                parent=f'invocations/{self._invocation_id}',
                configId=_DEFAULT_CONFIGURATION,
                authorizationToken=self._authorization_token,
            )
        )
        res = request.execute(http=self._http)
        logging.debug('invocations.configs.create: %s', res)

    def create_target(self, target_id: str | None = None) -> str:
        """Creates a target.

        If this client already holds a target, nothing is created and the
        existing target ID is returned (`target_id` is ignored).

        Args:
          target_id: An optional custom target ID. A random UUID is used
            when it is omitted or empty.

        Returns:
          The target ID.
        """
        logging.debug('creating target in %s...', self._invocation_name)
        if self._target_id:
            logging.warning(
                'target %s already exists, skipping creation...',
                self._target_id,
            )
            return self._target_id
        self._target_id = target_id or str(uuid.uuid4())
        # Quote every reserved character (including '/') so the ID stays a
        # single path segment inside resource names.
        self._encoded_target_id = urllib.parse.quote(self._target_id, safe='')
        target = {
            'id': {
                'invocationId': self._invocation_id,
                'targetId': self._target_id,
            },
            'targetAttributes': {'type': 'TEST', 'language': 'PY'},
            'visible': True,
        }
        request = (
            self._service.invocations()
            .targets()
            .create(
                body=target,
                parent=self._invocation_name,
                targetId=self._target_id,
                authorizationToken=self._authorization_token,
            )
        )
        res = request.execute(http=self._http)
        logging.debug('invocations.targets.create: %s', res)
        return self._target_id

    def create_configured_target(self) -> None:
        """Creates a configured target for the default configuration."""
        logging.debug('creating configured target in %s...', self._target_name)
        configured_target = {
            'id': {
                'invocationId': self._invocation_id,
                'targetId': self._target_id,
                'configurationId': _DEFAULT_CONFIGURATION,
            },
        }
        request = (
            self._service.invocations()
            .targets()
            .configuredTargets()
            .create(
                body=configured_target,
                parent=self._target_name,
                configId=_DEFAULT_CONFIGURATION,
                authorizationToken=self._authorization_token,
            )
        )
        res = request.execute(http=self._http)
        logging.debug('invocations.targets.configuredTargets.create: %s', res)

    def create_action(
            self, gcs_bucket: str, gcs_base_dir: str, artifacts: list[str]
    ) -> str:
        """Creates an action.

        Args:
          gcs_bucket: The bucket in GCS where artifacts are stored.
          gcs_base_dir: Base directory of the artifacts in the GCS bucket.
          artifacts: List of paths (relative to gcs_bucket) to the test
            artifacts.

        Returns:
          The action ID.

        Raises:
          ValueError: If an artifact path is not located under
            `gcs_base_dir`.
        """
        logging.debug('creating action in %s...', self._configured_target_name)
        action_id = str(uuid.uuid4())

        # Each file is identified by its path relative to the base
        # directory and points at its full GCS location.
        files = [
            {
                'uid': str(
                    pathlib.PurePosixPath(path).relative_to(gcs_base_dir)
                ),
                'uri': f'gs://{gcs_bucket}/{path}',
            }
            for path in artifacts
        ]
        action = {
            'id': {
                'invocationId': self._invocation_id,
                'targetId': self._target_id,
                'configurationId': _DEFAULT_CONFIGURATION,
                'actionId': action_id,
            },
            'testAction': {},
            'files': files,
        }
        request = (
            self._service.invocations()
            .targets()
            .configuredTargets()
            .actions()
            .create(
                body=action,
                parent=self._configured_target_name,
                actionId=action_id,
                authorizationToken=self._authorization_token,
            )
        )
        res = request.execute(http=self._http)
        logging.debug(
            'invocations.targets.configuredTargets.actions.create: %s', res
        )
        return action_id

    def merge_configured_target(self) -> None:
        """Merges the current status into the configured target."""
        logging.debug('merging configured target %s...',
                      self._configured_target_name)
        merge_request = {
            'configuredTarget': {
                'statusAttributes': {'status': self._status.value},
            },
            'authorizationToken': self._authorization_token,
            'updateMask': 'statusAttributes',
        }
        request = (
            self._service.invocations()
            .targets()
            .configuredTargets()
            .merge(
                body=merge_request,
                name=self._configured_target_name,
            )
        )
        res = request.execute(http=self._http)
        logging.debug('invocations.targets.configuredTargets.merge: %s', res)

    def finalize_configured_target(self) -> None:
        """Finalizes a configured target."""
        logging.debug('finalizing configured target %s...',
                      self._configured_target_name)
        finalize_request = {
            'authorizationToken': self._authorization_token,
        }
        request = (
            self._service.invocations()
            .targets()
            .configuredTargets()
            .finalize(
                body=finalize_request,
                name=self._configured_target_name,
            )
        )
        res = request.execute(http=self._http)
        logging.debug('invocations.targets.configuredTargets.finalize: %s', res)

    def merge_target(self) -> None:
        """Merges the current status into the target."""
        logging.debug('merging target %s...', self._target_name)
        merge_request = {
            'target': {
                'statusAttributes': {'status': self._status.value},
            },
            'authorizationToken': self._authorization_token,
            'updateMask': 'statusAttributes',
        }
        request = (
            self._service.invocations()
            .targets()
            .merge(
                body=merge_request,
                name=self._target_name,
            )
        )
        res = request.execute(http=self._http)
        logging.debug('invocations.targets.merge: %s', res)

    def finalize_target(self) -> None:
        """Finalizes a target."""
        logging.debug('finalizing target %s...', self._target_name)
        finalize_request = {
            'authorizationToken': self._authorization_token,
        }
        request = (
            self._service.invocations()
            .targets()
            .finalize(
                body=finalize_request,
                name=self._target_name,
            )
        )
        res = request.execute(http=self._http)
        logging.debug('invocations.targets.finalize: %s', res)

    def merge_invocation(self) -> None:
        """Merges the current status into the invocation."""
        logging.debug('merging invocation %s...', self._invocation_name)
        merge_request = {
            'invocation': {'statusAttributes': {'status': self._status.value}},
            'updateMask': 'statusAttributes',
            'authorizationToken': self._authorization_token,
        }
        request = self._service.invocations().merge(body=merge_request,
                                                    name=self._invocation_name)
        res = request.execute(http=self._http)
        logging.debug('invocations.merge: %s', res)

    def finalize_invocation(self) -> None:
        """Finalizes an invocation.

        After the request succeeds, prints the URL of the results page to
        stdout and clears the stored invocation/target IDs and the
        authorization token. The status set through `set_status` is kept.
        """
        logging.debug('finalizing invocation %s...', self._invocation_name)
        finalize_request = {
            'authorizationToken': self._authorization_token,
        }
        request = self._service.invocations().finalize(
            body=finalize_request, name=self._invocation_name
        )
        res = request.execute(http=self._http)
        logging.debug('invocations.finalize: %s', res)
        print('-' * 50)
        # Make the URL show test cases regardless of status by default.
        # The numeric `.value` is used explicitly because `str()` of an
        # IntEnum member only yields the bare number on Python 3.11+; older
        # versions produce 'StatusCode.ERRORED' and would break the URL.
        show_statuses = 'showStatuses=' + ','.join(
            str(status_code.value) for status_code in StatusCode
        )
        print(
            f'See results in {_RESULTSTORE_BASE_LINK}/'
            f'{self._target_name};config={_DEFAULT_CONFIGURATION}/tests;'
            f'{show_statuses}'
        )
        # Forget the finished invocation so a new one can be created.
        self._request_id = ''
        self._invocation_id = ''
        self._authorization_token = ''
        self._target_id = ''
        self._encoded_target_id = ''
415