#!/usr/bin/env python
# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Standalone service to monitor AFE servers and report to ts_mon"""
import sys
import time
import logging
import multiprocessing
import urllib2

import common
from autotest_lib.client.common_lib import global_config
from autotest_lib.frontend.afe.json_rpc import proxy
from autotest_lib.server import frontend
# import needed to setup host_attributes
# pylint: disable=unused-import
from autotest_lib.server import site_host_attributes
from autotest_lib.site_utils import server_manager_utils
from chromite.lib import commandline
from chromite.lib import metrics
from chromite.lib import ts_mon_config

METRIC_ROOT = 'chromeos/autotest/blackbox/afe_rpc'
METRIC_RPC_CALL_DURATIONS = METRIC_ROOT + '/rpc_call_durations'
METRIC_TICK = METRIC_ROOT + '/tick'
METRIC_MONITOR_ERROR = METRIC_ROOT + '/afe_monitor_error'

# Maps known exception types to the short reason string reported to ts_mon.
FAILURE_REASONS = {
        proxy.JSONRPCException: 'JSONRPCException',
        }


def afe_rpc_call(hostname):
    """Perform one rpc call set on server

    Runs in a pool worker; any failure is counted and logged but never
    propagated, so one bad server cannot kill the polling pool.

    @param hostname: server's hostname to poll
    """
    afe_monitor = AfeMonitor(hostname)
    try:
        afe_monitor.run()
    except Exception:
        metrics.Counter(METRIC_MONITOR_ERROR).increment(
                fields={'target_hostname': hostname})
        logging.exception('Exception when running against host %s', hostname)


def update_shards(shards, shards_lock, period=600, stop_event=None):
    """Updates dict of shards

    Polls server_manager_utils periodically and mutates the shared |shards|
    list in place (under |shards_lock|) so readers see a consistent view.

    @param shards: list of shards to be updated
    @param shards_lock: shared lock for accessing shards
    @param period: time between polls, in seconds
    @param stop_event: Event that can be set to stop polling
    """
    while not stop_event or not stop_event.is_set():
        start_time = time.time()

        logging.debug('Updating Shards')
        new_shards = set(server_manager_utils.get_shards())

        with shards_lock:
            current_shards = set(shards)
            rm_shards = current_shards - new_shards
            add_shards = new_shards - current_shards

            if rm_shards:
                for s in rm_shards:
                    shards.remove(s)

            if add_shards:
                shards.extend(add_shards)

            if rm_shards:
                logging.info('Servers left production: %s', str(rm_shards))

            if add_shards:
                logging.info('Servers entered production: %s',
                             str(add_shards))

        # Sleep only for the remainder of the period, so the poll cadence
        # stays fixed regardless of how long the update itself took.
        wait_time = (start_time + period) - time.time()
        if wait_time > 0:
            time.sleep(wait_time)


def poll_rpc_servers(servers, servers_lock, shards=None, period=60,
                     stop_event=None):
    """Blocking function that polls all servers and shards

    @param servers: list of servers to poll
    @param servers_lock: lock to be used when accessing servers or shards
    @param shards: list of shards to poll (may be None for no shards)
    @param period: time between polls, in seconds
    @param stop_event: Event that can be set to stop polling
    """
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count() * 4)

    try:
        while not stop_event or not stop_event.is_set():
            start_time = time.time()
            with servers_lock:
                # |shards| defaults to None; union with an empty list in
                # that case instead of raising TypeError.
                all_servers = set(servers).union(shards or [])

            logging.debug('Starting Server Polling: %s',
                          ', '.join(all_servers))
            pool.map(afe_rpc_call, all_servers)

            logging.debug('Finished Server Polling')

            metrics.Counter(METRIC_TICK).increment()

            wait_time = (start_time + period) - time.time()
            if wait_time > 0:
                time.sleep(wait_time)
    finally:
        # Release the worker processes once polling stops.
        pool.close()
        pool.join()


class RpcFlightRecorder(object):
    """Monitors a list of AFE"""

    def __init__(self, servers, with_shards=True, poll_period=60):
        """
        @param servers: list of afe services to monitor
        @param with_shards: also record status on shards
        @param poll_period: frequency to poll all services, in seconds
        """
        self._manager = multiprocessing.Manager()

        self._poll_period = poll_period

        # Manager-backed proxies so the child processes share state.
        self._servers = self._manager.list(servers)
        self._servers_lock = self._manager.RLock()

        self._with_shards = with_shards
        self._shards = self._manager.list()
        self._update_shards_ps = None
        self._poll_rpc_server_ps = None

        self._stop_event = multiprocessing.Event()

    def start(self):
        """Call to start recorder"""
        if self._with_shards:
            shard_args = [self._shards, self._servers_lock]
            shard_kwargs = {'stop_event': self._stop_event}
            self._update_shards_ps = multiprocessing.Process(
                    name='update_shards',
                    target=update_shards,
                    args=shard_args,
                    kwargs=shard_kwargs)

            self._update_shards_ps.start()

        poll_args = [self._servers, self._servers_lock]
        poll_kwargs = {'shards': self._shards,
                       'period': self._poll_period,
                       'stop_event': self._stop_event}
        self._poll_rpc_server_ps = multiprocessing.Process(
                name='poll_rpc_servers',
                target=poll_rpc_servers,
                args=poll_args,
                kwargs=poll_kwargs)

        self._poll_rpc_server_ps.start()

    def close(self):
        """Send close event to all sub processes"""
        self._stop_event.set()

    def terminate(self):
        """Terminate processes"""
        self.close()
        if self._poll_rpc_server_ps:
            self._poll_rpc_server_ps.terminate()

        if self._update_shards_ps:
            self._update_shards_ps.terminate()

        if self._manager:
            self._manager.shutdown()

    # Backward-compatible alias for the original (misspelled) method name.
    termitate = terminate

    def join(self, timeout=None):
        """Blocking call until closed and processes complete

        @param timeout: passed to each process, so could be >timeout"""
        if self._poll_rpc_server_ps:
            self._poll_rpc_server_ps.join(timeout)

        if self._update_shards_ps:
            self._update_shards_ps.join(timeout)


def _failed(fields, msg_str, reason, err=None):
    """Mark current run failed

    @param fields: ts_mon fields to mark as failed
    @param msg_str: message string to be filled
    @param reason: why it failed
    @param err: optional error to log more debug info
    """
    fields['success'] = False
    fields['failure_reason'] = reason
    logging.warning("%s failed - %s", msg_str, reason)
    if err:
        logging.debug("%s fail_err - %s", msg_str, str(err))


class AfeMonitor(object):
    """Object that runs rpc calls against the given afe frontend"""

    def __init__(self, hostname):
        """
        @param hostname: hostname of server to monitor, string
        """
        self._hostname = hostname
        self._afe = frontend.AFE(server=self._hostname)
        self._metric_fields = {'target_hostname': self._hostname}

    def run_cmd(self, cmd, expected=None):
        """Runs rpc command and log metrics

        @param cmd: string of rpc command to send
        @param expected: expected result of rpc
        """
        metric_fields = self._metric_fields.copy()
        metric_fields['command'] = cmd
        metric_fields['success'] = True
        metric_fields['failure_reason'] = ''

        with metrics.SecondsTimer(METRIC_RPC_CALL_DURATIONS,
                                  fields=dict(metric_fields),
                                  scale=0.001) as f:

            msg_str = "%s:%s" % (self._hostname, cmd)

            try:
                result = self._afe.run(cmd)
                logging.debug("%s result = %s", msg_str, result)
                if expected is not None and expected != result:
                    _failed(f, msg_str, 'IncorrectResponse')

            except urllib2.HTTPError as e:
                _failed(f, msg_str, 'HTTPError:%d' % e.code)

            except Exception as e:
                _failed(f, msg_str, FAILURE_REASONS.get(type(e), 'Unknown'),
                        err=e)

                # Unknown failures are recorded but still propagated so the
                # caller's catch-all can log the full traceback.
                if type(e) not in FAILURE_REASONS:
                    raise

            if f['success']:
                logging.info("%s success", msg_str)

    def run(self):
        """Tests server and returns the result"""
        self.run_cmd('get_server_time')
        self.run_cmd('ping_db', [True])


def get_parser():
    """Returns argparse parser"""
    parser = commandline.ArgumentParser(description=__doc__)

    parser.add_argument('-a', '--afe', action='append', default=[],
                        help='Autotest FrontEnd server to monitor')

    parser.add_argument('-p', '--poll-period', type=int, default=60,
                        help='Frequency to poll AFE servers')

    parser.add_argument('--no-shards', action='store_false',
                        dest='with_shards',
                        help='Disable shard updating')

    return parser


def main(argv):
    """Main function

    @param argv: commandline arguments passed
    """
    parser = get_parser()
    options = parser.parse_args(argv[1:])

    if not options.afe:
        options.afe = [global_config.global_config.get_config_value(
                'SERVER', 'global_afe_hostname', default='cautotest')]

    with ts_mon_config.SetupTsMonGlobalState('rpc_flight_recorder',
                                             indirect=True):
        flight_recorder = RpcFlightRecorder(options.afe,
                                            with_shards=options.with_shards,
                                            poll_period=options.poll_period)

        flight_recorder.start()
        flight_recorder.join()


if __name__ == '__main__':
    main(sys.argv)