• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2015 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4"""Module to optimize the scheduling of benchmark_run tasks."""
5
6from __future__ import print_function
7
8import sys
9import test_flag
10import traceback
11
12from collections import defaultdict
13from machine_image_manager import MachineImageManager
14from threading import Lock
15from threading import Thread
16from cros_utils import command_executer
17from cros_utils import logger
18
19
class DutWorker(Thread):
  """Working thread for a dut.

    A DutWorker first drains benchmark_runs that had a cache hit, then tries
    to match the dut's installed image with an experiment label, and finally
    loops asking the scheduler for benchmark_runs matching the dut's label,
    reimaging the dut onto another label when none are left.
  """

  def __init__(self, dut, sched):
    super(DutWorker, self).__init__(name='DutWorker-{}'.format(dut.name))
    self._dut = dut
    self._sched = sched
    # Counters and a free-form annotation, reported by status_str().
    self._stat_num_br_run = 0
    self._stat_num_reimage = 0
    self._stat_annotation = ''
    self._logger = logger.GetLogger(self._sched.get_experiment().log_dir)
    self.daemon = True
    self._terminated = False
    self._active_br = None
    # Race condition accessing _active_br between _execute_benchmark_run and
    # terminate, so lock it up.
    self._active_br_lock = Lock()

  def terminate(self):
    """Mark this worker terminated and terminate the active br, if any.

    The flag makes the run loop, _reimage and _execute_benchmark_run refuse
    further work.
    """
    self._terminated = True
    with self._active_br_lock:
      if self._active_br is not None:
        # BenchmarkRun.Terminate() terminates any running testcase via
        # suite_runner.Terminate and updates timeline.
        self._active_br.Terminate()

  def run(self):
    """Do the "run-test->(optionally reimage)->run-test" chore.

        Note - 'br' below means 'benchmark_run'.

        Everything runs inside try/finally so that dut_worker_finished() is
        always called when this thread exits, even if an unexpected exception
        escapes. That guarantees the worker is removed from the scheduler's
        active-worker list.
    """
    try:
      # Firstly, handle benchmarkruns that have cache hit.
      br = self._sched.get_cached_benchmark_run()
      while br:
        try:
          self._stat_annotation = 'finishing cached {}'.format(br)
          br.run()
        except RuntimeError:
          traceback.print_exc(file=sys.stdout)
        br = self._sched.get_cached_benchmark_run()

      # Secondly, handle benchmarkruns that need to be run on the dut.
      self._setup_dut_label()
      self._logger.LogOutput('{} started.'.format(self))
      while not self._terminated:
        br = self._sched.get_benchmark_run(self._dut)
        if br is not None:
          self._execute_benchmark_run(br)
          continue

        # No br left for this label. Consider reimaging.
        label = self._sched.allocate_label(self._dut)
        if label is None:
          # No br even for other labels. We are done.
          self._logger.LogOutput('ImageManager found no label '
                                 'for dut, stopping working '
                                 'thread {}.'.format(self))
          break
        if self._reimage(label):
          # Reimaging to run other brs failed; the dut is doomed, so stop
          # this thread.
          self._logger.LogWarning('Re-image failed, dut '
                                  'in an unstable state, stopping '
                                  'working thread {}.'.format(self))
          break
    finally:
      self._stat_annotation = 'finished'
      # Thread finishes. Notify scheduler that I'm done.
      self._sched.dut_worker_finished(self)

  def _reimage(self, label):
    """Reimage the dut to label.

    Args:
      label: the label to reimage onto the dut.

    Returns:
      0 if successful, otherwise 1. Also returns 1 without doing anything
      if this worker has been terminated.
    """

    # Termination could happen anywhere, check it.
    if self._terminated:
      return 1

    self._logger.LogOutput('Reimaging {} using {}'.format(self, label))
    self._stat_num_reimage += 1
    self._stat_annotation = 'reimaging using "{}"'.format(label.name)
    try:
      # Note, only 1 reimage at any given time, this is guaranteed in
      # ImageMachine, so no sync needed below.
      retval = self._sched.get_experiment().machine_manager.ImageMachine(
          self._dut, label)
      if retval:
        return 1
    except RuntimeError:
      return 1

    self._dut.label = label
    return 0

  def _execute_benchmark_run(self, br):
    """Execute a single benchmark_run on this worker's dut.

    Does nothing if the worker has been terminated. Otherwise it records this
    worker as the br's owner and publishes br as the active run, so
    terminate() can stop it. It then calls br.run() directly in this thread.

    Note - exceptions from br.run() are not caught here and propagate to the
    caller. The finally clause still reports br to the experiment via
    BenchmarkRunFinished() and clears the active run.
    """

    # Termination could happen anywhere, check it.
    if self._terminated:
      return

    self._logger.LogOutput('{} started working on {}'.format(self, br))
    self._stat_num_br_run += 1
    self._stat_annotation = 'executing {}'.format(br)
    # benchmark_run.run is not expected to throw, but keep the bookkeeping
    # consistent if it does.
    try:
      assert br.owner_thread is None
      br.owner_thread = self
      with self._active_br_lock:
        self._active_br = br
      br.run()
    finally:
      self._sched.get_experiment().BenchmarkRunFinished(br)
      with self._active_br_lock:
        self._active_br = None

  def _setup_dut_label(self):
    """Try to match dut image with a certain experiment label.

        The dut's /usr/local/osimage_checksum_file is read and compared with
        each label's checksum. On a match the dut's label is set, so we skip
        reimaging and go straight to executing benchmark_runs.

        If the command exits non-zero or no label matches, dut.label is left
        unchanged. If running the command raises RuntimeError, the traceback
        is printed and dut.label is reset to None.
    """

    checksum_file = '/usr/local/osimage_checksum_file'
    try:
      executer = command_executer.GetCommandExecuter()
      rv, checksum, _ = executer.CrosRunCommandWOutput(
          'cat ' + checksum_file,
          chromeos_root=self._sched.get_labels(0).chromeos_root,
          machine=self._dut.name,
          print_to_console=False)
      if rv == 0:
        checksum = checksum.strip()
        for l in self._sched.get_labels():
          if l.checksum == checksum:
            self._logger.LogOutput(
                "Dut '{}' is pre-installed with '{}'".format(self._dut.name, l))
            self._dut.label = l
            return
    except RuntimeError:
      traceback.print_exc(file=sys.stdout)
      self._dut.label = None

  def __str__(self):
    return 'DutWorker[dut="{}", label="{}"]'.format(
        self._dut.name, self._dut.label.name if self._dut.label else 'None')

  def dut(self):
    """Return the dut driven by this worker."""
    return self._dut

  def status_str(self):
    """Report thread status."""

    return ('Worker thread "{}", label="{}", benchmark_run={}, '
            'reimage={}, now {}'.format(
                self._dut.name, 'None' if self._dut.label is None else
                self._dut.label.name, self._stat_num_br_run,
                self._stat_num_reimage, self._stat_annotation))
191
class BenchmarkRunCacheReader(Thread):
  """Thread that checks the result cache for a batch of benchmark_runs.

    Each instance is handed ``br_list``, a slice of the experiment's
    benchmark_runs. Every run whose ReadCache() reports a hit is appended to
    the scheduler's cached-run list (under the scheduler's
    '_cached_br_list' lock); misses are only logged.
  """

  def __init__(self, schedv2, br_list):
    super(BenchmarkRunCacheReader, self).__init__()
    self._schedv2 = schedv2
    self._br_list = br_list
    self._logger = schedv2.get_logger()

  def run(self):
    for benchmark_run in self._br_list:
      try:
        benchmark_run.ReadCache()
        if not benchmark_run.cache_hit:
          self._logger.LogOutput('Cache not hit - {}'.format(benchmark_run))
          continue
        self._logger.LogOutput('Cache hit - {}'.format(benchmark_run))
        sched = self._schedv2
        with sched.lock_on('_cached_br_list'):
          sched.get_cached_run_list().append(benchmark_run)
      except RuntimeError:
        # A failing cache read only affects this one run; report and move on.
        traceback.print_exc(file=sys.stderr)
217
218
class Schedv2(object):
  """New scheduler for crosperf.

    Creates one DutWorker thread per dut. Benchmark_runs with a cache hit are
    kept in a shared list; the remaining ones are bucketed per label and
    handed to workers whose dut currently carries that label. Label
    (re)allocation is delegated to MachineImageManager.
  """

  def __init__(self, experiment):
    self._experiment = experiment
    self._logger = logger.GetLogger(experiment.log_dir)

    # Create shortcuts to nested data structure. "_duts" points to a list of
    # locked machines. _labels points to a list of all labels.
    self._duts = self._experiment.machine_manager.GetMachines()
    self._labels = self._experiment.labels

    # Bookkeeping for synchronization. _workers_lock guards _active_workers
    # and _finished_workers.
    self._workers_lock = Lock()
    # Per-object locks handed out by lock_on(). _lock_map_lock guards the map
    # itself, so concurrent first requests for the same key cannot end up
    # with two different locks.
    self._lock_map_lock = Lock()
    self._lock_map = defaultdict(Lock)

    # Test mode flag
    self._in_test_mode = test_flag.GetTestMode()

    # Read benchmarkrun cache.
    self._read_br_cache()

    # Mapping from label to a list of benchmark_runs.
    self._label_brl_map = {l: [] for l in self._labels}
    for br in self._experiment.benchmark_runs:
      assert br.label in self._label_brl_map
      # Only put no-cache-hit br into the map.
      if br not in self._cached_br_list:
        self._label_brl_map[br.label].append(br)

    # Use machine image manager to calculate initial label allocation.
    self._mim = MachineImageManager(self._labels, self._duts)
    self._mim.compute_initial_allocation()

    # Create worker thread, 1 per dut.
    self._active_workers = [DutWorker(dut, self) for dut in self._duts]
    self._finished_workers = []

    # Termination flag.
    self._terminated = False

  def run_sched(self):
    """Start all dut worker threads and return immediately."""

    for w in self._active_workers:
      w.start()

  def _read_br_cache(self):
    """Use multi-threading to read cache for all benchmarkruns.

        We do this by first creating a few threads (between 2 and 20), then
        assigning each thread a contiguous segment of all brs. Each thread
        checks the cache status of its brs and puts those with a cache hit
        into '_cached_br_list'. With 4 or fewer brs, a single reader runs in
        the calling thread instead.
    """

    self._cached_br_list = []
    n_benchmarkruns = len(self._experiment.benchmark_runs)
    if n_benchmarkruns <= 4:
      # Use single thread to read cache.
      self._logger.LogOutput(('Starting to read cache status for '
                              '{} benchmark runs ...').format(n_benchmarkruns))
      BenchmarkRunCacheReader(self, self._experiment.benchmark_runs).run()
      return

    # Split benchmarkruns set into segments. Each segment will be handled by
    # a thread. Note, we use (x+3)//4 to mimic math.ceil(x/4). Floor division
    # ('//') keeps these ints on Python 3, where '/' would yield floats that
    # range() and slicing reject.
    n_threads = max(2, min(20, (n_benchmarkruns + 3) // 4))
    self._logger.LogOutput(('Starting {} threads to read cache status for '
                            '{} benchmark runs ...').format(
                                n_threads, n_benchmarkruns))
    benchmarkruns_per_thread = (n_benchmarkruns + n_threads - 1) // n_threads
    benchmarkrun_segments = []
    for i in range(n_threads - 1):
      start = i * benchmarkruns_per_thread
      end = (i + 1) * benchmarkruns_per_thread
      benchmarkrun_segments.append(self._experiment.benchmark_runs[start:end])
    benchmarkrun_segments.append(self._experiment.benchmark_runs[(
        n_threads - 1) * benchmarkruns_per_thread:])

    # Assert: aggregation of benchmarkrun_segments equals to benchmark_runs.
    assert sum(len(x) for x in benchmarkrun_segments) == n_benchmarkruns

    # Create and start all readers.
    cache_readers = [
        BenchmarkRunCacheReader(self, x) for x in benchmarkrun_segments
    ]

    for x in cache_readers:
      x.start()

    # Wait till all readers finish.
    for x in cache_readers:
      x.join()

    # Summarize.
    self._logger.LogOutput('Total {} cache hit out of {} benchmark_runs.'.
                           format(len(self._cached_br_list), n_benchmarkruns))

  def get_cached_run_list(self):
    """Return the (shared, mutable) list of cache-hit benchmark_runs.

    Callers mutating it are expected to hold lock_on('_cached_br_list').
    """
    return self._cached_br_list

  def get_label_map(self):
    """Return the label -> list-of-pending-benchmark_runs mapping."""
    return self._label_brl_map

  def get_experiment(self):
    return self._experiment

  def get_labels(self, i=None):
    """Return all labels, or only the i-th label when i is given."""
    if i is None:
      return self._labels
    return self._labels[i]

  def get_logger(self):
    return self._logger

  def get_cached_benchmark_run(self):
    """Get a benchmark_run with 'cache hit'.

    Returns:
      The benchmark that has cache hit, if any. Otherwise none.
    """

    with self.lock_on('_cached_br_list'):
      if self._cached_br_list:
        return self._cached_br_list.pop()
      return None

  def get_benchmark_run(self, dut):
    """Get a benchmark_run (br) object for a certain dut.

    Args:
      dut: the dut for which a br is returned.

    Returns:
      A br with its label matching that of the dut. If no such br could be
      found, return None (this usually means a reimage is required for the
      dut). Also returns None once the scheduler is terminated or when the
      dut has no label.
    """

    # If terminated, stop providing any br.
    if self._terminated:
      return None

    # If dut bears an unrecognized label, return None.
    if dut.label is None:
      return None

    # If br list for the dut's label is empty (that means all brs for this
    # label have been done), return None.
    with self.lock_on(dut.label):
      brl = self._label_brl_map[dut.label]
      if not brl:
        return None
      # Return the first br.
      return brl.pop(0)

  def allocate_label(self, dut):
    """Allocate a label to a dut.

        The work is delegated to MachineImageManager.

        The dut_worker calling this method is responsible for reimaging the
        dut to this label.

    Args:
      dut: the dut that needs a (new) label.

    Returns:
      The label or None (None also once the scheduler is terminated).
    """

    if self._terminated:
      return None

    return self._mim.allocate(dut, self)

  def dut_worker_finished(self, dut_worker):
    """Notify schedv2 that the dut_worker thread finished.

    Args:
      dut_worker: the thread that is about to end.
    """

    self._logger.LogOutput('{} finished.'.format(dut_worker))
    with self._workers_lock:
      self._active_workers.remove(dut_worker)
      self._finished_workers.append(dut_worker)

  def is_complete(self):
    """Return True once every dut worker has reported it finished."""
    return not self._active_workers

  def lock_on(self, my_object):
    """Return the lock associated with my_object, creating it on first use."""
    with self._lock_map_lock:
      return self._lock_map[my_object]

  def terminate(self):
    """Mark flag so we stop providing br/reimages.

        Also terminate each DutWorker, so they refuse to execute br or reimage.
    """

    self._terminated = True
    # Workers remove themselves from _active_workers (dut_worker_finished)
    # concurrently, so iterate over a snapshot taken under the lock. Call
    # terminate() outside the lock so a worker finishing meanwhile can still
    # acquire it.
    with self._workers_lock:
      workers = list(self._active_workers)
    for dut_worker in workers:
      dut_worker.terminate()

  def threads_status_as_string(self):
    """Report the dut worker threads status."""

    # Snapshot both lists so they are consistent with each other and are not
    # mutated by finishing workers while we format them.
    with self._workers_lock:
      active = list(self._active_workers)
      finished = list(self._finished_workers)

    status = '{} active threads, {} finished threads.\n'.format(
        len(active), len(finished))
    status += '  Active threads:'
    for dw in active:
      status += '\n    ' + dw.status_str()
    if finished:
      status += '\n  Finished threads:'
      for dw in finished:
        status += '\n    ' + dw.status_str()
    return status
437