• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# -*- coding: utf-8 -*-
2# Copyright 2015 The ChromiumOS Authors
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""Module to optimize the scheduling of benchmark_run tasks."""
7
8
9from collections import defaultdict
10import sys
11from threading import Lock
12from threading import Thread
13import traceback
14
15from cros_utils import command_executer
16from cros_utils import logger
17from machine_image_manager import MachineImageManager
18import test_flag
19
20
class DutWorker(Thread):
    """Working thread for a dut.

    A worker owns exactly one dut. It first drains benchmark_runs that hit the
    cache, then keeps asking the scheduler for benchmark_runs matching the
    label currently on the dut, reimaging the dut to another label whenever the
    scheduler has nothing left for the current one.

    Note - 'br' below means 'benchmark_run'.
    """

    def __init__(self, dut, sched):
        """Create a (not yet started) worker.

        Args:
          dut: the machine this worker drives; its 'name' and 'label'
            attributes are read and 'label' is updated by this class.
          sched: the scheduler that hands out benchmark_runs and labels.
        """
        super(DutWorker, self).__init__(name="DutWorker-{}".format(dut.name))
        self._dut = dut
        self._sched = sched
        # Statistics reported through status_str().
        self._stat_num_br_run = 0
        self._stat_num_reimage = 0
        self._stat_annotation = ""
        self._logger = logger.GetLogger(self._sched.get_experiment().log_dir)
        self.daemon = True
        self._terminated = False
        self._active_br = None
        # Race condition accessing _active_br between _execute_benchmark_run and
        # terminate, so lock it up.
        self._active_br_lock = Lock()

    def terminate(self):
        """Flag this worker as terminated and stop the br in flight, if any."""
        self._terminated = True
        with self._active_br_lock:
            if self._active_br is not None:
                # BenchmarkRun.Terminate() terminates any running testcase via
                # suite_runner.Terminate and updates timeline.
                self._active_br.Terminate()

    def run(self):
        """Do the "run-test->(optionally reimage)->run-test" chore.

        The scheduler is always notified through dut_worker_finished() when
        this method exits, including when an unexpected exception escapes from
        the cached-run phase or from probing the dut label. Otherwise the
        worker would stay in the scheduler's active list forever.
        """

        try:
            # Firstly, handle benchmarkruns that have cache hit.
            br = self._sched.get_cached_benchmark_run()
            while br:
                try:
                    self._stat_annotation = "finishing cached {}".format(br)
                    br.run()
                except RuntimeError:
                    traceback.print_exc(file=sys.stdout)
                br = self._sched.get_cached_benchmark_run()

            # Secondly, handle benchmarkruns that needs to be run on dut.
            self._setup_dut_label()
            self._logger.LogOutput("{} started.".format(self))
            while not self._terminated:
                br = self._sched.get_benchmark_run(self._dut)
                if br is None:
                    # No br left for this label. Considering reimaging.
                    label = self._sched.allocate_label(self._dut)
                    if label is None:
                        # No br even for other labels. We are done.
                        self._logger.LogOutput(
                            "ImageManager found no label "
                            "for dut, stopping working "
                            "thread {}.".format(self)
                        )
                        break
                    if self._reimage(label):
                        # Reimage to run other br fails, dut is doomed, stop
                        # this thread.
                        self._logger.LogWarning(
                            "Re-image failed, dut "
                            "in an unstable state, stopping "
                            "working thread {}.".format(self)
                        )
                        break
                else:
                    # Execute the br.
                    self._execute_benchmark_run(br)
        finally:
            self._stat_annotation = "finished"
            # Thread finishes. Notify scheduler that I'm done.
            self._sched.dut_worker_finished(self)

    def _reimage(self, label):
        """Reimage the dut to label.

        Args:
          label: the label to reimage onto dut.

        Returns:
          0 if successful, otherwise 1. The dut's label is only updated on
          success.
        """

        # Termination could happen anywhere, check it.
        if self._terminated:
            return 1

        if self._sched.get_experiment().crosfleet:
            self._logger.LogOutput(
                "Crosfleet mode, do not image before testing."
            )
            self._dut.label = label
            return 0

        self._logger.LogOutput("Reimaging {} using {}".format(self, label))
        self._stat_num_reimage += 1
        self._stat_annotation = 'reimaging using "{}"'.format(label.name)
        try:
            # Note, only 1 reimage at any given time, this is guaranteed in
            # ImageMachine, so no sync needed below.
            retval = self._sched.get_experiment().machine_manager.ImageMachine(
                self._dut, label
            )

            if retval:
                return 1
        except RuntimeError:
            return 1

        self._dut.label = label
        return 0

    def _execute_benchmark_run(self, br):
        """Execute a single benchmark_run.

        Does nothing if the worker has already been terminated. Exceptions
        raised by br.run() are not caught here; the finally clause still
        reports the br as finished to the experiment and clears _active_br
        before the exception propagates.

        Args:
          br: the benchmark_run to execute; it must not have an owner thread.
        """

        # Termination could happen anywhere, check it.
        if self._terminated:
            return

        self._logger.LogOutput("{} started working on {}".format(self, br))
        self._stat_num_br_run += 1
        self._stat_annotation = "executing {}".format(br)
        try:
            assert br.owner_thread is None
            br.owner_thread = self
            # Publish the br so that terminate() can reach it.
            with self._active_br_lock:
                self._active_br = br
            br.run()
        finally:
            self._sched.get_experiment().BenchmarkRunFinished(br)
            with self._active_br_lock:
                self._active_br = None

    def _setup_dut_label(self):
        """Try to match dut image with a certain experiment label.

        The checksum file is read from the dut; when the command succeeds and
        the checksum equals that of a label, the label is assigned to the dut,
        so we can skip doing reimage and jump to execute some benchmark_runs.
        When the command fails or no label matches, the dut's label is left
        untouched. On RuntimeError the traceback is printed and the dut's label
        is reset to None.
        """

        checksum_file = "/usr/local/osimage_checksum_file"
        try:
            (
                rv,
                checksum,
                _,
            ) = command_executer.GetCommandExecuter().CrosRunCommandWOutput(
                "cat " + checksum_file,
                chromeos_root=self._sched.get_labels(0).chromeos_root,
                machine=self._dut.name,
                print_to_console=False,
            )
            if rv == 0:
                checksum = checksum.strip()
                for label in self._sched.get_labels():
                    if label.checksum == checksum:
                        self._logger.LogOutput(
                            "Dut '{}' is pre-installed with '{}'".format(
                                self._dut.name, label
                            )
                        )
                        self._dut.label = label
                        return
        except RuntimeError:
            traceback.print_exc(file=sys.stdout)
            self._dut.label = None

    def __str__(self):
        return 'DutWorker[dut="{}", label="{}"]'.format(
            self._dut.name, self._dut.label.name if self._dut.label else "None"
        )

    def dut(self):
        """Return the dut this worker drives."""
        return self._dut

    def status_str(self):
        """Report thread status."""

        return (
            'Worker thread "{}", label="{}", benchmark_run={}, '
            "reimage={}, now {}".format(
                self._dut.name,
                "None" if self._dut.label is None else self._dut.label.name,
                self._stat_num_br_run,
                self._stat_num_reimage,
                self._stat_annotation,
            )
        )
216
217
class BenchmarkRunCacheReader(Thread):
    """Thread that checks the cache status of a slice of benchmark_runs.

    Every instance receives its own br_list (a subset of
    experiment._benchmark_runs) at construction time and, when run, records
    each benchmark_run with a cache hit in the scheduler's cached-run list.
    """

    def __init__(self, schedv2, br_list):
        super(BenchmarkRunCacheReader, self).__init__()
        self._schedv2 = schedv2
        self._br_list = br_list
        self._logger = schedv2.get_logger()

    def run(self):
        """Read the cache for every benchmark_run assigned to this reader.

        A RuntimeError raised while handling one benchmark_run is printed to
        stderr and does not stop the remaining ones from being processed.
        """
        for candidate in self._br_list:
            try:
                candidate.ReadCache()
                if not candidate.cache_hit:
                    self._logger.LogOutput(
                        "Cache not hit - {}".format(candidate)
                    )
                    continue
                self._logger.LogOutput("Cache hit - {}".format(candidate))
                # The cached-run list is shared among readers; append under
                # the scheduler's lock for it.
                with self._schedv2.lock_on("_cached_br_list"):
                    cached_runs = self._schedv2.get_cached_run_list()
                    cached_runs.append(candidate)
            except RuntimeError:
                traceback.print_exc(file=sys.stderr)
243
244
class Schedv2(object):
    """New scheduler for crosperf.

    The scheduler reads the cache status of all benchmark_runs, groups the
    remaining (no-cache-hit) ones by label, and creates one DutWorker thread
    per dut. Workers pull benchmark_runs and labels from the scheduler and
    report back through dut_worker_finished().
    """

    def __init__(self, experiment):
        """Build the scheduler state for experiment.

        Args:
          experiment: the experiment to schedule; its log_dir, labels,
            benchmark_runs and machine_manager attributes are used here.
        """
        self._experiment = experiment
        self._logger = logger.GetLogger(experiment.log_dir)

        # Create shortcuts to nested data structure. "_duts" points to a list of
        # locked machines. _labels points to a list of all labels.
        self._duts = self._experiment.machine_manager.GetMachines()
        self._labels = self._experiment.labels

        # Bookkeeping for synchronization. _workers_lock guards the two worker
        # lists; _lock_map lazily creates one lock per key passed to lock_on().
        self._workers_lock = Lock()
        # pylint: disable=unnecessary-lambda
        self._lock_map = defaultdict(lambda: Lock())

        # Test mode flag
        self._in_test_mode = test_flag.GetTestMode()

        # Read benchmarkrun cache.
        self._read_br_cache()

        # Mapping from label to a list of benchmark_runs.
        self._label_brl_map = {label: [] for label in self._labels}
        for br in self._experiment.benchmark_runs:
            assert br.label in self._label_brl_map
            # Only put no-cache-hit br into the map.
            if br not in self._cached_br_list:
                self._label_brl_map[br.label].append(br)

        # Use machine image manager to calculate initial label allocation.
        self._mim = MachineImageManager(self._labels, self._duts)
        self._mim.compute_initial_allocation()

        # Create worker thread, 1 per dut.
        self._active_workers = [DutWorker(dut, self) for dut in self._duts]
        self._finished_workers = []

        # Termination flag.
        self._terminated = False

    def _snapshot_active_workers(self):
        """Return a copy of the active worker list taken under the lock."""
        with self._workers_lock:
            return list(self._active_workers)

    def run_sched(self):
        """Start all dut worker threads and return immediately."""

        # Iterate over a snapshot: a worker that finishes quickly removes
        # itself from _active_workers, and mutating the list while iterating it
        # would make the loop skip (and never start) another worker.
        for w in self._snapshot_active_workers():
            w.start()

    def _read_br_cache(self):
        """Use multi-threading to read cache for all benchmarkruns.

        We do this by firstly creating a few threads, and then assign each
        thread a segment of all brs. Each thread will check cache status for
        each br and put those with cache into '_cached_br_list'. When there
        are at most 4 benchmark runs, a single reader is run synchronously in
        the calling thread instead.
        """

        self._cached_br_list = []
        n_benchmarkruns = len(self._experiment.benchmark_runs)
        if n_benchmarkruns <= 4:
            # Use single thread to read cache.
            self._logger.LogOutput(
                (
                    "Starting to read cache status for " "{} benchmark runs ..."
                ).format(n_benchmarkruns)
            )
            BenchmarkRunCacheReader(self, self._experiment.benchmark_runs).run()
            return

        # Split benchmarkruns set into segments. Each segment will be handled by
        # a thread. Note, we use (x+3)//4 to mimic math.ceil(x/4).
        n_threads = max(2, min(20, (n_benchmarkruns + 3) // 4))
        self._logger.LogOutput(
            (
                "Starting {} threads to read cache status for "
                "{} benchmark runs ..."
            ).format(n_threads, n_benchmarkruns)
        )
        benchmarkruns_per_thread = (
            n_benchmarkruns + n_threads - 1
        ) // n_threads
        benchmarkrun_segments = []
        for i in range(n_threads - 1):
            start = i * benchmarkruns_per_thread
            end = (i + 1) * benchmarkruns_per_thread
            benchmarkrun_segments.append(
                self._experiment.benchmark_runs[start:end]
            )
        # The last segment takes whatever is left.
        benchmarkrun_segments.append(
            self._experiment.benchmark_runs[
                (n_threads - 1) * benchmarkruns_per_thread :
            ]
        )

        # Assert: aggregation of benchmarkrun_segments equals to benchmark_runs.
        assert sum(len(x) for x in benchmarkrun_segments) == n_benchmarkruns

        # Create and start all readers.
        cache_readers = [
            BenchmarkRunCacheReader(self, x) for x in benchmarkrun_segments
        ]

        for x in cache_readers:
            x.start()

        # Wait till all readers finish.
        for x in cache_readers:
            x.join()

        # Summarize.
        self._logger.LogOutput(
            "Total {} cache hit out of {} benchmark_runs.".format(
                len(self._cached_br_list), n_benchmarkruns
            )
        )

    def get_cached_run_list(self):
        """Return the (live) list of benchmark_runs that hit the cache."""
        return self._cached_br_list

    def get_label_map(self):
        """Return the (live) mapping from label to pending benchmark_runs."""
        return self._label_brl_map

    def get_experiment(self):
        """Return the experiment being scheduled."""
        return self._experiment

    def get_labels(self, i=None):
        """Return all labels, or the i-th label when i is given."""
        if i is None:
            return self._labels
        return self._labels[i]

    def get_logger(self):
        """Return the scheduler's logger."""
        return self._logger

    def get_cached_benchmark_run(self):
        """Get a benchmark_run with 'cache hit'.

        Returns:
          The benchmark that has cache hit, if any. Otherwise none. A returned
          benchmark_run is removed from the cached list.
        """

        with self.lock_on("_cached_br_list"):
            if self._cached_br_list:
                return self._cached_br_list.pop()
            return None

    def get_benchmark_run(self, dut):
        """Get a benchmark_run (br) object for a certain dut.

        Args:
          dut: the dut for which a br is returned.

        Returns:
          A br with its label matching that of the dut. If no such br could be
          found, return None (this usually means a reimage is required for the
          dut).
        """

        # If terminated, stop providing any br.
        if self._terminated:
            return None

        # If dut bears an unrecognized label, return None.
        if dut.label is None:
            return None

        # If br list for the dut's label is empty (that means all brs for this
        # label have been done), return None.
        with self.lock_on(dut.label):
            brl = self._label_brl_map[dut.label]
            if not brl:
                return None
            # Return the first br.
            return brl.pop(0)

    def allocate_label(self, dut):
        """Allocate a label to a dut.

        The work is delegated to MachineImageManager.

        The dut_worker calling this method is responsible for reimage the dut
        to this label.

        Args:
          dut: the dut that needs a new label.

        Returns:
          The label or None. None is always returned once the scheduler has
          been terminated.
        """

        if self._terminated:
            return None

        return self._mim.allocate(dut, self)

    def dut_worker_finished(self, dut_worker):
        """Notify schedv2 that the dut_worker thread finished.

        Args:
          dut_worker: the thread that is about to end.
        """

        self._logger.LogOutput("{} finished.".format(dut_worker))
        with self._workers_lock:
            self._active_workers.remove(dut_worker)
            self._finished_workers.append(dut_worker)

    def is_complete(self):
        """Return True when no worker is left in the active list."""
        return not self._active_workers

    def lock_on(self, my_object):
        """Return the lock associated with my_object, creating it if needed."""
        return self._lock_map[my_object]

    def terminate(self):
        """Mark flag so we stop providing br/reimages.

        Also terminate each DutWorker, so they refuse to execute br or reimage.
        """

        self._terminated = True
        # Iterate over a snapshot: workers remove themselves from
        # _active_workers as they finish, and mutating the list while iterating
        # it would make the loop skip workers. The lock is released before
        # calling terminate() so finishing workers are never blocked on it.
        for dut_worker in self._snapshot_active_workers():
            dut_worker.terminate()

    def threads_status_as_string(self):
        """Report the dut worker threads status."""

        # Copy both lists under the lock so the counts and the listings agree.
        with self._workers_lock:
            active = list(self._active_workers)
            finished = list(self._finished_workers)

        lines = [
            "{} active threads, {} finished threads.".format(
                len(active), len(finished)
            ),
            "  Active threads:",
        ]
        lines.extend("    " + dw.status_str() for dw in active)
        if finished:
            lines.append("  Finished threads:")
            lines.extend("    " + dw.status_str() for dw in finished)
        return "\n".join(lines)
480