# -*- coding: utf-8 -*-
# Copyright 2015 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Module to optimize the scheduling of benchmark_run tasks."""


from collections import defaultdict
import sys
from threading import Lock
from threading import Thread
import traceback

from cros_utils import command_executer
from cros_utils import logger
from machine_image_manager import MachineImageManager
import test_flag


class DutWorker(Thread):
    """Working thread for a dut.

    Each worker first drains the scheduler's cache-hit benchmark_runs, then
    repeatedly fetches benchmark_runs matching its dut's label, reimaging the
    dut onto a label allocated by the scheduler when none are left.
    """

    def __init__(self, dut, sched):
        super(DutWorker, self).__init__(name="DutWorker-{}".format(dut.name))
        self._dut = dut
        self._sched = sched
        self._stat_num_br_run = 0
        self._stat_num_reimage = 0
        self._stat_annotation = ""
        self._logger = logger.GetLogger(self._sched.get_experiment().log_dir)
        self.daemon = True
        self._terminated = False
        self._active_br = None
        # _active_br is accessed both from this thread (_execute_benchmark_run)
        # and from whichever thread calls terminate(), so lock it up.
        self._active_br_lock = Lock()

    def terminate(self):
        """Ask this worker to stop, terminating its active benchmark_run.

        Sets the flag checked before every reimage / benchmark_run and, if a
        benchmark_run is currently active, calls its Terminate().
        """
        self._terminated = True
        with self._active_br_lock:
            if self._active_br is not None:
                # BenchmarkRun.Terminate() terminates any running testcase via
                # suite_runner.Terminate and updates timeline.
                self._active_br.Terminate()

    def run(self):
        """Do the "run-test->(optionally reimage)->run-test" chore.

        Note - 'br' below means 'benchmark_run'.

        The scheduler is always notified via dut_worker_finished() when this
        thread exits -- including when an unexpected exception escapes -- so
        Schedv2.is_complete() cannot wait forever on a dead worker.
        """
        try:
            # Firstly, handle benchmark_runs that have a cache hit.
            br = self._sched.get_cached_benchmark_run()
            while br:
                try:
                    self._stat_annotation = "finishing cached {}".format(br)
                    br.run()
                except RuntimeError:
                    traceback.print_exc(file=sys.stdout)
                br = self._sched.get_cached_benchmark_run()

            # Secondly, handle benchmark_runs that need to be run on the dut.
            self._setup_dut_label()
            self._logger.LogOutput("{} started.".format(self))
            while not self._terminated:
                br = self._sched.get_benchmark_run(self._dut)
                if br is not None:
                    self._execute_benchmark_run(br)
                    continue

                # No br left for this label. Consider reimaging.
                label = self._sched.allocate_label(self._dut)
                if label is None:
                    # No br even for other labels. We are done.
                    self._logger.LogOutput(
                        "ImageManager found no label "
                        "for dut, stopping working "
                        "thread {}.".format(self)
                    )
                    break
                if self._reimage(label):
                    # _reimage() returned nonzero (failure, or this worker was
                    # terminated); the dut may be in an unknown state, so stop
                    # this thread.
                    self._logger.LogWarning(
                        "Re-image failed, dut "
                        "in an unstable state, stopping "
                        "working thread {}.".format(self)
                    )
                    break
        finally:
            self._stat_annotation = "finished"
            # Thread finishes. Notify scheduler that I'm done.
            self._sched.dut_worker_finished(self)

    def _reimage(self, label):
        """Reimage the dut with the image of `label`.

        In crosfleet mode no imaging is performed; the dut is simply tagged
        with `label`.

        Args:
            label: the label to reimage onto the dut.

        Returns:
            0 if successful, otherwise 1 (also 1 if this worker has been
            terminated).
        """
        # Termination could happen anywhere, check it.
        if self._terminated:
            return 1

        if self._sched.get_experiment().crosfleet:
            self._logger.LogOutput(
                "Crosfleet mode, do not image before testing."
            )
            self._dut.label = label
            return 0

        self._logger.LogOutput("Reimaging {} using {}".format(self, label))
        self._stat_num_reimage += 1
        self._stat_annotation = 'reimaging using "{}"'.format(label.name)
        try:
            # Note, only 1 reimage at any given time, this is guaranteed in
            # ImageMachine, so no sync needed below.
            retval = self._sched.get_experiment().machine_manager.ImageMachine(
                self._dut, label
            )
            if retval:
                return 1
        except RuntimeError:
            return 1

        self._dut.label = label
        return 0

    def _execute_benchmark_run(self, br):
        """Execute a single benchmark_run on this worker's dut.

        Exceptions raised by br.run() are NOT caught here (BenchmarkRun.run is
        not expected to raise); the finally clause only guarantees that the
        experiment is told the run finished and that the run is no longer
        registered as active for terminate().

        Args:
            br: the benchmark_run to execute; it must not already be owned by
                a thread.
        """
        # Termination could happen anywhere, check it.
        if self._terminated:
            return

        self._logger.LogOutput("{} started working on {}".format(self, br))
        self._stat_num_br_run += 1
        self._stat_annotation = "executing {}".format(br)
        try:
            assert br.owner_thread is None
            br.owner_thread = self
            with self._active_br_lock:
                self._active_br = br
            br.run()
        finally:
            self._sched.get_experiment().BenchmarkRunFinished(br)
            with self._active_br_lock:
                self._active_br = None

    def _setup_dut_label(self):
        """Try to match dut image with a certain experiment label.

        Reads the image checksum file on the dut and compares it with each
        label's checksum. If a match is found, the dut is tagged with that
        label so we can skip reimaging and jump straight to executing
        benchmark_runs. Otherwise (no match, non-zero exit status, or a
        RuntimeError while reading) the dut's label is set to None.
        """
        checksum_file = "/usr/local/osimage_checksum_file"
        try:
            rv, checksum, _ = (
                command_executer.GetCommandExecuter().CrosRunCommandWOutput(
                    "cat " + checksum_file,
                    chromeos_root=self._sched.get_labels(0).chromeos_root,
                    machine=self._dut.name,
                    print_to_console=False,
                )
            )
            if rv == 0:
                checksum = checksum.strip()
                for label in self._sched.get_labels():
                    if label.checksum == checksum:
                        self._logger.LogOutput(
                            "Dut '{}' is pre-installed with '{}'".format(
                                self._dut.name, label
                            )
                        )
                        self._dut.label = label
                        return
        except RuntimeError:
            traceback.print_exc(file=sys.stdout)
        self._dut.label = None

    def __str__(self):
        return 'DutWorker[dut="{}", label="{}"]'.format(
            self._dut.name, self._dut.label.name if self._dut.label else "None"
        )

    def dut(self):
        return self._dut

    def status_str(self):
        """Report thread status."""
        return (
            'Worker thread "{}", label="{}", benchmark_run={}, '
            "reimage={}, now {}".format(
                self._dut.name,
                "None" if self._dut.label is None else self._dut.label.name,
                self._stat_num_br_run,
                self._stat_num_reimage,
                self._stat_annotation,
            )
        )


class BenchmarkRunCacheReader(Thread):
    """The thread to read cache for a list of benchmark_runs.

    On creation, each instance of this class is given a br_list, which is a
    subset of experiment._benchmark_runs.
    """

    def __init__(self, schedv2, br_list):
        super(BenchmarkRunCacheReader, self).__init__()
        self._schedv2 = schedv2
        self._br_list = br_list
        self._logger = self._schedv2.get_logger()

    def run(self):
        """Read the cache of each br; append cache hits to the cached list.

        A RuntimeError while reading one br's cache is printed and that br is
        skipped; the remaining brs are still processed.
        """
        for br in self._br_list:
            try:
                br.ReadCache()
                if br.cache_hit:
                    self._logger.LogOutput("Cache hit - {}".format(br))
                    with self._schedv2.lock_on("_cached_br_list"):
                        self._schedv2.get_cached_run_list().append(br)
                else:
                    self._logger.LogOutput("Cache not hit - {}".format(br))
            except RuntimeError:
                traceback.print_exc(file=sys.stderr)


class Schedv2(object):
    """New scheduler for crosperf."""

    def __init__(self, experiment):
        self._experiment = experiment
        self._logger = logger.GetLogger(experiment.log_dir)

        # Create shortcuts to nested data structure. "_duts" points to a list of
        # locked machines. _labels points to a list of all labels.
        self._duts = self._experiment.machine_manager.GetMachines()
        self._labels = self._experiment.labels

        # Bookkeeping for synchronization.
        # _workers_lock guards _active_workers and _finished_workers.
        self._workers_lock = Lock()
        # Per-object locks handed out by lock_on(); _lock_map_lock serializes
        # their lazy creation so every caller gets the same Lock for a key.
        self._lock_map_lock = Lock()
        self._lock_map = defaultdict(Lock)

        # Test mode flag
        self._in_test_mode = test_flag.GetTestMode()

        # Read benchmarkrun cache.
        self._read_br_cache()

        # Mapping from label to a list of benchmark_runs.
        self._label_brl_map = {label: [] for label in self._labels}
        for br in self._experiment.benchmark_runs:
            assert br.label in self._label_brl_map
            # Only put no-cache-hit br into the map.
            if br not in self._cached_br_list:
                self._label_brl_map[br.label].append(br)

        # Use machine image manager to calculate initial label allocation.
        self._mim = MachineImageManager(self._labels, self._duts)
        self._mim.compute_initial_allocation()

        # Create worker thread, 1 per dut.
        self._active_workers = [DutWorker(dut, self) for dut in self._duts]
        self._finished_workers = []

        # Termination flag.
        self._terminated = False

    def run_sched(self):
        """Start all dut worker threads and return immediately."""
        for w in self._active_workers:
            w.start()

    def _read_br_cache(self):
        """Use multi-threading to read cache for all benchmarkruns.

        We do this by firstly creating a few threads, and then assign each
        thread a segment of all brs. Each thread will check cache status for
        each br and put those with cache into '_cached_br_list'.
        """
        self._cached_br_list = []
        n_benchmarkruns = len(self._experiment.benchmark_runs)
        if n_benchmarkruns <= 4:
            # Use single thread to read cache.
            self._logger.LogOutput(
                "Starting to read cache status for {} benchmark runs ...".format(
                    n_benchmarkruns
                )
            )
            BenchmarkRunCacheReader(self, self._experiment.benchmark_runs).run()
            return

        # Aim for about 4 benchmark runs per thread: (x + 3) // 4 is
        # ceil(x / 4); the thread count is clamped to [2, 20].
        n_threads = max(2, min(20, (n_benchmarkruns + 3) // 4))
        benchmarkruns_per_thread = (
            n_benchmarkruns + n_threads - 1
        ) // n_threads
        # Contiguous, non-empty segments covering every benchmark run exactly
        # once. Because benchmarkruns_per_thread is rounded up, fewer than
        # n_threads segments may be needed; no empty readers are started.
        benchmark_runs = self._experiment.benchmark_runs
        benchmarkrun_segments = [
            benchmark_runs[start : start + benchmarkruns_per_thread]
            for start in range(0, n_benchmarkruns, benchmarkruns_per_thread)
        ]
        self._logger.LogOutput(
            (
                "Starting {} threads to read cache status for "
                "{} benchmark runs ..."
            ).format(len(benchmarkrun_segments), n_benchmarkruns)
        )

        # Create and start all readers.
        cache_readers = [
            BenchmarkRunCacheReader(self, x) for x in benchmarkrun_segments
        ]

        for x in cache_readers:
            x.start()

        # Wait till all readers finish.
        for x in cache_readers:
            x.join()

        # Summarize.
        self._logger.LogOutput(
            "Total {} cache hit out of {} benchmark_runs.".format(
                len(self._cached_br_list), n_benchmarkruns
            )
        )

    def get_cached_run_list(self):
        return self._cached_br_list

    def get_label_map(self):
        return self._label_brl_map

    def get_experiment(self):
        return self._experiment

    def get_labels(self, i=None):
        if i is None:
            return self._labels
        return self._labels[i]

    def get_logger(self):
        return self._logger

    def get_cached_benchmark_run(self):
        """Get a benchmark_run with 'cache hit'.

        Returns:
            The benchmark_run that has a cache hit, if any. Otherwise None.
        """
        with self.lock_on("_cached_br_list"):
            if self._cached_br_list:
                return self._cached_br_list.pop()
            return None

    def get_benchmark_run(self, dut):
        """Get a benchmark_run (br) object for a certain dut.

        Args:
            dut: the dut for which a br is returned.

        Returns:
            A br with its label matching that of the dut. If no such br could be
            found, return None (this usually means a reimage is required for the
            dut).
        """
        # If terminated, stop providing any br.
        if self._terminated:
            return None

        # If dut bears an unrecognized label, return None.
        if dut.label is None:
            return None

        # If br list for the dut's label is empty (that means all brs for this
        # label have been done), return None.
        with self.lock_on(dut.label):
            brl = self._label_brl_map[dut.label]
            if not brl:
                return None
            # Return the first br.
            return brl.pop(0)

    def allocate_label(self, dut):
        """Allocate a label to a dut.

        The work is delegated to MachineImageManager.

        The dut_worker calling this method is responsible for reimaging the dut
        to this label.

        Args:
            dut: the dut that needs a new label.

        Returns:
            The label or None (always None once terminated).
        """
        if self._terminated:
            return None

        return self._mim.allocate(dut, self)

    def dut_worker_finished(self, dut_worker):
        """Notify schedv2 that the dut_worker thread finished.

        Args:
            dut_worker: the thread that is about to end.
        """
        self._logger.LogOutput("{} finished.".format(dut_worker))
        with self._workers_lock:
            self._active_workers.remove(dut_worker)
            self._finished_workers.append(dut_worker)

    def is_complete(self):
        """Return True once every dut worker has reported it finished."""
        with self._workers_lock:
            return not self._active_workers

    def lock_on(self, my_object):
        """Return the Lock associated with `my_object`, creating it if needed.

        Creation is serialized by _lock_map_lock: concurrent lookups of a new
        key on a bare defaultdict can each create (and hand out) a different
        Lock, which would break mutual exclusion for that key.
        """
        with self._lock_map_lock:
            return self._lock_map[my_object]

    def terminate(self):
        """Mark flag so we stop providing br/reimages.

        Also terminate each DutWorker, so they refuse to execute br or reimage.
        """
        self._terminated = True
        # Snapshot under the lock: workers remove themselves from
        # _active_workers concurrently (dut_worker_finished), and mutating the
        # list while iterating it could skip a worker. DutWorker.terminate()
        # is called outside the lock so a slow Terminate() does not block
        # finishing workers.
        with self._workers_lock:
            workers = list(self._active_workers)
        for dut_worker in workers:
            dut_worker.terminate()

    def threads_status_as_string(self):
        """Report the dut worker threads status."""
        # Snapshot both lists so they are consistent with each other and not
        # mutated while being iterated.
        with self._workers_lock:
            active = list(self._active_workers)
            finished = list(self._finished_workers)

        status = "{} active threads, {} finished threads.\n".format(
            len(active), len(finished)
        )
        status += " Active threads:"
        for dw in active:
            status += "\n " + dw.status_str()
        if finished:
            status += "\n Finished threads:"
            for dw in finished:
                status += "\n " + dw.status_str()
        return status