# Copyright (C) 2019 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import re
import time
import urllib

from datetime import datetime, timedelta
from google.appengine.api import taskqueue

import webapp2

from common_utils import req, utc_now_iso, parse_iso_time, SCOPES
from config import DB, GERRIT_HOST, GERRIT_PROJECT, GERRIT_POLL_SEC, PROJECT
from config import CI_SITE, GERRIT_VOTING_ENABLED, JOB_CONFIGS, LOGS_TTL_DAYS
from config import TRUSTED_EMAILS, GCS_ARTIFACTS, JOB_TIMEOUT_SEC
from config import CL_TIMEOUT_SEC
from stackdriver_metrics import STACKDRIVER_METRICS

STACKDRIVER_API = 'https://monitoring.googleapis.com/v3/projects/%s' % PROJECT

SCOPES.append('https://www.googleapis.com/auth/firebase.database')
SCOPES.append('https://www.googleapis.com/auth/userinfo.email')
SCOPES.append('https://www.googleapis.com/auth/datastore')
SCOPES.append('https://www.googleapis.com/auth/monitoring')
SCOPES.append('https://www.googleapis.com/auth/monitoring.write')

last_tick = 0

# ------------------------------------------------------------------------------
# Misc utility functions
# ------------------------------------------------------------------------------


def defer(action, **kwargs):
  '''Appends a task to the deferred queue.

  Each task will become a new HTTP request made by the AppEngine service.
  This pattern is used extensively here for several reasons:
  - Auditability in logs: it's easier to scrape logs and debug.
  - Stability: an exception fails only the current task, not the whole function.
  - Reliability: the AppEngine runtime will retry failed tasks with exponential
    backoff.
  - Performance: tasks run concurrently, which matters given that most of them
    are bound by HTTP latency to Gerrit or Firebase.
  '''
  taskqueue.add(
      queue_name='deferred-jobs',
      url='/controller/' + action,
      params=kwargs,
      method='GET')

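# Illustrative example: defer('cancel_job', job_id='123') enqueues a task that
# is later replayed as GET /controller/cancel_job?job_id=123 (for GET tasks,
# taskqueue encodes |params| into the query string).
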

def create_stackdriver_metric_definitions():
  logging.info('Creating Stackdriver metric definitions')
  for name, metric in STACKDRIVER_METRICS.iteritems():
    logging.info('Creating metric %s', name)
    req('POST', STACKDRIVER_API + '/metricDescriptors', body=metric)


def write_metrics(metric_dict):
  now = utc_now_iso()
  desc = {'timeSeries': []}
  for key, spec in metric_dict.iteritems():
    desc['timeSeries'] += [{
        'metric': {
            'type': STACKDRIVER_METRICS[key]['type'],
            'labels': spec.get('l', {})
        },
        'resource': {
            'type': 'global'
        },
        'points': [{
            'interval': {
                'endTime': now
            },
            'value': {
                'int64Value': str(spec['v'])
            }
        }]
    }]
  try:
    req('POST', STACKDRIVER_API + '/timeSeries', body=desc)
  except Exception as e:
    # Metric updates can easily fail due to Stackdriver API limitations.
    msg = str(e)
    if 'written more frequently than the maximum sampling' not in msg:
      logging.error('Metrics update failed: %s', msg)

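# Expected shape of |metric_dict|: each key names a metric defined in
# STACKDRIVER_METRICS; 'v' is the int64 point value, 'l' an optional dict of
# metric labels. Illustrative call:
#   write_metrics({'ci_job_run_time': {'l': {'job_type': 'linux'}, 'v': 127}})
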

def is_trusted(email):
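  # TRUSTED_EMAILS is a regex defined in config.py (e.g. r'.*@google\.com$',
  # purely illustrative); re.match returns a Match object (truthy) or None.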
  return re.match(TRUSTED_EMAILS, email)


# ------------------------------------------------------------------------------
# Deferred job handlers
# ------------------------------------------------------------------------------


def start(handler):
  create_stackdriver_metric_definitions()
  tick(handler)


def tick(handler):
  global last_tick
  now = time.time()
  # Avoid avalanching effects due to the failsafe tick job in cron.yaml.
  if now - last_tick < GERRIT_POLL_SEC - 1:
    return
  taskqueue.add(
      url='/controller/tick', queue_name='tick', countdown=GERRIT_POLL_SEC)
  defer('check_new_cls')
  defer('check_pending_cls')
  defer('update_queue_metrics')
  last_tick = now


def check_new_cls(handler):
  '''Polls for new CLs and asynchronously enqueues jobs for them.'''
  logging.info('Polling for new Gerrit CLs')
  date_limit = (datetime.utcnow() - timedelta(days=1)).strftime('%Y-%m-%d')
  url = 'https://%s/a/changes/' % GERRIT_HOST
  url += '?o=CURRENT_REVISION&o=DETAILED_ACCOUNTS&o=LABELS&n=200'
  url += '&q=branch:master+project:%s' % GERRIT_PROJECT
  url += '+is:open+after:%s' % date_limit
  resp = req('GET', url, gerrit=True)
  for change in (change for change in resp if 'revisions' in change):
    rev_hash = change['revisions'].keys()[0]
    rev = change['revisions'][rev_hash]
    owner = rev['uploader']['email']
    prs_ready = change['labels'].get('Presubmit-Ready', {}).get('approved', {})
    prs_owner = prs_ready.get('email', '')
    # Only submit jobs for patchsets that are either uploaded by a trusted
    # account or are marked as Presubmit-Ready by a trusted account.
    if not is_trusted(owner) and not is_trusted(prs_owner):
      continue
    defer(
        'check_new_cl',
        cl=str(change['_number']),
        patchset=str(rev['_number']),
        change_id=change['id'],
        rev_hash=rev_hash,
        ref=rev['ref'],
        owner=rev['uploader']['email'],
        wants_vote='1' if prs_ready else '0')

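# The query above is assembled into a URL of this form (hostname, project and
# date are config/runtime values, shown here only for illustration):
#   https://<GERRIT_HOST>/a/changes/?o=CURRENT_REVISION&o=DETAILED_ACCOUNTS
#       &o=LABELS&n=200&q=branch:master+project:<GERRIT_PROJECT>+is:open
#       +after:2019-01-01
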

def append_jobs(patch_obj, src, git_ref, now=None):
  '''Creates the worker jobs (defined in config.py) for the given CL.

  Jobs are keyed by timestamp-cl-patchset-config to get a fair schedule (workers
  pull jobs ordered by the key above).
  This doesn't directly write into the DB; it just appends keys to the passed
  |patch_obj|, so the whole set of CL descriptor + jobs can be added atomically
  to the datastore.
  src: is cls/1234-1 (cl and patchset number).
  '''
  logging.info('Enqueueing jobs for cl %s', src)
  timestamp = (now or datetime.utcnow()).strftime('%Y%m%d%H%M%S')
  for cfg_name, env in JOB_CONFIGS.iteritems():
    job_id = '%s--%s--%s' % (timestamp, src.replace('/', '-'), cfg_name)
    logging.info('Enqueueing job %s', job_id)
    patch_obj['jobs/' + job_id] = {
        'src': src,
        'type': cfg_name,
        'env': dict(env, PERFETTO_TEST_GIT_REF=git_ref),
        'status': 'QUEUED',
        'time_queued': utc_now_iso(),
    }
    patch_obj['jobs_queued/' + job_id] = 0
    patch_obj[src]['jobs'][job_id] = 0

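# For example (the config name is illustrative), CL 1234 patchset 5 queued on
# 2019-08-13 10:12:00 with a JOB_CONFIGS entry 'linux-clang' yields the key:
#   jobs/20190813101200--cls-1234-5--linux-clang
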

def check_new_cl(handler):
  '''Creates the CL + jobs entries in the DB for the given CL if it doesn't
  exist yet.

  If it exists, checks whether a Presubmit-Ready label has been added and, if
  so, updates it with the message + vote.
  '''
  change_id = handler.request.get('change_id')
  rev_hash = handler.request.get('rev_hash')
  cl = handler.request.get('cl')
  patchset = handler.request.get('patchset')
  ref = handler.request.get('ref')
  wants_vote = handler.request.get('wants_vote') == '1'

  # We want to do two things here:
  # 1) If the CL doesn't exist (hence vote_prop is None) carry on below and
  #    enqueue jobs for it.
  # 2) If the CL exists, we don't need to kick new jobs. However, the user
  #    might have added a Presubmit-Ready label after we created the CL. In
  #    this case update the |wants_vote| flag and return.
  vote_prop = req('GET', '%s/cls/%s-%s/wants_vote.json' % (DB, cl, patchset))
  if vote_prop is not None:
    if vote_prop != wants_vote and wants_vote:
      logging.info('Updating wants_vote flag on %s-%s', cl, patchset)
      req('PUT', '%s/cls/%s-%s/wants_vote.json' % (DB, cl, patchset), body=True)
      # If the label is applied after we have finished running all the jobs,
      # jump straight to the voting.
      defer('check_pending_cl', cl_and_ps='%s-%s' % (cl, patchset))
    return

  # This is the first time we see this patchset, enqueue jobs for it.

  # Dequeue jobs for older patchsets, if any.
  defer('cancel_older_jobs', cl=cl, patchset=patchset)

  src = 'cls/%s-%s' % (cl, patchset)
  # Enqueue jobs for the latest patchset.
  patch_obj = {}
  patch_obj['cls_pending/%s-%s' % (cl, patchset)] = 0
  patch_obj[src] = {
      'change_id': change_id,
      'revision_id': rev_hash,
      'time_queued': utc_now_iso(),
      'jobs': {},
      'wants_vote': wants_vote,
  }
  append_jobs(patch_obj, src, ref)
  req('PATCH', DB + '.json', body=patch_obj)

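# The PATCH above performs a Firebase multi-location update: each key in
# |patch_obj| is a path relative to the DB root and all paths are written in
# one atomic request. Illustrative payload:
#   {'cls_pending/1234-5': 0,
#    'cls/1234-5': {'change_id': ..., 'jobs': {...}, ...},
#    'jobs/20190813101200--cls-1234-5--linux-clang': {...}}
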

def cancel_older_jobs(handler):
  cl = handler.request.get('cl')
  patchset = handler.request.get('patchset')
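  # Fetch all DB entries for this CL. Keys have the form '<cl>-<patchset>';
  # the '<cl>-0' / '<cl>-z' bounds span every patchset suffix, since Firebase
  # orders string keys lexicographically.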
  first_key = '%s-0' % cl
  last_key = '%s-z' % cl
  filt = 'orderBy="$key"&startAt="%s"&endAt="%s"' % (first_key, last_key)
  cl_objs = req('GET', '%s/cls.json?%s' % (DB, filt)) or {}
  for cl_and_ps, cl_obj in cl_objs.iteritems():
    ps = int(cl_and_ps.split('-')[-1])
    if cl_obj.get('time_ended') or ps >= int(patchset):
      continue
    logging.info('Cancelling jobs for previous patchset %s', cl_and_ps)
    map(lambda x: defer('cancel_job', job_id=x), cl_obj['jobs'].keys())


def check_pending_cls(handler):
  # Check if any pending CL has completed (all jobs are done). If so, publish
  # the comment and vote on the CL.
  pending_cls = req('GET', '%s/cls_pending.json' % DB) or {}
  for cl_and_ps, _ in pending_cls.iteritems():
    defer('check_pending_cl', cl_and_ps=cl_and_ps)


def check_pending_cl(handler):
  # This function can be called twice on the same CL, e.g., in the case when
  # the Presubmit-Ready label is applied after we have finished running all the
  # jobs (we run presubmit regardless; only the voting is conditioned by PR).
  cl_and_ps = handler.request.get('cl_and_ps')
  cl_obj = req('GET', '%s/cls/%s.json' % (DB, cl_and_ps))
  all_jobs = cl_obj.get('jobs', {}).keys()
  pending_jobs = []
  for job_id in all_jobs:
    job_status = req('GET', '%s/jobs/%s/status.json' % (DB, job_id))
    pending_jobs += [job_id] if job_status in ('QUEUED', 'STARTED') else []

  if pending_jobs:
    # If the CL has been pending for too long, cancel all its jobs. Upon the
    # next scan it will be deleted and optionally voted on.
    t_queued = parse_iso_time(cl_obj['time_queued'])
    age_sec = (datetime.utcnow() - t_queued).total_seconds()
    if age_sec > CL_TIMEOUT_SEC:
      logging.warning('Canceling %s, it has been pending for too long (%s sec)',
                      cl_and_ps, int(age_sec))
      map(lambda x: defer('cancel_job', job_id=x), pending_jobs)
    return

  logging.info('All jobs completed for CL %s', cl_and_ps)

  # Remove the CL from the pending queue and update end time.
  patch_obj = {
      'cls_pending/%s' % cl_and_ps: {},  # = DELETE
      'cls/%s/time_ended' % cl_and_ps: cl_obj.get('time_ended', utc_now_iso()),
  }
  req('PATCH', '%s.json' % DB, body=patch_obj)
  defer('update_cl_metrics', src='cls/' + cl_and_ps)
  map(lambda x: defer('update_job_metrics', job_id=x), all_jobs)
  if cl_obj.get('wants_vote'):
    defer('comment_and_vote_cl', cl_and_ps=cl_and_ps)


def comment_and_vote_cl(handler):
  cl_and_ps = handler.request.get('cl_and_ps')
  cl_obj = req('GET', '%s/cls/%s.json' % (DB, cl_and_ps))

  if cl_obj.get('voted'):
    logging.error('Already voted on CL %s', cl_and_ps)
    return

  if not cl_obj['wants_vote'] or not GERRIT_VOTING_ENABLED:
    logging.info('Skipping voting on CL %s', cl_and_ps)
    return

  cl_vote = 1
  passed_jobs = []
  failed_jobs = {}
  ui_links = []
  cancelled = False
  for job_id in cl_obj['jobs'].keys():
    job_obj = req('GET', '%s/jobs/%s.json' % (DB, job_id))
    job_config = JOB_CONFIGS.get(job_obj['type'], {})
    if job_obj['status'] == 'CANCELLED':
      cancelled = True
    if '-ui-' in job_id:
      ui_links.append('https://storage.googleapis.com/%s/%s/ui/index.html' %
                      (GCS_ARTIFACTS, job_id))
    if job_obj['status'] == 'COMPLETED':
      passed_jobs.append(job_id)
    elif not job_config.get('SKIP_VOTING', False):
      cl_vote = -1
      failed_jobs[job_id] = job_obj['status']

  msg = ''
  if cancelled:
    msg += 'Some jobs in this CI run were cancelled. This likely happened '
    msg += 'because a new patchset has been uploaded. Skipping vote.\n'
  log_url = CI_SITE + '/#!/logs'
  if failed_jobs:
    msg += 'FAIL:\n'
    msg += ''.join([
        ' %s/%s (%s)\n' % (log_url, job_id, status)
        for (job_id, status) in failed_jobs.iteritems()
    ])
  if passed_jobs:
    msg += 'PASS:\n'
    msg += ''.join([' %s/%s\n' % (log_url, job_id) for job_id in passed_jobs])
  if ui_links:
    msg += 'Artifacts:\n' + ''.join(' %s\n' % link for link in ui_links)
  msg += 'CI page for this CL:\n'
  msg += ' https://ci.perfetto.dev/#!/cls/%s\n' % cl_and_ps.split('-')[0]
  body = {'labels': {}, 'message': msg}
  if not cancelled:
    body['labels']['Code-Review'] = cl_vote
  logging.info('Posting results for CL %s', cl_and_ps)
  url = 'https://%s/a/changes/%s/revisions/%s/review' % (
      GERRIT_HOST, cl_obj['change_id'], cl_obj['revision_id'])
  req('POST', url, body=body, gerrit=True)
  req('PUT', '%s/cls/%s/voted.json' % (DB, cl_and_ps), body=True)

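# The POST above hits Gerrit's set-review endpoint; the body it sends looks
# like this (values illustrative, message truncated):
#   {'labels': {'Code-Review': -1}, 'message': 'FAIL:\n ...'}
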

def queue_postsubmit_jobs(handler):
  '''Creates the job entries in the DB for the given branch or revision.

  Can be called in two modes:
    1. ?branch=master: resolves the SHA1 of the branch head and re-defers
       itself in mode 2.
    2. ?branch=master&revision=deadbeef1234: queues jobs for the given revision.
  '''
  prj = urllib.quote(GERRIT_PROJECT, '')
  branch = handler.request.get('branch')
  revision = handler.request.get('revision')
  assert branch

  if not revision:
    # Get the commit SHA1 of the head of the branch.
    url = 'https://%s/a/projects/%s/branches/%s' % (GERRIT_HOST, prj, branch)
    revision = req('GET', url, gerrit=True)['revision']
    assert revision
    defer('queue_postsubmit_jobs', branch=branch, revision=revision)
    return

  # Get the committer datetime for the given revision.
  url = 'https://%s/a/projects/%s/commits/%s' % (GERRIT_HOST, prj, revision)
  commit_info = req('GET', url, gerrit=True)
  time_committed = commit_info['committer']['date'].split('.')[0]
  time_committed = datetime.strptime(time_committed, '%Y-%m-%d %H:%M:%S')

  # Enqueue jobs.
  src = 'branches/%s-%s' % (branch, time_committed.strftime('%Y%m%d%H%M%S'))
  now = datetime.utcnow()
  patch_obj = {
      src: {
          'rev': revision,
          'subject': commit_info['subject'][:100],
          'author': commit_info['author'].get('email', 'N/A'),
          'time_committed': utc_now_iso(time_committed),
          'time_queued': utc_now_iso(),
          'jobs': {},
      }
  }
  ref = 'refs/heads/' + branch
  append_jobs(patch_obj, src, ref, now)
  req('PATCH', DB + '.json', body=patch_obj)

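# The resulting |src| key embeds the commit time so branch entries sort
# chronologically, e.g. (illustrative): branches/master-20190813101200
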

def delete_stale_jobs(handler):
  '''Deletes jobs that are left in the running queue for too long.

  This is usually due to a crash in the VM that handles them.
  '''
  running_jobs = req('GET', '%s/jobs_running.json?shallow=true' % (DB)) or {}
  for job_id in running_jobs.iterkeys():
    job = req('GET', '%s/jobs/%s.json' % (DB, job_id))
    time_started = parse_iso_time(job.get('time_started', utc_now_iso()))
    # |time_started| is parsed from a UTC ISO timestamp, so compare with
    # utcnow() rather than the local-time now().
    age = (datetime.utcnow() - time_started).total_seconds()
    if age > JOB_TIMEOUT_SEC * 2:
      defer('cancel_job', job_id=job_id)


def cancel_job(handler):
  '''Cancels a job if it is not yet completed or failed.

  This function is racy: workers can complete the queued jobs while we mark
  them as cancelled. The result of such a race is still acceptable.'''
  job_id = handler.request.get('job_id')
  status = req('GET', '%s/jobs/%s/status.json' % (DB, job_id))
  patch_obj = {
      'jobs_running/%s' % job_id: {},  # = DELETE
      'jobs_queued/%s' % job_id: {},  # = DELETE
  }
  if status in ('QUEUED', 'STARTED'):
    patch_obj['jobs/%s/status' % job_id] = 'CANCELLED'
    patch_obj['jobs/%s/time_ended' % job_id] = utc_now_iso()
  req('PATCH', DB + '.json', body=patch_obj)


def delete_expired_logs(handler):
  logs = req('GET', '%s/logs.json?shallow=true' % (DB)) or {}
  for job_id in logs.iterkeys():
    # Job ids start with a UTC timestamp (see append_jobs), so use utcnow().
    age_days = (datetime.utcnow() -
                datetime.strptime(job_id[:8], '%Y%m%d')).days
    if age_days > LOGS_TTL_DAYS:
      defer('delete_job_logs', job_id=job_id)


def delete_job_logs(handler):
  req('DELETE', '%s/logs/%s.json' % (DB, handler.request.get('job_id')))


def update_cl_metrics(handler):
  cl_obj = req('GET', '%s/%s.json' % (DB, handler.request.get('src')))
  t_queued = parse_iso_time(cl_obj['time_queued'])
  t_ended = parse_iso_time(cl_obj['time_ended'])
  write_metrics({
      'ci_cl_completion_time': {
          'l': {},
          'v': int((t_ended - t_queued).total_seconds())
      }
  })


def update_job_metrics(handler):
  job_id = handler.request.get('job_id')
  job = req('GET', '%s/jobs/%s.json' % (DB, job_id))
  metrics = {}
  if 'time_queued' in job and 'time_started' in job:
    t_queued = parse_iso_time(job['time_queued'])
    t_started = parse_iso_time(job['time_started'])
    metrics['ci_job_queue_time'] = {
        'l': {
            'job_type': job['type']
        },
        'v': int((t_started - t_queued).total_seconds())
    }
  if 'time_ended' in job and 'time_started' in job:
    t_started = parse_iso_time(job['time_started'])
    t_ended = parse_iso_time(job['time_ended'])
    metrics['ci_job_run_time'] = {
        'l': {
            'job_type': job['type']
        },
        'v': int((t_ended - t_started).total_seconds())
    }
  if metrics:
    write_metrics(metrics)


def update_queue_metrics(handler):
  # Update the Stackdriver metric that drives the autoscaler.
  queued = req('GET', DB + '/jobs_queued.json?shallow=true') or {}
  running = req('GET', DB + '/jobs_running.json?shallow=true') or {}
  write_metrics({'ci_job_queue_len': {'v': len(queued) + len(running)}})


class ControllerHandler(webapp2.RequestHandler):
  ACTIONS = {
      'start': start,
      'tick': tick,
      'check_pending_cls': check_pending_cls,
      'check_pending_cl': check_pending_cl,
      'check_new_cls': check_new_cls,
      'check_new_cl': check_new_cl,
      'comment_and_vote_cl': comment_and_vote_cl,
      'cancel_older_jobs': cancel_older_jobs,
      'queue_postsubmit_jobs': queue_postsubmit_jobs,
      'update_job_metrics': update_job_metrics,
      'update_queue_metrics': update_queue_metrics,
      'update_cl_metrics': update_cl_metrics,
      'delete_expired_logs': delete_expired_logs,
      'delete_job_logs': delete_job_logs,
      'delete_stale_jobs': delete_stale_jobs,
      'cancel_job': cancel_job,
  }

  def handle(self, action):
    if action in ControllerHandler.ACTIONS:
      return ControllerHandler.ACTIONS[action](self)
    raise Exception('Invalid request %s' % action)

  get = handle
  post = handle


app = webapp2.WSGIApplication([
    ('/_ah/(start)', ControllerHandler),
    (r'/controller/(\w+)', ControllerHandler),
], debug=True)