1# Copyright (c) 2011 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4"""The experiment runner module.""" 5from __future__ import print_function 6 7import getpass 8import os 9import shutil 10import time 11 12import afe_lock_machine 13import test_flag 14 15from cros_utils import command_executer 16from cros_utils import logger 17from cros_utils.email_sender import EmailSender 18from cros_utils.file_utils import FileUtils 19 20import config 21from experiment_status import ExperimentStatus 22from results_cache import CacheConditions 23from results_cache import ResultsCache 24from results_report import HTMLResultsReport 25from results_report import TextResultsReport 26from results_report import JSONResultsReport 27from schedv2 import Schedv2 28 29 30def _WriteJSONReportToFile(experiment, results_dir, json_report): 31 """Writes a JSON report to a file in results_dir.""" 32 has_llvm = any('llvm' in l.compiler for l in experiment.labels) 33 compiler_string = 'llvm' if has_llvm else 'gcc' 34 board = experiment.labels[0].board 35 filename = 'report_%s_%s_%s.%s.json' % (board, json_report.date, 36 json_report.time.replace(':', '.'), 37 compiler_string) 38 fullname = os.path.join(results_dir, filename) 39 report_text = json_report.GetReport() 40 with open(fullname, 'w') as out_file: 41 out_file.write(report_text) 42 43 44class ExperimentRunner(object): 45 """ExperimentRunner Class.""" 46 47 STATUS_TIME_DELAY = 30 48 THREAD_MONITOR_DELAY = 2 49 50 def __init__(self, 51 experiment, 52 json_report, 53 using_schedv2=False, 54 log=None, 55 cmd_exec=None): 56 self._experiment = experiment 57 self.l = log or logger.GetLogger(experiment.log_dir) 58 self._ce = cmd_exec or command_executer.GetCommandExecuter(self.l) 59 self._terminated = False 60 self.json_report = json_report 61 self.locked_machines = [] 62 if experiment.log_level != 'verbose': 63 self.STATUS_TIME_DELAY = 10 64 65 # Setting this to True will use crosperf sched v2 (feature in progress). 66 self._using_schedv2 = using_schedv2 67 68 def _GetMachineList(self): 69 """Return a list of all requested machines. 70 71 Create a list of all the requested machines, both global requests and 72 label-specific requests, and return the list. 73 """ 74 machines = self._experiment.remote 75 # All Label.remote is a sublist of experiment.remote. 76 for l in self._experiment.labels: 77 for r in l.remote: 78 assert r in machines 79 return machines 80 81 def _UpdateMachineList(self, locked_machines): 82 """Update machines lists to contain only locked machines. 83 84 Go through all the lists of requested machines, both global and 85 label-specific requests, and remove any machine that we were not 86 able to lock. 87 88 Args: 89 locked_machines: A list of the machines we successfully locked. 90 """ 91 for m in self._experiment.remote: 92 if m not in locked_machines: 93 self._experiment.remote.remove(m) 94 95 for l in self._experiment.labels: 96 for m in l.remote: 97 if m not in locked_machines: 98 l.remote.remove(m) 99 100 def _LockAllMachines(self, experiment): 101 """Attempt to globally lock all of the machines requested for run. 102 103 This method will use the AFE server to globally lock all of the machines 104 requested for this crosperf run, to prevent any other crosperf runs from 105 being able to update/use the machines while this experiment is running. 106 """ 107 if test_flag.GetTestMode(): 108 self.locked_machines = self._GetMachineList() 109 self._experiment.locked_machines = self.locked_machines 110 else: 111 lock_mgr = afe_lock_machine.AFELockManager( 112 self._GetMachineList(), 113 '', 114 experiment.labels[0].chromeos_root, 115 None, 116 log=self.l,) 117 for m in lock_mgr.machines: 118 if not lock_mgr.MachineIsKnown(m): 119 lock_mgr.AddLocalMachine(m) 120 machine_states = lock_mgr.GetMachineStates('lock') 121 lock_mgr.CheckMachineLocks(machine_states, 'lock') 122 self.locked_machines = lock_mgr.UpdateMachines(True) 123 self._experiment.locked_machines = self.locked_machines 124 self._UpdateMachineList(self.locked_machines) 125 self._experiment.machine_manager.RemoveNonLockedMachines( 126 self.locked_machines) 127 if len(self.locked_machines) == 0: 128 raise RuntimeError('Unable to lock any machines.') 129 130 def _UnlockAllMachines(self, experiment): 131 """Attempt to globally unlock all of the machines requested for run. 132 133 The method will use the AFE server to globally unlock all of the machines 134 requested for this crosperf run. 135 """ 136 if not self.locked_machines or test_flag.GetTestMode(): 137 return 138 139 lock_mgr = afe_lock_machine.AFELockManager( 140 self.locked_machines, 141 '', 142 experiment.labels[0].chromeos_root, 143 None, 144 log=self.l,) 145 machine_states = lock_mgr.GetMachineStates('unlock') 146 lock_mgr.CheckMachineLocks(machine_states, 'unlock') 147 lock_mgr.UpdateMachines(False) 148 149 def _ClearCacheEntries(self, experiment): 150 for br in experiment.benchmark_runs: 151 cache = ResultsCache() 152 cache.Init(br.label.chromeos_image, br.label.chromeos_root, 153 br.benchmark.test_name, br.iteration, br.test_args, 154 br.profiler_args, br.machine_manager, br.machine, 155 br.label.board, br.cache_conditions, 156 br.logger(), br.log_level, br.label, br.share_cache, 157 br.benchmark.suite, br.benchmark.show_all_results, 158 br.benchmark.run_local) 159 cache_dir = cache.GetCacheDirForWrite() 160 if os.path.exists(cache_dir): 161 self.l.LogOutput('Removing cache dir: %s' % cache_dir) 162 shutil.rmtree(cache_dir) 163 164 def _Run(self, experiment): 165 try: 166 if not experiment.locks_dir: 167 self._LockAllMachines(experiment) 168 if self._using_schedv2: 169 schedv2 = Schedv2(experiment) 170 experiment.set_schedv2(schedv2) 171 if CacheConditions.FALSE in experiment.cache_conditions: 172 self._ClearCacheEntries(experiment) 173 status = ExperimentStatus(experiment) 174 experiment.Run() 175 last_status_time = 0 176 last_status_string = '' 177 try: 178 if experiment.log_level != 'verbose': 179 self.l.LogStartDots() 180 while not experiment.IsComplete(): 181 if last_status_time + self.STATUS_TIME_DELAY < time.time(): 182 last_status_time = time.time() 183 border = '==============================' 184 if experiment.log_level == 'verbose': 185 self.l.LogOutput(border) 186 self.l.LogOutput(status.GetProgressString()) 187 self.l.LogOutput(status.GetStatusString()) 188 self.l.LogOutput(border) 189 else: 190 current_status_string = status.GetStatusString() 191 if current_status_string != last_status_string: 192 self.l.LogEndDots() 193 self.l.LogOutput(border) 194 self.l.LogOutput(current_status_string) 195 self.l.LogOutput(border) 196 last_status_string = current_status_string 197 else: 198 self.l.LogAppendDot() 199 time.sleep(self.THREAD_MONITOR_DELAY) 200 except KeyboardInterrupt: 201 self._terminated = True 202 self.l.LogError('Ctrl-c pressed. Cleaning up...') 203 experiment.Terminate() 204 raise 205 except SystemExit: 206 self._terminated = True 207 self.l.LogError('Unexpected exit. Cleaning up...') 208 experiment.Terminate() 209 raise 210 finally: 211 if not experiment.locks_dir: 212 self._UnlockAllMachines(experiment) 213 214 def _PrintTable(self, experiment): 215 self.l.LogOutput(TextResultsReport.FromExperiment(experiment).GetReport()) 216 217 def _Email(self, experiment): 218 # Only email by default if a new run was completed. 219 send_mail = False 220 for benchmark_run in experiment.benchmark_runs: 221 if not benchmark_run.cache_hit: 222 send_mail = True 223 break 224 if (not send_mail and not experiment.email_to or 225 config.GetConfig('no_email')): 226 return 227 228 label_names = [] 229 for label in experiment.labels: 230 label_names.append(label.name) 231 subject = '%s: %s' % (experiment.name, ' vs. '.join(label_names)) 232 233 text_report = TextResultsReport.FromExperiment(experiment, True).GetReport() 234 text_report += ( 235 '\nResults are stored in %s.\n' % experiment.results_directory) 236 text_report = "<pre style='font-size: 13px'>%s</pre>" % text_report 237 html_report = HTMLResultsReport.FromExperiment(experiment).GetReport() 238 attachment = EmailSender.Attachment('report.html', html_report) 239 email_to = experiment.email_to or [] 240 email_to.append(getpass.getuser()) 241 EmailSender().SendEmail( 242 email_to, 243 subject, 244 text_report, 245 attachments=[attachment], 246 msg_type='html') 247 248 def _StoreResults(self, experiment): 249 if self._terminated: 250 return 251 results_directory = experiment.results_directory 252 FileUtils().RmDir(results_directory) 253 FileUtils().MkDirP(results_directory) 254 self.l.LogOutput('Storing experiment file in %s.' % results_directory) 255 experiment_file_path = os.path.join(results_directory, 'experiment.exp') 256 FileUtils().WriteFile(experiment_file_path, experiment.experiment_file) 257 258 self.l.LogOutput('Storing results report in %s.' % results_directory) 259 results_table_path = os.path.join(results_directory, 'results.html') 260 report = HTMLResultsReport.FromExperiment(experiment).GetReport() 261 if self.json_report: 262 json_report = JSONResultsReport.FromExperiment( 263 experiment, json_args={'indent': 2}) 264 _WriteJSONReportToFile(experiment, results_directory, json_report) 265 266 FileUtils().WriteFile(results_table_path, report) 267 268 self.l.LogOutput('Storing email message body in %s.' % results_directory) 269 msg_file_path = os.path.join(results_directory, 'msg_body.html') 270 text_report = TextResultsReport.FromExperiment(experiment, True).GetReport() 271 text_report += ( 272 '\nResults are stored in %s.\n' % experiment.results_directory) 273 msg_body = "<pre style='font-size: 13px'>%s</pre>" % text_report 274 FileUtils().WriteFile(msg_file_path, msg_body) 275 276 self.l.LogOutput('Storing results of each benchmark run.') 277 for benchmark_run in experiment.benchmark_runs: 278 if benchmark_run.result: 279 benchmark_run_name = filter(str.isalnum, benchmark_run.name) 280 benchmark_run_path = os.path.join(results_directory, benchmark_run_name) 281 benchmark_run.result.CopyResultsTo(benchmark_run_path) 282 benchmark_run.result.CleanUp(benchmark_run.benchmark.rm_chroot_tmp) 283 284 def Run(self): 285 try: 286 self._Run(self._experiment) 287 finally: 288 # Always print the report at the end of the run. 289 self._PrintTable(self._experiment) 290 if not self._terminated: 291 self._StoreResults(self._experiment) 292 self._Email(self._experiment) 293 294 295class MockExperimentRunner(ExperimentRunner): 296 """Mocked ExperimentRunner for testing.""" 297 298 def __init__(self, experiment, json_report): 299 super(MockExperimentRunner, self).__init__(experiment, json_report) 300 301 def _Run(self, experiment): 302 self.l.LogOutput( 303 "Would run the following experiment: '%s'." % experiment.name) 304 305 def _PrintTable(self, experiment): 306 self.l.LogOutput('Would print the experiment table.') 307 308 def _Email(self, experiment): 309 self.l.LogOutput('Would send result email.') 310 311 def _StoreResults(self, experiment): 312 self.l.LogOutput('Would store the results.') 313