1#!/usr/bin/env python3 2# 3# Copyright 2020 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the 'License'); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an 'AS IS' BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import collections.abc 18import copy 19import fcntl 20import importlib 21import logging 22import os 23import selenium 24import time 25from acts import logger 26from selenium.webdriver.support.ui import Select 27from selenium.webdriver.chrome.service import Service as ChromeService 28from selenium.webdriver.common.by import By 29from selenium.webdriver.support import expected_conditions as expected_conditions 30from webdriver_manager.chrome import ChromeDriverManager 31 32 33BROWSER_WAIT_SHORT = 1 34BROWSER_WAIT_MED = 3 35BROWSER_WAIT_LONG = 30 36BROWSER_WAIT_EXTRA_LONG = 60 37 38 39def create(configs): 40 """Factory method for retail AP class. 41 42 Args: 43 configs: list of dicts containing ap settings. ap settings must contain 44 the following: brand, model, ip_address, username and password 45 """ 46 SUPPORTED_APS = { 47 ('Netgear', 'R7000'): { 48 'name': 'NetgearR7000AP', 49 'package': 'netgear_r7000' 50 }, 51 ('Netgear', 'R7000NA'): { 52 'name': 'NetgearR7000NAAP', 53 'package': 'netgear_r7000' 54 }, 55 ('Netgear', 'R7500'): { 56 'name': 'NetgearR7500AP', 57 'package': 'netgear_r7500' 58 }, 59 ('Netgear', 'R7500NA'): { 60 'name': 'NetgearR7500NAAP', 61 'package': 'netgear_r7500' 62 }, 63 ('Netgear', 'R7800'): { 64 'name': 'NetgearR7800AP', 65 'package': 'netgear_r7800' 66 }, 67 ('Netgear', 'R8000'): { 68 'name': 'NetgearR8000AP', 69 'package': 'netgear_r8000' 70 }, 71 ('Netgear', 'RAX80'): { 72 'name': 'NetgearRAX80AP', 73 'package': 'netgear_rax80' 74 }, 75 ('Netgear', 'RAX120'): { 76 'name': 'NetgearRAX120AP', 77 'package': 'netgear_rax120' 78 }, 79 ('Netgear', 'RAX200'): { 80 'name': 'NetgearRAX200AP', 81 'package': 'netgear_rax200' 82 }, 83 ('Netgear', 'RAXE500'): { 84 'name': 'NetgearRAXE500AP', 85 'package': 'netgear_raxe500' 86 }, 87 ('Netgear', 'RS700'): { 88 'name': 'NetgearRS700AP', 89 'package': 'netgear_rs700' 90 }, 91 ('Brcm', 'Reference'): { 92 'name': 'BrcmRefAP', 93 'package': 'brcm_ref' 94 }, 95 ('Google', 'Wifi'): { 96 'name': 'GoogleWifiAP', 97 'package': 'google_wifi' 98 }, 99 } 100 objs = [] 101 for config in configs: 102 ap_id = (config['brand'], config['model']) 103 if ap_id not in SUPPORTED_APS: 104 raise KeyError('Invalid retail AP brand and model combination.') 105 ap_class_dict = SUPPORTED_APS[ap_id] 106 ap_package = 'acts_contrib.test_utils.wifi.wifi_retail_ap.{}'.format( 107 ap_class_dict['package']) 108 ap_package = importlib.import_module(ap_package) 109 ap_class = getattr(ap_package, ap_class_dict['name']) 110 objs.append(ap_class(config)) 111 return objs 112 113 114def destroy(objs): 115 for obj in objs: 116 obj.teardown() 117 118 119class BlockingBrowser(selenium.webdriver.chrome.webdriver.WebDriver): 120 """Class that implements a blocking browser session on top of selenium. 121 122 The class inherits from and builds upon splinter/selenium's webdriver class 123 and makes sure that only one such webdriver is active on a machine at any 124 single time. The class ensures single session operation using a lock file. 125 The class is to be used within context managers (e.g. with statements) to 126 ensure locks are always properly released. 127 """ 128 129 def __init__(self, headless, timeout): 130 """Constructor for BlockingBrowser class. 131 132 Args: 133 headless: boolean to control visible/headless browser operation 134 timeout: maximum time allowed to launch browser 135 """ 136 if int(selenium.__version__[0]) < 4: 137 raise RuntimeError( 138 'BlockingBrowser now requires selenium==4.0.0 or later. ') 139 self.log = logger.create_tagged_trace_logger('ChromeDriver') 140 self.chrome_options = selenium.webdriver.chrome.webdriver.Options() 141 self.chrome_options.add_argument('--no-proxy-server') 142 self.chrome_options.add_argument('--no-sandbox') 143 self.chrome_options.add_argument('--crash-dumps-dir=/tmp') 144 self.chrome_options.add_argument('--allow-running-insecure-content') 145 self.chrome_options.add_argument('--ignore-certificate-errors') 146 self.chrome_capabilities = selenium.webdriver.common.desired_capabilities.DesiredCapabilities.CHROME.copy( 147 ) 148 self.chrome_capabilities['acceptSslCerts'] = True 149 self.chrome_capabilities['acceptInsecureCerts'] = True 150 if headless: 151 self.chrome_options.add_argument('--headless') 152 self.chrome_options.add_argument('--disable-gpu') 153 os.environ['WDM_LOG'] = str(logging.NOTSET) 154 self.executable_path = ChromeDriverManager().install() 155 self.timeout = timeout 156 157 def __enter__(self): 158 """Entry context manager for BlockingBrowser. 159 160 The enter context manager for BlockingBrowser attempts to lock the 161 browser file. If successful, it launches and returns a chromedriver 162 session. If an exception occurs while starting the browser, the lock 163 file is released. 164 """ 165 self.lock_file = open(self.executable_path, 'r') 166 start_time = time.time() 167 while time.time() < start_time + self.timeout: 168 try: 169 fcntl.flock(self.lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB) 170 except BlockingIOError: 171 time.sleep(BROWSER_WAIT_SHORT) 172 continue 173 try: 174 self.driver = selenium.webdriver.Chrome( 175 service=ChromeService(self.executable_path), 176 options=self.chrome_options, 177 desired_capabilities=self.chrome_capabilities) 178 self.session_id = self.driver.session_id 179 return self 180 except: 181 fcntl.flock(self.lock_file, fcntl.LOCK_UN) 182 self.lock_file.close() 183 raise RuntimeError('Error starting browser. ' 184 'Releasing lock file.') 185 raise TimeoutError('Could not start chrome browser in time.') 186 187 def __exit__(self, exc_type, exc_value, traceback): 188 """Exit context manager for BlockingBrowser. 189 190 The exit context manager simply calls the parent class exit and 191 releases the lock file. 192 """ 193 try: 194 self.driver.quit() 195 except: 196 raise RuntimeError('Failed to quit browser. Releasing lock file.') 197 finally: 198 fcntl.flock(self.lock_file, fcntl.LOCK_UN) 199 self.lock_file.close() 200 201 def restart(self): 202 """Method to restart browser session without releasing lock file.""" 203 self.driver.quit() 204 self.__enter__() 205 206 def visit_persistent(self, 207 url, 208 page_load_timeout, 209 num_tries, 210 backup_url='about:blank', 211 check_for_element=None): 212 """Method to visit webpages and retry upon failure. 213 214 The function visits a URL and checks that the resulting URL matches 215 the intended URL, i.e. no redirects have happened 216 217 Args: 218 url: the intended url 219 page_load_timeout: timeout for page visits 220 num_tries: number of tries before url is declared unreachable 221 backup_url: url to visit if first url is not reachable. This can be 222 used to simply refresh the browser and try again or to re-login to 223 the AP 224 check_for_element: element id to check for existence on page 225 """ 226 self.driver.set_page_load_timeout(page_load_timeout) 227 for idx in range(num_tries): 228 try: 229 self.driver.get(url) 230 except: 231 self.restart() 232 233 page_reached = self.driver.current_url.split('/')[-1] == url.split( 234 '/')[-1] 235 if page_reached: 236 if check_for_element: 237 time.sleep(BROWSER_WAIT_MED) 238 if self.is_element_visible(check_for_element): 239 break 240 else: 241 raise RuntimeError( 242 'Page reached but expected element not found.') 243 else: 244 break 245 else: 246 try: 247 self.driver.get(backup_url) 248 except: 249 self.restart() 250 251 if idx == num_tries - 1: 252 self.log.error('URL unreachable. Current URL: {}'.format( 253 self.url)) 254 raise RuntimeError('URL unreachable.') 255 256 def get_element_value(self, element_name): 257 """Function to look up and get webpage element value. 258 259 Args: 260 element_name: name of element to look up 261 Returns: 262 Value of element 263 """ 264 #element = self.driver.find_element_by_name(element_name) 265 element = self.driver.find_element(By.NAME, element_name) 266 element_type = self.get_element_type(element_name) 267 if element_type == 'checkbox': 268 return element.is_selected() 269 elif element_type == 'radio': 270 items = self.driver.find_elements(By.NAME, element_name) 271 for item in items: 272 if item.is_selected(): 273 return item.get_attribute('value') 274 else: 275 return element.get_attribute('value') 276 277 def get_element_type(self, element_name): 278 """Function to look up and get webpage element type. 279 280 Args: 281 element_name: name of element to look up 282 Returns: 283 Type of element 284 """ 285 item = self.driver.find_element(By.NAME, element_name) 286 type = item.get_attribute('type') 287 return type 288 289 def is_element_enabled(self, element_name): 290 """Function to check if element is enabled/interactable. 291 292 Args: 293 element_name: name of element to look up 294 Returns: 295 Boolean indicating if element is interactable 296 """ 297 item = self.driver.find_element(By.NAME, element_name) 298 return item.is_enabled() 299 300 def is_element_visible(self, element_name): 301 """Function to check if element is visible. 302 303 Args: 304 element_name: name of element to look up 305 Returns: 306 Boolean indicating if element is visible 307 """ 308 item = self.driver.find_element(By.NAME, element_name) 309 return item.is_displayed() 310 311 def set_element_value(self, element_name, value, select_method='value'): 312 """Function to set webpage element value. 313 314 Args: 315 element_name: name of element to set 316 value: value of element 317 select_method: select method for dropdown lists (value/index/text) 318 """ 319 element_type = self.get_element_type(element_name) 320 if element_type == 'text' or element_type == 'password': 321 item = self.driver.find_element(By.NAME, element_name) 322 item.clear() 323 item.send_keys(value) 324 elif element_type == 'checkbox': 325 item = self.driver.find_element(By.NAME, element_name) 326 if value != item.is_selected(): 327 item.click() 328 elif element_type == 'radio': 329 items = self.driver.find_elements(By.NAME, element_name) 330 for item in items: 331 if item.get_attribute('value') == value: 332 item.click() 333 elif element_type == 'select-one': 334 select = Select(self.driver.find_element(By.NAME, element_name)) 335 if select_method == 'value': 336 select.select_by_value(str(value)) 337 elif select_method == 'text': 338 select.select_by_visible_text(value) 339 elif select_method == 'index': 340 select.select_by_index(value) 341 else: 342 raise RuntimeError( 343 '{} is not a valid select method.'.format(select_method)) 344 else: 345 raise RuntimeError( 346 'Element type {} not supported.'.format(element_type)) 347 348 def click_button(self, button_name): 349 """Function to click button on webpage 350 351 Args: 352 button_name: name of button to click 353 """ 354 button = self.driver.find_element(By.NAME, button_name) 355 if button.get_attribute('type') == 'submit': 356 button.click() 357 else: 358 raise RuntimeError('{} is not a button.'.format(button_name)) 359 360 def accept_alert_if_present(self, wait_for_alert=1): 361 """Function to check for alert and accept if present 362 363 Args: 364 wait_for_alert: time (seconds) to wait for alert 365 """ 366 try: 367 selenium.webdriver.support.ui.WebDriverWait( 368 self.driver, 369 wait_for_alert).until(expected_conditions.alert_is_present()) 370 alert = self.driver.switch_to.alert 371 alert.accept() 372 except selenium.common.exceptions.TimeoutException: 373 pass 374 375 376class WifiRetailAP(object): 377 """Base class implementation for retail ap. 378 379 Base class provides functions whose implementation is shared by all aps. 380 If some functions such as set_power not supported by ap, checks will raise 381 exceptions. 382 """ 383 384 def __init__(self, ap_settings): 385 self.ap_settings = ap_settings.copy() 386 self.log = logger.create_tagged_trace_logger('AccessPoint|{}'.format( 387 self._get_control_ip_address())) 388 # Capabilities variable describing AP capabilities 389 self.capabilities = { 390 'interfaces': [], 391 'channels': {}, 392 'modes': {}, 393 'default_mode': None 394 } 395 for interface in self.capabilities['interfaces']: 396 self.ap_settings.setdefault(interface, {}) 397 # Lock AP 398 if self.ap_settings.get('lock_ap', 0): 399 self.lock_timeout = self.ap_settings.get('lock_timeout', 3600) 400 self._lock_ap() 401 402 def teardown(self): 403 """Function to perform destroy operations.""" 404 if self.ap_settings.get('lock_ap', 0): 405 self._unlock_ap() 406 407 def reset(self): 408 """Function that resets AP. 409 410 Function implementation is AP dependent and intended to perform any 411 necessary reset operations as part of controller destroy. 412 """ 413 414 def read_ap_settings(self): 415 """Function that reads current ap settings. 416 417 Function implementation is AP dependent and thus base class raises exception 418 if function not implemented in child class. 419 """ 420 raise NotImplementedError 421 422 def validate_ap_settings(self): 423 """Function to validate ap settings. 424 425 This function compares the actual ap settings read from the web GUI 426 with the assumed settings saved in the AP object. When called after AP 427 configuration, this method helps ensure that our configuration was 428 successful. 429 Note: Calling this function updates the stored ap_settings 430 431 Raises: 432 ValueError: If read AP settings do not match stored settings. 433 """ 434 assumed_ap_settings = copy.deepcopy(self.ap_settings) 435 actual_ap_settings = self.read_ap_settings() 436 437 if assumed_ap_settings != actual_ap_settings: 438 self.log.warning( 439 'Discrepancy in AP settings. Some settings may have been overwritten.' 440 ) 441 442 def configure_ap(self, **config_flags): 443 """Function that configures ap based on values of ap_settings. 444 445 Function implementation is AP dependent and thus base class raises exception 446 if function not implemented in child class. 447 448 Args: 449 config_flags: optional configuration flags 450 """ 451 raise NotImplementedError 452 453 def set_region(self, region): 454 """Function that sets AP region. 455 456 This function sets the region for the AP. Note that this may overwrite 457 channel and bandwidth settings in cases where the new region does not 458 support the current wireless configuration. 459 460 Args: 461 region: string indicating AP region 462 """ 463 if region != self.ap_settings['region']: 464 self.log.warning( 465 'Updating region may overwrite wireless settings.') 466 setting_to_update = {'region': region} 467 self.update_ap_settings(setting_to_update) 468 469 def set_radio_on_off(self, network, status): 470 """Function that turns the radio on or off. 471 472 Args: 473 network: string containing network identifier (2G, 5G_1, 5G_2) 474 status: boolean indicating on or off (0: off, 1: on) 475 """ 476 setting_to_update = {network: {'status': int(status)}} 477 self.update_ap_settings(setting_to_update) 478 479 def set_ssid(self, network, ssid): 480 """Function that sets network SSID. 481 482 Args: 483 network: string containing network identifier (2G, 5G_1, 5G_2) 484 ssid: string containing ssid 485 """ 486 setting_to_update = {network: {'ssid': str(ssid)}} 487 self.update_ap_settings(setting_to_update) 488 489 def set_channel(self, network, channel): 490 """Function that sets network channel. 491 492 Args: 493 network: string containing network identifier (2G, 5G_1, 5G_2) 494 channel: string or int containing channel 495 """ 496 if channel not in self.capabilities['channels'][network]: 497 self.log.error('Ch{} is not supported on {} interface.'.format( 498 channel, network)) 499 setting_to_update = {network: {'channel': channel}} 500 self.update_ap_settings(setting_to_update) 501 502 def set_bandwidth(self, network, bandwidth): 503 """Function that sets network bandwidth/mode. 504 505 Args: 506 network: string containing network identifier (2G, 5G_1, 5G_2) 507 bandwidth: string containing mode, e.g. 11g, VHT20, VHT40, VHT80. 508 """ 509 if 'bw' in bandwidth: 510 bandwidth = bandwidth.replace('bw', 511 self.capabilities['default_mode']) 512 elif isinstance(bandwidth, int): 513 bandwidth = str(bandwidth) + self.capabilities['default_mode'] 514 if bandwidth not in self.capabilities['modes'][network]: 515 self.log.error('{} mode is not supported on {} interface.'.format( 516 bandwidth, network)) 517 setting_to_update = {network: {'bandwidth': bandwidth}} 518 self.update_ap_settings(setting_to_update) 519 520 def set_channel_and_bandwidth(self, network, channel, bandwidth): 521 """Function that sets network bandwidth/mode and channel. 522 523 Args: 524 network: string containing network identifier (2G, 5G_1, 5G_2) 525 channel: string containing desired channel 526 bandwidth: string containing mode, e.g. 11g, VHT20, VHT40, VHT80. 527 """ 528 if 'bw' in bandwidth: 529 bandwidth = bandwidth.replace('bw', 530 self.capabilities['default_mode']) 531 elif isinstance(bandwidth, int): 532 bandwidth = str(bandwidth) + self.capabilities['default_mode'] 533 if bandwidth not in self.capabilities['modes'][network]: 534 self.log.error('{} mode is not supported on {} interface.'.format( 535 bandwidth, network)) 536 if channel not in self.capabilities['channels'][network]: 537 self.log.error('Ch{} is not supported on {} interface.'.format( 538 channel, network)) 539 setting_to_update = { 540 network: { 541 'bandwidth': bandwidth, 542 'channel': channel 543 } 544 } 545 self.update_ap_settings(setting_to_update) 546 547 def set_power(self, network, power): 548 """Function that sets network transmit power. 549 550 Args: 551 network: string containing network identifier (2G, 5G_1, 5G_2) 552 power: string containing power level, e.g., 25%, 100% 553 """ 554 if 'power' not in self.ap_settings[network].keys(): 555 self.log.error( 556 'Cannot configure power on {} interface.'.format(network)) 557 setting_to_update = {network: {'power': power}} 558 self.update_ap_settings(setting_to_update) 559 560 def set_security(self, network, security_type, *password): 561 """Function that sets network security setting and password. 562 563 Args: 564 network: string containing network identifier (2G, 5G_1, 5G_2) 565 security: string containing security setting, e.g., WPA2-PSK 566 password: optional argument containing password 567 """ 568 if (len(password) == 1) and (type(password[0]) == str): 569 setting_to_update = { 570 network: { 571 'security_type': str(security_type), 572 'password': str(password[0]) 573 } 574 } 575 else: 576 setting_to_update = { 577 network: { 578 'security_type': str(security_type) 579 } 580 } 581 self.update_ap_settings(setting_to_update) 582 583 def set_rate(self): 584 """Function that configures rate used by AP. 585 586 Function implementation is not supported by most APs and thus base 587 class raises exception if function not implemented in child class. 588 """ 589 raise NotImplementedError 590 591 def _update_settings_dict(self, 592 settings, 593 updates, 594 updates_requested=False, 595 status_toggle_flag=False): 596 new_settings = copy.deepcopy(settings) 597 for key, value in updates.items(): 598 if key not in new_settings.keys(): 599 raise KeyError('{} is an invalid settings key.'.format(key)) 600 elif isinstance(value, collections.abc.Mapping): 601 new_settings[ 602 key], updates_requested, status_toggle_flag = self._update_settings_dict( 603 new_settings.get(key, {}), value, updates_requested, 604 status_toggle_flag) 605 elif new_settings[key] != value: 606 new_settings[key] = value 607 updates_requested = True 608 if 'status' in key: 609 status_toggle_flag = True 610 return new_settings, updates_requested, status_toggle_flag 611 612 def update_ap_settings(self, dict_settings={}, **named_settings): 613 """Function to update settings of existing AP. 614 615 Function copies arguments into ap_settings and calls configure_retail_ap 616 to apply them. 617 618 Args: 619 *dict_settings accepts single dictionary of settings to update 620 **named_settings accepts named settings to update 621 Note: dict and named_settings cannot contain the same settings. 622 """ 623 settings_to_update = dict(dict_settings, **named_settings) 624 if len(settings_to_update) != len(dict_settings) + len(named_settings): 625 raise KeyError('The following keys were passed twice: {}'.format( 626 (set(dict_settings.keys()).intersection( 627 set(named_settings.keys()))))) 628 629 self.ap_settings, updates_requested, status_toggle_flag = self._update_settings_dict( 630 self.ap_settings, settings_to_update) 631 632 if updates_requested: 633 self.configure_ap(status_toggled=status_toggle_flag) 634 635 def band_lookup_by_channel(self, channel): 636 """Function that gives band name by channel number. 637 638 Args: 639 channel: channel number to lookup 640 Returns: 641 band: name of band which this channel belongs to on this ap, False 642 if not supported 643 """ 644 for key, value in self.capabilities['channels'].items(): 645 if channel in value: 646 return key 647 return False 648 649 def _get_control_ip_address(self): 650 """Function to get AP's Control Interface IP address.""" 651 if 'ssh_config' in self.ap_settings.keys(): 652 return self.ap_settings['ssh_config']['host'] 653 else: 654 return self.ap_settings['ip_address'] 655 656 def _lock_ap(self): 657 """Function to lock the ap while tests are running.""" 658 self.lock_file_path = '/tmp/{}_{}_{}.lock'.format( 659 self.ap_settings['brand'], self.ap_settings['model'], 660 self._get_control_ip_address()) 661 if not os.path.exists(self.lock_file_path): 662 with open(self.lock_file_path, 'w'): 663 pass 664 self.lock_file = open(self.lock_file_path, 'r') 665 start_time = time.time() 666 self.log.info('Trying to acquire AP lock.') 667 while time.time() < start_time + self.lock_timeout: 668 try: 669 fcntl.flock(self.lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB) 670 except BlockingIOError: 671 time.sleep(BROWSER_WAIT_SHORT) 672 continue 673 self.log.info('AP lock acquired.') 674 return 675 raise RuntimeError('Could not lock AP in time.') 676 677 def _unlock_ap(self): 678 """Function to unlock the AP when tests are done.""" 679 self.log.info('Releasing AP lock.') 680 if hasattr(self, 'lock_file'): 681 try: 682 fcntl.flock(self.lock_file, fcntl.LOCK_UN) 683 self.lock_file.close() 684 self.log.info('Succussfully released AP lock file.') 685 except: 686 raise RuntimeError('Error occurred while unlocking AP.') 687