1# -*- coding: utf-8 -*- 2# Copyright (c) 2013 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6"""The experiment setting module.""" 7 8from __future__ import print_function 9 10import os 11import time 12 13from threading import Lock 14 15from cros_utils import logger 16from cros_utils import misc 17 18import benchmark_run 19from machine_manager import BadChecksum 20from machine_manager import MachineManager 21from machine_manager import MockMachineManager 22import test_flag 23 24 25class Experiment(object): 26 """Class representing an Experiment to be run.""" 27 28 def __init__(self, name, remote, working_directory, chromeos_root, 29 cache_conditions, labels, benchmarks, experiment_file, email_to, 30 acquire_timeout, log_dir, log_level, share_cache, 31 results_directory, compress_results, locks_directory, cwp_dso, 32 ignore_min_max, skylab, dut_config): 33 self.name = name 34 self.working_directory = working_directory 35 self.remote = remote 36 self.chromeos_root = chromeos_root 37 self.cache_conditions = cache_conditions 38 self.experiment_file = experiment_file 39 self.email_to = email_to 40 if not results_directory: 41 self.results_directory = os.path.join(self.working_directory, 42 self.name + '_results') 43 else: 44 self.results_directory = misc.CanonicalizePath(results_directory) 45 self.compress_results = compress_results 46 self.log_dir = log_dir 47 self.log_level = log_level 48 self.labels = labels 49 self.benchmarks = benchmarks 50 self.num_complete = 0 51 self.num_run_complete = 0 52 self.share_cache = share_cache 53 self.active_threads = [] 54 self.locks_dir = locks_directory 55 self.locked_machines = [] 56 self.lock_mgr = None 57 self.cwp_dso = cwp_dso 58 self.ignore_min_max = ignore_min_max 59 self.skylab = skylab 60 self.l = logger.GetLogger(log_dir) 61 62 if not self.benchmarks: 63 raise RuntimeError('No benchmarks specified') 64 if not self.labels: 65 raise RuntimeError('No labels specified') 66 if not remote and not self.skylab: 67 raise RuntimeError('No remote hosts specified') 68 69 # We need one chromeos_root to run the benchmarks in, but it doesn't 70 # matter where it is, unless the ABIs are different. 71 if not chromeos_root: 72 for label in self.labels: 73 if label.chromeos_root: 74 chromeos_root = label.chromeos_root 75 break 76 if not chromeos_root: 77 raise RuntimeError('No chromeos_root given and could not determine ' 78 'one from the image path.') 79 80 machine_manager_fn = MachineManager 81 if test_flag.GetTestMode(): 82 machine_manager_fn = MockMachineManager 83 self.machine_manager = machine_manager_fn(chromeos_root, acquire_timeout, 84 log_level, locks_directory) 85 self.l = logger.GetLogger(log_dir) 86 87 for machine in self.remote: 88 # machine_manager.AddMachine only adds reachable machines. 89 self.machine_manager.AddMachine(machine) 90 # Now machine_manager._all_machines contains a list of reachable 91 # machines. This is a subset of self.remote. We make both lists the same. 92 self.remote = [m.name for m in self.machine_manager.GetAllMachines()] 93 if not self.remote: 94 raise RuntimeError('No machine available for running experiment.') 95 96 # Initialize checksums for all machines, ignore errors at this time. 97 # The checksum will be double checked, and image will be flashed after 98 # duts are locked/leased. 99 self.SetCheckSums() 100 101 self.start_time = None 102 self.benchmark_runs = self._GenerateBenchmarkRuns(dut_config) 103 104 self._schedv2 = None 105 self._internal_counter_lock = Lock() 106 107 def set_schedv2(self, schedv2): 108 self._schedv2 = schedv2 109 110 def schedv2(self): 111 return self._schedv2 112 113 def _GenerateBenchmarkRuns(self, dut_config): 114 """Generate benchmark runs from labels and benchmark defintions.""" 115 benchmark_runs = [] 116 for label in self.labels: 117 for benchmark in self.benchmarks: 118 for iteration in range(1, benchmark.iterations + 1): 119 120 benchmark_run_name = '%s: %s (%s)' % (label.name, benchmark.name, 121 iteration) 122 full_name = '%s_%s_%s' % (label.name, benchmark.name, iteration) 123 logger_to_use = logger.Logger(self.log_dir, 'run.%s' % (full_name), 124 True) 125 benchmark_runs.append( 126 benchmark_run.BenchmarkRun( 127 benchmark_run_name, benchmark, label, iteration, 128 self.cache_conditions, self.machine_manager, logger_to_use, 129 self.log_level, self.share_cache, dut_config)) 130 131 return benchmark_runs 132 133 def SetCheckSums(self, forceSameImage=False): 134 for label in self.labels: 135 # We filter out label remotes that are not reachable (not in 136 # self.remote). So each label.remote is a sublist of experiment.remote. 137 label.remote = [r for r in label.remote if r in self.remote] 138 try: 139 self.machine_manager.ComputeCommonCheckSum(label) 140 except BadChecksum: 141 # Force same image on all machines, then we do checksum again. No 142 # bailout if checksums still do not match. 143 # TODO (zhizhouy): Need to figure out how flashing image will influence 144 # the new checksum. 145 if forceSameImage: 146 self.machine_manager.ForceSameImageToAllMachines(label) 147 self.machine_manager.ComputeCommonCheckSum(label) 148 149 self.machine_manager.ComputeCommonCheckSumString(label) 150 151 def Build(self): 152 pass 153 154 def Terminate(self): 155 if self._schedv2 is not None: 156 self._schedv2.terminate() 157 else: 158 for t in self.benchmark_runs: 159 if t.isAlive(): 160 self.l.LogError("Terminating run: '%s'." % t.name) 161 t.Terminate() 162 163 def IsComplete(self): 164 if self._schedv2: 165 return self._schedv2.is_complete() 166 if self.active_threads: 167 for t in self.active_threads: 168 if t.isAlive(): 169 t.join(0) 170 if not t.isAlive(): 171 self.num_complete += 1 172 if not t.cache_hit: 173 self.num_run_complete += 1 174 self.active_threads.remove(t) 175 return False 176 return True 177 178 def BenchmarkRunFinished(self, br): 179 """Update internal counters after br finishes. 180 181 Note this is only used by schedv2 and is called by multiple threads. 182 Never throw any exception here. 183 """ 184 185 assert self._schedv2 is not None 186 with self._internal_counter_lock: 187 self.num_complete += 1 188 if not br.cache_hit: 189 self.num_run_complete += 1 190 191 def Run(self): 192 self.start_time = time.time() 193 if self._schedv2 is not None: 194 self._schedv2.run_sched() 195 else: 196 self.active_threads = [] 197 for run in self.benchmark_runs: 198 # Set threads to daemon so program exits when ctrl-c is pressed. 199 run.daemon = True 200 run.start() 201 self.active_threads.append(run) 202 203 def SetCacheConditions(self, cache_conditions): 204 for run in self.benchmark_runs: 205 run.SetCacheConditions(cache_conditions) 206 207 def Cleanup(self): 208 """Make sure all machines are unlocked.""" 209 if self.locks_dir: 210 # We are using the file locks mechanism, so call machine_manager.Cleanup 211 # to unlock everything. 212 self.machine_manager.Cleanup() 213 214 if test_flag.GetTestMode() or not self.locked_machines: 215 return 216 217 # If we locked any machines earlier, make sure we unlock them now. 218 if self.lock_mgr: 219 machine_states = self.lock_mgr.GetMachineStates('unlock') 220 self.lock_mgr.CheckMachineLocks(machine_states, 'unlock') 221 unlocked_machines = self.lock_mgr.UpdateMachines(False) 222 failed_machines = [ 223 m for m in self.locked_machines if m not in unlocked_machines 224 ] 225 if failed_machines: 226 raise RuntimeError( 227 'These machines are not unlocked correctly: %s' % failed_machines) 228 self.lock_mgr = None 229