# Copyright 2015 The Chromium Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """Mapper functions for mapreduce jobs. Jobs can also be started through the /mapreduce endpoint. Configuration for jobs started through that endpoint is set in mapreduce.yaml. See: https://code.google.com/p/appengine-mapreduce/wiki/GettingStartedInPython Note about running mapreduce jobs that write to the datastore while datastore writes are disabled: There is a parameter 'force_ops_writes' for mapreduce jobs that is supposed to force writes. Another way of forcing writes for a one-off job is to deploy a version of the app with the force writes option always set to True the mapreduce code (see mapreduce/util.py line 334). """ import datetime import logging from mapreduce import control as mr_control from mapreduce import operation as op from google.appengine.ext import ndb from dashboard import datastore_hooks from dashboard import layered_cache from dashboard import request_handler from dashboard.models import graph_data from dashboard.models import stoppage_alert # Length of time required to pass for a test to be considered deprecated. _OLDEST_REVISION_DELTA = datetime.timedelta(days=14) # The time between runs of the deprecate test job. # This should be kept in sync with the time interval in cron.yaml. _DEPRECATE_JOB_INTERVAL = datetime.timedelta(days=2) def SaveAllMapper(entity): """This mapper just puts an entity. When an entity is put, the pre-put hook is called, which may update some property of the entity. This may also be needed after adding a new index on an existing property, or adding a query for a new property with a default value. Args: entity: The entity to put. Can be any kind. """ entity.put() class MRDeprecateTestsHandler(request_handler.RequestHandler): """Handler to run a deprecate tests mapper job.""" def get(self): # TODO(qyearsley): Add test coverage. See catapult:#1346. name = 'Update test deprecation status.' handler = ('dashboard.mr.DeprecateTestsMapper') reader = 'mapreduce.input_readers.DatastoreInputReader' mapper_parameters = { 'entity_kind': ('dashboard.models.graph_data.Test'), 'filters': [('has_rows', '=', True), ('deprecated', '=', False)], } mr_control.start_map(name, handler, reader, mapper_parameters) def DeprecateTestsMapper(entity): """Marks a Test entity as deprecated if the last row is too old. What is considered "too old" is defined by _OLDEST_REVISION_DELTA. Also, if all of the subtests in a test have been marked as deprecated, then that parent test will be marked as deprecated. This mapper doesn't un-deprecate tests if new data has been added; that happens in add_point.py. Args: entity: The Test entity to check. Yields: Zero or more datastore mutation operations. """ # Make sure that we have a non-deprecated Test with Rows. if entity.key.kind() != 'Test' or not entity.has_rows or entity.deprecated: # TODO(qyearsley): Add test coverage. See catapult:#1346. logging.error( 'Got bad entity in mapreduce! Kind: %s, has_rows: %s, deprecated: %s', entity.key.kind(), entity.has_rows, entity.deprecated) return # Fetch the last row. datastore_hooks.SetPrivilegedRequest() query = graph_data.Row.query(graph_data.Row.parent_test == entity.key) query = query.order(-graph_data.Row.timestamp) last_row = query.get() if not last_row: # TODO(qyearsley): Add test coverage. See catapult:#1346. logging.error('No rows for %s (but has_rows=True)', entity.key) return now = datetime.datetime.now() if last_row.timestamp < now - _OLDEST_REVISION_DELTA: for operation in _MarkDeprecated(entity): yield operation for operation in _CreateStoppageAlerts(entity, last_row): yield operation def _CreateStoppageAlerts(test, last_row): """Yields put operations for any StoppageAlert that may be created. A stoppage alert is an alert created to warn people that data has not been received for a particular test for some length of time. An alert will only be created if the stoppage_alert_delay property of the sheriff is non-zero -- the value of this property is the number of days that should pass before an alert is created. Args: test: A Test entity. last_row: The Row entity that was last added. Yields: Either one op.db.Put, or nothing. """ if not test.sheriff: return sheriff_entity = test.sheriff.get() warn_sheriff_delay_days = sheriff_entity.stoppage_alert_delay if warn_sheriff_delay_days < 0: return now = datetime.datetime.now() warn_sheriff_delta = datetime.timedelta(days=warn_sheriff_delay_days) earliest_warn_time = now - warn_sheriff_delta if last_row.timestamp >= earliest_warn_time: return if stoppage_alert.GetStoppageAlert(test.test_path, last_row.revision): return new_alert = stoppage_alert.CreateStoppageAlert(test, last_row) if not new_alert: return yield op.db.Put(new_alert) def _MarkDeprecated(test): """Marks a Test as deprecated and yields Put operations.""" test.deprecated = True yield op.db.Put(test) # The sub-test listing function returns different results depending # on whether tests are deprecated, so when a new suite is deprecated, # the cache needs to be cleared. layered_cache.Delete(graph_data.LIST_TESTS_SUBTEST_CACHE_KEY % ( test.master_name, test.bot_name, test.suite_name)) # Check whether the test suite now contains only deprecated tests, and # if so, deprecate it too. suite = ndb.Key(flat=test.key.flat()[:6]).get() if suite and not suite.deprecated and _AllSubtestsDeprecated(suite): suite.deprecated = True yield op.db.Put(suite) def _AllSubtestsDeprecated(test): """Checks whether all descendant tests are marked as deprecated.""" query = graph_data.Test.query(ancestor=test.key) query = query.filter( graph_data.Test.has_rows == True) descendant_tests = query.fetch() return all(t.deprecated for t in descendant_tests)