• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (c) 2018 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import socket
6import telnetlib
7
8from autotest_lib.client.common_lib import error
9
10SHORT_TIMEOUT = 2
11LONG_TIMEOUT = 30
12
13class TelnetHelper(object):
14    """Helper class to run basic string commands on a telnet host."""
15
16    def __init__(self, tx_cmd_separator="\n", rx_cmd_separator="\n", prompt=""):
17        self._tn = None
18
19        self._tx_cmd_separator = tx_cmd_separator
20        self._rx_cmd_separator = rx_cmd_separator
21        self._prompt = prompt
22
23    def open(self, hostname, port=22):
24        """Opens telnet connection to attenuator host.
25
26        @param hostname: Valid hostname
27        @param port: Optional port number, defaults to 22
28
29        """
30        if self._tn:
31            self._tn.close()
32
33        self._tn = telnetlib.Telnet()
34
35        try:
36            self._tn.open(hostname, port, LONG_TIMEOUT)
37        except socket.timeout as e:
38            raise error.TestError("Timed out while opening telnet connection")
39
40    def is_open(self):
41        """Returns true if telnet connection is open."""
42        return bool(self._tn)
43
44    def close(self):
45        """Closes telnet connection."""
46        if self._tn:
47            self._tn.close()
48            self._tn = None
49
50    def cmd(self, cmd_str, wait_ret=True):
51        """Run command on attenuator.
52
53        @param cmd_str: Command to run
54        @param wait_ret: Wait for command output or not
55        @returns command output
56        """
57        if not isinstance(cmd_str, str):
58            raise error.TestError("Invalid command string %s" % cmd_str)
59
60        if not self.is_open():
61            raise error.TestError("Telnet connection not open for commands")
62
63        cmd_str.strip(self._tx_cmd_separator)
64        try:
65            self._tn.read_until(self._prompt, SHORT_TIMEOUT)
66        except EOFError as e:
67            raise error.TestError("Connection closed. EOFError (%s)" % e)
68
69        try:
70            self._tn.write(cmd_str + self._tx_cmd_separator)
71        except socket.error as e:
72            raise error.TestError("Connection closed. Socket error (%s)." % e)
73
74        if wait_ret is False:
75            return None
76
77        try:
78            match_channel_idx, _, ret_text = \
79                    self._tn.expect(["\S+" + self._rx_cmd_separator],
80                                    SHORT_TIMEOUT)
81        except EOFError as e:
82            raise error.TestError("Connection closed. EOFError (%s)" % e)
83
84        if match_channel_idx == -1:
85            raise error.TestError("Telnet command failed to return valid data. "
86                                  "Data returned: %s" % ret_text)
87
88        ret_text = ret_text.strip()
89
90        return ret_text
91