• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python2
2
3# Copyright 2017 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7"""Load generator for devserver.
8
9Example usage:
10
11# Find DUTs in suites pool to test with:
12atest host list -b 'pool:suites,board:BOARD' --unlocked -s Ready
13
14# Lock DUTs:
15atest host mod -l -r 'quick provision testing' DUT1 DUT2
16
17# Create config file with DUTs to test and builds to use.
18cat >config.json <<EOD
19{
20  "BOARD": {
21    "duts": [
22      "chromeosX-rowY-rackZ-hostA",
23      "chromeosX-rowY-rackZ-hostB",
24    ],
25    "versions": [
26      "auron_paine-paladin/R65-10208.0.0-rc2",
27      "auron_paine-paladin/R65-10208.0.0-rc3",
28      "auron_paine-paladin/R65-10209.0.0-rc1"
29    ]
30  },
31}
32EOD
33
34# Do 100 total provisions, aiming to have 10 active simultaneously.
35loadtest.py $DS config.json --simultaneous 10 --total 100
36
37# Unlock DUTs:
38atest host mod -u DUT1 DUT2
39"""
40
41import argparse
42import collections
43import datetime
44import json
45import logging
46import random
47import re
48import signal
49import subprocess
50import sys
51import time
52
53import common
54from autotest_lib.client.common_lib import time_utils
55from autotest_lib.client.common_lib.cros import dev_server
56from chromite.lib import locking
57from chromite.lib import parallel
58
# Payloads to stage.
60PAYLOADS = ['quick_provision', 'stateful']
61
62# Number of seconds between full status checks.
63STATUS_POLL_SECONDS = 2
64
65# Number of successes/failures to blacklist a DUT.
66BLACKLIST_CONSECUTIVE_FAILURE = 2
67BLACKLIST_TOTAL_SUCCESS = 0
68BLACKLIST_TOTAL_FAILURE = 5
69
def get_parser():
    """Creates the argparse parser.

    @return An argparse.ArgumentParser configured for the load test
            command line.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument('server', type=str, action='store',
                        help='Devserver to load test.')
    parser.add_argument('config', type=str, action='store',
                        # NOTE: trailing space matters -- these adjacent
                        # literals are concatenated into one help string.
                        help='Path to JSON config file. '
                             'Config file is indexed by board with keys of '
                             '"duts" and "versions", each a list.')
    parser.add_argument('--blacklist-consecutive', '-C', type=int,
                        action='store',
                        help=('Consecutive number of failures before '
                              'blacklisting DUT (default %d).') %
                             BLACKLIST_CONSECUTIVE_FAILURE,
                        default=BLACKLIST_CONSECUTIVE_FAILURE)
    parser.add_argument('--blacklist-success', '-S', type=int, action='store',
                        help=('Total number of successes before blacklisting '
                              'DUT (default %d).') % BLACKLIST_TOTAL_SUCCESS,
                        default=BLACKLIST_TOTAL_SUCCESS)
    parser.add_argument('--blacklist-total', '-T', type=int, action='store',
                        help=('Total number of failures before blacklisting '
                              'DUT (default %d).') % BLACKLIST_TOTAL_FAILURE,
                        default=BLACKLIST_TOTAL_FAILURE)
    parser.add_argument('--boards', '-b', type=str, action='store',
                        help='Comma-separated list of boards to provision.')
    parser.add_argument('--dryrun', '-n', action='store_true', dest='dryrun',
                        help='Do not attempt to provision.')
    parser.add_argument('--duts', '-d', type=str, action='store',
                        help='Comma-separated list of duts to provision.')
    parser.add_argument('--outputlog', '-l', type=str, action='store',
                        help='Path to append JSON entries to.')
    parser.add_argument('--output', '-o', type=str, action='store',
                        help='Path to write JSON file to.')
    parser.add_argument('--ping', '-p', action='store_true',
                        help='Ping DUTs and blacklist unresponsive ones.')
    parser.add_argument('--simultaneous', '-s', type=int, action='store',
                        help='Number of simultaneous provisions to run.',
                        default=1)
    parser.add_argument('--no-stage', action='store_false',
                        dest='stage', default=True,
                        help='Do not attempt to stage builds.')
    parser.add_argument('--total', '-t', type=int, action='store',
                        help='Number of total provisions to run.',
                        default=0)
    return parser
115
def make_entry(entry_id, name, status, start_time,
               finish_time=None, parent=None, **kwargs):
    """Generate an event log entry to be stored in Cloud Datastore.

    @param entry_id: A (Kind, id) tuple representing the key.
    @param name: A string identifying the event.
    @param status: A string identifying the status of the event.
    @param start_time: A datetime of the start of the event.
    @param finish_time: A datetime of the finish of the event.
    @param parent: A (Kind, id) tuple representing the parent key.

    @return A dictionary representing the entry suitable for dumping via JSON.
    """
    # Mandatory fields first; datetimes are stored as epoch seconds.
    entry = dict(
        id=entry_id,
        name=name,
        status=status,
        start_time=time_utils.to_epoch_time(start_time),
    )
    # Optional fields are only included when supplied.
    optional = {}
    if finish_time is not None:
        optional['finish_time'] = time_utils.to_epoch_time(finish_time)
    if parent is not None:
        optional['parent'] = parent
    entry.update(optional)
    return entry
140
class Job(object):
    """Tracks a single provision job on a single DUT."""

    def __init__(self, ds, host_name, build_name,
                 entry_id=0, parent=None, board=None,
                 start_active=0,
                 force_update=False, full_update=False,
                 clobber_stateful=True, quick_provision=True,
                 ping=False, dryrun=False):
        """Trigger a provision of |build_name| onto |host_name|.

        @param ds: dev_server.ImageServer to provision through.
        @param host_name: Host name of the DUT to provision.
        @param build_name: Build version to provision onto the DUT.
        @param entry_id: Numeric id used in this job's datastore key.
        @param parent: (Kind, id) tuple of the parent (Runner) entry.
        @param board: Board type of the DUT.
        @param start_active: Number of active provisions when this job began.
        @param force_update: Passed through to the devserver auto-update call.
        @param full_update: Passed through to the devserver auto-update call.
        @param clobber_stateful: Passed through to the devserver call.
        @param quick_provision: Passed through to the devserver call.
        @param ping: If True, ping the DUT before and after provisioning.
        @param dryrun: If True, mark the job finished and successful without
            contacting the devserver.
        """
        self.ds = ds
        self.host_name = host_name
        self.build_name = build_name

        self.entry_id = ('Job', entry_id)
        self.parent = parent
        self.board = board
        self.start_active = start_active
        self.end_active = None
        self.check_active_sum = 0
        self.check_active_count = 0

        self.start_time = datetime.datetime.now()
        self.finish_time = None
        self.trigger_response = None

        # Initialize outcome fields up front so as_entry() cannot raise
        # AttributeError on a job that has not finished yet.  (Previously
        # these were only assigned in the dryrun branch or in check().)
        self.raised_error = None
        self.success = False
        self.pid = None

        self.ping = ping
        self.pre_ping = None
        self.post_ping = None

        # Keyword arguments forwarded verbatim to the devserver RPCs.
        self.kwargs = {
            'host_name': host_name,
            'build_name': build_name,
            'force_update': force_update,
            'full_update': full_update,
            'clobber_stateful': clobber_stateful,
            'quick_provision': quick_provision,
        }

        if dryrun:
            # Pretend the provision completed instantly and successfully.
            self.finish_time = datetime.datetime.now()
            self.success = True
            self.pid = 0
        else:
            if self.ping:
                self.pre_ping = ping_dut(self.host_name)
            self.trigger_response = ds._trigger_auto_update(**self.kwargs)

    def as_entry(self):
        """Generate an entry for exporting to datastore.

        @return A dictionary suitable for dumping via JSON.
        """
        entry = make_entry(self.entry_id, self.host_name,
                           'pass' if self.success else 'fail',
                           self.start_time, self.finish_time, self.parent)
        entry.update({
            'build_name': self.build_name,
            'board': self.board,
            'devserver': self.ds.hostname,
            'start_active': self.start_active,
            'end_active': self.end_active,
            'force_update': self.kwargs['force_update'],
            'full_update': self.kwargs['full_update'],
            'clobber_stateful': self.kwargs['clobber_stateful'],
            'quick_provision': self.kwargs['quick_provision'],
            'elapsed': int(self.elapsed().total_seconds()),
            'trigger_response': self.trigger_response,
            'pre_ping': self.pre_ping,
            'post_ping': self.post_ping,
        })
        if self.check_active_count:
            # Average number of active provisions observed over all checks.
            entry['avg_active'] = (self.check_active_sum /
                                   self.check_active_count)
        return entry

    def check(self, active_count):
        """Checks if a job has completed.

        @param active_count: Number of active provisions at time of the check.
        @return: True if the job has completed, False otherwise.
        """
        if self.finish_time is not None:
            return True

        self.check_active_sum += active_count
        self.check_active_count += 1

        finished, raised_error, pid = self.ds.check_for_auto_update_finished(
            self.trigger_response, wait=False, **self.kwargs)
        if finished:
            self.finish_time = datetime.datetime.now()
            self.raised_error = raised_error
            self.success = self.raised_error is None
            self.pid = pid
            self.end_active = active_count
            if self.ping:
                self.post_ping = ping_dut(self.host_name)

        return finished

    def elapsed(self):
        """Determine the elapsed time of the task.

        @return A datetime.timedelta; uses the current time if unfinished.
        """
        finish_time = self.finish_time or datetime.datetime.now()
        return finish_time - self.start_time
243
class Runner(object):
    """Parallel provision load test runner.

    Keeps a target number of provision Jobs active against a single
    devserver, records each completed job as a datastore-style event log
    entry, and blacklists DUTs that fail too often.
    """
    def __init__(self, ds, duts, config, simultaneous=1, total=0,
                 outputlog=None, ping=False, blacklist_consecutive=None,
                 blacklist_success=None, blacklist_total=None, dryrun=False):
        """Initialize the runner.

        @param ds: dev_server.ImageServer instance to load test.
        @param duts: Dict mapping DUT host name -> board type.
        @param config: Parsed JSON config indexed by board, with keys of
            "duts" and "versions".
        @param simultaneous: Target number of simultaneously active
            provisions; 0 means no cap (see replenish()).
        @param total: Total number of provisions to run; 0 means unlimited.
        @param outputlog: Optional open file object to append JSON entries
            to as jobs complete.
        @param ping: If True, Jobs ping DUTs before/after provisioning.
        @param blacklist_consecutive: Consecutive failures before
            blacklisting a DUT; None disables this check.
        @param blacklist_success: Total successes before blacklisting a DUT;
            None disables this check.
        @param blacklist_total: Total failures before blacklisting a DUT;
            None disables this check.
        @param dryrun: If True, Jobs complete instantly without provisioning.
        """
        self.ds = ds
        self.duts = duts
        self.config = config
        self.start_time = datetime.datetime.now()
        self.finish_time = None
        self.simultaneous = simultaneous
        self.total = total
        self.outputlog = outputlog
        self.ping = ping
        self.blacklist_consecutive = blacklist_consecutive
        self.blacklist_success = blacklist_success
        self.blacklist_total = blacklist_total
        self.dryrun = dryrun

        self.active = []
        self.started = 0
        self.completed = []
        # Track DUTs which have failed multiple times.
        self.dut_blacklist = set()
        # Track versions of each DUT to provision in order.
        self.last_versions = {}

        # id for the parent entry.
        # TODO: This isn't the most unique.
        self.entry_id = ('Runner',
                         int(time_utils.to_epoch_time(datetime.datetime.now())))

        # ids for the job entries.
        self.next_id = 0

        # Write the parent entry immediately so the log has a Runner record
        # even if the run is interrupted.
        if self.outputlog:
            dump_entries_as_json([self.as_entry()], self.outputlog)

    def signal_handler(self, signum, frame):
        """Signal handler to dump current status.

        Installed for SIGUSR1 by loop(); logs a summary of active and
        completed provisions without disturbing the run.

        @param signum: Signal number received.
        @param frame: Current stack frame (unused).
        """
        logging.info('Received signal %s', signum)
        if signum == signal.SIGUSR1:
            now = datetime.datetime.now()
            logging.info('%d active provisions, %d completed provisions, '
                         '%s elapsed:',
                         len(self.active), len(self.completed),
                         now - self.start_time)
            for job in self.active:
                logging.info('  %s -> %s, %s elapsed',
                             job.host_name, job.build_name,
                             now - job.start_time)

    def as_entry(self):
        """Generate an entry for exporting to datastore.

        @return A dictionary representing this runner, suitable for JSON.
        """
        entry = make_entry(self.entry_id, 'Runner', 'pass',
                           self.start_time, self.finish_time)
        entry.update({
            'devserver': self.ds.hostname,
        })
        return entry

    def get_completed_entries(self):
        """Retrieves all completed jobs as entries for datastore.

        @return A list of entries: the Runner entry followed by one entry
            per completed Job.
        """
        entries = [self.as_entry()]
        entries.extend([job.as_entry() for job in self.completed])
        return entries

    def get_next_id(self):
        """Get the next Job id.

        @return A monotonically increasing integer id.
        """
        entry_id = self.next_id
        self.next_id += 1
        return entry_id

    def spawn(self, host_name, build_name):
        """Spawn a single provision job.

        @param host_name: DUT host name to provision.
        @param build_name: Build version to provision onto the DUT.
        """
        job = Job(self.ds, host_name, build_name,
                  entry_id=self.get_next_id(), parent=self.entry_id,
                  board=self.get_dut_board_type(host_name),
                  start_active=len(self.active), ping=self.ping,
                  dryrun=self.dryrun)
        self.active.append(job)
        logging.info('Provision (%d) of %s to %s started',
                     job.entry_id[1], job.host_name, job.build_name)
        # Remember the version so find_build_for_dut() cycles to the next.
        self.last_versions[host_name] = build_name
        self.started += 1

    def replenish(self):
        """Replenish the number of active provisions to match goals.

        Spawns jobs until the simultaneous goal is met (or no cap if
        simultaneous == 0) or the total goal is reached.

        @return False if a simultaneous goal could not be satisfied for
            lack of idle DUTs; otherwise True (or, when uncapped, whether
            any provisions are still active).
        """
        while ((self.simultaneous == 0 or
                len(self.active) < self.simultaneous) and
               (self.total == 0 or self.started < self.total)):
            host_name = self.find_idle_dut()
            if host_name:
                build_name = self.find_build_for_dut(host_name)
                self.spawn(host_name, build_name)
            elif self.simultaneous:
                logging.warn('Insufficient DUTs to satisfy goal')
                return False
            else:
                return len(self.active) > 0
        return True

    def check_all(self):
        """Check the status of outstanding provisions.

        Moves finished jobs from the active list to the completed list,
        logging each and appending its entry to the output log, and
        blacklists DUTs that cross the failure/success thresholds.
        """
        still_active = []
        for job in self.active:
            if job.check(len(self.active)):
                logging.info('Provision (%d) of %s to %s %s in %s: %s',
                             job.entry_id[1], job.host_name, job.build_name,
                             'completed' if job.success else 'failed',
                             job.elapsed(), job.raised_error)
                entry = job.as_entry()
                logging.debug(json.dumps(entry))
                if self.outputlog:
                    dump_entries_as_json([entry], self.outputlog)
                self.completed.append(job)
                if self.should_blacklist(job.host_name):
                    logging.error('Blacklisting DUT %s', job.host_name)
                    self.dut_blacklist.add(job.host_name)
            else:
                still_active.append(job)
        self.active = still_active

    def should_blacklist(self, host_name):
        """Determines if a given DUT should be blacklisted.

        Scans the DUT's completed jobs in order, tracking total failures,
        consecutive failures, and total successes against the configured
        thresholds (a None threshold disables that check).

        @param host_name: DUT host name to evaluate.
        @return True if the DUT should be blacklisted.
        """
        jobs = [job for job in self.completed if job.host_name == host_name]
        total = 0
        consecutive = 0
        successes = 0
        for job in jobs:
            if not job.success:
                total += 1
                consecutive += 1
                if ((self.blacklist_total is not None and
                     total >= self.blacklist_total) or
                    (self.blacklist_consecutive is not None and
                     consecutive >= self.blacklist_consecutive)):
                    return True
            else:
                successes += 1
                if (self.blacklist_success is not None and
                    successes >= self.blacklist_success):
                    return True
                consecutive = 0
        return False

    def find_idle_dut(self):
        """Find an idle DUT to provision.

        @return A random DUT that is neither active nor blacklisted, or
            None if no such DUT exists.
        """
        active_duts = {job.host_name for job in self.active}
        idle_duts = [d for d in self.duts
                     if d not in active_duts | self.dut_blacklist]
        return random.choice(idle_duts) if len(idle_duts) else None

    def get_dut_board_type(self, host_name):
        """Determine the board type of a DUT."""
        return self.duts[host_name]

    def get_board_versions(self, board):
        """Determine the versions to provision for a board."""
        return self.config[board]['versions']

    def find_build_for_dut(self, host_name):
        """Determine a build to provision on a DUT.

        Cycles through the board's version list, picking the version after
        the one last provisioned on this DUT (wrapping around).

        @param host_name: DUT host name.
        @return A build version string.
        """
        board = self.get_dut_board_type(host_name)
        versions = self.get_board_versions(board)
        last_version = self.last_versions.get(host_name)
        try:
            last_index = versions.index(last_version)
        except ValueError:
            # No (known) previous version; start at the beginning.
            return versions[0]
        return versions[(last_index + 1) % len(versions)]

    def stage(self, build):
        """Stage artifacts for a given build.

        @param build: Build version whose payloads should be staged.
        """
        logging.debug('Staging %s', build)
        self.ds.stage_artifacts(build, PAYLOADS)

    def stage_all(self):
        """Stage all necessary artifacts."""
        boards = set(self.duts.values())
        logging.info('Staging for %d boards', len(boards))
        funcs = []
        for build in boards:
            for build in self.get_board_versions(board):
                funcs.append(lambda build_=build: self.stage(build_))
        parallel.RunParallelSteps(funcs)

    def loop(self):
        """Run the main provision loop.

        Repeatedly checks active jobs, replenishes to the goals, and sleeps
        between polls.  SIGUSR1 dumps a status summary while running.

        @return True if the total goal was completed; False on interrupt or
            when replenishment fails with no provisions active.
        """
        # Install a signal handler for status updates.
        old_handler = signal.signal(signal.SIGUSR1, self.signal_handler)
        signal.siginterrupt(signal.SIGUSR1, False)

        try:
            while True:
                self.check_all()
                if self.total != 0 and len(self.completed) >= self.total:
                    break
                if not self.replenish() and len(self.active) == 0:
                    logging.error('Unable to replenish with no active '
                                  'provisions')
                    return False
                logging.debug('%d provisions active', len(self.active))
                time.sleep(STATUS_POLL_SECONDS)
            return True
        except KeyboardInterrupt:
            return False
        finally:
            self.finish_time = datetime.datetime.now()
            # Clean up signal handler.
            signal.signal(signal.SIGUSR1, old_handler)

    def elapsed(self):
        """Determine the elapsed time of the task.

        @return A datetime.timedelta; uses the current time if unfinished.
        """
        finish_time = self.finish_time or datetime.datetime.now()
        return finish_time - self.start_time
460
def dump_entries_as_json(entries, output_file):
    """Dump event log entries as newline-delimited JSON to a file.

    @param entries: A list of event log entries (dicts) to dump.
    @param output_file: A writable file object to write to.
    """
    # Lazy %-args: formatting only happens if DEBUG logging is enabled.
    logging.debug('Dumping %d entries', len(entries))
    for entry in entries:
        json.dump(entry, output_file, sort_keys=True)
        output_file.write('\n')
        # Flush per entry so readers tailing the log see complete lines.
        output_file.flush()
473
def ping_dut(hostname):
    """Checks if a host is responsive to pings.

    @param hostname: Host to ping.  Bare chromeosX-rowY-rackZ-hostN lab
        names get a '.cros' suffix appended before pinging.
    @return True if the host answered one ping within the 2 second deadline.
    """
    # Raw string so the \d escapes reach the regex engine intact.
    if re.match(r'^chromeos\d+-row\d+-rack\d+-host\d+$', hostname):
        hostname += '.cros'

    response = subprocess.call(["/bin/ping", "-c1", "-w2", hostname],
                               stdout=subprocess.PIPE)
    return response == 0
482
def main(argv):
    """Load generator for a devserver.

    @param argv: Command line arguments, excluding the program name.
    """
    parser = get_parser()
    options = parser.parse_args(argv)

    # Parse devserver.
    if options.server:
        # Accept either a full URL or a bare host name.
        if re.match(r'^https?://', options.server):
            server = options.server
        else:
            server = 'http://%s/' % options.server
        ds = dev_server.ImageServer(server)
    else:
        parser.print_usage()
        logging.error('Must specify devserver')
        sys.exit(1)

    # Parse config file and determine master list of duts and their board type,
    # filtering by board type if specified.
    duts = {}
    if options.config:
        with open(options.config, 'r') as f:
            config = json.load(f)
            boards = (options.boards.split(',')
                      if options.boards else config.keys())
            duts_specified = (set(options.duts.split(','))
                              if options.duts else None)
            for board in boards:
                # Map dut -> board, keeping only explicitly requested DUTs
                # when --duts was given.
                duts.update({dut: board for dut in config[board]['duts']
                             if duts_specified is None or
                                dut in duts_specified})
        logging.info('Config file %s: %d boards, %d duts',
                     options.config, len(boards), len(duts))
    else:
        parser.print_usage()
        logging.error('Must specify config file')
        sys.exit(1)

    # Optionally drop DUTs that fail an initial ping check so they don't
    # consume provision attempts.
    if options.ping:
        logging.info('Performing ping tests')
        duts_alive = {}
        for dut, board in duts.items():
            if ping_dut(dut):
                duts_alive[dut] = board
            else:
                logging.error('Ignoring DUT %s (%s) for failing initial '
                              'ping check', dut, board)
        duts = duts_alive
        logging.info('After ping tests: %d boards, %d duts', len(boards),
                     len(duts))

    # Set up the test runner and stage all the builds.
    outputlog = open(options.outputlog, 'a') if options.outputlog else None
    runner = Runner(ds, duts, config,
                    simultaneous=options.simultaneous, total=options.total,
                    outputlog=outputlog, ping=options.ping,
                    blacklist_consecutive=options.blacklist_consecutive,
                    blacklist_success=options.blacklist_success,
                    blacklist_total=options.blacklist_total,
                    dryrun=options.dryrun)
    if options.stage:
        runner.stage_all()

    # Run all the provisions while holding an exclusive lock on the config
    # file, serializing runs that share the same config.
    with locking.FileLock(options.config, blocking=True).lock():
        completed = runner.loop()
    logging.info('%s in %s', 'Completed' if completed else 'Interrupted',
                 runner.elapsed())
    # Write all entries as JSON.
    entries = runner.get_completed_entries()
    if options.output:
        with open(options.output, 'w') as f:
            dump_entries_as_json(entries, f)
    else:
        dump_entries_as_json(entries, sys.stdout)
    # Summarize pass/fail counts, excluding the parent Runner entry.
    logging.info('Summary: %s',
                 dict(collections.Counter([e['status'] for e in entries
                                           if e['name'] != 'Runner'])))

    # List blacklisted DUTs.
    if runner.dut_blacklist:
        logging.warn('Blacklisted DUTs:')
        for host_name in runner.dut_blacklist:
            logging.warn('  %s', host_name)
567
if __name__ == '__main__':
    # main() has no return statement, so sys.exit(None) exits with status 0;
    # sys.exit(1) inside main() reports usage errors.
    sys.exit(main(sys.argv[1:]))
570