• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#   Copyright 2018 Google, Inc.
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16#
17
18from acts.controllers.attenuator_lib._tnhelper import _ascii_string
19
20import logging
21import telnetlib
22
# Prefix joined with the port number to form the outlet id sent to the RPM.
ID = '.A'
# Text sent in reply to both the username and the password prompt at login.
LOGIN_PWD = 'admn'
# Port states: lowercased to build the command, and matched as-is
# (case-sensitive) against the status output.
ON = 'On'
OFF = 'Off'
# Prompt text waited for before the password is sent.
PASSWORD = 'Password: '
# TCP port used for the telnet connection.
PORT = 23
# Prompt expected before each command and checked for after login.
RPM_PROMPT = 'Switched CDU: '
# Terminator appended to every command and used to detect the end of a reply.
SEPARATOR = '\n'
# Timeout passed to the telnet connect, read_until and expect calls.
TIMEOUT = 3
# Prompt text waited for before the username is sent.
USERNAME = 'Username: '
33
34
class RpmControllerError(Exception):
    """Raised when an operation on the RPM switch fails."""
37
class RpmController(object):
    """Telnet session to an RPM switch.

    Each instance owns one telnet connection to the switch at `host`; the
    connection is opened and logged in during construction.

    Attributes:
        tn: telnetlib.Telnet connection to the RPM switch.
        host: IP address of the RPM controller.
    """

    def __init__(self, host):
        """Initializes the RPM controller object.

        Establishes a telnet connection and logs in to the switch.

        Args:
            host: IP address of the RPM controller.

        Raises:
            RpmControllerError: if the RPM prompt is not seen after login.
        """
        self.host = host
        logging.info('RPM IP: %s', self.host)

        # Create the client unconnected and connect exactly once via open().
        # Passing the host to the constructor connects immediately (default
        # port, no explicit timeout); a following open() would then replace
        # that socket with a second connection and orphan the first one.
        self.tn = telnetlib.Telnet()
        self.tn.open(self.host, PORT, TIMEOUT)
        # LOGIN_PWD is sent for both the username and the password prompt.
        self.run(USERNAME, LOGIN_PWD)
        result = self.run(PASSWORD, LOGIN_PWD)
        if RPM_PROMPT not in result:
            # Do not leave a half-logged-in session open on the switch.
            self.tn.close()
            raise RpmControllerError(
                'Failed to login to rpm controller %s' % self.host)

    def run(self, prompt, cmd_str):
        """Method to run commands on the RPM.

        This method simply runs a command and returns output in decoded format.
        The calling methods should take care of parsing the expected result
        from this output.

        Args:
            prompt: Expected prompt before running a command.
            cmd_str: Command to run on RPM.

        Returns:
            Decoded text read after the command was written. If nothing
            matches within TIMEOUT, whatever was read so far is returned.
        """
        cmd_str = '%s%s' % (cmd_str, SEPARATOR)
        # Consume output up to the expected prompt (or until TIMEOUT); the
        # consumed text itself is not needed.
        self.tn.read_until(_ascii_string(prompt), TIMEOUT)

        self.tn.write(_ascii_string(cmd_str))
        # expect() returns (index, match, data); only the data is used.
        # Raw string keeps the regex escape \S intact without relying on
        # Python passing through an unknown string escape.
        _, _, txt = self.tn.expect(
            [_ascii_string(r'\S+%s' % SEPARATOR)], TIMEOUT)

        return txt.decode()

    def set_rpm_port_state(self, rpm_port, state):
        """Method to turn on/off rpm port.

        Args:
            rpm_port: port number of the switch to change.
            state: target state, normally the module constants ON or OFF.
                It is lowercased to build the command, but matched
                case-sensitively against the status output.

        Returns:
            True if the status output contains `state`, False otherwise.

        Raises:
            RpmControllerError: if the port id is missing from the status
                output.
        """
        port = '%s%s' % (ID, rpm_port)
        logging.info('Turning %s port: %s', state, port)
        self.run(RPM_PROMPT, '%s %s' % (state.lower(), port))
        # Query the port afterwards to confirm the switch applied the change.
        result = self.run(RPM_PROMPT, 'status %s' % port)
        if port not in result:
            raise RpmControllerError('Port %s doesn\'t exist' % port)
        return state in result

    def turn_on(self, rpm_port):
        """Method to turn on a port on the RPM switch.

        Args:
            rpm_port: port number of the switch to turn on.

        Returns:
            True if the port is turned on.
            False if not turned on.
        """
        return self.set_rpm_port_state(rpm_port, ON)

    def turn_off(self, rpm_port):
        """Method to turn off a port on the RPM switch.

        Args:
            rpm_port: port number of the switch to turn off.

        Returns:
            True if the port is turned off.
            False if not turned off.
        """
        return self.set_rpm_port_state(rpm_port, OFF)

    def __del__(self):
        """Close the telnet connection, if one was ever created."""
        # __init__ may have raised before self.tn was assigned; the
        # finalizer still runs in that case and must not fail.
        tn = getattr(self, 'tn', None)
        if tn is not None:
            tn.close()
131
132
def create_telnet_session(ip):
    """Builds an RpmController connected to the RPM at the given IP.

    Args:
        ip: IP address of the RPM switch.

    Returns:
        The newly constructed RpmController.
    """
    session = RpmController(ip)
    return session
136
def turn_on_ap(pcap, ssid, rpm_port, rpm_ip=None, rpm=None):
    """Power on an AP and confirm its SSID shows up in a scan.

    Turns on the RPM port the AP is plugged into, then asks the packet
    capturer to scan for the AP's SSID.

    The caller supplies either an existing RPM connection or the RPM's IP
    address. Several APs may share one RPM switch, so a test can pass the
    same connection for all of them instead of reconnecting per AP. When
    only the IP is given, a new connection is created for this call.

    Args:
        pcap: packet capture object.
        ssid: SSID of the wifi network.
        rpm_port: Port number on the RPM switch the AP is connected to.
        rpm_ip: IP address of the RPM switch.
        rpm: telnet connection object to the RPM switch.

    Returns:
        False if neither rpm nor rpm_ip is given. Otherwise the result of
        turning the port on when that is falsy, else the result of the
        packet capturer's scan for ssid.
    """
    controller = rpm
    if not controller:
        if not rpm_ip:
            logging.error("Failed to turn on AP. Need telnet object or RPM IP")
            return False
        controller = create_telnet_session(rpm_ip)

    powered = controller.turn_on(rpm_port)
    if not powered:
        # Skip the scan when the port did not come up.
        return powered
    return pcap.start_scan_and_find_network(ssid)
163
def turn_off_ap(rpm_port, rpm_ip=None, rpm=None):
    """Power off an AP.

    Turns off the RPM port the AP is plugged into.

    The caller supplies either an existing RPM connection or the RPM's IP
    address. When only the IP is given, a new connection is created for
    this call.

    Args:
        rpm_port: Port number on the RPM switch the AP is connected to.
        rpm_ip: IP address of the RPM switch.
        rpm: telnet connection object to the RPM switch.

    Returns:
        False if neither rpm nor rpm_ip is given, otherwise the result of
        turning the port off.
    """
    controller = rpm
    if not controller:
        if not rpm_ip:
            logging.error(
                "Failed to turn off AP. Need telnet object or RPM IP")
            return False
        controller = create_telnet_session(rpm_ip)

    return controller.turn_off(rpm_port)
184