• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2007 Google Inc. Released under the GPL v2
2#pylint: disable-msg=C0111
3
4import glob
5import logging
6import os
7import re
8import sys
9import tempfile
10import time
11import traceback
12
13import common
14from autotest_lib.client.bin.result_tools import runner as result_tools_runner
15from autotest_lib.client.common_lib import autotemp
16from autotest_lib.client.common_lib import base_job
17from autotest_lib.client.common_lib import error
18from autotest_lib.client.common_lib import global_config
19from autotest_lib.client.common_lib import packages
20from autotest_lib.client.common_lib import utils as client_utils
21from autotest_lib.server import installable_object
22from autotest_lib.server import utils
23from autotest_lib.server import utils as server_utils
24from autotest_lib.server.cros.dynamic_suite.constants import JOB_REPO_URL
25
26
27try:
28    from chromite.lib import metrics
29except ImportError:
30    metrics = client_utils.metrics_mock
31
32
33AUTOTEST_SVN = 'svn://test.kernel.org/autotest/trunk/client'
34AUTOTEST_HTTP = 'http://test.kernel.org/svn/autotest/trunk/client'
35
36_CONFIG = global_config.global_config
37AUTOSERV_PREBUILD = _CONFIG.get_config_value(
38        'AUTOSERV', 'enable_server_prebuild', type=bool, default=False)
39
40# Match on a line like this:
41# FAIL test_name  test_name timestamp=1 localtime=Nov 15 12:43:10 <fail_msg>
42_FAIL_STATUS_RE = re.compile(
43    r'\s*FAIL.*localtime=.*\s*.*\s*[0-9]+:[0-9]+:[0-9]+\s*(?P<fail_msg>.*)')
44
45
46class AutodirNotFoundError(Exception):
47    """No Autotest installation could be found."""
48
49
50class AutotestFailure(Exception):
51    """Gereric exception class for failures during a test run."""
52
53
54class AutotestAbort(AutotestFailure):
55    """
56    AutotestAborts are thrown when the DUT seems fine,
57    and the test doesn't give us an explicit reason for
58    failure; In this case we have no choice but to abort.
59    """
60
61
62class AutotestDeviceError(AutotestFailure):
63    """
64    Exceptions that inherit from AutotestDeviceError
65    are thrown when we can determine the current
66    state of the DUT and conclude that it probably
67    lead to the test failing; these exceptions lead
68    to failures instead of aborts.
69    """
70
71
72class AutotestDeviceNotPingable(AutotestDeviceError):
73    """Error for when a DUT becomes unpingable."""
74
75
76class AutotestDeviceNotSSHable(AutotestDeviceError):
77    """Error for when a DUT is pingable but not SSHable."""
78
79
80class AutotestDeviceRebooted(AutotestDeviceError):
81    """Error for when a DUT rebooted unexpectedly."""
82
83
84class Autotest(installable_object.InstallableObject):
85    """
86    This class represents the Autotest program.
87
88    Autotest is used to run tests automatically and collect the results.
89    It also supports profilers.
90
91    Implementation details:
92    This is a leaf class in an abstract class hierarchy, it must
93    implement the unimplemented methods in parent classes.
94    """
95
96    def __init__(self, host=None):
97        self.host = host
98        self.got = False
99        self.installed = False
100        self.serverdir = utils.get_server_dir()
101        super(Autotest, self).__init__()
102
103
104    install_in_tmpdir = False
105    @classmethod
106    def set_install_in_tmpdir(cls, flag):
107        """ Sets a flag that controls whether or not Autotest should by
108        default be installed in a "standard" directory (e.g.
109        /home/autotest, /usr/local/autotest) or a temporary directory. """
110        cls.install_in_tmpdir = flag
111
112
113    @classmethod
114    def get_client_autodir_paths(cls, host):
115        return global_config.global_config.get_config_value(
116                'AUTOSERV', 'client_autodir_paths', type=list)
117
118
119    @classmethod
120    def get_installed_autodir(cls, host):
121        """
122        Find where the Autotest client is installed on the host.
123        @returns an absolute path to an installed Autotest client root.
124        @raises AutodirNotFoundError if no Autotest installation can be found.
125        """
126        autodir = host.get_autodir()
127        if autodir:
128            logging.debug('Using existing host autodir: %s', autodir)
129            return autodir
130
131        for path in Autotest.get_client_autodir_paths(host):
132            try:
133                autotest_binary = os.path.join(path, 'bin', 'autotest')
134                host.run('test -x %s' % utils.sh_escape(autotest_binary))
135                host.run('test -w %s' % utils.sh_escape(path))
136                logging.debug('Found existing autodir at %s', path)
137                return path
138            except error.GenericHostRunError:
139                logging.debug('%s does not exist on %s', autotest_binary,
140                              host.hostname)
141        raise AutodirNotFoundError
142
143
144    @classmethod
145    def get_install_dir(cls, host):
146        """
147        Determines the location where autotest should be installed on
148        host. If self.install_in_tmpdir is set, it will return a unique
149        temporary directory that autotest can be installed in. Otherwise, looks
150        for an existing installation to use; if none is found, looks for a
151        usable directory in the global config client_autodir_paths.
152        """
153        try:
154            install_dir = cls.get_installed_autodir(host)
155        except AutodirNotFoundError:
156            install_dir = cls._find_installable_dir(host)
157
158        if cls.install_in_tmpdir:
159            return host.get_tmp_dir(parent=install_dir)
160        return install_dir
161
162
163    @classmethod
164    def _find_installable_dir(cls, host):
165        client_autodir_paths = cls.get_client_autodir_paths(host)
166        for path in client_autodir_paths:
167            try:
168                host.run('mkdir -p %s' % utils.sh_escape(path))
169                host.run('test -w %s' % utils.sh_escape(path))
170                return path
171            except error.AutoservRunError:
172                logging.debug('Failed to create %s', path)
173        metrics.Counter(
174            'chromeos/autotest/errors/no_autotest_install_path').increment(
175                fields={'dut_host_name': host.hostname})
176        raise error.AutoservInstallError(
177                'Unable to find a place to install Autotest; tried %s' %
178                ', '.join(client_autodir_paths))
179
180
181    def get_fetch_location(self):
182        """Generate list of locations where autotest can look for packages.
183
184        Hosts are tagged with an attribute containing the URL from which
185        to source packages when running a test on that host.
186
187        @returns the list of candidate locations to check for packages.
188        """
189        c = global_config.global_config
190        repos = c.get_config_value("PACKAGES", 'fetch_location', type=list,
191                                   default=[])
192        repos.reverse()
193
194        if not server_utils.is_inside_chroot():
195            # Only try to get fetch location from host attribute if the
196            # test is not running inside chroot.
197            #
198            # Look for the repo url via the host attribute. If we are
199            # not running with a full AFE autoserv will fall back to
200            # serving packages itself from whatever source version it is
201            # sync'd to rather than using the proper artifacts for the
202            # build on the host.
203            found_repo = self._get_fetch_location_from_host_attribute()
204            if found_repo is not None:
205                # Add our new repo to the end, the package manager will
206                # later reverse the list of repositories resulting in ours
207                # being first
208                repos.append(found_repo)
209
210        return repos
211
212
213    def _get_fetch_location_from_host_attribute(self):
214        """Get repo to use for packages from host attribute, if possible.
215
216        Hosts are tagged with an attribute containing the URL
217        from which to source packages when running a test on that host.
218        If self.host is set, attempt to look this attribute in the host info.
219
220        @returns value of the 'job_repo_url' host attribute, if present.
221        """
222        if not self.host:
223            return None
224
225        try:
226            info = self.host.host_info_store.get()
227        except Exception as e:
228            # TODO(pprabhu): We really want to catch host_info.StoreError here,
229            # but we can't import host_info from this module.
230            #   - autotest_lib.hosts.host_info pulls in (naturally)
231            #   autotest_lib.hosts.__init__
232            #   - This pulls in all the host classes ever defined
233            #   - That includes abstract_ssh, which depends on autotest
234            logging.warning('Failed to obtain host info: %r', e)
235            logging.warning('Skipping autotest fetch location based on %s',
236                            JOB_REPO_URL)
237            return None
238
239        job_repo_url = info.attributes.get(JOB_REPO_URL, '')
240        if not job_repo_url:
241            logging.warning("No %s for %s", JOB_REPO_URL, self.host)
242            return None
243
244        logging.info('Got job repo url from host attributes: %s',
245                        job_repo_url)
246        return job_repo_url
247
248
249    def install(self, host=None, autodir=None, use_packaging=True):
250        """Install autotest.  If |host| is not None, stores it in |self.host|.
251
252        @param host A Host instance on which autotest will be installed
253        @param autodir Location on the remote host to install to
254        @param use_packaging Enable install modes that use the packaging system.
255
256        """
257        if host:
258            self.host = host
259        self._install(host=host, autodir=autodir, use_packaging=use_packaging)
260
261
262    def install_full_client(self, host=None, autodir=None):
263        self._install(host=host, autodir=autodir, use_autoserv=False,
264                      use_packaging=False)
265
266
267    def install_no_autoserv(self, host=None, autodir=None):
268        self._install(host=host, autodir=autodir, use_autoserv=False)
269
270
271    def _install_using_packaging(self, host, autodir):
272        repos = self.get_fetch_location()
273        if not repos:
274            raise error.PackageInstallError("No repos to install an "
275                                            "autotest client from")
276        # Make sure devserver has the autotest package staged
277        host.verify_job_repo_url()
278        pkgmgr = packages.PackageManager(autodir, hostname=host.hostname,
279                                         repo_urls=repos,
280                                         do_locking=False,
281                                         run_function=host.run,
282                                         run_function_dargs=dict(timeout=600))
283        # The packages dir is used to store all the packages that
284        # are fetched on that client. (for the tests,deps etc.
285        # too apart from the client)
286        pkg_dir = os.path.join(autodir, 'packages')
287        # clean up the autodir except for the packages and result_tools
288        # directory.
289        host.run('cd %s && ls | grep -v "^packages$" | grep -v "^result_tools$"'
290                 ' | xargs rm -rf && rm -rf .[!.]*' % autodir)
291        pkgmgr.install_pkg('autotest', 'client', pkg_dir, autodir,
292                           preserve_install_dir=True)
293        self.installed = True
294
295
296    def _install_using_send_file(self, host, autodir):
297        dirs_to_exclude = set(["tests", "site_tests", "deps", "profilers",
298                               "packages"])
299        light_files = [os.path.join(self.source_material, f)
300                       for f in os.listdir(self.source_material)
301                       if f not in dirs_to_exclude]
302        host.send_file(light_files, autodir, delete_dest=True)
303
304        # create empty dirs for all the stuff we excluded
305        commands = []
306        for path in dirs_to_exclude:
307            abs_path = os.path.join(autodir, path)
308            abs_path = utils.sh_escape(abs_path)
309            commands.append("mkdir -p '%s'" % abs_path)
310            commands.append("touch '%s'/__init__.py" % abs_path)
311        host.run(';'.join(commands))
312
313
314    def _install(self, host=None, autodir=None, use_autoserv=True,
315                 use_packaging=True):
316        """
317        Install autotest.  If get() was not called previously, an
318        attempt will be made to install from the autotest svn
319        repository.
320
321        @param host A Host instance on which autotest will be installed
322        @param autodir Location on the remote host to install to
323        @param use_autoserv Enable install modes that depend on the client
324            running with the autoserv harness
325        @param use_packaging Enable install modes that use the packaging system
326
327        @exception AutoservError if a tarball was not specified and
328            the target host does not have svn installed in its path
329        """
330        if not host:
331            host = self.host
332        if not self.got:
333            self.get()
334        host.wait_up(timeout=30)
335        host.setup()
336        logging.info("Installing autotest on %s", host.hostname)
337
338        # set up the autotest directory on the remote machine
339        if not autodir:
340            autodir = self.get_install_dir(host)
341        logging.info('Using installation dir %s', autodir)
342        host.set_autodir(autodir)
343        host.run('mkdir -p %s' % utils.sh_escape(autodir))
344
345        # make sure there are no files in $AUTODIR/results
346        results_path = os.path.join(autodir, 'results')
347        host.run('rm -rf %s/*' % utils.sh_escape(results_path),
348                 ignore_status=True)
349
350        # Fetch the autotest client from the nearest repository
351        if use_packaging:
352            try:
353                self._install_using_packaging(host, autodir)
354                logging.info("Installation of autotest completed using the "
355                             "packaging system.")
356                return
357            except (error.PackageInstallError, error.AutoservRunError,
358                    global_config.ConfigError), e:
359                logging.info("Could not install autotest using the packaging "
360                             "system: %s. Trying other methods", e)
361        else:
362            # Delete the package checksum file to force dut updating local
363            # packages.
364            command = ('rm -f "%s"' %
365                       (os.path.join(autodir, packages.CHECKSUM_FILE)))
366            host.run(command)
367
368        # try to install from file or directory
369        if self.source_material:
370            c = global_config.global_config
371            supports_autoserv_packaging = c.get_config_value(
372                "PACKAGES", "serve_packages_from_autoserv", type=bool)
373            # Copy autotest recursively
374            if supports_autoserv_packaging and use_autoserv:
375                self._install_using_send_file(host, autodir)
376            else:
377                host.send_file(self.source_material, autodir, delete_dest=True)
378            logging.info("Installation of autotest completed from %s",
379                         self.source_material)
380            self.installed = True
381        else:
382            # if that fails try to install using svn
383            if utils.run('which svn').exit_status:
384                raise error.AutoservError(
385                        'svn not found on target machine: %s' %
386                        host.hostname)
387            try:
388                host.run('svn checkout %s %s' % (AUTOTEST_SVN, autodir))
389            except error.AutoservRunError, e:
390                host.run('svn checkout %s %s' % (AUTOTEST_HTTP, autodir))
391            logging.info("Installation of autotest completed using SVN.")
392            self.installed = True
393
394        # TODO(milleral): http://crbug.com/258161
395        # Send over the most recent global_config.ini after installation if one
396        # is available.
397        # This code is a bit duplicated from
398        # _Run._create_client_config_file, but oh well.
399        if self.installed and self.source_material:
400            self._send_shadow_config()
401
402    def _send_shadow_config(self):
403        logging.info('Installing updated global_config.ini.')
404        destination = os.path.join(self.host.get_autodir(),
405                                   'global_config.ini')
406        with tempfile.NamedTemporaryFile() as client_config:
407            config = global_config.global_config
408            client_section = config.get_section_values('CLIENT')
409            client_section.write(client_config)
410            client_config.flush()
411            self.host.send_file(client_config.name, destination)
412
413
414    def uninstall(self, host=None):
415        """
416        Uninstall (i.e. delete) autotest. Removes the autotest client install
417        from the specified host.
418
419        @params host a Host instance from which the client will be removed
420        """
421        if not self.installed:
422            return
423        if not host:
424            host = self.host
425        autodir = host.get_autodir()
426        if not autodir:
427            return
428
429        # perform the actual uninstall
430        host.run("rm -rf %s" % utils.sh_escape(autodir), ignore_status=True)
431        host.set_autodir(None)
432        self.installed = False
433
434
435    def get(self, location=None):
436        if not location:
437            location = os.path.join(self.serverdir, '../client')
438            location = os.path.abspath(location)
439        installable_object.InstallableObject.get(self, location)
440        self.got = True
441
442
443    def run(self, control_file, results_dir='.', host=None, timeout=None,
444            tag=None, parallel_flag=False, background=False,
445            client_disconnect_timeout=None, use_packaging=True):
446        """
447        Run an autotest job on the remote machine.
448
449        @param control_file: An open file-like-obj of the control file.
450        @param results_dir: A str path where the results should be stored
451                on the local filesystem.
452        @param host: A Host instance on which the control file should
453                be run.
454        @param timeout: Maximum number of seconds to wait for the run or None.
455        @param tag: Tag name for the client side instance of autotest.
456        @param parallel_flag: Flag set when multiple jobs are run at the
457                same time.
458        @param background: Indicates that the client should be launched as
459                a background job; the code calling run will be responsible
460                for monitoring the client and collecting the results.
461        @param client_disconnect_timeout: Seconds to wait for the remote host
462                to come back after a reboot. Defaults to the host setting for
463                DEFAULT_REBOOT_TIMEOUT.
464
465        @raises AutotestRunError: If there is a problem executing
466                the control file.
467        """
468        host = self._get_host_and_setup(host, use_packaging=use_packaging)
469        logging.debug('Autotest job starts on remote host: %s',
470                      host.hostname)
471        results_dir = os.path.abspath(results_dir)
472
473        if client_disconnect_timeout is None:
474            client_disconnect_timeout = host.DEFAULT_REBOOT_TIMEOUT
475
476        if tag:
477            results_dir = os.path.join(results_dir, tag)
478
479        atrun = _Run(host, results_dir, tag, parallel_flag, background)
480        self._do_run(control_file, results_dir, host, atrun, timeout,
481                     client_disconnect_timeout, use_packaging=use_packaging)
482
483
484    def _get_host_and_setup(self, host, use_packaging=True):
485        if not host:
486            host = self.host
487        if not self.installed:
488            self.install(host, use_packaging=use_packaging)
489
490        host.wait_up(timeout=30)
491        return host
492
493
494    def _do_run(self, control_file, results_dir, host, atrun, timeout,
495                client_disconnect_timeout, use_packaging=True):
496        try:
497            atrun.verify_machine()
498        except:
499            logging.error("Verify failed on %s. Reinstalling autotest",
500                          host.hostname)
501            self.install(host)
502            atrun.verify_machine()
503        debug = os.path.join(results_dir, 'debug')
504        try:
505            os.makedirs(debug)
506        except Exception:
507            pass
508
509        delete_file_list = [atrun.remote_control_file,
510                            atrun.remote_control_file + '.state',
511                            atrun.manual_control_file,
512                            atrun.manual_control_file + '.state']
513        cmd = ';'.join('rm -f ' + control for control in delete_file_list)
514        host.run(cmd, ignore_status=True)
515
516        tmppath = utils.get(control_file, local_copy=True)
517
518        # build up the initialization prologue for the control file
519        prologue_lines = []
520
521        # Add the additional user arguments
522        prologue_lines.append("args = %r\n" % self.job.args)
523
524        # If the packaging system is being used, add the repository list.
525        repos = None
526        try:
527            if use_packaging:
528                repos = self.get_fetch_location()
529                prologue_lines.append('job.add_repository(%s)\n' % repos)
530            else:
531                logging.debug('use_packaging is set to False, do not add any '
532                              'repository.')
533        except global_config.ConfigError, e:
534            # If repos is defined packaging is enabled so log the error
535            if repos:
536                logging.error(e)
537
538        # on full-size installs, turn on any profilers the server is using
539        if not atrun.background:
540            running_profilers = host.job.profilers.add_log.iteritems()
541            for profiler, (args, dargs) in running_profilers:
542                call_args = [repr(profiler)]
543                call_args += [repr(arg) for arg in args]
544                call_args += ["%s=%r" % item for item in dargs.iteritems()]
545                prologue_lines.append("job.profilers.add(%s)\n"
546                                      % ", ".join(call_args))
547        cfile = "".join(prologue_lines)
548
549        cfile += open(tmppath).read()
550        open(tmppath, "w").write(cfile)
551
552        # Create and copy state file to remote_control_file + '.state'
553        state_file = host.job.preprocess_client_state()
554        host.send_file(state_file, atrun.remote_control_file + '.init.state')
555        os.remove(state_file)
556
557        # Copy control_file to remote_control_file on the host
558        host.send_file(tmppath, atrun.remote_control_file)
559        if os.path.abspath(tmppath) != os.path.abspath(control_file):
560            os.remove(tmppath)
561
562        atrun.execute_control(
563                timeout=timeout,
564                client_disconnect_timeout=client_disconnect_timeout)
565
566
567    @staticmethod
568    def extract_test_failure_msg(failure_status_line):
569        """Extract the test failure message from the status line.
570
571        @param failure_status_line:  String of test failure status line, it will
572            look like:
573          FAIL <test name>  <test name> timestamp=<ts> localtime=<lt> <reason>
574
575        @returns String of the reason, return empty string if we can't regex out
576            reason.
577        """
578        fail_msg = ''
579        match = _FAIL_STATUS_RE.match(failure_status_line)
580        if match:
581            fail_msg = match.group('fail_msg')
582        return fail_msg
583
584
585    @classmethod
586    def _check_client_test_result(cls, host, test_name):
587        """
588        Check result of client test.
589        Autotest will store results in the file name status.
590        We check that second to last line in that file begins with 'END GOOD'
591
592        @raises TestFail: If client test does not pass.
593        """
594        client_result_dir = '%s/results/default' % host.autodir
595        command = 'tail -2 %s/status | head -1' % client_result_dir
596        status = host.run(command).stdout.strip()
597        logging.info(status)
598        if status[:8] != 'END GOOD':
599            test_fail_status_line_cmd = (
600                    'grep "^\s*FAIL\s*%s" %s/status | tail -n 1' %
601                    (test_name, client_result_dir))
602            test_fail_msg = cls.extract_test_failure_msg(
603                    host.run(test_fail_status_line_cmd).stdout.strip())
604            test_fail_msg_reason = ('' if not test_fail_msg
605                                    else ' (reason: %s)' % test_fail_msg)
606            test_fail_status = '%s client test did not pass%s.' % (
607                    test_name, test_fail_msg_reason)
608            raise error.TestFail(test_fail_status)
609
610
611    def run_timed_test(self, test_name, results_dir='.', host=None,
612                       timeout=None, parallel_flag=False, background=False,
613                       client_disconnect_timeout=None, *args, **dargs):
614        """
615        Assemble a tiny little control file to just run one test,
616        and run it as an autotest client-side test
617        """
618        if not host:
619            host = self.host
620        if not self.installed:
621            self.install(host)
622
623        opts = ["%s=%s" % (o[0], repr(o[1])) for o in dargs.items()]
624        cmd = ", ".join([repr(test_name)] + map(repr, args) + opts)
625        control = "job.run_test(%s)\n" % cmd
626        self.run(control, results_dir, host, timeout=timeout,
627                 parallel_flag=parallel_flag, background=background,
628                 client_disconnect_timeout=client_disconnect_timeout)
629
630        if dargs.get('check_client_result', False):
631            self._check_client_test_result(host, test_name)
632
633
634    def run_test(self, test_name, results_dir='.', host=None,
635                 parallel_flag=False, background=False,
636                 client_disconnect_timeout=None, *args, **dargs):
637        self.run_timed_test(test_name, results_dir, host, timeout=None,
638                            parallel_flag=parallel_flag, background=background,
639                            client_disconnect_timeout=client_disconnect_timeout,
640                            *args, **dargs)
641
642
643    def run_static_method(self, module, method, results_dir='.', host=None,
644                          *args):
645        """Runs a non-instance method with |args| from |module| on the client.
646
647        This method runs a static/class/module autotest method on the client.
648        For example:
649          run_static_method("autotest_lib.client.cros.cros_ui", "reboot")
650
651        Will run autotest_lib.client.cros.cros_ui.reboot() on the client.
652
653        @param module: module name as you would refer to it when importing in a
654            control file. e.g. autotest_lib.client.common_lib.module_name.
655        @param method: the method you want to call.
656        @param results_dir: A str path where the results should be stored
657            on the local filesystem.
658        @param host: A Host instance on which the control file should
659            be run.
660        @param args: args to pass to the method.
661        """
662        control = "\n".join(["import %s" % module,
663                             "%s.%s(%s)\n" % (module, method,
664                                              ','.join(map(repr, args)))])
665        self.run(control, results_dir=results_dir, host=host)
666
667
668class _Run(object):
669    """
670    Represents a run of autotest control file.  This class maintains
671    all the state necessary as an autotest control file is executed.
672
673    It is not intended to be used directly, rather control files
674    should be run using the run method in Autotest.
675    """
676    def __init__(self, host, results_dir, tag, parallel_flag, background):
677        self.host = host
678        self.results_dir = results_dir
679        self.env = host.env
680        self.tag = tag
681        self.parallel_flag = parallel_flag
682        self.background = background
683        self.autodir = Autotest.get_installed_autodir(self.host)
684        control = os.path.join(self.autodir, 'control')
685        if tag:
686            control += '.' + tag
687        self.manual_control_file = control
688        self.remote_control_file = control + '.autoserv'
689        self.config_file = os.path.join(self.autodir, 'global_config.ini')
690
691
692    def verify_machine(self):
693        binary = os.path.join(self.autodir, 'bin/autotest')
694        try:
695            self.host.run('ls %s > /dev/null 2>&1' % binary)
696        except:
697            raise error.AutoservInstallError(
698                "Autotest does not appear to be installed")
699
700        if not self.parallel_flag:
701            tmpdir = os.path.join(self.autodir, 'tmp')
702            download = os.path.join(self.autodir, 'tests/download')
703            self.host.run('umount %s' % tmpdir, ignore_status=True)
704            self.host.run('umount %s' % download, ignore_status=True)
705
706
707    def get_base_cmd_args(self, section):
708        args = ['--verbose']
709        if section > 0:
710            args.append('-c')
711        if self.tag:
712            args.append('-t %s' % self.tag)
713        if self.host.job.use_external_logging():
714            args.append('-l')
715        if self.host.hostname:
716            args.append('--hostname=%s' % self.host.hostname)
717        args.append('--user=%s' % self.host.job.user)
718
719        args.append(self.remote_control_file)
720        return args
721
722
723    def get_background_cmd(self, section):
724        cmd = ['nohup', os.path.join(self.autodir, 'bin/autotest_client')]
725        cmd += self.get_base_cmd_args(section)
726        cmd += ['>/dev/null', '2>/dev/null', '&']
727        return ' '.join(cmd)
728
729
730    def get_daemon_cmd(self, section, monitor_dir):
731        cmd = ['nohup', os.path.join(self.autodir, 'bin/autotestd'),
732               monitor_dir, '-H autoserv']
733        cmd += self.get_base_cmd_args(section)
734        cmd += ['>/dev/null', '2>/dev/null', '&']
735        return ' '.join(cmd)
736
737
738    def get_monitor_cmd(self, monitor_dir, stdout_read, stderr_read):
739        cmd = [os.path.join(self.autodir, 'bin', 'autotestd_monitor'),
740               monitor_dir, str(stdout_read), str(stderr_read)]
741        return ' '.join(cmd)
742
743
744    def get_client_log(self):
745        """Find what the "next" client.* prefix should be
746
747        @returns A string of the form client.INTEGER that should be prefixed
748            to all client debug log files.
749        """
750        max_digit = -1
751        debug_dir = os.path.join(self.results_dir, 'debug')
752        client_logs = glob.glob(os.path.join(debug_dir, 'client.*.*'))
753        for log in client_logs:
754            _, number, _ = log.split('.', 2)
755            if number.isdigit():
756                max_digit = max(max_digit, int(number))
757        return 'client.%d' % (max_digit + 1)
758
759
760    def copy_client_config_file(self, client_log_prefix=None):
761        """
762        Create and copy the client config file based on the server config.
763
764        @param client_log_prefix: Optional prefix to prepend to log files.
765        """
766        client_config_file = self._create_client_config_file(client_log_prefix)
767        self.host.send_file(client_config_file, self.config_file)
768        os.remove(client_config_file)
769
770
771    def _create_client_config_file(self, client_log_prefix=None):
772        """
773        Create a temporary file with the [CLIENT] section configuration values
774        taken from the server global_config.ini.
775
776        @param client_log_prefix: Optional prefix to prepend to log files.
777
778        @return: Path of the temporary file generated.
779        """
780        config = global_config.global_config.get_section_values('CLIENT')
781        if client_log_prefix:
782            config.set('CLIENT', 'default_logging_name', client_log_prefix)
783        return self._create_aux_file(config.write)
784
785
786    def _create_aux_file(self, func, *args):
787        """
788        Creates a temporary file and writes content to it according to a
789        content creation function. The file object is appended to *args, which
790        is then passed to the content creation function
791
792        @param func: Function that will be used to write content to the
793                temporary file.
794        @param *args: List of parameters that func takes.
795        @return: Path to the temporary file that was created.
796        """
797        fd, path = tempfile.mkstemp(dir=self.host.job.tmpdir)
798        aux_file = os.fdopen(fd, "w")
799        try:
800            list_args = list(args)
801            list_args.append(aux_file)
802            func(*list_args)
803        finally:
804            aux_file.close()
805        return path
806
807
808    @staticmethod
809    def is_client_job_finished(last_line):
810        return bool(re.match(r'^\t*END .*\t[\w.-]+\t[\w.-]+\t.*$', last_line))
811
812
813    @staticmethod
814    def is_client_job_rebooting(last_line):
815        return bool(re.match(r'^\t*GOOD\t[\w.-]+\treboot\.start.*$', last_line))
816
817
818    # Roughly ordered list from concrete to less specific reboot causes.
819    _failure_reasons = [
820        # Try to find possible reasons leading towards failure.
821        ('ethernet recovery methods have failed. Rebooting.',
822         'dead ethernet dongle crbug/1031035'),
823        # GPU hangs are not always recovered from.
824        ('[drm:amdgpu_job_timedout] \*ERROR\* ring gfx timeout',
825         'drm ring gfx timeout'),
826        ('[drm:do_aquire_global_lock] \*ERROR(.*)hw_done or flip_done timed',
827         'drm hw/flip timeout'),
828        ('[drm:i915_hangcheck_hung] \*ERROR\* Hangcheck(.*)GPU hung',
829         'drm GPU hung'),
830        # TODO(ihf): try to get a better magic signature for kernel crashes.
831        ('BUG: unable to handle kernel paging request', 'kernel paging'),
832        ('Kernel panic - not syncing: Out of memory', 'kernel out of memory'),
833        ('Kernel panic - not syncing', 'kernel panic'),
834        # Fish for user mode killing OOM messages. Shows unstable system.
835        ('out_of_memory', 'process out of memory'),
836        # Reboot was bad enough to have truncated the logs.
837        ('crash_reporter(.*)Stored kcrash', 'kcrash'),
838        ('crash_reporter(.*)Last shutdown was not clean', 'not clean'),
839    ]
840
841    def _diagnose_reboot(self):
842        """
843        Runs diagnostic check on a rebooted DUT.
844
845        TODO(ihf): if this analysis is useful consider moving the code to the
846                   DUT into a script and call it from here. This is more
847                   powerful and might be cleaner to grow in functionality. But
848                   it may also be less robust if stateful is damaged during the
849                   reboot.
850
851        @returns msg describing reboot reason.
852        """
853        reasons = []
854        for (message, bucket) in self._failure_reasons:
855            # Use -a option for grep to avoid "binary file" warning to stdout.
856            # The grep -v is added to not match itself in the log (across jobs).
857            # Using grep is slightly problematic as it finds any reason, not
858            # just the most recent reason (since 2 boots ago), so it may guess
859            # wrong. Multiple reboots are unusual in the lab setting though and
860            # it is better to have a reasonable guess than no reason at all.
861            found = self.host.run(
862                "grep -aE '" + message + "' /var/log/messages | grep -av grep",
863                ignore_status=True
864            ).stdout
865            if found and found.strip():
866                reasons.append(bucket)
867        signature = 'reason unknown'
868        if reasons:
869            # Concatenate possible reasons found to obtain a magic signature.
870            signature = ', '.join(reasons)
871        return ('DUT rebooted during the test run. (%s)\n' % signature)
872
873
874    def _diagnose_dut(self, old_boot_id=None):
875        """
876        Run diagnostic checks on a DUT.
877
878        1. ping: A dead host will not respond to pings.
879        2. ssh (happens with 3.): DUT hangs usually fail in authentication
880            but respond to pings.
881        3. Check if a reboot occured: A healthy but unexpected reboot leaves the
882            host running with a new boot id.
883
884        This method will always raise an exception from the AutotestFailure
885        family and should only get called when the reason for a test failing
886        is ambiguous.
887
888        @raises AutotestDeviceNotPingable: If the DUT doesn't respond to ping.
889        @raises AutotestDeviceNotSSHable: If we cannot SSH into the DUT.
890        @raises AutotestDeviceRebooted: If the boot id changed.
891        @raises AutotestAbort: If none of the above exceptions were raised.
892            Since we have no recourse we must abort at this stage.
893        """
894        msg = 'Autotest client terminated unexpectedly: '
895        if utils.ping(self.host.hostname, tries=1, deadline=1) != 0:
896            msg += 'DUT is no longer pingable, it may have rebooted or hung.\n'
897            raise AutotestDeviceNotPingable(msg)
898
899        if old_boot_id:
900            try:
901                new_boot_id = self.host.get_boot_id(timeout=60)
902            except Exception as e:
903                msg += ('DUT is pingable but not SSHable, it most likely'
904                        ' sporadically rebooted during testing. %s\n' % str(e))
905                raise AutotestDeviceNotSSHable(msg)
906            else:
907                if new_boot_id != old_boot_id:
908                    msg += self._diagnose_reboot()
909                    raise AutotestDeviceRebooted(msg)
910
911            msg += ('DUT is pingable, SSHable and did NOT restart '
912                    'un-expectedly. We probably lost connectivity during the '
913                    'test.')
914        else:
915            msg += ('DUT is pingable, could not determine if an un-expected '
916                    'reboot occured during the test.')
917
918        raise AutotestAbort(msg)
919
920
921    def log_unexpected_abort(self, stderr_redirector, old_boot_id=None):
922        """
923        Logs that something unexpected happened, then tries to diagnose the
924        failure. The purpose of this function is only to close out the status
925        log with the appropriate error message, not to critically terminate
926        the program.
927
928        @param stderr_redirector: log stream.
929        @param old_boot_id: boot id used to infer if a reboot occured.
930        """
931        stderr_redirector.flush_all_buffers()
932        try:
933            self._diagnose_dut(old_boot_id)
934        except AutotestFailure as e:
935            self.host.job.record('END ABORT', None, None, str(e))
936
937
938    def _execute_in_background(self, section, timeout):
939        full_cmd = self.get_background_cmd(section)
940        devnull = open(os.devnull, "w")
941
942        self.copy_client_config_file(self.get_client_log())
943
944        self.host.job.push_execution_context(self.results_dir)
945        try:
946            result = self.host.run(full_cmd, ignore_status=True,
947                                   timeout=timeout,
948                                   stdout_tee=devnull,
949                                   stderr_tee=devnull)
950        finally:
951            self.host.job.pop_execution_context()
952
953        return result
954
955
956    @staticmethod
957    def _strip_stderr_prologue(stderr):
958        """Strips the 'standard' prologue that get pre-pended to every
959        remote command and returns the text that was actually written to
960        stderr by the remote command."""
961        stderr_lines = stderr.split("\n")[1:]
962        if not stderr_lines:
963            return ""
964        elif stderr_lines[0].startswith("NOTE: autotestd_monitor"):
965            del stderr_lines[0]
966        return "\n".join(stderr_lines)
967
968
969    def _execute_daemon(self, section, timeout, stderr_redirector,
970                        client_disconnect_timeout):
971        monitor_dir = self.host.get_tmp_dir()
972        daemon_cmd = self.get_daemon_cmd(section, monitor_dir)
973
974        # grab the location for the server-side client log file
975        client_log_prefix = self.get_client_log()
976        client_log_path = os.path.join(self.results_dir, 'debug',
977                                       client_log_prefix + '.log')
978        client_log = open(client_log_path, 'w', 0)
979        self.copy_client_config_file(client_log_prefix)
980
981        stdout_read = stderr_read = 0
982        self.host.job.push_execution_context(self.results_dir)
983        try:
984            self.host.run(daemon_cmd, ignore_status=True, timeout=timeout)
985            disconnect_warnings = []
986            while True:
987                monitor_cmd = self.get_monitor_cmd(monitor_dir, stdout_read,
988                                                   stderr_read)
989                try:
990                    result = self.host.run(monitor_cmd, ignore_status=True,
991                                           timeout=timeout,
992                                           stdout_tee=client_log,
993                                           stderr_tee=stderr_redirector)
994                except error.AutoservRunError, e:
995                    result = e.result_obj
996                    result.exit_status = None
997                    disconnect_warnings.append(e.description)
998
999                    stderr_redirector.log_warning(
1000                        "Autotest client was disconnected: %s" % e.description,
1001                        "NETWORK")
1002                except error.AutoservSSHTimeout:
1003                    result = utils.CmdResult(monitor_cmd, "", "", None, 0)
1004                    stderr_redirector.log_warning(
1005                        "Attempt to connect to Autotest client timed out",
1006                        "NETWORK")
1007
1008                stdout_read += len(result.stdout)
1009                stderr_read += len(self._strip_stderr_prologue(result.stderr))
1010
1011                if result.exit_status is not None:
1012                    # TODO (crosbug.com/38224)- sbasi: Remove extra logging.
1013                    logging.debug('Result exit status is %d.',
1014                                  result.exit_status)
1015                    return result
1016                elif not self.host.wait_up(client_disconnect_timeout):
1017                    raise error.AutoservSSHTimeout(
1018                        "client was disconnected, reconnect timed out")
1019        finally:
1020            client_log.close()
1021            self.host.job.pop_execution_context()
1022
1023
1024    def execute_section(self, section, timeout, stderr_redirector,
1025                        client_disconnect_timeout):
1026        # TODO(crbug.com/684311) The claim is that section is never more than 0
1027        # in pratice. After validating for a week or so, delete all support of
1028        # multiple sections.
1029        metrics.Counter('chromeos/autotest/autotest/sections').increment(
1030                fields={'is_first_section': (section == 0)})
1031        logging.info("Executing %s/bin/autotest %s/control phase %d",
1032                     self.autodir, self.autodir, section)
1033
1034        if self.background:
1035            result = self._execute_in_background(section, timeout)
1036        else:
1037            result = self._execute_daemon(section, timeout, stderr_redirector,
1038                                          client_disconnect_timeout)
1039
1040        last_line = stderr_redirector.last_line
1041
1042        # check if we failed hard enough to warrant an exception
1043        if result.exit_status == 1:
1044            err = error.AutotestRunError("client job was aborted")
1045        elif not self.background and not result.stderr:
1046            err = error.AutotestRunError(
1047                "execute_section %s failed to return anything\n"
1048                "stdout:%s\n" % (section, result.stdout))
1049        else:
1050            err = None
1051
1052        # log something if the client failed AND never finished logging
1053        if err and not self.is_client_job_finished(last_line):
1054            self.log_unexpected_abort(stderr_redirector)
1055
1056        if err:
1057            raise err
1058        else:
1059            return stderr_redirector.last_line
1060
1061
1062    def _wait_for_reboot(self, old_boot_id):
1063        logging.info("Client is rebooting")
1064        logging.info("Waiting for client to halt")
1065        if not self.host.wait_down(self.host.WAIT_DOWN_REBOOT_TIMEOUT,
1066                                   old_boot_id=old_boot_id):
1067            err = "%s failed to shutdown after %d"
1068            err %= (self.host.hostname, self.host.WAIT_DOWN_REBOOT_TIMEOUT)
1069            raise error.AutotestRunError(err)
1070        logging.info("Client down, waiting for restart")
1071        if not self.host.wait_up(self.host.DEFAULT_REBOOT_TIMEOUT):
1072            # since reboot failed
1073            # hardreset the machine once if possible
1074            # before failing this control file
1075            warning = "%s did not come back up, hard resetting"
1076            warning %= self.host.hostname
1077            logging.warning(warning)
1078            try:
1079                self.host.hardreset(wait=False)
1080            except (AttributeError, error.AutoservUnsupportedError):
1081                warning = "Hard reset unsupported on %s"
1082                warning %= self.host.hostname
1083                logging.warning(warning)
1084            raise error.AutotestRunError("%s failed to boot after %ds" %
1085                                         (self.host.hostname,
1086                                          self.host.DEFAULT_REBOOT_TIMEOUT))
1087        self.host.reboot_followup()
1088
1089
1090    def execute_control(self, timeout=None, client_disconnect_timeout=None):
1091        if not self.background:
1092            collector = log_collector(self.host, self.tag, self.results_dir)
1093            hostname = self.host.hostname
1094            remote_results = collector.client_results_dir
1095            local_results = collector.server_results_dir
1096            self.host.job.add_client_log(hostname, remote_results,
1097                                         local_results)
1098            job_record_context = self.host.job.get_record_context()
1099
1100        section = 0
1101        start_time = time.time()
1102
1103        logger = client_logger(self.host, self.tag, self.results_dir)
1104        try:
1105            while not timeout or time.time() < start_time + timeout:
1106                if timeout:
1107                    section_timeout = start_time + timeout - time.time()
1108                else:
1109                    section_timeout = None
1110                boot_id = self.host.get_boot_id()
1111                last = self.execute_section(section, section_timeout,
1112                                            logger, client_disconnect_timeout)
1113                if self.background:
1114                    return
1115                section += 1
1116                if self.is_client_job_finished(last):
1117                    logging.info("Client complete")
1118                    return
1119                elif self.is_client_job_rebooting(last):
1120                    try:
1121                        self._wait_for_reboot(boot_id)
1122                    except error.AutotestRunError, e:
1123                        self.host.job.record("ABORT", None, "reboot", str(e))
1124                        self.host.job.record("END ABORT", None, None, str(e))
1125                        raise
1126                    continue
1127
1128                # If a test fails without probable cause we try to bucket it's
1129                # failure into one of 2 categories. If we can determine the
1130                # current state of the device and it is suspicious, we close the
1131                # status lines indicating a failure. If we either cannot
1132                # determine the state of the device, or it appears totally
1133                # healthy, we give up and abort.
1134                try:
1135                    self._diagnose_dut(boot_id)
1136                except AutotestDeviceError as e:
1137                    # The status lines of the test are pretty much tailed to
1138                    # our log, with indentation, from the client job on the DUT.
1139                    # So if the DUT goes down unexpectedly we'll end up with a
1140                    # malformed status log unless we manually unwind the status
1141                    # stack. Ideally we would want to write a nice wrapper like
1142                    # server_job methods run_reboot, run_group but they expect
1143                    # reboots and we don't.
1144                    self.host.job.record('FAIL', None, None, str(e))
1145                    self.host.job.record('END FAIL', None, None)
1146                    self.host.job.record('END GOOD', None, None)
1147                    self.host.job.failed_with_device_error = True
1148                    return
1149                except AutotestAbort as e:
1150                    self.host.job.record('ABORT', None, None, str(e))
1151                    self.host.job.record('END ABORT', None, None)
1152
1153                    # give the client machine a chance to recover from a crash
1154                    self.host.wait_up(
1155                        self.host.HOURS_TO_WAIT_FOR_RECOVERY * 3600)
1156                    logging.debug('Unexpected final status message from '
1157                                  'client %s: %s', self.host.hostname, last)
1158                    # The line 'last' may have sensitive phrases, like
1159                    # 'END GOOD', which breaks the tko parser. So the error
1160                    # message will exclude it, since it will be recorded to
1161                    # status.log.
1162                    msg = ("Aborting - unexpected final status message from "
1163                           "client on %s\n") % self.host.hostname
1164                    raise error.AutotestRunError(msg)
1165        finally:
1166            logging.debug('Autotest job finishes running. Below is the '
1167                          'post-processing operations.')
1168            logger.close()
1169            if not self.background:
1170                collector.collect_client_job_results()
1171                collector.remove_redundant_client_logs()
1172                state_file = os.path.basename(self.remote_control_file
1173                                              + '.state')
1174                state_path = os.path.join(self.results_dir, state_file)
1175                self.host.job.postprocess_client_state(state_path)
1176                self.host.job.remove_client_log(hostname, remote_results,
1177                                                local_results)
1178                job_record_context.restore()
1179
1180            logging.debug('Autotest job finishes.')
1181
1182        # should only get here if we timed out
1183        assert timeout
1184        raise error.AutotestTimeoutError()
1185
1186
1187class log_collector(object):
1188    def __init__(self, host, client_tag, results_dir):
1189        self.host = host
1190        if not client_tag:
1191            client_tag = "default"
1192        self.client_results_dir = os.path.join(host.get_autodir(), "results",
1193                                               client_tag)
1194        self.server_results_dir = results_dir
1195
1196
1197    def collect_client_job_results(self):
1198        """ A method that collects all the current results of a running
1199        client job into the results dir. By default does nothing as no
1200        client job is running, but when running a client job you can override
1201        this with something that will actually do something. """
1202        # make an effort to wait for the machine to come up
1203        try:
1204            self.host.wait_up(timeout=30)
1205        except error.AutoservError:
1206            # don't worry about any errors, we'll try and
1207            # get the results anyway
1208            pass
1209
1210        # Copy all dirs in default to results_dir
1211        try:
1212            # Build test result directory summary
1213            result_tools_runner.run_on_client(
1214                    self.host, self.client_results_dir)
1215
1216            with metrics.SecondsTimer(
1217                    'chromeos/autotest/job/log_collection_duration',
1218                    fields={'dut_host_name': self.host.hostname}):
1219                self.host.get_file(
1220                        self.client_results_dir + '/',
1221                        self.server_results_dir,
1222                        preserve_symlinks=True)
1223        except Exception:
1224            # well, don't stop running just because we couldn't get logs
1225            e_msg = "Unexpected error copying test result logs, continuing ..."
1226            logging.error(e_msg)
1227            traceback.print_exc(file=sys.stdout)
1228
1229
1230    def remove_redundant_client_logs(self):
1231        """Remove client.*.log files in favour of client.*.DEBUG files."""
1232        debug_dir = os.path.join(self.server_results_dir, 'debug')
1233        debug_files = [f for f in os.listdir(debug_dir)
1234                       if re.search(r'^client\.\d+\.DEBUG$', f)]
1235        for debug_file in debug_files:
1236            log_file = debug_file.replace('DEBUG', 'log')
1237            log_file = os.path.join(debug_dir, log_file)
1238            if os.path.exists(log_file):
1239                os.remove(log_file)
1240
1241
1242# a file-like object for catching stderr from an autotest client and
1243# extracting status logs from it
1244class client_logger(object):
1245    """Partial file object to write to both stdout and
1246    the status log file.  We only implement those methods
1247    utils.run() actually calls.
1248    """
1249    status_parser = re.compile(r"^AUTOTEST_STATUS:([^:]*):(.*)$")
1250    test_complete_parser = re.compile(r"^AUTOTEST_TEST_COMPLETE:(.*)$")
1251    fetch_package_parser = re.compile(
1252        r"^AUTOTEST_FETCH_PACKAGE:([^:]*):([^:]*):(.*)$")
1253    extract_indent = re.compile(r"^(\t*).*$")
1254    extract_timestamp = re.compile(r".*\ttimestamp=(\d+)\t.*$")
1255
1256    def __init__(self, host, tag, server_results_dir):
1257        self.host = host
1258        self.job = host.job
1259        self.log_collector = log_collector(host, tag, server_results_dir)
1260        self.leftover = ""
1261        self.last_line = ""
1262        self.logs = {}
1263
1264
1265    def _process_log_dict(self, log_dict):
1266        log_list = log_dict.pop("logs", [])
1267        for key in sorted(log_dict.iterkeys()):
1268            log_list += self._process_log_dict(log_dict.pop(key))
1269        return log_list
1270
1271
1272    def _process_logs(self):
1273        """Go through the accumulated logs in self.log and print them
1274        out to stdout and the status log. Note that this processes
1275        logs in an ordering where:
1276
1277        1) logs to different tags are never interleaved
1278        2) logs to x.y come before logs to x.y.z for all z
1279        3) logs to x.y come before x.z whenever y < z
1280
1281        Note that this will in general not be the same as the
1282        chronological ordering of the logs. However, if a chronological
1283        ordering is desired that one can be reconstructed from the
1284        status log by looking at timestamp lines."""
1285        log_list = self._process_log_dict(self.logs)
1286        for entry in log_list:
1287            self.job.record_entry(entry, log_in_subdir=False)
1288        if log_list:
1289            self.last_line = log_list[-1].render()
1290
1291
1292    def _process_quoted_line(self, tag, line):
1293        """Process a line quoted with an AUTOTEST_STATUS flag. If the
1294        tag is blank then we want to push out all the data we've been
1295        building up in self.logs, and then the newest line. If the
1296        tag is not blank, then push the line into the logs for handling
1297        later."""
1298        entry = base_job.status_log_entry.parse(line)
1299        if entry is None:
1300            return  # the line contains no status lines
1301        if tag == "":
1302            self._process_logs()
1303            self.job.record_entry(entry, log_in_subdir=False)
1304            self.last_line = line
1305        else:
1306            tag_parts = [int(x) for x in tag.split(".")]
1307            log_dict = self.logs
1308            for part in tag_parts:
1309                log_dict = log_dict.setdefault(part, {})
1310            log_list = log_dict.setdefault("logs", [])
1311            log_list.append(entry)
1312
1313
1314    def _process_info_line(self, line):
1315        """Check if line is an INFO line, and if it is, interpret any control
1316        messages (e.g. enabling/disabling warnings) that it may contain."""
1317        match = re.search(r"^\t*INFO\t----\t----(.*)\t[^\t]*$", line)
1318        if not match:
1319            return   # not an INFO line
1320        for field in match.group(1).split('\t'):
1321            if field.startswith("warnings.enable="):
1322                func = self.job.warning_manager.enable_warnings
1323            elif field.startswith("warnings.disable="):
1324                func = self.job.warning_manager.disable_warnings
1325            else:
1326                continue
1327            warning_type = field.split("=", 1)[1]
1328            func(warning_type)
1329
1330
1331    def _process_line(self, line):
1332        """Write out a line of data to the appropriate stream.
1333
1334        Returns the package checksum file if it exists.
1335
1336        Status lines sent by autotest will be prepended with
1337        "AUTOTEST_STATUS", and all other lines are ssh error messages.
1338        """
1339        logging.debug(line)
1340        fetch_package_match = self.fetch_package_parser.search(line)
1341        if fetch_package_match:
1342            pkg_name, dest_path, fifo_path = fetch_package_match.groups()
1343            serve_packages = _CONFIG.get_config_value(
1344                "PACKAGES", "serve_packages_from_autoserv", type=bool)
1345            if serve_packages and pkg_name == 'packages.checksum':
1346                try:
1347                    checksum_file = os.path.join(
1348                        self.job.pkgmgr.pkgmgr_dir, 'packages', pkg_name)
1349                    if os.path.exists(checksum_file):
1350                        self.host.send_file(checksum_file, dest_path)
1351                except error.AutoservRunError:
1352                    msg = "Package checksum file not found, continuing anyway"
1353                    logging.exception(msg)
1354
1355                try:
1356                    # When fetching a package, the client expects to be
1357                    # notified when the fetching is complete. Autotest
1358                    # does this pushing a B to a fifo queue to the client.
1359                    self.host.run("echo B > %s" % fifo_path)
1360                except error.AutoservRunError:
1361                    msg = "Checksum installation failed, continuing anyway"
1362                    logging.exception(msg)
1363                finally:
1364                    return
1365
1366        status_match = self.status_parser.search(line)
1367        test_complete_match = self.test_complete_parser.search(line)
1368        fetch_package_match = self.fetch_package_parser.search(line)
1369        if status_match:
1370            tag, line = status_match.groups()
1371            self._process_info_line(line)
1372            self._process_quoted_line(tag, line)
1373        elif test_complete_match:
1374            self._process_logs()
1375            fifo_path, = test_complete_match.groups()
1376            try:
1377                self.log_collector.collect_client_job_results()
1378                self.host.run("echo A > %s" % fifo_path)
1379            except Exception:
1380                msg = "Post-test log collection failed, continuing anyway"
1381                logging.exception(msg)
1382        elif fetch_package_match:
1383            pkg_name, dest_path, fifo_path = fetch_package_match.groups()
1384            serve_packages = global_config.global_config.get_config_value(
1385                "PACKAGES", "serve_packages_from_autoserv", type=bool)
1386            if serve_packages and pkg_name.endswith(".tar.bz2"):
1387                try:
1388                    self._send_tarball(pkg_name, dest_path)
1389                except Exception:
1390                    msg = "Package tarball creation failed, continuing anyway"
1391                    logging.exception(msg)
1392            try:
1393                self.host.run("echo B > %s" % fifo_path)
1394            except Exception:
1395                msg = "Package tarball installation failed, continuing anyway"
1396                logging.exception(msg)
1397        else:
1398            logging.info(line)
1399
1400
1401    def _send_tarball(self, pkg_name, remote_dest):
1402        """Uses tarballs in package manager by default."""
1403        try:
1404            server_package = os.path.join(self.job.pkgmgr.pkgmgr_dir,
1405                                          'packages', pkg_name)
1406            if os.path.exists(server_package):
1407              self.host.send_file(server_package, remote_dest)
1408              return
1409
1410        except error.AutoservRunError:
1411            msg = ("Package %s could not be sent from the package cache." %
1412                   pkg_name)
1413            logging.exception(msg)
1414
1415        name, pkg_type = self.job.pkgmgr.parse_tarball_name(pkg_name)
1416        src_dirs = []
1417        if pkg_type == 'test':
1418            for test_dir in ['site_tests', 'tests']:
1419                src_dir = os.path.join(self.job.clientdir, test_dir, name)
1420                if os.path.exists(src_dir):
1421                    src_dirs += [src_dir]
1422                    break
1423        elif pkg_type == 'profiler':
1424            src_dirs += [os.path.join(self.job.clientdir, 'profilers', name)]
1425        elif pkg_type == 'dep':
1426            src_dirs += [os.path.join(self.job.clientdir, 'deps', name)]
1427        elif pkg_type == 'client':
1428            return  # you must already have a client to hit this anyway
1429        else:
1430            return  # no other types are supported
1431
1432        # iterate over src_dirs until we find one that exists, then tar it
1433        for src_dir in src_dirs:
1434            if os.path.exists(src_dir):
1435                try:
1436                    logging.info('Bundling %s into %s', src_dir, pkg_name)
1437                    temp_dir = autotemp.tempdir(unique_id='autoserv-packager',
1438                                                dir=self.job.tmpdir)
1439                    tarball_path = self.job.pkgmgr.tar_package(
1440                        pkg_name, src_dir, temp_dir.name, " .")
1441                    self.host.send_file(tarball_path, remote_dest)
1442                finally:
1443                    temp_dir.clean()
1444                return
1445
1446
1447    def log_warning(self, msg, warning_type):
1448        """Injects a WARN message into the current status logging stream."""
1449        timestamp = int(time.time())
1450        if self.job.warning_manager.is_valid(timestamp, warning_type):
1451            self.job.record('WARN', None, None, msg)
1452
1453
1454    def write(self, data):
1455        # now start processing the existing buffer and the new data
1456        data = self.leftover + data
1457        lines = data.split('\n')
1458        processed_lines = 0
1459        try:
1460            # process all the buffered data except the last line
1461            # ignore the last line since we may not have all of it yet
1462            for line in lines[:-1]:
1463                self._process_line(line)
1464                processed_lines += 1
1465        finally:
1466            # save any unprocessed lines for future processing
1467            self.leftover = '\n'.join(lines[processed_lines:])
1468
1469
1470    def flush(self):
1471        sys.stdout.flush()
1472
1473
1474    def flush_all_buffers(self):
1475        if self.leftover:
1476            self._process_line(self.leftover)
1477            self.leftover = ""
1478        self._process_logs()
1479        self.flush()
1480
1481
1482    def close(self):
1483        self.flush_all_buffers()
1484