#!/usr/bin/env python3
#
#   Copyright 2020 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the 'License');
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an 'AS IS' BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
16
import collections.abc
import copy
import fcntl
import importlib
import logging
import os
import time

import selenium
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as expected_conditions
from selenium.webdriver.support.ui import Select
from webdriver_manager.chrome import ChromeDriverManager

from acts import logger
31
32
# Wait durations in seconds, shared by the browser and AP helpers below.
# Polling interval between non-blocking lock attempts.
BROWSER_WAIT_SHORT = 1
# Settling delay before checking that an expected page element is visible.
BROWSER_WAIT_MED = 3
# Longer waits; not referenced in this part of the file.
BROWSER_WAIT_LONG = 30
BROWSER_WAIT_EXTRA_LONG = 60
37
38
def create(configs):
    """Factory method for retail AP class.

    Every config is checked against the table of supported (brand, model)
    pairs before any AP object is constructed, so an unsupported entry late
    in the list cannot leave earlier APs constructed with no handle returned
    to the caller.

    Args:
        configs: list of dicts containing ap settings. ap settings must contain
        the following: brand, model, ip_address, username and password

    Returns:
        List of AP objects, one per config, in the same order as configs.

    Raises:
        KeyError: if a config lacks 'brand' or 'model', or if the pair is not
        a supported combination.
    """
    # Maps (brand, model) to the class name and the module (under
    # acts_contrib.test_utils.wifi.wifi_retail_ap) that defines it.
    SUPPORTED_APS = {
        ('Netgear', 'R7000'): {
            'name': 'NetgearR7000AP',
            'package': 'netgear_r7000'
        },
        ('Netgear', 'R7000NA'): {
            'name': 'NetgearR7000NAAP',
            'package': 'netgear_r7000'
        },
        ('Netgear', 'R7500'): {
            'name': 'NetgearR7500AP',
            'package': 'netgear_r7500'
        },
        ('Netgear', 'R7500NA'): {
            'name': 'NetgearR7500NAAP',
            'package': 'netgear_r7500'
        },
        ('Netgear', 'R7800'): {
            'name': 'NetgearR7800AP',
            'package': 'netgear_r7800'
        },
        ('Netgear', 'R8000'): {
            'name': 'NetgearR8000AP',
            'package': 'netgear_r8000'
        },
        ('Netgear', 'RAX80'): {
            'name': 'NetgearRAX80AP',
            'package': 'netgear_rax80'
        },
        ('Netgear', 'RAX120'): {
            'name': 'NetgearRAX120AP',
            'package': 'netgear_rax120'
        },
        ('Netgear', 'RAX200'): {
            'name': 'NetgearRAX200AP',
            'package': 'netgear_rax200'
        },
        ('Netgear', 'RAXE500'): {
            'name': 'NetgearRAXE500AP',
            'package': 'netgear_raxe500'
        },
        ('Netgear', 'RS700'): {
            'name': 'NetgearRS700AP',
            'package': 'netgear_rs700'
        },
        ('Brcm', 'Reference'): {
            'name': 'BrcmRefAP',
            'package': 'brcm_ref'
        },
        ('Google', 'Wifi'): {
            'name': 'GoogleWifiAP',
            'package': 'google_wifi'
        },
    }
    # Materialize once so the two passes below also work for one-shot
    # iterables.
    configs = list(configs)

    # Pass 1: validate everything before constructing anything.
    ap_class_dicts = []
    for config in configs:
        ap_id = (config['brand'], config['model'])
        if ap_id not in SUPPORTED_APS:
            raise KeyError('Invalid retail AP brand and model combination.')
        ap_class_dicts.append(SUPPORTED_APS[ap_id])

    # Pass 2: import each AP module and instantiate its class.
    objs = []
    for config, ap_class_dict in zip(configs, ap_class_dicts):
        module_name = 'acts_contrib.test_utils.wifi.wifi_retail_ap.{}'.format(
            ap_class_dict['package'])
        ap_module = importlib.import_module(module_name)
        ap_class = getattr(ap_module, ap_class_dict['name'])
        objs.append(ap_class(config))
    return objs
112
113
def destroy(objs):
    """Tear down every AP object in objs.

    teardown() is attempted on all objects even when one of them fails, so a
    single failing AP does not prevent the remaining ones from being torn
    down. The first error encountered is re-raised once all objects have been
    processed.

    Args:
        objs: iterable of objects exposing a teardown() method.

    Raises:
        Exception: the first exception raised by any teardown() call.
    """
    first_error = None
    for obj in objs:
        try:
            obj.teardown()
        except Exception as err:
            # Remember only the first failure; keep tearing down the rest.
            if first_error is None:
                first_error = err
    if first_error is not None:
        raise first_error
117
118
class BlockingBrowser(selenium.webdriver.chrome.webdriver.WebDriver):
    """Class that implements a blocking browser session on top of selenium.

    The class makes sure that only one such webdriver is active on a machine
    at any single time by holding an exclusive flock on the chromedriver
    executable for the lifetime of the session. The class is to be used within
    context managers (e.g. with statements) to ensure locks are always properly
    released.

    Although the class derives from selenium's Chrome WebDriver, the parent
    constructor is not invoked here; the live session is created in __enter__
    and stored in self.driver, and all helpers operate on self.driver.
    """

    def __init__(self, headless, timeout):
        """Constructor for BlockingBrowser class.

        Prepares chrome options/capabilities and resolves the chromedriver
        binary; no browser is started until the context manager is entered.

        Args:
            headless: boolean to control visible/headless browser operation
            timeout: maximum time allowed to launch browser

        Raises:
            RuntimeError: if the installed selenium major version is below 4.
        """
        # Parse the whole major component; looking only at the first
        # character would misread a major version of 10 or more.
        if int(selenium.__version__.split('.')[0]) < 4:
            raise RuntimeError(
                'BlockingBrowser now requires selenium==4.0.0 or later. ')
        self.log = logger.create_tagged_trace_logger('ChromeDriver')
        self.chrome_options = selenium.webdriver.chrome.webdriver.Options()
        self.chrome_options.add_argument('--no-proxy-server')
        self.chrome_options.add_argument('--no-sandbox')
        self.chrome_options.add_argument('--crash-dumps-dir=/tmp')
        self.chrome_options.add_argument('--allow-running-insecure-content')
        self.chrome_options.add_argument('--ignore-certificate-errors')
        self.chrome_capabilities = selenium.webdriver.common.desired_capabilities.DesiredCapabilities.CHROME.copy(
        )
        self.chrome_capabilities['acceptSslCerts'] = True
        self.chrome_capabilities['acceptInsecureCerts'] = True
        if headless:
            self.chrome_options.add_argument('--headless')
            self.chrome_options.add_argument('--disable-gpu')
        # Set before ChromeDriverManager runs; presumably silences
        # webdriver_manager's own logging - confirm against its docs.
        os.environ['WDM_LOG'] = str(logging.NOTSET)
        self.executable_path = ChromeDriverManager().install()
        self.timeout = timeout

    def __enter__(self):
        """Entry context manager for BlockingBrowser.

        Polls for an exclusive, non-blocking flock on the chromedriver
        executable until self.timeout seconds have elapsed. Once the lock is
        held, a chromedriver session is launched and self is returned. If
        launching fails, the lock is released before the error is raised.

        Raises:
            RuntimeError: if the browser fails to start after the lock was
            acquired.
            TimeoutError: if the lock could not be acquired in time.
        """
        self.lock_file = open(self.executable_path, 'r')
        deadline = time.time() + self.timeout
        while time.time() < deadline:
            try:
                fcntl.flock(self.lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
            except BlockingIOError:
                # Another session holds the lock; wait and retry.
                time.sleep(BROWSER_WAIT_SHORT)
                continue
            try:
                self._launch_driver()
            except Exception as err:
                self._release_lock()
                raise RuntimeError('Error starting browser. '
                                   'Releasing lock file.') from err
            except BaseException:
                # e.g. KeyboardInterrupt: still release the lock, but let the
                # original exception propagate untouched.
                self._release_lock()
                raise
            return self
        # Lock never acquired: close the handle instead of leaking it.
        self.lock_file.close()
        raise TimeoutError('Could not start chrome browser in time.')

    def __exit__(self, exc_type, exc_value, traceback):
        """Exit context manager for BlockingBrowser.

        Quits the chromedriver session and always releases the lock file,
        even when quitting fails.

        Raises:
            RuntimeError: if the browser could not be quit.
        """
        try:
            self.driver.quit()
        except Exception as err:
            raise RuntimeError(
                'Failed to quit browser. Releasing lock file.') from err
        finally:
            self._release_lock()

    def _launch_driver(self):
        """Starts a new chromedriver session and stores it in self.driver."""
        # NOTE(review): newer selenium 4.x releases are reported to no longer
        # accept the desired_capabilities keyword - confirm against the
        # selenium version pinned for this project.
        self.driver = selenium.webdriver.Chrome(
            service=ChromeService(self.executable_path),
            options=self.chrome_options,
            desired_capabilities=self.chrome_capabilities)
        self.session_id = self.driver.session_id

    def _release_lock(self):
        """Unlocks and closes the lock file."""
        try:
            fcntl.flock(self.lock_file, fcntl.LOCK_UN)
        finally:
            self.lock_file.close()

    def restart(self):
        """Method to restart browser session without releasing lock file.

        Only the chromedriver session is replaced. The lock file handle
        opened in __enter__ is left untouched, so the exclusive lock stays
        held throughout the restart.
        """
        self.driver.quit()
        self._launch_driver()

    def _restart_with_timeout(self, page_load_timeout):
        """Restarts the session and re-applies the page load timeout.

        The timeout is set on the driver object, which restart() replaces,
        so it has to be configured again on the new session.
        """
        self.restart()
        self.driver.set_page_load_timeout(page_load_timeout)

    def visit_persistent(self,
                         url,
                         page_load_timeout,
                         num_tries,
                         backup_url='about:blank',
                         check_for_element=None):
        """Method to visit webpages and retry upon failure.

        The function visits a URL and checks that the last path segment of
        the resulting URL matches that of the intended URL, i.e. no redirects
        have happened. If the page was not reached, backup_url is visited and
        the attempt is repeated. If a page load raises, the browser session is
        restarted.

        Args:
            url: the intended url
            page_load_timeout: timeout for page visits
            num_tries: number of tries before url is declared unreachable
            backup_url: url to visit if first url is not reachable. This can be
            used to simply refresh the browser and try again or to re-login to
            the AP
            check_for_element: element name to check for visibility on page

        Raises:
            RuntimeError: if the page is reached but check_for_element is not
            visible, or if the url is still not reached after num_tries.
        """
        self.driver.set_page_load_timeout(page_load_timeout)
        for idx in range(num_tries):
            try:
                self.driver.get(url)
            except Exception:
                self._restart_with_timeout(page_load_timeout)

            page_reached = (self.driver.current_url.split('/')[-1] ==
                            url.split('/')[-1])
            if page_reached:
                if not check_for_element:
                    break
                # Give the page a moment to render before checking.
                time.sleep(BROWSER_WAIT_MED)
                if self.is_element_visible(check_for_element):
                    break
                raise RuntimeError(
                    'Page reached but expected element not found.')

            try:
                self.driver.get(backup_url)
            except Exception:
                self._restart_with_timeout(page_load_timeout)

            if idx == num_tries - 1:
                self.log.error('URL unreachable. Current URL: {}'.format(
                    self.driver.current_url))
                raise RuntimeError('URL unreachable.')

    def get_element_value(self, element_name):
        """Function to look up and get webpage element value.

        Args:
            element_name: name of element to look up
        Returns:
            For checkboxes, a boolean selection state. For radio groups, the
            'value' attribute of the selected item, or None if no item is
            selected. Otherwise the element's 'value' attribute.
        """
        element = self.driver.find_element(By.NAME, element_name)
        element_type = self.get_element_type(element_name)
        if element_type == 'checkbox':
            return element.is_selected()
        elif element_type == 'radio':
            items = self.driver.find_elements(By.NAME, element_name)
            for item in items:
                if item.is_selected():
                    return item.get_attribute('value')
        else:
            return element.get_attribute('value')

    def get_element_type(self, element_name):
        """Function to look up and get webpage element type.

        Args:
            element_name: name of element to look up
        Returns:
            Value of the element's 'type' attribute
        """
        item = self.driver.find_element(By.NAME, element_name)
        element_type = item.get_attribute('type')
        return element_type

    def is_element_enabled(self, element_name):
        """Function to check if element is enabled/interactable.

        Args:
            element_name: name of element to look up
        Returns:
            Boolean indicating if element is interactable
        """
        item = self.driver.find_element(By.NAME, element_name)
        return item.is_enabled()

    def is_element_visible(self, element_name):
        """Function to check if element is visible.

        Args:
            element_name: name of element to look up
        Returns:
            Boolean indicating if element is visible
        """
        item = self.driver.find_element(By.NAME, element_name)
        return item.is_displayed()

    def set_element_value(self, element_name, value, select_method='value'):
        """Function to set webpage element value.

        Args:
            element_name: name of element to set
            value: value of element
            select_method: select method for dropdown lists (value/index/text)

        Raises:
            RuntimeError: if the element type is not one of text, password,
            checkbox, radio or select-one, or if select_method is invalid.
        """
        element_type = self.get_element_type(element_name)
        if element_type == 'text' or element_type == 'password':
            item = self.driver.find_element(By.NAME, element_name)
            item.clear()
            item.send_keys(value)
        elif element_type == 'checkbox':
            item = self.driver.find_element(By.NAME, element_name)
            # Click only when the current state differs from the target.
            if value != item.is_selected():
                item.click()
        elif element_type == 'radio':
            items = self.driver.find_elements(By.NAME, element_name)
            for item in items:
                if item.get_attribute('value') == value:
                    item.click()
        elif element_type == 'select-one':
            select = Select(self.driver.find_element(By.NAME, element_name))
            if select_method == 'value':
                select.select_by_value(str(value))
            elif select_method == 'text':
                select.select_by_visible_text(value)
            elif select_method == 'index':
                select.select_by_index(value)
            else:
                raise RuntimeError(
                    '{} is not a valid select method.'.format(select_method))
        else:
            raise RuntimeError(
                'Element type {} not supported.'.format(element_type))

    def click_button(self, button_name):
        """Function to click button on webpage

        Args:
            button_name: name of button to click

        Raises:
            RuntimeError: if the element's type attribute is not 'submit'.
        """
        button = self.driver.find_element(By.NAME, button_name)
        if button.get_attribute('type') == 'submit':
            button.click()
        else:
            raise RuntimeError('{} is not a button.'.format(button_name))

    def accept_alert_if_present(self, wait_for_alert=1):
        """Function to check for alert and accept if present

        Args:
            wait_for_alert: time (seconds) to wait for alert
        """
        try:
            selenium.webdriver.support.ui.WebDriverWait(
                self.driver,
                wait_for_alert).until(expected_conditions.alert_is_present())
            alert = self.driver.switch_to.alert
            alert.accept()
        except selenium.common.exceptions.TimeoutException:
            # No alert showed up within the wait; nothing to accept.
            pass
374
375
class WifiRetailAP(object):
    """Base class implementation for retail ap.

    Base class provides functions whose implementation is shared by all aps.
    Settings are kept in self.ap_settings; the set_* helpers merge a change
    into that dict and call configure_ap when something actually changed.
    AP specific operations (read_ap_settings, configure_ap, set_rate) raise
    NotImplementedError here and are expected to be provided by child classes.
    """

    def __init__(self, ap_settings):
        """Stores a copy of the settings and optionally locks the AP.

        Args:
            ap_settings: dict of ap settings. If 'lock_ap' is truthy the AP
            lock is acquired, waiting up to 'lock_timeout' seconds
            (default 3600).
        """
        self.ap_settings = ap_settings.copy()
        self.log = logger.create_tagged_trace_logger('AccessPoint|{}'.format(
            self._get_control_ip_address()))
        # Capabilities variable describing AP capabilities
        self.capabilities = {
            'interfaces': [],
            'channels': {},
            'modes': {},
            'default_mode': None
        }
        # 'interfaces' is empty at this point, so this loop does nothing in
        # the base class.
        for interface in self.capabilities['interfaces']:
            self.ap_settings.setdefault(interface, {})
        # Lock AP
        if self.ap_settings.get('lock_ap', 0):
            self.lock_timeout = self.ap_settings.get('lock_timeout', 3600)
            self._lock_ap()

    def teardown(self):
        """Function to perform destroy operations."""
        if self.ap_settings.get('lock_ap', 0):
            self._unlock_ap()

    def reset(self):
        """Function that resets AP.

        Function implementation is AP dependent and intended to perform any
        necessary reset operations as part of controller destroy. The base
        class implementation does nothing.
        """

    def read_ap_settings(self):
        """Function that reads current ap settings.

        Function implementation is AP dependent and thus base class raises exception
        if function not implemented in child class.
        """
        raise NotImplementedError

    def validate_ap_settings(self):
        """Function to validate ap settings.

        This function compares the settings returned by read_ap_settings with
        a snapshot of the settings stored in the AP object taken just before
        the read. When called after AP configuration, this method helps ensure
        that our configuration was successful. A mismatch is only logged as a
        warning; no exception is raised for it.
        Note: read_ap_settings is AP dependent and may update the stored
        ap_settings - verify against the child class in use.
        """
        assumed_ap_settings = copy.deepcopy(self.ap_settings)
        actual_ap_settings = self.read_ap_settings()

        if assumed_ap_settings != actual_ap_settings:
            self.log.warning(
                'Discrepancy in AP settings. Some settings may have been overwritten.'
            )

    def configure_ap(self, **config_flags):
        """Function that configures ap based on values of ap_settings.

        Function implementation is AP dependent and thus base class raises exception
        if function not implemented in child class.

        Args:
            config_flags: optional configuration flags
        """
        raise NotImplementedError

    def set_region(self, region):
        """Function that sets AP region.

        This function sets the region for the AP. Note that this may overwrite
        channel and bandwidth settings in cases where the new region does not
        support the current wireless configuration.

        Args:
            region: string indicating AP region
        """
        if region != self.ap_settings['region']:
            self.log.warning(
                'Updating region may overwrite wireless settings.')
        setting_to_update = {'region': region}
        self.update_ap_settings(setting_to_update)

    def set_radio_on_off(self, network, status):
        """Function that turns the radio on or off.

        Args:
            network: string containing network identifier (2G, 5G_1, 5G_2)
            status: boolean indicating on or off (0: off, 1: on)
        """
        setting_to_update = {network: {'status': int(status)}}
        self.update_ap_settings(setting_to_update)

    def set_ssid(self, network, ssid):
        """Function that sets network SSID.

        Args:
            network: string containing network identifier (2G, 5G_1, 5G_2)
            ssid: string containing ssid
        """
        setting_to_update = {network: {'ssid': str(ssid)}}
        self.update_ap_settings(setting_to_update)

    def set_channel(self, network, channel):
        """Function that sets network channel.

        An unsupported channel is logged as an error but the update is still
        attempted.

        Args:
            network: string containing network identifier (2G, 5G_1, 5G_2)
            channel: string or int containing channel
        """
        if channel not in self.capabilities['channels'][network]:
            self.log.error('Ch{} is not supported on {} interface.'.format(
                channel, network))
        setting_to_update = {network: {'channel': channel}}
        self.update_ap_settings(setting_to_update)

    def _resolve_bandwidth(self, bandwidth):
        """Expands a bandwidth shorthand using the AP's default mode.

        An int becomes '<int><default_mode>'; in a string, 'bw' is replaced
        by the default mode. Anything else is returned unchanged.
        """
        default_mode = self.capabilities['default_mode']
        # The int check must come first: "'bw' in <int>" raises TypeError.
        if isinstance(bandwidth, int):
            return str(bandwidth) + default_mode
        if 'bw' in bandwidth:
            return bandwidth.replace('bw', default_mode)
        return bandwidth

    def set_bandwidth(self, network, bandwidth):
        """Function that sets network bandwidth/mode.

        An unsupported mode is logged as an error but the update is still
        attempted.

        Args:
            network: string containing network identifier (2G, 5G_1, 5G_2)
            bandwidth: string containing mode, e.g. 11g, VHT20, VHT40, VHT80,
            a 'bw'-prefixed shorthand, or an int.
        """
        bandwidth = self._resolve_bandwidth(bandwidth)
        if bandwidth not in self.capabilities['modes'][network]:
            self.log.error('{} mode is not supported on {} interface.'.format(
                bandwidth, network))
        setting_to_update = {network: {'bandwidth': bandwidth}}
        self.update_ap_settings(setting_to_update)

    def set_channel_and_bandwidth(self, network, channel, bandwidth):
        """Function that sets network bandwidth/mode and channel.

        Unsupported values are logged as errors but the update is still
        attempted.

        Args:
            network: string containing network identifier (2G, 5G_1, 5G_2)
            channel: string containing desired channel
            bandwidth: string containing mode, e.g. 11g, VHT20, VHT40, VHT80,
            a 'bw'-prefixed shorthand, or an int.
        """
        bandwidth = self._resolve_bandwidth(bandwidth)
        if bandwidth not in self.capabilities['modes'][network]:
            self.log.error('{} mode is not supported on {} interface.'.format(
                bandwidth, network))
        if channel not in self.capabilities['channels'][network]:
            self.log.error('Ch{} is not supported on {} interface.'.format(
                channel, network))
        setting_to_update = {
            network: {
                'bandwidth': bandwidth,
                'channel': channel
            }
        }
        self.update_ap_settings(setting_to_update)

    def set_power(self, network, power):
        """Function that sets network transmit power.

        If the interface has no 'power' setting, an error is logged and the
        subsequent update raises KeyError for the unknown key.

        Args:
            network: string containing network identifier (2G, 5G_1, 5G_2)
            power: string containing power level, e.g., 25%, 100%
        """
        if 'power' not in self.ap_settings[network]:
            self.log.error(
                'Cannot configure power on {} interface.'.format(network))
        setting_to_update = {network: {'power': power}}
        self.update_ap_settings(setting_to_update)

    def set_security(self, network, security_type, *password):
        """Function that sets network security setting and password.

        The password is only updated when exactly one string password is
        given; otherwise only the security type is updated.

        Args:
            network: string containing network identifier (2G, 5G_1, 5G_2)
            security_type: string containing security setting, e.g., WPA2-PSK
            password: optional argument containing password
        """
        if len(password) == 1 and isinstance(password[0], str):
            setting_to_update = {
                network: {
                    'security_type': str(security_type),
                    'password': str(password[0])
                }
            }
        else:
            setting_to_update = {
                network: {
                    'security_type': str(security_type)
                }
            }
        self.update_ap_settings(setting_to_update)

    def set_rate(self):
        """Function that configures rate used by AP.

        Function implementation is not supported by most APs and thus base
        class raises exception if function not implemented in child class.
        """
        raise NotImplementedError

    def _update_settings_dict(self,
                              settings,
                              updates,
                              updates_requested=False,
                              status_toggle_flag=False):
        """Recursively merges updates into a deep copy of settings.

        Args:
            settings: current settings dict (left unmodified)
            updates: dict of new values; nested mappings are merged key by key
            updates_requested: carried-over flag, set once any value changes
            status_toggle_flag: carried-over flag, set once a changed key
            contains 'status'
        Returns:
            Tuple of (new_settings, updates_requested, status_toggle_flag)
        Raises:
            KeyError: if updates contains a key missing from settings.
        """
        new_settings = copy.deepcopy(settings)
        for key, value in updates.items():
            if key not in new_settings:
                raise KeyError('{} is an invalid settings key.'.format(key))
            elif isinstance(value, collections.abc.Mapping):
                (new_settings[key], updates_requested,
                 status_toggle_flag) = self._update_settings_dict(
                     new_settings[key], value, updates_requested,
                     status_toggle_flag)
            elif new_settings[key] != value:
                new_settings[key] = value
                updates_requested = True
                if 'status' in key:
                    status_toggle_flag = True
        return new_settings, updates_requested, status_toggle_flag

    def update_ap_settings(self, dict_settings=None, **named_settings):
        """Function to update settings of existing AP.

        Function merges arguments into ap_settings and, if any value actually
        changed, calls configure_ap to apply them.

        Args:
            dict_settings: optional single dictionary of settings to update
            **named_settings accepts named settings to update
            Note: dict and named_settings cannot contain the same settings.

        Raises:
            KeyError: if a setting is passed both ways, or if a key does not
            exist in the current ap_settings.
        """
        # None instead of a shared mutable {} default.
        if dict_settings is None:
            dict_settings = {}
        settings_to_update = dict(dict_settings, **named_settings)
        # A shorter merged dict means some key appeared in both arguments.
        if len(settings_to_update) != len(dict_settings) + len(named_settings):
            raise KeyError('The following keys were passed twice: {}'.format(
                (set(dict_settings.keys()).intersection(
                    set(named_settings.keys())))))

        (self.ap_settings, updates_requested,
         status_toggle_flag) = self._update_settings_dict(
             self.ap_settings, settings_to_update)

        if updates_requested:
            self.configure_ap(status_toggled=status_toggle_flag)

    def band_lookup_by_channel(self, channel):
        """Function that gives band name by channel number.

        Args:
            channel: channel number to lookup
        Returns:
            band: name of band which this channel belongs to on this ap, False
            if not supported
        """
        for key, value in self.capabilities['channels'].items():
            if channel in value:
                return key
        return False

    def _get_control_ip_address(self):
        """Function to get AP's Control Interface IP address.

        The ssh_config host takes precedence over ip_address when present.
        """
        if 'ssh_config' in self.ap_settings:
            return self.ap_settings['ssh_config']['host']
        else:
            return self.ap_settings['ip_address']

    def _lock_ap(self):
        """Function to lock the ap while tests are running.

        Polls for an exclusive flock on a per-AP file under /tmp until
        self.lock_timeout seconds have elapsed.

        Raises:
            RuntimeError: if the lock could not be acquired in time.
        """
        self.lock_file_path = '/tmp/{}_{}_{}.lock'.format(
            self.ap_settings['brand'], self.ap_settings['model'],
            self._get_control_ip_address())
        if not os.path.exists(self.lock_file_path):
            with open(self.lock_file_path, 'w'):
                pass
        self.lock_file = open(self.lock_file_path, 'r')
        deadline = time.time() + self.lock_timeout
        self.log.info('Trying to acquire AP lock.')
        while time.time() < deadline:
            try:
                fcntl.flock(self.lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
            except BlockingIOError:
                # Lock is held elsewhere; wait and retry.
                time.sleep(BROWSER_WAIT_SHORT)
                continue
            self.log.info('AP lock acquired.')
            return
        # Never acquired: close the handle and drop the attribute so that
        # _unlock_ap does not try to unlock a closed file.
        self.lock_file.close()
        del self.lock_file
        raise RuntimeError('Could not lock AP in time.')

    def _unlock_ap(self):
        """Function to unlock the AP when tests are done.

        Raises:
            RuntimeError: if unlocking or closing the lock file fails.
        """
        self.log.info('Releasing AP lock.')
        if hasattr(self, 'lock_file'):
            try:
                fcntl.flock(self.lock_file, fcntl.LOCK_UN)
                self.lock_file.close()
                self.log.info('Succussfully released AP lock file.')
            except Exception as err:
                raise RuntimeError(
                    'Error occurred while unlocking AP.') from err
687