• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Controller for Open WRT access point."""
2
3import re
4import time
5from acts import logger
6from acts.controllers.ap_lib import hostapd_constants
7from acts.controllers.openwrt_lib import network_settings
8from acts.controllers.openwrt_lib import wireless_config
9from acts.controllers.openwrt_lib import wireless_settings_applier
10from acts.controllers.utils_lib.ssh import connection
11from acts.controllers.utils_lib.ssh import settings
12import yaml
13
# Names under which the test framework registers this controller module.
# "OpenWrtAP" is the testbed config key shown in create()'s docstring.
MOBLY_CONTROLLER_CONFIG_NAME = "OpenWrtAP"
ACTS_CONTROLLER_REFERENCE_NAME = "access_points"

# Values of the "security" field of a network config; they are compared
# against config["security"] and forwarded unchanged to WirelessConfig.
OPEN_SECURITY = "none"
PSK1_SECURITY = "psk"
PSK_SECURITY = "psk2"
WEP_SECURITY = "wep"
ENT_SECURITY = "wpa2"
OWE_SECURITY = "owe"
SAE_SECURITY = "sae"

# Not referenced in this module; presumably the value written to a radio's
# "disabled" option to enable it -- TODO confirm against the users.
ENABLE_RADIO = "0"

# Passed as ieee80211w for OWE and SAE networks.
PMF_ENABLED = 2

# Prefixes of the generated wireless config names ("wifi2g1", "wifi5g1", ...).
WIFI_2G = "wifi2g"
WIFI_5G = "wifi5g"

# Seconds start_ap()/stop_ap() poll for the radios to change state.
WAIT_TIME = 20
28
29
def create(configs):
  """Builds one OpenWrtAP controller per entry of the testbed config.

  Args:
    configs: Iterable of per-AP config dictionaries. Each dictionary needs a
      "ssh_config" entry, which OpenWrtAP.__init__() uses to open the SSH
      connection to the AP.

  Returns:
    A list of OpenWrtAP objects, in the same order as `configs`.

  Example:
    Testbed entry describing two APs:

      "OpenWrtAP": [
        {
          "ssh_config": {
            "user" : "root",
            "host" : "192.168.1.1"
          }
        },
        {
          "ssh_config": {
            "user" : "root",
            "host" : "192.168.1.2"
          }
        }
      ]
  """
  access_points = []
  for ap_config in configs:
    access_points.append(OpenWrtAP(ap_config))
  return access_points
64
65
def destroy(aps):
  """Destroys a list of AccessPoints.

  For every AP the settings are reset via close() and the SSH connection is
  then closed via close_ssh(). The SSH connection is closed even when close()
  raises, so a failed cleanup does not leak the connection; the exception is
  still propagated to the caller afterwards.

  Args:
    aps: The list of AccessPoints to destroy.
  """
  for ap in aps:
    try:
      ap.close()
    finally:
      # Always release the SSH connection, even if resetting settings failed.
      ap.close_ssh()
75
76
def get_info(aps):
  """Collects the SSH hostname of every access point.

  Args:
    aps: A list of AccessPoints.

  Returns:
    A list with the `ssh_settings.hostname` of each AP, in input order.
  """
  hostnames = []
  for access_point in aps:
    hostnames.append(access_point.ssh_settings.hostname)
  return hostnames
87
88
class OpenWrtAP(object):
  """An AccessPoint controller for an OpenWrt AP reached over SSH.

  Attributes:
    ssh: The ssh connection to the AP.
    ssh_settings: The ssh settings being used by the ssh connection.
    log: Logging object for AccessPoint.
    wireless_setting: WirelessSettingsApplier created by configure_ap(); None
      until configure_ap() has been called.
    network_setting: Object for network configuration.
  """

  # Radios queried with "wifi status".
  _RADIOS = ("radio0", "radio1")
  # Band whose BSSID map each radio fills in get_bssids_for_wifi_networks().
  _RADIO_TO_BAND = (("radio0", "5g"), ("radio1", "2g"))
  # Seconds between two status polls in start_ap()/stop_ap().
  _POLL_INTERVAL = 3

  def __init__(self, config):
    """Initialize AP.

    Args:
      config: Controller config dictionary; config["ssh_config"] provides the
        SSH login information and must contain a "host" entry.
    """
    self.ssh_settings = settings.from_config(config["ssh_config"])
    self.ssh = connection.SshConnection(self.ssh_settings)
    self.log = logger.create_logger(
        lambda msg: "[OpenWrtAP|%s] %s" % (self.ssh_settings.hostname, msg))
    self.wireless_setting = None
    self.network_setting = network_settings.NetworkSettings(
        self.ssh, config["ssh_config"]["host"], self.log)

  def configure_ap(self, wifi_configs, channel_2g, channel_5g):
    """Configure AP with the required settings.

    Each test class inherits WifiBaseTest. Based on the test, we may need to
    configure PSK, WEP, OPEN, ENT networks on 2G and 5G bands in any
    combination. We call WifiBaseTest methods get_psk_network(),
    get_open_network(), get_wep_network() and get_ent_network() to create
    dictionaries which contains this information. 'wifi_configs' is a list of
    such dictionaries. Example below configures 2 WiFi networks - 1 PSK 2G and
    1 Open 5G on one AP. configure_ap() is called from WifiBaseTest to
    configure the APs.

    wifi_configs = [
      {
        '2g': {
          'SSID': '2g_AkqXWPK4',
          'security': 'psk2',
          'password': 'YgYuXqDO9H',
          'hiddenSSID': False
        },
      },
      {
        '5g': {
          'SSID': '5g_8IcMR1Sg',
          'security': 'none',
          'hiddenSSID': False
        },
      }
    ]

    Args:
      wifi_configs: list of network settings for 2G and 5G bands.
      channel_2g: channel for 2G band.
      channel_5g: channel for 5G band.
    """
    wireless_configs = self.generate_wireless_configs(wifi_configs)
    self.wireless_setting = wireless_settings_applier.WirelessSettingsApplier(
        self.ssh, wireless_configs, channel_2g, channel_5g)
    self.wireless_setting.apply_wireless_settings()

  def _read_wifi_status(self, radio=None):
    """Runs "wifi status" on the AP and parses its output.

    Args:
      radio: Radio name to query (e.g. "radio0"); None queries all radios.

    Returns:
      The parsed status as returned by the YAML loader.
    """
    command = "wifi status" if radio is None else "wifi status %s" % radio
    raw_status = self.ssh.run(command).stdout
    # NOTE(review): the text comes from the AP under test; yaml.safe_load
    # would presumably be sufficient here -- confirm before switching loaders.
    return yaml.load(raw_status.replace("\t", "").replace("\n", ""),
                     Loader=yaml.FullLoader)

  def _wait_for_wifi_status(self, expected):
    """Polls get_wifi_status() until it equals `expected` or WAIT_TIME passes.

    Args:
      expected: True to wait for get_wifi_status() to be truthy, False to
        wait for it to be falsy.

    Returns:
      True if the expected state was observed, False otherwise. One last check
      is made after the polling window has elapsed.
    """
    deadline = time.time() + WAIT_TIME
    while time.time() < deadline:
      if bool(self.get_wifi_status()) == expected:
        return True
      time.sleep(self._POLL_INTERVAL)
    return bool(self.get_wifi_status()) == expected

  def start_ap(self):
    """Starts the AP with the settings in /etc/config/wireless.

    Raises:
      ValueError: If the radios are not reported up within WAIT_TIME seconds.
    """
    self.ssh.run("wifi up")
    if not self._wait_for_wifi_status(True):
      raise ValueError("Failed to turn on WiFi on the AP.")

  def stop_ap(self):
    """Stops the AP.

    NOTE(review): get_wifi_status() is False as soon as one radio is down, so
    this returns once either radio is down -- confirm that is intended.

    Raises:
      ValueError: If both radios are still reported up after WAIT_TIME
        seconds.
    """
    self.ssh.run("wifi down")
    if not self._wait_for_wifi_status(False):
      raise ValueError("Failed to turn off WiFi on the AP.")

  def get_bssids_for_wifi_networks(self):
    """Get BSSIDs for wifi networks configured.

    Networks found on radio0 are reported under "5g" and networks found on
    radio1 under "2g".

    Returns:
      Dictionary of SSID - BSSID map for both bands.
    """
    bssid_map = {"2g": {}, "5g": {}}
    for radio, band in self._RADIO_TO_BAND:
      for ssid, ifname in self.get_ifnames_for_ssids(radio).items():
        bssid_map[band][ssid] = self.get_bssid(ifname)
    return bssid_map

  def get_wifi_status(self):
    """Check if radios are up for both 2G and 5G bands.

    Returns:
      True if both radios are up. False if not.
    """
    # Build the full list first so both radios are always queried.
    radio_states = [
        self._read_wifi_status(radio)[radio]["up"] for radio in self._RADIOS
    ]
    return all(radio_states)

  def get_ifnames_for_ssids(self, radio):
    """Get interfaces for wifi networks.

    Args:
      radio: Name of the radio to query, e.g. "radio0" or "radio1".

    Returns:
      dictionary of ssid - ifname mappings; empty when the radio is down.
    """
    radio_status = self._read_wifi_status(radio)[radio]
    if not radio_status["up"]:
      return {}
    return {
        iface["config"]["ssid"]: iface["ifname"]
        for iface in radio_status["interfaces"]
    }

  def get_bssid(self, ifname):
    """Get MAC address from an interface.

    Args:
      ifname: interface name of the corresponding MAC.

    Returns:
      BSSID of the interface, taken as the last whitespace-separated token on
      the first line of the `ifconfig <ifname>` output.
    """
    ifconfig = self.ssh.run("ifconfig %s" % ifname).stdout
    first_line = ifconfig.split("\n")[0]
    return first_line.split()[-1]

  def set_wpa_encryption(self, encryption):
    """Set different encryptions to wpa or wpa2.

    Only interfaces whose current encryption belongs to the same family as
    the requested one ("psk..." or "psk2...") are rewritten; interfaces with
    any other encryption are left untouched. The wireless config is then
    committed and wifi is reloaded.

    Args:
      encryption: Target value of the uci "encryption" option, starting with
        "psk" (WPA) or "psk2" (WPA2).
    """
    wifi_status = self._read_wifi_status()

    # Counting how many interface are enabled.
    total_interface = sum(
        len(wifi_status[radio]["interfaces"]) for radio in self._RADIOS)

    # The loop also visits two wifi-iface sections beyond the enabled ones.
    default_extra_interface = 2

    # Match objects never compare equal to each other, so reduce them to
    # booleans before comparing the families.
    target_is_psk = re.match(r"psk\b", encryption) is not None
    target_is_psk2 = re.match(r"psk2\b", encryption) is not None

    for i in range(total_interface + default_extra_interface):
      origin_encryption = self.ssh.run(
          'uci get wireless.@wifi-iface[{}].encryption'.format(i)).stdout
      origin_is_psk = re.match(r"psk\b", origin_encryption) is not None
      origin_is_psk2 = re.match(r"psk2\b", origin_encryption) is not None

      same_family = ((origin_is_psk and target_is_psk) or
                     (origin_is_psk2 and target_is_psk2))
      if same_family:
        self.ssh.run(
            'uci set wireless.@wifi-iface[{}].encryption={}'.format(
                i, encryption))

    self.ssh.run("uci commit wireless")
    self.ssh.run("wifi")

  def _set_iface_password(self, band_label, iface_index, password):
    """Validates `password` and stores it as the key of one wifi-iface.

    Invalid passwords are reported through self.log.error and not applied.

    Args:
      band_label: Band name used in the log message ("5G" or "2G").
      iface_index: Index of the uci wifi-iface section to update.
      password: Candidate password.
    """
    if not 8 <= len(password) <= 63:
      self.log.error("Password must be 8~63 characters long")
    # Only accept ascii letters and digits; fullmatch also rejects a trailing
    # newline, which "^...$" with re.match would let through.
    elif not re.fullmatch(r"[A-Za-z0-9]*", password):
      self.log.error("Password must only contains ascii letters and digits")
    else:
      self.ssh.run(
          'uci set wireless.@wifi-iface[{}].key={}'.format(
              iface_index, password))
      self.log.info("Set {} password to :{}".format(band_label, password))

  def set_password(self, pwd_5g=None, pwd_2g=None):
    """Set password for individual interface.

    The 5G password is written to wifi-iface index 3 and the 2G password to
    wifi-iface index 2. The wireless config is committed and wifi reloaded
    afterwards, whether or not a password was applied.

    Args:
        pwd_5g: 8 ~ 63 chars, ascii letters and digits password for 5g network.
        pwd_2g: 8 ~ 63 chars, ascii letters and digits password for 2g network.
    """
    if pwd_5g:
      self._set_iface_password("5G", 3, pwd_5g)
    if pwd_2g:
      self._set_iface_password("2G", 2, pwd_2g)

    self.ssh.run("uci commit wireless")
    self.ssh.run("wifi")

  @staticmethod
  def _security_kwargs(config):
    """Returns the security-specific WirelessConfig keyword arguments.

    Args:
      config: Network settings dictionary of one band.

    Returns:
      Dictionary of extra keyword arguments for WirelessConfig, or None when
      config["security"] is not one of the supported security types.
    """
    security = config["security"]
    if security in (PSK_SECURITY, PSK1_SECURITY):
      return {"password": config["password"]}
    if security == WEP_SECURITY:
      return {"wep_key": config["wepKeys"][0]}
    if security == OPEN_SECURITY:
      return {}
    if security == OWE_SECURITY:
      return {"ieee80211w": PMF_ENABLED}
    if security == SAE_SECURITY:
      return {"password": config["password"], "ieee80211w": PMF_ENABLED}
    if security == ENT_SECURITY:
      return {
          "radius_server_ip": config["radius_server_ip"],
          "radius_server_port": config["radius_server_port"],
          "radius_server_secret": config["radius_server_secret"],
      }
    return None

  def generate_wireless_configs(self, wifi_configs):
    """Generate wireless configs to configure.

    Converts wifi_configs from configure_ap() to a list of 'WirelessConfig'
    objects. Each object represents a wifi network to configure on the AP.
    Configs are named "<WIFI_2G|WIFI_5G><n>", where n counts the entries seen
    for that band starting at 1. An entry with an unsupported security type
    produces no WirelessConfig but still consumes a number.

    Args:
      wifi_configs: Network list of different security types and bands.

    Returns:
      wireless configuration for openwrt AP.
    """
    bands = (
        (hostapd_constants.BAND_2G, WIFI_2G),
        (hostapd_constants.BAND_5G, WIFI_5G),
    )
    next_index = {band: 1 for band, _ in bands}
    wireless_configs = []

    for network in wifi_configs:
      for band, name_prefix in bands:
        if band not in network:
          continue
        config = network[band]
        extra_kwargs = self._security_kwargs(config)
        if extra_kwargs is not None:
          wireless_configs.append(
              wireless_config.WirelessConfig(
                  "%s%s" % (name_prefix, next_index[band]),
                  config["SSID"],
                  config["security"],
                  band,
                  hidden=config["hiddenSSID"],
                  **extra_kwargs))
        next_index[band] += 1

    return wireless_configs

  def get_wifi_network(self, security=None, band=None):
    """Return first match wifi interface's config.

    The returned dictionary is a copy of the matching config's attributes, so
    the stored wireless config objects are not modified and repeated calls
    give the same result.

    Args:
      security: psk2 or none
      band: '2g' or '5g'

    Returns:
      A dict contains match wifi interface's config, with an extra "SSID" key
      mirroring "ssid" and without "password" when no password is set. None
      if no configured network matches.
    """
    for wifi_iface in self.wireless_setting.wireless_configs:
      # Copy: mutating vars(wifi_iface) directly would add/remove attributes
      # on the config object itself.
      wifi_network = dict(vars(wifi_iface))
      if security and security != wifi_network["security"]:
        continue
      if band and band != wifi_network["band"]:
        continue

      wifi_network["SSID"] = wifi_network["ssid"]
      if not wifi_network.get("password"):
        wifi_network.pop("password", None)
      return wifi_network
    return None

  def close(self):
    """Reset wireless and network settings to default and stop AP."""
    if self.network_setting.config:
      self.network_setting.cleanup_network_settings()
    if self.wireless_setting:
      self.wireless_setting.cleanup_wireless_settings()

  def close_ssh(self):
    """Close SSH connection to AP."""
    self.ssh.close()
491