#!/usr/bin/env python2

# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Load generator for devserver.

Example usage:

# Find DUTs in suites pool to test with:
atest host list -b 'pool:suites,board:BOARD' --unlocked -s Ready

# Lock DUTs:
atest host mod -l -r 'quick provision testing' DUT1 DUT2

# Create config file with DUTs to test and builds to use.
cat >config.json <<EOD
{
  "BOARD": {
    "duts": [
      "chromeosX-rowY-rackZ-hostA",
      "chromeosX-rowY-rackZ-hostB"
    ],
    "versions": [
      "auron_paine-paladin/R65-10208.0.0-rc2",
      "auron_paine-paladin/R65-10208.0.0-rc3",
      "auron_paine-paladin/R65-10209.0.0-rc1"
    ]
  }
}
EOD

# Do 100 total provisions, aiming to have 10 active simultaneously.
loadtest.py $DS config.json --simultaneous 10 --total 100

# Unlock DUTs:
atest host mod -u DUT1 DUT2
"""

import argparse
import collections
import datetime
import json
import logging
import os
import random
import re
import signal
import subprocess
import sys
import time

# NOTE(review): `common` is not referenced by name in this file; it is
# presumably imported for its side effects before the autotest_lib imports.
# Keep it first -- TODO confirm.
import common
from autotest_lib.client.common_lib import time_utils
from autotest_lib.client.common_lib.cros import dev_server
from chromite.lib import locking
from chromite.lib import parallel

# Payloads to stage.
PAYLOADS = ['quick_provision', 'stateful']

# Number of seconds between full status checks.
STATUS_POLL_SECONDS = 2

# Number of successes/failures to blacklist a DUT.
BLACKLIST_CONSECUTIVE_FAILURE = 2
# 0 disables blacklisting on success.
BLACKLIST_TOTAL_SUCCESS = 0
BLACKLIST_TOTAL_FAILURE = 5


def get_parser():
    """Creates the argparse parser.

    The module docstring is used verbatim as the description, so the raw
    formatter is selected to keep the multi-line usage example readable.

    @return An argparse.ArgumentParser for the command line of this script.
    """
    parser = argparse.ArgumentParser(
            description=__doc__,
            formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument('server', type=str, action='store',
                        help='Devserver to load test.')
    parser.add_argument('config', type=str, action='store',
                        help='Path to JSON config file. '
                             'Config file is indexed by board with keys of '
                             '"duts" and "versions", each a list.')
    parser.add_argument('--blacklist-consecutive', '-C', type=int,
                        action='store',
                        help=('Consecutive number of failures before '
                              'blacklisting DUT (default %d).') %
                        BLACKLIST_CONSECUTIVE_FAILURE,
                        default=BLACKLIST_CONSECUTIVE_FAILURE)
    parser.add_argument('--blacklist-success', '-S', type=int, action='store',
                        help=('Total number of successes before blacklisting '
                              'DUT (default %d; 0 disables).') %
                        BLACKLIST_TOTAL_SUCCESS,
                        default=BLACKLIST_TOTAL_SUCCESS)
    parser.add_argument('--blacklist-total', '-T', type=int, action='store',
                        help=('Total number of failures before blacklisting '
                              'DUT (default %d).') % BLACKLIST_TOTAL_FAILURE,
                        default=BLACKLIST_TOTAL_FAILURE)
    parser.add_argument('--boards', '-b', type=str, action='store',
                        help='Comma-separated list of boards to provision.')
    parser.add_argument('--dryrun', '-n', action='store_true', dest='dryrun',
                        help='Do not attempt to provision.')
    parser.add_argument('--duts', '-d', type=str, action='store',
                        help='Comma-separated list of duts to provision.')
    parser.add_argument('--outputlog', '-l', type=str, action='store',
                        help='Path to append JSON entries to.')
    parser.add_argument('--output', '-o', type=str, action='store',
                        help='Path to write JSON file to.')
    parser.add_argument('--ping', '-p', action='store_true',
                        help='Ping DUTs and blacklist unresponsive ones.')
    parser.add_argument('--simultaneous', '-s', type=int, action='store',
                        help='Number of simultaneous provisions to run.',
                        default=1)
    parser.add_argument('--no-stage', action='store_false',
                        dest='stage', default=True,
                        help='Do not attempt to stage builds.')
    parser.add_argument('--total', '-t', type=int, action='store',
                        help='Number of total provisions to run.',
                        default=0)
    return parser


def make_entry(entry_id, name, status, start_time,
               finish_time=None, parent=None, **kwargs):
    """Generate an event log entry to be stored in Cloud Datastore.

    @param entry_id: A (Kind, id) tuple representing the key.
    @param name: A string identifying the event
    @param status: A string identifying the status of the event.
    @param start_time: A datetime of the start of the event.
    @param finish_time: A datetime of the finish of the event.
    @param parent: A (Kind, id) tuple representing the parent key.
    @param kwargs: Accepted for call compatibility; currently ignored.

    @return A dictionary representing the entry suitable for dumping via JSON.
    """
    entry = {
        'id': entry_id,
        'name': name,
        'status': status,
        'start_time': time_utils.to_epoch_time(start_time),
    }
    # Optional keys are only emitted when a value is available.
    if finish_time is not None:
        entry['finish_time'] = time_utils.to_epoch_time(finish_time)
    if parent is not None:
        entry['parent'] = parent
    return entry


class Job(object):
    """Tracks a single provision job."""

    def __init__(self, ds, host_name, build_name,
                 entry_id=0, parent=None, board=None,
                 start_active=0,
                 force_update=False, full_update=False,
                 clobber_stateful=True, quick_provision=True,
                 ping=False, dryrun=False):
        """Record the job parameters and, unless dryrun, trigger the update.

        @param ds: Devserver object used to trigger and poll the update.
        @param host_name: Name of the DUT to provision.
        @param build_name: Name of the build to provision.
        @param entry_id: Numeric id; stored as the ('Job', entry_id) key.
        @param parent: A (Kind, id) tuple representing the parent key.
        @param board: Board type of the DUT.
        @param start_active: Number of active provisions when this started.
        @param force_update: Forwarded to the devserver update calls.
        @param full_update: Forwarded to the devserver update calls.
        @param clobber_stateful: Forwarded to the devserver update calls.
        @param quick_provision: Forwarded to the devserver update calls.
        @param ping: If True, ping the DUT before and after provisioning.
        @param dryrun: If True, mark the job as an immediate success without
                       contacting the devserver.
        """
        self.ds = ds
        self.host_name = host_name
        self.build_name = build_name

        self.entry_id = ('Job', entry_id)
        self.parent = parent
        self.board = board
        self.start_active = start_active
        self.end_active = None
        self.check_active_sum = 0
        self.check_active_count = 0

        self.start_time = datetime.datetime.now()
        self.finish_time = None
        self.trigger_response = None

        # Result fields are filled in on completion; give them defaults so
        # reading them on an unfinished job does not raise AttributeError.
        self.raised_error = None
        self.success = None
        self.pid = None

        self.ping = ping
        self.pre_ping = None
        self.post_ping = None

        # Arguments shared by the trigger and the status-check calls.
        self.kwargs = {
            'host_name': host_name,
            'build_name': build_name,
            'force_update': force_update,
            'full_update': full_update,
            'clobber_stateful': clobber_stateful,
            'quick_provision': quick_provision,
        }

        if dryrun:
            # A dry run completes immediately and successfully.
            self.finish_time = datetime.datetime.now()
            self.raised_error = None
            self.success = True
            self.pid = 0
        else:
            if self.ping:
                self.pre_ping = ping_dut(self.host_name)
            self.trigger_response = ds._trigger_auto_update(**self.kwargs)

    def as_entry(self):
        """Generate an entry for exporting to datastore.

        @return A dictionary describing this job, built on make_entry().
        """
        entry = make_entry(self.entry_id, self.host_name,
                           'pass' if self.success else 'fail',
                           self.start_time, self.finish_time, self.parent)
        entry.update({
            'build_name': self.build_name,
            'board': self.board,
            'devserver': self.ds.hostname,
            'start_active': self.start_active,
            'end_active': self.end_active,
            'force_update': self.kwargs['force_update'],
            'full_update': self.kwargs['full_update'],
            'clobber_stateful': self.kwargs['clobber_stateful'],
            'quick_provision': self.kwargs['quick_provision'],
            'elapsed': int(self.elapsed().total_seconds()),
            'trigger_response': self.trigger_response,
            'pre_ping': self.pre_ping,
            'post_ping': self.post_ping,
        })
        if self.check_active_count:
            # Force float division: under Python 2 int / int truncates, which
            # would silently round the average down.
            entry['avg_active'] = (float(self.check_active_sum) /
                                   self.check_active_count)
        return entry

    def check(self, active_count):
        """Checks if a job has completed.

        @param active_count: Number of active provisions at time of the check.
        @return: True if the job has completed, False otherwise.
        """
        if self.finish_time is not None:
            return True

        # Accumulate samples so as_entry() can report the average load.
        self.check_active_sum += active_count
        self.check_active_count += 1

        finished, raised_error, pid = self.ds.check_for_auto_update_finished(
                self.trigger_response, wait=False, **self.kwargs)
        if finished:
            self.finish_time = datetime.datetime.now()
            self.raised_error = raised_error
            self.success = self.raised_error is None
            self.pid = pid
            self.end_active = active_count
            if self.ping:
                self.post_ping = ping_dut(self.host_name)

        return finished

    def elapsed(self):
        """Determine the elapsed time of the task.

        @return A timedelta from start to finish, or from start to now if the
                job has not finished.
        """
        finish_time = self.finish_time or datetime.datetime.now()
        return finish_time - self.start_time


class Runner(object):
    """Parallel provision load test runner."""

    def __init__(self, ds, duts, config, simultaneous=1, total=0,
                 outputlog=None, ping=False, blacklist_consecutive=None,
                 blacklist_success=None, blacklist_total=None, dryrun=False):
        """Set up runner state and log the initial runner entry.

        @param ds: Devserver object to load test.
        @param duts: Dictionary mapping DUT host name to board type.
        @param config: Parsed config, indexed by board, with a "versions"
                       list per board.
        @param simultaneous: Target number of simultaneous provisions;
                             0 means no limit.
        @param total: Total number of provisions to run; 0 means no limit.
        @param outputlog: Open file object to append JSON entries to, or None.
        @param ping: If True, jobs ping their DUT before and after.
        @param blacklist_consecutive: Consecutive failures before a DUT is
                                      blacklisted, or None to disable.
        @param blacklist_success: Total successes before a DUT is
                                  blacklisted; 0 or None disables.
        @param blacklist_total: Total failures before a DUT is blacklisted,
                                or None to disable.
        @param dryrun: If True, jobs do not contact the devserver.
        """
        self.ds = ds
        self.duts = duts
        self.config = config
        self.start_time = datetime.datetime.now()
        self.finish_time = None
        self.simultaneous = simultaneous
        self.total = total
        self.outputlog = outputlog
        self.ping = ping
        self.blacklist_consecutive = blacklist_consecutive
        self.blacklist_success = blacklist_success
        self.blacklist_total = blacklist_total
        self.dryrun = dryrun

        self.active = []
        self.started = 0
        self.completed = []
        # Track DUTs which have failed multiple times.
        self.dut_blacklist = set()
        # Track versions of each DUT to provision in order.
        self.last_versions = {}

        # id for the parent entry.
        # TODO: This isn't the most unique.
        self.entry_id = ('Runner',
                         int(time_utils.to_epoch_time(datetime.datetime.now())))

        # ids for the job entries.
        self.next_id = 0

        if self.outputlog:
            dump_entries_as_json([self.as_entry()], self.outputlog)

    def signal_handler(self, signum, frame):
        """Signal handler to dump current status.

        @param signum: Number of the received signal.
        @param frame: Current stack frame (unused).
        """
        logging.info('Received signal %s', signum)
        if signum == signal.SIGUSR1:
            now = datetime.datetime.now()
            logging.info('%d active provisions, %d completed provisions, '
                         '%s elapsed:',
                         len(self.active), len(self.completed),
                         now - self.start_time)
            for job in self.active:
                logging.info('  %s -> %s, %s elapsed',
                             job.host_name, job.build_name,
                             now - job.start_time)

    def as_entry(self):
        """Generate an entry for exporting to datastore.

        @return A dictionary describing this runner, built on make_entry().
        """
        entry = make_entry(self.entry_id, 'Runner', 'pass',
                           self.start_time, self.finish_time)
        entry.update({
            'devserver': self.ds.hostname,
        })
        return entry

    def get_completed_entries(self):
        """Retrieves all completed jobs as entries for datastore.

        @return A list with the runner entry first, then one entry per
                completed job.
        """
        entries = [self.as_entry()]
        entries.extend(job.as_entry() for job in self.completed)
        return entries

    def get_next_id(self):
        """Get the next Job id.

        @return The id to use; the internal counter is then incremented.
        """
        entry_id = self.next_id
        self.next_id += 1
        return entry_id

    def spawn(self, host_name, build_name):
        """Spawn a single provision job.

        @param host_name: Name of the DUT to provision.
        @param build_name: Name of the build to provision.
        """
        job = Job(self.ds, host_name, build_name,
                  entry_id=self.get_next_id(), parent=self.entry_id,
                  board=self.get_dut_board_type(host_name),
                  start_active=len(self.active), ping=self.ping,
                  dryrun=self.dryrun)
        self.active.append(job)
        logging.info('Provision (%d) of %s to %s started',
                     job.entry_id[1], job.host_name, job.build_name)
        self.last_versions[host_name] = build_name
        self.started += 1

    def replenish(self):
        """Replenish the number of active provisions to match goals.

        @return False if a simultaneous goal is set and no idle DUT is
                available; with no simultaneous goal, whether any provision
                is still active once idle DUTs run out; True otherwise.
        """
        while ((self.simultaneous == 0 or
                len(self.active) < self.simultaneous) and
               (self.total == 0 or self.started < self.total)):
            host_name = self.find_idle_dut()
            if host_name:
                build_name = self.find_build_for_dut(host_name)
                self.spawn(host_name, build_name)
            elif self.simultaneous:
                logging.warning('Insufficient DUTs to satisfy goal')
                return False
            else:
                return len(self.active) > 0
        return True

    def check_all(self):
        """Check the status of outstanding provisions.

        Finished jobs are logged, moved to the completed list and their DUT
        is evaluated for blacklisting; the rest stay active.
        """
        still_active = []
        for job in self.active:
            if job.check(len(self.active)):
                logging.info('Provision (%d) of %s to %s %s in %s: %s',
                             job.entry_id[1], job.host_name, job.build_name,
                             'completed' if job.success else 'failed',
                             job.elapsed(), job.raised_error)
                entry = job.as_entry()
                logging.debug(json.dumps(entry))
                if self.outputlog:
                    dump_entries_as_json([entry], self.outputlog)
                self.completed.append(job)
                if self.should_blacklist(job.host_name):
                    logging.error('Blacklisting DUT %s', job.host_name)
                    self.dut_blacklist.add(job.host_name)
            else:
                still_active.append(job)
        self.active = still_active

    def should_blacklist(self, host_name):
        """Determines if a given DUT should be blacklisted.

        A success threshold of 0 (the command-line default) or None disables
        blacklisting on success; otherwise every DUT would be retired after
        its first successful provision.

        @param host_name: Name of the DUT to evaluate.
        @return True if the DUT's completed jobs reach any enabled threshold.
        """
        total = 0
        consecutive = 0
        successes = 0
        for job in self.completed:
            if job.host_name != host_name:
                continue
            if not job.success:
                total += 1
                consecutive += 1
                if ((self.blacklist_total is not None and
                     total >= self.blacklist_total) or
                    (self.blacklist_consecutive is not None and
                     consecutive >= self.blacklist_consecutive)):
                    return True
            else:
                successes += 1
                if (self.blacklist_success and
                    successes >= self.blacklist_success):
                    return True
                # A success breaks the run of consecutive failures.
                consecutive = 0
        return False

    def find_idle_dut(self):
        """Find an idle DUT to provision.

        @return A randomly chosen DUT name that is neither active nor
                blacklisted, or None if there is none.
        """
        # Build the exclusion set once rather than once per candidate.
        unavailable = self.dut_blacklist | {job.host_name
                                            for job in self.active}
        idle_duts = [d for d in self.duts if d not in unavailable]
        return random.choice(idle_duts) if idle_duts else None

    def get_dut_board_type(self, host_name):
        """Determine the board type of a DUT.

        @param host_name: Name of the DUT.
        @return The board recorded for the DUT in self.duts.
        """
        return self.duts[host_name]

    def get_board_versions(self, board):
        """Determine the versions to provision for a board.

        @param board: Board name used as a key into the config.
        @return The "versions" list for the board.
        """
        return self.config[board]['versions']

    def find_build_for_dut(self, host_name):
        """Determine a build to provision on a DUT.

        Versions are cycled in config order, starting over after the last.

        @param host_name: Name of the DUT.
        @return The version following the one last provisioned on the DUT,
                or the first version if there is no known previous one.
        """
        board = self.get_dut_board_type(host_name)
        versions = self.get_board_versions(board)
        last_version = self.last_versions.get(host_name)
        try:
            last_index = versions.index(last_version)
        except ValueError:
            return versions[0]
        return versions[(last_index + 1) % len(versions)]

    def stage(self, build):
        """Stage artifacts for a given build.

        @param build: Name of the build to stage PAYLOADS for.
        """
        logging.debug('Staging %s', build)
        self.ds.stage_artifacts(build, PAYLOADS)

    def stage_all(self):
        """Stage all necessary artifacts."""
        boards = set(self.duts.values())
        logging.info('Staging for %d boards', len(boards))
        funcs = []
        for board in boards:
            for build in self.get_board_versions(board):
                # Bind build as a default argument so each callable keeps its
                # own value instead of the loop's final one.
                funcs.append(lambda build_=build: self.stage(build_))
        parallel.RunParallelSteps(funcs)

    def loop(self):
        """Run the main provision loop.

        @return True if the requested total completed, False if the runner
                could not replenish with nothing active or was interrupted
                from the keyboard.
        """
        # Install a signal handler for status updates.
        old_handler = signal.signal(signal.SIGUSR1, self.signal_handler)
        signal.siginterrupt(signal.SIGUSR1, False)

        try:
            while True:
                self.check_all()
                if self.total != 0 and len(self.completed) >= self.total:
                    break
                if not self.replenish() and len(self.active) == 0:
                    logging.error('Unable to replenish with no active '
                                  'provisions')
                    return False
                logging.debug('%d provisions active', len(self.active))
                time.sleep(STATUS_POLL_SECONDS)
            return True
        except KeyboardInterrupt:
            return False
        finally:
            self.finish_time = datetime.datetime.now()
            # Clean up signal handler.
            signal.signal(signal.SIGUSR1, old_handler)

    def elapsed(self):
        """Determine the elapsed time of the task.

        @return A timedelta from start to finish, or from start to now if the
                runner has not finished.
        """
        finish_time = self.finish_time or datetime.datetime.now()
        return finish_time - self.start_time


def dump_entries_as_json(entries, output_file):
    """Dump event log entries as json to a file.

    Each entry is written as one JSON document per line.

    @param entries: A list of event log entries to dump.
    @param output_file: The file to write to.
    """
    # Write the entries out as JSON.
    logging.debug('Dumping %d entries', len(entries))
    for e in entries:
        json.dump(e, output_file, sort_keys=True)
        output_file.write('\n')
    output_file.flush()


def ping_dut(hostname):
    """Checks if a host is responsive to pings.

    Host names of the form chromeosN-rowN-rackN-hostN get '.cros' appended
    before pinging.

    @param hostname: Name of the host to ping.
    @return True if /bin/ping exits with status 0, False otherwise.
    """
    if re.match(r'^chromeos\d+-row\d+-rack\d+-host\d+$', hostname):
        hostname += '.cros'

    # Discard ping's output. subprocess.call() never reads a PIPE, so using
    # one here risks blocking the child once the pipe buffer fills.
    with open(os.devnull, 'w') as devnull:
        response = subprocess.call(['/bin/ping', '-c1', '-w2', hostname],
                                   stdout=devnull)
    return response == 0


def main(argv):
    """Load generator for a devserver.

    @param argv: Command-line arguments, without the program name.
    """
    parser = get_parser()
    options = parser.parse_args(argv)

    # Parse devserver.
    if not options.server:
        parser.print_usage()
        logging.error('Must specify devserver')
        sys.exit(1)
    if re.match(r'^https?://', options.server):
        server = options.server
    else:
        server = 'http://%s/' % options.server
    ds = dev_server.ImageServer(server)

    # Parse config file and determine master list of duts and their board type,
    # filtering by board type if specified.
    if not options.config:
        parser.print_usage()
        logging.error('Must specify config file')
        sys.exit(1)
    with open(options.config, 'r') as f:
        config = json.load(f)
    boards = (options.boards.split(',')
              if options.boards else list(config))
    duts_specified = (set(options.duts.split(','))
                      if options.duts else None)
    duts = {}
    for board in boards:
        duts.update({dut: board for dut in config[board]['duts']
                     if duts_specified is None or
                     dut in duts_specified})
    logging.info('Config file %s: %d boards, %d duts',
                 options.config, len(boards), len(duts))

    if options.ping:
        logging.info('Performing ping tests')
        duts_alive = {}
        for dut, board in duts.items():
            if ping_dut(dut):
                duts_alive[dut] = board
            else:
                logging.error('Ignoring DUT %s (%s) for failing initial '
                              'ping check', dut, board)
        duts = duts_alive
        logging.info('After ping tests: %d boards, %d duts', len(boards),
                     len(duts))

    # Set up the test runner and stage all the builds.
    outputlog = open(options.outputlog, 'a') if options.outputlog else None
    try:
        runner = Runner(ds, duts, config,
                        simultaneous=options.simultaneous,
                        total=options.total,
                        outputlog=outputlog, ping=options.ping,
                        blacklist_consecutive=options.blacklist_consecutive,
                        blacklist_success=options.blacklist_success,
                        blacklist_total=options.blacklist_total,
                        dryrun=options.dryrun)
        if options.stage:
            runner.stage_all()

        # Run all the provisions.
        with locking.FileLock(options.config, blocking=True).lock():
            completed = runner.loop()
        logging.info('%s in %s', 'Completed' if completed else 'Interrupted',
                     runner.elapsed())
    finally:
        # The append log is owned by main(); close it even on failure.
        if outputlog:
            outputlog.close()

    # Write all entries as JSON.
    entries = runner.get_completed_entries()
    if options.output:
        with open(options.output, 'w') as f:
            dump_entries_as_json(entries, f)
    else:
        dump_entries_as_json(entries, sys.stdout)
    logging.info('Summary: %s',
                 dict(collections.Counter([e['status'] for e in entries
                                           if e['name'] != 'Runner'])))

    # List blacklisted DUTs.
    if runner.dut_blacklist:
        logging.warning('Blacklisted DUTs:')
        for host_name in runner.dut_blacklist:
            logging.warning('  %s', host_name)


if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]))