# Copyright (c) 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Unit tests for server/cros/network/rf_switch/rf_switch.py."""

import unittest

from autotest_lib.client.common_lib.test_utils import mock
from autotest_lib.server.cros.network.rf_switch import rf_mocks
from autotest_lib.server.cros.network.rf_switch import rf_switch
from autotest_lib.server.cros.network.rf_switch import scpi


class RfSwitchTestCases(unittest.TestCase):
    """Unit tests for RfSwitch Methods."""

    _HOST = '1.2.3.4'

    def setUp(self):
        """Create the mock god and a mocked RF switch for each test.

        self.code and self.msg are the values used to build the stubbed
        recv reply in _populate_stack_for_cmd; they default to 0 and ''.
        """
        self.god = mock.mock_god(debug=False, fail_fast=True)
        self.mock_rf_switch = rf_mocks.RfSwitchMock(self._HOST)
        self.code = 0
        self.msg = ''

    def tearDown(self):
        """Remove every stub installed through the mock god."""
        self.god.unstub_all()

    def _populate_stack_for_cmd(self, cmd):
        """Helper to add call stacks to verify in mock.

        Stubs socket.send and records three expected calls, in order: the
        command itself, the SCPI error check command and the wait command.
        socket.recv is stubbed to return '<code>, "<msg>"' built from
        self.code and self.msg.

        @param cmd: command to verify
        """
        # monitor send for correct command.
        self.god.stub_function(self.mock_rf_switch.socket, 'send')
        self.mock_rf_switch.socket.send.expect_call('%s' % cmd)
        self.mock_rf_switch.socket.send.expect_call(
                '%s\n' % scpi.Scpi.CMD_ERROR_CHECK)
        self.mock_rf_switch.socket.send.expect_call(
                '%s\n' % rf_switch.RfSwitch._CMD_WAIT)

        self.god.stub_function_to_return(
                self.mock_rf_switch.socket, 'recv', '%d, "%s"' % (
                        self.code, self.msg))

    def _populate_stack_for_query_commands(self, cmd, response):
        """Helper to add call stacks for query commands.

        Stubs socket.send expecting exactly one call with the command and
        stubs socket.recv to return the given response.

        @param cmd: command to verify
        @param response: response to mock return
        """
        self.god.stub_function(self.mock_rf_switch.socket, 'send')
        self.mock_rf_switch.socket.send.expect_call('%s' % cmd)

        self.god.stub_function_to_return(
                self.mock_rf_switch.socket, 'recv', response)

    def test_send_cmd_check_error(self):
        """Verify send_cmd_check_error."""
        test_command = 'This is a command\n'
        self._populate_stack_for_cmd(test_command)
        self.mock_rf_switch.send_cmd_check_error(test_command)
        self.god.check_playback()

    def test_send_cmd_check_error_throws_error(self):
        """Verify send_cmd_check_error throws error."""
        test_command = 'This is a command'
        code = 101
        msg = 'Error Message'

        # Only recv is stubbed here; the reply carries error code 101.
        self.god.stub_function_to_return(
                self.mock_rf_switch.socket, 'recv', '%d, "%s"' % (code, msg))
        with self.assertRaises(rf_switch.RfSwitchException):
            self.mock_rf_switch.send_cmd_check_error(test_command)

    def test_close_relays(self):
        """Verify close_relays."""
        relays = 'R1:R3'
        self._populate_stack_for_cmd(
                '%s (@%s)\n' % (rf_switch.RfSwitch._CMD_CLOSE_RELAYS, relays))
        self.mock_rf_switch.close_relays(relays)
        self.god.check_playback()

    def test_relays_closed(self):
        """Verify relays_closed."""
        relays = 'R1:R3'
        status = '1,0,1'
        cmd = '%s (@%s)\n' % (
                rf_switch.RfSwitch._CMD_CHECK_RELAYS_CLOSED, relays)
        self._populate_stack_for_query_commands(cmd, status)
        self.mock_rf_switch.relays_closed(relays)
        self.god.check_playback()

    def test_open_relays(self):
        """Verify open_relays."""
        relays = 'R1:R3'
        self._populate_stack_for_cmd(
                '%s (@%s)\n' % (rf_switch.RfSwitch._CMD_OPEN_RELAYS, relays))
        self.mock_rf_switch.open_relays(relays)
        self.god.check_playback()

    def test_open_all_relays(self):
        """Verify open_all_relays."""
        self._populate_stack_for_cmd(
                '%s\n' % rf_switch.RfSwitch._CMD_OPEN_ALL_RELAYS)
        self.mock_rf_switch.open_all_relays()
        self.god.check_playback()

    def test_set_verify_on(self):
        """Verify set_verify for on."""
        relays = 'R1:R3'
        status = True
        self._populate_stack_for_cmd(
                '%s 1,(@%s)\n' % (rf_switch.RfSwitch._CMD_SET_VERIFY, relays))
        self.mock_rf_switch.set_verify(relays, status)
        self.god.check_playback()

    def test_set_verify_off(self):
        """Verify set_verify for off."""
        relays = 'R1:R3'
        status = False
        self._populate_stack_for_cmd(
                '%s 0,(@%s)\n' % (rf_switch.RfSwitch._CMD_SET_VERIFY, relays))
        self.mock_rf_switch.set_verify(relays, status)
        self.god.check_playback()

    def test_get_verify(self):
        """Verify get_verify."""
        relays = 'R1:R3'
        status = '1,0,1'
        cmd = '%s (@%s)\n' % (rf_switch.RfSwitch._CMD_GET_VERIFY, relays)
        self._populate_stack_for_query_commands(cmd, status)
        self.mock_rf_switch.get_verify(relays)
        self.god.check_playback()

    def test_set_verify_inverted_true(self):
        """Verify set_verify_inverted sends INV when status is True."""
        relays = 'R1:R3'
        status = True
        cmd = '%s INV,(@%s)\n' % (
                rf_switch.RfSwitch._CMD_SET_VERIFY_INVERTED, relays)
        self._populate_stack_for_cmd(cmd)
        self.mock_rf_switch.set_verify_inverted(relays, status)
        self.god.check_playback()

    def test_set_verify_inverted_false(self):
        """Verify set_verify_inverted sends NORM when status is False."""
        relays = 'R1:R3'
        status = False
        cmd = '%s NORM,(@%s)\n' % (
                rf_switch.RfSwitch._CMD_SET_VERIFY_INVERTED, relays)
        self._populate_stack_for_cmd(cmd)
        self.mock_rf_switch.set_verify_inverted(relays, status)
        self.god.check_playback()

    def test_get_verify_inverted(self):
        """Verify get_verify_inverted."""
        relays = 'R1:R3'
        status = '1,0,1'
        cmd = '%s (@%s)\n' % (
                rf_switch.RfSwitch._CMD_GET_VERIFY_INVERTED, relays)
        self._populate_stack_for_query_commands(cmd, status)
        self.mock_rf_switch.get_verify_inverted(relays)
        self.god.check_playback()

    def test_get_verify_state(self):
        """Verify get_verify_state."""
        relays = 'R1:R3'
        status = '1,0,1'
        cmd = '%s (@%s)\n' % (rf_switch.RfSwitch._CMD_GET_VERIFY_STATE, relays)
        self._populate_stack_for_query_commands(cmd, status)
        self.mock_rf_switch.get_verify_state(relays)
        self.god.check_playback()

    def test_busy(self):
        """Verify busy is truthy when the switch replies '1'."""
        status = '1'
        cmd = '%s\n' % rf_switch.RfSwitch._CMD_CHECK_BUSY
        self._populate_stack_for_query_commands(cmd, status)
        # busy is read as an attribute; check_playback confirms that the
        # read sent the busy query.
        busy = self.mock_rf_switch.busy
        self.god.check_playback()
        self.assertTrue(busy)

    def test_not_busy(self):
        """Verify busy is falsy when the switch replies '0'."""
        status = '0'
        cmd = '%s\n' % rf_switch.RfSwitch._CMD_CHECK_BUSY
        self._populate_stack_for_query_commands(cmd, status)
        busy = self.mock_rf_switch.busy
        self.god.check_playback()
        self.assertFalse(busy)

    def test_wait(self):
        """Verify wait."""
        self.god.stub_function(self.mock_rf_switch.socket, 'send')
        self.mock_rf_switch.socket.send.expect_call(
                '%s\n' % rf_switch.RfSwitch._CMD_WAIT)
        self.mock_rf_switch.wait()
        self.god.check_playback()

    def test_get_attenuation(self):
        """Verify get_attenuation."""
        ap = 1
        attenuation = 100
        relays = rf_switch.RfSwitch._AP_ATTENUATOR_RELAYS_SHORT[ap]
        # Mocked relay status reply; the test expects it to decode to 100.
        return_relays = '1,1,0,1,1,0,0'
        cmd = '%s (@%s)\n' % (rf_switch.RfSwitch._CMD_CHECK_RELAYS_CLOSED,
                              relays)
        self._populate_stack_for_query_commands(cmd, return_relays)
        response = self.mock_rf_switch.get_attenuation(ap)
        self.god.check_playback()
        # assertEqual: the assertEquals alias is deprecated and was removed
        # in Python 3.12.
        self.assertEqual(response, attenuation,
                         'get_attenuation returned %s, did not match %s' % (
                                 response, attenuation))

    def test_set_attenuation(self):
        """Verify set_attenuation."""
        ap = 1
        ap_relays = 'k1_49,k1_50,k1_51,k1_52,k1_53,k1_54,R1_9'
        attenuation = 100
        relays_for_attenuation = 'k1_53,k1_52,k1_50,k1_49'

        # Expected sequence: open every relay of the AP, then close the
        # subset listed for this attenuation value.
        self._populate_stack_for_cmd(
                '%s (@%s)\n' % (rf_switch.RfSwitch._CMD_OPEN_RELAYS, ap_relays))
        self._populate_stack_for_cmd('%s (@%s)\n' % (
                rf_switch.RfSwitch._CMD_CLOSE_RELAYS, relays_for_attenuation))
        self.mock_rf_switch.set_attenuation(ap, attenuation)
        self.god.check_playback()

    def test_set_attenuation_all(self):
        """Verify we can set same attenuation to all."""

        # 0 should close all relays
        # range (not xrange) keeps this runnable on Python 2 and Python 3.
        for ap in range(1, rf_switch.RfSwitch._MAX_ENCLOSURE + 1):
            ap_relays = rf_switch.RfSwitch._AP_ATTENUATOR_RELAYS[ap]
            relays = ','.join(ap_relays)
            reverse_relays = ','.join(ap_relays[::-1])
            self._populate_stack_for_cmd(
                    '%s (@%s)\n' % (rf_switch.RfSwitch._CMD_OPEN_RELAYS, relays))
            self._populate_stack_for_cmd('%s (@%s)\n' % (
                    rf_switch.RfSwitch._CMD_CLOSE_RELAYS, reverse_relays))
        self.mock_rf_switch.set_attenuation(0, 0)

        # 127 should open all (close none)
        for ap in range(1, rf_switch.RfSwitch._MAX_ENCLOSURE + 1):
            relays = ','.join(rf_switch.RfSwitch._AP_ATTENUATOR_RELAYS[ap])
            self._populate_stack_for_cmd(
                    '%s (@%s)\n' % (rf_switch.RfSwitch._CMD_OPEN_RELAYS, relays))
            self._populate_stack_for_cmd('%s (@)\n' % (
                    rf_switch.RfSwitch._CMD_CLOSE_RELAYS))
        self.mock_rf_switch.set_attenuation(0, 127)
        self.god.check_playback()

    def test_set_attenuation_raise_error(self):
        """Verify set_attenuation will raise error."""
        with self.assertRaises(ValueError):
            self.mock_rf_switch.set_attenuation(5, 100)
        with self.assertRaises(ValueError):
            self.mock_rf_switch.set_attenuation(-1, 100)

    def test_connect_ap_client(self):
        """Verify connect_ap_client sets correct relays."""
        relays = 'k1_1,k1_25'
        ap = 1
        client = 1
        self._populate_stack_for_cmd(
                '%s (@%s)\n' % (rf_switch.RfSwitch._CMD_CLOSE_RELAYS, relays))
        self.mock_rf_switch.connect_ap_client(ap, client)
        self.god.check_playback()


if __name__ == '__main__':
    unittest.main()