1# Copyright 2017 The Chromium Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5"""Monitor jobs and abort them as necessary. 6 7This daemon does a number of upkeep tasks: 8 9* When a process owning a job crashes, job_aborter will mark the job as 10 aborted in the database and clean up its lease files. 11 12* When a job is marked aborted in the database, job_aborter will signal 13 the process owning the job to abort. 14 15See also http://goto.google.com/monitor_db_per_job_refactor 16""" 17 18from __future__ import absolute_import 19from __future__ import division 20from __future__ import print_function 21 22import argparse 23import logging 24import sys 25import time 26 27from lucifer import autotest 28from lucifer import handoffs 29from lucifer import leasing 30from lucifer import loglib 31 32logger = logging.getLogger(__name__) 33 34 35def main(args): 36 """Main function 37 38 @param args: list of command line args 39 """ 40 41 parser = argparse.ArgumentParser(prog='job_aborter', description=__doc__) 42 parser.add_argument('--jobdir', required=True) 43 loglib.add_logging_options(parser) 44 args = parser.parse_args(args) 45 loglib.configure_logging_with_args(parser, args) 46 logger.info('Starting with args: %r', args) 47 48 autotest.monkeypatch() 49 ts_mon_config = autotest.chromite_load('ts_mon_config') 50 with ts_mon_config.SetupTsMonGlobalState('job_aborter'): 51 _main_loop(jobdir=args.jobdir) 52 assert False # cannot exit normally 53 54 55def _main_loop(jobdir): 56 transaction = autotest.deps_load('django.db.transaction') 57 58 @transaction.commit_manually 59 def flush_transaction(): 60 """Flush transaction https://stackoverflow.com/questions/3346124/""" 61 transaction.commit() 62 63 metrics = _Metrics() 64 metrics.send_starting() 65 while True: 66 logger.debug('Tick') 67 metrics.send_tick() 68 _main_loop_body(metrics, jobdir) 69 flush_transaction() 70 time.sleep(20) 71 72 
def _main_loop_body(metrics, jobdir):
    """Run a single upkeep pass.

    Collects the non-expired leases found under jobdir, then runs each
    upkeep step in order against that snapshot.

    @param metrics: _Metrics instance.
    @param jobdir: directory passed to leasing.leases_iter.
    """
    active_leases = {}
    for lease in leasing.leases_iter(jobdir):
        if lease.expired():
            continue
        active_leases[lease.id] = lease

    _mark_expired_jobs_failed(metrics, active_leases)
    _abort_timed_out_jobs(active_leases)
    _abort_jobs_marked_aborting(active_leases)
    _abort_special_tasks_marked_aborted()
    _clean_up_expired_leases(jobdir)
    # TODO(crbug.com/748234): abort_jobs_past_max_runtime goes into
    # lucifer_run_job


def _mark_expired_jobs_failed(metrics, active_leases):
    """Mark expired jobs failed.

    A job is expired when it has an incomplete JobHandoff but no active
    lease: it was handed off to a job_reporter, and that job_reporter
    has crashed.  Such jobs are passed to handoffs.clean_up and
    handoffs.mark_complete, and their count is sent as a metric.

    @param metrics: _Metrics instance.
    @param active_leases: dict mapping job ids to Leases.
    """
    logger.debug('Looking for expired jobs')
    expired_ids = []
    for handoff in handoffs.incomplete():
        job_id = handoff.job_id
        logger.debug('Found handoff: %d', job_id)
        if job_id in active_leases:
            continue
        logger.debug('Handoff %d is missing active lease', job_id)
        expired_ids.append(job_id)
    handoffs.clean_up(expired_ids)
    handoffs.mark_complete(expired_ids)
    metrics.send_expired_jobs(len(expired_ids))


def _abort_leased_jobs(jobs, active_leases):
    """Call maybe_abort() on the lease of every job that has one.

    Jobs without an entry in active_leases are skipped.

    @param jobs: iterable of objects with an id attribute.
    @param active_leases: dict mapping job ids to Leases.
    """
    for job in jobs:
        lease = active_leases.get(job.id)
        if lease is not None:
            lease.maybe_abort()


def _abort_timed_out_jobs(active_leases):
    """Send abort to timed out jobs.

    @param active_leases: dict mapping job ids to Leases.
    """
    _abort_leased_jobs(_timed_out_jobs_queryset(), active_leases)


def _abort_jobs_marked_aborting(active_leases):
    """Send abort to jobs marked aborting in Autotest database.

    @param active_leases: dict mapping job ids to Leases.
    """
    _abort_leased_jobs(_aborting_jobs_queryset(), active_leases)


def _abort_special_tasks_marked_aborted():
    """Placeholder for aborting special tasks; currently does nothing."""
    # TODO(crbug.com/748234): Special tasks not implemented yet.  This
    # would abort jobs running on the behalf of special tasks and thus
    # need to check a different database table.


def _clean_up_expired_leases(jobdir):
    """Clean up files for expired leases.

    Only active leases matter to the rest of the daemon, so every lease
    that reports itself expired has cleanup() called on it.

    @param jobdir: directory passed to leasing.leases_iter.
    """
    expired = (lease for lease in leasing.leases_iter(jobdir)
               if lease.expired())
    for lease in expired:
        lease.cleanup()


def _timed_out_jobs_queryset():
    """Return a QuerySet of timed out Jobs.

    Selects jobs that still have an incomplete host queue entry and
    whose created_on plus timeout_mins minutes is in the past.

    @returns: Django QuerySet
    """
    models = autotest.load('frontend.afe.models')
    incomplete = models.Job.objects.filter(hostqueueentry__complete=False)
    overdue = incomplete.extra(
            where=['created_on + INTERVAL timeout_mins MINUTE < NOW()'])
    return overdue.distinct()


def _aborting_jobs_queryset():
    """Return a QuerySet of aborting Jobs.

    @returns: Django QuerySet
    """
    models = autotest.load('frontend.afe.models')
    # Deliberately two separate filter() calls: Django treats chained
    # filters across a multi-valued relation differently from a single
    # filter() with both conditions.
    aborted = models.Job.objects.filter(hostqueueentry__aborted=True)
    unfinished = aborted.filter(hostqueueentry__complete=False)
    return unfinished.distinct()


class _Metrics(object):

    """Class for sending job_aborter metrics."""

    def __init__(self):
        """Create the start, tick and expired_jobs counters."""
        chromite_metrics = autotest.chromite_load('metrics')
        prefix = 'chromeos/lucifer/job_aborter'

        def counter(suffix):
            """Build a Counter named prefix + suffix."""
            return chromite_metrics.Counter(prefix + suffix)

        self._starting_m = counter('/start')
        self._tick_m = counter('/tick')
        self._expired_m = counter('/expired_jobs')

    def send_starting(self):
        """Send starting metric."""
        self._starting_m.increment()

    def send_tick(self):
        """Send tick metric."""
        self._tick_m.increment()

    def send_expired_jobs(self, count):
        """Send expired_jobs metric.

        @param count: amount to add to the expired_jobs counter.
        """
        self._expired_m.increment_by(count)


if __name__ == '__main__':
    main(sys.argv[1:])