• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2010 Google Inc. All Rights Reserved.
2#
3"""A module for a job in the infrastructure."""
4
5__author__ = 'raymes@google.com (Raymes Khoury)'
6
7import os.path
8
9from automation.common import state_machine
10
11STATUS_NOT_EXECUTED = 'NOT_EXECUTED'
12STATUS_SETUP = 'SETUP'
13STATUS_COPYING = 'COPYING'
14STATUS_RUNNING = 'RUNNING'
15STATUS_SUCCEEDED = 'SUCCEEDED'
16STATUS_FAILED = 'FAILED'
17
18
19class FolderDependency(object):
20
21  def __init__(self, job, src, dest=None):
22    if not dest:
23      dest = src
24
25    # TODO(kbaclawski): rename to producer
26    self.job = job
27    self.src = src
28    self.dest = dest
29
30  @property
31  def read_only(self):
32    return self.dest == self.src
33
34
35class JobStateMachine(state_machine.BasicStateMachine):
36  state_machine = {
37      STATUS_NOT_EXECUTED: [STATUS_SETUP],
38      STATUS_SETUP: [STATUS_COPYING, STATUS_FAILED],
39      STATUS_COPYING: [STATUS_RUNNING, STATUS_FAILED],
40      STATUS_RUNNING: [STATUS_SUCCEEDED, STATUS_FAILED]
41  }
42
43  final_states = [STATUS_SUCCEEDED, STATUS_FAILED]
44
45
46class JobFailure(Exception):
47
48  def __init__(self, message, exit_code):
49    Exception.__init__(self, message)
50    self.exit_code = exit_code
51
52
53class Job(object):
54  """A class representing a job whose commands will be executed."""
55
56  WORKDIR_PREFIX = '/usr/local/google/tmp/automation'
57
58  def __init__(self, label, command, timeout=4 * 60 * 60):
59    self._state = JobStateMachine(STATUS_NOT_EXECUTED)
60    self.predecessors = set()
61    self.successors = set()
62    self.machine_dependencies = []
63    self.folder_dependencies = []
64    self.id = 0
65    self.machines = []
66    self.command = command
67    self._has_primary_machine_spec = False
68    self.group = None
69    self.dry_run = None
70    self.label = label
71    self.timeout = timeout
72
73  def _StateGet(self):
74    return self._state
75
76  def _StateSet(self, new_state):
77    self._state.Change(new_state)
78
79  status = property(_StateGet, _StateSet)
80
81  @property
82  def timeline(self):
83    return self._state.timeline
84
85  def __repr__(self):
86    return '{%s: %s}' % (self.__class__.__name__, self.id)
87
88  def __str__(self):
89    res = []
90    res.append('%d' % self.id)
91    res.append('Predecessors:')
92    res.extend(['%d' % pred.id for pred in self.predecessors])
93    res.append('Successors:')
94    res.extend(['%d' % succ.id for succ in self.successors])
95    res.append('Machines:')
96    res.extend(['%s' % machine for machine in self.machines])
97    res.append(self.PrettyFormatCommand())
98    res.append('%s' % self.status)
99    res.append(self.timeline.GetTransitionEventReport())
100    return '\n'.join(res)
101
102  @staticmethod
103  def _FormatCommand(cmd, substitutions):
104    for pattern, replacement in substitutions:
105      cmd = cmd.replace(pattern, replacement)
106
107    return cmd
108
109  def GetCommand(self):
110    substitutions = [
111        ('$JOB_ID', str(self.id)), ('$JOB_TMP', self.work_dir),
112        ('$JOB_HOME', self.home_dir),
113        ('$PRIMARY_MACHINE', self.primary_machine.hostname)
114    ]
115
116    if len(self.machines) > 1:
117      for num, machine in enumerate(self.machines[1:]):
118        substitutions.append(('$SECONDARY_MACHINES[%d]' % num, machine.hostname
119                             ))
120
121    return self._FormatCommand(str(self.command), substitutions)
122
123  def PrettyFormatCommand(self):
124    # TODO(kbaclawski): This method doesn't belong here, but rather to
125    # non existing Command class. If one is created then PrettyFormatCommand
126    # shall become its method.
127    return self._FormatCommand(self.GetCommand(), [
128        ('\{ ', ''), ('; \}', ''), ('\} ', '\n'), ('\s*&&\s*', '\n')
129    ])
130
131  def DependsOnFolder(self, dependency):
132    self.folder_dependencies.append(dependency)
133    self.DependsOn(dependency.job)
134
135  @property
136  def results_dir(self):
137    return os.path.join(self.work_dir, 'results')
138
139  @property
140  def logs_dir(self):
141    return os.path.join(self.home_dir, 'logs')
142
143  @property
144  def log_filename_prefix(self):
145    return 'job-%d.log' % self.id
146
147  @property
148  def work_dir(self):
149    return os.path.join(self.WORKDIR_PREFIX, 'job-%d' % self.id)
150
151  @property
152  def home_dir(self):
153    return os.path.join(self.group.home_dir, 'job-%d' % self.id)
154
155  @property
156  def primary_machine(self):
157    return self.machines[0]
158
159  def DependsOn(self, job):
160    """Specifies Jobs to be finished before this job can be launched."""
161    self.predecessors.add(job)
162    job.successors.add(self)
163
164  @property
165  def is_ready(self):
166    """Check that all our dependencies have been executed."""
167    return all(pred.status == STATUS_SUCCEEDED for pred in self.predecessors)
168
169  def DependsOnMachine(self, machine_spec, primary=True):
170    # Job will run on arbitrarily chosen machine specified by
171    # MachineSpecification class instances passed to this method.
172    if primary:
173      if self._has_primary_machine_spec:
174        raise RuntimeError('Only one primary machine specification allowed.')
175      self._has_primary_machine_spec = True
176      self.machine_dependencies.insert(0, machine_spec)
177    else:
178      self.machine_dependencies.append(machine_spec)
179