• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import struct
19import click
20
21from bumble.colors import color
22from bumble import hci
23from bumble.transport.common import PacketReader
24from bumble.helpers import PacketTracer
25
26
27# -----------------------------------------------------------------------------
28class SnoopPacketReader:
29    '''
30    Reader that reads HCI packets from a "snoop" file (based on RFC 1761, but not
31    exactly the same...)
32    '''
33
34    DATALINK_H1 = 1001
35    DATALINK_H4 = 1002
36    DATALINK_BSCP = 1003
37    DATALINK_H5 = 1004
38
39    def __init__(self, source):
40        self.source = source
41
42        # Read the header
43        identification_pattern = source.read(8)
44        if identification_pattern.hex().lower() != '6274736e6f6f7000':
45            raise ValueError(
46                'not a valid snoop file, unexpected identification pattern'
47            )
48        (self.version_number, self.data_link_type) = struct.unpack(
49            '>II', source.read(8)
50        )
51        if self.data_link_type not in (self.DATALINK_H4, self.DATALINK_H1):
52            raise ValueError(f'datalink type {self.data_link_type} not supported')
53
54    def next_packet(self):
55        # Read the record header
56        header = self.source.read(24)
57        if len(header) < 24:
58            return (0, None)
59        (
60            original_length,
61            included_length,
62            packet_flags,
63            _cumulative_drops,
64            _timestamp_seconds,
65            _timestamp_microsecond,
66        ) = struct.unpack('>IIIIII', header)
67
68        # Abort on truncated packets
69        if original_length != included_length:
70            return (0, None)
71
72        if self.data_link_type == self.DATALINK_H1:
73            # The packet is un-encapsulated, look at the flags to figure out its type
74            if packet_flags & 1:
75                # Controller -> Host
76                if packet_flags & 2:
77                    packet_type = hci.HCI_EVENT_PACKET
78                else:
79                    packet_type = hci.HCI_ACL_DATA_PACKET
80            else:
81                # Host -> Controller
82                if packet_flags & 2:
83                    packet_type = hci.HCI_COMMAND_PACKET
84                else:
85                    packet_type = hci.HCI_ACL_DATA_PACKET
86
87            return (
88                packet_flags & 1,
89                bytes([packet_type]) + self.source.read(included_length),
90            )
91
92        return (packet_flags & 1, self.source.read(included_length))
93
94
95# -----------------------------------------------------------------------------
96# Main
97# -----------------------------------------------------------------------------
98@click.command()
99@click.option(
100    '--format',
101    type=click.Choice(['h4', 'snoop']),
102    default='h4',
103    help='Format of the input file',
104)
105@click.argument('filename')
106# pylint: disable=redefined-builtin
107def main(format, filename):
108    input = open(filename, 'rb')
109    if format == 'h4':
110        packet_reader = PacketReader(input)
111
112        def read_next_packet():
113            return (0, packet_reader.next_packet())
114
115    else:
116        packet_reader = SnoopPacketReader(input)
117        read_next_packet = packet_reader.next_packet
118
119    tracer = PacketTracer(emit_message=print)
120
121    while True:
122        try:
123            (direction, packet) = read_next_packet()
124            if packet is None:
125                break
126            tracer.trace(hci.HCI_Packet.from_bytes(packet), direction)
127
128        except Exception as error:
129            print(color(f'!!! {error}', 'red'))
130
131
132# -----------------------------------------------------------------------------
133if __name__ == '__main__':
134    main()  # pylint: disable=no-value-for-parameter
135