# Copyright (C) 2019 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import re
import time
import urllib

from datetime import datetime, timedelta
from google.appengine.api import taskqueue
import webapp2

from common_utils import req, utc_now_iso, parse_iso_time, SCOPES
from config import DB, GERRIT_HOST, GERRIT_PROJECT, GERRIT_POLL_SEC, PROJECT
from config import CI_SITE, GERRIT_VOTING_ENABLED, JOB_CONFIGS, LOGS_TTL_DAYS
from config import TRUSTED_EMAILS, GCS_ARTIFACTS, JOB_TIMEOUT_SEC
from config import CL_TIMEOUT_SEC
from stackdriver_metrics import STACKDRIVER_METRICS

STACKDRIVER_API = 'https://monitoring.googleapis.com/v3/projects/%s' % PROJECT

SCOPES.append('https://www.googleapis.com/auth/firebase.database')
SCOPES.append('https://www.googleapis.com/auth/userinfo.email')
SCOPES.append('https://www.googleapis.com/auth/datastore')
SCOPES.append('https://www.googleapis.com/auth/monitoring')
SCOPES.append('https://www.googleapis.com/auth/monitoring.write')

# time.time() of the last tick that was not debounced. This is a module-level
# global, so it is only shared by requests served by the same process.
last_tick = 0

# ------------------------------------------------------------------------------
# Misc utility functions
# ------------------------------------------------------------------------------


def defer(action, **kwargs):
  '''Appends a task to the deferred queue.

  Each task will become a new HTTP request made by the AppEngine service.
  This pattern is used extensively here for several reasons:
  - Auditability in logs: it's easier to scrape logs and debug.
  - Stability: an exception fails only the current task not the whole function.
  - Reliability: The AppEngine runtime will retry failed tasks with exponential
    backoff.
  - Performance: tasks are run concurrently, which is quite important given that
    most of them are bound by HTTP latency to Gerrit or Firebase.

  Args:
    action: name of the handler; the task hits /controller/<action>.
    **kwargs: passed as the request parameters of the task.
  '''
  taskqueue.add(
      queue_name='deferred-jobs',
      url='/controller/' + action,
      params=kwargs,
      method='GET')


def create_stackdriver_metric_definitions():
  '''POSTs every entry of STACKDRIVER_METRICS as a metric descriptor.'''
  logging.info('Creating Stackdriver metric definitions')
  for name, metric in STACKDRIVER_METRICS.iteritems():
    logging.info('Creating metric %s', name)
    req('POST', STACKDRIVER_API + '/metricDescriptors', body=metric)


def write_metrics(metric_dict):
  '''Writes one data point per metric to Stackdriver, best effort.

  Args:
    metric_dict: maps a key of STACKDRIVER_METRICS to a spec dict with 'v' (the
      value, sent as an int64 string) and optionally 'l' (the labels dict).

  Failures of the write request are logged, never raised. The "written more
  frequently than the maximum sampling" error is silently ignored.
  '''
  now = utc_now_iso()
  time_series = []
  for key, spec in metric_dict.iteritems():
    time_series.append({
        'metric': {
            'type': STACKDRIVER_METRICS[key]['type'],
            'labels': spec.get('l', {})
        },
        'resource': {
            'type': 'global'
        },
        'points': [{
            'interval': {
                'endTime': now
            },
            'value': {
                'int64Value': str(spec['v'])
            }
        }]
    })
  desc = {'timeSeries': time_series}
  try:
    req('POST', STACKDRIVER_API + '/timeSeries', body=desc)
  except Exception as e:
    # Metric updates can easily fail due to Stackdriver API limitations.
    msg = str(e)
    if 'written more frequently than the maximum sampling' not in msg:
      logging.error('Metrics update failed: %s', msg)


def is_trusted(email):
  '''Returns a truthy match object iff |email| matches TRUSTED_EMAILS.

  re.match() anchors only at the start of the string.
  '''
  return re.match(TRUSTED_EMAILS, email)


# ------------------------------------------------------------------------------
# Deferred job handlers
# ------------------------------------------------------------------------------


def start(handler):
  '''Handler for /_ah/start: creates the metric definitions and ticks once.'''
  create_stackdriver_metric_definitions()
  tick(handler)


def tick(handler):
  '''Periodic heartbeat: re-arms itself and fans out the polling tasks.'''
  global last_tick
  now = time.time()
  # Avoid avalanching effects due to the failsafe tick job in cron.yaml.
  if now - last_tick < GERRIT_POLL_SEC - 1:
    return
  # Schedule the next tick before doing anything else.
  taskqueue.add(
      url='/controller/tick', queue_name='tick', countdown=GERRIT_POLL_SEC)
  defer('check_new_cls')
  defer('check_pending_cls')
  defer('update_queue_metrics')
  last_tick = now


def check_new_cls(handler):
  '''Poll for new CLs and asynchronously enqueue jobs for them.'''
  logging.info('Polling for new Gerrit CLs')
  date_limit = (datetime.utcnow() - timedelta(days=1)).strftime('%Y-%m-%d')
  url = 'https://%s/a/changes/' % GERRIT_HOST
  url += '?o=CURRENT_REVISION&o=DETAILED_ACCOUNTS&o=LABELS&n=200'
  url += '&q=branch:master+project:%s' % GERRIT_PROJECT
  url += '+is:open+after:%s' % date_limit
  resp = req('GET', url, gerrit=True)
  for change in resp:
    if 'revisions' not in change:
      continue
    # Take the first (presumably only, given o=CURRENT_REVISION) revision.
    rev_hash = next(iter(change['revisions']))
    rev = change['revisions'][rev_hash]
    owner = rev['uploader']['email']
    prs_ready = change['labels'].get('Presubmit-Ready', {}).get('approved', {})
    prs_owner = prs_ready.get('email', '')
    # Only submit jobs for patchsets that are either uploaded by a trusted
    # account or are marked as Presubmit-Ready by a trusted account.
    if not is_trusted(owner) and not is_trusted(prs_owner):
      continue
    defer(
        'check_new_cl',
        cl=str(change['_number']),
        patchset=str(rev['_number']),
        change_id=change['id'],
        rev_hash=rev_hash,
        ref=rev['ref'],
        owner=owner,
        wants_vote='1' if prs_ready else '0')


def append_jobs(patch_obj, src, git_ref, now=None):
  '''Creates the worker jobs (defined in config.py) for the given CL.

  Jobs are keyed by timestamp-cl-patchset-config to get a fair schedule (workers
  pull jobs ordered by the key above).
  It doesn't directly write into the DB, it just appends keys to the passed
  |patch_obj|, so the whole set of CL descriptor + jobs can be added atomically
  to the datastore.

  Args:
    patch_obj: dict to extend; it must already contain patch_obj[src]['jobs'].
    src: the key of the CL/branch entry, e.g. cls/1234-1 (cl and patchset
      number).
    git_ref: exported to the job as the PERFETTO_TEST_GIT_REF env variable.
    now: datetime used for the job key prefix. Defaults to datetime.utcnow().
  '''
  logging.info('Enqueueing jobs fos cl %s', src)
  timestamp = (now or datetime.utcnow()).strftime('%Y%m%d%H%M%S')
  for cfg_name, env in JOB_CONFIGS.iteritems():
    job_id = '%s--%s--%s' % (timestamp, src.replace('/', '-'), cfg_name)
    logging.info('Enqueueing job %s', job_id)
    patch_obj['jobs/' + job_id] = {
        'src': src,
        'type': cfg_name,
        'env': dict(env, PERFETTO_TEST_GIT_REF=git_ref),
        'status': 'QUEUED',
        'time_queued': utc_now_iso(),
    }
    patch_obj['jobs_queued/' + job_id] = 0
    patch_obj[src]['jobs'][job_id] = 0


def check_new_cl(handler):
  '''Creates the CL + jobs entries in the DB for the given CL if doesn't exist

  If exists check if a Presubmit-Ready label has been added and if so updates it
  with the message + vote.

  Request params: change_id, rev_hash, cl, patchset, ref, wants_vote ('1'/'0').
  '''
  change_id = handler.request.get('change_id')
  rev_hash = handler.request.get('rev_hash')
  cl = handler.request.get('cl')
  patchset = handler.request.get('patchset')
  ref = handler.request.get('ref')
  wants_vote = handler.request.get('wants_vote') == '1'

  # We want to do two things here:
  # 1) If the CL doesn't exist (hence vote_prop is None) carry on below and
  #    enqueue jobs for it.
  # 2) If the CL exists, we don't need to kick new jobs. However, the user
  #    might have added a Presubmit-Ready label after we created the CL. In
  #    this case update the |wants_vote| flag and return.
  vote_prop = req('GET', '%s/cls/%s-%s/wants_vote.json' % (DB, cl, patchset))
  if vote_prop is not None:
    if vote_prop != wants_vote and wants_vote:
      logging.info('Updating wants_vote flag on %s-%s', cl, patchset)
      req('PUT', '%s/cls/%s-%s/wants_vote.json' % (DB, cl, patchset), body=True)
      # If the label is applied after we have finished running all the jobs just
      # jump straight to the voting.
      defer('check_pending_cl', cl_and_ps='%s-%s' % (cl, patchset))
    return

  # This is the first time we see this patchset, enqueue jobs for it.

  # Dequeue jobs for older patchsets, if any.
  defer('cancel_older_jobs', cl=cl, patchset=patchset)

  src = 'cls/%s-%s' % (cl, patchset)
  # Enqueue jobs for the latest patchset.
  patch_obj = {}
  patch_obj['cls_pending/%s-%s' % (cl, patchset)] = 0
  patch_obj[src] = {
      'change_id': change_id,
      'revision_id': rev_hash,
      'time_queued': utc_now_iso(),
      'jobs': {},
      'wants_vote': wants_vote,
  }
  append_jobs(patch_obj, src, ref)
  req('PATCH', DB + '.json', body=patch_obj)


def cancel_older_jobs(handler):
  '''Defers a cancel_job task for each job of older, unfinished patchsets.

  Request params: cl, patchset (the newest patchset, which is left alone).
  '''
  cl = handler.request.get('cl')
  patchset = handler.request.get('patchset')
  # Key range query selecting the '<cl>-<patchset>' entries of this CL.
  first_key = '%s-0' % cl
  last_key = '%s-z' % cl
  filt = 'orderBy="$key"&startAt="%s"&endAt="%s"' % (first_key, last_key)
  cl_objs = req('GET', '%s/cls.json?%s' % (DB, filt)) or {}
  for cl_and_ps, cl_obj in cl_objs.iteritems():
    ps = int(cl_and_ps.split('-')[-1])
    if cl_obj.get('time_ended') or ps >= int(patchset):
      continue
    logging.info('Cancelling jobs for previous patchset %s', cl_and_ps)
    for job_id in cl_obj['jobs'].keys():
      defer('cancel_job', job_id=job_id)


def check_pending_cls(handler):
  '''Defers a check_pending_cl task for every entry of cls_pending.'''
  # Check if any pending CL has completed (all jobs are done). If so publish
  # the comment and vote on the CL.
  pending_cls = req('GET', '%s/cls_pending.json' % DB) or {}
  for cl_and_ps, _ in pending_cls.iteritems():
    defer('check_pending_cl', cl_and_ps=cl_and_ps)


def check_pending_cl(handler):
  '''Finalizes a CL once none of its jobs is QUEUED or STARTED.

  Request params: cl_and_ps, the '<cl>-<patchset>' key of the CL.
  '''
  # This function can be called twice on the same CL, e.g., in the case when the
  # Presubmit-Ready label is applied after we have finished running all the
  # jobs (we run presubmit regardless, only the voting is conditioned by PR).
  cl_and_ps = handler.request.get('cl_and_ps')
  cl_obj = req('GET', '%s/cls/%s.json' % (DB, cl_and_ps))
  all_jobs = cl_obj.get('jobs', {}).keys()
  pending_jobs = []
  for job_id in all_jobs:
    job_status = req('GET', '%s/jobs/%s/status.json' % (DB, job_id))
    if job_status in ('QUEUED', 'STARTED'):
      pending_jobs.append(job_id)

  if pending_jobs:
    # If the CL has been pending for too long cancel all its jobs. Upon the next
    # scan it will be deleted and optionally voted on.
    t_queued = parse_iso_time(cl_obj['time_queued'])
    age_sec = (datetime.utcnow() - t_queued).total_seconds()
    if age_sec > CL_TIMEOUT_SEC:
      logging.warning('Canceling %s, it has been pending for too long (%s sec)',
                      cl_and_ps, int(age_sec))
      for job_id in pending_jobs:
        defer('cancel_job', job_id=job_id)
    return

  logging.info('All jobs completed for CL %s', cl_and_ps)

  # Remove the CL from the pending queue and update end time.
  patch_obj = {
      'cls_pending/%s' % cl_and_ps: {},  # = DELETE
      # Keep the existing end time if this is the second call for this CL.
      'cls/%s/time_ended' % cl_and_ps: cl_obj.get('time_ended', utc_now_iso()),
  }
  req('PATCH', '%s.json' % DB, body=patch_obj)
  defer('update_cl_metrics', src='cls/' + cl_and_ps)
  for job_id in all_jobs:
    defer('update_job_metrics', job_id=job_id)
  if cl_obj.get('wants_vote'):
    defer('comment_and_vote_cl', cl_and_ps=cl_and_ps)


def comment_and_vote_cl(handler):
  '''Posts the job results on the Gerrit CL and, unless cancelled, votes.

  Request params: cl_and_ps, the '<cl>-<patchset>' key of the CL.

  Does nothing if the CL has already been voted on, if it does not want a vote
  or if GERRIT_VOTING_ENABLED is false. The vote is +1 unless a job whose config
  does not set SKIP_VOTING ended in a state other than COMPLETED.
  '''
  cl_and_ps = handler.request.get('cl_and_ps')
  cl_obj = req('GET', '%s/cls/%s.json' % (DB, cl_and_ps))

  if cl_obj.get('voted'):
    logging.error('Already voted on CL %s', cl_and_ps)
    return

  if not cl_obj.get('wants_vote') or not GERRIT_VOTING_ENABLED:
    logging.info('Skipping voting on CL %s', cl_and_ps)
    return

  cl_vote = 1
  passed_jobs = []
  failed_jobs = {}
  ui_links = []
  cancelled = False
  for job_id in cl_obj['jobs'].keys():
    job_obj = req('GET', '%s/jobs/%s.json' % (DB, job_id))
    job_config = JOB_CONFIGS.get(job_obj['type'], {})
    if job_obj['status'] == 'CANCELLED':
      cancelled = True
    if '-ui-' in job_id:
      ui_links.append('https://storage.googleapis.com/%s/%s/ui/index.html' %
                      (GCS_ARTIFACTS, job_id))
    if job_obj['status'] == 'COMPLETED':
      passed_jobs.append(job_id)
    elif not job_config.get('SKIP_VOTING', False):
      cl_vote = -1
      failed_jobs[job_id] = job_obj['status']

  msg = ''
  if cancelled:
    msg += 'Some jobs in this CI run were cancelled. This likely happened '
    msg += 'because a new patchset has been uploaded. Skipping vote.\n'
  log_url = CI_SITE + '/#!/logs'
  if failed_jobs:
    msg += 'FAIL:\n'
    msg += ''.join([
        ' %s/%s (%s)\n' % (log_url, job_id, status)
        for (job_id, status) in failed_jobs.iteritems()
    ])
  if passed_jobs:
    msg += 'PASS:\n'
    msg += ''.join([' %s/%s\n' % (log_url, job_id) for job_id in passed_jobs])
  if ui_links:
    msg += 'Artifacts:\n' + ''.join(' %s\n' % link for link in ui_links)
  msg += 'CI page for this CL:\n'
  # Built from CI_SITE, like |log_url| above, rather than a hard-coded host.
  msg += ' %s/#!/cls/%s\n' % (CI_SITE, cl_and_ps.split('-')[0])
  body = {'labels': {}, 'message': msg}
  if not cancelled:
    body['labels']['Code-Review'] = cl_vote
  logging.info('Posting results for CL %s', cl_and_ps)
  url = 'https://%s/a/changes/%s/revisions/%s/review' % (
      GERRIT_HOST, cl_obj['change_id'], cl_obj['revision_id'])
  req('POST', url, body=body, gerrit=True)
  # Mark the CL as voted only after the review has been posted.
  req('PUT', '%s/cls/%s/voted.json' % (DB, cl_and_ps), body=True)


def queue_postsubmit_jobs(handler):
  '''Creates the jobs entries in the DB for the given branch or revision

  Can be called in two modes:
    1. ?branch=master: Will retrieve the SHA1 of master and call the one below.
    2. ?branch=master&revision=deadbeef1234: queues jobs for the given revision.
  '''
  prj = urllib.quote(GERRIT_PROJECT, '')
  branch = handler.request.get('branch')
  revision = handler.request.get('revision')
  assert branch

  if not revision:
    # Get the commit SHA1 of the head of the branch.
    url = 'https://%s/a/projects/%s/branches/%s' % (GERRIT_HOST, prj, branch)
    revision = req('GET', url, gerrit=True)['revision']
    assert revision
    defer('queue_postsubmit_jobs', branch=branch, revision=revision)
    return

  # Get the committer datetime for the given revision.
  url = 'https://%s/a/projects/%s/commits/%s' % (GERRIT_HOST, prj, revision)
  commit_info = req('GET', url, gerrit=True)
  # Drop the fractional part of the timestamp before parsing it.
  time_committed = commit_info['committer']['date'].split('.')[0]
  time_committed = datetime.strptime(time_committed, '%Y-%m-%d %H:%M:%S')

  # Enqueue jobs.
  src = 'branches/%s-%s' % (branch, time_committed.strftime('%Y%m%d%H%M%S'))
  now = datetime.utcnow()
  patch_obj = {
      src: {
          'rev': revision,
          'subject': commit_info['subject'][:100],
          'author': commit_info['author'].get('email', 'N/A'),
          'time_committed': utc_now_iso(time_committed),
          'time_queued': utc_now_iso(),
          'jobs': {},
      }
  }
  ref = 'refs/heads/' + branch
  append_jobs(patch_obj, src, ref, now)
  req('PATCH', DB + '.json', body=patch_obj)


def delete_stale_jobs(handler):
  '''Deletes jobs that are left in the running queue for too long

  This is usually due to a crash in the VM that handles them.
  '''
  running_jobs = req('GET', '%s/jobs_running.json?shallow=true' % (DB)) or {}
  for job_id in running_jobs.iterkeys():
    job = req('GET', '%s/jobs/%s.json' % (DB, job_id))
    time_started = parse_iso_time(job.get('time_started', utc_now_iso()))
    # Use UTC, like the rest of this file: the fallback above is a UTC time.
    age = (datetime.utcnow() - time_started).total_seconds()
    if age > JOB_TIMEOUT_SEC * 2:
      defer('cancel_job', job_id=job_id)


def cancel_job(handler):
  '''Cancels a job if not completed or failed.

  This function is racy: workers can complete the queued jobs while we mark them
  as cancelled. The result of such race is still acceptable.

  Request params: job_id.
  '''
  job_id = handler.request.get('job_id')
  status = req('GET', '%s/jobs/%s/status.json' % (DB, job_id))
  # The job is removed from both queues whatever its status is.
  patch_obj = {
      'jobs_running/%s' % job_id: {},  # = DELETE,
      'jobs_queued/%s' % job_id: {},  # = DELETE,
  }
  if status in ('QUEUED', 'STARTED'):
    patch_obj['jobs/%s/status' % job_id] = 'CANCELLED'
    patch_obj['jobs/%s/time_ended' % job_id] = utc_now_iso()
  req('PATCH', DB + '.json', body=patch_obj)


def delete_expired_logs(handler):
  '''Defers a delete_job_logs task for logs older than LOGS_TTL_DAYS.

  The age is derived from the first 8 characters of the log key, which are
  parsed as a %Y%m%d date.
  '''
  logs = req('GET', '%s/logs.json?shallow=true' % (DB)) or {}
  for job_id in logs.iterkeys():
    # Job ids are prefixed by a UTC timestamp (see append_jobs), so the age
    # must be computed against the UTC clock as well.
    date_queued = datetime.strptime(job_id[:8], '%Y%m%d')
    age_days = (datetime.utcnow() - date_queued).days
    if age_days > LOGS_TTL_DAYS:
      defer('delete_job_logs', job_id=job_id)


def delete_job_logs(handler):
  '''Deletes the logs/<job_id> entry. Request params: job_id.'''
  req('DELETE', '%s/logs/%s.json' % (DB, handler.request.get('job_id')))


def update_cl_metrics(handler):
  '''Writes ci_cl_completion_time (time_ended - time_queued, in seconds).

  Request params: src, the DB path of the CL entry (e.g. cls/1234-1).
  '''
  cl_obj = req('GET', '%s/%s.json' % (DB, handler.request.get('src')))
  t_queued = parse_iso_time(cl_obj['time_queued'])
  t_ended = parse_iso_time(cl_obj['time_ended'])
  write_metrics({
      'ci_cl_completion_time': {
          'l': {},
          'v': int((t_ended - t_queued).total_seconds())
      }
  })


def update_job_metrics(handler):
  '''Writes the queue time and run time metrics of a job, when available.

  Request params: job_id. Each metric is emitted only if the job entry has both
  of the timestamps it is computed from.
  '''
  job_id = handler.request.get('job_id')
  job = req('GET', '%s/jobs/%s.json' % (DB, job_id))
  metrics = {}
  if 'time_queued' in job and 'time_started' in job:
    t_queued = parse_iso_time(job['time_queued'])
    t_started = parse_iso_time(job['time_started'])
    metrics['ci_job_queue_time'] = {
        'l': {
            'job_type': job['type']
        },
        'v': int((t_started - t_queued).total_seconds())
    }
  if 'time_ended' in job and 'time_started' in job:
    t_started = parse_iso_time(job['time_started'])
    t_ended = parse_iso_time(job['time_ended'])
    metrics['ci_job_run_time'] = {
        'l': {
            'job_type': job['type']
        },
        'v': int((t_ended - t_started).total_seconds())
    }
  if metrics:
    write_metrics(metrics)


def update_queue_metrics(handler):
  '''Writes ci_job_queue_len = number of queued + running jobs.'''
  # Update the stackdriver metric that will drive the autoscaler.
  queued = req('GET', DB + '/jobs_queued.json?shallow=true') or {}
  running = req('GET', DB + '/jobs_running.json?shallow=true') or {}
  write_metrics({'ci_job_queue_len': {'v': len(queued) + len(running)}})


class ControllerHandler(webapp2.RequestHandler):
  '''Dispatches /_ah/start and /controller/<action> to the functions above.'''

  # Maps the action captured from the URL to its handler function. Each
  # function is called with the ControllerHandler instance as only argument.
  ACTIONS = {
      'start': start,
      'tick': tick,
      'check_pending_cls': check_pending_cls,
      'check_pending_cl': check_pending_cl,
      'check_new_cls': check_new_cls,
      'check_new_cl': check_new_cl,
      'comment_and_vote_cl': comment_and_vote_cl,
      'cancel_older_jobs': cancel_older_jobs,
      'queue_postsubmit_jobs': queue_postsubmit_jobs,
      'update_job_metrics': update_job_metrics,
      'update_queue_metrics': update_queue_metrics,
      'update_cl_metrics': update_cl_metrics,
      'delete_expired_logs': delete_expired_logs,
      'delete_job_logs': delete_job_logs,
      'delete_stale_jobs': delete_stale_jobs,
      'cancel_job': cancel_job,
  }

  def handle(self, action):
    '''Runs the function registered for |action|; raises if it is unknown.'''
    if action in ControllerHandler.ACTIONS:
      return ControllerHandler.ACTIONS[action](self)
    raise Exception('Invalid request %s' % action)

  # Both HTTP methods are served by the same dispatcher.
  get = handle
  post = handle


app = webapp2.WSGIApplication([
    ('/_ah/(start)', ControllerHandler),
    (r'/controller/(\w+)', ControllerHandler),
],
                              debug=True)