1# -*- coding: utf-8 -*- 2# Copyright 2018 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6"""The client of GS Cache server. 7 8GS Cache server is a server cache the responses from Google Storage to our lab. 9It also provides some RPCs to serve files in an archive like tar, tgz, etc. 10 11This client implements some functions used by autotest based on those RPCs. 12 13For details of GS Cache server, see go/cros-gs-cache. 14""" 15 16from __future__ import absolute_import 17from __future__ import division 18from __future__ import print_function 19 20import json 21import logging 22import urllib 23import urlparse 24 25import requests 26 27from autotest_lib.client.common_lib import error 28from autotest_lib.client.common_lib import global_config 29from autotest_lib.client.common_lib import utils 30from autotest_lib.client.common_lib.cros import dev_server 31from autotest_lib.client.common_lib.cros import retry 32from autotest_lib.client.common_lib.cros import string_utils 33 34from chromite.lib import metrics 35 36 37_CONFIG = global_config.global_config 38_CONFIG_SECTION = 'GS_CACHE' 39 40_PORT = _CONFIG.get_config_value(_CONFIG_SECTION, 'server_port', default=8888, 41 type=int) 42_CROS_IMAGE_ARCHIVE_BUCKET = _CONFIG.get_config_value( 43 _CONFIG_SECTION, 'gs_image_archive_bucket', 44 default="chromeos-image-archive") 45_USE_SSH_CONNECTION = _CONFIG.get_config_value( 46 'CROS', 'enable_ssh_connection_for_devserver', type=bool, 47 default=False) 48_SSH_CALL_TIMEOUT_SECONDS = 60 49 50_MESSAGE_LENGTH_MAX_CHARS = 200 51_MAX_URL_QUERY_LENGTH = 4096 52 53# Exit code of `curl` when cannot connect to host. Man curl for details. 54_CURL_RC_CANNOT_CONNECT_TO_HOST = 7 55 56METRICS_PATH = 'chromeos/autotest/gs_cache_client' 57 58 59def _truncate_long_message(message): 60 """Truncate too long message (e.g. url) to limited length.""" 61 if len(message) > _MESSAGE_LENGTH_MAX_CHARS: 62 message = '%s...' % message[:_MESSAGE_LENGTH_MAX_CHARS] 63 return message 64 65 66class CommunicationError(Exception): 67 """Raised when has errors in communicate with GS Cache server.""" 68 69 70class ResponseContentError(Exception): 71 """Error raised when response content has problems.""" 72 73 74class NoGsCacheServerError(Exception): 75 """Error raised when we cannot connect to Gs Cache server.""" 76 77 78class _GsCacheAPI(object): 79 """A thin wrapper of the GS Cache server API. 80 81 Useful for stubbing out GS Cache server calls for unittests. 82 """ 83 def __init__(self, gs_cache_server_name): 84 """Construct the instance by the information of reference server. 85 86 @param gs_cache_server_name: A string of GS Cache server hostname. 87 """ 88 self._hostname = gs_cache_server_name 89 self._netloc = '%s:%s' % (gs_cache_server_name, _PORT) 90 self._is_in_restricted_subnet = utils.get_restricted_subnet( 91 gs_cache_server_name, utils.RESTRICTED_SUBNETS 92 ) 93 94 @property 95 def server_netloc(self): 96 """The network location of the Gs Cache server.""" 97 return self._netloc 98 99 def _ssh_call(self, url): 100 """Helper function to make a 'SSH call'. 101 102 @param url: The URL to be called. 103 104 @return The output string of the call. 105 @throws CommunicationError when the SSH command failed. 106 """ 107 cmd = 'ssh %s \'curl "%s"\'' % (self._hostname, utils.sh_escape(url)) 108 logging.debug('Gs Cache call: %s', _truncate_long_message(cmd)) 109 try: 110 result = utils.run(cmd, timeout=_SSH_CALL_TIMEOUT_SECONDS) 111 except error.CmdError as err: 112 if err.result_obj.exit_status == _CURL_RC_CANNOT_CONNECT_TO_HOST: 113 raise NoGsCacheServerError( 114 'Cannot connect to Gs Cache at %s via SSH.' 115 % self._netloc) 116 117 raise CommunicationError('Error occurred: rc=%d, cmd=%s' % 118 (err.result_obj.exit_status, err.command)) 119 120 return result.stdout 121 122 @retry.retry((CommunicationError, ResponseContentError), timeout_min=3, 123 delay_sec=5) 124 def _call(self, action, bucket, path, queries): 125 """Helper function to make a call to GS Cache server. 126 127 There are two ways to call, i.e. 128 1. via HTTP: We need this because all DUTs have and only have HTTP 129 access to GS Cache server hosts. 130 2. via SSH (ssh into the server and run a loopback `curl` call): We 131 need this for the call from other services, e.g. drones, which have 132 only SSH access to GS Cache servers. 133 134 @param action: The name of RPC to be called, e.g. extract, etc. 135 @param bucket: The bucket of the file on GS. 136 @param path: The path of the file on GS. The bucket part is not 137 included. For example, if the GS path is 138 'gs://bucket/path/to/file', the `path` here is just 139 'path/to/file'. 140 @param queries: A dict of queries (i.e. parameters). 141 142 @return The http response content or SSH command output. 143 @throws CommunicationError if there are errors while talking to GS Cache 144 server. 145 """ 146 url = urlparse.SplitResult( 147 'http', self._netloc, '/'.join([action, bucket, path]), 148 urllib.urlencode(queries, doseq=True), None).geturl() 149 if _USE_SSH_CONNECTION and self._is_in_restricted_subnet: 150 return self._ssh_call(url) 151 else: 152 logging.debug('Gs Cache call: %s', _truncate_long_message(url)) 153 # TODO(guocb): Re-use the TCP connection. 154 try: 155 rsp = requests.get(url) 156 except requests.ConnectionError as err: 157 raise NoGsCacheServerError( 158 'Cannot connect to Gs Cache at %s via HTTP: %s' 159 % (self._netloc, err)) 160 if not rsp.ok: 161 msg = 'HTTP request: GET %s\nHTTP Response: %d: %s' % ( 162 rsp.url, rsp.status_code, rsp.content) 163 raise CommunicationError(msg) 164 return rsp.content 165 166 def extract(self, bucket, archive, files): 167 """API binding of `extract`. 168 169 @param bucket: The bucket of the file on GS. 170 @param archive: The path of archive on GS (bucket part not included). 171 @param files: A list of files to be extracted. 172 173 @return A dict of extracted files, in format of 174 {filename: content, ...}. 175 @throws ResponseContentError if the response is not in JSON format. 176 """ 177 rsp_contents = [] 178 # The files to be extract may be too many which reuslts in too long URL 179 # and http server may responses with 414 error. So we split them into 180 # multiple requests if necessary. 181 for part_of_files in string_utils.join_longest_with_length_limit( 182 files, _MAX_URL_QUERY_LENGTH, separator='&file=', 183 do_join=False): 184 rsp_contents.append(self._call('extract', bucket, archive, 185 {'file': part_of_files})) 186 content_dict = {} 187 try: 188 for content in rsp_contents: 189 content_dict.update(json.loads(content)) 190 except ValueError as err: 191 raise ResponseContentError( 192 'Got ValueError "%s" when decoding to JSON format. The ' 193 'response content is: %s' % (err, rsp_contents)) 194 195 return content_dict 196 197 198class GsCacheClient(object): 199 """A client of Google Storage Cache server.""" 200 201 _CONTROL_FILE_PREFIX = 'autotest/' 202 _CONTROL_FILE_PREFIX_LEN = len(_CONTROL_FILE_PREFIX) 203 204 def __init__(self, fallback_dev_server, api=None): 205 """Constructor. 206 207 @param fallback_dev_server: An instance of dev_server.DevServer which 208 is only used for fallback to old path in 209 case GS Cache server call fails. 210 @param api: A _GsCacheAPI object (to stub out calls for tests). 211 """ 212 self._fallback_server = fallback_dev_server 213 self._api = api or _GsCacheAPI(fallback_dev_server.hostname) 214 215 def list_suite_controls(self, build, suite_name=None): 216 """Get the list of all control files for |build|. 217 218 It tries to get control files from GS Cache server first. If failed, 219 fall back to devserver. 220 221 @param build: A string of build name (e.g. coral-release/R65-10300.0.0). 222 @param suite_name: The name of the suite for which we require control 223 files. Pass None to get control files of all suites 224 of the build. 225 @return the control files content as a dict, in format of 226 {path1: content1, path2: content2}. 227 @throws error.SuiteControlFileException when there is an error while 228 listing. 229 """ 230 try: 231 with metrics.SecondsTimer( 232 METRICS_PATH + '/call_timer_2', record_on_exception=True, 233 add_exception_field=True, scale=0.001, 234 fields={'rpc_name': 'list_suite_controls', 235 'rpc_server': self._api.server_netloc, 236 'is_gs_cache_call': True} 237 ): 238 return self._list_suite_controls(build, suite_name) 239 # We have to catch error.TimeoutException here because that's the 240 # default exception we can get when the code doesn't run in main 241 # thread. 242 except Exception as err: 243 logging.warn('GS Cache Error: %s', err) 244 logging.warn( 245 'Falling back to devserver call of "list_suite_controls".') 246 c = metrics.Counter(METRICS_PATH + '/fallback_to_devserver_2') 247 error_type = ('other' if isinstance(err, NoGsCacheServerError) else 248 'gs_cache_error') 249 c.increment(fields={'rpc_server': self._api.server_netloc, 250 'rpc_name': 'list_suite_controls', 251 'error_type': error_type}) 252 253 try: 254 with metrics.SecondsTimer( 255 METRICS_PATH + '/call_timer_2', record_on_exception=True, 256 add_exception_field=True, scale=0.001, 257 fields={'rpc_name': 'list_suite_controls', 258 'rpc_server': self._api.server_netloc, 259 'is_gs_cache_call': False} 260 ): 261 return self._fallback_server.list_suite_controls( 262 build, suite_name=suite_name if suite_name else '') 263 except dev_server.DevServerException as err: 264 raise error.SuiteControlFileException(err) 265 266 def _list_suite_controls(self, build, suite_name=''): 267 """'GS Cache' version of list_suite_controls.""" 268 test_suites = '%s/test_suites.tar.bz2' % build 269 map_file_name = 'autotest/test_suites/suite_to_control_file_map' 270 content_dict = self._api.extract(_CROS_IMAGE_ARCHIVE_BUCKET, 271 test_suites, [map_file_name]) 272 try: 273 map_file_content = content_dict[map_file_name] 274 except KeyError: 275 raise ResponseContentError( 276 "File '%s' isn't in response: %s" % 277 (map_file_name, _truncate_long_message(str(content_dict)))) 278 try: 279 suite_to_control_files = json.loads(map_file_content) 280 except ValueError as err: 281 raise ResponseContentError( 282 'Got ValueError "%s" when decoding to JSON format. The ' 283 'map file content is: %s' % (err, map_file_content)) 284 try: 285 control_files = suite_to_control_files[suite_name] 286 # The control files in control_files.tar have 'autotest/' prefix. 287 control_files = [self._CONTROL_FILE_PREFIX + p 288 for p in control_files] 289 except KeyError: 290 control_files = ['*/control', '*/control.*'] 291 292 result_dict = self._api.extract( 293 _CROS_IMAGE_ARCHIVE_BUCKET, '%s/control_files.tar' % build, 294 control_files) 295 # Remove the prefix of 'autotest/' from the control file names. 296 return {k[self._CONTROL_FILE_PREFIX_LEN:]: v 297 for k, v in result_dict.items() 298 if k.startswith(self._CONTROL_FILE_PREFIX)} 299