#!/usr/bin/env python3
# Copyright (C) 2019 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
''' Worker main loop. Pulls jobs from the DB and runs them in the sandbox

It also handles timeouts and graceful container termination.
'''

import logging
import os
import random
import signal
import socket
import subprocess
import threading
import time
import traceback

from config import DB, JOB_TIMEOUT_SEC
from common_utils import req, utc_now_iso, init_logging
from common_utils import ConcurrentModificationError, SCOPES

CUR_DIR = os.path.dirname(__file__)
# OAuth scopes needed to read/write the Firebase Realtime Database via REST.
SCOPES.append('https://www.googleapis.com/auth/firebase.database')
SCOPES.append('https://www.googleapis.com/auth/userinfo.email')
# Unique-ish worker id: "<worker-number>-<hostname>" (worker number is the
# trailing token of WORKER_HOST, e.g. "worker-3" -> "3").
WORKER_NAME = '%s-%s' % (os.getenv('WORKER_HOST', 'local').split('-')[-1],
                         socket.gethostname())
# Set by sig_handler() on SIGTERM/SIGINT; checked by the loops below for a
# graceful shutdown.
sigterm = threading.Event()


def try_acquire_job(job_id):
  ''' Transactionally acquire the given job.

  Uses the Firebase ETag-conditional PUT to guarantee that only one worker
  moves the job from QUEUED to STARTED.

  Args:
    job_id: key of the job under /jobs in the DB.

  Returns:
    The job JSON object if it managed to acquire and put it into the
    STARTED state, None if another worker got there first.
  '''
  logging.debug('Trying to acquire job %s', job_id)

  uri = '%s/jobs/%s.json' % (DB, job_id)
  job, etag = req('GET', uri, req_etag=True)
  if job['status'] != 'QUEUED':
    return None  # Somebody else took it or the job is stale.
  job['status'] = 'STARTED'
  job['time_started'] = utc_now_iso()
  job['worker'] = WORKER_NAME
  # Only the conditional PUT can race with another worker; keep the try
  # body down to that single call.
  try:
    req('PUT', uri, body=job, etag=etag)
  except ConcurrentModificationError:
    return None
  return job


def make_worker_obj(status, job_id=None):
  ''' Builds the JSON object stored under /workers/<WORKER_NAME>.

  Args:
    status: one of 'RUNNING' | 'IDLE' | 'TERMINATED'.
    job_id: the job currently being run, if any.
  '''
  return {
      'job_id': job_id,
      'status': status,
      'last_update': utc_now_iso(),
      'host': os.getenv('WORKER_HOST', '')
  }


def worker_loop():
  ''' Pulls a job from the queue and runs it invoking run_job.py '''
  uri = '%s/jobs_queued.json?orderBy="$key"&limitToLast=10' % DB
  jobs = req('GET', uri)
  if not jobs:
    return

  # Work out the worker number from the hostname. We try to distribute the load
  # (via the time.sleep below) so that we fill first all the worker-1 of each
  # vm, then worker-2 and so on. This is designed so that if there is only one
  # CL (hence N jobs) in the queue, each VM gets only one job, maximizing the
  # cpu efficiency of each VM.
  try:
    worker_num = int(socket.gethostname().split('-')[-1])
  except ValueError:
    worker_num = 1

  # Transactionally acquire a job. Deal with races (two workers trying to
  # acquire the same job).
  job = None
  job_id = None
  for job_id in sorted(jobs.keys(), reverse=True):
    job = try_acquire_job(job_id)
    if job is not None:
      break
    logging.info('Raced while trying to acquire job %s, retrying', job_id)
    # Stagger retries per worker number so workers don't thunder-herd on the
    # same queue entry (see load-distribution comment above).
    time.sleep(worker_num * 2 + random.random())
  if job is None:
    logging.error('Failed to acquire a job')
    return

  logging.info('Starting job %s', job_id)

  # Update the db, move the job to the running queue.
  patch_obj = {
      'jobs_queued/' + job_id: {},  # = DELETE
      'jobs_running/' + job_id: {
          'worker': WORKER_NAME
      },
      'workers/' + WORKER_NAME: make_worker_obj('RUNNING', job_id=job_id)
  }
  req('PATCH', '%s.json' % DB, body=patch_obj)

  cmd = [os.path.join(CUR_DIR, 'run_job.py'), job_id]

  # Propagate the worker's PERFETTO_ vars and merge with the job-specific vars.
  env = dict(os.environ, **{k: str(v) for (k, v) in job['env'].items()})
  job_runner = subprocess.Popen(cmd, env=env)

  # Run the job in a python subprocess, to isolate the main loop from logs
  # uploader failures.
  res = None  # Exit code of run_job.py, None while still running.
  cancelled = False
  timed_out = False
  time_started = time.time()
  time_last_db_poll = time_started
  polled_status = 'STARTED'
  while res is None:
    time.sleep(0.25)
    res = job_runner.poll()
    now = time.time()
    if now - time_last_db_poll > 10:  # Throttle DB polling.
      polled_status = req('GET', '%s/jobs/%s/status.json' % (DB, job_id))
      time_last_db_poll = now
    # Guard with |timed_out| so terminate() is sent only once, not on every
    # subsequent poll iteration (mirrors the |cancelled| guard below).
    if now - time_started > JOB_TIMEOUT_SEC and not timed_out:
      logging.info('Job %s timed out, terminating', job_id)
      timed_out = True
      job_runner.terminate()
    if (sigterm.is_set() or polled_status != 'STARTED') and not cancelled:
      logging.info('Job %s cancelled, terminating', job_id)
      cancelled = True
      job_runner.terminate()

  status = ('INTERRUPTED' if sigterm.is_set() else 'CANCELLED' if cancelled else
            'TIMED_OUT' if timed_out else 'COMPLETED' if res == 0 else 'FAILED')
  logging.info('Job %s %s with code %s', job_id, status, res)

  # Update the DB, unless the job has been cancelled. The "is not None"
  # condition deals with a very niche case, that is, avoid creating a partial
  # job entry after doing a full clear of the DB (which is super rare, happens
  # only when re-deploying the CI).
  if polled_status is not None:
    patch = {
        'jobs/%s/status' % job_id: status,
        'jobs/%s/exit_code' % job_id: {} if res is None else res,
        'jobs/%s/time_ended' % job_id: utc_now_iso(),
        'jobs_running/%s' % job_id: {},  # = DELETE
    }
    req('PATCH', '%s.json' % (DB), body=patch)


def sig_handler(_, __):
  ''' SIGTERM/SIGINT handler: request a graceful shutdown of the loops. '''
  logging.warning('Interrupted by signal, exiting worker')
  sigterm.set()


def main():
  init_logging()
  logging.info('Worker started')
  signal.signal(signal.SIGTERM, sig_handler)
  signal.signal(signal.SIGINT, sig_handler)

  while not sigterm.is_set():
    logging.debug('Starting poll cycle')
    try:
      worker_loop()
      req('PUT',
          '%s/workers/%s.json' % (DB, WORKER_NAME),
          body=make_worker_obj('IDLE'))
    except Exception:
      # Keep the worker alive on transient failures (DB hiccups etc.), but
      # don't swallow SystemExit/KeyboardInterrupt — signals set |sigterm|.
      logging.error('Exception in worker loop:\n%s', traceback.format_exc())
    if sigterm.is_set():
      break

    # Synchronize sleeping with the wall clock. This is so all VMs wake up at
    # the same time. See comment on distributing load above in this file.
    poll_time_sec = 5
    time.sleep(poll_time_sec - (time.time() % poll_time_sec))

  # The use case here is the VM being terminated by the GCE infrastructure.
  # We mark the worker as terminated and the job as cancelled so we don't wait
  # forever for it.
  logging.warning('Exiting the worker loop, got signal: %s', sigterm.is_set())
  req('PUT',
      '%s/workers/%s.json' % (DB, WORKER_NAME),
      body=make_worker_obj('TERMINATED'))


if __name__ == '__main__':
  main()