#!/usr/bin/env python2

# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Load generator for devserver.

Example usage:

# Find DUTs in suites pool to test with:
atest host list -b 'pool:suites,board:BOARD' --unlocked -s Ready

# Lock DUTs:
atest host mod -l -r 'quick provision testing' DUT1 DUT2

# Create config file with DUTs to test and builds to use.
cat >config.json <<EOD
{
  "BOARD": {
    "duts": [
      "chromeosX-rowY-rackZ-hostA",
      "chromeosX-rowY-rackZ-hostB"
    ],
    "versions": [
      "auron_paine-paladin/R65-10208.0.0-rc2",
      "auron_paine-paladin/R65-10208.0.0-rc3",
      "auron_paine-paladin/R65-10209.0.0-rc1"
    ]
  }
}
EOD

# Do 100 total provisions, aiming to have 10 active simultaneously.
loadtest.py $DS config.json --simultaneous 10 --total 100

# Unlock DUTs:
atest host mod -u DUT1 DUT2
"""

import collections
import datetime
import json
import random
import re
import signal
import subprocess
import sys
import time

import common
from autotest_lib.client.common_lib import time_utils
from autotest_lib.client.common_lib.cros import dev_server
from chromite.lib import commandline
from chromite.lib import cros_logging as logging
from chromite.lib import locking
from chromite.lib import parallel

# Payloads to stage.
PAYLOADS = ['quick_provision', 'stateful']

# Number of seconds between full status checks.
STATUS_POLL_SECONDS = 2

# Number of successes/failures to blacklist a DUT.
BLACKLIST_CONSECUTIVE_FAILURE = 2
BLACKLIST_TOTAL_SUCCESS = 0
BLACKLIST_TOTAL_FAILURE = 5


def get_parser():
    """Creates the argparse parser.

    @return A commandline.ArgumentParser populated with the load test options.
    """
    parser = commandline.ArgumentParser(description=__doc__)
    parser.add_argument('server', type=str, action='store',
                        help='Devserver to load test.')
    parser.add_argument('config', type=str, action='store',
                        help='Path to JSON config file. '
                             'Config file is indexed by board with keys of '
                             '"duts" and "versions", each a list.')
    parser.add_argument('--blacklist-consecutive', '-C', type=int,
                        action='store',
                        help=('Consecutive number of failures before '
                              'blacklisting DUT (default %d, 0 disables).') %
                        BLACKLIST_CONSECUTIVE_FAILURE,
                        default=BLACKLIST_CONSECUTIVE_FAILURE)
    parser.add_argument('--blacklist-success', '-S', type=int, action='store',
                        help=('Total number of successes before blacklisting '
                              'DUT (default %d, 0 disables).') %
                        BLACKLIST_TOTAL_SUCCESS,
                        default=BLACKLIST_TOTAL_SUCCESS)
    parser.add_argument('--blacklist-total', '-T', type=int, action='store',
                        help=('Total number of failures before blacklisting '
                              'DUT (default %d, 0 disables).') %
                        BLACKLIST_TOTAL_FAILURE,
                        default=BLACKLIST_TOTAL_FAILURE)
    parser.add_argument('--boards', '-b', type=str, action='store',
                        help='Comma-separated list of boards to provision.')
    parser.add_argument('--dryrun', '-n', action='store_true', dest='dryrun',
                        help='Do not attempt to provision.')
    parser.add_argument('--duts', '-d', type=str, action='store',
                        help='Comma-separated list of duts to provision.')
    parser.add_argument('--outputlog', '-l', type=str, action='store',
                        help='Path to append JSON entries to.')
    parser.add_argument('--output', '-o', type=str, action='store',
                        help='Path to write JSON file to.')
    parser.add_argument('--ping', '-p', action='store_true',
                        help='Ping DUTs and blacklist unresponsive ones.')
    parser.add_argument('--simultaneous', '-s', type=int, action='store',
                        help='Number of simultaneous provisions to run.',
                        default=1)
    parser.add_argument('--no-stage', action='store_false',
                        dest='stage', default=True,
                        help='Do not attempt to stage builds.')
    parser.add_argument('--total', '-t', type=int, action='store',
                        help='Number of total provisions to run.',
                        default=0)
    return parser


def make_entry(entry_id, name, status, start_time,
               finish_time=None, parent=None, **kwargs):
    """Generate an event log entry to be stored in Cloud Datastore.

    Extra keyword arguments are accepted but are not added to the entry.

    @param entry_id: A (Kind, id) tuple representing the key.
    @param name: A string identifying the event
    @param status: A string identifying the status of the event.
    @param start_time: A datetime of the start of the event.
    @param finish_time: A datetime of the finish of the event.
    @param parent: A (Kind, id) tuple representing the parent key.

    @return A dictionary representing the entry suitable for dumping via JSON.
    """
    entry = {
        'id': entry_id,
        'name': name,
        'status': status,
        'start_time': time_utils.to_epoch_time(start_time),
    }
    # Optional keys are only present when a value was supplied.
    if finish_time is not None:
        entry['finish_time'] = time_utils.to_epoch_time(finish_time)
    if parent is not None:
        entry['parent'] = parent
    return entry


class Job(object):
    """Tracks a single provision job."""

    def __init__(self, ds, host_name, build_name,
                 entry_id=0, parent=None, board=None,
                 start_active=0,
                 force_update=False, full_update=False,
                 clobber_stateful=True, quick_provision=True,
                 ping=False, dryrun=False):
        """Record the job parameters and, unless dryrun, start the provision.

        @param ds: Devserver object used to trigger and poll the provision.
        @param host_name: Name of the DUT to provision.
        @param build_name: Name of the build to provision onto the DUT.
        @param entry_id: Numeric id; stored as the key ('Job', entry_id).
        @param parent: A (Kind, id) tuple identifying the parent entry.
        @param board: Board type of the DUT.
        @param start_active: Number of active provisions when this one began.
        @param force_update: Forwarded to the devserver calls.
        @param full_update: Forwarded to the devserver calls.
        @param clobber_stateful: Forwarded to the devserver calls.
        @param quick_provision: Forwarded to the devserver calls.
        @param ping: If True, ping the DUT before and after the provision.
        @param dryrun: If True, mark the job as finished and successful
                       immediately without contacting the devserver.
        """
        self.ds = ds
        self.host_name = host_name
        self.build_name = build_name

        self.entry_id = ('Job', entry_id)
        self.parent = parent
        self.board = board
        self.start_active = start_active
        self.end_active = None
        self.check_active_sum = 0
        self.check_active_count = 0

        self.start_time = datetime.datetime.now()
        self.finish_time = None
        self.trigger_response = None

        self.ping = ping
        self.pre_ping = None
        self.post_ping = None

        # Outcome fields. They are given defaults here so that as_entry() and
        # logging do not hit AttributeError on a job that has not finished.
        self.raised_error = None
        self.success = None
        self.pid = None

        # Keyword arguments shared by the trigger and the status-check calls.
        self.kwargs = {
            'host_name': host_name,
            'build_name': build_name,
            'force_update': force_update,
            'full_update': full_update,
            'clobber_stateful': clobber_stateful,
            'quick_provision': quick_provision,
        }

        if dryrun:
            self.finish_time = datetime.datetime.now()
            self.raised_error = None
            self.success = True
            self.pid = 0
        else:
            if self.ping:
                self.pre_ping = ping_dut(self.host_name)
            self.trigger_response = ds._trigger_auto_update(**self.kwargs)

    def as_entry(self):
        """Generate an entry for exporting to datastore.

        @return A dictionary describing this job; status is 'pass' when the
                job succeeded and 'fail' otherwise.
        """
        entry = make_entry(self.entry_id, self.host_name,
                           'pass' if self.success else 'fail',
                           self.start_time, self.finish_time, self.parent)
        entry.update({
            'build_name': self.build_name,
            'board': self.board,
            'devserver': self.ds.hostname,
            'start_active': self.start_active,
            'end_active': self.end_active,
            'force_update': self.kwargs['force_update'],
            'full_update': self.kwargs['full_update'],
            'clobber_stateful': self.kwargs['clobber_stateful'],
            'quick_provision': self.kwargs['quick_provision'],
            'elapsed': int(self.elapsed().total_seconds()),
            'trigger_response': self.trigger_response,
            'pre_ping': self.pre_ping,
            'post_ping': self.post_ping,
        })
        # The average is only defined once check() has polled at least once.
        if self.check_active_count:
            entry['avg_active'] = (self.check_active_sum /
                                   self.check_active_count)
        return entry

    def check(self, active_count):
        """Checks if a job has completed.

        @param active_count: Number of active provisions at time of the check.
        @return: True if the job has completed, False otherwise.
        """
        if self.finish_time is not None:
            return True

        self.check_active_sum += active_count
        self.check_active_count += 1

        finished, raised_error, pid = self.ds.check_for_auto_update_finished(
            self.trigger_response, wait=False, **self.kwargs)
        if finished:
            self.finish_time = datetime.datetime.now()
            self.raised_error = raised_error
            self.success = self.raised_error is None
            self.pid = pid
            self.end_active = active_count
            if self.ping:
                self.post_ping = ping_dut(self.host_name)

        return finished

    def elapsed(self):
        """Determine the elapsed time of the task.

        @return A timedelta from the start to the finish time, or to now if
                the job has not finished.
        """
        finish_time = self.finish_time or datetime.datetime.now()
        return finish_time - self.start_time


class Runner(object):
    """Parallel provision load test runner."""

    def __init__(self, ds, duts, config, simultaneous=1, total=0,
                 outputlog=None, ping=False, blacklist_consecutive=None,
                 blacklist_success=None, blacklist_total=None, dryrun=False):
        """Set up runner state and log the runner entry if requested.

        @param ds: Devserver object used for staging and provisioning.
        @param duts: Dictionary mapping DUT host name to board type.
        @param config: Dictionary indexed by board; each value has a
                       'versions' list of builds.
        @param simultaneous: Target number of simultaneously active
                             provisions; 0 means no limit.
        @param total: Total number of provisions to run; 0 means no limit.
        @param outputlog: Open file object to append JSON entries to, or None.
        @param ping: If True, jobs ping the DUT before and after provisioning.
        @param blacklist_consecutive: Consecutive failures before a DUT is
                                      blacklisted; None or 0 disables.
        @param blacklist_success: Total successes before a DUT is
                                  blacklisted; None or 0 disables.
        @param blacklist_total: Total failures before a DUT is blacklisted;
                                None or 0 disables.
        @param dryrun: If True, jobs complete immediately without provisioning.
        """
        self.ds = ds
        self.duts = duts
        self.config = config
        self.start_time = datetime.datetime.now()
        self.finish_time = None
        self.simultaneous = simultaneous
        self.total = total
        self.outputlog = outputlog
        self.ping = ping
        self.blacklist_consecutive = blacklist_consecutive
        self.blacklist_success = blacklist_success
        self.blacklist_total = blacklist_total
        self.dryrun = dryrun

        self.active = []
        self.started = 0
        self.completed = []
        # Track DUTs which have failed multiple times.
        self.dut_blacklist = set()
        # Track versions of each DUT to provision in order.
        self.last_versions = {}

        # id for the parent entry.
        # TODO: This isn't the most unique.
        self.entry_id = ('Runner',
                         int(time_utils.to_epoch_time(datetime.datetime.now())))

        # ids for the job entries.
        self.next_id = 0

        if self.outputlog:
            dump_entries_as_json([self.as_entry()], self.outputlog)

    def signal_handler(self, signum, frame):
        """Signal handle to dump current status.

        @param signum: Number of the received signal.
        @param frame: Current stack frame (unused).
        """
        logging.info('Received signal %s', signum)
        if signum == signal.SIGUSR1:
            now = datetime.datetime.now()
            logging.info('%d active provisions, %d completed provisions, '
                         '%s elapsed:',
                         len(self.active), len(self.completed),
                         now - self.start_time)
            for job in self.active:
                logging.info('  %s -> %s, %s elapsed',
                             job.host_name, job.build_name,
                             now - job.start_time)

    def as_entry(self):
        """Generate an entry for exporting to datastore.

        @return A dictionary describing the runner itself.
        """
        entry = make_entry(self.entry_id, 'Runner', 'pass',
                           self.start_time, self.finish_time)
        entry.update({
            'devserver': self.ds.hostname,
        })
        return entry

    def get_completed_entries(self):
        """Retrieves all completed jobs as entries for datastore.

        @return A list with the runner entry followed by one entry per
                completed job.
        """
        entries = [self.as_entry()]
        entries.extend(job.as_entry() for job in self.completed)
        return entries

    def get_next_id(self):
        """Get the next Job id.

        @return The id to use for the next job; the counter is incremented.
        """
        entry_id = self.next_id
        self.next_id += 1
        return entry_id

    def spawn(self, host_name, build_name):
        """Spawn a single provision job.

        @param host_name: Name of the DUT to provision.
        @param build_name: Name of the build to provision.
        """
        job = Job(self.ds, host_name, build_name,
                  entry_id=self.get_next_id(), parent=self.entry_id,
                  board=self.get_dut_board_type(host_name),
                  start_active=len(self.active), ping=self.ping,
                  dryrun=self.dryrun)
        self.active.append(job)
        logging.info('Provision (%d) of %s to %s started',
                     job.entry_id[1], job.host_name, job.build_name)
        self.last_versions[host_name] = build_name
        self.started += 1

    def replenish(self):
        """Replenish the number of active provisions to match goals.

        @return True if the goals were met. When no idle DUT is available:
                False if a simultaneous target is set, otherwise whether any
                provision is still active.
        """
        while ((self.simultaneous == 0 or
                len(self.active) < self.simultaneous) and
               (self.total == 0 or self.started < self.total)):
            host_name = self.find_idle_dut()
            if host_name:
                build_name = self.find_build_for_dut(host_name)
                self.spawn(host_name, build_name)
            elif self.simultaneous:
                logging.warning('Insufficient DUTs to satisfy goal')
                return False
            else:
                return len(self.active) > 0
        return True

    def check_all(self):
        """Check the status of outstanding provisions.

        Finished jobs are logged, moved to the completed list, and may cause
        their DUT to be blacklisted.
        """
        still_active = []
        for job in self.active:
            if job.check(len(self.active)):
                logging.info('Provision (%d) of %s to %s %s in %s: %s',
                             job.entry_id[1], job.host_name, job.build_name,
                             'completed' if job.success else 'failed',
                             job.elapsed(), job.raised_error)
                entry = job.as_entry()
                logging.debug(json.dumps(entry))
                if self.outputlog:
                    dump_entries_as_json([entry], self.outputlog)
                self.completed.append(job)
                if self.should_blacklist(job.host_name):
                    logging.error('Blacklisting DUT %s', job.host_name)
                    self.dut_blacklist.add(job.host_name)
            else:
                still_active.append(job)
        self.active = still_active

    @staticmethod
    def _threshold_reached(count, threshold):
        """Report whether count has reached an enabled threshold.

        @param count: Current count to compare.
        @param threshold: Limit to compare against; None or a non-positive
                          value means the criterion is disabled.
        @return True if the threshold is enabled and count has reached it.
        """
        return threshold is not None and threshold > 0 and count >= threshold

    def should_blacklist(self, host_name):
        """Determines if a given DUT should be blacklisted.

        Walks the completed jobs for the DUT in completion order. A threshold
        of None or 0 disables the corresponding criterion, so the default
        success threshold of 0 never blacklists a DUT for succeeding.

        @param host_name: Name of the DUT to evaluate.
        @return True if any enabled threshold has been reached.
        """
        total = 0
        consecutive = 0
        successes = 0
        for job in self.completed:
            if job.host_name != host_name:
                continue
            if not job.success:
                total += 1
                consecutive += 1
                if (self._threshold_reached(total, self.blacklist_total) or
                    self._threshold_reached(consecutive,
                                            self.blacklist_consecutive)):
                    return True
            else:
                successes += 1
                if self._threshold_reached(successes, self.blacklist_success):
                    return True
                # A success ends the current run of consecutive failures.
                consecutive = 0
        return False

    def find_idle_dut(self):
        """Find an idle DUT to provision.

        @return A randomly chosen DUT that is neither active nor blacklisted,
                or None if there is no such DUT.
        """
        unavailable = {job.host_name for job in self.active}
        unavailable |= self.dut_blacklist
        idle_duts = [d for d in self.duts if d not in unavailable]
        return random.choice(idle_duts) if idle_duts else None

    def get_dut_board_type(self, host_name):
        """Determine the board type of a DUT.

        @param host_name: Name of the DUT.
        @return The board type recorded for the DUT.
        """
        return self.duts[host_name]

    def get_board_versions(self, board):
        """Determine the versions to provision for a board.

        @param board: Board type to look up in the config.
        @return The list of versions configured for the board.
        """
        return self.config[board]['versions']

    def find_build_for_dut(self, host_name):
        """Determine a build to provision on a DUT.

        Versions are cycled in config order, starting after the version last
        provisioned on the DUT.

        @param host_name: Name of the DUT.
        @return The name of the next build to provision.
        """
        board = self.get_dut_board_type(host_name)
        versions = self.get_board_versions(board)
        last_version = self.last_versions.get(host_name)
        try:
            last_index = versions.index(last_version)
        except ValueError:
            # Nothing provisioned yet (or an unknown version): start over.
            return versions[0]
        return versions[(last_index + 1) % len(versions)]

    def stage(self, build):
        """Stage the payloads for a single build on the devserver.

        @param build: Name of the build to stage.
        """
        logging.debug('Staging %s', build)
        self.ds.stage_artifacts(build, PAYLOADS)

    def stage_all(self):
        """Stage all necessary artifacts."""
        boards = set(self.duts.values())
        logging.info('Staging for %d boards', len(boards))
        funcs = []
        for board in boards:
            for build in self.get_board_versions(board):
                # Bind build as a default to avoid late-binding in the lambda.
                funcs.append(lambda build_=build: self.stage(build_))
        parallel.RunParallelSteps(funcs)

    def loop(self):
        """Run the main provision loop.

        @return True if the requested total was completed, False if the loop
                could not replenish with no active provisions or was
                interrupted by KeyboardInterrupt.
        """
        # Install a signal handler for status updates.
        old_handler = signal.signal(signal.SIGUSR1, self.signal_handler)
        signal.siginterrupt(signal.SIGUSR1, False)

        try:
            while True:
                self.check_all()
                if self.total != 0 and len(self.completed) >= self.total:
                    break
                if not self.replenish() and len(self.active) == 0:
                    logging.error('Unable to replenish with no active '
                                  'provisions')
                    return False
                logging.debug('%d provisions active', len(self.active))
                time.sleep(STATUS_POLL_SECONDS)
            return True
        except KeyboardInterrupt:
            return False
        finally:
            self.finish_time = datetime.datetime.now()
            # Clean up signal handler.
            signal.signal(signal.SIGUSR1, old_handler)

    def elapsed(self):
        """Determine the elapsed time of the task.

        @return A timedelta from the start to the finish time, or to now if
                the runner has not finished.
        """
        finish_time = self.finish_time or datetime.datetime.now()
        return finish_time - self.start_time


def dump_entries_as_json(entries, output_file):
    """Dump event log entries as json to a file.

    Each entry is written as one JSON object per line and the file is
    flushed afterwards.

    @param entries: A list of event log entries to dump.
    @param output_file: The file to write to.
    """
    # Write the entries out as JSON.
    logging.debug('Dumping %d entries', len(entries))
    for e in entries:
        json.dump(e, output_file, sort_keys=True)
        output_file.write('\n')
    output_file.flush()


def ping_dut(hostname):
    """Checks if a host is responsive to pings.

    @param hostname: Host to ping; lab-style names get '.cros' appended.
    @return True if ping exited with status 0, False otherwise.
    """
    if re.match(r'^chromeos\d+-row\d+-rack\d+-host\d+$', hostname):
        hostname += '.cros'

    # communicate() drains the pipe, so the child cannot block on a full
    # stdout buffer the way it can with subprocess.call(stdout=PIPE).
    proc = subprocess.Popen(["/bin/ping", "-c1", "-w2", hostname],
                            stdout=subprocess.PIPE)
    proc.communicate()
    return proc.returncode == 0


def main(argv):
    """Load generator for a devserver.

    @param argv: Command line arguments, excluding the program name.
    """
    parser = get_parser()
    options = parser.parse_args(argv)

    # Parse devserver.
    if options.server:
        if re.match(r'^https?://', options.server):
            server = options.server
        else:
            server = 'http://%s/' % options.server
        ds = dev_server.ImageServer(server)
    else:
        parser.print_usage()
        logging.error('Must specify devserver')
        sys.exit(1)

    # Parse config file and determine master list of duts and their board type,
    # filtering by board type if specified.
    duts = {}
    if options.config:
        with open(options.config, 'r') as f:
            config = json.load(f)
            boards = (options.boards.split(',')
                      if options.boards else config.keys())
            duts_specified = (set(options.duts.split(','))
                              if options.duts else None)
            for board in boards:
                duts.update({dut: board for dut in config[board]['duts']
                             if duts_specified is None or
                             dut in duts_specified})
        logging.info('Config file %s: %d boards, %d duts',
                     options.config, len(boards), len(duts))
    else:
        parser.print_usage()
        logging.error('Must specify config file')
        sys.exit(1)

    if options.ping:
        logging.info('Performing ping tests')
        duts_alive = {}
        for dut, board in duts.items():
            if ping_dut(dut):
                duts_alive[dut] = board
            else:
                logging.error('Ignoring DUT %s (%s) for failing initial '
                              'ping check', dut, board)
        duts = duts_alive
        logging.info('After ping tests: %d boards, %d duts', len(boards),
                     len(duts))

    # Set up the test runner and stage all the builds.
    outputlog = open(options.outputlog, 'a') if options.outputlog else None
    try:
        runner = Runner(ds, duts, config,
                        simultaneous=options.simultaneous,
                        total=options.total,
                        outputlog=outputlog, ping=options.ping,
                        blacklist_consecutive=options.blacklist_consecutive,
                        blacklist_success=options.blacklist_success,
                        blacklist_total=options.blacklist_total,
                        dryrun=options.dryrun)
        if options.stage:
            runner.stage_all()

        # Run all the provisions.
        with locking.FileLock(options.config, blocking=True).lock():
            completed = runner.loop()
    finally:
        # The log is only written during setup and the loop; close it even
        # if staging or provisioning raised.
        if outputlog:
            outputlog.close()
    logging.info('%s in %s', 'Completed' if completed else 'Interrupted',
                 runner.elapsed())
    # Write all entries as JSON.
    entries = runner.get_completed_entries()
    if options.output:
        with open(options.output, 'w') as f:
            dump_entries_as_json(entries, f)
    else:
        dump_entries_as_json(entries, sys.stdout)
    logging.info('Summary: %s',
                 dict(collections.Counter([e['status'] for e in entries
                                           if e['name'] != 'Runner'])))

    # List blacklisted DUTs.
    if runner.dut_blacklist:
        logging.warning('Blacklisted DUTs:')
        for host_name in runner.dut_blacklist:
            logging.warning('  %s', host_name)


if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]))