#!/usr/bin/env python3
# Copyright (C) 2019 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
'''Worker main loop. Pulls jobs from the DB and runs them in the sandbox.

It also handles timeouts and graceful container termination.
'''

import logging
import os
import random
import signal
import socket
import subprocess
import threading
import time
import traceback

from config import DB, JOB_TIMEOUT_SEC
from common_utils import req, utc_now_iso, init_logging
from common_utils import ConcurrentModificationError, SCOPES

CUR_DIR = os.path.dirname(__file__)
SCOPES.append('https://www.googleapis.com/auth/firebase.database')
SCOPES.append('https://www.googleapis.com/auth/userinfo.email')

# Unique name of this worker instance, derived from the WORKER_HOST env var
# and the container hostname. Used as the key under /workers in the DB.
WORKER_NAME = '%s-%s' % (os.getenv('WORKER_HOST', 'local').split('-')[-1],
                         socket.gethostname())

# Set by sig_handler() to request a graceful shutdown of the main loop.
sigterm = threading.Event()

def try_acquire_job(job_id):
  '''Transactionally acquires the given job.

  Returns the job JSON object if this worker manages to move it into the
  STARTED state, or None if another worker got there first.
  '''
  logging.debug('Trying to acquire job %s', job_id)

  uri = '%s/jobs/%s.json' % (DB, job_id)
  job, etag = req('GET', uri, req_etag=True)
  if job['status'] != 'QUEUED':
    return None  # Somebody else took it.
  try:
    job['status'] = 'STARTED'
    job['time_started'] = utc_now_iso()
    job['worker'] = WORKER_NAME
    req('PUT', uri, body=job, etag=etag)
    return job
  except ConcurrentModificationError:
    return None
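
# Note on the conditional PUT above: req() is assumed to implement an
# ETag-based compare-and-swap against the Firebase REST API, roughly:
#
#   GET uri with header 'X-Firebase-ETag: true'  ->  returns value + ETag
#   PUT uri with header 'if-match: <etag>'       ->  HTTP 412 if value changed
#
# and to surface the HTTP 412 as ConcurrentModificationError, which is how a
# lost race shows up here. The real implementation lives in common_utils.req.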


def make_worker_obj(status, job_id=None):
  '''Returns the worker state dict uploaded to /workers/<WORKER_NAME>.'''
  return {
      'job_id': job_id,
      'status': status,
      'last_update': utc_now_iso(),
      'host': os.getenv('WORKER_HOST', '')
  }

def worker_loop():
  '''Pulls a job from the queue and runs it by invoking run_job.py.'''
  uri = '%s/jobs_queued.json?orderBy="$key"&limitToLast=10' % DB
  jobs = req('GET', uri)
  if not jobs:
    return

  # Work out the worker number from the hostname. We try to distribute the
  # load (via the time.sleep below) so that we first fill the worker-1 slot
  # of each VM, then worker-2, and so on. This is designed so that if there
  # is only one CL (hence N jobs) in the queue, each VM gets only one job,
  # maximizing the CPU efficiency of each VM.
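  # For example, assuming container hostnames of the form "worker-N": the
  # worker-3 container computes worker_num = 3 and backs off ~6-7s
  # (3 * 2 + jitter) after losing an acquisition race below, so the worker-1
  # and worker-2 containers on every VM get first pick of the queue.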
  try:
    worker_num = int(socket.gethostname().split('-')[-1])
  except ValueError:
    worker_num = 1

  # Transactionally acquire a job. Deal with races (two workers trying to
  # acquire the same job).
  job = None
  job_id = None
  for job_id in sorted(jobs.keys(), reverse=True):
    job = try_acquire_job(job_id)
    if job is not None:
      break
    logging.info('Raced while trying to acquire job %s, retrying', job_id)
    time.sleep(worker_num * 2 + random.random())
  if job is None:
    logging.error('Failed to acquire a job')
    return

  logging.info('Starting job %s', job_id)

  # Update the DB: move the job from jobs_queued to jobs_running.
  patch_obj = {
      'jobs_queued/' + job_id: {},  # = DELETE
      'jobs_running/' + job_id: {
          'worker': WORKER_NAME
      },
      'workers/' + WORKER_NAME: make_worker_obj('RUNNING', job_id=job_id)
  }
  req('PATCH', '%s.json' % DB, body=patch_obj)
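  # A multi-location PATCH like the one above is assumed to be applied
  # atomically by the Firebase REST API, so a job never appears in both
  # jobs_queued and jobs_running. Writing {} at a path prunes the node, which
  # is this codebase's idiom for DELETE, as the comments above note.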

  cmd = [os.path.join(CUR_DIR, 'run_job.py'), job_id]

  # Propagate the worker's environment (including its PERFETTO_* vars) and
  # merge in the job-specific vars.
  env = dict(os.environ, **{k: str(v) for (k, v) in job['env'].items()})
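  # Note: in dict(a, **b) the keyword arguments win, so a job-specific
  # variable overrides any identically-named one inherited from the worker.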

  # Run the job in a python subprocess (run_job.py), to isolate the main loop
  # from logs uploader failures.
  job_runner = subprocess.Popen(cmd, env=env)

  res = None
  cancelled = False
  timed_out = False
  time_started = time.time()
  time_last_db_poll = time_started
  polled_status = 'STARTED'

  # Poll the subprocess every 0.25s, checking for completion, timeout,
  # cancellation and SIGTERM.
  while res is None:
    time.sleep(0.25)
    res = job_runner.poll()
    now = time.time()
    if now - time_last_db_poll > 10:  # Throttle DB polling.
      polled_status = req('GET', '%s/jobs/%s/status.json' % (DB, job_id))
      time_last_db_poll = now
    if now - time_started > JOB_TIMEOUT_SEC:
      logging.info('Job %s timed out, terminating', job_id)
      timed_out = True
      job_runner.terminate()
    if (sigterm.is_set() or polled_status != 'STARTED') and not cancelled:
      logging.info('Job %s cancelled, terminating', job_id)
      cancelled = True
      job_runner.terminate()

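  # Note: res is guaranteed non-None here. After terminate() sends SIGTERM to
  # run_job.py, a subsequent poll() returns the child's exit code (a negative
  # signal number on POSIX), which ends the loop above.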
  status = ('INTERRUPTED' if sigterm.is_set() else 'CANCELLED' if cancelled else
            'TIMED_OUT' if timed_out else 'COMPLETED' if res == 0 else 'FAILED')
  logging.info('Job %s %s with code %s', job_id, status, res)

  # Update the DB, unless the job entry has been deleted. The "is not None"
  # condition deals with a very niche case: it avoids re-creating a partial
  # job entry after a full clear of the DB (which is super rare, it happens
  # only when re-deploying the CI).
  if polled_status is not None:
    patch = {
        'jobs/%s/status' % job_id: status,
        'jobs/%s/exit_code' % job_id: {} if res is None else res,
        'jobs/%s/time_ended' % job_id: utc_now_iso(),
        'jobs_running/%s' % job_id: {},  # = DELETE
    }
    req('PATCH', '%s.json' % DB, body=patch)


def sig_handler(_, __):
  logging.warning('Interrupted by signal, exiting worker')
  sigterm.set()


def main():
  init_logging()
  logging.info('Worker started')
  signal.signal(signal.SIGTERM, sig_handler)
  signal.signal(signal.SIGINT, sig_handler)

  while not sigterm.is_set():
    logging.debug('Starting poll cycle')
    try:
      worker_loop()
      req('PUT',
          '%s/workers/%s.json' % (DB, WORKER_NAME),
          body=make_worker_obj('IDLE'))
    except Exception:
      logging.error('Exception in worker loop:\n%s', traceback.format_exc())
    if sigterm.is_set():
      break

    # Synchronize sleeping with the wall clock, so that all VMs wake up at
    # the same time. See the comment on distributing load above in this file.
    poll_time_sec = 5
    time.sleep(poll_time_sec - (time.time() % poll_time_sec))
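    # For example, with poll_time_sec = 5: if time.time() % 5 == 3.2, the
    # worker sleeps 1.8s, so every iteration starts on a 5-second wall-clock
    # boundary regardless of how long worker_loop() took.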

  # The use case here is the VM being terminated by the GCE infrastructure.
  # We mark the worker as TERMINATED (worker_loop above has already terminated
  # and marked any running job) so that nothing waits forever for it.
  logging.warning('Exiting the worker loop, got signal: %s', sigterm.is_set())
  req('PUT',
      '%s/workers/%s.json' % (DB, WORKER_NAME),
      body=make_worker_obj('TERMINATED'))


if __name__ == '__main__':
  main()