1# -*- coding: utf-8 -*- 2# Copyright (c) 2013 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6"""The experiment setting module.""" 7 8from __future__ import print_function 9 10import os 11import time 12 13from threading import Lock 14 15from cros_utils import logger 16from cros_utils import misc 17 18import benchmark_run 19from machine_manager import BadChecksum 20from machine_manager import MachineManager 21from machine_manager import MockMachineManager 22import test_flag 23 24 25class Experiment(object): 26 """Class representing an Experiment to be run.""" 27 28 def __init__(self, name, remote, working_directory, chromeos_root, 29 cache_conditions, labels, benchmarks, experiment_file, email_to, 30 acquire_timeout, log_dir, log_level, share_cache, 31 results_directory, compress_results, locks_directory, cwp_dso, 32 ignore_min_max, crosfleet, dut_config, no_lock: bool): 33 self.name = name 34 self.working_directory = working_directory 35 self.remote = remote 36 self.chromeos_root = chromeos_root 37 self.cache_conditions = cache_conditions 38 self.experiment_file = experiment_file 39 self.email_to = email_to 40 if not results_directory: 41 self.results_directory = os.path.join(self.working_directory, 42 self.name + '_results') 43 else: 44 self.results_directory = misc.CanonicalizePath(results_directory) 45 self.compress_results = compress_results 46 self.log_dir = log_dir 47 self.log_level = log_level 48 self.labels = labels 49 self.benchmarks = benchmarks 50 self.num_complete = 0 51 self.num_run_complete = 0 52 self.share_cache = share_cache 53 self.active_threads = [] 54 self.locks_dir = locks_directory 55 self.locked_machines = [] 56 self.lock_mgr = None 57 self.cwp_dso = cwp_dso 58 self.ignore_min_max = ignore_min_max 59 self.crosfleet = crosfleet 60 self.no_lock = no_lock 61 self.l = logger.GetLogger(log_dir) 62 63 if not self.benchmarks: 64 raise RuntimeError('No benchmarks specified') 65 if not self.labels: 66 raise RuntimeError('No labels specified') 67 if not remote and not self.crosfleet: 68 raise RuntimeError('No remote hosts specified') 69 70 # We need one chromeos_root to run the benchmarks in, but it doesn't 71 # matter where it is, unless the ABIs are different. 72 if not chromeos_root: 73 for label in self.labels: 74 if label.chromeos_root: 75 chromeos_root = label.chromeos_root 76 break 77 if not chromeos_root: 78 raise RuntimeError('No chromeos_root given and could not determine ' 79 'one from the image path.') 80 81 machine_manager_fn = MachineManager 82 if test_flag.GetTestMode(): 83 machine_manager_fn = MockMachineManager 84 self.machine_manager = machine_manager_fn(chromeos_root, acquire_timeout, 85 log_level, locks_directory) 86 self.l = logger.GetLogger(log_dir) 87 88 for machine in self.remote: 89 # machine_manager.AddMachine only adds reachable machines. 90 self.machine_manager.AddMachine(machine) 91 # Now machine_manager._all_machines contains a list of reachable 92 # machines. This is a subset of self.remote. We make both lists the same. 93 self.remote = [m.name for m in self.machine_manager.GetAllMachines()] 94 if not self.remote: 95 raise RuntimeError('No machine available for running experiment.') 96 97 # Initialize checksums for all machines, ignore errors at this time. 98 # The checksum will be double checked, and image will be flashed after 99 # duts are locked/leased. 100 self.SetCheckSums() 101 102 self.start_time = None 103 self.benchmark_runs = self._GenerateBenchmarkRuns(dut_config) 104 105 self._schedv2 = None 106 self._internal_counter_lock = Lock() 107 108 def set_schedv2(self, schedv2): 109 self._schedv2 = schedv2 110 111 def schedv2(self): 112 return self._schedv2 113 114 def _GenerateBenchmarkRuns(self, dut_config): 115 """Generate benchmark runs from labels and benchmark defintions.""" 116 benchmark_runs = [] 117 for label in self.labels: 118 for benchmark in self.benchmarks: 119 for iteration in range(1, benchmark.iterations + 1): 120 121 benchmark_run_name = '%s: %s (%s)' % (label.name, benchmark.name, 122 iteration) 123 full_name = '%s_%s_%s' % (label.name, benchmark.name, iteration) 124 logger_to_use = logger.Logger(self.log_dir, 'run.%s' % (full_name), 125 True) 126 benchmark_runs.append( 127 benchmark_run.BenchmarkRun(benchmark_run_name, benchmark, label, 128 iteration, self.cache_conditions, 129 self.machine_manager, logger_to_use, 130 self.log_level, self.share_cache, 131 dut_config)) 132 133 return benchmark_runs 134 135 def SetCheckSums(self, forceSameImage=False): 136 for label in self.labels: 137 # We filter out label remotes that are not reachable (not in 138 # self.remote). So each label.remote is a sublist of experiment.remote. 139 label.remote = [r for r in label.remote if r in self.remote] 140 try: 141 self.machine_manager.ComputeCommonCheckSum(label) 142 except BadChecksum: 143 # Force same image on all machines, then we do checksum again. No 144 # bailout if checksums still do not match. 145 # TODO (zhizhouy): Need to figure out how flashing image will influence 146 # the new checksum. 147 if forceSameImage: 148 self.machine_manager.ForceSameImageToAllMachines(label) 149 self.machine_manager.ComputeCommonCheckSum(label) 150 151 self.machine_manager.ComputeCommonCheckSumString(label) 152 153 def Build(self): 154 pass 155 156 def Terminate(self): 157 if self._schedv2 is not None: 158 self._schedv2.terminate() 159 else: 160 for t in self.benchmark_runs: 161 if t.isAlive(): 162 self.l.LogError("Terminating run: '%s'." % t.name) 163 t.Terminate() 164 165 def IsComplete(self): 166 if self._schedv2: 167 return self._schedv2.is_complete() 168 if self.active_threads: 169 for t in self.active_threads: 170 if t.isAlive(): 171 t.join(0) 172 if not t.isAlive(): 173 self.num_complete += 1 174 if not t.cache_hit: 175 self.num_run_complete += 1 176 self.active_threads.remove(t) 177 return False 178 return True 179 180 def BenchmarkRunFinished(self, br): 181 """Update internal counters after br finishes. 182 183 Note this is only used by schedv2 and is called by multiple threads. 184 Never throw any exception here. 185 """ 186 187 assert self._schedv2 is not None 188 with self._internal_counter_lock: 189 self.num_complete += 1 190 if not br.cache_hit: 191 self.num_run_complete += 1 192 193 def Run(self): 194 self.start_time = time.time() 195 if self._schedv2 is not None: 196 self._schedv2.run_sched() 197 else: 198 self.active_threads = [] 199 for run in self.benchmark_runs: 200 # Set threads to daemon so program exits when ctrl-c is pressed. 201 run.daemon = True 202 run.start() 203 self.active_threads.append(run) 204 205 def SetCacheConditions(self, cache_conditions): 206 for run in self.benchmark_runs: 207 run.SetCacheConditions(cache_conditions) 208 209 def Cleanup(self): 210 """Make sure all machines are unlocked.""" 211 if self.locks_dir: 212 # We are using the file locks mechanism, so call machine_manager.Cleanup 213 # to unlock everything. 214 self.machine_manager.Cleanup() 215 216 if test_flag.GetTestMode() or not self.locked_machines: 217 return 218 219 # If we locked any machines earlier, make sure we unlock them now. 220 if self.lock_mgr: 221 machine_states = self.lock_mgr.GetMachineStates('unlock') 222 self.lock_mgr.CheckMachineLocks(machine_states, 'unlock') 223 unlocked_machines = self.lock_mgr.UpdateMachines(False) 224 failed_machines = [ 225 m for m in self.locked_machines if m not in unlocked_machines 226 ] 227 if failed_machines: 228 raise RuntimeError('These machines are not unlocked correctly: %s' % 229 failed_machines) 230 self.lock_mgr = None 231