#!/usr/bin/env python3
# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Standalone service to monitor AFE servers and report to ts_mon."""

import argparse
import sys
import time
import logging
import multiprocessing
from six.moves import urllib

import common
from autotest_lib.client.common_lib import global_config
from autotest_lib.frontend.afe.json_rpc import proxy
from autotest_lib.server import frontend
# import needed to setup host_attributes
# pylint: disable=unused-import
from autotest_lib.server import site_host_attributes
from autotest_lib.utils.frozen_chromite.lib import metrics
from autotest_lib.utils.frozen_chromite.lib import ts_mon_config

METRIC_ROOT = 'chromeos/autotest/blackbox/afe_rpc'
METRIC_RPC_CALL_DURATIONS = METRIC_ROOT + '/rpc_call_durations'
METRIC_TICK = METRIC_ROOT + '/tick'
METRIC_MONITOR_ERROR = METRIC_ROOT + '/afe_monitor_error'

FAILURE_REASONS = {
        proxy.JSONRPCException: 'JSONRPCException',
}


def afe_rpc_call(hostname):
    """Perform one set of RPC calls against the given server.

    @param hostname: hostname of the server to poll
    """
    afe_monitor = AfeMonitor(hostname)
    try:
        afe_monitor.run()
    except Exception:
        metrics.Counter(METRIC_MONITOR_ERROR).increment(
                fields={'target_hostname': hostname})
        logging.exception('Exception when running against host %s', hostname)


def update_shards(shards, shards_lock, period=600, stop_event=None):
    """Periodically refreshes the shared list of shards.

    @param shards: shared list of shards, updated in place
    @param shards_lock: shared lock for accessing shards
    @param period: time between polls, in seconds
    @param stop_event: Event that can be set to stop polling
    """
    while not stop_event or not stop_event.is_set():
        start_time = time.time()

        logging.debug('Updating Shards')

        # server_manager_utils.get_shards() is deprecated.
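        # With no shard source available, new_shards stays empty: any shards
        # still present in the shared list are treated as having left
        # production and are removed below; nothing is ever added here.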
        new_shards = set()

        with shards_lock:
            current_shards = set(shards)
            rm_shards = current_shards - new_shards
            add_shards = new_shards - current_shards

            if rm_shards:
                for s in rm_shards:
                    shards.remove(s)

            if add_shards:
                shards.extend(add_shards)

        if rm_shards:
            logging.info('Servers left production: %s', str(rm_shards))

        if add_shards:
            logging.info('Servers entered production: %s', str(add_shards))

        wait_time = (start_time + period) - time.time()
        if wait_time > 0:
            time.sleep(wait_time)


def poll_rpc_servers(servers, servers_lock, shards=None, period=60,
                     stop_event=None):
    """Blocking function that polls all servers and shards.

    @param servers: list of servers to poll
    @param servers_lock: lock to be used when accessing servers or shards
    @param shards: list of shards to poll
    @param period: time between polls, in seconds
    @param stop_event: Event that can be set to stop polling
    """
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count() * 4)

    while not stop_event or not stop_event.is_set():
        start_time = time.time()
        with servers_lock:
            all_servers = set(servers).union(shards)

        logging.debug('Starting Server Polling: %s', ', '.join(all_servers))
        pool.map(afe_rpc_call, all_servers)

        logging.debug('Finished Server Polling')

        metrics.Counter(METRIC_TICK).increment()

        wait_time = (start_time + period) - time.time()
        if wait_time > 0:
            time.sleep(wait_time)


class RpcFlightRecorder(object):
    """Monitors a list of AFE servers."""

    def __init__(self, servers, with_shards=True, poll_period=60):
        """
        @param servers: list of AFE services to monitor
        @param with_shards: also record status on shards
        @param poll_period: frequency to poll all services, in seconds
        """
        self._manager = multiprocessing.Manager()

        self._poll_period = poll_period

        self._servers = self._manager.list(servers)
        self._servers_lock = self._manager.RLock()

        self._with_shards = with_shards
        self._shards = self._manager.list()
        self._update_shards_ps = None
        self._poll_rpc_server_ps = None

        self._stop_event = multiprocessing.Event()

    def start(self):
        """Start the monitoring sub-processes."""
        if self._with_shards:
            shard_args = [self._shards, self._servers_lock]
            shard_kwargs = {'stop_event': self._stop_event}
            self._update_shards_ps = multiprocessing.Process(
                    name='update_shards',
                    target=update_shards,
                    args=shard_args,
                    kwargs=shard_kwargs)

            self._update_shards_ps.start()

        poll_args = [self._servers, self._servers_lock]
        poll_kwargs = {'shards': self._shards,
                       'period': self._poll_period,
                       'stop_event': self._stop_event}
        self._poll_rpc_server_ps = multiprocessing.Process(
                name='poll_rpc_servers',
                target=poll_rpc_servers,
                args=poll_args,
                kwargs=poll_kwargs)

        self._poll_rpc_server_ps.start()

    def close(self):
        """Send the stop event to all sub-processes."""
        self._stop_event.set()

    def join(self, timeout=None):
        """Block until closed and the sub-processes complete.

        @param timeout: passed to each process's join(), so the total wait
                may exceed timeout
        """
        if self._poll_rpc_server_ps:
            self._poll_rpc_server_ps.join(timeout)

        if self._update_shards_ps:
            self._update_shards_ps.join(timeout)


def _failed(fields, msg_str, reason, err=None):
    """Mark the current run as failed.

    @param fields: ts_mon fields to mark as failed
    @param msg_str: message string used in log output
    @param reason: why it failed
    @param err: optional error to log more debug info
    """
    fields['success'] = False
    fields['failure_reason'] = reason
    logging.warning("%s failed - %s", msg_str, reason)
    if err:
        logging.debug("%s fail_err - %s", msg_str, str(err))


class AfeMonitor(object):
    """Object that runs RPC calls against the given AFE frontend."""

    def __init__(self, hostname):
        """
        @param hostname: hostname of server to monitor, string
        """
        self._hostname = hostname
        self._afe = frontend.AFE(server=self._hostname)
        self._metric_fields = {'target_hostname': self._hostname}

    def run_cmd(self, cmd, expected=None):
        """Run an RPC command and log metrics for it.

        @param cmd: string of the RPC command to send
        @param expected: expected result of the RPC, or None to skip the check
        """
        metric_fields = self._metric_fields.copy()
        metric_fields['command'] = cmd
        metric_fields['success'] = True
        metric_fields['failure_reason'] = ''

        with metrics.SecondsTimer(METRIC_RPC_CALL_DURATIONS,
                                  fields=dict(metric_fields),
                                  scale=0.001) as f:

            msg_str = "%s:%s" % (self._hostname, cmd)

            try:
                result = self._afe.run(cmd)
                logging.debug("%s result = %s", msg_str, result)
                if expected is not None and expected != result:
                    _failed(f, msg_str, 'IncorrectResponse')

            except urllib.error.HTTPError as e:
                _failed(f, msg_str, 'HTTPError:%d' % e.code)

            except Exception as e:
                _failed(f, msg_str, FAILURE_REASONS.get(type(e), 'Unknown'),
                        err=e)

                if type(e) not in FAILURE_REASONS:
                    raise

            if f['success']:
                logging.info("%s success", msg_str)

    def run(self):
        """Run the set of monitoring RPC calls against the server."""
        self.run_cmd('get_server_time')
        self.run_cmd('ping_db', [True])


def get_parser():
    """Returns the argparse parser."""
    parser = argparse.ArgumentParser(description=__doc__)

    parser.add_argument('-a', '--afe', action='append', default=[],
                        help='Autotest FrontEnd server to monitor')

    parser.add_argument('-p', '--poll-period', type=int, default=60,
                        help='Frequency to poll AFE servers, in seconds')

    parser.add_argument('--no-shards', action='store_false',
                        dest='with_shards',
                        help='Disable shard updating')

    return parser


def main(argv):
    """Main function.

    @param argv: command-line arguments passed to the script
    """
    parser = get_parser()
    options = parser.parse_args(argv[1:])

    if not options.afe:
        options.afe = [global_config.global_config.get_config_value(
                'SERVER', 'global_afe_hostname', default='cautotest')]

    with ts_mon_config.SetupTsMonGlobalState('rpc_flight_recorder',
                                             indirect=True):
        flight_recorder = RpcFlightRecorder(options.afe,
                                            with_shards=options.with_shards,
                                            poll_period=options.poll_period)

        flight_recorder.start()
        flight_recorder.join()


if __name__ == '__main__':
    main(sys.argv)
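
# Example invocations (hostnames are illustrative; the script path depends on
# your checkout):
#   python3 rpc_flight_recorder.py -a cautotest -p 30
#   python3 rpc_flight_recorder.py -a cautotest --no-shards
# With no -a flag, the AFE hostname falls back to the SERVER/global_afe_hostname
# value from the global config (default 'cautotest').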