• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#   Copyright 2020 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the 'License');
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an 'AS IS' BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import collections.abc
18import copy
19import fcntl
20import importlib
21import logging
22import os
23import selenium
24import time
25from acts import logger
26from selenium.webdriver.support.ui import Select
27from selenium.webdriver.chrome.service import Service as ChromeService
28from selenium.webdriver.common.by import By
29from selenium.webdriver.support import expected_conditions as expected_conditions
30from webdriver_manager.chrome import ChromeDriverManager
31
32
33BROWSER_WAIT_SHORT = 1
34BROWSER_WAIT_MED = 3
35BROWSER_WAIT_LONG = 30
36BROWSER_WAIT_EXTRA_LONG = 60
37
38
def create(configs):
    """Build one retail AP controller object per entry in configs.

    Each config is matched on its (brand, model) pair against the table of
    supported APs. The matching module is imported from the wifi_retail_ap
    package and the AP class it defines is instantiated with the config.

    Args:
        configs: list of dicts containing ap settings. ap settings must contain
        the following: brand, model, ip_address, username and password
    Returns:
        List of AP objects, in the same order as configs
    Raises:
        KeyError: if a brand and model combination is not supported
    """
    # (brand, model) -> (module name, class name)
    registry = {
        ('Netgear', 'R7000'): ('netgear_r7000', 'NetgearR7000AP'),
        ('Netgear', 'R7000NA'): ('netgear_r7000', 'NetgearR7000NAAP'),
        ('Netgear', 'R7500'): ('netgear_r7500', 'NetgearR7500AP'),
        ('Netgear', 'R7500NA'): ('netgear_r7500', 'NetgearR7500NAAP'),
        ('Netgear', 'R7800'): ('netgear_r7800', 'NetgearR7800AP'),
        ('Netgear', 'R8000'): ('netgear_r8000', 'NetgearR8000AP'),
        ('Netgear', 'RAX80'): ('netgear_rax80', 'NetgearRAX80AP'),
        ('Netgear', 'RAX120'): ('netgear_rax120', 'NetgearRAX120AP'),
        ('Netgear', 'RAX200'): ('netgear_rax200', 'NetgearRAX200AP'),
        ('Netgear', 'RAXE500'): ('netgear_raxe500', 'NetgearRAXE500AP'),
        ('Brcm', 'Reference'): ('brcm_ref', 'BrcmRefAP'),
        ('Google', 'Wifi'): ('google_wifi', 'GoogleWifiAP'),
    }
    access_points = []
    for ap_config in configs:
        lookup_key = (ap_config['brand'], ap_config['model'])
        entry = registry.get(lookup_key)
        if entry is None:
            raise KeyError('Invalid retail AP brand and model combination.')
        module_name, class_name = entry
        # Import lazily so only the modules of the requested APs are loaded.
        ap_module = importlib.import_module(
            'acts_contrib.test_utils.wifi.wifi_retail_ap.{}'.format(
                module_name))
        ap_constructor = getattr(ap_module, class_name)
        access_points.append(ap_constructor(ap_config))
    return access_points
108
109
def destroy(objs):
    """Tear down retail AP controller objects.

    Args:
        objs: iterable of AP objects; teardown() is called on each in order
    """
    for access_point in objs:
        access_point.teardown()
113
114
class BlockingBrowser(selenium.webdriver.chrome.webdriver.WebDriver):
    """Class that implements a blocking browser session on top of selenium.

    The class wraps a selenium Chrome webdriver and makes sure that only one
    such webdriver is active on a machine at any single time. Single session
    operation is enforced with an exclusive flock on the chromedriver
    executable. The class is to be used within context managers (e.g. with
    statements) to ensure locks are always properly released.

    Note: although the class inherits from selenium's WebDriver, the parent
    constructor is never called here; all browser commands are issued through
    the self.driver session created on context entry.
    """

    def __init__(self, headless, timeout):
        """Constructor for BlockingBrowser class.

        Args:
            headless: boolean to control visible/headless browser operation
            timeout: maximum time allowed to launch browser
        Raises:
            RuntimeError: if the installed selenium is older than 4.0.0
        """
        # Parse the whole major component so that e.g. '10.1.0' is not read
        # as major version 1.
        major_version = int(selenium.__version__.split('.')[0])
        if major_version < 4:
            raise RuntimeError(
                'BlockingBrowser now requires selenium==4.0.0 or later. ')
        self.log = logger.create_tagged_trace_logger('ChromeDriver')
        self.chrome_options = selenium.webdriver.chrome.webdriver.Options()
        self.chrome_options.add_argument('--no-proxy-server')
        self.chrome_options.add_argument('--no-sandbox')
        self.chrome_options.add_argument('--crash-dumps-dir=/tmp')
        self.chrome_options.add_argument('--allow-running-insecure-content')
        self.chrome_options.add_argument('--ignore-certificate-errors')
        self.chrome_capabilities = selenium.webdriver.common.desired_capabilities.DesiredCapabilities.CHROME.copy(
        )
        self.chrome_capabilities['acceptSslCerts'] = True
        self.chrome_capabilities['acceptInsecureCerts'] = True
        if headless:
            self.chrome_options.add_argument('--headless')
            self.chrome_options.add_argument('--disable-gpu')
        # NOTE(review): presumably silences webdriver_manager's own logging;
        # confirm against the webdriver_manager version in use.
        os.environ['WDM_LOG'] = str(logging.NOTSET)
        self.executable_path = ChromeDriverManager().install()
        self.timeout = timeout

    def _launch_driver(self):
        """Starts a chromedriver session and stores it in self.driver."""
        self.driver = selenium.webdriver.Chrome(
            service=ChromeService(self.executable_path),
            options=self.chrome_options,
            desired_capabilities=self.chrome_capabilities)
        self.session_id = self.driver.session_id

    def _release_lock(self):
        """Unlocks and closes the lock file."""
        try:
            fcntl.flock(self.lock_file, fcntl.LOCK_UN)
        finally:
            self.lock_file.close()

    def __enter__(self):
        """Entry context manager for BlockingBrowser.

        The enter context manager for BlockingBrowser attempts to lock the
        browser file. If successful, it launches and returns a chromedriver
        session. If an exception occurs while starting the browser, the lock
        file is released.

        Returns:
            This BlockingBrowser, with self.driver set to a live session
        Raises:
            RuntimeError: if the browser fails to start once the lock is held
            TimeoutError: if the lock is not acquired within self.timeout
        """
        # The chromedriver executable doubles as the machine-wide lock file.
        self.lock_file = open(self.executable_path, 'r')
        start_time = time.time()
        while time.time() < start_time + self.timeout:
            try:
                fcntl.flock(self.lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
            except BlockingIOError:
                # Another session holds the lock; poll until timeout.
                time.sleep(BROWSER_WAIT_SHORT)
                continue
            try:
                self._launch_driver()
            except Exception as err:
                self._release_lock()
                raise RuntimeError('Error starting browser. '
                                   'Releasing lock file.') from err
            except BaseException:
                # Interrupts must propagate unchanged, but never while still
                # holding the machine-wide lock.
                self._release_lock()
                raise
            return self
        # Lock was never acquired; do not leak the open file handle.
        self.lock_file.close()
        raise TimeoutError('Could not start chrome browser in time.')

    def __exit__(self, exc_type, exc_value, traceback):
        """Exit context manager for BlockingBrowser.

        The exit context manager quits the browser session and releases the
        lock file, even if quitting fails.

        Raises:
            RuntimeError: if the browser session could not be quit
        """
        try:
            self.driver.quit()
        except Exception as err:
            raise RuntimeError(
                'Failed to quit browser. Releasing lock file.') from err
        finally:
            self._release_lock()

    def restart(self):
        """Method to restart browser session without releasing lock file.

        Only the chromedriver session is recreated. The lock file opened in
        __enter__ is left untouched so that no other process can grab the
        lock while the browser restarts.

        Raises:
            RuntimeError: if the new browser session fails to start
        """
        self.driver.quit()
        try:
            self._launch_driver()
        except Exception as err:
            # The lock stays held; it is released by __exit__.
            raise RuntimeError('Error starting browser.') from err

    def visit_persistent(self,
                         url,
                         page_load_timeout,
                         num_tries,
                         backup_url='about:blank',
                         check_for_element=None):
        """Method to visit webpages and retry upon failure.

        The function visits a URL and checks that the resulting URL matches
        the intended URL, i.e. no redirects have happened

        Args:
            url: the intended url
            page_load_timeout: timeout for page visits
            num_tries: number of tries before url is declared unreachable
            backup_url: url to visit if first url is not reachable. This can be
            used to simply refresh the browser and try again or to re-login to
            the AP
            check_for_element: element name to check for visibility on page
        Raises:
            RuntimeError: if the page is reached but check_for_element is not
            visible, or if the url is not reached within num_tries
        """
        self.driver.set_page_load_timeout(page_load_timeout)
        for idx in range(num_tries):
            try:
                self.driver.get(url)
            except Exception:
                self.restart()
                # A fresh session does not inherit the page load timeout.
                self.driver.set_page_load_timeout(page_load_timeout)

            # Only the last path component is compared, so differing hosts
            # or schemes do not count as a redirect.
            page_reached = self.driver.current_url.split('/')[-1] == url.split(
                '/')[-1]
            if page_reached:
                if check_for_element:
                    time.sleep(BROWSER_WAIT_MED)
                    if self.is_element_visible(check_for_element):
                        break
                    else:
                        raise RuntimeError(
                            'Page reached but expected element not found.')
                else:
                    break
            else:
                try:
                    self.driver.get(backup_url)
                except Exception:
                    self.restart()
                    self.driver.set_page_load_timeout(page_load_timeout)

            if idx == num_tries - 1:
                self.log.error('URL unreachable. Current URL: {}'.format(
                    self.driver.current_url))
                raise RuntimeError('URL unreachable.')

    def get_element_value(self, element_name):
        """Function to look up and get webpage element value.

        Args:
            element_name: name of element to look up
        Returns:
            Value of element: selection state for checkboxes, value of the
            selected item for radio buttons (None if nothing is selected),
            value attribute otherwise
        """
        element = self.driver.find_element(By.NAME, element_name)
        element_type = self.get_element_type(element_name)
        if element_type == 'checkbox':
            return element.is_selected()
        elif element_type == 'radio':
            items = self.driver.find_elements(By.NAME, element_name)
            for item in items:
                if item.is_selected():
                    return item.get_attribute('value')
            return None
        else:
            return element.get_attribute('value')

    def get_element_type(self, element_name):
        """Function to look up and get webpage element type.

        Args:
            element_name: name of element to look up
        Returns:
            Type of element
        """
        item = self.driver.find_element(By.NAME, element_name)
        return item.get_attribute('type')

    def is_element_enabled(self, element_name):
        """Function to check if element is enabled/interactable.

        Args:
            element_name: name of element to look up
        Returns:
            Boolean indicating if element is interactable
        """
        item = self.driver.find_element(By.NAME, element_name)
        return item.is_enabled()

    def is_element_visible(self, element_name):
        """Function to check if element is visible.

        Args:
            element_name: name of element to look up
        Returns:
            Boolean indicating if element is visible
        """
        item = self.driver.find_element(By.NAME, element_name)
        return item.is_displayed()

    def set_element_value(self, element_name, value, select_method='value'):
        """Function to set webpage element value.

        Args:
            element_name: name of element to set
            value: value of element
            select_method: select method for dropdown lists (value/index/text)
        Raises:
            RuntimeError: if the element type or select method is unsupported
        """
        element_type = self.get_element_type(element_name)
        if element_type == 'text' or element_type == 'password':
            item = self.driver.find_element(By.NAME, element_name)
            item.clear()
            item.send_keys(value)
        elif element_type == 'checkbox':
            item = self.driver.find_element(By.NAME, element_name)
            # Clicking toggles the box, so only click on a state mismatch.
            if value != item.is_selected():
                item.click()
        elif element_type == 'radio':
            items = self.driver.find_elements(By.NAME, element_name)
            for item in items:
                if item.get_attribute('value') == value:
                    item.click()
        elif element_type == 'select-one':
            select = Select(self.driver.find_element(By.NAME, element_name))
            if select_method == 'value':
                select.select_by_value(str(value))
            elif select_method == 'text':
                select.select_by_visible_text(value)
            elif select_method == 'index':
                select.select_by_index(value)
            else:
                raise RuntimeError(
                    '{} is not a valid select method.'.format(select_method))
        else:
            raise RuntimeError(
                'Element type {} not supported.'.format(element_type))

    def click_button(self, button_name):
        """Function to click button on webpage

        Args:
            button_name: name of button to click
        Raises:
            RuntimeError: if the element is not of type submit
        """
        button = self.driver.find_element(By.NAME, button_name)
        if button.get_attribute('type') == 'submit':
            button.click()
        else:
            raise RuntimeError('{} is not a button.'.format(button_name))

    def accept_alert_if_present(self, wait_for_alert=1):
        """Function to check for alert and accept if present

        Args:
            wait_for_alert: time (seconds) to wait for alert
        """
        try:
            selenium.webdriver.support.ui.WebDriverWait(
                self.driver,
                wait_for_alert).until(expected_conditions.alert_is_present())
            alert = self.driver.switch_to.alert
            alert.accept()
        except selenium.common.exceptions.TimeoutException:
            # No alert showed up within the wait window; nothing to accept.
            pass
370
371
class WifiRetailAP(object):
    """Base class implementation for retail ap.

    Base class provides functions whose implementation is shared by all aps.
    AP specific operations (read_ap_settings, configure_ap, set_rate) raise
    NotImplementedError unless implemented in the child class.
    """

    def __init__(self, ap_settings):
        """Constructor for WifiRetailAP class.

        Args:
            ap_settings: dict of ap settings. Must contain either ip_address
            or ssh_config (with a host entry). If lock_ap is set, brand and
            model are required as well and the AP is locked for the lifetime
            of the object.
        """
        self.ap_settings = ap_settings.copy()
        self.log = logger.create_tagged_trace_logger('AccessPoint|{}'.format(
            self._get_control_ip_address()))
        # Capabilities variable describing AP capabilities
        self.capabilities = {
            'interfaces': [],
            'channels': {},
            'modes': {},
            'default_mode': None
        }
        # No-op in the base class since no interfaces are listed above.
        # NOTE(review): child classes presumably fill in capabilities and
        # their per-interface settings themselves - confirm.
        for interface in self.capabilities['interfaces']:
            self.ap_settings.setdefault(interface, {})
        # Lock AP
        if self.ap_settings.get('lock_ap', 0):
            self.lock_timeout = self.ap_settings.get('lock_timeout', 3600)
            self._lock_ap()

    def teardown(self):
        """Function to perform destroy operations."""
        if self.ap_settings.get('lock_ap', 0):
            self._unlock_ap()

    def reset(self):
        """Function that resets AP.

        Function implementation is AP dependent and intended to perform any
        necessary reset operations as part of controller destroy.
        """

    def read_ap_settings(self):
        """Function that reads current ap settings.

        Function implementation is AP dependent and thus base class raises exception
        if function not implemented in child class.
        """
        raise NotImplementedError

    def validate_ap_settings(self):
        """Function to validate ap settings.

        This function compares the actual ap settings read from the AP via
        read_ap_settings with the assumed settings saved in the AP object.
        When called after AP configuration, this method helps ensure that our
        configuration was successful. A discrepancy is only logged as a
        warning; no exception is raised.
        Note: depending on the child class implementation of
        read_ap_settings, calling this function may update the stored
        ap_settings.
        """
        assumed_ap_settings = copy.deepcopy(self.ap_settings)
        actual_ap_settings = self.read_ap_settings()

        if assumed_ap_settings != actual_ap_settings:
            self.log.warning(
                'Discrepancy in AP settings. Some settings may have been overwritten.'
            )

    def configure_ap(self, **config_flags):
        """Function that configures ap based on values of ap_settings.

        Function implementation is AP dependent and thus base class raises exception
        if function not implemented in child class.

        Args:
            config_flags: optional configuration flags
        """
        raise NotImplementedError

    def set_region(self, region):
        """Function that sets AP region.

        This function sets the region for the AP. Note that this may overwrite
        channel and bandwidth settings in cases where the new region does not
        support the current wireless configuration.

        Args:
            region: string indicating AP region
        """
        if region != self.ap_settings['region']:
            self.log.warning(
                'Updating region may overwrite wireless settings.')
        setting_to_update = {'region': region}
        self.update_ap_settings(setting_to_update)

    def set_radio_on_off(self, network, status):
        """Function that turns the radio on or off.

        Args:
            network: string containing network identifier (2G, 5G_1, 5G_2)
            status: boolean indicating on or off (0: off, 1: on)
        """
        setting_to_update = {network: {'status': int(status)}}
        self.update_ap_settings(setting_to_update)

    def set_ssid(self, network, ssid):
        """Function that sets network SSID.

        Args:
            network: string containing network identifier (2G, 5G_1, 5G_2)
            ssid: string containing ssid
        """
        setting_to_update = {network: {'ssid': str(ssid)}}
        self.update_ap_settings(setting_to_update)

    def set_channel(self, network, channel):
        """Function that sets network channel.

        An unsupported channel is logged as an error but still applied.

        Args:
            network: string containing network identifier (2G, 5G_1, 5G_2)
            channel: string or int containing channel
        """
        if channel not in self.capabilities['channels'][network]:
            self.log.error('Ch{} is not supported on {} interface.'.format(
                channel, network))
        setting_to_update = {network: {'channel': channel}}
        self.update_ap_settings(setting_to_update)

    def _normalize_bandwidth(self, bandwidth):
        """Function that expands bandwidth shorthands into AP mode strings.

        Args:
            bandwidth: mode string (e.g. VHT20), bw-prefixed shorthand (e.g.
            bw20) or integer bandwidth (e.g. 20)
        Returns:
            Mode string where shorthands are expanded using the default mode
            in self.capabilities, e.g. bw20 and 20 both become VHT20 when the
            default mode is VHT. Other strings are returned unchanged.
        """
        # The int check must come first: a substring test on an int raises
        # TypeError.
        if isinstance(bandwidth, int):
            return self.capabilities['default_mode'] + str(bandwidth)
        if 'bw' in bandwidth:
            return bandwidth.replace('bw', self.capabilities['default_mode'])
        return bandwidth

    def set_bandwidth(self, network, bandwidth):
        """Function that sets network bandwidth/mode.

        An unsupported mode is logged as an error but still applied.

        Args:
            network: string containing network identifier (2G, 5G_1, 5G_2)
            bandwidth: string containing mode, e.g. 11g, VHT20, VHT40, VHT80.
            bw-prefixed strings (e.g. bw20) and integers (e.g. 20) are
            expanded using the AP default mode.
        """
        bandwidth = self._normalize_bandwidth(bandwidth)
        if bandwidth not in self.capabilities['modes'][network]:
            self.log.error('{} mode is not supported on {} interface.'.format(
                bandwidth, network))
        setting_to_update = {network: {'bandwidth': bandwidth}}
        self.update_ap_settings(setting_to_update)

    def set_channel_and_bandwidth(self, network, channel, bandwidth):
        """Function that sets network bandwidth/mode and channel.

        Unsupported modes or channels are logged as errors but still applied.

        Args:
            network: string containing network identifier (2G, 5G_1, 5G_2)
            channel: string or int containing desired channel
            bandwidth: string containing mode, e.g. 11g, VHT20, VHT40, VHT80.
            bw-prefixed strings (e.g. bw20) and integers (e.g. 20) are
            expanded using the AP default mode.
        """
        bandwidth = self._normalize_bandwidth(bandwidth)
        if bandwidth not in self.capabilities['modes'][network]:
            self.log.error('{} mode is not supported on {} interface.'.format(
                bandwidth, network))
        if channel not in self.capabilities['channels'][network]:
            self.log.error('Ch{} is not supported on {} interface.'.format(
                channel, network))
        setting_to_update = {
            network: {
                'bandwidth': bandwidth,
                'channel': channel
            }
        }
        self.update_ap_settings(setting_to_update)

    def set_power(self, network, power):
        """Function that sets network transmit power.

        Args:
            network: string containing network identifier (2G, 5G_1, 5G_2)
            power: string containing power level, e.g., 25%, 100%
        """
        if 'power' not in self.ap_settings[network]:
            self.log.error(
                'Cannot configure power on {} interface.'.format(network))
        setting_to_update = {network: {'power': power}}
        self.update_ap_settings(setting_to_update)

    def set_security(self, network, security_type, *password):
        """Function that sets network security setting and password.

        Args:
            network: string containing network identifier (2G, 5G_1, 5G_2)
            security_type: string containing security setting, e.g., WPA2-PSK
            password: optional argument containing password. Only applied
            when exactly one string is passed.
        """
        if len(password) == 1 and isinstance(password[0], str):
            setting_to_update = {
                network: {
                    'security_type': str(security_type),
                    'password': str(password[0])
                }
            }
        else:
            setting_to_update = {
                network: {
                    'security_type': str(security_type)
                }
            }
        self.update_ap_settings(setting_to_update)

    def set_rate(self):
        """Function that configures rate used by AP.

        Function implementation is not supported by most APs and thus base
        class raises exception if function not implemented in child class.
        """
        raise NotImplementedError

    def _update_settings_dict(self,
                              settings,
                              updates,
                              updates_requested=False,
                              status_toggle_flag=False):
        """Function that recursively merges updates into a copy of settings.

        Args:
            settings: dict of current settings; left unmodified
            updates: dict of settings to change, may contain nested dicts
            updates_requested: flag carried through the recursion, set once
            any value differs from the current one
            status_toggle_flag: flag carried through the recursion, set once
            a changed key contains 'status'
        Returns:
            Tuple of (new settings dict, updates_requested,
            status_toggle_flag)
        Raises:
            KeyError: if updates contains a key that is not in settings
        """
        new_settings = copy.deepcopy(settings)
        for key, value in updates.items():
            if key not in new_settings:
                raise KeyError('{} is an invalid settings key.'.format(key))
            elif isinstance(value, collections.abc.Mapping):
                new_settings[
                    key], updates_requested, status_toggle_flag = self._update_settings_dict(
                        new_settings[key], value, updates_requested,
                        status_toggle_flag)
            elif new_settings[key] != value:
                new_settings[key] = value
                updates_requested = True
                if 'status' in key:
                    status_toggle_flag = True
        return new_settings, updates_requested, status_toggle_flag

    def update_ap_settings(self, dict_settings=None, **named_settings):
        """Function to update settings of existing AP.

        Function merges arguments into ap_settings and calls configure_ap to
        apply them if any setting actually changed.

        Args:
            dict_settings: optional single dictionary of settings to update
            **named_settings accepts named settings to update
            Note: dict and named_settings cannot contain the same settings.
        Raises:
            KeyError: if a setting is passed both ways or is not a valid
            settings key
        """
        if dict_settings is None:
            dict_settings = {}
        settings_to_update = dict(dict_settings, **named_settings)
        # A shorter merged dict means some key was given in both arguments.
        if len(settings_to_update) != len(dict_settings) + len(named_settings):
            raise KeyError('The following keys were passed twice: {}'.format(
                (set(dict_settings.keys()).intersection(
                    set(named_settings.keys())))))

        self.ap_settings, updates_requested, status_toggle_flag = self._update_settings_dict(
            self.ap_settings, settings_to_update)

        if updates_requested:
            self.configure_ap(status_toggled=status_toggle_flag)

    def band_lookup_by_channel(self, channel):
        """Function that gives band name by channel number.

        Args:
            channel: channel number to lookup
        Returns:
            band: name of band which this channel belongs to on this ap, False
            if not supported
        """
        for key, value in self.capabilities['channels'].items():
            if channel in value:
                return key
        return False

    def _get_control_ip_address(self):
        """Function to get AP's Control Interface IP address."""
        if 'ssh_config' in self.ap_settings:
            return self.ap_settings['ssh_config']['host']
        else:
            return self.ap_settings['ip_address']

    def _lock_ap(self):
        """Function to lock the ap while tests are running.

        Takes an exclusive flock on a per-AP file under /tmp, polling until
        self.lock_timeout expires.

        Raises:
            RuntimeError: if the lock is not acquired in time
        """
        self.lock_file_path = '/tmp/{}_{}_{}.lock'.format(
            self.ap_settings['brand'], self.ap_settings['model'],
            self._get_control_ip_address())
        if not os.path.exists(self.lock_file_path):
            with open(self.lock_file_path, 'w'):
                pass
        self.lock_file = open(self.lock_file_path, 'r')
        start_time = time.time()
        self.log.info('Trying to acquire AP lock.')
        while time.time() < start_time + self.lock_timeout:
            try:
                fcntl.flock(self.lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
            except BlockingIOError:
                # Lock is held elsewhere; poll until timeout.
                time.sleep(BROWSER_WAIT_SHORT)
                continue
            self.log.info('AP lock acquired.')
            return
        # Lock was never acquired; do not leak the open file handle.
        self.lock_file.close()
        raise RuntimeError('Could not lock AP in time.')

    def _unlock_ap(self):
        """Function to unlock the AP when tests are done.

        Raises:
            RuntimeError: if unlocking or closing the lock file fails
        """
        self.log.info('Releasing AP lock.')
        if hasattr(self, 'lock_file'):
            try:
                fcntl.flock(self.lock_file, fcntl.LOCK_UN)
                self.lock_file.close()
                self.log.info('Succussfully released AP lock file.')
            except Exception as err:
                raise RuntimeError(
                    'Error occurred while unlocking AP.') from err
683