#!/usr/bin/env python2
# Copyright (C) 2019 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
''' Worker main loop. Pulls jobs from the DB and runs them in the sandbox

It also handles timeouts and graceful container termination.
'''

import logging
import os
import random
import signal
import socket
import subprocess
import threading
import time
import traceback

from config import DB, JOB_TIMEOUT_SEC
from common_utils import req, utc_now_iso, init_logging
from common_utils import ConcurrentModificationError, SCOPES

CUR_DIR = os.path.dirname(__file__)
SCOPES.append('https://www.googleapis.com/auth/firebase.database')
SCOPES.append('https://www.googleapis.com/auth/userinfo.email')
WORKER_NAME = '%s-%s' % (os.getenv('WORKER_HOST', 'local').split('-')[-1],
                         socket.gethostname())
sigterm = threading.Event()


def try_acquire_job(job_id):
  ''' Transactionally acquire the given job.

  Returns the job JSON object if it managed to acquire and put it into the
  STARTED state, None if another worker got there first.
  '''
  logging.debug('Trying to acquire job %s', job_id)

  uri = '%s/jobs/%s.json' % (DB, job_id)
  job, etag = req('GET', uri, req_etag=True)
  if job['status'] != 'QUEUED':
    return None  # Somebody else took it
  try:
    job['status'] = 'STARTED'
    job['time_started'] = utc_now_iso()
    job['worker'] = WORKER_NAME
    req('PUT', uri, body=job, etag=etag)
    return job
  except ConcurrentModificationError:
    return None


def make_worker_obj(status, job_id=None):
  return {
      'job_id': job_id,
      'status': status,
      'last_update': utc_now_iso(),
      'host': os.getenv('WORKER_HOST', '')
  }


def worker_loop():
  ''' Pulls a job from the queue and runs it invoking run_job.py '''
  uri = '%s/jobs_queued.json?orderBy="$key"&limitToLast=10' % DB
  jobs = req('GET', uri)
  if not jobs:
    return

  # Transactionally acquire a job. Deal with races (two workers trying to
  # acquire the same job).
  job = None
  job_id = None
  for job_id in sorted(jobs.keys(), reverse=True):
    job = try_acquire_job(job_id)
    if job is not None:
      break
    logging.info('Raced while trying to acquire job %s, retrying', job_id)
    time.sleep(int(random.random() * 3))
  if job is None:
    logging.error('Failed to acquire a job')
    return

  logging.info('Starting job %s', job_id)

  # Update the db, move the job to the running queue.
  patch_obj = {
      'jobs_queued/' + job_id: {},  # = DELETE
      'jobs_running/' + job_id: {
          'worker': WORKER_NAME
      },
      'workers/' + WORKER_NAME: make_worker_obj('RUNNING', job_id=job_id)
  }
  req('PATCH', '%s.json' % DB, body=patch_obj)

  cmd = [os.path.join(CUR_DIR, 'run_job.py'), job_id]

  # Propagate the worker's PERFETTO_ vars and merge with the job-specific vars.
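  # Note: job['env'] is the per-job environment dict stored in the job's DB
  # entry. The str() coercion is presumably there because the environment
  # passed to Popen must contain string values only, while values coming from
  # JSON can be unicode or numeric.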
  env = dict(os.environ, **{k: str(v) for (k, v) in job['env'].iteritems()})

  # Run the job in a python subprocess, to isolate the main loop from logs
  # uploader failures.
  job_runner = subprocess.Popen(cmd, env=env)

  res = None
  cancelled = False
  timed_out = False
  time_started = time.time()
  time_last_db_poll = time_started
  polled_status = 'STARTED'
  while res is None:
    time.sleep(0.25)
    res = job_runner.poll()
    now = time.time()
    if now - time_last_db_poll > 10:  # Throttle DB polling.
      polled_status = req('GET', '%s/jobs/%s/status.json' % (DB, job_id))
      time_last_db_poll = now
    if now - time_started > JOB_TIMEOUT_SEC:
      logging.info('Job %s timed out, terminating', job_id)
      timed_out = True
      job_runner.terminate()
    if (sigterm.is_set() or polled_status != 'STARTED') and not cancelled:
      logging.info('Job %s cancelled, terminating', job_id)
      cancelled = True
      job_runner.terminate()

  status = ('INTERRUPTED' if sigterm.is_set() else 'CANCELLED' if cancelled else
            'TIMED_OUT' if timed_out else 'COMPLETED' if res == 0 else 'FAILED')
  logging.info('Job %s %s with code %s', job_id, status, res)

  # Update the DB, unless the job has been cancelled. The "is not None"
  # condition deals with a very niche case, that is, avoid creating a partial
  # job entry after doing a full clear of the DB (which is super rare, happens
  # only when re-deploying the CI).
  if polled_status is not None:
    patch = {
        'jobs/%s/status' % job_id: status,
        'jobs/%s/exit_code' % job_id: {} if res is None else res,
        'jobs/%s/time_ended' % job_id: utc_now_iso(),
        'jobs_running/%s' % job_id: {},  # = DELETE
    }
    req('PATCH', '%s.json' % DB, body=patch)


def sig_handler(_, __):
  logging.warning('Interrupted by signal, exiting worker')
  sigterm.set()


def main():
  init_logging()
  logging.info('Worker started')
  signal.signal(signal.SIGTERM, sig_handler)
  signal.signal(signal.SIGINT, sig_handler)

  while not sigterm.is_set():
    logging.debug('Starting poll cycle')
    try:
      worker_loop()
      req('PUT',
          '%s/workers/%s.json' % (DB, WORKER_NAME),
          body=make_worker_obj('IDLE'))
    except:
      logging.error('Exception in worker loop:\n%s', traceback.format_exc())
    if sigterm.is_set():
      break
    time.sleep(5)

  # The use case here is the VM being terminated by the GCE infrastructure.
  # We mark the worker as terminated and the job as cancelled so we don't wait
  # forever for it.
  logging.warning('Exiting the worker loop, got signal: %s', sigterm.is_set())
  req('PUT',
      '%s/workers/%s.json' % (DB, WORKER_NAME),
      body=make_worker_obj('TERMINATED'))


if __name__ == '__main__':
  main()
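
# For reference, a rough sketch of the DB layout this worker assumes, derived
# only from the paths read and written above (no other fields are implied):
#
#   /jobs/<job_id>          -> {status, worker, env, time_started, time_ended,
#                               exit_code, ...}
#   /jobs_queued/<job_id>   -> present while the job waits to be picked up
#   /jobs_running/<job_id>  -> {worker} while the job is executing
#   /workers/<WORKER_NAME>  -> {job_id, status, last_update, host}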