• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# -*- coding: utf-8 -*-
2# Copyright 2018 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""The client of GS Cache server.
7
GS Cache is a server that caches responses from Google Storage for our lab.
9It also provides some RPCs to serve files in an archive like tar, tgz, etc.
10
11This client implements some functions used by autotest based on those RPCs.
12
13For details of GS Cache server, see go/cros-gs-cache.
14"""
15
16from __future__ import absolute_import
17from __future__ import division
18from __future__ import print_function
19
20import json
21import logging
22import urllib
23import urlparse
24
25import requests
26
27from autotest_lib.client.common_lib import error
28from autotest_lib.client.common_lib import global_config
29from autotest_lib.client.common_lib import utils
30from autotest_lib.client.common_lib.cros import dev_server
31from autotest_lib.client.common_lib.cros import retry
32from autotest_lib.client.common_lib.cros import string_utils
33
34from chromite.lib import metrics
35
36
_CONFIG = global_config.global_config
_CONFIG_SECTION = 'GS_CACHE'

# Port of the GS Cache server (key `server_port` of the GS_CACHE section,
# 8888 when unset).
_PORT = _CONFIG.get_config_value(
        _CONFIG_SECTION, 'server_port', type=int, default=8888)
# Google Storage bucket holding the build image archives.
_CROS_IMAGE_ARCHIVE_BUCKET = _CONFIG.get_config_value(
        _CONFIG_SECTION, 'gs_image_archive_bucket',
        default="chromeos-image-archive")
# When True, calls to a server in a restricted subnet go through
# `ssh <server> curl ...` instead of plain HTTP (see _GsCacheAPI._call).
_USE_SSH_CONNECTION = _CONFIG.get_config_value(
        'CROS', 'enable_ssh_connection_for_devserver', default=False,
        type=bool)
# Timeout, in seconds, passed to utils.run for a single SSH+curl call.
_SSH_CALL_TIMEOUT_SECONDS = 60
49
50_MESSAGE_LENGTH_MAX_CHARS = 200
51_MAX_URL_QUERY_LENGTH = 4096
52
53# Exit code of `curl` when cannot connect to host. Man curl for details.
54_CURL_RC_CANNOT_CONNECT_TO_HOST = 7
55
56METRICS_PATH = 'chromeos/autotest/gs_cache_client'
57
58
59def _truncate_long_message(message):
60    """Truncate too long message (e.g. url) to limited length."""
61    if len(message) > _MESSAGE_LENGTH_MAX_CHARS:
62        message = '%s...' % message[:_MESSAGE_LENGTH_MAX_CHARS]
63    return message
64
65
class CommunicationError(Exception):
    """Talking to the GS Cache server failed (bad status, failed command)."""
68
69
class ResponseContentError(Exception):
    """The GS Cache server answered, but its content could not be used."""
72
73
class NoGsCacheServerError(Exception):
    """No connection could be established to the GS Cache server."""
76
77
class _GsCacheAPI(object):
    """A thin wrapper of the GS Cache server API.

    Useful for stubbing out GS Cache server calls for unittests.
    """

    # Timeout, in seconds, for connecting to / reading from the GS Cache
    # server over HTTP. Without a timeout `requests.get` can block forever on
    # an unresponsive server. Mirrors the SSH call timeout.
    _HTTP_CALL_TIMEOUT_SECONDS = 60

    def __init__(self, gs_cache_server_name):
        """Construct the instance by the information of reference server.

        @param gs_cache_server_name: A string of GS Cache server hostname.
        """
        self._hostname = gs_cache_server_name
        self._netloc = '%s:%s' % (gs_cache_server_name, _PORT)
        # Presumably truthy when the host is in one of
        # utils.RESTRICTED_SUBNETS; together with _USE_SSH_CONNECTION it
        # selects the SSH transport in _call().
        self._is_in_restricted_subnet = utils.get_restricted_subnet(
                gs_cache_server_name, utils.RESTRICTED_SUBNETS
        )

    @property
    def server_netloc(self):
        """The network location ('host:port') of the Gs Cache server."""
        return self._netloc

    def _ssh_call(self, url):
        """Helper function to make a 'SSH call'.

        SSHes into the GS Cache server host and runs `curl` on `url` there.

        @param url: The URL to be called.

        @return The output string of the call.
        @throws NoGsCacheServerError when the remote `curl` cannot connect to
                the host in `url`.
        @throws CommunicationError when the SSH command failed otherwise.
        """
        cmd = 'ssh %s \'curl "%s"\'' % (self._hostname, utils.sh_escape(url))
        logging.debug('Gs Cache call: %s', _truncate_long_message(cmd))
        try:
            result = utils.run(cmd, timeout=_SSH_CALL_TIMEOUT_SECONDS)
        except error.CmdError as err:
            exit_status = err.result_obj.exit_status
            if exit_status == _CURL_RC_CANNOT_CONNECT_TO_HOST:
                raise NoGsCacheServerError(
                        'Cannot connect to Gs Cache at %s via SSH.'
                        % self._netloc)
            # '%s' rather than '%d': the exit status may not be an int
            # (NOTE(review): presumably None when the command is killed on
            # timeout), and '%d' % None would raise a TypeError that hides
            # the real failure. For int statuses the output is identical.
            raise CommunicationError('Error occurred: rc=%s, cmd=%s' %
                                     (exit_status, err.command))

        return result.stdout

    # NOTE(review): _call itself never raises ResponseContentError; only
    # CommunicationError failures are actually retried here.
    @retry.retry((CommunicationError, ResponseContentError), timeout_min=3,
                 delay_sec=5)
    def _call(self, action, bucket, path, queries):
        """Helper function to make a call to GS Cache server.

        There are two ways to call, i.e.
        1. via HTTP: We need this because all DUTs have and only have HTTP
        access to GS Cache server hosts.
        2. via SSH (ssh into the server and run a loopback `curl` call): We
        need this for the call from other services, e.g. drones, which have
        only SSH access to GS Cache servers.

        @param action: The name of RPC to be called, e.g. extract, etc.
        @param bucket: The bucket of the file on GS.
        @param path: The path of the file on GS. The bucket part is not
                     included. For example, if the GS path is
                     'gs://bucket/path/to/file', the `path` here is just
                     'path/to/file'.
        @param queries: A dict of queries (i.e. parameters).

        @return The http response content or SSH command output.
        @throws NoGsCacheServerError if the GS Cache server cannot be
            connected to (not in the retry list, so it fails fast).
        @throws CommunicationError if there are errors (including HTTP read
            timeouts) while talking to GS Cache server.
        """
        url = urlparse.SplitResult(
                'http', self._netloc, '/'.join([action, bucket, path]),
                urllib.urlencode(queries, doseq=True), None).geturl()
        if _USE_SSH_CONNECTION and self._is_in_restricted_subnet:
            return self._ssh_call(url)

        logging.debug('Gs Cache call: %s', _truncate_long_message(url))
        # TODO(guocb): Re-use the TCP connection.
        try:
            rsp = requests.get(url, timeout=self._HTTP_CALL_TIMEOUT_SECONDS)
        except requests.ConnectionError as err:
            # Also catches requests.ConnectTimeout, which subclasses
            # ConnectionError: the server is unreachable.
            raise NoGsCacheServerError(
                    'Cannot connect to Gs Cache at %s via HTTP: %s'
                    % (self._netloc, err))
        except requests.Timeout as err:
            # Connected but the server stopped responding; report it as a
            # communication error so the retry decorator tries again.
            raise CommunicationError(
                    'HTTP request: GET %s\nTimed out after %d seconds: %s' % (
                            _truncate_long_message(url),
                            self._HTTP_CALL_TIMEOUT_SECONDS, err))
        if not rsp.ok:
            msg = 'HTTP request: GET %s\nHTTP Response: %d: %s' % (
                    rsp.url, rsp.status_code, rsp.content)
            raise CommunicationError(msg)
        return rsp.content

    def extract(self, bucket, archive, files):
        """API binding of `extract`.

        @param bucket: The bucket of the file on GS.
        @param archive: The path of archive on GS (bucket part not included).
        @param files: A list of files to be extracted.

        @return A dict of extracted files, in format of
                {filename: content, ...}.
        @throws ResponseContentError if the response is not in JSON format.
        """
        rsp_contents = []
        # There may be too many files to extract, which would result in a URL
        # that is too long, and the HTTP server may respond with a 414 error.
        # So we split them into multiple requests if necessary.
        for part_of_files in string_utils.join_longest_with_length_limit(
                files, _MAX_URL_QUERY_LENGTH, separator='&file=',
                do_join=False):
            rsp_contents.append(self._call('extract', bucket, archive,
                                           {'file': part_of_files}))
        content_dict = {}
        for content in rsp_contents:
            try:
                content_dict.update(json.loads(content))
            except ValueError as err:
                # Report only the offending response, truncated: the full
                # list of responses can be very large.
                raise ResponseContentError(
                    'Got ValueError "%s" when decoding to JSON format. The '
                    'response content is: %s'
                    % (err, _truncate_long_message(content)))

        return content_dict
196
197
class GsCacheClient(object):
    """A client of Google Storage Cache server."""

    # Names inside control_files.tar carry this prefix; it is stripped from
    # the paths returned by _list_suite_controls.
    _CONTROL_FILE_PREFIX = 'autotest/'
    _CONTROL_FILE_PREFIX_LEN = len(_CONTROL_FILE_PREFIX)

    def __init__(self, fallback_dev_server, api=None):
        """Constructor.

        @param fallback_dev_server: An instance of dev_server.DevServer which
                                    is only used for fallback to old path in
                                    case GS Cache server call fails.
        @param api: A _GsCacheAPI object (to stub out calls for tests).
        """
        self._fallback_server = fallback_dev_server
        self._api = api or _GsCacheAPI(fallback_dev_server.hostname)

    def _list_suite_controls_timer(self, is_gs_cache_call):
        """Build the metrics timer for one `list_suite_controls` attempt.

        @param is_gs_cache_call: True when timing the GS Cache server call,
                                 False when timing the devserver fallback.

        @return A metrics.SecondsTimer context manager.
        """
        return metrics.SecondsTimer(
                METRICS_PATH + '/call_timer_2', record_on_exception=True,
                add_exception_field=True, scale=0.001,
                fields={'rpc_name': 'list_suite_controls',
                        'rpc_server': self._api.server_netloc,
                        'is_gs_cache_call': is_gs_cache_call})

    def list_suite_controls(self, build, suite_name=None):
        """Get the list of all control files for |build|.

        It tries to get control files from GS Cache server first. If failed,
        fall back to devserver.

        @param build: A string of build name (e.g. coral-release/R65-10300.0.0).
        @param suite_name: The name of the suite for which we require control
                           files. Pass None to get control files of all suites
                           of the build.
        @return the control files content as a dict, in format of
                {path1: content1, path2: content2}.
        @throws error.SuiteControlFileException when there is an error while
                listing.
        """
        try:
            with self._list_suite_controls_timer(is_gs_cache_call=True):
                return self._list_suite_controls(build, suite_name)
        # We have to catch error.TimeoutException here because that's the
        # default exception we can get when the code doesn't run in main
        # thread. Any GS Cache failure falls back to the devserver below.
        except Exception as err:
            logging.warning('GS Cache Error: %s', err)
            logging.warning(
                    'Falling back to devserver call of "list_suite_controls".')
            c = metrics.Counter(METRICS_PATH + '/fallback_to_devserver_2')
            # NOTE(review): an unreachable server is labeled 'other' and all
            # other failures 'gs_cache_error'; kept unchanged because existing
            # metric consumers may rely on these labels.
            error_type = ('other' if isinstance(err, NoGsCacheServerError) else
                          'gs_cache_error')
            c.increment(fields={'rpc_server': self._api.server_netloc,
                                'rpc_name': 'list_suite_controls',
                                'error_type': error_type})

        try:
            with self._list_suite_controls_timer(is_gs_cache_call=False):
                return self._fallback_server.list_suite_controls(
                        build, suite_name=suite_name if suite_name else '')
        except dev_server.DevServerException as err:
            raise error.SuiteControlFileException(err)

    def _list_suite_controls(self, build, suite_name=''):
        """'GS Cache' version of list_suite_controls.

        Reads the suite-to-control-file map from the build's
        test_suites.tar.bz2, then extracts the listed control files from
        control_files.tar. If `suite_name` is not a key of the map (always
        the case for None, since JSON object keys are strings), every file
        matching '*/control' or '*/control.*' is extracted instead.

        @param build: A string of build name (e.g. coral-release/R65-10300.0.0).
        @param suite_name: The suite whose control files are wanted.

        @return A dict {path: content}, with the 'autotest/' prefix removed
                from each path; entries without that prefix are dropped.
        @throws ResponseContentError if the map file is missing from the
                response or is not valid JSON.
        """
        test_suites = '%s/test_suites.tar.bz2' % build
        map_file_name = 'autotest/test_suites/suite_to_control_file_map'
        content_dict = self._api.extract(_CROS_IMAGE_ARCHIVE_BUCKET,
                                         test_suites, [map_file_name])
        try:
            map_file_content = content_dict[map_file_name]
        except KeyError:
            raise ResponseContentError(
                    "File '%s' isn't in response: %s" %
                    (map_file_name, _truncate_long_message(str(content_dict))))
        try:
            suite_to_control_files = json.loads(map_file_content)
        except ValueError as err:
            raise ResponseContentError(
                'Got ValueError "%s" when decoding to JSON format. The '
                'map file content is: %s'
                % (err, _truncate_long_message(map_file_content)))
        try:
            suite_control_files = suite_to_control_files[suite_name]
        except KeyError:
            # Unknown (or no) suite: extract every control file of the build.
            control_files = ['*/control', '*/control.*']
        else:
            # The control files in control_files.tar have 'autotest/' prefix.
            control_files = [self._CONTROL_FILE_PREFIX + p
                             for p in suite_control_files]

        result_dict = self._api.extract(
                _CROS_IMAGE_ARCHIVE_BUCKET, '%s/control_files.tar' % build,
                control_files)
        # Remove the prefix of 'autotest/' from the control file names.
        return {k[self._CONTROL_FILE_PREFIX_LEN:]: v
                for k, v in result_dict.items()
                if k.startswith(self._CONTROL_FILE_PREFIX)}
299