• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python2
2# Copyright (C) 2019 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#      http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15''' Worker main loop. Pulls jobs from the DB and runs them in the sandbox
16
17It also handles timeouts and graceful container termination.
18'''
19
20import logging
21import os
22import random
23import signal
24import socket
25import subprocess
26import threading
27import time
28import traceback
29
30from config import DB, JOB_TIMEOUT_SEC
31from common_utils import req, utc_now_iso, init_logging
32from common_utils import ConcurrentModificationError, SCOPES
33
CUR_DIR = os.path.dirname(__file__)
# Extra OAuth2 scopes required to read/write the Firebase realtime DB and to
# identify the service account.
SCOPES.append('https://www.googleapis.com/auth/firebase.database')
SCOPES.append('https://www.googleapis.com/auth/userinfo.email')
# Unique worker name: the suffix of WORKER_HOST (falling back to 'local')
# plus the machine hostname, e.g. "3-ci-host".
WORKER_NAME = '%s-%s' % (os.getenv('WORKER_HOST', 'local').split('-')[-1],
                         socket.gethostname())
# Set by sig_handler() on SIGTERM/SIGINT; polled by the loops below to exit.
sigterm = threading.Event()
40
41
def try_acquire_job(job_id):
  ''' Transactionally acquire the given job.

  Args:
    job_id: key of the job in the /jobs DB index.

  Returns the job JSON object if it managed to acquire and put it into the
  STARTED state, None if another worker got there first (or the entry no
  longer exists / is no longer QUEUED).
  '''
  logging.debug('Trying to acquire job %s', job_id)

  uri = '%s/jobs/%s.json' % (DB, job_id)
  job, etag = req('GET', uri, req_etag=True)
  # The entry might have been deleted (e.g., a full DB wipe on re-deploy) or
  # already taken by another worker between listing the queue and this GET.
  # job.get() also guards against a malformed entry with no 'status' key.
  if job is None or job.get('status') != 'QUEUED':
    return None  # Somebody else took it.
  try:
    job['status'] = 'STARTED'
    job['time_started'] = utc_now_iso()
    job['worker'] = WORKER_NAME
    # The ETag-conditional PUT turns this into a compare-and-swap: it raises
    # ConcurrentModificationError if another worker wrote since our GET.
    req('PUT', uri, body=job, etag=etag)
    return job
  except ConcurrentModificationError:
    return None
62
63
def make_worker_obj(status, job_id=None):
  ''' Builds the JSON object describing this worker's state for the DB.

  Args:
    status: worker state string (e.g. 'IDLE', 'RUNNING', 'TERMINATED').
    job_id: the job currently being run, if any.
  '''
  worker = {}
  worker['job_id'] = job_id
  worker['status'] = status
  worker['last_update'] = utc_now_iso()
  worker['host'] = os.getenv('WORKER_HOST', '')
  return worker
71
72
def worker_loop():
  ''' Pulls a job from the queue and runs it invoking run_job.py  '''
  # Fetch up to 10 of the most recently queued job ids from the queue index.
  uri = '%s/jobs_queued.json?orderBy="$key"&limitToLast=10' % DB
  jobs = req('GET', uri)
  if not jobs:
    return

  # Transactionally acquire a job. Deal with races (two workers trying to
  # acquire the same job).
  job = None
  job_id = None
  # Newest key first, so workers prefer the most recent job.
  for job_id in sorted(jobs.keys(), reverse=True):
    job = try_acquire_job(job_id)
    if job is not None:
      break
    logging.info('Raced while trying to acquire job %s, retrying', job_id)
    # Random 0-2 s backoff to de-synchronize racing workers before retrying.
    time.sleep(int(random.random() * 3))
  if job is None:
    logging.error('Failed to acquire a job')
    return

  logging.info('Starting job %s', job_id)

  # Update the db, move the job to the running queue.
  patch_obj = {
      'jobs_queued/' + job_id: {},  # = DELETE
      'jobs_running/' + job_id: {
          'worker': WORKER_NAME
      },
      'workers/' + WORKER_NAME: make_worker_obj('RUNNING', job_id=job_id)
  }
  req('PATCH', '%s.json' % DB, body=patch_obj)

  cmd = [os.path.join(CUR_DIR, 'run_job.py'), job_id]

  # Propagate the worker's PERFETTO_  vars and merge with the job-specific vars.
  env = dict(os.environ, **{k: str(v) for (k, v) in job['env'].iteritems()})
  job_runner = subprocess.Popen(cmd, env=env)

  # Run the job in a python subprocess, to isolate the main loop from logs
  # uploader failures.
  res = None  # Exit code of run_job.py; None while it is still running.
  cancelled = False  # Set if the job is cancelled via the DB or a signal.
  timed_out = False  # Set if the job exceeds JOB_TIMEOUT_SEC.
  time_started = time.time()
  time_last_db_poll = time_started
  polled_status = 'STARTED'
  # Supervise the subprocess: poll its exit code 4x/sec, the DB every 10 s.
  while res is None:
    time.sleep(0.25)
    res = job_runner.poll()
    now = time.time()
    if now - time_last_db_poll > 10:  # Throttle DB polling.
      polled_status = req('GET', '%s/jobs/%s/status.json' % (DB, job_id))
      time_last_db_poll = now
    if now - time_started > JOB_TIMEOUT_SEC:
      logging.info('Job %s timed out, terminating', job_id)
      timed_out = True
      job_runner.terminate()
    # Any DB status other than STARTED means the job was cancelled/deleted
    # externally. The "not cancelled" guard ensures terminate() is sent once.
    if (sigterm.is_set() or polled_status != 'STARTED') and not cancelled:
      logging.info('Job %s cancelled, terminating', job_id)
      cancelled = True
      job_runner.terminate()

  # Precedence: signal interrupt > cancellation > timeout > exit code.
  status = ('INTERRUPTED' if sigterm.is_set() else 'CANCELLED' if cancelled else
            'TIMED_OUT' if timed_out else 'COMPLETED' if res == 0 else 'FAILED')
  logging.info('Job %s %s with code %s', job_id, status, res)

  # Update the DB, unless the job has been cancelled. The "is not None"
  # condition deals with a very niche case, that is, avoid creating a partial
  # job entry after doing a full clear of the DB (which is super rare, happens
  # only when re-deploying the CI).
  if polled_status is not None:
    patch = {
        'jobs/%s/status' % job_id: status,
        'jobs/%s/exit_code' % job_id: {} if res is None else res,
        'jobs/%s/time_ended' % job_id: utc_now_iso(),
        'jobs_running/%s' % job_id: {},  # = DELETE
    }
    req('PATCH', '%s.json' % (DB), body=patch)
152
153
def sig_handler(_, __):
  ''' SIGTERM/SIGINT handler: requests a graceful worker shutdown.

  Only sets the sigterm event; the main/worker loops observe it and exit.
  '''
  # logging.warning: warn() is a deprecated alias of warning().
  logging.warning('Interrupted by signal, exiting worker')
  sigterm.set()
157
158
def main():
  ''' Worker entry point: poll the queue every 5 s until signalled to stop.

  Installs SIGTERM/SIGINT handlers so a container/VM shutdown results in a
  clean DB state (worker marked TERMINATED, job marked INTERRUPTED).
  '''
  init_logging()
  logging.info('Worker started')
  signal.signal(signal.SIGTERM, sig_handler)
  signal.signal(signal.SIGINT, sig_handler)

  while not sigterm.is_set():
    logging.debug('Starting poll cycle')
    try:
      worker_loop()
      req('PUT',
          '%s/workers/%s.json' % (DB, WORKER_NAME),
          body=make_worker_obj('IDLE'))
    except Exception:
      # Catch Exception (not bare except) so SystemExit/KeyboardInterrupt can
      # still unwind; the worker must survive transient DB/network failures.
      logging.error('Exception in worker loop:\n%s', traceback.format_exc())
    if sigterm.is_set():
      break
    time.sleep(5)

  # The use case here is the VM being terminated by the GCE infrastructure.
  # We mark the worker as terminated and the job as cancelled so we don't wait
  # forever for it.
  logging.warning('Exiting the worker loop, got signal: %s', sigterm.is_set())
  req('PUT',
      '%s/workers/%s.json' % (DB, WORKER_NAME),
      body=make_worker_obj('TERMINATED'))
185
186
187if __name__ == '__main__':
188  main()
189