# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Module to optimize the scheduling of benchmark_run tasks."""


from __future__ import print_function

import sys
import test_flag
import traceback

from collections import defaultdict
from machine_image_manager import MachineImageManager
from threading import Lock
from threading import Thread
from cros_utils import command_executer
from cros_utils import logger


class DutWorker(Thread):
  """Working thread for a dut.

  The worker first finishes the scheduler's cache-hit benchmark_runs, then
  repeatedly asks the scheduler for benchmark_runs matching its dut's label,
  asking for (and reimaging onto) another label when none are left.
  """

  def __init__(self, dut, sched):
    super(DutWorker, self).__init__(name='DutWorker-{}'.format(dut.name))
    self._dut = dut
    self._sched = sched
    self._stat_num_br_run = 0
    self._stat_num_reimage = 0
    self._stat_annotation = ''
    self._logger = logger.GetLogger(self._sched.get_experiment().log_dir)
    self.daemon = True
    self._terminated = False
    self._active_br = None
    # Race condition accessing _active_br between _execute_benchmark_run and
    # terminate, so lock it up.
    self._active_br_lock = Lock()

  def terminate(self):
    """Stop this worker and terminate its active benchmark_run, if any.

    Sets the termination flag (checked before each reimage and each
    benchmark_run execution) and calls Terminate() on the benchmark_run
    currently being executed.
    """
    self._terminated = True
    with self._active_br_lock:
      if self._active_br is not None:
        # BenchmarkRun.Terminate() terminates any running testcase via
        # suite_runner.Terminate and updates timeline.
        self._active_br.Terminate()

  def run(self):
    """Do the "run-test->(optionally reimage)->run-test" chore.

    Note - 'br' below means 'benchmark_run'.

    The scheduler is always notified through dut_worker_finished() when this
    method exits -- including when an unexpected exception escapes from any
    phase -- so that Schedv2.is_complete() can eventually become true.
    """
    try:
      # Firstly, handle benchmarkruns that have cache hit.
      self._run_cached_benchmark_runs()

      # Secondly, handle benchmarkruns that need to be run on the dut.
      self._setup_dut_label()
      self._logger.LogOutput('{} started.'.format(self))
      while not self._terminated:
        br = self._sched.get_benchmark_run(self._dut)
        if br is None:
          # No br left for this label. Considering reimaging.
          label = self._sched.allocate_label(self._dut)
          if label is None:
            # No br even for other labels. We are done.
            self._logger.LogOutput('ImageManager found no label '
                                   'for dut, stopping working '
                                   'thread {}.'.format(self))
            break
          if self._reimage(label):
            # Reimage to run other br fails, dut is doomed, stop
            # this thread.
            self._logger.LogWarning('Re-image failed, dut '
                                    'in an unstable state, stopping '
                                    'working thread {}.'.format(self))
            break
        else:
          # Execute the br.
          self._execute_benchmark_run(br)
    finally:
      self._stat_annotation = 'finished'
      # Thread finishes. Notify scheduler that I'm done.
      self._sched.dut_worker_finished(self)

  def _run_cached_benchmark_runs(self):
    """Run every cache-hit benchmark_run the scheduler still holds.

    A RuntimeError raised by one br is printed to stdout and the next br is
    processed.
    """
    br = self._sched.get_cached_benchmark_run()
    while br:
      try:
        self._stat_annotation = 'finishing cached {}'.format(br)
        br.run()
      except RuntimeError:
        traceback.print_exc(file=sys.stdout)
      br = self._sched.get_cached_benchmark_run()

  def _reimage(self, label):
    """Reimage the dut to label.

    Args:
      label: the label to reimage onto the dut.

    Returns:
      0 if successful, otherwise 1 (also 1 if this worker was terminated,
      ImageMachine returned a non-zero value, or raised RuntimeError).
    """
    # Termination could happen anywhere, check it.
    if self._terminated:
      return 1

    self._logger.LogOutput('Reimaging {} using {}'.format(self, label))
    self._stat_num_reimage += 1
    self._stat_annotation = 'reimaging using "{}"'.format(label.name)
    try:
      # Note, only 1 reimage at any given time, this is guaranteed in
      # ImageMachine, so no sync needed below.
      retval = self._sched.get_experiment().machine_manager.ImageMachine(
          self._dut, label)
      if retval:
        return 1
    except RuntimeError:
      return 1

    self._dut.label = label
    return 0

  def _execute_benchmark_run(self, br):
    """Execute a single benchmark_run.

    Does nothing if this worker has been terminated. Otherwise the experiment
    is always told BenchmarkRunFinished(br) and the active-br slot is cleared,
    even if br.run() raises; any such exception then propagates to the
    caller.

    Args:
      br: the benchmark_run to execute on this worker's dut.
    """
    # Termination could happen anywhere, check it.
    if self._terminated:
      return

    self._logger.LogOutput('{} started working on {}'.format(self, br))
    self._stat_num_br_run += 1
    self._stat_annotation = 'executing {}'.format(br)
    # benchmark_run.run is not expected to throw, but play it safe here.
    try:
      assert br.owner_thread is None
      br.owner_thread = self
      with self._active_br_lock:
        self._active_br = br
      br.run()
    finally:
      self._sched.get_experiment().BenchmarkRunFinished(br)
      with self._active_br_lock:
        self._active_br = None

  def _setup_dut_label(self):
    """Try to match dut image with a certain experiment label.

    Reads the image checksum file on the dut and compares it with each
    label's checksum. If such match is found, the dut's label is set to it,
    so we skip doing reimage and jump to execute some benchmark_runs.
    Otherwise (no match, non-zero exit status, or RuntimeError) the dut's
    label is set to None.
    """
    checksum_file = '/usr/local/osimage_checksum_file'
    try:
      cmd_executer = command_executer.GetCommandExecuter()
      rv, checksum, _ = cmd_executer.CrosRunCommandWOutput(
          'cat ' + checksum_file,
          chromeos_root=self._sched.get_labels(0).chromeos_root,
          machine=self._dut.name,
          print_to_console=False)
      if rv == 0:
        checksum = checksum.strip()
        for label in self._sched.get_labels():
          if label.checksum == checksum:
            self._logger.LogOutput("Dut '{}' is pre-installed with '{}'".format(
                self._dut.name, label))
            self._dut.label = label
            return
    except RuntimeError:
      traceback.print_exc(file=sys.stdout)
    self._dut.label = None

  def __str__(self):
    return 'DutWorker[dut="{}", label="{}"]'.format(
        self._dut.name, self._dut.label.name if self._dut.label else 'None')

  def dut(self):
    """Return the dut this worker drives."""
    return self._dut

  def status_str(self):
    """Report thread status."""
    return ('Worker thread "{}", label="{}", benchmark_run={}, '
            'reimage={}, now {}'.format(
                self._dut.name, 'None' if self._dut.label is None else
                self._dut.label.name, self._stat_num_br_run,
                self._stat_num_reimage, self._stat_annotation))


class BenchmarkRunCacheReader(Thread):
  """The thread to read cache for a list of benchmark_runs.

  On creation, each instance of this class is given a br_list, which is a
  subset of experiment.benchmark_runs.
  """

  def __init__(self, schedv2, br_list):
    super(BenchmarkRunCacheReader, self).__init__()
    self._schedv2 = schedv2
    self._br_list = br_list
    self._logger = self._schedv2.get_logger()

  def run(self):
    """Read the cache of every br in br_list and record the cache hits.

    Cache hits are appended to the scheduler's cached-run list while holding
    the scheduler's '_cached_br_list' lock. A RuntimeError for one br is
    printed to stderr and the remaining brs are still processed.
    """
    for br in self._br_list:
      try:
        br.ReadCache()
        if br.cache_hit:
          self._logger.LogOutput('Cache hit - {}'.format(br))
          with self._schedv2.lock_on('_cached_br_list'):
            self._schedv2.get_cached_run_list().append(br)
        else:
          self._logger.LogOutput('Cache not hit - {}'.format(br))
      except RuntimeError:
        traceback.print_exc(file=sys.stderr)


class Schedv2(object):
  """New scheduler for crosperf."""

  def __init__(self, experiment):
    self._experiment = experiment
    self._logger = logger.GetLogger(experiment.log_dir)

    # Create shortcuts to nested data structure. "_duts" points to a list of
    # locked machines. _labels points to a list of all labels.
    self._duts = self._experiment.machine_manager.GetMachines()
    self._labels = self._experiment.labels

    # Bookkeeping for synchronization.
    self._workers_lock = Lock()
    # Per-object locks handed out by lock_on(). lock_on() is called from
    # several threads at once (e.g. the cache readers below), so creation of
    # a missing entry is guarded by _lock_map_lock; otherwise two racing
    # threads could end up holding different Lock objects for the same key.
    self._lock_map_lock = Lock()
    self._lock_map = defaultdict(Lock)

    # Test mode flag
    self._in_test_mode = test_flag.GetTestMode()

    # Read benchmarkrun cache.
    self._read_br_cache()

    # Mapping from label to a list of benchmark_runs.
    self._label_brl_map = {label: [] for label in self._labels}
    for br in self._experiment.benchmark_runs:
      assert br.label in self._label_brl_map
      # Only put no-cache-hit br into the map.
      if br not in self._cached_br_list:
        self._label_brl_map[br.label].append(br)

    # Use machine image manager to calculate initial label allocation.
    self._mim = MachineImageManager(self._labels, self._duts)
    self._mim.compute_initial_allocation()

    # Create worker thread, 1 per dut.
    self._active_workers = [DutWorker(dut, self) for dut in self._duts]
    self._finished_workers = []

    # Termination flag.
    self._terminated = False

  def run_sched(self):
    """Start all dut worker threads and return immediately."""
    for w in self._active_workers:
      w.start()

  def _read_br_cache(self):
    """Use multi-threading to read cache for all benchmarkruns.

    We do this by firstly creating a few threads, and then assign each
    thread a segment of all brs. Each thread will check cache status for
    each br and put those with cache into '_cached_br_list'. With 4 or fewer
    benchmark_runs the cache is read on the calling thread instead.
    """
    self._cached_br_list = []
    benchmark_runs = self._experiment.benchmark_runs
    n_benchmarkruns = len(benchmark_runs)
    if n_benchmarkruns <= 4:
      # Use single thread to read cache.
      self._logger.LogOutput(('Starting to read cache status for '
                              '{} benchmark runs ...').format(n_benchmarkruns))
      BenchmarkRunCacheReader(self, benchmark_runs).run()
      return

    # Split benchmarkruns set into segments. Each segment will be handled by
    # a thread. (x + 3) // 4 is math.ceil(x / 4) in integer arithmetic; '//'
    # keeps the result an int on Python 3 too, where '/' would give a float
    # and break range() and slicing below.
    n_threads = max(2, min(20, (n_benchmarkruns + 3) // 4))
    self._logger.LogOutput(('Starting {} threads to read cache status for '
                            '{} benchmark runs ...').format(n_threads,
                                                            n_benchmarkruns))
    # Evenly split into n_threads contiguous segments. Since n_threads never
    # exceeds n_benchmarkruns here, no segment is empty.
    benchmarkrun_segments = [
        benchmark_runs[i * n_benchmarkruns // n_threads:
                       (i + 1) * n_benchmarkruns // n_threads]
        for i in range(n_threads)
    ]

    # Assert: aggregation of benchmarkrun_segments equals to benchmark_runs.
    assert sum(len(x) for x in benchmarkrun_segments) == n_benchmarkruns

    # Create and start all readers.
    cache_readers = [
        BenchmarkRunCacheReader(self, x) for x in benchmarkrun_segments
    ]

    for x in cache_readers:
      x.start()

    # Wait till all readers finish.
    for x in cache_readers:
      x.join()

    # Summarize.
    self._logger.LogOutput(
        'Total {} cache hit out of {} benchmark_runs.'.format(
            len(self._cached_br_list), n_benchmarkruns))

  def get_cached_run_list(self):
    """Return the (mutable) list of cache-hit benchmark_runs."""
    return self._cached_br_list

  def get_label_map(self):
    """Return the mapping from label to its pending benchmark_runs."""
    return self._label_brl_map

  def get_experiment(self):
    return self._experiment

  def get_labels(self, i=None):
    """Return all labels, or the i-th label when i is given."""
    if i is None:
      return self._labels
    return self._labels[i]

  def get_logger(self):
    return self._logger

  def get_cached_benchmark_run(self):
    """Get a benchmark_run with 'cache hit'.

    Returns:
      The benchmark that has cache hit, if any. Otherwise None.
    """
    with self.lock_on('_cached_br_list'):
      if self._cached_br_list:
        return self._cached_br_list.pop()
      return None

  def get_benchmark_run(self, dut):
    """Get a benchmark_run (br) object for a certain dut.

    Args:
      dut: the dut for which a br is returned.

    Returns:
      A br with its label matching that of the dut. If no such br could be
      found, return None (this usually means a reimage is required for the
      dut). Also None once the scheduler has been terminated.
    """
    # If terminated, stop providing any br.
    if self._terminated:
      return None

    # If dut bears an unrecognized label, return None.
    if dut.label is None:
      return None

    # If br list for the dut's label is empty (that means all brs for this
    # label have been done), return None.
    with self.lock_on(dut.label):
      brl = self._label_brl_map[dut.label]
      if not brl:
        return None
      # Return the first br.
      return brl.pop(0)

  def allocate_label(self, dut):
    """Allocate a label to a dut.

    The work is delegated to MachineImageManager.

    The dut_worker calling this method is responsible for reimage the dut to
    this label.

    Args:
      dut: the dut that needs a (new) label.

    Returns:
      The label or None (always None once the scheduler is terminated).
    """
    if self._terminated:
      return None

    return self._mim.allocate(dut, self)

  def dut_worker_finished(self, dut_worker):
    """Notify schedv2 that the dut_worker thread finished.

    Args:
      dut_worker: the thread that is about to end.
    """
    self._logger.LogOutput('{} finished.'.format(dut_worker))
    with self._workers_lock:
      self._active_workers.remove(dut_worker)
      self._finished_workers.append(dut_worker)

  def is_complete(self):
    """Return True once every dut worker has reported it finished."""
    with self._workers_lock:
      return not self._active_workers

  def lock_on(self, my_object):
    """Return the Lock associated with my_object, creating it if needed."""
    with self._lock_map_lock:
      return self._lock_map[my_object]

  def terminate(self):
    """Mark flag so we stop providing br/reimages.

    Also terminate each DutWorker, so they refuse to execute br or reimage.
    """
    self._terminated = True
    # Workers remove themselves from _active_workers concurrently (see
    # dut_worker_finished), so iterate over a snapshot taken under the lock.
    with self._workers_lock:
      workers = list(self._active_workers)
    for dut_worker in workers:
      dut_worker.terminate()

  def threads_status_as_string(self):
    """Report the dut worker threads status."""
    # Snapshot under the lock; workers move between the lists concurrently.
    with self._workers_lock:
      active = list(self._active_workers)
      finished = list(self._finished_workers)

    status = '{} active threads, {} finished threads.\n'.format(
        len(active), len(finished))
    status += ' Active threads:'
    for dw in active:
      status += '\n ' + dw.status_str()
    if finished:
      status += '\n Finished threads:'
      for dw in finished:
        status += '\n ' + dw.status_str()
    return status