import os, time, socket, shutil, glob, logging, traceback, tempfile, re
import subprocess

from multiprocessing import Lock
from autotest_lib.client.common_lib import autotemp, error
from autotest_lib.server import utils, autotest
from autotest_lib.server.hosts import remote
from autotest_lib.server.hosts import rpc_server_tracker
from autotest_lib.client.common_lib.global_config import global_config

# pylint: disable-msg=C0111

get_value = global_config.get_config_value
enable_master_ssh = get_value('AUTOSERV', 'enable_master_ssh', type=bool,
                              default=False)


class AbstractSSHHost(remote.RemoteHost):
    """
    This class represents a generic implementation of most of the
    framework necessary for controlling a host via ssh. It implements
    almost all of the abstract Host methods, except for the core
    Host.run method.
    """
    VERSION_PREFIX = ''

    def _initialize(self, hostname, user="root", port=22, password="",
                    is_client_install_supported=True, host_attributes={},
                    *args, **dargs):
        super(AbstractSSHHost, self)._initialize(hostname=hostname,
                                                 *args, **dargs)
        # The IP address is retrieved only on demand; otherwise host
        # initialization would fail when the host is not online.
        self._ip = None
        self.user = user
        self.port = port
        self.password = password
        self._is_client_install_supported = is_client_install_supported
        self._use_rsync = None
        self.known_hosts_file = tempfile.mkstemp()[1]
        self._rpc_server_tracker = rpc_server_tracker.RpcServerTracker(self)

        # Master SSH connection background job, socket temp directory and
        # socket control path option. If master SSH is enabled, these fields
        # are initialized by start_master_ssh when a new SSH connection is
        # initiated.
        self.master_ssh_job = None
        self.master_ssh_tempdir = None
        self.master_ssh_option = ''

        # Create a Lock to protect against race conditions.
        self._lock = Lock()

        self.host_attributes = host_attributes


    @property
    def ip(self):
        """@return IP address of the host.
        """
        if not self._ip:
            self._ip = socket.getaddrinfo(self.hostname, None)[0][4][0]
        return self._ip


    @property
    def is_client_install_supported(self):
        """
        Returns True if the host supports autotest client installs, False
        otherwise.
        """
        return self._is_client_install_supported


    @property
    def rpc_server_tracker(self):
        """
        @return The RPC server tracker associated with this host.
        """
        return self._rpc_server_tracker


    def make_ssh_command(self, user="root", port=22, opts='',
                         hosts_file='/dev/null',
                         connect_timeout=30, alive_interval=300):
        base_command = ("/usr/bin/ssh -a -x %s -o StrictHostKeyChecking=no "
                        "-o UserKnownHostsFile=%s -o BatchMode=yes "
                        "-o ConnectTimeout=%d -o ServerAliveInterval=%d "
                        "-l %s -p %d")
        assert isinstance(connect_timeout, (int, long))
        assert connect_timeout > 0 # can't disable the timeout
        return base_command % (opts, hosts_file, connect_timeout,
                               alive_interval, user, port)
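        # Example (illustrative): with the defaults above, the returned
        # template expands to roughly
        #   /usr/bin/ssh -a -x  -o StrictHostKeyChecking=no
        #       -o UserKnownHostsFile=/dev/null -o BatchMode=yes
        #       -o ConnectTimeout=30 -o ServerAliveInterval=300 -l root -p 22
        # Callers append the hostname and, usually, a quoted command to run.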


    def use_rsync(self):
        if self._use_rsync is not None:
            return self._use_rsync

        # Check if rsync is available on the remote host. If it's not,
        # don't try to use it for any future file transfers.
        self._use_rsync = self._check_rsync()
        if not self._use_rsync:
            logging.warning("rsync not available on remote host %s -- disabled",
                            self.hostname)
        return self._use_rsync


    def _check_rsync(self):
        """
        Check if rsync is available on the remote host.
        """
        try:
            self.run("rsync --version", stdout_tee=None, stderr_tee=None)
        except error.AutoservRunError:
            return False
        return True


    def _encode_remote_paths(self, paths, escape=True):
        """
        Given a list of file paths, encodes them as a single remote path, in
        the style used by rsync and scp.
        """
        if escape:
            paths = [utils.scp_remote_escape(path) for path in paths]

        remote = self.hostname

        # rsync and scp require IPv6 brackets, even when there isn't any
        # trailing port number (ssh doesn't support IPv6 brackets).
        # In the Python >= 3.3 future, 'import ipaddress' will parse addresses.
        if re.search(r':.*:', remote):
            remote = '[%s]' % remote

        return '%s@%s:"%s"' % (self.user, remote, " ".join(paths))
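        # Example (illustrative): for user 'root', an IPv6 hostname 'fe80::1'
        # and paths ['/var/log'], this returns
        #   root@[fe80::1]:"/var/log"
        # while a plain hostname is left unbracketed.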


    def _make_rsync_cmd(self, sources, dest, delete_dest, preserve_symlinks):
        """
        Given a list of source paths and a destination path, produces the
        appropriate rsync command for copying them. Remote paths must be
        pre-encoded.
        """
        ssh_cmd = self.make_ssh_command(user=self.user, port=self.port,
                                        opts=self.master_ssh_option,
                                        hosts_file=self.known_hosts_file)
        if delete_dest:
            delete_flag = "--delete"
        else:
            delete_flag = ""
        if preserve_symlinks:
            symlink_flag = ""
        else:
            symlink_flag = "-L"
        command = ("rsync %s %s --timeout=1800 --rsh='%s' -az --no-o --no-g "
                   "%s \"%s\"")
        return command % (symlink_flag, delete_flag, ssh_cmd,
                          " ".join(['"%s"' % p for p in sources]), dest)
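        # Example (illustrative): with delete_dest=True and
        # preserve_symlinks=False this yields roughly
        #   rsync -L --delete --timeout=1800 --rsh='<ssh command>' -az
        #       --no-o --no-g "<source>" "<dest>"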


    def _make_ssh_cmd(self, cmd):
        """
        Create a base ssh command string for the host, which can be used
        to run commands directly on the machine.
        """
        base_cmd = self.make_ssh_command(user=self.user, port=self.port,
                                         opts=self.master_ssh_option,
                                         hosts_file=self.known_hosts_file)

        return '%s %s "%s"' % (base_cmd, self.hostname, utils.sh_escape(cmd))

    def _make_scp_cmd(self, sources, dest):
        """
        Given a list of source paths and a destination path, produces the
        appropriate scp command for copying them. Remote paths must be
        pre-encoded.
        """
        command = ("scp -rq %s -o StrictHostKeyChecking=no "
                   "-o UserKnownHostsFile=%s -P %d %s '%s'")
        return command % (self.master_ssh_option, self.known_hosts_file,
                          self.port, " ".join(sources), dest)
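        # Example (illustrative): with port 22 and no master SSH option this
        # expands to roughly
        #   scp -rq  -o StrictHostKeyChecking=no -o UserKnownHostsFile=<file>
        #       -P 22 <encoded sources> '<dest>'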


    def _make_rsync_compatible_globs(self, path, is_local):
        """
        Given an rsync-style path, returns a list of globbed paths
        that will hopefully provide equivalent behaviour for scp. Does not
        support the full range of rsync pattern matching behaviour, only that
        exposed in the get/send_file interface (trailing slashes).

        The is_local param is a flag indicating whether the paths should be
        interpreted as local or remote paths.
        """

        # non-trailing slash paths should just work
        if len(path) == 0 or path[-1] != "/":
            return [path]

        # make a function to test if a pattern matches any files
        if is_local:
            def glob_matches_files(path, pattern):
                return len(glob.glob(path + pattern)) > 0
        else:
            def glob_matches_files(path, pattern):
                result = self.run("ls \"%s\"%s" % (utils.sh_escape(path),
                                                   pattern),
                                  stdout_tee=None, ignore_status=True)
                return result.exit_status == 0

        # take a set of globs that cover all files, and see which are needed
        patterns = ["*", ".[!.]*"]
        patterns = [p for p in patterns if glob_matches_files(path, p)]

        # convert them into a set of paths suitable for the commandline
        if is_local:
            return ["\"%s\"%s" % (utils.sh_escape(path), pattern)
                    for pattern in patterns]
        else:
            return [utils.scp_remote_escape(path) + pattern
                    for pattern in patterns]
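        # Example (illustrative): a local source 'logs/' whose directory
        # contains both visible and dot files expands to
        # ['"logs/"*', '"logs/".[!.]*'], which scp can copy, whereas 'logs'
        # (no trailing slash) is returned unchanged.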


    def _make_rsync_compatible_source(self, source, is_local):
        """
        Applies the same logic as _make_rsync_compatible_globs, but
        applies it to an entire list of sources, producing a new list of
        sources, properly quoted.
        """
        return sum((self._make_rsync_compatible_globs(path, is_local)
                    for path in source), [])


    def _set_umask_perms(self, dest):
        """
        Given a destination file/dir, (recursively) set the permissions on
        all the files and directories to the max allowed by the running umask.
        """

        # This looks strange, but Python has no way to _just_ read the
        # umask; the only option is to set it and then restore it.
        umask = os.umask(0)
        os.umask(umask)

        max_privs = 0777 & ~umask
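        # For example (sketch): with a typical umask of 022,
        # max_privs = 0777 & ~022 = 0755, so directories get 0755 below and
        # files without any execute bit get 0755 & ~0111 = 0644.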

        def set_file_privs(filename):
            """Sets mode of |filename|.  Assumes |filename| exists."""
            file_stat = os.stat(filename)

            file_privs = max_privs
            # if the original file permissions do not have at least one
            # executable bit then do not set it anywhere
            if not file_stat.st_mode & 0111:
                file_privs &= ~0111

            os.chmod(filename, file_privs)

        # try a bottom-up walk so changes on directory permissions won't cut
        # our access to the files/directories inside it
        for root, dirs, files in os.walk(dest, topdown=False):
            # when setting the privileges we emulate the chmod "X" behaviour
            # that sets execute only if it is a directory or any of the
            # owner/group/other already has execute right
            for dirname in dirs:
                os.chmod(os.path.join(root, dirname), max_privs)

            # Filter out broken symlinks as we go.
            for filename in filter(os.path.exists, files):
                set_file_privs(os.path.join(root, filename))


        # now set privs for the dest itself
        if os.path.isdir(dest):
            os.chmod(dest, max_privs)
        else:
            set_file_privs(dest)


    def get_file(self, source, dest, delete_dest=False, preserve_perm=True,
                 preserve_symlinks=False):
        """
        Copy files from the remote host to a local path.

        Directories will be copied recursively.
        If a source component is a directory with a trailing slash,
        the content of the directory will be copied, otherwise, the
        directory itself and its content will be copied. This
        behavior is similar to that of the program 'rsync'.

        Args:
                source: either
                        1) a single file or directory, as a string
                        2) a list of one or more (possibly mixed)
                                files or directories
                dest: a file or a directory (if source contains a
                        directory or more than one element, you must
                        supply a directory dest)
                delete_dest: if this is true, the command will also clear
                             out any old files at dest that are not in the
                             source
                preserve_perm: tells get_file() to try to preserve the source's
                               permissions on files and dirs
                preserve_symlinks: try to preserve symlinks instead of
                                   transforming them into files/dirs on copy

        Raises:
                AutoservRunError: the scp command failed
        """
        logging.debug('get_file. source: %s, dest: %s, delete_dest: %s, '
                      'preserve_perm: %s, preserve_symlinks: %s', source, dest,
                      delete_dest, preserve_perm, preserve_symlinks)
        # Start a master SSH connection if necessary.
        self.start_master_ssh()

        if isinstance(source, basestring):
            source = [source]
        dest = os.path.abspath(dest)

        # If rsync is disabled or fails, try scp.
        try_scp = True
        if self.use_rsync():
            logging.debug('Using rsync.')
            try:
                remote_source = self._encode_remote_paths(source)
                local_dest = utils.sh_escape(dest)
                rsync = self._make_rsync_cmd([remote_source], local_dest,
                                             delete_dest, preserve_symlinks)
                utils.run(rsync)
                try_scp = False
            except error.CmdError, e:
                logging.warning("trying scp, rsync failed: %s", e)

        if try_scp:
            logging.debug('Trying scp.')
            # scp has no equivalent to --delete, just drop the entire dest dir
            if delete_dest and os.path.isdir(dest):
                shutil.rmtree(dest)
                os.mkdir(dest)

            remote_source = self._make_rsync_compatible_source(source, False)
            if remote_source:
                # _make_rsync_compatible_source() already did the escaping
                remote_source = self._encode_remote_paths(remote_source,
                                                          escape=False)
                local_dest = utils.sh_escape(dest)
                scp = self._make_scp_cmd([remote_source], local_dest)
                try:
                    utils.run(scp)
                except error.CmdError, e:
                    logging.debug('scp failed: %s', e)
                    raise error.AutoservRunError(e.args[0], e.args[1])

        if not preserve_perm:
            # we have no way to tell scp not to try to preserve the
            # permissions, so set them after the copy instead.
            # for rsync we could use "--no-p --chmod=ugo=rwX", but those
            # options are only in very recent rsync versions
            self._set_umask_perms(dest)
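        # Usage sketch (illustrative, assuming 'host' is a concrete subclass
        # instance with a working run() method):
        #   host.get_file('/var/log/messages', '/tmp/collected/',
        #                 preserve_perm=False)
        # copies the remote file into the local directory and then relaxes
        # the permissions to what the local umask allows.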


    def send_file(self, source, dest, delete_dest=False,
                  preserve_symlinks=False):
        """
        Copy files from a local path to the remote host.

        Directories will be copied recursively.
        If a source component is a directory with a trailing slash,
        the content of the directory will be copied, otherwise, the
        directory itself and its content will be copied. This
        behavior is similar to that of the program 'rsync'.

        Args:
                source: either
                        1) a single file or directory, as a string
                        2) a list of one or more (possibly mixed)
                                files or directories
                dest: a file or a directory (if source contains a
                        directory or more than one element, you must
                        supply a directory dest)
                delete_dest: if this is true, the command will also clear
                             out any old files at dest that are not in the
                             source
                preserve_symlinks: controls if symlinks on the source will be
                    copied as such on the destination or transformed into the
                    referenced file/directory

        Raises:
                AutoservRunError: the scp command failed
        """
        logging.debug('send_file. source: %s, dest: %s, delete_dest: %s, '
                      'preserve_symlinks: %s', source, dest,
                      delete_dest, preserve_symlinks)
        # Start a master SSH connection if necessary.
        self.start_master_ssh()

        if isinstance(source, basestring):
            source = [source]
        remote_dest = self._encode_remote_paths([dest])

        local_sources = [utils.sh_escape(path) for path in source]
        if not local_sources:
            raise error.TestError('source |%s| yielded an empty list' % (
                source))
        if any([local_source.find('\x00') != -1 for
                local_source in local_sources]):
            raise error.TestError('one or more sources include NUL char')

        # If rsync is disabled or fails, try scp.
        try_scp = True
        if self.use_rsync():
            logging.debug('Using rsync.')
            try:
                rsync = self._make_rsync_cmd(local_sources, remote_dest,
                                             delete_dest, preserve_symlinks)
                utils.run(rsync)
                try_scp = False
            except error.CmdError, e:
                logging.warning("trying scp, rsync failed: %s", e)

        if try_scp:
            logging.debug('Trying scp.')
            # scp has no equivalent to --delete, just drop the entire dest dir
            if delete_dest:
                is_dir = self.run("ls -d %s/" % dest,
                                  ignore_status=True).exit_status == 0
                if is_dir:
                    cmd = "rm -rf %s && mkdir %s"
                    cmd %= (dest, dest)
                    self.run(cmd)

            local_sources = self._make_rsync_compatible_source(source, True)
            if local_sources:
                scp = self._make_scp_cmd(local_sources, remote_dest)
                try:
                    utils.run(scp)
                except error.CmdError, e:
                    logging.debug('scp failed: %s', e)
                    raise error.AutoservRunError(e.args[0], e.args[1])
            else:
                logging.debug('skipping scp for empty source list')


    def verify_ssh_user_access(self):
        """Verify ssh access to this host.

        @returns False if ssh_ping fails due to a permissions error, True
                 otherwise.
        """
        try:
            self.ssh_ping()
        except (error.AutoservSshPermissionDeniedError,
                error.AutoservSshPingHostError):
            return False
        return True


    def ssh_ping(self, timeout=60, base_cmd='true'):
        """
        Pings remote host via ssh.

        @param timeout: Time in seconds before giving up.
                        Defaults to 60 seconds.
        @param base_cmd: The base command to run with the ssh ping.
                         Defaults to true.
        @raise AutoservSSHTimeout: If the ssh ping times out.
        @raise AutoservSshPermissionDeniedError: If ssh ping fails due to
                                                 permissions.
        @raise AutoservSshPingHostError: For other AutoservRunErrors.
        """
        try:
            self.run(base_cmd, timeout=timeout, connect_timeout=timeout)
        except error.AutoservSSHTimeout:
            msg = "Host (ssh) verify timed out (timeout = %d)" % timeout
            raise error.AutoservSSHTimeout(msg)
        except error.AutoservSshPermissionDeniedError:
            # Let AutoservSshPermissionDeniedError be visible to the callers.
            raise
        except error.AutoservRunError, e:
            # convert the generic AutoservRunError into something more
            # specific for this context
            raise error.AutoservSshPingHostError(e.description + '\n' +
                                                 repr(e.result_obj))


    def is_up(self, timeout=60, base_cmd='true'):
        """
        Check if the remote host is up by ssh-ing and running a base command.

        @param timeout: timeout in seconds.
        @param base_cmd: a base command to run with ssh. The default is 'true'.
        @returns True if the remote host is up before the timeout expires,
                 False otherwise.
        """
        try:
            self.ssh_ping(timeout=timeout, base_cmd=base_cmd)
        except error.AutoservError:
            return False
        else:
            return True


    def wait_up(self, timeout=None):
        """
        Wait until the remote host is up or the timeout expires.

        In fact, it will wait until an ssh connection to the remote
        host can be established, and getty is running.

        @param timeout: time limit in seconds before returning even
            if the host is not up.

        @returns True if the host was found to be up before the timeout
                 expires, False otherwise.
        """
        if timeout:
            current_time = int(time.time())
            end_time = current_time + timeout

        while not timeout or current_time < end_time:
            if self.is_up(timeout=end_time - current_time):
                try:
                    if self.are_wait_up_processes_up():
                        logging.debug('Host %s is now up', self.hostname)
                        return True
                except error.AutoservError:
                    pass
            time.sleep(1)
            current_time = int(time.time())

        logging.debug('Host %s is still down after waiting %d seconds',
                      self.hostname, int(timeout + time.time() - end_time))
        return False


    def wait_down(self, timeout=None, warning_timer=None, old_boot_id=None):
        """
        Wait until the remote host is down or the timeout expires.

        If old_boot_id is provided, this will wait until either the machine
        is unpingable or self.get_boot_id() returns a value different from
        old_boot_id. If the boot_id value has changed then the function
        returns True under the assumption that the machine has shut down
        and has now already come back up.

        If old_boot_id is None then the method assumes the machine has not
        yet shut down until it becomes unreachable.

        Based on this definition, the 4 possible permutations of timeout
        and old_boot_id are:
        1. timeout and old_boot_id: wait timeout seconds for either the
                                    host to become unpingable, or the boot id
                                    to change. In the latter case we've rebooted
                                    and in the former case we've only shut down,
                                    but both cases return True.
        2. only timeout: wait timeout seconds for the host to become unpingable.
                         If the host remains pingable throughout timeout seconds
                         we return False.
        3. only old_boot_id: wait forever until either the host becomes
                             unpingable or the boot_id changes. Return True
                             when either of those conditions is met.
        4. not timeout, not old_boot_id: wait forever until the host becomes
                                         unpingable.

        @param timeout: Time limit in seconds before returning even
            if the host is still up.
        @param warning_timer: Time limit in seconds that will generate
            a warning if the host is not down yet.
        @param old_boot_id: A string containing the result of self.get_boot_id()
            prior to the host being told to shut down. Can be None if this is
            not available.

        @returns True if the host was found to be down, False otherwise
        """
        #TODO: there is currently no way to distinguish between knowing
        #TODO: boot_id was unsupported and not knowing the boot_id.
        current_time = int(time.time())
        if timeout:
            end_time = current_time + timeout

        if warning_timer:
            warn_time = current_time + warning_timer

        if old_boot_id is not None:
            logging.debug('Host %s pre-shutdown boot_id is %s',
                          self.hostname, old_boot_id)

        # Impose semi real-time deadline constraints, since some clients
        # (eg: watchdog timer tests) expect strict checking of time elapsed.
        # Each iteration of this loop is treated as though it atomically
        # completes within current_time; this is needed because if we used
        # inline time.time() calls instead then the following could happen:
        #
        # while not timeout or time.time() < end_time:      [23 < 30]
        #    some code.                                     [takes 10 secs]
        #    try:
        #        new_boot_id = self.get_boot_id(timeout=end_time - time.time())
        #                                                   [30 - 33]
        # The last step will lead to a return True, when in fact the machine
        # went down at 32 seconds (>30). Hence we need to pass get_boot_id
        # the same time that allowed us into that iteration of the loop.
        while not timeout or current_time < end_time:
            try:
                new_boot_id = self.get_boot_id(timeout=end_time-current_time)
            except error.AutoservError:
                logging.debug('Host %s is now unreachable over ssh, is down',
                              self.hostname)
                return True
            else:
                # if the machine is up but the boot_id value has changed from
                # the old boot id, then we can assume the machine has gone down
                # and then already come back up
                if old_boot_id is not None and old_boot_id != new_boot_id:
                    logging.debug('Host %s now has boot_id %s and so must '
                                  'have rebooted', self.hostname, new_boot_id)
                    return True

            if warning_timer and current_time > warn_time:
                self.record("INFO", None, "shutdown",
                            "Shutdown took longer than %ds" % warning_timer)
                # Print the warning only once.
                warning_timer = None
                # If a machine is stuck switching runlevels,
                # this may cause the machine to reboot.
                self.run('kill -HUP 1', ignore_status=True)

            time.sleep(1)
            current_time = int(time.time())

        return False


    # tunable constants for the verify & repair code
    AUTOTEST_GB_DISKSPACE_REQUIRED = get_value("SERVER",
                                               "gb_diskspace_required",
                                               type=float,
                                               default=20.0)


    def verify_connectivity(self):
        super(AbstractSSHHost, self).verify_connectivity()

        logging.info('Pinging host %s', self.hostname)
        self.ssh_ping()
        logging.info("Host (ssh) %s is alive", self.hostname)

        if self.is_shutting_down():
            raise error.AutoservHostIsShuttingDownError("Host is shutting down")


    def verify_software(self):
        super(AbstractSSHHost, self).verify_software()
        try:
            self.check_diskspace(autotest.Autotest.get_install_dir(self),
                                 self.AUTOTEST_GB_DISKSPACE_REQUIRED)
        except error.AutoservHostError:
            raise           # only want to raise if it's a space issue
        except autotest.AutodirNotFoundError:
            # autotest dir may not exist, etc. ignore
            logging.debug('autodir space check exception, this is probably '
                          'safe to ignore\n' + traceback.format_exc())


    def close(self):
        super(AbstractSSHHost, self).close()
        self._cleanup_master_ssh()
        os.remove(self.known_hosts_file)
        self.rpc_server_tracker.disconnect_all()


    def _cleanup_master_ssh(self):
        """
        Release all resources (process, temporary directory) used by an active
        master SSH connection.
        """
        # If a master SSH connection is running, kill it.
        if self.master_ssh_job is not None:
            logging.debug('Nuking master_ssh_job.')
            utils.nuke_subprocess(self.master_ssh_job.sp)
            self.master_ssh_job = None

        # Remove the temporary directory for the master SSH socket.
        if self.master_ssh_tempdir is not None:
            logging.debug('Cleaning master_ssh_tempdir.')
            self.master_ssh_tempdir.clean()
            self.master_ssh_tempdir = None
            self.master_ssh_option = ''


    def start_master_ssh(self, timeout=5):
        """
        Called whenever a slave SSH connection needs to be initiated (e.g., by
        run, rsync, scp). If master SSH support is enabled and a master SSH
        connection is not active already, start a new one in the background.
        Also, clean up any zombie master SSH connections (e.g., dead due to a
        reboot).

        @param timeout: timeout in seconds (default 5) to wait for the master
            ssh connection to be established. If the timeout is reached, a
            warning message is logged, but no other action is taken.
        """
        if not enable_master_ssh:
            return

        # Multiple processes might try in parallel to clean up the old master
        # ssh connection and create a new one, therefore use a lock to protect
        # against race conditions.
        with self._lock:
            # If a previously started master SSH connection is not running
            # anymore, it needs to be cleaned up and then restarted.
            if self.master_ssh_job is not None:
                socket_path = os.path.join(self.master_ssh_tempdir.name,
                                           'socket')
                if (not os.path.exists(socket_path) or
                        self.master_ssh_job.sp.poll() is not None):
                    logging.info("Master ssh connection to %s is down.",
                                 self.hostname)
                    self._cleanup_master_ssh()

            # Start a new master SSH connection.
            if self.master_ssh_job is None:
                # Create a shared socket in a temp location.
                self.master_ssh_tempdir = autotemp.tempdir(
                        unique_id='ssh-master')
                self.master_ssh_option = ("-o ControlPath=%s/socket" %
                                          self.master_ssh_tempdir.name)

                # Start the master SSH connection in the background.
                master_cmd = self.ssh_command(
                        options="-N -o ControlMaster=yes")
                logging.info("Starting master ssh connection '%s'", master_cmd)
                self.master_ssh_job = utils.BgJob(master_cmd,
                                                  nickname='master-ssh',
                                                  no_pipes=True)
                # To prevent a race between the master ssh connection startup
                # and its first attempted use, wait for the socket file to
                # exist before returning.
                end_time = time.time() + timeout
                socket_file_path = os.path.join(self.master_ssh_tempdir.name,
                                                'socket')
                while time.time() < end_time:
                    if os.path.exists(socket_file_path):
                        break
                    time.sleep(.2)
                else:
                    logging.info('Timed out waiting for master-ssh connection '
                                 'to be established.')
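                # Note (explanatory): the '-o ControlPath=<tempdir>/socket'
                # option stored in master_ssh_option above is what later run,
                # rsync and scp commands pass into make_ssh_command, so they
                # multiplex over this established master connection instead of
                # opening a new SSH session each time.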


    def clear_known_hosts(self):
        """Clears out the temporary ssh known_hosts file.

        This is useful if the test SSHes to the machine, then reinstalls it,
        then SSHes to it again.  It can be called after the reinstall to
        reduce the spam in the logs.
        """
        logging.info("Clearing known hosts for host '%s', file '%s'.",
                     self.hostname, self.known_hosts_file)
        # Clear out the file by opening it for writing and then closing.
        fh = open(self.known_hosts_file, "w")
        fh.close()


    def collect_logs(self, remote_src_dir, local_dest_dir, ignore_errors=True):
        """Copy log directories from a host to a local directory.

        @param remote_src_dir: A source directory on the host.
        @param local_dest_dir: A path to a local destination directory.
            If it doesn't exist it will be created.
        @param ignore_errors: If True, ignore exceptions.

        @raises OSError: If there were problems creating the local_dest_dir and
            ignore_errors is False.
        @raises AutoservRunError, AutotestRunError: If something goes wrong
            while copying the directories and ignore_errors is False.
        """
        locally_created_dest = False
        if (not os.path.exists(local_dest_dir)
                or not os.path.isdir(local_dest_dir)):
            try:
                os.makedirs(local_dest_dir)
                locally_created_dest = True
            except OSError as e:
                logging.warning('Unable to collect logs from host '
                                '%s: %s', self.hostname, e)
                if not ignore_errors:
                    raise
                return
        try:
            self.get_file(
                    remote_src_dir, local_dest_dir, preserve_symlinks=True)
        except (error.AutotestRunError, error.AutoservRunError,
                error.AutoservSSHTimeout) as e:
            logging.warning('Collection of %s to local dir %s from host %s '
                            'failed: %s', remote_src_dir, local_dest_dir,
                            self.hostname, e)
            if locally_created_dest:
                shutil.rmtree(local_dest_dir, ignore_errors=ignore_errors)
            if not ignore_errors:
                raise


    def _create_ssh_tunnel(self, port, local_port):
        """Create an ssh tunnel from local_port to port.

        @param port: remote port on the host.
        @param local_port: local forwarding port.

        @return: the tunnel process.
        """
        tunnel_options = '-n -N -q -L %d:localhost:%d' % (local_port, port)
        ssh_cmd = self.make_ssh_command(opts=tunnel_options)
        tunnel_cmd = '%s %s' % (ssh_cmd, self.hostname)
        logging.debug('Full tunnel command: %s', tunnel_cmd)
        tunnel_proc = subprocess.Popen(tunnel_cmd, shell=True, close_fds=True)
        logging.debug('Started ssh tunnel, local = %d'
                      ' remote = %d, pid = %d',
                      local_port, port, tunnel_proc.pid)
        return tunnel_proc
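        # Example (illustrative): for port=9990 and local_port=12345 the
        # command is roughly
        #   /usr/bin/ssh ... -n -N -q -L 12345:localhost:9990 <hostname>
        # so connections to localhost:12345 on the autoserv machine are
        # forwarded to port 9990 on the host.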


    def rpc_port_forward(self, port, local_port):
        """
        Forwards a port securely through a tunnel process from the server
        to the DUT for an RPC server connection.

        @param port: remote port on the DUT.
        @param local_port: local forwarding port.

        @return: the tunnel process.
        """
        return self._create_ssh_tunnel(port, local_port)


    def rpc_port_disconnect(self, tunnel_proc, port):
        """
        Disconnects a previously forwarded port from the server to the DUT for
        an RPC server connection.

        @param tunnel_proc: the original tunnel process returned from
                            |rpc_port_forward|.
        @param port: remote port on the DUT.

        """
        if tunnel_proc.poll() is None:
            tunnel_proc.terminate()
            logging.debug('Terminated tunnel, pid %d', tunnel_proc.pid)
        else:
            logging.debug('Tunnel pid %d terminated early, status %d',
                          tunnel_proc.pid, tunnel_proc.returncode)


    def get_os_type(self):
        """Returns the host OS descriptor (to be implemented in subclasses).

        @return A string describing the OS type.
        """
        raise NotImplementedError