#!/usr/bin/env python3
#
#   Copyright 2020 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the 'License');
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an 'AS IS' BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import collections.abc
import copy
import fcntl
import importlib
import logging
import os
import selenium
import time
from acts import logger
from selenium.webdriver.support.ui import Select
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as expected_conditions
from webdriver_manager.chrome import ChromeDriverManager

# Browser wait times, in seconds.
BROWSER_WAIT_SHORT = 1
BROWSER_WAIT_MED = 3
BROWSER_WAIT_LONG = 30
BROWSER_WAIT_EXTRA_LONG = 60


def create(configs):
    """Factory method for retail AP class.

    Looks up the (brand, model) pair of each config, imports the matching
    module from acts_contrib.test_utils.wifi.wifi_retail_ap and instantiates
    the AP class named in the lookup table with that config.

    Args:
        configs: list of dicts containing ap settings. ap settings must
            contain the following: brand, model, ip_address, username and
            password
    Returns:
        List of AP objects, one per config, in the order of configs.
    Raises:
        KeyError: if a (brand, model) combination is not supported.
    """
    SUPPORTED_APS = {
        ('Netgear', 'R7000'): {
            'name': 'NetgearR7000AP',
            'package': 'netgear_r7000'
        },
        ('Netgear', 'R7000NA'): {
            'name': 'NetgearR7000NAAP',
            'package': 'netgear_r7000'
        },
        ('Netgear', 'R7500'): {
            'name': 'NetgearR7500AP',
            'package': 'netgear_r7500'
        },
        ('Netgear', 'R7500NA'): {
            'name': 'NetgearR7500NAAP',
            'package': 'netgear_r7500'
        },
        ('Netgear', 'R7800'): {
            'name': 'NetgearR7800AP',
            'package': 'netgear_r7800'
        },
        ('Netgear', 'R8000'): {
            'name': 'NetgearR8000AP',
            'package': 'netgear_r8000'
        },
        ('Netgear', 'RAX80'): {
            'name': 'NetgearRAX80AP',
            'package': 'netgear_rax80'
        },
        ('Netgear', 'RAX120'): {
            'name': 'NetgearRAX120AP',
            'package': 'netgear_rax120'
        },
        ('Netgear', 'RAX200'): {
            'name': 'NetgearRAX200AP',
            'package': 'netgear_rax200'
        },
        ('Netgear', 'RAXE500'): {
            'name': 'NetgearRAXE500AP',
            'package': 'netgear_raxe500'
        },
        ('Brcm', 'Reference'): {
            'name': 'BrcmRefAP',
            'package': 'brcm_ref'
        },
        ('Google', 'Wifi'): {
            'name': 'GoogleWifiAP',
            'package': 'google_wifi'
        },
    }
    objs = []
    for config in configs:
        ap_id = (config['brand'], config['model'])
        if ap_id not in SUPPORTED_APS:
            raise KeyError('Invalid retail AP brand and model combination.')
        ap_class_dict = SUPPORTED_APS[ap_id]
        ap_package = 'acts_contrib.test_utils.wifi.wifi_retail_ap.{}'.format(
            ap_class_dict['package'])
        ap_package = importlib.import_module(ap_package)
        ap_class = getattr(ap_package, ap_class_dict['name'])
        objs.append(ap_class(config))
    return objs


def destroy(objs):
    """Tears down every AP object by calling its teardown method.

    Args:
        objs: iterable of AP objects, e.g. the list returned by create.
    """
    for obj in objs:
        obj.teardown()


class BlockingBrowser(selenium.webdriver.chrome.webdriver.WebDriver):
    """Class that implements a blocking browser session on top of selenium.

    The class inherits from and builds upon splinter/selenium's webdriver class
    and makes sure that only one such webdriver is active on a machine at any
    single time. The class ensures single session operation using a lock file.
    The class is to be used within context managers (e.g. with statements) to
    ensure locks are always properly released.

    Note that the parent constructor is not called by __init__; the actual
    chromedriver session is created in __enter__ and kept in self.driver.
    """

    def __init__(self, headless, timeout):
        """Constructor for BlockingBrowser class.

        Args:
            headless: boolean to control visible/headless browser operation
            timeout: maximum time allowed to launch browser
        Raises:
            RuntimeError: if the installed selenium major version is below 4.
        """
        # Parse the whole major component so that e.g. '10.1.0' is read as
        # 10 rather than 1.
        major_version = int(selenium.__version__.split('.')[0])
        if major_version < 4:
            raise RuntimeError(
                'BlockingBrowser now requires selenium==4.0.0 or later. ')
        self.log = logger.create_tagged_trace_logger('ChromeDriver')
        self.chrome_options = selenium.webdriver.chrome.webdriver.Options()
        self.chrome_options.add_argument('--no-proxy-server')
        self.chrome_options.add_argument('--no-sandbox')
        self.chrome_options.add_argument('--crash-dumps-dir=/tmp')
        self.chrome_options.add_argument('--allow-running-insecure-content')
        self.chrome_options.add_argument('--ignore-certificate-errors')
        self.chrome_capabilities = selenium.webdriver.common.desired_capabilities.DesiredCapabilities.CHROME.copy(
        )
        self.chrome_capabilities['acceptSslCerts'] = True
        self.chrome_capabilities['acceptInsecureCerts'] = True
        if headless:
            self.chrome_options.add_argument('--headless')
            self.chrome_options.add_argument('--disable-gpu')
        os.environ['WDM_LOG'] = str(logging.NOTSET)
        self.executable_path = ChromeDriverManager().install()
        self.timeout = timeout

    def __enter__(self):
        """Entry context manager for BlockingBrowser.

        The enter context manager for BlockingBrowser attempts to lock the
        browser file. If successful, it launches and returns a chromedriver
        session. If an exception occurs while starting the browser, the lock
        file is released.

        Returns:
            This BlockingBrowser, with self.driver holding the live session.
        Raises:
            RuntimeError: if the browser fails to start once the lock is held.
            TimeoutError: if the lock cannot be acquired within self.timeout.
        """
        # The chromedriver executable itself doubles as the lock file.
        self.lock_file = open(self.executable_path, 'r')
        start_time = time.time()
        while time.time() < start_time + self.timeout:
            try:
                fcntl.flock(self.lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
            except BlockingIOError:
                # Lock is held by another session; poll again shortly.
                time.sleep(BROWSER_WAIT_SHORT)
                continue
            try:
                self._start_driver()
            except Exception as err:
                self._release_lock()
                raise RuntimeError('Error starting browser. '
                                   'Releasing lock file.') from err
            except BaseException:
                # Do not mask KeyboardInterrupt/SystemExit, but still make
                # sure the lock is not left behind.
                self._release_lock()
                raise
            return self
        # Lock never acquired: close the handle instead of leaking it.
        self.lock_file.close()
        raise TimeoutError('Could not start chrome browser in time.')

    def __exit__(self, exc_type, exc_value, traceback):
        """Exit context manager for BlockingBrowser.

        The exit context manager quits the chromedriver session and releases
        the lock file, even if quitting fails.

        Raises:
            RuntimeError: if quitting the browser fails.
        """
        try:
            self.driver.quit()
        except Exception as err:
            raise RuntimeError(
                'Failed to quit browser. Releasing lock file.') from err
        finally:
            self._release_lock()

    def _start_driver(self):
        """Launches a chromedriver session and stores it in self.driver."""
        # NOTE(review): desired_capabilities is kept as in the original call;
        # confirm it is still accepted by the selenium release in use.
        self.driver = selenium.webdriver.Chrome(
            service=ChromeService(self.executable_path),
            options=self.chrome_options,
            desired_capabilities=self.chrome_capabilities)
        self.session_id = self.driver.session_id

    def _release_lock(self):
        """Unlocks and closes the lock file; closes even if unlock fails."""
        try:
            fcntl.flock(self.lock_file, fcntl.LOCK_UN)
        finally:
            self.lock_file.close()

    def restart(self):
        """Method to restart browser session without releasing lock file.

        Only the chromedriver session is replaced; the lock file handle
        acquired in __enter__ is left untouched so the lock is held
        throughout.

        Raises:
            RuntimeError: if the new browser session cannot be started.
        """
        self.driver.quit()
        try:
            self._start_driver()
        except Exception as err:
            raise RuntimeError('Error restarting browser.') from err

    def visit_persistent(self,
                         url,
                         page_load_timeout,
                         num_tries,
                         backup_url='about:blank',
                         check_for_element=None):
        """Method to visit webpages and retry upon failure.

        The function visits a URL and checks that the resulting URL matches
        the intended URL, i.e. no redirects have happened. Only the last
        path component of the two URLs is compared.

        Args:
            url: the intended url
            page_load_timeout: timeout for page visits
            num_tries: number of tries before url is declared unreachable
            backup_url: url to visit if first url is not reachable. This can be
            used to simply refresh the browser and try again or to re-login to
            the AP
            check_for_element: element id to check for existence on page
        Raises:
            RuntimeError: if the page is reached but check_for_element is not
                visible, or if the url is not reached after num_tries tries.
        """
        self.driver.set_page_load_timeout(page_load_timeout)
        for idx in range(num_tries):
            try:
                self.driver.get(url)
            except Exception:
                self.restart()

            page_reached = self.driver.current_url.split('/')[-1] == url.split(
                '/')[-1]
            if page_reached:
                if not check_for_element:
                    break
                # Give the page some time to render before probing it.
                time.sleep(BROWSER_WAIT_MED)
                if self.is_element_visible(check_for_element):
                    break
                raise RuntimeError(
                    'Page reached but expected element not found.')

            # Page not reached: load the backup url before the next try.
            try:
                self.driver.get(backup_url)
            except Exception:
                self.restart()

            if idx == num_tries - 1:
                self.log.error('URL unreachable. Current URL: {}'.format(
                    self.driver.current_url))
                raise RuntimeError('URL unreachable.')

    def get_element_value(self, element_name):
        """Function to look up and get webpage element value.

        Args:
            element_name: name of element to look up
        Returns:
            Value of element: a boolean for checkboxes, the value attribute
            of the selected item for radio buttons (None if no item is
            selected), and the value attribute otherwise.
        """
        element = self.driver.find_element(By.NAME, element_name)
        element_type = self.get_element_type(element_name)
        if element_type == 'checkbox':
            return element.is_selected()
        if element_type == 'radio':
            for item in self.driver.find_elements(By.NAME, element_name):
                if item.is_selected():
                    return item.get_attribute('value')
            return None
        return element.get_attribute('value')

    def get_element_type(self, element_name):
        """Function to look up and get webpage element type.

        Args:
            element_name: name of element to look up
        Returns:
            Type of element
        """
        item = self.driver.find_element(By.NAME, element_name)
        return item.get_attribute('type')

    def is_element_enabled(self, element_name):
        """Function to check if element is enabled/interactable.

        Args:
            element_name: name of element to look up
        Returns:
            Boolean indicating if element is interactable
        """
        item = self.driver.find_element(By.NAME, element_name)
        return item.is_enabled()

    def is_element_visible(self, element_name):
        """Function to check if element is visible.

        Args:
            element_name: name of element to look up
        Returns:
            Boolean indicating if element is visible
        """
        item = self.driver.find_element(By.NAME, element_name)
        return item.is_displayed()

    def set_element_value(self, element_name, value, select_method='value'):
        """Function to set webpage element value.

        Args:
            element_name: name of element to set
            value: value of element
            select_method: select method for dropdown lists (value/index/text)
        Raises:
            RuntimeError: if the element type or select method is not
                supported.
        """
        element_type = self.get_element_type(element_name)
        if element_type in ('text', 'password'):
            item = self.driver.find_element(By.NAME, element_name)
            item.clear()
            item.send_keys(value)
        elif element_type == 'checkbox':
            item = self.driver.find_element(By.NAME, element_name)
            # Only click when the current state differs from the target.
            if value != item.is_selected():
                item.click()
        elif element_type == 'radio':
            items = self.driver.find_elements(By.NAME, element_name)
            for item in items:
                if item.get_attribute('value') == value:
                    item.click()
        elif element_type == 'select-one':
            select = Select(self.driver.find_element(By.NAME, element_name))
            if select_method == 'value':
                select.select_by_value(str(value))
            elif select_method == 'text':
                select.select_by_visible_text(value)
            elif select_method == 'index':
                select.select_by_index(value)
            else:
                raise RuntimeError(
                    '{} is not a valid select method.'.format(select_method))
        else:
            raise RuntimeError(
                'Element type {} not supported.'.format(element_type))

    def click_button(self, button_name):
        """Function to click button on webpage

        Args:
            button_name: name of button to click
        Raises:
            RuntimeError: if the element is not of type submit.
        """
        button = self.driver.find_element(By.NAME, button_name)
        if button.get_attribute('type') == 'submit':
            button.click()
        else:
            raise RuntimeError('{} is not a button.'.format(button_name))

    def accept_alert_if_present(self, wait_for_alert=1):
        """Function to check for alert and accept if present

        Args:
            wait_for_alert: time (seconds) to wait for alert
        """
        try:
            selenium.webdriver.support.ui.WebDriverWait(
                self.driver,
                wait_for_alert).until(expected_conditions.alert_is_present())
            alert = self.driver.switch_to.alert
            alert.accept()
        except selenium.common.exceptions.TimeoutException:
            # No alert showed up within the wait time; nothing to accept.
            pass


class WifiRetailAP(object):
    """Base class implementation for retail ap.

    Base class provides functions whose implementation is shared by all aps.
    If some functions such as set_power not supported by ap, checks will raise
    exceptions.
    """

    def __init__(self, ap_settings):
        """Constructor for WifiRetailAP.

        Args:
            ap_settings: dict of AP settings. A shallow copy is stored. If
                its lock_ap entry is truthy, the AP lock is acquired here,
                waiting up to lock_timeout seconds (default 3600).
        """
        self.ap_settings = ap_settings.copy()
        self.log = logger.create_tagged_trace_logger('AccessPoint|{}'.format(
            self._get_control_ip_address()))
        # Capabilities variable describing AP capabilities
        self.capabilities = {
            'interfaces': [],
            'channels': {},
            'modes': {},
            'default_mode': None
        }
        # The interface list is empty in this base class, so this loop does
        # nothing here.
        for interface in self.capabilities['interfaces']:
            self.ap_settings.setdefault(interface, {})
        # Lock AP
        if self.ap_settings.get('lock_ap', 0):
            self.lock_timeout = self.ap_settings.get('lock_timeout', 3600)
            self._lock_ap()

    def teardown(self):
        """Function to perform destroy operations."""
        if self.ap_settings.get('lock_ap', 0):
            self._unlock_ap()

    def reset(self):
        """Function that resets AP.

        Function implementation is AP dependent and intended to perform any
        necessary reset operations as part of controller destroy.
        """

    def read_ap_settings(self):
        """Function that reads current ap settings.

        Function implementation is AP dependent and thus base class raises exception
        if function not implemented in child class.
        """
        raise NotImplementedError

    def validate_ap_settings(self):
        """Function to validate ap settings.

        This function compares the actual ap settings read from the web GUI
        with the assumed settings saved in the AP object. When called after AP
        configuration, this method helps ensure that our configuration was
        successful. A mismatch is only logged as a warning; no exception is
        raised by this method itself.
        Note: Calling this function may update the stored ap_settings,
        depending on the child class implementation of read_ap_settings.
        """
        # Snapshot first, in case read_ap_settings modifies self.ap_settings.
        assumed_ap_settings = copy.deepcopy(self.ap_settings)
        actual_ap_settings = self.read_ap_settings()

        if assumed_ap_settings != actual_ap_settings:
            self.log.warning(
                'Discrepancy in AP settings. Some settings may have been overwritten.'
            )

    def configure_ap(self, **config_flags):
        """Function that configures ap based on values of ap_settings.

        Function implementation is AP dependent and thus base class raises exception
        if function not implemented in child class.

        Args:
            config_flags: optional configuration flags
        """
        raise NotImplementedError

    def set_region(self, region):
        """Function that sets AP region.

        This function sets the region for the AP. Note that this may overwrite
        channel and bandwidth settings in cases where the new region does not
        support the current wireless configuration.

        Args:
            region: string indicating AP region
        """
        if region != self.ap_settings['region']:
            self.log.warning(
                'Updating region may overwrite wireless settings.')
        setting_to_update = {'region': region}
        self.update_ap_settings(setting_to_update)

    def set_radio_on_off(self, network, status):
        """Function that turns the radio on or off.

        Args:
            network: string containing network identifier (2G, 5G_1, 5G_2)
            status: boolean indicating on or off (0: off, 1: on)
        """
        setting_to_update = {network: {'status': int(status)}}
        self.update_ap_settings(setting_to_update)

    def set_ssid(self, network, ssid):
        """Function that sets network SSID.

        Args:
            network: string containing network identifier (2G, 5G_1, 5G_2)
            ssid: string containing ssid
        """
        setting_to_update = {network: {'ssid': str(ssid)}}
        self.update_ap_settings(setting_to_update)

    def set_channel(self, network, channel):
        """Function that sets network channel.

        An unsupported channel is logged as an error but still applied.

        Args:
            network: string containing network identifier (2G, 5G_1, 5G_2)
            channel: string or int containing channel
        """
        if channel not in self.capabilities['channels'][network]:
            self.log.error('Ch{} is not supported on {} interface.'.format(
                channel, network))
        setting_to_update = {network: {'channel': channel}}
        self.update_ap_settings(setting_to_update)

    def _normalize_bandwidth(self, bandwidth):
        """Expands a generic bandwidth into a mode name using default_mode.

        Args:
            bandwidth: mode string, possibly containing the placeholder
                'bw', or an int bandwidth.
        Returns:
            String with 'bw' replaced by the AP default mode, or the int
            joined with the default mode; other strings are returned as is.
        """
        default_mode = self.capabilities['default_mode']
        # Test for int first: the substring test below raises TypeError on
        # an int.
        if isinstance(bandwidth, int):
            # NOTE(review): this yields e.g. '20VHT' while the docstring
            # examples read 'VHT20' - confirm the intended order.
            return str(bandwidth) + default_mode
        if 'bw' in bandwidth:
            return bandwidth.replace('bw', default_mode)
        return bandwidth

    def set_bandwidth(self, network, bandwidth):
        """Function that sets network bandwidth/mode.

        An unsupported mode is logged as an error but still applied.

        Args:
            network: string containing network identifier (2G, 5G_1, 5G_2)
            bandwidth: string containing mode, e.g. 11g, VHT20, VHT40, VHT80.
        """
        bandwidth = self._normalize_bandwidth(bandwidth)
        if bandwidth not in self.capabilities['modes'][network]:
            self.log.error('{} mode is not supported on {} interface.'.format(
                bandwidth, network))
        setting_to_update = {network: {'bandwidth': bandwidth}}
        self.update_ap_settings(setting_to_update)

    def set_channel_and_bandwidth(self, network, channel, bandwidth):
        """Function that sets network bandwidth/mode and channel.

        Unsupported values are logged as errors but still applied.

        Args:
            network: string containing network identifier (2G, 5G_1, 5G_2)
            channel: string containing desired channel
            bandwidth: string containing mode, e.g. 11g, VHT20, VHT40, VHT80.
        """
        bandwidth = self._normalize_bandwidth(bandwidth)
        if bandwidth not in self.capabilities['modes'][network]:
            self.log.error('{} mode is not supported on {} interface.'.format(
                bandwidth, network))
        if channel not in self.capabilities['channels'][network]:
            self.log.error('Ch{} is not supported on {} interface.'.format(
                channel, network))
        setting_to_update = {
            network: {
                'bandwidth': bandwidth,
                'channel': channel
            }
        }
        self.update_ap_settings(setting_to_update)

    def set_power(self, network, power):
        """Function that sets network transmit power.

        Args:
            network: string containing network identifier (2G, 5G_1, 5G_2)
            power: string containing power level, e.g., 25%, 100%
        """
        if 'power' not in self.ap_settings[network]:
            self.log.error(
                'Cannot configure power on {} interface.'.format(network))
        setting_to_update = {network: {'power': power}}
        self.update_ap_settings(setting_to_update)

    def set_security(self, network, security_type, *password):
        """Function that sets network security setting and password.

        The password is only updated when exactly one string password is
        given; otherwise only the security type is updated.

        Args:
            network: string containing network identifier (2G, 5G_1, 5G_2)
            security: string containing security setting, e.g., WPA2-PSK
            password: optional argument containing password
        """
        if len(password) == 1 and isinstance(password[0], str):
            setting_to_update = {
                network: {
                    'security_type': str(security_type),
                    'password': str(password[0])
                }
            }
        else:
            setting_to_update = {
                network: {
                    'security_type': str(security_type)
                }
            }
        self.update_ap_settings(setting_to_update)

    def set_rate(self):
        """Function that configures rate used by AP.

        Function implementation is not supported by most APs and thus base
        class raises exception if function not implemented in child class.
        """
        raise NotImplementedError

    def _update_settings_dict(self,
                              settings,
                              updates,
                              updates_requested=False,
                              status_toggle_flag=False):
        """Recursively merges updates into a deep copy of settings.

        Args:
            settings: dict of current settings; it is not modified.
            updates: dict of new values; nested mappings are merged
                recursively.
            updates_requested: flag carried through the recursion.
            status_toggle_flag: flag carried through the recursion.
        Returns:
            Tuple (new_settings, updates_requested, status_toggle_flag).
            updates_requested is True if any value changed, and
            status_toggle_flag is True if a changed key contains 'status'.
        Raises:
            KeyError: if updates contains a key missing from settings.
        """
        new_settings = copy.deepcopy(settings)
        for key, value in updates.items():
            if key not in new_settings:
                raise KeyError('{} is an invalid settings key.'.format(key))
            elif isinstance(value, collections.abc.Mapping):
                (new_settings[key], updates_requested,
                 status_toggle_flag) = self._update_settings_dict(
                     new_settings.get(key, {}), value, updates_requested,
                     status_toggle_flag)
            elif new_settings[key] != value:
                new_settings[key] = value
                updates_requested = True
                if 'status' in key:
                    status_toggle_flag = True
        return new_settings, updates_requested, status_toggle_flag

    def update_ap_settings(self, dict_settings=None, **named_settings):
        """Function to update settings of existing AP.

        Function merges arguments into ap_settings and calls configure_ap
        to apply them if any setting actually changed.

        Args:
            dict_settings: single dictionary of settings to update
            **named_settings: named settings to update
        Note: dict and named_settings cannot contain the same settings.

        Raises:
            KeyError: if a key is passed in both dict_settings and
                named_settings, or is not a valid settings key.
        """
        if dict_settings is None:
            dict_settings = {}
        settings_to_update = dict(dict_settings, **named_settings)
        if len(settings_to_update) != len(dict_settings) + len(named_settings):
            raise KeyError('The following keys were passed twice: {}'.format(
                (set(dict_settings.keys()).intersection(
                    set(named_settings.keys())))))

        (self.ap_settings, updates_requested,
         status_toggle_flag) = self._update_settings_dict(
             self.ap_settings, settings_to_update)

        if updates_requested:
            self.configure_ap(status_toggled=status_toggle_flag)

    def band_lookup_by_channel(self, channel):
        """Function that gives band name by channel number.

        Args:
            channel: channel number to lookup
        Returns:
            band: name of band which this channel belongs to on this ap, False
            if not supported
        """
        for key, value in self.capabilities['channels'].items():
            if channel in value:
                return key
        return False

    def _get_control_ip_address(self):
        """Function to get AP's Control Interface IP address."""
        if 'ssh_config' in self.ap_settings:
            return self.ap_settings['ssh_config']['host']
        return self.ap_settings['ip_address']

    def _lock_ap(self):
        """Function to lock the ap while tests are running.

        Polls for an exclusive lock on a per-AP file under /tmp until
        self.lock_timeout seconds have passed.

        Raises:
            RuntimeError: if the lock cannot be acquired in time.
        """
        self.lock_file_path = '/tmp/{}_{}_{}.lock'.format(
            self.ap_settings['brand'], self.ap_settings['model'],
            self._get_control_ip_address())
        # Append mode creates the file when missing and never truncates it,
        # which avoids a separate exists-then-create step.
        with open(self.lock_file_path, 'a'):
            pass
        lock_file = open(self.lock_file_path, 'r')
        start_time = time.time()
        self.log.info('Trying to acquire AP lock.')
        while time.time() < start_time + self.lock_timeout:
            try:
                fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
            except BlockingIOError:
                time.sleep(BROWSER_WAIT_SHORT)
                continue
            self.lock_file = lock_file
            self.log.info('AP lock acquired.')
            return
        # Lock never acquired: close the handle instead of leaking it.
        lock_file.close()
        raise RuntimeError('Could not lock AP in time.')

    def _unlock_ap(self):
        """Function to unlock the AP when tests are done.

        Raises:
            RuntimeError: if unlocking or closing the lock file fails.
        """
        self.log.info('Releasing AP lock.')
        if hasattr(self, 'lock_file'):
            try:
                try:
                    fcntl.flock(self.lock_file, fcntl.LOCK_UN)
                finally:
                    # Close the handle even when unlocking fails.
                    self.lock_file.close()
            except Exception as err:
                raise RuntimeError(
                    'Error occurred while unlocking AP.') from err
            self.log.info('Succussfully released AP lock file.')