#!/usr/bin/python
#pylint: disable-msg=C0111

# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import argparse
import httplib
import logging
import os
import random
import signal
import time
import urllib2

import common

from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.afe.json_rpc import proxy
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import global_config
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import email_manager
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
from autotest_lib.server import utils as server_utils
from chromite.lib import timeout_util
from django.db import transaction

try:
    from chromite.lib import metrics
    from chromite.lib import ts_mon_config
except ImportError:
    metrics = server_utils.metrics_mock
    ts_mon_config = server_utils.metrics_mock


"""
Autotest shard client

The shard client can be run as a standalone service. It periodically polls the
master in a heartbeat, retrieves new jobs and hosts, and inserts them into the
local database.

A shard is set up (by a human) and pointed to the global AFE (cautotest).
On the shard, this script periodically makes so-called heartbeat requests to
the global AFE, which will then complete the following actions:

1. Find the previously created (with atest) record for the shard. Shards are
   identified by their hostnames, specified in the shadow_config.
2. Take the records that were sent in the heartbeat and insert them into the
   global database.
   - This is to set the status of jobs to completed in the master database
     after they were run by a slave. This is necessary so one can just look
     at the master's AFE to see the statuses of all jobs. Otherwise one would
     have to check the tko tables or the individual slave AFEs.
3. Find labels that have been assigned to this shard.
4. Assign hosts that:
   - have the specified label
   - aren't leased
   - have an id which is not in the known_host_ids which were sent in the
     heartbeat request.
5. Assign jobs that:
   - depend on the specified label
   - haven't been assigned before
   - aren't started yet
   - aren't completed yet
   - have an id which is not in the known_job_ids which were sent in the
     heartbeat request.
6. Serialize the chosen jobs and hosts.
   - Find objects that the Host/Job objects depend on: Labels, AclGroups,
     Users, and many more. Details about this can be found around
     model_logic.serialize()
7. Send these objects to the slave.


On the client side, this will happen:
1. Deserialize the objects sent from the master and persist them to the local
   database.
2. monitor_db on the shard will pick up these jobs and schedule them on the
   available hosts (which were retrieved from a heartbeat).
3. Once a job is finished, its shard_id is set to NULL.
4. The shard_client will pick up all jobs where shard_id=NULL and will
   send them to the master in the next heartbeat request.
   - The master will persist them as described earlier.
   - The shard_id will be set back to the shard's id, so the record won't be
     uploaded again.
   The heartbeat request will also contain the ids of incomplete jobs and the
   ids of all hosts. This is used to avoid sending objects repeatedly. For
   more information on this and alternatives considered
   see rpc_interface.shard_heartbeat.
"""


HEARTBEAT_AFE_ENDPOINT = 'shard_heartbeat'

RPC_TIMEOUT_MIN = 5
RPC_DELAY_SEC = 5

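# Module-level client instance; handle_signal() uses it to request a clean
# shutdown (see main_without_exception_handling()).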
_heartbeat_client = None


class ShardClient(object):
    """Performs the client-side tasks of sharding, i.e. the heartbeat.

    This class contains the logic to do periodic heartbeats to a global AFE,
    to retrieve new jobs from it and to report completed jobs back.
    """

    def __init__(self, global_afe_hostname, shard_hostname, tick_pause_sec):
        self.afe = frontend_wrappers.RetryingAFE(server=global_afe_hostname,
                                                 timeout_min=RPC_TIMEOUT_MIN,
                                                 delay_sec=RPC_DELAY_SEC)
        self.hostname = shard_hostname
        self.tick_pause_sec = tick_pause_sec
        self._shutdown = False
        self._shard = None


    def _deserialize_many(self, serialized_list, djmodel, message):
        """Deserialize a list of JSON-formatted records to the database.

        Deserialization happens via Django, one record at a time.

        @param serialized_list: A list of JSON-formatted data.
        @param djmodel: Django model type.
        @param message: A string to be used in a logging message.
        """
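        # Each record is wrapped in its own transaction so that one bad
        # record does not roll back the others; failures are logged and
        # counted, then skipped.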
        for serialized in serialized_list:
            with transaction.commit_on_success():
                try:
                    djmodel.deserialize(serialized)
                except Exception as e:
                    logging.error('Deserializing a %s failed: %s, Error: %s',
                                  message, serialized, e)
                    metrics.Counter(
                        'chromeos/autotest/shard_client/deserialization_failed'
                        ).increment()


    @metrics.SecondsTimerDecorator(
            'chromeos/autotest/shard_client/heartbeat_response_duration')
    def process_heartbeat_response(self, heartbeat_response):
        """Save objects returned by a heartbeat to the local database.

        This deserializes hosts and jobs, including their dependencies, and
        saves them to the local database.

        @param heartbeat_response: A dictionary with keys 'hosts', 'jobs' and
                                   'suite_keyvals', as returned by the
                                   `shard_heartbeat` rpc call.
        """
        hosts_serialized = heartbeat_response['hosts']
        jobs_serialized = heartbeat_response['jobs']
        suite_keyvals_serialized = heartbeat_response['suite_keyvals']
        incorrect_host_ids = heartbeat_response.get('incorrect_host_ids', [])

        metrics.Gauge('chromeos/autotest/shard_client/hosts_received'
                      ).set(len(hosts_serialized))
        metrics.Gauge('chromeos/autotest/shard_client/jobs_received'
                      ).set(len(jobs_serialized))
        metrics.Gauge('chromeos/autotest/shard_client/suite_keyvals_received'
                      ).set(len(suite_keyvals_serialized))

        self._deserialize_many(hosts_serialized, models.Host, 'host')
        self._deserialize_many(jobs_serialized, models.Job, 'job')
        self._deserialize_many(suite_keyvals_serialized, models.JobKeyval,
                               'jobkeyval')

        host_ids = [h['id'] for h in hosts_serialized]
        logging.info('Heartbeat response contains hosts %s', host_ids)
        job_ids = [j['id'] for j in jobs_serialized]
        logging.info('Heartbeat response contains jobs %s', job_ids)
        parent_jobs_with_keyval = set([kv['job_id']
                                       for kv in suite_keyvals_serialized])
        logging.info('Heartbeat response contains suite_keyvals for jobs %s',
                     list(parent_jobs_with_keyval))
        if incorrect_host_ids:
            logging.info('Heartbeat response contains incorrect_host_ids %s '
                         'which will be deleted.', incorrect_host_ids)
            self._remove_incorrect_hosts(incorrect_host_ids)

        # If the master has just sent any jobs that we think have completed,
        # re-sync them with the master. This is especially useful when a
        # heartbeat or job is silently dropped, as the next heartbeat will
        # have a disagreement. Updating the shard_id to NULL will mark these
        # jobs for upload on the next heartbeat.
        job_models = models.Job.objects.filter(
                id__in=job_ids, hostqueueentry__complete=True)
        if job_models:
            job_models.update(shard=None)
            job_ids_repr = ', '.join([str(job.id) for job in job_models])
            logging.warn('The following completed jobs had shard_id reset to '
                         'NULL so they will be uploaded to the master again: '
                         '%s', job_ids_repr)


    def _remove_incorrect_hosts(self, incorrect_host_ids=None):
        """Remove from the local database any hosts that should not exist.

        Entries of |incorrect_host_ids| that are absent from the database
        will be silently ignored.

        @param incorrect_host_ids: A list of host ids (as integers) to remove.
        """
        if not incorrect_host_ids:
            return

        models.Host.objects.filter(id__in=incorrect_host_ids).delete()


    @property
    def shard(self):
        """Return this shard's own shard object, fetched from the database.

        The shard object is fetched from the master along with the first
        jobs; it will not exist before that time.

        @returns: The shard object if it already exists, otherwise None.
        """
        if self._shard is None:
            try:
                self._shard = models.Shard.smart_get(self.hostname)
            except models.Shard.DoesNotExist:
                # This might happen before any jobs are assigned to this shard.
                # This is okay because then there is nothing to offload anyway.
                pass
        return self._shard


    def _get_jobs_to_upload(self):
        """Fetch jobs that are complete but not yet uploaded to the master.

        @returns: A list of Job models.
        """
        jobs = []
        # The scheduler sets shard to None upon completion of the job.
        # For more information on the shard field's semantic see
        # models.Job.shard. We need to be careful to wait for both the
        # shard_id and the complete bit here, or we will end up syncing
        # the job without ever setting the complete bit.
        job_ids = list(models.Job.objects.filter(
            shard=None,
            hostqueueentry__complete=True).values_list('pk', flat=True))

        for job_to_upload in models.Job.objects.filter(pk__in=job_ids).all():
            jobs.append(job_to_upload)
        return jobs


    def _mark_jobs_as_uploaded(self, job_ids):
        """Assign the jobs' shard_id back to this shard after upload.

        @param job_ids: A list of ids of the uploaded jobs.
        """
        # self.shard might be None if no jobs were downloaded yet.
        # But then job_ids is empty, so this is harmless.
        # Even if there were jobs we'd in the worst case upload them twice.
        models.Job.objects.filter(pk__in=job_ids).update(shard=self.shard)


    def _get_hqes_for_jobs(self, jobs):
        """Collect all HostQueueEntries belonging to the given jobs.

        @param jobs: An iterable of Job models.
        @returns: A list of HostQueueEntry models.
        """
        hqes = []
        for job in jobs:
            hqes.extend(job.hostqueueentry_set.all())
        return hqes


    def _get_known_jobs_and_hosts(self):
        """Returns lists of host and job info to send in a heartbeat.

        The host and job ids are ids of objects that are already present on
        the shard and therefore don't need to be sent again.

        For jobs, only incomplete jobs are sent, as the master won't send
        already completed jobs anyway. This helps keep the list of ids
        considerably smaller.

        For hosts, host statuses are sent in addition to host ids so that the
        master can sync the host status.

        @returns: Tuple of three lists. The first one contains job ids, the
                  second one host ids, and the third one host statuses.
        """
        job_ids = list(models.Job.objects.filter(
                hostqueueentry__complete=False).values_list('id', flat=True))
        host_models = models.Host.objects.filter(invalid=0)
        host_ids = []
        host_statuses = []
        for h in host_models:
            host_ids.append(h.id)
            host_statuses.append(h.status)
        return job_ids, host_ids, host_statuses


    def _heartbeat_packet(self):
        """Construct the heartbeat packet.

        See rpc_interface for a more detailed description of the heartbeat.

        @return: A heartbeat packet.
        """
        known_job_ids, known_host_ids, known_host_statuses = (
                self._get_known_jobs_and_hosts())
        logging.info('Known jobs: %s', known_job_ids)

        job_objs = self._get_jobs_to_upload()
        hqes = [hqe.serialize(include_dependencies=False)
                for hqe in self._get_hqes_for_jobs(job_objs)]
        jobs = [job.serialize(include_dependencies=False) for job in job_objs]
        logging.info('Uploading jobs %s', [j['id'] for j in jobs])

        return {'shard_hostname': self.hostname,
                'known_job_ids': known_job_ids,
                'known_host_ids': known_host_ids,
                'known_host_statuses': known_host_statuses,
                'jobs': jobs, 'hqes': hqes}


    def _heartbeat_failure(self, log_message, failure_type_str=''):
        """Log a heartbeat failure and increment the failure counter.

        @param log_message: Description of the failure, for the error log.
        @param failure_type_str: Failure type, used as a metric field.
        """
        logging.error("Heartbeat failed. %s", log_message)
        metrics.Counter('chromeos/autotest/shard_client/heartbeat_failure'
                        ).increment(fields={'failure_type': failure_type_str})


    @metrics.SecondsTimerDecorator(
            'chromeos/autotest/shard_client/do_heartbeat_duration')
    def do_heartbeat(self):
        """Perform a heartbeat: retrieve new jobs.

        This function executes a `shard_heartbeat` RPC and processes the
        response by storing the returned objects in the local database.
        """
        heartbeat_metrics_prefix = 'chromeos/autotest/shard_client/heartbeat/'

        logging.info("Performing heartbeat.")
        packet = self._heartbeat_packet()
        metrics.Gauge(heartbeat_metrics_prefix + 'request_size').set(
            len(str(packet)))

        try:
            response = self.afe.run(HEARTBEAT_AFE_ENDPOINT, **packet)
        except urllib2.HTTPError as e:
            self._heartbeat_failure('HTTPError %d: %s' % (e.code, e.reason),
                                    'HTTPError')
            return
        except urllib2.URLError as e:
            self._heartbeat_failure('URLError: %s' % e.reason,
                                    'URLError')
            return
        except httplib.HTTPException as e:
            self._heartbeat_failure('HTTPException: %s' % e,
                                    'HTTPException')
            return
        except timeout_util.TimeoutError as e:
            self._heartbeat_failure('TimeoutError: %s' % e,
                                    'TimeoutError')
            return
        except proxy.JSONRPCException as e:
            self._heartbeat_failure('JSONRPCException: %s' % e,
                                    'JSONRPCException')
            return

        metrics.Gauge(heartbeat_metrics_prefix + 'response_size').set(
            len(str(response)))
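        # The jobs in this packet reached the master, so set their shard_id
        # back (via self.shard) to prevent re-uploading them in the next
        # heartbeat.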
        self._mark_jobs_as_uploaded([job['id'] for job in packet['jobs']])
        self.process_heartbeat_response(response)
        logging.info("Heartbeat completed.")


    def tick(self):
        """Performs all tasks the shard client needs to do periodically."""
        self.do_heartbeat()
        metrics.Counter('chromeos/autotest/shard_client/tick').increment()


    def loop(self):
        """Calls tick() until shutdown() is called."""
        while not self._shutdown:
            self.tick()
            # Sleep with +/- 10% fuzzing to avoid phaselock of shards.
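            # E.g. with tick_pause_sec=60, the fuzz 60 * 0.2 *
            # (random.random() - 0.5) falls in [-6, 6), so the sleep is drawn
            # uniformly from [54, 66).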
            tick_fuzz = self.tick_pause_sec * 0.2 * (random.random() - 0.5)
            time.sleep(self.tick_pause_sec + tick_fuzz)


    def shutdown(self):
        """Stops the shard client after the current tick."""
        logging.info("Shutdown request received.")
        self._shutdown = True


def handle_signal(signum, frame):
    """SIGINT/SIGTERM handler so we don't crash mid-tick."""
    _heartbeat_client.shutdown()

def _get_shard_hostname_and_ensure_running_on_shard():
    """Read the hostname of the local shard from the global configuration.

    Raise an exception if run anywhere other than a shard.

    @raises error.HeartbeatOnlyAllowedInShardModeException if run from
            elsewhere than from a shard.
    """
    hostname = global_config.global_config.get_config_value(
        'SHARD', 'shard_hostname', default=None)
    if not hostname:
        raise error.HeartbeatOnlyAllowedInShardModeException(
            'To run the shard client, shard_hostname must neither be None nor '
            'empty.')
    return hostname


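# Both _get_shard_hostname_and_ensure_running_on_shard() above and
# _get_tick_pause_sec() below read the [SHARD] section of the global/shadow
# configuration. A plausible entry looks like this (the hostname value is an
# illustrative placeholder):
#
#     [SHARD]
#     shard_hostname: shard1.example.com
#     heartbeat_pause_sec: 60
#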
def _get_tick_pause_sec():
    """Read the pause between two ticks from the global configuration."""
    return global_config.global_config.get_config_value(
        'SHARD', 'heartbeat_pause_sec', type=float)


def get_shard_client():
    """Instantiate a shard client instance.

    Configuration values will be read from the global configuration.

    @returns A shard client instance.
    """
    global_afe_hostname = server_utils.get_global_afe_hostname()
    shard_hostname = _get_shard_hostname_and_ensure_running_on_shard()
    tick_pause_sec = _get_tick_pause_sec()
    return ShardClient(global_afe_hostname, shard_hostname, tick_pause_sec)


def main():
    ts_mon_config.SetupTsMonGlobalState('shard_client')

    try:
        metrics.Counter('chromeos/autotest/shard_client/start').increment()
        main_without_exception_handling()
    except Exception:
        metrics.Counter('chromeos/autotest/shard_client/uncaught_exception'
                        ).increment()
        message = 'Uncaught exception. Terminating shard_client.'
        email_manager.manager.log_stacktrace(message)
        logging.exception(message)
        raise
    finally:
        email_manager.manager.send_queued_emails()


def main_without_exception_handling():
    # No options are defined yet; parse_args() still rejects unexpected
    # arguments and handles --help.
    parser = argparse.ArgumentParser(description='Shard client.')
    parser.parse_args()

    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            None, timestamped_logfile_prefix='shard_client')

    logging.info("Setting signal handler.")
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    logging.info("Starting shard client.")
    global _heartbeat_client
    _heartbeat_client = get_shard_client()
    _heartbeat_client.loop()


if __name__ == '__main__':
    main()