• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4"""The experiment setting module."""
5
6from __future__ import print_function
7
8import os
9import time
10
11import afe_lock_machine
12from threading import Lock
13
14from cros_utils import logger
15from cros_utils import misc
16
17import benchmark_run
18from machine_manager import BadChecksum
19from machine_manager import MachineManager
20from machine_manager import MockMachineManager
21import test_flag
22
23
class Experiment(object):
  """Class representing an Experiment to be run.

  An experiment is built from a list of labels and a list of benchmarks; one
  benchmark_run.BenchmarkRun is created per (label, benchmark, iteration).
  The runs are executed either by an attached schedv2 scheduler (see
  set_schedv2) or, when no scheduler is attached, by starting every run as a
  daemon thread (see Run).
  """

  def __init__(self, name, remote, working_directory, chromeos_root,
               cache_conditions, labels, benchmarks, experiment_file, email_to,
               acquire_timeout, log_dir, log_level, share_cache,
               results_directory, locks_directory):
    """Validates settings, registers reachable machines and builds runs.

    Args:
      name: experiment name; also used to derive the default results dir.
      remote: iterable of remote machine names to try to use.
      working_directory: parent of the default results directory.
      chromeos_root: ChromeOS root to use; if empty, the first label with a
        non-empty chromeos_root supplies it.
      cache_conditions: passed to every BenchmarkRun.
      labels: label objects (must expose name, remote and chromeos_root).
      benchmarks: benchmark objects (must expose name and iterations).
      experiment_file: stored unchanged as self.experiment_file.
      email_to: stored unchanged as self.email_to.
      acquire_timeout: passed to the machine manager.
      log_dir: directory used for the experiment and per-run loggers.
      log_level: passed to the machine manager and every BenchmarkRun.
      share_cache: passed to every BenchmarkRun.
      results_directory: explicit results directory; if empty, defaults to
        <working_directory>/<name>_results.
      locks_directory: if non-empty, file locking is used for machines;
        otherwise the AFE server locking mechanism is used.

    Raises:
      RuntimeError: if no remotes, benchmarks, labels or chromeos_root can be
        determined, or if none of the remotes is reachable.
    """
    self.name = name
    self.working_directory = working_directory
    self.remote = remote
    # NOTE(review): this keeps the caller-supplied value even when the
    # fallback below picks a label's chromeos_root for the machine manager.
    self.chromeos_root = chromeos_root
    self.cache_conditions = cache_conditions
    self.experiment_file = experiment_file
    self.email_to = email_to
    if not results_directory:
      self.results_directory = os.path.join(self.working_directory,
                                            self.name + '_results')
    else:
      self.results_directory = misc.CanonicalizePath(results_directory)
    self.log_dir = log_dir
    self.log_level = log_level
    self.labels = labels
    self.benchmarks = benchmarks
    self.num_complete = 0
    self.num_run_complete = 0
    self.share_cache = share_cache
    self.active_threads = []
    # If locks_directory (self.locks_dir) is not blank, we will use the file
    # locking mechanism; if it is blank then we will use the AFE server
    # locking mechanism.
    self.locks_dir = locks_directory
    self.locked_machines = []

    if not remote:
      raise RuntimeError('No remote hosts specified')
    if not self.benchmarks:
      raise RuntimeError('No benchmarks specified')
    if not self.labels:
      raise RuntimeError('No labels specified')

    # We need one chromeos_root to run the benchmarks in, but it doesn't
    # matter where it is, unless the ABIs are different.
    if not chromeos_root:
      for label in self.labels:
        if label.chromeos_root:
          chromeos_root = label.chromeos_root
          break
    if not chromeos_root:
      raise RuntimeError('No chromeos_root given and could not determine '
                         'one from the image path.')

    machine_manager_fn = MachineManager
    if test_flag.GetTestMode():
      machine_manager_fn = MockMachineManager
    self.machine_manager = machine_manager_fn(chromeos_root, acquire_timeout,
                                              log_level, locks_directory)
    self.l = logger.GetLogger(log_dir)

    for machine in self.remote:
      # machine_manager.AddMachine only adds reachable machines.
      self.machine_manager.AddMachine(machine)
    # Now machine_manager._all_machines contains a list of reachable
    # machines. This is a subset of self.remote. We make both lists the same.
    self.remote = [m.name for m in self.machine_manager.GetAllMachines()]
    if not self.remote:
      raise RuntimeError('No machine available for running experiment.')

    for label in self.labels:
      # We filter out label remotes that are not reachable (not in
      # self.remote). So each label.remote is a sublist of experiment.remote.
      label.remote = [r for r in label.remote if r in self.remote]
      try:
        self.machine_manager.ComputeCommonCheckSum(label)
      except BadChecksum:
        # Force same image on all machines, then we do checksum again. No
        # bailout if checksums still do not match.
        self.machine_manager.ForceSameImageToAllMachines(label)
        self.machine_manager.ComputeCommonCheckSum(label)

      self.machine_manager.ComputeCommonCheckSumString(label)

    self.start_time = None
    self.benchmark_runs = self._GenerateBenchmarkRuns()

    self._schedv2 = None
    # Guards num_complete / num_run_complete in BenchmarkRunFinished, which
    # may be called from several threads.
    self._internal_counter_lock = Lock()

  def set_schedv2(self, schedv2):
    """Attaches a schedv2 scheduler; Run/IsComplete/Terminate then use it."""
    self._schedv2 = schedv2

  def schedv2(self):
    """Returns the attached schedv2 scheduler, or None if there is none."""
    return self._schedv2

  def _GenerateBenchmarkRuns(self):
    """Generates benchmark runs from labels and benchmark definitions.

    Returns:
      A list with one BenchmarkRun per (label, benchmark, iteration), with
      iterations numbered from 1. Labels form the outer loop, so all runs of
      the first label come first.
    """
    benchmark_runs = []
    for label in self.labels:
      for benchmark in self.benchmarks:
        # range() instead of xrange() keeps this working on Python 3.
        for iteration in range(1, benchmark.iterations + 1):

          benchmark_run_name = '%s: %s (%s)' % (label.name, benchmark.name,
                                                iteration)
          full_name = '%s_%s_%s' % (label.name, benchmark.name, iteration)
          # Each run gets its own 'run.<label>_<benchmark>_<iteration>' log.
          logger_to_use = logger.Logger(self.log_dir, 'run.%s' % (full_name),
                                        True)
          benchmark_runs.append(
              benchmark_run.BenchmarkRun(benchmark_run_name, benchmark, label,
                                         iteration, self.cache_conditions,
                                         self.machine_manager, logger_to_use,
                                         self.log_level, self.share_cache))

    return benchmark_runs

  def Build(self):
    """No-op; kept for interface compatibility."""
    pass

  def Terminate(self):
    """Stops the experiment: via schedv2 if attached, else each live run."""
    if self._schedv2 is not None:
      self._schedv2.terminate()
    else:
      for t in self.benchmark_runs:
        if t.is_alive():
          self.l.LogError("Terminating run: '%s'." % t.name)
          t.Terminate()

  def IsComplete(self):
    """Returns whether every benchmark run has finished.

    With schedv2 attached this delegates to its is_complete(). Otherwise it
    polls the tracked threads with a non-blocking join, drops finished ones
    and updates num_complete (and num_run_complete for non-cache-hit runs).
    It returns False whenever threads were still tracked at the start of
    the call, even if all of them finished during this poll.
    """
    if self._schedv2:
      return self._schedv2.is_complete()
    if not self.active_threads:
      return True
    # Iterate over a snapshot: finished threads are removed from
    # self.active_threads inside the loop, and removing from a list while
    # iterating it directly would silently skip the following element.
    for t in list(self.active_threads):
      if t.is_alive():
        t.join(0)
      if not t.is_alive():
        self.num_complete += 1
        if not t.cache_hit:
          self.num_run_complete += 1
        self.active_threads.remove(t)
    return False

  def BenchmarkRunFinished(self, br):
    """Update internal counters after br finishes.

    Note this is only used by schedv2 and is called by multiple threads.
    Never throw any exception here.
    """

    assert self._schedv2 is not None
    with self._internal_counter_lock:
      self.num_complete += 1
      if not br.cache_hit:
        self.num_run_complete += 1

  def Run(self):
    """Records the start time and starts all benchmark runs."""
    self.start_time = time.time()
    if self._schedv2 is not None:
      self._schedv2.run_sched()
    else:
      self.active_threads = []
      for run in self.benchmark_runs:
        # Set threads to daemon so program exits when ctrl-c is pressed.
        run.daemon = True
        run.start()
        self.active_threads.append(run)

  def SetCacheConditions(self, cache_conditions):
    """Applies cache_conditions to every benchmark run."""
    for run in self.benchmark_runs:
      run.SetCacheConditions(cache_conditions)

  def Cleanup(self):
    """Make sure all machines are unlocked.

    With file locking (self.locks_dir set) this delegates to
    machine_manager.Cleanup(). Otherwise, outside test mode, every machine in
    self.locked_machines that the AFE reports as locked is unlocked.
    """
    if self.locks_dir:
      # We are using the file locks mechanism, so call machine_manager.Cleanup
      # to unlock everything.
      self.machine_manager.Cleanup()
    else:
      if test_flag.GetTestMode():
        return

      # NOTE(review): nothing in this class appends to locked_machines;
      # presumably the caller records the machines it locked here.
      all_machines = self.locked_machines
      if not all_machines:
        return

      # If we locked any machines earlier, make sure we unlock them now.
      lock_mgr = afe_lock_machine.AFELockManager(
          all_machines, '', self.labels[0].chromeos_root, None)
      machine_states = lock_mgr.GetMachineStates('unlock')
      # items() instead of iteritems() keeps this working on Python 3.
      for machine, state in machine_states.items():
        if state['locked']:
          lock_mgr.UpdateLockInAFE(False, machine)
214