# Copyright (C) 2019 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import re
import time
import urllib

from datetime import datetime, timedelta
from google.appengine.api import taskqueue

import webapp2

from common_utils import req, utc_now_iso, parse_iso_time, SCOPES
from config import DB, GERRIT_HOST, GERRIT_PROJECT, GERRIT_POLL_SEC, PROJECT
from config import CI_SITE, GERRIT_VOTING_ENABLED, JOB_CONFIGS, LOGS_TTL_DAYS
from config import TRUSTED_EMAILS, GCS_ARTIFACTS, JOB_TIMEOUT_SEC
from config import CL_TIMEOUT_SEC
from stackdriver_metrics import STACKDRIVER_METRICS

STACKDRIVER_API = 'https://monitoring.googleapis.com/v3/projects/%s' % PROJECT

SCOPES.append('https://www.googleapis.com/auth/firebase.database')
SCOPES.append('https://www.googleapis.com/auth/userinfo.email')
SCOPES.append('https://www.googleapis.com/auth/datastore')
SCOPES.append('https://www.googleapis.com/auth/monitoring')
SCOPES.append('https://www.googleapis.com/auth/monitoring.write')

last_tick = 0

# ------------------------------------------------------------------------------
# Misc utility functions
# ------------------------------------------------------------------------------


def defer(action, **kwargs):
  '''Appends a task to the deferred queue.

  Each task will become a new HTTP request made by the AppEngine service.
  This pattern is used extensively here for several reasons:
  - Auditability in logs: it's easier to scrape logs and debug.
  - Stability: an exception fails only the current task, not the whole function.
  - Reliability: the AppEngine runtime will retry failed tasks with exponential
    backoff.
  - Performance: tasks are run concurrently, which is quite important given that
    most of them are bound by HTTP latency to Gerrit or Firebase.
  '''
  taskqueue.add(
      queue_name='deferred-jobs',
      url='/controller/' + action,
      params=kwargs,
      method='GET')


def create_stackdriver_metric_definitions():
  logging.info('Creating Stackdriver metric definitions')
  for name, metric in STACKDRIVER_METRICS.iteritems():
    logging.info('Creating metric %s', name)
    req('POST', STACKDRIVER_API + '/metricDescriptors', body=metric)


def write_metrics(metric_dict):
  now = utc_now_iso()
  desc = {'timeSeries': []}
  for key, spec in metric_dict.iteritems():
    desc['timeSeries'] += [{
        'metric': {
            'type': STACKDRIVER_METRICS[key]['type'],
            'labels': spec.get('l', {})
        },
        'resource': {
            'type': 'global'
        },
        'points': [{
            'interval': {
                'endTime': now
            },
            'value': {
                'int64Value': str(spec['v'])
            }
        }]
    }]
  try:
    req('POST', STACKDRIVER_API + '/timeSeries', body=desc)
  except Exception as e:
    # Metric updates can easily fail due to Stackdriver API limitations.
    msg = str(e)
    if 'written more frequently than the maximum sampling' not in msg:
      logging.error('Metrics update failed: %s', msg)
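

# Illustrative example of the |metric_dict| shape write_metrics() expects:
# one entry per metric defined in STACKDRIVER_METRICS, with the int64 point
# value under 'v' and optional labels under 'l' (defaulted to {} above).
# Mirroring the real call in update_queue_metrics() below:
#
#   write_metrics({'ci_job_queue_len': {'v': 42}})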


def is_trusted(email):
  return re.match(TRUSTED_EMAILS, email)


# ------------------------------------------------------------------------------
# Deferred job handlers
# ------------------------------------------------------------------------------


def start(handler):
  create_stackdriver_metric_definitions()
  tick(handler)


def tick(handler):
  global last_tick
  now = time.time()
  # Avoid avalanching effects due to the failsafe tick job in cron.yaml.
  if now - last_tick < GERRIT_POLL_SEC - 1:
    return
  taskqueue.add(
      url='/controller/tick', queue_name='tick', countdown=GERRIT_POLL_SEC)
  defer('check_new_cls')
  defer('check_pending_cls')
  defer('update_queue_metrics')
  last_tick = now


def check_new_cls(handler):
  '''Polls for new CLs and asynchronously enqueues jobs for them.'''
  logging.info('Polling for new Gerrit CLs')
  date_limit = (datetime.utcnow() - timedelta(days=1)).strftime('%Y-%m-%d')
  url = 'https://%s/a/changes/' % GERRIT_HOST
  url += '?o=CURRENT_REVISION&o=DETAILED_ACCOUNTS&o=LABELS&n=200'
  url += '&q=branch:master+project:%s' % GERRIT_PROJECT
  url += '+is:open+after:%s' % date_limit
  resp = req('GET', url, gerrit=True)
  for change in (change for change in resp if 'revisions' in change):
    rev_hash = change['revisions'].keys()[0]
    rev = change['revisions'][rev_hash]
    owner = rev['uploader']['email']
    prs_ready = change['labels'].get('Presubmit-Ready', {}).get('approved', {})
    prs_owner = prs_ready.get('email', '')
    # Only submit jobs for patchsets that are either uploaded by a trusted
    # account or are marked as Presubmit-Ready by a trusted account.
    if not is_trusted(owner) and not is_trusted(prs_owner):
      continue
    defer(
        'check_new_cl',
        cl=str(change['_number']),
        patchset=str(rev['_number']),
        change_id=change['id'],
        rev_hash=rev_hash,
        ref=rev['ref'],
        owner=rev['uploader']['email'],
        wants_vote='1' if prs_ready else '0')


def append_jobs(patch_obj, src, git_ref, now=None):
  '''Creates the worker jobs (defined in config.py) for the given CL.

  Jobs are keyed by timestamp-cl-patchset-config to get a fair schedule (workers
  pull jobs ordered by the key above).
  It doesn't directly write into the DB, it just appends keys to the passed
  |patch_obj|, so the whole set of CL descriptor + jobs can be added atomically
  to the datastore.
  src: e.g., cls/1234-1 (CL and patchset number).
  '''
  logging.info('Enqueueing jobs for cl %s', src)
  timestamp = (now or datetime.utcnow()).strftime('%Y%m%d%H%M%S')
  for cfg_name, env in JOB_CONFIGS.iteritems():
    job_id = '%s--%s--%s' % (timestamp, src.replace('/', '-'), cfg_name)
    logging.info('Enqueueing job %s', job_id)
    patch_obj['jobs/' + job_id] = {
        'src': src,
        'type': cfg_name,
        'env': dict(env, PERFETTO_TEST_GIT_REF=git_ref),
        'status': 'QUEUED',
        'time_queued': utc_now_iso(),
    }
    patch_obj['jobs_queued/' + job_id] = 0
    patch_obj[src]['jobs'][job_id] = 0
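

# For illustration: a job enqueued by append_jobs() at 2019-04-10 12:30:00 UTC
# for patchset 1 of CL 1234, with a hypothetical JOB_CONFIGS entry named
# 'linux-clang', would be keyed:
#   jobs/20190410123000--cls-1234-1--linux-clang
# Since workers pull jobs in lexicographic key order, this yields the
# oldest-first "fair schedule" described in the docstring above.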


def check_new_cl(handler):
  '''Creates the CL + jobs entries in the DB for the given CL if they don't
  exist yet.

  If they do exist, checks whether a Presubmit-Ready label has been added and,
  if so, updates the CL with the message + vote.
  '''
  change_id = handler.request.get('change_id')
  rev_hash = handler.request.get('rev_hash')
  cl = handler.request.get('cl')
  patchset = handler.request.get('patchset')
  ref = handler.request.get('ref')
  wants_vote = handler.request.get('wants_vote') == '1'

  # We want to do two things here:
  # 1) If the CL doesn't exist (hence vote_prop is None) carry on below and
  #    enqueue jobs for it.
  # 2) If the CL exists, we don't need to kick new jobs. However, the user
  #    might have added a Presubmit-Ready label after we created the CL. In
  #    this case update the |wants_vote| flag and return.
  vote_prop = req('GET', '%s/cls/%s-%s/wants_vote.json' % (DB, cl, patchset))
  if vote_prop is not None:
    if vote_prop != wants_vote and wants_vote:
      logging.info('Updating wants_vote flag on %s-%s', cl, patchset)
      req('PUT', '%s/cls/%s-%s/wants_vote.json' % (DB, cl, patchset), body=True)
      # If the label is applied after we have finished running all the jobs,
      # just jump straight to the voting.
      defer('check_pending_cl', cl_and_ps='%s-%s' % (cl, patchset))
    return

  # This is the first time we see this patchset, enqueue jobs for it.

  # Dequeue jobs for older patchsets, if any.
  defer('cancel_older_jobs', cl=cl, patchset=patchset)

  src = 'cls/%s-%s' % (cl, patchset)
  # Enqueue jobs for the latest patchset.
  patch_obj = {}
  patch_obj['cls_pending/%s-%s' % (cl, patchset)] = 0
  patch_obj[src] = {
      'change_id': change_id,
      'revision_id': rev_hash,
      'time_queued': utc_now_iso(),
      'jobs': {},
      'wants_vote': wants_vote,
  }
  append_jobs(patch_obj, src, ref)
  req('PATCH', DB + '.json', body=patch_obj)


def cancel_older_jobs(handler):
  cl = handler.request.get('cl')
  patchset = handler.request.get('patchset')
  first_key = '%s-0' % cl
  last_key = '%s-z' % cl
  filt = 'orderBy="$key"&startAt="%s"&endAt="%s"' % (first_key, last_key)
  cl_objs = req('GET', '%s/cls.json?%s' % (DB, filt)) or {}
  for cl_and_ps, cl_obj in cl_objs.iteritems():
    ps = int(cl_and_ps.split('-')[-1])
    if cl_obj.get('time_ended') or ps >= int(patchset):
      continue
    logging.info('Cancelling jobs for previous patchset %s', cl_and_ps)
    map(lambda x: defer('cancel_job', job_id=x), cl_obj['jobs'].keys())


def check_pending_cls(handler):
  # Check if any pending CL has completed (all jobs are done). If so, publish
  # the comment and vote on the CL.
  pending_cls = req('GET', '%s/cls_pending.json' % DB) or {}
  for cl_and_ps, _ in pending_cls.iteritems():
    defer('check_pending_cl', cl_and_ps=cl_and_ps)
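

# Illustrative note on the DB layout used above: cls_pending/ holds one
# zero-valued entry per CL-patchset still being processed, e.g.
#   cls_pending/1234-1: 0
# (written by check_new_cl), so a single GET of cls_pending.json is enough to
# enumerate the CLs that check_pending_cl() below still needs to look at.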


def check_pending_cl(handler):
  # This function can be called twice on the same CL, e.g., in the case when
  # the Presubmit-Ready label is applied after we have finished running all the
  # jobs (we run presubmit regardless, only the voting is conditioned by PR).
  cl_and_ps = handler.request.get('cl_and_ps')
  cl_obj = req('GET', '%s/cls/%s.json' % (DB, cl_and_ps))
  all_jobs = cl_obj.get('jobs', {}).keys()
  pending_jobs = []
  for job_id in all_jobs:
    job_status = req('GET', '%s/jobs/%s/status.json' % (DB, job_id))
    pending_jobs += [job_id] if job_status in ('QUEUED', 'STARTED') else []

  if pending_jobs:
    # If the CL has been pending for too long, cancel all its jobs. Upon the
    # next scan it will be deleted and optionally voted on.
    t_queued = parse_iso_time(cl_obj['time_queued'])
    age_sec = (datetime.utcnow() - t_queued).total_seconds()
    if age_sec > CL_TIMEOUT_SEC:
      logging.warning('Cancelling %s, it has been pending for too long (%s sec)',
                      cl_and_ps, int(age_sec))
      map(lambda x: defer('cancel_job', job_id=x), pending_jobs)
    return

  logging.info('All jobs completed for CL %s', cl_and_ps)

  # Remove the CL from the pending queue and update end time.
  patch_obj = {
      'cls_pending/%s' % cl_and_ps: {},  # = DELETE
      'cls/%s/time_ended' % cl_and_ps: cl_obj.get('time_ended', utc_now_iso()),
  }
  req('PATCH', '%s.json' % DB, body=patch_obj)
  defer('update_cl_metrics', src='cls/' + cl_and_ps)
  map(lambda x: defer('update_job_metrics', job_id=x), all_jobs)
  if cl_obj.get('wants_vote'):
    defer('comment_and_vote_cl', cl_and_ps=cl_and_ps)


def comment_and_vote_cl(handler):
  cl_and_ps = handler.request.get('cl_and_ps')
  cl_obj = req('GET', '%s/cls/%s.json' % (DB, cl_and_ps))

  if cl_obj.get('voted'):
    logging.error('Already voted on CL %s', cl_and_ps)
    return

  if not cl_obj['wants_vote'] or not GERRIT_VOTING_ENABLED:
    logging.info('Skipping voting on CL %s', cl_and_ps)
    return

  cl_vote = 1
  passed_jobs = []
  failed_jobs = {}
  ui_links = []
  cancelled = False
  for job_id in cl_obj['jobs'].keys():
    job_obj = req('GET', '%s/jobs/%s.json' % (DB, job_id))
    job_config = JOB_CONFIGS.get(job_obj['type'], {})
    if job_obj['status'] == 'CANCELLED':
      cancelled = True
    if '-ui-' in job_id:
      ui_links.append('https://storage.googleapis.com/%s/%s/ui/index.html' %
                      (GCS_ARTIFACTS, job_id))
    if job_obj['status'] == 'COMPLETED':
      passed_jobs.append(job_id)
    elif not job_config.get('SKIP_VOTING', False):
      cl_vote = -1
      failed_jobs[job_id] = job_obj['status']

  msg = ''
  if cancelled:
    msg += 'Some jobs in this CI run were cancelled. This likely happened '
    msg += 'because a new patchset has been uploaded. Skipping vote.\n'
  log_url = CI_SITE + '/#!/logs'
  if failed_jobs:
    msg += 'FAIL:\n'
    msg += ''.join([
        '  %s/%s (%s)\n' % (log_url, job_id, status)
        for (job_id, status) in failed_jobs.iteritems()
    ])
  if passed_jobs:
    msg += 'PASS:\n'
    msg += ''.join(['  %s/%s\n' % (log_url, job_id) for job_id in passed_jobs])
  if ui_links:
    msg += 'Artifacts:\n' + ''.join('  %s\n' % link for link in ui_links)
  msg += 'CI page for this CL:\n'
  msg += '  https://ci.perfetto.dev/#!/cls/%s\n' % cl_and_ps.split('-')[0]
  body = {'labels': {}, 'message': msg}
  if not cancelled:
    body['labels']['Code-Review'] = cl_vote
  logging.info('Posting results for CL %s', cl_and_ps)
  url = 'https://%s/a/changes/%s/revisions/%s/review' % (
      GERRIT_HOST, cl_obj['change_id'], cl_obj['revision_id'])
  req('POST', url, body=body, gerrit=True)
  req('PUT', '%s/cls/%s/voted.json' % (DB, cl_and_ps), body=True)
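

# For reference, the review POST above targets Gerrit's set-review REST
# endpoint, with a payload along these (illustrative) lines:
#   {'labels': {'Code-Review': 1},
#    'message': 'PASS:\n  <CI_SITE>/#!/logs/<job_id>\n...'}
# When any job was cancelled, 'labels' is left empty so no vote is cast.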


def queue_postsubmit_jobs(handler):
  '''Creates the jobs entries in the DB for the given branch or revision.

  Can be called in two modes:
  1. ?branch=master: retrieves the SHA1 of the branch head and re-invokes
     itself as per mode 2.
  2. ?branch=master&revision=deadbeef1234: queues jobs for the given revision.
  '''
  prj = urllib.quote(GERRIT_PROJECT, '')
  branch = handler.request.get('branch')
  revision = handler.request.get('revision')
  assert branch

  if not revision:
    # Get the commit SHA1 of the head of the branch.
    url = 'https://%s/a/projects/%s/branches/%s' % (GERRIT_HOST, prj, branch)
    revision = req('GET', url, gerrit=True)['revision']
    assert revision
    defer('queue_postsubmit_jobs', branch=branch, revision=revision)
    return

  # Get the committer datetime for the given revision.
  url = 'https://%s/a/projects/%s/commits/%s' % (GERRIT_HOST, prj, revision)
  commit_info = req('GET', url, gerrit=True)
  time_committed = commit_info['committer']['date'].split('.')[0]
  time_committed = datetime.strptime(time_committed, '%Y-%m-%d %H:%M:%S')

  # Enqueue jobs.
  src = 'branches/%s-%s' % (branch, time_committed.strftime('%Y%m%d%H%M%S'))
  now = datetime.utcnow()
  patch_obj = {
      src: {
          'rev': revision,
          'subject': commit_info['subject'][:100],
          'author': commit_info['author'].get('email', 'N/A'),
          'time_committed': utc_now_iso(time_committed),
          'time_queued': utc_now_iso(),
          'jobs': {},
      }
  }
  ref = 'refs/heads/' + branch
  append_jobs(patch_obj, src, ref, now)
  req('PATCH', DB + '.json', body=patch_obj)


def delete_stale_jobs(handler):
  '''Deletes jobs that have been left in the running queue for too long.

  This is usually due to a crash in the VM that handles them.
  '''
  running_jobs = req('GET', '%s/jobs_running.json?shallow=true' % (DB)) or {}
  for job_id in running_jobs.iterkeys():
    job = req('GET', '%s/jobs/%s.json' % (DB, job_id))
    time_started = parse_iso_time(job.get('time_started', utc_now_iso()))
    # time_started is a UTC timestamp, so compare against utcnow().
    age = (datetime.utcnow() - time_started).total_seconds()
    if age > JOB_TIMEOUT_SEC * 2:
      defer('cancel_job', job_id=job_id)
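

# Note on the Firebase REST semantics relied upon below (and in
# check_pending_cl above): PATCHing an empty object ({}) at a path deletes
# that path, hence the '# = DELETE' comments. This lets the queue removal and
# the status update land in one atomic multi-path PATCH.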


def cancel_job(handler):
  '''Cancels a job if not completed or failed.

  This function is racy: workers can complete the queued jobs while we mark them
  as cancelled. The result of such a race is still acceptable.'''
  job_id = handler.request.get('job_id')
  status = req('GET', '%s/jobs/%s/status.json' % (DB, job_id))
  patch_obj = {
      'jobs_running/%s' % job_id: {},  # = DELETE
      'jobs_queued/%s' % job_id: {},  # = DELETE
  }
  if status in ('QUEUED', 'STARTED'):
    patch_obj['jobs/%s/status' % job_id] = 'CANCELLED'
    patch_obj['jobs/%s/time_ended' % job_id] = utc_now_iso()
  req('PATCH', DB + '.json', body=patch_obj)


def delete_expired_logs(handler):
  logs = req('GET', '%s/logs.json?shallow=true' % (DB)) or {}
  for job_id in logs.iterkeys():
    # Job IDs start with their UTC enqueue timestamp (see append_jobs()).
    t_created = datetime.strptime(job_id[:8], '%Y%m%d')
    age_days = (datetime.utcnow() - t_created).days
    if age_days > LOGS_TTL_DAYS:
      defer('delete_job_logs', job_id=job_id)


def delete_job_logs(handler):
  req('DELETE', '%s/logs/%s.json' % (DB, handler.request.get('job_id')))


def update_cl_metrics(handler):
  cl_obj = req('GET', '%s/%s.json' % (DB, handler.request.get('src')))
  t_queued = parse_iso_time(cl_obj['time_queued'])
  t_ended = parse_iso_time(cl_obj['time_ended'])
  write_metrics({
      'ci_cl_completion_time': {
          'l': {},
          'v': int((t_ended - t_queued).total_seconds())
      }
  })


def update_job_metrics(handler):
  job_id = handler.request.get('job_id')
  job = req('GET', '%s/jobs/%s.json' % (DB, job_id))
  metrics = {}
  if 'time_queued' in job and 'time_started' in job:
    t_queued = parse_iso_time(job['time_queued'])
    t_started = parse_iso_time(job['time_started'])
    metrics['ci_job_queue_time'] = {
        'l': {
            'job_type': job['type']
        },
        'v': int((t_started - t_queued).total_seconds())
    }
  if 'time_ended' in job and 'time_started' in job:
    t_started = parse_iso_time(job['time_started'])
    t_ended = parse_iso_time(job['time_ended'])
    metrics['ci_job_run_time'] = {
        'l': {
            'job_type': job['type']
        },
        'v': int((t_ended - t_started).total_seconds())
    }
  if metrics:
    write_metrics(metrics)


def update_queue_metrics(handler):
  # Update the Stackdriver metric that will drive the autoscaler.
  queued = req('GET', DB + '/jobs_queued.json?shallow=true') or {}
  running = req('GET', DB + '/jobs_running.json?shallow=true') or {}
  write_metrics({'ci_job_queue_len': {'v': len(queued) + len(running)}})


class ControllerHandler(webapp2.RequestHandler):
  ACTIONS = {
      'start': start,
      'tick': tick,
      'check_pending_cls': check_pending_cls,
      'check_pending_cl': check_pending_cl,
      'check_new_cls': check_new_cls,
      'check_new_cl': check_new_cl,
      'comment_and_vote_cl': comment_and_vote_cl,
      'cancel_older_jobs': cancel_older_jobs,
      'queue_postsubmit_jobs': queue_postsubmit_jobs,
      'update_job_metrics': update_job_metrics,
      'update_queue_metrics': update_queue_metrics,
      'update_cl_metrics': update_cl_metrics,
      'delete_expired_logs': delete_expired_logs,
      'delete_job_logs': delete_job_logs,
      'delete_stale_jobs': delete_stale_jobs,
      'cancel_job': cancel_job,
  }

  def handle(self, action):
    if action in ControllerHandler.ACTIONS:
      return ControllerHandler.ACTIONS[action](self)
    raise Exception('Invalid request %s' % action)

  get = handle
  post = handle


app = webapp2.WSGIApplication([
    ('/_ah/(start)', ControllerHandler),
    (r'/controller/(\w+)', ControllerHandler),
], debug=True)
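
# Example (hypothetical) invocations matching the routes above:
#   GET /controller/tick                    -> tick()
#   GET /controller/cancel_job?job_id=<id>  -> cancel_job()
# /_ah/start is the request App Engine issues when an instance of a
# manual-scaling service starts up; it maps to start() above.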