# Copyright Martin J. Bligh, Google Inc 2008
# Released under the GPL v2

"""
This module allows you to communicate with the frontend to submit jobs etc.
It is designed for writing more sophisticated server-side control files that
can recursively add and manage other jobs.

We turn the JSON dictionaries into real objects that are more idiomatic.

For docs, see:
    http://www.chromium.org/chromium-os/testing/afe-rpc-infrastructure
    http://docs.djangoproject.com/en/dev/ref/models/querysets/#queryset-api
"""
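
# A minimal usage sketch (illustrative only; the server name, status and
# label values below are placeholders, not defaults of this module):
#
#     afe = AFE(server='my-autotest-server')
#     for host in afe.get_hosts(status='Ready', label='some-label'):
#         print host.hostname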

import getpass
import os
import re
import time
import traceback

import common
from autotest_lib.frontend.afe import rpc_client_lib
from autotest_lib.client.common_lib import control_data
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import utils
from autotest_lib.client.common_lib.cros.graphite import autotest_stats
from autotest_lib.tko import db


try:
    from autotest_lib.server.site_common import site_utils as server_utils
except ImportError:
    from autotest_lib.server import utils as server_utils
form_ntuples_from_machines = server_utils.form_ntuples_from_machines

GLOBAL_CONFIG = global_config.global_config
DEFAULT_SERVER = 'autotest'

_tko_timer = autotest_stats.Timer('tko')

def dump_object(header, obj):
    """
    Standard way to print out the frontend objects (e.g. job, host, acl,
    label) in a human-readable fashion for debugging.
    """
    result = header + '\n'
    for key in obj.hash:
        if key == 'afe' or key == 'hash':
            continue
        result += '%20s: %s\n' % (key, obj.hash[key])
    return result


class RpcClient(object):
    """
    Abstract RPC class for communicating with the autotest frontend.
    Inherited by both the TKO and AFE classes.

    All the constructors go in the AFE / TKO classes;
    manipulating methods go in the object classes themselves.
    """
    def __init__(self, path, user, server, print_log, debug, reply_debug):
        """
        Create a cached instance of a connection to the frontend.

            user: username to connect as
            server: frontend server to connect to
            print_log: print a logging message to stdout on every operation
            debug: print out all RPC traffic
            reply_debug: print out the reply of every RPC call
        """
        if not user and utils.is_in_container():
            user = GLOBAL_CONFIG.get_config_value('SSP', 'user', default=None)
        if not user:
            user = getpass.getuser()
        if not server:
            if 'AUTOTEST_WEB' in os.environ:
                server = os.environ['AUTOTEST_WEB']
            else:
                server = GLOBAL_CONFIG.get_config_value('SERVER', 'hostname',
                                                        default=DEFAULT_SERVER)
        self.server = server
        self.user = user
        self.print_log = print_log
        self.debug = debug
        self.reply_debug = reply_debug
        headers = {'AUTHORIZATION': self.user}
        rpc_server = 'http://' + server + path
        if debug:
            print 'SERVER: %s' % rpc_server
            print 'HEADERS: %s' % headers
        self.proxy = rpc_client_lib.get_proxy(rpc_server, headers=headers)


    def run(self, call, **dargs):
        """
        Make an RPC call to the AFE server.
        """
        rpc_call = getattr(self.proxy, call)
        if self.debug:
            print 'DEBUG: %s %s' % (call, dargs)
        try:
            result = utils.strip_unicode(rpc_call(**dargs))
            if self.reply_debug:
                print result
            return result
        except Exception:
            print 'FAILED RPC CALL: %s %s' % (call, dargs)
            raise

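    # run() forwards any RPC by name, so endpoints without a wrapper method
    # can still be reached directly. A sketch ('get_static_data' is an RPC
    # used elsewhere in this module; the filter value is a placeholder):
    #
    #     afe = AFE()
    #     static_data = afe.run('get_static_data')
    #     hosts = afe.run('get_hosts', locked=False)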

    def log(self, message):
        if self.print_log:
            print message


class Planner(RpcClient):
    def __init__(self, user=None, server=None, print_log=True, debug=False,
                 reply_debug=False):
        super(Planner, self).__init__(path='/planner/server/rpc/',
                                      user=user,
                                      server=server,
                                      print_log=print_log,
                                      debug=debug,
                                      reply_debug=reply_debug)


class TKO(RpcClient):
    def __init__(self, user=None, server=None, print_log=True, debug=False,
                 reply_debug=False):
        super(TKO, self).__init__(path='/new_tko/server/noauth/rpc/',
                                  user=user,
                                  server=server,
                                  print_log=print_log,
                                  debug=debug,
                                  reply_debug=reply_debug)
        self._db = None


    @_tko_timer.decorate
    def get_job_test_statuses_from_db(self, job_id):
        """Get job test statuses from the database.

        Retrieve a set of fields from a job that reflect the status of each
        test run within a job.
        Fields retrieved: status, test_name, subdir, reason,
                          test_started_time, test_finished_time, afe_job_id,
                          job_owner, hostname, job_tag.

        @param job_id: The afe job id to look up.
        @returns A list of TestStatus objects, one per test run in the job.
        """
        if self._db is None:
            self._db = db.db()
        fields = ['status', 'test_name', 'subdir', 'reason',
                  'test_started_time', 'test_finished_time', 'afe_job_id',
                  'job_owner', 'hostname', 'job_tag']
        table = 'tko_test_view_2'
        where = 'job_tag like "%s-%%"' % job_id
        test_status = []
        # Run commit before we query to ensure that we are pulling the latest
        # results.
        self._db.commit()
        for entry in self._db.select(','.join(fields), table, (where, None)):
            status_dict = {}
            for key, value in zip(fields, entry):
                # All callers expect values to be str objects.
                status_dict[key] = str(value)
            # id is used by TestStatus to uniquely identify each TestStatus
            # object.
            status_dict['id'] = [status_dict['reason'], status_dict['hostname'],
                                 status_dict['test_name']]
            test_status.append(status_dict)

        return [TestStatus(self, e) for e in test_status]

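    # A sketch of pulling per-test statuses for a job (the job id is a
    # placeholder):
    #
    #     tko = TKO()
    #     for s in tko.get_job_test_statuses_from_db(12345):
    #         print s.test_name, s.status, s.hostname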

    def get_status_counts(self, job, **data):
        entries = self.run('get_status_counts',
                           group_by=['hostname', 'test_name', 'reason'],
                           job_tag__startswith='%s-' % job, **data)
        return [TestStatus(self, e) for e in entries['groups']]


class AFE(RpcClient):
    def __init__(self, user=None, server=None, print_log=True, debug=False,
                 reply_debug=False, job=None):
        self.job = job
        super(AFE, self).__init__(path='/afe/server/noauth/rpc/',
                                  user=user,
                                  server=server,
                                  print_log=print_log,
                                  debug=debug,
                                  reply_debug=reply_debug)


    def host_statuses(self, live=None):
        dead_statuses = ['Repair Failed', 'Repairing']
        statuses = self.run('get_static_data')['host_statuses']
        if live == True:
            return list(set(statuses) - set(dead_statuses))
        if live == False:
            return dead_statuses
        return statuses


    @staticmethod
    def _dict_for_host_query(hostnames=(), status=None, label=None):
        query_args = {}
        if hostnames:
            query_args['hostname__in'] = hostnames
        if status:
            query_args['status'] = status
        if label:
            query_args['labels__name'] = label
        return query_args


    def get_hosts(self, hostnames=(), status=None, label=None, **dargs):
        query_args = dict(dargs)
        query_args.update(self._dict_for_host_query(hostnames=hostnames,
                                                    status=status,
                                                    label=label))
        hosts = self.run('get_hosts', **query_args)
        return [Host(self, h) for h in hosts]

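    # The extra keyword arguments are passed through as Django
    # queryset-style filters (see the queryset docs linked in the module
    # docstring). Sketches, with placeholder values:
    #
    #     afe.get_hosts(status='Ready', label='some-label')
    #     afe.get_hosts(hostname__in=['host1', 'host2'], locked=False)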

    def get_hostnames(self, status=None, label=None, **dargs):
        """Like get_hosts() but returns hostnames instead of Host objects."""
        # This implementation can be replaced with a more efficient one
        # that does not query for entire host objects in the future.
        return [host_obj.hostname for host_obj in
                self.get_hosts(status=status, label=label, **dargs)]


    def reverify_hosts(self, hostnames=(), status=None, label=None):
        query_args = dict(locked=False,
                          aclgroup__users__login=self.user)
        query_args.update(self._dict_for_host_query(hostnames=hostnames,
                                                    status=status,
                                                    label=label))
        return self.run('reverify_hosts', **query_args)


    def create_host(self, hostname, **dargs):
        id = self.run('add_host', hostname=hostname, **dargs)
        return self.get_hosts(id=id)[0]


    def get_host_attribute(self, attr, **dargs):
        host_attrs = self.run('get_host_attribute', attribute=attr, **dargs)
        return [HostAttribute(self, a) for a in host_attrs]


    def set_host_attribute(self, attr, val, **dargs):
        self.run('set_host_attribute', attribute=attr, value=val, **dargs)


    def get_labels(self, **dargs):
        labels = self.run('get_labels', **dargs)
        return [Label(self, l) for l in labels]


    def create_label(self, name, **dargs):
        id = self.run('add_label', name=name, **dargs)
        return self.get_labels(id=id)[0]


    def get_acls(self, **dargs):
        acls = self.run('get_acl_groups', **dargs)
        return [Acl(self, a) for a in acls]


    def create_acl(self, name, **dargs):
        id = self.run('add_acl_group', name=name, **dargs)
        return self.get_acls(id=id)[0]


    def get_users(self, **dargs):
        users = self.run('get_users', **dargs)
        return [User(self, u) for u in users]


    def generate_control_file(self, tests, **dargs):
        ret = self.run('generate_control_file', tests=tests, **dargs)
        return ControlFile(self, ret)


    def get_jobs(self, summary=False, **dargs):
        if summary:
            jobs_data = self.run('get_jobs_summary', **dargs)
        else:
            jobs_data = self.run('get_jobs', **dargs)
        jobs = []
        for j in jobs_data:
            job = Job(self, j)
            # Set up some extra information defaults
            job.testname = re.sub('\s.*', '', job.name)  # arbitrary default
            job.platform_results = {}
            job.platform_reasons = {}
            jobs.append(job)
        return jobs


    def get_host_queue_entries(self, **data):
        entries = self.run('get_host_queue_entries', **data)
        job_statuses = [JobStatus(self, e) for e in entries]

        # Sadly, get_host_queue_entries doesn't return platforms; we have
        # to get those back from an explicit get_hosts query, then patch
        # the new host objects back into the host list.
        hostnames = [s.host.hostname for s in job_statuses if s.host]
        host_hash = {}
        for host in self.get_hosts(hostname__in=hostnames):
            host_hash[host.hostname] = host
        for status in job_statuses:
            if status.host:
                status.host = host_hash.get(status.host.hostname)
        # Filter for job statuses that have either a host or a meta_host.
        return [status for status in job_statuses if (status.host or
                                                      status.meta_host)]


    def get_special_tasks(self, **data):
        tasks = self.run('get_special_tasks', **data)
        return [SpecialTask(self, t) for t in tasks]


    def get_host_special_tasks(self, host_id, **data):
        tasks = self.run('get_host_special_tasks',
                         host_id=host_id, **data)
        return [SpecialTask(self, t) for t in tasks]


    def get_host_status_task(self, host_id, end_time):
        task = self.run('get_host_status_task',
                        host_id=host_id, end_time=end_time)
        return SpecialTask(self, task) if task else None


    def get_host_diagnosis_interval(self, host_id, end_time, success):
        return self.run('get_host_diagnosis_interval',
                        host_id=host_id, end_time=end_time,
                        success=success)


    def create_job_by_test(self, tests, kernel=None, use_container=False,
                           kernel_cmdline=None, **dargs):
        """
        Given a test name, fetch the appropriate control file from the server
        and submit it.

        @param kernel: A comma separated list of kernel versions to boot.
        @param kernel_cmdline: The command line used to boot all kernels listed
                in the kernel parameter.

        @returns A job object, or None if the request cannot be satisfied
                (fewer hosts than synch_count).
        """
        assert ('hosts' in dargs or
                ('atomic_group_name' in dargs and 'synch_count' in dargs))
        if kernel:
            kernel_list = re.split('[\s,]+', kernel.strip())
            kernel_info = []
            for version in kernel_list:
                kernel_dict = {'version': version}
                if kernel_cmdline is not None:
                    kernel_dict['cmdline'] = kernel_cmdline
                kernel_info.append(kernel_dict)
        else:
            kernel_info = None
        control_file = self.generate_control_file(
                tests=tests, kernel=kernel_info, use_container=use_container)
        if control_file.is_server:
            dargs['control_type'] = control_data.CONTROL_TYPE_NAMES.SERVER
        else:
            dargs['control_type'] = control_data.CONTROL_TYPE_NAMES.CLIENT
        dargs['dependencies'] = dargs.get('dependencies', []) + \
                                control_file.dependencies
        dargs['control_file'] = control_file.control_file
        if not dargs.get('synch_count', None):
            dargs['synch_count'] = control_file.synch_count
        if 'hosts' in dargs and len(dargs['hosts']) < dargs['synch_count']:
            # We will not be able to satisfy this request.
            return None
        return self.create_job(**dargs)

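    # A sketch of a typical call (job, test and host names are placeholders):
    #
    #     job = afe.create_job_by_test(name='my_job', tests=['sleeptest'],
    #                                  hosts=['host1', 'host2'])
    #     if job:
    #         print 'Created job %s' % job.id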

    def create_job(self, control_file, name=' ', priority='Medium',
                   control_type=control_data.CONTROL_TYPE_NAMES.CLIENT,
                   **dargs):
        id = self.run('create_job', name=name, priority=priority,
                      control_file=control_file, control_type=control_type,
                      **dargs)
        return self.get_jobs(id=id)[0]


    def run_test_suites(self, pairings, kernel, kernel_label=None,
                        priority='Medium', wait=True, poll_interval=10,
                        email_from=None, email_to=None, timeout_mins=10080,
                        max_runtime_mins=10080, kernel_cmdline=None):
        """
        Run a list of test suites on a particular kernel.

        Poll for them to complete, and return whether they worked or not.

        @param pairings: List of MachineTestPairing objects to invoke.
        @param kernel: Name of the kernel to run.
        @param kernel_label: Label (string) of the kernel to run such as
                    '<kernel-version> : <config> : <date>'
                    If any pairing object has its job_label attribute set it
                    will override this value for that particular job.
        @param kernel_cmdline: The command line to boot the kernel(s) with.
        @param wait: boolean - Wait for the results to come back?
        @param poll_interval: Interval between polling for job results
                    (in minutes).
        @param email_from: Send notification email upon completion from here.
        @param email_to: Send notification email upon completion to here.
        """
        jobs = []
        for pairing in pairings:
            try:
                new_job = self.invoke_test(pairing, kernel, kernel_label,
                                           priority, timeout_mins=timeout_mins,
                                           kernel_cmdline=kernel_cmdline,
                                           max_runtime_mins=max_runtime_mins)
                if not new_job:
                    continue
                jobs.append(new_job)
            except Exception:
                traceback.print_exc()
        if not wait or not jobs:
            return
        tko = TKO()
        while True:
            time.sleep(60 * poll_interval)
            result = self.poll_all_jobs(tko, jobs, email_from, email_to)
            if result is not None:
                return result


    def result_notify(self, job, email_from, email_to):
        """
        Notify about the result of a job. Will always print; if email data
        is provided, will send email about it as well.

            job: job object to notify about
            email_from: send notification email upon completion from here
            email_to: send notification email upon completion to here
        """
        if job.result == True:
            subject = 'Testing PASSED: '
        else:
            subject = 'Testing FAILED: '
        subject += '%s : %s\n' % (job.name, job.id)
        text = []
        for platform in job.results_platform_map:
            for status in job.results_platform_map[platform]:
                if status == 'Total':
                    continue
                for host in job.results_platform_map[platform][status]:
                    text.append('%20s %10s %10s' % (platform, status, host))
                    if status == 'Failed':
                        for test_status in job.test_status[host].fail:
                            text.append('(%s, %s) : %s' %
                                        (host, test_status.test_name,
                                         test_status.reason))
                        text.append('')

        base_url = 'http://' + self.server

        params = ('columns=test',
                  'rows=machine_group',
                  "condition=tag~'%s-%%25'" % job.id,
                  'title=Report')
        query_string = '&'.join(params)
        url = '%s/tko/compose_query.cgi?%s' % (base_url, query_string)
        text.append(url + '\n')
        url = '%s/afe/#tab_id=view_job&object_id=%s' % (base_url, job.id)
        text.append(url + '\n')

        body = '\n'.join(text)
        print '---------------------------------------------------'
        print 'Subject: ', subject
        print body
        print '---------------------------------------------------'
        if email_from and email_to:
            print 'Sending email ...'
            utils.send_email(email_from, email_to, subject, body)
        print


    def print_job_result(self, job):
        """
        Print the result of a single job.
            job: a job object
        """
        if job.result is None:
            print 'PENDING',
        elif job.result == True:
            print 'PASSED',
        elif job.result == False:
            print 'FAILED',
        elif job.result == "Abort":
            print 'ABORT',
        print ' %s : %s' % (job.id, job.name)


    def poll_all_jobs(self, tko, jobs, email_from=None, email_to=None):
        """
        Poll all jobs in a list.
            jobs: list of job objects to poll
            email_from: send notification email upon completion from here
            email_to: send notification email upon completion to here

        Returns:
            a) All complete successfully (return True)
            b) One or more has failed (return False)
            c) Cannot tell yet (return None)
        """
        results = []
        for job in jobs:
            if getattr(job, 'result', None) is None:
                job.result = self.poll_job_results(tko, job)
                if job.result is not None:
                    self.result_notify(job, email_from, email_to)

            results.append(job.result)
            self.print_job_result(job)

        if None in results:
            return None
        elif False in results or "Abort" in results:
            return False
        else:
            return True


    def _included_platform(self, host, platforms):
        """
        See if the host's platform matches any of the patterns in the
        included platforms list.
        """
        if not platforms:
            return True        # No filtering of platforms
        for platform in platforms:
            if re.search(platform, host.platform):
                return True
        return False


    def invoke_test(self, pairing, kernel, kernel_label, priority='Medium',
                    kernel_cmdline=None, **dargs):
        """
        Given a pairing of a control file to a machine label, find all machines
        with that label, and submit that control file to them.

        @param kernel_label: Label (string) of the kernel to run such as
                '<kernel-version> : <config> : <date>'
                If any pairing object has its job_label attribute set it
                will override this value for that particular job.

        @returns A job object, or None if no job could be created.
        """
        # The pairing can override the job label.
        if pairing.job_label:
            kernel_label = pairing.job_label
        job_name = '%s : %s' % (pairing.machine_label, kernel_label)
        hosts = self.get_hosts(multiple_labels=[pairing.machine_label])
        platforms = pairing.platforms
        hosts = [h for h in hosts if self._included_platform(h, platforms)]
        dead_statuses = self.host_statuses(live=False)
        host_list = [h.hostname for h in hosts if h.status not in dead_statuses]
        print 'HOSTS: %s' % host_list
        if pairing.atomic_group_sched:
            dargs['synch_count'] = pairing.synch_count
            dargs['atomic_group_name'] = pairing.machine_label
        else:
            dargs['hosts'] = host_list
        new_job = self.create_job_by_test(name=job_name,
                                          dependencies=[pairing.machine_label],
                                          tests=[pairing.control_file],
                                          priority=priority,
                                          kernel=kernel,
                                          kernel_cmdline=kernel_cmdline,
                                          use_container=pairing.container,
                                          **dargs)
        if new_job:
            if pairing.testname:
                new_job.testname = pairing.testname
            print 'Invoked test %s : %s' % (new_job.id, job_name)
        return new_job


    def _job_test_results(self, tko, job, debug, tests=()):
        """
        Retrieve test results for a job.
        """
        job.test_status = {}
        try:
            test_statuses = tko.get_status_counts(job=job.id)
        except Exception:
            print "Ignoring exception on poll job; RPC interface is flaky"
            traceback.print_exc()
            return

        for test_status in test_statuses:
            # SERVER_JOB is buggy, and often gives false failures. Ignore it.
            if test_status.test_name == 'SERVER_JOB':
                continue
            # If tests is not empty, restrict the list of test_statuses to
            # those tests.
            if tests and test_status.test_name not in tests:
                continue
            if debug:
                print test_status
            hostname = test_status.hostname
            if hostname not in job.test_status:
                job.test_status[hostname] = TestResults()
            job.test_status[hostname].add(test_status)


    def _job_results_platform_map(self, job, debug):
        # Figure out which hosts passed / failed / aborted in a job.
        # Creates a 2-dimensional hash, stored as job.results_platform_map:
        #     1st index - platform type (string)
        #     2nd index - Status (string)
        #         'Completed' / 'Failed' / 'Aborted'
        #     Data indexed by this hash is a list of hostnames (text strings)
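        # For example, the resulting map might look like this (platform and
        # host names are illustrative):
        #     {'my_platform': {'Total': ['host1', 'host2'],
        #                      'Completed': ['host1'],
        #                      'Failed': ['host2']}}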
        job.results_platform_map = {}
        try:
            job_statuses = self.get_host_queue_entries(job=job.id)
        except Exception:
            print "Ignoring exception on poll job; RPC interface is flaky"
            traceback.print_exc()
            return None

        platform_map = {}
        job.job_status = {}
        job.metahost_index = {}
        for job_status in job_statuses:
            # This is basically "for each host / metahost in the job".
            if job_status.host:
                hostname = job_status.host.hostname
            else:              # This is a metahost
                metahost = job_status.meta_host
                index = job.metahost_index.get(metahost, 1)
                job.metahost_index[metahost] = index + 1
                hostname = '%s.%s' % (metahost, index)
            job.job_status[hostname] = job_status.status
            status = job_status.status
            # Skip hosts that failed verify or repair:
            # that's a machine failure, not a job failure.
            if hostname in job.test_status:
                verify_failed = False
                for failure in job.test_status[hostname].fail:
                    if (failure.test_name == 'verify' or
                            failure.test_name == 'repair'):
                        verify_failed = True
                        break
                if verify_failed:
                    continue
            if hostname in job.test_status and job.test_status[hostname].fail:
                # If any tests failed in the job, we want to mark the
                # job result as failed, overriding the default job status,
                # except if it's an aborted job.
                if status != "Aborted":
                    status = 'Failed'
            if job_status.host:
                platform = job_status.host.platform
            else:              # This is a metahost
                platform = job_status.meta_host
            if platform not in platform_map:
                platform_map[platform] = {'Total': [hostname]}
            else:
                platform_map[platform]['Total'].append(hostname)
            new_host_list = platform_map[platform].get(status, []) + [hostname]
            platform_map[platform][status] = new_host_list
        job.results_platform_map = platform_map


    def set_platform_results(self, test_job, platform, result):
        """
        Result must be None, 'FAIL', 'WARN' or 'GOOD'.
        """
        if test_job.platform_results[platform] is not None:
            # We're already done, and results recorded. This can't change later.
            return
        test_job.platform_results[platform] = result
        # Note that self.job refers to the metajob we're IN, not the job
        # that we're executing from here.
        testname = '%s.%s' % (test_job.testname, platform)
        if self.job:
            self.job.record(result, None, testname, status='')


    def poll_job_results(self, tko, job, enough=1, debug=False):
        """
        Analyse all job results by platform.

          params:
            tko: a TKO object representing the results DB.
            job: the job to be examined.
            enough: the acceptable delta between the number of completed
                    tests and the total number of tests.
            debug: enable debugging output.

          returns:
            False: if any platform has more than |enough| failures
            None:  if any platform has less than |enough| machines
                   not yet Good.
            True:  if all platforms have at least |enough| machines
                   Good.
        """
        self._job_test_results(tko, job, debug)
        if job.test_status == {}:
            return None
        self._job_results_platform_map(job, debug)

        good_platforms = []
        failed_platforms = []
        aborted_platforms = []
        unknown_platforms = []
        platform_map = job.results_platform_map
        for platform in platform_map:
            if platform not in job.platform_results:
                # Record test start, but there's no way to do this right now.
                job.platform_results[platform] = None
            total = len(platform_map[platform]['Total'])
            completed = len(platform_map[platform].get('Completed', []))
            failed = len(platform_map[platform].get('Failed', []))
            aborted = len(platform_map[platform].get('Aborted', []))

            # We set up what we want to record here, but don't actually do
            # it yet, until we have a decisive answer for this platform.
            if aborted or failed:
                bad = aborted + failed
                if (bad > 1) or (bad * 2 >= total):
                    platform_test_result = 'FAIL'
                else:
                    platform_test_result = 'WARN'

            if aborted > enough:
                aborted_platforms.append(platform)
                self.set_platform_results(job, platform, platform_test_result)
            elif (failed * 2 >= total) or (failed > enough):
                failed_platforms.append(platform)
                self.set_platform_results(job, platform, platform_test_result)
            elif (completed >= enough) and (completed + enough >= total):
                good_platforms.append(platform)
                self.set_platform_results(job, platform, 'GOOD')
            else:
                unknown_platforms.append(platform)
            detail = []
            for status in platform_map[platform]:
                if status == 'Total':
                    continue
                detail.append('%s=%s' % (status, platform_map[platform][status]))
            if debug:
                print '%20s %d/%d %s' % (platform, completed, total,
                                         ' '.join(detail))
                print

        if len(aborted_platforms) > 0:
            if debug:
                print 'Result aborted - platforms: ',
                print ' '.join(aborted_platforms)
            return "Abort"
        if len(failed_platforms) > 0:
            if debug:
                print 'Result bad - platforms: ' + ' '.join(failed_platforms)
            return False
        if len(unknown_platforms) > 0:
            if debug:
                platform_list = ' '.join(unknown_platforms)
                print 'Result unknown - platforms: ', platform_list
            return None
        if debug:
            platform_list = ' '.join(good_platforms)
            print 'Result good - all platforms passed: ', platform_list
        return True


    def abort_jobs(self, jobs):
        """Abort a list of jobs.

        Already completed jobs will not be affected.

        @param jobs: List of job ids to abort.
        """
        for job in jobs:
            self.run('abort_host_queue_entries', job_id=job)


class TestResults(object):
    """
    Container class used to hold the results of the tests for a job.
    """
    def __init__(self):
        self.good = []
        self.fail = []
        self.pending = []


    def add(self, result):
        if result.complete_count > result.pass_count:
            self.fail.append(result)
        elif result.incomplete_count > 0:
            self.pending.append(result)
        else:
            self.good.append(result)


class RpcObject(object):
    """
    Generic object used to construct python objects from rpc calls.
    """
    def __init__(self, afe, hash):
        self.afe = afe
        self.hash = hash
        self.__dict__.update(hash)
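        # Because the reply hash is copied into __dict__, every field of the
        # RPC reply is readable as a plain attribute, e.g. (illustrative):
        #     host = afe.get_hosts(hostname__in=['host1'])[0]
        #     print host.status, host.platform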


    def __str__(self):
        return dump_object(self.__repr__(), self)


class ControlFile(RpcObject):
    """
    AFE control file object

    Fields: synch_count, dependencies, control_file, is_server
    """
    def __repr__(self):
        return 'CONTROL FILE: %s' % self.control_file


class Label(RpcObject):
    """
    AFE label object

    Fields:
        name, invalid, platform, kernel_config, id, only_if_needed
    """
    def __repr__(self):
        return 'LABEL: %s' % self.name


    def add_hosts(self, hosts):
        return self.afe.run('label_add_hosts', id=self.id, hosts=hosts)


    def remove_hosts(self, hosts):
        return self.afe.run('label_remove_hosts', id=self.id, hosts=hosts)


class Acl(RpcObject):
    """
    AFE acl object

    Fields:
        users, hosts, description, name, id
    """
    def __repr__(self):
        return 'ACL: %s' % self.name


    def add_hosts(self, hosts):
        self.afe.log('Adding hosts %s to ACL %s' % (hosts, self.name))
        return self.afe.run('acl_group_add_hosts', id=self.id, hosts=hosts)


    def remove_hosts(self, hosts):
        self.afe.log('Removing hosts %s from ACL %s' % (hosts, self.name))
        return self.afe.run('acl_group_remove_hosts', id=self.id, hosts=hosts)


    def add_users(self, users):
        self.afe.log('Adding users %s to ACL %s' % (users, self.name))
        return self.afe.run('acl_group_add_users', id=self.name, users=users)


class Job(RpcObject):
    """
    AFE job object

    Fields:
        name, control_file, control_type, synch_count, reboot_before,
        run_verify, priority, email_list, created_on, dependencies,
        timeout, owner, reboot_after, id
    """
    def __repr__(self):
        return 'JOB: %s' % self.id


class JobStatus(RpcObject):
    """
    AFE job_status object

    Fields:
        status, complete, deleted, meta_host, host, active, execution_subdir, id
    """
    def __init__(self, afe, hash):
        super(JobStatus, self).__init__(afe, hash)
        self.job = Job(afe, self.job)
        if getattr(self, 'host'):
            self.host = Host(afe, self.host)


    def __repr__(self):
        if self.host and self.host.hostname:
            hostname = self.host.hostname
        else:
            hostname = 'None'
        return 'JOB STATUS: %s-%s' % (self.job.id, hostname)


class SpecialTask(RpcObject):
    """
    AFE special task object
    """
    def __init__(self, afe, hash):
        super(SpecialTask, self).__init__(afe, hash)
        self.host = Host(afe, self.host)


    def __repr__(self):
        return 'SPECIAL TASK: %s' % self.id


class Host(RpcObject):
    """
    AFE host object

    Fields:
        status, lock_time, locked_by, locked, hostname, invalid,
        synch_id, labels, platform, protection, dirty, id
    """
    def __repr__(self):
        return 'HOST OBJECT: %s' % self.hostname


    def show(self):
        labels = list(set(self.labels) - set([self.platform]))
        print '%-6s %-7s %-7s %-16s %s' % (self.hostname, self.status,
                                           self.locked, self.platform,
                                           ', '.join(labels))


    def delete(self):
        return self.afe.run('delete_host', id=self.id)


    def modify(self, **dargs):
        return self.afe.run('modify_host', id=self.id, **dargs)


    def get_acls(self):
        return self.afe.get_acls(hosts__hostname=self.hostname)


    def add_acl(self, acl_name):
        self.afe.log('Adding ACL %s to host %s' % (acl_name, self.hostname))
        return self.afe.run('acl_group_add_hosts', id=acl_name,
                            hosts=[self.hostname])


    def remove_acl(self, acl_name):
        self.afe.log('Removing ACL %s from host %s' % (acl_name, self.hostname))
        return self.afe.run('acl_group_remove_hosts', id=acl_name,
                            hosts=[self.hostname])


    def get_labels(self):
        return self.afe.get_labels(host__hostname__in=[self.hostname])


    def add_labels(self, labels):
        self.afe.log('Adding labels %s to host %s' % (labels, self.hostname))
        return self.afe.run('host_add_labels', id=self.id, labels=labels)


    def remove_labels(self, labels):
        self.afe.log('Removing labels %s from host %s' %
                     (labels, self.hostname))
        return self.afe.run('host_remove_labels', id=self.id, labels=labels)


class User(RpcObject):
    def __repr__(self):
        return 'USER: %s' % self.login


class TestStatus(RpcObject):
    """
    TKO test status object

    Fields:
        test_idx, hostname, test_name, id,
        complete_count, incomplete_count, group_count, pass_count
    """
    def __repr__(self):
        return 'TEST STATUS: %s' % self.id


class HostAttribute(RpcObject):
    """
    AFE host attribute object

    Fields:
        id, host, attribute, value
    """
    def __repr__(self):
        return 'HOST ATTRIBUTE %d' % self.id


class MachineTestPairing(object):
    """
    Object representing the pairing of a machine label with a control file.

    machine_label: use machines from this label
    control_file: use this control file (by name in the frontend)
    platforms: list of regexps to filter platforms by. [] => no filtering
    job_label: the label (name) to give to the autotest job launched
            to run this pairing.  '<kernel-version> : <config> : <date>'
    """
    def __init__(self, machine_label, control_file, platforms=[],
                 container=False, atomic_group_sched=False, synch_count=0,
                 testname=None, job_label=None):
        self.machine_label = machine_label
        self.control_file = control_file
        self.platforms = platforms
        self.container = container
        self.atomic_group_sched = atomic_group_sched
        self.synch_count = synch_count
        self.testname = testname
        self.job_label = job_label


    def __repr__(self):
        return '%s %s %s %s' % (self.machine_label, self.control_file,
                                self.platforms, self.container)
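

# A sketch of how MachineTestPairing feeds run_test_suites() (the label,
# control file and kernel names below are placeholders):
#
#     pairing = MachineTestPairing('some-machine-label', 'sleeptest')
#     afe = AFE()
#     passed = afe.run_test_suites([pairing], kernel='2.6.30')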