• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2016 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5# repohooks/pre-upload.py currently does not run pylint. But for developers who
6# want to check their code manually we disable several harmless pylint warnings
7# which just distract from more serious remaining issues.
8#
9# The instance variables _host and _install_paths are not defined in __init__().
10# pylint: disable=attribute-defined-outside-init
11#
12# Many short variable names don't follow the naming convention.
13# pylint: disable=invalid-name
14#
15# _parse_result() and _dir_size() don't access self and could be functions.
16# pylint: disable=no-self-use
17#
18# _ChromeLogin and _TradefedLogCollector have no public methods.
19# pylint: disable=too-few-public-methods
20
21import contextlib
22import errno
23import glob
24import hashlib
25import lockfile
26import logging
27import os
28import pipes
29import random
30import re
31import shutil
32import stat
33import tempfile
34import urlparse
35
36from autotest_lib.client.bin import utils as client_utils
37from autotest_lib.client.common_lib import utils as common_utils
38from autotest_lib.client.common_lib import error
39from autotest_lib.client.common_lib.cros import dev_server
40from autotest_lib.server import autotest
41from autotest_lib.server import test
42from autotest_lib.server import utils
43
44# TODO(ihf): If akeshet doesn't fix crbug.com/691046 delete metrics again.
45try:
46    from chromite.lib import metrics
47except ImportError:
48    metrics = utils.metrics_mock
49
50# TODO(ihf): Find a home for all these paths. This is getting out of hand.
51_SDK_TOOLS_DIR_M = 'gs://chromeos-arc-images/builds/git_mnc-dr-arc-dev-linux-static_sdk_tools/3554341'
52_SDK_TOOLS_FILES = ['aapt']
53# To stabilize adb behavior, we use dynamically linked adb.
54_ADB_DIR_M = 'gs://chromeos-arc-images/builds/git_mnc-dr-arc-dev-linux-cheets_arm-user/3554341'
55_ADB_FILES = ['adb']
56
57_ADB_POLLING_INTERVAL_SECONDS = 1
58_ADB_READY_TIMEOUT_SECONDS = 60
59_ANDROID_ADB_KEYS_PATH = '/data/misc/adb/adb_keys'
60
61_ARC_POLLING_INTERVAL_SECONDS = 1
62_ARC_READY_TIMEOUT_SECONDS = 60
63
64_TRADEFED_PREFIX = 'autotest-tradefed-install_'
65_TRADEFED_CACHE_LOCAL = '/tmp/autotest-tradefed-cache'
66_TRADEFED_CACHE_CONTAINER = '/usr/local/autotest/results/shared/cache'
67_TRADEFED_CACHE_CONTAINER_LOCK = '/usr/local/autotest/results/shared/lock'
68
69# According to dshi a drone has 500GB of disk space. It is ok for now to use
70# 10GB of disk space, as no more than 10 tests should run in parallel.
71# TODO(ihf): Investigate tighter cache size.
72_TRADEFED_CACHE_MAX_SIZE = (10 * 1024 * 1024 * 1024)
73
74
75class _ChromeLogin(object):
76    """Context manager to handle Chrome login state."""
77
78    def __init__(self, host, cts_helper_kwargs):
79        self._host = host
80        self._cts_helper_kwargs = cts_helper_kwargs
81
82    def __enter__(self):
83        """Logs in to the Chrome."""
84        logging.info('Ensure Android is running...')
85        # If we can't login to Chrome and launch Android we want this job to
86        # die roughly after 5 minutes instead of hanging for the duration.
87        autotest.Autotest(self._host).run_timed_test('cheets_StartAndroid',
88                                                     timeout=300,
89                                                     check_client_result=True,
90                                                     **self._cts_helper_kwargs)
91
92    def __exit__(self, exc_type, exc_value, traceback):
93        """On exit, to wipe out all the login state, reboot the machine.
94
95        @param exc_type: Exception type if an exception is raised from the
96                         with-block.
97        @param exc_value: Exception instance if an exception is raised from
98                          the with-block.
99        @param traceback: Stack trace info if an exception is raised from
100                          the with-block.
101        @return None, indicating not to ignore an exception from the with-block
102                if raised.
103        """
104        logging.info('Rebooting...')
105        try:
106            self._host.reboot()
107        except Exception:
108            if exc_type is None:
109                raise
110            # If an exception is raise from the with-block, just record the
111            # exception for the rebooting to avoid ignoring the original
112            # exception.
113            logging.exception('Rebooting failed.')
114
115
116@contextlib.contextmanager
117def lock(filename):
118    """Prevents other autotest/tradefed instances from accessing cache.
119
120    @param filename: The file to be locked.
121    """
122    filelock = lockfile.FileLock(filename)
123    # It is tempting just to call filelock.acquire(3600). But the implementation
124    # has very poor temporal granularity (timeout/10), which is unsuitable for
125    # our needs. See /usr/lib64/python2.7/site-packages/lockfile/
126    attempts = 0
127    while not filelock.i_am_locking():
128        try:
129            attempts += 1
130            logging.info('Waiting for cache lock...')
131            filelock.acquire(random.randint(1, 5))
132        except (lockfile.AlreadyLocked, lockfile.LockTimeout):
133            if attempts > 1000:
134                # Normally we should aqcuire the lock in a few seconds. Once we
135                # wait on the order of hours either the dev server IO is
136                # overloaded or a lock didn't get cleaned up. Take one for the
137                # team, break the lock and report a failure. This should fix
138                # the lock for following tests. If the failure affects more than
139                # one job look for a deadlock or dev server overload.
140                logging.error('Permanent lock failure. Trying to break lock.')
141                filelock.break_lock()
142                raise error.TestFail('Error: permanent cache lock failure.')
143        else:
144            logging.info('Acquired cache lock after %d attempts.', attempts)
145    try:
146        yield
147    finally:
148        filelock.release()
149        logging.info('Released cache lock.')
150
151
152@contextlib.contextmanager
153def adb_keepalive(target, extra_paths):
154    """A context manager that keeps the adb connection alive.
155
156    AdbKeepalive will spin off a new process that will continuously poll for
157    adb's connected state, and will attempt to reconnect if it ever goes down.
158    This is the only way we can currently recover safely from (intentional)
159    reboots.
160
161    @param target: the hostname and port of the DUT.
162    @param extra_paths: any additional components to the PATH environment
163                        variable.
164    """
165    from autotest_lib.client.common_lib.cros import adb_keepalive as module
166    # |__file__| returns the absolute path of the compiled bytecode of the
167    # module. We want to run the original .py file, so we need to change the
168    # extension back.
169    script_filename = module.__file__.replace('.pyc', '.py')
170    job = common_utils.BgJob([script_filename, target],
171                           nickname='adb_keepalive', stderr_level=logging.DEBUG,
172                           stdout_tee=common_utils.TEE_TO_LOGS,
173                           stderr_tee=common_utils.TEE_TO_LOGS,
174                           extra_paths=extra_paths)
175
176    try:
177        yield
178    finally:
179        # The adb_keepalive.py script runs forever until SIGTERM is sent.
180        common_utils.nuke_subprocess(job.sp)
181        common_utils.join_bg_jobs([job])
182
183
184@contextlib.contextmanager
185def pushd(d):
186    """Defines pushd.
187    @param d: the directory to change to.
188    """
189    current = os.getcwd()
190    os.chdir(d)
191    try:
192        yield
193    finally:
194        os.chdir(current)
195
196
197def parse_tradefed_v2_result(result, waivers=None):
198    """Check the result from the tradefed-v2 output.
199
200    @param result: The result stdout string from the tradefed command.
201    @param waivers: a set() of tests which are permitted to fail.
202    @return 5-tuple (tests, passed, failed, notexecuted, waived)
203    """
204    # Regular expressions for start/end messages of each test-run chunk.
205    abi_re = r'armeabi-v7a|x86'
206    # TODO(kinaba): use the current running module name.
207    module_re = r'\S+'
208    start_re = re.compile(r'(?:Start|Continu)ing (%s) %s with'
209                          r' (\d+(?:,\d+)?) test' % (abi_re, module_re))
210    end_re = re.compile(r'(%s) %s (?:complet|fail)ed in .*\.'
211                        r' (\d+) passed, (\d+) failed, (\d+) not executed'
212                        % (abi_re, module_re))
213
214    # Records the result per each ABI.
215    total_test = dict()
216    total_pass = dict()
217    total_fail = dict()
218    last_notexec = dict()
219
220    # ABI and the test count for the current chunk.
221    abi = None
222    ntest = None
223    prev_npass = prev_nfail = prev_nnotexec = None
224
225    for line in result.splitlines():
226        # Beginning of a chunk of tests.
227        match = start_re.search(line)
228        if match:
229           if abi:
230               raise error.TestFail('Error: Unexpected test start: ' + line)
231           abi = match.group(1)
232           ntest = int(match.group(2).replace(',',''))
233           prev_npass = prev_nfail = prev_nnotexec = None
234        else:
235           # End of the current chunk.
236           match = end_re.search(line)
237           if not match:
238               continue
239
240           npass, nfail, nnotexec = map(int, match.group(2,3,4))
241           if abi != match.group(1):
242               # When the last case crashed during teardown, tradefed emits two
243               # end-messages with possibly increased fail count. Ignore it.
244               if (prev_npass == npass and (prev_nfail == nfail or
245                   prev_nfail == nfail - 1) and prev_nnotexec == nnotexec):
246                   continue
247               raise error.TestFail('Error: Unexpected test end: ' + line)
248           prev_npass, prev_nfail, prev_nnotexec = npass, nfail, nnotexec
249
250           # When the test crashes too ofen, tradefed seems to finish the
251           # iteration by running "0 tests, 0 passed, ...". Do not count
252           # that in.
253           if ntest > 0:
254               total_test[abi] = (total_test.get(abi, 0) + ntest -
255                   last_notexec.get(abi, 0))
256               total_pass[abi] = total_pass.get(abi, 0) + npass
257               total_fail[abi] = total_fail.get(abi, 0) + nfail
258               last_notexec[abi] = nnotexec
259           abi = None
260
261    if abi:
262        raise error.TestFail('Error: No end message for the last chunk.')
263
264    # TODO(rohitbm): make failure parsing more robust by extracting the list
265    # of failing tests instead of searching in the result blob. As well as
266    # only parse for waivers for the running ABI.
267    waived = 0
268    if waivers:
269        abis = total_test.keys()
270        for testname in waivers:
271            # TODO(dhaddock): Find a more robust way to apply waivers.
272            fail_count = (result.count(testname + ' FAIL') +
273                          result.count(testname + ' fail'))
274            if fail_count:
275                if fail_count > len(abis):
276                    # This should be an error.TestFail, but unfortunately
277                    # tradefed has a bug that emits "fail" twice when a
278                    # test failed during teardown. It will anyway causes
279                    # a test count inconsistency and visible on the dashboard.
280                    logging.error('Found %d failures for %s '
281                                  'but there are only %d abis: %s',
282                                  fail_count, testname, len(abis), abis)
283                waived += fail_count
284                logging.info('Waived failure for %s %d time(s)',
285                             testname, fail_count)
286    counts = tuple(sum(count_per_abi.values()) for count_per_abi in
287        (total_test, total_pass, total_fail, last_notexec)) + (waived,)
288    msg = ('tests=%d, passed=%d, failed=%d, not_executed=%d, waived=%d' %
289           counts)
290    logging.info(msg)
291    if counts[2] - waived < 0:
292        raise error.TestFail('Error: Internal waiver bookkeeping has '
293                             'become inconsistent (%s)' % msg)
294    return counts
295
296
297def select_32bit_java():
298    """Switches to 32 bit java if installed (like in lab lxc images) to save
299    about 30-40% server/shard memory during the run."""
300    if utils.is_in_container() and not client_utils.is_moblab():
301        java = '/usr/lib/jvm/java-8-openjdk-i386'
302        if os.path.exists(java):
303            logging.info('Found 32 bit java, switching to use it.')
304            os.environ['JAVA_HOME'] = java
305            os.environ['PATH'] = (os.path.join(java, 'bin') + os.pathsep +
306                                  os.environ['PATH'])
307
308
309class TradefedTest(test.test):
310    """Base class to prepare DUT to run tests via tradefed."""
311    version = 1
312
313    # TODO(ihf): Remove _ABD_DIR_M/_SDK_TOOLS_DIR_M defaults once M is dead.
314    def initialize(self, host=None, adb_dir=_ADB_DIR_M,
315                   sdk_tools_dir=_SDK_TOOLS_DIR_M):
316        """Sets up the tools and binary bundles for the test."""
317        logging.info('Hostname: %s', host.hostname)
318        self._host = host
319        self._install_paths = []
320        # Tests in the lab run within individual lxc container instances.
321        if utils.is_in_container():
322            cache_root = _TRADEFED_CACHE_CONTAINER
323        else:
324            cache_root = _TRADEFED_CACHE_LOCAL
325
326        # TODO(ihf): reevaluate this again when we run out of memory. We could
327        # for example use 32 bit java on the first run but not during retries.
328        # b/62895114. If select_32bit_java gets deleted for good also remove it
329        # from the base image.
330        # Try to save server memory (crbug.com/717413).
331        # select_32bit_java()
332
333        # Quick sanity check and spew of java version installed on the server.
334        utils.run('java', args=('-version',), ignore_status=False, verbose=True,
335                  stdout_tee=utils.TEE_TO_LOGS, stderr_tee=utils.TEE_TO_LOGS)
336        # The content of the cache survives across jobs.
337        self._safe_makedirs(cache_root)
338        self._tradefed_cache = os.path.join(cache_root, 'cache')
339        self._tradefed_cache_lock = os.path.join(cache_root, 'lock')
340        # The content of the install location does not survive across jobs and
341        # is isolated (by using a unique path)_against other autotest instances.
342        # This is not needed for the lab, but if somebody wants to run multiple
343        # TradedefTest instance.
344        self._tradefed_install = tempfile.mkdtemp(prefix=_TRADEFED_PREFIX)
345        # Under lxc the cache is shared between multiple autotest/tradefed
346        # instances. We need to synchronize access to it. All binaries are
347        # installed through the (shared) cache into the local (unshared)
348        # lxc/autotest instance storage.
349        # If clearing the cache it must happen before all downloads.
350        self._clear_download_cache_if_needed()
351        # Set permissions (rwxr-xr-x) to the executable binaries.
352        permission = (stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH
353                | stat.S_IXOTH)
354        self._install_files(adb_dir, _ADB_FILES, permission)
355        self._install_files(sdk_tools_dir, _SDK_TOOLS_FILES, permission)
356
357    def cleanup(self):
358        """Cleans up any dirtied state."""
359        # Kill any lingering adb servers.
360        self._run('adb', verbose=True, args=('kill-server',))
361        logging.info('Cleaning up %s.', self._tradefed_install)
362        shutil.rmtree(self._tradefed_install)
363
364    def _login_chrome(self, **cts_helper_kwargs):
365        """Returns Chrome log-in context manager.
366
367        Please see also cheets_StartAndroid for details about how this works.
368        """
369        return _ChromeLogin(self._host, cts_helper_kwargs)
370
371    def _get_adb_target(self):
372        return '{}:{}'.format(self._host.hostname, self._host.port)
373
374    def _try_adb_connect(self):
375        """Attempts to connect to adb on the DUT.
376
377        @return boolean indicating if adb connected successfully.
378        """
379        # This may fail return failure due to a race condition in adb connect
380        # (b/29370989). If adb is already connected, this command will
381        # immediately return success.
382        hostport = self._get_adb_target()
383        result = self._run(
384                'adb',
385                args=('connect', hostport),
386                verbose=True,
387                ignore_status=True)
388        logging.info('adb connect {}:\n{}'.format(hostport, result.stdout))
389        if result.exit_status != 0:
390            return False
391
392        result = self._run('adb', args=('devices',))
393        logging.info('adb devices:\n' + result.stdout)
394        if not re.search(
395                r'{}\s+(device|unauthorized)'.format(re.escape(hostport)),
396                result.stdout):
397            return False
398
399        # Actually test the connection with an adb command as there can be
400        # a race between detecting the connected device and actually being
401        # able to run a commmand with authenticated adb.
402        result = self._run('adb', args=('shell', 'exit'), ignore_status=True)
403        return result.exit_status == 0
404
405    def _android_shell(self, command):
406        """Run a command remotely on the device in an android shell
407
408        This function is strictly for internal use only, as commands do not run
409        in a fully consistent Android environment. Prefer adb shell instead.
410        """
411        self._host.run('android-sh -c ' + pipes.quote(command))
412
413    def _write_android_file(self, filename, data):
414        """Writes a file to a location relative to the android container.
415
416        This is an internal function used to bootstrap adb.
417        Tests should use adb push to write files.
418        """
419        android_cmd = 'echo %s > %s' % (pipes.quote(data),
420                                        pipes.quote(filename))
421        self._android_shell(android_cmd)
422
423    def _connect_adb(self):
424        """Sets up ADB connection to the ARC container."""
425        logging.info('Setting up adb connection.')
426        # Generate and push keys for adb.
427        # TODO(elijahtaylor): Extract this code to arc_common and de-duplicate
428        # code in arc.py on the client side tests.
429        key_path = os.path.join(self.tmpdir, 'test_key')
430        pubkey_path = key_path + '.pub'
431        self._run('adb', verbose=True, args=('keygen', pipes.quote(key_path)))
432        with open(pubkey_path, 'r') as f:
433            self._write_android_file(_ANDROID_ADB_KEYS_PATH, f.read())
434        self._android_shell('restorecon ' + pipes.quote(_ANDROID_ADB_KEYS_PATH))
435        os.environ['ADB_VENDOR_KEYS'] = key_path
436
437        # Kill existing adb server to ensure that the env var is picked up.
438        self._run('adb', verbose=True, args=('kill-server',))
439
440        # This starts adbd.
441        self._android_shell('setprop sys.usb.config mtp,adb')
442
443        # Also let it be automatically started upon reboot.
444        self._android_shell('setprop persist.sys.usb.config mtp,adb')
445
446        # adbd may take some time to come up. Repeatedly try to connect to adb.
447        utils.poll_for_condition(lambda: self._try_adb_connect(),
448                                 exception=error.TestFail(
449                                     'Error: Failed to set up adb connection'),
450                                 timeout=_ADB_READY_TIMEOUT_SECONDS,
451                                 sleep_interval=_ADB_POLLING_INTERVAL_SECONDS)
452
453        logging.info('Successfully setup adb connection.')
454
455    def _wait_for_arc_boot(self):
456        """Wait until ARC is fully booted.
457
458        Tests for the presence of the intent helper app to determine whether ARC
459        has finished booting.
460        """
461        def _intent_helper_running():
462            result = self._run('adb', args=('shell', 'pgrep', '-f',
463                                            'org.chromium.arc.intent_helper'))
464            return bool(result.stdout)
465        utils.poll_for_condition(
466            _intent_helper_running,
467            exception=error.TestFail(
468                'Error: Timed out waiting for intent helper.'),
469            timeout=_ARC_READY_TIMEOUT_SECONDS,
470            sleep_interval=_ARC_POLLING_INTERVAL_SECONDS)
471
472    def _disable_adb_install_dialog(self):
473        """Disables a dialog shown on adb install execution.
474
475        By default, on adb install execution, "Allow Google to regularly check
476        device activity ... " dialog is shown. It requires manual user action
477        so that tests are blocked at the point.
478        This method disables it.
479        """
480        logging.info('Disabling the adb install dialog.')
481        result = self._run(
482                'adb',
483                verbose=True,
484                args=(
485                        'shell',
486                        'settings',
487                        'put',
488                        'global',
489                        'verifier_verify_adb_installs',
490                        '0'))
491        logging.info('Disable adb dialog: %s', result.stdout)
492
493    def _ready_arc(self):
494        """Ready ARC and adb for running tests via tradefed."""
495        self._connect_adb()
496        self._disable_adb_install_dialog()
497        self._wait_for_arc_boot()
498
499    def _safe_makedirs(self, path):
500        """Creates a directory at |path| and its ancestors.
501
502        Unlike os.makedirs(), ignore errors even if directories exist.
503        """
504        try:
505            os.makedirs(path)
506        except OSError as e:
507            if not (e.errno == errno.EEXIST and os.path.isdir(path)):
508                raise
509
510    def _unzip(self, filename):
511        """Unzip the file.
512
513        The destination directory name will be the stem of filename.
514        E.g., _unzip('foo/bar/baz.zip') will create directory at
515        'foo/bar/baz', and then will inflate zip's content under the directory.
516        If here is already a directory at the stem, that directory will be used.
517
518        @param filename: Path to the zip archive.
519        @return Path to the inflated directory.
520        """
521        destination = os.path.splitext(filename)[0]
522        if os.path.isdir(destination):
523            return destination
524        self._safe_makedirs(destination)
525        utils.run('unzip', args=('-d', destination, filename))
526        return destination
527
528    def _dir_size(self, directory):
529        """Compute recursive size in bytes of directory."""
530        size = 0
531        for root, _, files in os.walk(directory):
532            size += sum(os.path.getsize(os.path.join(root, name))
533                    for name in files)
534        return size
535
536    def _clear_download_cache_if_needed(self):
537        """Invalidates cache to prevent it from growing too large."""
538        # If the cache is large enough to hold a working set, we can simply
539        # delete everything without thrashing.
540        # TODO(ihf): Investigate strategies like LRU.
541        with lock(self._tradefed_cache_lock):
542            size = self._dir_size(self._tradefed_cache)
543            if size > _TRADEFED_CACHE_MAX_SIZE:
544                logging.info('Current cache size=%d got too large. Clearing %s.'
545                        , size, self._tradefed_cache)
546                shutil.rmtree(self._tradefed_cache)
547                self._safe_makedirs(self._tradefed_cache)
548            else:
549                logging.info('Current cache size=%d of %s.', size,
550                        self._tradefed_cache)
551
552    def _download_to_cache(self, uri):
553        """Downloads the uri from the storage server.
554
555        It always checks the cache for available binaries first and skips
556        download if binaries are already in cache.
557
558        The caller of this function is responsible for holding the cache lock.
559
560        @param uri: The Google Storage or dl.google.com uri.
561        @return Path to the downloaded object, name.
562        """
563        # Split uri into 3 pieces for use by gsutil and also by wget.
564        parsed = urlparse.urlparse(uri)
565        filename = os.path.basename(parsed.path)
566        # We are hashing the uri instead of the binary. This is acceptable, as
567        # the uris are supposed to contain version information and an object is
568        # not supposed to be changed once created.
569        output_dir = os.path.join(self._tradefed_cache,
570                                  hashlib.md5(uri).hexdigest())
571        output = os.path.join(output_dir, filename)
572        # Check for existence of file.
573        if os.path.exists(output):
574            logging.info('Skipping download of %s, reusing %s.', uri, output)
575            return output
576        self._safe_makedirs(output_dir)
577
578        if parsed.scheme not in ['gs', 'http', 'https']:
579            raise error.TestFail('Error: Unknown download scheme %s' %
580                                 parsed.scheme)
581        if parsed.scheme in ['http', 'https']:
582            logging.info('Using wget to download %s to %s.', uri, output_dir)
583            # We are downloading 1 file at a time, hence using -O over -P.
584            utils.run(
585                'wget',
586                args=(
587                    '--report-speed=bits',
588                    '-O',
589                    output,
590                    uri),
591                verbose=True)
592            return output
593
594        if not client_utils.is_moblab():
595            # If the machine can access to the storage server directly,
596            # defer to "gsutil" for downloading.
597            logging.info('Host %s not in lab. Downloading %s directly to %s.',
598                    self._host.hostname, uri, output)
599            # b/17445576: gsutil rsync of individual files is not implemented.
600            utils.run('gsutil', args=('cp', uri, output), verbose=True)
601            return output
602
603        # We are in the moblab. Because the machine cannot access the storage
604        # server directly, use dev server to proxy.
605        logging.info('Host %s is in lab. Downloading %s by staging to %s.',
606                self._host.hostname, uri, output)
607
608        dirname = os.path.dirname(parsed.path)
609        archive_url = '%s://%s%s' % (parsed.scheme, parsed.netloc, dirname)
610
611        # First, request the devserver to download files into the lab network.
612        # TODO(ihf): Switch stage_artifacts to honor rsync. Then we don't have
613        # to shuffle files inside of tarballs.
614        info = self._host.host_info_store.get()
615        ds = dev_server.ImageServer.resolve(info.build)
616        ds.stage_artifacts(info.build, files=[filename],
617                           archive_url=archive_url)
618
619        # Then download files from the dev server.
620        # TODO(ihf): use rsync instead of wget. Are there 3 machines involved?
621        # Itself, dev_server plus DUT? Or is there just no rsync in moblab?
622        ds_src = '/'.join([ds.url(), 'static', dirname, filename])
623        logging.info('dev_server URL: %s', ds_src)
624        # Calls into DUT to pull uri from dev_server.
625        utils.run(
626                'wget',
627                args=(
628                        '--report-speed=bits',
629                        '-O',
630                        output,
631                        ds_src),
632                verbose=True)
633        return output
634
635    def _instance_copy(self, cache_path):
636        """Makes a copy of a file from the (shared) cache to a wholy owned
637        local instance. Also copies one level of cache directoy (MD5 named).
638        """
639        filename = os.path.basename(cache_path)
640        dirname = os.path.basename(os.path.dirname(cache_path))
641        instance_dir = os.path.join(self._tradefed_install, dirname)
642        # Make sure destination directory is named the same.
643        self._safe_makedirs(instance_dir)
644        instance_path = os.path.join(instance_dir, filename)
645        shutil.copyfile(cache_path, instance_path)
646        return instance_path
647
648    def _install_bundle(self, gs_uri):
649        """Downloads a zip file, installs it and returns the local path."""
650        if not gs_uri.endswith('.zip'):
651            raise error.TestFail('Error: Not a .zip file %s.', gs_uri)
652        # Atomic write through of file.
653        with lock(self._tradefed_cache_lock):
654            cache_path = self._download_to_cache(gs_uri)
655            local = self._instance_copy(cache_path)
656
657        unzipped = self._unzip(local)
658        self._abi = 'x86' if 'x86-x86' in unzipped else 'arm'
659        return unzipped
660
661    def _install_files(self, gs_dir, files, permission):
662        """Installs binary tools."""
663        for filename in files:
664            gs_uri = os.path.join(gs_dir, filename)
665            # Atomic write through of file.
666            with lock(self._tradefed_cache_lock):
667                cache_path = self._download_to_cache(gs_uri)
668                local = self._instance_copy(cache_path)
669            os.chmod(local, permission)
670            # Keep track of PATH.
671            self._install_paths.append(os.path.dirname(local))
672
673    def _copy_media(self, media):
674        """Calls copy_media to push media files to DUT via adb."""
675        logging.info('Copying media to device. This can take a few minutes.')
676        copy_media = os.path.join(media, 'copy_media.sh')
677        with pushd(media):
678            try:
679                self._run('file', args=('/bin/sh',), verbose=True,
680                          ignore_status=True, timeout=60,
681                          stdout_tee=utils.TEE_TO_LOGS,
682                          stderr_tee=utils.TEE_TO_LOGS)
683                self._run('sh', args=('--version',), verbose=True,
684                          ignore_status=True, timeout=60,
685                          stdout_tee=utils.TEE_TO_LOGS,
686                          stderr_tee=utils.TEE_TO_LOGS)
687            except:
688                logging.warning('Could not obtain sh version.')
689            self._run(
690                'sh',
691                args=('-e', copy_media, 'all'),
692                timeout=7200,  # Wait at most 2h for download of media files.
693                verbose=True,
694                ignore_status=False,
695                stdout_tee=utils.TEE_TO_LOGS,
696                stderr_tee=utils.TEE_TO_LOGS)
697
    def _verify_media(self, media):
        """Verify that the local media directory matches the DUT.
        Used for debugging b/32978387 where we may see file corruption.

        Computes sorted md5sum listings of bbb_short/bbb_full both on the DUT
        (under /sdcard/test) and in the local tree, and logs any mismatch.

        @param media: Path of the local media directory to compare against
                      /sdcard/test on the DUT.
        @return: Currently always True; mismatches are only logged (see the
                 TODOs below).
        """
        # TODO(ihf): Remove function once b/32978387 is resolved.
        # Find all files in the bbb_short and bbb_full directories, md5sum these
        # files and sort by filename, both on the DUT and on the local tree.
        logging.info('Computing md5 of remote media files.')
        remote = self._run('adb', args=('shell',
            'cd /sdcard/test; find ./bbb_short ./bbb_full -type f -print0 | '
            'xargs -0 md5sum | grep -v "\.DS_Store" | sort -k 2'))
        logging.info('Computing md5 of local media files.')
        # Same pipeline locally so both listings have identical shape.
        local = self._run('/bin/sh', args=('-c',
            ('cd %s; find ./bbb_short ./bbb_full -type f -print0 | '
            'xargs -0 md5sum | grep -v "\.DS_Store" | sort -k 2') % media))

        # 'adb shell' terminates lines with CRLF. Normalize before comparing.
        if remote.stdout.replace('\r\n','\n') != local.stdout:
            logging.error('Some media files differ on DUT /sdcard/test vs. local.')
            logging.info('media=%s', media)
            logging.error('remote=%s', remote)
            logging.error('local=%s', local)
            # TODO(ihf): Return False.
            # NOTE(review): deliberately still True while the bug is under
            # investigation, so corruption is logged but not fatal.
            return True
        logging.info('Media files identical on DUT /sdcard/test vs. local.')
        return True
723
724    def _push_media(self, CTS_URI):
725        """Downloads, caches and pushed media files to DUT."""
726        media = self._install_bundle(CTS_URI['media'])
727        base = os.path.splitext(os.path.basename(CTS_URI['media']))[0]
728        cts_media = os.path.join(media, base)
729        # TODO(ihf): this really should measure throughput in Bytes/s.
730        m = 'chromeos/autotest/infra_benchmark/cheets/push_media/duration'
731        fields = {'success': False,
732                  'dut_host_name': self._host.hostname}
733        with metrics.SecondsTimer(m, fields=fields) as c:
734            self._copy_media(cts_media)
735            c['success'] = True
736        if not self._verify_media(cts_media):
737            raise error.TestFail('Error: saw corruption pushing media files.')
738
739    def _run(self, *args, **kwargs):
740        """Executes the given command line.
741
742        To support SDK tools, such as adb or aapt, this adds _install_paths
743        to the extra_paths. Before invoking this, ensure _install_files() has
744        been called.
745        """
746        kwargs['extra_paths'] = (
747                kwargs.get('extra_paths', []) + self._install_paths)
748        return utils.run(*args, **kwargs)
749
750    def _collect_tradefed_global_log(self, result, destination):
751        """Collects the tradefed global log.
752
753        @param result: The result object from utils.run.
754        @param destination: Autotest result directory (destination of logs).
755        """
756        match = re.search(r'Saved log to /tmp/(tradefed_global_log_.*\.txt)',
757                          result.stdout)
758        if not match:
759            logging.error('no tradefed_global_log file is found')
760            return
761
762        name = match.group(1)
763        dest = os.path.join(destination, 'logs', 'tmp')
764        self._safe_makedirs(dest)
765        shutil.copy(os.path.join('/tmp', name), os.path.join(dest, name))
766
767    def _parse_tradefed_datetime(self, result, summary=None):
768        """Get the tradefed provided result ID consisting of a datetime stamp.
769
770        Unfortunately we are unable to tell tradefed where to store the results.
771        In the lab we have multiple instances of tradefed running in parallel
772        writing results and logs to the same base directory. This function
773        finds the identifier which tradefed used during the current run and
774        returns it for further processing of result files.
775
776        @param result: The result object from utils.run.
777        @param summary: Test result summary from runs so far.
778        @return datetime_id: The result ID chosen by tradefed.
779                             Example: '2016.07.14_00.34.50'.
780        """
781        # This string is show for both 'run' and 'continue' after all tests.
782        match = re.search(r': XML test result file generated at (\S+). Passed',
783                result.stdout)
784        if not (match and match.group(1)):
785            # TODO(ihf): Find out if we ever recover something interesting in
786            # this case. Otherwise delete it.
787            # Try harder to find the remains. This string shows before all
788            # tests but only with 'run', not 'continue'.
789            logging.warning('XML test result file incomplete?')
790            match = re.search(r': Created result dir (\S+)', result.stdout)
791            if not (match and match.group(1)):
792                error_msg = 'Test did not complete due to Chrome or ARC crash.'
793                if summary:
794                    error_msg += (' Test summary from previous runs: %s'
795                            % summary)
796                raise error.TestFail(error_msg)
797        datetime_id = match.group(1)
798        logging.info('Tradefed identified results and logs with %s.',
799                     datetime_id)
800        return datetime_id
801
802    def _parse_tradefed_datetime_v2(self, result, summary=None):
803        """Get the tradefed provided result ID consisting of a datetime stamp.
804
805        Unfortunately we are unable to tell tradefed where to store the results.
806        In the lab we have multiple instances of tradefed running in parallel
807        writing results and logs to the same base directory. This function
808        finds the identifier which tradefed used during the current run and
809        returns it for further processing of result files.
810
811        @param result: The result object from utils.run.
812        @param summary: Test result summary from runs so far.
813        @return datetime_id: The result ID chosen by tradefed.
814                             Example: '2016.07.14_00.34.50'.
815        """
816        # This string is show for both 'run' and 'continue' after all tests.
817        match = re.search(r'(\d\d\d\d.\d\d.\d\d_\d\d.\d\d.\d\d)', result.stdout)
818        if not (match and match.group(1)):
819            error_msg = 'Error: Test did not complete. (Chrome or ARC crash?)'
820            if summary:
821                error_msg += (' Test summary from previous runs: %s'
822                        % summary)
823            raise error.TestFail(error_msg)
824        datetime_id = match.group(1)
825        logging.info('Tradefed identified results and logs with %s.',
826                     datetime_id)
827        return datetime_id
828
829    def _parse_result(self, result, waivers=None):
830        """Check the result from the tradefed output.
831
832        This extracts the test pass/fail/executed list from the output of
833        tradefed. It is up to the caller to handle inconsistencies.
834
835        @param result: The result object from utils.run.
836        @param waivers: a set() of tests which are permitted to fail.
837        """
838        # Parse the stdout to extract test status. In particular step over
839        # similar output for each ABI and just look at the final summary.
840        match = re.search(r'(XML test result file generated at (\S+). '
841                 r'Passed (\d+), Failed (\d+), Not Executed (\d+))',
842                 result.stdout)
843        if not match:
844            raise error.Test('Test log does not contain a summary.')
845
846        passed = int(match.group(3))
847        failed = int(match.group(4))
848        not_executed = int(match.group(5))
849        match = re.search(r'(Start test run of (\d+) packages, containing '
850                          r'(\d+(?:,\d+)?) tests)', result.stdout)
851        if match and match.group(3):
852            tests = int(match.group(3).replace(',', ''))
853        else:
854            # Unfortunately this happens. Assume it made no other mistakes.
855            logging.warning('Tradefed forgot to print number of tests.')
856            tests = passed + failed + not_executed
857        # TODO(rohitbm): make failure parsing more robust by extracting the list
858        # of failing tests instead of searching in the result blob. As well as
859        # only parse for waivers for the running ABI.
860        waived = 0
861        if waivers:
862            for testname in waivers:
863                # TODO(dhaddock): Find a more robust way to apply waivers.
864                fail_count = result.stdout.count(testname + ' FAIL')
865                if fail_count:
866                    if fail_count > 2:
867                        raise error.TestFail('Error: There are too many '
868                                             'failures found in the output to '
869                                             'be valid for applying waivers. '
870                                             'Please check output.')
871                    waived += fail_count
872                    logging.info('Waived failure for %s %d time(s)',
873                                 testname, fail_count)
874        logging.info(
875            'tests=%d, passed=%d, failed=%d, not_executed=%d, waived=%d',
876            tests, passed, failed, not_executed, waived)
877        if failed < waived:
878            raise error.TestFail('Error: Internal waiver book keeping has '
879                                 'become inconsistent.')
880        return (tests, passed, failed, not_executed, waived)
881
    def _parse_result_v2(self, result, waivers=None):
        """Check the result from the tradefed-v2 output.

        This extracts the test pass/fail/executed list from the output of
        tradefed. It is up to the caller to handle inconsistencies.

        Thin wrapper: the actual parsing lives in the module-level
        parse_tradefed_v2_result() so it can be exercised standalone.

        @param result: The result object from utils.run.
        @param waivers: a set() of tests which are permitted to fail.
        @return: Whatever parse_tradefed_v2_result() produces — presumably a
                 counts tuple mirroring _parse_result(); confirm there.
        """
        return parse_tradefed_v2_result(result.stdout, waivers)
892
893    def _collect_logs(self, repository, datetime, destination):
894        """Collects the tradefed logs.
895
896        It is legal to collect the same logs multiple times. This is normal
897        after 'tradefed continue' updates existing logs with new results.
898
899        @param repository: Full path to tradefeds output on disk.
900        @param datetime: The identifier which tradefed assigned to the run.
901                         Currently this looks like '2016.07.14_00.34.50'.
902        @param destination: Autotest result directory (destination of logs).
903        """
904        logging.info('Collecting tradefed testResult.xml and logs to %s.',
905                     destination)
906        repository_results = os.path.join(repository, 'results')
907        repository_logs = os.path.join(repository, 'logs')
908        # Because other tools rely on the currently chosen Google storage paths
909        # we need to keep destination_results in
910        # cheets_CTS.*/results/android-cts/2016.mm.dd_hh.mm.ss(/|.zip)
911        # and destination_logs in
912        # cheets_CTS.*/results/android-cts/logs/2016.mm.dd_hh.mm.ss/
913        destination_results = destination
914        destination_results_datetime = os.path.join(destination_results,
915                                                    datetime)
916        destination_results_datetime_zip = destination_results_datetime + '.zip'
917        destination_logs = os.path.join(destination, 'logs')
918        destination_logs_datetime = os.path.join(destination_logs, datetime)
919        # We may have collected the same logs before, clean old versions.
920        if os.path.exists(destination_results_datetime_zip):
921            os.remove(destination_results_datetime_zip)
922        if os.path.exists(destination_results_datetime):
923            shutil.rmtree(destination_results_datetime)
924        if os.path.exists(destination_logs_datetime):
925            shutil.rmtree(destination_logs_datetime)
926        shutil.copytree(
927                os.path.join(repository_results, datetime),
928                destination_results_datetime)
929        # Copying the zip file has to happen after the tree so the destination
930        # directory is available.
931        shutil.copy(
932                os.path.join(repository_results, datetime) + '.zip',
933                destination_results_datetime_zip)
934        shutil.copytree(
935                os.path.join(repository_logs, datetime),
936                destination_logs_datetime)
937
938    def _get_expected_failures(self, directory):
939        """Return a list of expected failures.
940
941        @return: a list of expected failures.
942        """
943        logging.info('Loading expected failures from %s.', directory)
944        expected_fail_dir = os.path.join(self.bindir, directory)
945        expected_fail_files = glob.glob(expected_fail_dir + '/*.' + self._abi)
946        expected_failures = set()
947        for expected_fail_file in expected_fail_files:
948            try:
949                file_path = os.path.join(expected_fail_dir, expected_fail_file)
950                with open(file_path) as f:
951                    lines = set(f.read().splitlines())
952                    logging.info('Loaded %d expected failures from %s',
953                                 len(lines), expected_fail_file)
954                    expected_failures |= lines
955            except IOError as e:
956                logging.error('Error loading %s (%s).', file_path, e.strerror)
957        logging.info('Finished loading expected failures: %s',
958                     expected_failures)
959        return expected_failures
960