#!/usr/bin/env python2 # Copyright 2017 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """Load generator for devserver. Example usage: # Find DUTs in suites pool to test with: atest host list -b 'pool:suites,board:BOARD' --unlocked -s Ready # Lock DUTs: atest host mod -l -r 'quick provision testing' DUT1 DUT2 # Create config file with DUTs to test and builds to use. cat >config.json < %s, %s elapsed', job.host_name, job.build_name, now - job.start_time) def as_entry(self): """Generate an entry for exporting to datastore.""" entry = make_entry(self.entry_id, 'Runner', 'pass', self.start_time, self.finish_time) entry.update({ 'devserver': self.ds.hostname, }) return entry def get_completed_entries(self): """Retrieves all completed jobs as entries for datastore.""" entries = [self.as_entry()] entries.extend([job.as_entry() for job in self.completed]) return entries def get_next_id(self): """Get the next Job id.""" entry_id = self.next_id self.next_id += 1 return entry_id def spawn(self, host_name, build_name): """Spawn a single provision job.""" job = Job(self.ds, host_name, build_name, entry_id=self.get_next_id(), parent=self.entry_id, board=self.get_dut_board_type(host_name), start_active=len(self.active), ping=self.ping, dryrun=self.dryrun) self.active.append(job) logging.info('Provision (%d) of %s to %s started', job.entry_id[1], job.host_name, job.build_name) self.last_versions[host_name] = build_name self.started += 1 def replenish(self): """Replenish the number of active provisions to match goals.""" while ((self.simultaneous == 0 or len(self.active) < self.simultaneous) and (self.total == 0 or self.started < self.total)): host_name = self.find_idle_dut() if host_name: build_name = self.find_build_for_dut(host_name) self.spawn(host_name, build_name) elif self.simultaneous: logging.warn('Insufficient DUTs to satisfy goal') return False else: return len(self.active) > 0 return True def check_all(self): """Check the status of outstanding provisions.""" still_active = [] for job in self.active: if job.check(len(self.active)): logging.info('Provision (%d) of %s to %s %s in %s: %s', job.entry_id[1], job.host_name, job.build_name, 'completed' if job.success else 'failed', job.elapsed(), job.raised_error) entry = job.as_entry() logging.debug(json.dumps(entry)) if self.outputlog: dump_entries_as_json([entry], self.outputlog) self.completed.append(job) if self.should_blacklist(job.host_name): logging.error('Blacklisting DUT %s', job.host_name) self.dut_blacklist.add(job.host_name) else: still_active.append(job) self.active = still_active def should_blacklist(self, host_name): """Determines if a given DUT should be blacklisted.""" jobs = [job for job in self.completed if job.host_name == host_name] total = 0 consecutive = 0 successes = 0 for job in jobs: if not job.success: total += 1 consecutive += 1 if ((self.blacklist_total is not None and total >= self.blacklist_total) or (self.blacklist_consecutive is not None and consecutive >= self.blacklist_consecutive)): return True else: successes += 1 if (self.blacklist_success is not None and successes >= self.blacklist_success): return True consecutive = 0 return False def find_idle_dut(self): """Find an idle DUT to provision..""" active_duts = {job.host_name for job in self.active} idle_duts = [d for d in self.duts if d not in active_duts | self.dut_blacklist] return random.choice(idle_duts) if len(idle_duts) else None def get_dut_board_type(self, host_name): """Determine the board type of a DUT.""" return self.duts[host_name] def get_board_versions(self, board): """Determine the versions to provision for a board.""" return self.config[board]['versions'] def find_build_for_dut(self, host_name): """Determine a build to provision on a DUT.""" board = self.get_dut_board_type(host_name) versions = self.get_board_versions(board) last_version = self.last_versions.get(host_name) try: last_index = versions.index(last_version) except ValueError: return versions[0] return versions[(last_index + 1) % len(versions)] def stage(self, build): """Stage artifacts for a given build.""" logging.debug('Staging %s', build) self.ds.stage_artifacts(build, PAYLOADS) def stage_all(self): """Stage all necessary artifacts.""" boards = set(self.duts.values()) logging.info('Staging for %d boards', len(boards)) funcs = [] for board in boards: for build in self.get_board_versions(board): funcs.append(lambda build_=build: self.stage(build_)) parallel.RunParallelSteps(funcs) def loop(self): """Run the main provision loop.""" # Install a signal handler for status updates. old_handler = signal.signal(signal.SIGUSR1, self.signal_handler) signal.siginterrupt(signal.SIGUSR1, False) try: while True: self.check_all() if self.total != 0 and len(self.completed) >= self.total: break if not self.replenish() and len(self.active) == 0: logging.error('Unable to replenish with no active ' 'provisions') return False logging.debug('%d provisions active', len(self.active)) time.sleep(STATUS_POLL_SECONDS) return True except KeyboardInterrupt: return False finally: self.finish_time = datetime.datetime.now() # Clean up signal handler. signal.signal(signal.SIGUSR1, old_handler) def elapsed(self): """Determine the elapsed time of the task.""" finish_time = self.finish_time or datetime.datetime.now() return finish_time - self.start_time def dump_entries_as_json(entries, output_file): """Dump event log entries as json to a file. @param entries: A list of event log entries to dump. @param output_file: The file to write to. """ # Write the entries out as JSON. logging.debug('Dumping %d entries' % len(entries)) for e in entries: json.dump(e, output_file, sort_keys=True) output_file.write('\n') output_file.flush() def ping_dut(hostname): """Checks if a host is responsive to pings.""" if re.match('^chromeos\d+-row\d+-rack\d+-host\d+$', hostname): hostname += '.cros' response = subprocess.call(["/bin/ping", "-c1", "-w2", hostname], stdout=subprocess.PIPE) return response == 0 def main(argv): """Load generator for a devserver.""" parser = get_parser() options = parser.parse_args(argv) # Parse devserver. if options.server: if re.match(r'^https?://', options.server): server = options.server else: server = 'http://%s/' % options.server ds = dev_server.ImageServer(server) else: parser.print_usage() logging.error('Must specify devserver') sys.exit(1) # Parse config file and determine master list of duts and their board type, # filtering by board type if specified. duts = {} if options.config: with open(options.config, 'r') as f: config = json.load(f) boards = (options.boards.split(',') if options.boards else config.keys()) duts_specified = (set(options.duts.split(',')) if options.duts else None) for board in boards: duts.update({dut: board for dut in config[board]['duts'] if duts_specified is None or dut in duts_specified}) logging.info('Config file %s: %d boards, %d duts', options.config, len(boards), len(duts)) else: parser.print_usage() logging.error('Must specify config file') sys.exit(1) if options.ping: logging.info('Performing ping tests') duts_alive = {} for dut, board in duts.items(): if ping_dut(dut): duts_alive[dut] = board else: logging.error('Ignoring DUT %s (%s) for failing initial ' 'ping check', dut, board) duts = duts_alive logging.info('After ping tests: %d boards, %d duts', len(boards), len(duts)) # Set up the test runner and stage all the builds. outputlog = open(options.outputlog, 'a') if options.outputlog else None runner = Runner(ds, duts, config, simultaneous=options.simultaneous, total=options.total, outputlog=outputlog, ping=options.ping, blacklist_consecutive=options.blacklist_consecutive, blacklist_success=options.blacklist_success, blacklist_total=options.blacklist_total, dryrun=options.dryrun) if options.stage: runner.stage_all() # Run all the provisions. with locking.FileLock(options.config, blocking=True).lock(): completed = runner.loop() logging.info('%s in %s', 'Completed' if completed else 'Interrupted', runner.elapsed()) # Write all entries as JSON. entries = runner.get_completed_entries() if options.output: with open(options.output, 'w') as f: dump_entries_as_json(entries, f) else: dump_entries_as_json(entries, sys.stdout) logging.info('Summary: %s', dict(collections.Counter([e['status'] for e in entries if e['name'] != 'Runner']))) # List blacklisted DUTs. if runner.dut_blacklist: logging.warn('Blacklisted DUTs:') for host_name in runner.dut_blacklist: logging.warn(' %s', host_name) if __name__ == '__main__': sys.exit(main(sys.argv[1:]))