#!/usr/bin/env python2
# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Standalone service to monitor AFE servers and report to ts_mon"""

import argparse
import logging
import multiprocessing
import sys
import time

from six.moves import urllib

import common
from autotest_lib.client.common_lib import global_config
from autotest_lib.frontend.afe.json_rpc import proxy
from autotest_lib.server import frontend
# import needed to setup host_attributes
# pylint: disable=unused-import
from autotest_lib.server import site_host_attributes
from autotest_lib.site_utils import server_manager_utils
from chromite.lib import metrics
from chromite.lib import ts_mon_config

METRIC_ROOT = 'chromeos/autotest/blackbox/afe_rpc'
METRIC_RPC_CALL_DURATIONS = METRIC_ROOT + '/rpc_call_durations'
METRIC_TICK = METRIC_ROOT + '/tick'
METRIC_MONITOR_ERROR = METRIC_ROOT + '/afe_monitor_error'

# Maps known exception types to the short reason string reported to ts_mon.
FAILURE_REASONS = {
    proxy.JSONRPCException: 'JSONRPCException',
}


def afe_rpc_call(hostname):
    """Perform one rpc call set on server.

    Any exception is swallowed (after bumping the error counter and
    logging) so that one bad server cannot kill the polling pool.

    @param hostname: server's hostname to poll
    """
    try:
        AfeMonitor(hostname).run()
    except Exception:
        metrics.Counter(METRIC_MONITOR_ERROR).increment(
            fields={'target_hostname': hostname})
        logging.exception('Exception when running against host %s', hostname)


def update_shards(shards, shards_lock, period=600, stop_event=None):
    """Periodically refresh the shared list of shards in production.

    Mutates `shards` in place (it is a multiprocessing.Manager list shared
    with the polling process), only ever while holding `shards_lock`.

    @param shards: list of shards to be updated
    @param shards_lock: shared lock for accessing shards
    @param period: time between polls, in seconds
    @param stop_event: Event that can be set to stop polling
    """
    while not stop_event or not stop_event.is_set():
        start_time = time.time()

        logging.debug('Updating Shards')
        new_shards = set(server_manager_utils.get_shards())

        with shards_lock:
            current_shards = set(shards)
            rm_shards = current_shards - new_shards
            add_shards = new_shards - current_shards

            if rm_shards:
                for s in rm_shards:
                    shards.remove(s)

            if add_shards:
                shards.extend(add_shards)

        # Log outside the lock; we only read the local diff sets here.
        if rm_shards:
            logging.info('Servers left production: %s', str(rm_shards))

        if add_shards:
            logging.info('Servers entered production: %s', str(add_shards))

        wait_time = (start_time + period) - time.time()
        if wait_time > 0:
            time.sleep(wait_time)


def poll_rpc_servers(servers, servers_lock, shards=None, period=60,
                     stop_event=None):
    """Blocking function that polls all servers and shards.

    @param servers: list of servers to poll
    @param servers_lock: lock to be used when accessing servers or shards
    @param shards: list of shards to poll (may be None for no shards)
    @param period: time between polls, in seconds
    @param stop_event: Event that can be set to stop polling
    """
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count() * 4)

    try:
        while not stop_event or not stop_event.is_set():
            start_time = time.time()
            with servers_lock:
                # `shards or []` guards the documented shards=None default,
                # which would otherwise make set.union raise TypeError.
                all_servers = set(servers).union(shards or [])

            logging.debug('Starting Server Polling: %s',
                          ', '.join(all_servers))
            pool.map(afe_rpc_call, all_servers)

            logging.debug('Finished Server Polling')

            metrics.Counter(METRIC_TICK).increment()

            wait_time = (start_time + period) - time.time()
            if wait_time > 0:
                time.sleep(wait_time)
    finally:
        # Reap the worker processes instead of leaking them on shutdown.
        pool.close()
        pool.join()


class RpcFlightRecorder(object):
    """Monitors a list of AFE servers from dedicated child processes."""

    def __init__(self, servers, with_shards=True, poll_period=60):
        """
        @param servers: list of afe services to monitor
        @param with_shards: also record status on shards
        @param poll_period: frequency to poll all services, in seconds
        """
        self._manager = multiprocessing.Manager()

        self._poll_period = poll_period

        # Manager proxies so the shard-updater and poller processes share
        # one live view of the server/shard lists.
        self._servers = self._manager.list(servers)
        self._servers_lock = self._manager.RLock()

        self._with_shards = with_shards
        self._shards = self._manager.list()
        self._update_shards_ps = None
        self._poll_rpc_server_ps = None

        self._stop_event = multiprocessing.Event()

    def start(self):
        """Call to start recorder"""
        if self._with_shards:
            shard_args = [self._shards, self._servers_lock]
            shard_kwargs = {'stop_event': self._stop_event}
            self._update_shards_ps = multiprocessing.Process(
                name='update_shards',
                target=update_shards,
                args=shard_args,
                kwargs=shard_kwargs)

            self._update_shards_ps.start()

        poll_args = [self._servers, self._servers_lock]
        poll_kwargs = {'shards': self._shards,
                       'period': self._poll_period,
                       'stop_event': self._stop_event}
        self._poll_rpc_server_ps = multiprocessing.Process(
            name='poll_rpc_servers',
            target=poll_rpc_servers,
            args=poll_args,
            kwargs=poll_kwargs)

        self._poll_rpc_server_ps.start()

    def close(self):
        """Send close event to all sub processes"""
        self._stop_event.set()

    def join(self, timeout=None):
        """Blocking call until closed and processes complete.

        @param timeout: passed to each process, so could be >timeout
        """
        if self._poll_rpc_server_ps:
            self._poll_rpc_server_ps.join(timeout)

        if self._update_shards_ps:
            self._update_shards_ps.join(timeout)


def _failed(fields, msg_str, reason, err=None):
    """Mark current run failed.

    @param fields: ts_mon fields to mark as failed
    @param msg_str: message string to be filled
    @param reason: why it failed
    @param err: optional error to log more debug info
    """
    fields['success'] = False
    fields['failure_reason'] = reason
    logging.warning("%s failed - %s", msg_str, reason)
    if err:
        logging.debug("%s fail_err - %s", msg_str, str(err))


class AfeMonitor(object):
    """Object that runs rpc calls against the given afe frontend"""

    def __init__(self, hostname):
        """
        @param hostname: hostname of server to monitor, string
        """
        self._hostname = hostname
        self._afe = frontend.AFE(server=self._hostname)
        self._metric_fields = {'target_hostname': self._hostname}

    def run_cmd(self, cmd, expected=None):
        """Runs rpc command and log metrics.

        @param cmd: string of rpc command to send
        @param expected: expected result of rpc; when given, a mismatched
                response is recorded as 'IncorrectResponse'
        """
        metric_fields = self._metric_fields.copy()
        metric_fields['command'] = cmd
        metric_fields['success'] = True
        metric_fields['failure_reason'] = ''

        with metrics.SecondsTimer(METRIC_RPC_CALL_DURATIONS,
                                  fields=dict(metric_fields),
                                  scale=0.001) as f:

            msg_str = "%s:%s" % (self._hostname, cmd)

            try:
                result = self._afe.run(cmd)
                logging.debug("%s result = %s", msg_str, result)
                if expected is not None and expected != result:
                    _failed(f, msg_str, 'IncorrectResponse')

            except urllib.error.HTTPError as e:
                _failed(f, msg_str, 'HTTPError:%d' % e.code)

            except Exception as e:
                _failed(f, msg_str, FAILURE_REASONS.get(type(e), 'Unknown'),
                        err=e)

                # Only the known failure types are swallowed; anything
                # unexpected propagates to the caller after being recorded.
                if type(e) not in FAILURE_REASONS:
                    raise

            if f['success']:
                logging.info("%s success", msg_str)

    def run(self):
        """Tests server and returns the result"""
        self.run_cmd('get_server_time')
        self.run_cmd('ping_db', [True])


def get_parser():
    """Returns argparse parser"""
    parser = argparse.ArgumentParser(description=__doc__)

    parser.add_argument('-a', '--afe', action='append', default=[],
                        help='Autotest FrontEnd server to monitor')

    parser.add_argument('-p', '--poll-period', type=int, default=60,
                        help='Frequency to poll AFE servers')

    parser.add_argument('--no-shards', action='store_false',
                        dest='with_shards',
                        help='Disable shard updating')

    return parser


def main(argv):
    """Main function.

    @param argv: commandline arguments passed
    """
    parser = get_parser()
    options = parser.parse_args(argv[1:])

    if not options.afe:
        options.afe = [global_config.global_config.get_config_value(
            'SERVER', 'global_afe_hostname', default='cautotest')]

    with ts_mon_config.SetupTsMonGlobalState('rpc_flight_recorder',
                                             indirect=True):
        flight_recorder = RpcFlightRecorder(
            options.afe,
            with_shards=options.with_shards,
            poll_period=options.poll_period)

        flight_recorder.start()
        flight_recorder.join()


if __name__ == '__main__':
    main(sys.argv)