# -*- coding: utf-8 -*-
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Module to optimize the scheduling of benchmark_run tasks."""

from __future__ import division
from __future__ import print_function

import sys
import traceback

from collections import defaultdict
from threading import Lock
from threading import Thread

import test_flag

from machine_image_manager import MachineImageManager
from cros_utils import command_executer
from cros_utils import logger


class DutWorker(Thread):
  """Working thread for a dut.

  Each worker first drains the scheduler's cache-hit benchmark_runs, then
  repeatedly pulls benchmark_runs for its dut's label from the scheduler,
  reimaging the dut to another label when the current one has no work left.
  """

  def __init__(self, dut, sched):
    super(DutWorker, self).__init__(name='DutWorker-{}'.format(dut.name))
    self._dut = dut
    self._sched = sched
    # Counters/annotation reported by status_str().
    self._stat_num_br_run = 0
    self._stat_num_reimage = 0
    self._stat_annotation = ''
    self._logger = logger.GetLogger(self._sched.get_experiment().log_dir)
    self.daemon = True
    self._terminated = False
    self._active_br = None
    # Race condition accessing _active_br between _execute_benchmark_run and
    # terminate, so lock it up.
    self._active_br_lock = Lock()

  def terminate(self):
    """Stop this worker and terminate the benchmark_run it is executing.

    After this call the worker no longer starts new benchmark_runs or
    reimages; the currently active benchmark_run (if any) is asked to stop
    via its Terminate() method.
    """
    self._terminated = True
    with self._active_br_lock:
      if self._active_br is not None:
        # BenchmarkRun.Terminate() terminates any running testcase via
        # suite_runner.Terminate and updates timeline.
        self._active_br.Terminate()

  def run(self):
    """Do the "run-test->(optionally reimage)->run-test" chore.

    The scheduler is always notified through dut_worker_finished(), even if
    an unexpected exception escapes from any stage, so that
    Schedv2.is_complete() is never left waiting on a dead thread.

    Note - 'br' below means 'benchmark_run'.
    """
    try:
      # Firstly, handle benchmarkruns that have cache hit.
      br = self._sched.get_cached_benchmark_run()
      while br:
        try:
          self._stat_annotation = 'finishing cached {}'.format(br)
          br.run()
        except RuntimeError:
          traceback.print_exc(file=sys.stdout)
        br = self._sched.get_cached_benchmark_run()

      # Secondly, handle benchmarkruns that needs to be run on dut.
      self._setup_dut_label()
      self._logger.LogOutput('{} started.'.format(self))
      while not self._terminated:
        br = self._sched.get_benchmark_run(self._dut)
        if br is None:
          # No br left for this label. Considering reimaging.
          label = self._sched.allocate_label(self._dut)
          if label is None:
            # No br even for other labels. We are done.
            self._logger.LogOutput('ImageManager found no label '
                                   'for dut, stopping working '
                                   'thread {}.'.format(self))
            break
          if self._reimage(label):
            # Reimage to run other br fails, dut is doomed, stop
            # this thread.
            self._logger.LogWarning('Re-image failed, dut '
                                    'in an unstable state, stopping '
                                    'working thread {}.'.format(self))
            break
        else:
          # Execute the br.
          self._execute_benchmark_run(br)
    finally:
      self._stat_annotation = 'finished'
      # Thread finishes. Notify scheduler that I'm done.
      self._sched.dut_worker_finished(self)

  def _reimage(self, label):
    """Reimage the dut to label.

    Args:
      label: the label to reimage onto dut.

    Returns:
      0 if successful, otherwise 1.
    """

    # Termination could happen anywhere, check it.
    if self._terminated:
      return 1

    if self._sched.get_experiment().skylab:
      self._logger.LogOutput('Skylab mode, do not image before testing.')
      self._dut.label = label
      return 0

    self._logger.LogOutput('Reimaging {} using {}'.format(self, label))
    self._stat_num_reimage += 1
    self._stat_annotation = 'reimaging using "{}"'.format(label.name)
    try:
      # Note, only 1 reimage at any given time, this is guaranteed in
      # ImageMachine, so no sync needed below.
      retval = self._sched.get_experiment().machine_manager.ImageMachine(
          self._dut, label)

      if retval:
        return 1
    except RuntimeError:
      return 1

    self._dut.label = label
    return 0

  def _execute_benchmark_run(self, br):
    """Execute a single benchmark_run.

    benchmark_run.run is not expected to raise. If it does, the finally
    block still reports the run as finished to the experiment and clears
    _active_br before the exception propagates.
    """

    # Termination could happen anywhere, check it.
    if self._terminated:
      return

    self._logger.LogOutput('{} started working on {}'.format(self, br))
    self._stat_num_br_run += 1
    self._stat_annotation = 'executing {}'.format(br)
    # benchmark_run.run does not throws, but just play it safe here.
    try:
      assert br.owner_thread is None
      br.owner_thread = self
      with self._active_br_lock:
        self._active_br = br
      br.run()
    finally:
      self._sched.get_experiment().BenchmarkRunFinished(br)
      with self._active_br_lock:
        self._active_br = None

  def _setup_dut_label(self):
    """Try to match dut image with a certain experiment label.

    The dut's image checksum is read from checksum_file and compared with
    each label's checksum. If such match is found, we just skip doing
    reimage and jump to execute some benchmark_runs. Otherwise (no match,
    non-zero exit status, or RuntimeError) dut.label is set to None, which
    makes Schedv2.get_benchmark_run return None and forces a label
    allocation/reimage.
    """

    checksum_file = '/usr/local/osimage_checksum_file'
    try:
      executer = command_executer.GetCommandExecuter()
      rv, checksum, _ = executer.CrosRunCommandWOutput(
          'cat ' + checksum_file,
          chromeos_root=self._sched.get_labels(0).chromeos_root,
          machine=self._dut.name,
          print_to_console=False)
      if rv == 0:
        checksum = checksum.strip()
        for label in self._sched.get_labels():
          if label.checksum == checksum:
            self._logger.LogOutput("Dut '{}' is pre-installed with '{}'".format(
                self._dut.name, label))
            self._dut.label = label
            return
    except RuntimeError:
      traceback.print_exc(file=sys.stdout)
    self._dut.label = None

  def __str__(self):
    return 'DutWorker[dut="{}", label="{}"]'.format(
        self._dut.name, self._dut.label.name if self._dut.label else 'None')

  def dut(self):
    """Return the dut this worker drives."""
    return self._dut

  def status_str(self):
    """Report thread status."""

    return ('Worker thread "{}", label="{}", benchmark_run={}, '
            'reimage={}, now {}'.format(
                self._dut.name,
                'None' if self._dut.label is None else self._dut.label.name,
                self._stat_num_br_run, self._stat_num_reimage,
                self._stat_annotation))


class BenchmarkRunCacheReader(Thread):
  """The thread to read cache for a list of benchmark_runs.

  On creation, each instance of this class is given a br_list, which is a
  subset of experiment._benchmark_runs.
  """

  def __init__(self, schedv2, br_list):
    super(BenchmarkRunCacheReader, self).__init__()
    self._schedv2 = schedv2
    self._br_list = br_list
    self._logger = self._schedv2.get_logger()

  def run(self):
    """Read the cache of each br and queue cache hits on the scheduler."""
    for br in self._br_list:
      try:
        br.ReadCache()
        if br.cache_hit:
          self._logger.LogOutput('Cache hit - {}'.format(br))
          with self._schedv2.lock_on('_cached_br_list'):
            self._schedv2.get_cached_run_list().append(br)
        else:
          self._logger.LogOutput('Cache not hit - {}'.format(br))
      except RuntimeError:
        traceback.print_exc(file=sys.stderr)


class Schedv2(object):
  """New scheduler for crosperf."""

  def __init__(self, experiment):
    self._experiment = experiment
    self._logger = logger.GetLogger(experiment.log_dir)

    # Create shortcuts to nested data structure. "_duts" points to a list of
    # locked machines. _labels points to a list of all labels.
    self._duts = self._experiment.machine_manager.GetMachines()
    self._labels = self._experiment.labels

    # Bookkeeping for synchronization.
    self._workers_lock = Lock()
    # Per-object locks handed out by lock_on(). _lock_map_lock serializes
    # lookups so that each key maps to exactly one Lock, even when several
    # threads ask for a new key at the same time. It must exist before
    # _read_br_cache() runs, because the cache readers call lock_on().
    self._lock_map = defaultdict(Lock)
    self._lock_map_lock = Lock()

    # Test mode flag
    self._in_test_mode = test_flag.GetTestMode()

    # Read benchmarkrun cache.
    self._read_br_cache()

    # Mapping from label to a list of benchmark_runs.
    self._label_brl_map = {label: [] for label in self._labels}
    for br in self._experiment.benchmark_runs:
      assert br.label in self._label_brl_map
      # Only put no-cache-hit br into the map.
      if br not in self._cached_br_list:
        self._label_brl_map[br.label].append(br)

    # Use machine image manager to calculate initial label allocation.
    self._mim = MachineImageManager(self._labels, self._duts)
    self._mim.compute_initial_allocation()

    # Create worker thread, 1 per dut.
    self._active_workers = [DutWorker(dut, self) for dut in self._duts]
    self._finished_workers = []

    # Termination flag.
    self._terminated = False

  def run_sched(self):
    """Start all dut worker threads and return immediately."""

    for w in self._active_workers:
      w.start()

  def _read_br_cache(self):
    """Use multi-threading to read cache for all benchmarkruns.

    We do this by firstly creating a few threads, and then assign each
    thread a segment of all brs. Each thread will check cache status for
    each br and put those with cache into '_cached_br_list'.
    """

    self._cached_br_list = []
    n_benchmarkruns = len(self._experiment.benchmark_runs)
    if n_benchmarkruns <= 4:
      # Use single thread to read cache.
      self._logger.LogOutput(('Starting to read cache status for '
                              '{} benchmark runs ...').format(n_benchmarkruns))
      BenchmarkRunCacheReader(self, self._experiment.benchmark_runs).run()
      return

    # Split benchmarkruns set into segments. Each segment will be handled by
    # a thread. Note, (x + 3) // 4 computes math.ceil(x / 4) in integer
    # arithmetic, i.e. about 4 benchmark runs per thread, clamped to 2..20.
    max_threads = max(2, min(20, (n_benchmarkruns + 3) // 4))
    benchmarkruns_per_thread = (
        (n_benchmarkruns + max_threads - 1) // max_threads)
    # Stepping by the segment size guarantees every segment is non-empty, so
    # no idle reader threads are started and the logged count is accurate.
    benchmarkrun_segments = [
        self._experiment.benchmark_runs[start:start + benchmarkruns_per_thread]
        for start in range(0, n_benchmarkruns, benchmarkruns_per_thread)
    ]
    n_threads = len(benchmarkrun_segments)
    self._logger.LogOutput(('Starting {} threads to read cache status for '
                            '{} benchmark runs ...').format(
                                n_threads, n_benchmarkruns))

    # Assert: aggregation of benchmarkrun_segments equals to benchmark_runs.
    assert sum(len(x) for x in benchmarkrun_segments) == n_benchmarkruns

    # Create and start all readers.
    cache_readers = [
        BenchmarkRunCacheReader(self, x) for x in benchmarkrun_segments
    ]

    for x in cache_readers:
      x.start()

    # Wait till all readers finish.
    for x in cache_readers:
      x.join()

    # Summarize.
    self._logger.LogOutput(
        'Total {} cache hit out of {} benchmark_runs.'.format(
            len(self._cached_br_list), n_benchmarkruns))

  def get_cached_run_list(self):
    return self._cached_br_list

  def get_label_map(self):
    return self._label_brl_map

  def get_experiment(self):
    return self._experiment

  def get_labels(self, i=None):
    """Return all labels, or only the i-th label when i is given."""
    if i is None:
      return self._labels
    return self._labels[i]

  def get_logger(self):
    return self._logger

  def get_cached_benchmark_run(self):
    """Get a benchmark_run with 'cache hit'.

    Returns:
      The benchmark that has cache hit, if any. Otherwise None.
    """

    with self.lock_on('_cached_br_list'):
      if self._cached_br_list:
        return self._cached_br_list.pop()
      return None

  def get_benchmark_run(self, dut):
    """Get a benchmark_run (br) object for a certain dut.

    Args:
      dut: the dut for which a br is returned.

    Returns:
      A br with its label matching that of the dut. If no such br could be
      found, return None (this usually means a reimage is required for the
      dut).
    """

    # If terminated, stop providing any br.
    if self._terminated:
      return None

    # If dut bears an unrecognized label, return None.
    if dut.label is None:
      return None

    # If br list for the dut's label is empty (that means all brs for this
    # label have been done), return None.
    with self.lock_on(dut.label):
      brl = self._label_brl_map[dut.label]
      if not brl:
        return None
      # Return the first br.
      return brl.pop(0)

  def allocate_label(self, dut):
    """Allocate a label to a dut.

    The work is delegated to MachineImageManager.

    The dut_worker calling this method is responsible for reimage the dut to
    this label.

    Args:
      dut: the dut for which a new label is to be allocated.

    Returns:
      The label or None.
    """

    if self._terminated:
      return None

    return self._mim.allocate(dut, self)

  def dut_worker_finished(self, dut_worker):
    """Notify schedv2 that the dut_worker thread finished.

    Args:
      dut_worker: the thread that is about to end.
    """

    self._logger.LogOutput('{} finished.'.format(dut_worker))
    with self._workers_lock:
      self._active_workers.remove(dut_worker)
      self._finished_workers.append(dut_worker)

  def is_complete(self):
    """Return True once every dut worker has reported that it finished."""
    return not self._active_workers

  def lock_on(self, my_object):
    """Return the lock associated with my_object, creating it on first use.

    Lookup is serialized by _lock_map_lock. Without it, two threads asking
    for the same new key at the same time could each end up with a
    different Lock, and then both enter the "protected" section together.
    """
    with self._lock_map_lock:
      return self._lock_map[my_object]

  def terminate(self):
    """Mark flag so we stop providing br/reimages.

    Also terminate each DutWorker, so they refuse to execute br or reimage.
    """

    self._terminated = True
    # Workers remove themselves from _active_workers (dut_worker_finished)
    # on their own threads; iterate over a snapshot taken under the lock so
    # a concurrent removal cannot make us skip a worker.
    with self._workers_lock:
      active_workers = list(self._active_workers)
    for dut_worker in active_workers:
      dut_worker.terminate()

  def threads_status_as_string(self):
    """Report the dut worker threads status."""

    # Snapshot both lists under the lock so the reported counts and the
    # listed threads are consistent with each other.
    with self._workers_lock:
      active_workers = list(self._active_workers)
      finished_workers = list(self._finished_workers)
    status = '{} active threads, {} finished threads.\n'.format(
        len(active_workers), len(finished_workers))
    status += ' Active threads:'
    for dw in active_workers:
      status += '\n ' + dw.status_str()
    if finished_workers:
      status += '\n Finished threads:'
      for dw in finished_workers:
        status += '\n ' + dw.status_str()
    return status