#!/usr/bin/python
#pylint: disable-msg=C0111

# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import argparse
import httplib
import logging
import os
import random
import signal
import time
import urllib2

import common

from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.afe.json_rpc import proxy
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import global_config
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import email_manager
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
from autotest_lib.server import utils as server_utils
from chromite.lib import timeout_util
from django.db import transaction

try:
    from chromite.lib import metrics
    from chromite.lib import ts_mon_config
except ImportError:
    metrics = server_utils.metrics_mock
    ts_mon_config = server_utils.metrics_mock


"""
Autotest shard client

The shard client can be run as a standalone service. It periodically polls the
master in a heartbeat, retrieves new jobs and hosts and inserts them into the
local database.

A shard is set up (by a human) and pointed to the global AFE (cautotest).
On the shard, this script periodically makes so-called heartbeat requests to
the global AFE, which will then complete the following actions:

1. Find the previously created (with atest) record for the shard. Shards are
   identified by their hostnames, specified in the shadow_config.
2. Take the records that were sent in the heartbeat and insert them into the
   global database.
   - This is to set the status of jobs to completed in the master database after
     they were run by a slave. This is necessary so one can just look at the
     master's afe to see the statuses of all jobs. Otherwise one would have to
     check the tko tables or the individual slave AFEs.
3. Find labels that have been assigned to this shard.
4. Assign hosts that:
   - have the specified label
   - aren't leased
   - have an id which is not in the known_host_ids which were sent in the
     heartbeat request.
5. Assign jobs that:
   - depend on the specified label
   - haven't been assigned before
   - aren't started yet
   - aren't completed yet
   - have an id which is not in the jobs_known_ids which were sent in the
     heartbeat request.
6. Serialize the chosen jobs and hosts.
   - Find objects that the Host/Job objects depend on: Labels, AclGroups, Users,
     and many more. Details about this can be found around
     model_logic.serialize()
7. Send these objects to the slave.


On the client side, this will happen:
1. Deserialize the objects sent from the master and persist them to the local
   database.
2. monitor_db on the shard will pick up these jobs and schedule them on the
   available hosts (which were retrieved from a heartbeat).
3. Once a job is finished, its shard_id is set to NULL.
4. The shard_client will pick up all jobs where shard_id=NULL and will
   send them to the master in the request of the next heartbeat.
   - The master will persist them as described earlier.
   - The shard_id will be set back to the shard's id, so the record won't be
     uploaded again.
   The heartbeat request will also contain the ids of incomplete jobs and the
   ids of all hosts. This is used to avoid sending objects repeatedly. For more
   information on this and the alternatives considered,
   see rpc_interface.shard_heartbeat.
"""


HEARTBEAT_AFE_ENDPOINT = 'shard_heartbeat'
_METRICS_PREFIX = 'chromeos/autotest/shard_client/heartbeat/'

RPC_TIMEOUT_MIN = 5
RPC_DELAY_SEC = 5

_heartbeat_client = None


class ShardClient(object):
    """Performs client side tasks of sharding, i.e. the heartbeat.

    This class contains the logic to do periodic heartbeats to a global AFE,
    to retrieve new jobs from it and to report completed jobs back.
    """

    def __init__(self, global_afe_hostname, shard_hostname, tick_pause_sec):
        self.afe = frontend_wrappers.RetryingAFE(server=global_afe_hostname,
                                                 timeout_min=RPC_TIMEOUT_MIN,
                                                 delay_sec=RPC_DELAY_SEC)
        self.hostname = shard_hostname
        self.tick_pause_sec = tick_pause_sec
        self._shutdown_requested = False
        self._shard = None


    def _deserialize_many(self, serialized_list, djmodel, message):
        """Deserialize data in JSON format to database.

        Deserialize a list of JSON-formatted data to database using Django.

        @param serialized_list: A list of JSON-formatted data.
        @param djmodel: Django model type.
        @param message: A string to be used in a logging message.
        """
        for serialized in serialized_list:
            with transaction.commit_on_success():
                try:
                    djmodel.deserialize(serialized)
                except Exception as e:
                    logging.error('Deserializing a %s failed: %s, Error: %s',
                                  message, serialized, e)
                    metrics.Counter(
                        'chromeos/autotest/shard_client/deserialization_failed'
                        ).increment()


    @metrics.SecondsTimerDecorator(
            'chromeos/autotest/shard_client/heartbeat_response_duration')
    def process_heartbeat_response(self, heartbeat_response):
        """Save objects returned by a heartbeat to the local database.

        This deserializes hosts and jobs including their dependencies and saves
        them to the local database.

        @param heartbeat_response: A dictionary with keys 'hosts', 'jobs' and
                                   'suite_keyvals' (and optionally
                                   'incorrect_host_ids'), as returned by the
                                   `shard_heartbeat` rpc call.
        """
        hosts_serialized = heartbeat_response['hosts']
        jobs_serialized = heartbeat_response['jobs']
        suite_keyvals_serialized = heartbeat_response['suite_keyvals']
        incorrect_host_ids = heartbeat_response.get('incorrect_host_ids', [])

        metrics.Gauge('chromeos/autotest/shard_client/hosts_received'
                      ).set(len(hosts_serialized))
        metrics.Gauge('chromeos/autotest/shard_client/jobs_received'
                      ).set(len(jobs_serialized))
        metrics.Gauge('chromeos/autotest/shard_client/suite_keyvals_received'
                      ).set(len(suite_keyvals_serialized))

        self._deserialize_many(hosts_serialized, models.Host, 'host')
        self._deserialize_many(jobs_serialized, models.Job, 'job')
        self._deserialize_many(suite_keyvals_serialized, models.JobKeyval,
                               'jobkeyval')

        host_ids = [h['id'] for h in hosts_serialized]
        logging.info('Heartbeat response contains hosts %s', host_ids)
        job_ids = [j['id'] for j in jobs_serialized]
        logging.info('Heartbeat response contains jobs %s', job_ids)
        parent_jobs_with_keyval = set([kv['job_id']
                                       for kv in suite_keyvals_serialized])
        logging.info('Heartbeat response contains suite_keyvals for jobs %s',
                     list(parent_jobs_with_keyval))
        if incorrect_host_ids:
            logging.info('Heartbeat response contains incorrect_host_ids %s '
                         'which will be deleted.', incorrect_host_ids)
            self._remove_incorrect_hosts(incorrect_host_ids)

        # If the master has just sent any jobs that we think have completed,
        # re-sync them with the master. This is especially useful when a
        # heartbeat or job is silently dropped, as the next heartbeat will
        # have a disagreement. Updating the shard_id to NULL will mark these
        # jobs for upload on the next heartbeat.
        job_models = models.Job.objects.filter(
                id__in=job_ids, hostqueueentry__complete=True)
        if job_models:
            job_models.update(shard=None)
            job_ids_repr = ', '.join([str(job.id) for job in job_models])
            logging.warning('Following completed jobs had shard_id reset to '
                            'NULL to be uploaded to the master again: %s',
                            job_ids_repr)


    def _remove_incorrect_hosts(self, incorrect_host_ids=None):
        """Remove from the local database any hosts that should not exist.

        Entries of |incorrect_host_ids| that are absent from the database are
        silently ignored.

        @param incorrect_host_ids: a list of ids (as integers) to remove.
        """
        if not incorrect_host_ids:
            return

        models.Host.objects.filter(id__in=incorrect_host_ids).delete()


    @property
    def shard(self):
        """Return this shard's own shard object, fetched from the database.

        The shard object is fetched from the master together with the first
        jobs; it does not exist locally before that time.

        @returns: The shard object if it already exists, otherwise None.
        """
        if self._shard is None:
            try:
                self._shard = models.Shard.smart_get(self.hostname)
            except models.Shard.DoesNotExist:
                # This might happen before any jobs are assigned to this shard.
                # This is okay because then there is nothing to offload anyway.
                pass
        return self._shard


    def _get_jobs_to_upload(self):
        jobs = []
        # The scheduler sets shard to None upon completion of the job.
        # For more information on the shard field's semantic see
        # models.Job.shard. We need to be careful to wait for both the
        # shard_id and the complete bit here, or we will end up syncing
        # the job without ever setting the complete bit.
        job_ids = list(models.Job.objects.filter(
            shard=None,
            hostqueueentry__complete=True).values_list('pk', flat=True))

        for job_to_upload in models.Job.objects.filter(pk__in=job_ids).all():
            jobs.append(job_to_upload)
        return jobs


    def _mark_jobs_as_uploaded(self, job_ids):
        # self.shard might be None if no jobs were downloaded yet.
        # But then job_ids is empty, so this is harmless.
        # Even if there were jobs, the worst case is uploading them twice.
        models.Job.objects.filter(pk__in=job_ids).update(shard=self.shard)


    def _get_hqes_for_jobs(self, jobs):
        hqes = []
        for job in jobs:
            hqes.extend(job.hostqueueentry_set.all())
        return hqes


    def _get_known_jobs_and_hosts(self):
        """Returns lists of host and job info to send in a heartbeat.

        The host and job ids are ids of objects that are already present on the
        shard and therefore don't need to be sent again.

        For jobs, only incomplete jobs are sent, as the master won't send
        already completed jobs anyway. This helps keep the list of ids
        reasonably small.

        For hosts, the host status is sent to the master in addition to the
        host id, so that host statuses stay in sync.

        @returns: Tuple of three lists. The first one contains job ids, the
                  second one host ids, and the third one host statuses.
        """
        job_ids = list(models.Job.objects.filter(
                hostqueueentry__complete=False).values_list('id', flat=True))
        host_models = models.Host.objects.filter(invalid=0)
        host_ids = []
        host_statuses = []
        for h in host_models:
            host_ids.append(h.id)
            host_statuses.append(h.status)
        return job_ids, host_ids, host_statuses


    def _heartbeat_packet(self):
        """Construct the heartbeat packet.

        See rpc_interface for a more detailed description of the heartbeat.

        @return: A heartbeat packet.
        """
        known_job_ids, known_host_ids, known_host_statuses = (
                self._get_known_jobs_and_hosts())
        logging.info('Known jobs: %s', known_job_ids)

        job_objs = self._get_jobs_to_upload()
        hqes = [hqe.serialize(include_dependencies=False)
                for hqe in self._get_hqes_for_jobs(job_objs)]
        jobs = [job.serialize(include_dependencies=False) for job in job_objs]
        logging.info('Uploading jobs %s', [j['id'] for j in jobs])

        return {'shard_hostname': self.hostname,
                'known_job_ids': known_job_ids,
                'known_host_ids': known_host_ids,
                'known_host_statuses': known_host_statuses,
                'jobs': jobs, 'hqes': hqes}


    def _report_packet_metrics(self, packet):
        """Report stats about the outgoing packet to Monarch."""
        metrics.Gauge(_METRICS_PREFIX + 'known_job_ids_count').set(
            len(packet['known_job_ids']))
        metrics.Gauge(_METRICS_PREFIX + 'jobs_upload_count').set(
            len(packet['jobs']))
        metrics.Gauge(_METRICS_PREFIX + 'known_host_ids_count').set(
            len(packet['known_host_ids']))


    def _heartbeat_failure(self, log_message, failure_type_str=''):
        logging.error("Heartbeat failed. %s", log_message)
        metrics.Counter('chromeos/autotest/shard_client/heartbeat_failure'
                        ).increment(fields={'failure_type': failure_type_str})


    @metrics.SecondsTimerDecorator(
            'chromeos/autotest/shard_client/do_heatbeat_duration')
    def do_heartbeat(self):
        """Perform a heartbeat: Retrieve new jobs.

        This function executes a `shard_heartbeat` RPC and processes the
        response by storing the returned objects in the local database.

        @returns: True if the heartbeat ran successfully, False otherwise.
        """

        logging.info("Performing heartbeat.")
        packet = self._heartbeat_packet()
        self._report_packet_metrics(packet)
        metrics.Gauge(_METRICS_PREFIX + 'request_size').set(
            len(str(packet)))

        try:
            response = self.afe.run(HEARTBEAT_AFE_ENDPOINT, **packet)
        except urllib2.HTTPError as e:
            self._heartbeat_failure('HTTPError %d: %s' % (e.code, e.reason),
                                    'HTTPError')
            return False
        except urllib2.URLError as e:
            self._heartbeat_failure('URLError: %s' % e.reason,
                                    'URLError')
            return False
        except httplib.HTTPException as e:
            self._heartbeat_failure('HTTPException: %s' % e,
                                    'HTTPException')
            return False
        except timeout_util.TimeoutError as e:
            self._heartbeat_failure('TimeoutError: %s' % e,
                                    'TimeoutError')
            return False
        except proxy.JSONRPCException as e:
            self._heartbeat_failure('JSONRPCException: %s' % e,
                                    'JSONRPCException')
            return False

        metrics.Gauge(_METRICS_PREFIX + 'response_size').set(
            len(str(response)))
        self._mark_jobs_as_uploaded([job['id'] for job in packet['jobs']])
        self.process_heartbeat_response(response)
        logging.info("Heartbeat completed.")
        return True


    def tick(self):
        """Performs all tasks the shard client needs to do periodically."""
        success = self.do_heartbeat()
        if success:
            metrics.Counter('chromeos/autotest/shard_client/tick').increment()


    def loop(self, lifetime_hours):
        """Calls tick() until shutdown() is called or lifetime expires.

        @param lifetime_hours: (float) hours to loop for; None means no
                deadline.
        """
        loop_start_time = time.time()
        while self._continue_looping(lifetime_hours, loop_start_time):
            self.tick()
            # Sleep with +/- 10% fuzzing to avoid phaselock of shards.
            tick_fuzz = self.tick_pause_sec * 0.2 * (random.random() - 0.5)
            time.sleep(self.tick_pause_sec + tick_fuzz)


    def shutdown(self):
        """Stops the shard client after the current tick."""
        logging.info("Shutdown request received.")
        self._shutdown_requested = True


    def _continue_looping(self, lifetime_hours, loop_start_time):
        """Determines if we should continue with the next mainloop iteration.

        @param lifetime_hours: (float) number of hours to loop for. None
                implies no deadline.
        @param loop_start_time: Time when we started looping.
        @returns True if we should continue looping, False otherwise.
        """
        if self._shutdown_requested:
            return False

        if (lifetime_hours is None
            or time.time() - loop_start_time < lifetime_hours * 3600):
            return True
        logging.info('Process lifetime %0.3f hours exceeded. Shutting down.',
                     lifetime_hours)
        return False


def handle_signal(signum, frame):
    """SIGINT/SIGTERM handler so we don't crash mid-tick."""
    _heartbeat_client.shutdown()


def _get_shard_hostname_and_ensure_running_on_shard():
    """Read the hostname of the local shard from the global configuration.

    Raise an exception if this is not run on a shard.

    @raises error.HeartbeatOnlyAllowedInShardModeException if not run on a
            shard.
    """
    hostname = global_config.global_config.get_config_value(
        'SHARD', 'shard_hostname', default=None)
    if not hostname:
        raise error.HeartbeatOnlyAllowedInShardModeException(
            'To run the shard client, shard_hostname must neither be None nor '
            'empty.')
    return hostname


def _get_tick_pause_sec():
    """Read the pause between two ticks from the global configuration."""
    return global_config.global_config.get_config_value(
        'SHARD', 'heartbeat_pause_sec', type=float)


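# Note: the shard client reads its settings from the [SHARD] section of the
# global (shadow) configuration, as done by the two helpers above. A sketch of
# that section, with made-up example values:
#
#   [SHARD]
#   shard_hostname: shard1.example.com
#   heartbeat_pause_sec: 60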
def get_shard_client():
    """Instantiate a shard client instance.

    Configuration values will be read from the global configuration.

    @returns A shard client instance.
    """
    global_afe_hostname = server_utils.get_global_afe_hostname()
    shard_hostname = _get_shard_hostname_and_ensure_running_on_shard()
    tick_pause_sec = _get_tick_pause_sec()
    return ShardClient(global_afe_hostname, shard_hostname, tick_pause_sec)


def main():
    parser = argparse.ArgumentParser(description='Shard client.')
    parser.add_argument(
            '--lifetime-hours',
            type=float,
            default=None,
            help='If provided, number of hours we should run for. '
                 'At the expiry of this time, the process will exit '
                 'gracefully.',
    )
    parser.add_argument(
            '--metrics-file',
            help='If provided, drop metrics to this local file instead of '
                 'reporting to ts_mon',
            type=str,
            default=None,
    )
    options = parser.parse_args()

    with ts_mon_config.SetupTsMonGlobalState(
          'shard_client',
          indirect=True,
          debug_file=options.metrics_file,
    ):
        try:
            metrics.Counter('chromeos/autotest/shard_client/start').increment()
            main_without_exception_handling(options)
        except Exception as e:
            metrics.Counter('chromeos/autotest/shard_client/uncaught_exception'
                            ).increment()
            message = 'Uncaught exception. Terminating shard_client.'
            email_manager.manager.log_stacktrace(message)
            logging.exception(message)
            raise
        finally:
            email_manager.manager.send_queued_emails()


def main_without_exception_handling(options):
    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            None, timestamped_logfile_prefix='shard_client')

    logging.info("Setting signal handler.")
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    logging.info("Starting shard client.")
    global _heartbeat_client
    _heartbeat_client = get_shard_client()
    _heartbeat_client.loop(options.lifetime_hours)


if __name__ == '__main__':
    main()