• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
18from contextlib import contextmanager
19from enum import IntEnum
20import logging
21import struct
22import datetime
23from typing import BinaryIO, Generator
24import os
25
26from bumble.hci import HCI_COMMAND_PACKET, HCI_EVENT_PACKET
27
28
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
32logger = logging.getLogger(__name__)
33
34
# -----------------------------------------------------------------------------
# Classes
# -----------------------------------------------------------------------------
class Snooper:
    """
    Common interface for HCI packet snoopers.

    Each HCI packet exchanged between a host and a controller is handed to the
    snooper through `snoop()`. This base implementation does nothing with the
    packets it receives; subclasses override `snoop()` to record them.
    """

    class Direction(IntEnum):
        """Direction in which a snooped packet was travelling."""

        HOST_TO_CONTROLLER = 0
        CONTROLLER_TO_HOST = 1

    class DataLinkType(IntEnum):
        """Numeric codes identifying the HCI transport (datalink) framing."""

        H1 = 1001
        H4 = 1002
        HCI_BSCP = 1003
        H5 = 1004

    def snoop(self, hci_packet: bytes, direction: Direction) -> None:
        """
        Snoop on an HCI packet.

        Args:
            hci_packet: the raw bytes of the packet.
            direction: which way the packet was travelling.
        """
        # Intentionally a no-op in the base class.
        return None
58
59
# -----------------------------------------------------------------------------
class BtSnooper(Snooper):
    """
    Snooper that saves HCI packets using the BTSnoop format, based on RFC 1761.

    The file header is written to `output` as soon as the snooper is created,
    and one record is appended for each packet passed to `snoop()`.
    """

    IDENTIFICATION_PATTERN = b'btsnoop\0'

    # Record timestamps are the number of microseconds elapsed since
    # TIMESTAMP_ANCHOR (a naive datetime, interpreted as UTC) plus
    # TIMESTAMP_DELTA, which is the BTSnoop timestamp value for that anchor.
    TIMESTAMP_ANCHOR = datetime.datetime(2000, 1, 1)
    TIMESTAMP_DELTA = 0x00E03AB44A676000

    # NOTE: despite its name, this is one *microsecond*, which is the unit of
    # BTSnoop timestamps. The name is kept for backward compatibility.
    ONE_MS = datetime.timedelta(microseconds=1)

    # Packet flags bit 1: set for command/event packets, clear for data
    # packets. (Bit 0 is the direction and matches `Snooper.Direction`.)
    _COMMAND_OR_EVENT_FLAG = 0x02

    def __init__(self, output: BinaryIO):
        """
        Create a snooper and immediately write the BTSnoop file header.

        Args:
            output: writable binary stream that receives the header and records.
        """
        self.output = output

        # Header: identification pattern, format version (1), datalink type.
        self.output.write(
            self.IDENTIFICATION_PATTERN + struct.pack('>LL', 1, self.DataLinkType.H4)
        )

    def snoop(self, hci_packet: bytes, direction: Snooper.Direction) -> None:
        """
        Append one record for `hci_packet` to the output stream.

        Args:
            hci_packet: the raw packet; its first byte is the packet type.
            direction: which way the packet was travelling.
        """
        flags = int(direction)

        # An empty packet has no type byte; it is recorded as a data packet
        # rather than raising an IndexError.
        if hci_packet and hci_packet[0] in (HCI_EVENT_PACKET, HCI_COMMAND_PACKET):
            flags |= self._COMMAND_OR_EVENT_FLAG

        # Compute the current timestamp. A naive UTC datetime is used so that it
        # can be subtracted from the naive TIMESTAMP_ANCHOR; `//` between two
        # timedeltas yields an exact integer count of microseconds.
        utc_now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        timestamp = (utc_now - self.TIMESTAMP_ANCHOR) // self.ONE_MS
        timestamp += self.TIMESTAMP_DELTA

        # Emit the record (header and payload in a single write)
        packet_length = len(hci_packet)
        self.output.write(
            struct.pack(
                '>IIIIQ',
                packet_length,  # Original Length
                packet_length,  # Included Length
                flags,  # Packet Flags
                0,  # Cumulative Drops
                timestamp,  # Timestamp
            )
            + hci_packet
        )
103
104
# -----------------------------------------------------------------------------
# Number of snoopers created by `create_snooper` that are currently open in this
# process. Its value at creation time is exposed to file name patterns as
# `instance`.
_SNOOPER_INSTANCE_COUNT = 0


@contextmanager
def create_snooper(spec: str) -> Generator[Snooper, None, None]:
    """
    Create a snooper given a specification string.

    This is a context manager: the snooper is only valid inside the `with`
    block, and its underlying file is closed when the block exits.

    The general syntax for the specification string is:
      <snooper-type>:<type-specific-arguments>

    Supported snooper types are:

      btsnoop
        The syntax for the type-specific arguments for this type is:
        <io-type>:<io-type-specific-arguments>

        Supported I/O types are:

        file
          The type-specific arguments for this I/O type is a string that is converted
          to a file path using the python `str.format()` string formatting. The log
          records will be written to that file if it can be opened/created.
          The keyword args that may be referenced by the string pattern are:
            now: the value of `datetime.now()`
            utcnow: the current UTC time, as a naive `datetime`
            pid: the current process ID.
            instance: the instance ID in the current process.

    Examples:
      btsnoop:file:my_btsnoop.log
      btsnoop:file:/tmp/bumble_{now:%Y-%m-%d-%H:%M:%S}_{pid}.log

    Raises:
      ValueError: if the specification is malformed, or names a snooper type
        or I/O type that is not supported.
      OSError: if the snoop file cannot be opened/created.
    """
    global _SNOOPER_INSTANCE_COUNT

    if ':' not in spec:
        raise ValueError('snooper type prefix missing')

    snooper_type, snooper_args = spec.split(':', maxsplit=1)
    if snooper_type != 'btsnoop':
        raise ValueError(f'snooper type {snooper_type} not found')

    if ':' not in snooper_args:
        raise ValueError('I/O type for btsnoop snooper type missing')

    io_type, io_name = snooper_args.split(':', maxsplit=1)
    if io_type != 'file':
        raise ValueError(f'I/O type {io_type} not supported')

    # Process the file name string pattern.
    # `utcnow` is kept naive so that it formats exactly like the value of the
    # deprecated `datetime.utcnow()` did.
    file_path = io_name.format(
        now=datetime.datetime.now(),
        utcnow=datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None),
        pid=os.getpid(),
        instance=_SNOOPER_INSTANCE_COUNT,
    )

    # Open the file
    logger.debug('Snoop file: %s', file_path)
    with open(file_path, 'wb') as snoop_file:
        _SNOOPER_INSTANCE_COUNT += 1
        try:
            yield BtSnooper(snoop_file)
        finally:
            # Runs even when the caller's `with` body raises (the exception is
            # re-raised at the `yield`), so the count cannot drift upwards.
            _SNOOPER_INSTANCE_COUNT -= 1
171