# -*- coding: utf-8 -*-
# Copyright 2013 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""The experiment setting module."""


import os
from threading import Lock
import time

import benchmark_run
from cros_utils import logger
from cros_utils import misc
from machine_manager import BadChecksum
from machine_manager import MachineManager
from machine_manager import MockMachineManager
import test_flag


class Experiment(object):
    """Class representing an Experiment to be run.

    An experiment is the cross product of labels x benchmarks x iterations;
    each combination becomes one benchmark_run.BenchmarkRun. Runs are
    dispatched either by an attached scheduler (see set_schedv2) or, when no
    scheduler is set, by starting every run directly as a daemon thread.
    """

    def __init__(
        self,
        name,
        remote,
        working_directory,
        chromeos_root,
        cache_conditions,
        labels,
        benchmarks,
        experiment_file,
        email_to,
        acquire_timeout,
        log_dir,
        log_level,
        share_cache,
        results_directory,
        compress_results,
        locks_directory,
        cwp_dso,
        ignore_min_max,
        crosfleet,
        dut_config,
        no_lock: bool,
    ):
        """Validate the configuration and prepare machines and benchmark runs.

        Raises:
            RuntimeError: if no benchmarks or labels are given, if no remotes
                are given and crosfleet is not used, if no chromeos_root can
                be determined, or if none of the remotes was added by the
                machine manager.
        """
        self.name = name
        self.working_directory = working_directory
        self.remote = remote
        self.chromeos_root = chromeos_root
        self.cache_conditions = cache_conditions
        self.experiment_file = experiment_file
        self.email_to = email_to
        if not results_directory:
            self.results_directory = os.path.join(
                self.working_directory, self.name + "_results"
            )
        else:
            self.results_directory = misc.CanonicalizePath(results_directory)
        self.compress_results = compress_results
        self.log_dir = log_dir
        self.log_level = log_level
        self.labels = labels
        self.benchmarks = benchmarks
        self.num_complete = 0
        self.num_run_complete = 0
        self.share_cache = share_cache
        self.active_threads = []
        self.locks_dir = locks_directory
        self.locked_machines = []
        self.lock_mgr = None
        self.cwp_dso = cwp_dso
        self.ignore_min_max = ignore_min_max
        self.crosfleet = crosfleet
        self.no_lock = no_lock
        self.l = logger.GetLogger(log_dir)

        if not self.benchmarks:
            raise RuntimeError("No benchmarks specified")
        if not self.labels:
            raise RuntimeError("No labels specified")
        if not remote and not self.crosfleet:
            raise RuntimeError("No remote hosts specified")

        # We need one chromeos_root to run the benchmarks in, but it doesn't
        # matter where it is, unless the ABIs are different.
        if not chromeos_root:
            for label in self.labels:
                if label.chromeos_root:
                    chromeos_root = label.chromeos_root
                    break
        if not chromeos_root:
            raise RuntimeError(
                "No chromeos_root given and could not determine "
                "one from the image path."
            )

        machine_manager_fn = MachineManager
        if test_flag.GetTestMode():
            machine_manager_fn = MockMachineManager
        self.machine_manager = machine_manager_fn(
            chromeos_root, acquire_timeout, log_level, locks_directory
        )

        for machine in self.remote:
            # machine_manager.AddMachine only adds reachable machines.
            self.machine_manager.AddMachine(machine)
        # Now machine_manager._all_machines contains a list of reachable
        # machines. This is a subset of self.remote. We make both lists the
        # same.
        self.remote = [m.name for m in self.machine_manager.GetAllMachines()]
        if not self.remote:
            raise RuntimeError("No machine available for running experiment.")

        # Initialize checksums for all machines, ignore errors at this time.
        # The checksum will be double checked, and image will be flashed after
        # duts are locked/leased.
        self.SetCheckSums()

        self.start_time = None
        self.benchmark_runs = self._GenerateBenchmarkRuns(dut_config)

        self._schedv2 = None
        # Guards num_complete / num_run_complete in BenchmarkRunFinished,
        # which is called from multiple threads.
        self._internal_counter_lock = Lock()

    def set_schedv2(self, schedv2):
        """Attach the scheduler used by Run/Terminate/IsComplete."""
        self._schedv2 = schedv2

    def schedv2(self):
        """Return the attached scheduler, or None if none was set."""
        return self._schedv2

    @staticmethod
    def _IsAlive(thread):
        """Return whether |thread| is still running.

        threading.Thread.isAlive() was removed in Python 3.9, so prefer the
        is_alive() spelling and fall back to isAlive() only for objects that
        do not provide is_alive().
        """
        is_alive = getattr(thread, "is_alive", None)
        if is_alive is None:
            is_alive = thread.isAlive
        return is_alive()

    def _GenerateBenchmarkRuns(self, dut_config):
        """Generate benchmark runs from labels and benchmark definitions.

        Returns:
            A list with one BenchmarkRun per (label, benchmark, iteration),
            where iteration ranges over 1..benchmark.iterations.
        """
        benchmark_runs = []
        for label in self.labels:
            for benchmark in self.benchmarks:
                for iteration in range(1, benchmark.iterations + 1):
                    benchmark_run_name = "%s: %s (%s)" % (
                        label.name,
                        benchmark.name,
                        iteration,
                    )
                    full_name = "%s_%s_%s" % (
                        label.name,
                        benchmark.name,
                        iteration,
                    )
                    logger_to_use = logger.Logger(
                        self.log_dir, "run.%s" % (full_name), True
                    )
                    benchmark_runs.append(
                        benchmark_run.BenchmarkRun(
                            benchmark_run_name,
                            benchmark,
                            label,
                            iteration,
                            self.cache_conditions,
                            self.machine_manager,
                            logger_to_use,
                            self.log_level,
                            self.share_cache,
                            dut_config,
                        )
                    )

        return benchmark_runs

    def SetCheckSums(self, forceSameImage=False):
        """Restrict label remotes to reachable ones and compute checksums.

        A BadChecksum from the first checksum computation is ignored unless
        |forceSameImage| is set, in which case the same image is forced onto
        all of the label's machines and the checksum is computed again (any
        exception from that second attempt propagates).
        """
        for label in self.labels:
            # We filter out label remotes that are not reachable (not in
            # self.remote). So each label.remote is a sublist of
            # experiment.remote.
            label.remote = [r for r in label.remote if r in self.remote]
            try:
                self.machine_manager.ComputeCommonCheckSum(label)
            except BadChecksum:
                # Force same image on all machines, then we do checksum again.
                # No bailout if checksums still do not match.
                # TODO (zhizhouy): Need to figure out how flashing image will
                # influence the new checksum.
                if forceSameImage:
                    self.machine_manager.ForceSameImageToAllMachines(label)
                    self.machine_manager.ComputeCommonCheckSum(label)

            self.machine_manager.ComputeCommonCheckSumString(label)

    def Build(self):
        """Placeholder; does nothing."""
        pass

    def Terminate(self):
        """Stop the experiment via the scheduler, or terminate live runs."""
        if self._schedv2 is not None:
            self._schedv2.terminate()
        else:
            for t in self.benchmark_runs:
                if self._IsAlive(t):
                    self.l.LogError("Terminating run: '%s'." % t.name)
                    t.Terminate()

    def IsComplete(self):
        """Return whether there is no outstanding work.

        With a scheduler attached, defers to its is_complete(). Otherwise
        polls the threads started by Run(), counting and dropping those that
        have finished; returns False whenever any thread was still being
        tracked at the start of the call, and True once none remain.
        """
        if self._schedv2:
            return self._schedv2.is_complete()
        if not self.active_threads:
            return True
        # Iterate over a snapshot: removing from the list being iterated
        # would skip the element following each removed thread.
        for t in list(self.active_threads):
            if self._IsAlive(t):
                t.join(0)
            if not self._IsAlive(t):
                self.num_complete += 1
                if not t.cache_hit:
                    self.num_run_complete += 1
                self.active_threads.remove(t)
        return False

    def BenchmarkRunFinished(self, br):
        """Update internal counters after br finishes.

        Note this is only used by schedv2 and is called by multiple threads.
        Never throw any exception here.
        """

        assert self._schedv2 is not None
        with self._internal_counter_lock:
            self.num_complete += 1
            if not br.cache_hit:
                self.num_run_complete += 1

    def Run(self):
        """Start the experiment and record its start time."""
        self.start_time = time.time()
        if self._schedv2 is not None:
            self._schedv2.run_sched()
        else:
            self.active_threads = []
            for run in self.benchmark_runs:
                # Set threads to daemon so program exits when ctrl-c is
                # pressed.
                run.daemon = True
                run.start()
                self.active_threads.append(run)

    def SetCacheConditions(self, cache_conditions):
        """Propagate |cache_conditions| to every benchmark run."""
        for run in self.benchmark_runs:
            run.SetCacheConditions(cache_conditions)

    def Cleanup(self):
        """Make sure all machines are unlocked.

        Raises:
            RuntimeError: if some machines in self.locked_machines were not
                reported as unlocked by the lock manager.
        """
        if self.locks_dir:
            # We are using the file locks mechanism, so call
            # machine_manager.Cleanup to unlock everything.
            self.machine_manager.Cleanup()

        if test_flag.GetTestMode() or not self.locked_machines:
            return

        # If we locked any machines earlier, make sure we unlock them now.
        if self.lock_mgr:
            machine_states = self.lock_mgr.GetMachineStates("unlock")
            self.lock_mgr.CheckMachineLocks(machine_states, "unlock")
            unlocked_machines = self.lock_mgr.UpdateMachines(False)
            failed_machines = [
                m for m in self.locked_machines if m not in unlocked_machines
            ]
            if failed_machines:
                raise RuntimeError(
                    "These machines are not unlocked correctly: %s"
                    % failed_machines
                )
            self.lock_mgr = None