#!/usr/bin/python3 # Copyright 2015 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import collections import pyshark import os class PacketCapture(object): """ Class to manage the packet capture file access from a chaos test. """ def __init__(self, file_name): self._file_name = file_name def get_output(self, display_filter=None, summaries=True, decryption=None): """ Gets the packets from a trace file as Pyshark packet objects for further analysis. @param display_filer: Tshark filter to be used for extracting the relevant packets. @param summaries: Flag to indicate whether to extract only the summaries of packet or not. @param decryption: Decryption key to be used on the trace file. @returns List of pyshark packet objects. """ capture = pyshark.FileCapture(self._file_name, display_filter=display_filter, only_summaries=summaries, decryption_key=decryption, encryption_type='wpa-pwd') capture.load_packets() return capture def get_packet_number(self, index, summary): """ Gets the packet that appears index |index| in the capture file. @param index: Extract this index from the capture file. @param summary: Flag to indicate whether to extract only the summary of the packet or not. @returns pyshark packet object or None. """ display_filter = "frame.number == %d" % index capture = pyshark.FileCapture(self._file_name, display_filter=display_filter, only_summaries=summary) capture.load_packets() if not capture: return None return capture[0] def get_packet_after(self, packet): """ Gets the packet that appears next in the capture file. @param packet: Reference packet -- the packet after this one will be retrieved. @returns pyshark packet object or None. """ return self.get_packet_number(int(packet.number) + 1, summary=False) def count_packets_with_display_filter(self, display_filter): """ Counts the number of packets which match the provided display filter. @param display_filer: Tshark filter to be used for extracting the relevant packets. @returns Number of packets which match the filter. """ output = self.get_output(display_filter=display_filter) return len(output) def count_packets_from(self, mac_addresses): """ Counts the number of packets sent from a given entity using MAC address. @param mac_address: Mac address of the entity. @returns Number of packets which matched the MAC address filter. """ filter = ' or '.join(['wlan.ta==%s' % addr for addr in mac_addresses]) return self.count_packets_with_display_filter(filter) def count_packets_to(self, mac_addresses): """ Counts the number of packets sent to a given entity using MAC address. @param mac_address: Mac address of the entity. @returns Number of packets which matched the MAC address filter. """ filter = ' or '.join(['wlan.ra==%s' % addr for addr in mac_addresses]) return self.count_packets_with_display_filter(filter) def count_packets_from_or_to(self, mac_addresses): """ Counts the number of packets sent to/from a given entity using MAC address. @param mac_address: Mac address of the entity. @returns Number of packets which matched the MAC address filter. """ filter = ' or '.join(['wlan.addr==%s' % addr for addr in mac_addresses]) return self.count_packets_with_display_filter(filter) def count_beacons_from(self, mac_addresses): """ Counts the number of beacon packets sent from a AP using MAC address. @param mac_address: Mac address of the AP. @returns Number of packets which matched the MAC address filter. """ filter = ' or '.join(['wlan.ta==%s' % addr for addr in mac_addresses]) filter = '(%s) and wlan.fc.type_subtype == 0x0008' % (filter) return self.count_packets_with_display_filter(filter) def get_filtered_packets(self, ap, dut, summaries, decryption): """ Gets the packets sent to/from the DUT from a trace file as Pyshark packet objects for further analysis. @param summaries: Flag to indicate whether to extract only the summaries of packet or not. @param dut: Mac address of the DUT. @param ap: Mac address of the AP. @param decryption: Decryption key to be used on the trace file. @returns List of pyshark packet objects. """ filter = 'wlan.addr==%s' % dut packets = self.get_output(display_filter=filter, summaries=summaries, decryption=decryption) return packets class WifiStateMachineAnalyzer(object): """ Class to analyze the Wifi Protocol exhcange from a chaos test. """ STATE_INIT = "INIT" STATE_PROBE_REQ = "PROBE_REQ" STATE_PROBE_RESP = "PROBE_RESP" STATE_AUTH_REQ = "AUTH_REQ" STATE_AUTH_RESP = "AUTH_RESP" STATE_ASSOC_REQ = "ASSOC_REQ" STATE_ASSOC_RESP = "ASSOC_RESP" STATE_KEY_MESSAGE_1 = "KEY_MESSAGE_1" STATE_KEY_MESSAGE_2 = "KEY_MESSAGE_2" STATE_KEY_MESSAGE_3 = "KEY_MESSAGE_3" STATE_KEY_MESSAGE_4 = "KEY_MESSAGE_4" STATE_DHCP_DISCOVER = "DHCP_DISCOVER" STATE_DHCP_OFFER = "DHCP_OFFER" STATE_DHCP_REQ = "DHCP_REQ" STATE_DHCP_REQ_ACK = "DHCP_REQ_ACK" STATE_END = "END" PACKET_MATCH_WLAN_FRAME_TYPE = "wlan.fc_type_subtype" PACKET_MATCH_WLAN_FRAME_RETRY_FLAG = "wlan.fc_retry" PACKET_MATCH_WLAN_MANAGEMENT_REASON_CODE = "wlan_mgt.fixed_reason_code" PACKET_MATCH_WLAN_MANAGEMENT_STATUS_CODE = "wlan_mgt.fixed_status_code" PACKET_MATCH_WLAN_TRANSMITTER = "wlan.ta" PACKET_MATCH_LLC_TYPE = "llc.type" PACKET_MATCH_EAP_TYPE = "eapol.type" PACKET_MATCH_EAP_KEY_INFO_INSTALL = "eapol.keydes_key_info_install" PACKET_MATCH_EAP_KEY_INFO_ACK = "eapol.keydes_key_info_key_ack" PACKET_MATCH_EAP_KEY_INFO_MIC = "eapol.keydes_key_info_key_mic" PACKET_MATCH_EAP_KEY_INFO_SECURE = "eapol.keydes_key_info_secure" PACKET_MATCH_IP_PROTOCOL_TYPE = "ip.proto" PACKET_MATCH_DHCP_MESSAGE_TYPE = "bootp.option_dhcp" PACKET_MATCH_RADIOTAP_DATA_RATE = "radiotap.datarate" WLAN_PROBE_REQ_FRAME_TYPE = '0x04' WLAN_PROBE_RESP_FRAME_TYPE = '0x05' WLAN_AUTH_REQ_FRAME_TYPE = '0x0b' WLAN_AUTH_RESP_FRAME_TYPE = '0x0b' WLAN_ASSOC_REQ_FRAME_TYPE = '0x00' WLAN_ASSOC_RESP_FRAME_TYPE = '0x01' WLAN_ACK_FRAME_TYPE = '0x1d' WLAN_DEAUTH_REQ_FRAME_TYPE = '0x0c' WLAN_DISASSOC_REQ_FRAME_TYPE = '0x0a' WLAN_QOS_DATA_FRAME_TYPE = '0x28' WLAN_MANAGEMENT_STATUS_CODE_SUCCESS = '0x0000' WLAN_BROADCAST_ADDRESS = 'ff:ff:ff:ff:ff:ff' WLAN_FRAME_CONTROL_TYPE_MANAGEMENT = '0' WLAN_FRAME_RETRY = '1' LLC_AUTH_TYPE = '0x888e' EAP_KEY_TYPE = '0x03' IP_UDP_PROTOCOL_TYPE = '17' DHCP_DISCOVER_MESSAGE_TYPE = '1' DHCP_OFFER_MESSAGE_TYPE = '2' DHCP_REQUEST_MESSAGE_TYPE = '3' DHCP_ACK_MESSAGE_TYPE = '5' DIR_TO_DUT = 0 DIR_FROM_DUT = 1 DIR_DUT_TO_AP = 2 DIR_AP_TO_DUT = 3 DIR_ACK = 4 # State Info Tuples (Name, Direction, Match fields, Next State) StateInfo = collections.namedtuple( 'StateInfo', ['name', 'direction', 'match_fields', 'next_state']) STATE_INFO_INIT = StateInfo("INIT", 0, {}, STATE_PROBE_REQ) STATE_INFO_PROBE_REQ = StateInfo("WLAN PROBE REQUEST", DIR_FROM_DUT, { PACKET_MATCH_WLAN_FRAME_TYPE: WLAN_PROBE_REQ_FRAME_TYPE }, STATE_PROBE_RESP) STATE_INFO_PROBE_RESP = StateInfo("WLAN PROBE RESPONSE", DIR_AP_TO_DUT, { PACKET_MATCH_WLAN_FRAME_TYPE: WLAN_PROBE_RESP_FRAME_TYPE }, STATE_AUTH_REQ) STATE_INFO_AUTH_REQ = StateInfo("WLAN AUTH REQUEST", DIR_DUT_TO_AP, { PACKET_MATCH_WLAN_FRAME_TYPE: WLAN_AUTH_REQ_FRAME_TYPE }, STATE_AUTH_RESP) STATE_INFO_AUTH_RESP = StateInfo( "WLAN AUTH RESPONSE", DIR_AP_TO_DUT, { PACKET_MATCH_WLAN_FRAME_TYPE: WLAN_AUTH_REQ_FRAME_TYPE, PACKET_MATCH_WLAN_MANAGEMENT_STATUS_CODE: WLAN_MANAGEMENT_STATUS_CODE_SUCCESS }, STATE_ASSOC_REQ) STATE_INFO_ASSOC_REQ = StateInfo("WLAN ASSOC REQUEST", DIR_DUT_TO_AP, { PACKET_MATCH_WLAN_FRAME_TYPE: WLAN_ASSOC_REQ_FRAME_TYPE }, STATE_ASSOC_RESP) STATE_INFO_ASSOC_RESP = StateInfo( "WLAN ASSOC RESPONSE", DIR_AP_TO_DUT, { PACKET_MATCH_WLAN_FRAME_TYPE: WLAN_ASSOC_RESP_FRAME_TYPE, PACKET_MATCH_WLAN_MANAGEMENT_STATUS_CODE: WLAN_MANAGEMENT_STATUS_CODE_SUCCESS }, STATE_KEY_MESSAGE_1) STATE_INFO_KEY_MESSAGE_1 = StateInfo("WPA KEY MESSAGE 1", DIR_AP_TO_DUT, { PACKET_MATCH_LLC_TYPE: LLC_AUTH_TYPE, PACKET_MATCH_EAP_KEY_INFO_INSTALL: '0', PACKET_MATCH_EAP_KEY_INFO_ACK: '1', PACKET_MATCH_EAP_KEY_INFO_MIC: '0', PACKET_MATCH_EAP_KEY_INFO_SECURE: '0' }, STATE_KEY_MESSAGE_2) STATE_INFO_KEY_MESSAGE_2 = StateInfo("WPA KEY MESSAGE 2", DIR_DUT_TO_AP, { PACKET_MATCH_LLC_TYPE: LLC_AUTH_TYPE, PACKET_MATCH_EAP_KEY_INFO_INSTALL: '0', PACKET_MATCH_EAP_KEY_INFO_ACK: '0', PACKET_MATCH_EAP_KEY_INFO_MIC: '1', PACKET_MATCH_EAP_KEY_INFO_SECURE: '0' }, STATE_KEY_MESSAGE_3) STATE_INFO_KEY_MESSAGE_3 = StateInfo("WPA KEY MESSAGE 3", DIR_AP_TO_DUT, { PACKET_MATCH_LLC_TYPE: LLC_AUTH_TYPE, PACKET_MATCH_EAP_KEY_INFO_INSTALL: '1', PACKET_MATCH_EAP_KEY_INFO_ACK: '1', PACKET_MATCH_EAP_KEY_INFO_MIC: '1', PACKET_MATCH_EAP_KEY_INFO_SECURE: '1' }, STATE_KEY_MESSAGE_4) STATE_INFO_KEY_MESSAGE_4 = StateInfo("WPA KEY MESSAGE 4", DIR_DUT_TO_AP, { PACKET_MATCH_LLC_TYPE: LLC_AUTH_TYPE, PACKET_MATCH_EAP_KEY_INFO_INSTALL: '0', PACKET_MATCH_EAP_KEY_INFO_ACK: '0', PACKET_MATCH_EAP_KEY_INFO_MIC: '1', PACKET_MATCH_EAP_KEY_INFO_SECURE: '1' }, STATE_DHCP_DISCOVER) STATE_INFO_DHCP_DISCOVER = StateInfo("DHCP DISCOVER", DIR_DUT_TO_AP, { PACKET_MATCH_IP_PROTOCOL_TYPE: IP_UDP_PROTOCOL_TYPE, PACKET_MATCH_DHCP_MESSAGE_TYPE: DHCP_DISCOVER_MESSAGE_TYPE }, STATE_DHCP_OFFER) STATE_INFO_DHCP_OFFER = StateInfo("DHCP OFFER", DIR_AP_TO_DUT, { PACKET_MATCH_IP_PROTOCOL_TYPE: IP_UDP_PROTOCOL_TYPE, PACKET_MATCH_DHCP_MESSAGE_TYPE: DHCP_OFFER_MESSAGE_TYPE }, STATE_DHCP_REQ) STATE_INFO_DHCP_REQ = StateInfo("DHCP REQUEST", DIR_DUT_TO_AP, { PACKET_MATCH_IP_PROTOCOL_TYPE: IP_UDP_PROTOCOL_TYPE, PACKET_MATCH_DHCP_MESSAGE_TYPE: DHCP_REQUEST_MESSAGE_TYPE }, STATE_DHCP_REQ_ACK) STATE_INFO_DHCP_REQ_ACK = StateInfo("DHCP ACK", DIR_AP_TO_DUT, { PACKET_MATCH_IP_PROTOCOL_TYPE: IP_UDP_PROTOCOL_TYPE, PACKET_MATCH_DHCP_MESSAGE_TYPE: DHCP_ACK_MESSAGE_TYPE }, STATE_END) STATE_INFO_END = StateInfo("END", 0, {}, STATE_END) # State Table Map of State Infos STATE_INFO_MAP = {STATE_INIT: STATE_INFO_INIT, STATE_PROBE_REQ: STATE_INFO_PROBE_REQ, STATE_PROBE_RESP: STATE_INFO_PROBE_RESP, STATE_AUTH_REQ: STATE_INFO_AUTH_REQ, STATE_AUTH_RESP: STATE_INFO_AUTH_RESP, STATE_ASSOC_REQ: STATE_INFO_ASSOC_REQ, STATE_ASSOC_RESP: STATE_INFO_ASSOC_RESP, STATE_KEY_MESSAGE_1:STATE_INFO_KEY_MESSAGE_1, STATE_KEY_MESSAGE_2:STATE_INFO_KEY_MESSAGE_2, STATE_KEY_MESSAGE_3:STATE_INFO_KEY_MESSAGE_3, STATE_KEY_MESSAGE_4:STATE_INFO_KEY_MESSAGE_4, STATE_DHCP_DISCOVER:STATE_INFO_DHCP_DISCOVER, STATE_DHCP_OFFER: STATE_INFO_DHCP_OFFER, STATE_DHCP_REQ: STATE_INFO_DHCP_REQ, STATE_DHCP_REQ_ACK: STATE_INFO_DHCP_REQ_ACK, STATE_END: STATE_INFO_END} # Packet Details Tuples (User friendly name, Field name) PacketDetail = collections.namedtuple( "PacketDetail", ["friendly_name", "field_name"]) PACKET_DETAIL_REASON_CODE = PacketDetail( "Reason Code", PACKET_MATCH_WLAN_MANAGEMENT_REASON_CODE) PACKET_DETAIL_STATUS_CODE = PacketDetail( "Status Code", PACKET_MATCH_WLAN_MANAGEMENT_STATUS_CODE) PACKET_DETAIL_SENDER = PacketDetail( "Sender", PACKET_MATCH_WLAN_TRANSMITTER) # Error State Info Tuples (Name, Match fields) ErrorStateInfo = collections.namedtuple( 'ErrorStateInfo', ['name', 'match_fields', 'details']) ERROR_STATE_INFO_DEAUTH = ErrorStateInfo("WLAN DEAUTH REQUEST", { PACKET_MATCH_WLAN_FRAME_TYPE: WLAN_DEAUTH_REQ_FRAME_TYPE }, [ PACKET_DETAIL_SENDER, PACKET_DETAIL_REASON_CODE ]) ERROR_STATE_INFO_DEASSOC = ErrorStateInfo("WLAN DISASSOC REQUEST", { PACKET_MATCH_WLAN_FRAME_TYPE: WLAN_DISASSOC_REQ_FRAME_TYPE }, [ PACKET_DETAIL_SENDER, PACKET_DETAIL_REASON_CODE ]) # State Table Tuple of Error State Infos ERROR_STATE_INFO_TUPLE = (ERROR_STATE_INFO_DEAUTH, ERROR_STATE_INFO_DEASSOC) # These warnings actually match successful states, but since the we # check forwards and backwards through the state machine for the successful # version of these packets, they can only match a failure. WARNING_INFO_AUTH_REJ = ErrorStateInfo( "WLAN AUTH REJECTED", { PACKET_MATCH_WLAN_FRAME_TYPE: WLAN_AUTH_REQ_FRAME_TYPE }, [ PACKET_DETAIL_STATUS_CODE ]) WARNING_INFO_ASSOC_REJ = ErrorStateInfo( "WLAN ASSOC REJECTED", { PACKET_MATCH_WLAN_FRAME_TYPE: WLAN_ASSOC_RESP_FRAME_TYPE }, [ PACKET_DETAIL_STATUS_CODE ]) # Table Tuple of warning information. WARNING_INFO_TUPLE = (WARNING_INFO_AUTH_REJ, WARNING_INFO_ASSOC_REJ) def __init__(self, ap_macs, dut_mac, filtered_packets, capture, logger): self._current_state = self._get_state(self.STATE_INIT) self._reached_states = [] self._skipped_states = [] self._packets = filtered_packets self._capture = capture self._dut_mac = dut_mac self._ap_macs = ap_macs self._log = logger self._acks = [] @property def acks(self): return self._acks def _get_state(self, state): return self.STATE_INFO_MAP[state] def _get_next_state(self, state): return self._get_state(state.next_state) def _get_curr_next_state(self): return self._get_next_state(self._current_state) def _fetch_packet_field_value(self, packet, field): layer_object = packet for layer in field.split('.'): try: layer_object = getattr(layer_object, layer) except AttributeError: return None return layer_object def _match_packet_fields(self, packet, fields): for field, exp_value in fields.items(): value = self._fetch_packet_field_value(packet, field) if exp_value != value: return False return True def _fetch_packet_data_rate(self, packet): return self._fetch_packet_field_value(packet, self.PACKET_MATCH_RADIOTAP_DATA_RATE) def _does_packet_match_state(self, state, packet): fields = state.match_fields if self._match_packet_fields(packet, fields): if state.direction == self.DIR_TO_DUT: # This should have receiver addr of DUT if packet.wlan.ra == self._dut_mac: return True elif state.direction == self.DIR_FROM_DUT: # This should have transmitter addr of DUT if packet.wlan.ta == self._dut_mac: return True elif state.direction == self.DIR_AP_TO_DUT: # This should have receiver addr of DUT & # transmitter addr of AP's if ((packet.wlan.ra == self._dut_mac) and (packet.wlan.ta in self._ap_macs)): return True elif state.direction == self.DIR_DUT_TO_AP: # This should have transmitter addr of DUT & # receiver addr of AP's if ((packet.wlan.ta == self._dut_mac) and (packet.wlan.ra in self._ap_macs)): return True return False def _does_packet_match_error_state(self, state, packet): fields = state.match_fields return self._match_packet_fields(packet, fields) def _get_packet_detail(self, details, packet): attributes = [] attributes.append("Packet number: %s" % packet.number) for detail in details: value = self._fetch_packet_field_value(packet, detail.field_name) attributes.append("%s: %s" % (detail.friendly_name, value)) return attributes def _does_packet_match_ack_state(self, packet): fields = { self.PACKET_MATCH_WLAN_FRAME_TYPE: self.WLAN_ACK_FRAME_TYPE } return self._match_packet_fields(packet, fields) def _does_packet_contain_retry_flag(self, packet): fields = { self.PACKET_MATCH_WLAN_FRAME_RETRY_FLAG: self.WLAN_FRAME_RETRY } return self._match_packet_fields(packet, fields) def _check_for_ack(self, state, packet): if (packet.wlan.da == self.WLAN_BROADCAST_ADDRESS and packet.wlan.fc_type == self.WLAN_FRAME_CONTROL_TYPE_MANAGEMENT): # Broadcast management frames are not ACKed. return True next_packet = self._capture.get_packet_after(packet) if not next_packet or not ( (self._does_packet_match_ack_state(next_packet)) and (next_packet.wlan.addr == packet.wlan.ta)): msg = "WARNING! Missing ACK for state: " + \ state.name + "." self._log.log_to_output_file(msg) return False self._acks.append(int(next_packet.number)) return True def _check_for_error(self, packet): for error_state in self.ERROR_STATE_INFO_TUPLE: if self._does_packet_match_error_state(error_state, packet): error_attributes = self._get_packet_detail(error_state.details, packet) msg = "ERROR! State Machine encountered error due to " + \ error_state.name + ", " + \ ", ".join(error_attributes) + "." self._log.log_to_output_file(msg) return True return False def _check_for_warning(self, packet): for warning in self.WARNING_INFO_TUPLE: if self._does_packet_match_error_state(warning, packet): error_attributes = self._get_packet_detail(warning.details, packet) msg = "WARNING! " + warning.name + " found, " + \ ", ".join(error_attributes) + "." self._log.log_to_output_file(msg) return True return False def _check_for_repeated_state(self, packet): for state in self._reached_states: if self._does_packet_match_state(state, packet): msg = "WARNING! Repeated State: " + \ state.name + ", Packet number: " + \ str(packet.number) if self._does_packet_contain_retry_flag(packet): msg += " due to retransmission." else: msg += "." self._log.log_to_output_file(msg) def _is_from_previous_state(self, packet): for state in self._reached_states + self._skipped_states: if self._does_packet_match_state(state, packet): return True return False def _step(self, reached_state, packet): # We missed a few packets in between if self._current_state != reached_state: msg = "WARNING! Missed states: " skipped_state = self._current_state while skipped_state != reached_state: msg += skipped_state.name + ", " self._skipped_states.append(skipped_state) skipped_state = self._get_next_state(skipped_state) msg = msg[:-2] msg += "." self._log.log_to_output_file(msg) msg = "Found state: " + reached_state.name if packet: msg += ", Packet number: " + str(packet.number) + \ ", Data rate: " + str(self._fetch_packet_data_rate(packet))+\ "Mbps." else: msg += "." self._log.log_to_output_file(msg) # Ignore the Init state in the reached states if packet: self._reached_states.append(reached_state) self._current_state = self._get_next_state(reached_state) def _step_init(self): #self.log_to_output_file("Starting Analysis") self._current_state = self._get_curr_next_state() def analyze(self): """ Starts the analysis of the Wifi Protocol Exchange. """ # Start the state machine iteration self._step_init() packet_iterator = iter(self._packets) for packet in packet_iterator: self._check_for_repeated_state(packet) # Try to look ahead in the state machine to account for occasional # packet capture misses. next_state = self._current_state while next_state != self.STATE_INFO_END: if self._does_packet_match_state(next_state, packet): self._step(next_state, packet) self._check_for_ack(next_state, packet) break next_state = self._get_next_state(next_state) if self._current_state == self.STATE_INFO_END: self._log.log_to_output_file("State Machine completed!") return True if self._check_for_error(packet): return False if not self._is_from_previous_state(packet): self._check_for_warning(packet) msg = "ERROR! State Machine halted at " + self._current_state.name + \ " state." self._log.log_to_output_file(msg) return False class ChaosCaptureAnalyzer(object): """ Class to analyze the packet capture from a chaos test . """ def __init__(self, ap_bssids, ap_ssid, dut_mac, logger): self._ap_bssids = ap_bssids self._ap_ssid = ap_ssid self._dut_mac = dut_mac self._log = logger def _validate_ap_presence(self, capture, bssids, ssid): beacon_count = capture.count_beacons_from(bssids) if not beacon_count: packet_count = capture.count_packets_from(bssids) if not packet_count: self._log.log_to_output_file( "No packets at all from AP BSSIDs %r!" % bssids) else: self._log.log_to_output_file( "No beacons from AP BSSIDs %r but %d packets!" % (bssids, packet_count)) return False self._log.log_to_output_file("AP BSSIDs: %s, SSID: %s." % (bssids, ssid)) self._log.log_to_output_file("AP beacon count: %d." % beacon_count) return True def _validate_dut_presence(self, capture, dut_mac): tx_count = capture.count_packets_from([dut_mac]) if not tx_count: self._log.log_to_output_file( "No packets Tx at all from DUT MAC %r!" % dut_mac) return False rx_count = capture.count_packets_to([dut_mac]) self._log.log_to_output_file("DUT MAC: %s." % dut_mac) self._log.log_to_output_file( "DUT packet count Tx: %d, Rx: %d." % (tx_count, rx_count)) return True def _ack_interleave(self, packets, capture, acks): """Generator that interleaves packets with their associated ACKs.""" for packet in packets: packet_number = int(packet.no) while acks and acks[0] < packet_number: # ACK packet does not appear in the filtered capture. yield capture.get_packet_number(acks.pop(0), summary=True) if acks and acks[0] == packet_number: # ACK packet also appears in the capture. acks.pop(0) yield packet def analyze(self, trace): """ Starts the analysis of the Chaos capture. @param trace: Packet capture file path to analyze. """ basename = os.path.basename(trace) self._log.log_start_section("Packet Capture File: %s" % basename) capture = PacketCapture(trace) bssids = self._ap_bssids ssid = self._ap_ssid if not self._validate_ap_presence(capture, bssids, ssid): return dut_mac = self._dut_mac if not self._validate_dut_presence(capture, dut_mac): return decryption = 'chromeos:%s' % ssid self._log.log_start_section("WLAN Protocol Verification") filtered_packets = capture.get_filtered_packets( bssids, dut_mac, False, decryption) wifi_state_machine = WifiStateMachineAnalyzer( bssids, dut_mac, filtered_packets, capture, self._log) wifi_state_machine.analyze() self._log.log_start_section("Filtered Packet Capture Summary") filtered_packets = capture.get_filtered_packets( bssids, dut_mac, True, decryption) for packet in self._ack_interleave( filtered_packets, capture, wifi_state_machine.acks): self._log.log_to_output_file("%s" % (packet))