1# Copyright 2021-2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# Imports 17# ----------------------------------------------------------------------------- 18import struct 19import click 20 21from bumble.colors import color 22from bumble import hci 23from bumble.transport.common import PacketReader 24from bumble.helpers import PacketTracer 25 26 27# ----------------------------------------------------------------------------- 28class SnoopPacketReader: 29 ''' 30 Reader that reads HCI packets from a "snoop" file (based on RFC 1761, but not 31 exactly the same...) 32 ''' 33 34 DATALINK_H1 = 1001 35 DATALINK_H4 = 1002 36 DATALINK_BSCP = 1003 37 DATALINK_H5 = 1004 38 39 def __init__(self, source): 40 self.source = source 41 42 # Read the header 43 identification_pattern = source.read(8) 44 if identification_pattern.hex().lower() != '6274736e6f6f7000': 45 raise ValueError( 46 'not a valid snoop file, unexpected identification pattern' 47 ) 48 (self.version_number, self.data_link_type) = struct.unpack( 49 '>II', source.read(8) 50 ) 51 if self.data_link_type not in (self.DATALINK_H4, self.DATALINK_H1): 52 raise ValueError(f'datalink type {self.data_link_type} not supported') 53 54 def next_packet(self): 55 # Read the record header 56 header = self.source.read(24) 57 if len(header) < 24: 58 return (0, None) 59 ( 60 original_length, 61 included_length, 62 packet_flags, 63 _cumulative_drops, 64 _timestamp_seconds, 65 _timestamp_microsecond, 66 ) = struct.unpack('>IIIIII', header) 67 68 # Abort on truncated packets 69 if original_length != included_length: 70 return (0, None) 71 72 if self.data_link_type == self.DATALINK_H1: 73 # The packet is un-encapsulated, look at the flags to figure out its type 74 if packet_flags & 1: 75 # Controller -> Host 76 if packet_flags & 2: 77 packet_type = hci.HCI_EVENT_PACKET 78 else: 79 packet_type = hci.HCI_ACL_DATA_PACKET 80 else: 81 # Host -> Controller 82 if packet_flags & 2: 83 packet_type = hci.HCI_COMMAND_PACKET 84 else: 85 packet_type = hci.HCI_ACL_DATA_PACKET 86 87 return ( 88 packet_flags & 1, 89 bytes([packet_type]) + self.source.read(included_length), 90 ) 91 92 return (packet_flags & 1, self.source.read(included_length)) 93 94 95# ----------------------------------------------------------------------------- 96# Main 97# ----------------------------------------------------------------------------- 98@click.command() 99@click.option( 100 '--format', 101 type=click.Choice(['h4', 'snoop']), 102 default='h4', 103 help='Format of the input file', 104) 105@click.argument('filename') 106# pylint: disable=redefined-builtin 107def main(format, filename): 108 input = open(filename, 'rb') 109 if format == 'h4': 110 packet_reader = PacketReader(input) 111 112 def read_next_packet(): 113 return (0, packet_reader.next_packet()) 114 115 else: 116 packet_reader = SnoopPacketReader(input) 117 read_next_packet = packet_reader.next_packet 118 119 tracer = PacketTracer(emit_message=print) 120 121 while True: 122 try: 123 (direction, packet) = read_next_packet() 124 if packet is None: 125 break 126 tracer.trace(hci.HCI_Packet.from_bytes(packet), direction) 127 128 except Exception as error: 129 print(color(f'!!! {error}', 'red')) 130 131 132# ----------------------------------------------------------------------------- 133if __name__ == '__main__': 134 main() # pylint: disable=no-value-for-parameter 135