#!/usr/bin/env python3 # # Copyright 2018 - Google, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from acts import logger from acts.controllers.ap_lib.hostapd_constants import AP_DEFAULT_CHANNEL_2G from acts.controllers.ap_lib.hostapd_constants import AP_DEFAULT_CHANNEL_5G from acts.controllers.ap_lib.hostapd_constants import CHANNEL_MAP from acts.controllers.ap_lib.hostapd_constants import FREQUENCY_MAP from acts.controllers.ap_lib.hostapd_constants import CENTER_CHANNEL_MAP from acts.controllers.ap_lib.hostapd_constants import VHT_CHANNEL from acts.controllers.utils_lib.ssh import connection from acts.controllers.utils_lib.ssh import formatter from acts.controllers.utils_lib.ssh import settings from acts.libs.logging import log_stream from acts.libs.proc.process import Process from acts import asserts import os import threading import time MOBLY_CONTROLLER_CONFIG_NAME = 'PacketCapture' ACTS_CONTROLLER_REFERENCE_NAME = 'packet_capture' BSS = 'BSS' BSSID = 'BSSID' FREQ = 'freq' FREQUENCY = 'frequency' LEVEL = 'level' MON_2G = 'mon0' MON_5G = 'mon1' BAND_IFACE = {'2G': MON_2G, '5G': MON_5G} SCAN_IFACE = 'wlan2' SCAN_TIMEOUT = 60 SEP = ':' SIGNAL = 'signal' SSID = 'SSID' def create(configs): return [PacketCapture(c) for c in configs] def destroy(pcaps): for pcap in pcaps: pcap.close() def get_info(pcaps): return [pcap.ssh_settings.hostname for pcap in pcaps] class PcapProperties(object): """Class to maintain packet capture properties after starting tcpdump. Attributes: proc: Process object of tcpdump pcap_fname: File name of the tcpdump output file pcap_file: File object for the tcpdump output file """ def __init__(self, proc, pcap_fname, pcap_file): """Initialize object.""" self.proc = proc self.pcap_fname = pcap_fname self.pcap_file = pcap_file class PacketCaptureError(Exception): """Error related to Packet capture.""" class PacketCapture(object): """Class representing packet capturer. An instance of this class creates and configures two interfaces for monitor mode; 'mon0' for 2G and 'mon1' for 5G and one interface for scanning for wifi networks; 'wlan2' which is a dual band interface. Attributes: pcap_properties: dict that specifies packet capture properties for a band. """ def __init__(self, configs): """Initialize objects. Args: configs: config for the packet capture. """ self.ssh_settings = settings.from_config(configs['ssh_config']) self.ssh = connection.SshConnection(self.ssh_settings) self.log = logger.create_logger(lambda msg: '[%s|%s] %s' % ( MOBLY_CONTROLLER_CONFIG_NAME, self.ssh_settings.hostname, msg)) self._create_interface(MON_2G, 'monitor') self._create_interface(MON_5G, 'monitor') self.managed_mode = True result = self.ssh.run('ifconfig -a', ignore_status=True) if result.stderr or SCAN_IFACE not in result.stdout: self.managed_mode = False if self.managed_mode: self._create_interface(SCAN_IFACE, 'managed') self.pcap_properties = dict() self._pcap_stop_lock = threading.Lock() def _create_interface(self, iface, mode): """Create interface of monitor/managed mode. Create mon0/mon1 for 2G/5G monitor mode and wlan2 for managed mode. """ if mode == 'monitor': self.ssh.run('ifconfig wlan%s down' % iface[-1], ignore_status=True) self.ssh.run('iw dev %s del' % iface, ignore_status=True) self.ssh.run('iw phy%s interface add %s type %s' % (iface[-1], iface, mode), ignore_status=True) self.ssh.run('ip link set %s up' % iface, ignore_status=True) result = self.ssh.run('iw dev %s info' % iface, ignore_status=True) if result.stderr or iface not in result.stdout: raise PacketCaptureError('Failed to configure interface %s' % iface) def _cleanup_interface(self, iface): """Clean up monitor mode interfaces.""" self.ssh.run('iw dev %s del' % iface, ignore_status=True) result = self.ssh.run('iw dev %s info' % iface, ignore_status=True) if not result.stderr or 'No such device' not in result.stderr: raise PacketCaptureError('Failed to cleanup monitor mode for %s' % iface) def _parse_scan_results(self, scan_result): """Parses the scan dump output and returns list of dictionaries. Args: scan_result: scan dump output from scan on mon interface. Returns: Dictionary of found network in the scan. The attributes returned are a.) SSID - SSID of the network. b.) LEVEL - signal level. c.) FREQUENCY - WiFi band the network is on. d.) BSSID - BSSID of the network. """ scan_networks = [] network = {} for line in scan_result.splitlines(): if SEP not in line: continue if BSS in line: network[BSSID] = line.split('(')[0].split()[-1] field, value = line.lstrip().rstrip().split(SEP)[0:2] value = value.lstrip() if SIGNAL in line: network[LEVEL] = int(float(value.split()[0])) elif FREQ in line: network[FREQUENCY] = int(value) elif SSID in line: network[SSID] = value scan_networks.append(network) network = {} return scan_networks def get_wifi_scan_results(self): """Starts a wifi scan on wlan2 interface. Returns: List of dictionaries each representing a found network. """ if not self.managed_mode: raise PacketCaptureError('Managed mode not setup') result = self.ssh.run('iw dev %s scan' % SCAN_IFACE) if result.stderr: raise PacketCaptureError('Failed to get scan dump') if not result.stdout: return [] return self._parse_scan_results(result.stdout) def start_scan_and_find_network(self, ssid): """Start a wifi scan on wlan2 interface and find network. Args: ssid: SSID of the network. Returns: True/False if the network if found or not. """ curr_time = time.time() while time.time() < curr_time + SCAN_TIMEOUT: found_networks = self.get_wifi_scan_results() for network in found_networks: if network[SSID] == ssid: return True time.sleep(3) # sleep before next scan return False def configure_monitor_mode(self, band, channel, bandwidth=20): """Configure monitor mode. Args: band: band to configure monitor mode for. channel: channel to set for the interface. bandwidth : bandwidth for VHT channel as 40,80,160 Returns: True if configure successful. False if not successful. """ band = band.upper() if band not in BAND_IFACE: self.log.error('Invalid band. Must be 2g/2G or 5g/5G') return False iface = BAND_IFACE[band] if bandwidth == 20: self.ssh.run('iw dev %s set channel %s' % (iface, channel), ignore_status=True) else: center_freq = None for i, j in CENTER_CHANNEL_MAP[VHT_CHANNEL[bandwidth]]["channels"]: if channel in range(i, j + 1): center_freq = (FREQUENCY_MAP[i] + FREQUENCY_MAP[j]) / 2 break asserts.assert_true(center_freq, "No match channel in VHT channel list.") self.ssh.run( 'iw dev %s set freq %s %s %s' % (iface, FREQUENCY_MAP[channel], bandwidth, center_freq), ignore_status=True) result = self.ssh.run('iw dev %s info' % iface, ignore_status=True) if result.stderr or 'channel %s' % channel not in result.stdout: self.log.error("Failed to configure monitor mode for %s" % band) return False return True def start_packet_capture(self, band, log_path, pcap_fname): """Start packet capture for band. band = 2G starts tcpdump on 'mon0' interface. band = 5G starts tcpdump on 'mon1' interface. Args: band: '2g' or '2G' and '5g' or '5G'. log_path: test log path to save the pcap file. pcap_fname: name of the pcap file. Returns: pcap_proc: Process object of the tcpdump. """ band = band.upper() if band not in BAND_IFACE.keys() or band in self.pcap_properties: self.log.error("Invalid band or packet capture already running") return None pcap_name = '%s_%s.pcap' % (pcap_fname, band) pcap_fname = os.path.join(log_path, pcap_name) pcap_file = open(pcap_fname, 'w+b') tcpdump_cmd = 'tcpdump -i %s -w - -U 2>/dev/null' % (BAND_IFACE[band]) cmd = formatter.SshFormatter().format_command(tcpdump_cmd, None, self.ssh_settings, extra_flags={'-q': None}) pcap_proc = Process(cmd) pcap_proc.set_on_output_callback(lambda msg: pcap_file.write(msg), binary=True) pcap_proc.start() self.pcap_properties[band] = PcapProperties(pcap_proc, pcap_fname, pcap_file) return pcap_proc def stop_packet_capture(self, proc): """Stop the packet capture. Args: proc: Process object of tcpdump to kill. """ for key, val in self.pcap_properties.items(): if val.proc is proc: break else: self.log.error("Failed to stop tcpdump. Invalid process.") return proc.stop() with self._pcap_stop_lock: self.pcap_properties[key].pcap_file.close() del self.pcap_properties[key] def close(self): """Cleanup. Cleans up all the monitor mode interfaces and closes ssh connections. """ self._cleanup_interface(MON_2G) self._cleanup_interface(MON_5G) self.ssh.close()